From d8763765ecefb8d6b217e040aaf34821a51c8c8c Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Tue, 14 May 2024 14:40:25 -0700 Subject: [PATCH] Can't close channel from consumer dispatcher See #1567 --- .../Test/Integration/TestAsyncConsumer.cs | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/projects/Test/Integration/TestAsyncConsumer.cs b/projects/Test/Integration/TestAsyncConsumer.cs index 0d61367db0..70536a8f07 100644 --- a/projects/Test/Integration/TestAsyncConsumer.cs +++ b/projects/Test/Integration/TestAsyncConsumer.cs @@ -603,6 +603,38 @@ public async Task TestDeclarationOfManyAutoDeleteQueuesWithTransientConsumer() AssertRecordedQueues((RabbitMQ.Client.Framing.Impl.AutorecoveringConnection)_conn, 0); } + [Fact] + public async Task TestCloseWithinEventHandler_GH1567() + { + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + QueueDeclareOk q = await _channel.QueueDeclareAsync(); + string queueName = q.QueueName; + + var consumer = new AsyncEventingBasicConsumer(_channel); + consumer.Received += async (_, eventArgs) => + { + await _channel.BasicCancelAsync(eventArgs.ConsumerTag); + // await _channel.CloseAsync(); + // _channel.Dispose(); + // _channel = null; + tcs.TrySetResult(true); + }; + + await _channel.BasicConsumeAsync(consumer, queueName, true); + + var bp = new BasicProperties(); + + await _channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, + basicProperties: bp, mandatory: true, body: GetRandomBody(64)); + + Assert.True(await tcs.Task); + + await _channel.CloseAsync(); + _channel.Dispose(); + _channel = null; + } + private static void SetException(Exception ex, params TaskCompletionSource[] tcsAry) { foreach (TaskCompletionSource tcs in tcsAry)