Skip to content

Commit

Permalink
Merge pull request #1148 from rabbitmq/rabbitmq-dotnet-client-1140
Browse files Browse the repository at this point in the history
Fix flaky connection recovery tests.
  • Loading branch information
michaelklishin authored Feb 22, 2022
2 parents 8324fa0 + c32ac3a commit 2d04378
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 14 deletions.
2 changes: 1 addition & 1 deletion _site
Submodule _site updated 166 files
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,13 @@ private bool TryPerformAutomaticRecovery()
// 2. Recover queues
// 3. Recover bindings
// 4. Recover consumers
using var recoveryChannel = _innerConnection.CreateModel();
RecoverExchanges(recoveryChannel);
RecoverQueues(recoveryChannel);
RecoverBindings(recoveryChannel);
using (var recoveryChannel = _innerConnection.CreateModel())
{
RecoverExchanges(recoveryChannel);
RecoverQueues(recoveryChannel);
RecoverBindings(recoveryChannel);
}

}
RecoverModelsAndItsConsumers();
}
Expand Down
9 changes: 8 additions & 1 deletion projects/RabbitMQ.Client/client/impl/AutorecoveringModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -166,12 +166,19 @@ internal void AutomaticallyRecover(AutorecoveringConnection conn, bool recoverCo
newChannel.TxSelect();
}

/*
* https://github.com/rabbitmq/rabbitmq-dotnet-client/issues/1140
* If this assignment is not done before recovering consumers, there is a good
* chance that an invalid Model will be used to handle a basic.deliver frame,
* with the resulting basic.ack never getting sent out.
*/
_innerChannel = newChannel;

if (recoverConsumers)
{
_connection.RecoverConsumers(this, newChannel);
}

_innerChannel = newChannel;
_innerChannel.RunRecoveryEventHandlers(this);
}

Expand Down
1 change: 1 addition & 0 deletions projects/Unit/Fixtures.cs
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,7 @@ internal void StopRabbitMQ()
internal void StartRabbitMQ()
{
RabbitMQCtl.StartRabbitMQ();
RabbitMQCtl.AwaitRabbitMQ();
}

//
Expand Down
5 changes: 5 additions & 0 deletions projects/Unit/RabbitMQCtl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -228,5 +228,10 @@ public static void StartRabbitMQ()
{
ExecRabbitMQCtl("start_app");
}

public static void AwaitRabbitMQ()
{
ExecRabbitMQCtl("await_startup");
}
}
}
16 changes: 8 additions & 8 deletions projects/Unit/TestConnectionRecovery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public override void Dispose()

}

[Fact(Skip="TODO flaky")]
[Fact]
public void TestBasicAckAfterChannelRecovery()
{
var allMessagesSeenLatch = new ManualResetEventSlim(false);
Expand All @@ -112,7 +112,7 @@ public void TestBasicAckAfterChannelRecovery()
Wait(allMessagesSeenLatch);
}

[Fact(Skip="TODO flaky")]
[Fact]
public void TestBasicNackAfterChannelRecovery()
{
var allMessagesSeenLatch = new ManualResetEventSlim(false);
Expand All @@ -134,7 +134,7 @@ public void TestBasicNackAfterChannelRecovery()
Wait(allMessagesSeenLatch);
}

[Fact(Skip="TODO flaky")]
[Fact]
public void TestBasicRejectAfterChannelRecovery()
{
var allMessagesSeenLatch = new ManualResetEventSlim(false);
Expand Down Expand Up @@ -851,22 +851,22 @@ public void TestPublishRpcRightAfterReconnect()
var properties = new BasicProperties();
properties.ReplyTo = "amq.rabbitmq.reply-to";

bool done = false;
TimeSpan doneSpan = TimeSpan.FromMilliseconds(100);
var done = new ManualResetEventSlim(false);
Task.Run(() =>
{
try
{
CloseAndWaitForRecovery();
Thread.Sleep(100);
}
finally
{
done = true;
done.Set();
}
});

while (!done)
while (!done.IsSet)
{
try
{
Expand All @@ -880,7 +880,7 @@ public void TestPublishRpcRightAfterReconnect()
Assert.NotEqual(406, a.ShutdownReason.ReplyCode);
}
}
Thread.Sleep(1);
done.Wait(doneSpan);
}
}

Expand Down

0 comments on commit 2d04378

Please sign in to comment.