Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More CancellationToken todos #1555

Merged
merged 1 commit into from
May 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions projects/RabbitMQ.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -937,13 +937,13 @@ virtual RabbitMQ.Client.TcpClientAdapter.ReceiveTimeout.set -> void
~RabbitMQ.Client.TopologyRecoveryExceptionHandler.ExchangeRecoveryExceptionHandlerAsync.set -> void
~RabbitMQ.Client.TopologyRecoveryExceptionHandler.QueueRecoveryExceptionHandlerAsync.get -> System.Func<RabbitMQ.Client.IRecordedQueue, System.Exception, RabbitMQ.Client.IConnection, System.Threading.Tasks.Task>
~RabbitMQ.Client.TopologyRecoveryExceptionHandler.QueueRecoveryExceptionHandlerAsync.set -> void
~static RabbitMQ.Client.IChannelExtensions.AbortAsync(this RabbitMQ.Client.IChannel channel) -> System.Threading.Tasks.Task
~static RabbitMQ.Client.IChannelExtensions.AbortAsync(this RabbitMQ.Client.IChannel channel, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
~static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel channel, RabbitMQ.Client.IBasicConsumer consumer, string queue, bool autoAck = false, string consumerTag = "", bool noLocal = false, bool exclusive = false, System.Collections.Generic.IDictionary<string, object> arguments = null) -> System.Threading.Tasks.Task<string>
~static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel channel, string queue, bool autoAck, RabbitMQ.Client.IBasicConsumer consumer) -> System.Threading.Tasks.Task<string>
~static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel channel, string queue, bool autoAck, string consumerTag, RabbitMQ.Client.IBasicConsumer consumer) -> System.Threading.Tasks.Task<string>
~static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel channel, string queue, bool autoAck, string consumerTag, System.Collections.Generic.IDictionary<string, object> arguments, RabbitMQ.Client.IBasicConsumer consumer) -> System.Threading.Tasks.Task<string>
~static RabbitMQ.Client.IChannelExtensions.CloseAsync(this RabbitMQ.Client.IChannel channel) -> System.Threading.Tasks.Task
~static RabbitMQ.Client.IChannelExtensions.CloseAsync(this RabbitMQ.Client.IChannel channel, ushort replyCode, string replyText) -> System.Threading.Tasks.Task
~static RabbitMQ.Client.IChannelExtensions.CloseAsync(this RabbitMQ.Client.IChannel channel, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
~static RabbitMQ.Client.IChannelExtensions.CloseAsync(this RabbitMQ.Client.IChannel channel, ushort replyCode, string replyText, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
~static RabbitMQ.Client.IChannelExtensions.ExchangeDeclareAsync(this RabbitMQ.Client.IChannel channel, string exchange, string type, bool durable = false, bool autoDelete = false, System.Collections.Generic.IDictionary<string, object> arguments = null, bool noWait = false) -> System.Threading.Tasks.Task
~static RabbitMQ.Client.IChannelExtensions.QueueDeclareAsync(this RabbitMQ.Client.IChannel channel, string queue = "", bool durable = false, bool exclusive = true, bool autoDelete = true, System.Collections.Generic.IDictionary<string, object> arguments = null, bool noWait = false) -> System.Threading.Tasks.Task<RabbitMQ.Client.QueueDeclareOk>
~static RabbitMQ.Client.IChannelExtensions.QueueDeleteAsync(this RabbitMQ.Client.IChannel channel, string queue, bool ifUnused = false, bool ifEmpty = false) -> System.Threading.Tasks.Task<uint>
Expand Down
19 changes: 12 additions & 7 deletions projects/RabbitMQ.Client/client/api/IChannelExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client.client.impl;

Expand Down Expand Up @@ -145,12 +146,13 @@ public static Task QueueUnbindAsync(this IChannel channel, string queue, string
/// method does nothing but wait for the in-progress close
/// operation to complete. This method will not return to the
/// caller until the shutdown is complete.
/// In comparison to normal <see cref="CloseAsync(IChannel)"/> method, <see cref="AbortAsync(IChannel)"/> will not throw
/// In comparison to normal <see cref="CloseAsync(IChannel, CancellationToken)"/> method, <see cref="AbortAsync(IChannel, CancellationToken)"/> will not throw
/// <see cref="Exceptions.AlreadyClosedException"/> or <see cref="System.IO.IOException"/> or any other <see cref="Exception"/> during closing channel.
/// </remarks>
public static Task AbortAsync(this IChannel channel)
public static Task AbortAsync(this IChannel channel, CancellationToken cancellationToken = default)
{
return channel.CloseAsync(Constants.ReplySuccess, "Goodbye", true);
return channel.CloseAsync(Constants.ReplySuccess, "Goodbye", true,
cancellationToken);
}

/// <summary>Asynchronously close this session.</summary>
Expand All @@ -160,9 +162,10 @@ public static Task AbortAsync(this IChannel channel)
/// operation to complete. This method will not return to the
/// caller until the shutdown is complete.
/// </remarks>
public static Task CloseAsync(this IChannel channel)
public static Task CloseAsync(this IChannel channel, CancellationToken cancellationToken = default)
{
return channel.CloseAsync(Constants.ReplySuccess, "Goodbye", false);
return channel.CloseAsync(Constants.ReplySuccess, "Goodbye", false,
cancellationToken);
}

/// <summary>
Expand All @@ -171,6 +174,7 @@ public static Task CloseAsync(this IChannel channel)
/// <param name="channel">The channel.</param>
/// <param name="replyCode">The reply code.</param>
/// <param name="replyText">The reply text.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <remarks>
/// The method behaves in the same way as Close(), with the only
/// difference that the channel is closed with the given channel
Expand All @@ -181,9 +185,10 @@ public static Task CloseAsync(this IChannel channel)
/// A message indicating the reason for closing the channel
/// </para>
/// </remarks>
public static Task CloseAsync(this IChannel channel, ushort replyCode, string replyText)
public static Task CloseAsync(this IChannel channel, ushort replyCode, string replyText,
CancellationToken cancellationToken = default)
{
return channel.CloseAsync(replyCode, replyText, false);
return channel.CloseAsync(replyCode, replyText, false, cancellationToken);
}
}
}
14 changes: 7 additions & 7 deletions projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ public async Task ExchangeDeleteAsync(string exchange, bool ifUnused, bool noWai
{
await InnerChannel.ExchangeDeleteAsync(exchange, ifUnused, noWait, cancellationToken)
.ConfigureAwait(false);
await _connection.DeleteRecordedExchangeAsync(exchange, recordedEntitiesSemaphoreHeld: false)
await _connection.DeleteRecordedExchangeAsync(exchange, recordedEntitiesSemaphoreHeld: false, cancellationToken)
.ConfigureAwait(false);
}

Expand All @@ -361,11 +361,11 @@ public async Task ExchangeUnbindAsync(string destination, string source, string
{
ThrowIfDisposed();
var recordedBinding = new RecordedBinding(false, destination, source, routingKey, arguments);
await _connection.DeleteRecordedBindingAsync(recordedBinding, recordedEntitiesSemaphoreHeld: false)
await _connection.DeleteRecordedBindingAsync(recordedBinding, recordedEntitiesSemaphoreHeld: false, cancellationToken)
.ConfigureAwait(false);
await InnerChannel.ExchangeUnbindAsync(destination, source, routingKey, arguments, noWait, cancellationToken)
.ConfigureAwait(false);
await _connection.DeleteAutoDeleteExchangeAsync(source, recordedEntitiesSemaphoreHeld: false)
await _connection.DeleteAutoDeleteExchangeAsync(source, recordedEntitiesSemaphoreHeld: false, cancellationToken)
.ConfigureAwait(false);
}

Expand Down Expand Up @@ -396,7 +396,7 @@ public async Task<QueueDeclareOk> QueueDeclareAsync(string queue, bool durable,
if (false == passive)
{
var recordedQueue = new RecordedQueue(result.QueueName, queue.Length == 0, durable, exclusive, autoDelete, arguments);
await _connection.RecordQueueAsync(recordedQueue, recordedEntitiesSemaphoreHeld: false)
await _connection.RecordQueueAsync(recordedQueue, recordedEntitiesSemaphoreHeld: false, cancellationToken)
.ConfigureAwait(false);
}
return result;
Expand All @@ -415,7 +415,7 @@ public async Task<uint> QueueDeleteAsync(string queue, bool ifUnused, bool ifEmp
{
uint result = await InnerChannel.QueueDeleteAsync(queue, ifUnused, ifEmpty, noWait, cancellationToken)
.ConfigureAwait(false);
await _connection.DeleteRecordedQueueAsync(queue, recordedEntitiesSemaphoreHeld: false)
await _connection.DeleteRecordedQueueAsync(queue, recordedEntitiesSemaphoreHeld: false, cancellationToken)
.ConfigureAwait(false);
return result;
}
Expand All @@ -429,11 +429,11 @@ public async Task QueueUnbindAsync(string queue, string exchange, string routing
{
ThrowIfDisposed();
var recordedBinding = new RecordedBinding(true, queue, exchange, routingKey, arguments);
await _connection.DeleteRecordedBindingAsync(recordedBinding, recordedEntitiesSemaphoreHeld: false)
await _connection.DeleteRecordedBindingAsync(recordedBinding, recordedEntitiesSemaphoreHeld: false, cancellationToken)
.ConfigureAwait(false);
await _innerChannel.QueueUnbindAsync(queue, exchange, routingKey, arguments, cancellationToken)
.ConfigureAwait(false);
await _connection.DeleteAutoDeleteExchangeAsync(exchange, recordedEntitiesSemaphoreHeld: false)
await _connection.DeleteAutoDeleteExchangeAsync(exchange, recordedEntitiesSemaphoreHeld: false, cancellationToken)
.ConfigureAwait(false);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ private void DoRecordExchange(in RecordedExchange exchange)
}

internal async ValueTask DeleteRecordedExchangeAsync(string exchangeName,
bool recordedEntitiesSemaphoreHeld)
bool recordedEntitiesSemaphoreHeld, CancellationToken cancellationToken)
{
if (_disposed)
{
Expand All @@ -92,16 +92,16 @@ internal async ValueTask DeleteRecordedExchangeAsync(string exchangeName,

if (recordedEntitiesSemaphoreHeld)
{
await DoDeleteRecordedExchangeAsync(exchangeName)
await DoDeleteRecordedExchangeAsync(exchangeName, cancellationToken)
.ConfigureAwait(false);
}
else
{
await _recordedEntitiesSemaphore.WaitAsync()
await _recordedEntitiesSemaphore.WaitAsync(cancellationToken)
.ConfigureAwait(false);
try
{
await DoDeleteRecordedExchangeAsync(exchangeName)
await DoDeleteRecordedExchangeAsync(exchangeName, cancellationToken)
.ConfigureAwait(false);
}
finally
Expand All @@ -110,7 +110,7 @@ await DoDeleteRecordedExchangeAsync(exchangeName)
}
}

async Task DoDeleteRecordedExchangeAsync(string exchangeName)
async Task DoDeleteRecordedExchangeAsync(string exchangeName, CancellationToken cancellationToken)
{
_recordedExchanges.Remove(exchangeName);

Expand All @@ -120,18 +120,18 @@ async Task DoDeleteRecordedExchangeAsync(string exchangeName)
if (binding.Destination == exchangeName)
{
await DeleteRecordedBindingAsync(binding,
recordedEntitiesSemaphoreHeld: true)
recordedEntitiesSemaphoreHeld: true, cancellationToken)
.ConfigureAwait(false);
await DeleteAutoDeleteExchangeAsync(binding.Source,
recordedEntitiesSemaphoreHeld: true)
recordedEntitiesSemaphoreHeld: true, cancellationToken)
.ConfigureAwait(false);
}
}
}
}

internal async ValueTask DeleteAutoDeleteExchangeAsync(string exchangeName,
bool recordedEntitiesSemaphoreHeld)
bool recordedEntitiesSemaphoreHeld, CancellationToken cancellationToken)
{
if (_disposed)
{
Expand All @@ -144,7 +144,7 @@ internal async ValueTask DeleteAutoDeleteExchangeAsync(string exchangeName,
}
else
{
await _recordedEntitiesSemaphore.WaitAsync()
await _recordedEntitiesSemaphore.WaitAsync(cancellationToken)
.ConfigureAwait(false);
try
{
Expand Down Expand Up @@ -185,7 +185,7 @@ bool AnyBindingsOnExchange(string exchange)
internal int RecordedQueuesCount => _recordedQueues.Count;

internal async ValueTask RecordQueueAsync(RecordedQueue queue,
bool recordedEntitiesSemaphoreHeld)
bool recordedEntitiesSemaphoreHeld, CancellationToken cancellationToken)
{
if (_disposed)
{
Expand All @@ -198,7 +198,7 @@ internal async ValueTask RecordQueueAsync(RecordedQueue queue,
}
else
{
await _recordedEntitiesSemaphore.WaitAsync()
await _recordedEntitiesSemaphore.WaitAsync(cancellationToken)
.ConfigureAwait(false);
try
{
Expand All @@ -217,7 +217,7 @@ private void DoRecordQueue(RecordedQueue queue)
}

internal async ValueTask DeleteRecordedQueueAsync(string queueName,
bool recordedEntitiesSemaphoreHeld)
bool recordedEntitiesSemaphoreHeld, CancellationToken cancellationToken)
{
if (_disposed)
{
Expand All @@ -226,16 +226,16 @@ internal async ValueTask DeleteRecordedQueueAsync(string queueName,

if (recordedEntitiesSemaphoreHeld)
{
await DoDeleteRecordedQueueAsync(queueName)
await DoDeleteRecordedQueueAsync(queueName, cancellationToken)
.ConfigureAwait(false);
}
else
{
await _recordedEntitiesSemaphore.WaitAsync()
await _recordedEntitiesSemaphore.WaitAsync(cancellationToken)
.ConfigureAwait(false);
try
{
await DoDeleteRecordedQueueAsync(queueName)
await DoDeleteRecordedQueueAsync(queueName, cancellationToken)
.ConfigureAwait(false);
}
finally
Expand All @@ -244,7 +244,7 @@ await DoDeleteRecordedQueueAsync(queueName)
}
}

async ValueTask DoDeleteRecordedQueueAsync(string queueName)
async ValueTask DoDeleteRecordedQueueAsync(string queueName, CancellationToken cancellationToken)
{
_recordedQueues.Remove(queueName);

Expand All @@ -254,10 +254,10 @@ async ValueTask DoDeleteRecordedQueueAsync(string queueName)
if (binding.Destination == queueName)
{
await DeleteRecordedBindingAsync(binding,
recordedEntitiesSemaphoreHeld: true)
recordedEntitiesSemaphoreHeld: true, cancellationToken)
.ConfigureAwait(false);
await DeleteAutoDeleteExchangeAsync(binding.Source,
recordedEntitiesSemaphoreHeld: true)
recordedEntitiesSemaphoreHeld: true, cancellationToken)
.ConfigureAwait(false);
}
}
Expand Down Expand Up @@ -298,7 +298,7 @@ private void DoRecordBinding(in RecordedBinding binding)
}

internal async ValueTask DeleteRecordedBindingAsync(RecordedBinding rb,
bool recordedEntitiesSemaphoreHeld)
bool recordedEntitiesSemaphoreHeld, CancellationToken cancellationToken)
{
if (_disposed)
{
Expand All @@ -311,7 +311,7 @@ internal async ValueTask DeleteRecordedBindingAsync(RecordedBinding rb,
}
else
{
await _recordedEntitiesSemaphore.WaitAsync()
await _recordedEntitiesSemaphore.WaitAsync(cancellationToken)
.ConfigureAwait(false);
try
{
Expand Down
Loading