From 6acbf12c23a4ee17e513dbf99a77ea18fc598ec4 Mon Sep 17 00:00:00 2001 From: bollhals Date: Tue, 16 Jul 2024 00:01:56 +0200 Subject: [PATCH 1/2] simplify IncomingCommand.ReturnBuffers handling --- .../RabbitMQ.Client/client/framing/Channel.cs | 2 +- .../client/impl/AsyncRpcContinuations.cs | 262 ++++++--------- .../client/impl/ChannelBase.cs | 311 +++++++----------- 3 files changed, 217 insertions(+), 358 deletions(-) diff --git a/projects/RabbitMQ.Client/client/framing/Channel.cs b/projects/RabbitMQ.Client/client/framing/Channel.cs index 7370f477c1..4736181234 100644 --- a/projects/RabbitMQ.Client/client/framing/Channel.cs +++ b/projects/RabbitMQ.Client/client/framing/Channel.cs @@ -143,7 +143,7 @@ protected override Task DispatchCommandAsync(IncomingCommand cmd, Cancella } case ProtocolCommandId.ConnectionUnblocked: { - HandleConnectionUnblocked(cmd); + HandleConnectionUnblocked(); return Task.FromResult(true); } default: diff --git a/projects/RabbitMQ.Client/client/impl/AsyncRpcContinuations.cs b/projects/RabbitMQ.Client/client/impl/AsyncRpcContinuations.cs index 447b8d9829..28a4bb04f7 100644 --- a/projects/RabbitMQ.Client/client/impl/AsyncRpcContinuations.cs +++ b/projects/RabbitMQ.Client/client/impl/AsyncRpcContinuations.cs @@ -92,8 +92,6 @@ public AsyncRpcContinuation(TimeSpan continuationTimeout, CancellationToken canc _continuationTimeoutCancellationTokenSource.Token, cancellationToken); } - internal DateTime StartTime { get; } = DateTime.UtcNow; - public CancellationToken CancellationToken { get @@ -136,7 +134,7 @@ public void Dispose() } } - internal class ConnectionSecureOrTuneAsyncRpcContinuation : AsyncRpcContinuation + internal sealed class ConnectionSecureOrTuneAsyncRpcContinuation : AsyncRpcContinuation { public ConnectionSecureOrTuneAsyncRpcContinuation(TimeSpan continuationTimeout, CancellationToken cancellationToken) : base(continuationTimeout, cancellationToken) @@ -145,34 +143,27 @@ public ConnectionSecureOrTuneAsyncRpcContinuation(TimeSpan continuationTimeout, public override Task HandleCommandAsync(IncomingCommand cmd) { - try + if (cmd.CommandId == ProtocolCommandId.ConnectionSecure) { - if (cmd.CommandId == ProtocolCommandId.ConnectionSecure) - { - var secure = new ConnectionSecure(cmd.MethodSpan); - _tcs.TrySetResult(new ConnectionSecureOrTune(secure._challenge, default)); - } - else if (cmd.CommandId == ProtocolCommandId.ConnectionTune) - { - var tune = new ConnectionTune(cmd.MethodSpan); - _tcs.TrySetResult(new ConnectionSecureOrTune(default, new ConnectionTuneDetails - { - m_channelMax = tune._channelMax, - m_frameMax = tune._frameMax, - m_heartbeatInSeconds = tune._heartbeat - })); - } - else + var secure = new ConnectionSecure(cmd.MethodSpan); + _tcs.TrySetResult(new ConnectionSecureOrTune(secure._challenge, default)); + } + else if (cmd.CommandId == ProtocolCommandId.ConnectionTune) + { + var tune = new ConnectionTune(cmd.MethodSpan); + _tcs.TrySetResult(new ConnectionSecureOrTune(default, new ConnectionTuneDetails { - _tcs.SetException(new InvalidOperationException($"Received unexpected command of type {cmd.CommandId}!")); - } - - return Task.CompletedTask; + m_channelMax = tune._channelMax, + m_frameMax = tune._frameMax, + m_heartbeatInSeconds = tune._heartbeat + })); } - finally + else { - cmd.ReturnBuffers(); + _tcs.SetException(new InvalidOperationException($"Received unexpected command of type {cmd.CommandId}!")); } + + return Task.CompletedTask; } } @@ -189,27 +180,20 @@ public SimpleAsyncRpcContinuation(ProtocolCommandId expectedCommandId, TimeSpan public override Task HandleCommandAsync(IncomingCommand cmd) { - try + if (cmd.CommandId == _expectedCommandId) { - if (cmd.CommandId == _expectedCommandId) - { - _tcs.TrySetResult(true); - } - else - { - _tcs.SetException(new InvalidOperationException($"Received unexpected command of type {cmd.CommandId}!")); - } - - return Task.CompletedTask; + _tcs.TrySetResult(true); } - finally + else { - cmd.ReturnBuffers(); + _tcs.SetException(new InvalidOperationException($"Received unexpected command of type {cmd.CommandId}!")); } + + return Task.CompletedTask; } } - internal class BasicCancelAsyncRpcContinuation : SimpleAsyncRpcContinuation + internal sealed class BasicCancelAsyncRpcContinuation : SimpleAsyncRpcContinuation { private readonly string _consumerTag; private readonly IConsumerDispatcher _consumerDispatcher; @@ -224,29 +208,21 @@ public BasicCancelAsyncRpcContinuation(string consumerTag, IConsumerDispatcher c public override async Task HandleCommandAsync(IncomingCommand cmd) { - try + if (cmd.CommandId == ProtocolCommandId.BasicCancelOk) { - if (cmd.CommandId == ProtocolCommandId.BasicCancelOk) - { - var method = new Client.Framing.Impl.BasicCancelOk(cmd.MethodSpan); - _tcs.TrySetResult(true); - Debug.Assert(_consumerTag == method._consumerTag); - await _consumerDispatcher.HandleBasicCancelOkAsync(_consumerTag, CancellationToken) - .ConfigureAwait(false); - } - else - { - _tcs.SetException(new InvalidOperationException($"Received unexpected command of type {cmd.CommandId}!")); - } + _tcs.TrySetResult(true); + Debug.Assert(_consumerTag == new Client.Framing.Impl.BasicCancelOk(cmd.MethodSpan)._consumerTag); + await _consumerDispatcher.HandleBasicCancelOkAsync(_consumerTag, CancellationToken) + .ConfigureAwait(false); } - finally + else { - cmd.ReturnBuffers(); + _tcs.SetException(new InvalidOperationException($"Received unexpected command of type {cmd.CommandId}!")); } } } - internal class BasicConsumeAsyncRpcContinuation : AsyncRpcContinuation + internal sealed class BasicConsumeAsyncRpcContinuation : AsyncRpcContinuation { private readonly IBasicConsumer _consumer; private readonly IConsumerDispatcher _consumerDispatcher; @@ -261,28 +237,21 @@ public BasicConsumeAsyncRpcContinuation(IBasicConsumer consumer, IConsumerDispat public override async Task HandleCommandAsync(IncomingCommand cmd) { - try + if (cmd.CommandId == ProtocolCommandId.BasicConsumeOk) { - if (cmd.CommandId == ProtocolCommandId.BasicConsumeOk) - { - var method = new Client.Framing.Impl.BasicConsumeOk(cmd.MethodSpan); - _tcs.TrySetResult(method._consumerTag); - await _consumerDispatcher.HandleBasicConsumeOkAsync(_consumer, method._consumerTag, CancellationToken) - .ConfigureAwait(false); - } - else - { - _tcs.SetException(new InvalidOperationException($"Received unexpected command of type {cmd.CommandId}!")); - } + var method = new Client.Framing.Impl.BasicConsumeOk(cmd.MethodSpan); + _tcs.TrySetResult(method._consumerTag); + await _consumerDispatcher.HandleBasicConsumeOkAsync(_consumer, method._consumerTag, CancellationToken) + .ConfigureAwait(false); } - finally + else { - cmd.ReturnBuffers(); + _tcs.SetException(new InvalidOperationException($"Received unexpected command of type {cmd.CommandId}!")); } } } - internal class BasicGetAsyncRpcContinuation : AsyncRpcContinuation + internal sealed class BasicGetAsyncRpcContinuation : AsyncRpcContinuation { private readonly Func _adjustDeliveryTag; @@ -293,46 +262,40 @@ public BasicGetAsyncRpcContinuation(Func adjustDeliveryTag, _adjustDeliveryTag = adjustDeliveryTag; } + internal DateTime StartTime { get; } = DateTime.UtcNow; + public override Task HandleCommandAsync(IncomingCommand cmd) { - try + if (cmd.CommandId == ProtocolCommandId.BasicGetOk) { - if (cmd.CommandId == ProtocolCommandId.BasicGetOk) - { - var method = new Client.Framing.Impl.BasicGetOk(cmd.MethodSpan); - var header = new ReadOnlyBasicProperties(cmd.HeaderSpan); - - var result = new BasicGetResult( - _adjustDeliveryTag(method._deliveryTag), - method._redelivered, - method._exchange, - method._routingKey, - method._messageCount, - header, - cmd.Body.ToArray()); - - _tcs.TrySetResult(result); - } - else if (cmd.CommandId == ProtocolCommandId.BasicGetEmpty) - { - _tcs.TrySetResult(null); - } - else - { - _tcs.SetException(new InvalidOperationException($"Received unexpected command of type {cmd.CommandId}!")); - } - - return Task.CompletedTask; + var method = new Client.Framing.Impl.BasicGetOk(cmd.MethodSpan); + var header = new ReadOnlyBasicProperties(cmd.HeaderSpan); + + var result = new BasicGetResult( + _adjustDeliveryTag(method._deliveryTag), + method._redelivered, + method._exchange, + method._routingKey, + method._messageCount, + header, + cmd.Body.ToArray()); + + _tcs.TrySetResult(result); + } + else if (cmd.CommandId == ProtocolCommandId.BasicGetEmpty) + { + _tcs.TrySetResult(null); } - finally + else { - // Note: since we copy the body buffer above, we want to return all buffers here - cmd.ReturnBuffers(); + _tcs.SetException(new InvalidOperationException($"Received unexpected command of type {cmd.CommandId}!")); } + + return Task.CompletedTask; } } - internal class BasicQosAsyncRpcContinuation : SimpleAsyncRpcContinuation + internal sealed class BasicQosAsyncRpcContinuation : SimpleAsyncRpcContinuation { public BasicQosAsyncRpcContinuation(TimeSpan continuationTimeout, CancellationToken cancellationToken) : base(ProtocolCommandId.BasicQosOk, continuationTimeout, cancellationToken) @@ -340,7 +303,7 @@ public BasicQosAsyncRpcContinuation(TimeSpan continuationTimeout, CancellationTo } } - internal class ChannelOpenAsyncRpcContinuation : SimpleAsyncRpcContinuation + internal sealed class ChannelOpenAsyncRpcContinuation : SimpleAsyncRpcContinuation { public ChannelOpenAsyncRpcContinuation(TimeSpan continuationTimeout, CancellationToken cancellationToken) : base(ProtocolCommandId.ChannelOpenOk, continuationTimeout, cancellationToken) @@ -348,7 +311,7 @@ public ChannelOpenAsyncRpcContinuation(TimeSpan continuationTimeout, Cancellatio } } - internal class ChannelCloseAsyncRpcContinuation : SimpleAsyncRpcContinuation + internal sealed class ChannelCloseAsyncRpcContinuation : SimpleAsyncRpcContinuation { public ChannelCloseAsyncRpcContinuation(TimeSpan continuationTimeout, CancellationToken cancellationToken) : base(ProtocolCommandId.ChannelCloseOk, continuationTimeout, cancellationToken) @@ -366,7 +329,7 @@ public void OnConnectionShutdown(object? sender, ShutdownEventArgs reason) } } - internal class ConfirmSelectAsyncRpcContinuation : SimpleAsyncRpcContinuation + internal sealed class ConfirmSelectAsyncRpcContinuation : SimpleAsyncRpcContinuation { public ConfirmSelectAsyncRpcContinuation(TimeSpan continuationTimeout, CancellationToken cancellationToken) : base(ProtocolCommandId.ConfirmSelectOk, continuationTimeout, cancellationToken) @@ -374,7 +337,7 @@ public ConfirmSelectAsyncRpcContinuation(TimeSpan continuationTimeout, Cancellat } } - internal class ExchangeBindAsyncRpcContinuation : SimpleAsyncRpcContinuation + internal sealed class ExchangeBindAsyncRpcContinuation : SimpleAsyncRpcContinuation { public ExchangeBindAsyncRpcContinuation(TimeSpan continuationTimeout, CancellationToken cancellationToken) : base(ProtocolCommandId.ExchangeBindOk, continuationTimeout, cancellationToken) @@ -382,7 +345,7 @@ public ExchangeBindAsyncRpcContinuation(TimeSpan continuationTimeout, Cancellati } } - internal class ExchangeDeclareAsyncRpcContinuation : SimpleAsyncRpcContinuation + internal sealed class ExchangeDeclareAsyncRpcContinuation : SimpleAsyncRpcContinuation { public ExchangeDeclareAsyncRpcContinuation(TimeSpan continuationTimeout, CancellationToken cancellationToken) : base(ProtocolCommandId.ExchangeDeclareOk, continuationTimeout, cancellationToken) @@ -390,7 +353,7 @@ public ExchangeDeclareAsyncRpcContinuation(TimeSpan continuationTimeout, Cancell } } - internal class ExchangeDeleteAsyncRpcContinuation : SimpleAsyncRpcContinuation + internal sealed class ExchangeDeleteAsyncRpcContinuation : SimpleAsyncRpcContinuation { public ExchangeDeleteAsyncRpcContinuation(TimeSpan continuationTimeout, CancellationToken cancellationToken) : base(ProtocolCommandId.ExchangeDeleteOk, continuationTimeout, cancellationToken) @@ -398,7 +361,7 @@ public ExchangeDeleteAsyncRpcContinuation(TimeSpan continuationTimeout, Cancella } } - internal class ExchangeUnbindAsyncRpcContinuation : SimpleAsyncRpcContinuation + internal sealed class ExchangeUnbindAsyncRpcContinuation : SimpleAsyncRpcContinuation { public ExchangeUnbindAsyncRpcContinuation(TimeSpan continuationTimeout, CancellationToken cancellationToken) : base(ProtocolCommandId.ExchangeUnbindOk, continuationTimeout, cancellationToken) @@ -406,7 +369,7 @@ public ExchangeUnbindAsyncRpcContinuation(TimeSpan continuationTimeout, Cancella } } - internal class QueueDeclareAsyncRpcContinuation : AsyncRpcContinuation + internal sealed class QueueDeclareAsyncRpcContinuation : AsyncRpcContinuation { public QueueDeclareAsyncRpcContinuation(TimeSpan continuationTimeout, CancellationToken cancellationToken) : base(continuationTimeout, cancellationToken) @@ -415,29 +378,22 @@ public QueueDeclareAsyncRpcContinuation(TimeSpan continuationTimeout, Cancellati public override Task HandleCommandAsync(IncomingCommand cmd) { - try + if (cmd.CommandId == ProtocolCommandId.QueueDeclareOk) { - if (cmd.CommandId == ProtocolCommandId.QueueDeclareOk) - { - var method = new Client.Framing.Impl.QueueDeclareOk(cmd.MethodSpan); - var result = new QueueDeclareOk(method._queue, method._messageCount, method._consumerCount); - _tcs.TrySetResult(result); - } - else - { - _tcs.SetException(new InvalidOperationException($"Received unexpected command of type {cmd.CommandId}!")); - } - - return Task.CompletedTask; + var method = new Client.Framing.Impl.QueueDeclareOk(cmd.MethodSpan); + var result = new QueueDeclareOk(method._queue, method._messageCount, method._consumerCount); + _tcs.TrySetResult(result); } - finally + else { - cmd.ReturnBuffers(); + _tcs.SetException(new InvalidOperationException($"Received unexpected command of type {cmd.CommandId}!")); } + + return Task.CompletedTask; } } - internal class QueueBindAsyncRpcContinuation : SimpleAsyncRpcContinuation + internal sealed class QueueBindAsyncRpcContinuation : SimpleAsyncRpcContinuation { public QueueBindAsyncRpcContinuation(TimeSpan continuationTimeout, CancellationToken cancellationToken) : base(ProtocolCommandId.QueueBindOk, continuationTimeout, cancellationToken) @@ -445,7 +401,7 @@ public QueueBindAsyncRpcContinuation(TimeSpan continuationTimeout, CancellationT } } - internal class QueueUnbindAsyncRpcContinuation : SimpleAsyncRpcContinuation + internal sealed class QueueUnbindAsyncRpcContinuation : SimpleAsyncRpcContinuation { public QueueUnbindAsyncRpcContinuation(TimeSpan continuationTimeout, CancellationToken cancellationToken) : base(ProtocolCommandId.QueueUnbindOk, continuationTimeout, cancellationToken) @@ -453,7 +409,7 @@ public QueueUnbindAsyncRpcContinuation(TimeSpan continuationTimeout, Cancellatio } } - internal class QueueDeleteAsyncRpcContinuation : AsyncRpcContinuation + internal sealed class QueueDeleteAsyncRpcContinuation : AsyncRpcContinuation { public QueueDeleteAsyncRpcContinuation(TimeSpan continuationTimeout, CancellationToken cancellationToken) : base(continuationTimeout, cancellationToken) @@ -462,28 +418,21 @@ public QueueDeleteAsyncRpcContinuation(TimeSpan continuationTimeout, Cancellatio public override Task HandleCommandAsync(IncomingCommand cmd) { - try + if (cmd.CommandId == ProtocolCommandId.QueueDeleteOk) { - if (cmd.CommandId == ProtocolCommandId.QueueDeleteOk) - { - var method = new Client.Framing.Impl.QueueDeleteOk(cmd.MethodSpan); - _tcs.TrySetResult(method._messageCount); - } - else - { - _tcs.SetException(new InvalidOperationException($"Received unexpected command of type {cmd.CommandId}!")); - } - - return Task.CompletedTask; + var method = new Client.Framing.Impl.QueueDeleteOk(cmd.MethodSpan); + _tcs.TrySetResult(method._messageCount); } - finally + else { - cmd.ReturnBuffers(); + _tcs.SetException(new InvalidOperationException($"Received unexpected command of type {cmd.CommandId}!")); } + + return Task.CompletedTask; } } - internal class QueuePurgeAsyncRpcContinuation : AsyncRpcContinuation + internal sealed class QueuePurgeAsyncRpcContinuation : AsyncRpcContinuation { public QueuePurgeAsyncRpcContinuation(TimeSpan continuationTimeout, CancellationToken cancellationToken) : base(continuationTimeout, cancellationToken) @@ -492,28 +441,21 @@ public QueuePurgeAsyncRpcContinuation(TimeSpan continuationTimeout, Cancellation public override Task HandleCommandAsync(IncomingCommand cmd) { - try + if (cmd.CommandId == ProtocolCommandId.QueuePurgeOk) { - if (cmd.CommandId == ProtocolCommandId.QueuePurgeOk) - { - var method = new Client.Framing.Impl.QueuePurgeOk(cmd.MethodSpan); - _tcs.TrySetResult(method._messageCount); - } - else - { - _tcs.SetException(new InvalidOperationException($"Received unexpected command of type {cmd.CommandId}!")); - } - - return Task.CompletedTask; + var method = new Client.Framing.Impl.QueuePurgeOk(cmd.MethodSpan); + _tcs.TrySetResult(method._messageCount); } - finally + else { - cmd.ReturnBuffers(); + _tcs.SetException(new InvalidOperationException($"Received unexpected command of type {cmd.CommandId}!")); } + + return Task.CompletedTask; } } - internal class TxCommitAsyncRpcContinuation : SimpleAsyncRpcContinuation + internal sealed class TxCommitAsyncRpcContinuation : SimpleAsyncRpcContinuation { public TxCommitAsyncRpcContinuation(TimeSpan continuationTimeout, CancellationToken cancellationToken) : base(ProtocolCommandId.TxCommitOk, continuationTimeout, cancellationToken) @@ -521,7 +463,7 @@ public TxCommitAsyncRpcContinuation(TimeSpan continuationTimeout, CancellationTo } } - internal class TxRollbackAsyncRpcContinuation : SimpleAsyncRpcContinuation + internal sealed class TxRollbackAsyncRpcContinuation : SimpleAsyncRpcContinuation { public TxRollbackAsyncRpcContinuation(TimeSpan continuationTimeout, CancellationToken cancellationToken) : base(ProtocolCommandId.TxRollbackOk, continuationTimeout, cancellationToken) @@ -529,7 +471,7 @@ public TxRollbackAsyncRpcContinuation(TimeSpan continuationTimeout, Cancellation } } - internal class TxSelectAsyncRpcContinuation : SimpleAsyncRpcContinuation + internal sealed class TxSelectAsyncRpcContinuation : SimpleAsyncRpcContinuation { public TxSelectAsyncRpcContinuation(TimeSpan continuationTimeout, CancellationToken cancellationToken) : base(ProtocolCommandId.TxSelectOk, continuationTimeout, cancellationToken) diff --git a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs index 317e628814..757c99f25f 100644 --- a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs @@ -440,6 +440,8 @@ await c.HandleCommandAsync(cmd) .ConfigureAwait(false); } } + + cmd.ReturnBuffers(); } [MethodImpl(MethodImplOptions.AggressiveInlining)] @@ -567,41 +569,27 @@ public Task ConnectionTuneOkAsync(ushort channelMax, uint frameMax, ushort heart protected void HandleBasicAck(IncomingCommand cmd) { - try - { - var ack = new BasicAck(cmd.MethodSpan); - if (!_basicAcksWrapper.IsEmpty) - { - var args = new BasicAckEventArgs(ack._deliveryTag, ack._multiple); - _basicAcksWrapper.Invoke(this, args); - } - - HandleAckNack(ack._deliveryTag, ack._multiple, false); - } - finally + var ack = new BasicAck(cmd.MethodSpan); + if (!_basicAcksWrapper.IsEmpty) { - cmd.ReturnBuffers(); + var args = new BasicAckEventArgs(ack._deliveryTag, ack._multiple); + _basicAcksWrapper.Invoke(this, args); } + + HandleAckNack(ack._deliveryTag, ack._multiple, false); } protected void HandleBasicNack(IncomingCommand cmd) { - try - { - var nack = new BasicNack(cmd.MethodSpan); - if (!_basicNacksWrapper.IsEmpty) - { - var args = new BasicNackEventArgs( - nack._deliveryTag, nack._multiple, nack._requeue); - _basicNacksWrapper.Invoke(this, args); - } - - HandleAckNack(nack._deliveryTag, nack._multiple, true); - } - finally + var nack = new BasicNack(cmd.MethodSpan); + if (!_basicNacksWrapper.IsEmpty) { - cmd.ReturnBuffers(); + var args = new BasicNackEventArgs( + nack._deliveryTag, nack._multiple, nack._requeue); + _basicNacksWrapper.Invoke(this, args); } + + HandleAckNack(nack._deliveryTag, nack._multiple, true); } protected void HandleAckNack(ulong deliveryTag, bool multiple, bool isNack) @@ -657,44 +645,30 @@ protected void HandleAckNack(ulong deliveryTag, bool multiple, bool isNack) protected async Task HandleBasicCancelAsync(IncomingCommand cmd, CancellationToken cancellationToken) { - try - { - string consumerTag = new Client.Framing.Impl.BasicCancel(cmd.MethodSpan)._consumerTag; - await ConsumerDispatcher.HandleBasicCancelAsync(consumerTag, cancellationToken) - .ConfigureAwait(false); - return true; - } - finally - { - cmd.ReturnBuffers(); - } + string consumerTag = new Client.Framing.Impl.BasicCancel(cmd.MethodSpan)._consumerTag; + await ConsumerDispatcher.HandleBasicCancelAsync(consumerTag, cancellationToken) + .ConfigureAwait(false); + return true; } protected async Task HandleBasicDeliverAsync(IncomingCommand cmd, CancellationToken cancellationToken) { - try - { - var method = new Client.Framing.Impl.BasicDeliver(cmd.MethodSpan); - var header = new ReadOnlyBasicProperties(cmd.HeaderSpan); - await ConsumerDispatcher.HandleBasicDeliverAsync( - method._consumerTag, - AdjustDeliveryTag(method._deliveryTag), - method._redelivered, - method._exchange, - method._routingKey, - header, - /* - * Takeover Body so it doesn't get returned as it is necessary - * for handling the Basic.Deliver method by client code. - */ - cmd.TakeoverBody(), - cancellationToken).ConfigureAwait(false); - return true; - } - finally - { - cmd.ReturnBuffers(); - } + var method = new Client.Framing.Impl.BasicDeliver(cmd.MethodSpan); + var header = new ReadOnlyBasicProperties(cmd.HeaderSpan); + await ConsumerDispatcher.HandleBasicDeliverAsync( + method._consumerTag, + AdjustDeliveryTag(method._deliveryTag), + method._redelivered, + method._exchange, + method._routingKey, + header, + /* + * Takeover Body so it doesn't get returned as it is necessary + * for handling the Basic.Deliver method by client code. + */ + cmd.TakeoverBody(), + cancellationToken).ConfigureAwait(false); + return true; } protected virtual ulong AdjustDeliveryTag(ulong deliveryTag) @@ -704,152 +678,109 @@ protected virtual ulong AdjustDeliveryTag(ulong deliveryTag) protected void HandleBasicReturn(IncomingCommand cmd) { - try - { - if (!_basicReturnWrapper.IsEmpty) - { - var basicReturn = new BasicReturn(cmd.MethodSpan); - var e = new BasicReturnEventArgs(basicReturn._replyCode, basicReturn._replyText, - basicReturn._exchange, basicReturn._routingKey, - new ReadOnlyBasicProperties(cmd.HeaderSpan), cmd.Body.Memory); - _basicReturnWrapper.Invoke(this, e); - } - } - finally + if (!_basicReturnWrapper.IsEmpty) { - // Note: we can return all the buffers here since the event has been invoked and has returned - cmd.ReturnBuffers(); + var basicReturn = new BasicReturn(cmd.MethodSpan); + var e = new BasicReturnEventArgs(basicReturn._replyCode, basicReturn._replyText, + basicReturn._exchange, basicReturn._routingKey, + new ReadOnlyBasicProperties(cmd.HeaderSpan), cmd.Body.Memory); + _basicReturnWrapper.Invoke(this, e); } } protected async Task HandleChannelCloseAsync(IncomingCommand cmd, CancellationToken cancellationToken) { - try - { - var channelClose = new ChannelClose(cmd.MethodSpan); - SetCloseReason(new ShutdownEventArgs(ShutdownInitiator.Peer, - channelClose._replyCode, - channelClose._replyText, - channelClose._classId, - channelClose._methodId)); + var channelClose = new ChannelClose(cmd.MethodSpan); + SetCloseReason(new ShutdownEventArgs(ShutdownInitiator.Peer, + channelClose._replyCode, + channelClose._replyText, + channelClose._classId, + channelClose._methodId)); - Session.Close(_closeReason, false); + Session.Close(_closeReason, false); - var method = new ChannelCloseOk(); - await ModelSendAsync(method, cancellationToken) - .ConfigureAwait(false); + var method = new ChannelCloseOk(); + await ModelSendAsync(method, cancellationToken) + .ConfigureAwait(false); - return true; - } - finally - { - cmd.ReturnBuffers(); - Session.Notify(); - } + Session.Notify(); + return true; } protected async Task HandleChannelCloseOkAsync(IncomingCommand cmd, CancellationToken cancellationToken) { - try - { - /* - * Note: - * This call _must_ come before completing the async continuation - */ - FinishClose(); - - if (_continuationQueue.TryPeek(out ChannelCloseAsyncRpcContinuation? k)) - { - _continuationQueue.Next(); - await k.HandleCommandAsync(cmd) - .ConfigureAwait(false); - } + /* + * Note: + * This call _must_ come before completing the async continuation + */ + FinishClose(); - return true; - } - finally + if (_continuationQueue.TryPeek(out ChannelCloseAsyncRpcContinuation? k)) { - cmd.ReturnBuffers(); + _continuationQueue.Next(); + await k.HandleCommandAsync(cmd) + .ConfigureAwait(false); } + + return true; } protected async Task HandleChannelFlowAsync(IncomingCommand cmd, CancellationToken cancellationToken) { - try + bool active = new ChannelFlow(cmd.MethodSpan)._active; + if (active) { - bool active = new ChannelFlow(cmd.MethodSpan)._active; - if (active) - { - _flowControlBlock.Set(); - } - else - { - _flowControlBlock.Reset(); - } - - var method = new ChannelFlowOk(active); - await ModelSendAsync(method, cancellationToken). - ConfigureAwait(false); + _flowControlBlock.Set(); + } + else + { + _flowControlBlock.Reset(); + } - if (!_flowControlWrapper.IsEmpty) - { - _flowControlWrapper.Invoke(this, new FlowControlEventArgs(active)); - } + var method = new ChannelFlowOk(active); + await ModelSendAsync(method, cancellationToken). + ConfigureAwait(false); - return true; - } - finally + if (!_flowControlWrapper.IsEmpty) { - cmd.ReturnBuffers(); + _flowControlWrapper.Invoke(this, new FlowControlEventArgs(active)); } + + return true; } protected void HandleConnectionBlocked(IncomingCommand cmd) { - try - { - string reason = new ConnectionBlocked(cmd.MethodSpan)._reason; - Session.Connection.HandleConnectionBlocked(reason); - } - finally - { - cmd.ReturnBuffers(); - } + string reason = new ConnectionBlocked(cmd.MethodSpan)._reason; + Session.Connection.HandleConnectionBlocked(reason); } protected async Task HandleConnectionCloseAsync(IncomingCommand cmd, CancellationToken cancellationToken) { + var method = new ConnectionClose(cmd.MethodSpan); + var reason = new ShutdownEventArgs(ShutdownInitiator.Peer, method._replyCode, method._replyText, method._classId, method._methodId); try { - var method = new ConnectionClose(cmd.MethodSpan); - var reason = new ShutdownEventArgs(ShutdownInitiator.Peer, method._replyCode, method._replyText, method._classId, method._methodId); - try - { - Session.Connection.ClosedViaPeer(reason); - - var replyMethod = new ConnectionCloseOk(); - await ModelSendAsync(replyMethod, cancellationToken) - .ConfigureAwait(false); + Session.Connection.ClosedViaPeer(reason); - SetCloseReason(Session.Connection.CloseReason!); - } - catch (IOException) - { - // Ignored. We're only trying to be polite by sending - // the close-ok, after all. - } - catch (AlreadyClosedException) - { - // Ignored. We're only trying to be polite by sending - // the close-ok, after all. - } + var replyMethod = new ConnectionCloseOk(); + await ModelSendAsync(replyMethod, cancellationToken) + .ConfigureAwait(false); - return true; + SetCloseReason(Session.Connection.CloseReason!); } - finally + catch (IOException) + { + // Ignored. We're only trying to be polite by sending + // the close-ok, after all. + } + catch (AlreadyClosedException) { - cmd.ReturnBuffers(); + // Ignored. We're only trying to be polite by sending + // the close-ok, after all. } + + return true; } protected async Task HandleConnectionSecureAsync(IncomingCommand _) @@ -862,31 +793,24 @@ await k.HandleCommandAsync(new IncomingCommand()) protected async Task HandleConnectionStartAsync(IncomingCommand cmd, CancellationToken cancellationToken) { - try + if (m_connectionStartCell is null) { - if (m_connectionStartCell is null) - { - var reason = new ShutdownEventArgs(ShutdownInitiator.Library, Constants.CommandInvalid, "Unexpected Connection.Start"); - await Session.Connection.CloseAsync(reason, false, - InternalConstants.DefaultConnectionCloseTimeout, - cancellationToken) - .ConfigureAwait(false); - } - else - { - var method = new ConnectionStart(cmd.MethodSpan); - var details = new ConnectionStartDetails(method._locales, method._mechanisms, - method._serverProperties, method._versionMajor, method._versionMinor); - m_connectionStartCell.SetResult(details); - m_connectionStartCell = null; - } - - return true; + var reason = new ShutdownEventArgs(ShutdownInitiator.Library, Constants.CommandInvalid, "Unexpected Connection.Start"); + await Session.Connection.CloseAsync(reason, false, + InternalConstants.DefaultConnectionCloseTimeout, + cancellationToken) + .ConfigureAwait(false); } - finally + else { - cmd.ReturnBuffers(); + var method = new ConnectionStart(cmd.MethodSpan); + var details = new ConnectionStartDetails(method._locales, method._mechanisms, + method._serverProperties, method._versionMajor, method._versionMinor); + m_connectionStartCell.SetResult(details); + m_connectionStartCell = null; } + + return true; } protected async Task HandleConnectionTuneAsync(IncomingCommand cmd) @@ -901,16 +825,9 @@ await k.HandleCommandAsync(cmd) return true; } - protected void HandleConnectionUnblocked(IncomingCommand cmd) + protected void HandleConnectionUnblocked() { - try - { - Session.Connection.HandleConnectionUnblocked(); - } - finally - { - cmd.ReturnBuffers(); - } + Session.Connection.HandleConnectionUnblocked(); } public abstract ValueTask BasicAckAsync(ulong deliveryTag, bool multiple, From d8429a480a1110851c06d015a20731499e5da5df Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Mon, 22 Jul 2024 11:49:55 -0700 Subject: [PATCH 2/2] Use `try` / `finally` when returning buffers. --- .../client/impl/ChannelBase.cs | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs index 757c99f25f..f761409d4d 100644 --- a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs @@ -431,17 +431,22 @@ private async Task HandleCommandAsync(IncomingCommand cmd, CancellationToken can * * Else, the incoming command is the return of an RPC call, and must be handled. */ - if (false == await DispatchCommandAsync(cmd, cancellationToken) - .ConfigureAwait(false)) + try { - using (IRpcContinuation c = _continuationQueue.Next()) + if (false == await DispatchCommandAsync(cmd, cancellationToken) + .ConfigureAwait(false)) { - await c.HandleCommandAsync(cmd) - .ConfigureAwait(false); + using (IRpcContinuation c = _continuationQueue.Next()) + { + await c.HandleCommandAsync(cmd) + .ConfigureAwait(false); + } } } - - cmd.ReturnBuffers(); + finally + { + cmd.ReturnBuffers(); + } } [MethodImpl(MethodImplOptions.AggressiveInlining)]