From 9a5d1d346aab74e4c4b74d53d3426fd2b986003e Mon Sep 17 00:00:00 2001 From: Sabina Rosca Date: Sun, 5 Mar 2023 18:18:52 +0200 Subject: [PATCH 1/5] Added topology recovery filter --- .../client/api/ConnectionFactory.cs | 6 ++ .../client/api/IRecordedBinding.cs | 15 ++++ .../client/api/IRecordedConsumer.cs | 17 ++++ .../client/api/IRecordedExchange.cs | 17 ++++ .../client/api/IRecordedQueue.cs | 19 +++++ .../client/api/TopologyRecoveryFilter.cs | 85 +++++++++++++++++++ .../client/impl/AutorecoveringConnection.cs | 12 +-- .../client/impl/AutorecoveringModel.cs | 20 ++--- .../client/impl/RecordedBinding.cs | 2 +- .../client/impl/RecordedConsumer.cs | 2 +- .../client/impl/RecordedExchange.cs | 10 +-- .../client/impl/RecordedQueue.cs | 41 ++++----- 12 files changed, 204 insertions(+), 42 deletions(-) create mode 100644 projects/RabbitMQ.Client/client/api/IRecordedBinding.cs create mode 100644 projects/RabbitMQ.Client/client/api/IRecordedConsumer.cs create mode 100644 projects/RabbitMQ.Client/client/api/IRecordedExchange.cs create mode 100644 projects/RabbitMQ.Client/client/api/IRecordedQueue.cs create mode 100644 projects/RabbitMQ.Client/client/api/TopologyRecoveryFilter.cs diff --git a/projects/RabbitMQ.Client/client/api/ConnectionFactory.cs b/projects/RabbitMQ.Client/client/api/ConnectionFactory.cs index 9f11ae943d..2eb9d93165 100644 --- a/projects/RabbitMQ.Client/client/api/ConnectionFactory.cs +++ b/projects/RabbitMQ.Client/client/api/ConnectionFactory.cs @@ -271,6 +271,12 @@ public TimeSpan ContinuationTimeout /// public bool TopologyRecoveryEnabled { get; set; } = true; + /// + /// Filter to include/exclude entities from topology recovery. + /// Default filter includes all entities in topology recovery. + /// + public TopologyRecoveryFilter TopologyRecoveryFilter { get; set; } = new TopologyRecoveryFilter(); + /// /// Construct a fresh instance, with all fields set to their respective defaults. /// diff --git a/projects/RabbitMQ.Client/client/api/IRecordedBinding.cs b/projects/RabbitMQ.Client/client/api/IRecordedBinding.cs new file mode 100644 index 0000000000..27daae7dfa --- /dev/null +++ b/projects/RabbitMQ.Client/client/api/IRecordedBinding.cs @@ -0,0 +1,15 @@ +using System.Collections.Generic; + +namespace RabbitMQ.Client +{ + public interface IRecordedBinding + { + string Source { get; } + + string Destination { get; } + + string RoutingKey { get; } + + IDictionary Arguments { get; } + } +} diff --git a/projects/RabbitMQ.Client/client/api/IRecordedConsumer.cs b/projects/RabbitMQ.Client/client/api/IRecordedConsumer.cs new file mode 100644 index 0000000000..b3ea93652b --- /dev/null +++ b/projects/RabbitMQ.Client/client/api/IRecordedConsumer.cs @@ -0,0 +1,17 @@ +using System.Collections.Generic; + +namespace RabbitMQ.Client +{ + public interface IRecordedConsumer + { + string ConsumerTag { get; } + + string Queue { get; } + + bool AutoAck { get; } + + bool Exclusive { get; } + + IDictionary Arguments { get; } + } +} diff --git a/projects/RabbitMQ.Client/client/api/IRecordedExchange.cs b/projects/RabbitMQ.Client/client/api/IRecordedExchange.cs new file mode 100644 index 0000000000..9a47804dfa --- /dev/null +++ b/projects/RabbitMQ.Client/client/api/IRecordedExchange.cs @@ -0,0 +1,17 @@ +using System.Collections.Generic; + +namespace RabbitMQ.Client +{ + public interface IRecordedExchange + { + string Name { get; } + + string Type { get; } + + bool Durable { get; } + + bool AutoDelete { get; } + + IDictionary Arguments { get; } + } +} diff --git a/projects/RabbitMQ.Client/client/api/IRecordedQueue.cs b/projects/RabbitMQ.Client/client/api/IRecordedQueue.cs new file mode 100644 index 0000000000..d5abf857c4 --- /dev/null +++ b/projects/RabbitMQ.Client/client/api/IRecordedQueue.cs @@ -0,0 +1,19 @@ +using System.Collections.Generic; + +namespace RabbitMQ.Client +{ + public interface IRecordedQueue + { + string Name { get; } + + bool Durable { get; } + + bool Exclusive { get; } + + bool AutoDelete { get; } + + IDictionary Arguments { get; } + + bool IsServerNamed { get; } + } +} diff --git a/projects/RabbitMQ.Client/client/api/TopologyRecoveryFilter.cs b/projects/RabbitMQ.Client/client/api/TopologyRecoveryFilter.cs new file mode 100644 index 0000000000..d95430f755 --- /dev/null +++ b/projects/RabbitMQ.Client/client/api/TopologyRecoveryFilter.cs @@ -0,0 +1,85 @@ +using System; + +namespace RabbitMQ.Client +{ + /// + /// Filter to know which entities (exchanges, queues, bindings, consumers) should be recovered by topology recovery. + /// By default, allows all entities to be recovered. + /// + public class TopologyRecoveryFilter + { + private static readonly Func s_defaultExchangeFilter = exchange => true; + private static readonly Func s_defaultQueueFilter = queue => true; + private static readonly Func s_defaultBindingFilter = binding => true; + private static readonly Func s_defaultConsumerFilter = consumer => true; + + private Func _exchangeFilter; + private Func _queueFilter; + private Func _bindingFilter; + private Func _consumerFilter; + + /// + /// Decides whether an exchange is recovered or not. + /// + public Func ExchangeFilter + { + get => _exchangeFilter ?? s_defaultExchangeFilter; + + set + { + if (_exchangeFilter != null) + throw new InvalidOperationException($"Cannot modify {nameof(ExchangeFilter)} after it has been initialized."); + + _exchangeFilter = value ?? throw new ArgumentNullException(nameof(ExchangeFilter)); + } + } + + /// + /// Decides whether a queue is recovered or not. + /// + public Func QueueFilter + { + get => _queueFilter ?? s_defaultQueueFilter; + + set + { + if (_queueFilter != null) + throw new InvalidOperationException($"Cannot modify {nameof(QueueFilter)} after it has been initialized."); + + _queueFilter = value ?? throw new ArgumentNullException(nameof(QueueFilter)); + } + } + + /// + /// Decides whether a binding is recovered or not. + /// + public Func BindingFilter + { + get => _bindingFilter ?? s_defaultBindingFilter; + + set + { + if (_bindingFilter != null) + throw new InvalidOperationException($"Cannot modify {nameof(BindingFilter)} after it has been initialized."); + + _bindingFilter = value ?? throw new ArgumentNullException(nameof(BindingFilter)); + } + } + + /// + /// Decides whether a consumer is recovered or not. + /// + public Func ConsumerFilter + { + get => _consumerFilter ?? s_defaultConsumerFilter; + + set + { + if (_consumerFilter != null) + throw new InvalidOperationException($"Cannot modify {nameof(ConsumerFilter)} after it has been initialized."); + + _consumerFilter = value ?? throw new ArgumentNullException(nameof(ConsumerFilter)); + } + } + } +} diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs index 958073b656..f6c2c1d8b1 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs @@ -576,7 +576,7 @@ public void MaybeDeleteRecordedAutoDeleteExchange(string exchange) // last binding where this exchange is the source is gone, // remove recorded exchange // if it is auto-deleted. See bug 26364. - if ((rx != null) && rx.IsAutoDelete) + if ((rx != null) && rx.AutoDelete) { DeleteRecordedExchange(exchange); } @@ -593,7 +593,7 @@ public void MaybeDeleteRecordedAutoDeleteQueue(string queue) _recordedQueues.TryGetValue(queue, out RecordedQueue rq); // last consumer on this connection is gone, remove recorded queue // if it is auto-deleted. See bug 26364. - if ((rq != null) && rq.IsAutoDelete) + if ((rq != null) && rq.AutoDelete) { DeleteRecordedQueue(queue); } @@ -993,7 +993,7 @@ private void RecoverBindings(IModel model) recordedBindingsCopy = new Dictionary(_recordedBindings); } - foreach (RecordedBinding b in recordedBindingsCopy.Keys) + foreach (RecordedBinding b in recordedBindingsCopy.Keys.Where(x => _factory.TopologyRecoveryFilter?.BindingFilter(x) ?? true)) { try { @@ -1089,7 +1089,7 @@ internal void RecoverConsumers(AutorecoveringModel modelToRecover, IModel channe recordedConsumersCopy = new Dictionary(_recordedConsumers); } - foreach (KeyValuePair pair in recordedConsumersCopy) + foreach (KeyValuePair pair in recordedConsumersCopy.Where(x => _factory.TopologyRecoveryFilter?.ConsumerFilter(x.Value) ?? true)) { RecordedConsumer cons = pair.Value; if (cons.Model != modelToRecover) @@ -1154,7 +1154,7 @@ private void RecoverExchanges(IModel model) recordedExchangesCopy = new Dictionary(_recordedExchanges); } - foreach (RecordedExchange rx in recordedExchangesCopy.Values) + foreach (RecordedExchange rx in recordedExchangesCopy.Values.Where(x => _factory.TopologyRecoveryFilter?.ExchangeFilter(x) ?? true)) { try { @@ -1188,7 +1188,7 @@ private void RecoverQueues(IModel model) recordedQueuesCopy = new Dictionary(_recordedQueues); } - foreach (KeyValuePair pair in recordedQueuesCopy) + foreach (KeyValuePair pair in recordedQueuesCopy.Where(x => _factory.TopologyRecoveryFilter?.QueueFilter(x.Value) ?? true)) { string oldName = pair.Key; RecordedQueue rq = pair.Value; diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringModel.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringModel.cs index 7c386eff9b..c4b3f0796c 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringModel.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringModel.cs @@ -1530,11 +1530,11 @@ public QueueDeclareOk QueueDeclare(string queue, bool durable, QueueDeclareOk result = _delegate.QueueDeclare(queue, durable, exclusive, autoDelete, arguments); RecordedQueue rq = new RecordedQueue(result.QueueName). - Durable(durable). - Exclusive(exclusive). - AutoDelete(autoDelete). - Arguments(arguments). - ServerNamed(string.Empty.Equals(queue)); + WithDurable(durable). + WithExclusive(exclusive). + WithAutoDelete(autoDelete). + WithArguments(arguments). + WithServerNamed(string.Empty.Equals(queue)); _connection.RecordQueue(result.QueueName, rq); return result; } @@ -1551,11 +1551,11 @@ public void QueueDeclareNoWait(string queue, bool durable, _delegate.QueueDeclareNoWait(queue, durable, exclusive, autoDelete, arguments); RecordedQueue rq = new RecordedQueue(queue). - Durable(durable). - Exclusive(exclusive). - AutoDelete(autoDelete). - Arguments(arguments). - ServerNamed(string.Empty.Equals(queue)); + WithDurable(durable). + WithExclusive(exclusive). + WithAutoDelete(autoDelete). + WithArguments(arguments). + WithServerNamed(string.Empty.Equals(queue)); _connection.RecordQueue(queue, rq); } diff --git a/projects/RabbitMQ.Client/client/impl/RecordedBinding.cs b/projects/RabbitMQ.Client/client/impl/RecordedBinding.cs index a7be10c253..966dea271c 100644 --- a/projects/RabbitMQ.Client/client/impl/RecordedBinding.cs +++ b/projects/RabbitMQ.Client/client/impl/RecordedBinding.cs @@ -33,7 +33,7 @@ namespace RabbitMQ.Client.Impl { - internal abstract class RecordedBinding + internal abstract class RecordedBinding : IRecordedBinding { public IDictionary Arguments { get; protected set; } public string Destination { get; set; } diff --git a/projects/RabbitMQ.Client/client/impl/RecordedConsumer.cs b/projects/RabbitMQ.Client/client/impl/RecordedConsumer.cs index 5797e53a41..e2c802df8e 100644 --- a/projects/RabbitMQ.Client/client/impl/RecordedConsumer.cs +++ b/projects/RabbitMQ.Client/client/impl/RecordedConsumer.cs @@ -33,7 +33,7 @@ namespace RabbitMQ.Client.Impl { - internal class RecordedConsumer + internal class RecordedConsumer : IRecordedConsumer { public RecordedConsumer(AutorecoveringModel model, string queue) { diff --git a/projects/RabbitMQ.Client/client/impl/RecordedExchange.cs b/projects/RabbitMQ.Client/client/impl/RecordedExchange.cs index 17edeb69c9..f88616bab5 100644 --- a/projects/RabbitMQ.Client/client/impl/RecordedExchange.cs +++ b/projects/RabbitMQ.Client/client/impl/RecordedExchange.cs @@ -33,7 +33,7 @@ namespace RabbitMQ.Client.Impl { - internal class RecordedExchange : RecordedNamedEntity + internal class RecordedExchange : RecordedNamedEntity, IRecordedExchange { public RecordedExchange(string name) : base(name) { @@ -41,18 +41,18 @@ public RecordedExchange(string name) : base(name) public IDictionary Arguments { get; private set; } public bool Durable { get; private set; } - public bool IsAutoDelete { get; private set; } + public bool AutoDelete { get; private set; } public string Type { get; private set; } public void Recover(IModel model) { - model.ExchangeDeclare(Name, Type, Durable, IsAutoDelete, Arguments); + model.ExchangeDeclare(Name, Type, Durable, AutoDelete, Arguments); } public override string ToString() { - return $"{GetType().Name}: name = '{Name}', type = '{Type}', durable = {Durable}, autoDelete = {IsAutoDelete}, arguments = '{Arguments}'"; + return $"{GetType().Name}: name = '{Name}', type = '{Type}', durable = {Durable}, autoDelete = {AutoDelete}, arguments = '{Arguments}'"; } public RecordedExchange WithArguments(IDictionary value) @@ -63,7 +63,7 @@ public RecordedExchange WithArguments(IDictionary value) public RecordedExchange WithAutoDelete(bool value) { - IsAutoDelete = value; + AutoDelete = value; return this; } diff --git a/projects/RabbitMQ.Client/client/impl/RecordedQueue.cs b/projects/RabbitMQ.Client/client/impl/RecordedQueue.cs index 9265d8fa92..003548bb4c 100644 --- a/projects/RabbitMQ.Client/client/impl/RecordedQueue.cs +++ b/projects/RabbitMQ.Client/client/impl/RecordedQueue.cs @@ -33,17 +33,20 @@ namespace RabbitMQ.Client.Impl { - internal class RecordedQueue : RecordedNamedEntity + internal class RecordedQueue : RecordedNamedEntity, IRecordedQueue { - private IDictionary _arguments; - private bool _durable; - private bool _exclusive; - public RecordedQueue(string name) : base(name) { } - public bool IsAutoDelete { get; private set; } + public bool Durable { get; private set; } + + public bool Exclusive { get; private set; } + + public bool AutoDelete { get; private set; } + + public IDictionary Arguments { get; private set; } + public bool IsServerNamed { get; private set; } protected string NameToUseForRecovery @@ -61,39 +64,39 @@ protected string NameToUseForRecovery } } - public RecordedQueue Arguments(IDictionary value) + public RecordedQueue WithArguments(IDictionary value) { - _arguments = value; + Arguments = value; return this; } - public RecordedQueue AutoDelete(bool value) + public RecordedQueue WithAutoDelete(bool value) { - IsAutoDelete = value; + AutoDelete = value; return this; } - public RecordedQueue Durable(bool value) + public RecordedQueue WithDurable(bool value) { - _durable = value; + Durable = value; return this; } - public RecordedQueue Exclusive(bool value) + public RecordedQueue WithExclusive(bool value) { - _exclusive = value; + Exclusive = value; return this; } public void Recover(IModel model) { - QueueDeclareOk ok = model.QueueDeclare(NameToUseForRecovery, _durable, - _exclusive, IsAutoDelete, - _arguments); + QueueDeclareOk ok = model.QueueDeclare(NameToUseForRecovery, Durable, + Exclusive, AutoDelete, + Arguments); Name = ok.QueueName; } - public RecordedQueue ServerNamed(bool value) + public RecordedQueue WithServerNamed(bool value) { IsServerNamed = value; return this; @@ -101,7 +104,7 @@ public RecordedQueue ServerNamed(bool value) public override string ToString() { - return $"{GetType().Name}: name = '{Name}', durable = {_durable}, exlusive = {_exclusive}, autoDelete = {IsAutoDelete}, arguments = '{_arguments}'"; + return $"{GetType().Name}: name = '{Name}', durable = {Durable}, exlusive = {Exclusive}, autoDelete = {AutoDelete}, arguments = '{Arguments}'"; } } } From d42511e5dcb9794b182a7c6e27333a369e5ec732 Mon Sep 17 00:00:00 2001 From: Sabina Rosca Date: Sun, 5 Mar 2023 22:49:01 +0200 Subject: [PATCH 2/5] Added tests for topology recovery filter --- projects/Unit/Fixtures.cs | 12 ++ projects/Unit/TestConnectionRecovery.cs | 244 ++++++++++++++++++++++++ 2 files changed, 256 insertions(+) diff --git a/projects/Unit/Fixtures.cs b/projects/Unit/Fixtures.cs index f1e9e66df9..fe411866d1 100644 --- a/projects/Unit/Fixtures.cs +++ b/projects/Unit/Fixtures.cs @@ -151,6 +151,18 @@ internal AutorecoveringConnection CreateAutorecoveringConnectionWithTopologyReco return (AutorecoveringConnection)cf.CreateConnection($"{_testDisplayName}:{Guid.NewGuid()}"); } + internal AutorecoveringConnection CreateAutorecoveringConnectionWithTopologyRecoveryFilter(TopologyRecoveryFilter filter) + { + var cf = new ConnectionFactory + { + AutomaticRecoveryEnabled = true, + TopologyRecoveryEnabled = true, + TopologyRecoveryFilter = filter + }; + + return (AutorecoveringConnection)cf.CreateConnection($"{_testDisplayName}:{Guid.NewGuid()}"); + } + internal IConnection CreateNonRecoveringConnection() { var cf = new ConnectionFactory diff --git a/projects/Unit/TestConnectionRecovery.cs b/projects/Unit/TestConnectionRecovery.cs index a7dbcbf7bd..a5ea5914ae 100644 --- a/projects/Unit/TestConnectionRecovery.cs +++ b/projects/Unit/TestConnectionRecovery.cs @@ -32,6 +32,7 @@ using System; using System.Collections; using System.Collections.Generic; +using System.Text; using System.Threading; using NUnit.Framework; @@ -803,6 +804,7 @@ public void TestServerNamedQueueRecovery() Model.QueueDeclarePassive(nameAfter); } + [Test] public void TestUnbindQueueAfterRecoveryConnection() { @@ -1099,6 +1101,248 @@ public void TestUnblockedListenersRecovery() Wait(latch); } + [Test] + public void TestTopologyRecoveryQueueFilter() + { + var filter = new TopologyRecoveryFilter + { + QueueFilter = queue => !queue.Name.Contains("filtered") + }; + var latch = new ManualResetEventSlim(false); + AutorecoveringConnection conn = CreateAutorecoveringConnectionWithTopologyRecoveryFilter(filter); + conn.RecoverySucceeded += (source, ea) => latch.Set(); + IModel ch = conn.CreateModel(); + + var queueToRecover = "recovered.queue"; + var queueToIgnore = "filtered.queue"; + ch.QueueDeclare(queueToRecover, false, false, false, null); + ch.QueueDeclare(queueToIgnore, false, false, false, null); + + Model.QueueDelete(queueToRecover); + Model.QueueDelete(queueToIgnore); + + CloseAndWaitForRecovery(conn); + Wait(latch); + + Assert.IsTrue(ch.IsOpen); + AssertQueueRecovery(ch, queueToRecover, false); + + try + { + ch.QueueDeclarePassive(queueToIgnore); + Assert.Fail("Expected an exception"); + } + catch (OperationInterruptedException e) + { + AssertShutdownError(e.ShutdownReason, 404); + } + } + + [Test] + public void TestTopologyRecoveryExchangeFilter() + { + var filter = new TopologyRecoveryFilter + { + ExchangeFilter = exchange => exchange.Type == "topic" && !exchange.Name.Contains("filtered") + }; + var latch = new ManualResetEventSlim(false); + AutorecoveringConnection conn = CreateAutorecoveringConnectionWithTopologyRecoveryFilter(filter); + conn.RecoverySucceeded += (source, ea) => latch.Set(); + IModel ch = conn.CreateModel(); + + var exchangeToRecover = "recovered.exchange"; + var exchangeToIgnore = "filtered.exchange"; + ch.ExchangeDeclare(exchangeToRecover, "topic", false, true); + ch.ExchangeDeclare(exchangeToIgnore, "direct", false, true); + + Model.ExchangeDelete(exchangeToRecover); + Model.ExchangeDelete(exchangeToIgnore); + + CloseAndWaitForRecovery(conn); + Wait(latch); + + Assert.IsTrue(ch.IsOpen); + AssertExchangeRecovery(ch, exchangeToRecover); + + try + { + ch.ExchangeDeclarePassive(exchangeToIgnore); + Assert.Fail("Expected an exception"); + } + catch (OperationInterruptedException e) + { + AssertShutdownError(e.ShutdownReason, 404); + } + } + + [Test] + public void TestTopologyRecoveryBindingFilter() + { + var filter = new TopologyRecoveryFilter + { + BindingFilter = binding => !binding.RoutingKey.Contains("filtered") + }; + var latch = new ManualResetEventSlim(false); + AutorecoveringConnection conn = CreateAutorecoveringConnectionWithTopologyRecoveryFilter(filter); + conn.RecoverySucceeded += (source, ea) => latch.Set(); + IModel ch = conn.CreateModel(); + + var exchange = "topology.recovery.exchange"; + var queueWithRecoveredBinding = "topology.recovery.queue.1"; + var queueWithIgnoredBinding = "topology.recovery.queue.2"; + var bindingToRecover = "recovered.binding"; + var bindingToIgnore = "filtered.binding"; + + ch.ExchangeDeclare(exchange, "direct"); + ch.QueueDeclare(queueWithRecoveredBinding, false, false, false, null); + ch.QueueDeclare(queueWithIgnoredBinding, false, false, false, null); + ch.QueueBind(queueWithRecoveredBinding, exchange, bindingToRecover); + ch.QueueBind(queueWithIgnoredBinding, exchange, bindingToIgnore); + ch.QueuePurge(queueWithRecoveredBinding); + ch.QueuePurge(queueWithIgnoredBinding); + + Model.QueueUnbind(queueWithRecoveredBinding, exchange, bindingToRecover); + Model.QueueUnbind(queueWithIgnoredBinding, exchange, bindingToIgnore); + + CloseAndWaitForRecovery(conn); + Wait(latch); + + Assert.IsTrue(ch.IsOpen); + Assert.IsTrue(SendAndConsumeMessage(queueWithRecoveredBinding, exchange, bindingToRecover)); + Assert.IsFalse(SendAndConsumeMessage(queueWithIgnoredBinding, exchange, bindingToIgnore)); + } + + [Test] + public void TestTopologyRecoveryConsumerFilter() + { + var filter = new TopologyRecoveryFilter + { + ConsumerFilter = consumer => !consumer.ConsumerTag.Contains("filtered") + }; + var latch = new ManualResetEventSlim(false); + AutorecoveringConnection conn = CreateAutorecoveringConnectionWithTopologyRecoveryFilter(filter); + conn.RecoverySucceeded += (source, ea) => latch.Set(); + IModel ch = conn.CreateModel(); + ch.ConfirmSelect(); + + var exchange = "topology.recovery.exchange"; + var queueWithRecoveredConsumer = "topology.recovery.queue.1"; + var queueWithIgnoredConsumer = "topology.recovery.queue.2"; + var binding1 = "recovered.binding"; + var binding2 = "filtered.binding"; + + ch.ExchangeDeclare(exchange, "direct"); + ch.QueueDeclare(queueWithRecoveredConsumer, false, false, false, null); + ch.QueueDeclare(queueWithIgnoredConsumer, false, false, false, null); + ch.QueueBind(queueWithRecoveredConsumer, exchange, binding1); + ch.QueueBind(queueWithIgnoredConsumer, exchange, binding2); + ch.QueuePurge(queueWithRecoveredConsumer); + ch.QueuePurge(queueWithIgnoredConsumer); + + var recoverLatch = new ManualResetEventSlim(false); + var consumerToRecover = new EventingBasicConsumer(ch); + consumerToRecover.Received += (source, ea) => recoverLatch.Set(); + ch.BasicConsume(queueWithRecoveredConsumer, true, "recovered.consumer", consumerToRecover); + + var ignoredLatch = new ManualResetEventSlim(false); + var consumerToIgnore = new EventingBasicConsumer(ch); + consumerToIgnore.Received += (source, ea) => ignoredLatch.Set(); + ch.BasicConsume(queueWithIgnoredConsumer, true, "filtered.consumer", consumerToIgnore); + + CloseAndWaitForRecovery(conn); + Wait(latch); + + Assert.IsTrue(ch.IsOpen); + ch.BasicPublish(exchange, binding1, ch.CreateBasicProperties(), Encoding.UTF8.GetBytes("test message")); + ch.BasicPublish(exchange, binding2, ch.CreateBasicProperties(), Encoding.UTF8.GetBytes("test message")); + + Assert.IsTrue(recoverLatch.Wait(TimeSpan.FromSeconds(5))); + Assert.IsFalse(ignoredLatch.Wait(TimeSpan.FromSeconds(5))); + + ch.BasicConsume(queueWithIgnoredConsumer, true, "filtered.consumer", consumerToIgnore); + + try + { + ch.BasicConsume(queueWithRecoveredConsumer, true, "recovered.consumer", consumerToRecover); + Assert.Fail("Expected an exception"); + } + catch (OperationInterruptedException e) + { + AssertShutdownError(e.ShutdownReason, 530); // NOT_ALLOWED - not allowed to reuse consumer tag + } + } + + [Test] + public void TestTopologyRecoveryDefaultFilterRecoversAllEntities() + { + var filter = new TopologyRecoveryFilter(); + var latch = new ManualResetEventSlim(false); + AutorecoveringConnection conn = CreateAutorecoveringConnectionWithTopologyRecoveryFilter(filter); + conn.RecoverySucceeded += (source, ea) => latch.Set(); + IModel ch = conn.CreateModel(); + ch.ConfirmSelect(); + + var exchange = "topology.recovery.exchange"; + var queue1 = "topology.recovery.queue.1"; + var queue2 = "topology.recovery.queue.2"; + var binding1 = "recovered.binding"; + var binding2 = "filtered.binding"; + + ch.ExchangeDeclare(exchange, "direct"); + ch.QueueDeclare(queue1, false, false, false, null); + ch.QueueDeclare(queue2, false, false, false, null); + ch.QueueBind(queue1, exchange, binding1); + ch.QueueBind(queue2, exchange, binding2); + ch.QueuePurge(queue1); + ch.QueuePurge(queue2); + + var consumerLatch1 = new ManualResetEventSlim(false); + var consumer1 = new EventingBasicConsumer(ch); + consumer1.Received += (source, ea) => consumerLatch1.Set(); + ch.BasicConsume(queue1, true, "recovered.consumer", consumer1); + + var consumerLatch2 = new ManualResetEventSlim(false); + var consumer2 = new EventingBasicConsumer(ch); + consumer2.Received += (source, ea) => consumerLatch2.Set(); + ch.BasicConsume(queue2, true, "filtered.consumer", consumer2); + + Model.ExchangeDelete(exchange); + Model.QueueDelete(queue1); + Model.QueueDelete(queue2); + + CloseAndWaitForRecovery(conn); + Wait(latch); + + Assert.IsTrue(ch.IsOpen); + AssertExchangeRecovery(ch, exchange); + ch.QueueDeclarePassive(queue1); + ch.QueueDeclarePassive(queue2); + + ch.BasicPublish(exchange, binding1, ch.CreateBasicProperties(), Encoding.UTF8.GetBytes("test message")); + ch.BasicPublish(exchange, binding2, ch.CreateBasicProperties(), Encoding.UTF8.GetBytes("test message")); + + Assert.IsTrue(consumerLatch1.Wait(TimeSpan.FromSeconds(5))); + Assert.IsTrue(consumerLatch2.Wait(TimeSpan.FromSeconds(5))); + } + + internal bool SendAndConsumeMessage(string queue, string exchange, string routingKey) + { + using (var ch = Conn.CreateModel()) + { + ch.ConfirmSelect(); + var latch = new ManualResetEventSlim(false); + + var consumer = new AckingBasicConsumer(ch, 1, latch); + + ch.BasicConsume(queue, true, consumer); + + ch.BasicPublish(exchange, routingKey, ch.CreateBasicProperties(), Encoding.UTF8.GetBytes("test message")); + ch.WaitForConfirmsOrDie(TimeSpan.FromSeconds(5)); + + return latch.Wait(TimeSpan.FromSeconds(5)); + } + } + internal void AssertExchangeRecovery(IModel m, string x) { m.ConfirmSelect(); From 92b5221f5eb7cbe51dd5a40d70968db0ae357f9e Mon Sep 17 00:00:00 2001 From: Sabina Rosca Date: Mon, 6 Mar 2023 21:54:07 +0200 Subject: [PATCH 3/5] Added custom exception handlers to topology recovery --- .../client/api/ConnectionFactory.cs | 5 + .../api/TopologyRecoveryExceptionHandler.cs | 157 ++++++++++++++++++ .../client/impl/AutorecoveringConnection.cs | 116 ++++++++++--- 3 files changed, 256 insertions(+), 22 deletions(-) create mode 100644 projects/RabbitMQ.Client/client/api/TopologyRecoveryExceptionHandler.cs diff --git a/projects/RabbitMQ.Client/client/api/ConnectionFactory.cs b/projects/RabbitMQ.Client/client/api/ConnectionFactory.cs index 2eb9d93165..7074ff4bcf 100644 --- a/projects/RabbitMQ.Client/client/api/ConnectionFactory.cs +++ b/projects/RabbitMQ.Client/client/api/ConnectionFactory.cs @@ -277,6 +277,11 @@ public TimeSpan ContinuationTimeout /// public TopologyRecoveryFilter TopologyRecoveryFilter { get; set; } = new TopologyRecoveryFilter(); + /// + /// Custom logic for handling topology recovery exceptions that match the specified filters. + /// + public TopologyRecoveryExceptionHandler TopologyRecoveryExceptionHandler { get; set; } = new TopologyRecoveryExceptionHandler(); + /// /// Construct a fresh instance, with all fields set to their respective defaults. /// diff --git a/projects/RabbitMQ.Client/client/api/TopologyRecoveryExceptionHandler.cs b/projects/RabbitMQ.Client/client/api/TopologyRecoveryExceptionHandler.cs new file mode 100644 index 0000000000..ed782d6173 --- /dev/null +++ b/projects/RabbitMQ.Client/client/api/TopologyRecoveryExceptionHandler.cs @@ -0,0 +1,157 @@ +using System; + +namespace RabbitMQ.Client +{ + /// + /// Custom logic for handling topology recovery exceptions that match the specified filters. + /// + public class TopologyRecoveryExceptionHandler + { + private static readonly Func s_defaultExchangeExceptionCondition = (e, ex) => true; + private static readonly Func s_defaultQueueExceptionCondition = (q, ex) => true; + private static readonly Func s_defaultBindingExceptionCondition = (b, ex) => true; + private static readonly Func s_defaultConsumerExceptionCondition = (c, ex) => true; + + private Func _exchangeRecoveryExceptionCondition; + private Func _queueRecoveryExceptionCondition; + private Func _bindingRecoveryExceptionCondition; + private Func _consumerRecoveryExceptionCondition; + private Action _exchangeRecoveryExceptionHandler; + private Action _queueRecoveryExceptionHandler; + private Action _bindingRecoveryExceptionHandler; + private Action _consumerRecoveryExceptionHandler; + + /// + /// Decides which exchange recovery exceptions the custom exception handler is applied to. + /// Default condition applies the exception handler to all exchange recovery exceptions. + /// + public Func ExchangeRecoveryExceptionCondition + { + get => _exchangeRecoveryExceptionCondition ?? s_defaultExchangeExceptionCondition; + + set + { + if (_exchangeRecoveryExceptionCondition != null) + throw new InvalidOperationException($"Cannot modify {nameof(ExchangeRecoveryExceptionCondition)} after it has been initialized."); + + _exchangeRecoveryExceptionCondition = value ?? throw new ArgumentNullException(nameof(ExchangeRecoveryExceptionCondition)); + } + } + + /// + /// Decides which queue recovery exceptions the custom exception handler is applied to. + /// Default condition applies the exception handler to all queue recovery exceptions. + /// + public Func QueueRecoveryExceptionCondition + { + get => _queueRecoveryExceptionCondition ?? s_defaultQueueExceptionCondition; + + set + { + if (_queueRecoveryExceptionCondition != null) + throw new InvalidOperationException($"Cannot modify {nameof(QueueRecoveryExceptionCondition)} after it has been initialized."); + + _queueRecoveryExceptionCondition = value ?? throw new ArgumentNullException(nameof(QueueRecoveryExceptionCondition)); + } + } + + /// + /// Decides which binding recovery exceptions the custom exception handler is applied to. + /// Default condition applies the exception handler to all binding recovery exceptions. + /// + public Func BindingRecoveryExceptionCondition + { + get => _bindingRecoveryExceptionCondition ?? s_defaultBindingExceptionCondition; + + set + { + if (_bindingRecoveryExceptionCondition != null) + throw new InvalidOperationException($"Cannot modify {nameof(ExchangeRecoveryExceptionCondition)} after it has been initialized."); + + _bindingRecoveryExceptionCondition = value ?? throw new ArgumentNullException(nameof(ExchangeRecoveryExceptionCondition)); + } + } + + /// + /// Decides which consumer recovery exceptions the custom exception handler is applied to. + /// Default condition applies the exception handler to all consumer recovery exceptions. + /// + public Func ConsumerRecoveryExceptionCondition + { + get => _consumerRecoveryExceptionCondition ?? s_defaultConsumerExceptionCondition; + + set + { + if (_consumerRecoveryExceptionCondition != null) + throw new InvalidOperationException($"Cannot modify {nameof(ConsumerRecoveryExceptionCondition)} after it has been initialized."); + + _consumerRecoveryExceptionCondition = value ?? throw new ArgumentNullException(nameof(ConsumerRecoveryExceptionCondition)); + } + } + + /// + /// Retries, or otherwise handles, an exception thrown when attempting to recover an exchange. + /// + public Action ExchangeRecoveryExceptionHandler + { + get => _exchangeRecoveryExceptionHandler; + + set + { + if (_exchangeRecoveryExceptionHandler != null) + throw new InvalidOperationException($"Cannot modify {nameof(ExchangeRecoveryExceptionHandler)} after it has been initialized."); + + _exchangeRecoveryExceptionHandler = value ?? throw new ArgumentNullException(nameof(ExchangeRecoveryExceptionHandler)); + } + } + + /// + /// Retries, or otherwise handles, an exception thrown when attempting to recover a queue. + /// + public Action QueueRecoveryExceptionHandler + { + get => _queueRecoveryExceptionHandler; + + set + { + if (_queueRecoveryExceptionHandler != null) + throw new InvalidOperationException($"Cannot modify {nameof(QueueRecoveryExceptionHandler)} after it has been initialized."); + + _queueRecoveryExceptionHandler = value ?? throw new ArgumentNullException(nameof(QueueRecoveryExceptionHandler)); + } + } + + /// + /// Retries, or otherwise handles, an exception thrown when attempting to recover a binding. + /// + public Action BindingRecoveryExceptionHandler + { + get => _bindingRecoveryExceptionHandler; + + set + { + if (_bindingRecoveryExceptionHandler != null) + throw new InvalidOperationException($"Cannot modify {nameof(BindingRecoveryExceptionHandler)} after it has been initialized."); + + _bindingRecoveryExceptionHandler = value ?? throw new ArgumentNullException(nameof(BindingRecoveryExceptionHandler)); + } + } + + /// + /// Retries, or otherwise handles, an exception thrown when attempting to recover a consumer. + /// Is only called when the exception did not cause the consumer's channel to close. + /// + public Action ConsumerRecoveryExceptionHandler + { + get => _consumerRecoveryExceptionHandler; + + set + { + if (_consumerRecoveryExceptionHandler != null) + throw new InvalidOperationException($"Cannot modify {nameof(ConsumerRecoveryExceptionHandler)} after it has been initialized."); + + _consumerRecoveryExceptionHandler = value ?? throw new ArgumentNullException(nameof(ConsumerRecoveryExceptionHandler)); + } + } + } +} diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs index f6c2c1d8b1..90ef67f93d 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs @@ -428,11 +428,11 @@ private bool TryPerformAutomaticRecovery() // 2. Recover queues // 3. Recover bindings // 4. Recover consumers - using (var recoveryModel = _delegate.CreateModel()) + using (var recoveryModelFactory = new RecoveryModelFactory(_delegate)) { - RecoverExchanges(recoveryModel); - RecoverQueues(recoveryModel); - RecoverBindings(recoveryModel); + RecoverExchanges(recoveryModelFactory); + RecoverQueues(recoveryModelFactory); + RecoverBindings(recoveryModelFactory); } } @@ -985,7 +985,7 @@ private void PropagateQueueNameChangeToConsumers(string oldName, string newName) } } - private void RecoverBindings(IModel model) + private void RecoverBindings(RecoveryModelFactory recoveryModelFactory) { Dictionary recordedBindingsCopy; lock (_recordedEntitiesLock) @@ -997,13 +997,21 @@ private void RecoverBindings(IModel model) { try { - b.Recover(model); + b.Recover(recoveryModelFactory.RecoveryModel); } catch (Exception cause) { - string s = string.Format("Caught an exception while recovering binding between {0} and {1}: {2}", - b.Source, b.Destination, cause.Message); - HandleTopologyRecoveryException(new TopologyRecoveryException(s, cause)); + if (_factory.TopologyRecoveryExceptionHandler.BindingRecoveryExceptionHandler != null + && _factory.TopologyRecoveryExceptionHandler.BindingRecoveryExceptionCondition(b, cause)) + { + _factory.TopologyRecoveryExceptionHandler.BindingRecoveryExceptionHandler(b, cause); + } + else + { + string s = string.Format("Caught an exception while recovering binding between {0} and {1}: {2}", + b.Source, b.Destination, cause.Message); + HandleTopologyRecoveryException(new TopologyRecoveryException(s, cause)); + } } } } @@ -1139,14 +1147,23 @@ internal void RecoverConsumers(AutorecoveringModel modelToRecover, IModel channe } catch (Exception cause) { - string s = string.Format("Caught an exception while recovering consumer {0} on queue {1}: {2}", - tag, cons.Queue, cause.Message); - HandleTopologyRecoveryException(new TopologyRecoveryException(s, cause)); + if (channelToUse.IsOpen + && _factory.TopologyRecoveryExceptionHandler.ConsumerRecoveryExceptionHandler != null + && _factory.TopologyRecoveryExceptionHandler.ConsumerRecoveryExceptionCondition(cons, cause)) + { + _factory.TopologyRecoveryExceptionHandler.ConsumerRecoveryExceptionHandler(cons, cause); + } + else + { + string s = string.Format("Caught an exception while recovering consumer {0} on queue {1}: {2}", + tag, cons.Queue, cause.Message); + HandleTopologyRecoveryException(new TopologyRecoveryException(s, cause)); + } } } } - private void RecoverExchanges(IModel model) + private void RecoverExchanges(RecoveryModelFactory recoveryModelFactory) { Dictionary recordedExchangesCopy; lock (_recordedEntitiesLock) @@ -1158,13 +1175,21 @@ private void RecoverExchanges(IModel model) { try { - rx.Recover(model); + rx.Recover(recoveryModelFactory.RecoveryModel); } catch (Exception cause) { - string s = string.Format("Caught an exception while recovering exchange {0}: {1}", - rx.Name, cause.Message); - HandleTopologyRecoveryException(new TopologyRecoveryException(s, cause)); + if (_factory.TopologyRecoveryExceptionHandler.ExchangeRecoveryExceptionHandler != null + && _factory.TopologyRecoveryExceptionHandler.ExchangeRecoveryExceptionCondition(rx, cause)) + { + _factory.TopologyRecoveryExceptionHandler.ExchangeRecoveryExceptionHandler(rx, cause); + } + else + { + string s = string.Format("Caught an exception while recovering exchange {0}: {1}", + rx.Name, cause.Message); + HandleTopologyRecoveryException(new TopologyRecoveryException(s, cause)); + } } } } @@ -1180,7 +1205,7 @@ private void RecoverModelsAndItsConsumers() } } - private void RecoverQueues(IModel model) + private void RecoverQueues(RecoveryModelFactory recoveryModelFactory) { Dictionary recordedQueuesCopy; lock (_recordedEntitiesLock) @@ -1195,7 +1220,7 @@ private void RecoverQueues(IModel model) try { - rq.Recover(model); + rq.Recover(recoveryModelFactory.RecoveryModel); string newName = rq.Name; if (!oldName.Equals(newName)) @@ -1232,9 +1257,17 @@ private void RecoverQueues(IModel model) } catch (Exception cause) { - string s = string.Format("Caught an exception while recovering queue {0}: {1}", - oldName, cause.Message); - HandleTopologyRecoveryException(new TopologyRecoveryException(s, cause)); + if (_factory.TopologyRecoveryExceptionHandler.QueueRecoveryExceptionHandler != null + && _factory.TopologyRecoveryExceptionHandler.QueueRecoveryExceptionCondition(rq, cause)) + { + _factory.TopologyRecoveryExceptionHandler.QueueRecoveryExceptionHandler(rq, cause); + } + else + { + string s = string.Format("Caught an exception while recovering queue {0}: {1}", + oldName, cause.Message); + HandleTopologyRecoveryException(new TopologyRecoveryException(s, cause)); + } } } } @@ -1295,6 +1328,45 @@ private enum RecoveryConnectionState Recovering } + private sealed class RecoveryModelFactory : IDisposable + { + private readonly IConnection _connection; + private IModel _recoveryModel; + + public RecoveryModelFactory(IConnection connection) + { + _connection = connection; + } + + public IModel RecoveryModel + { + get + { + if (_recoveryModel == null) + { + _recoveryModel = _connection.CreateModel(); + } + + if (_recoveryModel.IsClosed) + { + _recoveryModel.Dispose(); + _recoveryModel = _connection.CreateModel(); + } + + return _recoveryModel; + } + } + + public void Dispose() + { + if (_recoveryModel != null) + { + _recoveryModel.Close(); + _recoveryModel.Dispose(); + } + } + } + private Task _recoveryTask; private RecoveryConnectionState _recoveryLoopState = RecoveryConnectionState.Connected; From c65192cee14f4761a0652ec34d49a4551fbf7aa6 Mon Sep 17 00:00:00 2001 From: Sabina Rosca Date: Tue, 7 Mar 2023 00:31:29 +0200 Subject: [PATCH 4/5] Added tests for topology recovery exception handler --- .../client/api/IRecordedConsumer.cs | 2 + .../api/TopologyRecoveryExceptionHandler.cs | 16 +- .../client/impl/AutorecoveringConnection.cs | 11 +- projects/Unit/Fixtures.cs | 12 + projects/Unit/TestConnectionRecovery.cs | 215 +++++++++++++++++- 5 files changed, 236 insertions(+), 20 deletions(-) diff --git a/projects/RabbitMQ.Client/client/api/IRecordedConsumer.cs b/projects/RabbitMQ.Client/client/api/IRecordedConsumer.cs index b3ea93652b..ceca2566b3 100644 --- a/projects/RabbitMQ.Client/client/api/IRecordedConsumer.cs +++ b/projects/RabbitMQ.Client/client/api/IRecordedConsumer.cs @@ -4,6 +4,8 @@ namespace RabbitMQ.Client { public interface IRecordedConsumer { + IBasicConsumer Consumer { get; } + string ConsumerTag { get; } string Queue { get; } diff --git a/projects/RabbitMQ.Client/client/api/TopologyRecoveryExceptionHandler.cs b/projects/RabbitMQ.Client/client/api/TopologyRecoveryExceptionHandler.cs index ed782d6173..0438f8ea7d 100644 --- a/projects/RabbitMQ.Client/client/api/TopologyRecoveryExceptionHandler.cs +++ b/projects/RabbitMQ.Client/client/api/TopologyRecoveryExceptionHandler.cs @@ -16,10 +16,10 @@ public class TopologyRecoveryExceptionHandler private Func _queueRecoveryExceptionCondition; private Func _bindingRecoveryExceptionCondition; private Func _consumerRecoveryExceptionCondition; - private Action _exchangeRecoveryExceptionHandler; - private Action _queueRecoveryExceptionHandler; - private Action _bindingRecoveryExceptionHandler; - private Action _consumerRecoveryExceptionHandler; + private Action _exchangeRecoveryExceptionHandler; + private Action _queueRecoveryExceptionHandler; + private Action _bindingRecoveryExceptionHandler; + private Action _consumerRecoveryExceptionHandler; /// /// Decides which exchange recovery exceptions the custom exception handler is applied to. @@ -92,7 +92,7 @@ public Func ConsumerRecoveryExceptionConditi /// /// Retries, or otherwise handles, an exception thrown when attempting to recover an exchange. /// - public Action ExchangeRecoveryExceptionHandler + public Action ExchangeRecoveryExceptionHandler { get => _exchangeRecoveryExceptionHandler; @@ -108,7 +108,7 @@ public Action ExchangeRecoveryExceptionHandler /// /// Retries, or otherwise handles, an exception thrown when attempting to recover a queue. /// - public Action QueueRecoveryExceptionHandler + public Action QueueRecoveryExceptionHandler { get => _queueRecoveryExceptionHandler; @@ -124,7 +124,7 @@ public Action QueueRecoveryExceptionHandler /// /// Retries, or otherwise handles, an exception thrown when attempting to recover a binding. /// - public Action BindingRecoveryExceptionHandler + public Action BindingRecoveryExceptionHandler { get => _bindingRecoveryExceptionHandler; @@ -141,7 +141,7 @@ public Action BindingRecoveryExceptionHandler /// Retries, or otherwise handles, an exception thrown when attempting to recover a consumer. /// Is only called when the exception did not cause the consumer's channel to close. /// - public Action ConsumerRecoveryExceptionHandler + public Action ConsumerRecoveryExceptionHandler { get => _consumerRecoveryExceptionHandler; diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs index 90ef67f93d..0f9b5c90f5 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs @@ -1004,7 +1004,7 @@ private void RecoverBindings(RecoveryModelFactory recoveryModelFactory) if (_factory.TopologyRecoveryExceptionHandler.BindingRecoveryExceptionHandler != null && _factory.TopologyRecoveryExceptionHandler.BindingRecoveryExceptionCondition(b, cause)) { - _factory.TopologyRecoveryExceptionHandler.BindingRecoveryExceptionHandler(b, cause); + _factory.TopologyRecoveryExceptionHandler.BindingRecoveryExceptionHandler(b, cause, this); } else { @@ -1147,11 +1147,10 @@ internal void RecoverConsumers(AutorecoveringModel modelToRecover, IModel channe } catch (Exception cause) { - if (channelToUse.IsOpen - && _factory.TopologyRecoveryExceptionHandler.ConsumerRecoveryExceptionHandler != null + if (_factory.TopologyRecoveryExceptionHandler.ConsumerRecoveryExceptionHandler != null && _factory.TopologyRecoveryExceptionHandler.ConsumerRecoveryExceptionCondition(cons, cause)) { - _factory.TopologyRecoveryExceptionHandler.ConsumerRecoveryExceptionHandler(cons, cause); + _factory.TopologyRecoveryExceptionHandler.ConsumerRecoveryExceptionHandler(cons, cause, this); } else { @@ -1182,7 +1181,7 @@ private void RecoverExchanges(RecoveryModelFactory recoveryModelFactory) if (_factory.TopologyRecoveryExceptionHandler.ExchangeRecoveryExceptionHandler != null && _factory.TopologyRecoveryExceptionHandler.ExchangeRecoveryExceptionCondition(rx, cause)) { - _factory.TopologyRecoveryExceptionHandler.ExchangeRecoveryExceptionHandler(rx, cause); + _factory.TopologyRecoveryExceptionHandler.ExchangeRecoveryExceptionHandler(rx, cause, this); } else { @@ -1260,7 +1259,7 @@ private void RecoverQueues(RecoveryModelFactory recoveryModelFactory) if (_factory.TopologyRecoveryExceptionHandler.QueueRecoveryExceptionHandler != null && _factory.TopologyRecoveryExceptionHandler.QueueRecoveryExceptionCondition(rq, cause)) { - _factory.TopologyRecoveryExceptionHandler.QueueRecoveryExceptionHandler(rq, cause); + _factory.TopologyRecoveryExceptionHandler.QueueRecoveryExceptionHandler(rq, cause, this); } else { diff --git a/projects/Unit/Fixtures.cs b/projects/Unit/Fixtures.cs index fe411866d1..b7c9d1d01e 100644 --- a/projects/Unit/Fixtures.cs +++ b/projects/Unit/Fixtures.cs @@ -163,6 +163,18 @@ internal AutorecoveringConnection CreateAutorecoveringConnectionWithTopologyReco return (AutorecoveringConnection)cf.CreateConnection($"{_testDisplayName}:{Guid.NewGuid()}"); } + internal AutorecoveringConnection CreateAutorecoveringConnectionWithTopologyRecoveryExceptionHandler(TopologyRecoveryExceptionHandler handler) + { + var cf = new ConnectionFactory + { + AutomaticRecoveryEnabled = true, + TopologyRecoveryEnabled = true, + TopologyRecoveryExceptionHandler = handler + }; + + return (AutorecoveringConnection)cf.CreateConnection($"{_testDisplayName}:{Guid.NewGuid()}"); + } + internal IConnection CreateNonRecoveringConnection() { var cf = new ConnectionFactory diff --git a/projects/Unit/TestConnectionRecovery.cs b/projects/Unit/TestConnectionRecovery.cs index a5ea5914ae..38cda116aa 100644 --- a/projects/Unit/TestConnectionRecovery.cs +++ b/projects/Unit/TestConnectionRecovery.cs @@ -30,8 +30,8 @@ //--------------------------------------------------------------------------- using System; -using System.Collections; using System.Collections.Generic; +using System.ServiceModel.Channels; using System.Text; using System.Threading; @@ -1228,8 +1228,8 @@ public void TestTopologyRecoveryConsumerFilter() var exchange = "topology.recovery.exchange"; var queueWithRecoveredConsumer = "topology.recovery.queue.1"; var queueWithIgnoredConsumer = "topology.recovery.queue.2"; - var binding1 = "recovered.binding"; - var binding2 = "filtered.binding"; + var binding1 = "recovered.binding.1"; + var binding2 = "recovered.binding.2"; ch.ExchangeDeclare(exchange, "direct"); ch.QueueDeclare(queueWithRecoveredConsumer, false, false, false, null); @@ -1325,6 +1325,209 @@ public void TestTopologyRecoveryDefaultFilterRecoversAllEntities() Assert.IsTrue(consumerLatch2.Wait(TimeSpan.FromSeconds(5))); } + [Test] + public void TestTopologyRecoveryQueueExceptionHandler() + { + var changedQueueArguments = new Dictionary + { + { Headers.XMaxPriority, 20 } + }; + var exceptionHandler = new TopologyRecoveryExceptionHandler + { + QueueRecoveryExceptionCondition = (rq, ex) => + { + return rq.Name.Contains("exception") + && ex is OperationInterruptedException operationInterruptedException + && operationInterruptedException.ShutdownReason.ReplyCode == Constants.PreconditionFailed; + }, + QueueRecoveryExceptionHandler = (rq, ex, connection) => + { + using (var model = connection.CreateModel()) + { + model.QueueDeclare(rq.Name, false, false, false, changedQueueArguments); + } + } + }; + var latch = new ManualResetEventSlim(false); + AutorecoveringConnection conn = CreateAutorecoveringConnectionWithTopologyRecoveryExceptionHandler(exceptionHandler); + conn.RecoverySucceeded += (source, ea) => latch.Set(); + IModel ch = conn.CreateModel(); + + var queueToRecoverWithException = "recovery.exception.queue"; + var queueToRecoverSuccessfully = "successfully.recovered.queue"; + ch.QueueDeclare(queueToRecoverWithException, false, false, false, null); + ch.QueueDeclare(queueToRecoverSuccessfully, false, false, false, null); + + Model.QueueDelete(queueToRecoverSuccessfully); + Model.QueueDelete(queueToRecoverWithException); + Model.QueueDeclare(queueToRecoverWithException, false, false, false, changedQueueArguments); + + CloseAndWaitForRecovery(conn); + Wait(latch); + + Assert.IsTrue(ch.IsOpen); + AssertQueueRecovery(ch, queueToRecoverSuccessfully, false); + AssertQueueRecovery(ch, queueToRecoverWithException, false, changedQueueArguments); + + //Cleanup + Model.QueueDelete(queueToRecoverWithException); + } + + [Test] + public void TestTopologyRecoveryExchangeExceptionHandler() + { + var exceptionHandler = new TopologyRecoveryExceptionHandler + { + ExchangeRecoveryExceptionCondition = (re, ex) => + { + return re.Name.Contains("exception") + && ex is OperationInterruptedException operationInterruptedException + && operationInterruptedException.ShutdownReason.ReplyCode == Constants.PreconditionFailed; + }, + ExchangeRecoveryExceptionHandler = (re, ex, connection) => + { + using (var model = connection.CreateModel()) + { + model.ExchangeDeclare(re.Name, "topic", false, false); + } + } + }; + var latch = new ManualResetEventSlim(false); + AutorecoveringConnection conn = CreateAutorecoveringConnectionWithTopologyRecoveryExceptionHandler(exceptionHandler); + conn.RecoverySucceeded += (source, ea) => latch.Set(); + IModel ch = conn.CreateModel(); + + var exchangeToRecoverWithException = "recovery.exception.exchange"; + var exchangeToRecoverSuccessfully = "successfully.recovered.exchange"; + ch.ExchangeDeclare(exchangeToRecoverWithException, "direct", false, false); + ch.ExchangeDeclare(exchangeToRecoverSuccessfully, "direct", false, false); + + Model.ExchangeDelete(exchangeToRecoverSuccessfully); + Model.ExchangeDelete(exchangeToRecoverWithException); + Model.ExchangeDeclare(exchangeToRecoverWithException, "topic", false, false); + + CloseAndWaitForRecovery(conn); + Wait(latch); + + Assert.IsTrue(ch.IsOpen); + AssertExchangeRecovery(ch, exchangeToRecoverSuccessfully); + AssertExchangeRecovery(ch, exchangeToRecoverWithException); + + //Cleanup + Model.ExchangeDelete(exchangeToRecoverWithException); + } + + [Test] + public void TestTopologyRecoveryBindingExceptionHandler() + { + var exchange = "topology.recovery.exchange"; + var queueWithExceptionBinding = "recovery.exception.queue"; + var bindingToRecoverWithException = "recovery.exception.binding"; + + var exceptionHandler = new TopologyRecoveryExceptionHandler + { + BindingRecoveryExceptionCondition = (b, ex) => + { + return b.RoutingKey.Contains("exception") + && ex is OperationInterruptedException operationInterruptedException + && operationInterruptedException.ShutdownReason.ReplyCode == Constants.NotFound; + }, + BindingRecoveryExceptionHandler = (b, ex, connection) => + { + using (var model = connection.CreateModel()) + { + model.QueueDeclare(queueWithExceptionBinding, false, false, false, null); + model.QueueBind(queueWithExceptionBinding, exchange, bindingToRecoverWithException); + } + } + }; + var latch = new ManualResetEventSlim(false); + AutorecoveringConnection conn = CreateAutorecoveringConnectionWithTopologyRecoveryExceptionHandler(exceptionHandler); + conn.RecoverySucceeded += (source, ea) => latch.Set(); + IModel ch = conn.CreateModel(); + + var queueWithRecoveredBinding = "successfully.recovered.queue"; + var bindingToRecoverSuccessfully = "successfully.recovered.binding"; + + Model.QueueDeclare(queueWithExceptionBinding, false, false, false, null); + + ch.ExchangeDeclare(exchange, "direct"); + ch.QueueDeclare(queueWithRecoveredBinding, false, false, false, null); + ch.QueueBind(queueWithRecoveredBinding, exchange, bindingToRecoverSuccessfully); + ch.QueueBind(queueWithExceptionBinding, exchange, bindingToRecoverWithException); + ch.QueuePurge(queueWithRecoveredBinding); + ch.QueuePurge(queueWithExceptionBinding); + + Model.QueueUnbind(queueWithRecoveredBinding, exchange, bindingToRecoverSuccessfully); + Model.QueueUnbind(queueWithExceptionBinding, exchange, bindingToRecoverWithException); + Model.QueueDelete(queueWithExceptionBinding); + + CloseAndWaitForRecovery(conn); + Wait(latch); + + Assert.IsTrue(ch.IsOpen); + Assert.IsTrue(SendAndConsumeMessage(queueWithRecoveredBinding, exchange, bindingToRecoverSuccessfully)); + Assert.IsFalse(SendAndConsumeMessage(queueWithExceptionBinding, exchange, bindingToRecoverWithException)); + } + + [Test] + public void TestTopologyRecoveryConsumerExceptionHandler() + { + var queueWithExceptionConsumer = "recovery.exception.queue"; + + var exceptionHandler = new TopologyRecoveryExceptionHandler + { + ConsumerRecoveryExceptionCondition = (c, ex) => + { + return c.ConsumerTag.Contains("exception") + && ex is OperationInterruptedException operationInterruptedException + && operationInterruptedException.ShutdownReason.ReplyCode == Constants.NotFound; + }, + ConsumerRecoveryExceptionHandler = (c, ex, connection) => + { + using (var model = connection.CreateModel()) + { + model.QueueDeclare(queueWithExceptionConsumer, false, false, false, null); + model.BasicConsume(queueWithExceptionConsumer, true, c.ConsumerTag, c.Consumer); + } + } + }; + var latch = new ManualResetEventSlim(false); + AutorecoveringConnection conn = CreateAutorecoveringConnectionWithTopologyRecoveryExceptionHandler(exceptionHandler); + conn.RecoverySucceeded += (source, ea) => latch.Set(); + IModel ch = conn.CreateModel(); + ch.ConfirmSelect(); + + Model.QueueDeclare(queueWithExceptionConsumer, false, false, false, null); + Model.QueuePurge(queueWithExceptionConsumer); + + var recoverLatch = new ManualResetEventSlim(false); + var consumerToRecover = new EventingBasicConsumer(ch); + consumerToRecover.Received += (source, ea) => recoverLatch.Set(); + ch.BasicConsume(queueWithExceptionConsumer, true, "exception.consumer", consumerToRecover); + + Model.QueueDelete(queueWithExceptionConsumer); + + CloseAndWaitForRecovery(conn); + Wait(latch); + + Assert.IsTrue(ch.IsOpen); + + ch.BasicPublish("", queueWithExceptionConsumer, ch.CreateBasicProperties(), Encoding.UTF8.GetBytes("test message")); + + Assert.IsTrue(recoverLatch.Wait(TimeSpan.FromSeconds(5))); + + try + { + ch.BasicConsume(queueWithExceptionConsumer, true, "exception.consumer", consumerToRecover); + Assert.Fail("Expected an exception"); + } + catch (OperationInterruptedException e) + { + AssertShutdownError(e.ShutdownReason, 530); // NOT_ALLOWED - not allowed to reuse consumer tag + } + } + internal bool SendAndConsumeMessage(string queue, string exchange, string routingKey) { using (var ch = Conn.CreateModel()) @@ -1362,15 +1565,15 @@ internal void AssertQueueRecovery(IModel m, string q) AssertQueueRecovery(m, q, true); } - internal void AssertQueueRecovery(IModel m, string q, bool exclusive) + internal void AssertQueueRecovery(IModel m, string q, bool exclusive, IDictionary arguments = null) { m.ConfirmSelect(); m.QueueDeclarePassive(q); - QueueDeclareOk ok1 = m.QueueDeclare(q, false, exclusive, false, null); + QueueDeclareOk ok1 = m.QueueDeclare(q, false, exclusive, false, arguments); Assert.AreEqual(ok1.MessageCount, 0); m.BasicPublish("", q, null, _messageBody); Assert.IsTrue(WaitForConfirms(m)); - QueueDeclareOk ok2 = m.QueueDeclare(q, false, exclusive, false, null); + QueueDeclareOk ok2 = m.QueueDeclare(q, false, exclusive, false, arguments); Assert.AreEqual(ok2.MessageCount, 1); } From a9cdaeebf33dcb9064616e3e4c36f3b5565afcc9 Mon Sep 17 00:00:00 2001 From: Sabina Rosca Date: Mon, 13 Mar 2023 21:55:05 +0200 Subject: [PATCH 5/5] Fixed unit tests --- .../client/api/IRecordedConsumer.cs | 2 - .../api/TopologyRecoveryExceptionHandler.cs | 1 - .../Unit/APIApproval.Approve.verified.txt | 54 ++++ projects/Unit/TestConnectionRecovery.cs | 230 +++++++++++------- 4 files changed, 201 insertions(+), 86 deletions(-) diff --git a/projects/RabbitMQ.Client/client/api/IRecordedConsumer.cs b/projects/RabbitMQ.Client/client/api/IRecordedConsumer.cs index ceca2566b3..b3ea93652b 100644 --- a/projects/RabbitMQ.Client/client/api/IRecordedConsumer.cs +++ b/projects/RabbitMQ.Client/client/api/IRecordedConsumer.cs @@ -4,8 +4,6 @@ namespace RabbitMQ.Client { public interface IRecordedConsumer { - IBasicConsumer Consumer { get; } - string ConsumerTag { get; } string Queue { get; } diff --git a/projects/RabbitMQ.Client/client/api/TopologyRecoveryExceptionHandler.cs b/projects/RabbitMQ.Client/client/api/TopologyRecoveryExceptionHandler.cs index 0438f8ea7d..a40097f383 100644 --- a/projects/RabbitMQ.Client/client/api/TopologyRecoveryExceptionHandler.cs +++ b/projects/RabbitMQ.Client/client/api/TopologyRecoveryExceptionHandler.cs @@ -139,7 +139,6 @@ public Action BindingRecoveryException /// /// Retries, or otherwise handles, an exception thrown when attempting to recover a consumer. - /// Is only called when the exception did not cause the consumer's channel to close. /// public Action ConsumerRecoveryExceptionHandler { diff --git a/projects/Unit/APIApproval.Approve.verified.txt b/projects/Unit/APIApproval.Approve.verified.txt index 997db0a5c8..8b2e9c0f18 100644 --- a/projects/Unit/APIApproval.Approve.verified.txt +++ b/projects/Unit/APIApproval.Approve.verified.txt @@ -106,6 +106,8 @@ namespace RabbitMQ.Client public System.TimeSpan SocketWriteTimeout { get; set; } public RabbitMQ.Client.SslOption Ssl { get; set; } public bool TopologyRecoveryEnabled { get; set; } + public RabbitMQ.Client.TopologyRecoveryExceptionHandler TopologyRecoveryExceptionHandler { get; set; } + public RabbitMQ.Client.TopologyRecoveryFilter TopologyRecoveryFilter { get; set; } public System.Uri Uri { get; set; } public bool UseBackgroundThreadsForIO { get; set; } public string UserName { get; set; } @@ -476,6 +478,38 @@ namespace RabbitMQ.Client int MinorVersion { get; } int Revision { get; } } + public interface IRecordedBinding + { + System.Collections.Generic.IDictionary Arguments { get; } + string Destination { get; } + string RoutingKey { get; } + string Source { get; } + } + public interface IRecordedConsumer + { + System.Collections.Generic.IDictionary Arguments { get; } + bool AutoAck { get; } + string ConsumerTag { get; } + bool Exclusive { get; } + string Queue { get; } + } + public interface IRecordedExchange + { + System.Collections.Generic.IDictionary Arguments { get; } + bool AutoDelete { get; } + bool Durable { get; } + string Name { get; } + string Type { get; } + } + public interface IRecordedQueue + { + System.Collections.Generic.IDictionary Arguments { get; } + bool AutoDelete { get; } + bool Durable { get; } + bool Exclusive { get; } + bool IsServerNamed { get; } + string Name { get; } + } public interface IRecoverable { event System.EventHandler Recovery; @@ -582,6 +616,26 @@ namespace RabbitMQ.Client public string ServerName { get; set; } public System.Security.Authentication.SslProtocols Version { get; set; } } + public class TopologyRecoveryExceptionHandler + { + public TopologyRecoveryExceptionHandler() { } + public System.Func BindingRecoveryExceptionCondition { get; set; } + public System.Action BindingRecoveryExceptionHandler { get; set; } + public System.Func ConsumerRecoveryExceptionCondition { get; set; } + public System.Action ConsumerRecoveryExceptionHandler { get; set; } + public System.Func ExchangeRecoveryExceptionCondition { get; set; } + public System.Action ExchangeRecoveryExceptionHandler { get; set; } + public System.Func QueueRecoveryExceptionCondition { get; set; } + public System.Action QueueRecoveryExceptionHandler { get; set; } + } + public class TopologyRecoveryFilter + { + public TopologyRecoveryFilter() { } + public System.Func BindingFilter { get; set; } + public System.Func ConsumerFilter { get; set; } + public System.Func ExchangeFilter { get; set; } + public System.Func QueueFilter { get; set; } + } } namespace RabbitMQ.Client.Events { diff --git a/projects/Unit/TestConnectionRecovery.cs b/projects/Unit/TestConnectionRecovery.cs index 38cda116aa..38bf728935 100644 --- a/projects/Unit/TestConnectionRecovery.cs +++ b/projects/Unit/TestConnectionRecovery.cs @@ -1121,20 +1121,27 @@ public void TestTopologyRecoveryQueueFilter() Model.QueueDelete(queueToRecover); Model.QueueDelete(queueToIgnore); - CloseAndWaitForRecovery(conn); - Wait(latch); - - Assert.IsTrue(ch.IsOpen); - AssertQueueRecovery(ch, queueToRecover, false); - try { - ch.QueueDeclarePassive(queueToIgnore); - Assert.Fail("Expected an exception"); + CloseAndWaitForRecovery(conn); + Wait(latch); + + Assert.IsTrue(ch.IsOpen); + AssertQueueRecovery(ch, queueToRecover, false); + + try + { + AssertQueueRecovery(ch, queueToIgnore, false); + Assert.Fail("Expected an exception"); + } + catch (OperationInterruptedException e) + { + AssertShutdownError(e.ShutdownReason, 404); + } } - catch (OperationInterruptedException e) + finally { - AssertShutdownError(e.ShutdownReason, 404); + conn.Abort(); } } @@ -1158,20 +1165,27 @@ public void TestTopologyRecoveryExchangeFilter() Model.ExchangeDelete(exchangeToRecover); Model.ExchangeDelete(exchangeToIgnore); - CloseAndWaitForRecovery(conn); - Wait(latch); - - Assert.IsTrue(ch.IsOpen); - AssertExchangeRecovery(ch, exchangeToRecover); - try { - ch.ExchangeDeclarePassive(exchangeToIgnore); - Assert.Fail("Expected an exception"); + CloseAndWaitForRecovery(conn); + Wait(latch); + + Assert.IsTrue(ch.IsOpen); + AssertExchangeRecovery(ch, exchangeToRecover); + + try + { + AssertExchangeRecovery(ch, exchangeToIgnore); + Assert.Fail("Expected an exception"); + } + catch (OperationInterruptedException e) + { + AssertShutdownError(e.ShutdownReason, 404); + } } - catch (OperationInterruptedException e) + finally { - AssertShutdownError(e.ShutdownReason, 404); + conn.Abort(); } } @@ -1204,12 +1218,19 @@ public void TestTopologyRecoveryBindingFilter() Model.QueueUnbind(queueWithRecoveredBinding, exchange, bindingToRecover); Model.QueueUnbind(queueWithIgnoredBinding, exchange, bindingToIgnore); - CloseAndWaitForRecovery(conn); - Wait(latch); + try + { + CloseAndWaitForRecovery(conn); + Wait(latch); - Assert.IsTrue(ch.IsOpen); - Assert.IsTrue(SendAndConsumeMessage(queueWithRecoveredBinding, exchange, bindingToRecover)); - Assert.IsFalse(SendAndConsumeMessage(queueWithIgnoredBinding, exchange, bindingToIgnore)); + Assert.IsTrue(ch.IsOpen); + Assert.IsTrue(SendAndConsumeMessage(queueWithRecoveredBinding, exchange, bindingToRecover)); + Assert.IsFalse(SendAndConsumeMessage(queueWithIgnoredBinding, exchange, bindingToIgnore)); + } + finally + { + conn.Abort(); + } } [Test] @@ -1249,26 +1270,33 @@ public void TestTopologyRecoveryConsumerFilter() consumerToIgnore.Received += (source, ea) => ignoredLatch.Set(); ch.BasicConsume(queueWithIgnoredConsumer, true, "filtered.consumer", consumerToIgnore); - CloseAndWaitForRecovery(conn); - Wait(latch); + try + { + CloseAndWaitForRecovery(conn); + Wait(latch); - Assert.IsTrue(ch.IsOpen); - ch.BasicPublish(exchange, binding1, ch.CreateBasicProperties(), Encoding.UTF8.GetBytes("test message")); - ch.BasicPublish(exchange, binding2, ch.CreateBasicProperties(), Encoding.UTF8.GetBytes("test message")); + Assert.IsTrue(ch.IsOpen); + ch.BasicPublish(exchange, binding1, ch.CreateBasicProperties(), Encoding.UTF8.GetBytes("test message")); + ch.BasicPublish(exchange, binding2, ch.CreateBasicProperties(), Encoding.UTF8.GetBytes("test message")); - Assert.IsTrue(recoverLatch.Wait(TimeSpan.FromSeconds(5))); - Assert.IsFalse(ignoredLatch.Wait(TimeSpan.FromSeconds(5))); + Assert.IsTrue(recoverLatch.Wait(TimeSpan.FromSeconds(5))); + Assert.IsFalse(ignoredLatch.Wait(TimeSpan.FromSeconds(5))); - ch.BasicConsume(queueWithIgnoredConsumer, true, "filtered.consumer", consumerToIgnore); + ch.BasicConsume(queueWithIgnoredConsumer, true, "filtered.consumer", consumerToIgnore); - try - { - ch.BasicConsume(queueWithRecoveredConsumer, true, "recovered.consumer", consumerToRecover); - Assert.Fail("Expected an exception"); + try + { + ch.BasicConsume(queueWithRecoveredConsumer, true, "recovered.consumer", consumerToRecover); + Assert.Fail("Expected an exception"); + } + catch (OperationInterruptedException e) + { + AssertShutdownError(e.ShutdownReason, 530); // NOT_ALLOWED - not allowed to reuse consumer tag + } } - catch (OperationInterruptedException e) + finally { - AssertShutdownError(e.ShutdownReason, 530); // NOT_ALLOWED - not allowed to reuse consumer tag + conn.Abort(); } } @@ -1310,19 +1338,26 @@ public void TestTopologyRecoveryDefaultFilterRecoversAllEntities() Model.QueueDelete(queue1); Model.QueueDelete(queue2); - CloseAndWaitForRecovery(conn); - Wait(latch); + try + { + CloseAndWaitForRecovery(conn); + Wait(latch); - Assert.IsTrue(ch.IsOpen); - AssertExchangeRecovery(ch, exchange); - ch.QueueDeclarePassive(queue1); - ch.QueueDeclarePassive(queue2); + Assert.IsTrue(ch.IsOpen); + AssertExchangeRecovery(ch, exchange); + ch.QueueDeclarePassive(queue1); + ch.QueueDeclarePassive(queue2); - ch.BasicPublish(exchange, binding1, ch.CreateBasicProperties(), Encoding.UTF8.GetBytes("test message")); - ch.BasicPublish(exchange, binding2, ch.CreateBasicProperties(), Encoding.UTF8.GetBytes("test message")); + ch.BasicPublish(exchange, binding1, ch.CreateBasicProperties(), Encoding.UTF8.GetBytes("test message")); + ch.BasicPublish(exchange, binding2, ch.CreateBasicProperties(), Encoding.UTF8.GetBytes("test message")); - Assert.IsTrue(consumerLatch1.Wait(TimeSpan.FromSeconds(5))); - Assert.IsTrue(consumerLatch2.Wait(TimeSpan.FromSeconds(5))); + Assert.IsTrue(consumerLatch1.Wait(TimeSpan.FromSeconds(5))); + Assert.IsTrue(consumerLatch2.Wait(TimeSpan.FromSeconds(5))); + } + finally + { + conn.Abort(); + } } [Test] @@ -1362,15 +1397,22 @@ public void TestTopologyRecoveryQueueExceptionHandler() Model.QueueDelete(queueToRecoverWithException); Model.QueueDeclare(queueToRecoverWithException, false, false, false, changedQueueArguments); - CloseAndWaitForRecovery(conn); - Wait(latch); + try + { + CloseAndWaitForRecovery(conn); + Wait(latch); - Assert.IsTrue(ch.IsOpen); - AssertQueueRecovery(ch, queueToRecoverSuccessfully, false); - AssertQueueRecovery(ch, queueToRecoverWithException, false, changedQueueArguments); + Assert.IsTrue(ch.IsOpen); + AssertQueueRecovery(ch, queueToRecoverSuccessfully, false); + AssertQueueRecovery(ch, queueToRecoverWithException, false, changedQueueArguments); + } + finally + { + //Cleanup + Model.QueueDelete(queueToRecoverWithException); - //Cleanup - Model.QueueDelete(queueToRecoverWithException); + conn.Abort(); + } } [Test] @@ -1406,15 +1448,22 @@ public void TestTopologyRecoveryExchangeExceptionHandler() Model.ExchangeDelete(exchangeToRecoverWithException); Model.ExchangeDeclare(exchangeToRecoverWithException, "topic", false, false); - CloseAndWaitForRecovery(conn); - Wait(latch); + try + { + CloseAndWaitForRecovery(conn); + Wait(latch); - Assert.IsTrue(ch.IsOpen); - AssertExchangeRecovery(ch, exchangeToRecoverSuccessfully); - AssertExchangeRecovery(ch, exchangeToRecoverWithException); + Assert.IsTrue(ch.IsOpen); + AssertExchangeRecovery(ch, exchangeToRecoverSuccessfully); + AssertExchangeRecovery(ch, exchangeToRecoverWithException); + } + finally + { + //Cleanup + Model.ExchangeDelete(exchangeToRecoverWithException); - //Cleanup - Model.ExchangeDelete(exchangeToRecoverWithException); + conn.Abort(); + } } [Test] @@ -1462,12 +1511,19 @@ public void TestTopologyRecoveryBindingExceptionHandler() Model.QueueUnbind(queueWithExceptionBinding, exchange, bindingToRecoverWithException); Model.QueueDelete(queueWithExceptionBinding); - CloseAndWaitForRecovery(conn); - Wait(latch); + try + { + CloseAndWaitForRecovery(conn); + Wait(latch); - Assert.IsTrue(ch.IsOpen); - Assert.IsTrue(SendAndConsumeMessage(queueWithRecoveredBinding, exchange, bindingToRecoverSuccessfully)); - Assert.IsFalse(SendAndConsumeMessage(queueWithExceptionBinding, exchange, bindingToRecoverWithException)); + Assert.IsTrue(ch.IsOpen); + Assert.IsTrue(SendAndConsumeMessage(queueWithRecoveredBinding, exchange, bindingToRecoverSuccessfully)); + Assert.IsTrue(SendAndConsumeMessage(queueWithExceptionBinding, exchange, bindingToRecoverWithException)); + } + finally + { + conn.Abort(); + } } [Test] @@ -1488,8 +1544,11 @@ public void TestTopologyRecoveryConsumerExceptionHandler() using (var model = connection.CreateModel()) { model.QueueDeclare(queueWithExceptionConsumer, false, false, false, null); - model.BasicConsume(queueWithExceptionConsumer, true, c.ConsumerTag, c.Consumer); } + + // So topology recovery runs again. This time he missing queue should exist, making + // it possible to recover the consumer successfully. + throw ex; } }; var latch = new ManualResetEventSlim(false); @@ -1508,23 +1567,30 @@ public void TestTopologyRecoveryConsumerExceptionHandler() Model.QueueDelete(queueWithExceptionConsumer); - CloseAndWaitForRecovery(conn); - Wait(latch); + try + { + CloseAndWaitForShutdown(conn); + Wait(latch, TimeSpan.FromSeconds(20)); - Assert.IsTrue(ch.IsOpen); + Assert.IsTrue(ch.IsOpen); - ch.BasicPublish("", queueWithExceptionConsumer, ch.CreateBasicProperties(), Encoding.UTF8.GetBytes("test message")); + ch.BasicPublish("", queueWithExceptionConsumer, ch.CreateBasicProperties(), Encoding.UTF8.GetBytes("test message")); - Assert.IsTrue(recoverLatch.Wait(TimeSpan.FromSeconds(5))); + Assert.IsTrue(recoverLatch.Wait(TimeSpan.FromSeconds(5))); - try - { - ch.BasicConsume(queueWithExceptionConsumer, true, "exception.consumer", consumerToRecover); - Assert.Fail("Expected an exception"); + try + { + ch.BasicConsume(queueWithExceptionConsumer, true, "exception.consumer", consumerToRecover); + Assert.Fail("Expected an exception"); + } + catch (OperationInterruptedException e) + { + AssertShutdownError(e.ShutdownReason, 530); // NOT_ALLOWED - not allowed to reuse consumer tag + } } - catch (OperationInterruptedException e) + finally { - AssertShutdownError(e.ShutdownReason, 530); // NOT_ALLOWED - not allowed to reuse consumer tag + conn.Abort(); } } @@ -1532,15 +1598,13 @@ internal bool SendAndConsumeMessage(string queue, string exchange, string routin { using (var ch = Conn.CreateModel()) { - ch.ConfirmSelect(); var latch = new ManualResetEventSlim(false); var consumer = new AckingBasicConsumer(ch, 1, latch); - ch.BasicConsume(queue, true, consumer); + ch.BasicConsume(queue, false, consumer); ch.BasicPublish(exchange, routingKey, ch.CreateBasicProperties(), Encoding.UTF8.GetBytes("test message")); - ch.WaitForConfirmsOrDie(TimeSpan.FromSeconds(5)); return latch.Wait(TimeSpan.FromSeconds(5)); }