diff --git a/src/EventStore.Client.Operations/EventStoreOperationsClient.Admin.cs b/src/EventStore.Client.Operations/EventStoreOperationsClient.Admin.cs index ac390549c..57becfc6c 100644 --- a/src/EventStore.Client.Operations/EventStoreOperationsClient.Admin.cs +++ b/src/EventStore.Client.Operations/EventStoreOperationsClient.Admin.cs @@ -17,9 +17,10 @@ public async Task ShutdownAsync( UserCredentials? userCredentials = null, CancellationToken cancellationToken = default) { var (channel, _) = await GetCurrentChannelInfo().ConfigureAwait(false); - await new Operations.Operations.OperationsClient( + using var call = new Operations.Operations.OperationsClient( CreateCallInvoker(channel)).ShutdownAsync(EmptyResult, EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken)); + await call.ResponseAsync.ConfigureAwait(false); } /// @@ -32,9 +33,10 @@ public async Task MergeIndexesAsync( UserCredentials? userCredentials = null, CancellationToken cancellationToken = default) { var (channel, _) = await GetCurrentChannelInfo().ConfigureAwait(false); - await new Operations.Operations.OperationsClient( + using var call = new Operations.Operations.OperationsClient( CreateCallInvoker(channel)).MergeIndexesAsync(EmptyResult, EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken)); + await call.ResponseAsync.ConfigureAwait(false); } /// @@ -47,9 +49,10 @@ public async Task ResignNodeAsync( UserCredentials? userCredentials = null, CancellationToken cancellationToken = default) { var (channel, _) = await GetCurrentChannelInfo().ConfigureAwait(false); - await new Operations.Operations.OperationsClient( + using var call = new Operations.Operations.OperationsClient( CreateCallInvoker(channel)).ResignNodeAsync(EmptyResult, EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken)); + await call.ResponseAsync.ConfigureAwait(false); } /// @@ -63,10 +66,11 @@ public async Task SetNodePriorityAsync(int nodePriority, UserCredentials? userCredentials = null, CancellationToken cancellationToken = default) { var (channel, _) = await GetCurrentChannelInfo().ConfigureAwait(false); - await new Operations.Operations.OperationsClient( + using var call = new Operations.Operations.OperationsClient( CreateCallInvoker(channel)).SetNodePriorityAsync( new SetNodePriorityReq {Priority = nodePriority}, EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken)); + await call.ResponseAsync.ConfigureAwait(false); } /// @@ -78,10 +82,11 @@ public async Task SetNodePriorityAsync(int nodePriority, public async Task RestartPersistentSubscriptions(UserCredentials? userCredentials = null, CancellationToken cancellationToken = default) { var (channel, _) = await GetCurrentChannelInfo().ConfigureAwait(false); - await new Operations.Operations.OperationsClient( + using var call = new Operations.Operations.OperationsClient( CreateCallInvoker(channel)).RestartPersistentSubscriptionsAsync( EmptyResult, EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken)); + await call.ResponseAsync.ConfigureAwait(false); } } } diff --git a/src/EventStore.Client.Operations/EventStoreOperationsClient.Scavenge.cs b/src/EventStore.Client.Operations/EventStoreOperationsClient.Scavenge.cs index 6ce7266f3..1f7866e29 100644 --- a/src/EventStore.Client.Operations/EventStoreOperationsClient.Scavenge.cs +++ b/src/EventStore.Client.Operations/EventStoreOperationsClient.Scavenge.cs @@ -29,7 +29,7 @@ public async Task StartScavengeAsync( } var (channel, _) = await GetCurrentChannelInfo().ConfigureAwait(false); - var result = await new Operations.Operations.OperationsClient( + using var call = new Operations.Operations.OperationsClient( CreateCallInvoker(channel)).StartScavengeAsync( new StartScavengeReq { Options = new StartScavengeReq.Types.Options { @@ -39,6 +39,7 @@ public async Task StartScavengeAsync( }, EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken)); + var result = await call.ResponseAsync.ConfigureAwait(false); return result.ScavengeResult switch { ScavengeResp.Types.ScavengeResult.Started => DatabaseScavengeResult.Started(result.ScavengeId), diff --git a/src/EventStore.Client.PersistentSubscriptions/EventStorePersistentSubscriptionsClient.Create.cs b/src/EventStore.Client.PersistentSubscriptions/EventStorePersistentSubscriptionsClient.Create.cs index cb9ee97e1..c713700dc 100644 --- a/src/EventStore.Client.PersistentSubscriptions/EventStorePersistentSubscriptionsClient.Create.cs +++ b/src/EventStore.Client.PersistentSubscriptions/EventStorePersistentSubscriptionsClient.Create.cs @@ -179,7 +179,7 @@ private async Task CreateInternalAsync(string streamName, string groupName, IEve var callInvoker = CreateCallInvoker(channel); - await new PersistentSubscriptions.PersistentSubscriptions.PersistentSubscriptionsClient( + using var call = new PersistentSubscriptions.PersistentSubscriptions.PersistentSubscriptionsClient( callInvoker).CreateAsync(new CreateReq { Options = new CreateReq.Types.Options { Stream = streamName != SystemStreams.AllStream @@ -217,6 +217,7 @@ private async Task CreateInternalAsync(string streamName, string groupName, IEve } } }, EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken)); + await call.ResponseAsync.ConfigureAwait(false); } /// diff --git a/src/EventStore.Client.PersistentSubscriptions/EventStorePersistentSubscriptionsClient.Delete.cs b/src/EventStore.Client.PersistentSubscriptions/EventStorePersistentSubscriptionsClient.Delete.cs index b0c54ae49..8f37367e8 100644 --- a/src/EventStore.Client.PersistentSubscriptions/EventStorePersistentSubscriptionsClient.Delete.cs +++ b/src/EventStore.Client.PersistentSubscriptions/EventStorePersistentSubscriptionsClient.Delete.cs @@ -34,10 +34,12 @@ public async Task DeleteAsync(string streamName, string groupName, UserCredentia deleteOptions.StreamIdentifier = streamName; } - await new PersistentSubscriptions.PersistentSubscriptions.PersistentSubscriptionsClient(callInvoker) + using var call = + new PersistentSubscriptions.PersistentSubscriptions.PersistentSubscriptionsClient(callInvoker) .DeleteAsync(new DeleteReq {Options = deleteOptions}, EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken)); + await call.ResponseAsync.ConfigureAwait(false); } /// diff --git a/src/EventStore.Client.PersistentSubscriptions/EventStorePersistentSubscriptionsClient.Read.cs b/src/EventStore.Client.PersistentSubscriptions/EventStorePersistentSubscriptionsClient.Read.cs index 1ffdf6627..0389c3290 100644 --- a/src/EventStore.Client.PersistentSubscriptions/EventStorePersistentSubscriptionsClient.Read.cs +++ b/src/EventStore.Client.PersistentSubscriptions/EventStorePersistentSubscriptionsClient.Read.cs @@ -59,11 +59,8 @@ public async Task SubscribeAsync(string streamName, stri !serverCapabilities.SupportsPersistentSubscriptionsToAll) { throw new NotSupportedException("The server does not support persistent subscriptions to $all."); } - var callInvoker = CreateCallInvoker(channel); - var call = new PersistentSubscriptions.PersistentSubscriptions.PersistentSubscriptionsClient(callInvoker) - .Read(EventStoreCallOptions.Create(Settings, operationOptions, userCredentials, cancellationToken)); var readOptions = new ReadReq.Types.Options { BufferSize = bufferSize, @@ -77,7 +74,8 @@ public async Task SubscribeAsync(string streamName, stri readOptions.StreamIdentifier = streamName; } - return await PersistentSubscription.Confirm(call, readOptions, autoAck, _log, eventAppeared, + return await PersistentSubscription.Confirm(channel, callInvoker, Settings, + operationOptions, userCredentials, readOptions, autoAck, _log, eventAppeared, subscriptionDropped ?? delegate { }, cancellationToken).ConfigureAwait(false); } diff --git a/src/EventStore.Client.PersistentSubscriptions/EventStorePersistentSubscriptionsClient.Update.cs b/src/EventStore.Client.PersistentSubscriptions/EventStorePersistentSubscriptionsClient.Update.cs index 4fe6c80a7..5c625fbd4 100644 --- a/src/EventStore.Client.PersistentSubscriptions/EventStorePersistentSubscriptionsClient.Update.cs +++ b/src/EventStore.Client.PersistentSubscriptions/EventStorePersistentSubscriptionsClient.Update.cs @@ -103,7 +103,7 @@ public async Task UpdateAsync(string streamName, string groupName, PersistentSub } var callInvoker = CreateCallInvoker(channel); - await new PersistentSubscriptions.PersistentSubscriptions.PersistentSubscriptionsClient(callInvoker) + using var call = new PersistentSubscriptions.PersistentSubscriptions.PersistentSubscriptionsClient(callInvoker) .UpdateAsync(new UpdateReq { Options = new UpdateReq.Types.Options { GroupName = groupName, @@ -144,6 +144,7 @@ public async Task UpdateAsync(string streamName, string groupName, PersistentSub }, EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken)); + await call.ResponseAsync.ConfigureAwait(false); } /// diff --git a/src/EventStore.Client.PersistentSubscriptions/PersistentSubscription.cs b/src/EventStore.Client.PersistentSubscriptions/PersistentSubscription.cs index 5b6e75a80..260187f6b 100644 --- a/src/EventStore.Client.PersistentSubscriptions/PersistentSubscription.cs +++ b/src/EventStore.Client.PersistentSubscriptions/PersistentSubscription.cs @@ -27,33 +27,46 @@ public class PersistentSubscription : IDisposable { /// public string SubscriptionId { get; } - internal static async Task Confirm(AsyncDuplexStreamingCall call, + internal static async Task Confirm( + ChannelBase channel, CallInvoker callInvoker, EventStoreClientSettings settings, + EventStoreClientOperationOptions operationOptions, UserCredentials? userCredentials, ReadReq.Types.Options options, bool autoAck, ILogger log, Func eventAppeared, Action subscriptionDropped, CancellationToken cancellationToken = default) { + var cts = +#if GRPC_CORE + CancellationTokenSource.CreateLinkedTokenSource(((Channel) channel).ShutdownToken) +#else + new CancellationTokenSource() +#endif + ; + var call = new PersistentSubscriptions.PersistentSubscriptions.PersistentSubscriptionsClient(callInvoker) + .Read(EventStoreCallOptions.Create(settings, operationOptions, userCredentials, cts.Token)); + await call.RequestStream.WriteAsync(new ReadReq { Options = options }).ConfigureAwait(false); - if (!await call.ResponseStream.MoveNext(cancellationToken).ConfigureAwait(false) || - call.ResponseStream.Current.ContentCase != ReadResp.ContentOneofCase.SubscriptionConfirmation) { - throw new InvalidOperationException(); - } + if (await call.ResponseStream.MoveNext(cancellationToken).ConfigureAwait(false) && + call.ResponseStream.Current.ContentCase == ReadResp.ContentOneofCase.SubscriptionConfirmation) + return new PersistentSubscription(call, autoAck, log, eventAppeared, subscriptionDropped, cts); - return new PersistentSubscription(call, autoAck, log, eventAppeared, subscriptionDropped); + cts.Dispose(); + throw new InvalidOperationException("Subscription could not be confirmed."); } private PersistentSubscription( AsyncDuplexStreamingCall call, bool autoAck, ILogger log, Func eventAppeared, - Action subscriptionDropped) { + Action subscriptionDropped, + CancellationTokenSource disposed) { _call = call; _autoAck = autoAck; _eventAppeared = eventAppeared; _subscriptionDropped = subscriptionDropped; - _disposed = new CancellationTokenSource(); + _disposed = disposed; _log = log; SubscriptionId = call.ResponseStream.Current.SubscriptionConfirmation.SubscriptionId; Task.Run(Subscribe); @@ -132,10 +145,12 @@ private async Task Subscribe() { _log.LogDebug("Persistent Subscription {subscriptionId} confirmed.", SubscriptionId); using var _ = _disposed; try { - await foreach (var response in _call.ResponseStream.ReadAllAsync(_disposed.Token).ConfigureAwait(false)) { + await foreach (var response in + _call.ResponseStream.ReadAllAsync(_disposed.Token).ConfigureAwait(false)) { if (response.ContentCase != ReadResp.ContentOneofCase.Event) { continue; } + try { var resolvedEvent = ConvertToResolvedEvent(response); _log.LogTrace( @@ -161,6 +176,7 @@ await AckInternal( if (_subscriptionDroppedInvoked != 0) { return; } + _log.LogWarning(ex, "Persistent Subscription {subscriptionId} was dropped because cancellation was requested by another caller.", SubscriptionId); diff --git a/src/EventStore.Client.ProjectionManagement/EventStoreProjectionManagementClient.Control.cs b/src/EventStore.Client.ProjectionManagement/EventStoreProjectionManagementClient.Control.cs index 0e2294d10..551c4c6d8 100644 --- a/src/EventStore.Client.ProjectionManagement/EventStoreProjectionManagementClient.Control.cs +++ b/src/EventStore.Client.ProjectionManagement/EventStoreProjectionManagementClient.Control.cs @@ -88,9 +88,10 @@ private async Task DisableInternalAsync(string name, bool writeCheckpoint, UserC public async Task RestartSubsystemAsync(UserCredentials? userCredentials = null, CancellationToken cancellationToken = default) { var (channel, _) = await GetCurrentChannelInfo().ConfigureAwait(false); - await new Projections.Projections.ProjectionsClient( + using var call = new Projections.Projections.ProjectionsClient( CreateCallInvoker(channel)).RestartSubsystemAsync(new Empty(), EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken)); + await call.ResponseAsync.ConfigureAwait(false); } } } diff --git a/src/EventStore.Client.ProjectionManagement/EventStoreProjectionManagementClient.Statistics.cs b/src/EventStore.Client.ProjectionManagement/EventStoreProjectionManagementClient.Statistics.cs index e856b5be1..84e9f11cf 100644 --- a/src/EventStore.Client.ProjectionManagement/EventStoreProjectionManagementClient.Statistics.cs +++ b/src/EventStore.Client.ProjectionManagement/EventStoreProjectionManagementClient.Statistics.cs @@ -39,11 +39,13 @@ public IAsyncEnumerable ListContinuousAsync(UserCredentials? /// /// /// - public Task GetStatusAsync(string name, UserCredentials? userCredentials = null, - CancellationToken cancellationToken = default) => - ListInternalAsync(new StatisticsReq.Types.Options { + public async Task GetStatusAsync(string name, UserCredentials? userCredentials = null, + CancellationToken cancellationToken = default) { + var result = await ListInternalAsync(new StatisticsReq.Types.Options { Name = name - }, userCredentials, cancellationToken).FirstOrDefaultAsync(cancellationToken).AsTask()!; + }, userCredentials, cancellationToken).ToArrayAsync(cancellationToken).ConfigureAwait(false); + return result.FirstOrDefault(); + } private async IAsyncEnumerable ListInternalAsync(StatisticsReq.Types.Options options, UserCredentials? userCredentials, diff --git a/src/EventStore.Client.Streams/EventStoreClient.Delete.cs b/src/EventStore.Client.Streams/EventStoreClient.Delete.cs index 1a0862e20..6e0ec10b8 100644 --- a/src/EventStore.Client.Streams/EventStoreClient.Delete.cs +++ b/src/EventStore.Client.Streams/EventStoreClient.Delete.cs @@ -80,9 +80,10 @@ private async Task DeleteInternal(DeleteReq request, CancellationToken cancellationToken) { _log.LogDebug("Deleting stream {streamName}.", request.Options.StreamIdentifier); var (channel, _) = await GetCurrentChannelInfo().ConfigureAwait(false); - var result = await new Streams.Streams.StreamsClient( + using var call = new Streams.Streams.StreamsClient( CreateCallInvoker(channel)).DeleteAsync(request, EventStoreCallOptions.Create(Settings, operationOptions, userCredentials, cancellationToken)); + var result = await call.ResponseAsync.ConfigureAwait(false); return new DeleteResult(new Position(result.Position.CommitPosition, result.Position.PreparePosition)); } diff --git a/src/EventStore.Client.Streams/EventStoreClient.Read.cs b/src/EventStore.Client.Streams/EventStoreClient.Read.cs index 2d36bb0b9..11c96ebaf 100644 --- a/src/EventStore.Client.Streams/EventStoreClient.Read.cs +++ b/src/EventStore.Client.Streams/EventStoreClient.Read.cs @@ -381,14 +381,22 @@ public async IAsyncEnumerator GetAsyncEnumerator( request.Options.UuidOption = new ReadReq.Types.Options.Types.UUIDOption {Structured = new Empty()}; var (channel, _) = await GetCurrentChannelInfo().ConfigureAwait(false); + + var cts = +#if GRPC_CORE + CancellationTokenSource.CreateLinkedTokenSource(((Grpc.Core.Channel)channel).ShutdownToken) +#else + new CancellationTokenSource() +#endif + ; using var call = new Streams.Streams.StreamsClient( CreateCallInvoker(channel)).Read(request, - EventStoreCallOptions.Create(Settings, operationOptions, userCredentials, cancellationToken)); + EventStoreCallOptions.Create(Settings, operationOptions, userCredentials, cts.Token)); await foreach (var e in call.ResponseStream - .ReadAllAsync(cancellationToken) + .ReadAllAsync(cts.Token) .Select(ConvertToItem) - .WithCancellation(cancellationToken) + .WithCancellation(cts.Token) .ConfigureAwait(false)) { if (e.HasValue) { yield return e.Value; diff --git a/src/EventStore.Client.Streams/EventStoreClient.Tombstone.cs b/src/EventStore.Client.Streams/EventStoreClient.Tombstone.cs index 316735c45..52c4273af 100644 --- a/src/EventStore.Client.Streams/EventStoreClient.Tombstone.cs +++ b/src/EventStore.Client.Streams/EventStoreClient.Tombstone.cs @@ -82,9 +82,10 @@ private async Task TombstoneInternal(TombstoneReq request, _log.LogDebug("Tombstoning stream {streamName}.", request.Options.StreamIdentifier); var (channel, _) = await GetCurrentChannelInfo().ConfigureAwait(false); - var result = await new Streams.Streams.StreamsClient( + using var call = new Streams.Streams.StreamsClient( CreateCallInvoker(channel)).TombstoneAsync(request, EventStoreCallOptions.Create(Settings, operationOptions, userCredentials, cancellationToken)); + var result = await call.ResponseAsync.ConfigureAwait(false); return new DeleteResult(new Position(result.Position.CommitPosition, result.Position.PreparePosition)); } diff --git a/src/EventStore.Client.Streams/StreamSubscription.cs b/src/EventStore.Client.Streams/StreamSubscription.cs index fe1d02f61..70cbf52df 100644 --- a/src/EventStore.Client.Streams/StreamSubscription.cs +++ b/src/EventStore.Client.Streams/StreamSubscription.cs @@ -30,14 +30,13 @@ internal static async Task Confirm( ILogger log, Func? checkpointReached = null, CancellationToken cancellationToken = default) { - var enumerator = read.GetAsyncEnumerator(cancellationToken); - if (!await enumerator.MoveNextAsync(cancellationToken).ConfigureAwait(false) || - enumerator.Current.confirmation == SubscriptionConfirmation.None) { - throw new InvalidOperationException(); - } - return new StreamSubscription(enumerator, eventAppeared, subscriptionDropped, log, - checkpointReached, cancellationToken); + var enumerator = read.GetAsyncEnumerator(cancellationToken); + if (await enumerator.MoveNextAsync(cancellationToken).ConfigureAwait(false) && + enumerator.Current.confirmation != SubscriptionConfirmation.None) + return new StreamSubscription(enumerator, eventAppeared, subscriptionDropped, log, + checkpointReached, cancellationToken); + throw new InvalidOperationException($"Subscription to {enumerator} could not be confirmed."); } private StreamSubscription( diff --git a/src/EventStore.Client.UserManagement/EventStoreUserManagementClient.cs b/src/EventStore.Client.UserManagement/EventStoreUserManagementClient.cs index 6d8f6936d..0db273de2 100644 --- a/src/EventStore.Client.UserManagement/EventStoreUserManagementClient.cs +++ b/src/EventStore.Client.UserManagement/EventStoreUserManagementClient.cs @@ -51,7 +51,7 @@ public async Task CreateUserAsync(string loginName, string fullName, string[] gr if (password == string.Empty) throw new ArgumentOutOfRangeException(nameof(password)); var (channel, _) = await GetCurrentChannelInfo().ConfigureAwait(false); - await new Users.Users.UsersClient( + using var call = new Users.Users.UsersClient( CreateCallInvoker(channel)).CreateAsync(new CreateReq { Options = new CreateReq.Types.Options { LoginName = loginName, @@ -60,6 +60,7 @@ public async Task CreateUserAsync(string loginName, string fullName, string[] gr Groups = {groups} } }, EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken)); + await call.ResponseAsync.ConfigureAwait(false); } /// @@ -118,12 +119,13 @@ public async Task DeleteUserAsync(string loginName, UserCredentials? userCredent } var (channel, _) = await GetCurrentChannelInfo().ConfigureAwait(false); - await new Users.Users.UsersClient( + var call = new Users.Users.UsersClient( CreateCallInvoker(channel)).DeleteAsync(new DeleteReq { Options = new DeleteReq.Types.Options { LoginName = loginName } }, EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken)); + await call.ResponseAsync.ConfigureAwait(false); } /// @@ -146,12 +148,13 @@ public async Task EnableUserAsync(string loginName, UserCredentials? userCredent } var (channel, _) = await GetCurrentChannelInfo().ConfigureAwait(false); - await new Users.Users.UsersClient( + using var call = new Users.Users.UsersClient( CreateCallInvoker(channel)).EnableAsync(new EnableReq { Options = new EnableReq.Types.Options { LoginName = loginName } }, EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken)); + await call.ResponseAsync.ConfigureAwait(false); } /// @@ -167,12 +170,13 @@ public async Task DisableUserAsync(string loginName, UserCredentials? userCreden if (loginName == string.Empty) throw new ArgumentOutOfRangeException(nameof(loginName)); var (channel, _) = await GetCurrentChannelInfo().ConfigureAwait(false); - await new Users.Users.UsersClient( + var call = new Users.Users.UsersClient( CreateCallInvoker(channel)).DisableAsync(new DisableReq { Options = new DisableReq.Types.Options { LoginName = loginName } }, EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken)); + await call.ResponseAsync.ConfigureAwait(false); } /// @@ -218,7 +222,7 @@ public async Task ChangePasswordAsync(string loginName, string currentPassword, if (newPassword == string.Empty) throw new ArgumentOutOfRangeException(nameof(newPassword)); var (channel, _) = await GetCurrentChannelInfo().ConfigureAwait(false); - await new Users.Users.UsersClient( + using var call = new Users.Users.UsersClient( CreateCallInvoker(channel)).ChangePasswordAsync( new ChangePasswordReq { Options = new ChangePasswordReq.Types.Options { @@ -228,6 +232,7 @@ public async Task ChangePasswordAsync(string loginName, string currentPassword, } }, EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken)); + await call.ResponseAsync.ConfigureAwait(false); } /// @@ -248,7 +253,7 @@ public async Task ResetPasswordAsync(string loginName, string newPassword, if (newPassword == string.Empty) throw new ArgumentOutOfRangeException(nameof(newPassword)); var (channel, _) = await GetCurrentChannelInfo().ConfigureAwait(false); - await new Users.Users.UsersClient( + var call = new Users.Users.UsersClient( CreateCallInvoker(channel)).ResetPasswordAsync( new ResetPasswordReq { Options = new ResetPasswordReq.Types.Options { @@ -257,6 +262,7 @@ public async Task ResetPasswordAsync(string loginName, string newPassword, } }, EventStoreCallOptions.Create(Settings, Settings.OperationOptions, userCredentials, cancellationToken)); + await call.ResponseAsync.ConfigureAwait(false); } private static readonly IDictionary> ExceptionMap = diff --git a/src/EventStore.Client/ChannelBaseExtensions.cs b/src/EventStore.Client/ChannelBaseExtensions.cs index 3b9f45697..7ede2f97b 100644 --- a/src/EventStore.Client/ChannelBaseExtensions.cs +++ b/src/EventStore.Client/ChannelBaseExtensions.cs @@ -1,9 +1,20 @@ using System; +using System.Threading; using System.Threading.Tasks; using Grpc.Core; namespace EventStore.Client { internal static class ChannelBaseExtensions { + public static CancellationTokenSource GetCancellationTokenSource(this ChannelBase channel) { + return +#if GRPC_CORE + CancellationTokenSource.CreateLinkedTokenSource(((Channel) channel).ShutdownToken) +#else + new CancellationTokenSource() +#endif + ; + + } public static async ValueTask DisposeAsync(this ChannelBase channel) { await channel.ShutdownAsync().ConfigureAwait(false);