Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Iterator based subscription API (DEV-112) #252

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 23 additions & 13 deletions src/EventStore.Client.Streams/EventStoreClient.Read.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
using System.Threading.Tasks;
using EventStore.Client.Streams;
using Grpc.Core;
using static EventStore.Client.Streams.ReadResp;
using static EventStore.Client.Streams.ReadResp.ContentOneofCase;

namespace EventStore.Client {
Expand Down Expand Up @@ -321,18 +320,11 @@ await _channel.Writer.WriteAsync(StreamMessage.Ok.Instance, linkedCancellationTo
}
}

await _channel.Writer.WriteAsync(response.ContentCase switch {
StreamNotFound => StreamMessage.NotFound.Instance,
Event => new StreamMessage.Event(ConvertToResolvedEvent(response.Event)),
ContentOneofCase.FirstStreamPosition => new StreamMessage.FirstStreamPosition(
new StreamPosition(response.FirstStreamPosition)),
ContentOneofCase.LastStreamPosition => new StreamMessage.LastStreamPosition(
new StreamPosition(response.LastStreamPosition)),
LastAllStreamPosition => new StreamMessage.LastAllStreamPosition(
new Position(response.LastAllStreamPosition.CommitPosition,
response.LastAllStreamPosition.PreparePosition)),
_ => StreamMessage.Unknown.Instance
}, linkedCancellationToken).ConfigureAwait(false);
var messageToWrite = ConvertResponseToMessage(response);
messageToWrite = messageToWrite.IsStreamReadMessage() ? messageToWrite : StreamMessage.Unknown.Instance;
await _channel.Writer
.WriteAsync(messageToWrite, linkedCancellationToken)
.ConfigureAwait(false);
}

_channel.Writer.Complete();
Expand Down Expand Up @@ -413,6 +405,24 @@ private static (SubscriptionConfirmation, Position?, ResolvedEvent)? ConvertToIt
_ => null
};

private static StreamMessage ConvertResponseToMessage(ReadResp response) =>
response.ContentCase switch {
Checkpoint => new StreamMessage.SubscriptionMessage.Checkpoint(
new Position(response.Checkpoint.CommitPosition, response.Checkpoint.PreparePosition)),
Confirmation => new StreamMessage.SubscriptionMessage.SubscriptionConfirmation(response.Confirmation
.SubscriptionId),
Event => new StreamMessage.Event(ConvertToResolvedEvent(response.Event)),
FirstStreamPosition => new StreamMessage.FirstStreamPosition(
new StreamPosition(response.FirstStreamPosition)),
LastAllStreamPosition => new StreamMessage.LastAllStreamPosition(
new Position(response.LastAllStreamPosition.CommitPosition,
response.LastAllStreamPosition.PreparePosition)),
LastStreamPosition => new StreamMessage.LastStreamPosition(
new StreamPosition(response.LastStreamPosition)),
StreamNotFound => StreamMessage.NotFound.Instance,
_ => StreamMessage.Unknown.Instance
};

private static ResolvedEvent ConvertToResolvedEvent(ReadResp.Types.ReadEvent readEvent) =>
new ResolvedEvent(
ConvertToEventRecord(readEvent.Event)!,
Expand Down
259 changes: 259 additions & 0 deletions src/EventStore.Client.Streams/EventStoreClient.Subscriptions.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using EventStore.Client.Streams;
using Grpc.Core;
using Microsoft.Extensions.Logging;
using static EventStore.Client.SubscriptionState;

namespace EventStore.Client {
public partial class EventStoreClient {
Expand Down Expand Up @@ -33,6 +38,31 @@ public Task<StreamSubscription> SubscribeToAllAsync(
}
}, userCredentials, cancellationToken), eventAppeared, subscriptionDropped, _log,
filterOptions?.CheckpointReached, cancellationToken);

/// <summary>
/// Subscribes to all events.
/// </summary>
/// <param name="start">A <see cref="FromAll"/> (exclusive of) to start the subscription from.</param>
/// <param name="resolveLinkTos">Whether to resolve LinkTo events automatically.</param>
/// <param name="filterOptions">The optional <see cref="SubscriptionFilterOptions"/> to apply.</param>
/// <param name="userCredentials">The optional user credentials to perform operation with.</param>
/// <param name="cancellationToken">The optional <see cref="System.Threading.CancellationToken"/>.</param>
/// <returns>An instance of SubscriptionResult which contains current state of the subscription and an enumerator to consume messages</returns>
public SubscriptionResult SubscribeToAll(
FromAll start, bool resolveLinkTos = false, SubscriptionFilterOptions? filterOptions = null, UserCredentials? userCredentials = null, CancellationToken cancellationToken = default) {
return new SubscriptionResult(async _ => {
var channelInfo = await GetChannelInfo(cancellationToken).ConfigureAwait(false);
return channelInfo.CallInvoker;
}, new ReadReq {
Options = new ReadReq.Types.Options {
ReadDirection = ReadReq.Types.Options.Types.ReadDirection.Forwards,
ResolveLinks = resolveLinkTos,
All = ReadReq.Types.Options.Types.AllOptions.FromSubscriptionPosition(start),
Subscription = new ReadReq.Types.Options.Types.SubscriptionOptions(),
Filter = GetFilterOptions(filterOptions)!
}
}, Settings, userCredentials, cancellationToken, _log);
}

/// <summary>
/// Subscribes to a stream from a <see cref="StreamPosition">checkpoint</see>.
Expand Down Expand Up @@ -60,5 +90,234 @@ public Task<StreamSubscription> SubscribeToStreamAsync(string streamName,
}
}, userCredentials, cancellationToken), eventAppeared, subscriptionDropped, _log,
cancellationToken: cancellationToken);

/// <summary>
/// Subscribes to a stream from a <see cref="StreamPosition">checkpoint</see>.
/// </summary>
/// <param name="start">A <see cref="FromStream"/> (exclusive of) to start the subscription from.</param>
/// <param name="streamName">The name of the stream to read events from.</param>
/// <param name="resolveLinkTos">Whether to resolve LinkTo events automatically.</param>
/// <param name="userCredentials">The optional user credentials to perform operation with.</param>
/// <param name="cancellationToken">The optional <see cref="System.Threading.CancellationToken"/>.</param>
/// <returns>An instance of SubscriptionResult which contains current state of the subscription and an enumerator to consume messages</returns>
public SubscriptionResult SubscribeToStream(string streamName,
FromStream start, bool resolveLinkTos = false,
UserCredentials? userCredentials = null, CancellationToken cancellationToken = default) {
return new SubscriptionResult(async _ => {
var channelInfo = await GetChannelInfo(cancellationToken).ConfigureAwait(false);
return channelInfo.CallInvoker;
}, new ReadReq {
Options = new ReadReq.Types.Options {
ReadDirection = ReadReq.Types.Options.Types.ReadDirection.Forwards,
ResolveLinks = resolveLinkTos,
Stream = ReadReq.Types.Options.Types.StreamOptions.FromSubscriptionPosition(streamName, start),
Subscription = new ReadReq.Types.Options.Types.SubscriptionOptions(),
}
}, Settings, userCredentials, cancellationToken, _log);
}



/// <summary>
/// A class which represents current subscription state and an enumerator to consume messages
/// </summary>
public class SubscriptionResult {
private readonly Channel<StreamMessage> _internalChannel;
private readonly CancellationTokenSource _cts;
private int _messagesEnumerated;
private ILogger _log;
/// <summary>
/// The name of the stream.
/// </summary>
public string StreamName { get; }

/// <summary>
///
/// </summary>
public Position StreamPosition { get; private set; }

/// <summary>
/// Represents subscription ID for the current subscription
/// </summary>
public string? SubscriptionId { get; private set; }

/// <summary>
/// Current subscription state
/// </summary>

public SubscriptionState SubscriptionState {
get {
if (_exceptionInternal is not null) {
throw _exceptionInternal;
}

return _subscriptionStateInternal;
}
}

private volatile SubscriptionState _subscriptionStateInternal;

private volatile Exception? _exceptionInternal;

/// <summary>
/// An <see cref="IAsyncEnumerable{StreamMessage}"/>. Do not enumerate more than once.
/// </summary>
public IAsyncEnumerable<StreamMessage> Messages {
get {
return GetMessages();

async IAsyncEnumerable<StreamMessage> GetMessages() {
if (Interlocked.Exchange(ref _messagesEnumerated, 1) == 1) {
throw new InvalidOperationException("Messages may only be enumerated once.");
}

try {
await foreach (var message in _internalChannel.Reader.ReadAllAsync()
.ConfigureAwait(false)) {
if (!message.IsSubscriptionMessage()) {
continue;
}

switch (message) {
case StreamMessage.SubscriptionMessage.SubscriptionConfirmation(var
subscriptionId):
SubscriptionId = subscriptionId;
continue;
case StreamMessage.SubscriptionMessage.Checkpoint(var position):
StreamPosition = position;
break;
}

yield return message;
}
} finally {
Dispose();
}
}
}
}

/// <summary>
/// Terminates subscription
/// </summary>
public void Dispose() {
if (_subscriptionStateInternal == Disposed) {
return;
}
_subscriptionStateInternal = Disposed;
_cts.Cancel();
}

internal SubscriptionResult(Func<CancellationToken, Task<CallInvoker>> selectCallInvoker, ReadReq request,
EventStoreClientSettings settings, UserCredentials? userCredentials,
CancellationToken cancellationToken, ILogger log) {
Sanitize(request);
Validate(request);

_cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);

var callOptions = EventStoreCallOptions.CreateStreaming(settings, userCredentials: userCredentials,
cancellationToken: _cts.Token);

_internalChannel = Channel.CreateBounded<StreamMessage>(new BoundedChannelOptions(1) {
SingleReader = true,
SingleWriter = true,
AllowSynchronousContinuations = true
});

_log = log;

StreamName = request.Options.All != null
? SystemStreams.AllStream
: request.Options.Stream.StreamIdentifier!;

_subscriptionStateInternal = Initializing;

_ = PumpMessages(selectCallInvoker, request, callOptions);
}

async Task PumpMessages(Func<CancellationToken, Task<CallInvoker>> selectCallInvoker, ReadReq request, CallOptions callOptions) {
var firstMessageRead = false;
var callInvoker = await selectCallInvoker(_cts.Token).ConfigureAwait(false);
var streamsClient = new Streams.Streams.StreamsClient(callInvoker);
try {
using var call = streamsClient.Read(request, callOptions);

await foreach (var response in call.ResponseStream
.ReadAllAsync(_cts.Token)
.WithCancellation(_cts.Token)
.ConfigureAwait(false)) {
if (response is null) {
continue;
}

var message = ConvertResponseToMessage(response);
if (!firstMessageRead) {
firstMessageRead = true;

if (message is not StreamMessage.SubscriptionMessage.SubscriptionConfirmation) {
throw new InvalidOperationException(
$"Subscription to {StreamName} could not be confirmed.");
}

_subscriptionStateInternal = Ok;
}

var messageToWrite = message.IsSubscriptionMessage()
? message
: StreamMessage.Unknown.Instance;
await _internalChannel.Writer.WriteAsync(messageToWrite, _cts.Token).ConfigureAwait(false);

if (messageToWrite is StreamMessage.NotFound) {
_exceptionInternal = new StreamNotFoundException(StreamName);
break;
}
}
} catch (RpcException ex) when (ex.Status.StatusCode == StatusCode.Cancelled &&
ex.Status.Detail.Contains("Call canceled by the client.")) {
_log.LogInformation(
"Subscription {subscriptionId} was dropped because cancellation was requested by the client.",
SubscriptionId);
} catch (Exception ex) {
if (ex is ObjectDisposedException or OperationCanceledException) {
_log.LogWarning(
ex,
"Subscription {subscriptionId} was dropped because cancellation was requested by another caller.",
SubscriptionId
);
} else {
_exceptionInternal = ex;
}
} finally {
_internalChannel.Writer.Complete();
}
}

private static void Sanitize(ReadReq request) {
if (request.Options.Filter == null) {
request.Options.NoFilter = new Empty();
}

request.Options.UuidOption = new ReadReq.Types.Options.Types.UUIDOption {Structured = new Empty()};
}

private static void Validate(ReadReq request) {
if (request.Options.CountOptionCase == ReadReq.Types.Options.CountOptionOneofCase.Count &&
request.Options.Count <= 0) {
throw new ArgumentOutOfRangeException("count");
}

var streamOptions = request.Options.Stream;
var allOptions = request.Options.All;

if (allOptions == null && streamOptions == null) {
throw new ArgumentException("No stream provided to subscribe");
}

if (allOptions != null && streamOptions != null) {
throw new ArgumentException($"Cannot subscribe both ${SystemStreams.AllStream}, and ${streamOptions.StreamIdentifier}");
}
}
}
}
}
33 changes: 33 additions & 0 deletions src/EventStore.Client.Streams/StreamMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,44 @@ public record LastStreamPosition(StreamPosition StreamPosition) : StreamMessage;
/// <param name="Position">The <see cref="EventStore.Client.Position"/>.</param>
public record LastAllStreamPosition(Position Position) : StreamMessage;

/// <summary>
/// The base record of all subscription specific messages.
/// </summary>
public abstract record SubscriptionMessage : StreamMessage {

/// <summary>
/// A <see cref="EventStore.Client.StreamMessage.SubscriptionMessage"/> that represents a subscription confirmation.
/// </summary>
public record SubscriptionConfirmation(string SubscriptionId) : SubscriptionMessage;

/// <summary>
/// A <see cref="EventStore.Client.StreamMessage.SubscriptionMessage"/> representing position reached in subscribed stream. This message will only be received when subscribing to $all stream
/// </summary>
public record Checkpoint(Position Position) : SubscriptionMessage;
}

/// <summary>
/// A <see cref="EventStore.Client.StreamMessage"/> that could not be identified, usually indicating a lower client compatibility level than the server supports.
/// </summary>
public record Unknown : StreamMessage {
internal static readonly Unknown Instance = new();
}


/// <summary>
/// A test method that returns true if this message can be expected to be received when reading from stream; otherwise, this method returns false
/// </summary>
/// <returns></returns>
public bool IsStreamReadMessage() {
return this is not SubscriptionMessage && this is not Ok && this is not Unknown;
}

/// <summary>
/// A test method that returns true if this message can be expected to be received when subscribing to a stream; otherwise, this method returns false
/// </summary>
/// <returns></returns>
public bool IsSubscriptionMessage() {
return this is SubscriptionMessage || this is NotFound || this is Event;
}
}
}
Loading
Loading