Skip to content

Commit

Permalink
Polish recovery retry
Browse files Browse the repository at this point in the history
Add log in default retry handler, add operation to recover all the
bindings of a queue (useful when the recovery of a consumer fails
because isn't found), make AutorecoveringConnection#recoverConsumer and
AutorecoveringConnection#recoverQueue public as they contain useful
logic that some client code should be able to use, and declared a
pre-configured retry handler for the deleted queue case.

References #387

(cherry picked from commit 2b8d257)
  • Loading branch information
acogoluegnes committed Aug 13, 2018
1 parent 26a8dae commit f4978bc
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,7 @@ private void recoverExchange(RecordedExchange x, boolean retry) {
}


void recoverQueue(final String oldName, RecordedQueue q, boolean retry) {
public void recoverQueue(final String oldName, RecordedQueue q, boolean retry) {
try {
if (topologyRecoveryFilter.filterQueue(q)) {
LOGGER.debug("Recovering {}", q);
Expand Down Expand Up @@ -774,7 +774,7 @@ private void recoverBinding(RecordedBinding b, boolean retry) {
}
}

private void recoverConsumer(final String tag, RecordedConsumer consumer, boolean retry) {
public void recoverConsumer(final String tag, RecordedConsumer consumer, boolean retry) {
try {
if (this.topologyRecoveryFilter.filterConsumer(consumer)) {
LOGGER.debug("Recovering {}", consumer);
Expand Down Expand Up @@ -1087,6 +1087,10 @@ public Map<String, RecordedExchange> getRecordedExchanges() {
return recordedExchanges;
}

public List<RecordedBinding> getRecordedBindings() {
return recordedBindings;
}

@Override
public String toString() {
return this.delegate.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@

package com.rabbitmq.client.impl.recovery;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Objects;
import java.util.function.BiPredicate;

Expand All @@ -35,6 +38,8 @@
*/
public class DefaultRetryHandler implements RetryHandler {

private static final Logger LOGGER = LoggerFactory.getLogger(DefaultRetryHandler.class);

private final BiPredicate<? super RecordedQueue, Exception> queueRecoveryRetryCondition;
private final BiPredicate<? super RecordedExchange, Exception> exchangeRecoveryRetryCondition;
private final BiPredicate<? super RecordedBinding, Exception> bindingRecoveryRetryCondition;
Expand Down Expand Up @@ -98,6 +103,7 @@ public RetryResult retryConsumerRecovery(RetryContext context) throws Exception

protected RetryResult doRetry(BiPredicate<RecordedEntity, Exception> condition, RetryOperation<?> operation, RecordedEntity entity, RetryContext context)
throws Exception {
log(entity, context.exception());
int attempts = 0;
Exception exception = context.exception();
while (attempts < retryAttempts) {
Expand All @@ -119,6 +125,10 @@ protected RetryResult doRetry(BiPredicate<RecordedEntity, Exception> condition,
throw context.exception();
}

protected void log(RecordedEntity entity, Exception exception) {
LOGGER.info("Error while recovering {}, retrying with {} attempt(s).", entity, retryAttempts, exception);
}

public interface RetryOperation<T> {

T call(RetryContext context) throws Exception;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.ShutdownSignalException;

import java.util.List;
import java.util.function.BiPredicate;
import java.util.function.Predicate;

import static com.rabbitmq.client.impl.recovery.TopologyRecoveryRetryHandlerBuilder.builder;

/**
* Useful ready-to-use conditions and operations for {@link DefaultRetryHandler}.
* They're composed and used with the {@link TopologyRecoveryRetryHandlerBuilder}.
Expand All @@ -32,6 +35,9 @@
*/
public abstract class TopologyRecoveryRetryLogic {

/**
* Channel has been closed because of a resource that doesn't exist.
*/
public static final BiPredicate<RecordedEntity, Exception> CHANNEL_CLOSED_NOT_FOUND = (entity, ex) -> {
if (ex.getCause() instanceof ShutdownSignalException) {
ShutdownSignalException cause = (ShutdownSignalException) ex.getCause();
Expand All @@ -42,13 +48,19 @@ public abstract class TopologyRecoveryRetryLogic {
return false;
};

/**
* Recover a channel.
*/
public static final DefaultRetryHandler.RetryOperation<Void> RECOVER_CHANNEL = context -> {
if (!context.entity().getChannel().isOpen()) {
context.connection().recoverChannel(context.entity().getChannel());
}
return null;
};

/**
* Recover the destination queue of a binding.
*/
public static final DefaultRetryHandler.RetryOperation<Void> RECOVER_BINDING_QUEUE = context -> {
if (context.entity() instanceof RecordedQueueBinding) {
RecordedBinding binding = context.binding();
Expand All @@ -63,11 +75,17 @@ public abstract class TopologyRecoveryRetryLogic {
return null;
};

/**
* Recover a binding.
*/
public static final DefaultRetryHandler.RetryOperation<Void> RECOVER_BINDING = context -> {
context.binding().recover();
return null;
};

/**
* Recover the queue of a consumer.
*/
public static final DefaultRetryHandler.RetryOperation<Void> RECOVER_CONSUMER_QUEUE = context -> {
if (context.entity() instanceof RecordedConsumer) {
RecordedConsumer consumer = context.consumer();
Expand All @@ -82,5 +100,37 @@ public abstract class TopologyRecoveryRetryLogic {
return null;
};

/**
* Recover all the bindings of the queue of a consumer.
*/
public static final DefaultRetryHandler.RetryOperation<Void> RECOVER_CONSUMER_QUEUE_BINDINGS = context -> {
if (context.entity() instanceof RecordedConsumer) {
String queue = context.consumer().getQueue();
for (RecordedBinding recordedBinding : context.connection().getRecordedBindings()) {
if (recordedBinding instanceof RecordedQueueBinding && queue.equals(recordedBinding.getDestination())) {
recordedBinding.recover();
}
}
}
return null;
};

/**
* Recover a consumer.
*/
public static final DefaultRetryHandler.RetryOperation<String> RECOVER_CONSUMER = context -> context.consumer().recover();

/**
* Pre-configured {@link DefaultRetryHandler} that retries recovery of bindings and consumers
* when their respective queue is not found.
* This retry handler can be useful for long recovery processes, whereby auto-delete queues
* can be deleted between queue recovery and binding/consumer recovery.
*/
public static final RetryHandler RETRY_ON_QUEUE_NOT_FOUND_RETRY_HANDLER = builder()
.bindingRecoveryRetryCondition(CHANNEL_CLOSED_NOT_FOUND)
.consumerRecoveryRetryCondition(CHANNEL_CLOSED_NOT_FOUND)
.bindingRecoveryRetryOperation(RECOVER_CHANNEL.andThen(RECOVER_BINDING_QUEUE).andThen(RECOVER_BINDING))
.consumerRecoveryRetryOperation(RECOVER_CHANNEL.andThen(RECOVER_CONSUMER_QUEUE.andThen(RECOVER_CONSUMER)
.andThen(RECOVER_CONSUMER_QUEUE_BINDINGS)))
.build();
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,7 @@

import java.util.HashMap;

import static com.rabbitmq.client.impl.recovery.TopologyRecoveryRetryHandlerBuilder.builder;
import static com.rabbitmq.client.impl.recovery.TopologyRecoveryRetryLogic.CHANNEL_CLOSED_NOT_FOUND;
import static com.rabbitmq.client.impl.recovery.TopologyRecoveryRetryLogic.RECOVER_BINDING;
import static com.rabbitmq.client.impl.recovery.TopologyRecoveryRetryLogic.RECOVER_BINDING_QUEUE;
import static com.rabbitmq.client.impl.recovery.TopologyRecoveryRetryLogic.RECOVER_CHANNEL;
import static com.rabbitmq.client.impl.recovery.TopologyRecoveryRetryLogic.RECOVER_CONSUMER;
import static com.rabbitmq.client.impl.recovery.TopologyRecoveryRetryLogic.RECOVER_CONSUMER_QUEUE;
import static com.rabbitmq.client.impl.recovery.TopologyRecoveryRetryLogic.RETRY_ON_QUEUE_NOT_FOUND_RETRY_HANDLER;
import static com.rabbitmq.client.test.TestUtils.closeAllConnectionsAndWaitForRecovery;
import static org.junit.Assert.assertTrue;

Expand Down Expand Up @@ -57,13 +51,7 @@ public void topologyRecoveryRetry() throws Exception {
@Override
protected ConnectionFactory newConnectionFactory() {
ConnectionFactory connectionFactory = TestUtils.connectionFactory();
connectionFactory.setTopologyRecoveryRetryHandler(
builder().bindingRecoveryRetryCondition(CHANNEL_CLOSED_NOT_FOUND)
.consumerRecoveryRetryCondition(CHANNEL_CLOSED_NOT_FOUND)
.bindingRecoveryRetryOperation(RECOVER_CHANNEL.andThen(RECOVER_BINDING_QUEUE).andThen(RECOVER_BINDING))
.consumerRecoveryRetryOperation(RECOVER_CHANNEL.andThen(RECOVER_CONSUMER_QUEUE.andThen(RECOVER_CONSUMER)))
.build()
);
connectionFactory.setTopologyRecoveryRetryHandler(RETRY_ON_QUEUE_NOT_FOUND_RETRY_HANDLER);
connectionFactory.setNetworkRecoveryInterval(1000);
return connectionFactory;
}
Expand Down

0 comments on commit f4978bc

Please sign in to comment.