From 965256af8e1e679aa8b207efa217e91c239e5cdd Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Tue, 24 Oct 2023 14:41:44 -0700 Subject: [PATCH] Fix sync Channel Close and add test using CloseAsync() --- .../client/api/IChannelExtensions.cs | 23 ++++++++-- .../client/impl/ChannelBase.cs | 44 +++++++++++++++++-- .../Unit/APIApproval.Approve.verified.txt | 1 + projects/Unit/TestChannelAllocation.cs | 14 +++++- 4 files changed, 75 insertions(+), 7 deletions(-) diff --git a/projects/RabbitMQ.Client/client/api/IChannelExtensions.cs b/projects/RabbitMQ.Client/client/api/IChannelExtensions.cs index 6beb1d6bb5..22b7463fe5 100644 --- a/projects/RabbitMQ.Client/client/api/IChannelExtensions.cs +++ b/projects/RabbitMQ.Client/client/api/IChannelExtensions.cs @@ -250,15 +250,32 @@ public static void Close(this IChannel channel) channel.Close(Constants.ReplySuccess, "Goodbye", false); } - /// Close this session. + /// Asynchronously close this session. + /// + /// If the session is already closed (or closing), then this + /// method does nothing but wait for the in-progress close + /// operation to complete. This method will not return to the + /// caller until the shutdown is complete. + /// + public static ValueTask CloseAsync(this IChannel channel) + { + var reason = new ShutdownEventArgs(ShutdownInitiator.Library, Constants.ReplySuccess, "Goodbye"); + return channel.CloseAsync(reason, false); + } + + /// + /// Close this channel. + /// + /// The channel. + /// The reply code. + /// The reply text. /// /// The method behaves in the same way as Close(), with the only /// difference that the channel is closed with the given channel /// close code and message. /// /// The close code (See under "Reply Codes" in the AMQP specification) - /// - /// + /// /// A message indicating the reason for closing the channel /// /// diff --git a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs index f47113b30c..3faf8fd304 100644 --- a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs @@ -194,7 +194,47 @@ protected void TakeOver(ChannelBase other) public void Close(ushort replyCode, string replyText, bool abort) { - _ = CloseAsync(new ShutdownEventArgs(ShutdownInitiator.Application, replyCode, replyText), abort); + var reason = new ShutdownEventArgs(ShutdownInitiator.Application, replyCode, replyText); + var k = new ShutdownContinuation(); + ChannelShutdown += k.OnConnectionShutdown; + + try + { + ConsumerDispatcher.Quiesce(); + + if (SetCloseReason(reason)) + { + _Private_ChannelClose(reason.ReplyCode, reason.ReplyText, 0, 0); + } + + k.Wait(TimeSpan.FromMilliseconds(10000)); + ConsumerDispatcher.WaitForShutdownAsync().ConfigureAwait(false); + } + catch (AlreadyClosedException) + { + if (!abort) + { + throw; + } + } + catch (IOException) + { + if (!abort) + { + throw; + } + } + catch (Exception) + { + if (!abort) + { + throw; + } + } + finally + { + ChannelShutdown -= k.OnConnectionShutdown; + } } public async ValueTask CloseAsync(ShutdownEventArgs reason, bool abort) @@ -216,8 +256,6 @@ public async ValueTask CloseAsync(ShutdownEventArgs reason, bool abort) bool result = await k; Debug.Assert(result); - // TODO LRB rabbitmq/rabbitmq-dotnet-client#1347 - // k.Wait(TimeSpan.FromMilliseconds(10000)); await ConsumerDispatcher.WaitForShutdownAsync().ConfigureAwait(false); } catch (AlreadyClosedException) diff --git a/projects/Unit/APIApproval.Approve.verified.txt b/projects/Unit/APIApproval.Approve.verified.txt index f028f62040..67e398dbcf 100644 --- a/projects/Unit/APIApproval.Approve.verified.txt +++ b/projects/Unit/APIApproval.Approve.verified.txt @@ -489,6 +489,7 @@ namespace RabbitMQ.Client public static System.Threading.Tasks.ValueTask BasicPublishAsync(this RabbitMQ.Client.IChannel channel, string exchange, string routingKey, System.ReadOnlyMemory body = default, bool mandatory = false) { } public static void Close(this RabbitMQ.Client.IChannel channel) { } public static void Close(this RabbitMQ.Client.IChannel channel, ushort replyCode, string replyText) { } + public static System.Threading.Tasks.ValueTask CloseAsync(this RabbitMQ.Client.IChannel channel) { } public static void ExchangeBind(this RabbitMQ.Client.IChannel channel, string destination, string source, string routingKey, System.Collections.Generic.IDictionary arguments = null) { } public static void ExchangeBindNoWait(this RabbitMQ.Client.IChannel channel, string destination, string source, string routingKey, System.Collections.Generic.IDictionary arguments = null) { } public static void ExchangeDeclare(this RabbitMQ.Client.IChannel channel, string exchange, string type, bool durable = false, bool autoDelete = false, System.Collections.Generic.IDictionary arguments = null) { } diff --git a/projects/Unit/TestChannelAllocation.cs b/projects/Unit/TestChannelAllocation.cs index 3a4671f6bf..3794b25084 100644 --- a/projects/Unit/TestChannelAllocation.cs +++ b/projects/Unit/TestChannelAllocation.cs @@ -31,6 +31,7 @@ using System; using System.Collections.Generic; +using System.Threading.Tasks; using RabbitMQ.Client.Impl; using Xunit; @@ -50,7 +51,8 @@ public int ChannelNumber(IChannel channel) public TestChannelAllocation() { - _c = new ConnectionFactory().CreateConnection(); + var cf = new ConnectionFactory(); + _c = cf.CreateConnection(); } public void Dispose() => _c.Close(); @@ -72,6 +74,16 @@ public void AllocateAfterFreeingLast() Assert.Equal(1, ChannelNumber(ch)); } + [Fact] + public async Task AllocateAfterFreeingLastAsync() + { + IChannel ch = _c.CreateChannel(); + Assert.Equal(1, ChannelNumber(ch)); + await ch.CloseAsync(); + ch = _c.CreateChannel(); + Assert.Equal(1, ChannelNumber(ch)); + } + public int CompareChannels(IChannel x, IChannel y) { int i = ChannelNumber(x);