Skip to content

Commit

Permalink
test
Browse files Browse the repository at this point in the history
  • Loading branch information
hunter1703 committed Jul 26, 2023
1 parent cf9fc78 commit fd3dbda
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 15 deletions.
50 changes: 38 additions & 12 deletions test/EventStore.Client.Streams.Tests/subscribe_to_all_live.cs
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,12 @@ void SubscriptionDropped(StreamSubscription s, SubscriptionDroppedReason reason,
}

[Fact]
public async Task Iterator_subscription_dropped_when_error_processing_event() {
public async Task Iterator_client_stops_reading_messages_when_error_processing_event() {
var stream = $"{_fixture.GetStreamName()}_{Guid.NewGuid()}";
var dropped = new TaskCompletionSource<Exception?>();
var expectedException = new Exception("Error");

int numTimesCalled = 0;

var subscription = _fixture.Client.SubscribeToAll(FromAll.End);
ReadMessages(subscription, EventAppeared, SubscriptionDropped);

Expand All @@ -109,8 +110,13 @@ public async Task Iterator_subscription_dropped_when_error_processing_event() {
var ex = await dropped.Task.WithTimeout();

Check failure on line 110 in test/EventStore.Client.Streams.Tests/subscribe_to_all_live.cs

View workflow job for this annotation

GitHub Actions / test / EventStore.Client.Streams/ubuntu-latest/net5.0/lts

EventStore.Client.subscribe_to_all_live.Iterator_client_stops_reading_messages_when_error_processing_event

System.TimeoutException : Timed out waiting for task

Check failure on line 110 in test/EventStore.Client.Streams.Tests/subscribe_to_all_live.cs

View workflow job for this annotation

GitHub Actions / test / EventStore.Client.Streams/ubuntu-latest/net5.0/previous-lts

EventStore.Client.subscribe_to_all_live.Iterator_client_stops_reading_messages_when_error_processing_event

System.TimeoutException : Timed out waiting for task

Assert.Same(expectedException, ex);

Assert.Equal(1, numTimesCalled);

Task EventAppeared(ResolvedEvent e) => Task.FromException(expectedException);
Task EventAppeared(ResolvedEvent e) {
numTimesCalled++;
return Task.FromException(expectedException);
}

void SubscriptionDropped(Exception? ex) => dropped.SetResult(ex);
}
Expand Down Expand Up @@ -183,20 +189,30 @@ public async Task Callback_does_not_read_existing_events_but_keep_listening_to_n
var appeared = new TaskCompletionSource<bool>();
var dropped = new TaskCompletionSource<(SubscriptionDroppedReason, Exception?)>();
var appearedEvents = new List<EventRecord>();
var afterEvents = _fixture.CreateTestEvents(10).ToArray();

var events = _fixture.CreateTestEvents(20);

var beforeEvents = events.Take(10);
foreach (var @event in beforeEvents) {
await _fixture.Client.AppendToStreamAsync($"stream-{@event.EventId:n}", StreamState.NoStream,
new[] {@event});
}

var afterEvents = events.Skip(10);
var latestEvents = afterEvents as EventData[] ?? afterEvents.ToArray();

using var subscription = await _fixture.Client
.SubscribeToAllAsync(FromAll.End, EventAppeared, false, SubscriptionDropped)
.WithTimeout();

foreach (var @event in afterEvents) {
foreach (var @event in latestEvents) {
await _fixture.Client.AppendToStreamAsync($"stream-{@event.EventId:n}", StreamState.NoStream,
new[] {@event});
}

await appeared.Task.WithTimeout();

Assert.Equal(afterEvents.Select(x => x.EventId), appearedEvents.Select(x => x.EventId));
Assert.Equal(latestEvents.Select(x => x.EventId), appearedEvents.Select(x => x.EventId));

if (dropped.Task.IsCompleted) {
Assert.False(dropped.Task.IsCompleted, dropped.Task.Result.ToString());
Expand All @@ -213,7 +229,7 @@ Task EventAppeared(StreamSubscription s, ResolvedEvent e, CancellationToken ct)
if (!SystemStreams.IsSystemStream(e.OriginalStreamId)) {
appearedEvents.Add(e.Event);

if (appearedEvents.Count >= afterEvents.Length) {
if (appearedEvents.Count >= latestEvents.Length) {
appeared.TrySetResult(true);
}
}
Expand All @@ -230,19 +246,29 @@ public async Task Iterator_does_not_read_existing_events_but_keep_listening_to_n
var appeared = new TaskCompletionSource<bool>();
var dropped = new TaskCompletionSource<Exception?>();
var appearedEvents = new List<EventRecord>();
var afterEvents = _fixture.CreateTestEvents(10).ToArray();

var events = _fixture.CreateTestEvents(20);

var beforeEvents = events.Take(10);
foreach (var @event in beforeEvents) {
await _fixture.Client.AppendToStreamAsync($"stream-{@event.EventId:n}", StreamState.NoStream,
new[] {@event});
}

var afterEvents = events.Skip(10);
var latestEvents = afterEvents as EventData[] ?? afterEvents.ToArray();

var subscription = _fixture.Client.SubscribeToAll(FromAll.End);
ReadMessages(subscription, EventAppeared, SubscriptionDropped);

foreach (var @event in afterEvents) {
foreach (var @event in latestEvents) {
await _fixture.Client.AppendToStreamAsync($"stream-{@event.EventId:n}", StreamState.NoStream,
new[] {@event});
}

await appeared.Task.WithTimeout();

Check failure on line 269 in test/EventStore.Client.Streams.Tests/subscribe_to_all_live.cs

View workflow job for this annotation

GitHub Actions / test / EventStore.Client.Streams/ubuntu-latest/net5.0/lts

EventStore.Client.subscribe_to_all_live.Iterator_does_not_read_existing_events_but_keep_listening_to_new_ones

System.TimeoutException : Timed out waiting for task

Check failure on line 269 in test/EventStore.Client.Streams.Tests/subscribe_to_all_live.cs

View workflow job for this annotation

GitHub Actions / test / EventStore.Client.Streams/ubuntu-latest/net5.0/previous-lts

EventStore.Client.subscribe_to_all_live.Iterator_does_not_read_existing_events_but_keep_listening_to_new_ones

System.TimeoutException : Timed out waiting for task

Assert.Equal(afterEvents.Select(x => x.EventId), appearedEvents.Select(x => x.EventId));
Assert.Equal(latestEvents.Select(x => x.EventId), appearedEvents.Select(x => x.EventId));

if (dropped.Task.IsCompleted) {
Assert.False(dropped.Task.IsCompleted, dropped.Task.Result?.ToString());
Expand All @@ -257,7 +283,7 @@ Task EventAppeared(ResolvedEvent e) {
if (!SystemStreams.IsSystemStream(e.OriginalStreamId)) {
appearedEvents.Add(e.Event);

if (appearedEvents.Count >= afterEvents.Length) {
if (appearedEvents.Count >= latestEvents.Length) {
appeared.TrySetResult(true);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,11 @@ void SubscriptionDropped(StreamSubscription s, SubscriptionDroppedReason reason,
}

[Fact]
public async Task Iterator_subscription_dropped_when_error_processing_event() {
public async Task Iterator_client_stops_reading_messages_when_error_processing_event() {
var stream = $"{_fixture.GetStreamName()}_{Guid.NewGuid()}";
var dropped = new TaskCompletionSource<Exception?>();
var expectedException = new Exception("Error");

int numTimesCalled = 0;
var firstEvent = await _fixture.Client.ReadAllAsync(Direction.Forwards, Position.Start, 1)
.FirstOrDefaultAsync();

Expand All @@ -121,8 +121,13 @@ public async Task Iterator_subscription_dropped_when_error_processing_event() {
var ex = await dropped.Task.WithTimeout();

Assert.Same(expectedException, ex);

Assert.Equal(1, numTimesCalled);

Task EventAppeared(ResolvedEvent e) => Task.FromException(expectedException);
Task EventAppeared(ResolvedEvent e) {
numTimesCalled++;
return Task.FromException(expectedException);
}

void SubscriptionDropped(Exception? ex) => dropped.SetResult(ex);
}
Expand Down

0 comments on commit fd3dbda

Please sign in to comment.