Skip to content

Commit

Permalink
Fix sync Channel Close and add test using CloseAsync()
Browse files Browse the repository at this point in the history
  • Loading branch information
lukebakken committed Oct 24, 2023
1 parent fccd710 commit 965256a
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 7 deletions.
23 changes: 20 additions & 3 deletions projects/RabbitMQ.Client/client/api/IChannelExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -250,15 +250,32 @@ public static void Close(this IChannel channel)
channel.Close(Constants.ReplySuccess, "Goodbye", false);
}

/// <summary>Close this session.</summary>
/// <summary>Asynchronously close this session.</summary>
/// <remarks>
/// If the session is already closed (or closing), then this
/// method does nothing but wait for the in-progress close
/// operation to complete. This method will not return to the
/// caller until the shutdown is complete.
/// </remarks>
public static ValueTask CloseAsync(this IChannel channel)
{
var reason = new ShutdownEventArgs(ShutdownInitiator.Library, Constants.ReplySuccess, "Goodbye");
return channel.CloseAsync(reason, false);
}

/// <summary>
/// Close this channel.
/// </summary>
/// <param name="channel">The channel.</param>
/// <param name="replyCode">The reply code.</param>
/// <param name="replyText">The reply text.</param>
/// <remarks>
/// The method behaves in the same way as Close(), with the only
/// difference that the channel is closed with the given channel
/// close code and message.
/// <para>
/// The close code (See under "Reply Codes" in the AMQP specification)
/// </para>
/// <para>
/// </para><para>
/// A message indicating the reason for closing the channel
/// </para>
/// </remarks>
Expand Down
44 changes: 41 additions & 3 deletions projects/RabbitMQ.Client/client/impl/ChannelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,47 @@ protected void TakeOver(ChannelBase other)

public void Close(ushort replyCode, string replyText, bool abort)
{
_ = CloseAsync(new ShutdownEventArgs(ShutdownInitiator.Application, replyCode, replyText), abort);
var reason = new ShutdownEventArgs(ShutdownInitiator.Application, replyCode, replyText);
var k = new ShutdownContinuation();
ChannelShutdown += k.OnConnectionShutdown;

try
{
ConsumerDispatcher.Quiesce();

if (SetCloseReason(reason))
{
_Private_ChannelClose(reason.ReplyCode, reason.ReplyText, 0, 0);
}

k.Wait(TimeSpan.FromMilliseconds(10000));
ConsumerDispatcher.WaitForShutdownAsync().ConfigureAwait(false);
}
catch (AlreadyClosedException)
{
if (!abort)
{
throw;
}
}
catch (IOException)
{
if (!abort)
{
throw;
}
}
catch (Exception)
{
if (!abort)
{
throw;
}
}
finally
{
ChannelShutdown -= k.OnConnectionShutdown;
}
}

public async ValueTask CloseAsync(ShutdownEventArgs reason, bool abort)
Expand All @@ -216,8 +256,6 @@ public async ValueTask CloseAsync(ShutdownEventArgs reason, bool abort)
bool result = await k;
Debug.Assert(result);

// TODO LRB rabbitmq/rabbitmq-dotnet-client#1347
// k.Wait(TimeSpan.FromMilliseconds(10000));
await ConsumerDispatcher.WaitForShutdownAsync().ConfigureAwait(false);
}
catch (AlreadyClosedException)
Expand Down
1 change: 1 addition & 0 deletions projects/Unit/APIApproval.Approve.verified.txt
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,7 @@ namespace RabbitMQ.Client
public static System.Threading.Tasks.ValueTask BasicPublishAsync(this RabbitMQ.Client.IChannel channel, string exchange, string routingKey, System.ReadOnlyMemory<byte> body = default, bool mandatory = false) { }
public static void Close(this RabbitMQ.Client.IChannel channel) { }
public static void Close(this RabbitMQ.Client.IChannel channel, ushort replyCode, string replyText) { }
public static System.Threading.Tasks.ValueTask CloseAsync(this RabbitMQ.Client.IChannel channel) { }
public static void ExchangeBind(this RabbitMQ.Client.IChannel channel, string destination, string source, string routingKey, System.Collections.Generic.IDictionary<string, object> arguments = null) { }
public static void ExchangeBindNoWait(this RabbitMQ.Client.IChannel channel, string destination, string source, string routingKey, System.Collections.Generic.IDictionary<string, object> arguments = null) { }
public static void ExchangeDeclare(this RabbitMQ.Client.IChannel channel, string exchange, string type, bool durable = false, bool autoDelete = false, System.Collections.Generic.IDictionary<string, object> arguments = null) { }
Expand Down
14 changes: 13 additions & 1 deletion projects/Unit/TestChannelAllocation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using RabbitMQ.Client.Impl;
using Xunit;

Expand All @@ -50,7 +51,8 @@ public int ChannelNumber(IChannel channel)

public TestChannelAllocation()
{
_c = new ConnectionFactory().CreateConnection();
var cf = new ConnectionFactory();
_c = cf.CreateConnection();
}

public void Dispose() => _c.Close();
Expand All @@ -72,6 +74,16 @@ public void AllocateAfterFreeingLast()
Assert.Equal(1, ChannelNumber(ch));
}

[Fact]
public async Task AllocateAfterFreeingLastAsync()
{
IChannel ch = _c.CreateChannel();
Assert.Equal(1, ChannelNumber(ch));
await ch.CloseAsync();
ch = _c.CreateChannel();
Assert.Equal(1, ChannelNumber(ch));
}

public int CompareChannels(IChannel x, IChannel y)
{
int i = ChannelNumber(x);
Expand Down

0 comments on commit 965256a

Please sign in to comment.