Skip to content

Commit

Permalink
Add optional retry logic to topology recovery
Browse files Browse the repository at this point in the history
There's no topology recovery retry by default. The default
implementation is composable: not all have the recoverable entities have
to retry and the retry operations don't have to be only the
corresponding entity recovery, but also other operations, like
recovering the corresponding channel.

Fixes #387

(cherry picked from commit 34e33ea)

Conflicts:
	src/main/java/com/rabbitmq/client/ConnectionFactory.java
	src/main/java/com/rabbitmq/client/impl/ConnectionParams.java
	src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java
	src/test/java/com/rabbitmq/client/test/TestUtils.java
	src/test/java/com/rabbitmq/client/test/functional/TopologyRecoveryFiltering.java
  • Loading branch information
acogoluegnes committed Aug 10, 2018
1 parent 77b9747 commit 9e02adb
Show file tree
Hide file tree
Showing 18 changed files with 1,412 additions and 154 deletions.
19 changes: 19 additions & 0 deletions src/main/java/com/rabbitmq/client/ConnectionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.rabbitmq.client.impl.nio.NioParams;
import com.rabbitmq.client.impl.nio.SocketChannelFrameHandlerFactory;
import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
import com.rabbitmq.client.impl.recovery.RetryHandler;
import com.rabbitmq.client.impl.recovery.TopologyRecoveryFilter;

import java.io.IOException;
Expand Down Expand Up @@ -180,6 +181,13 @@ public class ConnectionFactory implements Cloneable {
*/
private TopologyRecoveryFilter topologyRecoveryFilter;

/**
* Retry handler for topology recovery.
* Default is no retry.
* @since 4.8.0
*/
private RetryHandler topologyRecoveryRetryHandler;

/** @return the default host to use for connections */
public String getHost() {
return host;
Expand Down Expand Up @@ -1055,6 +1063,7 @@ public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) {
result.setWorkPoolTimeout(workPoolTimeout);
result.setErrorOnWriteListener(errorOnWriteListener);
result.setTopologyRecoveryFilter(topologyRecoveryFilter);
result.setTopologyRecoveryRetryHandler(topologyRecoveryRetryHandler);
return result;
}

Expand Down Expand Up @@ -1396,4 +1405,14 @@ public void setErrorOnWriteListener(ErrorOnWriteListener errorOnWriteListener) {
public void setTopologyRecoveryFilter(TopologyRecoveryFilter topologyRecoveryFilter) {
this.topologyRecoveryFilter = topologyRecoveryFilter;
}

/**
* Set retry handler for topology recovery.
* Default is no retry.
* @param topologyRecoveryRetryHandler
* @since 4.8.0
*/
public void setTopologyRecoveryRetryHandler(RetryHandler topologyRecoveryRetryHandler) {
this.topologyRecoveryRetryHandler = topologyRecoveryRetryHandler;
}
}
11 changes: 11 additions & 0 deletions src/main/java/com/rabbitmq/client/impl/ConnectionParams.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import com.rabbitmq.client.RecoveryDelayHandler;
import com.rabbitmq.client.RecoveryDelayHandler.DefaultRecoveryDelayHandler;
import com.rabbitmq.client.SaslConfig;
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.client.impl.recovery.RetryHandler;
import com.rabbitmq.client.impl.recovery.TopologyRecoveryFilter;

import java.util.Map;
Expand Down Expand Up @@ -48,6 +50,7 @@ public class ConnectionParams {
private ErrorOnWriteListener errorOnWriteListener;
private int workPoolTimeout = -1;
private TopologyRecoveryFilter topologyRecoveryFilter;
private RetryHandler topologyRecoveryRetryHandler;

private ExceptionHandler exceptionHandler;
private ThreadFactory threadFactory;
Expand Down Expand Up @@ -245,4 +248,12 @@ public void setTopologyRecoveryFilter(TopologyRecoveryFilter topologyRecoveryFil
public TopologyRecoveryFilter getTopologyRecoveryFilter() {
return topologyRecoveryFilter;
}

public void setTopologyRecoveryRetryHandler(RetryHandler topologyRecoveryRetryHandler) {
this.topologyRecoveryRetryHandler = topologyRecoveryRetryHandler;
}

public RetryHandler getTopologyRecoveryRetryHandler() {
return topologyRecoveryRetryHandler;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ public class AutorecoveringConnection implements RecoverableConnection, NetworkC
// be created after application code has initiated shutdown.
private final Object recoveryLock = new Object();

private final RetryHandler retryHandler;

public AutorecoveringConnection(ConnectionParams params, FrameHandlerFactory f, List<Address> addrs) {
this(params, f, new ListAddressResolver(addrs));
}
Expand All @@ -109,6 +111,8 @@ public AutorecoveringConnection(ConnectionParams params, FrameHandlerFactory f,

this.topologyRecoveryFilter = params.getTopologyRecoveryFilter() == null ?
letAllPassFilter() : params.getTopologyRecoveryFilter();

this.retryHandler = params.getTopologyRecoveryRetryHandler();
}

private void setupErrorOnWriteListenerForPotentialRecovery() {
Expand Down Expand Up @@ -633,6 +637,10 @@ private void recoverChannels(final RecoveryAwareAMQConnection newConn) {
}
}

void recoverChannel(AutorecoveringChannel channel) throws IOException {
channel.automaticallyRecover(this, this.delegate);
}

private void notifyRecoveryListenersComplete() {
for (RecoveryListener f : Utility.copy(this.recoveryListeners)) {
f.handleRecovery(this);
Expand All @@ -654,16 +662,16 @@ private void recoverTopology(final ExecutorService executor) {
if (executor == null) {
// recover entities in serial on the main connection thread
for (final RecordedExchange exchange : Utility.copy(recordedExchanges).values()) {
recoverExchange(exchange);
recoverExchange(exchange, true);
}
for (final Map.Entry<String, RecordedQueue> entry : Utility.copy(recordedQueues).entrySet()) {
recoverQueue(entry.getKey(), entry.getValue());
recoverQueue(entry.getKey(), entry.getValue(), true);
}
for (final RecordedBinding b : Utility.copy(recordedBindings)) {
recoverBinding(b);
recoverBinding(b, true);
}
for (final Map.Entry<String, RecordedConsumer> entry : Utility.copy(consumers).entrySet()) {
recoverConsumer(entry.getKey(), entry.getValue());
recoverConsumer(entry.getKey(), entry.getValue(), true);
}
} else {
// Support recovering entities in parallel for connections that have a lot of queues, bindings, & consumers
Expand All @@ -683,11 +691,22 @@ private void recoverTopology(final ExecutorService executor) {
}
}

private void recoverExchange(final RecordedExchange x) {
private void recoverExchange(RecordedExchange x, boolean retry) {
// recorded exchanges are guaranteed to be non-predefined (we filter out predefined ones in exchangeDeclare). MK.
try {
if (topologyRecoveryFilter.filterExchange(x)) {
x.recover();
if (retry) {
final RecordedExchange entity = x;
x = (RecordedExchange) wrapRetryIfNecessary(x, new Callable<Void>() {
@Override
public Void call() throws Exception {
entity.recover();
return null;
}
}).getRecordedEntity();
} else {
x.recover();
}
LOGGER.debug("{} has recovered", x);
}
} catch (Exception cause) {
Expand All @@ -698,12 +717,23 @@ private void recoverExchange(final RecordedExchange x) {
}
}

private void recoverQueue(final String oldName, final RecordedQueue q) {

void recoverQueue(final String oldName, RecordedQueue q, boolean retry) {
try {
if (topologyRecoveryFilter.filterQueue(q)) {
LOGGER.debug("Recovering {}", q);
q.recover();
if (retry) {
final RecordedQueue entity = q;
q = (RecordedQueue) wrapRetryIfNecessary(q, new Callable<Void>() {
@Override
public Void call() throws Exception {
entity.recover();
return null;
}
}).getRecordedEntity();
} else {
q.recover();
}
String newName = q.getName();
if (!oldName.equals(newName)) {
// make sure server-named queues are re-added with
Expand Down Expand Up @@ -734,10 +764,21 @@ private void recoverQueue(final String oldName, final RecordedQueue q) {
}
}

private void recoverBinding(final RecordedBinding b) {
private void recoverBinding(RecordedBinding b, boolean retry) {
try {
if (this.topologyRecoveryFilter.filterBinding(b)) {
b.recover();
if (retry) {
final RecordedBinding entity = b;
b = (RecordedBinding) wrapRetryIfNecessary(b, new Callable<Void>() {
@Override
public Void call() throws Exception {
entity.recover();
return null;
}
}).getRecordedEntity();
} else {
b.recover();
}
LOGGER.debug("{} has recovered", b);
}
} catch (Exception cause) {
Expand All @@ -748,11 +789,25 @@ private void recoverBinding(final RecordedBinding b) {
}
}

private void recoverConsumer(final String tag, final RecordedConsumer consumer) {
private void recoverConsumer(final String tag, RecordedConsumer consumer, boolean retry) {
try {
if (this.topologyRecoveryFilter.filterConsumer(consumer)) {
LOGGER.debug("Recovering {}", consumer);
String newTag = consumer.recover();
String newTag = null;
if (retry) {
final RecordedConsumer entity = consumer;
RetryResult retryResult = wrapRetryIfNecessary(consumer, new Callable<String>() {
@Override
public String call() throws Exception {
return entity.recover();
}
});
consumer = (RecordedConsumer) retryResult.getRecordedEntity();
newTag = (String) retryResult.getResult();
} else {
newTag = consumer.recover();
}

// make sure server-generated tags are re-added. MK.
if(tag != null && !tag.equals(newTag)) {
synchronized (this.consumers) {
Expand All @@ -775,6 +830,33 @@ private void recoverConsumer(final String tag, final RecordedConsumer consumer)
}
}

private <T> RetryResult wrapRetryIfNecessary(RecordedEntity entity, Callable<T> recoveryAction) throws Exception {
if (this.retryHandler == null) {
T result = recoveryAction.call();
return new RetryResult(entity, result);
} else {
try {
T result = recoveryAction.call();
return new RetryResult(entity, result);
} catch (Exception e) {
RetryContext retryContext = new RetryContext(entity, e, this);
RetryResult retryResult;
if (entity instanceof RecordedQueue) {
retryResult = this.retryHandler.retryQueueRecovery(retryContext);
} else if (entity instanceof RecordedExchange) {
retryResult = this.retryHandler.retryExchangeRecovery(retryContext);
} else if (entity instanceof RecordedBinding) {
retryResult = this.retryHandler.retryBindingRecovery(retryContext);
} else if (entity instanceof RecordedConsumer) {
retryResult = this.retryHandler.retryConsumerRecovery(retryContext);
} else {
throw new IllegalArgumentException("Unknown type of recorded entity: " + entity);
}
return retryResult;
}
}
}

private void propagateQueueNameChangeToBindings(String oldName, String newName) {
for (RecordedBinding b : Utility.copy(this.recordedBindings)) {
if (b.getDestination().equals(oldName)) {
Expand Down Expand Up @@ -825,15 +907,15 @@ private <E extends RecordedEntity> List<Callable<Object>> groupEntitiesByChannel
public void run() {
for (final E entity : entityList) {
if (entity instanceof RecordedExchange) {
recoverExchange((RecordedExchange)entity);
recoverExchange((RecordedExchange)entity, true);
} else if (entity instanceof RecordedQueue) {
final RecordedQueue q = (RecordedQueue) entity;
recoverQueue(q.getName(), q);
recoverQueue(q.getName(), q, true);
} else if (entity instanceof RecordedBinding) {
recoverBinding((RecordedBinding) entity);
recoverBinding((RecordedBinding) entity, true);
} else if (entity instanceof RecordedConsumer) {
final RecordedConsumer c = (RecordedConsumer) entity;
recoverConsumer(c.getConsumerTag(), c);
recoverConsumer(c.getConsumerTag(), c, true);
}
}
}
Expand Down
33 changes: 33 additions & 0 deletions src/main/java/com/rabbitmq/client/impl/recovery/BackoffPolicy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright (c) 2018 Pivotal Software, Inc. All rights reserved.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
// please see LICENSE-APACHE2.
//
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
// either express or implied. See the LICENSE file for specific language governing
// rights and limitations of this software.
//
// If you have any questions regarding licensing, please contact us at
// info@rabbitmq.com.

package com.rabbitmq.client.impl.recovery;

/**
* Backoff policy for topology recovery retry attempts.
*
* @see DefaultRetryHandler
* @see TopologyRecoveryRetryHandlerBuilder
* @since 4.8.0
*/
public interface BackoffPolicy {

/**
* Wait depending on the current attempt number (1, 2, 3, etc)
* @param attemptNumber current attempt number
* @throws InterruptedException
*/
void backoff(int attemptNumber) throws InterruptedException;
}
Loading

0 comments on commit 9e02adb

Please sign in to comment.