diff --git a/test/EventStore.Client.Streams.Tests/subscribe_to_all_live.cs b/test/EventStore.Client.Streams.Tests/subscribe_to_all_live.cs index e56f01a99..12498e53b 100644 --- a/test/EventStore.Client.Streams.Tests/subscribe_to_all_live.cs +++ b/test/EventStore.Client.Streams.Tests/subscribe_to_all_live.cs @@ -96,11 +96,12 @@ void SubscriptionDropped(StreamSubscription s, SubscriptionDroppedReason reason, } [Fact] - public async Task Iterator_subscription_dropped_when_error_processing_event() { + public async Task Iterator_client_stops_reading_messages_when_error_processing_event() { var stream = $"{_fixture.GetStreamName()}_{Guid.NewGuid()}"; var dropped = new TaskCompletionSource(); var expectedException = new Exception("Error"); - + int numTimesCalled = 0; + var subscription = _fixture.Client.SubscribeToAll(FromAll.End); ReadMessages(subscription, EventAppeared, SubscriptionDropped); @@ -109,8 +110,13 @@ public async Task Iterator_subscription_dropped_when_error_processing_event() { var ex = await dropped.Task.WithTimeout(); Assert.Same(expectedException, ex); + + Assert.Equal(1, numTimesCalled); - Task EventAppeared(ResolvedEvent e) => Task.FromException(expectedException); + Task EventAppeared(ResolvedEvent e) { + numTimesCalled++; + return Task.FromException(expectedException); + } void SubscriptionDropped(Exception? ex) => dropped.SetResult(ex); } @@ -183,20 +189,30 @@ public async Task Callback_does_not_read_existing_events_but_keep_listening_to_n var appeared = new TaskCompletionSource(); var dropped = new TaskCompletionSource<(SubscriptionDroppedReason, Exception?)>(); var appearedEvents = new List(); - var afterEvents = _fixture.CreateTestEvents(10).ToArray(); + + var events = _fixture.CreateTestEvents(20); + + var beforeEvents = events.Take(10); + foreach (var @event in beforeEvents) { + await _fixture.Client.AppendToStreamAsync($"stream-{@event.EventId:n}", StreamState.NoStream, + new[] {@event}); + } + + var afterEvents = events.Skip(10); + var latestEvents = afterEvents as EventData[] ?? afterEvents.ToArray(); using var subscription = await _fixture.Client .SubscribeToAllAsync(FromAll.End, EventAppeared, false, SubscriptionDropped) .WithTimeout(); - - foreach (var @event in afterEvents) { + + foreach (var @event in latestEvents) { await _fixture.Client.AppendToStreamAsync($"stream-{@event.EventId:n}", StreamState.NoStream, new[] {@event}); } await appeared.Task.WithTimeout(); - Assert.Equal(afterEvents.Select(x => x.EventId), appearedEvents.Select(x => x.EventId)); + Assert.Equal(latestEvents.Select(x => x.EventId), appearedEvents.Select(x => x.EventId)); if (dropped.Task.IsCompleted) { Assert.False(dropped.Task.IsCompleted, dropped.Task.Result.ToString()); @@ -213,7 +229,7 @@ Task EventAppeared(StreamSubscription s, ResolvedEvent e, CancellationToken ct) if (!SystemStreams.IsSystemStream(e.OriginalStreamId)) { appearedEvents.Add(e.Event); - if (appearedEvents.Count >= afterEvents.Length) { + if (appearedEvents.Count >= latestEvents.Length) { appeared.TrySetResult(true); } } @@ -230,19 +246,29 @@ public async Task Iterator_does_not_read_existing_events_but_keep_listening_to_n var appeared = new TaskCompletionSource(); var dropped = new TaskCompletionSource(); var appearedEvents = new List(); - var afterEvents = _fixture.CreateTestEvents(10).ToArray(); + + var events = _fixture.CreateTestEvents(20); + + var beforeEvents = events.Take(10); + foreach (var @event in beforeEvents) { + await _fixture.Client.AppendToStreamAsync($"stream-{@event.EventId:n}", StreamState.NoStream, + new[] {@event}); + } + + var afterEvents = events.Skip(10); + var latestEvents = afterEvents as EventData[] ?? afterEvents.ToArray(); var subscription = _fixture.Client.SubscribeToAll(FromAll.End); ReadMessages(subscription, EventAppeared, SubscriptionDropped); - foreach (var @event in afterEvents) { + foreach (var @event in latestEvents) { await _fixture.Client.AppendToStreamAsync($"stream-{@event.EventId:n}", StreamState.NoStream, new[] {@event}); } await appeared.Task.WithTimeout(); - Assert.Equal(afterEvents.Select(x => x.EventId), appearedEvents.Select(x => x.EventId)); + Assert.Equal(latestEvents.Select(x => x.EventId), appearedEvents.Select(x => x.EventId)); if (dropped.Task.IsCompleted) { Assert.False(dropped.Task.IsCompleted, dropped.Task.Result?.ToString()); @@ -257,7 +283,7 @@ Task EventAppeared(ResolvedEvent e) { if (!SystemStreams.IsSystemStream(e.OriginalStreamId)) { appearedEvents.Add(e.Event); - if (appearedEvents.Count >= afterEvents.Length) { + if (appearedEvents.Count >= latestEvents.Length) { appeared.TrySetResult(true); } } diff --git a/test/EventStore.Client.Streams.Tests/subscribe_to_all_with_position.cs b/test/EventStore.Client.Streams.Tests/subscribe_to_all_with_position.cs index a12cfc846..d69ff4b63 100644 --- a/test/EventStore.Client.Streams.Tests/subscribe_to_all_with_position.cs +++ b/test/EventStore.Client.Streams.Tests/subscribe_to_all_with_position.cs @@ -105,11 +105,11 @@ void SubscriptionDropped(StreamSubscription s, SubscriptionDroppedReason reason, } [Fact] - public async Task Iterator_subscription_dropped_when_error_processing_event() { + public async Task Iterator_client_stops_reading_messages_when_error_processing_event() { var stream = $"{_fixture.GetStreamName()}_{Guid.NewGuid()}"; var dropped = new TaskCompletionSource(); var expectedException = new Exception("Error"); - + int numTimesCalled = 0; var firstEvent = await _fixture.Client.ReadAllAsync(Direction.Forwards, Position.Start, 1) .FirstOrDefaultAsync(); @@ -121,8 +121,13 @@ public async Task Iterator_subscription_dropped_when_error_processing_event() { var ex = await dropped.Task.WithTimeout(); Assert.Same(expectedException, ex); + + Assert.Equal(1, numTimesCalled); - Task EventAppeared(ResolvedEvent e) => Task.FromException(expectedException); + Task EventAppeared(ResolvedEvent e) { + numTimesCalled++; + return Task.FromException(expectedException); + } void SubscriptionDropped(Exception? ex) => dropped.SetResult(ex); }