Akka.Streams.Channels
An integration plugin between Akka.Streams and primitives from System.Threading.Channels Nuget package.
All features are encapsulated in two static classes:
Akka.Streams.Channels.ChannelSource:ChannelSource.FromReader<T>(ChannelReader<T> reader)will connect itself as a consumer to a givenreader.
Akka.Streams.Channels.ChannelSink:ChannelSink.FromWriter<T>(ChannelWriter<T> writer, bool isOwner)will send elements directly to a givenwriter. IfisOwneris set, it will also be responsible for completingwriteronce upstream completes.ChannelSink.AsReader<T>(int bufferSize, bool singleReader, BoundedChannelFullMode fullMode)can be materialized intoChannelReader<T>used to consume events produced by akka stream.
Stages created this way will apply non-blocking backpressure rules to ensure resource-safe communication with channels.
Example
This is an adapted client example from official ASP.NET SignalR documentation:
var materializer = actorSystem.Materializer();
var cancellationTokenSource = new CancellationTokenSource();
var channel = await hubConnection.StreamAsChannelAsync<int>(
"Counter", 10, 500, cancellationTokenSource.Token);
await ChannelSource.FromReader<int>(channel)
.RunForeach(Console.WriteLine, materializer);
Console.WriteLine("Streaming completed");