diff --git a/api/revapi.json b/api/revapi.json index 7bc3bce7fb..758ec827d3 100644 --- a/api/revapi.json +++ b/api/revapi.json @@ -27,7 +27,18 @@ "criticality" : "highlight", "minSeverity" : "POTENTIALLY_BREAKING", "minCriticality" : "documented", - "differences" : [ ] + "differences" : [ + { + "code": "java.method.addedToInterface", + "new": "method io.smallrye.reactive.messaging.PausableChannel io.smallrye.reactive.messaging.ChannelRegistry::getPausable(java.lang.String)", + "justification": "Added PausableChannel to the ChannelRegistry interface to allow pausing and resuming channels." + }, + { + "code": "java.method.addedToInterface", + "new": "method void io.smallrye.reactive.messaging.ChannelRegistry::register(java.lang.String, io.smallrye.reactive.messaging.PausableChannel)", + "justification": "Added PausableChannel to the ChannelRegistry interface to allow pausing and resuming channels." + } + ] } }, { "extension" : "revapi.reporter.json", @@ -46,4 +57,4 @@ "minCriticality" : "documented", "output" : "out" } -} ] \ No newline at end of file +} ] diff --git a/api/src/main/java/io/smallrye/reactive/messaging/ChannelRegistry.java b/api/src/main/java/io/smallrye/reactive/messaging/ChannelRegistry.java index 1f0a7a78e2..5f81453215 100644 --- a/api/src/main/java/io/smallrye/reactive/messaging/ChannelRegistry.java +++ b/api/src/main/java/io/smallrye/reactive/messaging/ChannelRegistry.java @@ -43,4 +43,8 @@ Subscriber> register(String name, Map getOutgoingChannels(); + void register(String name, PausableChannel pausable); + + PausableChannel getPausable(String name); + } diff --git a/api/src/main/java/io/smallrye/reactive/messaging/PausableChannel.java b/api/src/main/java/io/smallrye/reactive/messaging/PausableChannel.java new file mode 100644 index 0000000000..7ad674f953 --- /dev/null +++ b/api/src/main/java/io/smallrye/reactive/messaging/PausableChannel.java @@ -0,0 +1,24 @@ +package io.smallrye.reactive.messaging; + +/** + * A channel that can be paused and resumed. + */ +public interface PausableChannel { + + /** + * Checks whether the channel is paused. + * + * @return {@code true} if the channel is paused, {@code false} otherwise + */ + boolean isPaused(); + + /** + * Pauses the channel. + */ + void pause(); + + /** + * Resumes the channel. + */ + void resume(); +} diff --git a/api/src/main/java/io/smallrye/reactive/messaging/PausableChannelConfiguration.java b/api/src/main/java/io/smallrye/reactive/messaging/PausableChannelConfiguration.java new file mode 100644 index 0000000000..4d4e887131 --- /dev/null +++ b/api/src/main/java/io/smallrye/reactive/messaging/PausableChannelConfiguration.java @@ -0,0 +1,28 @@ +package io.smallrye.reactive.messaging; + +/** + * A channel that can be paused and resumed. + */ +public interface PausableChannelConfiguration { + + /** + * The name of the property to configure whether the channel is pausable. + */ + String PAUSABLE_PROPERTY = "pausable"; + + /** + * The name of the property to configure whether the channel is initially paused. + */ + String PAUSED_PROPERTY = "initially-paused"; + + /** + * The name of the channel. + */ + String name(); + + /** + * Whether the channel is paused at subscribe time. + */ + boolean initiallyPaused(); + +} diff --git a/documentation/mkdocs.yml b/documentation/mkdocs.yml index dc5cd75905..136676c3cd 100644 --- a/documentation/mkdocs.yml +++ b/documentation/mkdocs.yml @@ -34,6 +34,7 @@ nav: - 'Message Context' : concepts/message-context.md - 'Metadata Injection': concepts/incoming-metadata-injection.md - 'Generic Payloads': concepts/generic-payloads.md + - 'Pausable Channels': concepts/pausable-channels.md - Kafka: - kafka/kafka.md diff --git a/documentation/src/main/docs/concepts/pausable-channels.md b/documentation/src/main/docs/concepts/pausable-channels.md new file mode 100644 index 0000000000..8f9326553d --- /dev/null +++ b/documentation/src/main/docs/concepts/pausable-channels.md @@ -0,0 +1,34 @@ +# Pausable Channels + +Based on reactive streams, Smallrye Reactive Messaging ensures that channels are back-pressured. +This means that the flow of messages is controlled by the downstream consumer, +whether it is a processing method or outgoing channel. +Sometimes you may want to pause the flow of messages, for example, when the consumer is not ready to process them. + +Injected [`@Channel`](emitter.md#retrieving-channels) streams are not subscribed to by default, so the flow of messages is controlled by the application. +But for [`@Incoming`](model.md#incoming-and-outgoing) methods, the flow of messages is controlled by the runtime. + +Pausable channels are useful when you want to control the flow of messages within your application. + +## Creating a Pausable Channel + +To use pausable channels, you need to activate it with the configuration property `pausable` set to `true`. + +```properties +mp.messaging.incoming.my-channel.pausable=true +# optional, by default the channel is NOT paused initially +mp.messaging.outgoing.my-channel.initially-paused=true +``` + +## Controlling the flow of messages + +If a channel is configured to be pausable, +you can get the `PausableChannel` by channel name from the `ChannelRegistry` programmatically, +and pause or resume the channel as needed: + +``` java +{{ insert('pausable/PausableController.java') }} +``` + +!!!warning + Pausable channels only work with back-pressure aware subscribers, with bounded downstream requests. diff --git a/documentation/src/main/java/pausable/PausableController.java b/documentation/src/main/java/pausable/PausableController.java new file mode 100644 index 0000000000..2850c244c0 --- /dev/null +++ b/documentation/src/main/java/pausable/PausableController.java @@ -0,0 +1,39 @@ +package pausable; + +import jakarta.annotation.PostConstruct; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +import org.eclipse.microprofile.reactive.messaging.Incoming; + +import io.smallrye.reactive.messaging.ChannelRegistry; +import io.smallrye.reactive.messaging.PausableChannel; + +@ApplicationScoped +public class PausableController { + + @Inject + ChannelRegistry registry; + + @PostConstruct + public void resume() { + // Wait for the application to be ready + // Retrieve the pausable channel + PausableChannel pausable = registry.getPausable("my-channel"); + // Pause the processing of the messages + pausable.resume(); + } + + public void pause() { + // Retrieve the pausable channel + PausableChannel pausable = registry.getPausable("my-channel"); + // Pause the processing of the messages + pausable.pause(); + } + + @Incoming("my-channel") + void process(String message) { + // Process the message + } + +} diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/PausableChannelTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/PausableChannelTest.java new file mode 100644 index 0000000000..5db043a809 --- /dev/null +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/PausableChannelTest.java @@ -0,0 +1,191 @@ +package io.smallrye.reactive.messaging.kafka; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.LongAdder; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Message; +import org.eclipse.microprofile.reactive.messaging.Outgoing; +import org.junit.jupiter.api.Test; + +import io.smallrye.mutiny.Multi; +import io.smallrye.reactive.messaging.ChannelRegistry; +import io.smallrye.reactive.messaging.PausableChannel; +import io.smallrye.reactive.messaging.annotations.Blocking; +import io.smallrye.reactive.messaging.kafka.base.KafkaCompanionTestBase; +import io.smallrye.reactive.messaging.kafka.base.KafkaMapBasedConfig; + +public class PausableChannelTest extends KafkaCompanionTestBase { + + public static final int COUNT = 100; + + private KafkaMapBasedConfig commonConfig() { + return kafkaConfig("mp.messaging.outgoing.out") + .with("topic", topic) + .put("key.serializer", StringSerializer.class.getName()) + .put("value.serializer", IntegerSerializer.class.getName()) + .withPrefix("mp.messaging.incoming.data") + .put("topic", topic) + .put("cloud-events", false) + .put("commit-strategy", "throttled") + .put("auto.offset.reset", "earliest") + .put("key.deserializer", StringDeserializer.class.getName()) + .put("value.deserializer", IntegerDeserializer.class.getName()); + } + + @Test + public void testPausableChannelInitiallyPaused() { + addBeans(MyMessageProducer.class); + ConsumerApp application = runApplication(commonConfig() + .with("pausable", true) + .with("initially-paused", true), ConsumerApp.class); + ChannelRegistry pausableChannels = get(ChannelRegistry.class); + PausableChannel pauser = pausableChannels.getPausable("data"); + + long firstStep = COUNT / 10; + long secondStep = COUNT / 5; + long finalStep = COUNT; + await().pollDelay(1, SECONDS).untilAsserted(() -> assertThat(application.getCount()).isEqualTo(0L)); + assertThat(pauser.isPaused()).isTrue(); + pauser.resume(); + await().untilAsserted(() -> assertThat(application.getCount()).isGreaterThan(firstStep)); + pauser.pause(); + await().untilAsserted(() -> assertThat(application.getCount()).isBetween(firstStep, finalStep)); + assertThat(pauser.isPaused()).isTrue(); + pauser.resume(); + await().untilAsserted(() -> assertThat(application.getCount()).isGreaterThan(secondStep)); + pauser.pause(); + await().untilAsserted(() -> assertThat(application.getCount()).isBetween(secondStep, finalStep)); + assertThat(pauser.isPaused()).isTrue(); + pauser.resume(); + await().untilAsserted(() -> assertThat(application.getCount()).isEqualTo(COUNT)); + } + + @Test + public void testPausableChannel() { + addBeans(MyMessageProducer.class); + ConsumerApp application = runApplication(commonConfig() + .with("pausable", true), ConsumerApp.class); + ChannelRegistry pausableChannels = get(ChannelRegistry.class); + PausableChannel pauser = pausableChannels.getPausable("data"); + + long firstStep = COUNT / 10; + long secondStep = COUNT / 5; + long finalStep = COUNT; + await().untilAsserted(() -> assertThat(application.getCount()).isGreaterThan(firstStep)); + assertThat(pauser.isPaused()).isFalse(); + pauser.pause(); + await().untilAsserted(() -> assertThat(application.getCount()).isBetween(firstStep, finalStep)); + pauser.resume(); + await().untilAsserted(() -> assertThat(application.getCount()).isGreaterThan(secondStep)); + pauser.pause(); + await().untilAsserted(() -> assertThat(application.getCount()).isBetween(secondStep, finalStep)); + pauser.resume(); + await().untilAsserted(() -> assertThat(application.getCount()).isEqualTo(COUNT)); + } + + @Test + public void testPausableChannelWithPauser() { + addBeans(MyMessageProducer.class); + ConsumerAppWithPauser application = runApplication(commonConfig() + .with("pausable", true), ConsumerAppWithPauser.class); + ChannelRegistry pausableChannels = get(ChannelRegistry.class); + PausableChannel pauser = pausableChannels.getPausable("data"); + + assertThat(pauser.isPaused()).isFalse(); + await().untilAsserted(() -> { + if (pauser.isPaused()) { + pauser.resume(); + } + assertThat(application.getCount()).isEqualTo(COUNT); + }); + assertThat(application.getPaused()).isEqualTo(5); + } + + @ApplicationScoped + public static class ConsumerApp { + + LongAdder count = new LongAdder(); + List list = new CopyOnWriteArrayList<>(); + + @Incoming("data") + @Blocking + public void consume(Integer message) throws InterruptedException { + list.add(message); + count.increment(); + Thread.sleep(50); + } + + public List get() { + return list; + } + + public long getCount() { + return count.longValue(); + } + } + + @ApplicationScoped + public static class ConsumerAppWithPauser { + + @Inject + ChannelRegistry registry; + + LongAdder count = new LongAdder(); + LongAdder paused = new LongAdder(); + List list = new CopyOnWriteArrayList<>(); + + @Incoming("data") + @Blocking + public void consume(Integer message) throws InterruptedException { + list.add(message); + count.increment(); + if (count.longValue() % 20 == 0) { + PausableChannel data = registry.getPausable("data"); + data.pause(); + paused.increment(); + } + } + + public List get() { + return list; + } + + public long getCount() { + return count.longValue(); + } + + public long getPaused() { + return paused.longValue(); + } + } + + @ApplicationScoped + public static class MyMessageProducer { + + List produced = new CopyOnWriteArrayList<>(); + + @Outgoing("out") + public Multi> generate() { + return Multi.createFrom().range(0, COUNT) + .map(i -> Message.of(i, () -> { + produced.add(i); + return CompletableFuture.completedFuture(null); + })); + } + } + +} diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/base/WeldTestBase.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/base/WeldTestBase.java index eaa66a66b2..d0ba05eae2 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/base/WeldTestBase.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/base/WeldTestBase.java @@ -41,6 +41,7 @@ import io.smallrye.reactive.messaging.providers.extension.MutinyEmitterFactoryImpl; import io.smallrye.reactive.messaging.providers.extension.ObservationDecorator; import io.smallrye.reactive.messaging.providers.extension.OutgoingObservationDecorator; +import io.smallrye.reactive.messaging.providers.extension.PausableChannelDecorator; import io.smallrye.reactive.messaging.providers.extension.ReactiveMessagingExtension; import io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory; import io.smallrye.reactive.messaging.providers.impl.ConnectorFactories; @@ -112,6 +113,7 @@ public void initWeld() { weld.addBeanClass(ContextDecorator.class); weld.addBeanClass(ObservationDecorator.class); weld.addBeanClass(OutgoingObservationDecorator.class); + weld.addBeanClass(PausableChannelDecorator.class); weld.disableDiscovery(); } diff --git a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/DefaultPausableChannelConfiguration.java b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/DefaultPausableChannelConfiguration.java new file mode 100644 index 0000000000..ca5fb66ef9 --- /dev/null +++ b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/DefaultPausableChannelConfiguration.java @@ -0,0 +1,52 @@ +package io.smallrye.reactive.messaging.providers; + +import java.util.Objects; + +import io.smallrye.reactive.messaging.PausableChannelConfiguration; + +/** + * Default implementation of {@link PausableChannelConfiguration}. + */ +public class DefaultPausableChannelConfiguration implements PausableChannelConfiguration { + + private final String name; + private final boolean initiallyPaused; + + public DefaultPausableChannelConfiguration(String name, boolean initiallyPaused) { + this.name = name; + this.initiallyPaused = initiallyPaused; + } + + @Override + public String name() { + return name; + } + + @Override + public boolean initiallyPaused() { + return initiallyPaused; + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (!(o instanceof DefaultPausableChannelConfiguration)) + return false; + DefaultPausableChannelConfiguration that = (DefaultPausableChannelConfiguration) o; + return initiallyPaused == that.initiallyPaused && Objects.equals(name, that.name); + } + + @Override + public int hashCode() { + return Objects.hash(name, initiallyPaused); + } + + @Override + public String toString() { + return "DefaultPausableChannelConfiguration{" + + "name='" + name + '\'' + + ", initialPaused=" + initiallyPaused + + '}'; + } +} diff --git a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/extension/PausableChannelDecorator.java b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/extension/PausableChannelDecorator.java new file mode 100644 index 0000000000..dac8471d9a --- /dev/null +++ b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/extension/PausableChannelDecorator.java @@ -0,0 +1,51 @@ +package io.smallrye.reactive.messaging.providers.extension; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +import org.eclipse.microprofile.reactive.messaging.Message; + +import io.smallrye.mutiny.Multi; +import io.smallrye.reactive.messaging.ChannelRegistry; +import io.smallrye.reactive.messaging.PausableChannelConfiguration; +import io.smallrye.reactive.messaging.PublisherDecorator; +import io.smallrye.reactive.messaging.SubscriberDecorator; +import io.smallrye.reactive.messaging.providers.helpers.PausableMulti; + +@ApplicationScoped +public class PausableChannelDecorator implements PublisherDecorator, SubscriberDecorator { + + @Inject + ChannelRegistry registry; + + private final Map configurations = new HashMap<>(); + + @Override + public Multi> decorate(Multi> publisher, List channelName, + boolean isConnector) { + String channel = channelName.get(0); + if (isConnector && configurations.containsKey(channel)) { + PausableChannelConfiguration configuration = configurations.get(channel); + PausableMulti> pausable = new PausableMulti<>(publisher, configuration.initiallyPaused()); + for (String name : channelName) { + registry.register(name, pausable); + } + return pausable; + } + return publisher; + } + + @Override + public int getPriority() { + return PublisherDecorator.super.getPriority(); + } + + public void addConfiguration(PausableChannelConfiguration configuration) { + configurations.put(configuration.name(), configuration); + } + +} diff --git a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/helpers/PausableMulti.java b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/helpers/PausableMulti.java new file mode 100644 index 0000000000..25b8ba50cd --- /dev/null +++ b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/helpers/PausableMulti.java @@ -0,0 +1,113 @@ +package io.smallrye.reactive.messaging.providers.helpers; + +import java.util.concurrent.Flow; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; + +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.helpers.Subscriptions; +import io.smallrye.mutiny.operators.MultiOperator; +import io.smallrye.mutiny.operators.multi.MultiOperatorProcessor; +import io.smallrye.mutiny.subscription.MultiSubscriber; +import io.smallrye.reactive.messaging.PausableChannel; + +public class PausableMulti extends MultiOperator implements PausableChannel { + + private volatile boolean paused; + volatile PausableProcessor processor; + private final ReentrantLock lock = new ReentrantLock(); + + public PausableMulti(Multi upstream, boolean paused) { + super(upstream); + this.paused = paused; + } + + @Override + public void subscribe(MultiSubscriber subscriber) { + processor = new PausableProcessor(subscriber); + upstream().subscribe(processor); + } + + @Override + public boolean isPaused() { + return paused; + } + + @Override + public void pause() { + lock.lock(); + try { + paused = true; + } finally { + lock.unlock(); + } + } + + @Override + public void resume() { + lock.lock(); + try { + if (paused) { + PausableProcessor p = processor; + if (p != null) { + paused = false; + p.resume(); + } + } + } finally { + lock.unlock(); + } + } + + private class PausableProcessor extends MultiOperatorProcessor { + + private final AtomicLong demand = new AtomicLong(); + + PausableProcessor(MultiSubscriber downstream) { + super(downstream); + } + + void resume() { + Flow.Subscription subscription = getUpstreamSubscription(); + if (subscription == Subscriptions.CANCELLED) { + return; + } + long currentDemand = demand.get(); + if (currentDemand > 0) { + Subscriptions.produced(demand, currentDemand); + subscription.request(currentDemand); + } + } + + @Override + public void request(long numberOfItems) { + if (numberOfItems <= 0) { + onFailure(Subscriptions.getInvalidRequestException()); + return; + } + Flow.Subscription subscription = getUpstreamSubscription(); + if (subscription == Subscriptions.CANCELLED) { + return; + } + try { + Subscriptions.add(demand, numberOfItems); + long currentDemand = demand.get(); + if (paused) { + return; + } + if (currentDemand > 0) { + Subscriptions.produced(demand, currentDemand); + subscription.request(currentDemand); + } + } catch (Throwable failure) { + onFailure(failure); + } + } + + @Override + public void cancel() { + processor = null; + super.cancel(); + } + } +} diff --git a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/impl/ConfiguredChannelFactory.java b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/impl/ConfiguredChannelFactory.java index b36862e2a8..613bdc9344 100644 --- a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/impl/ConfiguredChannelFactory.java +++ b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/impl/ConfiguredChannelFactory.java @@ -22,9 +22,12 @@ import io.smallrye.mutiny.Multi; import io.smallrye.reactive.messaging.ChannelRegistar; import io.smallrye.reactive.messaging.ChannelRegistry; +import io.smallrye.reactive.messaging.PausableChannelConfiguration; import io.smallrye.reactive.messaging.PublisherDecorator; import io.smallrye.reactive.messaging.connector.InboundConnector; import io.smallrye.reactive.messaging.connector.OutboundConnector; +import io.smallrye.reactive.messaging.providers.DefaultPausableChannelConfiguration; +import io.smallrye.reactive.messaging.providers.extension.PausableChannelDecorator; import io.smallrye.reactive.messaging.providers.helpers.MultiUtils; /** @@ -139,12 +142,14 @@ void register(Map incomings, Map incomings, Map incomings, Map select = publisherDecoratorInstance.select(PausableChannelDecorator.class); + if (select.isResolvable()) { + PausableChannelDecorator pausableChannels = select.get(); + config.getOptionalValue(PausableChannelConfiguration.PAUSABLE_PROPERTY, Boolean.class).ifPresent(pausable -> { + if (pausable) { + pausableChannels.addConfiguration(new DefaultPausableChannelConfiguration(channel, + config.getOptionalValue(PausableChannelConfiguration.PAUSED_PROPERTY, Boolean.class) + .orElse(false))); + } + }); + } + } + private static String getConnectorAttribute(Config config) { // This method looks for connector and type. // The availability has been checked when the config object has been created diff --git a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/impl/InternalChannelRegistry.java b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/impl/InternalChannelRegistry.java index 758e420247..d24157171b 100644 --- a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/impl/InternalChannelRegistry.java +++ b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/impl/InternalChannelRegistry.java @@ -2,7 +2,11 @@ import static io.smallrye.reactive.messaging.providers.i18n.ProviderMessages.msg; -import java.util.*; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Flow; @@ -15,6 +19,7 @@ import io.smallrye.reactive.messaging.ChannelRegistry; import io.smallrye.reactive.messaging.MutinyEmitter; +import io.smallrye.reactive.messaging.PausableChannel; @ApplicationScoped public class InternalChannelRegistry implements ChannelRegistry { @@ -26,6 +31,7 @@ public class InternalChannelRegistry implements ChannelRegistry { private final Map incoming = new ConcurrentHashMap<>(); private final Map, Map> emitters = new ConcurrentHashMap<>(); + private final Map pausables = new ConcurrentHashMap<>(); @Override public Flow.Publisher> register(String name, @@ -135,4 +141,14 @@ public Map getOutgoingChannels() { return incoming; } + @Override + public void register(String name, PausableChannel pausable) { + pausables.put(name, pausable); + } + + @Override + public PausableChannel getPausable(String name) { + return pausables.get(name); + } + } diff --git a/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/PausableChannelTest.java b/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/PausableChannelTest.java new file mode 100644 index 0000000000..be8d81b8dc --- /dev/null +++ b/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/PausableChannelTest.java @@ -0,0 +1,82 @@ +package io.smallrye.reactive.messaging; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.LongAdder; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import io.smallrye.reactive.messaging.annotations.Blocking; + +public class PausableChannelTest extends WeldTestBaseWithoutTails { + + @BeforeEach + void setupConfig() { + installConfig("src/test/resources/config/pausable.properties"); + } + + @Test + public void testPausableChannelInitiallyPaused() { + addBeanClass(ConsumerApp.class); + + initialize(); + + ConsumerApp app = get(ConsumerApp.class); + ChannelRegistry pausableChannels = get(ChannelRegistry.class); + PausableChannel pauser = pausableChannels.getPausable("B"); + + await().pollDelay(3, TimeUnit.SECONDS).until(() -> app.getCount() == 0); + assertThat(pauser.isPaused()).isTrue(); + pauser.resume(); + await().untilAsserted(() -> assertThat(app.getCount()).isEqualTo(1L)); + + pauser.pause(); + assertThat(pauser.isPaused()).isTrue(); + pauser.resume(); + await().untilAsserted(() -> assertThat(app.getCount()).isEqualTo(2L)); + + pauser.pause(); + assertThat(pauser.isPaused()).isTrue(); + pauser.resume(); + await().untilAsserted(() -> assertThat(app.getCount()).isEqualTo(3L)); + + assertThat(app.get()).containsExactly(2, 3, 4); + + } + + @ApplicationScoped + public static class ConsumerApp { + + LongAdder count = new LongAdder(); + List list = new CopyOnWriteArrayList<>(); + + @Incoming("B") + @Blocking + public void consume(Integer message) { + list.add(message); + count.increment(); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + public List get() { + return list; + } + + public long getCount() { + return count.longValue(); + } + } + +} diff --git a/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/WeldTestBaseWithoutTails.java b/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/WeldTestBaseWithoutTails.java index 6dec4d27f9..dbe7b0a547 100644 --- a/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/WeldTestBaseWithoutTails.java +++ b/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/WeldTestBaseWithoutTails.java @@ -34,6 +34,7 @@ import io.smallrye.reactive.messaging.providers.extension.MutinyEmitterFactoryImpl; import io.smallrye.reactive.messaging.providers.extension.ObservationDecorator; import io.smallrye.reactive.messaging.providers.extension.OutgoingObservationDecorator; +import io.smallrye.reactive.messaging.providers.extension.PausableChannelDecorator; import io.smallrye.reactive.messaging.providers.extension.ReactiveMessagingExtension; import io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory; import io.smallrye.reactive.messaging.providers.impl.ConnectorFactories; @@ -123,6 +124,7 @@ public void setUp() { LegacyEmitterFactoryImpl.class, OutgoingInterceptorDecorator.class, IncomingInterceptorDecorator.class, + PausableChannelDecorator.class, // Observation Decorator ObservationDecorator.class, OutgoingObservationDecorator.class, diff --git a/smallrye-reactive-messaging-provider/src/test/resources/config/pausable.properties b/smallrye-reactive-messaging-provider/src/test/resources/config/pausable.properties new file mode 100644 index 0000000000..b0d4654f5d --- /dev/null +++ b/smallrye-reactive-messaging-provider/src/test/resources/config/pausable.properties @@ -0,0 +1,5 @@ +# You should not be able to use the same channel name in an outgoing and incoming configuration +mp.messaging.incoming.B.connector=dummy +mp.messaging.incoming.B.pausable=true +mp.messaging.incoming.B.initially-paused=true +