Skip to content

Commit

Permalink
Merge pull request #1316 from rosca-sabina/feature/658-add-filtering-…
Browse files Browse the repository at this point in the history
…to-topology-recovery-6.x

Add custom filtering and exception handling to topology recovery on 6.x
  • Loading branch information
Zerpet authored Mar 22, 2023
2 parents c351467 + a9cdaee commit bf7d538
Show file tree
Hide file tree
Showing 16 changed files with 1,051 additions and 68 deletions.
11 changes: 11 additions & 0 deletions projects/RabbitMQ.Client/client/api/ConnectionFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,17 @@ public TimeSpan ContinuationTimeout
/// </summary>
public bool TopologyRecoveryEnabled { get; set; } = true;

/// <summary>
/// Filter to include/exclude entities from topology recovery.
/// Default filter includes all entities in topology recovery.
/// </summary>
public TopologyRecoveryFilter TopologyRecoveryFilter { get; set; } = new TopologyRecoveryFilter();

/// <summary>
/// Custom logic for handling topology recovery exceptions that match the specified filters.
/// </summary>
public TopologyRecoveryExceptionHandler TopologyRecoveryExceptionHandler { get; set; } = new TopologyRecoveryExceptionHandler();

/// <summary>
/// Construct a fresh instance, with all fields set to their respective defaults.
/// </summary>
Expand Down
15 changes: 15 additions & 0 deletions projects/RabbitMQ.Client/client/api/IRecordedBinding.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
using System.Collections.Generic;

namespace RabbitMQ.Client
{
public interface IRecordedBinding
{
string Source { get; }

string Destination { get; }

string RoutingKey { get; }

IDictionary<string, object> Arguments { get; }
}
}
17 changes: 17 additions & 0 deletions projects/RabbitMQ.Client/client/api/IRecordedConsumer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
using System.Collections.Generic;

namespace RabbitMQ.Client
{
public interface IRecordedConsumer
{
string ConsumerTag { get; }

string Queue { get; }

bool AutoAck { get; }

bool Exclusive { get; }

IDictionary<string, object> Arguments { get; }
}
}
17 changes: 17 additions & 0 deletions projects/RabbitMQ.Client/client/api/IRecordedExchange.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
using System.Collections.Generic;

namespace RabbitMQ.Client
{
public interface IRecordedExchange
{
string Name { get; }

string Type { get; }

bool Durable { get; }

bool AutoDelete { get; }

IDictionary<string, object> Arguments { get; }
}
}
19 changes: 19 additions & 0 deletions projects/RabbitMQ.Client/client/api/IRecordedQueue.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using System.Collections.Generic;

namespace RabbitMQ.Client
{
public interface IRecordedQueue
{
string Name { get; }

bool Durable { get; }

bool Exclusive { get; }

bool AutoDelete { get; }

IDictionary<string, object> Arguments { get; }

bool IsServerNamed { get; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
using System;

namespace RabbitMQ.Client
{
/// <summary>
/// Custom logic for handling topology recovery exceptions that match the specified filters.
/// </summary>
public class TopologyRecoveryExceptionHandler
{
private static readonly Func<IRecordedExchange, Exception, bool> s_defaultExchangeExceptionCondition = (e, ex) => true;
private static readonly Func<IRecordedQueue, Exception, bool> s_defaultQueueExceptionCondition = (q, ex) => true;
private static readonly Func<IRecordedBinding, Exception, bool> s_defaultBindingExceptionCondition = (b, ex) => true;
private static readonly Func<IRecordedConsumer, Exception, bool> s_defaultConsumerExceptionCondition = (c, ex) => true;

private Func<IRecordedExchange, Exception, bool> _exchangeRecoveryExceptionCondition;
private Func<IRecordedQueue, Exception, bool> _queueRecoveryExceptionCondition;
private Func<IRecordedBinding, Exception, bool> _bindingRecoveryExceptionCondition;
private Func<IRecordedConsumer, Exception, bool> _consumerRecoveryExceptionCondition;
private Action<IRecordedExchange, Exception, IConnection> _exchangeRecoveryExceptionHandler;
private Action<IRecordedQueue, Exception, IConnection> _queueRecoveryExceptionHandler;
private Action<IRecordedBinding, Exception, IConnection> _bindingRecoveryExceptionHandler;
private Action<IRecordedConsumer, Exception, IConnection> _consumerRecoveryExceptionHandler;

/// <summary>
/// Decides which exchange recovery exceptions the custom exception handler is applied to.
/// Default condition applies the exception handler to all exchange recovery exceptions.
/// </summary>
public Func<IRecordedExchange, Exception, bool> ExchangeRecoveryExceptionCondition
{
get => _exchangeRecoveryExceptionCondition ?? s_defaultExchangeExceptionCondition;

set
{
if (_exchangeRecoveryExceptionCondition != null)
throw new InvalidOperationException($"Cannot modify {nameof(ExchangeRecoveryExceptionCondition)} after it has been initialized.");

_exchangeRecoveryExceptionCondition = value ?? throw new ArgumentNullException(nameof(ExchangeRecoveryExceptionCondition));
}
}

/// <summary>
/// Decides which queue recovery exceptions the custom exception handler is applied to.
/// Default condition applies the exception handler to all queue recovery exceptions.
/// </summary>
public Func<IRecordedQueue, Exception, bool> QueueRecoveryExceptionCondition
{
get => _queueRecoveryExceptionCondition ?? s_defaultQueueExceptionCondition;

set
{
if (_queueRecoveryExceptionCondition != null)
throw new InvalidOperationException($"Cannot modify {nameof(QueueRecoveryExceptionCondition)} after it has been initialized.");

_queueRecoveryExceptionCondition = value ?? throw new ArgumentNullException(nameof(QueueRecoveryExceptionCondition));
}
}

/// <summary>
/// Decides which binding recovery exceptions the custom exception handler is applied to.
/// Default condition applies the exception handler to all binding recovery exceptions.
/// </summary>
public Func<IRecordedBinding, Exception, bool> BindingRecoveryExceptionCondition
{
get => _bindingRecoveryExceptionCondition ?? s_defaultBindingExceptionCondition;

set
{
if (_bindingRecoveryExceptionCondition != null)
throw new InvalidOperationException($"Cannot modify {nameof(ExchangeRecoveryExceptionCondition)} after it has been initialized.");

_bindingRecoveryExceptionCondition = value ?? throw new ArgumentNullException(nameof(ExchangeRecoveryExceptionCondition));
}
}

/// <summary>
/// Decides which consumer recovery exceptions the custom exception handler is applied to.
/// Default condition applies the exception handler to all consumer recovery exceptions.
/// </summary>
public Func<IRecordedConsumer, Exception, bool> ConsumerRecoveryExceptionCondition
{
get => _consumerRecoveryExceptionCondition ?? s_defaultConsumerExceptionCondition;

set
{
if (_consumerRecoveryExceptionCondition != null)
throw new InvalidOperationException($"Cannot modify {nameof(ConsumerRecoveryExceptionCondition)} after it has been initialized.");

_consumerRecoveryExceptionCondition = value ?? throw new ArgumentNullException(nameof(ConsumerRecoveryExceptionCondition));
}
}

/// <summary>
/// Retries, or otherwise handles, an exception thrown when attempting to recover an exchange.
/// </summary>
public Action<IRecordedExchange, Exception, IConnection> ExchangeRecoveryExceptionHandler
{
get => _exchangeRecoveryExceptionHandler;

set
{
if (_exchangeRecoveryExceptionHandler != null)
throw new InvalidOperationException($"Cannot modify {nameof(ExchangeRecoveryExceptionHandler)} after it has been initialized.");

_exchangeRecoveryExceptionHandler = value ?? throw new ArgumentNullException(nameof(ExchangeRecoveryExceptionHandler));
}
}

/// <summary>
/// Retries, or otherwise handles, an exception thrown when attempting to recover a queue.
/// </summary>
public Action<IRecordedQueue, Exception, IConnection> QueueRecoveryExceptionHandler
{
get => _queueRecoveryExceptionHandler;

set
{
if (_queueRecoveryExceptionHandler != null)
throw new InvalidOperationException($"Cannot modify {nameof(QueueRecoveryExceptionHandler)} after it has been initialized.");

_queueRecoveryExceptionHandler = value ?? throw new ArgumentNullException(nameof(QueueRecoveryExceptionHandler));
}
}

/// <summary>
/// Retries, or otherwise handles, an exception thrown when attempting to recover a binding.
/// </summary>
public Action<IRecordedBinding, Exception, IConnection> BindingRecoveryExceptionHandler
{
get => _bindingRecoveryExceptionHandler;

set
{
if (_bindingRecoveryExceptionHandler != null)
throw new InvalidOperationException($"Cannot modify {nameof(BindingRecoveryExceptionHandler)} after it has been initialized.");

_bindingRecoveryExceptionHandler = value ?? throw new ArgumentNullException(nameof(BindingRecoveryExceptionHandler));
}
}

/// <summary>
/// Retries, or otherwise handles, an exception thrown when attempting to recover a consumer.
/// </summary>
public Action<IRecordedConsumer, Exception, IConnection> ConsumerRecoveryExceptionHandler
{
get => _consumerRecoveryExceptionHandler;

set
{
if (_consumerRecoveryExceptionHandler != null)
throw new InvalidOperationException($"Cannot modify {nameof(ConsumerRecoveryExceptionHandler)} after it has been initialized.");

_consumerRecoveryExceptionHandler = value ?? throw new ArgumentNullException(nameof(ConsumerRecoveryExceptionHandler));
}
}
}
}
85 changes: 85 additions & 0 deletions projects/RabbitMQ.Client/client/api/TopologyRecoveryFilter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
using System;

namespace RabbitMQ.Client
{
/// <summary>
/// Filter to know which entities (exchanges, queues, bindings, consumers) should be recovered by topology recovery.
/// By default, allows all entities to be recovered.
/// </summary>
public class TopologyRecoveryFilter
{
private static readonly Func<IRecordedExchange, bool> s_defaultExchangeFilter = exchange => true;
private static readonly Func<IRecordedQueue, bool> s_defaultQueueFilter = queue => true;
private static readonly Func<IRecordedBinding, bool> s_defaultBindingFilter = binding => true;
private static readonly Func<IRecordedConsumer, bool> s_defaultConsumerFilter = consumer => true;

private Func<IRecordedExchange, bool> _exchangeFilter;
private Func<IRecordedQueue, bool> _queueFilter;
private Func<IRecordedBinding, bool> _bindingFilter;
private Func<IRecordedConsumer, bool> _consumerFilter;

/// <summary>
/// Decides whether an exchange is recovered or not.
/// </summary>
public Func<IRecordedExchange, bool> ExchangeFilter
{
get => _exchangeFilter ?? s_defaultExchangeFilter;

set
{
if (_exchangeFilter != null)
throw new InvalidOperationException($"Cannot modify {nameof(ExchangeFilter)} after it has been initialized.");

_exchangeFilter = value ?? throw new ArgumentNullException(nameof(ExchangeFilter));
}
}

/// <summary>
/// Decides whether a queue is recovered or not.
/// </summary>
public Func<IRecordedQueue, bool> QueueFilter
{
get => _queueFilter ?? s_defaultQueueFilter;

set
{
if (_queueFilter != null)
throw new InvalidOperationException($"Cannot modify {nameof(QueueFilter)} after it has been initialized.");

_queueFilter = value ?? throw new ArgumentNullException(nameof(QueueFilter));
}
}

/// <summary>
/// Decides whether a binding is recovered or not.
/// </summary>
public Func<IRecordedBinding, bool> BindingFilter
{
get => _bindingFilter ?? s_defaultBindingFilter;

set
{
if (_bindingFilter != null)
throw new InvalidOperationException($"Cannot modify {nameof(BindingFilter)} after it has been initialized.");

_bindingFilter = value ?? throw new ArgumentNullException(nameof(BindingFilter));
}
}

/// <summary>
/// Decides whether a consumer is recovered or not.
/// </summary>
public Func<IRecordedConsumer, bool> ConsumerFilter
{
get => _consumerFilter ?? s_defaultConsumerFilter;

set
{
if (_consumerFilter != null)
throw new InvalidOperationException($"Cannot modify {nameof(ConsumerFilter)} after it has been initialized.");

_consumerFilter = value ?? throw new ArgumentNullException(nameof(ConsumerFilter));
}
}
}
}
Loading

0 comments on commit bf7d538

Please sign in to comment.