Show / Hide Table of Contents

    Akka.NET Streams connectors for AWS services

    Amazon Kinesis streams

    Akka.Streams.Kinesis can be used to either put data directly to AWS Kinesis streams (using KinesisFlow or KinesisSink) or reading from it (using KinesisSource).

    Example of sending the data:

    using Akka.Streams.Kinesis;
    
    const string streamName = "test-stream";
    const string partitionKey = "partition-key";
    
    /// Any client passed to KinesisFlow will then be managed by that flow, and disposed on graph stop.
    Func<IAmazonKinesis> clientFactory = () =>
        new AmazonKinesisClient(new BasicAWSCredentials(accessKey, secretKey), RegionEndpoint.USWest1);
    
    Source.From(new [] { "hello", "world" })
        .Select(data => new PutRecordsRequestEntry {
            PartitionKey = partitionKey,
            Data = new MemoryStream(Encoding.UTF8.GetBytes(data))
        })
        .Via(KinesisFlow.Create(streamName, KinesisFlowSettings.Default, clientFactory))
        .RunForeach(response => Console.WriteLine(response.SequenceNumber), materializer);
    

    Example of receiving data:

    using Akka.Streams.Kinesis;
    
    const string streamName = "test-stream";
    const string shardId = "shard-id";
    string startFrom = null;
    
    /// Any client passed to KinesisSource will then be managed by that source, and disposed on graph stop.
    Func<IAmazonKinesis> clientFactory = () =>
        new AmazonKinesisClient(new BasicAWSCredentials(accessKey, secretKey), RegionEndpoint.USWest1);
    
    var settings = ShardSettings.Create(streamName, shardId)
        .WithStartingSequenceNumber(startFrom);
    
    KinesisSource.Basic(settings, clientFactory)
        .Select(x => Encoding.UTF8.GetString(x.Data.ToArray()))
        .RunForeach(Console.WriteLine, materializer);
    
    • Improve this Doc
    Back to top Copyright © 2013-2020 Akka.NET project
    Generated by DocFX