Skip to content

Commit

Permalink
iterator based subscription API
Browse files Browse the repository at this point in the history
  • Loading branch information
hunter1703 committed Jul 26, 2023
1 parent 69e4b0e commit 23ad802
Show file tree
Hide file tree
Showing 15 changed files with 1,944 additions and 114 deletions.
36 changes: 23 additions & 13 deletions src/EventStore.Client.Streams/EventStoreClient.Read.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
using System.Threading.Tasks;
using EventStore.Client.Streams;
using Grpc.Core;
using static EventStore.Client.Streams.ReadResp;
using static EventStore.Client.Streams.ReadResp.ContentOneofCase;

namespace EventStore.Client {
Expand Down Expand Up @@ -321,18 +320,11 @@ await _channel.Writer.WriteAsync(StreamMessage.Ok.Instance, linkedCancellationTo
}
}

await _channel.Writer.WriteAsync(response.ContentCase switch {
StreamNotFound => StreamMessage.NotFound.Instance,
Event => new StreamMessage.Event(ConvertToResolvedEvent(response.Event)),
ContentOneofCase.FirstStreamPosition => new StreamMessage.FirstStreamPosition(
new StreamPosition(response.FirstStreamPosition)),
ContentOneofCase.LastStreamPosition => new StreamMessage.LastStreamPosition(
new StreamPosition(response.LastStreamPosition)),
LastAllStreamPosition => new StreamMessage.LastAllStreamPosition(
new Position(response.LastAllStreamPosition.CommitPosition,
response.LastAllStreamPosition.PreparePosition)),
_ => StreamMessage.Unknown.Instance
}, linkedCancellationToken).ConfigureAwait(false);
var messageToWrite = ConvertResponseToMessage(response);
messageToWrite = messageToWrite.IsStreamReadMessage() ? messageToWrite : StreamMessage.Unknown.Instance;
await _channel.Writer
.WriteAsync(messageToWrite, linkedCancellationToken)
.ConfigureAwait(false);
}

_channel.Writer.Complete();
Expand Down Expand Up @@ -413,6 +405,24 @@ private static (SubscriptionConfirmation, Position?, ResolvedEvent)? ConvertToIt
_ => null
};

private static StreamMessage ConvertResponseToMessage(ReadResp response) =>
response.ContentCase switch {
Checkpoint => new StreamMessage.SubscriptionMessage.Checkpoint(
new Position(response.Checkpoint.CommitPosition, response.Checkpoint.PreparePosition)),
Confirmation => new StreamMessage.SubscriptionMessage.SubscriptionConfirmation(response.Confirmation
.SubscriptionId),
Event => new StreamMessage.Event(ConvertToResolvedEvent(response.Event)),
FirstStreamPosition => new StreamMessage.FirstStreamPosition(
new StreamPosition(response.FirstStreamPosition)),
LastAllStreamPosition => new StreamMessage.LastAllStreamPosition(
new Position(response.LastAllStreamPosition.CommitPosition,
response.LastAllStreamPosition.PreparePosition)),
LastStreamPosition => new StreamMessage.LastStreamPosition(
new StreamPosition(response.LastStreamPosition)),
StreamNotFound => StreamMessage.NotFound.Instance,
_ => StreamMessage.Unknown.Instance
};

private static ResolvedEvent ConvertToResolvedEvent(ReadResp.Types.ReadEvent readEvent) =>
new ResolvedEvent(
ConvertToEventRecord(readEvent.Event)!,
Expand Down
259 changes: 259 additions & 0 deletions src/EventStore.Client.Streams/EventStoreClient.Subscriptions.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using EventStore.Client.Streams;
using Grpc.Core;
using Microsoft.Extensions.Logging;
using static EventStore.Client.SubscriptionState;

namespace EventStore.Client {
public partial class EventStoreClient {
Expand Down Expand Up @@ -33,6 +38,31 @@ public Task<StreamSubscription> SubscribeToAllAsync(
}
}, userCredentials, cancellationToken), eventAppeared, subscriptionDropped, _log,
filterOptions?.CheckpointReached, cancellationToken);

/// <summary>
/// Subscribes to all events.
/// </summary>
/// <param name="start">A <see cref="FromAll"/> (exclusive of) to start the subscription from.</param>
/// <param name="resolveLinkTos">Whether to resolve LinkTo events automatically.</param>
/// <param name="filterOptions">The optional <see cref="SubscriptionFilterOptions"/> to apply.</param>
/// <param name="userCredentials">The optional user credentials to perform operation with.</param>
/// <param name="cancellationToken">The optional <see cref="System.Threading.CancellationToken"/>.</param>
/// <returns></returns>
public SubscriptionResult SubscribeToAll(
FromAll start, bool resolveLinkTos = false, SubscriptionFilterOptions? filterOptions = null, UserCredentials? userCredentials = null, CancellationToken cancellationToken = default) {
return new SubscriptionResult(async _ => {
var channelInfo = await GetChannelInfo(cancellationToken).ConfigureAwait(false);
return channelInfo.CallInvoker;
}, new ReadReq {
Options = new ReadReq.Types.Options {
ReadDirection = ReadReq.Types.Options.Types.ReadDirection.Forwards,
ResolveLinks = resolveLinkTos,
All = ReadReq.Types.Options.Types.AllOptions.FromSubscriptionPosition(start),
Subscription = new ReadReq.Types.Options.Types.SubscriptionOptions(),
Filter = GetFilterOptions(filterOptions)!
}
}, Settings, userCredentials, cancellationToken, _log);
}

/// <summary>
/// Subscribes to a stream from a <see cref="StreamPosition">checkpoint</see>.
Expand Down Expand Up @@ -60,5 +90,234 @@ public Task<StreamSubscription> SubscribeToStreamAsync(string streamName,
}
}, userCredentials, cancellationToken), eventAppeared, subscriptionDropped, _log,
cancellationToken: cancellationToken);

/// <summary>
/// Subscribes to a stream from a <see cref="StreamPosition">checkpoint</see>.
/// </summary>
/// <param name="start">A <see cref="FromStream"/> (exclusive of) to start the subscription from.</param>
/// <param name="streamName">The name of the stream to read events from.</param>
/// <param name="resolveLinkTos">Whether to resolve LinkTo events automatically.</param>
/// <param name="userCredentials">The optional user credentials to perform operation with.</param>
/// <param name="cancellationToken">The optional <see cref="System.Threading.CancellationToken"/>.</param>
/// <returns>An instance of SubscriptionResult which contains current state of the subscription and an enumerator to consume messages</returns>
public SubscriptionResult SubscribeToStream(string streamName,
FromStream start, bool resolveLinkTos = false,
UserCredentials? userCredentials = null, CancellationToken cancellationToken = default) {
return new SubscriptionResult(async _ => {
var channelInfo = await GetChannelInfo(cancellationToken).ConfigureAwait(false);
return channelInfo.CallInvoker;
}, new ReadReq {
Options = new ReadReq.Types.Options {
ReadDirection = ReadReq.Types.Options.Types.ReadDirection.Forwards,
ResolveLinks = resolveLinkTos,
Stream = ReadReq.Types.Options.Types.StreamOptions.FromSubscriptionPosition(streamName, start),
Subscription = new ReadReq.Types.Options.Types.SubscriptionOptions(),
}
}, Settings, userCredentials, cancellationToken, _log);
}



/// <summary>
/// A class which represents current subscription state and an enumerator to consume messages
/// </summary>
public class SubscriptionResult {
private readonly Channel<StreamMessage> _internalChannel;
private readonly CancellationTokenSource _cts;
private int _messagesEnumerated;
private ILogger _log;
/// <summary>
/// The name of the stream.
/// </summary>
public string StreamName { get; }

/// <summary>
///
/// </summary>
public Position StreamPosition { get; private set; }

/// <summary>
/// Represents subscription ID for the current subscription
/// </summary>
public string? SubscriptionId { get; private set; }

/// <summary>
/// Current subscription state
/// </summary>

public SubscriptionState SubscriptionState {
get {
if (_exceptionInternal is not null) {
throw _exceptionInternal;
}

return _subscriptionStateInternal;
}
}

private volatile SubscriptionState _subscriptionStateInternal;

private volatile Exception? _exceptionInternal;

/// <summary>
/// An <see cref="IAsyncEnumerable{StreamMessage}"/>. Do not enumerate more than once.
/// </summary>
public IAsyncEnumerable<StreamMessage> Messages {
get {
return GetMessages();

async IAsyncEnumerable<StreamMessage> GetMessages() {
if (Interlocked.Exchange(ref _messagesEnumerated, 1) == 1) {
throw new InvalidOperationException("Messages may only be enumerated once.");
}

try {
await foreach (var message in _internalChannel.Reader.ReadAllAsync()
.ConfigureAwait(false)) {
if (!message.IsSubscriptionMessage()) {
continue;
}

switch (message) {
case StreamMessage.SubscriptionMessage.SubscriptionConfirmation(var
subscriptionId):
SubscriptionId = subscriptionId;
continue;
case StreamMessage.SubscriptionMessage.Checkpoint(var position):
StreamPosition = position;
break;
}

yield return message;
}
} finally {
Dispose();
}
}
}
}

/// <summary>
/// Terminates subscription
/// </summary>
public void Dispose() {
if (_subscriptionStateInternal == Disposed) {
return;
}
_subscriptionStateInternal = Disposed;
_cts.Cancel();
}

internal SubscriptionResult(Func<CancellationToken, Task<CallInvoker>> selectCallInvoker, ReadReq request,
EventStoreClientSettings settings, UserCredentials? userCredentials,
CancellationToken cancellationToken, ILogger log) {
Sanitize(request);
Validate(request);

_cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);

var callOptions = EventStoreCallOptions.CreateStreaming(settings, userCredentials: userCredentials,
cancellationToken: _cts.Token);

_internalChannel = Channel.CreateBounded<StreamMessage>(new BoundedChannelOptions(1) {
SingleReader = true,
SingleWriter = true,
AllowSynchronousContinuations = true
});

_log = log;

StreamName = request.Options.All != null
? SystemStreams.AllStream
: request.Options.Stream.StreamIdentifier!;

_subscriptionStateInternal = Initializing;

_ = PumpMessages(selectCallInvoker, request, callOptions);
}

async Task PumpMessages(Func<CancellationToken, Task<CallInvoker>> selectCallInvoker, ReadReq request, CallOptions callOptions) {
var firstMessageRead = false;
var callInvoker = await selectCallInvoker(_cts.Token).ConfigureAwait(false);
var streamsClient = new Streams.Streams.StreamsClient(callInvoker);
try {
using var call = streamsClient.Read(request, callOptions);

await foreach (var response in call.ResponseStream
.ReadAllAsync(_cts.Token)
.WithCancellation(_cts.Token)
.ConfigureAwait(false)) {
if (response is null) {
continue;
}

var message = ConvertResponseToMessage(response);
if (!firstMessageRead) {
firstMessageRead = true;

if (message is not StreamMessage.SubscriptionMessage.SubscriptionConfirmation) {
throw new InvalidOperationException(
$"Subscription to {StreamName} could not be confirmed.");
}

_subscriptionStateInternal = Ok;
}

var messageToWrite = message.IsSubscriptionMessage()
? message
: StreamMessage.Unknown.Instance;
await _internalChannel.Writer.WriteAsync(messageToWrite, _cts.Token).ConfigureAwait(false);

if (messageToWrite is StreamMessage.NotFound) {
_exceptionInternal = new StreamNotFoundException(StreamName);
break;
}
}
} catch (RpcException ex) when (ex.Status.StatusCode == StatusCode.Cancelled &&
ex.Status.Detail.Contains("Call canceled by the client.")) {
_log.LogInformation(
"Subscription {subscriptionId} was dropped because cancellation was requested by the client.",
SubscriptionId);
} catch (Exception ex) {
if (ex is ObjectDisposedException or OperationCanceledException) {
_log.LogWarning(
ex,
"Subscription {subscriptionId} was dropped because cancellation was requested by another caller.",
SubscriptionId
);
} else {
_exceptionInternal = ex;
}
} finally {
_internalChannel.Writer.Complete();
}
}

private static void Sanitize(ReadReq request) {
if (request.Options.Filter == null) {
request.Options.NoFilter = new Empty();
}

request.Options.UuidOption = new ReadReq.Types.Options.Types.UUIDOption {Structured = new Empty()};
}

private static void Validate(ReadReq request) {
if (request.Options.CountOptionCase == ReadReq.Types.Options.CountOptionOneofCase.Count &&
request.Options.Count <= 0) {
throw new ArgumentOutOfRangeException("count");
}

var streamOptions = request.Options.Stream;
var allOptions = request.Options.All;

if (allOptions == null && streamOptions == null) {
throw new ArgumentException("No stream provided to subscribe");
}

if (allOptions != null && streamOptions != null) {
throw new ArgumentException($"Cannot subscribe both ${SystemStreams.AllStream}, and ${streamOptions.StreamIdentifier}");
}
}
}
}
}
33 changes: 33 additions & 0 deletions src/EventStore.Client.Streams/StreamMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,44 @@ public record LastStreamPosition(StreamPosition StreamPosition) : StreamMessage;
/// <param name="Position">The <see cref="EventStore.Client.Position"/>.</param>
public record LastAllStreamPosition(Position Position) : StreamMessage;

/// <summary>
/// The base record of all subscription specific messages.
/// </summary>
public abstract record SubscriptionMessage : StreamMessage {

/// <summary>
/// A <see cref="EventStore.Client.StreamMessage.SubscriptionMessage"/> that represents a subscription confirmation.
/// </summary>
public record SubscriptionConfirmation(string SubscriptionId) : SubscriptionMessage;

/// <summary>
/// A <see cref="EventStore.Client.StreamMessage.SubscriptionMessage"/> representing position reached in subscribed stream. This message will only be received when subscribing to $all stream
/// </summary>
public record Checkpoint(Position Position) : SubscriptionMessage;
}

/// <summary>
/// A <see cref="EventStore.Client.StreamMessage"/> that could not be identified, usually indicating a lower client compatibility level than the server supports.
/// </summary>
public record Unknown : StreamMessage {
internal static readonly Unknown Instance = new();
}


/// <summary>
/// A test method that returns true if this message can be expected to be received when reading from stream; otherwise, this method returns false
/// </summary>
/// <returns></returns>
public bool IsStreamReadMessage() {
return this is not SubscriptionMessage && this is not Ok && this is not Unknown;
}

/// <summary>
/// A test method that returns true if this message can be expected to be received when subscribing to a stream; otherwise, this method returns false
/// </summary>
/// <returns></returns>
public bool IsSubscriptionMessage() {
return this is SubscriptionMessage || this is NotFound || this is Event;
}
}
}
Loading

0 comments on commit 23ad802

Please sign in to comment.