This repository was archived by the owner on Aug 15, 2024. It is now read-only.

Feature Idea: support IAsyncEnumerable for subscriptions #346

@rikbosch

Description

Now that IAsyncEnumerable has been added to C#, it would be nice to support it in subscriptions, both for the all-stream and for individual streams, using System.Threading.Channels as the underlying implementation.

A proposed API could be:

public IAsyncEnumerable<StreamMessage> SubscribeToAllStream(long fromPosition, Action<SubscriptionOptions> configureSubscription = null, [EnumeratorCancellation] CancellationToken cancellationToken = default)

where SubscriptionOptions holds the Channel<T> (bounded or unbounded) and other options such as the fetch size, whether to fetch JSON, a name, etc.

The implementation should provide a sensible default, and allow for easy reconfiguration.

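public class SubscriptionOptions
{
    // bounded or unbounded channel that buffers messages for the consumer
    public Channel<StreamMessage> Channel { get; set; }
    // whether message JSON is fetched together with the message
    public bool FetchJson { get; set; }
    // number of messages to read per fetch
    public int FetchSize { get; set; }
}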
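
To make the idea more concrete, here is a minimal sketch of how a channel-backed subscription might be wired up. It is not the library's implementation: `ChannelSubscriptionSketch`, the `readPage` callback, the simplified `StreamMessage` class, the 100 ms polling delay, and the "next position is last position + 1" rule are all assumptions made for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical, simplified stand-in for the library's StreamMessage.
public sealed class StreamMessage
{
    public StreamMessage(long position, string jsonData)
    {
        Position = position;
        JsonData = jsonData;
    }

    public long Position { get; }
    public string JsonData { get; }
}

public static class ChannelSubscriptionSketch
{
    // readPage is a hypothetical callback: (position, fetchSize, token) -> next page of messages.
    public static async IAsyncEnumerable<StreamMessage> Subscribe(
        long fromPosition,
        Func<long, int, CancellationToken, Task<IReadOnlyList<StreamMessage>>> readPage,
        Channel<StreamMessage> channel,
        int fetchSize,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        // Linked source so the producer also stops when the consumer stops enumerating.
        using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);

        var producer = Task.Run(async () =>
        {
            try
            {
                var next = fromPosition;
                while (true)
                {
                    var page = await readPage(next, fetchSize, cts.Token);
                    if (page.Count == 0)
                    {
                        // Assumption: naive polling when no new messages are available.
                        await Task.Delay(100, cts.Token);
                        continue;
                    }

                    foreach (var message in page)
                    {
                        // With a bounded channel in Wait mode this awaits while the buffer is full.
                        await channel.Writer.WriteAsync(message, cts.Token);
                        next = message.Position + 1; // assumption: positions are consecutive
                    }
                }
            }
            catch (Exception ex)
            {
                // Completing with the exception surfaces producer failures to the consumer.
                channel.Writer.TryComplete(ex);
            }
        });

        try
        {
            await foreach (var message in channel.Reader.ReadAllAsync(cts.Token))
            {
                yield return message;
            }
        }
        finally
        {
            // Runs on completion, failure, or early exit from the consumer's loop.
            cts.Cancel();
            await producer; // the producer catches its own exceptions, so this does not throw
        }
    }
}
```

In this sketch a failing `readPage` call ends the enumeration with that exception on the consumer side, which is what lets a consumer restart the subscription from its last known position.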

On the consumer side we can then write:

var cts = new CancellationTokenSource();
var consumerTask = Task.Run(async () =>
{
    // long, to match the fromPosition parameter
    long position = 1000;

    // outer loop (handles retries)
    while (!cts.IsCancellationRequested)
    {
        try
        {
            await foreach (var message in store.SubscribeToAllStream(
                position,
                options =>
                {
                    // configure options here
                    options.FetchSize = 256;
                },
                cancellationToken: cts.Token))
            {
                position = message.Position;
                // handle the message
                // error handling for the consumer has to be done here
            }
        }
        catch (OperationCanceledException) when (cts.IsCancellationRequested)
        {
            // normal shutdown: the subscription was cancelled
        }
        catch (Exception ex) when (IsRetryableException(ex))
        {
            // this would be a producer exception
            // (database not available, timeout, etc.);
            // just restart the subscription from the last known position
        }
    }
});
// wait for some event to stop the application
await Application.ExitRequested;
// cancel the subscription
cts.Cancel();
// wait until the consumer has processed the last message
await consumerTask;

Using Channels as the backing implementation of IAsyncEnumerable lets the calling code decide how a slow consumer is handled: either shed load by dropping messages, or make the producer wait until the consumer catches up.
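
As a rough illustration of those two policies, the sketch below builds bounded channels with the standard `BoundedChannelOptions.FullMode` setting. The `SlowConsumerPolicies` class and its method names are hypothetical helpers, not part of the proposal.

```csharp
using System.Threading.Channels;

// Hypothetical helper names; only the System.Threading.Channels calls are real APIs.
public static class SlowConsumerPolicies
{
    // Wait: when the buffer is full, WriteAsync does not complete until the consumer
    // frees a slot (TryWrite returns false instead), so the producer is slowed down.
    public static Channel<T> Backpressure<T>(int capacity) =>
        Channel.CreateBounded<T>(new BoundedChannelOptions(capacity)
        {
            FullMode = BoundedChannelFullMode.Wait,
            SingleReader = true,
            SingleWriter = true
        });

    // Load shedding: when the buffer is full, the oldest buffered item is dropped
    // to make room for the new one, so the producer never waits.
    public static Channel<T> LoadShedding<T>(int capacity) =>
        Channel.CreateBounded<T>(new BoundedChannelOptions(capacity)
        {
            FullMode = BoundedChannelFullMode.DropOldest,
            SingleReader = true,
            SingleWriter = true
        });
}
```

Either channel could then be assigned to the `Channel` property in the configuration callback. Note that load shedding means the consumer no longer sees every message, so the waiting mode is probably the safer default for a subscription that must not skip events.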

An in-depth introduction to System.Threading.Channels can be found here: https://devblogs.microsoft.com/dotnet/an-introduction-to-system-threading-channels/
