Skip to content

Commit

Permalink
* Fix very subtle bug when connection is closed before basic.ack is…
Browse files Browse the repository at this point in the history
… received.
  • Loading branch information
lukebakken committed Mar 28, 2024
1 parent 44bd6d4 commit 25ca268
Showing 1 changed file with 45 additions and 6 deletions.
51 changes: 45 additions & 6 deletions projects/Test/Integration/TestToxiproxy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ public async Task TestCloseConnection()
cf.AutomaticRecoveryEnabled = true;
cf.NetworkRecoveryInterval = TimeSpan.FromSeconds(1);
cf.RequestedHeartbeat = TimeSpan.FromSeconds(1);
cf.SocketReadTimeout = cf.RequestedHeartbeat;

var messagePublishedTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
var connectionShutdownTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
Expand All @@ -91,25 +90,37 @@ public async Task TestCloseConnection()
_output.WriteLine($"[ERROR] unexpected callback exception {ea.Detail} {ea.Exception}");
recoverySucceededTcs.SetResult(false);
};
conn.ConnectionRecoveryError += (s, ea) =>
{
_output.WriteLine($"[ERROR] connection recovery error {ea.Exception}");
recoverySucceededTcs.SetResult(false);
};
conn.ConnectionShutdown += (s, ea) =>
{
_output.WriteLine($"[INFO] connection shutdown");
if (IsVerbose)
{
_output.WriteLine($"[INFO] connection shutdown");
}
/*
* Note: using TrySetResult because this callback will be called when the
* test exits, and connectionShutdownTcs will have already been set
*/
connectionShutdownTcs.TrySetResult(true);
};
conn.RecoverySucceeded += (s, ea) =>
{
_output.WriteLine($"[INFO] connection recovery succeeded");
if (IsVerbose)
{
_output.WriteLine($"[INFO] connection recovery succeeded");
}
recoverySucceededTcs.SetResult(true);
};
async Task PublishLoop()
{
using (IChannel ch = await conn.CreateChannelAsync())
Expand All @@ -119,14 +130,42 @@ async Task PublishLoop()
while (conn.IsOpen)
{
await ch.BasicPublishAsync("", q.QueueName, GetRandomBody());
await ch.WaitForConfirmsAsync();
await Task.Delay(TimeSpan.FromSeconds(1));
messagePublishedTcs.TrySetResult(true);
/*
* Note:
* In this test, it is possible that the connection
* will be closed before the ack is returned,
* and this await will throw an exception
*/
try
{
await ch.WaitForConfirmsAsync();
}
catch (AlreadyClosedException ex)
{
if (IsVerbose)
{
_output.WriteLine($"[WARNING] WaitForConfirmsAsync ex: {ex}");
}
}
}
await ch.CloseAsync();
}
}
await PublishLoop();
try
{
await PublishLoop();
}
catch (Exception ex)
{
if (IsVerbose)
{
_output.WriteLine($"[WARNING] PublishLoop ex: {ex}");
}
}
Assert.True(await testSucceededTcs.Task);
await conn.CloseAsync();
}
Expand Down

0 comments on commit 25ca268

Please sign in to comment.