Skip to content

Commit

Permalink
Merge pull request #1165 from rabbitmq/rabbitmq-dotnet-client-938-950…
Browse files Browse the repository at this point in the history
…-main

Port rabbitmq/rabbitmq-dotnet-client #950 to main
  • Loading branch information
michaelklishin authored Feb 25, 2022
2 parents 8588c9e + abcbfc7 commit 39a9f2b
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 14 deletions.
8 changes: 4 additions & 4 deletions projects/RabbitMQ.Client/client/api/IConnectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public static class IConnectionExtensions
/// </remarks>
public static void Close(this IConnection connection)
{
connection.Close(Constants.ReplySuccess, "Goodbye", Timeout.InfiniteTimeSpan, false);
connection.Close(Constants.ReplySuccess, "Goodbye", TimeSpan.FromSeconds(30), false);
}

/// <summary>
Expand All @@ -37,7 +37,7 @@ public static void Close(this IConnection connection)
/// </remarks>
public static void Close(this IConnection connection, ushort reasonCode, string reasonText)
{
connection.Close(reasonCode, reasonText, Timeout.InfiniteTimeSpan, false);
connection.Close(reasonCode, reasonText, TimeSpan.FromSeconds(30), false);
}

/// <summary>
Expand Down Expand Up @@ -93,7 +93,7 @@ public static void Close(this IConnection connection, ushort reasonCode, string
/// </remarks>
public static void Abort(this IConnection connection)
{
connection.Close(Constants.ReplySuccess, "Connection close forced", Timeout.InfiniteTimeSpan, true);
connection.Close(Constants.ReplySuccess, "Connection close forced", TimeSpan.FromSeconds(5), true);
}

/// <summary>
Expand All @@ -111,7 +111,7 @@ public static void Abort(this IConnection connection)
/// </remarks>
public static void Abort(this IConnection connection, ushort reasonCode, string reasonText)
{
connection.Close(reasonCode, reasonText, Timeout.InfiniteTimeSpan, true);
connection.Close(reasonCode, reasonText, TimeSpan.FromSeconds(5), true);
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ public RecoveryAwareModel CreateNonRecoveringModel()
public override string ToString()
=> $"AutorecoveringConnection({InnerConnection.Id},{Endpoint},{GetHashCode()})";

internal IFrameHandler FrameHandler => InnerConnection.FrameHandler;

internal void Init()
{
Init(_factory.EndpointResolverFactory(new List<AmqpTcpEndpoint> { _factory.Endpoint }));
Expand Down
24 changes: 16 additions & 8 deletions projects/RabbitMQ.Client/client/impl/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,6 @@ public Connection(IConnectionFactory factory, IFrameHandler frameHandler, string
public bool IsOpen => CloseReason is null;

public int LocalPort => _frameHandler.LocalPort;

///<summary>Another overload of a Protocol property, useful
///for exposing a tighter type.</summary>
internal ProtocolBase Protocol => (ProtocolBase)Endpoint.Protocol;

public int RemotePort => _frameHandler.RemotePort;

public IDictionary<string, object?>? ServerProperties { get; private set; }
Expand All @@ -123,6 +118,16 @@ public Connection(IConnectionFactory factory, IFrameHandler frameHandler, string
///<summary>Explicit implementation of IConnection.Protocol.</summary>
IProtocol IConnection.Protocol => Endpoint.Protocol;

///<summary>Another overload of a Protocol property, useful
///for exposing a tighter type.</summary>
internal ProtocolBase Protocol => (ProtocolBase)Endpoint.Protocol;

///<summary>Used for testing only.</summary>
internal IFrameHandler FrameHandler
{
get { return _frameHandler; }
}

public event EventHandler<CallbackExceptionEventArgs> CallbackException
{
add => _callbackExceptionWrapper.AddHandler(value);
Expand Down Expand Up @@ -259,7 +264,7 @@ public void Close(ushort reasonCode, string reasonText, TimeSpan timeout, bool a
///</para>
///<para>
///Timeout determines how much time internal close operations should be given
///to complete. System.Threading.Timeout.InfiniteTimeSpan value means infinity.
///to complete.
///</para>
///</remarks>
internal void Close(ShutdownEventArgs reason, bool abort, TimeSpan timeout)
Expand All @@ -279,8 +284,11 @@ internal void Close(ShutdownEventArgs reason, bool abort, TimeSpan timeout)
try
{
// Try to send connection.close wait for CloseOk in the MainLoop
var cmd = new ConnectionClose(reason.ReplyCode, reason.ReplyText, 0, 0);
_session0.Transmit(ref cmd);
if (!_closed)
{
var cmd = new ConnectionClose(reason.ReplyCode, reason.ReplyText, 0, 0);
_session0.Transmit(ref cmd);
}
}
catch (AlreadyClosedException)
{
Expand Down
2 changes: 1 addition & 1 deletion projects/RabbitMQ.Client/client/impl/ModelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -742,7 +742,7 @@ protected void HandleConnectionStart(in IncomingCommand cmd)
if (m_connectionStartCell is null)
{
var reason = new ShutdownEventArgs(ShutdownInitiator.Library, Constants.CommandInvalid, "Unexpected Connection.Start");
Session.Connection.Close(reason, false, Timeout.InfiniteTimeSpan);
Session.Connection.Close(reason, false, TimeSpan.FromSeconds(30));
}

var method = new ConnectionStart(cmd.MethodBytes.Span);
Expand Down
6 changes: 5 additions & 1 deletion projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,11 @@ public void Close()
{
lock (_semaphore)
{
if (!_closed)
if (_closed || _socket == null)
{
return;
}
else
{
try
{
Expand Down
56 changes: 56 additions & 0 deletions projects/Unit/TestConnectionShutdown.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
using System.Threading;

using RabbitMQ.Client.Impl;
using RabbitMQ.Client.Framing.Impl;

using Xunit;

Expand All @@ -41,6 +42,61 @@ namespace RabbitMQ.Client.Unit

public class TestConnectionShutdown : IntegrationFixture
{
[Fact]
public void TestCleanClosureWithSocketClosedOutOfBand()
{
_conn = CreateAutorecoveringConnection();
_model = _conn.CreateModel();

var latch = new ManualResetEventSlim(false);
_model.ModelShutdown += (model, args) => {
latch.Set();
};

var c = (AutorecoveringConnection)_conn;
c.FrameHandler.Close();

_conn.Close(TimeSpan.FromSeconds(4));
Wait(latch, TimeSpan.FromSeconds(5));
}

[Fact]
public void TestAbortWithSocketClosedOutOfBand()
{
_conn = CreateAutorecoveringConnection();
_model = _conn.CreateModel();

var latch = new ManualResetEventSlim(false);
_model.ModelShutdown += (model, args) => {
latch.Set();
};

var c = (AutorecoveringConnection)_conn;
c.FrameHandler.Close();

_conn.Abort();
// default Connection.Abort() timeout and then some
Wait(latch, TimeSpan.FromSeconds(6));
}

[Fact]
public void TestDisposedWithSocketClosedOutOfBand()
{
_conn = CreateAutorecoveringConnection();
_model = _conn.CreateModel();

var latch = new ManualResetEventSlim(false);
_model.ModelShutdown += (model, args) => {
latch.Set();
};

var c = (AutorecoveringConnection)_conn;
c.FrameHandler.Close();

_conn.Dispose();
Wait(latch, TimeSpan.FromSeconds(3));
}

[Fact]
public void TestShutdownSignalPropagationToChannels()
{
Expand Down

0 comments on commit 39a9f2b

Please sign in to comment.