From bf32231ad6cb28e249610873ed45dbfa4183f030 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Wed, 27 Apr 2022 10:47:27 -0400 Subject: [PATCH] GH-1455: AdviceChain on Stream Listener Container Resolves https://github.com/spring-projects/spring-amqp/issues/1455 Add an advice chain to the stream listener container and its factory. Add a `StreamMessageRecoverer` for native stream messages. Add a retry interceptor to work with native stream messages. **cherry-pick to 2.4.x** * Add since to new setter. --- .../StreamRabbitListenerContainerFactory.java | 12 +- .../listener/StreamListenerContainer.java | 53 +++++++-- .../adapter/StreamMessageListenerAdapter.java | 5 +- .../stream/retry/StreamMessageRecoverer.java | 48 ++++++++ ...RetryOperationsInterceptorFactoryBean.java | 79 +++++++++++++ .../rabbit/stream/retry/package-info.java | 4 + .../stream/listener/RabbitListenerTests.java | 86 ++++++++++---- .../StreamListenerContainerTests.java | 105 ++++++++++++++++++ ...bstractRabbitListenerContainerFactory.java | 25 +---- .../BaseRabbitListenerContainerFactory.java | 24 +++- ...RetryOperationsInterceptorFactoryBean.java | 34 +++--- .../amqp/rabbit/retry/MessageRecoverer.java | 5 +- src/reference/asciidoc/stream.adoc | 21 ++++ 13 files changed, 427 insertions(+), 74 deletions(-) create mode 100644 spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/retry/StreamMessageRecoverer.java create mode 100644 spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/retry/StreamRetryOperationsInterceptorFactoryBean.java create mode 100644 spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/retry/package-info.java create mode 100644 spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/listener/StreamListenerContainerTests.java diff --git a/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/config/StreamRabbitListenerContainerFactory.java b/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/config/StreamRabbitListenerContainerFactory.java index 7d24a61f32..0eb337abfd 100644 --- a/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/config/StreamRabbitListenerContainerFactory.java +++ b/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/config/StreamRabbitListenerContainerFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 the original author or authors. + * Copyright 2021-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,12 +18,15 @@ import java.lang.reflect.Method; +import org.aopalliance.aop.Advice; + import org.springframework.amqp.rabbit.batch.BatchingStrategy; import org.springframework.amqp.rabbit.config.BaseRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.config.ContainerCustomizer; import org.springframework.amqp.rabbit.listener.MethodRabbitListenerEndpoint; import org.springframework.amqp.rabbit.listener.RabbitListenerEndpoint; import org.springframework.amqp.rabbit.listener.api.RabbitListenerErrorHandler; +import org.springframework.amqp.utils.JavaUtils; import org.springframework.lang.Nullable; import org.springframework.rabbit.stream.listener.ConsumerCustomizer; import org.springframework.rabbit.stream.listener.StreamListenerContainer; @@ -96,9 +99,10 @@ public StreamListenerContainer createListenerContainer(RabbitListenerEndpoint en }); } StreamListenerContainer container = createContainerInstance(); - if (this.consumerCustomizer != null) { - container.setConsumerCustomizer(this.consumerCustomizer); - } + Advice[] adviceChain = getAdviceChain(); + JavaUtils.INSTANCE + .acceptIfNotNull(this.consumerCustomizer, container::setConsumerCustomizer) + .acceptIfNotNull(adviceChain, container::setAdviceChain); applyCommonOverrides(endpoint, container); if (this.containerCustomizer != null) { this.containerCustomizer.configure(container); diff --git a/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/listener/StreamListenerContainer.java b/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/listener/StreamListenerContainer.java index 4640defed7..67d51196a9 100644 --- a/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/listener/StreamListenerContainer.java +++ b/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/listener/StreamListenerContainer.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 the original author or authors. + * Copyright 2021-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,7 @@ package org.springframework.rabbit.stream.listener; +import org.aopalliance.aop.Advice; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -23,6 +24,8 @@ import org.springframework.amqp.core.MessageListener; import org.springframework.amqp.rabbit.listener.MessageListenerContainer; import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; +import org.springframework.aop.framework.ProxyFactory; +import org.springframework.aop.support.DefaultPointcutAdvisor; import org.springframework.beans.factory.BeanNameAware; import org.springframework.lang.Nullable; import org.springframework.rabbit.stream.support.StreamMessageProperties; @@ -62,6 +65,10 @@ public class StreamListenerContainer implements MessageListenerContainer, BeanNa private MessageListener messageListener; + private StreamMessageListener streamListener; + + private Advice[] adviceChain; + /** * Construct an instance using the provided environment. * @param environment the environment. @@ -154,6 +161,18 @@ public void setAutoStartup(boolean autoStart) { public boolean isAutoStartup() { return this.autoStartup; } + + /** + * Set an advice chain to apply to the listener. + * @param advices the advice chain. + * @since 2.4.5 + */ + public void setAdviceChain(Advice... advices) { + Assert.notNull(advices, "'advices' cannot be null"); + Assert.noNullElements(advices, "'advices' cannot have null elements"); + this.adviceChain = advices; + } + @Override @Nullable public Object getMessageListener() { @@ -183,26 +202,46 @@ public synchronized void stop() { @Override public void setupMessageListener(MessageListener messageListener) { - this.messageListener = messageListener; + adviseIfNeeded(messageListener); this.builder.messageHandler((context, message) -> { - if (messageListener instanceof StreamMessageListener) { - ((StreamMessageListener) messageListener).onStreamMessage(message, context); + if (this.streamListener != null) { + this.streamListener.onStreamMessage(message, context); } else { Message message2 = this.streamConverter.toMessage(message, new StreamMessageProperties(context)); - if (messageListener instanceof ChannelAwareMessageListener) { + if (this.messageListener instanceof ChannelAwareMessageListener) { try { - ((ChannelAwareMessageListener) messageListener).onMessage(message2, null); + ((ChannelAwareMessageListener) this.messageListener).onMessage(message2, null); } catch (Exception e) { // NOSONAR this.logger.error("Listner threw an exception", e); } } else { - messageListener.onMessage(message2); + this.messageListener.onMessage(message2); } } }); } + private void adviseIfNeeded(MessageListener messageListener) { + this.messageListener = messageListener; + if (messageListener instanceof StreamMessageListener) { + this.streamListener = (StreamMessageListener) messageListener; + } + if (this.adviceChain != null && this.adviceChain.length > 0) { + ProxyFactory factory = new ProxyFactory(messageListener); + for (Advice advice : this.adviceChain) { + factory.addAdvisor(new DefaultPointcutAdvisor(advice)); + } + factory.setInterfaces(messageListener.getClass().getInterfaces()); + if (this.streamListener != null) { + this.streamListener = (StreamMessageListener) factory.getProxy(getClass().getClassLoader()); + } + else { + this.messageListener = (MessageListener) factory.getProxy(getClass().getClassLoader()); + } + } + } + } diff --git a/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/listener/adapter/StreamMessageListenerAdapter.java b/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/listener/adapter/StreamMessageListenerAdapter.java index 3db6b3a3fe..359f3c6559 100644 --- a/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/listener/adapter/StreamMessageListenerAdapter.java +++ b/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/listener/adapter/StreamMessageListenerAdapter.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 the original author or authors. + * Copyright 2021-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,6 +21,7 @@ import org.springframework.amqp.rabbit.listener.adapter.InvocationResult; import org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter; import org.springframework.amqp.rabbit.listener.api.RabbitListenerErrorHandler; +import org.springframework.amqp.rabbit.support.ListenerExecutionFailedException; import org.springframework.rabbit.stream.listener.StreamMessageListener; import com.rabbitmq.stream.Message; @@ -60,7 +61,7 @@ public void onStreamMessage(Message message, Context context) { } } catch (Exception ex) { - this.logger.error("Failed to invoke listener", ex); + throw new ListenerExecutionFailedException("Failed to invoke listener", ex); } } diff --git a/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/retry/StreamMessageRecoverer.java b/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/retry/StreamMessageRecoverer.java new file mode 100644 index 0000000000..222a5a215c --- /dev/null +++ b/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/retry/StreamMessageRecoverer.java @@ -0,0 +1,48 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.rabbit.stream.retry; + +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.retry.MessageRecoverer; + +import com.rabbitmq.stream.MessageHandler.Context; + +/** + * Implementations of this interface can handle failed messages after retries are + * exhausted. + * + * @author Gary Russell + * @since 2.4.5 + * + */ +@FunctionalInterface +public interface StreamMessageRecoverer extends MessageRecoverer { + + @Override + default void recover(Message message, Throwable cause) { + } + + /** + * Callback for message that was consumed but failed all retry attempts. + * + * @param message the message to recover. + * @param context the context. + * @param cause the cause of the error. + */ + void recover(com.rabbitmq.stream.Message message, Context context, Throwable cause); + +} diff --git a/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/retry/StreamRetryOperationsInterceptorFactoryBean.java b/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/retry/StreamRetryOperationsInterceptorFactoryBean.java new file mode 100644 index 0000000000..e4960b4bb8 --- /dev/null +++ b/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/retry/StreamRetryOperationsInterceptorFactoryBean.java @@ -0,0 +1,79 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.rabbit.stream.retry; + +import org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterceptorFactoryBean; +import org.springframework.amqp.rabbit.retry.MessageRecoverer; +import org.springframework.rabbit.stream.listener.StreamListenerContainer; +import org.springframework.retry.RetryOperations; +import org.springframework.retry.interceptor.MethodInvocationRecoverer; +import org.springframework.retry.support.RetryTemplate; + +import com.rabbitmq.stream.Message; +import com.rabbitmq.stream.MessageHandler.Context; + +/** + * Convenient factory bean for creating a stateless retry interceptor for use in a + * {@link StreamListenerContainer} when consuming native stream messages, giving you a + * large amount of control over the behavior of a container when a listener fails. To + * control the number of retry attempt or the backoff in between attempts, supply a + * customized {@link RetryTemplate}. Stateless retry is appropriate if your listener can + * be called repeatedly between failures with no side effects. The semantics of stateless + * retry mean that a listener exception is not propagated to the container until the retry + * attempts are exhausted. When the retry attempts are exhausted it can be processed using + * a {@link StreamMessageRecoverer} if one is provided. + * + * @author Gary Russell + * + * @see RetryOperations#execute(org.springframework.retry.RetryCallback,org.springframework.retry.RecoveryCallback) + */ +public class StreamRetryOperationsInterceptorFactoryBean extends StatelessRetryOperationsInterceptorFactoryBean { + + @Override + protected MethodInvocationRecoverer createRecoverer() { + return (args, cause) -> { + StreamMessageRecoverer messageRecoverer = (StreamMessageRecoverer) getMessageRecoverer(); + Object arg = args[0]; + if (arg instanceof org.springframework.amqp.core.Message) { + return super.recover(args, cause); + } + else { + if (messageRecoverer == null) { + this.logger.warn("Message(s) dropped on recovery: " + arg, cause); + } + else { + messageRecoverer.recover((Message) arg, (Context) args[1], cause); + } + return null; + } + }; + } + + /** + * Set a {@link StreamMessageRecoverer} to call when retries are exhausted. + * @param messageRecoverer the recoverer. + */ + public void setStreamMessageRecoverer(StreamMessageRecoverer messageRecoverer) { + super.setMessageRecoverer(messageRecoverer); + } + + @Override + public void setMessageRecoverer(MessageRecoverer messageRecoverer) { + throw new UnsupportedOperationException("Use setStreamMessageRecoverer() instead"); + } + +} diff --git a/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/retry/package-info.java b/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/retry/package-info.java new file mode 100644 index 0000000000..cabb5d622f --- /dev/null +++ b/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/retry/package-info.java @@ -0,0 +1,4 @@ +/** + * Provides classes supporting retries. + */ +package org.springframework.rabbit.stream.retry; diff --git a/spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/listener/RabbitListenerTests.java b/spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/listener/RabbitListenerTests.java index 2c7352cb89..b0f914d7ba 100644 --- a/spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/listener/RabbitListenerTests.java +++ b/spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/listener/RabbitListenerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 the original author or authors. + * Copyright 2021-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,6 +23,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.junit.jupiter.api.Test; @@ -30,6 +31,7 @@ import org.springframework.amqp.core.QueueBuilder; import org.springframework.amqp.rabbit.annotation.EnableRabbit; import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; @@ -38,9 +40,12 @@ import org.springframework.context.SmartLifecycle; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.DependsOn; import org.springframework.rabbit.stream.config.StreamRabbitListenerContainerFactory; import org.springframework.rabbit.stream.producer.RabbitStreamTemplate; +import org.springframework.rabbit.stream.retry.StreamRetryOperationsInterceptorFactoryBean; import org.springframework.rabbit.stream.support.StreamMessageProperties; +import org.springframework.retry.interceptor.RetryOperationsInterceptor; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; @@ -64,15 +69,6 @@ public class RabbitListenerTests extends AbstractIntegrationTests { @Autowired Config config; -// @AfterAll - causes test to throw errors - need to investigate - static void deleteQueues() { - try (Environment environment = Config.environment()) { - environment.deleteStream("test.stream.queue1"); - environment.deleteStream("test.stream.queue2"); - environment.deleteStream("stream.created.over.amqp"); - } - } - @Test void simple(@Autowired RabbitStreamTemplate template) throws Exception { Future future = template.convertAndSend("foo"); @@ -87,8 +83,8 @@ void simple(@Autowired RabbitStreamTemplate template) throws Exception { future = template.convertAndSend("bar", msg -> null); assertThat(future.get(10, TimeUnit.SECONDS)).isFalse(); assertThat(this.config.latch1.await(10, TimeUnit.SECONDS)).isTrue(); - assertThat(this.config.received).containsExactly("foo", "bar", "baz", "qux"); - assertThat(this.config.id).isEqualTo("test"); + assertThat(this.config.received).containsExactly("foo", "foo", "bar", "baz", "qux"); + assertThat(this.config.id).isEqualTo("testNative"); } @Test @@ -97,6 +93,8 @@ void nativeMsg(@Autowired RabbitTemplate template) throws InterruptedException { assertThat(this.config.latch2.await(10, TimeUnit.SECONDS)).isTrue(); assertThat(this.config.receivedNative).isNotNull(); assertThat(this.config.context).isNotNull(); + assertThat(this.config.latch3.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(this.config.latch4.await(10, TimeUnit.SECONDS)).isTrue(); } @Test @@ -110,12 +108,18 @@ void queueOverAmqp() throws Exception { @EnableRabbit public static class Config { - final CountDownLatch latch1 = new CountDownLatch(4); + final CountDownLatch latch1 = new CountDownLatch(5); final CountDownLatch latch2 = new CountDownLatch(1); + final CountDownLatch latch3 = new CountDownLatch(3); + + final CountDownLatch latch4 = new CountDownLatch(1); + final List received = new ArrayList<>(); + final AtomicBoolean first = new AtomicBoolean(true); + volatile Message receivedNative; volatile Context context; @@ -133,12 +137,23 @@ static Environment environment() { SmartLifecycle creator(Environment env) { return new SmartLifecycle() { + boolean running; + @Override public void stop() { + clean(env); + this.running = false; } @Override public void start() { + clean(env); + env.streamCreator().stream("test.stream.queue1").create(); + env.streamCreator().stream("test.stream.queue2").create(); + this.running = true; + } + + private void clean(Environment env) { try { env.deleteStream("test.stream.queue1"); } @@ -149,42 +164,65 @@ public void start() { } catch (Exception e) { } - env.streamCreator().stream("test.stream.queue1").create(); - env.streamCreator().stream("test.stream.queue2").create(); + try { + env.deleteStream("stream.created.over.amqp"); + } + catch (Exception e) { + } } @Override public boolean isRunning() { - return false; + return this.running; } }; } @Bean RabbitListenerContainerFactory rabbitListenerContainerFactory(Environment env) { - return new StreamRabbitListenerContainerFactory(env); + StreamRabbitListenerContainerFactory factory = new StreamRabbitListenerContainerFactory(env); + factory.setAdviceChain(RetryInterceptorBuilder.stateless().build()); + return factory; } @RabbitListener(queues = "test.stream.queue1") void listen(String in) { this.received.add(in); this.latch1.countDown(); + if (first.getAndSet(false)) { + throw new RuntimeException("fail first"); + } } @Bean - RabbitListenerContainerFactory nativeFactory(Environment env) { + public StreamRetryOperationsInterceptorFactoryBean sfb() { + StreamRetryOperationsInterceptorFactoryBean rfb = new StreamRetryOperationsInterceptorFactoryBean(); + rfb.setStreamMessageRecoverer((msg, context, throwable) -> { + this.latch4.countDown(); + }); + return rfb; + } + + @Bean + @DependsOn("sfb") + RabbitListenerContainerFactory nativeFactory(Environment env, + RetryOperationsInterceptor retry) { + StreamRabbitListenerContainerFactory factory = new StreamRabbitListenerContainerFactory(env); factory.setNativeListener(true); factory.setConsumerCustomizer((id, builder) -> { - builder.name("myConsumer") + builder.name(id) .offset(OffsetSpecification.first()) .manualTrackingStrategy(); - this.id = id; + if (id.equals("testNative")) { + this.id = id; + } }); + factory.setAdviceChain(retry); return factory; } - @RabbitListener(id = "test", queues = "test.stream.queue2", containerFactory = "nativeFactory") + @RabbitListener(id = "testNative", queues = "test.stream.queue2", containerFactory = "nativeFactory") void nativeMsg(Message in, Context context) { this.receivedNative = in; this.context = context; @@ -192,6 +230,12 @@ void nativeMsg(Message in, Context context) { context.storeOffset(); } + @RabbitListener(id = "testNativeFail", queues = "test.stream.queue2", containerFactory = "nativeFactory") + void nativeMsgFail(Message in, Context context) { + this.latch3.countDown(); + throw new RuntimeException("fail all"); + } + @Bean CachingConnectionFactory cf() { return new CachingConnectionFactory("localhost", amqpPort()); diff --git a/spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/listener/StreamListenerContainerTests.java b/spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/listener/StreamListenerContainerTests.java new file mode 100644 index 0000000000..3c681f62cc --- /dev/null +++ b/spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/listener/StreamListenerContainerTests.java @@ -0,0 +1,105 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.rabbit.stream.listener; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.isNull; +import static org.mockito.BDDMockito.given; +import static org.mockito.BDDMockito.willAnswer; +import static org.mockito.Mockito.mock; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import org.aopalliance.intercept.MethodInterceptor; +import org.junit.jupiter.api.Test; + +import org.springframework.amqp.core.MessageListener; +import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; + +import com.rabbitmq.stream.ConsumerBuilder; +import com.rabbitmq.stream.Environment; +import com.rabbitmq.stream.Message; +import com.rabbitmq.stream.MessageHandler; +import com.rabbitmq.stream.MessageHandler.Context; + +/** + * @author Gary Russell + * @since 2.4.5 + * + */ +public class StreamListenerContainerTests { + + @Test + void testAdviceChain() throws Exception { + Environment env = mock(Environment.class); + ConsumerBuilder builder = mock(ConsumerBuilder.class); + given(env.consumerBuilder()).willReturn(builder); + AtomicReference handler = new AtomicReference<>(); + willAnswer(inv -> { + handler.set(inv.getArgument(0)); + return null; + } + ).given(builder).messageHandler(any()); + AtomicBoolean advised = new AtomicBoolean(); + MethodInterceptor advice = (inv) -> { + advised.set(true); + return inv.proceed(); + }; + + StreamListenerContainer container = new StreamListenerContainer(env); + container.setAdviceChain(advice); + AtomicBoolean called = new AtomicBoolean(); + MessageListener ml = mock(MessageListener.class); + willAnswer(inv -> { + called.set(true); + return null; + }).given(ml).onMessage(any()); + container.setupMessageListener(ml); + Message message = mock(Message.class); + given(message.getBodyAsBinary()).willReturn("foo".getBytes()); + Context context = mock(Context.class); + handler.get().handle(context, message); + assertThat(advised.get()).isTrue(); + assertThat(called.get()).isTrue(); + + advised.set(false); + called.set(false); + ChannelAwareMessageListener cal = mock(ChannelAwareMessageListener.class); + willAnswer(inv -> { + called.set(true); + return null; + }).given(cal).onMessage(any(), isNull()); + container.setupMessageListener(cal); + handler.get().handle(context, message); + assertThat(advised.get()).isTrue(); + assertThat(called.get()).isTrue(); + + called.set(false); + StreamMessageListener sml = mock(StreamMessageListener.class); + willAnswer(inv -> { + called.set(true); + return null; + }).given(sml).onStreamMessage(message, context); + container.setupMessageListener(sml); + handler.get().handle(context, message); + assertThat(advised.get()).isTrue(); + assertThat(called.get()).isTrue(); + } + +} diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/AbstractRabbitListenerContainerFactory.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/AbstractRabbitListenerContainerFactory.java index d94ad2a2dc..193ea80de8 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/AbstractRabbitListenerContainerFactory.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/AbstractRabbitListenerContainerFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2021 the original author or authors. + * Copyright 2014-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -39,7 +39,6 @@ import org.springframework.context.ApplicationContextAware; import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.ApplicationEventPublisherAware; -import org.springframework.lang.Nullable; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.util.Assert; import org.springframework.util.ErrorHandler; @@ -86,8 +85,6 @@ public abstract class AbstractRabbitListenerContainerFactory recoveryCallback; + private Advice[] adviceChain; + @Override public abstract C createListenerContainer(RabbitListenerEndpoint endpoint); @@ -129,4 +134,21 @@ protected void applyCommonOverrides(@Nullable RabbitListenerEndpoint endpoint, C } } + /** + * @return the advice chain that was set. Defaults to {@code null}. + * @since 1.7.4 + */ + @Nullable + public Advice[] getAdviceChain() { + return this.adviceChain == null ? null : Arrays.copyOf(this.adviceChain, this.adviceChain.length); + } + + /** + * @param adviceChain the advice chain to set. + * @see AbstractMessageListenerContainer#setAdviceChain + */ + public void setAdviceChain(Advice... adviceChain) { + this.adviceChain = adviceChain == null ? null : Arrays.copyOf(adviceChain, adviceChain.length); + } + } diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/StatelessRetryOperationsInterceptorFactoryBean.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/StatelessRetryOperationsInterceptorFactoryBean.java index eb6c6b6332..4530e6f9c6 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/StatelessRetryOperationsInterceptorFactoryBean.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/StatelessRetryOperationsInterceptorFactoryBean.java @@ -46,7 +46,7 @@ */ public class StatelessRetryOperationsInterceptorFactoryBean extends AbstractRetryOperationsInterceptorFactoryBean { - private static Log logger = LogFactory.getLog(StatelessRetryOperationsInterceptorFactoryBean.class); + protected final Log logger = LogFactory.getLog(getClass()); // NOSONAR @Override public RetryOperationsInterceptor getObject() { @@ -63,21 +63,23 @@ public RetryOperationsInterceptor getObject() { } @SuppressWarnings("unchecked") - private MethodInvocationRecoverer createRecoverer() { - return (args, cause) -> { - MessageRecoverer messageRecoverer = getMessageRecoverer(); - Object arg = args[1]; - if (messageRecoverer == null) { - logger.warn("Message(s) dropped on recovery: " + arg, cause); - } - else if (arg instanceof Message) { - messageRecoverer.recover((Message) arg, cause); - } - else if (arg instanceof List && messageRecoverer instanceof MessageBatchRecoverer) { - ((MessageBatchRecoverer) messageRecoverer).recover((List) arg, cause); - } - return null; - }; + protected MethodInvocationRecoverer createRecoverer() { + return this::recover; + } + + protected Object recover(Object[] args, Throwable cause) { + MessageRecoverer messageRecoverer = getMessageRecoverer(); + Object arg = args[1]; + if (messageRecoverer == null) { + this.logger.warn("Message(s) dropped on recovery: " + arg, cause); + } + else if (arg instanceof Message) { + messageRecoverer.recover((Message) arg, cause); + } + else if (arg instanceof List && messageRecoverer instanceof MessageBatchRecoverer) { + ((MessageBatchRecoverer) messageRecoverer).recover((List) arg, cause); + } + return null; } @Override diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/retry/MessageRecoverer.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/retry/MessageRecoverer.java index 9c547b9696..61859705df 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/retry/MessageRecoverer.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/retry/MessageRecoverer.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,6 +19,9 @@ import org.springframework.amqp.core.Message; /** + * Implementations of this interface can handle failed messages after retries are + * exhausted. + * * @author Dave Syer * @author Gary Russell * diff --git a/src/reference/asciidoc/stream.adoc b/src/reference/asciidoc/stream.adoc index 11f836d8a0..5e24395bee 100644 --- a/src/reference/asciidoc/stream.adoc +++ b/src/reference/asciidoc/stream.adoc @@ -136,3 +136,24 @@ void nativeMsg(Message in, Context context) { } ---- ==== + +Version 2.4.5 added the `adviceChain` property to the `StreamListenerContainer` (and its factory). +A new factory bean is also provided to create a stateless retry interceptor with an optional `StreamMessageRecoverer` for use when consuming raw stream messages. + +==== +[source, java] +---- +@Bean +public StreamRetryOperationsInterceptorFactoryBean sfb(RetryTemplate retryTemplate) { + StreamRetryOperationsInterceptorFactoryBean rfb = + new StreamRetryOperationsInterceptorFactoryBean(); + rfb.setRetryOperations(retryTemplate); + rfb.setStreamMessageRecoverer((msg, context, throwable) -> { + ... + }); + return rfb; +} +---- +==== + +IMPORTANT: Stateful retry is not supported with this container.