From 21e014bad7fcb79363f981443db423023f8fd7eb Mon Sep 17 00:00:00 2001 From: Rahul Patel Date: Wed, 26 Jul 2023 05:08:39 -0400 Subject: [PATCH] iterator based subscription API --- Protos.cs | 34 +++ .../EventStoreClient.Read.cs | 36 ++- .../EventStoreClient.Subscriptions.cs | 259 ++++++++++++++++ .../StreamMessage.cs | 33 ++ .../SubscriptionState.cs | 19 ++ .../Bugs/Issue_104.cs | 73 ++++- .../Bugs/Issue_2544.cs | 97 +++++- .../Subscriptions/reconnection.cs | 10 +- .../Subscriptions/subscribe_to_all.cs | 285 +++++++++++++++++- .../Subscriptions/subscribe_to_stream.cs | 142 ++++++++- 10 files changed, 950 insertions(+), 38 deletions(-) create mode 100644 Protos.cs create mode 100644 src/EventStore.Client.Streams/SubscriptionState.cs diff --git a/Protos.cs b/Protos.cs new file mode 100644 index 000000000..329e49dd9 --- /dev/null +++ b/Protos.cs @@ -0,0 +1,34 @@ +// +// Generated by the protocol buffer compiler. DO NOT EDIT! +// source: src/EventStore.Client.Common/protos +// +#pragma warning disable 1591, 0612, 3021 +#region Designer generated code + +using pb = global::Google.Protobuf; +using pbc = global::Google.Protobuf.Collections; +using pbr = global::Google.Protobuf.Reflection; +using scg = global::System.Collections.Generic; +/// Holder for reflection information generated from src/EventStore.Client.Common/protos +public static partial class ProtosReflection { + + #region Descriptor + /// File descriptor for src/EventStore.Client.Common/protos + public static pbr::FileDescriptor Descriptor { + get { return descriptor; } + } + private static pbr::FileDescriptor descriptor; + + static ProtosReflection() { + byte[] descriptorData = global::System.Convert.FromBase64String( + string.Concat( + "CiNzcmMvRXZlbnRTdG9yZS5DbGllbnQuQ29tbW9uL3Byb3Rvcw==")); + descriptor = pbr::FileDescriptor.FromGeneratedCode(descriptorData, + new pbr::FileDescriptor[] { }, + new pbr::GeneratedClrTypeInfo(null, null, null)); + } + #endregion + +} + +#endregion Designer generated code diff --git a/src/EventStore.Client.Streams/EventStoreClient.Read.cs b/src/EventStore.Client.Streams/EventStoreClient.Read.cs index 25635f26a..1e873ec87 100644 --- a/src/EventStore.Client.Streams/EventStoreClient.Read.cs +++ b/src/EventStore.Client.Streams/EventStoreClient.Read.cs @@ -7,7 +7,6 @@ using System.Threading.Tasks; using EventStore.Client.Streams; using Grpc.Core; -using static EventStore.Client.Streams.ReadResp; using static EventStore.Client.Streams.ReadResp.ContentOneofCase; namespace EventStore.Client { @@ -321,18 +320,11 @@ await _channel.Writer.WriteAsync(StreamMessage.Ok.Instance, linkedCancellationTo } } - await _channel.Writer.WriteAsync(response.ContentCase switch { - StreamNotFound => StreamMessage.NotFound.Instance, - Event => new StreamMessage.Event(ConvertToResolvedEvent(response.Event)), - ContentOneofCase.FirstStreamPosition => new StreamMessage.FirstStreamPosition( - new StreamPosition(response.FirstStreamPosition)), - ContentOneofCase.LastStreamPosition => new StreamMessage.LastStreamPosition( - new StreamPosition(response.LastStreamPosition)), - LastAllStreamPosition => new StreamMessage.LastAllStreamPosition( - new Position(response.LastAllStreamPosition.CommitPosition, - response.LastAllStreamPosition.PreparePosition)), - _ => StreamMessage.Unknown.Instance - }, linkedCancellationToken).ConfigureAwait(false); + var messageToWrite = ConvertResponseToMessage(response); + messageToWrite = messageToWrite.IsStreamReadMessage() ? messageToWrite : StreamMessage.Unknown.Instance; + await _channel.Writer + .WriteAsync(messageToWrite, linkedCancellationToken) + .ConfigureAwait(false); } _channel.Writer.Complete(); @@ -413,6 +405,24 @@ private static (SubscriptionConfirmation, Position?, ResolvedEvent)? ConvertToIt _ => null }; + private static StreamMessage ConvertResponseToMessage(ReadResp response) => + response.ContentCase switch { + Checkpoint => new StreamMessage.SubscriptionMessage.Checkpoint( + new Position(response.Checkpoint.CommitPosition, response.Checkpoint.PreparePosition)), + Confirmation => new StreamMessage.SubscriptionMessage.SubscriptionConfirmation(response.Confirmation + .SubscriptionId), + Event => new StreamMessage.Event(ConvertToResolvedEvent(response.Event)), + FirstStreamPosition => new StreamMessage.FirstStreamPosition( + new StreamPosition(response.FirstStreamPosition)), + LastAllStreamPosition => new StreamMessage.LastAllStreamPosition( + new Position(response.LastAllStreamPosition.CommitPosition, + response.LastAllStreamPosition.PreparePosition)), + LastStreamPosition => new StreamMessage.LastStreamPosition( + new StreamPosition(response.LastStreamPosition)), + StreamNotFound => StreamMessage.NotFound.Instance, + _ => StreamMessage.Unknown.Instance + }; + private static ResolvedEvent ConvertToResolvedEvent(ReadResp.Types.ReadEvent readEvent) => new ResolvedEvent( ConvertToEventRecord(readEvent.Event)!, diff --git a/src/EventStore.Client.Streams/EventStoreClient.Subscriptions.cs b/src/EventStore.Client.Streams/EventStoreClient.Subscriptions.cs index 47ea634f1..01d26393b 100644 --- a/src/EventStore.Client.Streams/EventStoreClient.Subscriptions.cs +++ b/src/EventStore.Client.Streams/EventStoreClient.Subscriptions.cs @@ -1,7 +1,12 @@ using System; +using System.Collections.Generic; using System.Threading; +using System.Threading.Channels; using System.Threading.Tasks; using EventStore.Client.Streams; +using Grpc.Core; +using Microsoft.Extensions.Logging; +using static EventStore.Client.SubscriptionState; namespace EventStore.Client { public partial class EventStoreClient { @@ -33,6 +38,31 @@ public Task SubscribeToAllAsync( } }, userCredentials, cancellationToken), eventAppeared, subscriptionDropped, _log, filterOptions?.CheckpointReached, cancellationToken); + + /// + /// Subscribes to all events. + /// + /// A (exclusive of) to start the subscription from. + /// Whether to resolve LinkTo events automatically. + /// The optional to apply. + /// The optional user credentials to perform operation with. + /// The optional . + /// An instance of SubscriptionResult which contains current state of the subscription and an enumerator to consume messages + public SubscriptionResult SubscribeToAll( + FromAll start, bool resolveLinkTos = false, SubscriptionFilterOptions? filterOptions = null, UserCredentials? userCredentials = null, CancellationToken cancellationToken = default) { + return new SubscriptionResult(async _ => { + var channelInfo = await GetChannelInfo(cancellationToken).ConfigureAwait(false); + return channelInfo.CallInvoker; + }, new ReadReq { + Options = new ReadReq.Types.Options { + ReadDirection = ReadReq.Types.Options.Types.ReadDirection.Forwards, + ResolveLinks = resolveLinkTos, + All = ReadReq.Types.Options.Types.AllOptions.FromSubscriptionPosition(start), + Subscription = new ReadReq.Types.Options.Types.SubscriptionOptions(), + Filter = GetFilterOptions(filterOptions)! + } + }, Settings, userCredentials, cancellationToken, _log); + } /// /// Subscribes to a stream from a checkpoint. @@ -60,5 +90,234 @@ public Task SubscribeToStreamAsync(string streamName, } }, userCredentials, cancellationToken), eventAppeared, subscriptionDropped, _log, cancellationToken: cancellationToken); + + /// + /// Subscribes to a stream from a checkpoint. + /// + /// A (exclusive of) to start the subscription from. + /// The name of the stream to read events from. + /// Whether to resolve LinkTo events automatically. + /// The optional user credentials to perform operation with. + /// The optional . + /// An instance of SubscriptionResult which contains current state of the subscription and an enumerator to consume messages + public SubscriptionResult SubscribeToStream(string streamName, + FromStream start, bool resolveLinkTos = false, + UserCredentials? userCredentials = null, CancellationToken cancellationToken = default) { + return new SubscriptionResult(async _ => { + var channelInfo = await GetChannelInfo(cancellationToken).ConfigureAwait(false); + return channelInfo.CallInvoker; + }, new ReadReq { + Options = new ReadReq.Types.Options { + ReadDirection = ReadReq.Types.Options.Types.ReadDirection.Forwards, + ResolveLinks = resolveLinkTos, + Stream = ReadReq.Types.Options.Types.StreamOptions.FromSubscriptionPosition(streamName, start), + Subscription = new ReadReq.Types.Options.Types.SubscriptionOptions(), + } + }, Settings, userCredentials, cancellationToken, _log); + } + + + + /// + /// A class which represents current subscription state and an enumerator to consume messages + /// + public class SubscriptionResult { + private readonly Channel _internalChannel; + private readonly CancellationTokenSource _cts; + private int _messagesEnumerated; + private ILogger _log; + /// + /// The name of the stream. + /// + public string StreamName { get; } + + /// + /// + /// + public Position StreamPosition { get; private set; } + + /// + /// Represents subscription ID for the current subscription + /// + public string? SubscriptionId { get; private set; } + + /// + /// Current subscription state + /// + + public SubscriptionState SubscriptionState { + get { + if (_exceptionInternal is not null) { + throw _exceptionInternal; + } + + return _subscriptionStateInternal; + } + } + + private volatile SubscriptionState _subscriptionStateInternal; + + private volatile Exception? _exceptionInternal; + + /// + /// An . Do not enumerate more than once. + /// + public IAsyncEnumerable Messages { + get { + return GetMessages(); + + async IAsyncEnumerable GetMessages() { + if (Interlocked.Exchange(ref _messagesEnumerated, 1) == 1) { + throw new InvalidOperationException("Messages may only be enumerated once."); + } + + try { + await foreach (var message in _internalChannel.Reader.ReadAllAsync() + .ConfigureAwait(false)) { + if (!message.IsSubscriptionMessage()) { + continue; + } + + switch (message) { + case StreamMessage.SubscriptionMessage.SubscriptionConfirmation(var + subscriptionId): + SubscriptionId = subscriptionId; + continue; + case StreamMessage.SubscriptionMessage.Checkpoint(var position): + StreamPosition = position; + break; + } + + yield return message; + } + } finally { + Dispose(); + } + } + } + } + + /// + /// Terminates subscription + /// + public void Dispose() { + if (_subscriptionStateInternal == Disposed) { + return; + } + _subscriptionStateInternal = Disposed; + _cts.Cancel(); + } + + internal SubscriptionResult(Func> selectCallInvoker, ReadReq request, + EventStoreClientSettings settings, UserCredentials? userCredentials, + CancellationToken cancellationToken, ILogger log) { + Sanitize(request); + Validate(request); + + _cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + + var callOptions = EventStoreCallOptions.CreateStreaming(settings, userCredentials: userCredentials, + cancellationToken: _cts.Token); + + _internalChannel = Channel.CreateBounded(new BoundedChannelOptions(1) { + SingleReader = true, + SingleWriter = true, + AllowSynchronousContinuations = true + }); + + _log = log; + + StreamName = request.Options.All != null + ? SystemStreams.AllStream + : request.Options.Stream.StreamIdentifier!; + + _subscriptionStateInternal = Initializing; + + _ = PumpMessages(selectCallInvoker, request, callOptions); + } + + async Task PumpMessages(Func> selectCallInvoker, ReadReq request, CallOptions callOptions) { + var firstMessageRead = false; + var callInvoker = await selectCallInvoker(_cts.Token).ConfigureAwait(false); + var streamsClient = new Streams.Streams.StreamsClient(callInvoker); + try { + using var call = streamsClient.Read(request, callOptions); + + await foreach (var response in call.ResponseStream + .ReadAllAsync(_cts.Token) + .WithCancellation(_cts.Token) + .ConfigureAwait(false)) { + if (response is null) { + continue; + } + + var message = ConvertResponseToMessage(response); + if (!firstMessageRead) { + firstMessageRead = true; + + if (message is not StreamMessage.SubscriptionMessage.SubscriptionConfirmation) { + throw new InvalidOperationException( + $"Subscription to {StreamName} could not be confirmed."); + } + + _subscriptionStateInternal = Ok; + } + + var messageToWrite = message.IsSubscriptionMessage() + ? message + : StreamMessage.Unknown.Instance; + await _internalChannel.Writer.WriteAsync(messageToWrite, _cts.Token).ConfigureAwait(false); + + if (messageToWrite is StreamMessage.NotFound) { + _exceptionInternal = new StreamNotFoundException(StreamName); + break; + } + } + } catch (RpcException ex) when (ex.Status.StatusCode == StatusCode.Cancelled && + ex.Status.Detail.Contains("Call canceled by the client.")) { + _log.LogInformation( + "Subscription {subscriptionId} was dropped because cancellation was requested by the client.", + SubscriptionId); + } catch (Exception ex) { + if (ex is ObjectDisposedException or OperationCanceledException) { + _log.LogWarning( + ex, + "Subscription {subscriptionId} was dropped because cancellation was requested by another caller.", + SubscriptionId + ); + } else { + _exceptionInternal = ex; + } + } finally { + _internalChannel.Writer.Complete(); + } + } + + private static void Sanitize(ReadReq request) { + if (request.Options.Filter == null) { + request.Options.NoFilter = new Empty(); + } + + request.Options.UuidOption = new ReadReq.Types.Options.Types.UUIDOption {Structured = new Empty()}; + } + + private static void Validate(ReadReq request) { + if (request.Options.CountOptionCase == ReadReq.Types.Options.CountOptionOneofCase.Count && + request.Options.Count <= 0) { + throw new ArgumentOutOfRangeException("count"); + } + + var streamOptions = request.Options.Stream; + var allOptions = request.Options.All; + + if (allOptions == null && streamOptions == null) { + throw new ArgumentException("No stream provided to subscribe"); + } + + if (allOptions != null && streamOptions != null) { + throw new ArgumentException($"Cannot subscribe both ${SystemStreams.AllStream}, and ${streamOptions.StreamIdentifier}"); + } + } + } } } diff --git a/src/EventStore.Client.Streams/StreamMessage.cs b/src/EventStore.Client.Streams/StreamMessage.cs index f6b564e71..f26a19e9c 100644 --- a/src/EventStore.Client.Streams/StreamMessage.cs +++ b/src/EventStore.Client.Streams/StreamMessage.cs @@ -41,11 +41,44 @@ public record LastStreamPosition(StreamPosition StreamPosition) : StreamMessage; /// The . public record LastAllStreamPosition(Position Position) : StreamMessage; + /// + /// The base record of all subscription specific messages. + /// + public abstract record SubscriptionMessage : StreamMessage { + + /// + /// A that represents a subscription confirmation. + /// + public record SubscriptionConfirmation(string SubscriptionId) : SubscriptionMessage; + + /// + /// A representing position reached in subscribed stream. This message will only be received when subscribing to $all stream + /// + public record Checkpoint(Position Position) : SubscriptionMessage; + } + /// /// A that could not be identified, usually indicating a lower client compatibility level than the server supports. /// public record Unknown : StreamMessage { internal static readonly Unknown Instance = new(); } + + + /// + /// A test method that returns true if this message can be expected to be received when reading from stream; otherwise, this method returns false + /// + /// + public bool IsStreamReadMessage() { + return this is not SubscriptionMessage && this is not Ok && this is not Unknown; + } + + /// + /// A test method that returns true if this message can be expected to be received when subscribing to a stream; otherwise, this method returns false + /// + /// + public bool IsSubscriptionMessage() { + return this is SubscriptionMessage || this is NotFound || this is Event; + } } } diff --git a/src/EventStore.Client.Streams/SubscriptionState.cs b/src/EventStore.Client.Streams/SubscriptionState.cs new file mode 100644 index 000000000..8d167887d --- /dev/null +++ b/src/EventStore.Client.Streams/SubscriptionState.cs @@ -0,0 +1,19 @@ +namespace EventStore.Client { + /// + /// An enumeration representing the state of a subscription. + /// + public enum SubscriptionState { + /// + /// Subscription is initializing + /// + Initializing = 0, + /// + /// Subscription has been successfully established + /// + Ok, + /// + /// Subscription has been terminated + /// + Disposed + } +} diff --git a/test/EventStore.Client.Streams.Tests/Bugs/Issue_104.cs b/test/EventStore.Client.Streams.Tests/Bugs/Issue_104.cs index 67815004d..9f0f07e22 100644 --- a/test/EventStore.Client.Streams.Tests/Bugs/Issue_104.cs +++ b/test/EventStore.Client.Streams.Tests/Bugs/Issue_104.cs @@ -1,11 +1,11 @@ namespace EventStore.Client.Streams.Tests.Bugs; [Trait("Category", "Bug")] -public class Issue_104(ITestOutputHelper output, EventStoreFixture fixture) : EventStoreTests(output, fixture) { +public class Issue_104(ITestOutputHelper output, EventStoreFixture fixture) : EventStoreTests(output, fixture) { [Fact] - public async Task subscription_does_not_send_checkpoint_reached_after_disposal() { - var streamName = Fixture.GetStreamName(); - var ignoredStreamName = $"ignore_{streamName}"; + public async Task Callback_API_subscription_does_not_send_checkpoint_reached_after_disposal() { + var streamName = $"{Fixture.GetStreamName()}_{Guid.NewGuid()}"; + var ignoredStreamName = $"ignore_{streamName}_{Guid.NewGuid()}"; var subscriptionDisposed = new TaskCompletionSource(); var eventAppeared = new TaskCompletionSource(); var checkpointReachAfterDisposed = new TaskCompletionSource(); @@ -49,4 +49,67 @@ await Fixture.Streams.AppendToStreamAsync( var result = await Task.WhenAny(delay, checkpointReachAfterDisposed.Task); result.ShouldBe(delay); // iow 300ms have passed without seeing checkpointReachAfterDisposed } -} \ No newline at end of file + + [Fact] + public async Task Iterator_API_subscription_does_not_send_checkpoint_reached_after_disposal() { + var streamName = $"{Fixture.GetStreamName()}_{Guid.NewGuid()}"; + var ignoredStreamName = $"ignore_{streamName}_{Guid.NewGuid()}"; + var subscriptionDisposed = new TaskCompletionSource(); + var eventAppeared = new TaskCompletionSource(); + var checkpointReachAfterDisposed = new TaskCompletionSource(); + + await Fixture.Streams.AppendToStreamAsync(streamName, StreamRevision.None, Fixture.CreateTestEvents()); + + var subscription = Fixture.Streams.SubscribeToAll(FromAll.Start,false, new SubscriptionFilterOptions(StreamFilter.Prefix(streamName))); + + ReadMessages(subscription, _ => { + eventAppeared.TrySetResult(true); + return Task.CompletedTask; + }, _ => subscriptionDisposed.TrySetResult(true), _ => { + if (!subscriptionDisposed.Task.IsCompleted) { + return Task.CompletedTask; + } + + checkpointReachAfterDisposed.TrySetResult(true); + return Task.CompletedTask; + }); + + await eventAppeared.Task; + + subscription.Dispose(); + await subscriptionDisposed.Task; + + await Fixture.Streams.AppendToStreamAsync(ignoredStreamName, StreamRevision.None, + Fixture.CreateTestEvents(50)); + + var delay = Task.Delay(300); + var result = await Task.WhenAny(delay, checkpointReachAfterDisposed.Task); + Assert.Equal(delay, result); // iow 300ms have passed without seeing checkpointReachAfterDisposed + } + + async void ReadMessages(EventStoreClient.SubscriptionResult subscription, Func eventAppeared, Action subscriptionDropped, Func checkpointReached) { + Exception? exception = null; + try { + await foreach (var message in subscription.Messages) { + if (message is StreamMessage.Event eventMessage) { + await eventAppeared(eventMessage.ResolvedEvent); + } else if (message is StreamMessage.SubscriptionMessage.Checkpoint checkpointMessage) { + await checkpointReached(checkpointMessage.Position); + } + } + } catch (Exception ex) { + exception = ex; + } + + //allow some time for subscription cleanup and chance for exception to be raised + await Task.Delay(100); + + try { + //subscription.SubscriptionState will throw exception if some problem occurred for the subscription + Assert.Equal(SubscriptionState.Disposed, subscription.SubscriptionState); + subscriptionDropped(exception); + } catch (Exception ex) { + subscriptionDropped(ex); + } + } +} diff --git a/test/EventStore.Client.Streams.Tests/Bugs/Issue_2544.cs b/test/EventStore.Client.Streams.Tests/Bugs/Issue_2544.cs index 26f9dbd83..a031faa3a 100644 --- a/test/EventStore.Client.Streams.Tests/Bugs/Issue_2544.cs +++ b/test/EventStore.Client.Streams.Tests/Bugs/Issue_2544.cs @@ -27,8 +27,8 @@ public Issue_2544(ITestOutputHelper output, EventStoreFixture fixture) { [Theory] [MemberData(nameof(TestCases))] - public async Task subscribe_to_stream(int iteration) { - var streamName = $"{Fixture.GetStreamName()}_{iteration}"; + public async Task Callback_subscribe_to_stream(int iteration) { + var streamName = $"{Fixture.GetStreamName()}_{iteration}_{Guid.NewGuid()}"; var startFrom = FromStream.Start; async Task Subscribe() => @@ -48,10 +48,37 @@ await Fixture.Streams await _completed.Task.WithTimeout(); } + [Theory, MemberData(nameof(TestCases))] + public async Task Iterator_subscribe_to_stream(int iteration) { + var streamName = $"{Fixture.GetStreamName()}_{iteration}_{Guid.NewGuid()}"; + var startFrom = FromStream.Start; + + var subscriptionResult = Fixture.Streams.SubscribeToStream(streamName, startFrom, resolveLinkTos: false); + + await AppendEvents(streamName); + + await foreach (var message in subscriptionResult.Messages) { + if (message is not StreamMessage.Event @event) continue; + var e = @event.ResolvedEvent; + if (e.OriginalStreamId != streamName) { + continue; + } + + if (_seen[e.Event.EventNumber]) { + throw new Exception($"Event {e.Event.EventNumber} was already seen"); + } + + _seen[e.Event.EventNumber] = true; + if (e.Event.EventType == "completed") { + break; + } + } + } + [Theory] [MemberData(nameof(TestCases))] - public async Task subscribe_to_all(int iteration) { - var streamName = $"{Fixture.GetStreamName()}_{iteration}"; + public async Task Callback_subscribe_to_all(int iteration) { + var streamName = $"{Fixture.GetStreamName()}_{iteration}_{Guid.NewGuid()}"; var startFrom = FromAll.Start; async Task Subscribe() => @@ -70,10 +97,37 @@ await Fixture.Streams await _completed.Task.WithTimeout(); } + [Theory, MemberData(nameof(TestCases))] + public async Task Iterator_subscribe_to_all(int iteration) { + var streamName = $"{Fixture.GetStreamName()}_{iteration}_{Guid.NewGuid()}"; + var startFrom = FromAll.Start; + + var subscriptionResult = Fixture.Streams.SubscribeToAll(startFrom, resolveLinkTos: false); + + await AppendEvents(streamName); + + await foreach (var message in subscriptionResult.Messages) { + if (message is not StreamMessage.Event @event) continue; + var e = @event.ResolvedEvent; + if (e.OriginalStreamId != streamName) { + continue; + } + + if (_seen[e.Event.EventNumber]) { + throw new Exception($"Event {e.Event.EventNumber} was already seen"); + } + + _seen[e.Event.EventNumber] = true; + if (e.Event.EventType == "completed") { + break; + } + } + } + [Theory] [MemberData(nameof(TestCases))] - public async Task subscribe_to_all_filtered(int iteration) { - var streamName = $"{Fixture.GetStreamName()}_{iteration}"; + public async Task Callback_subscribe_to_all_filtered(int iteration) { + var streamName = $"{Fixture.GetStreamName()}_{iteration}_{Guid.NewGuid()}"; var startFrom = FromAll.Start; async Task Subscribe() => @@ -93,6 +147,35 @@ await Fixture.Streams await _completed.Task.WithTimeout(); } + [Theory, MemberData(nameof(TestCases))] + public async Task Iterator_subscribe_to_all_filtered(int iteration) { + var streamName = $"{Fixture.GetStreamName()}_{iteration}_{Guid.NewGuid()}"; + var startFrom = FromAll.Start; + + var subscriptionResult = Fixture.Streams + .SubscribeToAll(startFrom, resolveLinkTos: false, + new SubscriptionFilterOptions(EventTypeFilter.ExcludeSystemEvents())); + + await AppendEvents(streamName); + + await foreach (var message in subscriptionResult.Messages) { + if (message is not StreamMessage.Event @event) continue; + var e = @event.ResolvedEvent; + if (e.OriginalStreamId != streamName) { + continue; + } + + if (_seen[e.Event.EventNumber]) { + throw new Exception($"Event {e.Event.EventNumber} was already seen"); + } + + _seen[e.Event.EventNumber] = true; + if (e.Event.EventType == "completed") { + break; + } + } + } + async Task AppendEvents(string streamName) { await Task.Delay(TimeSpan.FromMilliseconds(10)); @@ -168,4 +251,4 @@ Task EventAppeared(ResolvedEvent e, string streamName) { return Task.CompletedTask; } -} \ No newline at end of file +} diff --git a/test/EventStore.Client.Streams.Tests/Subscriptions/reconnection.cs b/test/EventStore.Client.Streams.Tests/Subscriptions/reconnection.cs index df9162130..5362c7870 100644 --- a/test/EventStore.Client.Streams.Tests/Subscriptions/reconnection.cs +++ b/test/EventStore.Client.Streams.Tests/Subscriptions/reconnection.cs @@ -7,7 +7,7 @@ namespace EventStore.Client.Streams.Tests.Subscriptions; public class @reconnection(ITestOutputHelper output, ReconnectionFixture fixture) : EventStoreTests(output, fixture) { [Theory] [InlineData(4, 1000, 0, 15000)] - public async Task when_the_connection_is_lost(int expectedNumberOfEvents, int reconnectDelayMs, int serviceRestartDelayMs, int testTimeoutMs) { + public async Task Callback_when_the_connection_is_lost(int expectedNumberOfEvents, int reconnectDelayMs, int serviceRestartDelayMs, int testTimeoutMs) { using var cancellator = new CancellationTokenSource().With(x => x.CancelAfter(testTimeoutMs)); var streamName = Fixture.GetStreamName(); @@ -84,9 +84,9 @@ public Task ConsumeEvents( CancellationToken cancellationToken ) { var receivedAllEvents = new TaskCompletionSource(); - + var receivedEventsCount = 0; - + _ = SubscribeToStream( streamName, checkpoint: null, @@ -106,7 +106,7 @@ Func OnReceive() { Log.Information("Test complete. {ReceivedEventsCount}/{ExpectedNumberOfEvents} events received.", receivedEventsCount, expectedNumberOfEvents); receivedAllEvents.TrySetResult(); } - + return Task.CompletedTask; }; } @@ -173,4 +173,4 @@ CancellationToken cancellationToken if (resubscribe) _ = SubscribeToStream(stream, checkpoint, onReceive, onDrop, cancellationToken); } } -} \ No newline at end of file +} diff --git a/test/EventStore.Client.Streams.Tests/Subscriptions/subscribe_to_all.cs b/test/EventStore.Client.Streams.Tests/Subscriptions/subscribe_to_all.cs index 9ea4add2b..0851074dd 100644 --- a/test/EventStore.Client.Streams.Tests/Subscriptions/subscribe_to_all.cs +++ b/test/EventStore.Client.Streams.Tests/Subscriptions/subscribe_to_all.cs @@ -546,7 +546,7 @@ void OnDropped(StreamSubscription sub, SubscriptionDroppedReason reason, Excepti } [Fact] - public async Task drops_when_disposed() { + public async Task Callback_drops_when_disposed() { var subscriptionDropped = new TaskCompletionSource(); using var subscription = await Fixture.Streams @@ -569,7 +569,7 @@ public async Task drops_when_disposed() { } [Fact] - public async Task drops_when_subscriber_error() { + public async Task Callback_drops_when_subscriber_error() { var expectedResult = SubscriptionDroppedResult.SubscriberError(); var subscriptionDropped = new TaskCompletionSource(); @@ -588,4 +588,283 @@ public async Task drops_when_subscriber_error() { var result = await subscriptionDropped.Task.WithTimeout(); result.ShouldBe(expectedResult); } -} \ No newline at end of file + + [Fact] + public async Task Iterator_client_stops_reading_messages_when_subscription_disposed() { + var dropped = new TaskCompletionSource(); + + var subscription = Fixture.Streams.SubscribeToAll(FromAll.Start); + var testEvent = Fixture.CreateTestEvents(1).First(); + ReadMessages(subscription, EventAppeared, SubscriptionDropped); + + if (dropped.Task.IsCompleted) { + Assert.False(dropped.Task.IsCompleted, dropped.Task.Result?.ToString()); + } + + subscription.Dispose(); + + var ex = await dropped.Task.WithTimeout(); + Assert.Null(ex); + + // new event after subscription is disposed + await Fixture.Streams.AppendToStreamAsync($"test-{Guid.NewGuid()}", StreamState.NoStream, new[]{testEvent}); + + Task EventAppeared(ResolvedEvent e) { + return testEvent.EventId.Equals(e.OriginalEvent.EventId) ? Task.FromException(new Exception("Subscription not dropped")) : Task.CompletedTask; + } + + void SubscriptionDropped(Exception? ex) => dropped.SetResult(ex); + } + + [Fact] + public async Task Callback_calls_subscription_dropped_when_error_processing_event() { + var stream = $"{Fixture.GetStreamName()}_{Guid.NewGuid()}"; + var dropped = new TaskCompletionSource<(SubscriptionDroppedReason, Exception?)>(); + var expectedException = new Exception("Error"); + + using var subscription = await Fixture.Streams.SubscribeToAllAsync(FromAll.Start, + EventAppeared, false, SubscriptionDropped) + .WithTimeout(); + + await Fixture.Streams.AppendToStreamAsync(stream, StreamState.NoStream, Fixture.CreateTestEvents()); + + var (reason, ex) = await dropped.Task.WithTimeout(); + + Assert.Equal(SubscriptionDroppedReason.SubscriberError, reason); + Assert.Same(expectedException, ex); + + Task EventAppeared(StreamSubscription s, ResolvedEvent e, CancellationToken ct) => + Task.FromException(expectedException); + + void SubscriptionDropped(StreamSubscription s, SubscriptionDroppedReason reason, Exception? ex) => + dropped.SetResult((reason, ex)); + } + + [Fact] + public async Task Iterator_client_stops_reading_messages_when_error_processing_event() { + var stream = $"{Fixture.GetStreamName()}_{Guid.NewGuid()}"; + var dropped = new TaskCompletionSource(); + var expectedException = new Exception("Error"); + int numTimesCalled = 0; + + var subscription = Fixture.Streams.SubscribeToAll(FromAll.Start); + ReadMessages(subscription, EventAppeared, SubscriptionDropped); + + await Fixture.Streams.AppendToStreamAsync(stream, StreamState.NoStream, Fixture.CreateTestEvents(2)); + + var ex = await dropped.Task.WithTimeout(); + Assert.Same(expectedException, ex); + + Assert.Equal(1, numTimesCalled); + + Task EventAppeared(ResolvedEvent e) { + numTimesCalled++; + return Task.FromException(expectedException); + } + + void SubscriptionDropped(Exception? ex) => dropped.SetResult(ex); + } + + [Fact] + public async Task Callback_subscribe_to_empty_database() { + var appeared = new TaskCompletionSource(); + var dropped = new TaskCompletionSource<(SubscriptionDroppedReason, Exception?)>(); + + using var subscription = await Fixture.Streams.SubscribeToAllAsync(FromAll.Start, + EventAppeared, false, SubscriptionDropped) + .WithTimeout(); + + Assert.False(appeared.Task.IsCompleted); + + if (dropped.Task.IsCompleted) { + Assert.False(dropped.Task.IsCompleted, dropped.Task.Result.ToString()); + } + + subscription.Dispose(); + + var (reason, ex) = await dropped.Task.WithTimeout(); + + Assert.Equal(SubscriptionDroppedReason.Disposed, reason); + Assert.Null(ex); + + Task EventAppeared(StreamSubscription s, ResolvedEvent e, CancellationToken ct) { + if (!SystemStreams.IsSystemStream(e.OriginalStreamId)) { + appeared.TrySetResult(true); + } + + return Task.CompletedTask; + } + + void SubscriptionDropped(StreamSubscription s, SubscriptionDroppedReason reason, Exception? ex) => + dropped.SetResult((reason, ex)); + } + + [Fact] + public async Task Iterator_subscribe_to_empty_database() { + var appeared = new TaskCompletionSource(); + var dropped = new TaskCompletionSource(); + + var subscription = Fixture.Streams.SubscribeToAll(FromAll.Start); + ReadMessages(subscription, EventAppeared, SubscriptionDropped); + Assert.False(appeared.Task.IsCompleted); + + if (dropped.Task.IsCompleted) { + Assert.False(dropped.Task.IsCompleted, dropped.Task.Result?.ToString()); + } + + subscription.Dispose(); + + var ex = await dropped.Task.WithTimeout(); + Assert.Null(ex); + + Task EventAppeared(ResolvedEvent e) { + if (!SystemStreams.IsSystemStream(e.OriginalStreamId)) { + appeared.TrySetResult(true); + } + + return Task.CompletedTask; + } + + void SubscriptionDropped(Exception? ex) => dropped.SetResult(ex); + } + + [Fact] + public async Task Callback_reads_all_existing_events_and_keep_listening_to_new_ones() { + var appeared = new TaskCompletionSource(); + var dropped = new TaskCompletionSource<(SubscriptionDroppedReason, Exception?)>(); + var appearedEvents = new List(); + var beforeEvents = Fixture.CreateTestEvents(10).ToArray(); + var afterEvents = Fixture.CreateTestEvents(10).ToArray(); + + var allStreams = new List(); + + foreach (var @event in beforeEvents.Concat((afterEvents))) { + allStreams.Add($"stream-{@event.EventId:n}"); + } + + foreach (var @event in beforeEvents) { + await Fixture.Streams.AppendToStreamAsync($"stream-{@event.EventId:n}", StreamState.NoStream, + new[] {@event}); + } + + using var subscription = await Fixture.Streams.SubscribeToAllAsync(FromAll.Start, + EventAppeared, false, SubscriptionDropped) + .WithTimeout(); + + foreach (var @event in afterEvents) { + await Fixture.Streams.AppendToStreamAsync($"stream-{@event.EventId:n}", StreamState.NoStream, + new[] {@event}); + } + + await appeared.Task.WithTimeout(); + + Assert.Equal(beforeEvents.Concat(afterEvents).Select(x => x.EventId), + appearedEvents.Select(x => x.EventId)); + + if (dropped.Task.IsCompleted) { + Assert.False(dropped.Task.IsCompleted, dropped.Task.Result.ToString()); + } + + subscription.Dispose(); + + var (reason, ex) = await dropped.Task.WithTimeout(); + + Assert.Equal(SubscriptionDroppedReason.Disposed, reason); + Assert.Null(ex); + + Task EventAppeared(StreamSubscription s, ResolvedEvent e, CancellationToken ct) { + if (allStreams.Contains(e.OriginalStreamId)) { + appearedEvents.Add(e.Event); + + if (appearedEvents.Count >= beforeEvents.Length + afterEvents.Length) { + appeared.TrySetResult(true); + } + } + + return Task.CompletedTask; + } + + void SubscriptionDropped(StreamSubscription s, SubscriptionDroppedReason reason, Exception? ex) => + dropped.SetResult((reason, ex)); + } + + [Fact] + public async Task Iterator_reads_all_existing_events_and_keep_listening_to_new_ones() { + var appeared = new TaskCompletionSource(); + var dropped = new TaskCompletionSource(); + var appearedEvents = new List(); + var beforeEvents = Fixture.CreateTestEvents(10).ToArray(); + var afterEvents = Fixture.CreateTestEvents(10).ToArray(); + + var allStreams = new List(); + + foreach (var @event in beforeEvents.Concat((afterEvents))) { + allStreams.Add($"stream-{@event.EventId:n}"); + } + + foreach (var @event in beforeEvents) { + await Fixture.Streams.AppendToStreamAsync($"stream-{@event.EventId:n}", StreamState.NoStream, + new[] {@event}); + } + + var subscription = Fixture.Streams.SubscribeToAll(FromAll.Start); + ReadMessages(subscription, EventAppeared, SubscriptionDropped); + + foreach (var @event in afterEvents) { + await Fixture.Streams.AppendToStreamAsync($"stream-{@event.EventId:n}", StreamState.NoStream, + new[] { @event }); + } + + await appeared.Task.WithTimeout(); + + Assert.Equal(beforeEvents.Concat(afterEvents).Select(x => x.EventId), + appearedEvents.Select(x => x.EventId)); + + if (dropped.Task.IsCompleted) { + Assert.False(dropped.Task.IsCompleted, dropped.Task.Result?.ToString()); + } + + subscription.Dispose(); + + var ex = await dropped.Task.WithTimeout(); + Assert.Null(ex); + + Task EventAppeared(ResolvedEvent e) { + if (allStreams.Contains(e.OriginalStreamId)) { + appearedEvents.Add(e.Event); + + if (appearedEvents.Count >= beforeEvents.Length + afterEvents.Length) { + appeared.TrySetResult(true); + } + } + + return Task.CompletedTask; + } + + void SubscriptionDropped(Exception? ex) => dropped.SetResult(ex); + } + + async void ReadMessages(EventStoreClient.SubscriptionResult subscription, Func eventAppeared, Action subscriptionDropped) { + Exception? exception = null; + try { + await foreach (var message in subscription.Messages) { + if (message is StreamMessage.Event eventMessage) { + await eventAppeared(eventMessage.ResolvedEvent); + } + } + } catch (Exception ex) { + exception = ex; + } + + //allow some time for subscription cleanup and chance for exception to be raised + await Task.Delay(100); + + try { + //subscription.SubscriptionState will throw exception if some problem occurred for the subscription + Assert.Equal(SubscriptionState.Disposed, subscription.SubscriptionState); + subscriptionDropped(exception); + } catch (Exception ex) { + subscriptionDropped(ex); + } + } +} diff --git a/test/EventStore.Client.Streams.Tests/Subscriptions/subscribe_to_stream.cs b/test/EventStore.Client.Streams.Tests/Subscriptions/subscribe_to_stream.cs index ca5dc122e..edc40561c 100644 --- a/test/EventStore.Client.Streams.Tests/Subscriptions/subscribe_to_stream.cs +++ b/test/EventStore.Client.Streams.Tests/Subscriptions/subscribe_to_stream.cs @@ -103,7 +103,7 @@ void OnDropped(StreamSubscription sub, SubscriptionDroppedReason reason, Excepti } [Fact] - public async Task receives_all_events_from_non_existing_stream() { + public async Task Callback_receives_all_events_from_non_existing_stream() { var streamName = Fixture.GetStreamName(); var receivedAllEvents = new TaskCompletionSource(); @@ -148,7 +148,35 @@ void OnDropped(StreamSubscription sub, SubscriptionDroppedReason reason, Excepti } [Fact] - public async Task allow_multiple_subscriptions_to_same_stream() { + public async Task Iterator_receives_all_events_from_non_existing_stream() { + var stream = $"{Fixture.GetStreamName()}_{Guid.NewGuid()}"; + var appeared = new TaskCompletionSource(); + var dropped = new TaskCompletionSource(); + + var subscription = Fixture.Streams.SubscribeToStream(stream, FromStream.Start); + ReadMessages(subscription, EventAppeared, SubscriptionDropped); + + Assert.False(appeared.Task.IsCompleted); + + if (dropped.Task.IsCompleted) { + Assert.False(dropped.Task.IsCompleted, dropped.Task.Result?.ToString()); + } + + subscription.Dispose(); + + var ex = await dropped.Task.WithTimeout(); + Assert.Null(ex); + + Task EventAppeared(ResolvedEvent e) { + appeared.TrySetResult(true); + return Task.CompletedTask; + } + + void SubscriptionDropped(Exception? ex) => dropped.SetResult(ex); + } + + [Fact] + public async Task Callback_allow_multiple_subscriptions_to_same_stream() { var streamName = Fixture.GetStreamName(); var receivedAllEvents = new TaskCompletionSource(); @@ -180,7 +208,34 @@ Task OnReceived(StreamSubscription sub, ResolvedEvent re, CancellationToken ct) } [Fact] - public async Task drops_when_disposed() { + public async Task Iterator_allow_multiple_subscriptions_to_same_stream() { + var stream = $"{Fixture.GetStreamName()}_{Guid.NewGuid()}"; + + var appeared = new TaskCompletionSource(); + + int appearedCount = 0; + + await Fixture.Streams.AppendToStreamAsync(stream, StreamState.NoStream, Fixture.CreateTestEvents()); + + var s1 = Fixture.Streams.SubscribeToStream(stream, FromStream.Start); + ReadMessages(s1, EventAppeared, null); + + var s2 = Fixture.Streams.SubscribeToStream(stream, FromStream.Start); + ReadMessages(s2, EventAppeared, null); + + Assert.True(await appeared.Task.WithTimeout()); + + Task EventAppeared(ResolvedEvent e) { + if (++appearedCount == 2) { + appeared.TrySetResult(true); + } + + return Task.CompletedTask; + } + } + + [Fact] + public async Task Callback_drops_when_disposed() { var streamName = Fixture.GetStreamName(); var subscriptionDropped = new TaskCompletionSource(); @@ -206,7 +261,35 @@ public async Task drops_when_disposed() { } [Fact] - public async Task drops_when_subscriber_error() { + public async Task Iterator_drops_when_disposed() { + var stream = $"{Fixture.GetStreamName()}_{Guid.NewGuid()}"; + var dropped = new TaskCompletionSource(); + + var subscription = Fixture.Streams.SubscribeToStream(stream, FromStream.Start); + var testEvent = Fixture.CreateTestEvents(1).First(); + ReadMessages(subscription, EventAppeared, SubscriptionDropped); + + if (dropped.Task.IsCompleted) { + Assert.False(dropped.Task.IsCompleted, dropped.Task.Result?.ToString()); + } + + subscription.Dispose(); + + var ex = await dropped.Task.WithTimeout(); + Assert.Null(ex); + + // new event after subscription is disposed + await Fixture.Streams.AppendToStreamAsync(stream, StreamState.Any, new[]{testEvent}); + + Task EventAppeared(ResolvedEvent e) { + return testEvent.EventId.Equals(e.OriginalEvent.EventId) ? Task.FromException(new Exception("Subscription not dropped")) : Task.CompletedTask; + } + + void SubscriptionDropped(Exception? ex) => dropped.SetResult(ex); + } + + [Fact] + public async Task Callback_drops_when_subscriber_error() { var streamName = Fixture.GetStreamName(); var expectedResult = SubscriptionDroppedResult.SubscriberError(); @@ -229,6 +312,31 @@ public async Task drops_when_subscriber_error() { result.ShouldBe(expectedResult); } + [Fact] + public async Task Iterator_drops_when_subscriber_error() { + var stream = $"{Fixture.GetStreamName()}_{Guid.NewGuid()}"; + var dropped = new TaskCompletionSource(); + var expectedException = new Exception("Error"); + int numTimesCalled = 0; + + var subscription = Fixture.Streams.SubscribeToStream(stream, FromStream.Start); + ReadMessages(subscription, EventAppeared, SubscriptionDropped); + + await Fixture.Streams.AppendToStreamAsync(stream, StreamState.NoStream, Fixture.CreateTestEvents(2)); + + var ex = await dropped.Task.WithTimeout(); + Assert.Same(expectedException, ex); + + Assert.Equal(1, numTimesCalled); + + Task EventAppeared(ResolvedEvent e) { + numTimesCalled++; + return Task.FromException(expectedException); + } + + void SubscriptionDropped(Exception? ex) => dropped.SetResult(ex); + } + [Fact] public async Task drops_when_stream_tombstoned() { var streamName = Fixture.GetStreamName(); @@ -299,4 +407,28 @@ Task OnReceived(StreamSubscription sub, ResolvedEvent re, CancellationToken ct) void OnDropped(StreamSubscription sub, SubscriptionDroppedReason reason, Exception? ex) => subscriptionDropped.SetResult(new(reason, ex)); } -} \ No newline at end of file + + async void ReadMessages(EventStoreClient.SubscriptionResult subscription, Func eventAppeared, Action? subscriptionDropped) { + Exception? exception = null; + try { + await foreach (var message in subscription.Messages) { + if (message is StreamMessage.Event eventMessage) { + await eventAppeared(eventMessage.ResolvedEvent); + } + } + } catch (Exception ex) { + exception = ex; + } + + //allow some time for subscription cleanup and chance for exception to be raised + await Task.Delay(100); + + try { + //subscription.SubscriptionState will throw exception if some problem occurred for the subscription + Assert.Equal(SubscriptionState.Disposed, subscription.SubscriptionState); + subscriptionDropped?.Invoke(exception); + } catch (Exception ex) { + subscriptionDropped?.Invoke(ex); + } + } +}