From c82e4bc698e39d1bcde25e98a9b39071d128c6d6 Mon Sep 17 00:00:00 2001 From: Rahul Patel Date: Wed, 26 Jul 2023 05:08:39 -0400 Subject: [PATCH] iterator based subscription API --- .../EventStoreClient.Read.cs | 36 ++- .../EventStoreClient.Subscriptions.cs | 259 ++++++++++++++++ .../StreamMessage.cs | 33 ++ .../SubscriptionState.cs | 19 ++ .../Bugs/Issue_104.cs | 70 ++++- .../Bugs/Issue_2544.cs | 95 +++++- .../reconnection.cs | 4 +- .../subscribe_resolve_link_to.cs | 201 ++++++++++++- .../subscribe_to_all.cs | 181 ++++++++++- ...subscribe_to_all_filtered_with_position.cs | 101 ++++++- .../subscribe_to_all_live.cs | 191 +++++++++++- .../subscribe_to_all_with_position.cs | 211 ++++++++++++- .../subscribe_to_stream.cs | 256 +++++++++++++++- .../subscribe_to_stream_live.cs | 158 +++++++++- .../subscribe_to_stream_with_revision.cs | 282 ++++++++++++++++-- 15 files changed, 1978 insertions(+), 119 deletions(-) create mode 100644 src/EventStore.Client.Streams/SubscriptionState.cs diff --git a/src/EventStore.Client.Streams/EventStoreClient.Read.cs b/src/EventStore.Client.Streams/EventStoreClient.Read.cs index 25635f26a..1e873ec87 100644 --- a/src/EventStore.Client.Streams/EventStoreClient.Read.cs +++ b/src/EventStore.Client.Streams/EventStoreClient.Read.cs @@ -7,7 +7,6 @@ using System.Threading.Tasks; using EventStore.Client.Streams; using Grpc.Core; -using static EventStore.Client.Streams.ReadResp; using static EventStore.Client.Streams.ReadResp.ContentOneofCase; namespace EventStore.Client { @@ -321,18 +320,11 @@ await _channel.Writer.WriteAsync(StreamMessage.Ok.Instance, linkedCancellationTo } } - await _channel.Writer.WriteAsync(response.ContentCase switch { - StreamNotFound => StreamMessage.NotFound.Instance, - Event => new StreamMessage.Event(ConvertToResolvedEvent(response.Event)), - ContentOneofCase.FirstStreamPosition => new StreamMessage.FirstStreamPosition( - new StreamPosition(response.FirstStreamPosition)), - ContentOneofCase.LastStreamPosition => new StreamMessage.LastStreamPosition( - new StreamPosition(response.LastStreamPosition)), - LastAllStreamPosition => new StreamMessage.LastAllStreamPosition( - new Position(response.LastAllStreamPosition.CommitPosition, - response.LastAllStreamPosition.PreparePosition)), - _ => StreamMessage.Unknown.Instance - }, linkedCancellationToken).ConfigureAwait(false); + var messageToWrite = ConvertResponseToMessage(response); + messageToWrite = messageToWrite.IsStreamReadMessage() ? messageToWrite : StreamMessage.Unknown.Instance; + await _channel.Writer + .WriteAsync(messageToWrite, linkedCancellationToken) + .ConfigureAwait(false); } _channel.Writer.Complete(); @@ -413,6 +405,24 @@ private static (SubscriptionConfirmation, Position?, ResolvedEvent)? ConvertToIt _ => null }; + private static StreamMessage ConvertResponseToMessage(ReadResp response) => + response.ContentCase switch { + Checkpoint => new StreamMessage.SubscriptionMessage.Checkpoint( + new Position(response.Checkpoint.CommitPosition, response.Checkpoint.PreparePosition)), + Confirmation => new StreamMessage.SubscriptionMessage.SubscriptionConfirmation(response.Confirmation + .SubscriptionId), + Event => new StreamMessage.Event(ConvertToResolvedEvent(response.Event)), + FirstStreamPosition => new StreamMessage.FirstStreamPosition( + new StreamPosition(response.FirstStreamPosition)), + LastAllStreamPosition => new StreamMessage.LastAllStreamPosition( + new Position(response.LastAllStreamPosition.CommitPosition, + response.LastAllStreamPosition.PreparePosition)), + LastStreamPosition => new StreamMessage.LastStreamPosition( + new StreamPosition(response.LastStreamPosition)), + StreamNotFound => StreamMessage.NotFound.Instance, + _ => StreamMessage.Unknown.Instance + }; + private static ResolvedEvent ConvertToResolvedEvent(ReadResp.Types.ReadEvent readEvent) => new ResolvedEvent( ConvertToEventRecord(readEvent.Event)!, diff --git a/src/EventStore.Client.Streams/EventStoreClient.Subscriptions.cs b/src/EventStore.Client.Streams/EventStoreClient.Subscriptions.cs index 47ea634f1..01d26393b 100644 --- a/src/EventStore.Client.Streams/EventStoreClient.Subscriptions.cs +++ b/src/EventStore.Client.Streams/EventStoreClient.Subscriptions.cs @@ -1,7 +1,12 @@ using System; +using System.Collections.Generic; using System.Threading; +using System.Threading.Channels; using System.Threading.Tasks; using EventStore.Client.Streams; +using Grpc.Core; +using Microsoft.Extensions.Logging; +using static EventStore.Client.SubscriptionState; namespace EventStore.Client { public partial class EventStoreClient { @@ -33,6 +38,31 @@ public Task SubscribeToAllAsync( } }, userCredentials, cancellationToken), eventAppeared, subscriptionDropped, _log, filterOptions?.CheckpointReached, cancellationToken); + + /// + /// Subscribes to all events. + /// + /// A (exclusive of) to start the subscription from. + /// Whether to resolve LinkTo events automatically. + /// The optional to apply. + /// The optional user credentials to perform operation with. + /// The optional . + /// An instance of SubscriptionResult which contains current state of the subscription and an enumerator to consume messages + public SubscriptionResult SubscribeToAll( + FromAll start, bool resolveLinkTos = false, SubscriptionFilterOptions? filterOptions = null, UserCredentials? userCredentials = null, CancellationToken cancellationToken = default) { + return new SubscriptionResult(async _ => { + var channelInfo = await GetChannelInfo(cancellationToken).ConfigureAwait(false); + return channelInfo.CallInvoker; + }, new ReadReq { + Options = new ReadReq.Types.Options { + ReadDirection = ReadReq.Types.Options.Types.ReadDirection.Forwards, + ResolveLinks = resolveLinkTos, + All = ReadReq.Types.Options.Types.AllOptions.FromSubscriptionPosition(start), + Subscription = new ReadReq.Types.Options.Types.SubscriptionOptions(), + Filter = GetFilterOptions(filterOptions)! + } + }, Settings, userCredentials, cancellationToken, _log); + } /// /// Subscribes to a stream from a checkpoint. @@ -60,5 +90,234 @@ public Task SubscribeToStreamAsync(string streamName, } }, userCredentials, cancellationToken), eventAppeared, subscriptionDropped, _log, cancellationToken: cancellationToken); + + /// + /// Subscribes to a stream from a checkpoint. + /// + /// A (exclusive of) to start the subscription from. + /// The name of the stream to read events from. + /// Whether to resolve LinkTo events automatically. + /// The optional user credentials to perform operation with. + /// The optional . + /// An instance of SubscriptionResult which contains current state of the subscription and an enumerator to consume messages + public SubscriptionResult SubscribeToStream(string streamName, + FromStream start, bool resolveLinkTos = false, + UserCredentials? userCredentials = null, CancellationToken cancellationToken = default) { + return new SubscriptionResult(async _ => { + var channelInfo = await GetChannelInfo(cancellationToken).ConfigureAwait(false); + return channelInfo.CallInvoker; + }, new ReadReq { + Options = new ReadReq.Types.Options { + ReadDirection = ReadReq.Types.Options.Types.ReadDirection.Forwards, + ResolveLinks = resolveLinkTos, + Stream = ReadReq.Types.Options.Types.StreamOptions.FromSubscriptionPosition(streamName, start), + Subscription = new ReadReq.Types.Options.Types.SubscriptionOptions(), + } + }, Settings, userCredentials, cancellationToken, _log); + } + + + + /// + /// A class which represents current subscription state and an enumerator to consume messages + /// + public class SubscriptionResult { + private readonly Channel _internalChannel; + private readonly CancellationTokenSource _cts; + private int _messagesEnumerated; + private ILogger _log; + /// + /// The name of the stream. + /// + public string StreamName { get; } + + /// + /// + /// + public Position StreamPosition { get; private set; } + + /// + /// Represents subscription ID for the current subscription + /// + public string? SubscriptionId { get; private set; } + + /// + /// Current subscription state + /// + + public SubscriptionState SubscriptionState { + get { + if (_exceptionInternal is not null) { + throw _exceptionInternal; + } + + return _subscriptionStateInternal; + } + } + + private volatile SubscriptionState _subscriptionStateInternal; + + private volatile Exception? _exceptionInternal; + + /// + /// An . Do not enumerate more than once. + /// + public IAsyncEnumerable Messages { + get { + return GetMessages(); + + async IAsyncEnumerable GetMessages() { + if (Interlocked.Exchange(ref _messagesEnumerated, 1) == 1) { + throw new InvalidOperationException("Messages may only be enumerated once."); + } + + try { + await foreach (var message in _internalChannel.Reader.ReadAllAsync() + .ConfigureAwait(false)) { + if (!message.IsSubscriptionMessage()) { + continue; + } + + switch (message) { + case StreamMessage.SubscriptionMessage.SubscriptionConfirmation(var + subscriptionId): + SubscriptionId = subscriptionId; + continue; + case StreamMessage.SubscriptionMessage.Checkpoint(var position): + StreamPosition = position; + break; + } + + yield return message; + } + } finally { + Dispose(); + } + } + } + } + + /// + /// Terminates subscription + /// + public void Dispose() { + if (_subscriptionStateInternal == Disposed) { + return; + } + _subscriptionStateInternal = Disposed; + _cts.Cancel(); + } + + internal SubscriptionResult(Func> selectCallInvoker, ReadReq request, + EventStoreClientSettings settings, UserCredentials? userCredentials, + CancellationToken cancellationToken, ILogger log) { + Sanitize(request); + Validate(request); + + _cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + + var callOptions = EventStoreCallOptions.CreateStreaming(settings, userCredentials: userCredentials, + cancellationToken: _cts.Token); + + _internalChannel = Channel.CreateBounded(new BoundedChannelOptions(1) { + SingleReader = true, + SingleWriter = true, + AllowSynchronousContinuations = true + }); + + _log = log; + + StreamName = request.Options.All != null + ? SystemStreams.AllStream + : request.Options.Stream.StreamIdentifier!; + + _subscriptionStateInternal = Initializing; + + _ = PumpMessages(selectCallInvoker, request, callOptions); + } + + async Task PumpMessages(Func> selectCallInvoker, ReadReq request, CallOptions callOptions) { + var firstMessageRead = false; + var callInvoker = await selectCallInvoker(_cts.Token).ConfigureAwait(false); + var streamsClient = new Streams.Streams.StreamsClient(callInvoker); + try { + using var call = streamsClient.Read(request, callOptions); + + await foreach (var response in call.ResponseStream + .ReadAllAsync(_cts.Token) + .WithCancellation(_cts.Token) + .ConfigureAwait(false)) { + if (response is null) { + continue; + } + + var message = ConvertResponseToMessage(response); + if (!firstMessageRead) { + firstMessageRead = true; + + if (message is not StreamMessage.SubscriptionMessage.SubscriptionConfirmation) { + throw new InvalidOperationException( + $"Subscription to {StreamName} could not be confirmed."); + } + + _subscriptionStateInternal = Ok; + } + + var messageToWrite = message.IsSubscriptionMessage() + ? message + : StreamMessage.Unknown.Instance; + await _internalChannel.Writer.WriteAsync(messageToWrite, _cts.Token).ConfigureAwait(false); + + if (messageToWrite is StreamMessage.NotFound) { + _exceptionInternal = new StreamNotFoundException(StreamName); + break; + } + } + } catch (RpcException ex) when (ex.Status.StatusCode == StatusCode.Cancelled && + ex.Status.Detail.Contains("Call canceled by the client.")) { + _log.LogInformation( + "Subscription {subscriptionId} was dropped because cancellation was requested by the client.", + SubscriptionId); + } catch (Exception ex) { + if (ex is ObjectDisposedException or OperationCanceledException) { + _log.LogWarning( + ex, + "Subscription {subscriptionId} was dropped because cancellation was requested by another caller.", + SubscriptionId + ); + } else { + _exceptionInternal = ex; + } + } finally { + _internalChannel.Writer.Complete(); + } + } + + private static void Sanitize(ReadReq request) { + if (request.Options.Filter == null) { + request.Options.NoFilter = new Empty(); + } + + request.Options.UuidOption = new ReadReq.Types.Options.Types.UUIDOption {Structured = new Empty()}; + } + + private static void Validate(ReadReq request) { + if (request.Options.CountOptionCase == ReadReq.Types.Options.CountOptionOneofCase.Count && + request.Options.Count <= 0) { + throw new ArgumentOutOfRangeException("count"); + } + + var streamOptions = request.Options.Stream; + var allOptions = request.Options.All; + + if (allOptions == null && streamOptions == null) { + throw new ArgumentException("No stream provided to subscribe"); + } + + if (allOptions != null && streamOptions != null) { + throw new ArgumentException($"Cannot subscribe both ${SystemStreams.AllStream}, and ${streamOptions.StreamIdentifier}"); + } + } + } } } diff --git a/src/EventStore.Client.Streams/StreamMessage.cs b/src/EventStore.Client.Streams/StreamMessage.cs index f6b564e71..f26a19e9c 100644 --- a/src/EventStore.Client.Streams/StreamMessage.cs +++ b/src/EventStore.Client.Streams/StreamMessage.cs @@ -41,11 +41,44 @@ public record LastStreamPosition(StreamPosition StreamPosition) : StreamMessage; /// The . public record LastAllStreamPosition(Position Position) : StreamMessage; + /// + /// The base record of all subscription specific messages. + /// + public abstract record SubscriptionMessage : StreamMessage { + + /// + /// A that represents a subscription confirmation. + /// + public record SubscriptionConfirmation(string SubscriptionId) : SubscriptionMessage; + + /// + /// A representing position reached in subscribed stream. This message will only be received when subscribing to $all stream + /// + public record Checkpoint(Position Position) : SubscriptionMessage; + } + /// /// A that could not be identified, usually indicating a lower client compatibility level than the server supports. /// public record Unknown : StreamMessage { internal static readonly Unknown Instance = new(); } + + + /// + /// A test method that returns true if this message can be expected to be received when reading from stream; otherwise, this method returns false + /// + /// + public bool IsStreamReadMessage() { + return this is not SubscriptionMessage && this is not Ok && this is not Unknown; + } + + /// + /// A test method that returns true if this message can be expected to be received when subscribing to a stream; otherwise, this method returns false + /// + /// + public bool IsSubscriptionMessage() { + return this is SubscriptionMessage || this is NotFound || this is Event; + } } } diff --git a/src/EventStore.Client.Streams/SubscriptionState.cs b/src/EventStore.Client.Streams/SubscriptionState.cs new file mode 100644 index 000000000..8d167887d --- /dev/null +++ b/src/EventStore.Client.Streams/SubscriptionState.cs @@ -0,0 +1,19 @@ +namespace EventStore.Client { + /// + /// An enumeration representing the state of a subscription. + /// + public enum SubscriptionState { + /// + /// Subscription is initializing + /// + Initializing = 0, + /// + /// Subscription has been successfully established + /// + Ok, + /// + /// Subscription has been terminated + /// + Disposed + } +} diff --git a/test/EventStore.Client.Streams.Tests/Bugs/Issue_104.cs b/test/EventStore.Client.Streams.Tests/Bugs/Issue_104.cs index 1fc8636a1..37e6906dd 100644 --- a/test/EventStore.Client.Streams.Tests/Bugs/Issue_104.cs +++ b/test/EventStore.Client.Streams.Tests/Bugs/Issue_104.cs @@ -1,3 +1,4 @@ +using System; using System.Threading.Tasks; using Xunit; @@ -10,9 +11,9 @@ public Issue_104(Fixture fixture) { } [Fact] - public async Task subscription_does_not_send_checkpoint_reached_after_disposal() { - var streamName = _fixture.GetStreamName(); - var ignoredStreamName = $"ignore_{streamName}"; + public async Task Callback_API_subscription_does_not_send_checkpoint_reached_after_disposal() { + var streamName = $"{_fixture.GetStreamName()}_{Guid.NewGuid()}"; + var ignoredStreamName = $"ignore_{streamName}_{Guid.NewGuid()}"; var subscriptionDisposed = new TaskCompletionSource(); var eventAppeared = new TaskCompletionSource(); var checkpointReachAfterDisposed = new TaskCompletionSource(); @@ -46,11 +47,74 @@ await _fixture.Client.AppendToStreamAsync(ignoredStreamName, StreamRevision.None var result = await Task.WhenAny(delay, checkpointReachAfterDisposed.Task); Assert.Equal(delay, result); // iow 300ms have passed without seeing checkpointReachAfterDisposed } + + [Fact] + public async Task Iterator_API_subscription_does_not_send_checkpoint_reached_after_disposal() { + var streamName = $"{_fixture.GetStreamName()}_{Guid.NewGuid()}"; + var ignoredStreamName = $"ignore_{streamName}_{Guid.NewGuid()}"; + var subscriptionDisposed = new TaskCompletionSource(); + var eventAppeared = new TaskCompletionSource(); + var checkpointReachAfterDisposed = new TaskCompletionSource(); + + await _fixture.Client.AppendToStreamAsync(streamName, StreamRevision.None, _fixture.CreateTestEvents()); + + var subscription = _fixture.Client.SubscribeToAll(FromAll.Start,false, new SubscriptionFilterOptions(StreamFilter.Prefix(streamName))); + + ReadMessages(subscription, _ => { + eventAppeared.TrySetResult(true); + return Task.CompletedTask; + }, _ => subscriptionDisposed.TrySetResult(true), _ => { + if (!subscriptionDisposed.Task.IsCompleted) { + return Task.CompletedTask; + } + + checkpointReachAfterDisposed.TrySetResult(true); + return Task.CompletedTask; + }); + + await eventAppeared.Task; + + subscription.Dispose(); + await subscriptionDisposed.Task; + + await _fixture.Client.AppendToStreamAsync(ignoredStreamName, StreamRevision.None, + _fixture.CreateTestEvents(50)); + + var delay = Task.Delay(300); + var result = await Task.WhenAny(delay, checkpointReachAfterDisposed.Task); + Assert.Equal(delay, result); // iow 300ms have passed without seeing checkpointReachAfterDisposed + } public class Fixture : EventStoreClientFixture { protected override Task Given() => Task.CompletedTask; protected override Task When() => Task.CompletedTask; } + + async void ReadMessages(EventStoreClient.SubscriptionResult subscription, Func eventAppeared, Action subscriptionDropped, Func checkpointReached) { + Exception? exception = null; + try { + await foreach (var message in subscription.Messages) { + if (message is StreamMessage.Event eventMessage) { + await eventAppeared(eventMessage.ResolvedEvent); + } else if (message is StreamMessage.SubscriptionMessage.Checkpoint checkpointMessage) { + await checkpointReached(checkpointMessage.Position); + } + } + } catch (Exception ex) { + exception = ex; + } + + //allow some time for subscription cleanup and chance for exception to be raised + await Task.Delay(100); + + try { + //subscription.SubscriptionState will throw exception if some problem occurred for the subscription + Assert.Equal(SubscriptionState.Disposed, subscription.SubscriptionState); + subscriptionDropped(exception); + } catch (Exception ex) { + subscriptionDropped(ex); + } + } } } diff --git a/test/EventStore.Client.Streams.Tests/Bugs/Issue_2544.cs b/test/EventStore.Client.Streams.Tests/Bugs/Issue_2544.cs index 15482ee97..c059f31cd 100644 --- a/test/EventStore.Client.Streams.Tests/Bugs/Issue_2544.cs +++ b/test/EventStore.Client.Streams.Tests/Bugs/Issue_2544.cs @@ -28,8 +28,8 @@ public Issue_2544(ITestOutputHelper outputHelper) { .Select(i => new object[] {i}); [Theory, MemberData(nameof(TestCases))] - public async Task subscribe_to_stream(int iteration) { - var streamName = $"{_fixture.GetStreamName()}_{iteration}"; + public async Task Callback_subscribe_to_stream(int iteration) { + var streamName = $"{_fixture.GetStreamName()}_{iteration}_{Guid.NewGuid()}"; var startFrom = FromStream.Start; async Task Subscribe() => await _fixture.Client @@ -46,8 +46,35 @@ async Task Subscribe() => await _fixture.Client } [Theory, MemberData(nameof(TestCases))] - public async Task subscribe_to_all(int iteration) { - var streamName = $"{_fixture.GetStreamName()}_{iteration}"; + public async Task Iterator_subscribe_to_stream(int iteration) { + var streamName = $"{_fixture.GetStreamName()}_{iteration}_{Guid.NewGuid()}"; + var startFrom = FromStream.Start; + + var subscriptionResult = _fixture.Client.SubscribeToStream(streamName, startFrom, resolveLinkTos: false); + + await AppendEvents(streamName); + + await foreach (var message in subscriptionResult.Messages) { + if (message is not StreamMessage.Event @event) continue; + var e = @event.ResolvedEvent; + if (e.OriginalStreamId != streamName) { + continue; + } + + if (_seen[e.Event.EventNumber]) { + throw new Exception($"Event {e.Event.EventNumber} was already seen"); + } + + _seen[e.Event.EventNumber] = true; + if (e.Event.EventType == "completed") { + break; + } + } + } + + [Theory, MemberData(nameof(TestCases))] + public async Task Callback_subscribe_to_all(int iteration) { + var streamName = $"{_fixture.GetStreamName()}_{iteration}_{Guid.NewGuid()}"; var startFrom = FromAll.Start; async Task Subscribe() => await _fixture.Client @@ -62,10 +89,37 @@ async Task Subscribe() => await _fixture.Client await _completed.Task.WithTimeout(); } + + [Theory, MemberData(nameof(TestCases))] + public async Task Iterator_subscribe_to_all(int iteration) { + var streamName = $"{_fixture.GetStreamName()}_{iteration}_{Guid.NewGuid()}"; + var startFrom = FromAll.Start; + + var subscriptionResult = _fixture.Client.SubscribeToAll(startFrom, resolveLinkTos: false); + + await AppendEvents(streamName); + + await foreach (var message in subscriptionResult.Messages) { + if (message is not StreamMessage.Event @event) continue; + var e = @event.ResolvedEvent; + if (e.OriginalStreamId != streamName) { + continue; + } + + if (_seen[e.Event.EventNumber]) { + throw new Exception($"Event {e.Event.EventNumber} was already seen"); + } + + _seen[e.Event.EventNumber] = true; + if (e.Event.EventType == "completed") { + break; + } + } + } [Theory, MemberData(nameof(TestCases))] - public async Task subscribe_to_all_filtered(int iteration) { - var streamName = $"{_fixture.GetStreamName()}_{iteration}"; + public async Task Callback_subscribe_to_all_filtered(int iteration) { + var streamName = $"{_fixture.GetStreamName()}_{iteration}_{Guid.NewGuid()}"; var startFrom = FromAll.Start; async Task Subscribe() => await _fixture.Client @@ -81,6 +135,35 @@ async Task Subscribe() => await _fixture.Client await _completed.Task.WithTimeout(); } + + [Theory, MemberData(nameof(TestCases))] + public async Task Iterator_subscribe_to_all_filtered(int iteration) { + var streamName = $"{_fixture.GetStreamName()}_{iteration}_{Guid.NewGuid()}"; + var startFrom = FromAll.Start; + + var subscriptionResult = _fixture.Client + .SubscribeToAll(startFrom, resolveLinkTos: false, + new SubscriptionFilterOptions(EventTypeFilter.ExcludeSystemEvents())); + + await AppendEvents(streamName); + + await foreach (var message in subscriptionResult.Messages) { + if (message is not StreamMessage.Event @event) continue; + var e = @event.ResolvedEvent; + if (e.OriginalStreamId != streamName) { + continue; + } + + if (_seen[e.Event.EventNumber]) { + throw new Exception($"Event {e.Event.EventNumber} was already seen"); + } + + _seen[e.Event.EventNumber] = true; + if (e.Event.EventType == "completed") { + break; + } + } + } private async Task AppendEvents(string streamName) { await Task.Delay(TimeSpan.FromMilliseconds(10)); diff --git a/test/EventStore.Client.Streams.Tests/reconnection.cs b/test/EventStore.Client.Streams.Tests/reconnection.cs index 73e0fe9ea..7b39edf74 100644 --- a/test/EventStore.Client.Streams.Tests/reconnection.cs +++ b/test/EventStore.Client.Streams.Tests/reconnection.cs @@ -14,8 +14,8 @@ public reconnection(Fixture fixture) { } [Fact] - public async Task when_the_connection_is_lost() { - var streamName = _fixture.GetStreamName(); + public async Task Callback_when_the_connection_is_lost() { + var streamName = $"{_fixture.GetStreamName()}_{Guid.NewGuid()}"; var eventCount = 512; var receivedAllEvents = new TaskCompletionSource(); var serverRestarted = new TaskCompletionSource(); diff --git a/test/EventStore.Client.Streams.Tests/subscribe_resolve_link_to.cs b/test/EventStore.Client.Streams.Tests/subscribe_resolve_link_to.cs index e89526b5c..cad09b351 100644 --- a/test/EventStore.Client.Streams.Tests/subscribe_resolve_link_to.cs +++ b/test/EventStore.Client.Streams.Tests/subscribe_resolve_link_to.cs @@ -16,8 +16,8 @@ public subscribe_resolve_link_to(ITestOutputHelper outputHelper) { } [Fact] - public async Task stream_subscription() { - var stream = _fixture.GetStreamName(); + public async Task Callback_stream_subscription() { + var stream = $"{_fixture.GetStreamName()}{Guid.NewGuid()}"; var events = _fixture.CreateTestEvents(20).ToArray(); @@ -35,7 +35,7 @@ await _fixture.Client.AppendToStreamAsync(stream, StreamState.NoStream, beforeEv .WithTimeout(); using var subscription = await _fixture.Client - .SubscribeToStreamAsync($"$et-{EventStoreClientFixtureBase.TestEventType}", + .SubscribeToStreamAsync(stream, FromStream.Start, EventAppeared, true, SubscriptionDropped, userCredentials: TestCredentials.Root) .WithTimeout(); @@ -69,10 +69,59 @@ Task EventAppeared(StreamSubscription s, ResolvedEvent e, CancellationToken ct) void SubscriptionDropped(StreamSubscription s, SubscriptionDroppedReason reason, Exception? ex) => dropped.SetResult((reason, ex)); } + + [Fact] + public async Task Iterator_stream_subscription() { + var stream = $"{_fixture.GetStreamName()}{Guid.NewGuid()}"; + + var events = _fixture.CreateTestEvents(20).ToArray(); + + var appeared = new TaskCompletionSource(); + var dropped = new TaskCompletionSource(); + + var beforeEvents = events.Take(10); + var afterEvents = events.Skip(10); + + using var enumerator = events.AsEnumerable().GetEnumerator(); + + enumerator.MoveNext(); + + await _fixture.Client.AppendToStreamAsync(stream, StreamState.NoStream, beforeEvents) + .WithTimeout(); + + var subscription = _fixture.Client.SubscribeToStream(stream, FromStream.Start, true); + ReadMessages(subscription, EventAppeared, SubscriptionDropped); + + await _fixture.Client.AppendToStreamAsync(stream, StreamState.Any, afterEvents) + .WithTimeout(); + + await appeared.Task.WithTimeout(); + + subscription.Dispose(); + + var ex = await dropped.Task.WithTimeout(); + Assert.Null(ex); + + Task EventAppeared(ResolvedEvent e) { + try { + Assert.Equal(enumerator.Current.EventId, e.Event.EventId); + if (!enumerator.MoveNext()) { + appeared.TrySetResult(true); + } + } catch (Exception ex) { + appeared.TrySetException(ex); + throw; + } + + return Task.CompletedTask; + } + + void SubscriptionDropped(Exception? ex) => dropped.SetResult(ex); + } [Fact] - public async Task all_subscription() { - var stream = _fixture.GetStreamName(); + public async Task Callback_all_subscription() { + var stream = $"{_fixture.GetStreamName()}{Guid.NewGuid()}"; var events = _fixture.CreateTestEvents(20).ToArray(); @@ -105,7 +154,7 @@ await _fixture.Client.AppendToStreamAsync(stream, StreamState.NoStream, beforeEv Assert.Null(ex); Task EventAppeared(StreamSubscription s, ResolvedEvent e, CancellationToken ct) { - if (e.OriginalEvent.EventStreamId != $"$et-{EventStoreClientFixtureBase.TestEventType}") { + if (e.OriginalEvent.EventStreamId != stream) { return Task.CompletedTask; } @@ -125,10 +174,62 @@ Task EventAppeared(StreamSubscription s, ResolvedEvent e, CancellationToken ct) void SubscriptionDropped(StreamSubscription s, SubscriptionDroppedReason reason, Exception? ex) => dropped.SetResult((reason, ex)); } + + [Fact] + public async Task Iterator_all_subscription() { + var stream = $"{_fixture.GetStreamName()}{Guid.NewGuid()}"; + + var events = _fixture.CreateTestEvents(20).ToArray(); + + var appeared = new TaskCompletionSource(); + var dropped = new TaskCompletionSource(); + + var beforeEvents = events.Take(10); + var afterEvents = events.Skip(10); + + using var enumerator = events.AsEnumerable().GetEnumerator(); + + enumerator.MoveNext(); + + await _fixture.Client.AppendToStreamAsync(stream, StreamState.NoStream, beforeEvents) + .WithTimeout(); + + var subscription = _fixture.Client.SubscribeToAll(FromAll.Start, true); + ReadMessages(subscription, EventAppeared, SubscriptionDropped); + + await _fixture.Client.AppendToStreamAsync(stream, StreamState.Any, afterEvents).WithTimeout(); + + await appeared.Task.WithTimeout(); + + subscription.Dispose(); + + var ex = await dropped.Task.WithTimeout(); + Assert.Null(ex); + + Task EventAppeared(ResolvedEvent e) { + if (e.OriginalEvent.EventStreamId != stream) { + return Task.CompletedTask; + } + + try { + Assert.Equal(enumerator.Current.EventId, e.Event.EventId); + if (!enumerator.MoveNext()) { + appeared.TrySetResult(true); + } + } catch (Exception ex) { + appeared.TrySetException(ex); + throw; + } + + return Task.CompletedTask; + } + + void SubscriptionDropped(Exception? ex) => dropped.SetResult(ex); + } [Fact] - public async Task all_filtered_subscription() { - var stream = _fixture.GetStreamName(); + public async Task Callback_all_filtered_subscription() { + var stream = $"{_fixture.GetStreamName()}{Guid.NewGuid()}"; var events = _fixture.CreateTestEvents(20).ToArray(); @@ -142,17 +243,17 @@ public async Task all_filtered_subscription() { enumerator.MoveNext(); - var result = await _fixture.Client.AppendToStreamAsync(stream, StreamState.NoStream, beforeEvents) + await _fixture.Client.AppendToStreamAsync(stream, StreamState.NoStream, beforeEvents) .WithTimeout(); using var subscription = await _fixture.Client.SubscribeToAllAsync(FromAll.Start, EventAppeared, true, SubscriptionDropped, new SubscriptionFilterOptions( - StreamFilter.Prefix($"$et-{EventStoreClientFixtureBase.TestEventType}")), + StreamFilter.Prefix(stream)), userCredentials: TestCredentials.Root) .WithTimeout(); - result = await _fixture.Client.AppendToStreamAsync(stream, StreamState.Any, afterEvents) + await _fixture.Client.AppendToStreamAsync(stream, StreamState.Any, afterEvents) .WithTimeout(); await appeared.Task.WithTimeout(); @@ -165,10 +266,6 @@ public async Task all_filtered_subscription() { Assert.Null(ex); Task EventAppeared(StreamSubscription s, ResolvedEvent e, CancellationToken ct) { - if (e.OriginalEvent.EventStreamId != $"$et-{EventStoreClientFixtureBase.TestEventType}") { - return Task.CompletedTask; - } - try { Assert.Equal(enumerator.Current.EventId, e.Event.EventId); if (!enumerator.MoveNext()) { @@ -185,6 +282,56 @@ Task EventAppeared(StreamSubscription s, ResolvedEvent e, CancellationToken ct) void SubscriptionDropped(StreamSubscription s, SubscriptionDroppedReason reason, Exception? ex) => dropped.SetResult((reason, ex)); } + + [Fact] + public async Task Iterator_all_filtered_subscription() { + var stream = $"{_fixture.GetStreamName()}{Guid.NewGuid()}"; + + var events = _fixture.CreateTestEvents(20).ToArray(); + + var appeared = new TaskCompletionSource(); + var dropped = new TaskCompletionSource(); + + var beforeEvents = events.Take(10); + var afterEvents = events.Skip(10); + + using var enumerator = events.AsEnumerable().GetEnumerator(); + + enumerator.MoveNext(); + + await _fixture.Client.AppendToStreamAsync(stream, StreamState.NoStream, beforeEvents) + .WithTimeout(); + + var subscription = _fixture.Client.SubscribeToAll(FromAll.Start, true, new SubscriptionFilterOptions( + StreamFilter.Prefix(stream))); + ReadMessages(subscription, EventAppeared, SubscriptionDropped); + + await _fixture.Client.AppendToStreamAsync(stream, StreamState.Any, afterEvents) + .WithTimeout(); + + await appeared.Task.WithTimeout(); + + subscription.Dispose(); + + var ex = await dropped.Task.WithTimeout(); + Assert.Null(ex); + + Task EventAppeared(ResolvedEvent e) { + try { + Assert.Equal(enumerator.Current.EventId, e.Event.EventId); + if (!enumerator.MoveNext()) { + appeared.TrySetResult(true); + } + } catch (Exception ex) { + appeared.TrySetException(ex); + throw; + } + + return Task.CompletedTask; + } + + void SubscriptionDropped(Exception? ex) => dropped.SetResult(ex); + } public class Fixture : EventStoreClientFixture { public Fixture() : base(env: new Dictionary { @@ -199,5 +346,29 @@ public class Fixture : EventStoreClientFixture { public Task InitializeAsync() => _fixture.InitializeAsync(); public Task DisposeAsync() => _fixture.DisposeAsync(); + + async void ReadMessages(EventStoreClient.SubscriptionResult subscription, Func eventAppeared, Action subscriptionDropped) { + Exception? exception = null; + try { + await foreach (var message in subscription.Messages) { + if (message is StreamMessage.Event eventMessage) { + await eventAppeared(eventMessage.ResolvedEvent); + } + } + } catch (Exception ex) { + exception = ex; + } + + //allow some time for subscription cleanup and chance for exception to be raised + await Task.Delay(100); + + try { + //subscription.SubscriptionState will throw exception if some problem occurred for the subscription + Assert.Equal(SubscriptionState.Disposed, subscription.SubscriptionState); + subscriptionDropped(exception); + } catch (Exception ex) { + subscriptionDropped(ex); + } + } } } diff --git a/test/EventStore.Client.Streams.Tests/subscribe_to_all.cs b/test/EventStore.Client.Streams.Tests/subscribe_to_all.cs index e902ef90c..1167a4117 100644 --- a/test/EventStore.Client.Streams.Tests/subscribe_to_all.cs +++ b/test/EventStore.Client.Streams.Tests/subscribe_to_all.cs @@ -20,7 +20,7 @@ public subscribe_to_all(ITestOutputHelper outputHelper) { } [Fact] - public async Task calls_subscription_dropped_when_disposed() { + public async Task Callback_calls_subscription_dropped_when_disposed() { var dropped = new TaskCompletionSource<(SubscriptionDroppedReason, Exception?)>(); using var subscription = await _fixture.Client.SubscribeToAllAsync(FromAll.Start, @@ -43,10 +43,37 @@ public async Task calls_subscription_dropped_when_disposed() { void SubscriptionDropped(StreamSubscription s, SubscriptionDroppedReason reason, Exception? ex) => dropped.SetResult((reason, ex)); } + + [Fact] + public async Task Iterator_client_stops_reading_messages_when_subscription_disposed() { + var dropped = new TaskCompletionSource(); + + var subscription = _fixture.Client.SubscribeToAll(FromAll.Start); + var testEvent = _fixture.CreateTestEvents(1).First(); + ReadMessages(subscription, EventAppeared, SubscriptionDropped); + + if (dropped.Task.IsCompleted) { + Assert.False(dropped.Task.IsCompleted, dropped.Task.Result?.ToString()); + } + + subscription.Dispose(); + + var ex = await dropped.Task.WithTimeout(); + Assert.Null(ex); + + // new event after subscription is disposed + await _fixture.Client.AppendToStreamAsync($"test-{Guid.NewGuid()}", StreamState.NoStream, new[]{testEvent}); + + Task EventAppeared(ResolvedEvent e) { + return testEvent.EventId.Equals(e.OriginalEvent.EventId) ? Task.FromException(new Exception("Subscription not dropped")) : Task.CompletedTask; + } + + void SubscriptionDropped(Exception? ex) => dropped.SetResult(ex); + } [Fact] - public async Task calls_subscription_dropped_when_error_processing_event() { - var stream = _fixture.GetStreamName(); + public async Task Callback_calls_subscription_dropped_when_error_processing_event() { + var stream = $"{_fixture.GetStreamName()}_{Guid.NewGuid()}"; var dropped = new TaskCompletionSource<(SubscriptionDroppedReason, Exception?)>(); var expectedException = new Exception("Error"); @@ -67,9 +94,34 @@ Task EventAppeared(StreamSubscription s, ResolvedEvent e, CancellationToken ct) void SubscriptionDropped(StreamSubscription s, SubscriptionDroppedReason reason, Exception? ex) => dropped.SetResult((reason, ex)); } + + [Fact] + public async Task Iterator_client_stops_reading_messages_when_error_processing_event() { + var stream = $"{_fixture.GetStreamName()}_{Guid.NewGuid()}"; + var dropped = new TaskCompletionSource(); + var expectedException = new Exception("Error"); + int numTimesCalled = 0; + + var subscription = _fixture.Client.SubscribeToAll(FromAll.Start); + ReadMessages(subscription, EventAppeared, SubscriptionDropped); + + await _fixture.Client.AppendToStreamAsync(stream, StreamState.NoStream, _fixture.CreateTestEvents(2)); + + var ex = await dropped.Task.WithTimeout(); + Assert.Same(expectedException, ex); + + Assert.Equal(1, numTimesCalled); + + Task EventAppeared(ResolvedEvent e) { + numTimesCalled++; + return Task.FromException(expectedException); + } + + void SubscriptionDropped(Exception? ex) => dropped.SetResult(ex); + } [Fact] - public async Task subscribe_to_empty_database() { + public async Task Callback_subscribe_to_empty_database() { var appeared = new TaskCompletionSource(); var dropped = new TaskCompletionSource<(SubscriptionDroppedReason, Exception?)>(); @@ -101,15 +153,50 @@ Task EventAppeared(StreamSubscription s, ResolvedEvent e, CancellationToken ct) void SubscriptionDropped(StreamSubscription s, SubscriptionDroppedReason reason, Exception? ex) => dropped.SetResult((reason, ex)); } + + [Fact] + public async Task Iterator_subscribe_to_empty_database() { + var appeared = new TaskCompletionSource(); + var dropped = new TaskCompletionSource(); + + var subscription = _fixture.Client.SubscribeToAll(FromAll.Start); + ReadMessages(subscription, EventAppeared, SubscriptionDropped); + Assert.False(appeared.Task.IsCompleted); + + if (dropped.Task.IsCompleted) { + Assert.False(dropped.Task.IsCompleted, dropped.Task.Result?.ToString()); + } + + subscription.Dispose(); + + var ex = await dropped.Task.WithTimeout(); + Assert.Null(ex); + + Task EventAppeared(ResolvedEvent e) { + if (!SystemStreams.IsSystemStream(e.OriginalStreamId)) { + appeared.TrySetResult(true); + } + + return Task.CompletedTask; + } + + void SubscriptionDropped(Exception? ex) => dropped.SetResult(ex); + } [Fact] - public async Task reads_all_existing_events_and_keep_listening_to_new_ones() { + public async Task Callback_reads_all_existing_events_and_keep_listening_to_new_ones() { var appeared = new TaskCompletionSource(); var dropped = new TaskCompletionSource<(SubscriptionDroppedReason, Exception?)>(); var appearedEvents = new List(); var beforeEvents = _fixture.CreateTestEvents(10).ToArray(); var afterEvents = _fixture.CreateTestEvents(10).ToArray(); + var allStreams = new List(); + + foreach (var @event in beforeEvents.Concat((afterEvents))) { + allStreams.Add($"stream-{@event.EventId:n}"); + } + foreach (var @event in beforeEvents) { await _fixture.Client.AppendToStreamAsync($"stream-{@event.EventId:n}", StreamState.NoStream, new[] {@event}); @@ -141,7 +228,7 @@ await _fixture.Client.AppendToStreamAsync($"stream-{@event.EventId:n}", StreamSt Assert.Null(ex); Task EventAppeared(StreamSubscription s, ResolvedEvent e, CancellationToken ct) { - if (!SystemStreams.IsSystemStream(e.OriginalStreamId)) { + if (allStreams.Contains(e.OriginalStreamId)) { appearedEvents.Add(e.Event); if (appearedEvents.Count >= beforeEvents.Length + afterEvents.Length) { @@ -155,10 +242,66 @@ Task EventAppeared(StreamSubscription s, ResolvedEvent e, CancellationToken ct) void SubscriptionDropped(StreamSubscription s, SubscriptionDroppedReason reason, Exception? ex) => dropped.SetResult((reason, ex)); } + + [Fact] + public async Task Iterator_reads_all_existing_events_and_keep_listening_to_new_ones() { + var appeared = new TaskCompletionSource(); + var dropped = new TaskCompletionSource(); + var appearedEvents = new List(); + var beforeEvents = _fixture.CreateTestEvents(10).ToArray(); + var afterEvents = _fixture.CreateTestEvents(10).ToArray(); + + var allStreams = new List(); + + foreach (var @event in beforeEvents.Concat((afterEvents))) { + allStreams.Add($"stream-{@event.EventId:n}"); + } + + foreach (var @event in beforeEvents) { + await _fixture.Client.AppendToStreamAsync($"stream-{@event.EventId:n}", StreamState.NoStream, + new[] {@event}); + } + + var subscription = _fixture.Client.SubscribeToAll(FromAll.Start); + ReadMessages(subscription, EventAppeared, SubscriptionDropped); + + foreach (var @event in afterEvents) { + await _fixture.Client.AppendToStreamAsync($"stream-{@event.EventId:n}", StreamState.NoStream, + new[] { @event }); + } + + await appeared.Task.WithTimeout(); + + Assert.Equal(beforeEvents.Concat(afterEvents).Select(x => x.EventId), + appearedEvents.Select(x => x.EventId)); + + if (dropped.Task.IsCompleted) { + Assert.False(dropped.Task.IsCompleted, dropped.Task.Result?.ToString()); + } + + subscription.Dispose(); + + var ex = await dropped.Task.WithTimeout(); + Assert.Null(ex); + + Task EventAppeared(ResolvedEvent e) { + if (allStreams.Contains(e.OriginalStreamId)) { + appearedEvents.Add(e.Event); + + if (appearedEvents.Count >= beforeEvents.Length + afterEvents.Length) { + appeared.TrySetResult(true); + } + } + + return Task.CompletedTask; + } + + void SubscriptionDropped(Exception? ex) => dropped.SetResult(ex); + } public class Fixture : EventStoreClientFixture { protected override Task Given() => - Client.SetStreamMetadataAsync(SystemStreams.AllStream, StreamState.NoStream, + Client.SetStreamMetadataAsync(SystemStreams.AllStream, StreamState.Any, new StreamMetadata(acl: new StreamAcl(SystemRoles.All)), userCredentials: TestCredentials.Root); protected override Task When() => Task.CompletedTask; @@ -166,5 +309,29 @@ protected override Task Given() => public Task InitializeAsync() => _fixture.InitializeAsync(); public Task DisposeAsync() => _fixture.DisposeAsync(); + + async void ReadMessages(EventStoreClient.SubscriptionResult subscription, Func eventAppeared, Action subscriptionDropped) { + Exception? exception = null; + try { + await foreach (var message in subscription.Messages) { + if (message is StreamMessage.Event eventMessage) { + await eventAppeared(eventMessage.ResolvedEvent); + } + } + } catch (Exception ex) { + exception = ex; + } + + //allow some time for subscription cleanup and chance for exception to be raised + await Task.Delay(100); + + try { + //subscription.SubscriptionState will throw exception if some problem occurred for the subscription + Assert.Equal(SubscriptionState.Disposed, subscription.SubscriptionState); + subscriptionDropped(exception); + } catch (Exception ex) { + subscriptionDropped(ex); + } + } } } diff --git a/test/EventStore.Client.Streams.Tests/subscribe_to_all_filtered_with_position.cs b/test/EventStore.Client.Streams.Tests/subscribe_to_all_filtered_with_position.cs index 797f946f9..bdcfbc381 100644 --- a/test/EventStore.Client.Streams.Tests/subscribe_to_all_filtered_with_position.cs +++ b/test/EventStore.Client.Streams.Tests/subscribe_to_all_filtered_with_position.cs @@ -18,7 +18,7 @@ public subscribe_to_all_filtered_with_position(ITestOutputHelper outputHelper) { public static IEnumerable FilterCases() => Filters.All.Select(filter => new object[] {filter}); [Theory, MemberData(nameof(FilterCases))] - public async Task reads_all_existing_events_and_keep_listening_to_new_ones(string filterName) { + public async Task Callback_reads_all_existing_events_and_keep_listening_to_new_ones(string filterName) { var streamPrefix = _fixture.GetStreamName(); var (getFilter, prepareEvent) = Filters.GetFilter(filterName); @@ -40,7 +40,7 @@ await _fixture.Client.AppendToStreamAsync($"{streamPrefix}_{Guid.NewGuid():n}", } var writeResult = await _fixture.Client.AppendToStreamAsync("checkpoint", - StreamState.NoStream, _fixture.CreateTestEvents()); + StreamState.Any, _fixture.CreateTestEvents()); await _fixture.Client.AppendToStreamAsync(Guid.NewGuid().ToString(), StreamState.NoStream, _fixture.CreateTestEvents(256)); @@ -89,6 +89,75 @@ Task CheckpointReached(StreamSubscription _, Position position, CancellationToke return Task.CompletedTask; } } + + [Theory, MemberData(nameof(FilterCases))] + public async Task Iterator_reads_all_existing_events_and_keep_listening_to_new_ones(string filterName) { + var streamPrefix = _fixture.GetStreamName(); + var (getFilter, prepareEvent) = Filters.GetFilter(filterName); + + var appeared = new TaskCompletionSource(); + var dropped = new TaskCompletionSource(); + var checkpointSeen = new TaskCompletionSource(); + var filter = getFilter(streamPrefix); + var events = _fixture.CreateTestEvents(20).Select(e => prepareEvent(streamPrefix, e)) + .ToArray(); + var beforeEvents = events.Take(10); + var afterEvents = events.Skip(10); + + using var enumerator = afterEvents.OfType().GetEnumerator(); + enumerator.MoveNext(); + + foreach (var e in beforeEvents) { + await _fixture.Client.AppendToStreamAsync($"{streamPrefix}_{Guid.NewGuid():n}", + StreamState.NoStream, new[] {e}); + } + + var writeResult = await _fixture.Client.AppendToStreamAsync("checkpoint", + StreamState.Any, _fixture.CreateTestEvents()); + + await _fixture.Client.AppendToStreamAsync(Guid.NewGuid().ToString(), StreamState.NoStream, + _fixture.CreateTestEvents(256)); + + var subscription = _fixture.Client.SubscribeToAll(FromAll.After(writeResult.LogPosition), false, new SubscriptionFilterOptions(filter, 4)); + ReadMessages(subscription, EventAppeared, SubscriptionDropped, CheckpointReached); + + foreach (var e in afterEvents) { + await _fixture.Client.AppendToStreamAsync($"{streamPrefix}_{Guid.NewGuid():n}", + StreamState.NoStream, new[] {e}); + } + + await Task.WhenAll(appeared.Task, checkpointSeen.Task).WithTimeout(); + + Assert.False(dropped.Task.IsCompleted); + + subscription.Dispose(); + + var ex = await dropped.Task.WithTimeout(); + + Assert.Null(ex); + + Task EventAppeared(ResolvedEvent e) { + try { + Assert.Equal(enumerator.Current.EventId, e.OriginalEvent.EventId); + if (!enumerator.MoveNext()) { + appeared.TrySetResult(true); + } + } catch (Exception ex) { + appeared.TrySetException(ex); + throw; + } + + return Task.CompletedTask; + } + + void SubscriptionDropped(Exception? ex) => dropped.SetResult(ex); + + Task CheckpointReached(Position position) { + checkpointSeen.TrySetResult(true); + + return Task.CompletedTask; + } + } public Task InitializeAsync() => _fixture.InitializeAsync(); @@ -101,7 +170,33 @@ protected override Task Given() => Client.SetStreamMetadataAsync(SystemStreams.A new StreamMetadata(acl: new StreamAcl(SystemRoles.All)), userCredentials: TestCredentials.Root); protected override Task When() => - Client.AppendToStreamAsync(FilteredOutStream, StreamState.NoStream, CreateTestEvents(10)); + Client.AppendToStreamAsync(FilteredOutStream, StreamState.Any, CreateTestEvents(10)); + } + + async void ReadMessages(EventStoreClient.SubscriptionResult subscription, Func eventAppeared, Action subscriptionDropped, Func checkpointReached) { + Exception? exception = null; + try { + await foreach (var message in subscription.Messages) { + if (message is StreamMessage.Event eventMessage) { + await eventAppeared(eventMessage.ResolvedEvent); + } else if (message is StreamMessage.SubscriptionMessage.Checkpoint checkpointMessage) { + await checkpointReached(checkpointMessage.Position); + } + } + } catch (Exception ex) { + exception = ex; + } + + //allow some time for subscription cleanup and chance for exception to be raised + await Task.Delay(100); + + try { + //subscription.SubscriptionState will throw exception if some problem occurred for the subscription + Assert.Equal(SubscriptionState.Disposed, subscription.SubscriptionState); + subscriptionDropped(exception); + } catch (Exception ex) { + subscriptionDropped(ex); + } } } } diff --git a/test/EventStore.Client.Streams.Tests/subscribe_to_all_live.cs b/test/EventStore.Client.Streams.Tests/subscribe_to_all_live.cs index 7fb5f1155..12498e53b 100644 --- a/test/EventStore.Client.Streams.Tests/subscribe_to_all_live.cs +++ b/test/EventStore.Client.Streams.Tests/subscribe_to_all_live.cs @@ -20,7 +20,7 @@ public subscribe_to_all_live(ITestOutputHelper outputHelper) { } [Fact] - public async Task calls_subscription_dropped_when_disposed() { + public async Task Callback_calls_subscription_dropped_when_disposed() { var dropped = new TaskCompletionSource<(SubscriptionDroppedReason, Exception?)>(); using var subscription = await _fixture.Client @@ -43,10 +43,37 @@ public async Task calls_subscription_dropped_when_disposed() { void SubscriptionDropped(StreamSubscription s, SubscriptionDroppedReason reason, Exception? ex) => dropped.SetResult((reason, ex)); } + + [Fact] + public async Task Iterator_client_stops_reading_messages_when_subscription_disposed() { + var dropped = new TaskCompletionSource(); + + var subscription = _fixture.Client.SubscribeToAll(FromAll.End); + var testEvent = _fixture.CreateTestEvents(1).First(); + ReadMessages(subscription, EventAppeared, SubscriptionDropped); + + if (dropped.Task.IsCompleted) { + Assert.False(dropped.Task.IsCompleted, dropped.Task.Result?.ToString()); + } + + subscription.Dispose(); + + var ex = await dropped.Task.WithTimeout(); + Assert.Null(ex); + + // new event after subscription is disposed + await _fixture.Client.AppendToStreamAsync($"test-{Guid.NewGuid()}", StreamState.NoStream, new[]{testEvent}); + + Task EventAppeared(ResolvedEvent e) { + return testEvent.EventId.Equals(e.OriginalEvent.EventId) ? Task.FromException(new Exception("Subscription not dropped")) : Task.CompletedTask; + } + + void SubscriptionDropped(Exception? ex) => dropped.SetResult(ex); + } [Fact] - public async Task calls_subscription_dropped_when_error_processing_event() { - var stream = _fixture.GetStreamName(); + public async Task Callback_calls_subscription_dropped_when_error_processing_event() { + var stream = $"{_fixture.GetStreamName()}_{Guid.NewGuid()}"; var dropped = new TaskCompletionSource<(SubscriptionDroppedReason, Exception?)>(); var expectedException = new Exception("Error"); @@ -67,9 +94,35 @@ Task EventAppeared(StreamSubscription s, ResolvedEvent e, CancellationToken ct) void SubscriptionDropped(StreamSubscription s, SubscriptionDroppedReason reason, Exception? ex) => dropped.SetResult((reason, ex)); } + + [Fact] + public async Task Iterator_client_stops_reading_messages_when_error_processing_event() { + var stream = $"{_fixture.GetStreamName()}_{Guid.NewGuid()}"; + var dropped = new TaskCompletionSource(); + var expectedException = new Exception("Error"); + int numTimesCalled = 0; + + var subscription = _fixture.Client.SubscribeToAll(FromAll.End); + ReadMessages(subscription, EventAppeared, SubscriptionDropped); + + await _fixture.Client.AppendToStreamAsync(stream, StreamState.NoStream, _fixture.CreateTestEvents(2)); + + var ex = await dropped.Task.WithTimeout(); + + Assert.Same(expectedException, ex); + + Assert.Equal(1, numTimesCalled); + + Task EventAppeared(ResolvedEvent e) { + numTimesCalled++; + return Task.FromException(expectedException); + } + + void SubscriptionDropped(Exception? ex) => dropped.SetResult(ex); + } [Fact] - public async Task subscribe_to_empty_database() { + public async Task Callback_subscribe_to_empty_database() { var appeared = new TaskCompletionSource(); var dropped = new TaskCompletionSource<(SubscriptionDroppedReason, Exception?)>(); @@ -101,26 +154,65 @@ Task EventAppeared(StreamSubscription s, ResolvedEvent e, CancellationToken ct) void SubscriptionDropped(StreamSubscription s, SubscriptionDroppedReason reason, Exception? ex) => dropped.SetResult((reason, ex)); } + + [Fact] + public async Task Iterator_subscribe_to_empty_database() { + var appeared = new TaskCompletionSource(); + var dropped = new TaskCompletionSource(); + + var subscription = _fixture.Client.SubscribeToAll(FromAll.End); + ReadMessages(subscription, EventAppeared, SubscriptionDropped); + Assert.False(appeared.Task.IsCompleted); + + if (dropped.Task.IsCompleted) { + Assert.False(dropped.Task.IsCompleted, dropped.Task.Result?.ToString()); + } + + subscription.Dispose(); + + var ex = await dropped.Task.WithTimeout(); + Assert.Null(ex); + + Task EventAppeared(ResolvedEvent e) { + if (!SystemStreams.IsSystemStream(e.OriginalStreamId)) { + appeared.TrySetResult(true); + } + + return Task.CompletedTask; + } + + void SubscriptionDropped(Exception? ex) => dropped.SetResult(ex); + } [Fact] - public async Task does_not_read_existing_events_but_keep_listening_to_new_ones() { + public async Task Callback_does_not_read_existing_events_but_keep_listening_to_new_ones() { var appeared = new TaskCompletionSource(); var dropped = new TaskCompletionSource<(SubscriptionDroppedReason, Exception?)>(); var appearedEvents = new List(); - var afterEvents = _fixture.CreateTestEvents(10).ToArray(); + + var events = _fixture.CreateTestEvents(20); + + var beforeEvents = events.Take(10); + foreach (var @event in beforeEvents) { + await _fixture.Client.AppendToStreamAsync($"stream-{@event.EventId:n}", StreamState.NoStream, + new[] {@event}); + } + + var afterEvents = events.Skip(10); + var latestEvents = afterEvents as EventData[] ?? afterEvents.ToArray(); using var subscription = await _fixture.Client .SubscribeToAllAsync(FromAll.End, EventAppeared, false, SubscriptionDropped) .WithTimeout(); - - foreach (var @event in afterEvents) { + + foreach (var @event in latestEvents) { await _fixture.Client.AppendToStreamAsync($"stream-{@event.EventId:n}", StreamState.NoStream, new[] {@event}); } await appeared.Task.WithTimeout(); - Assert.Equal(afterEvents.Select(x => x.EventId), appearedEvents.Select(x => x.EventId)); + Assert.Equal(latestEvents.Select(x => x.EventId), appearedEvents.Select(x => x.EventId)); if (dropped.Task.IsCompleted) { Assert.False(dropped.Task.IsCompleted, dropped.Task.Result.ToString()); @@ -137,7 +229,7 @@ Task EventAppeared(StreamSubscription s, ResolvedEvent e, CancellationToken ct) if (!SystemStreams.IsSystemStream(e.OriginalStreamId)) { appearedEvents.Add(e.Event); - if (appearedEvents.Count >= afterEvents.Length) { + if (appearedEvents.Count >= latestEvents.Length) { appeared.TrySetResult(true); } } @@ -148,10 +240,63 @@ Task EventAppeared(StreamSubscription s, ResolvedEvent e, CancellationToken ct) void SubscriptionDropped(StreamSubscription s, SubscriptionDroppedReason reason, Exception? ex) => dropped.SetResult((reason, ex)); } + + [Fact] + public async Task Iterator_does_not_read_existing_events_but_keep_listening_to_new_ones() { + var appeared = new TaskCompletionSource(); + var dropped = new TaskCompletionSource(); + var appearedEvents = new List(); + + var events = _fixture.CreateTestEvents(20); + + var beforeEvents = events.Take(10); + foreach (var @event in beforeEvents) { + await _fixture.Client.AppendToStreamAsync($"stream-{@event.EventId:n}", StreamState.NoStream, + new[] {@event}); + } + + var afterEvents = events.Skip(10); + var latestEvents = afterEvents as EventData[] ?? afterEvents.ToArray(); + + var subscription = _fixture.Client.SubscribeToAll(FromAll.End); + ReadMessages(subscription, EventAppeared, SubscriptionDropped); + + foreach (var @event in latestEvents) { + await _fixture.Client.AppendToStreamAsync($"stream-{@event.EventId:n}", StreamState.NoStream, + new[] {@event}); + } + + await appeared.Task.WithTimeout(); + + Assert.Equal(latestEvents.Select(x => x.EventId), appearedEvents.Select(x => x.EventId)); + + if (dropped.Task.IsCompleted) { + Assert.False(dropped.Task.IsCompleted, dropped.Task.Result?.ToString()); + } + + subscription.Dispose(); + + var ex = await dropped.Task.WithTimeout(); + Assert.Null(ex); + + Task EventAppeared(ResolvedEvent e) { + if (!SystemStreams.IsSystemStream(e.OriginalStreamId)) { + appearedEvents.Add(e.Event); + + if (appearedEvents.Count >= latestEvents.Length) { + appeared.TrySetResult(true); + } + } + + return Task.CompletedTask; + } + + void SubscriptionDropped(Exception? ex) => dropped.SetResult(ex); + } public class Fixture : EventStoreClientFixture { protected override Task Given() => - Client.SetStreamMetadataAsync(SystemStreams.AllStream, StreamState.NoStream, + Client.SetStreamMetadataAsync(SystemStreams.AllStream, StreamState.Any, new StreamMetadata(acl: new StreamAcl(SystemRoles.All)), userCredentials: TestCredentials.Root); protected override Task When() => Task.CompletedTask; @@ -159,5 +304,29 @@ protected override Task Given() => public Task InitializeAsync() => _fixture.InitializeAsync(); public Task DisposeAsync() => _fixture.DisposeAsync(); + + async void ReadMessages(EventStoreClient.SubscriptionResult subscription, Func eventAppeared, Action subscriptionDropped) { + Exception? exception = null; + try { + await foreach (var message in subscription.Messages) { + if (message is StreamMessage.Event eventMessage) { + await eventAppeared(eventMessage.ResolvedEvent); + } + } + } catch (Exception ex) { + exception = ex; + } + + //allow some time for subscription cleanup and chance for exception to be raised + await Task.Delay(100); + + try { + //subscription.SubscriptionState will throw exception if some problem occurred for the subscription + Assert.Equal(SubscriptionState.Disposed, subscription.SubscriptionState); + subscriptionDropped(exception); + } catch (Exception ex) { + subscriptionDropped(ex); + } + } } } diff --git a/test/EventStore.Client.Streams.Tests/subscribe_to_all_with_position.cs b/test/EventStore.Client.Streams.Tests/subscribe_to_all_with_position.cs index 56eed8080..d69ff4b63 100644 --- a/test/EventStore.Client.Streams.Tests/subscribe_to_all_with_position.cs +++ b/test/EventStore.Client.Streams.Tests/subscribe_to_all_with_position.cs @@ -19,7 +19,7 @@ public subscribe_to_all_with_position(ITestOutputHelper outputHelper) { } [Fact] - public async Task calls_subscription_dropped_when_disposed() { + public async Task Callback_calls_subscription_dropped_when_disposed() { var dropped = new TaskCompletionSource<(SubscriptionDroppedReason, Exception?)>(); var firstEvent = await _fixture.Client.ReadAllAsync(Direction.Forwards, Position.Start, 1) @@ -46,10 +46,39 @@ public async Task calls_subscription_dropped_when_disposed() { void SubscriptionDropped(StreamSubscription s, SubscriptionDroppedReason reason, Exception? ex) => dropped.SetResult((reason, ex)); } + + [Fact] + public async Task Iterator_client_stops_reading_messages_when_subscription_disposed() { + var dropped = new TaskCompletionSource(); + + var firstEvent = await _fixture.Client.ReadAllAsync(Direction.Forwards, Position.Start, 1) + .FirstOrDefaultAsync(); + + var subscription = _fixture.Client.SubscribeToAll(FromAll.After(firstEvent.OriginalEvent.Position)); + var testEvent = _fixture.CreateTestEvents(1).First(); + ReadMessages(subscription, EventAppeared, SubscriptionDropped); + + if (dropped.Task.IsCompleted) { + Assert.False(dropped.Task.IsCompleted, dropped.Task.Result?.ToString()); + } + + subscription.Dispose(); + var ex = await dropped.Task.WithTimeout(); + Assert.Null(ex); + + // new event after subscription is disposed + await _fixture.Client.AppendToStreamAsync($"test-{Guid.NewGuid()}", StreamState.NoStream, new[]{testEvent}); + + Task EventAppeared(ResolvedEvent e) { + return testEvent.EventId.Equals(e.OriginalEvent.EventId) ? Task.FromException(new Exception("Subscription not dropped")) : Task.CompletedTask; + } + + void SubscriptionDropped(Exception? ex) => dropped.SetResult(ex); + } [Fact] - public async Task calls_subscription_dropped_when_error_processing_event() { - var stream = _fixture.GetStreamName(); + public async Task Callback_calls_subscription_dropped_when_error_processing_event() { + var stream = $"{_fixture.GetStreamName()}_{Guid.NewGuid()}"; var dropped = new TaskCompletionSource<(SubscriptionDroppedReason, Exception?)>(); var expectedException = new Exception("Error"); @@ -74,9 +103,37 @@ Task EventAppeared(StreamSubscription s, ResolvedEvent e, CancellationToken ct) void SubscriptionDropped(StreamSubscription s, SubscriptionDroppedReason reason, Exception? ex) => dropped.SetResult((reason, ex)); } + + [Fact] + public async Task Iterator_client_stops_reading_messages_when_error_processing_event() { + var stream = $"{_fixture.GetStreamName()}_{Guid.NewGuid()}"; + var dropped = new TaskCompletionSource(); + var expectedException = new Exception("Error"); + int numTimesCalled = 0; + var firstEvent = await _fixture.Client.ReadAllAsync(Direction.Forwards, Position.Start, 1) + .FirstOrDefaultAsync(); + + var subscription = _fixture.Client.SubscribeToAll(FromAll.After(firstEvent.OriginalEvent.Position)); + ReadMessages(subscription, EventAppeared, SubscriptionDropped); + + await _fixture.Client.AppendToStreamAsync(stream, StreamState.NoStream, _fixture.CreateTestEvents(2)); + + var ex = await dropped.Task.WithTimeout(); + + Assert.Same(expectedException, ex); + + Assert.Equal(1, numTimesCalled); + + Task EventAppeared(ResolvedEvent e) { + numTimesCalled++; + return Task.FromException(expectedException); + } + + void SubscriptionDropped(Exception? ex) => dropped.SetResult(ex); + } [Fact] - public async Task subscribe_to_empty_database() { + public async Task Callback_subscribe_to_empty_database() { var appeared = new TaskCompletionSource(); var dropped = new TaskCompletionSource<(SubscriptionDroppedReason, Exception?)>(); @@ -117,9 +174,47 @@ Task EventAppeared(StreamSubscription s, ResolvedEvent e, CancellationToken ct) void SubscriptionDropped(StreamSubscription s, SubscriptionDroppedReason reason, Exception? ex) => dropped.SetResult((reason, ex)); } + + [Fact] + public async Task Iterator_subscribe_to_empty_database() { + var appeared = new TaskCompletionSource(); + var dropped = new TaskCompletionSource(); + + var firstEvent = await _fixture.Client.ReadAllAsync(Direction.Forwards, Position.Start, 1) + .FirstOrDefaultAsync(); + + var subscription = _fixture.Client.SubscribeToAll(FromAll.After(firstEvent.OriginalEvent.Position)); + ReadMessages(subscription, EventAppeared, SubscriptionDropped); + Assert.False(appeared.Task.IsCompleted); + + if (dropped.Task.IsCompleted) { + Assert.False(dropped.Task.IsCompleted, dropped.Task.Result?.ToString()); + } + + subscription.Dispose(); + + var ex = await dropped.Task.WithTimeout(); + + Assert.Null(ex); + + Task EventAppeared(ResolvedEvent e) { + if (e.OriginalEvent.Position == firstEvent.OriginalEvent.Position) { + appeared.TrySetException(new Exception()); + return Task.CompletedTask; + } + + if (!SystemStreams.IsSystemStream(e.OriginalStreamId)) { + appeared.TrySetResult(true); + } + + return Task.CompletedTask; + } + + void SubscriptionDropped(Exception? ex) => dropped.SetResult(ex); + } [Fact] - public async Task reads_all_existing_events_after_position_and_keep_listening_to_new_ones() { + public async Task Callback_reads_all_existing_events_after_position_and_keep_listening_to_new_ones() { var events = _fixture.CreateTestEvents(20).ToArray(); var appeared = new TaskCompletionSource(); @@ -127,10 +222,8 @@ public async Task reads_all_existing_events_after_position_and_keep_listening_to var beforeEvents = events.Take(10); var afterEvents = events.Skip(10); - - using var enumerator = events.AsEnumerable().GetEnumerator(); - - enumerator.MoveNext(); + + var eventIds = events.AsEnumerable().Select(e => e.EventId).ToList(); var position = await _fixture.Client.ReadAllAsync(Direction.Forwards, Position.Start, 1) .Select(x => x.OriginalEvent.Position) @@ -168,9 +261,12 @@ Task EventAppeared(StreamSubscription s, ResolvedEvent e, CancellationToken ct) if (!SystemStreams.IsSystemStream(e.OriginalStreamId)) { try { - Assert.Equal(enumerator.Current.EventId, e.OriginalEvent.EventId); - if (!enumerator.MoveNext()) { - appeared.TrySetResult(true); + var eventId = e.OriginalEvent.EventId; + if (eventIds.Contains(eventId)) { + eventIds.Remove(eventId); + if (eventIds.Count == 0) { + appeared.TrySetResult(true); + } } } catch (Exception ex) { appeared.TrySetException(ex); @@ -185,9 +281,74 @@ void SubscriptionDropped(StreamSubscription s, SubscriptionDroppedReason reason, dropped.SetResult((reason, ex)); } + [Fact] + public async Task Iterator_reads_all_existing_events_after_position_and_keep_listening_to_new_ones() { + var events = _fixture.CreateTestEvents(20).ToArray(); + + var appeared = new TaskCompletionSource(); + var dropped = new TaskCompletionSource(); + + var beforeEvents = events.Take(10); + var afterEvents = events.Skip(10); + + var eventIds = events.AsEnumerable().Select(e => e.EventId).ToList(); + + var position = await _fixture.Client.ReadAllAsync(Direction.Forwards, Position.Start, 1) + .Select(x => x.OriginalEvent.Position) + .FirstAsync(); + + foreach (var @event in beforeEvents) { + await _fixture.Client.AppendToStreamAsync($"stream-{@event.EventId:n}", StreamState.NoStream, + new[] {@event}); + } + + var subscription = _fixture.Client.SubscribeToAll(FromAll.After(position)); + ReadMessages(subscription, EventAppeared, SubscriptionDropped); + + foreach (var @event in afterEvents) { + await _fixture.Client.AppendToStreamAsync($"stream-{@event.EventId:n}", StreamState.NoStream, + new[] {@event}); + } + + await appeared.Task.WithTimeout(); + + Assert.False(dropped.Task.IsCompleted); + + subscription.Dispose(); + + var ex = await dropped.Task.WithTimeout(); + + Assert.Null(ex); + + Task EventAppeared(ResolvedEvent e) { + if (position >= e.OriginalEvent.Position) { + appeared.TrySetException(new Exception()); + } + + if (!SystemStreams.IsSystemStream(e.OriginalStreamId)) { + try { + var eventId = e.OriginalEvent.EventId; + if (eventIds.Contains(eventId)) { + eventIds.Remove(eventId); + if (eventIds.Count == 0) { + appeared.TrySetResult(true); + } + } + } catch (Exception ex) { + appeared.TrySetException(ex); + throw; + } + } + + return Task.CompletedTask; + } + + void SubscriptionDropped(Exception? ex) => dropped.SetResult(ex); + } + public class Fixture : EventStoreClientFixture { protected override Task Given() => - Client.SetStreamMetadataAsync(SystemStreams.AllStream, StreamState.NoStream, + Client.SetStreamMetadataAsync(SystemStreams.AllStream, StreamState.Any, new StreamMetadata(acl: new StreamAcl(SystemRoles.All)), userCredentials: TestCredentials.Root); protected override Task When() => Task.CompletedTask; @@ -195,5 +356,29 @@ protected override Task Given() => public Task InitializeAsync() => _fixture.InitializeAsync(); public Task DisposeAsync() => _fixture.DisposeAsync(); + + async void ReadMessages(EventStoreClient.SubscriptionResult subscription, Func eventAppeared, Action subscriptionDropped) { + Exception? exception = null; + try { + await foreach (var message in subscription.Messages) { + if (message is StreamMessage.Event eventMessage) { + await eventAppeared(eventMessage.ResolvedEvent); + } + } + } catch (Exception ex) { + exception = ex; + } + + //allow some time for subscription cleanup and chance for exception to be raised + await Task.Delay(100); + + try { + //subscription.SubscriptionState will throw exception if some problem occurred for the subscription + Assert.Equal(SubscriptionState.Disposed, subscription.SubscriptionState); + subscriptionDropped(exception); + } catch (Exception ex) { + subscriptionDropped(ex); + } + } } } diff --git a/test/EventStore.Client.Streams.Tests/subscribe_to_stream.cs b/test/EventStore.Client.Streams.Tests/subscribe_to_stream.cs index ae14417fd..829488324 100644 --- a/test/EventStore.Client.Streams.Tests/subscribe_to_stream.cs +++ b/test/EventStore.Client.Streams.Tests/subscribe_to_stream.cs @@ -16,8 +16,8 @@ public subscribe_to_stream(Fixture fixture, ITestOutputHelper outputHelper) { } [Fact] - public async Task subscribe_to_non_existing_stream() { - var stream = _fixture.GetStreamName(); + public async Task Callback_subscribe_to_non_existing_stream() { + var stream = $"{_fixture.GetStreamName()}_{Guid.NewGuid()}"; var appeared = new TaskCompletionSource(); var dropped = new TaskCompletionSource<(SubscriptionDroppedReason, Exception?)>(); @@ -46,10 +46,38 @@ Task EventAppeared(StreamSubscription s, ResolvedEvent e, CancellationToken ct) void SubscriptionDropped(StreamSubscription s, SubscriptionDroppedReason reason, Exception? ex) => dropped.SetResult((reason, ex)); } + + [Fact] + public async Task Iterator_subscribe_to_non_existing_stream() { + var stream = $"{_fixture.GetStreamName()}_{Guid.NewGuid()}"; + var appeared = new TaskCompletionSource(); + var dropped = new TaskCompletionSource(); + + var subscription = _fixture.Client.SubscribeToStream(stream, FromStream.Start); + ReadMessages(subscription, EventAppeared, SubscriptionDropped); + + Assert.False(appeared.Task.IsCompleted); + + if (dropped.Task.IsCompleted) { + Assert.False(dropped.Task.IsCompleted, dropped.Task.Result?.ToString()); + } + + subscription.Dispose(); + + var ex = await dropped.Task.WithTimeout(); + Assert.Null(ex); + + Task EventAppeared(ResolvedEvent e) { + appeared.TrySetResult(true); + return Task.CompletedTask; + } + + void SubscriptionDropped(Exception? ex) => dropped.SetResult(ex); + } [Fact] - public async Task subscribe_to_non_existing_stream_then_get_event() { - var stream = _fixture.GetStreamName(); + public async Task Callback_subscribe_to_non_existing_stream_then_get_event() { + var stream = $"{_fixture.GetStreamName()}_{Guid.NewGuid()}"; var appeared = new TaskCompletionSource(); var dropped = new TaskCompletionSource<(SubscriptionDroppedReason, Exception?)>(); @@ -82,8 +110,38 @@ void SubscriptionDropped(StreamSubscription s, SubscriptionDroppedReason reason, } [Fact] - public async Task allow_multiple_subscriptions_to_same_stream() { - var stream = _fixture.GetStreamName(); + public async Task Iterator_subscribe_to_non_existing_stream_then_get_event() { + var stream = $"{_fixture.GetStreamName()}_{Guid.NewGuid()}"; + var appeared = new TaskCompletionSource(); + var dropped = new TaskCompletionSource(); + + var subscription = _fixture.Client.SubscribeToStream(stream, FromStream.Start); + ReadMessages(subscription, EventAppeared, SubscriptionDropped); + + await _fixture.Client.AppendToStreamAsync(stream, StreamState.NoStream, _fixture.CreateTestEvents()); + + Assert.True(await appeared.Task.WithTimeout()); + + if (dropped.Task.IsCompleted) { + Assert.False(dropped.Task.IsCompleted, dropped.Task.Result?.ToString()); + } + + subscription.Dispose(); + + var ex = await dropped.Task.WithTimeout(); + Assert.Null(ex); + + Task EventAppeared(ResolvedEvent e) { + appeared.TrySetResult(true); + return Task.CompletedTask; + } + + void SubscriptionDropped(Exception? ex) => dropped.SetResult(ex); + } + + [Fact] + public async Task Callback_allow_multiple_subscriptions_to_same_stream() { + var stream = $"{_fixture.GetStreamName()}_{Guid.NewGuid()}"; var appeared = new TaskCompletionSource(); @@ -105,10 +163,37 @@ Task EventAppeared(StreamSubscription s, ResolvedEvent e, CancellationToken ct) return Task.CompletedTask; } } + + [Fact] + public async Task Iterator_allow_multiple_subscriptions_to_same_stream() { + var stream = $"{_fixture.GetStreamName()}_{Guid.NewGuid()}"; + + var appeared = new TaskCompletionSource(); + + int appearedCount = 0; + + await _fixture.Client.AppendToStreamAsync(stream, StreamState.NoStream, _fixture.CreateTestEvents()); + + var s1 = _fixture.Client.SubscribeToStream(stream, FromStream.Start); + ReadMessages(s1, EventAppeared, null); + + var s2 = _fixture.Client.SubscribeToStream(stream, FromStream.Start); + ReadMessages(s2, EventAppeared, null); + + Assert.True(await appeared.Task.WithTimeout()); + + Task EventAppeared(ResolvedEvent e) { + if (++appearedCount == 2) { + appeared.TrySetResult(true); + } + + return Task.CompletedTask; + } + } [Fact] - public async Task calls_subscription_dropped_when_disposed() { - var stream = _fixture.GetStreamName(); + public async Task Callback_calls_subscription_dropped_when_disposed() { + var stream = $"{_fixture.GetStreamName()}_{Guid.NewGuid()}"; var dropped = new TaskCompletionSource<(SubscriptionDroppedReason, Exception?)>(); using var subscription = await _fixture.Client @@ -132,10 +217,38 @@ public async Task calls_subscription_dropped_when_disposed() { void SubscriptionDropped(StreamSubscription s, SubscriptionDroppedReason reason, Exception? ex) => dropped.SetResult((reason, ex)); } + + [Fact] + public async Task Iterator_client_stops_reading_messages_when_subscription_disposed() { + var stream = $"{_fixture.GetStreamName()}_{Guid.NewGuid()}"; + var dropped = new TaskCompletionSource(); + + var subscription = _fixture.Client.SubscribeToStream(stream, FromStream.Start); + var testEvent = _fixture.CreateTestEvents(1).First(); + ReadMessages(subscription, EventAppeared, SubscriptionDropped); + + if (dropped.Task.IsCompleted) { + Assert.False(dropped.Task.IsCompleted, dropped.Task.Result?.ToString()); + } + + subscription.Dispose(); + + var ex = await dropped.Task.WithTimeout(); + Assert.Null(ex); + + // new event after subscription is disposed + await _fixture.Client.AppendToStreamAsync(stream, StreamState.Any, new[]{testEvent}); + + Task EventAppeared(ResolvedEvent e) { + return testEvent.EventId.Equals(e.OriginalEvent.EventId) ? Task.FromException(new Exception("Subscription not dropped")) : Task.CompletedTask; + } + + void SubscriptionDropped(Exception? ex) => dropped.SetResult(ex); + } [Fact] - public async Task calls_subscription_dropped_when_error_processing_event() { - var stream = _fixture.GetStreamName(); + public async Task Callback_calls_subscription_dropped_when_error_processing_event() { + var stream = $"{_fixture.GetStreamName()}_{Guid.NewGuid()}"; var dropped = new TaskCompletionSource<(SubscriptionDroppedReason, Exception?)>(); var expectedException = new Exception("Error"); @@ -161,10 +274,35 @@ Task EventAppeared(StreamSubscription s, ResolvedEvent e, CancellationToken ct) void SubscriptionDropped(StreamSubscription s, SubscriptionDroppedReason reason, Exception? ex) => dropped.SetResult((reason, ex)); } + + [Fact] + public async Task Iterator_client_stops_reading_messages_when_error_processing_event() { + var stream = $"{_fixture.GetStreamName()}_{Guid.NewGuid()}"; + var dropped = new TaskCompletionSource(); + var expectedException = new Exception("Error"); + int numTimesCalled = 0; + + var subscription = _fixture.Client.SubscribeToStream(stream, FromStream.Start); + ReadMessages(subscription, EventAppeared, SubscriptionDropped); + + await _fixture.Client.AppendToStreamAsync(stream, StreamState.NoStream, _fixture.CreateTestEvents(2)); + + var ex = await dropped.Task.WithTimeout(); + Assert.Same(expectedException, ex); + + Assert.Equal(1, numTimesCalled); + + Task EventAppeared(ResolvedEvent e) { + numTimesCalled++; + return Task.FromException(expectedException); + } + + void SubscriptionDropped(Exception? ex) => dropped.SetResult(ex); + } [Fact] - public async Task reads_all_existing_events_and_keep_listening_to_new_ones() { - var stream = _fixture.GetStreamName(); + public async Task Callback_reads_all_existing_events_and_keep_listening_to_new_ones() { + var stream = $"{_fixture.GetStreamName()}_{Guid.NewGuid()}"; var events = _fixture.CreateTestEvents(20).ToArray(); @@ -215,10 +353,56 @@ Task EventAppeared(StreamSubscription s, ResolvedEvent e, CancellationToken ct) void SubscriptionDropped(StreamSubscription s, SubscriptionDroppedReason reason, Exception? ex) => dropped.SetResult((reason, ex)); } + + [Fact] + public async Task Iterator_reads_all_existing_events_and_keep_listening_to_new_ones() { + var stream = $"{_fixture.GetStreamName()}_{Guid.NewGuid()}"; + var events = _fixture.CreateTestEvents(20).ToArray(); + + var appeared = new TaskCompletionSource(); + var dropped = new TaskCompletionSource(); + + var beforeEvents = events.Take(10); + var afterEvents = events.Skip(10); + + using var enumerator = events.AsEnumerable().GetEnumerator(); + + enumerator.MoveNext(); + + await _fixture.Client.AppendToStreamAsync(stream, StreamState.Any, beforeEvents); + + var subscription = _fixture.Client.SubscribeToStream(stream, FromStream.Start); + ReadMessages(subscription, EventAppeared, SubscriptionDropped); + + await _fixture.Client.AppendToStreamAsync(stream, StreamState.Any, afterEvents); + + await appeared.Task.WithTimeout(); + + subscription.Dispose(); + + var ex = await dropped.Task.WithTimeout(); + Assert.Null(ex); + + Task EventAppeared(ResolvedEvent e) { + try { + Assert.Equal(enumerator.Current.EventId, e.OriginalEvent.EventId); + if (!enumerator.MoveNext()) { + appeared.TrySetResult(true); + } + } catch (Exception ex) { + appeared.TrySetException(ex); + throw; + } + + return Task.CompletedTask; + } + + void SubscriptionDropped(Exception? ex) => dropped.SetResult(ex); + } [Fact] - public async Task catches_deletions() { - var stream = _fixture.GetStreamName(); + public async Task Callback_catches_deletions() { + var stream = $"{_fixture.GetStreamName()}_{Guid.NewGuid()}"; var dropped = new TaskCompletionSource<(SubscriptionDroppedReason, Exception?)>(); @@ -239,6 +423,26 @@ public async Task catches_deletions() { void SubscriptionDropped(StreamSubscription s, SubscriptionDroppedReason reason, Exception? ex) => dropped.SetResult((reason, ex)); } + + [Fact] + public async Task Iterator_catches_deletions() { + var stream = $"{_fixture.GetStreamName()}_{Guid.NewGuid()}"; + + var dropped = new TaskCompletionSource(); + + var subscription = _fixture.Client.SubscribeToStream(stream, FromStream.Start); + ReadMessages(subscription, EventAppeared, SubscriptionDropped); + + await _fixture.Client.TombstoneAsync(stream, StreamState.NoStream); + var ex = await dropped.Task.WithTimeout(); + + var sdex = Assert.IsType(ex); + Assert.Equal(stream, sdex.Stream); + + Task EventAppeared(ResolvedEvent e) => Task.CompletedTask; + + void SubscriptionDropped(Exception? ex) => dropped.SetResult(ex); + } public class Fixture : EventStoreClientFixture { @@ -252,5 +456,29 @@ public Fixture() { protected override Task When() => Task.CompletedTask; } + + async void ReadMessages(EventStoreClient.SubscriptionResult subscription, Func eventAppeared, Action? subscriptionDropped) { + Exception? exception = null; + try { + await foreach (var message in subscription.Messages) { + if (message is StreamMessage.Event eventMessage) { + await eventAppeared(eventMessage.ResolvedEvent); + } + } + } catch (Exception ex) { + exception = ex; + } + + //allow some time for subscription cleanup and chance for exception to be raised + await Task.Delay(100); + + try { + //subscription.SubscriptionState will throw exception if some problem occurred for the subscription + Assert.Equal(SubscriptionState.Disposed, subscription.SubscriptionState); + subscriptionDropped?.Invoke(exception); + } catch (Exception ex) { + subscriptionDropped?.Invoke(ex); + } + } } } diff --git a/test/EventStore.Client.Streams.Tests/subscribe_to_stream_live.cs b/test/EventStore.Client.Streams.Tests/subscribe_to_stream_live.cs index 5b29baad7..1308b2a57 100644 --- a/test/EventStore.Client.Streams.Tests/subscribe_to_stream_live.cs +++ b/test/EventStore.Client.Streams.Tests/subscribe_to_stream_live.cs @@ -1,4 +1,5 @@ using System; +using System.Linq; using System.Threading; using System.Threading.Tasks; using Xunit; @@ -15,8 +16,8 @@ public subscribe_to_stream_live(Fixture fixture, ITestOutputHelper outputHelper) } [Fact] - public async Task does_not_read_existing_events_but_keep_listening_to_new_ones() { - var stream = _fixture.GetStreamName(); + public async Task Callback_does_not_read_existing_events_but_keep_listening_to_new_ones() { + var stream = $"{_fixture.GetStreamName()}_{Guid.NewGuid()}"; var appeared = new TaskCompletionSource(); var dropped = new TaskCompletionSource(); @@ -35,10 +36,31 @@ await _fixture.Client.AppendToStreamAsync(stream, StreamState.NoStream, Assert.Equal(new StreamPosition(1), await appeared.Task.WithTimeout()); } + + [Fact] + public async Task Iterator_does_not_read_existing_events_but_keep_listening_to_new_ones() { + var stream = $"{_fixture.GetStreamName()}_{Guid.NewGuid()}"; + var appeared = new TaskCompletionSource(); + var dropped = new TaskCompletionSource(); + + await _fixture.Client.AppendToStreamAsync(stream, StreamState.NoStream, + _fixture.CreateTestEvents()); + + var subscription = _fixture.Client.SubscribeToStream(stream, FromStream.End); + ReadMessages(subscription, e => { + appeared.TrySetResult(e.OriginalEventNumber); + return Task.CompletedTask; + }, _ => dropped.TrySetResult(true)); + + await _fixture.Client.AppendToStreamAsync(stream, new StreamRevision(0), + _fixture.CreateTestEvents()); + + Assert.Equal(new StreamPosition(1), await appeared.Task.WithTimeout()); + } [Fact] - public async Task subscribe_to_non_existing_stream_and_then_catch_new_event() { - var stream = _fixture.GetStreamName(); + public async Task Callback_subscribe_to_non_existing_stream_and_then_catch_new_event() { + var stream = $"{_fixture.GetStreamName()}_{Guid.NewGuid()}"; var appeared = new TaskCompletionSource(); var dropped = new TaskCompletionSource(); @@ -53,10 +75,27 @@ await _fixture.Client.AppendToStreamAsync(stream, StreamState.NoStream, Assert.True(await appeared.Task.WithTimeout()); } + + [Fact] + public async Task Iterator_subscribe_to_non_existing_stream_and_then_catch_new_event() { + var stream = $"{_fixture.GetStreamName()}_{Guid.NewGuid()}"; + var appeared = new TaskCompletionSource(); + var dropped = new TaskCompletionSource(); + + var subscription = _fixture.Client.SubscribeToStream(stream, FromStream.End); + ReadMessages(subscription, _ => { + appeared.TrySetResult(true); + return Task.CompletedTask; + }, _ => dropped.TrySetResult(true)); + + await _fixture.Client.AppendToStreamAsync(stream, StreamState.NoStream, _fixture.CreateTestEvents()); + + Assert.True(await appeared.Task.WithTimeout()); + } [Fact] - public async Task allow_multiple_subscriptions_to_same_stream() { - var stream = _fixture.GetStreamName(); + public async Task Callback_allow_multiple_subscriptions_to_same_stream() { + var stream = $"{_fixture.GetStreamName()}_{Guid.NewGuid()}"; var appeared = new TaskCompletionSource(); @@ -80,10 +119,37 @@ Task EventAppeared(StreamSubscription s, ResolvedEvent e, CancellationToken ct) return Task.CompletedTask; } } + + [Fact] + public async Task Iterator_allow_multiple_subscriptions_to_same_stream() { + var stream = $"{_fixture.GetStreamName()}_{Guid.NewGuid()}"; + + var appeared = new TaskCompletionSource(); + + int appearedCount = 0; + + var s1 = _fixture.Client.SubscribeToStream(stream, FromStream.End); + ReadMessages(s1, EventAppeared, null); + + var s2 = _fixture.Client.SubscribeToStream(stream, FromStream.End); + ReadMessages(s2, EventAppeared, null); + + await _fixture.Client.AppendToStreamAsync(stream, StreamState.NoStream, _fixture.CreateTestEvents()); + + Assert.True(await appeared.Task.WithTimeout()); + + Task EventAppeared(ResolvedEvent e) { + if (++appearedCount == 2) { + appeared.TrySetResult(true); + } + + return Task.CompletedTask; + } + } [Fact] - public async Task calls_subscription_dropped_when_disposed() { - var stream = _fixture.GetStreamName(); + public async Task Callback_calls_subscription_dropped_when_disposed() { + var stream = $"{_fixture.GetStreamName()}_{Guid.NewGuid()}"; var dropped = new TaskCompletionSource<(SubscriptionDroppedReason, Exception?)>(); @@ -108,8 +174,36 @@ void SubscriptionDropped(StreamSubscription s, SubscriptionDroppedReason reason, } [Fact] - public async Task catches_deletions() { - var stream = _fixture.GetStreamName(); + public async Task Iterator_client_stops_reading_messages_when_subscription_disposed() { + var stream = $"{_fixture.GetStreamName()}_{Guid.NewGuid()}"; + var dropped = new TaskCompletionSource(); + + var subscription = _fixture.Client.SubscribeToStream(stream, FromStream.End); + var testEvent = _fixture.CreateTestEvents(1).First(); + ReadMessages(subscription, EventAppeared, SubscriptionDropped); + + if (dropped.Task.IsCompleted) { + Assert.False(dropped.Task.IsCompleted, dropped.Task.Result?.ToString()); + } + + subscription.Dispose(); + + var ex = await dropped.Task.WithTimeout(); + Assert.Null(ex); + + // new event after subscription is disposed + await _fixture.Client.AppendToStreamAsync(stream, StreamState.Any, new[]{testEvent}); + + Task EventAppeared(ResolvedEvent e) { + return testEvent.EventId.Equals(e.OriginalEvent.EventId) ? Task.FromException(new Exception("Subscription not dropped")) : Task.CompletedTask; + } + + void SubscriptionDropped(Exception? ex) => dropped.SetResult(ex); + } + + [Fact] + public async Task Callback_catches_deletions() { + var stream = $"{_fixture.GetStreamName()}_{Guid.NewGuid()}"; var dropped = new TaskCompletionSource<(SubscriptionDroppedReason, Exception?)>(); @@ -130,10 +224,54 @@ public async Task catches_deletions() { void SubscriptionDropped(StreamSubscription s, SubscriptionDroppedReason reason, Exception? ex) => dropped.SetResult((reason, ex)); } + + [Fact] + public async Task Iterator_catches_deletions() { + var stream = $"{_fixture.GetStreamName()}_{Guid.NewGuid()}"; + + var dropped = new TaskCompletionSource(); + + var subscription = _fixture.Client.SubscribeToStream(stream, FromStream.End); + ReadMessages(subscription, EventAppeared, SubscriptionDropped); + + await _fixture.Client.TombstoneAsync(stream, StreamState.NoStream); + var ex = await dropped.Task.WithTimeout(); + + var sdex = Assert.IsType(ex); + Assert.Equal(stream, sdex.Stream); + + Task EventAppeared(ResolvedEvent e) => Task.CompletedTask; + + void SubscriptionDropped(Exception? ex) => dropped.SetResult(ex); + } public class Fixture : EventStoreClientFixture { protected override Task Given() => Task.CompletedTask; protected override Task When() => Task.CompletedTask; } + + async void ReadMessages(EventStoreClient.SubscriptionResult subscription, Func eventAppeared, Action? subscriptionDropped) { + Exception? exception = null; + try { + await foreach (var message in subscription.Messages) { + if (message is StreamMessage.Event eventMessage) { + await eventAppeared(eventMessage.ResolvedEvent); + } + } + } catch (Exception ex) { + exception = ex; + } + + //allow some time for subscription cleanup and chance for exception to be raised + await Task.Delay(100); + + try { + //subscription.SubscriptionState will throw exception if some problem occurred for the subscription + Assert.Equal(SubscriptionState.Disposed, subscription.SubscriptionState); + subscriptionDropped?.Invoke(exception); + } catch (Exception ex) { + subscriptionDropped?.Invoke(ex); + } + } } } diff --git a/test/EventStore.Client.Streams.Tests/subscribe_to_stream_with_revision.cs b/test/EventStore.Client.Streams.Tests/subscribe_to_stream_with_revision.cs index 2b5304df4..db3fc4e73 100644 --- a/test/EventStore.Client.Streams.Tests/subscribe_to_stream_with_revision.cs +++ b/test/EventStore.Client.Streams.Tests/subscribe_to_stream_with_revision.cs @@ -16,8 +16,8 @@ public subscribe_to_stream_with_revision(ITestOutputHelper outputHelper) { } [Fact] - public async Task subscribe_to_non_existing_stream() { - var stream = _fixture.GetStreamName(); + public async Task Callback_subscribe_to_non_existing_stream() { + var stream = $"{_fixture.GetStreamName()}_{Guid.NewGuid()}"; var appeared = new TaskCompletionSource(); var dropped = new TaskCompletionSource<(SubscriptionDroppedReason, Exception?)>(); @@ -46,10 +46,38 @@ Task EventAppeared(StreamSubscription s, ResolvedEvent e, CancellationToken ct) void SubscriptionDropped(StreamSubscription s, SubscriptionDroppedReason reason, Exception? ex) => dropped.SetResult((reason, ex)); } + + [Fact] + public async Task Iterator_subscribe_to_non_existing_stream() { + var stream = $"{_fixture.GetStreamName()}_{Guid.NewGuid()}"; + var appeared = new TaskCompletionSource(); + var dropped = new TaskCompletionSource(); + + var subscription = _fixture.Client.SubscribeToStream(stream, FromStream.Start); + ReadMessages(subscription, EventAppeared, SubscriptionDropped); + + Assert.False(appeared.Task.IsCompleted); + + if (dropped.Task.IsCompleted) { + Assert.False(dropped.Task.IsCompleted, dropped.Task.Result?.ToString()); + } + + subscription.Dispose(); + + var ex = await dropped.Task.WithTimeout(); + Assert.Null(ex); + + Task EventAppeared(ResolvedEvent e) { + appeared.TrySetResult(true); + return Task.CompletedTask; + } + + void SubscriptionDropped(Exception? ex) => dropped.SetResult(ex); + } [Fact] - public async Task subscribe_to_non_existing_stream_then_get_event() { - var stream = _fixture.GetStreamName(); + public async Task Callback_subscribe_to_non_existing_stream_then_get_event() { + var stream = $"{_fixture.GetStreamName()}_{Guid.NewGuid()}"; var appeared = new TaskCompletionSource(); var dropped = new TaskCompletionSource<(SubscriptionDroppedReason, Exception?)>(); @@ -86,28 +114,96 @@ Task EventAppeared(StreamSubscription s, ResolvedEvent e, CancellationToken ct) void SubscriptionDropped(StreamSubscription s, SubscriptionDroppedReason reason, Exception? ex) => dropped.SetResult((reason, ex)); } + + [Fact] + public async Task Iterator_subscribe_to_non_existing_stream_then_get_event() { + var stream = $"{_fixture.GetStreamName()}_{Guid.NewGuid()}"; + var appeared = new TaskCompletionSource(); + var dropped = new TaskCompletionSource(); + + var subscription = _fixture.Client.SubscribeToStream(stream, FromStream.After(StreamPosition.Start)); + ReadMessages(subscription, EventAppeared, SubscriptionDropped); + + await _fixture.Client.AppendToStreamAsync(stream, StreamState.NoStream, _fixture.CreateTestEvents(2)); + + Assert.True(await appeared.Task.WithTimeout()); + + if (dropped.Task.IsCompleted) { + Assert.False(dropped.Task.IsCompleted, dropped.Task.Result?.ToString()); + } + + subscription.Dispose(); + + var ex = await dropped.Task.WithTimeout(); + Assert.Null(ex); + + Task EventAppeared(ResolvedEvent e) { + if (e.OriginalEvent.EventNumber == StreamPosition.Start) { + appeared.TrySetException(new Exception()); + } else { + appeared.TrySetResult(true); + } + + return Task.CompletedTask; + } + + void SubscriptionDropped(Exception? ex) => dropped.SetResult(ex); + } [Fact] - public async Task allow_multiple_subscriptions_to_same_stream() { - var stream = _fixture.GetStreamName(); + public async Task Callback_allow_multiple_subscriptions_to_same_stream() { + var stream = $"{_fixture.GetStreamName()}_{Guid.NewGuid()}"; var appeared = new TaskCompletionSource(); - + await _fixture.Client.AppendToStreamAsync(stream, StreamState.NoStream, _fixture.CreateTestEvents(2)); + + var firstEvent = await _fixture.Client.ReadStreamAsync(Direction.Forwards, stream, StreamPosition.Start, 1).FirstOrDefaultAsync(); int appearedCount = 0; using var s1 = await _fixture.Client - .SubscribeToStreamAsync(stream, FromStream.After(StreamPosition.Start), EventAppeared) + .SubscribeToStreamAsync(stream, FromStream.After(firstEvent.OriginalEventNumber), EventAppeared) .WithTimeout(); using var s2 = await _fixture.Client - .SubscribeToStreamAsync(stream, FromStream.After(StreamPosition.Start), EventAppeared) + .SubscribeToStreamAsync(stream, FromStream.After(firstEvent.OriginalEventNumber), EventAppeared) .WithTimeout(); + Assert.True(await appeared.Task.WithTimeout()); + + Task EventAppeared(StreamSubscription s, ResolvedEvent e, CancellationToken ct) { + if (e.OriginalEvent.EventNumber == firstEvent.OriginalEventNumber) { + appeared.TrySetException(new Exception()); + return Task.CompletedTask; + } + + if (++appearedCount == 2) { + appeared.TrySetResult(true); + } + + return Task.CompletedTask; + } + } + + [Fact] + public async Task Iterator_allow_multiple_subscriptions_to_same_stream() { + var stream = $"{_fixture.GetStreamName()}_{Guid.NewGuid()}"; + + var appeared = new TaskCompletionSource(); + await _fixture.Client.AppendToStreamAsync(stream, StreamState.NoStream, _fixture.CreateTestEvents(2)); + var firstEvent = await _fixture.Client.ReadStreamAsync(Direction.Forwards, stream, StreamPosition.Start, 1).FirstOrDefaultAsync(); + + int appearedCount = 0; + + var s1 = _fixture.Client.SubscribeToStream(stream, FromStream.After(firstEvent.OriginalEventNumber)); + ReadMessages(s1, EventAppeared, null); + + var s2 = _fixture.Client.SubscribeToStream(stream, FromStream.After(firstEvent.OriginalEventNumber)); + ReadMessages(s2, EventAppeared, null); Assert.True(await appeared.Task.WithTimeout()); - Task EventAppeared(StreamSubscription s, ResolvedEvent e, CancellationToken ct) { - if (e.OriginalEvent.EventNumber == StreamPosition.Start) { + Task EventAppeared(ResolvedEvent e) { + if (e.OriginalEvent.EventNumber == firstEvent.OriginalEventNumber) { appeared.TrySetException(new Exception()); return Task.CompletedTask; } @@ -121,12 +217,16 @@ Task EventAppeared(StreamSubscription s, ResolvedEvent e, CancellationToken ct) } [Fact] - public async Task calls_subscription_dropped_when_disposed() { - var stream = _fixture.GetStreamName(); + public async Task Callback_calls_subscription_dropped_when_disposed() { + var stream = $"{_fixture.GetStreamName()}_{Guid.NewGuid()}"; var dropped = new TaskCompletionSource<(SubscriptionDroppedReason, Exception?)>(); + + await _fixture.Client.AppendToStreamAsync(stream, StreamState.NoStream, _fixture.CreateTestEvents(2)); + var firstEvent = await _fixture.Client.ReadStreamAsync(Direction.Forwards, stream, StreamPosition.Start, 1) + .FirstOrDefaultAsync(); using var subscription = await _fixture.Client - .SubscribeToStreamAsync(stream, FromStream.Start, EventAppeared, false, + .SubscribeToStreamAsync(stream, FromStream.After(firstEvent.OriginalEventNumber), EventAppeared, false, SubscriptionDropped) .WithTimeout(); @@ -146,17 +246,51 @@ public async Task calls_subscription_dropped_when_disposed() { void SubscriptionDropped(StreamSubscription s, SubscriptionDroppedReason reason, Exception? ex) => dropped.SetResult((reason, ex)); } + + [Fact] + public async Task Iterator_client_stops_reading_messages_when_subscription_disposed() { + var stream = $"{_fixture.GetStreamName()}_{Guid.NewGuid()}"; + var dropped = new TaskCompletionSource(); + + await _fixture.Client.AppendToStreamAsync(stream, StreamState.NoStream, _fixture.CreateTestEvents(2)); + var firstEvent = await _fixture.Client.ReadStreamAsync(Direction.Forwards, stream, StreamPosition.Start, 1) + .FirstOrDefaultAsync(); + + var subscription = _fixture.Client.SubscribeToStream(stream, FromStream.After(firstEvent.OriginalEventNumber)); + var testEvent = _fixture.CreateTestEvents(1).First(); + ReadMessages(subscription, EventAppeared, SubscriptionDropped); + + if (dropped.Task.IsCompleted) { + Assert.False(dropped.Task.IsCompleted, dropped.Task.Result?.ToString()); + } + + subscription.Dispose(); + + var ex = await dropped.Task.WithTimeout(); + Assert.Null(ex); + + // new event after subscription is disposed + await _fixture.Client.AppendToStreamAsync(stream, StreamState.Any, new[]{testEvent}); + + Task EventAppeared(ResolvedEvent e) { + return testEvent.EventId.Equals(e.OriginalEvent.EventId) ? Task.FromException(new Exception("Subscription not dropped")) : Task.CompletedTask; + } + + void SubscriptionDropped(Exception? ex) => dropped.SetResult(ex); + } [Fact] - public async Task calls_subscription_dropped_when_error_processing_event() { - var stream = _fixture.GetStreamName(); + public async Task Callback_calls_subscription_dropped_when_error_processing_event() { + var stream = $"{_fixture.GetStreamName()}_{Guid.NewGuid()}"; var dropped = new TaskCompletionSource<(SubscriptionDroppedReason, Exception?)>(); var expectedException = new Exception("Error"); - await _fixture.Client.AppendToStreamAsync(stream, StreamState.NoStream, _fixture.CreateTestEvents(2)); + await _fixture.Client.AppendToStreamAsync(stream, StreamState.NoStream, _fixture.CreateTestEvents(3)); + var firstEvent = await _fixture.Client.ReadStreamAsync(Direction.Forwards, stream, StreamPosition.Start, 1) + .FirstOrDefaultAsync(); using var subscription = await _fixture.Client.SubscribeToStreamAsync(stream, - FromStream.Start, + FromStream.After(firstEvent.OriginalEventNumber), EventAppeared, false, SubscriptionDropped) .WithTimeout(); @@ -171,10 +305,37 @@ Task EventAppeared(StreamSubscription s, ResolvedEvent e, CancellationToken ct) void SubscriptionDropped(StreamSubscription s, SubscriptionDroppedReason reason, Exception? ex) => dropped.SetResult((reason, ex)); } + + [Fact] + public async Task Iterator_client_stops_reading_messages_when_error_processing_event() { + var stream = $"{_fixture.GetStreamName()}_{Guid.NewGuid()}"; + var dropped = new TaskCompletionSource(); + var expectedException = new Exception("Error"); + int numTimesCalled = 0; + + await _fixture.Client.AppendToStreamAsync(stream, StreamState.NoStream, _fixture.CreateTestEvents(3)); + var firstEvent = await _fixture.Client.ReadStreamAsync(Direction.Forwards, stream, StreamPosition.Start, 1) + .FirstOrDefaultAsync(); + + var subscription = _fixture.Client.SubscribeToStream(stream, FromStream.After(firstEvent.OriginalEventNumber)); + ReadMessages(subscription, EventAppeared, SubscriptionDropped); + + var ex = await dropped.Task.WithTimeout(); + Assert.Same(expectedException, ex); + + Assert.Equal(1, numTimesCalled); + + Task EventAppeared(ResolvedEvent e) { + numTimesCalled++; + return Task.FromException(expectedException); + } + + void SubscriptionDropped(Exception? ex) => dropped.SetResult(ex); + } [Fact] - public async Task reads_all_existing_events_and_keep_listening_to_new_ones() { - var stream = _fixture.GetStreamName(); + public async Task Callback_reads_all_existing_events_and_keep_listening_to_new_ones() { + var stream = $"{_fixture.GetStreamName()}_{Guid.NewGuid()}"; var events = _fixture.CreateTestEvents(20).ToArray(); @@ -191,13 +352,16 @@ public async Task reads_all_existing_events_and_keep_listening_to_new_ones() { await _fixture.Client.AppendToStreamAsync(stream, StreamState.NoStream, _fixture.CreateTestEvents()); await _fixture.Client.AppendToStreamAsync(stream, StreamState.Any, beforeEvents); + + var firstEvent = await _fixture.Client.ReadStreamAsync(Direction.Forwards, stream, StreamPosition.Start, 1) + .FirstOrDefaultAsync(); using var subscription = await _fixture.Client - .SubscribeToStreamAsync(stream, FromStream.After(StreamPosition.Start), EventAppeared, + .SubscribeToStreamAsync(stream, FromStream.After(firstEvent.OriginalEventNumber), EventAppeared, false, SubscriptionDropped) .WithTimeout(); - var writeResult = await _fixture.Client.AppendToStreamAsync(stream, StreamState.Any, afterEvents); + await _fixture.Client.AppendToStreamAsync(stream, StreamState.Any, afterEvents); await appeared.Task.WithTimeout(); @@ -226,6 +390,56 @@ void SubscriptionDropped(StreamSubscription s, SubscriptionDroppedReason reason, dropped.SetResult((reason, ex)); } } + + [Fact] + public async Task Iterator_reads_all_existing_events_and_keep_listening_to_new_ones() { + var stream = $"{_fixture.GetStreamName()}_{Guid.NewGuid()}"; + var events = _fixture.CreateTestEvents(20).ToArray(); + + var appeared = new TaskCompletionSource(); + var dropped = new TaskCompletionSource(); + + var beforeEvents = events.Take(10); + var afterEvents = events.Skip(10); + + using var enumerator = events.AsEnumerable().GetEnumerator(); + + enumerator.MoveNext(); + + await _fixture.Client.AppendToStreamAsync(stream, StreamState.NoStream, _fixture.CreateTestEvents()); + await _fixture.Client.AppendToStreamAsync(stream, StreamState.Any, beforeEvents); + + var firstEvent = await _fixture.Client.ReadStreamAsync(Direction.Forwards, stream, StreamPosition.Start, 1) + .FirstOrDefaultAsync(); + + var subscription = _fixture.Client.SubscribeToStream(stream, FromStream.After(firstEvent.OriginalEventNumber)); + ReadMessages(subscription, EventAppeared, SubscriptionDropped); + + await _fixture.Client.AppendToStreamAsync(stream, StreamState.Any, afterEvents); + + await appeared.Task.WithTimeout(); + + subscription.Dispose(); + + var ex = await dropped.Task.WithTimeout(); + Assert.Null(ex); + + Task EventAppeared(ResolvedEvent e) { + try { + Assert.Equal(enumerator.Current.EventId, e.OriginalEvent.EventId); + if (!enumerator.MoveNext()) { + appeared.TrySetResult(true); + } + } catch (Exception ex) { + appeared.TrySetException(ex); + throw; + } + + return Task.CompletedTask; + } + + void SubscriptionDropped(Exception? ex) => dropped.SetResult(ex); + } public class Fixture : EventStoreClientFixture { protected override Task Given() => Task.CompletedTask; @@ -235,5 +449,29 @@ public class Fixture : EventStoreClientFixture { public Task InitializeAsync() => _fixture.InitializeAsync(); public Task DisposeAsync() => _fixture.DisposeAsync(); + + async void ReadMessages(EventStoreClient.SubscriptionResult subscription, Func eventAppeared, Action? subscriptionDropped) { + Exception? exception = null; + try { + await foreach (var message in subscription.Messages) { + if (message is StreamMessage.Event eventMessage) { + await eventAppeared(eventMessage.ResolvedEvent); + } + } + } catch (Exception ex) { + exception = ex; + } + + //allow some time for subscription cleanup and chance for exception to be raised + await Task.Delay(100); + + try { + //subscription.SubscriptionState will throw exception if some problem occurred for the subscription + Assert.Equal(SubscriptionState.Disposed, subscription.SubscriptionState); + subscriptionDropped?.Invoke(exception); + } catch (Exception ex) { + subscriptionDropped?.Invoke(ex); + } + } } }