Show / Hide Table of Contents

    Azure Storage Queue adapter

    QueueSource

    You can create a Source for the Storage Queue either via Source.FromGraph(new QueueSource) or by calling the QueueSource.Create method.

    By default the Source will completes the stream with failure if a call to the queue for new messages failed, you can change that behavior by using Restart or Resume SupervisionStrategy.

    QueueSource.Create(Queue, pollInterval: TimeSpan.FromSeconds(1))
        .WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider))
    

    The Source reads messages in batches from the queue and then emits the single messages into the stream, once all messages are emitted another request is send to the queue. The number of messages that are requested per batch can be configured via the prefetchCount parameter, by default 10 messages. If you want a behavior were the source is making the request for new messages while the messages from the previous request are still not completely processed, you can easily do that by adding a Buffer directly after the Source like this:

    QueueSource.Create(Queue, pollInterval: TimeSpan.FromSeconds(1))
        .Buffer(5, OverflowStrategy.Backpressure)
    

    This will send the next request to the queue once the first five messages have been processed with five messages left in the Buffer.

    If the queue is empty the source will periodically poll for new messages, this interval can be configured via the pollInterval parameter, by default 10 seconds.

    Additional parameter for the CloudQueue.GetMessagesAsync call can be set via the options parameter.

    QueueSink

    You can create a Sink for the Storage Queue either via Sink.FromGraph(new QueueSink) or by calling the QueueSink.Create method or use the extension method ToStorageQueue on a Source<CloudQueueMessage, TMat> directly. The Sink is materialized into a Task which will be completed with Success when reaching the normal end of the stream, or completed with Failure if there is a failure signaled in the stream.

    You can configure different behaviors if a message couldn't be added to the queue by using the SupervisionStrategy attribute, the following behaviors are available:

    • Stop: Default behavior, completes the stream with failure.
    • Resume: Sends the message again.
    • Restart: Skips the current message and continues with the next message.

    Additional parameter for the CloudQueue.AddMessageAsync call can be set via the options parameter.

    Examples

    You can find some examples in the test project.

    • Improve this Doc
    Back to top Copyright © 2013-2020 Akka.NET project
    Generated by DocFX