Azure Storage Queue adapter
QueueSource
You can create a Source for the Storage Queue either via Source.FromGraph(new QueueSource) or by calling the QueueSource.Create method.
By default, the Source completes the stream with failure if a call to the queue for new messages fails. You can change that behavior by using the Restart or Resume SupervisionStrategy.
QueueSource.Create(Queue, pollInterval: TimeSpan.FromSeconds(1))
.WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider))
The Source reads messages from the queue in batches and then emits them into the stream one by one. Once all messages of a batch have been emitted, another request is sent to the queue. The number of messages requested per batch can be configured via the prefetchCount parameter and defaults to 10. If you want the source to request new messages while messages from the previous request are still being processed, add a Buffer directly after the Source like this:
QueueSource.Create(Queue, pollInterval: TimeSpan.FromSeconds(1))
.Buffer(5, OverflowStrategy.Backpressure)
With the default prefetchCount of 10, the next request is sent to the queue once the first five messages have been processed, while the remaining five are still in the Buffer.
If the queue is empty, the source periodically polls for new messages. The interval can be configured via the pollInterval parameter and defaults to 10 seconds.
Additional parameters for the CloudQueue.GetMessagesAsync call can be set via the options parameter.
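As a rough sketch, the source settings described in this section could be combined as shown here. The class QueueSourceExample, the method PrintMessages, and the concrete values are made up for illustration. The named arguments prefetchCount and pollInterval come from the description above, while the Microsoft.WindowsAzure.Storage.Queue namespace is an assumption about which storage SDK your project references.

```csharp
using System;
using System.Threading.Tasks;
using Akka.Streams;
using Akka.Streams.Azure.StorageQueue;
using Akka.Streams.Dsl;
using Akka.Streams.Supervision;
using Microsoft.WindowsAzure.Storage.Queue; // assumption: adjust to the storage SDK you reference

// Hypothetical helper class, not part of the adapter.
public static class QueueSourceExample
{
    public static Task PrintMessages(CloudQueue queue, IMaterializer materializer)
    {
        return QueueSource
            // Request up to 20 messages per batch; poll every 5 seconds while the queue is empty.
            .Create(queue, prefetchCount: 20, pollInterval: TimeSpan.FromSeconds(5))
            // Keep the stream running if a request to the queue fails.
            .WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider))
            // Hold up to 10 messages downstream, so the next batch is requested
            // while the second half of the current batch is still being processed.
            .Buffer(10, OverflowStrategy.Backpressure)
            // Print the text content of each message.
            .RunForeach(message => Console.WriteLine(message.AsString), materializer);
    }
}
```

Because the source keeps polling, the returned Task is not expected to complete on its own in this sketch. It would only complete if the stream fails or is shut down together with the materializer.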
QueueSink
You can create a Sink for the Storage Queue via Sink.FromGraph(new QueueSink), by calling the QueueSink.Create method, or by using the extension method ToStorageQueue directly on a Source<CloudQueueMessage, TMat>.
The Sink is materialized into a Task which will be completed with Success when reaching the normal end of the stream, or completed with Failure if there is a failure signaled in the stream.
You can configure what happens when a message couldn't be added to the queue by using the SupervisionStrategy attribute. The following behaviors are available:
Stop: Default behavior, completes the stream with failure.
Resume: Sends the message again.
Restart: Skips the current message and continues with the next message.
Additional parameters for the CloudQueue.AddMessageAsync call can be set via the options parameter.
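The sink behavior described in this section might be wired up roughly as follows. This is a minimal sketch: the class QueueSinkExample and the method SendAll are hypothetical, and it assumes that QueueSink.Create takes the queue as its first argument in the same way QueueSource.Create does. The Microsoft.WindowsAzure.Storage.Queue namespace is likewise an assumption about the storage SDK in use.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using Akka.Streams;
using Akka.Streams.Azure.StorageQueue;
using Akka.Streams.Dsl;
using Akka.Streams.Supervision;
using Microsoft.WindowsAzure.Storage.Queue; // assumption: adjust to the storage SDK you reference

// Hypothetical helper class, not part of the adapter.
public static class QueueSinkExample
{
    public static Task SendAll(CloudQueue queue, IEnumerable<string> payloads, IMaterializer materializer)
    {
        // Restart: skip a message that could not be added and continue with the next one.
        // Assumption: QueueSink.Create accepts the queue as its first argument.
        var sink = QueueSink.Create(queue)
            .WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.RestartingDecider));

        return Source.From(payloads)
            // Wrap each string payload in a queue message.
            .Select(text => new CloudQueueMessage(text))
            // The materialized Task completes when the stream ends or fails.
            .RunWith(sink, materializer);
    }
}
```

Awaiting the returned Task lets the caller observe the outcome: it completes successfully once all payloads have been handled, or faults if a failure is signaled in the stream. Swapping in Deciders.ResumingDecider would instead retry the failed message, per the list above.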
Examples
You can find some examples in the test project.