From 9e02adb0ab79d0003775466c1a8002cc332c4e2a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Tue, 7 Aug 2018 11:23:21 +0200 Subject: [PATCH] Add optional retry logic to topology recovery There's no topology recovery retry by default. The default implementation is composable: not all have the recoverable entities have to retry and the retry operations don't have to be only the corresponding entity recovery, but also other operations, like recovering the corresponding channel. Fixes #387 (cherry picked from commit 34e33ea80b9c71dfc0b2cb929b40e707e6e0fcac) Conflicts: src/main/java/com/rabbitmq/client/ConnectionFactory.java src/main/java/com/rabbitmq/client/impl/ConnectionParams.java src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java src/test/java/com/rabbitmq/client/test/TestUtils.java src/test/java/com/rabbitmq/client/test/functional/TopologyRecoveryFiltering.java --- .../rabbitmq/client/ConnectionFactory.java | 19 ++ .../client/impl/ConnectionParams.java | 11 + .../recovery/AutorecoveringConnection.java | 114 ++++++- .../client/impl/recovery/BackoffPolicy.java | 33 ++ .../impl/recovery/DefaultRetryHandler.java | 175 ++++++++++ .../client/impl/recovery/RetryContext.java | 99 ++++++ .../client/impl/recovery/RetryHandler.java | 62 ++++ .../client/impl/recovery/RetryResult.java | 57 ++++ .../TopologyRecoveryRetryHandlerBuilder.java | 166 ++++++++++ .../recovery/TopologyRecoveryRetryLogic.java | 109 ++++++ .../com/rabbitmq/client/test/ClientTests.java | 3 +- .../client/test/DefaultRetryHandlerTest.java | 310 ++++++++++++++++++ .../com/rabbitmq/client/test/TestUtils.java | 180 +++++++++- .../test/functional/ConnectionRecovery.java | 36 +- .../test/functional/FunctionalTests.java | 3 +- .../functional/TopologyRecoveryFiltering.java | 111 +------ .../functional/TopologyRecoveryRetry.java | 74 +++++ src/test/java/com/rabbitmq/tools/Host.java | 4 + 18 files changed, 1412 insertions(+), 154 deletions(-) create mode 100644 src/main/java/com/rabbitmq/client/impl/recovery/BackoffPolicy.java create mode 100644 src/main/java/com/rabbitmq/client/impl/recovery/DefaultRetryHandler.java create mode 100644 src/main/java/com/rabbitmq/client/impl/recovery/RetryContext.java create mode 100644 src/main/java/com/rabbitmq/client/impl/recovery/RetryHandler.java create mode 100644 src/main/java/com/rabbitmq/client/impl/recovery/RetryResult.java create mode 100644 src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryRetryHandlerBuilder.java create mode 100644 src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryRetryLogic.java create mode 100644 src/test/java/com/rabbitmq/client/test/DefaultRetryHandlerTest.java create mode 100644 src/test/java/com/rabbitmq/client/test/functional/TopologyRecoveryRetry.java diff --git a/src/main/java/com/rabbitmq/client/ConnectionFactory.java b/src/main/java/com/rabbitmq/client/ConnectionFactory.java index cbbc4338fe..8c7b524917 100644 --- a/src/main/java/com/rabbitmq/client/ConnectionFactory.java +++ b/src/main/java/com/rabbitmq/client/ConnectionFactory.java @@ -29,6 +29,7 @@ import com.rabbitmq.client.impl.nio.NioParams; import com.rabbitmq.client.impl.nio.SocketChannelFrameHandlerFactory; import com.rabbitmq.client.impl.recovery.AutorecoveringConnection; +import com.rabbitmq.client.impl.recovery.RetryHandler; import com.rabbitmq.client.impl.recovery.TopologyRecoveryFilter; import java.io.IOException; @@ -180,6 +181,13 @@ public class ConnectionFactory implements Cloneable { */ private TopologyRecoveryFilter topologyRecoveryFilter; + /** + * Retry handler for topology recovery. + * Default is no retry. + * @since 4.8.0 + */ + private RetryHandler topologyRecoveryRetryHandler; + /** @return the default host to use for connections */ public String getHost() { return host; @@ -1055,6 +1063,7 @@ public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) { result.setWorkPoolTimeout(workPoolTimeout); result.setErrorOnWriteListener(errorOnWriteListener); result.setTopologyRecoveryFilter(topologyRecoveryFilter); + result.setTopologyRecoveryRetryHandler(topologyRecoveryRetryHandler); return result; } @@ -1396,4 +1405,14 @@ public void setErrorOnWriteListener(ErrorOnWriteListener errorOnWriteListener) { public void setTopologyRecoveryFilter(TopologyRecoveryFilter topologyRecoveryFilter) { this.topologyRecoveryFilter = topologyRecoveryFilter; } + + /** + * Set retry handler for topology recovery. + * Default is no retry. + * @param topologyRecoveryRetryHandler + * @since 4.8.0 + */ + public void setTopologyRecoveryRetryHandler(RetryHandler topologyRecoveryRetryHandler) { + this.topologyRecoveryRetryHandler = topologyRecoveryRetryHandler; + } } diff --git a/src/main/java/com/rabbitmq/client/impl/ConnectionParams.java b/src/main/java/com/rabbitmq/client/impl/ConnectionParams.java index 093142b296..47cd582e8a 100644 --- a/src/main/java/com/rabbitmq/client/impl/ConnectionParams.java +++ b/src/main/java/com/rabbitmq/client/impl/ConnectionParams.java @@ -19,6 +19,8 @@ import com.rabbitmq.client.RecoveryDelayHandler; import com.rabbitmq.client.RecoveryDelayHandler.DefaultRecoveryDelayHandler; import com.rabbitmq.client.SaslConfig; +import com.rabbitmq.client.ShutdownSignalException; +import com.rabbitmq.client.impl.recovery.RetryHandler; import com.rabbitmq.client.impl.recovery.TopologyRecoveryFilter; import java.util.Map; @@ -48,6 +50,7 @@ public class ConnectionParams { private ErrorOnWriteListener errorOnWriteListener; private int workPoolTimeout = -1; private TopologyRecoveryFilter topologyRecoveryFilter; + private RetryHandler topologyRecoveryRetryHandler; private ExceptionHandler exceptionHandler; private ThreadFactory threadFactory; @@ -245,4 +248,12 @@ public void setTopologyRecoveryFilter(TopologyRecoveryFilter topologyRecoveryFil public TopologyRecoveryFilter getTopologyRecoveryFilter() { return topologyRecoveryFilter; } + + public void setTopologyRecoveryRetryHandler(RetryHandler topologyRecoveryRetryHandler) { + this.topologyRecoveryRetryHandler = topologyRecoveryRetryHandler; + } + + public RetryHandler getTopologyRecoveryRetryHandler() { + return topologyRecoveryRetryHandler; + } } diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java b/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java index 28d64747d1..71dd0886dc 100644 --- a/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java +++ b/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java @@ -90,6 +90,8 @@ public class AutorecoveringConnection implements RecoverableConnection, NetworkC // be created after application code has initiated shutdown. private final Object recoveryLock = new Object(); + private final RetryHandler retryHandler; + public AutorecoveringConnection(ConnectionParams params, FrameHandlerFactory f, List
addrs) { this(params, f, new ListAddressResolver(addrs)); } @@ -109,6 +111,8 @@ public AutorecoveringConnection(ConnectionParams params, FrameHandlerFactory f, this.topologyRecoveryFilter = params.getTopologyRecoveryFilter() == null ? letAllPassFilter() : params.getTopologyRecoveryFilter(); + + this.retryHandler = params.getTopologyRecoveryRetryHandler(); } private void setupErrorOnWriteListenerForPotentialRecovery() { @@ -633,6 +637,10 @@ private void recoverChannels(final RecoveryAwareAMQConnection newConn) { } } + void recoverChannel(AutorecoveringChannel channel) throws IOException { + channel.automaticallyRecover(this, this.delegate); + } + private void notifyRecoveryListenersComplete() { for (RecoveryListener f : Utility.copy(this.recoveryListeners)) { f.handleRecovery(this); @@ -654,16 +662,16 @@ private void recoverTopology(final ExecutorService executor) { if (executor == null) { // recover entities in serial on the main connection thread for (final RecordedExchange exchange : Utility.copy(recordedExchanges).values()) { - recoverExchange(exchange); + recoverExchange(exchange, true); } for (final Map.Entry entry : Utility.copy(recordedQueues).entrySet()) { - recoverQueue(entry.getKey(), entry.getValue()); + recoverQueue(entry.getKey(), entry.getValue(), true); } for (final RecordedBinding b : Utility.copy(recordedBindings)) { - recoverBinding(b); + recoverBinding(b, true); } for (final Map.Entry entry : Utility.copy(consumers).entrySet()) { - recoverConsumer(entry.getKey(), entry.getValue()); + recoverConsumer(entry.getKey(), entry.getValue(), true); } } else { // Support recovering entities in parallel for connections that have a lot of queues, bindings, & consumers @@ -683,11 +691,22 @@ private void recoverTopology(final ExecutorService executor) { } } - private void recoverExchange(final RecordedExchange x) { + private void recoverExchange(RecordedExchange x, boolean retry) { // recorded exchanges are guaranteed to be non-predefined (we filter out predefined ones in exchangeDeclare). MK. try { if (topologyRecoveryFilter.filterExchange(x)) { - x.recover(); + if (retry) { + final RecordedExchange entity = x; + x = (RecordedExchange) wrapRetryIfNecessary(x, new Callable() { + @Override + public Void call() throws Exception { + entity.recover(); + return null; + } + }).getRecordedEntity(); + } else { + x.recover(); + } LOGGER.debug("{} has recovered", x); } } catch (Exception cause) { @@ -698,12 +717,23 @@ private void recoverExchange(final RecordedExchange x) { } } - private void recoverQueue(final String oldName, final RecordedQueue q) { + void recoverQueue(final String oldName, RecordedQueue q, boolean retry) { try { if (topologyRecoveryFilter.filterQueue(q)) { LOGGER.debug("Recovering {}", q); - q.recover(); + if (retry) { + final RecordedQueue entity = q; + q = (RecordedQueue) wrapRetryIfNecessary(q, new Callable() { + @Override + public Void call() throws Exception { + entity.recover(); + return null; + } + }).getRecordedEntity(); + } else { + q.recover(); + } String newName = q.getName(); if (!oldName.equals(newName)) { // make sure server-named queues are re-added with @@ -734,10 +764,21 @@ private void recoverQueue(final String oldName, final RecordedQueue q) { } } - private void recoverBinding(final RecordedBinding b) { + private void recoverBinding(RecordedBinding b, boolean retry) { try { if (this.topologyRecoveryFilter.filterBinding(b)) { - b.recover(); + if (retry) { + final RecordedBinding entity = b; + b = (RecordedBinding) wrapRetryIfNecessary(b, new Callable() { + @Override + public Void call() throws Exception { + entity.recover(); + return null; + } + }).getRecordedEntity(); + } else { + b.recover(); + } LOGGER.debug("{} has recovered", b); } } catch (Exception cause) { @@ -748,11 +789,25 @@ private void recoverBinding(final RecordedBinding b) { } } - private void recoverConsumer(final String tag, final RecordedConsumer consumer) { + private void recoverConsumer(final String tag, RecordedConsumer consumer, boolean retry) { try { if (this.topologyRecoveryFilter.filterConsumer(consumer)) { LOGGER.debug("Recovering {}", consumer); - String newTag = consumer.recover(); + String newTag = null; + if (retry) { + final RecordedConsumer entity = consumer; + RetryResult retryResult = wrapRetryIfNecessary(consumer, new Callable() { + @Override + public String call() throws Exception { + return entity.recover(); + } + }); + consumer = (RecordedConsumer) retryResult.getRecordedEntity(); + newTag = (String) retryResult.getResult(); + } else { + newTag = consumer.recover(); + } + // make sure server-generated tags are re-added. MK. if(tag != null && !tag.equals(newTag)) { synchronized (this.consumers) { @@ -775,6 +830,33 @@ private void recoverConsumer(final String tag, final RecordedConsumer consumer) } } + private RetryResult wrapRetryIfNecessary(RecordedEntity entity, Callable recoveryAction) throws Exception { + if (this.retryHandler == null) { + T result = recoveryAction.call(); + return new RetryResult(entity, result); + } else { + try { + T result = recoveryAction.call(); + return new RetryResult(entity, result); + } catch (Exception e) { + RetryContext retryContext = new RetryContext(entity, e, this); + RetryResult retryResult; + if (entity instanceof RecordedQueue) { + retryResult = this.retryHandler.retryQueueRecovery(retryContext); + } else if (entity instanceof RecordedExchange) { + retryResult = this.retryHandler.retryExchangeRecovery(retryContext); + } else if (entity instanceof RecordedBinding) { + retryResult = this.retryHandler.retryBindingRecovery(retryContext); + } else if (entity instanceof RecordedConsumer) { + retryResult = this.retryHandler.retryConsumerRecovery(retryContext); + } else { + throw new IllegalArgumentException("Unknown type of recorded entity: " + entity); + } + return retryResult; + } + } + } + private void propagateQueueNameChangeToBindings(String oldName, String newName) { for (RecordedBinding b : Utility.copy(this.recordedBindings)) { if (b.getDestination().equals(oldName)) { @@ -825,15 +907,15 @@ private List> groupEntitiesByChannel public void run() { for (final E entity : entityList) { if (entity instanceof RecordedExchange) { - recoverExchange((RecordedExchange)entity); + recoverExchange((RecordedExchange)entity, true); } else if (entity instanceof RecordedQueue) { final RecordedQueue q = (RecordedQueue) entity; - recoverQueue(q.getName(), q); + recoverQueue(q.getName(), q, true); } else if (entity instanceof RecordedBinding) { - recoverBinding((RecordedBinding) entity); + recoverBinding((RecordedBinding) entity, true); } else if (entity instanceof RecordedConsumer) { final RecordedConsumer c = (RecordedConsumer) entity; - recoverConsumer(c.getConsumerTag(), c); + recoverConsumer(c.getConsumerTag(), c, true); } } } diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/BackoffPolicy.java b/src/main/java/com/rabbitmq/client/impl/recovery/BackoffPolicy.java new file mode 100644 index 0000000000..572bb6fb57 --- /dev/null +++ b/src/main/java/com/rabbitmq/client/impl/recovery/BackoffPolicy.java @@ -0,0 +1,33 @@ +// Copyright (c) 2018 Pivotal Software, Inc. All rights reserved. +// +// This software, the RabbitMQ Java client library, is triple-licensed under the +// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 +// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see +// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. + +package com.rabbitmq.client.impl.recovery; + +/** + * Backoff policy for topology recovery retry attempts. + * + * @see DefaultRetryHandler + * @see TopologyRecoveryRetryHandlerBuilder + * @since 4.8.0 + */ +public interface BackoffPolicy { + + /** + * Wait depending on the current attempt number (1, 2, 3, etc) + * @param attemptNumber current attempt number + * @throws InterruptedException + */ + void backoff(int attemptNumber) throws InterruptedException; +} diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/DefaultRetryHandler.java b/src/main/java/com/rabbitmq/client/impl/recovery/DefaultRetryHandler.java new file mode 100644 index 0000000000..a2879fbfd4 --- /dev/null +++ b/src/main/java/com/rabbitmq/client/impl/recovery/DefaultRetryHandler.java @@ -0,0 +1,175 @@ +// Copyright (c) 2018 Pivotal Software, Inc. All rights reserved. +// +// This software, the RabbitMQ Java client library, is triple-licensed under the +// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 +// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see +// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. + +package com.rabbitmq.client.impl.recovery; + +/** + * Composable topology recovery retry handler. + * This retry handler implementations let the user choose the condition + * to trigger retry and the retry operation for each type of recoverable + * entities. The number of attempts and the backoff policy (time to wait + * between retries) are also configurable. + *

+ * See also {@link TopologyRecoveryRetryHandlerBuilder} to easily create + * instances and {@link TopologyRecoveryRetryLogic} for ready-to-use + * conditions and operations. + * + * @see TopologyRecoveryRetryHandlerBuilder + * @see TopologyRecoveryRetryLogic + * @since 4.8.0 + */ +public class DefaultRetryHandler implements RetryHandler { + + private final RetryCondition queueRecoveryRetryCondition; + private final RetryCondition exchangeRecoveryRetryCondition; + private final RetryCondition bindingRecoveryRetryCondition; + private final RetryCondition consumerRecoveryRetryCondition; + + private final RetryOperation queueRecoveryRetryOperation; + private final RetryOperation exchangeRecoveryRetryOperation; + private final RetryOperation bindingRecoveryRetryOperation; + private final RetryOperation consumerRecoveryRetryOperation; + + private final int retryAttempts; + + private final BackoffPolicy backoffPolicy; + + public DefaultRetryHandler(RetryCondition queueRecoveryRetryCondition, + RetryCondition exchangeRecoveryRetryCondition, + RetryCondition bindingRecoveryRetryCondition, + RetryCondition consumerRecoveryRetryCondition, + RetryOperation queueRecoveryRetryOperation, + RetryOperation exchangeRecoveryRetryOperation, + RetryOperation bindingRecoveryRetryOperation, + RetryOperation consumerRecoveryRetryOperation, int retryAttempts, BackoffPolicy backoffPolicy) { + this.queueRecoveryRetryCondition = queueRecoveryRetryCondition; + this.exchangeRecoveryRetryCondition = exchangeRecoveryRetryCondition; + this.bindingRecoveryRetryCondition = bindingRecoveryRetryCondition; + this.consumerRecoveryRetryCondition = consumerRecoveryRetryCondition; + this.queueRecoveryRetryOperation = queueRecoveryRetryOperation; + this.exchangeRecoveryRetryOperation = exchangeRecoveryRetryOperation; + this.bindingRecoveryRetryOperation = bindingRecoveryRetryOperation; + this.consumerRecoveryRetryOperation = consumerRecoveryRetryOperation; + this.backoffPolicy = backoffPolicy; + if (retryAttempts <= 0) { + throw new IllegalArgumentException("Number of retry attempts must be greater than 0"); + } + this.retryAttempts = retryAttempts; + } + + @Override + public RetryResult retryQueueRecovery(RetryContext context) throws Exception { + return doRetry(queueRecoveryRetryCondition, queueRecoveryRetryOperation, context.queue(), context); + } + + @Override + public RetryResult retryExchangeRecovery(RetryContext context) throws Exception { + return doRetry(exchangeRecoveryRetryCondition, exchangeRecoveryRetryOperation, context.exchange(), context); + } + + @Override + public RetryResult retryBindingRecovery(RetryContext context) throws Exception { + return doRetry(bindingRecoveryRetryCondition, bindingRecoveryRetryOperation, context.binding(), context); + } + + @Override + public RetryResult retryConsumerRecovery(RetryContext context) throws Exception { + return doRetry(consumerRecoveryRetryCondition, consumerRecoveryRetryOperation, context.consumer(), context); + } + + protected RetryResult doRetry(RetryCondition condition, RetryOperation operation, T entity, RetryContext context) + throws Exception { + int attempts = 0; + Exception exception = context.exception(); + while (attempts < retryAttempts) { + if (condition.test(entity, exception)) { + backoffPolicy.backoff(attempts + 1); + try { + Object result = operation.call(context); + return new RetryResult( + entity, result == null ? null : result.toString() + ); + } catch (Exception e) { + exception = e; + attempts++; + continue; + } + } else { + throw exception; + } + } + throw context.exception(); + } + + public static abstract class RetryOperation { + + public abstract T call(RetryContext context) throws Exception; + + public RetryOperation andThen(final RetryOperation after) { + return new RetryOperation() { + + @Override + public V call(RetryContext context) throws Exception { + RetryOperation.this.call(context); + return after.call(context); + } + }; + } + } + + public static abstract class RetryCondition { + + public abstract boolean test(E entity, Exception ex); + + public RetryCondition and(final RetryCondition other) { + if (other == null) { + throw new IllegalArgumentException("Condition cannot be null"); + } + return new RetryCondition() { + + @Override + public boolean test(E entity, Exception ex) { + return RetryCondition.this.test(entity, ex) && other.test(entity, ex); + } + }; + } + + public RetryCondition negate(final RetryCondition other) { + if (other == null) { + throw new IllegalArgumentException("Condition cannot be null"); + } + return new RetryCondition() { + + @Override + public boolean test(E entity, Exception ex) { + return !RetryCondition.this.test(entity, ex); + } + }; + } + + public RetryCondition or(final RetryCondition other) { + if (other == null) { + throw new IllegalArgumentException("Condition cannot be null"); + } + return new RetryCondition() { + + @Override + public boolean test(E entity, Exception ex) { + return RetryCondition.this.test(entity, ex) || other.test(entity, ex); + } + }; + } + } +} diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/RetryContext.java b/src/main/java/com/rabbitmq/client/impl/recovery/RetryContext.java new file mode 100644 index 0000000000..25e5beb5fa --- /dev/null +++ b/src/main/java/com/rabbitmq/client/impl/recovery/RetryContext.java @@ -0,0 +1,99 @@ +// Copyright (c) 2018 Pivotal Software, Inc. All rights reserved. +// +// This software, the RabbitMQ Java client library, is triple-licensed under the +// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 +// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see +// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. + +package com.rabbitmq.client.impl.recovery; + +/** + * The context of a topology recovery retry operation. + * + * @since 4.8.0 + */ +public class RetryContext { + + private final RecordedEntity entity; + + private final Exception exception; + + private final AutorecoveringConnection connection; + + public RetryContext(RecordedEntity entity, Exception exception, AutorecoveringConnection connection) { + this.entity = entity; + this.exception = exception; + this.connection = connection; + } + + /** + * The underlying connection. + * + * @return + */ + public AutorecoveringConnection connection() { + return connection; + } + + /** + * The exception that triggered the retry attempt. + * + * @return + */ + public Exception exception() { + return exception; + } + + /** + * The to-be-recovered entity. + * + * @return + */ + public RecordedEntity entity() { + return entity; + } + + /** + * The to-be-recovered entity as a queue. + * + * @return + */ + public RecordedQueue queue() { + return (RecordedQueue) entity; + } + + /** + * The to-be-recovered entity as an exchange. + * + * @return + */ + public RecordedExchange exchange() { + return (RecordedExchange) entity; + } + + /** + * The to-be-recovered entity as a binding. + * + * @return + */ + public RecordedBinding binding() { + return (RecordedBinding) entity; + } + + /** + * The to-be-recovered entity as a consumer. + * + * @return + */ + public RecordedConsumer consumer() { + return (RecordedConsumer) entity; + } +} diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/RetryHandler.java b/src/main/java/com/rabbitmq/client/impl/recovery/RetryHandler.java new file mode 100644 index 0000000000..5eca22465d --- /dev/null +++ b/src/main/java/com/rabbitmq/client/impl/recovery/RetryHandler.java @@ -0,0 +1,62 @@ +// Copyright (c) 2018 Pivotal Software, Inc. All rights reserved. +// +// This software, the RabbitMQ Java client library, is triple-licensed under the +// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 +// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see +// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. + +package com.rabbitmq.client.impl.recovery; + +/** + * Contract to retry failed operations during topology recovery. + * Not all operations have to be retried, it's a decision of the + * underlying implementation. + * + * @since 4.8.0 + */ +public interface RetryHandler { + + /** + * Retry a failed queue recovery operation. + * + * @param context the context of the retry + * @return the result of the retry attempt + * @throws Exception if the retry fails + */ + RetryResult retryQueueRecovery(RetryContext context) throws Exception; + + /** + * Retry a failed exchange recovery operation. + * + * @param context the context of the retry + * @return the result of the retry attempt + * @throws Exception if the retry fails + */ + RetryResult retryExchangeRecovery(RetryContext context) throws Exception; + + /** + * Retry a failed binding recovery operation. + * + * @param context the context of the retry + * @return the result of the retry attempt + * @throws Exception if the retry fails + */ + RetryResult retryBindingRecovery(RetryContext context) throws Exception; + + /** + * Retry a failed consumer recovery operation. + * + * @param context the context of the retry + * @return the result of the retry attempt + * @throws Exception if the retry fails + */ + RetryResult retryConsumerRecovery(RetryContext context) throws Exception; +} diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/RetryResult.java b/src/main/java/com/rabbitmq/client/impl/recovery/RetryResult.java new file mode 100644 index 0000000000..5b0beb2baa --- /dev/null +++ b/src/main/java/com/rabbitmq/client/impl/recovery/RetryResult.java @@ -0,0 +1,57 @@ +// Copyright (c) 2018 Pivotal Software, Inc. All rights reserved. +// +// This software, the RabbitMQ Java client library, is triple-licensed under the +// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 +// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see +// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. + +package com.rabbitmq.client.impl.recovery; + +/** + * The retry of a retried topology recovery operation. + * + * @since 4.8.0 + */ +public class RetryResult { + + /** + * The entity to recover. + */ + private final RecordedEntity recordedEntity; + + /** + * The result of the recovery operation. + * E.g. a consumer tag when recovering a consumer. + */ + private final Object result; + + public RetryResult(RecordedEntity recordedEntity, Object result) { + this.recordedEntity = recordedEntity; + this.result = result; + } + + /** + * The entity to recover. + * + * @return + */ + public RecordedEntity getRecordedEntity() { + return recordedEntity; + } + + /** + * The result of the recovery operation. + * E.g. a consumer tag when recovering a consumer. + */ + public Object getResult() { + return result; + } +} diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryRetryHandlerBuilder.java b/src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryRetryHandlerBuilder.java new file mode 100644 index 0000000000..767178300a --- /dev/null +++ b/src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryRetryHandlerBuilder.java @@ -0,0 +1,166 @@ +// Copyright (c) 2018 Pivotal Software, Inc. All rights reserved. +// +// This software, the RabbitMQ Java client library, is triple-licensed under the +// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 +// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see +// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. + +package com.rabbitmq.client.impl.recovery; + +/** + * Builder to ease creation of {@link DefaultRetryHandler} instances. + *

+ * Just override what you need. By default, retry conditions don't trigger retry, + * retry operations are no-op, the number of retry attempts is 1, and the backoff + * policy doesn't wait at all. + * + * @see DefaultRetryHandler + * @see TopologyRecoveryRetryLogic + * @since 4.8.0 + */ +public class TopologyRecoveryRetryHandlerBuilder { + + private DefaultRetryHandler.RetryCondition queueRecoveryRetryCondition = new DefaultRetryHandler.RetryCondition() { + + @Override + public boolean test(RecordedQueue entity, Exception ex) { + return false; + } + }; + private DefaultRetryHandler.RetryCondition exchangeRecoveryRetryCondition = new DefaultRetryHandler.RetryCondition() { + + @Override + public boolean test(RecordedExchange entity, Exception ex) { + return false; + } + }; + private DefaultRetryHandler.RetryCondition bindingRecoveryRetryCondition = new DefaultRetryHandler.RetryCondition() { + + @Override + public boolean test(RecordedBinding entity, Exception ex) { + return false; + } + }; + private DefaultRetryHandler.RetryCondition consumerRecoveryRetryCondition = new DefaultRetryHandler.RetryCondition() { + + @Override + public boolean test(RecordedConsumer entity, Exception ex) { + return false; + } + }; + + private DefaultRetryHandler.RetryOperation queueRecoveryRetryOperation = new DefaultRetryHandler.RetryOperation() { + + @Override + public Object call(RetryContext context) { + return null; + } + }; + private DefaultRetryHandler.RetryOperation exchangeRecoveryRetryOperation = new DefaultRetryHandler.RetryOperation() { + + @Override + public Object call(RetryContext context) { + return null; + } + }; + private DefaultRetryHandler.RetryOperation bindingRecoveryRetryOperation = new DefaultRetryHandler.RetryOperation() { + + @Override + public Object call(RetryContext context) { + return null; + } + }; + private DefaultRetryHandler.RetryOperation consumerRecoveryRetryOperation = new DefaultRetryHandler.RetryOperation() { + + @Override + public Object call(RetryContext context) { + return null; + } + }; + + private int retryAttempts = 2; + + private BackoffPolicy backoffPolicy = new BackoffPolicy() { + + @Override + public void backoff(int attemptNumber) { + + } + }; + + public static TopologyRecoveryRetryHandlerBuilder builder() { + return new TopologyRecoveryRetryHandlerBuilder(); + } + + public TopologyRecoveryRetryHandlerBuilder queueRecoveryRetryCondition( + DefaultRetryHandler.RetryCondition queueRecoveryRetryCondition) { + this.queueRecoveryRetryCondition = queueRecoveryRetryCondition; + return this; + } + + public TopologyRecoveryRetryHandlerBuilder exchangeRecoveryRetryCondition( + DefaultRetryHandler.RetryCondition exchangeRecoveryRetryCondition) { + this.exchangeRecoveryRetryCondition = exchangeRecoveryRetryCondition; + return this; + } + + public TopologyRecoveryRetryHandlerBuilder bindingRecoveryRetryCondition( + DefaultRetryHandler.RetryCondition bindingRecoveryRetryCondition) { + this.bindingRecoveryRetryCondition = bindingRecoveryRetryCondition; + return this; + } + + public TopologyRecoveryRetryHandlerBuilder consumerRecoveryRetryCondition( + DefaultRetryHandler.RetryCondition consumerRecoveryRetryCondition) { + this.consumerRecoveryRetryCondition = consumerRecoveryRetryCondition; + return this; + } + + public TopologyRecoveryRetryHandlerBuilder queueRecoveryRetryOperation(DefaultRetryHandler.RetryOperation queueRecoveryRetryOperation) { + this.queueRecoveryRetryOperation = queueRecoveryRetryOperation; + return this; + } + + public TopologyRecoveryRetryHandlerBuilder exchangeRecoveryRetryOperation(DefaultRetryHandler.RetryOperation exchangeRecoveryRetryOperation) { + this.exchangeRecoveryRetryOperation = exchangeRecoveryRetryOperation; + return this; + } + + public TopologyRecoveryRetryHandlerBuilder bindingRecoveryRetryOperation(DefaultRetryHandler.RetryOperation bindingRecoveryRetryOperation) { + this.bindingRecoveryRetryOperation = bindingRecoveryRetryOperation; + return this; + } + + public TopologyRecoveryRetryHandlerBuilder consumerRecoveryRetryOperation(DefaultRetryHandler.RetryOperation consumerRecoveryRetryOperation) { + this.consumerRecoveryRetryOperation = consumerRecoveryRetryOperation; + return this; + } + + public TopologyRecoveryRetryHandlerBuilder backoffPolicy(BackoffPolicy backoffPolicy) { + this.backoffPolicy = backoffPolicy; + return this; + } + + public TopologyRecoveryRetryHandlerBuilder retryAttempts(int retryAttempts) { + this.retryAttempts = retryAttempts; + return this; + } + + public RetryHandler build() { + return new DefaultRetryHandler( + queueRecoveryRetryCondition, exchangeRecoveryRetryCondition, + bindingRecoveryRetryCondition, consumerRecoveryRetryCondition, + queueRecoveryRetryOperation, exchangeRecoveryRetryOperation, + bindingRecoveryRetryOperation, consumerRecoveryRetryOperation, + retryAttempts, + backoffPolicy); + } +} diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryRetryLogic.java b/src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryRetryLogic.java new file mode 100644 index 0000000000..dc83a24301 --- /dev/null +++ b/src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryRetryLogic.java @@ -0,0 +1,109 @@ +// Copyright (c) 2018 Pivotal Software, Inc. All rights reserved. +// +// This software, the RabbitMQ Java client library, is triple-licensed under the +// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 +// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see +// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. + +package com.rabbitmq.client.impl.recovery; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.ShutdownSignalException; + +/** + * Useful ready-to-use conditions and operations for {@link DefaultRetryHandler}. + * They're composed and used with the {@link TopologyRecoveryRetryHandlerBuilder}. + * + * @see DefaultRetryHandler + * @see RetryHandler + * @see TopologyRecoveryRetryHandlerBuilder + * @since 4.8.0 + */ +public abstract class TopologyRecoveryRetryLogic { + + public static final DefaultRetryHandler.RetryCondition CHANNEL_CLOSED_NOT_FOUND = new DefaultRetryHandler.RetryCondition() { + + @Override + public boolean test(RecordedEntity entity, Exception e) { + if (e.getCause() instanceof ShutdownSignalException) { + ShutdownSignalException cause = (ShutdownSignalException) e.getCause(); + if (cause.getReason() instanceof AMQP.Channel.Close) { + return ((AMQP.Channel.Close) cause.getReason()).getReplyCode() == 404; + } + } + return false; + } + }; + + public static final DefaultRetryHandler.RetryOperation RECOVER_CHANNEL = new DefaultRetryHandler.RetryOperation() { + + @Override + public Void call(RetryContext context) throws Exception { + if (!context.entity().getChannel().isOpen()) { + context.connection().recoverChannel(context.entity().getChannel()); + } + return null; + } + }; + + public static final DefaultRetryHandler.RetryOperation RECOVER_BINDING_QUEUE = new DefaultRetryHandler.RetryOperation() { + + @Override + public Void call(RetryContext context) { + if (context.entity() instanceof RecordedQueueBinding) { + RecordedBinding binding = context.binding(); + AutorecoveringConnection connection = context.connection(); + RecordedQueue recordedQueue = connection.getRecordedQueues().get(binding.getDestination()); + if (recordedQueue != null) { + connection.recoverQueue( + recordedQueue.getName(), recordedQueue, false + ); + } + } + return null; + } + }; + + public static final DefaultRetryHandler.RetryOperation RECOVER_BINDING = new DefaultRetryHandler.RetryOperation() { + + @Override + public Void call(RetryContext context) throws Exception { + context.binding().recover(); + return null; + } + }; + + public static final DefaultRetryHandler.RetryOperation RECOVER_CONSUMER_QUEUE = new DefaultRetryHandler.RetryOperation() { + + @Override + public Void call(RetryContext context) { + if (context.entity() instanceof RecordedConsumer) { + RecordedConsumer consumer = context.consumer(); + AutorecoveringConnection connection = context.connection(); + RecordedQueue recordedQueue = connection.getRecordedQueues().get(consumer.getQueue()); + if (recordedQueue != null) { + connection.recoverQueue( + recordedQueue.getName(), recordedQueue, false + ); + } + } + return null; + } + }; + + public static final DefaultRetryHandler.RetryOperation RECOVER_CONSUMER = new DefaultRetryHandler.RetryOperation() { + + @Override + public String call(RetryContext context) throws Exception { + return context.consumer().recover(); + } + }; +} diff --git a/src/test/java/com/rabbitmq/client/test/ClientTests.java b/src/test/java/com/rabbitmq/client/test/ClientTests.java index c63145f445..cfd050228f 100644 --- a/src/test/java/com/rabbitmq/client/test/ClientTests.java +++ b/src/test/java/com/rabbitmq/client/test/ClientTests.java @@ -58,7 +58,8 @@ StrictExceptionHandlerTest.class, NoAutoRecoveryWhenTcpWindowIsFullTest.class, JsonRpcTest.class, - AddressTest.class + AddressTest.class, + DefaultRetryHandlerTest.class }) public class ClientTests { diff --git a/src/test/java/com/rabbitmq/client/test/DefaultRetryHandlerTest.java b/src/test/java/com/rabbitmq/client/test/DefaultRetryHandlerTest.java new file mode 100644 index 0000000000..60745670cf --- /dev/null +++ b/src/test/java/com/rabbitmq/client/test/DefaultRetryHandlerTest.java @@ -0,0 +1,310 @@ +// Copyright (c) 2018 Pivotal Software, Inc. All rights reserved. +// +// This software, the RabbitMQ Java client library, is triple-licensed under the +// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 +// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see +// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. + +package com.rabbitmq.client.test; + +import com.rabbitmq.client.impl.recovery.BackoffPolicy; +import com.rabbitmq.client.impl.recovery.DefaultRetryHandler; +import com.rabbitmq.client.impl.recovery.RecordedBinding; +import com.rabbitmq.client.impl.recovery.RecordedConsumer; +import com.rabbitmq.client.impl.recovery.RecordedExchange; +import com.rabbitmq.client.impl.recovery.RecordedQueue; +import com.rabbitmq.client.impl.recovery.RetryContext; +import com.rabbitmq.client.impl.recovery.RetryHandler; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentMatcher; +import org.mockito.Mock; +import org.mockito.verification.VerificationMode; + +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.intThat; +import static org.mockito.ArgumentMatchers.nullable; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.mockito.MockitoAnnotations.initMocks; + +/** + * + */ +public class DefaultRetryHandlerTest { + + RetryHandler handler; + + @Mock + DefaultRetryHandler.RetryCondition queueRecoveryRetryCondition; + @Mock + DefaultRetryHandler.RetryCondition exchangeRecoveryRetryCondition; + @Mock + DefaultRetryHandler.RetryCondition bindingRecoveryRetryCondition; + @Mock + DefaultRetryHandler.RetryCondition consumerRecoveryRetryCondition; + + @Mock + DefaultRetryHandler.RetryOperation queueRecoveryRetryOperation; + @Mock + DefaultRetryHandler.RetryOperation exchangeRecoveryRetryOperation; + @Mock + DefaultRetryHandler.RetryOperation bindingRecoveryRetryOperation; + @Mock + DefaultRetryHandler.RetryOperation consumerRecoveryRetryOperation; + + @Mock + BackoffPolicy backoffPolicy; + + @Before + public void init() { + initMocks(this); + } + + @Test + public void shouldNotRetryWhenConditionReturnsFalse() throws Exception { + conditionsReturn(false); + handler = handler(); + assertExceptionIsThrown( + "No retry, initial exception should have been re-thrown", + new Callable() { + + @Override + public Object call() throws Exception { + return handler.retryQueueRecovery(DefaultRetryHandlerTest.this.retryContext()); + } + } + ); + assertExceptionIsThrown( + "No retry, initial exception should have been re-thrown", + new Callable() { + + @Override + public Object call() throws Exception { + return handler.retryExchangeRecovery(DefaultRetryHandlerTest.this.retryContext()); + } + } + ); + assertExceptionIsThrown( + "No retry, initial exception should have been re-thrown", + new Callable() { + + @Override + public Object call() throws Exception { + return handler.retryBindingRecovery(DefaultRetryHandlerTest.this.retryContext()); + } + } + ); + assertExceptionIsThrown( + "No retry, initial exception should have been re-thrown", + new Callable() { + + @Override + public Object call() throws Exception { + return handler.retryConsumerRecovery(DefaultRetryHandlerTest.this.retryContext()); + } + } + ); + verifyConditionsInvocation(times(1)); + verifyOperationsInvocation(never()); + verify(backoffPolicy, never()).backoff(anyInt()); + } + + @Test + public void shouldReturnOperationResultInRetryResultWhenRetrying() throws Exception { + conditionsReturn(true); + when(queueRecoveryRetryOperation.call(any(RetryContext.class))).thenReturn("queue"); + when(exchangeRecoveryRetryOperation.call(any(RetryContext.class))).thenReturn("exchange"); + when(bindingRecoveryRetryOperation.call(any(RetryContext.class))).thenReturn("binding"); + when(consumerRecoveryRetryOperation.call(any(RetryContext.class))).thenReturn("consumer"); + handler = handler(); + assertEquals( + "queue", + handler.retryQueueRecovery(retryContext()).getResult() + ); + assertEquals( + "exchange", + handler.retryExchangeRecovery(retryContext()).getResult() + ); + assertEquals( + "binding", + handler.retryBindingRecovery(retryContext()).getResult() + ); + assertEquals( + "consumer", + handler.retryConsumerRecovery(retryContext()).getResult() + ); + verifyConditionsInvocation(times(1)); + verifyOperationsInvocation(times(1)); + verify(backoffPolicy, times(1 * 4)).backoff(1); + } + + @Test + public void shouldRetryWhenOperationFailsAndConditionIsTrue() throws Exception { + conditionsReturn(true); + when(queueRecoveryRetryOperation.call(any(RetryContext.class))) + .thenThrow(new Exception()).thenReturn("queue"); + when(exchangeRecoveryRetryOperation.call(any(RetryContext.class))) + .thenThrow(new Exception()).thenReturn("exchange"); + when(bindingRecoveryRetryOperation.call(any(RetryContext.class))) + .thenThrow(new Exception()).thenReturn("binding"); + when(consumerRecoveryRetryOperation.call(any(RetryContext.class))) + .thenThrow(new Exception()).thenReturn("consumer"); + handler = handler(2); + assertEquals( + "queue", + handler.retryQueueRecovery(retryContext()).getResult() + ); + assertEquals( + "exchange", + handler.retryExchangeRecovery(retryContext()).getResult() + ); + assertEquals( + "binding", + handler.retryBindingRecovery(retryContext()).getResult() + ); + assertEquals( + "consumer", + handler.retryConsumerRecovery(retryContext()).getResult() + ); + verifyConditionsInvocation(times(2)); + verifyOperationsInvocation(times(2)); + checkBackoffSequence(1, 2, 1, 2, 1, 2, 1, 2); + } + + @Test + public void shouldThrowExceptionWhenRetryAttemptsIsExceeded() throws Exception { + conditionsReturn(true); + when(queueRecoveryRetryOperation.call(any(RetryContext.class))) + .thenThrow(new Exception()); + when(exchangeRecoveryRetryOperation.call(any(RetryContext.class))) + .thenThrow(new Exception()); + when(bindingRecoveryRetryOperation.call(any(RetryContext.class))) + .thenThrow(new Exception()); + when(consumerRecoveryRetryOperation.call(any(RetryContext.class))) + .thenThrow(new Exception()); + handler = handler(3); + assertExceptionIsThrown( + "Retry exhausted, an exception should have been thrown", + new Callable() { + + @Override + public Object call() throws Exception { + return handler.retryQueueRecovery(DefaultRetryHandlerTest.this.retryContext()); + } + } + ); + assertExceptionIsThrown( + "Retry exhausted, an exception should have been thrown", + new Callable() { + + @Override + public Object call() throws Exception { + return handler.retryExchangeRecovery(DefaultRetryHandlerTest.this.retryContext()); + } + } + ); + assertExceptionIsThrown( + "Retry exhausted, an exception should have been thrown", + new Callable() { + + @Override + public Object call() throws Exception { + return handler.retryBindingRecovery(DefaultRetryHandlerTest.this.retryContext()); + } + } + ); + assertExceptionIsThrown( + "Retry exhausted, an exception should have been thrown", + new Callable() { + + @Override + public Object call() throws Exception { + return handler.retryConsumerRecovery(DefaultRetryHandlerTest.this.retryContext()); + } + } + ); + verifyConditionsInvocation(times(3)); + verifyOperationsInvocation(times(3)); + checkBackoffSequence(1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3); + } + + private void assertExceptionIsThrown(String message, Callable action) { + try { + action.call(); + fail(message); + } catch (Exception e) { + } + } + + private void conditionsReturn(boolean shouldRetry) { + when(queueRecoveryRetryCondition.test(nullable(RecordedQueue.class), nullable(Exception.class))) + .thenReturn(shouldRetry); + when(exchangeRecoveryRetryCondition.test(nullable(RecordedExchange.class), nullable(Exception.class))) + .thenReturn(shouldRetry); + when(bindingRecoveryRetryCondition.test(nullable(RecordedBinding.class), nullable(Exception.class))) + .thenReturn(shouldRetry); + when(consumerRecoveryRetryCondition.test(nullable(RecordedConsumer.class), nullable(Exception.class))) + .thenReturn(shouldRetry); + } + + private void verifyConditionsInvocation(VerificationMode mode) { + verify(queueRecoveryRetryCondition, mode).test(nullable(RecordedQueue.class), any(Exception.class)); + verify(exchangeRecoveryRetryCondition, mode).test(nullable(RecordedExchange.class), any(Exception.class)); + verify(bindingRecoveryRetryCondition, mode).test(nullable(RecordedBinding.class), any(Exception.class)); + verify(consumerRecoveryRetryCondition, mode).test(nullable(RecordedConsumer.class), any(Exception.class)); + } + + private void verifyOperationsInvocation(VerificationMode mode) throws Exception { + verify(queueRecoveryRetryOperation, mode).call(any(RetryContext.class)); + verify(exchangeRecoveryRetryOperation, mode).call(any(RetryContext.class)); + verify(bindingRecoveryRetryOperation, mode).call(any(RetryContext.class)); + verify(consumerRecoveryRetryOperation, mode).call(any(RetryContext.class)); + } + + private RetryHandler handler() { + return handler(1); + } + + private RetryHandler handler(int retryAttempts) { + return new DefaultRetryHandler( + queueRecoveryRetryCondition, exchangeRecoveryRetryCondition, + bindingRecoveryRetryCondition, consumerRecoveryRetryCondition, + queueRecoveryRetryOperation, exchangeRecoveryRetryOperation, + bindingRecoveryRetryOperation, consumerRecoveryRetryOperation, + retryAttempts, + backoffPolicy); + } + + private RetryContext retryContext() { + return new RetryContext(null, new Exception(), null); + } + + private void checkBackoffSequence(final int... sequence) throws InterruptedException { + final AtomicInteger count = new AtomicInteger(0); + verify(backoffPolicy, times(sequence.length)) + // for some reason Mockito calls the matchers twice as many times as the target method + .backoff(intThat(new ArgumentMatcher() { + @Override + public boolean matches(Integer i) { + return i == sequence[count.getAndIncrement() % sequence.length]; + } + })); + } +} diff --git a/src/test/java/com/rabbitmq/client/test/TestUtils.java b/src/test/java/com/rabbitmq/client/test/TestUtils.java index 62159b1ecd..32e4d59707 100644 --- a/src/test/java/com/rabbitmq/client/test/TestUtils.java +++ b/src/test/java/com/rabbitmq/client/test/TestUtils.java @@ -15,7 +15,28 @@ package com.rabbitmq.client.test; +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.DefaultConsumer; +import com.rabbitmq.client.Envelope; +import com.rabbitmq.client.Recoverable; +import com.rabbitmq.client.RecoverableConnection; +import com.rabbitmq.client.RecoveryListener; +import com.rabbitmq.client.ShutdownSignalException; +import com.rabbitmq.client.impl.NetworkConnection; +import com.rabbitmq.client.impl.recovery.AutorecoveringConnection; +import com.rabbitmq.tools.Host; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.junit.Assert.assertTrue; public class TestUtils { @@ -23,7 +44,7 @@ public class TestUtils { public static ConnectionFactory connectionFactory() { ConnectionFactory connectionFactory = new ConnectionFactory(); - if(USE_NIO) { + if (USE_NIO) { connectionFactory.useNio(); } else { connectionFactory.useBlockingIo(); @@ -31,4 +52,161 @@ public static ConnectionFactory connectionFactory() { return connectionFactory; } + public static void close(Connection connection) { + if (connection != null) { + try { + connection.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + + public static boolean isVersion37orLater(Connection connection) { + String currentVersion = null; + try { + currentVersion = connection.getServerProperties().get("version").toString(); + // versions built from source: 3.7.0+rc.1.4.gedc5d96 + if (currentVersion.contains("+")) { + currentVersion = currentVersion.substring(0, currentVersion.indexOf("+")); + } + // alpha (snapshot) versions: 3.7.0~alpha.449-1 + if (currentVersion.contains("~")) { + currentVersion = currentVersion.substring(0, currentVersion.indexOf("~")); + } + // alpha (snapshot) versions: 3.7.1-alpha.40 + if (currentVersion.contains("-")) { + currentVersion = currentVersion.substring(0, currentVersion.indexOf("-")); + } + return "0.0.0".equals(currentVersion) || versionCompare(currentVersion, "3.7.0") >= 0; + } catch (RuntimeException e) { + LoggerFactory.getLogger(TestUtils.class).warn("Unable to parse broker version {}", currentVersion, e); + throw e; + } + } + + public static boolean sendAndConsumeMessage(String exchange, String routingKey, String queue, Connection c) + throws IOException, TimeoutException, InterruptedException { + Channel ch = c.createChannel(); + try { + ch.confirmSelect(); + final CountDownLatch latch = new CountDownLatch(1); + ch.basicConsume(queue, true, new DefaultConsumer(ch) { + + @Override + public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { + latch.countDown(); + } + }); + ch.basicPublish(exchange, routingKey, null, "".getBytes()); + ch.waitForConfirmsOrDie(5000); + return latch.await(5, TimeUnit.SECONDS); + } finally { + if (ch != null && ch.isOpen()) { + ch.close(); + } + } + } + + public static boolean resourceExists(Callable callback) throws Exception { + Channel declarePassiveChannel = null; + try { + declarePassiveChannel = callback.call(); + return true; + } catch (IOException e) { + if (e.getCause() instanceof ShutdownSignalException) { + ShutdownSignalException cause = (ShutdownSignalException) e.getCause(); + if (cause.getReason() instanceof AMQP.Channel.Close) { + if (((AMQP.Channel.Close) cause.getReason()).getReplyCode() == 404) { + return false; + } else { + throw e; + } + } + return false; + } else { + throw e; + } + } finally { + if (declarePassiveChannel != null && declarePassiveChannel.isOpen()) { + declarePassiveChannel.close(); + } + } + } + + public static boolean queueExists(final String queue, final Connection connection) throws Exception { + return resourceExists(new Callable() { + @Override + public Channel call() throws Exception { + Channel channel = connection.createChannel(); + channel.queueDeclarePassive(queue); + return channel; + } + }); + } + + public static boolean exchangeExists(final String exchange, final Connection connection) throws Exception { + return resourceExists(new Callable() { + @Override + public Channel call() throws Exception { + Channel channel = connection.createChannel(); + channel.exchangeDeclarePassive(exchange); + return channel; + } + }); + } + + public static void closeAndWaitForRecovery(RecoverableConnection connection) throws IOException, InterruptedException { + CountDownLatch latch = prepareForRecovery(connection); + Host.closeConnection((NetworkConnection) connection); + wait(latch); + } + + public static void closeAllConnectionsAndWaitForRecovery(Connection connection) throws IOException, InterruptedException { + CountDownLatch latch = prepareForRecovery(connection); + Host.closeAllConnections(); + wait(latch); + } + + public static CountDownLatch prepareForRecovery(Connection conn) { + final CountDownLatch latch = new CountDownLatch(1); + ((AutorecoveringConnection) conn).addRecoveryListener(new RecoveryListener() { + + @Override + public void handleRecovery(Recoverable recoverable) { + latch.countDown(); + } + + @Override + public void handleRecoveryStarted(Recoverable recoverable) { + // No-op + } + }); + return latch; + } + + private static void wait(CountDownLatch latch) throws InterruptedException { + assertTrue(latch.await(90, TimeUnit.SECONDS)); + } + + /** + * http://stackoverflow.com/questions/6701948/efficient-way-to-compare-version-strings-in-java + */ + static int versionCompare(String str1, String str2) { + String[] vals1 = str1.split("\\."); + String[] vals2 = str2.split("\\."); + int i = 0; + // set index to first non-equal ordinal or length of shortest version string + while (i < vals1.length && i < vals2.length && vals1[i].equals(vals2[i])) { + i++; + } + // compare first non-equal ordinal number + if (i < vals1.length && i < vals2.length) { + int diff = Integer.valueOf(vals1[i]).compareTo(Integer.valueOf(vals2[i])); + return Integer.signum(diff); + } + // the strings are equal or one string is a substring of the other + // e.g. "1.2.3" = "1.2.3" or "1.2.3" < "1.2.3.4" + return Integer.signum(vals1.length - vals2.length); + } } diff --git a/src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java b/src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java index a89bb16995..04cf8b04d2 100644 --- a/src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java +++ b/src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java @@ -39,6 +39,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import static com.rabbitmq.client.test.TestUtils.prepareForRecovery; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; import static org.junit.Assert.*; @@ -62,7 +63,7 @@ public class ConnectionRecovery extends BrokerTestCase { try { assertTrue(c.isOpen()); assertEquals(connectionName, c.getClientProvidedName()); - closeAndWaitForRecovery(c); + TestUtils.closeAndWaitForRecovery(c); assertTrue(c.isOpen()); assertEquals(connectionName, c.getClientProvidedName()); } finally { @@ -82,7 +83,7 @@ public class ConnectionRecovery extends BrokerTestCase { RecoverableConnection c = newRecoveringConnection(addresses); try { assertTrue(c.isOpen()); - closeAndWaitForRecovery(c); + TestUtils.closeAndWaitForRecovery(c); assertTrue(c.isOpen()); } finally { c.abort(); @@ -98,7 +99,7 @@ public class ConnectionRecovery extends BrokerTestCase { RecoverableConnection c = newRecoveringConnection(addresses); try { assertTrue(c.isOpen()); - closeAndWaitForRecovery(c); + TestUtils.closeAndWaitForRecovery(c); assertTrue(c.isOpen()); } finally { c.abort(); @@ -157,7 +158,7 @@ public String getPassword() { assertThat(usernameRequested.get(), is(1)); assertThat(passwordRequested.get(), is(1)); - closeAndWaitForRecovery(c); + TestUtils.closeAndWaitForRecovery(c); assertTrue(c.isOpen()); // username is requested in AMQConnection#toString, so it can be accessed at any time assertThat(usernameRequested.get(), greaterThanOrEqualTo(2)); @@ -804,7 +805,7 @@ public void handleDelivery(String consumerTag, Connection testConnection = connectionFactory.newConnection(); try { assertTrue(testConnection.isOpen()); - closeAndWaitForRecovery((RecoverableConnection) testConnection); + TestUtils.closeAndWaitForRecovery((RecoverableConnection) testConnection); assertTrue(testConnection.isOpen()); } finally { connection.close(); @@ -851,7 +852,7 @@ public void handleDelivery(String consumerTag, Envelope envelope, BasicPropertie } } // now do recovery - closeAndWaitForRecovery(testConnection); + TestUtils.closeAndWaitForRecovery(testConnection); // verify channels & topology recovered by publishing a message to each for (int i=0; i < channelCount; i++) { @@ -935,21 +936,6 @@ private static void expectExchangeRecovery(Channel ch, String x) throws IOExcept ch.exchangeDeclarePassive(x); } - private static CountDownLatch prepareForRecovery(Connection conn) { - final CountDownLatch latch = new CountDownLatch(1); - ((AutorecoveringConnection)conn).addRecoveryListener(new RecoveryListener() { - @Override - public void handleRecovery(Recoverable recoverable) { - latch.countDown(); - } - @Override - public void handleRecoveryStarted(Recoverable recoverable) { - // No-op - } - }); - return latch; - } - private static CountDownLatch prepareForShutdown(Connection conn) { final CountDownLatch latch = new CountDownLatch(1); conn.addShutdownListener(new ShutdownListener() { @@ -962,13 +948,7 @@ public void shutdownCompleted(ShutdownSignalException cause) { } private void closeAndWaitForRecovery() throws IOException, InterruptedException { - closeAndWaitForRecovery((AutorecoveringConnection)this.connection); - } - - private static void closeAndWaitForRecovery(RecoverableConnection connection) throws IOException, InterruptedException { - CountDownLatch latch = prepareForRecovery(connection); - Host.closeConnection((NetworkConnection) connection); - wait(latch); + TestUtils.closeAndWaitForRecovery((AutorecoveringConnection)this.connection); } private void restartPrimaryAndWaitForRecovery() throws IOException, InterruptedException { diff --git a/src/test/java/com/rabbitmq/client/test/functional/FunctionalTests.java b/src/test/java/com/rabbitmq/client/test/functional/FunctionalTests.java index aeadb303dd..fb48f29585 100644 --- a/src/test/java/com/rabbitmq/client/test/functional/FunctionalTests.java +++ b/src/test/java/com/rabbitmq/client/test/functional/FunctionalTests.java @@ -78,7 +78,8 @@ Nack.class, ExceptionMessages.class, Metrics.class, - TopologyRecoveryFiltering.class + TopologyRecoveryFiltering.class, + TopologyRecoveryRetry.class }) public class FunctionalTests { diff --git a/src/test/java/com/rabbitmq/client/test/functional/TopologyRecoveryFiltering.java b/src/test/java/com/rabbitmq/client/test/functional/TopologyRecoveryFiltering.java index 803fcf3cc9..3eb9687eea 100644 --- a/src/test/java/com/rabbitmq/client/test/functional/TopologyRecoveryFiltering.java +++ b/src/test/java/com/rabbitmq/client/test/functional/TopologyRecoveryFiltering.java @@ -21,12 +21,7 @@ import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; -import com.rabbitmq.client.Recoverable; import com.rabbitmq.client.RecoverableConnection; -import com.rabbitmq.client.RecoveryListener; -import com.rabbitmq.client.ShutdownSignalException; -import com.rabbitmq.client.impl.NetworkConnection; -import com.rabbitmq.client.impl.recovery.AutorecoveringConnection; import com.rabbitmq.client.impl.recovery.RecordedBinding; import com.rabbitmq.client.impl.recovery.RecordedConsumer; import com.rabbitmq.client.impl.recovery.RecordedExchange; @@ -34,16 +29,18 @@ import com.rabbitmq.client.impl.recovery.TopologyRecoveryFilter; import com.rabbitmq.client.test.BrokerTestCase; import com.rabbitmq.client.test.TestUtils; -import com.rabbitmq.tools.Host; import org.junit.Test; import java.io.IOException; -import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import static com.rabbitmq.client.test.TestUtils.closeAndWaitForRecovery; +import static com.rabbitmq.client.test.TestUtils.exchangeExists; +import static com.rabbitmq.client.test.TestUtils.queueExists; +import static com.rabbitmq.client.test.TestUtils.sendAndConsumeMessage; import static org.awaitility.Awaitility.waitAtMost; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertFalse; @@ -62,106 +59,6 @@ public class TopologyRecoveryFiltering extends BrokerTestCase { }; Connection c; - private static boolean sendAndConsumeMessage(String exchange, String routingKey, String queue, Connection c) - throws IOException, TimeoutException, InterruptedException { - Channel ch = c.createChannel(); - try { - ch.confirmSelect(); - final CountDownLatch latch = new CountDownLatch(1); - ch.basicConsume(queue, true, new DefaultConsumer(ch) { - - @Override - public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { - latch.countDown(); - } - }); - ch.basicPublish(exchange, routingKey, null, "".getBytes()); - ch.waitForConfirmsOrDie(5000); - return latch.await(5, TimeUnit.SECONDS); - } finally { - if (ch != null && ch.isOpen()) { - ch.close(); - } - } - } - - private static boolean resourceExists(Callable callback) throws Exception { - Channel declarePassiveChannel = null; - try { - declarePassiveChannel = callback.call(); - return true; - } catch (IOException e) { - if (e.getCause() instanceof ShutdownSignalException) { - ShutdownSignalException cause = (ShutdownSignalException) e.getCause(); - if (cause.getReason() instanceof AMQP.Channel.Close) { - if (((AMQP.Channel.Close) cause.getReason()).getReplyCode() == 404) { - return false; - } else { - throw e; - } - } - return false; - } else { - throw e; - } - } finally { - if (declarePassiveChannel != null && declarePassiveChannel.isOpen()) { - declarePassiveChannel.close(); - } - } - } - - private static boolean queueExists(final String queue, final Connection connection) throws Exception { - return resourceExists(new Callable() { - - @Override - public Channel call() throws Exception { - Channel channel = connection.createChannel(); - channel.queueDeclarePassive(queue); - return channel; - } - }); - } - - private static boolean exchangeExists(final String exchange, final Connection connection) throws Exception { - return resourceExists(new Callable() { - - @Override - public Channel call() throws Exception { - Channel channel = connection.createChannel(); - channel.exchangeDeclarePassive(exchange); - return channel; - } - }); - } - - private static void closeAndWaitForRecovery(RecoverableConnection connection) throws IOException, InterruptedException { - CountDownLatch latch = prepareForRecovery(connection); - Host.closeConnection((NetworkConnection) connection); - wait(latch); - } - - private static CountDownLatch prepareForRecovery(Connection conn) { - final CountDownLatch latch = new CountDownLatch(1); - ((AutorecoveringConnection) conn).addRecoveryListener(new RecoveryListener() { - - @Override - public void handleRecovery(Recoverable recoverable) { - latch.countDown(); - } - - @Override - public void handleRecoveryStarted(Recoverable recoverable) { - // No-op - } - }); - return latch; - } - - private static void wait(CountDownLatch latch) throws InterruptedException { - assertTrue(latch.await(20, TimeUnit.SECONDS)); - } - @Override protected ConnectionFactory newConnectionFactory() { ConnectionFactory connectionFactory = TestUtils.connectionFactory(); diff --git a/src/test/java/com/rabbitmq/client/test/functional/TopologyRecoveryRetry.java b/src/test/java/com/rabbitmq/client/test/functional/TopologyRecoveryRetry.java new file mode 100644 index 0000000000..cd4a80d150 --- /dev/null +++ b/src/test/java/com/rabbitmq/client/test/functional/TopologyRecoveryRetry.java @@ -0,0 +1,74 @@ +// Copyright (c) 2018 Pivotal Software, Inc. All rights reserved. +// +// This software, the RabbitMQ Java client library, is triple-licensed under the +// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 +// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see +// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. + +package com.rabbitmq.client.test.functional; + +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.DefaultConsumer; +import com.rabbitmq.client.impl.recovery.DefaultRetryHandler; +import com.rabbitmq.client.impl.recovery.RecordedBinding; +import com.rabbitmq.client.impl.recovery.RecordedConsumer; +import com.rabbitmq.client.impl.recovery.RecordedEntity; +import com.rabbitmq.client.test.BrokerTestCase; +import com.rabbitmq.client.test.TestUtils; +import org.junit.Test; + +import java.util.HashMap; + +import static com.rabbitmq.client.impl.recovery.TopologyRecoveryRetryHandlerBuilder.builder; +import static com.rabbitmq.client.impl.recovery.TopologyRecoveryRetryLogic.CHANNEL_CLOSED_NOT_FOUND; +import static com.rabbitmq.client.impl.recovery.TopologyRecoveryRetryLogic.RECOVER_BINDING; +import static com.rabbitmq.client.impl.recovery.TopologyRecoveryRetryLogic.RECOVER_BINDING_QUEUE; +import static com.rabbitmq.client.impl.recovery.TopologyRecoveryRetryLogic.RECOVER_CHANNEL; +import static com.rabbitmq.client.impl.recovery.TopologyRecoveryRetryLogic.RECOVER_CONSUMER; +import static com.rabbitmq.client.impl.recovery.TopologyRecoveryRetryLogic.RECOVER_CONSUMER_QUEUE; +import static com.rabbitmq.client.test.TestUtils.closeAllConnectionsAndWaitForRecovery; +import static org.junit.Assert.assertTrue; + +/** + * + */ +public class TopologyRecoveryRetry extends BrokerTestCase { + + @Test + public void topologyRecoveryRetry() throws Exception { + int nbQueues = 2000; + String prefix = "topology-recovery-retry-" + System.currentTimeMillis(); + for (int i = 0; i < nbQueues; i++) { + String queue = prefix + i; + channel.queueDeclare(queue, false, false, true, new HashMap()); + channel.queueBind(queue, "amq.direct", queue); + channel.basicConsume(queue, true, new DefaultConsumer(channel)); + } + + closeAllConnectionsAndWaitForRecovery(this.connection); + + assertTrue(channel.isOpen()); + } + + @Override + protected ConnectionFactory newConnectionFactory() { + ConnectionFactory connectionFactory = TestUtils.connectionFactory(); + connectionFactory.setTopologyRecoveryRetryHandler( + builder().bindingRecoveryRetryCondition(CHANNEL_CLOSED_NOT_FOUND) + .consumerRecoveryRetryCondition(CHANNEL_CLOSED_NOT_FOUND) + .bindingRecoveryRetryOperation(RECOVER_CHANNEL.andThen(RECOVER_BINDING_QUEUE).andThen(RECOVER_BINDING)) + .consumerRecoveryRetryOperation(RECOVER_CHANNEL.andThen(RECOVER_CONSUMER_QUEUE.andThen(RECOVER_CONSUMER))) + .build() + ); + connectionFactory.setNetworkRecoveryInterval(1000); + return connectionFactory; + } +} diff --git a/src/test/java/com/rabbitmq/tools/Host.java b/src/test/java/com/rabbitmq/tools/Host.java index 5924e5931d..c919d78621 100644 --- a/src/test/java/com/rabbitmq/tools/Host.java +++ b/src/test/java/com/rabbitmq/tools/Host.java @@ -169,6 +169,10 @@ public static void closeConnection(String pid) throws IOException { rabbitmqctl("close_connection '" + pid + "' 'Closed via rabbitmqctl'"); } + public static void closeAllConnections() throws IOException { + rabbitmqctl("close_all_connections 'Closed via rabbitmqctl'"); + } + public static void closeConnection(NetworkConnection c) throws IOException { Host.ConnectionInfo ci = findConnectionInfoFor(Host.listConnections(), c); closeConnection(ci.getPid());