diff --git a/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt b/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt index e8a4d7b66f..779fcbf2d5 100644 --- a/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt +++ b/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt @@ -937,13 +937,13 @@ virtual RabbitMQ.Client.TcpClientAdapter.ReceiveTimeout.set -> void ~RabbitMQ.Client.TopologyRecoveryExceptionHandler.ExchangeRecoveryExceptionHandlerAsync.set -> void ~RabbitMQ.Client.TopologyRecoveryExceptionHandler.QueueRecoveryExceptionHandlerAsync.get -> System.Func ~RabbitMQ.Client.TopologyRecoveryExceptionHandler.QueueRecoveryExceptionHandlerAsync.set -> void -~static RabbitMQ.Client.IChannelExtensions.AbortAsync(this RabbitMQ.Client.IChannel channel) -> System.Threading.Tasks.Task +~static RabbitMQ.Client.IChannelExtensions.AbortAsync(this RabbitMQ.Client.IChannel channel, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task ~static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel channel, RabbitMQ.Client.IBasicConsumer consumer, string queue, bool autoAck = false, string consumerTag = "", bool noLocal = false, bool exclusive = false, System.Collections.Generic.IDictionary arguments = null) -> System.Threading.Tasks.Task ~static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel channel, string queue, bool autoAck, RabbitMQ.Client.IBasicConsumer consumer) -> System.Threading.Tasks.Task ~static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel channel, string queue, bool autoAck, string consumerTag, RabbitMQ.Client.IBasicConsumer consumer) -> System.Threading.Tasks.Task ~static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel channel, string queue, bool autoAck, string consumerTag, System.Collections.Generic.IDictionary arguments, RabbitMQ.Client.IBasicConsumer consumer) -> System.Threading.Tasks.Task -~static RabbitMQ.Client.IChannelExtensions.CloseAsync(this RabbitMQ.Client.IChannel channel) -> System.Threading.Tasks.Task -~static RabbitMQ.Client.IChannelExtensions.CloseAsync(this RabbitMQ.Client.IChannel channel, ushort replyCode, string replyText) -> System.Threading.Tasks.Task +~static RabbitMQ.Client.IChannelExtensions.CloseAsync(this RabbitMQ.Client.IChannel channel, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task +~static RabbitMQ.Client.IChannelExtensions.CloseAsync(this RabbitMQ.Client.IChannel channel, ushort replyCode, string replyText, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task ~static RabbitMQ.Client.IChannelExtensions.ExchangeDeclareAsync(this RabbitMQ.Client.IChannel channel, string exchange, string type, bool durable = false, bool autoDelete = false, System.Collections.Generic.IDictionary arguments = null, bool noWait = false) -> System.Threading.Tasks.Task ~static RabbitMQ.Client.IChannelExtensions.QueueDeclareAsync(this RabbitMQ.Client.IChannel channel, string queue = "", bool durable = false, bool exclusive = true, bool autoDelete = true, System.Collections.Generic.IDictionary arguments = null, bool noWait = false) -> System.Threading.Tasks.Task ~static RabbitMQ.Client.IChannelExtensions.QueueDeleteAsync(this RabbitMQ.Client.IChannel channel, string queue, bool ifUnused = false, bool ifEmpty = false) -> System.Threading.Tasks.Task diff --git a/projects/RabbitMQ.Client/client/api/IChannelExtensions.cs b/projects/RabbitMQ.Client/client/api/IChannelExtensions.cs index f48be542fa..c71478a23b 100644 --- a/projects/RabbitMQ.Client/client/api/IChannelExtensions.cs +++ b/projects/RabbitMQ.Client/client/api/IChannelExtensions.cs @@ -31,6 +31,7 @@ using System; using System.Collections.Generic; +using System.Threading; using System.Threading.Tasks; using RabbitMQ.Client.client.impl; @@ -145,12 +146,13 @@ public static Task QueueUnbindAsync(this IChannel channel, string queue, string /// method does nothing but wait for the in-progress close /// operation to complete. This method will not return to the /// caller until the shutdown is complete. - /// In comparison to normal method, will not throw + /// In comparison to normal method, will not throw /// or or any other during closing channel. /// - public static Task AbortAsync(this IChannel channel) + public static Task AbortAsync(this IChannel channel, CancellationToken cancellationToken = default) { - return channel.CloseAsync(Constants.ReplySuccess, "Goodbye", true); + return channel.CloseAsync(Constants.ReplySuccess, "Goodbye", true, + cancellationToken); } /// Asynchronously close this session. @@ -160,9 +162,10 @@ public static Task AbortAsync(this IChannel channel) /// operation to complete. This method will not return to the /// caller until the shutdown is complete. /// - public static Task CloseAsync(this IChannel channel) + public static Task CloseAsync(this IChannel channel, CancellationToken cancellationToken = default) { - return channel.CloseAsync(Constants.ReplySuccess, "Goodbye", false); + return channel.CloseAsync(Constants.ReplySuccess, "Goodbye", false, + cancellationToken); } /// @@ -171,6 +174,7 @@ public static Task CloseAsync(this IChannel channel) /// The channel. /// The reply code. /// The reply text. + /// The cancellation token. /// /// The method behaves in the same way as Close(), with the only /// difference that the channel is closed with the given channel @@ -181,9 +185,10 @@ public static Task CloseAsync(this IChannel channel) /// A message indicating the reason for closing the channel /// /// - public static Task CloseAsync(this IChannel channel, ushort replyCode, string replyText) + public static Task CloseAsync(this IChannel channel, ushort replyCode, string replyText, + CancellationToken cancellationToken = default) { - return channel.CloseAsync(replyCode, replyText, false); + return channel.CloseAsync(replyCode, replyText, false, cancellationToken); } } } diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs index 3f0d472d2d..19d60e7479 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs @@ -351,7 +351,7 @@ public async Task ExchangeDeleteAsync(string exchange, bool ifUnused, bool noWai { await InnerChannel.ExchangeDeleteAsync(exchange, ifUnused, noWait, cancellationToken) .ConfigureAwait(false); - await _connection.DeleteRecordedExchangeAsync(exchange, recordedEntitiesSemaphoreHeld: false) + await _connection.DeleteRecordedExchangeAsync(exchange, recordedEntitiesSemaphoreHeld: false, cancellationToken) .ConfigureAwait(false); } @@ -361,11 +361,11 @@ public async Task ExchangeUnbindAsync(string destination, string source, string { ThrowIfDisposed(); var recordedBinding = new RecordedBinding(false, destination, source, routingKey, arguments); - await _connection.DeleteRecordedBindingAsync(recordedBinding, recordedEntitiesSemaphoreHeld: false) + await _connection.DeleteRecordedBindingAsync(recordedBinding, recordedEntitiesSemaphoreHeld: false, cancellationToken) .ConfigureAwait(false); await InnerChannel.ExchangeUnbindAsync(destination, source, routingKey, arguments, noWait, cancellationToken) .ConfigureAwait(false); - await _connection.DeleteAutoDeleteExchangeAsync(source, recordedEntitiesSemaphoreHeld: false) + await _connection.DeleteAutoDeleteExchangeAsync(source, recordedEntitiesSemaphoreHeld: false, cancellationToken) .ConfigureAwait(false); } @@ -396,7 +396,7 @@ public async Task QueueDeclareAsync(string queue, bool durable, if (false == passive) { var recordedQueue = new RecordedQueue(result.QueueName, queue.Length == 0, durable, exclusive, autoDelete, arguments); - await _connection.RecordQueueAsync(recordedQueue, recordedEntitiesSemaphoreHeld: false) + await _connection.RecordQueueAsync(recordedQueue, recordedEntitiesSemaphoreHeld: false, cancellationToken) .ConfigureAwait(false); } return result; @@ -415,7 +415,7 @@ public async Task QueueDeleteAsync(string queue, bool ifUnused, bool ifEmp { uint result = await InnerChannel.QueueDeleteAsync(queue, ifUnused, ifEmpty, noWait, cancellationToken) .ConfigureAwait(false); - await _connection.DeleteRecordedQueueAsync(queue, recordedEntitiesSemaphoreHeld: false) + await _connection.DeleteRecordedQueueAsync(queue, recordedEntitiesSemaphoreHeld: false, cancellationToken) .ConfigureAwait(false); return result; } @@ -429,11 +429,11 @@ public async Task QueueUnbindAsync(string queue, string exchange, string routing { ThrowIfDisposed(); var recordedBinding = new RecordedBinding(true, queue, exchange, routingKey, arguments); - await _connection.DeleteRecordedBindingAsync(recordedBinding, recordedEntitiesSemaphoreHeld: false) + await _connection.DeleteRecordedBindingAsync(recordedBinding, recordedEntitiesSemaphoreHeld: false, cancellationToken) .ConfigureAwait(false); await _innerChannel.QueueUnbindAsync(queue, exchange, routingKey, arguments, cancellationToken) .ConfigureAwait(false); - await _connection.DeleteAutoDeleteExchangeAsync(exchange, recordedEntitiesSemaphoreHeld: false) + await _connection.DeleteAutoDeleteExchangeAsync(exchange, recordedEntitiesSemaphoreHeld: false, cancellationToken) .ConfigureAwait(false); } diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recording.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recording.cs index 19c4782805..ac5c453bff 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recording.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recording.cs @@ -83,7 +83,7 @@ private void DoRecordExchange(in RecordedExchange exchange) } internal async ValueTask DeleteRecordedExchangeAsync(string exchangeName, - bool recordedEntitiesSemaphoreHeld) + bool recordedEntitiesSemaphoreHeld, CancellationToken cancellationToken) { if (_disposed) { @@ -92,16 +92,16 @@ internal async ValueTask DeleteRecordedExchangeAsync(string exchangeName, if (recordedEntitiesSemaphoreHeld) { - await DoDeleteRecordedExchangeAsync(exchangeName) + await DoDeleteRecordedExchangeAsync(exchangeName, cancellationToken) .ConfigureAwait(false); } else { - await _recordedEntitiesSemaphore.WaitAsync() + await _recordedEntitiesSemaphore.WaitAsync(cancellationToken) .ConfigureAwait(false); try { - await DoDeleteRecordedExchangeAsync(exchangeName) + await DoDeleteRecordedExchangeAsync(exchangeName, cancellationToken) .ConfigureAwait(false); } finally @@ -110,7 +110,7 @@ await DoDeleteRecordedExchangeAsync(exchangeName) } } - async Task DoDeleteRecordedExchangeAsync(string exchangeName) + async Task DoDeleteRecordedExchangeAsync(string exchangeName, CancellationToken cancellationToken) { _recordedExchanges.Remove(exchangeName); @@ -120,10 +120,10 @@ async Task DoDeleteRecordedExchangeAsync(string exchangeName) if (binding.Destination == exchangeName) { await DeleteRecordedBindingAsync(binding, - recordedEntitiesSemaphoreHeld: true) + recordedEntitiesSemaphoreHeld: true, cancellationToken) .ConfigureAwait(false); await DeleteAutoDeleteExchangeAsync(binding.Source, - recordedEntitiesSemaphoreHeld: true) + recordedEntitiesSemaphoreHeld: true, cancellationToken) .ConfigureAwait(false); } } @@ -131,7 +131,7 @@ await DeleteAutoDeleteExchangeAsync(binding.Source, } internal async ValueTask DeleteAutoDeleteExchangeAsync(string exchangeName, - bool recordedEntitiesSemaphoreHeld) + bool recordedEntitiesSemaphoreHeld, CancellationToken cancellationToken) { if (_disposed) { @@ -144,7 +144,7 @@ internal async ValueTask DeleteAutoDeleteExchangeAsync(string exchangeName, } else { - await _recordedEntitiesSemaphore.WaitAsync() + await _recordedEntitiesSemaphore.WaitAsync(cancellationToken) .ConfigureAwait(false); try { @@ -185,7 +185,7 @@ bool AnyBindingsOnExchange(string exchange) internal int RecordedQueuesCount => _recordedQueues.Count; internal async ValueTask RecordQueueAsync(RecordedQueue queue, - bool recordedEntitiesSemaphoreHeld) + bool recordedEntitiesSemaphoreHeld, CancellationToken cancellationToken) { if (_disposed) { @@ -198,7 +198,7 @@ internal async ValueTask RecordQueueAsync(RecordedQueue queue, } else { - await _recordedEntitiesSemaphore.WaitAsync() + await _recordedEntitiesSemaphore.WaitAsync(cancellationToken) .ConfigureAwait(false); try { @@ -217,7 +217,7 @@ private void DoRecordQueue(RecordedQueue queue) } internal async ValueTask DeleteRecordedQueueAsync(string queueName, - bool recordedEntitiesSemaphoreHeld) + bool recordedEntitiesSemaphoreHeld, CancellationToken cancellationToken) { if (_disposed) { @@ -226,16 +226,16 @@ internal async ValueTask DeleteRecordedQueueAsync(string queueName, if (recordedEntitiesSemaphoreHeld) { - await DoDeleteRecordedQueueAsync(queueName) + await DoDeleteRecordedQueueAsync(queueName, cancellationToken) .ConfigureAwait(false); } else { - await _recordedEntitiesSemaphore.WaitAsync() + await _recordedEntitiesSemaphore.WaitAsync(cancellationToken) .ConfigureAwait(false); try { - await DoDeleteRecordedQueueAsync(queueName) + await DoDeleteRecordedQueueAsync(queueName, cancellationToken) .ConfigureAwait(false); } finally @@ -244,7 +244,7 @@ await DoDeleteRecordedQueueAsync(queueName) } } - async ValueTask DoDeleteRecordedQueueAsync(string queueName) + async ValueTask DoDeleteRecordedQueueAsync(string queueName, CancellationToken cancellationToken) { _recordedQueues.Remove(queueName); @@ -254,10 +254,10 @@ async ValueTask DoDeleteRecordedQueueAsync(string queueName) if (binding.Destination == queueName) { await DeleteRecordedBindingAsync(binding, - recordedEntitiesSemaphoreHeld: true) + recordedEntitiesSemaphoreHeld: true, cancellationToken) .ConfigureAwait(false); await DeleteAutoDeleteExchangeAsync(binding.Source, - recordedEntitiesSemaphoreHeld: true) + recordedEntitiesSemaphoreHeld: true, cancellationToken) .ConfigureAwait(false); } } @@ -298,7 +298,7 @@ private void DoRecordBinding(in RecordedBinding binding) } internal async ValueTask DeleteRecordedBindingAsync(RecordedBinding rb, - bool recordedEntitiesSemaphoreHeld) + bool recordedEntitiesSemaphoreHeld, CancellationToken cancellationToken) { if (_disposed) { @@ -311,7 +311,7 @@ internal async ValueTask DeleteRecordedBindingAsync(RecordedBinding rb, } else { - await _recordedEntitiesSemaphore.WaitAsync() + await _recordedEntitiesSemaphore.WaitAsync(cancellationToken) .ConfigureAwait(false); try { diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs index 271ffe2cf5..67310eb71d 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs @@ -153,7 +153,6 @@ private static void HandleTopologyRecoveryException(TopologyRecoveryException e) ESLog.Info($"Will not retry recovery because of {e.InnerException?.GetType().FullName}: it's not a known problem with connectivity, ignoring it", e); } - // TODO cancellation token private async ValueTask TryPerformAutomaticRecoveryAsync(CancellationToken cancellationToken) { ESLog.Info("Performing automatic recovery"); @@ -176,14 +175,11 @@ await _recordedEntitiesSemaphore.WaitAsync(cancellationToken) // 2. Recover queues // 3. Recover bindings // 4. Recover consumers - // TODO cancellation token - await RecoverExchangesAsync(_innerConnection, recordedEntitiesSemaphoreHeld: true) + await RecoverExchangesAsync(_innerConnection, recordedEntitiesSemaphoreHeld: true, cancellationToken) .ConfigureAwait(false); - // TODO cancellation token - await RecoverQueuesAsync(_innerConnection, recordedEntitiesSemaphoreHeld: true) + await RecoverQueuesAsync(_innerConnection, recordedEntitiesSemaphoreHeld: true, cancellationToken) .ConfigureAwait(false); - // TODO cancellation token - await RecoverBindingsAsync(_innerConnection, recordedEntitiesSemaphoreHeld: true) + await RecoverBindingsAsync(_innerConnection, recordedEntitiesSemaphoreHeld: true, cancellationToken) .ConfigureAwait(false); } @@ -258,7 +254,7 @@ await _innerConnection.OpenAsync(cancellationToken) } private async ValueTask RecoverExchangesAsync(IConnection connection, - bool recordedEntitiesSemaphoreHeld = false) + bool recordedEntitiesSemaphoreHeld, CancellationToken cancellationToken) { if (_disposed) { @@ -274,11 +270,11 @@ private async ValueTask RecoverExchangesAsync(IConnection connection, { try { - using (IChannel ch = await connection.CreateChannelAsync().ConfigureAwait(false)) + using (IChannel ch = await connection.CreateChannelAsync(cancellationToken).ConfigureAwait(false)) { - await recordedExchange.RecoverAsync(ch) + await recordedExchange.RecoverAsync(ch, cancellationToken) .ConfigureAwait(false); - await ch.CloseAsync() + await ch.CloseAsync(cancellationToken) .ConfigureAwait(false); } } @@ -290,12 +286,13 @@ await ch.CloseAsync() try { _recordedEntitiesSemaphore.Release(); + // FUTURE (?) cancellation token await _config.TopologyRecoveryExceptionHandler.ExchangeRecoveryExceptionHandlerAsync(recordedExchange, ex, this) .ConfigureAwait(false); } finally { - await _recordedEntitiesSemaphore.WaitAsync() + await _recordedEntitiesSemaphore.WaitAsync(cancellationToken) .ConfigureAwait(false); } } @@ -308,7 +305,7 @@ await _recordedEntitiesSemaphore.WaitAsync() } private async Task RecoverQueuesAsync(IConnection connection, - bool recordedEntitiesSemaphoreHeld = false) + bool recordedEntitiesSemaphoreHeld, CancellationToken cancellationToken) { if (_disposed) { @@ -325,11 +322,11 @@ private async Task RecoverQueuesAsync(IConnection connection, try { string newName = string.Empty; - using (IChannel ch = await connection.CreateChannelAsync().ConfigureAwait(false)) + using (IChannel ch = await connection.CreateChannelAsync(cancellationToken).ConfigureAwait(false)) { - newName = await recordedQueue.RecoverAsync(ch) + newName = await recordedQueue.RecoverAsync(ch, cancellationToken) .ConfigureAwait(false); - await ch.CloseAsync() + await ch.CloseAsync(cancellationToken) .ConfigureAwait(false); } string oldName = recordedQueue.Name; @@ -347,12 +344,12 @@ await ch.CloseAsync() if (recordedQueue.IsServerNamed) { await DeleteRecordedQueueAsync(oldName, - recordedEntitiesSemaphoreHeld: recordedEntitiesSemaphoreHeld) + recordedEntitiesSemaphoreHeld: recordedEntitiesSemaphoreHeld, cancellationToken) .ConfigureAwait(false); } await RecordQueueAsync(new RecordedQueue(newName, recordedQueue), - recordedEntitiesSemaphoreHeld: recordedEntitiesSemaphoreHeld) + recordedEntitiesSemaphoreHeld: recordedEntitiesSemaphoreHeld, cancellationToken) .ConfigureAwait(false); if (!_queueNameChangedAfterRecoveryWrapper.IsEmpty) @@ -364,7 +361,7 @@ await RecordQueueAsync(new RecordedQueue(newName, recordedQueue), } finally { - await _recordedEntitiesSemaphore.WaitAsync() + await _recordedEntitiesSemaphore.WaitAsync(cancellationToken) .ConfigureAwait(false); } } @@ -378,6 +375,7 @@ await _recordedEntitiesSemaphore.WaitAsync() try { _recordedEntitiesSemaphore.Release(); + // FUTURE (?) cancellation token await _config.TopologyRecoveryExceptionHandler.QueueRecoveryExceptionHandlerAsync(recordedQueue, ex, this) .ConfigureAwait(false); } @@ -419,7 +417,7 @@ void UpdateConsumerQueue(string oldName, string newName) } private async ValueTask RecoverBindingsAsync(IConnection connection, - bool recordedEntitiesSemaphoreHeld = false) + bool recordedEntitiesSemaphoreHeld, CancellationToken cancellationToken) { if (_disposed) { @@ -435,11 +433,11 @@ private async ValueTask RecoverBindingsAsync(IConnection connection, { try { - using (IChannel ch = await connection.CreateChannelAsync().ConfigureAwait(false)) + using (IChannel ch = await connection.CreateChannelAsync(cancellationToken).ConfigureAwait(false)) { - await binding.RecoverAsync(ch) + await binding.RecoverAsync(ch, cancellationToken) .ConfigureAwait(false); - await ch.CloseAsync() + await ch.CloseAsync(cancellationToken) .ConfigureAwait(false); } } @@ -451,12 +449,13 @@ await ch.CloseAsync() try { _recordedEntitiesSemaphore.Release(); + // FUTURE (?) cancellation token await _config.TopologyRecoveryExceptionHandler.BindingRecoveryExceptionHandlerAsync(binding, ex, this) .ConfigureAwait(false); } finally { - await _recordedEntitiesSemaphore.WaitAsync() + await _recordedEntitiesSemaphore.WaitAsync(cancellationToken) .ConfigureAwait(false); } } diff --git a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs index 91ff79c9f4..9ab700bd68 100644 --- a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs @@ -242,7 +242,6 @@ await ModelSendAsync(method, k.CancellationToken) bool result = await k; Debug.Assert(result); - // TODO cancellation token await ConsumerDispatcher.WaitForShutdownAsync() .ConfigureAwait(false); } diff --git a/projects/RabbitMQ.Client/client/impl/Connection.Commands.cs b/projects/RabbitMQ.Client/client/impl/Connection.Commands.cs index 9809f1634c..ef1c9d03c1 100644 --- a/projects/RabbitMQ.Client/client/impl/Connection.Commands.cs +++ b/projects/RabbitMQ.Client/client/impl/Connection.Commands.cs @@ -195,20 +195,17 @@ private void MaybeStartCredentialRefresher() { if (_config.CredentialsProvider.ValidUntil != null) { - _config.CredentialsRefresher.Register(_config.CredentialsProvider, NotifyCredentialRefreshed); + _config.CredentialsRefresher.Register(_config.CredentialsProvider, NotifyCredentialRefreshedAsync); } } - private Task NotifyCredentialRefreshed(bool succesfully) + private async Task NotifyCredentialRefreshedAsync(bool succesfully) { if (succesfully) { - return UpdateSecretAsync(_config.CredentialsProvider.Password, "Token refresh", - CancellationToken.None); // TODO cancellation token - } - else - { - return Task.CompletedTask; + using var cts = new CancellationTokenSource(InternalConstants.DefaultConnectionCloseTimeout); + await UpdateSecretAsync(_config.CredentialsProvider.Password, "Token refresh", cts.Token) + .ConfigureAwait(false); } } diff --git a/projects/RabbitMQ.Client/client/impl/RecordedBinding.cs b/projects/RabbitMQ.Client/client/impl/RecordedBinding.cs index 7567f5cb74..153f4699b8 100644 --- a/projects/RabbitMQ.Client/client/impl/RecordedBinding.cs +++ b/projects/RabbitMQ.Client/client/impl/RecordedBinding.cs @@ -31,6 +31,7 @@ using System; using System.Collections.Generic; +using System.Threading; using System.Threading.Tasks; namespace RabbitMQ.Client.Impl @@ -67,15 +68,17 @@ public RecordedBinding(string destination, in RecordedBinding old) _arguments = old._arguments; } - public Task RecoverAsync(IChannel channel) + public Task RecoverAsync(IChannel channel, CancellationToken cancellationToken) { if (_isQueueBinding) { - return channel.QueueBindAsync(_destination, _source, _routingKey, _arguments, false); + return channel.QueueBindAsync(_destination, _source, _routingKey, _arguments, false, + cancellationToken); } else { - return channel.ExchangeBindAsync(_destination, _source, _routingKey, _arguments, false); + return channel.ExchangeBindAsync(_destination, _source, _routingKey, _arguments, false, + cancellationToken); } } diff --git a/projects/RabbitMQ.Client/client/impl/RecordedExchange.cs b/projects/RabbitMQ.Client/client/impl/RecordedExchange.cs index a7b1c1d946..0e6350d62f 100644 --- a/projects/RabbitMQ.Client/client/impl/RecordedExchange.cs +++ b/projects/RabbitMQ.Client/client/impl/RecordedExchange.cs @@ -30,6 +30,7 @@ //--------------------------------------------------------------------------- using System.Collections.Generic; +using System.Threading; using System.Threading.Tasks; namespace RabbitMQ.Client.Impl @@ -58,10 +59,11 @@ public RecordedExchange(string name, string type, bool durable, bool autoDelete, _arguments = arguments; } - public Task RecoverAsync(IChannel channel) + public Task RecoverAsync(IChannel channel, CancellationToken cancellationToken) { return channel.ExchangeDeclareAsync(exchange: Name, type: _type, passive: false, - durable: _durable, autoDelete: AutoDelete, noWait: false, arguments: _arguments); + durable: _durable, autoDelete: AutoDelete, noWait: false, arguments: _arguments, + cancellationToken: cancellationToken); } public override string ToString() diff --git a/projects/RabbitMQ.Client/client/impl/RecordedQueue.cs b/projects/RabbitMQ.Client/client/impl/RecordedQueue.cs index d491f98280..2ae2ddd8c5 100644 --- a/projects/RabbitMQ.Client/client/impl/RecordedQueue.cs +++ b/projects/RabbitMQ.Client/client/impl/RecordedQueue.cs @@ -30,6 +30,7 @@ //--------------------------------------------------------------------------- using System.Collections.Generic; +using System.Threading; using System.Threading.Tasks; namespace RabbitMQ.Client.Impl @@ -71,11 +72,12 @@ public RecordedQueue(string newName, in RecordedQueue old) _arguments = old._arguments; } - public Task RecoverAsync(IChannel channel) + public Task RecoverAsync(IChannel channel, CancellationToken cancellationToken) { string queueName = IsServerNamed ? string.Empty : Name; return channel.QueueDeclareAsync(queue: queueName, passive: false, - durable: _durable, exclusive: _exclusive, autoDelete: AutoDelete, arguments: _arguments); + durable: _durable, exclusive: _exclusive, autoDelete: AutoDelete, arguments: _arguments, + cancellationToken: cancellationToken); } public override string ToString()