Skip to content

Commit

Permalink
Manually port 39a9f2b to 6.x
Browse files Browse the repository at this point in the history
Ports commit 39a9f2b to 6.x branch

See also:

Port more of #1164 to this PR
  • Loading branch information
lukebakken committed Mar 8, 2022
1 parent b150af9 commit dbced1c
Show file tree
Hide file tree
Showing 33 changed files with 252 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,8 @@
//---------------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;

using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;

namespace RabbitMQ.Client
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,14 @@ public void Init(IEndpointResolver endpoints)
Init(fh);
}

internal IFrameHandler FrameHandler
{
get
{
return _delegate.FrameHandler;
}
}

private void Init(IFrameHandler fh)
{
if (_disposed)
Expand Down
37 changes: 26 additions & 11 deletions projects/RabbitMQ.Client/client/impl/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ public void Abort(ushort reasonCode, string reasonText, ShutdownInitiator initia

public void Close(ShutdownEventArgs reason)
{
Close(reason, false, Timeout.InfiniteTimeSpan);
Close(reason, false, TimeSpan.FromSeconds(30));
}

///<summary>Try to close connection in a graceful way</summary>
Expand All @@ -286,7 +286,7 @@ public void Close(ShutdownEventArgs reason)
///</para>
///<para>
///Timeout determines how much time internal close operations should be given
///to complete. System.Threading.Timeout.InfiniteTimeSpan value means infinity.
///to complete.
///</para>
///</remarks>
public void Close(ShutdownEventArgs reason, bool abort, TimeSpan timeout)
Expand All @@ -307,7 +307,10 @@ public void Close(ShutdownEventArgs reason, bool abort, TimeSpan timeout)
{
// Try to send connection.close
// Wait for CloseOk in the MainLoop
_session0.Transmit(ConnectionCloseWrapper(reason.ReplyCode, reason.ReplyText));
if (!_closed)
{
_session0.Transmit(ConnectionCloseWrapper(reason.ReplyCode, reason.ReplyText));
}
}
catch (AlreadyClosedException)
{
Expand Down Expand Up @@ -453,9 +456,12 @@ public bool HardProtocolExceptionHandler(HardProtocolException hpe)
_session0.SetSessionClosing(false);
try
{
_session0.Transmit(ConnectionCloseWrapper(
hpe.ShutdownReason.ReplyCode,
hpe.ShutdownReason.ReplyText));
if (!_closed)
{
_session0.Transmit(ConnectionCloseWrapper(
hpe.ShutdownReason.ReplyCode,
hpe.ShutdownReason.ReplyText));
}
return true;
}
catch (IOException ioe)
Expand Down Expand Up @@ -952,13 +958,13 @@ public void UpdateSecret(string newSecret, string reason)
///<summary>API-side invocation of connection abort.</summary>
public void Abort()
{
Abort(Timeout.InfiniteTimeSpan);
Abort(TimeSpan.FromSeconds(5));
}

///<summary>API-side invocation of connection abort.</summary>
public void Abort(ushort reasonCode, string reasonText)
{
Abort(reasonCode, reasonText, Timeout.InfiniteTimeSpan);
Abort(reasonCode, reasonText, TimeSpan.FromSeconds(5));
}

///<summary>API-side invocation of connection abort with timeout.</summary>
Expand All @@ -976,13 +982,13 @@ public void Abort(ushort reasonCode, string reasonText, TimeSpan timeout)
///<summary>API-side invocation of connection.close.</summary>
public void Close()
{
Close(Constants.ReplySuccess, "Goodbye", Timeout.InfiniteTimeSpan);
Close(Constants.ReplySuccess, "Goodbye", TimeSpan.FromSeconds(30));
}

///<summary>API-side invocation of connection.close.</summary>
public void Close(ushort reasonCode, string reasonText)
{
Close(reasonCode, reasonText, Timeout.InfiniteTimeSpan);
Close(reasonCode, reasonText, TimeSpan.FromSeconds(30));
}

///<summary>API-side invocation of connection.close with timeout.</summary>
Expand Down Expand Up @@ -1058,7 +1064,16 @@ internal OutgoingCommand ChannelCloseWrapper(ushort reasonCode, string reasonTex
return request;
}

void StartAndTune()
///<summary>Used for testing only.</summary>
internal IFrameHandler FrameHandler
{
get
{
return _frameHandler;
}
}

private void StartAndTune()
{
var connectionStartCell = new BlockingCell<ConnectionStartDetails>();
_model0.m_connectionStartCell = connectionStartCell;
Expand Down
6 changes: 5 additions & 1 deletion projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,11 @@ public void Close()
{
lock (_semaphore)
{
if (!_closed)
if (_closed || _socket == null)
{
return;
}
else
{
try
{
Expand Down
23 changes: 14 additions & 9 deletions projects/Unit/Fixtures.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,10 @@

using NUnit.Framework;

using RabbitMQ.Client.Framing;
using RabbitMQ.Client.Framing.Impl;

namespace RabbitMQ.Client.Unit
{

public class IntegrationFixture
{
internal IConnectionFactory ConnFactory;
Expand All @@ -56,6 +54,12 @@ public class IntegrationFixture

internal Encoding encoding = new UTF8Encoding();
public static TimeSpan RECOVERY_INTERVAL = TimeSpan.FromSeconds(2);
protected readonly string _testDisplayName;

public IntegrationFixture()
{
_testDisplayName = TestContext.CurrentContext.Test.FullName;
}

[SetUp]
public virtual void Init()
Expand All @@ -72,6 +76,7 @@ public void Dispose()
{
Model.Close();
}

if(Conn.IsOpen)
{
Conn.Close();
Expand Down Expand Up @@ -106,7 +111,7 @@ internal AutorecoveringConnection CreateAutorecoveringConnection(TimeSpan interv
AutomaticRecoveryEnabled = true,
NetworkRecoveryInterval = interval
};
return (AutorecoveringConnection)cf.CreateConnection($"UNIT_CONN:{Guid.NewGuid()}");
return (AutorecoveringConnection)cf.CreateConnection($"{_testDisplayName}:{Guid.NewGuid()}");
}

internal AutorecoveringConnection CreateAutorecoveringConnection(TimeSpan interval, IList<string> hostnames)
Expand All @@ -119,7 +124,7 @@ internal AutorecoveringConnection CreateAutorecoveringConnection(TimeSpan interv
RequestedConnectionTimeout = TimeSpan.FromSeconds(1),
NetworkRecoveryInterval = interval
};
return (AutorecoveringConnection)cf.CreateConnection(hostnames, $"UNIT_CONN:{Guid.NewGuid()}");
return (AutorecoveringConnection)cf.CreateConnection(hostnames, $"{_testDisplayName}:{Guid.NewGuid()}");
}

internal AutorecoveringConnection CreateAutorecoveringConnection(IList<AmqpTcpEndpoint> endpoints)
Expand All @@ -132,7 +137,7 @@ internal AutorecoveringConnection CreateAutorecoveringConnection(IList<AmqpTcpEn
RequestedConnectionTimeout = TimeSpan.FromSeconds(1),
NetworkRecoveryInterval = RECOVERY_INTERVAL
};
return (AutorecoveringConnection)cf.CreateConnection(endpoints, $"UNIT_CONN:{Guid.NewGuid()}");
return (AutorecoveringConnection)cf.CreateConnection(endpoints, $"{_testDisplayName}:{Guid.NewGuid()}");
}

internal AutorecoveringConnection CreateAutorecoveringConnectionWithTopologyRecoveryDisabled()
Expand All @@ -143,7 +148,7 @@ internal AutorecoveringConnection CreateAutorecoveringConnectionWithTopologyReco
TopologyRecoveryEnabled = false,
NetworkRecoveryInterval = RECOVERY_INTERVAL
};
return (AutorecoveringConnection)cf.CreateConnection($"UNIT_CONN:{Guid.NewGuid()}");
return (AutorecoveringConnection)cf.CreateConnection($"{_testDisplayName}:{Guid.NewGuid()}");
}

internal IConnection CreateNonRecoveringConnection()
Expand All @@ -153,7 +158,7 @@ internal IConnection CreateNonRecoveringConnection()
AutomaticRecoveryEnabled = false,
TopologyRecoveryEnabled = false
};
return cf.CreateConnection($"UNIT_CONN:{Guid.NewGuid()}");
return cf.CreateConnection($"{_testDisplayName}:{Guid.NewGuid()}");
}

internal IConnection CreateConnectionWithContinuationTimeout(bool automaticRecoveryEnabled, TimeSpan continuationTimeout)
Expand All @@ -163,7 +168,7 @@ internal IConnection CreateConnectionWithContinuationTimeout(bool automaticRecov
AutomaticRecoveryEnabled = automaticRecoveryEnabled,
ContinuationTimeout = continuationTimeout
};
return cf.CreateConnection($"UNIT_CONN:{Guid.NewGuid()}");
return cf.CreateConnection($"{_testDisplayName}:{Guid.NewGuid()}");
}

//
Expand All @@ -177,7 +182,7 @@ internal void WithTemporaryAutorecoveringConnection(Action<AutorecoveringConnect
AutomaticRecoveryEnabled = true
};

var connection = (AutorecoveringConnection)factory.CreateConnection($"UNIT_CONN:{Guid.NewGuid()}");
var connection = (AutorecoveringConnection)factory.CreateConnection($"{_testDisplayName}:{Guid.NewGuid()}");
try
{
action(connection);
Expand Down
4 changes: 4 additions & 0 deletions projects/Unit/TestAsyncConsumerExceptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ public class TestAsyncConsumerExceptions : IntegrationFixture
{
private static Exception TestException = new Exception("oops");

public TestAsyncConsumerExceptions() : base()
{
}

protected void TestExceptionHandlingWith(IBasicConsumer consumer,
Action<IModel, string, IBasicConsumer, string> action)
{
Expand Down
4 changes: 4 additions & 0 deletions projects/Unit/TestBasicGet.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ namespace RabbitMQ.Client.Unit
[TestFixture]
public class TestBasicGet : IntegrationFixture
{
public TestBasicGet() : base()
{
}

[Test]
public void TestBasicGetWithClosedChannel()
{
Expand Down
6 changes: 5 additions & 1 deletion projects/Unit/TestBasicPublishBatch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,12 @@

namespace RabbitMQ.Client.Unit
{
internal class TestBasicPublishBatch : IntegrationFixture
public class TestBasicPublishBatch : IntegrationFixture
{
public TestBasicPublishBatch() : base()
{
}

[Test]
public void TestBasicPublishBatchSend()
{
Expand Down
5 changes: 4 additions & 1 deletion projects/Unit/TestConcurrentAccessWithSharedConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,14 @@ namespace RabbitMQ.Client.Unit
[TestFixture]
public class TestConcurrentAccessWithSharedConnection : IntegrationFixture
{

internal const int Threads = 32;
internal CountdownEvent _latch;
internal TimeSpan _completionTimeout = TimeSpan.FromSeconds(90);

public TestConcurrentAccessWithSharedConnection() : base()
{
}

[SetUp]
public override void Init()
{
Expand Down
6 changes: 5 additions & 1 deletion projects/Unit/TestConfirmSelect.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,11 @@
namespace RabbitMQ.Client.Unit
{
[TestFixture]
public class TestConfirmSelect : IntegrationFixture {
public class TestConfirmSelect : IntegrationFixture
{
public TestConfirmSelect() : base()
{
}

[Test]
public void TestConfirmSelectIdempotency()
Expand Down
5 changes: 4 additions & 1 deletion projects/Unit/TestConnectionBlocked.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,15 @@ public class TestConnectionBlocked : IntegrationFixture
private readonly object _lockObject = new object();
private bool _notified;

public TestConnectionBlocked() : base()
{
}

public void HandleBlocked(object sender, ConnectionBlockedEventArgs args)
{
Unblock();
}


public void HandleUnblocked(object sender, EventArgs ea)
{
lock (_lockObject)
Expand Down
6 changes: 5 additions & 1 deletion projects/Unit/TestConnectionFactoryContinuationTimeout.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,12 @@
namespace RabbitMQ.Client.Unit
{
[TestFixture]
internal class TestConnectionFactoryContinuationTimeout : IntegrationFixture
public class TestConnectionFactoryContinuationTimeout : IntegrationFixture
{
public TestConnectionFactoryContinuationTimeout() : base()
{
}

[Test]
public void TestConnectionFactoryContinuationTimeoutOnRecoveringConnection()
{
Expand Down
22 changes: 13 additions & 9 deletions projects/Unit/TestConnectionRecovery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public class TestConnectionRecovery : IntegrationFixture
private readonly ushort _closeAtCount = 16;
private string _queueName;

public TestConnectionRecovery()
public TestConnectionRecovery() : base()
{
var rnd = new Random();
_messageBody = new byte[4096];
Expand All @@ -68,10 +68,19 @@ public override void Init()
Model.QueueDelete(_queueName);
}

[TearDown]
public void CleanUp()
protected override void ReleaseResources()
{
Conn.Close();
if (Model.IsOpen)
{
Model.Close();
}

if (Conn.IsOpen)
{
Conn.Close();
}

Unblock();
}

[Test]
Expand Down Expand Up @@ -1108,11 +1117,6 @@ internal ManualResetEventSlim PrepareForShutdown(IConnection conn)
return latch;
}

protected override void ReleaseResources()
{
Unblock();
}

internal void RestartServerAndWaitForRecovery()
{
RestartServerAndWaitForRecovery((IAutorecoveringConnection)Conn);
Expand Down
Loading

0 comments on commit dbced1c

Please sign in to comment.