Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify connection recovery #1004

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// This source code is dual-licensed under the Apache License, version
// 2.0, and the Mozilla Public License, version 2.0.
//
// The APL v2.0:
//
//---------------------------------------------------------------------------
// Copyright (c) 2007-2020 VMware, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//---------------------------------------------------------------------------
//
// The MPL v2.0:
//
//---------------------------------------------------------------------------
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
//
// Copyright (c) 2007-2020 VMware, Inc. All rights reserved.
//---------------------------------------------------------------------------

using System;
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client.Logging;

namespace RabbitMQ.Client.Framing.Impl
{
#nullable enable
internal sealed partial class AutorecoveringConnection
{
private Task? _recoveryTask;
private CancellationTokenSource? _recoveryCancellationTokenSource;

private CancellationTokenSource RecoveryCancellationTokenSource => _recoveryCancellationTokenSource ??= new CancellationTokenSource();

private void HandleConnectionShutdown(object _, ShutdownEventArgs args)
{
if (ShouldTriggerConnectionRecovery(args))
{
var recoverTask = new Task<Task>(RecoverConnectionAsync);
if (Interlocked.CompareExchange(ref _recoveryTask, recoverTask.Unwrap(), null) is null)
{
recoverTask.Start();
}
}

static bool ShouldTriggerConnectionRecovery(ShutdownEventArgs args) =>
args.Initiator == ShutdownInitiator.Peer ||
// happens when EOF is reached, e.g. due to RabbitMQ node
// connectivity loss or abrupt shutdown
args.Initiator == ShutdownInitiator.Library;
}

private async Task RecoverConnectionAsync()
{
try
{
var token = RecoveryCancellationTokenSource.Token;
bool success;
do
{
await Task.Delay(_factory.NetworkRecoveryInterval, token).ConfigureAwait(false);
success = TryPerformAutomaticRecovery();
} while (!success && !token.IsCancellationRequested);
}
catch (OperationCanceledException)
{
// expected when recovery cancellation token is set.
}
catch (Exception e)
{
ESLog.Error("Main recovery loop threw unexpected exception.", e);
}

// clear recovery task
_recoveryTask = null;
}

/// <summary>
/// Cancels the main recovery loop and will block until the loop finishes, or the timeout
/// expires, to prevent Close operations overlapping with recovery operations.
/// </summary>
private void StopRecoveryLoop()
{
var task = _recoveryTask;
if (task is null)
{
return;
}
RecoveryCancellationTokenSource.Cancel();

Task timeout = Task.Delay(_factory.RequestedConnectionTimeout);
if (Task.WhenAny(task, timeout).Result == timeout)
{
ESLog.Warn("Timeout while trying to stop background AutorecoveringConnection recovery loop.");
}
}
}
}
157 changes: 2 additions & 155 deletions projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading.Channels;
using System.Threading.Tasks;

using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
Expand All @@ -43,7 +41,7 @@

namespace RabbitMQ.Client.Framing.Impl
{
internal sealed class AutorecoveringConnection : IConnection
internal sealed partial class AutorecoveringConnection : IConnection
{
private bool _disposed;
private Connection _delegate;
Expand Down Expand Up @@ -368,15 +366,7 @@ private void Init(IFrameHandler fh)
{
ThrowIfDisposed();
_delegate = new Connection(_factory, false, fh, ClientProvidedName);
_recoveryTask = Task.Run(MainRecoveryLoop);

ConnectionShutdown += (_, args) =>
{
if (ShouldTriggerConnectionRecovery(args))
{
_recoveryLoopCommandQueue.Writer.TryWrite(RecoveryCommand.BeginAutomaticRecovery);
}
};
ConnectionShutdown += HandleConnectionShutdown;
}

///<summary>API-side invocation of updating the secret.</summary>
Expand Down Expand Up @@ -754,149 +744,6 @@ private void RecoverQueues()
}
}

private static bool ShouldTriggerConnectionRecovery(ShutdownEventArgs args) => args.Initiator == ShutdownInitiator.Peer ||
// happens when EOF is reached, e.g. due to RabbitMQ node
// connectivity loss or abrupt shutdown
args.Initiator == ShutdownInitiator.Library;

private enum RecoveryCommand
{
/// <summary>
/// Transition to auto-recovery state if not already in that state.
/// </summary>
BeginAutomaticRecovery,
/// <summary>
/// Attempt to recover connection. If connection is recovered, return
/// to connected state.
/// </summary>
PerformAutomaticRecovery
}


private enum RecoveryConnectionState
{
/// <summary>
/// Underlying connection is open.
/// </summary>
Connected,
/// <summary>
/// In the process of recovering underlying connection.
/// </summary>
Recovering
}

private Task _recoveryTask;
private RecoveryConnectionState _recoveryLoopState = RecoveryConnectionState.Connected;

private readonly Channel<RecoveryCommand> _recoveryLoopCommandQueue = Channel.CreateUnbounded<RecoveryCommand>(new UnboundedChannelOptions { AllowSynchronousContinuations = false, SingleReader = true, SingleWriter = false });

/// <summary>
/// This is the main loop for the auto-recovery thread.
/// </summary>
private async Task MainRecoveryLoop()
{
try
{
while (await _recoveryLoopCommandQueue.Reader.WaitToReadAsync().ConfigureAwait(false))
{
while (_recoveryLoopCommandQueue.Reader.TryRead(out RecoveryCommand command))
{
switch (_recoveryLoopState)
{
case RecoveryConnectionState.Connected:
RecoveryLoopConnectedHandler(command);
break;
case RecoveryConnectionState.Recovering:
RecoveryLoopRecoveringHandler(command);
break;
default:
ESLog.Warn("RecoveryLoop state is out of range.");
break;
}
}
}
}
catch (OperationCanceledException)
{
// expected when recovery cancellation token is set.
}
catch (Exception e)
{
ESLog.Error("Main recovery loop threw unexpected exception.", e);
}
}

/// <summary>
/// Cancels the main recovery loop and will block until the loop finishes, or the timeout
/// expires, to prevent Close operations overlapping with recovery operations.
/// </summary>
private void StopRecoveryLoop()
{
_recoveryLoopCommandQueue.Writer.Complete();
Task timeout = Task.Delay(_factory.RequestedConnectionTimeout);

if (Task.WhenAny(_recoveryTask, timeout).Result == timeout)
{
ESLog.Warn("Timeout while trying to stop background AutorecoveringConnection recovery loop.");
}
}

/// <summary>
/// Handles commands when in the Recovering state.
/// </summary>
/// <param name="command"></param>
private void RecoveryLoopRecoveringHandler(RecoveryCommand command)
{
switch (command)
{
case RecoveryCommand.BeginAutomaticRecovery:
ESLog.Info("Received request to BeginAutomaticRecovery, but already in Recovering state.");
break;
case RecoveryCommand.PerformAutomaticRecovery:
if (TryPerformAutomaticRecovery())
{
_recoveryLoopState = RecoveryConnectionState.Connected;
}
else
{
ScheduleRecoveryRetry();
}

break;
default:
ESLog.Warn($"RecoveryLoop command {command} is out of range.");
break;
}
}

/// <summary>
/// Handles commands when in the Connected state.
/// </summary>
/// <param name="command"></param>
private void RecoveryLoopConnectedHandler(RecoveryCommand command)
{
switch (command)
{
case RecoveryCommand.PerformAutomaticRecovery:
ESLog.Warn("Not expecting PerformAutomaticRecovery commands while in the connected state.");
break;
case RecoveryCommand.BeginAutomaticRecovery:
_recoveryLoopState = RecoveryConnectionState.Recovering;
ScheduleRecoveryRetry();
break;
default:
ESLog.Warn($"RecoveryLoop command {command} is out of range.");
break;
}
}

/// <summary>
/// Schedule a background Task to signal the command queue when the retry duration has elapsed.
/// </summary>
private void ScheduleRecoveryRetry() => _ = Task
.Delay(_factory.NetworkRecoveryInterval)
.ContinueWith(t => _recoveryLoopCommandQueue.Writer.TryWrite(RecoveryCommand.PerformAutomaticRecovery));

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void ThrowIfDisposed()
{
Expand Down