Skip to content

Commit

Permalink
simplify connection recovery
Browse files Browse the repository at this point in the history
  • Loading branch information
bollhals committed Jan 19, 2021
1 parent ca65015 commit 476dcb6
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 155 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// This source code is dual-licensed under the Apache License, version
// 2.0, and the Mozilla Public License, version 2.0.
//
// The APL v2.0:
//
//---------------------------------------------------------------------------
// Copyright (c) 2007-2020 VMware, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//---------------------------------------------------------------------------
//
// The MPL v2.0:
//
//---------------------------------------------------------------------------
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
//
// Copyright (c) 2007-2020 VMware, Inc. All rights reserved.
//---------------------------------------------------------------------------

using System;
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client.Logging;

namespace RabbitMQ.Client.Framing.Impl
{
#nullable enable
internal sealed partial class AutorecoveringConnection
{
private Task? _recoveryTask;
private CancellationTokenSource? _recoveryCancellationTokenSource;

private CancellationTokenSource RecoveryCancellationTokenSource => _recoveryCancellationTokenSource ??= new CancellationTokenSource();

private void HandleConnectionShutdown(object _, ShutdownEventArgs args)
{
if (ShouldTriggerConnectionRecovery(args))
{
var recoverTask = new Task<Task>(RecoverConnectionAsync);
if (Interlocked.CompareExchange(ref _recoveryTask, recoverTask.Unwrap(), null) is null)
{
recoverTask.Start();
}
}

static bool ShouldTriggerConnectionRecovery(ShutdownEventArgs args) =>
args.Initiator == ShutdownInitiator.Peer ||
// happens when EOF is reached, e.g. due to RabbitMQ node
// connectivity loss or abrupt shutdown
args.Initiator == ShutdownInitiator.Library;
}

private async Task RecoverConnectionAsync()
{
try
{
var token = RecoveryCancellationTokenSource.Token;
bool success;
do
{
await Task.Delay(_factory.NetworkRecoveryInterval, token).ConfigureAwait(false);
success = TryPerformAutomaticRecovery();
} while (!success && !token.IsCancellationRequested);
}
catch (OperationCanceledException)
{
// expected when recovery cancellation token is set.
}
catch (Exception e)
{
ESLog.Error("Main recovery loop threw unexpected exception.", e);
}

// clear recovery task
_recoveryTask = null;
}

/// <summary>
/// Cancels the main recovery loop and will block until the loop finishes, or the timeout
/// expires, to prevent Close operations overlapping with recovery operations.
/// </summary>
private void StopRecoveryLoop()
{
if (_recoveryTask is null)
{
return;
}
RecoveryCancellationTokenSource.Cancel();

Task timeout = Task.Delay(_factory.RequestedConnectionTimeout);
if (Task.WhenAny(_recoveryTask, timeout).Result == timeout)
{
ESLog.Warn("Timeout while trying to stop background AutorecoveringConnection recovery loop.");
}
}
}
}
157 changes: 2 additions & 155 deletions projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading.Channels;
using System.Threading.Tasks;

using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
Expand All @@ -43,7 +41,7 @@

namespace RabbitMQ.Client.Framing.Impl
{
internal sealed class AutorecoveringConnection : IConnection
internal sealed partial class AutorecoveringConnection : IConnection
{
private bool _disposed;
private Connection _delegate;
Expand Down Expand Up @@ -368,15 +366,7 @@ private void Init(IFrameHandler fh)
{
ThrowIfDisposed();
_delegate = new Connection(_factory, false, fh, ClientProvidedName);
_recoveryTask = Task.Run(MainRecoveryLoop);

ConnectionShutdown += (_, args) =>
{
if (ShouldTriggerConnectionRecovery(args))
{
_recoveryLoopCommandQueue.Writer.TryWrite(RecoveryCommand.BeginAutomaticRecovery);
}
};
ConnectionShutdown += HandleConnectionShutdown;
}

///<summary>API-side invocation of updating the secret.</summary>
Expand Down Expand Up @@ -754,149 +744,6 @@ private void RecoverQueues()
}
}

private static bool ShouldTriggerConnectionRecovery(ShutdownEventArgs args) => args.Initiator == ShutdownInitiator.Peer ||
// happens when EOF is reached, e.g. due to RabbitMQ node
// connectivity loss or abrupt shutdown
args.Initiator == ShutdownInitiator.Library;

private enum RecoveryCommand
{
/// <summary>
/// Transition to auto-recovery state if not already in that state.
/// </summary>
BeginAutomaticRecovery,
/// <summary>
/// Attempt to recover connection. If connection is recovered, return
/// to connected state.
/// </summary>
PerformAutomaticRecovery
}


private enum RecoveryConnectionState
{
/// <summary>
/// Underlying connection is open.
/// </summary>
Connected,
/// <summary>
/// In the process of recovering underlying connection.
/// </summary>
Recovering
}

private Task _recoveryTask;
private RecoveryConnectionState _recoveryLoopState = RecoveryConnectionState.Connected;

private readonly Channel<RecoveryCommand> _recoveryLoopCommandQueue = Channel.CreateUnbounded<RecoveryCommand>(new UnboundedChannelOptions { AllowSynchronousContinuations = false, SingleReader = true, SingleWriter = false });

/// <summary>
/// This is the main loop for the auto-recovery thread.
/// </summary>
private async Task MainRecoveryLoop()
{
try
{
while (await _recoveryLoopCommandQueue.Reader.WaitToReadAsync().ConfigureAwait(false))
{
while (_recoveryLoopCommandQueue.Reader.TryRead(out RecoveryCommand command))
{
switch (_recoveryLoopState)
{
case RecoveryConnectionState.Connected:
RecoveryLoopConnectedHandler(command);
break;
case RecoveryConnectionState.Recovering:
RecoveryLoopRecoveringHandler(command);
break;
default:
ESLog.Warn("RecoveryLoop state is out of range.");
break;
}
}
}
}
catch (OperationCanceledException)
{
// expected when recovery cancellation token is set.
}
catch (Exception e)
{
ESLog.Error("Main recovery loop threw unexpected exception.", e);
}
}

/// <summary>
/// Cancels the main recovery loop and will block until the loop finishes, or the timeout
/// expires, to prevent Close operations overlapping with recovery operations.
/// </summary>
private void StopRecoveryLoop()
{
_recoveryLoopCommandQueue.Writer.Complete();
Task timeout = Task.Delay(_factory.RequestedConnectionTimeout);

if (Task.WhenAny(_recoveryTask, timeout).Result == timeout)
{
ESLog.Warn("Timeout while trying to stop background AutorecoveringConnection recovery loop.");
}
}

/// <summary>
/// Handles commands when in the Recovering state.
/// </summary>
/// <param name="command"></param>
private void RecoveryLoopRecoveringHandler(RecoveryCommand command)
{
switch (command)
{
case RecoveryCommand.BeginAutomaticRecovery:
ESLog.Info("Received request to BeginAutomaticRecovery, but already in Recovering state.");
break;
case RecoveryCommand.PerformAutomaticRecovery:
if (TryPerformAutomaticRecovery())
{
_recoveryLoopState = RecoveryConnectionState.Connected;
}
else
{
ScheduleRecoveryRetry();
}

break;
default:
ESLog.Warn($"RecoveryLoop command {command} is out of range.");
break;
}
}

/// <summary>
/// Handles commands when in the Connected state.
/// </summary>
/// <param name="command"></param>
private void RecoveryLoopConnectedHandler(RecoveryCommand command)
{
switch (command)
{
case RecoveryCommand.PerformAutomaticRecovery:
ESLog.Warn("Not expecting PerformAutomaticRecovery commands while in the connected state.");
break;
case RecoveryCommand.BeginAutomaticRecovery:
_recoveryLoopState = RecoveryConnectionState.Recovering;
ScheduleRecoveryRetry();
break;
default:
ESLog.Warn($"RecoveryLoop command {command} is out of range.");
break;
}
}

/// <summary>
/// Schedule a background Task to signal the command queue when the retry duration has elapsed.
/// </summary>
private void ScheduleRecoveryRetry() => _ = Task
.Delay(_factory.NetworkRecoveryInterval)
.ContinueWith(t => _recoveryLoopCommandQueue.Writer.TryWrite(RecoveryCommand.PerformAutomaticRecovery));

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void ThrowIfDisposed()
{
Expand Down

0 comments on commit 476dcb6

Please sign in to comment.