diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs new file mode 100644 index 0000000000..2c78b7f010 --- /dev/null +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs @@ -0,0 +1,110 @@ +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 2.0. +// +// The APL v2.0: +// +//--------------------------------------------------------------------------- +// Copyright (c) 2007-2020 VMware, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +//--------------------------------------------------------------------------- +// +// The MPL v2.0: +// +//--------------------------------------------------------------------------- +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. +// +// Copyright (c) 2007-2020 VMware, Inc. All rights reserved. +//--------------------------------------------------------------------------- + +using System; +using System.Threading; +using System.Threading.Tasks; +using RabbitMQ.Client.Logging; + +namespace RabbitMQ.Client.Framing.Impl +{ +#nullable enable + internal sealed partial class AutorecoveringConnection + { + private Task? _recoveryTask; + private CancellationTokenSource? _recoveryCancellationTokenSource; + + private CancellationTokenSource RecoveryCancellationTokenSource => _recoveryCancellationTokenSource ??= new CancellationTokenSource(); + + private void HandleConnectionShutdown(object _, ShutdownEventArgs args) + { + if (ShouldTriggerConnectionRecovery(args)) + { + var recoverTask = new Task(RecoverConnectionAsync); + if (Interlocked.CompareExchange(ref _recoveryTask, recoverTask.Unwrap(), null) is null) + { + recoverTask.Start(); + } + } + + static bool ShouldTriggerConnectionRecovery(ShutdownEventArgs args) => + args.Initiator == ShutdownInitiator.Peer || + // happens when EOF is reached, e.g. due to RabbitMQ node + // connectivity loss or abrupt shutdown + args.Initiator == ShutdownInitiator.Library; + } + + private async Task RecoverConnectionAsync() + { + try + { + var token = RecoveryCancellationTokenSource.Token; + bool success; + do + { + await Task.Delay(_factory.NetworkRecoveryInterval, token).ConfigureAwait(false); + success = TryPerformAutomaticRecovery(); + } while (!success && !token.IsCancellationRequested); + } + catch (OperationCanceledException) + { + // expected when recovery cancellation token is set. + } + catch (Exception e) + { + ESLog.Error("Main recovery loop threw unexpected exception.", e); + } + + // clear recovery task + _recoveryTask = null; + } + + /// + /// Cancels the main recovery loop and will block until the loop finishes, or the timeout + /// expires, to prevent Close operations overlapping with recovery operations. + /// + private void StopRecoveryLoop() + { + var task = _recoveryTask; + if (task is null) + { + return; + } + RecoveryCancellationTokenSource.Cancel(); + + Task timeout = Task.Delay(_factory.RequestedConnectionTimeout); + if (Task.WhenAny(task, timeout).Result == timeout) + { + ESLog.Warn("Timeout while trying to stop background AutorecoveringConnection recovery loop."); + } + } + } +} diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs index b039d45a01..5cabca4442 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs @@ -33,8 +33,6 @@ using System.Collections.Generic; using System.Linq; using System.Runtime.CompilerServices; -using System.Threading.Channels; -using System.Threading.Tasks; using RabbitMQ.Client.Events; using RabbitMQ.Client.Exceptions; @@ -43,7 +41,7 @@ namespace RabbitMQ.Client.Framing.Impl { - internal sealed class AutorecoveringConnection : IConnection + internal sealed partial class AutorecoveringConnection : IConnection { private bool _disposed; private Connection _delegate; @@ -368,15 +366,7 @@ private void Init(IFrameHandler fh) { ThrowIfDisposed(); _delegate = new Connection(_factory, false, fh, ClientProvidedName); - _recoveryTask = Task.Run(MainRecoveryLoop); - - ConnectionShutdown += (_, args) => - { - if (ShouldTriggerConnectionRecovery(args)) - { - _recoveryLoopCommandQueue.Writer.TryWrite(RecoveryCommand.BeginAutomaticRecovery); - } - }; + ConnectionShutdown += HandleConnectionShutdown; } ///API-side invocation of updating the secret. @@ -754,149 +744,6 @@ private void RecoverQueues() } } - private static bool ShouldTriggerConnectionRecovery(ShutdownEventArgs args) => args.Initiator == ShutdownInitiator.Peer || - // happens when EOF is reached, e.g. due to RabbitMQ node - // connectivity loss or abrupt shutdown - args.Initiator == ShutdownInitiator.Library; - - private enum RecoveryCommand - { - /// - /// Transition to auto-recovery state if not already in that state. - /// - BeginAutomaticRecovery, - /// - /// Attempt to recover connection. If connection is recovered, return - /// to connected state. - /// - PerformAutomaticRecovery - } - - - private enum RecoveryConnectionState - { - /// - /// Underlying connection is open. - /// - Connected, - /// - /// In the process of recovering underlying connection. - /// - Recovering - } - - private Task _recoveryTask; - private RecoveryConnectionState _recoveryLoopState = RecoveryConnectionState.Connected; - - private readonly Channel _recoveryLoopCommandQueue = Channel.CreateUnbounded(new UnboundedChannelOptions { AllowSynchronousContinuations = false, SingleReader = true, SingleWriter = false }); - - /// - /// This is the main loop for the auto-recovery thread. - /// - private async Task MainRecoveryLoop() - { - try - { - while (await _recoveryLoopCommandQueue.Reader.WaitToReadAsync().ConfigureAwait(false)) - { - while (_recoveryLoopCommandQueue.Reader.TryRead(out RecoveryCommand command)) - { - switch (_recoveryLoopState) - { - case RecoveryConnectionState.Connected: - RecoveryLoopConnectedHandler(command); - break; - case RecoveryConnectionState.Recovering: - RecoveryLoopRecoveringHandler(command); - break; - default: - ESLog.Warn("RecoveryLoop state is out of range."); - break; - } - } - } - } - catch (OperationCanceledException) - { - // expected when recovery cancellation token is set. - } - catch (Exception e) - { - ESLog.Error("Main recovery loop threw unexpected exception.", e); - } - } - - /// - /// Cancels the main recovery loop and will block until the loop finishes, or the timeout - /// expires, to prevent Close operations overlapping with recovery operations. - /// - private void StopRecoveryLoop() - { - _recoveryLoopCommandQueue.Writer.Complete(); - Task timeout = Task.Delay(_factory.RequestedConnectionTimeout); - - if (Task.WhenAny(_recoveryTask, timeout).Result == timeout) - { - ESLog.Warn("Timeout while trying to stop background AutorecoveringConnection recovery loop."); - } - } - - /// - /// Handles commands when in the Recovering state. - /// - /// - private void RecoveryLoopRecoveringHandler(RecoveryCommand command) - { - switch (command) - { - case RecoveryCommand.BeginAutomaticRecovery: - ESLog.Info("Received request to BeginAutomaticRecovery, but already in Recovering state."); - break; - case RecoveryCommand.PerformAutomaticRecovery: - if (TryPerformAutomaticRecovery()) - { - _recoveryLoopState = RecoveryConnectionState.Connected; - } - else - { - ScheduleRecoveryRetry(); - } - - break; - default: - ESLog.Warn($"RecoveryLoop command {command} is out of range."); - break; - } - } - - /// - /// Handles commands when in the Connected state. - /// - /// - private void RecoveryLoopConnectedHandler(RecoveryCommand command) - { - switch (command) - { - case RecoveryCommand.PerformAutomaticRecovery: - ESLog.Warn("Not expecting PerformAutomaticRecovery commands while in the connected state."); - break; - case RecoveryCommand.BeginAutomaticRecovery: - _recoveryLoopState = RecoveryConnectionState.Recovering; - ScheduleRecoveryRetry(); - break; - default: - ESLog.Warn($"RecoveryLoop command {command} is out of range."); - break; - } - } - - /// - /// Schedule a background Task to signal the command queue when the retry duration has elapsed. - /// - private void ScheduleRecoveryRetry() => _ = Task - .Delay(_factory.NetworkRecoveryInterval) - .ContinueWith(t => _recoveryLoopCommandQueue.Writer.TryWrite(RecoveryCommand.PerformAutomaticRecovery)); - [MethodImpl(MethodImplOptions.AggressiveInlining)] private void ThrowIfDisposed() {