Skip to content

Commit

Permalink
ensure calls are disposed to prevent deadlocks
Browse files Browse the repository at this point in the history
  • Loading branch information
thefringeninja committed Dec 10, 2021
1 parent f1b04d5 commit 7b022b7
Show file tree
Hide file tree
Showing 15 changed files with 98 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ public async Task ShutdownAsync(
UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) {
var (channel, _) = await GetCurrentChannelInfo().ConfigureAwait(false);
await new Operations.Operations.OperationsClient(
using var call = new Operations.Operations.OperationsClient(
CreateCallInvoker(channel)).ShutdownAsync(EmptyResult,
EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken));
await call.ResponseAsync.ConfigureAwait(false);
}

/// <summary>
Expand All @@ -32,9 +33,10 @@ public async Task MergeIndexesAsync(
UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) {
var (channel, _) = await GetCurrentChannelInfo().ConfigureAwait(false);
await new Operations.Operations.OperationsClient(
using var call = new Operations.Operations.OperationsClient(
CreateCallInvoker(channel)).MergeIndexesAsync(EmptyResult,
EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken));
await call.ResponseAsync.ConfigureAwait(false);
}

/// <summary>
Expand All @@ -47,9 +49,10 @@ public async Task ResignNodeAsync(
UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) {
var (channel, _) = await GetCurrentChannelInfo().ConfigureAwait(false);
await new Operations.Operations.OperationsClient(
using var call = new Operations.Operations.OperationsClient(
CreateCallInvoker(channel)).ResignNodeAsync(EmptyResult,
EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken));
await call.ResponseAsync.ConfigureAwait(false);
}

/// <summary>
Expand All @@ -63,10 +66,11 @@ public async Task SetNodePriorityAsync(int nodePriority,
UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) {
var (channel, _) = await GetCurrentChannelInfo().ConfigureAwait(false);
await new Operations.Operations.OperationsClient(
using var call = new Operations.Operations.OperationsClient(
CreateCallInvoker(channel)).SetNodePriorityAsync(
new SetNodePriorityReq {Priority = nodePriority},
EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken));
await call.ResponseAsync.ConfigureAwait(false);
}

/// <summary>
Expand All @@ -78,10 +82,11 @@ public async Task SetNodePriorityAsync(int nodePriority,
public async Task RestartPersistentSubscriptions(UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) {
var (channel, _) = await GetCurrentChannelInfo().ConfigureAwait(false);
await new Operations.Operations.OperationsClient(
using var call = new Operations.Operations.OperationsClient(
CreateCallInvoker(channel)).RestartPersistentSubscriptionsAsync(
EmptyResult,
EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken));
await call.ResponseAsync.ConfigureAwait(false);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public async Task<DatabaseScavengeResult> StartScavengeAsync(
}

var (channel, _) = await GetCurrentChannelInfo().ConfigureAwait(false);
var result = await new Operations.Operations.OperationsClient(
using var call = new Operations.Operations.OperationsClient(
CreateCallInvoker(channel)).StartScavengeAsync(
new StartScavengeReq {
Options = new StartScavengeReq.Types.Options {
Expand All @@ -39,6 +39,7 @@ public async Task<DatabaseScavengeResult> StartScavengeAsync(
},
EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials,
cancellationToken));
var result = await call.ResponseAsync.ConfigureAwait(false);

return result.ScavengeResult switch {
ScavengeResp.Types.ScavengeResult.Started => DatabaseScavengeResult.Started(result.ScavengeId),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ private async Task CreateInternalAsync(string streamName, string groupName, IEve

var callInvoker = CreateCallInvoker(channel);

await new PersistentSubscriptions.PersistentSubscriptions.PersistentSubscriptionsClient(
using var call = new PersistentSubscriptions.PersistentSubscriptions.PersistentSubscriptionsClient(
callInvoker).CreateAsync(new CreateReq {
Options = new CreateReq.Types.Options {
Stream = streamName != SystemStreams.AllStream
Expand Down Expand Up @@ -217,6 +217,7 @@ private async Task CreateInternalAsync(string streamName, string groupName, IEve
}
}
}, EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken));
await call.ResponseAsync.ConfigureAwait(false);
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@ public async Task DeleteAsync(string streamName, string groupName, UserCredentia
deleteOptions.StreamIdentifier = streamName;
}

await new PersistentSubscriptions.PersistentSubscriptions.PersistentSubscriptionsClient(callInvoker)
using var call =
new PersistentSubscriptions.PersistentSubscriptions.PersistentSubscriptionsClient(callInvoker)
.DeleteAsync(new DeleteReq {Options = deleteOptions},
EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials,
cancellationToken));
await call.ResponseAsync.ConfigureAwait(false);
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,8 @@ public async Task<PersistentSubscription> SubscribeAsync(string streamName, stri
!serverCapabilities.SupportsPersistentSubscriptionsToAll) {
throw new NotSupportedException("The server does not support persistent subscriptions to $all.");
}

var callInvoker = CreateCallInvoker(channel);

var call = new PersistentSubscriptions.PersistentSubscriptions.PersistentSubscriptionsClient(callInvoker)
.Read(EventStoreCallOptions.Create(Settings, operationOptions, userCredentials, cancellationToken));

var readOptions = new ReadReq.Types.Options {
BufferSize = bufferSize,
Expand All @@ -77,7 +74,8 @@ public async Task<PersistentSubscription> SubscribeAsync(string streamName, stri
readOptions.StreamIdentifier = streamName;
}

return await PersistentSubscription.Confirm(call, readOptions, autoAck, _log, eventAppeared,
return await PersistentSubscription.Confirm(channel, callInvoker, Settings,
operationOptions, userCredentials, readOptions, autoAck, _log, eventAppeared,
subscriptionDropped ?? delegate { }, cancellationToken).ConfigureAwait(false);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public async Task UpdateAsync(string streamName, string groupName, PersistentSub
}

var callInvoker = CreateCallInvoker(channel);
await new PersistentSubscriptions.PersistentSubscriptions.PersistentSubscriptionsClient(callInvoker)
using var call = new PersistentSubscriptions.PersistentSubscriptions.PersistentSubscriptionsClient(callInvoker)
.UpdateAsync(new UpdateReq {
Options = new UpdateReq.Types.Options {
GroupName = groupName,
Expand Down Expand Up @@ -144,6 +144,7 @@ public async Task UpdateAsync(string streamName, string groupName, PersistentSub
},
EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials,
cancellationToken));
await call.ResponseAsync.ConfigureAwait(false);
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,33 +27,46 @@ public class PersistentSubscription : IDisposable {
/// </summary>
public string SubscriptionId { get; }

internal static async Task<PersistentSubscription> Confirm(AsyncDuplexStreamingCall<ReadReq, ReadResp> call,
internal static async Task<PersistentSubscription> Confirm(
ChannelBase channel, CallInvoker callInvoker, EventStoreClientSettings settings,
EventStoreClientOperationOptions operationOptions, UserCredentials? userCredentials,
ReadReq.Types.Options options, bool autoAck, ILogger log,
Func<PersistentSubscription, ResolvedEvent, int?, CancellationToken, Task> eventAppeared,
Action<PersistentSubscription, SubscriptionDroppedReason, Exception?> subscriptionDropped,
CancellationToken cancellationToken = default) {
var cts =
#if GRPC_CORE
CancellationTokenSource.CreateLinkedTokenSource(((Channel) channel).ShutdownToken)
#else
new CancellationTokenSource()
#endif
;
var call = new PersistentSubscriptions.PersistentSubscriptions.PersistentSubscriptionsClient(callInvoker)
.Read(EventStoreCallOptions.Create(settings, operationOptions, userCredentials, cts.Token));

await call.RequestStream.WriteAsync(new ReadReq {
Options = options
}).ConfigureAwait(false);

if (!await call.ResponseStream.MoveNext(cancellationToken).ConfigureAwait(false) ||
call.ResponseStream.Current.ContentCase != ReadResp.ContentOneofCase.SubscriptionConfirmation) {
throw new InvalidOperationException();
}
if (await call.ResponseStream.MoveNext(cancellationToken).ConfigureAwait(false) &&
call.ResponseStream.Current.ContentCase == ReadResp.ContentOneofCase.SubscriptionConfirmation)
return new PersistentSubscription(call, autoAck, log, eventAppeared, subscriptionDropped, cts);

return new PersistentSubscription(call, autoAck, log, eventAppeared, subscriptionDropped);
cts.Dispose();
throw new InvalidOperationException("Subscription could not be confirmed.");
}

private PersistentSubscription(
AsyncDuplexStreamingCall<ReadReq, ReadResp> call,
bool autoAck, ILogger log,
Func<PersistentSubscription, ResolvedEvent, int?, CancellationToken, Task> eventAppeared,
Action<PersistentSubscription, SubscriptionDroppedReason, Exception?> subscriptionDropped) {
Action<PersistentSubscription, SubscriptionDroppedReason, Exception?> subscriptionDropped,
CancellationTokenSource disposed) {
_call = call;
_autoAck = autoAck;
_eventAppeared = eventAppeared;
_subscriptionDropped = subscriptionDropped;
_disposed = new CancellationTokenSource();
_disposed = disposed;
_log = log;
SubscriptionId = call.ResponseStream.Current.SubscriptionConfirmation.SubscriptionId;
Task.Run(Subscribe);
Expand Down Expand Up @@ -132,10 +145,12 @@ private async Task Subscribe() {
_log.LogDebug("Persistent Subscription {subscriptionId} confirmed.", SubscriptionId);
using var _ = _disposed;
try {
await foreach (var response in _call.ResponseStream.ReadAllAsync(_disposed.Token).ConfigureAwait(false)) {
await foreach (var response in
_call.ResponseStream.ReadAllAsync(_disposed.Token).ConfigureAwait(false)) {
if (response.ContentCase != ReadResp.ContentOneofCase.Event) {
continue;
}

try {
var resolvedEvent = ConvertToResolvedEvent(response);
_log.LogTrace(
Expand All @@ -161,6 +176,7 @@ await AckInternal(
if (_subscriptionDroppedInvoked != 0) {
return;
}

_log.LogWarning(ex,
"Persistent Subscription {subscriptionId} was dropped because cancellation was requested by another caller.",
SubscriptionId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,10 @@ private async Task DisableInternalAsync(string name, bool writeCheckpoint, UserC
public async Task RestartSubsystemAsync(UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) {
var (channel, _) = await GetCurrentChannelInfo().ConfigureAwait(false);
await new Projections.Projections.ProjectionsClient(
using var call = new Projections.Projections.ProjectionsClient(
CreateCallInvoker(channel)).RestartSubsystemAsync(new Empty(),
EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken));
await call.ResponseAsync.ConfigureAwait(false);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,13 @@ public IAsyncEnumerable<ProjectionDetails> ListContinuousAsync(UserCredentials?
/// <param name="userCredentials"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public Task<ProjectionDetails> GetStatusAsync(string name, UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) =>
ListInternalAsync(new StatisticsReq.Types.Options {
public async Task<ProjectionDetails?> GetStatusAsync(string name, UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) {
var result = await ListInternalAsync(new StatisticsReq.Types.Options {
Name = name
}, userCredentials, cancellationToken).FirstOrDefaultAsync(cancellationToken).AsTask()!;
}, userCredentials, cancellationToken).ToArrayAsync(cancellationToken).ConfigureAwait(false);
return result.FirstOrDefault();
}

private async IAsyncEnumerable<ProjectionDetails> ListInternalAsync(StatisticsReq.Types.Options options,
UserCredentials? userCredentials,
Expand Down
3 changes: 2 additions & 1 deletion src/EventStore.Client.Streams/EventStoreClient.Delete.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,10 @@ private async Task<DeleteResult> DeleteInternal(DeleteReq request,
CancellationToken cancellationToken) {
_log.LogDebug("Deleting stream {streamName}.", request.Options.StreamIdentifier);
var (channel, _) = await GetCurrentChannelInfo().ConfigureAwait(false);
var result = await new Streams.Streams.StreamsClient(
using var call = new Streams.Streams.StreamsClient(
CreateCallInvoker(channel)).DeleteAsync(request,
EventStoreCallOptions.Create(Settings, operationOptions, userCredentials, cancellationToken));
var result = await call.ResponseAsync.ConfigureAwait(false);

return new DeleteResult(new Position(result.Position.CommitPosition, result.Position.PreparePosition));
}
Expand Down
14 changes: 11 additions & 3 deletions src/EventStore.Client.Streams/EventStoreClient.Read.cs
Original file line number Diff line number Diff line change
Expand Up @@ -381,14 +381,22 @@ public async IAsyncEnumerator<ResolvedEvent> GetAsyncEnumerator(
request.Options.UuidOption = new ReadReq.Types.Options.Types.UUIDOption {Structured = new Empty()};

var (channel, _) = await GetCurrentChannelInfo().ConfigureAwait(false);

var cts =
#if GRPC_CORE
CancellationTokenSource.CreateLinkedTokenSource(((Grpc.Core.Channel)channel).ShutdownToken)
#else
new CancellationTokenSource()
#endif
;
using var call = new Streams.Streams.StreamsClient(
CreateCallInvoker(channel)).Read(request,
EventStoreCallOptions.Create(Settings, operationOptions, userCredentials, cancellationToken));
EventStoreCallOptions.Create(Settings, operationOptions, userCredentials, cts.Token));

await foreach (var e in call.ResponseStream
.ReadAllAsync(cancellationToken)
.ReadAllAsync(cts.Token)
.Select(ConvertToItem)
.WithCancellation(cancellationToken)
.WithCancellation(cts.Token)
.ConfigureAwait(false)) {
if (e.HasValue) {
yield return e.Value;
Expand Down
3 changes: 2 additions & 1 deletion src/EventStore.Client.Streams/EventStoreClient.Tombstone.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,10 @@ private async Task<DeleteResult> TombstoneInternal(TombstoneReq request,
_log.LogDebug("Tombstoning stream {streamName}.", request.Options.StreamIdentifier);

var (channel, _) = await GetCurrentChannelInfo().ConfigureAwait(false);
var result = await new Streams.Streams.StreamsClient(
using var call = new Streams.Streams.StreamsClient(
CreateCallInvoker(channel)).TombstoneAsync(request,
EventStoreCallOptions.Create(Settings, operationOptions, userCredentials, cancellationToken));
var result = await call.ResponseAsync.ConfigureAwait(false);

return new DeleteResult(new Position(result.Position.CommitPosition, result.Position.PreparePosition));
}
Expand Down
13 changes: 6 additions & 7 deletions src/EventStore.Client.Streams/StreamSubscription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,13 @@ internal static async Task<StreamSubscription> Confirm(
ILogger log,
Func<StreamSubscription, Position, CancellationToken, Task>? checkpointReached = null,
CancellationToken cancellationToken = default) {
var enumerator = read.GetAsyncEnumerator(cancellationToken);
if (!await enumerator.MoveNextAsync(cancellationToken).ConfigureAwait(false) ||
enumerator.Current.confirmation == SubscriptionConfirmation.None) {
throw new InvalidOperationException();
}

return new StreamSubscription(enumerator, eventAppeared, subscriptionDropped, log,
checkpointReached, cancellationToken);
var enumerator = read.GetAsyncEnumerator(cancellationToken);
if (await enumerator.MoveNextAsync(cancellationToken).ConfigureAwait(false) &&
enumerator.Current.confirmation != SubscriptionConfirmation.None)
return new StreamSubscription(enumerator, eventAppeared, subscriptionDropped, log,
checkpointReached, cancellationToken);
throw new InvalidOperationException($"Subscription to {enumerator} could not be confirmed.");
}

private StreamSubscription(
Expand Down
Loading

0 comments on commit 7b022b7

Please sign in to comment.