Skip to content

Commit

Permalink
More CancellationToken todos
Browse files Browse the repository at this point in the history
* Add cancellation token to methods in `IChannelExtensions`
* Add cancellation token to Exchange recovery

* Add cancellation token to Queue recovery

* Add cancellation token to binding recovery
  • Loading branch information
lukebakken committed May 3, 2024
1 parent a9f331e commit c623241
Show file tree
Hide file tree
Showing 10 changed files with 84 additions and 77 deletions.
6 changes: 3 additions & 3 deletions projects/RabbitMQ.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -937,13 +937,13 @@ virtual RabbitMQ.Client.TcpClientAdapter.ReceiveTimeout.set -> void
~RabbitMQ.Client.TopologyRecoveryExceptionHandler.ExchangeRecoveryExceptionHandlerAsync.set -> void
~RabbitMQ.Client.TopologyRecoveryExceptionHandler.QueueRecoveryExceptionHandlerAsync.get -> System.Func<RabbitMQ.Client.IRecordedQueue, System.Exception, RabbitMQ.Client.IConnection, System.Threading.Tasks.Task>
~RabbitMQ.Client.TopologyRecoveryExceptionHandler.QueueRecoveryExceptionHandlerAsync.set -> void
~static RabbitMQ.Client.IChannelExtensions.AbortAsync(this RabbitMQ.Client.IChannel channel) -> System.Threading.Tasks.Task
~static RabbitMQ.Client.IChannelExtensions.AbortAsync(this RabbitMQ.Client.IChannel channel, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
~static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel channel, RabbitMQ.Client.IBasicConsumer consumer, string queue, bool autoAck = false, string consumerTag = "", bool noLocal = false, bool exclusive = false, System.Collections.Generic.IDictionary<string, object> arguments = null) -> System.Threading.Tasks.Task<string>
~static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel channel, string queue, bool autoAck, RabbitMQ.Client.IBasicConsumer consumer) -> System.Threading.Tasks.Task<string>
~static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel channel, string queue, bool autoAck, string consumerTag, RabbitMQ.Client.IBasicConsumer consumer) -> System.Threading.Tasks.Task<string>
~static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel channel, string queue, bool autoAck, string consumerTag, System.Collections.Generic.IDictionary<string, object> arguments, RabbitMQ.Client.IBasicConsumer consumer) -> System.Threading.Tasks.Task<string>
~static RabbitMQ.Client.IChannelExtensions.CloseAsync(this RabbitMQ.Client.IChannel channel) -> System.Threading.Tasks.Task
~static RabbitMQ.Client.IChannelExtensions.CloseAsync(this RabbitMQ.Client.IChannel channel, ushort replyCode, string replyText) -> System.Threading.Tasks.Task
~static RabbitMQ.Client.IChannelExtensions.CloseAsync(this RabbitMQ.Client.IChannel channel, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
~static RabbitMQ.Client.IChannelExtensions.CloseAsync(this RabbitMQ.Client.IChannel channel, ushort replyCode, string replyText, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
~static RabbitMQ.Client.IChannelExtensions.ExchangeDeclareAsync(this RabbitMQ.Client.IChannel channel, string exchange, string type, bool durable = false, bool autoDelete = false, System.Collections.Generic.IDictionary<string, object> arguments = null, bool noWait = false) -> System.Threading.Tasks.Task
~static RabbitMQ.Client.IChannelExtensions.QueueDeclareAsync(this RabbitMQ.Client.IChannel channel, string queue = "", bool durable = false, bool exclusive = true, bool autoDelete = true, System.Collections.Generic.IDictionary<string, object> arguments = null, bool noWait = false) -> System.Threading.Tasks.Task<RabbitMQ.Client.QueueDeclareOk>
~static RabbitMQ.Client.IChannelExtensions.QueueDeleteAsync(this RabbitMQ.Client.IChannel channel, string queue, bool ifUnused = false, bool ifEmpty = false) -> System.Threading.Tasks.Task<uint>
Expand Down
19 changes: 12 additions & 7 deletions projects/RabbitMQ.Client/client/api/IChannelExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client.client.impl;

Expand Down Expand Up @@ -145,12 +146,13 @@ public static Task QueueUnbindAsync(this IChannel channel, string queue, string
/// method does nothing but wait for the in-progress close
/// operation to complete. This method will not return to the
/// caller until the shutdown is complete.
/// In comparison to normal <see cref="CloseAsync(IChannel)"/> method, <see cref="AbortAsync(IChannel)"/> will not throw
/// In comparison to normal <see cref="CloseAsync(IChannel, CancellationToken)"/> method, <see cref="AbortAsync(IChannel, CancellationToken)"/> will not throw
/// <see cref="Exceptions.AlreadyClosedException"/> or <see cref="System.IO.IOException"/> or any other <see cref="Exception"/> during closing channel.
/// </remarks>
public static Task AbortAsync(this IChannel channel)
public static Task AbortAsync(this IChannel channel, CancellationToken cancellationToken = default)
{
return channel.CloseAsync(Constants.ReplySuccess, "Goodbye", true);
return channel.CloseAsync(Constants.ReplySuccess, "Goodbye", true,
cancellationToken);
}

/// <summary>Asynchronously close this session.</summary>
Expand All @@ -160,9 +162,10 @@ public static Task AbortAsync(this IChannel channel)
/// operation to complete. This method will not return to the
/// caller until the shutdown is complete.
/// </remarks>
public static Task CloseAsync(this IChannel channel)
public static Task CloseAsync(this IChannel channel, CancellationToken cancellationToken = default)
{
return channel.CloseAsync(Constants.ReplySuccess, "Goodbye", false);
return channel.CloseAsync(Constants.ReplySuccess, "Goodbye", false,
cancellationToken);
}

/// <summary>
Expand All @@ -171,6 +174,7 @@ public static Task CloseAsync(this IChannel channel)
/// <param name="channel">The channel.</param>
/// <param name="replyCode">The reply code.</param>
/// <param name="replyText">The reply text.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <remarks>
/// The method behaves in the same way as Close(), with the only
/// difference that the channel is closed with the given channel
Expand All @@ -181,9 +185,10 @@ public static Task CloseAsync(this IChannel channel)
/// A message indicating the reason for closing the channel
/// </para>
/// </remarks>
public static Task CloseAsync(this IChannel channel, ushort replyCode, string replyText)
public static Task CloseAsync(this IChannel channel, ushort replyCode, string replyText,
CancellationToken cancellationToken = default)
{
return channel.CloseAsync(replyCode, replyText, false);
return channel.CloseAsync(replyCode, replyText, false, cancellationToken);
}
}
}
14 changes: 7 additions & 7 deletions projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ public async Task ExchangeDeleteAsync(string exchange, bool ifUnused, bool noWai
{
await InnerChannel.ExchangeDeleteAsync(exchange, ifUnused, noWait, cancellationToken)
.ConfigureAwait(false);
await _connection.DeleteRecordedExchangeAsync(exchange, recordedEntitiesSemaphoreHeld: false)
await _connection.DeleteRecordedExchangeAsync(exchange, recordedEntitiesSemaphoreHeld: false, cancellationToken)
.ConfigureAwait(false);
}

Expand All @@ -361,11 +361,11 @@ public async Task ExchangeUnbindAsync(string destination, string source, string
{
ThrowIfDisposed();
var recordedBinding = new RecordedBinding(false, destination, source, routingKey, arguments);
await _connection.DeleteRecordedBindingAsync(recordedBinding, recordedEntitiesSemaphoreHeld: false)
await _connection.DeleteRecordedBindingAsync(recordedBinding, recordedEntitiesSemaphoreHeld: false, cancellationToken)
.ConfigureAwait(false);
await InnerChannel.ExchangeUnbindAsync(destination, source, routingKey, arguments, noWait, cancellationToken)
.ConfigureAwait(false);
await _connection.DeleteAutoDeleteExchangeAsync(source, recordedEntitiesSemaphoreHeld: false)
await _connection.DeleteAutoDeleteExchangeAsync(source, recordedEntitiesSemaphoreHeld: false, cancellationToken)
.ConfigureAwait(false);
}

Expand Down Expand Up @@ -396,7 +396,7 @@ public async Task<QueueDeclareOk> QueueDeclareAsync(string queue, bool durable,
if (false == passive)
{
var recordedQueue = new RecordedQueue(result.QueueName, queue.Length == 0, durable, exclusive, autoDelete, arguments);
await _connection.RecordQueueAsync(recordedQueue, recordedEntitiesSemaphoreHeld: false)
await _connection.RecordQueueAsync(recordedQueue, recordedEntitiesSemaphoreHeld: false, cancellationToken)
.ConfigureAwait(false);
}
return result;
Expand All @@ -415,7 +415,7 @@ public async Task<uint> QueueDeleteAsync(string queue, bool ifUnused, bool ifEmp
{
uint result = await InnerChannel.QueueDeleteAsync(queue, ifUnused, ifEmpty, noWait, cancellationToken)
.ConfigureAwait(false);
await _connection.DeleteRecordedQueueAsync(queue, recordedEntitiesSemaphoreHeld: false)
await _connection.DeleteRecordedQueueAsync(queue, recordedEntitiesSemaphoreHeld: false, cancellationToken)
.ConfigureAwait(false);
return result;
}
Expand All @@ -429,11 +429,11 @@ public async Task QueueUnbindAsync(string queue, string exchange, string routing
{
ThrowIfDisposed();
var recordedBinding = new RecordedBinding(true, queue, exchange, routingKey, arguments);
await _connection.DeleteRecordedBindingAsync(recordedBinding, recordedEntitiesSemaphoreHeld: false)
await _connection.DeleteRecordedBindingAsync(recordedBinding, recordedEntitiesSemaphoreHeld: false, cancellationToken)
.ConfigureAwait(false);
await _innerChannel.QueueUnbindAsync(queue, exchange, routingKey, arguments, cancellationToken)
.ConfigureAwait(false);
await _connection.DeleteAutoDeleteExchangeAsync(exchange, recordedEntitiesSemaphoreHeld: false)
await _connection.DeleteAutoDeleteExchangeAsync(exchange, recordedEntitiesSemaphoreHeld: false, cancellationToken)
.ConfigureAwait(false);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ private void DoRecordExchange(in RecordedExchange exchange)
}

internal async ValueTask DeleteRecordedExchangeAsync(string exchangeName,
bool recordedEntitiesSemaphoreHeld)
bool recordedEntitiesSemaphoreHeld, CancellationToken cancellationToken)
{
if (_disposed)
{
Expand All @@ -92,16 +92,16 @@ internal async ValueTask DeleteRecordedExchangeAsync(string exchangeName,

if (recordedEntitiesSemaphoreHeld)
{
await DoDeleteRecordedExchangeAsync(exchangeName)
await DoDeleteRecordedExchangeAsync(exchangeName, cancellationToken)
.ConfigureAwait(false);
}
else
{
await _recordedEntitiesSemaphore.WaitAsync()
await _recordedEntitiesSemaphore.WaitAsync(cancellationToken)
.ConfigureAwait(false);
try
{
await DoDeleteRecordedExchangeAsync(exchangeName)
await DoDeleteRecordedExchangeAsync(exchangeName, cancellationToken)
.ConfigureAwait(false);
}
finally
Expand All @@ -110,7 +110,7 @@ await DoDeleteRecordedExchangeAsync(exchangeName)
}
}

async Task DoDeleteRecordedExchangeAsync(string exchangeName)
async Task DoDeleteRecordedExchangeAsync(string exchangeName, CancellationToken cancellationToken)
{
_recordedExchanges.Remove(exchangeName);

Expand All @@ -120,18 +120,18 @@ async Task DoDeleteRecordedExchangeAsync(string exchangeName)
if (binding.Destination == exchangeName)
{
await DeleteRecordedBindingAsync(binding,
recordedEntitiesSemaphoreHeld: true)
recordedEntitiesSemaphoreHeld: true, cancellationToken)
.ConfigureAwait(false);
await DeleteAutoDeleteExchangeAsync(binding.Source,
recordedEntitiesSemaphoreHeld: true)
recordedEntitiesSemaphoreHeld: true, cancellationToken)
.ConfigureAwait(false);
}
}
}
}

internal async ValueTask DeleteAutoDeleteExchangeAsync(string exchangeName,
bool recordedEntitiesSemaphoreHeld)
bool recordedEntitiesSemaphoreHeld, CancellationToken cancellationToken)
{
if (_disposed)
{
Expand All @@ -144,7 +144,7 @@ internal async ValueTask DeleteAutoDeleteExchangeAsync(string exchangeName,
}
else
{
await _recordedEntitiesSemaphore.WaitAsync()
await _recordedEntitiesSemaphore.WaitAsync(cancellationToken)
.ConfigureAwait(false);
try
{
Expand Down Expand Up @@ -185,7 +185,7 @@ bool AnyBindingsOnExchange(string exchange)
internal int RecordedQueuesCount => _recordedQueues.Count;

internal async ValueTask RecordQueueAsync(RecordedQueue queue,
bool recordedEntitiesSemaphoreHeld)
bool recordedEntitiesSemaphoreHeld, CancellationToken cancellationToken)
{
if (_disposed)
{
Expand All @@ -198,7 +198,7 @@ internal async ValueTask RecordQueueAsync(RecordedQueue queue,
}
else
{
await _recordedEntitiesSemaphore.WaitAsync()
await _recordedEntitiesSemaphore.WaitAsync(cancellationToken)
.ConfigureAwait(false);
try
{
Expand All @@ -217,7 +217,7 @@ private void DoRecordQueue(RecordedQueue queue)
}

internal async ValueTask DeleteRecordedQueueAsync(string queueName,
bool recordedEntitiesSemaphoreHeld)
bool recordedEntitiesSemaphoreHeld, CancellationToken cancellationToken)
{
if (_disposed)
{
Expand All @@ -226,16 +226,16 @@ internal async ValueTask DeleteRecordedQueueAsync(string queueName,

if (recordedEntitiesSemaphoreHeld)
{
await DoDeleteRecordedQueueAsync(queueName)
await DoDeleteRecordedQueueAsync(queueName, cancellationToken)
.ConfigureAwait(false);
}
else
{
await _recordedEntitiesSemaphore.WaitAsync()
await _recordedEntitiesSemaphore.WaitAsync(cancellationToken)
.ConfigureAwait(false);
try
{
await DoDeleteRecordedQueueAsync(queueName)
await DoDeleteRecordedQueueAsync(queueName, cancellationToken)
.ConfigureAwait(false);
}
finally
Expand All @@ -244,7 +244,7 @@ await DoDeleteRecordedQueueAsync(queueName)
}
}

async ValueTask DoDeleteRecordedQueueAsync(string queueName)
async ValueTask DoDeleteRecordedQueueAsync(string queueName, CancellationToken cancellationToken)
{
_recordedQueues.Remove(queueName);

Expand All @@ -254,10 +254,10 @@ async ValueTask DoDeleteRecordedQueueAsync(string queueName)
if (binding.Destination == queueName)
{
await DeleteRecordedBindingAsync(binding,
recordedEntitiesSemaphoreHeld: true)
recordedEntitiesSemaphoreHeld: true, cancellationToken)
.ConfigureAwait(false);
await DeleteAutoDeleteExchangeAsync(binding.Source,
recordedEntitiesSemaphoreHeld: true)
recordedEntitiesSemaphoreHeld: true, cancellationToken)
.ConfigureAwait(false);
}
}
Expand Down Expand Up @@ -298,7 +298,7 @@ private void DoRecordBinding(in RecordedBinding binding)
}

internal async ValueTask DeleteRecordedBindingAsync(RecordedBinding rb,
bool recordedEntitiesSemaphoreHeld)
bool recordedEntitiesSemaphoreHeld, CancellationToken cancellationToken)
{
if (_disposed)
{
Expand All @@ -311,7 +311,7 @@ internal async ValueTask DeleteRecordedBindingAsync(RecordedBinding rb,
}
else
{
await _recordedEntitiesSemaphore.WaitAsync()
await _recordedEntitiesSemaphore.WaitAsync(cancellationToken)
.ConfigureAwait(false);
try
{
Expand Down
Loading

0 comments on commit c623241

Please sign in to comment.