GH-2116: Add blocking retries to RT (#2124)
* GH-2116: Add blocking retries to RT

Previously we hardcoded a no-op back off in the DefaultErrorHandler used by the Retryable Topics feature.
This adds a setter to let the user provide their own back off policy and configure blocking retries in conjunction with RT.

* Change DHE in LCFC to defaultFalse

With this we no longer need a no-op back off.
Some minor adjustments were needed to maintain behavior when the logic gets to DLPR.

* Improve API and docs

Retryable exceptions can now be set directly in the LCFC class.
Improved the docs on how to combine blocking and non-blocking behaviors.
Added a what's-new entry for this feature.

* Improve ExceptionClassifier JavaDoc

Also add assertions to the new LCFC methods to warn the user if the blocking configurations have already been set.
tomazfernandes committed Feb 24, 2022
1 parent dad2061 commit 01549a6
Showing 10 changed files with 717 additions and 12 deletions.
75 changes: 75 additions & 0 deletions spring-kafka-docs/src/main/asciidoc/retrytopic.adoc
@@ -430,6 +430,80 @@ DeadLetterPublishingRecovererFactory factory(DestinationTopicResolver resolver)
----
====

[[retry-topic-combine-blocking]]
==== Combining blocking and non-blocking retries

Starting with version 2.8.4, you can configure the framework to use both blocking and non-blocking retries in conjunction.
For example, you may have a set of exceptions that are likely to also trigger errors for the next records, such as `DatabaseAccessException`, so you can retry the same record a few times before sending it to the retry topic, or straight to the DLT.

To configure blocking retries, add the exceptions you want to retry through the `setBlockingRetryableExceptions` method of the `ListenerContainerFactoryConfigurer` bean, as follows.
The default policy is `FixedBackOff`, with nine retries and no delay between them.
Optionally, you can provide your own back off policy.

====
[source, java]
----
@Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME)
public ListenerContainerFactoryConfigurer lcfc(KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
        DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory,
        @Qualifier(RetryTopicInternalBeanNames.INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
    ListenerContainerFactoryConfigurer lcfc = new ListenerContainerFactoryConfigurer(
            kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, clock);
    lcfc.setBlockingRetryableExceptions(MyBlockingRetryException.class, MyOtherBlockingRetryException.class);
    lcfc.setBlockingRetriesBackOff(new FixedBackOff(500, 5)); // Optional
    return lcfc;
}
----
====

If you need to further tune the exception classification, you can set your own `Map` of classifications through the `ListenerContainerFactoryConfigurer.setErrorHandlerCustomizer()` method, such as:

====
[source, java]
----
lcfc.setErrorHandlerCustomizer(ceh -> ((DefaultErrorHandler) ceh).setClassifications(myClassificationsMap, myDefaultValue));
----
====
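
For illustration, a classifications map for this customizer might look like the following sketch; the exception types here are hypothetical, and the map values mean `true` = retry via blocking, `false` = skip blocking retries and go straight to recovery:

====
[source, java]
----
// Hypothetical classifications - true: retry this exception in place;
// false: skip blocking retries and go straight to recovery.
Map<Class<? extends Throwable>, Boolean> myClassificationsMap = new HashMap<>();
myClassificationsMap.put(MyBlockingRetryException.class, true);
myClassificationsMap.put(MyFatalException.class, false);
boolean myDefaultValue = false; // classification for exceptions not present in the map
----
====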

NOTE: In combination with the global retryable topic's fatal exceptions classification, you can configure the framework for any behavior you'd like, such as having some exceptions trigger both blocking and non-blocking retries, trigger only one kind or the other, or go straight to the DLT without retries of any kind.

Here's an example with both configurations working together:

====
[source, java]
----
@Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME)
public ListenerContainerFactoryConfigurer lcfc(KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
        DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory,
        @Qualifier(RetryTopicInternalBeanNames.INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
    ListenerContainerFactoryConfigurer lcfc = new ListenerContainerFactoryConfigurer(
            kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, clock);
    lcfc.setBlockingRetryableExceptions(ShouldRetryOnlyBlockingException.class, ShouldRetryViaBothException.class);
    return lcfc;
}

@Bean(name = RetryTopicInternalBeanNames.DESTINATION_TOPIC_CONTAINER_NAME)
public DefaultDestinationTopicResolver ddtr(ApplicationContext applicationContext,
        @Qualifier(RetryTopicInternalBeanNames.INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
    DefaultDestinationTopicResolver ddtr = new DefaultDestinationTopicResolver(clock, applicationContext);
    ddtr.addNotRetryableExceptions(ShouldRetryOnlyBlockingException.class, ShouldSkipBothRetriesException.class);
    return ddtr;
}
----
====

In this example:

* `ShouldRetryOnlyBlockingException.class` would retry only via blocking and, if all retries fail, would go straight to the DLT.
* `ShouldRetryViaBothException.class` would retry via blocking, and if all blocking retries fail would be forwarded to the next retry topic for another set of attempts.
* `ShouldSkipBothRetriesException.class` would never be retried in any way and would go straight to the DLT if the first processing attempt failed.
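
For context, here is a minimal sketch of a listener that such configurations might apply to (the topic name, payload type, and `attempts` value are illustrative):

====
[source, java]
----
@RetryableTopic(attempts = "3") // non-blocking retries via retry topics
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // An exception classified for both behaviors, such as ShouldRetryViaBothException,
    // would first exhaust the blocking retries here; then the record would be
    // forwarded to the next retry topic.
    process(message);
}
----
====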

IMPORTANT: Note that the blocking retries behavior is an allowlist - you add the exceptions you do want to retry that way. The non-blocking retries classification, by contrast, is geared towards FATAL exceptions and is therefore a denylist - you add the exceptions you do not want to retry non-blockingly, but want to send directly to the DLT instead.

IMPORTANT: The non-blocking exception classification behavior also depends on the specific topic's configuration.

==== Topic Naming

Retry topics and DLT are named by suffixing the main topic with a provided or default value, appended by either the delay or index for that topic.
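
For example (assuming the default `-retry` and `-dlt` suffixes), a main topic named `my-topic` would typically get retry topics such as `my-topic-retry-0` and `my-topic-retry-1` when the index is appended, or `my-topic-retry-1000` and `my-topic-retry-2000` when the delay values are appended, plus a dead letter topic named `my-topic-dlt`.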
@@ -746,6 +820,7 @@ public RetryTopicConfigurer retryTopicConfigurer(DestinationTopicProcessor desti
return retryTopicConfigurer;
}
----
====

[[change-kboe-logging-level]]
==== Changing KafkaBackOffException Logging Level
5 changes: 4 additions & 1 deletion spring-kafka-docs/src/main/asciidoc/whats-new.adoc
@@ -88,5 +88,8 @@ See <<retry-topic-lcf>> for more information.
There's now a manageable global list of fatal exceptions that will make the failed record go straight to the DLT.
Refer to <<retry-topic-ex-classifier>> to see how to manage it.

You can now use blocking and non-blocking retries in conjunction.
See <<retry-topic-combine-blocking>> for more information.

The `KafkaBackOffException` thrown when using the retryable topics feature is now logged at DEBUG level.
See <<change-kboe-logging-level>> if you need to change the logging level back to WARN or set it to any other level.
@@ -131,7 +131,7 @@ public void handleRemaining(Exception thrownException, List<ConsumerRecord<?, ?>
Consumer<?, ?> consumer, MessageListenerContainer container) {

SeekUtils.seekOrRecover(thrownException, records, consumer, container, isCommitRecovered(), // NOSONAR
getRecoveryStrategy(records, thrownException), this.logger, getLogLevel());
getRecoveryStrategy(records, consumer, thrownException), this.logger, getLogLevel());
}

@Override
@@ -22,6 +22,7 @@
import org.springframework.classify.BinaryExceptionClassifier;
import org.springframework.kafka.support.converter.ConversionException;
import org.springframework.kafka.support.serializer.DeserializationException;
import org.springframework.lang.Nullable;
import org.springframework.messaging.converter.MessageConversionException;
import org.springframework.messaging.handler.invocation.MethodArgumentResolutionException;
import org.springframework.util.Assert;
@@ -180,12 +181,14 @@ public boolean removeNotRetryableException(Class<? extends Exception> exceptionT
* </ul>
* All others will be retried, unless {@link #defaultFalse()} has been called.
* @param exceptionType the exception type.
* @return true if the removal was successful.
* @return the classification of the exception if removal was successful;
* null otherwise.
* @since 2.8.4
* @see #addNotRetryableExceptions(Class...)
* @see #setClassifications(Map, boolean)
*/
public boolean removeClassification(Class<? extends Exception> exceptionType) {
@Nullable
public Boolean removeClassification(Class<? extends Exception> exceptionType) {
return this.classifier.getClassified().remove(exceptionType);
}

@@ -22,6 +22,7 @@
import java.util.function.BiPredicate;

import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.core.log.LogAccessor;
@@ -126,12 +127,26 @@ public int deliveryAttempt(TopicPartitionOffset topicPartitionOffset) {
* @since 2.7
*/
protected RecoveryStrategy getRecoveryStrategy(List<ConsumerRecord<?, ?>> records, Exception thrownException) {
return getRecoveryStrategy(records, null, thrownException);
}

/**
* Return a {@link RecoveryStrategy} to call to determine whether the first record in the
* list should be skipped.
* @param records the records.
* @param recoveryConsumer the consumer.
* @param thrownException the exception.
* @return the {@link RecoveryStrategy}.
* @since 2.8.4
*/
protected RecoveryStrategy getRecoveryStrategy(List<ConsumerRecord<?, ?>> records,
@Nullable Consumer<?, ?> recoveryConsumer, Exception thrownException) {
if (getClassifier().classify(thrownException)) {
return this.failureTracker::recovered;
}
else {
try {
this.failureTracker.getRecoverer().accept(records.get(0), thrownException);
this.failureTracker.getRecoverer().accept(records.get(0), recoveryConsumer, thrownException);
this.failureTracker.getRetryListeners().forEach(rl -> rl.recovered(records.get(0), thrownException));
}
catch (Exception ex) {
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2021 the original author or authors.
* Copyright 2018-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -240,7 +240,7 @@ void clearThreadState() {
this.failures.remove();
}

BiConsumer<ConsumerRecord<?, ?>, Exception> getRecoverer() {
ConsumerAwareRecordRecoverer getRecoverer() {
return this.recoverer;
}

@@ -17,6 +17,7 @@
package org.springframework.kafka.retrytopic;

import java.time.Clock;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
@@ -42,7 +43,7 @@
import org.springframework.kafka.listener.adapter.KafkaBackoffAwareMessageListenerAdapter;
import org.springframework.kafka.support.TopicPartitionOffset;
import org.springframework.util.Assert;
import org.springframework.util.backoff.FixedBackOff;
import org.springframework.util.backoff.BackOff;

/**
*
@@ -81,6 +82,10 @@ public class ListenerContainerFactoryConfigurer {

private static final long LOWEST_BACKOFF_THRESHOLD = 1500L;

private BackOff providedBlockingBackOff = null;

private Class<? extends Exception>[] blockingExceptionTypes = null;

private Consumer<ConcurrentMessageListenerContainer<?, ?>> containerCustomizer = container -> {
};

@@ -158,6 +163,42 @@ public KafkaListenerContainerFactory<?> decorateFactoryWithoutSettingContainerPr
return new RetryTopicListenerContainerFactoryDecorator(factory, configuration, false);
}

/**
* Set a {@link BackOff} to be used with blocking retries.
* If the BackOff execution returns STOP, the record will be forwarded
* to the next retry topic or to the DLT, depending on how the non-blocking retries
* are configured.
* @param blockingBackOff the BackOff policy to be used by blocking retries.
* @since 2.8.4
* @see DefaultErrorHandler
*/
public void setBlockingRetriesBackOff(BackOff blockingBackOff) {
Assert.notNull(blockingBackOff, "The provided BackOff cannot be null");
Assert.state(this.providedBlockingBackOff == null, () ->
"Blocking retries back off has already been set. Current: "
+ this.providedBlockingBackOff
+ " You provided: " + blockingBackOff);
this.providedBlockingBackOff = blockingBackOff;
}

/**
* Specify the exceptions to be retried via blocking.
* @param exceptionTypes the exceptions that should be retried.
* @since 2.8.4
* @see DefaultErrorHandler
*/
@SafeVarargs
@SuppressWarnings("varargs")
public final void setBlockingRetryableExceptions(Class<? extends Exception>... exceptionTypes) {
Assert.notNull(exceptionTypes, "The exception types cannot be null");
Assert.noNullElements(exceptionTypes, "The exception types cannot have null elements");
Assert.state(this.blockingExceptionTypes == null,
() -> "Blocking retryable exceptions have already been set."
+ "Current ones: " + Arrays.toString(this.blockingExceptionTypes)
+ " You provided: " + Arrays.toString(exceptionTypes));
this.blockingExceptionTypes = exceptionTypes;
}

private ConcurrentKafkaListenerContainerFactory<?, ?> doConfigure(
ConcurrentKafkaListenerContainerFactory<?, ?> containerFactory, Configuration configuration,
boolean isSetContainerProperties) {
@@ -193,14 +234,23 @@ public void setErrorHandlerCustomizer(Consumer<CommonErrorHandler> errorHandlerC

protected CommonErrorHandler createErrorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer,
Configuration configuration) {
DefaultErrorHandler errorHandler = new DefaultErrorHandler(deadLetterPublishingRecoverer,
new FixedBackOff(0, 0));
DefaultErrorHandler errorHandler = createDefaultErrorHandlerInstance(deadLetterPublishingRecoverer);
errorHandler.defaultFalse();
errorHandler.setCommitRecovered(true);
errorHandler.setLogLevel(KafkaException.Level.DEBUG);
if (this.blockingExceptionTypes != null) {
errorHandler.addRetryableExceptions(this.blockingExceptionTypes);
}
this.errorHandlerCustomizer.accept(errorHandler);
return errorHandler;
}

protected DefaultErrorHandler createDefaultErrorHandlerInstance(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
return this.providedBlockingBackOff != null
? new DefaultErrorHandler(deadLetterPublishingRecoverer, this.providedBlockingBackOff)
: new DefaultErrorHandler(deadLetterPublishingRecoverer);
}

protected void setupBackoffAwareMessageListenerAdapter(ConcurrentMessageListenerContainer<?, ?> container,
Configuration configuration, boolean isSetContainerProperties) {
AcknowledgingConsumerAwareMessageListener<?, ?> listener = checkAndCast(container.getContainerProperties()
@@ -17,12 +17,14 @@
package org.springframework.kafka.retrytopic;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.then;
import static org.mockito.BDDMockito.willReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;

@@ -59,13 +61,18 @@
import org.springframework.kafka.listener.adapter.AbstractDelegatingMessageListenerAdapter;
import org.springframework.kafka.listener.adapter.KafkaBackoffAwareMessageListenerAdapter;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.converter.ConversionException;
import org.springframework.kafka.support.serializer.DeserializationException;
import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.BackOffExecution;
import org.springframework.util.backoff.FixedBackOff;

/**
* @author Tomaz Fernandes
* @since 2.7
*/
@ExtendWith(MockitoExtension.class)
@SuppressWarnings({"unchecked", "rawtypes"})
@SuppressWarnings({"unchecked", "rawtypes", "deprecation"})
class ListenerContainerFactoryConfigurerTests {

@Mock
@@ -404,10 +411,63 @@ void shouldDecorateFactory() {
.createContext(anyLong(), listenerIdCaptor.capture(), any(TopicPartition.class), eq(consumer));
assertThat(listenerIdCaptor.getValue()).isEqualTo(testListenerId);
then(listener).should(times(1)).onMessage(data, ack, consumer);

then(this.configurerContainerCustomizer).should(times(1)).accept(container);
}

@Test
void shouldUseGivenBackOffAndExceptions() {

// given
given(container.getContainerProperties()).willReturn(containerProperties);
given(deadLetterPublishingRecovererFactory.create()).willReturn(recoverer);
given(containerProperties.getMessageListener()).willReturn(listener);
given(configuration.forContainerFactoryConfigurer()).willReturn(lcfcConfiguration);
willReturn(container).given(containerFactory).createListenerContainer(endpoint);
BackOff backOffMock = mock(BackOff.class);
BackOffExecution backOffExecutionMock = mock(BackOffExecution.class);
given(backOffMock.start()).willReturn(backOffExecutionMock);

ListenerContainerFactoryConfigurer configurer =
new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager,
deadLetterPublishingRecovererFactory, clock);
configurer.setBlockingRetriesBackOff(backOffMock);
configurer.setBlockingRetryableExceptions(IllegalArgumentException.class, IllegalStateException.class);

// when
KafkaListenerContainerFactory<?> decoratedFactory =
configurer.decorateFactory(this.containerFactory, configuration.forContainerFactoryConfigurer());
decoratedFactory.createListenerContainer(endpoint);

// then
then(backOffMock).should().start();
then(container).should().setCommonErrorHandler(errorHandlerCaptor.capture());
CommonErrorHandler errorHandler = errorHandlerCaptor.getValue();
assertThat(DefaultErrorHandler.class.isAssignableFrom(errorHandler.getClass())).isTrue();
DefaultErrorHandler defaultErrorHandler = (DefaultErrorHandler) errorHandler;
assertThat(defaultErrorHandler.removeClassification(IllegalArgumentException.class)).isTrue();
assertThat(defaultErrorHandler.removeClassification(IllegalStateException.class)).isTrue();
assertThat(defaultErrorHandler.removeClassification(ConversionException.class)).isNull();

}


@Test
void shouldThrowIfBackOffOrRetryablesAlreadySet() {
// given
BackOff backOff = new FixedBackOff();
ListenerContainerFactoryConfigurer configurer =
new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager,
deadLetterPublishingRecovererFactory, clock);
configurer.setBlockingRetriesBackOff(backOff);
configurer.setBlockingRetryableExceptions(IllegalArgumentException.class, IllegalStateException.class);

// when / then
assertThatThrownBy(() -> configurer.setBlockingRetriesBackOff(backOff)).isInstanceOf(IllegalStateException.class);
assertThatThrownBy(() -> configurer.setBlockingRetryableExceptions(ConversionException.class, DeserializationException.class))
.isInstanceOf(IllegalStateException.class);
}


@Test
void shouldCacheFactoryInstances() {
