diff --git a/.ci/ubuntu/gha-setup.sh b/.ci/ubuntu/gha-setup.sh
index 6cb4451a13..ee6918dfb6 100755
--- a/.ci/ubuntu/gha-setup.sh
+++ b/.ci/ubuntu/gha-setup.sh
@@ -43,13 +43,13 @@ function start_toxiproxy
# sudo ss -4nlp
echo "[INFO] starting Toxiproxy server docker container"
docker rm --force "$toxiproxy_docker_name" 2>/dev/null || echo "[INFO] $toxiproxy_docker_name was not running"
- docker run --pull always --detach \
+ docker run --detach \
--name "$toxiproxy_docker_name" \
--hostname "$toxiproxy_docker_name" \
--publish 8474:8474 \
- --publish 55672:55672 \
+ --publish 55670-55680:55670-55680 \
--network "$docker_network_name" \
- 'ghcr.io/shopify/toxiproxy:2.7.0'
+ 'ghcr.io/shopify/toxiproxy:latest'
fi
}
@@ -58,7 +58,7 @@ function start_rabbitmq
echo "[INFO] starting RabbitMQ server docker container"
chmod 0777 "$GITHUB_WORKSPACE/.ci/ubuntu/log"
docker rm --force "$rabbitmq_docker_name" 2>/dev/null || echo "[INFO] $rabbitmq_docker_name was not running"
- docker run --pull always --detach \
+ docker run --detach \
--name "$rabbitmq_docker_name" \
--hostname "$rabbitmq_docker_name" \
--publish 5671:5671 \
diff --git a/.editorconfig b/.editorconfig
index 347225996c..0d73107917 100644
--- a/.editorconfig
+++ b/.editorconfig
@@ -169,6 +169,8 @@ dotnet_diagnostic.RS0036.severity = none
dotnet_diagnostic.RS0041.severity = none
dotnet_diagnostic.RS0051.severity = error
+dotnet_diagnostic.CA2007.severity = error
+
# C++ Files
[*.{cpp,h,in}]
curly_bracket_next_line = true
diff --git a/.github/workflows/build-test.yaml b/.github/workflows/build-test.yaml
index 979c4ef66d..dde9c2a82b 100644
--- a/.github/workflows/build-test.yaml
+++ b/.github/workflows/build-test.yaml
@@ -57,7 +57,7 @@ jobs:
# Note: the cache path is relative to the workspace directory
# https://docs.github.com/en/actions/using-workflows/caching-dependencies-to-speed-up-workflows#using-the-cache-action
path: ~/installers
- key: ${{ runner.os }}-v0-${{ hashFiles('.ci/versions.json') }}
+ key: ${{ runner.os }}-v0-${{ hashFiles('.ci/windows/versions.json') }}
- name: Download Build (Debug)
uses: actions/download-artifact@v4
with:
@@ -104,7 +104,7 @@ jobs:
# Note: the cache path is relative to the workspace directory
# https://docs.github.com/en/actions/using-workflows/caching-dependencies-to-speed-up-workflows#using-the-cache-action
path: ~/installers
- key: ${{ runner.os }}-v0-${{ hashFiles('.ci/versions.json') }}
+ key: ${{ runner.os }}-v0-${{ hashFiles('.ci/windows/versions.json') }}
- name: Download Build (Debug)
uses: actions/download-artifact@v4
with:
@@ -190,6 +190,9 @@ jobs:
"${{ github.workspace }}/projects/Test/Integration/Integration.csproj" --no-restore --no-build --logger 'console;verbosity=detailed'
- name: Check for errors in RabbitMQ logs
run: ${{ github.workspace}}/.ci/ubuntu/gha-log-check.sh
+ - name: Maybe collect toxiproxy logs
+ if: failure()
+ run: docker logs rabbitmq-dotnet-client-toxiproxy > ${{ github.workspace }}/.ci/ubuntu/log/toxiproxy.log
- name: Maybe upload RabbitMQ logs
if: failure()
uses: actions/upload-artifact@v4
diff --git a/Makefile b/Makefile
index a5ce1e9360..e8f3089543 100644
--- a/Makefile
+++ b/Makefile
@@ -6,10 +6,23 @@ RABBITMQ_DOCKER_NAME ?= rabbitmq-dotnet-client-rabbitmq
build:
dotnet build $(CURDIR)/Build.csproj
+# Note:
+#
+# --environment 'GITHUB_ACTIONS=true'
+#
+# The above argument is passed to `dotnet test` because it's assumed you
+# use this command to set up your local environment on Linux:
+#
+# ./.ci/ubuntu/gha-setup.sh toxiproxy
+#
+# The gha-setup.sh command has been tested on Ubuntu 22 and Arch Linux
+# and should work on any Linux system with a recent docker.
+#
test:
dotnet test $(CURDIR)/projects/Test/Unit/Unit.csproj --logger 'console;verbosity=detailed'
- dotnet test --environment "RABBITMQ_RABBITMQCTL_PATH=DOCKER:$$(docker inspect --format='{{.Id}}' $(RABBITMQ_DOCKER_NAME))" \
- --environment 'RABBITMQ_LONG_RUNNING_TESTS=true' $(CURDIR)/projects/Test/Integration/Integration.csproj --logger 'console;verbosity=detailed'
+ dotnet test --environment 'GITHUB_ACTIONS=true' \
+ --environment "RABBITMQ_RABBITMQCTL_PATH=DOCKER:$$(docker inspect --format='{{.Id}}' $(RABBITMQ_DOCKER_NAME))" \
+ --environment 'RABBITMQ_LONG_RUNNING_TESTS=true' \
--environment 'RABBITMQ_TOXIPROXY_TESTS=true' \
--environment 'PASSWORD=grapefruit' \
--environment SSL_CERTS_DIR="$(CURDIR)/.ci/certs" \
diff --git a/projects/Benchmarks/Benchmarks.csproj b/projects/Benchmarks/Benchmarks.csproj
index 56a308e404..267dd70259 100644
--- a/projects/Benchmarks/Benchmarks.csproj
+++ b/projects/Benchmarks/Benchmarks.csproj
@@ -2,10 +2,14 @@
net6.0;net472
+ $(NoWarn);CA2007
+ true
net6.0
+ $(NoWarn);CA2007
+ true
diff --git a/projects/RabbitMQ.Client/client/api/InternalConstants.cs b/projects/RabbitMQ.Client/client/api/InternalConstants.cs
index 25bdc300c6..8ee85ae173 100644
--- a/projects/RabbitMQ.Client/client/api/InternalConstants.cs
+++ b/projects/RabbitMQ.Client/client/api/InternalConstants.cs
@@ -37,7 +37,5 @@ internal static class InternalConstants
{
internal static readonly TimeSpan DefaultConnectionAbortTimeout = TimeSpan.FromSeconds(5);
internal static readonly TimeSpan DefaultConnectionCloseTimeout = TimeSpan.FromSeconds(30);
-
- internal static string Now => DateTime.UtcNow.ToString("s", System.Globalization.CultureInfo.InvariantCulture);
}
}
diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs
index edb8a76241..c004a84eca 100644
--- a/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs
+++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs
@@ -396,7 +396,8 @@ public Task ConsumerCountAsync(string queue)
public async Task QueueDeleteAsync(string queue, bool ifUnused, bool ifEmpty, bool noWait)
{
- uint result = await InnerChannel.QueueDeleteAsync(queue, ifUnused, ifEmpty, noWait);
+ uint result = await InnerChannel.QueueDeleteAsync(queue, ifUnused, ifEmpty, noWait)
+ .ConfigureAwait(false);
await _connection.DeleteRecordedQueueAsync(queue, recordedEntitiesSemaphoreHeld: false)
.ConfigureAwait(false);
return result;
diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs
index 560780e0b9..bbd06d982b 100644
--- a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs
+++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs
@@ -214,7 +214,8 @@ await RecoverChannelsAndItsConsumersAsync(recordedEntitiesSemaphoreHeld: true, c
*/
if (_innerConnection?.IsOpen == true)
{
- await _innerConnection.AbortAsync(Constants.InternalError, "FailedAutoRecovery", _config.RequestedConnectionTimeout);
+ await _innerConnection.AbortAsync(Constants.InternalError, "FailedAutoRecovery", _config.RequestedConnectionTimeout)
+ .ConfigureAwait(false);
}
}
catch (Exception e2)
@@ -286,7 +287,8 @@ await ch.CloseAsync()
try
{
_recordedEntitiesSemaphore.Release();
- await _config.TopologyRecoveryExceptionHandler.ExchangeRecoveryExceptionHandlerAsync(recordedExchange, ex, this);
+ await _config.TopologyRecoveryExceptionHandler.ExchangeRecoveryExceptionHandlerAsync(recordedExchange, ex, this)
+ .ConfigureAwait(false);
}
finally
{
@@ -373,7 +375,8 @@ await _recordedEntitiesSemaphore.WaitAsync()
try
{
_recordedEntitiesSemaphore.Release();
- await _config.TopologyRecoveryExceptionHandler.QueueRecoveryExceptionHandlerAsync(recordedQueue, ex, this);
+ await _config.TopologyRecoveryExceptionHandler.QueueRecoveryExceptionHandlerAsync(recordedQueue, ex, this)
+ .ConfigureAwait(false);
}
finally
{
@@ -445,7 +448,8 @@ await ch.CloseAsync()
try
{
_recordedEntitiesSemaphore.Release();
- await _config.TopologyRecoveryExceptionHandler.BindingRecoveryExceptionHandlerAsync(binding, ex, this);
+ await _config.TopologyRecoveryExceptionHandler.BindingRecoveryExceptionHandlerAsync(binding, ex, this)
+ .ConfigureAwait(false);
}
finally
{
@@ -521,7 +525,8 @@ await _recordedEntitiesSemaphore.WaitAsync()
try
{
_recordedEntitiesSemaphore.Release();
- await _config.TopologyRecoveryExceptionHandler.ConsumerRecoveryExceptionHandlerAsync(consumer, ex, this);
+ await _config.TopologyRecoveryExceptionHandler.ConsumerRecoveryExceptionHandlerAsync(consumer, ex, this)
+ .ConfigureAwait(false);
}
finally
{
diff --git a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs
index 514c80aa20..ba50fca630 100644
--- a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs
+++ b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs
@@ -846,7 +846,8 @@ protected async Task HandleConnectionStartAsync(IncomingCommand cmd, Cance
var reason = new ShutdownEventArgs(ShutdownInitiator.Library, Constants.CommandInvalid, "Unexpected Connection.Start");
await Session.Connection.CloseAsync(reason, false,
InternalConstants.DefaultConnectionCloseTimeout,
- cancellationToken);
+ cancellationToken)
+ .ConfigureAwait(false);
}
else
{
@@ -1045,12 +1046,14 @@ public async ValueTask BasicPublishAsync(string exchange, string ro
{
BasicProperties props = PopulateActivityAndPropagateTraceId(basicProperties, sendActivity);
// TODO cancellation token
- await ModelSendAsync(in cmd, in props, body, CancellationToken.None);
+ await ModelSendAsync(in cmd, in props, body, CancellationToken.None)
+ .ConfigureAwait(false);
}
else
{
// TODO cancellation token
- await ModelSendAsync(in cmd, in basicProperties, body, CancellationToken.None);
+ await ModelSendAsync(in cmd, in basicProperties, body, CancellationToken.None)
+ .ConfigureAwait(false);
}
}
catch
@@ -1107,12 +1110,14 @@ public async void BasicPublish(CachedString exchange, CachedString
{
BasicProperties props = PopulateActivityAndPropagateTraceId(basicProperties, sendActivity);
// TODO cancellation token
- await ModelSendAsync(in cmd, in basicProperties, body, CancellationToken.None);
+ await ModelSendAsync(in cmd, in basicProperties, body, CancellationToken.None)
+ .ConfigureAwait(false);
}
else
{
// TODO cancellation token
- await ModelSendAsync(in cmd, in basicProperties, body, CancellationToken.None);
+ await ModelSendAsync(in cmd, in basicProperties, body, CancellationToken.None)
+ .ConfigureAwait(false);
}
}
catch
@@ -1155,12 +1160,14 @@ public async ValueTask BasicPublishAsync(CachedString exchange, Cac
{
BasicProperties props = PopulateActivityAndPropagateTraceId(basicProperties, sendActivity);
// TODO cancellation token
- await ModelSendAsync(in cmd, in props, body, CancellationToken.None);
+ await ModelSendAsync(in cmd, in props, body, CancellationToken.None)
+ .ConfigureAwait(false);
}
else
{
// TODO cancellation token
- await ModelSendAsync(in cmd, in basicProperties, body, CancellationToken.None);
+ await ModelSendAsync(in cmd, in basicProperties, body, CancellationToken.None)
+ .ConfigureAwait(false);
}
}
catch
@@ -1564,13 +1571,15 @@ await ModelSendAsync(method, k.CancellationToken)
public async Task MessageCountAsync(string queue)
{
- QueueDeclareOk ok = await QueueDeclarePassiveAsync(queue);
+ QueueDeclareOk ok = await QueueDeclarePassiveAsync(queue)
+ .ConfigureAwait(false);
return ok.MessageCount;
}
public async Task ConsumerCountAsync(string queue)
{
- QueueDeclareOk ok = await QueueDeclarePassiveAsync(queue);
+ QueueDeclareOk ok = await QueueDeclarePassiveAsync(queue)
+ .ConfigureAwait(false);
return ok.ConsumerCount;
}
diff --git a/projects/RabbitMQ.Client/client/impl/Connection.Commands.cs b/projects/RabbitMQ.Client/client/impl/Connection.Commands.cs
index c82d82f356..6cf2e20169 100644
--- a/projects/RabbitMQ.Client/client/impl/Connection.Commands.cs
+++ b/projects/RabbitMQ.Client/client/impl/Connection.Commands.cs
@@ -117,7 +117,8 @@ await _frameHandler.SendProtocolHeaderAsync(cancellationToken)
* FinishCloseAsync will cancel the main loop
*/
MaybeTerminateMainloopAndStopHeartbeatTimers();
- await FinishCloseAsync(cancellationToken);
+ await FinishCloseAsync(cancellationToken)
+ .ConfigureAwait(false);
throw new ProtocolVersionMismatchException(Protocol.MajorVersion, Protocol.MinorVersion, serverVersion.Major, serverVersion.Minor);
}
@@ -183,7 +184,8 @@ await _frameHandler.SendProtocolHeaderAsync(cancellationToken)
uint heartbeatInSeconds = NegotiatedMaxValue((uint)_config.HeartbeatInterval.TotalSeconds, (uint)connectionTune.m_heartbeatInSeconds);
Heartbeat = TimeSpan.FromSeconds(heartbeatInSeconds);
- await _channel0.ConnectionTuneOkAsync(channelMax, frameMax, (ushort)Heartbeat.TotalSeconds, cancellationToken);
+ await _channel0.ConnectionTuneOkAsync(channelMax, frameMax, (ushort)Heartbeat.TotalSeconds, cancellationToken)
+ .ConfigureAwait(false);
// TODO check for cancellation
MaybeStartCredentialRefresher();
diff --git a/projects/RabbitMQ.Client/client/impl/Connection.Receive.cs b/projects/RabbitMQ.Client/client/impl/Connection.Receive.cs
index 8828416af9..4929b5f25b 100644
--- a/projects/RabbitMQ.Client/client/impl/Connection.Receive.cs
+++ b/projects/RabbitMQ.Client/client/impl/Connection.Receive.cs
@@ -87,7 +87,8 @@ await HardProtocolExceptionHandlerAsync(hpe, mainLoopToken)
}
using var cts = new CancellationTokenSource(InternalConstants.DefaultConnectionCloseTimeout);
- await FinishCloseAsync(cts.Token);
+ await FinishCloseAsync(cts.Token)
+ .ConfigureAwait(false);
}
private async Task ReceiveLoopAsync(CancellationToken mainLoopCancelllationToken)
@@ -194,7 +195,8 @@ private async Task HardProtocolExceptionHandlerAsync(HardProtocolException hpe,
if (SetCloseReason(hpe.ShutdownReason))
{
OnShutdown(hpe.ShutdownReason);
- await _session0.SetSessionClosingAsync(false);
+ await _session0.SetSessionClosingAsync(false)
+ .ConfigureAwait(false);
try
{
var cmd = new ConnectionClose(hpe.ShutdownReason.ReplyCode, hpe.ShutdownReason.ReplyText, 0, 0);
diff --git a/projects/RabbitMQ.Client/client/impl/Connection.cs b/projects/RabbitMQ.Client/client/impl/Connection.cs
index 0807dd8715..62a1dba4a5 100644
--- a/projects/RabbitMQ.Client/client/impl/Connection.cs
+++ b/projects/RabbitMQ.Client/client/impl/Connection.cs
@@ -322,7 +322,8 @@ internal async Task CloseAsync(ShutdownEventArgs reason, bool abort, TimeSpan ti
cancellationToken.ThrowIfCancellationRequested();
OnShutdown(reason);
- await _session0.SetSessionClosingAsync(false);
+ await _session0.SetSessionClosingAsync(false)
+ .ConfigureAwait(false);
try
{
diff --git a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcher.cs b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcher.cs
index 17ac041d93..632b7cf72b 100644
--- a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcher.cs
+++ b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcher.cs
@@ -31,7 +31,8 @@ protected override async Task ProcessChannelAsync(CancellationToken token)
case WorkType.Deliver:
await consumer.HandleBasicDeliverAsync(
consumerTag, work.DeliveryTag, work.Redelivered,
- work.Exchange, work.RoutingKey, work.BasicProperties, work.Body.Memory);
+ work.Exchange, work.RoutingKey, work.BasicProperties, work.Body.Memory)
+ .ConfigureAwait(false);
break;
case WorkType.Cancel:
consumer.HandleBasicCancel(consumerTag);
diff --git a/projects/Test/Applications/CreateChannel/CreateChannel.csproj b/projects/Test/Applications/CreateChannel/CreateChannel.csproj
index 9316c348d1..ddaac453b4 100644
--- a/projects/Test/Applications/CreateChannel/CreateChannel.csproj
+++ b/projects/Test/Applications/CreateChannel/CreateChannel.csproj
@@ -2,10 +2,14 @@
net6.0;net472
+ $(NoWarn);CA2007
+ true
net6.0
+ $(NoWarn);CA2007
+ true
diff --git a/projects/Test/Applications/MassPublish/MassPublish.csproj b/projects/Test/Applications/MassPublish/MassPublish.csproj
index 63d513a986..2ff780b2a3 100644
--- a/projects/Test/Applications/MassPublish/MassPublish.csproj
+++ b/projects/Test/Applications/MassPublish/MassPublish.csproj
@@ -2,10 +2,14 @@
net6.0;net472
+ $(NoWarn);CA2007
+ true
net6.0
+ $(NoWarn);CA2007
+ true
diff --git a/projects/Test/Common/Common.csproj b/projects/Test/Common/Common.csproj
index 7b36b33dec..59c7b946b5 100644
--- a/projects/Test/Common/Common.csproj
+++ b/projects/Test/Common/Common.csproj
@@ -2,10 +2,14 @@
net6.0;net472
+ $(NoWarn);CA2007
+ true
net6.0
+ $(NoWarn);CA2007
+ true
diff --git a/projects/Test/Common/IntegrationFixture.cs b/projects/Test/Common/IntegrationFixture.cs
index 74e4f90071..325e227cba 100644
--- a/projects/Test/Common/IntegrationFixture.cs
+++ b/projects/Test/Common/IntegrationFixture.cs
@@ -31,7 +31,6 @@
using System;
using System.Collections.Generic;
-using System.Globalization;
using System.IO;
using System.Linq;
using System.Net.Sockets;
@@ -49,9 +48,8 @@ namespace Test
{
public abstract class IntegrationFixture : IAsyncLifetime
{
- private static bool s_isRunningInCI = false;
- private static bool s_isWindows = false;
- private static bool s_isVerbose = false;
+ private static readonly bool s_isRunningInCI = false;
+ private static readonly bool s_isVerbose = false;
private static int _connectionIdx = 0;
protected readonly RabbitMQCtl _rabbitMQCtl;
@@ -80,9 +78,8 @@ public abstract class IntegrationFixture : IAsyncLifetime
static IntegrationFixture()
{
S_Random = new Random();
- InitIsRunningInCI();
- InitIsWindows();
- InitIsVerbose();
+ s_isRunningInCI = InitIsRunningInCI();
+ s_isVerbose = InitIsVerbose();
if (s_isRunningInCI)
{
@@ -117,7 +114,10 @@ public IntegrationFixture(ITestOutputHelper output,
.Replace("Integration.", "I.")
.Replace("SequentialI.", "SI.");
- // Console.SetOut(new TestOutputWriter(output, _testDisplayName));
+ if (IsVerbose)
+ {
+ Console.SetOut(new TestOutputWriter(output, _testDisplayName));
+ }
}
public virtual async Task InitializeAsync()
@@ -229,7 +229,7 @@ protected static bool IsRunningInCI
protected static bool IsWindows
{
- get { return s_isWindows; }
+ get { return Util.IsWindows; }
}
protected static bool IsVerbose
@@ -427,10 +427,9 @@ protected static async Task WaitAsync(TaskCompletionSource tcs, TimeSpan t
protected ConnectionFactory CreateConnectionFactory()
{
- string now = DateTime.UtcNow.ToString("o", CultureInfo.InvariantCulture);
return new ConnectionFactory
{
- ClientProvidedName = $"{_testDisplayName}:{now}:{GetConnectionIdx()}",
+ ClientProvidedName = $"{_testDisplayName}:{Util.Now}:{GetConnectionIdx()}",
ContinuationTimeout = WaitSpan,
HandshakeContinuationTimeout = WaitSpan,
};
@@ -472,54 +471,36 @@ protected void HandleChannelShutdown(IChannel ch, ShutdownEventArgs args, Action
a(args);
}
- private static void InitIsRunningInCI()
+ private static bool InitIsRunningInCI()
{
bool ci;
if (bool.TryParse(Environment.GetEnvironmentVariable("CI"), out ci))
{
if (ci == true)
{
- s_isRunningInCI = true;
+ return true;
}
}
else if (bool.TryParse(Environment.GetEnvironmentVariable("GITHUB_ACTIONS"), out ci))
{
if (ci == true)
{
- s_isRunningInCI = true;
+ return true;
}
}
- else
- {
- s_isRunningInCI = false;
- }
- }
- private static void InitIsWindows()
- {
- PlatformID platform = Environment.OSVersion.Platform;
- if (platform == PlatformID.Win32NT)
- {
- s_isWindows = true;
- return;
- }
-
- string os = Environment.GetEnvironmentVariable("OS");
- if (os != null)
- {
- os = os.Trim();
- s_isWindows = os == "Windows_NT";
- return;
- }
+ return false;
}
- private static void InitIsVerbose()
+ private static bool InitIsVerbose()
{
if (bool.TryParse(
Environment.GetEnvironmentVariable("RABBITMQ_CLIENT_TESTS_VERBOSE"), out _))
{
- s_isVerbose = true;
+ return true;
}
+
+ return false;
}
private static int GetConnectionIdx()
@@ -566,7 +547,5 @@ protected static TaskCompletionSource PrepareForRecovery(IConnection conn)
return tcs;
}
-
- public static string Now => DateTime.UtcNow.ToString("s", CultureInfo.InvariantCulture);
}
}
diff --git a/projects/Test/Common/ProcessUtil.cs b/projects/Test/Common/ProcessUtil.cs
index 8f21482003..73c418c20c 100644
--- a/projects/Test/Common/ProcessUtil.cs
+++ b/projects/Test/Common/ProcessUtil.cs
@@ -156,25 +156,40 @@ public static async Task RunAsync(ProcessStartInfo startInfo)
// Process completion = exit AND stdout (if defined) AND stderr (if defined)
Task processCompletionTask = Task.WhenAll(processTasks);
- try
+ int attempts = 0;
+ while (attempts < 3)
{
- // Task to wait for exit OR timeout (if defined)
- await processCompletionTask.WaitAsync(TimeSpan.FromSeconds(30));
-
- // -> Process exited cleanly
- result.ExitCode = process.ExitCode;
- }
- catch (OperationCanceledException)
- {
- // -> Timeout, let's kill the process
- KillProcess(process);
- throw;
- }
- catch (TimeoutException)
- {
- // -> Timeout, let's kill the process
- KillProcess(process);
- throw;
+ try
+ {
+ // Task to wait for exit OR timeout
+ await processCompletionTask.WaitAsync(TimeSpan.FromSeconds(10)).ConfigureAwait(false);
+ result.ExitCode = process.ExitCode;
+ break;
+ }
+ catch (OperationCanceledException ex)
+ {
+ KillProcess(process);
+ if (attempts == 2)
+ {
+ throw;
+ }
+ else
+ {
+ Console.WriteLine("{0} [WARNING] caught exception and re-trying ({1}): {2}", Util.Now, attempts, ex);
+ }
+ }
+ catch (TimeoutException ex)
+ {
+ KillProcess(process);
+ if (attempts == 2)
+ {
+ throw;
+ }
+ else
+ {
+ Console.WriteLine("{0} [WARNING] caught exception and re-trying ({1}): {2}", Util.Now, attempts, ex);
+ }
+ }
}
// Read stdout/stderr
diff --git a/projects/Test/Common/RabbitMQCtl.cs b/projects/Test/Common/RabbitMQCtl.cs
index c9ced4ebb9..751a273559 100644
--- a/projects/Test/Common/RabbitMQCtl.cs
+++ b/projects/Test/Common/RabbitMQCtl.cs
@@ -42,7 +42,6 @@ namespace Test
public class RabbitMQCtl
{
private static readonly char[] newLine = new char[] { '\n' };
- private static readonly Func s_invokeRabbitMqCtl = GetRabbitMqCtlInvokeAction();
// NOTE: \r?
// https://learn.microsoft.com/en-us/dotnet/standard/base-types/regular-expression-options#multiline-mode
private static readonly Regex s_getConnectionProperties =
@@ -121,87 +120,28 @@ private static ProcessStartInfo GetRabbitMqCtlStartInfo(string args)
}
}
- // Try default
string umbrellaRabbitmqctlPath;
string providedRabbitmqctlPath;
- if (IsRunningOnMonoOrDotNetCore())
- {
- umbrellaRabbitmqctlPath = "../../../../../../rabbit/scripts/rabbitmqctl";
- providedRabbitmqctlPath = "rabbitmqctl";
- }
- else
+ if (Util.IsWindows)
{
umbrellaRabbitmqctlPath = @"..\..\..\..\..\..\rabbit\scripts\rabbitmqctl.bat";
providedRabbitmqctlPath = "rabbitmqctl.bat";
}
-
- string path = File.Exists(umbrellaRabbitmqctlPath) ? umbrellaRabbitmqctlPath : providedRabbitmqctlPath;
-
- if (IsRunningOnMonoOrDotNetCore())
- {
- return CreateProcessStartInfo(path, args);
- }
else
- {
- // FUTURE TODO is cmd.exe really necessary?
- return CreateProcessStartInfo("cmd.exe", $"/c \"\"{path}\" {args}");
- }
- }
-
- private static Func GetRabbitMqCtlInvokeAction()
- {
- string precomputedArguments;
- string envVariable = Environment.GetEnvironmentVariable("RABBITMQ_RABBITMQCTL_PATH");
-
- if (!string.IsNullOrWhiteSpace(envVariable))
- {
- const string DockerPrefix = "DOCKER:";
- if (envVariable.StartsWith(DockerPrefix))
- {
- // Call docker
- precomputedArguments = $"exec {envVariable.Substring(DockerPrefix.Length)} rabbitmqctl ";
- return args => CreateProcess("docker", precomputedArguments + args);
- }
- else
- {
- // call the path from the env var
- return args => CreateProcess(envVariable, args);
- }
- }
-
- // Try default
- string umbrellaRabbitmqctlPath;
- string providedRabbitmqctlPath;
-
- if (IsRunningOnMonoOrDotNetCore())
{
umbrellaRabbitmqctlPath = "../../../../../../rabbit/scripts/rabbitmqctl";
providedRabbitmqctlPath = "rabbitmqctl";
}
- else
- {
- umbrellaRabbitmqctlPath = @"..\..\..\..\..\..\rabbit\scripts\rabbitmqctl.bat";
- providedRabbitmqctlPath = "rabbitmqctl.bat";
- }
string path = File.Exists(umbrellaRabbitmqctlPath) ? umbrellaRabbitmqctlPath : providedRabbitmqctlPath;
- if (IsRunningOnMonoOrDotNetCore())
- {
- return args => CreateProcess(path, args);
- }
- else
- {
- precomputedArguments = $"/c \"\"{path}\" ";
- return args => CreateProcess("cmd.exe", precomputedArguments + args);
- }
+ return CreateProcessStartInfo(path, args);
}
private async Task GetConnectionPidAsync(string connectionName)
{
string stdout = await ExecRabbitMQCtlAsync("list_connections --silent pid client_properties");
-
Match match = s_getConnectionProperties.Match(stdout);
while (match.Success)
{
@@ -221,15 +161,6 @@ private Task CloseConnectionAsync(string pid)
return ExecRabbitMQCtlAsync($"close_connection \"{pid}\" \"Closed via rabbitmqctl\"");
}
- private static bool IsRunningOnMonoOrDotNetCore()
- {
-#if NETCOREAPP
- return true;
-#else
- return Type.GetType("Mono.Runtime") != null;
-#endif
- }
-
private static ProcessStartInfo CreateProcessStartInfo(string cmd, string arguments, string workDirectory = null)
{
return new ProcessStartInfo
@@ -243,22 +174,5 @@ private static ProcessStartInfo CreateProcessStartInfo(string cmd, string argume
WorkingDirectory = workDirectory
};
}
-
- private static Process CreateProcess(string cmd, string arguments, string workDirectory = null)
- {
- return new Process
- {
- StartInfo =
- {
- CreateNoWindow = true,
- UseShellExecute = false,
- RedirectStandardError = true,
- RedirectStandardOutput = true,
- FileName = cmd,
- Arguments = arguments,
- WorkingDirectory = workDirectory
- }
- };
- }
}
}
diff --git a/projects/Test/Common/TestOutputWriter.cs b/projects/Test/Common/TestOutputWriter.cs
index c2e47f8847..a9fe3d5cb0 100644
--- a/projects/Test/Common/TestOutputWriter.cs
+++ b/projects/Test/Common/TestOutputWriter.cs
@@ -29,6 +29,7 @@
// Copyright (c) 2007-2020 VMware, Inc. All rights reserved.
//---------------------------------------------------------------------------
+using System;
using System.IO;
using System.Text;
using Xunit.Abstractions;
@@ -52,11 +53,23 @@ public override void Write(char[] buffer, int index, int count)
{
if (count > 2)
{
- var sb = new StringBuilder("[DEBUG] ");
+ var sb = new StringBuilder(Util.Now);
+ sb.Append(" [DEBUG] ");
sb.Append(_testDisplayName);
sb.Append(" | ");
sb.Append(buffer, index, count);
- _output.WriteLine(sb.ToString().TrimEnd());
+ try
+ {
+ _output.WriteLine(sb.ToString().TrimEnd());
+ }
+ catch (InvalidOperationException)
+ {
+ /*
+ * Note:
+ * This exception can be thrown if there is no running test.
+ * Catch it here to prevent it causing an incorrect test failure.
+ */
+ }
}
}
}
diff --git a/projects/Test/Common/Util.cs b/projects/Test/Common/Util.cs
new file mode 100644
index 0000000000..1e9f5c0367
--- /dev/null
+++ b/projects/Test/Common/Util.cs
@@ -0,0 +1,37 @@
+using System;
+using System.Globalization;
+
+namespace Test
+{
+ public static class Util
+ {
+ private static readonly bool s_isWindows = false;
+
+ static Util()
+ {
+ s_isWindows = InitIsWindows();
+ }
+
+ public static string Now => DateTime.UtcNow.ToString("o", CultureInfo.InvariantCulture);
+
+ public static bool IsWindows => s_isWindows;
+
+ private static bool InitIsWindows()
+ {
+ PlatformID platform = Environment.OSVersion.Platform;
+ if (platform == PlatformID.Win32NT)
+ {
+ return true;
+ }
+
+ string os = Environment.GetEnvironmentVariable("OS");
+ if (os != null)
+ {
+ os = os.Trim();
+ return os == "Windows_NT";
+ }
+
+ return false;
+ }
+ }
+}
diff --git a/projects/Test/Integration/Integration.csproj b/projects/Test/Integration/Integration.csproj
index 783224c57a..1d532b9ff2 100644
--- a/projects/Test/Integration/Integration.csproj
+++ b/projects/Test/Integration/Integration.csproj
@@ -2,10 +2,14 @@
net6.0;net472
+ $(NoWarn);CA2007
+ true
net6.0
+ $(NoWarn);CA2007
+ true
diff --git a/projects/Test/Integration/SslEnv.cs b/projects/Test/Integration/SslEnv.cs
index 272eab1d5d..a7fa6f79b5 100644
--- a/projects/Test/Integration/SslEnv.cs
+++ b/projects/Test/Integration/SslEnv.cs
@@ -41,7 +41,6 @@ public class SslEnv
private const string _hostname = "localhost";
private readonly string _sslDir;
private readonly bool _isSslConfigured;
- private readonly bool _isGithubActions;
public SslEnv()
{
@@ -53,7 +52,6 @@ public SslEnv()
if (_isSslConfigured)
{
- Boolean.TryParse(Environment.GetEnvironmentVariable("GITHUB_ACTIONS"), out _isGithubActions);
_certPath = Path.Combine(_sslDir, $"client_{_hostname}.p12");
}
}
@@ -77,10 +75,5 @@ public bool IsSslConfigured
{
get { return _isSslConfigured; }
}
-
- public bool IsGitHubActions
- {
- get { return _isGithubActions; }
- }
}
}
diff --git a/projects/Test/Integration/TestConnectionShutdown.cs b/projects/Test/Integration/TestConnectionShutdown.cs
index 9c5ad6934d..4c0c883e0b 100644
--- a/projects/Test/Integration/TestConnectionShutdown.cs
+++ b/projects/Test/Integration/TestConnectionShutdown.cs
@@ -30,8 +30,10 @@
//---------------------------------------------------------------------------
using System;
+using System.IO;
using System.Threading.Tasks;
using RabbitMQ.Client;
+using RabbitMQ.Client.Exceptions;
using RabbitMQ.Client.Framing.Impl;
using RabbitMQ.Client.Impl;
using Xunit;
@@ -57,7 +59,15 @@ public async Task TestCleanClosureWithSocketClosedOutOfBand()
var c = (AutorecoveringConnection)_conn;
await c.CloseFrameHandlerAsync();
- await _conn.CloseAsync(TimeSpan.FromSeconds(4));
+ try
+ {
+ await _conn.CloseAsync(TimeSpan.FromSeconds(4));
+ }
+ catch (AlreadyClosedException ex)
+ {
+ Assert.IsAssignableFrom(ex.InnerException);
+ }
+
await WaitAsync(tcs, TimeSpan.FromSeconds(5), "channel shutdown");
}
@@ -94,7 +104,13 @@ public async Task TestDisposedWithSocketClosedOutOfBand()
_conn.Dispose();
_conn = null;
- await WaitAsync(tcs, TimeSpan.FromSeconds(3), "channel shutdown");
+
+ TimeSpan waitSpan = TimeSpan.FromSeconds(3);
+ if (IsRunningInCI)
+ {
+ waitSpan = TimeSpan.FromSeconds(10);
+ }
+ await WaitAsync(tcs, waitSpan, "channel shutdown");
}
[Fact]
diff --git a/projects/Test/Integration/TestToxiproxy.cs b/projects/Test/Integration/TestToxiproxy.cs
index 6011631a48..72b5597e70 100644
--- a/projects/Test/Integration/TestToxiproxy.cs
+++ b/projects/Test/Integration/TestToxiproxy.cs
@@ -30,12 +30,11 @@
//---------------------------------------------------------------------------
using System;
-using System.Diagnostics;
using System.Net;
using System.Threading.Tasks;
+using Integration;
using RabbitMQ.Client;
using RabbitMQ.Client.Exceptions;
-using Toxiproxy.Net;
using Toxiproxy.Net.Toxics;
using Xunit;
using Xunit.Abstractions;
@@ -44,44 +43,10 @@ namespace Test.Integration
{
public class TestToxiproxy : IntegrationFixture
{
- private const string ProxyName = "rmq-localhost";
- private const ushort ProxyPort = 55672;
private readonly TimeSpan _heartbeatTimeout = TimeSpan.FromSeconds(1);
- private readonly Connection _proxyConnection;
- private readonly Client _proxyClient;
- private readonly Proxy _rmqProxy;
public TestToxiproxy(ITestOutputHelper output) : base(output)
{
- if (AreToxiproxyTestsEnabled)
- {
- _proxyConnection = new Connection(resetAllToxicsAndProxiesOnClose: true);
- _proxyClient = _proxyConnection.Client();
-
- // to start, assume everything is on localhost
- _rmqProxy = new Proxy
- {
- Name = "rmq-localhost",
- Enabled = true,
- Listen = $"{IPAddress.Loopback}:{ProxyPort}",
- Upstream = $"{IPAddress.Loopback}:5672",
- };
-
- if (IsRunningInCI)
- {
- _rmqProxy.Listen = $"0.0.0.0:{ProxyPort}";
-
- // GitHub Actions
- if (false == IsWindows)
- {
- /*
- * Note: See the following setup script:
- * .ci/ubuntu/gha-setup.sh
- */
- _rmqProxy.Upstream = "rabbitmq-dotnet-client-rabbitmq:5672";
- }
- }
- }
}
public override Task InitializeAsync()
@@ -92,142 +57,246 @@ public override Task InitializeAsync()
Assert.Null(_conn);
Assert.Null(_channel);
- if (AreToxiproxyTestsEnabled)
- {
- return _proxyClient.AddAsync(_rmqProxy);
- }
- else
- {
- return Task.CompletedTask;
- }
- }
-
- public override Task DisposeAsync()
- {
- if (_proxyClient != null)
- {
- return _proxyClient.DeleteAsync(_rmqProxy);
- }
- else
- {
- return Task.CompletedTask;
- }
+ return Task.CompletedTask;
}
[SkippableFact]
[Trait("Category", "Toxiproxy")]
- public async Task TestThatStoppedSocketResultsInHeartbeatTimeout()
+ public async Task TestCloseConnection()
{
Skip.IfNot(AreToxiproxyTestsEnabled, "RABBITMQ_TOXIPROXY_TESTS is not set, skipping test");
- ConnectionFactory cf = CreateConnectionFactory();
- cf.Port = ProxyPort;
- cf.RequestedHeartbeat = _heartbeatTimeout;
- cf.AutomaticRecoveryEnabled = false;
+ using (var pm = new ToxiproxyManager(_testDisplayName, IsRunningInCI, IsWindows))
+ {
+ await pm.InitializeAsync();
- var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+ ConnectionFactory cf = CreateConnectionFactory();
+ cf.Port = pm.ProxyPort;
+ cf.AutomaticRecoveryEnabled = true;
+ cf.NetworkRecoveryInterval = TimeSpan.FromSeconds(1);
+ cf.RequestedHeartbeat = TimeSpan.FromSeconds(1);
- Task pubTask = Task.Run(async () =>
- {
- using (IConnection conn = await cf.CreateConnectionAsync())
+ var messagePublishedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+ var connectionShutdownTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+ var recoverySucceededTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+ var testSucceededTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+
+ Task pubTask = Task.Run(async () =>
{
- using (IChannel ch = await conn.CreateChannelAsync())
+ using (IConnection conn = await cf.CreateConnectionAsync())
{
- await ch.ConfirmSelectAsync();
- QueueDeclareOk q = await ch.QueueDeclareAsync();
- while (conn.IsOpen)
+ conn.CallbackException += (s, ea) =>
+ {
+ _output.WriteLine($"[ERROR] unexpected callback exception {ea.Detail} {ea.Exception}");
+ recoverySucceededTcs.SetResult(false);
+ };
+
+ conn.ConnectionRecoveryError += (s, ea) =>
+ {
+ _output.WriteLine($"[ERROR] connection recovery error {ea.Exception}");
+ recoverySucceededTcs.SetResult(false);
+ };
+
+ conn.ConnectionShutdown += (s, ea) =>
+ {
+ if (IsVerbose)
+ {
+ _output.WriteLine($"[INFO] connection shutdown");
+ }
+
+ /*
+ * Note: using TrySetResult because this callback will be called when the
+ * test exits, and connectionShutdownTcs will have already been set
+ */
+ connectionShutdownTcs.TrySetResult(true);
+ };
+
+ conn.RecoverySucceeded += (s, ea) =>
+ {
+ if (IsVerbose)
+ {
+ _output.WriteLine($"[INFO] connection recovery succeeded");
+ }
+
+ recoverySucceededTcs.SetResult(true);
+ };
+
+ async Task PublishLoop()
+ {
+ using (IChannel ch = await conn.CreateChannelAsync())
+ {
+ await ch.ConfirmSelectAsync();
+ QueueDeclareOk q = await ch.QueueDeclareAsync();
+ while (conn.IsOpen)
+ {
+ await ch.BasicPublishAsync("", q.QueueName, GetRandomBody());
+ messagePublishedTcs.TrySetResult(true);
+ /*
+ * Note:
+ * In this test, it is possible that the connection
+ * will be closed before the ack is returned,
+ * and this await will throw an exception
+ */
+ try
+ {
+ await ch.WaitForConfirmsAsync();
+ }
+ catch (AlreadyClosedException ex)
+ {
+ if (IsVerbose)
+ {
+ _output.WriteLine($"[WARNING] WaitForConfirmsAsync ex: {ex}");
+ }
+ }
+ }
+
+ await ch.CloseAsync();
+ }
+ }
+
+ try
+ {
+ await PublishLoop();
+ }
+ catch (Exception ex)
{
- await ch.BasicPublishAsync("", q.QueueName, GetRandomBody());
- await ch.WaitForConfirmsAsync();
- await Task.Delay(TimeSpan.FromSeconds(1));
- tcs.TrySetResult(true);
+ if (IsVerbose)
+ {
+ _output.WriteLine($"[WARNING] PublishLoop ex: {ex}");
+ }
}
- await ch.CloseAsync();
+ Assert.True(await testSucceededTcs.Task);
await conn.CloseAsync();
}
- }
- });
+ });
- Assert.True(await tcs.Task);
+ Assert.True(await messagePublishedTcs.Task);
- var timeoutToxic = new TimeoutToxic();
- timeoutToxic.Attributes.Timeout = 0;
- timeoutToxic.Toxicity = 1.0;
+ Task disableProxyTask = pm.DisableAsync();
- await _rmqProxy.AddAsync(timeoutToxic);
- var sw = new Stopwatch();
- sw.Start();
- Task updateProxyTask = _rmqProxy.UpdateAsync();
+ await Task.WhenAll(disableProxyTask, connectionShutdownTcs.Task);
- await Assert.ThrowsAsync(() =>
- {
- return Task.WhenAll(updateProxyTask, pubTask);
- });
+ Task enableProxyTask = pm.EnableAsync();
+
+ Task whenAllTask = Task.WhenAll(enableProxyTask, recoverySucceededTcs.Task);
+ await whenAllTask.WaitAsync(TimeSpan.FromSeconds(15));
- sw.Stop();
+ Assert.True(await recoverySucceededTcs.Task);
- // _output.WriteLine($"[INFO] heartbeat timeout took {sw.Elapsed}");
+ testSucceededTcs.SetResult(true);
+ await pubTask;
+ }
}
[SkippableFact]
[Trait("Category", "Toxiproxy")]
- public async Task TestTcpReset_GH1464()
+ public async Task TestThatStoppedSocketResultsInHeartbeatTimeout()
{
Skip.IfNot(AreToxiproxyTestsEnabled, "RABBITMQ_TOXIPROXY_TESTS is not set, skipping test");
- ConnectionFactory cf = CreateConnectionFactory();
- cf.Endpoint = new AmqpTcpEndpoint(IPAddress.Loopback.ToString(), ProxyPort);
- cf.Port = ProxyPort;
- cf.RequestedHeartbeat = TimeSpan.FromSeconds(5);
- cf.AutomaticRecoveryEnabled = true;
+ using (var pm = new ToxiproxyManager(_testDisplayName, IsRunningInCI, IsWindows))
+ {
+ await pm.InitializeAsync();
- var channelCreatedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
- var connectionShutdownTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+ ConnectionFactory cf = CreateConnectionFactory();
+ cf.Port = pm.ProxyPort;
+ cf.RequestedHeartbeat = _heartbeatTimeout;
+ cf.AutomaticRecoveryEnabled = false;
- Task recoveryTask = Task.Run(async () =>
- {
- using (IConnection conn = await cf.CreateConnectionAsync())
- {
- conn.ConnectionShutdown += (o, ea) =>
- {
- connectionShutdownTcs.SetResult(true);
- };
+ var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
- using (IChannel ch = await conn.CreateChannelAsync())
+ Task pubTask = Task.Run(async () =>
+ {
+ using (IConnection conn = await cf.CreateConnectionAsync())
{
- channelCreatedTcs.SetResult(true);
- await WaitForRecoveryAsync(conn);
- await ch.CloseAsync();
+ using (IChannel ch = await conn.CreateChannelAsync())
+ {
+ await ch.ConfirmSelectAsync();
+ QueueDeclareOk q = await ch.QueueDeclareAsync();
+ while (conn.IsOpen)
+ {
+ await ch.BasicPublishAsync("", q.QueueName, GetRandomBody());
+ await ch.WaitForConfirmsAsync();
+ await Task.Delay(TimeSpan.FromSeconds(1));
+ tcs.TrySetResult(true);
+ }
+
+ await ch.CloseAsync();
+ await conn.CloseAsync();
+ }
}
+ });
- await conn.CloseAsync();
- }
- });
+ Assert.True(await tcs.Task);
+
+ var timeoutToxic = new TimeoutToxic();
+ timeoutToxic.Attributes.Timeout = 0;
+ timeoutToxic.Toxicity = 1.0;
+
+ Task addToxicTask = pm.AddToxicAsync(timeoutToxic);
+
+ await Assert.ThrowsAsync(() =>
+ {
+ return Task.WhenAll(addToxicTask, pubTask);
+ });
+ }
+ }
- Assert.True(await channelCreatedTcs.Task);
+ [SkippableFact]
+ [Trait("Category", "Toxiproxy")]
+ public async Task TestTcpReset_GH1464()
+ {
+ Skip.IfNot(AreToxiproxyTestsEnabled, "RABBITMQ_TOXIPROXY_TESTS is not set, skipping test");
- const string toxicName = "rmq-localhost-reset_peer";
- var resetPeerToxic = new ResetPeerToxic();
- resetPeerToxic.Name = toxicName;
- resetPeerToxic.Attributes.Timeout = 500;
- resetPeerToxic.Toxicity = 1.0;
+ using (var pm = new ToxiproxyManager(_testDisplayName, IsRunningInCI, IsWindows))
+ {
+ await pm.InitializeAsync();
- var sw = new Stopwatch();
- sw.Start();
+ ConnectionFactory cf = CreateConnectionFactory();
+ cf.Endpoint = new AmqpTcpEndpoint(IPAddress.Loopback.ToString(), pm.ProxyPort);
+ cf.RequestedHeartbeat = TimeSpan.FromSeconds(5);
+ cf.AutomaticRecoveryEnabled = true;
- await _rmqProxy.AddAsync(resetPeerToxic);
- Task updateProxyTask = _rmqProxy.UpdateAsync();
+ var channelCreatedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+ var connectionShutdownTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
- await Task.WhenAll(updateProxyTask, connectionShutdownTcs.Task);
+ Task recoveryTask = Task.Run(async () =>
+ {
+ using (IConnection conn = await cf.CreateConnectionAsync())
+ {
+ conn.ConnectionShutdown += (o, ea) =>
+ {
+ connectionShutdownTcs.SetResult(true);
+ };
- await _rmqProxy.RemoveToxicAsync(toxicName);
+ using (IChannel ch = await conn.CreateChannelAsync())
+ {
+ channelCreatedTcs.SetResult(true);
+ await WaitForRecoveryAsync(conn);
+ await ch.CloseAsync();
+ }
+
+ await conn.CloseAsync();
+ }
+ });
- await recoveryTask;
+ Assert.True(await channelCreatedTcs.Task);
- sw.Stop();
+ const string toxicName = "rmq-localhost-reset_peer";
+ var resetPeerToxic = new ResetPeerToxic();
+ resetPeerToxic.Name = toxicName;
+ resetPeerToxic.Attributes.Timeout = 500;
+ resetPeerToxic.Toxicity = 1.0;
- // _output.WriteLine($"[INFO] reset peer took {sw.Elapsed}");
+ Task addToxicTask = pm.AddToxicAsync(resetPeerToxic);
+
+ await Task.WhenAll(addToxicTask, connectionShutdownTcs.Task);
+
+ await pm.RemoveToxicAsync(toxicName);
+
+ await recoveryTask;
+ }
}
private bool AreToxiproxyTestsEnabled
diff --git a/projects/Test/Integration/ToxiproxyManager.cs b/projects/Test/Integration/ToxiproxyManager.cs
new file mode 100644
index 0000000000..b1d581623e
--- /dev/null
+++ b/projects/Test/Integration/ToxiproxyManager.cs
@@ -0,0 +1,127 @@
+using System;
+using System.Net;
+using System.Threading;
+using System.Threading.Tasks;
+using Toxiproxy.Net;
+using Toxiproxy.Net.Toxics;
+
+namespace Integration
+{
+ public class ToxiproxyManager : IDisposable
+ {
+ private const string ProxyNamePrefix = "rmq-";
+ private const ushort ProxyPortStart = 55669;
+ private static int s_proxyPort = ProxyPortStart;
+
+ private readonly string _testDisplayName;
+ private readonly int _proxyPort;
+ private readonly Connection _proxyConnection;
+ private readonly Client _proxyClient;
+ private readonly Proxy _proxy;
+
+ private bool _disposedValue;
+
+ public ToxiproxyManager(string testDisplayName, bool isRunningInCI, bool isWindows)
+ {
+ if (string.IsNullOrWhiteSpace(testDisplayName))
+ {
+ throw new ArgumentNullException(nameof(testDisplayName));
+ }
+
+ _testDisplayName = testDisplayName;
+
+ _proxyPort = Interlocked.Increment(ref s_proxyPort);
+
+ /*
+ string now = DateTime.UtcNow.ToString("o", System.Globalization.CultureInfo.InvariantCulture);
+ Console.WriteLine("{0} [DEBUG] {1} _proxyPort {2}", now, testDisplayName, _proxyPort);
+ */
+
+ _proxyConnection = new Connection(resetAllToxicsAndProxiesOnClose: true);
+ _proxyClient = _proxyConnection.Client();
+
+ // to start, assume everything is on localhost
+ _proxy = new Proxy
+ {
+ Enabled = true,
+ Listen = $"{IPAddress.Loopback}:{_proxyPort}",
+ Upstream = $"{IPAddress.Loopback}:5672",
+ };
+
+ if (isRunningInCI)
+ {
+ _proxy.Listen = $"0.0.0.0:{_proxyPort}";
+
+ // GitHub Actions
+ if (false == isWindows)
+ {
+ /*
+ * Note: See the following setup script:
+ * .ci/ubuntu/gha-setup.sh
+ */
+ _proxy.Upstream = "rabbitmq-dotnet-client-rabbitmq:5672";
+ }
+ }
+ }
+
+ public int ProxyPort => _proxyPort;
+
+ public async Task InitializeAsync()
+ {
+ _proxy.Name = $"{ProxyNamePrefix}{_testDisplayName}";
+
+ try
+ {
+ await _proxyClient.DeleteAsync(_proxy);
+ }
+ catch
+ {
+ }
+
+ await _proxyClient.AddAsync(_proxy);
+ }
+
+ public Task AddToxicAsync(T toxic) where T : ToxicBase
+ {
+ return _proxy.AddAsync(toxic);
+ }
+
+ public Task RemoveToxicAsync(string toxicName)
+ {
+ return _proxy.RemoveToxicAsync(toxicName);
+ }
+
+ public Task EnableAsync()
+ {
+ _proxy.Enabled = true;
+ return _proxyClient.UpdateAsync(_proxy);
+ }
+
+ public Task DisableAsync()
+ {
+ _proxy.Enabled = false;
+ return _proxyClient.UpdateAsync(_proxy);
+ }
+
+ public void Dispose()
+ {
+ // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
+ Dispose(disposing: true);
+ GC.SuppressFinalize(this);
+ }
+
+ protected virtual void Dispose(bool disposing)
+ {
+ if (!_disposedValue)
+ {
+ if (disposing)
+ {
+ _proxyClient.DeleteAsync(_proxy).GetAwaiter().GetResult();
+ _proxyConnection.Dispose();
+ }
+
+ _disposedValue = true;
+ }
+ }
+ }
+}
diff --git a/projects/Test/OAuth2/OAuth2.csproj b/projects/Test/OAuth2/OAuth2.csproj
index 4afbf0b19b..ff04f3df28 100644
--- a/projects/Test/OAuth2/OAuth2.csproj
+++ b/projects/Test/OAuth2/OAuth2.csproj
@@ -2,10 +2,14 @@
net6.0;net472
+ $(NoWarn);CA2007
+ true
net6.0
+ $(NoWarn);CA2007
+ true
diff --git a/projects/Test/SequentialIntegration/SequentialIntegration.csproj b/projects/Test/SequentialIntegration/SequentialIntegration.csproj
index cea571fc19..0735c33889 100644
--- a/projects/Test/SequentialIntegration/SequentialIntegration.csproj
+++ b/projects/Test/SequentialIntegration/SequentialIntegration.csproj
@@ -2,10 +2,14 @@
net6.0;net472
+ $(NoWarn);CA2007
+ true
net6.0
+ $(NoWarn);CA2007
+ true
diff --git a/projects/Test/Unit/Unit.csproj b/projects/Test/Unit/Unit.csproj
index 91bac4e3cb..97cf8f8a41 100644
--- a/projects/Test/Unit/Unit.csproj
+++ b/projects/Test/Unit/Unit.csproj
@@ -2,10 +2,14 @@
net6.0;net472
+ $(NoWarn);CA2007
+ true
net6.0
+ $(NoWarn);CA2007
+ true
diff --git a/projects/toxiproxy-netcore b/projects/toxiproxy-netcore
index 090fb48cb2..9a13a41477 160000
--- a/projects/toxiproxy-netcore
+++ b/projects/toxiproxy-netcore
@@ -1 +1 @@
-Subproject commit 090fb48cb289502e07f599ba660765054db2f407
+Subproject commit 9a13a41477a49ca37878d8730c18f0c64c53939c