Skip to content

Commit

Permalink
Pausable channels
Browse files Browse the repository at this point in the history
Closes #1649
  • Loading branch information
ozangunalp committed Jun 28, 2024
1 parent 3b6a92b commit 02293ec
Show file tree
Hide file tree
Showing 17 changed files with 675 additions and 3 deletions.
15 changes: 13 additions & 2 deletions api/revapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,18 @@
"criticality" : "highlight",
"minSeverity" : "POTENTIALLY_BREAKING",
"minCriticality" : "documented",
"differences" : [ ]
"differences" : [
{
"code": "java.method.addedToInterface",
"new": "method io.smallrye.reactive.messaging.PausableChannel io.smallrye.reactive.messaging.ChannelRegistry::getPausable(java.lang.String)",
"justification": "Added PausableChannel to the ChannelRegistry interface to allow pausing and resuming channels."
},
{
"code": "java.method.addedToInterface",
"new": "method void io.smallrye.reactive.messaging.ChannelRegistry::register(java.lang.String, io.smallrye.reactive.messaging.PausableChannel)",
"justification": "Added PausableChannel to the ChannelRegistry interface to allow pausing and resuming channels."
}
]
}
}, {
"extension" : "revapi.reporter.json",
Expand All @@ -46,4 +57,4 @@
"minCriticality" : "documented",
"output" : "out"
}
} ]
} ]
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,8 @@ Subscriber<? extends Message<?>> register(String name,

Map<String, Boolean> getOutgoingChannels();

void register(String name, PausableChannel pausable);

PausableChannel getPausable(String name);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package io.smallrye.reactive.messaging;

/**
* A channel that can be paused and resumed.
*/
public interface PausableChannel {

/**
* Checks whether the channel is paused.
*
* @return {@code true} if the channel is paused, {@code false} otherwise
*/
boolean isPaused();

/**
* Pauses the channel.
*/
void pause();

/**
* Resumes the channel.
*/
void resume();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package io.smallrye.reactive.messaging;

/**
* A channel that can be paused and resumed.
*/
public interface PausableChannelConfiguration {

/**
* The name of the property to configure whether the channel is pausable.
*/
String PAUSABLE_PROPERTY = "pausable";

/**
* The name of the property to configure whether the channel is initially paused.
*/
String PAUSED_PROPERTY = "initially-paused";

/**
* The name of the channel.
*/
String name();

/**
* Whether the channel is paused at subscribe time.
*/
boolean initiallyPaused();

}
1 change: 1 addition & 0 deletions documentation/mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ nav:
- 'Message Context' : concepts/message-context.md
- 'Metadata Injection': concepts/incoming-metadata-injection.md
- 'Generic Payloads': concepts/generic-payloads.md
- 'Pausable Channels': concepts/pausable-channels.md

- Kafka:
- kafka/kafka.md
Expand Down
31 changes: 31 additions & 0 deletions documentation/src/main/docs/concepts/pausable-channels.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Pausable Channels

Based on reactive streams, Smallrye Reactive Messaging ensures that channels are back-pressured.
This means that the flow of messages is controlled by the downstream consumer,
whether it is a processing method or outgoing channel.
Sometimes you may want to pause the flow of messages, for example, when the consumer is not ready to process them.

Injected [`@Channel`](emitter.md#retrieving-channels) streams are not subscribed to by default, so the flow of messages is controlled by the application.
But for [`@Incoming`](model.md#incoming-and-outgoing) methods, the flow of messages is controlled by the runtime.

Pausable channels are useful when you want to control the flow of messages within your application.

## Creating a Pausable Channel

To use pausable channels, you need to activate it with the configuration property `pausable` set to `true`.

```properties
mp.messaging.incoming.my-channel.pausable=true
# optional, by default the channel is NOT paused initially
mp.messaging.outgoing.my-channel.initially-paused=true
```

## Controlling the flow of messages

If a channel is configured to be pausable,
you can get the `PausableChannel` by channel name from the `ChannelRegistry` programmatically,
and pause or resume the channel as needed:

``` java
{{ insert('pausable/PausableController.java') }}
```
39 changes: 39 additions & 0 deletions documentation/src/main/java/pausable/PausableController.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package pausable;

import jakarta.annotation.PostConstruct;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import org.eclipse.microprofile.reactive.messaging.Incoming;

import io.smallrye.reactive.messaging.ChannelRegistry;
import io.smallrye.reactive.messaging.PausableChannel;

@ApplicationScoped
public class PausableController {

@Inject
ChannelRegistry registry;

@PostConstruct
public void resume() {
// Wait for the application to be ready
// Retrieve the pausable channel
PausableChannel pausable = registry.getPausable("my-channel");
// Pause the processing of the messages
pausable.resume();
}

public void pause() {
// Retrieve the pausable channel
PausableChannel pausable = registry.getPausable("my-channel");
// Pause the processing of the messages
pausable.pause();
}

@Incoming("my-channel")
void process(String message) {
// Process the message
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
package io.smallrye.reactive.messaging.kafka;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.LongAdder;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.junit.jupiter.api.Test;

import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.ChannelRegistry;
import io.smallrye.reactive.messaging.PausableChannel;
import io.smallrye.reactive.messaging.annotations.Blocking;
import io.smallrye.reactive.messaging.kafka.base.KafkaCompanionTestBase;
import io.smallrye.reactive.messaging.kafka.base.KafkaMapBasedConfig;

public class PausableChannelTest extends KafkaCompanionTestBase {

public static final int COUNT = 100;

private KafkaMapBasedConfig commonConfig() {
return kafkaConfig("mp.messaging.outgoing.out")
.with("topic", topic)
.put("key.serializer", StringSerializer.class.getName())
.put("value.serializer", IntegerSerializer.class.getName())
.withPrefix("mp.messaging.incoming.data")
.put("topic", topic)
.put("cloud-events", false)
.put("commit-strategy", "throttled")
.put("auto.offset.reset", "earliest")
.put("key.deserializer", StringDeserializer.class.getName())
.put("value.deserializer", IntegerDeserializer.class.getName());
}

@Test
public void testPausableChannelInitiallyPaused() {
addBeans(MyMessageProducer.class);
ConsumerApp application = runApplication(commonConfig()
.with("pausable", true)
.with("initially-paused", true), ConsumerApp.class);
ChannelRegistry pausableChannels = get(ChannelRegistry.class);
PausableChannel pauser = pausableChannels.getPausable("data");

long firstStep = COUNT / 10;
long secondStep = COUNT / 5;
long finalStep = COUNT;
await().pollDelay(1, SECONDS).untilAsserted(() -> assertThat(application.getCount()).isEqualTo(0L));
assertThat(pauser.isPaused()).isTrue();
pauser.resume();
await().untilAsserted(() -> assertThat(application.getCount()).isGreaterThan(firstStep));
pauser.pause();
await().untilAsserted(() -> assertThat(application.getCount()).isBetween(firstStep, finalStep));
assertThat(pauser.isPaused()).isTrue();
pauser.resume();
await().untilAsserted(() -> assertThat(application.getCount()).isGreaterThan(secondStep));
pauser.pause();
await().untilAsserted(() -> assertThat(application.getCount()).isBetween(secondStep, finalStep));
assertThat(pauser.isPaused()).isTrue();
pauser.resume();
await().untilAsserted(() -> assertThat(application.getCount()).isEqualTo(COUNT));
}

@Test
public void testPausableChannel() {
addBeans(MyMessageProducer.class);
ConsumerApp application = runApplication(commonConfig()
.with("pausable", true), ConsumerApp.class);
ChannelRegistry pausableChannels = get(ChannelRegistry.class);
PausableChannel pauser = pausableChannels.getPausable("data");

long firstStep = COUNT / 10;
long secondStep = COUNT / 5;
long finalStep = COUNT;
await().untilAsserted(() -> assertThat(application.getCount()).isGreaterThan(firstStep));
assertThat(pauser.isPaused()).isFalse();
pauser.pause();
await().untilAsserted(() -> assertThat(application.getCount()).isBetween(firstStep, finalStep));
pauser.resume();
await().untilAsserted(() -> assertThat(application.getCount()).isGreaterThan(secondStep));
pauser.pause();
await().untilAsserted(() -> assertThat(application.getCount()).isBetween(secondStep, finalStep));
pauser.resume();
await().untilAsserted(() -> assertThat(application.getCount()).isEqualTo(COUNT));
}

@Test
public void testPausableChannelWithPauser() {
addBeans(MyMessageProducer.class);
ConsumerAppWithPauser application = runApplication(commonConfig()
.with("pausable", true), ConsumerAppWithPauser.class);
ChannelRegistry pausableChannels = get(ChannelRegistry.class);
PausableChannel pauser = pausableChannels.getPausable("data");

assertThat(pauser.isPaused()).isFalse();
await().untilAsserted(() -> {
if (pauser.isPaused()) {
pauser.resume();
}
assertThat(application.getCount()).isEqualTo(COUNT);
});
assertThat(application.getPaused()).isEqualTo(5);
}

@ApplicationScoped
public static class ConsumerApp {

LongAdder count = new LongAdder();
List<Integer> list = new CopyOnWriteArrayList<>();

@Incoming("data")
@Blocking
public void consume(Integer message) throws InterruptedException {
list.add(message);
count.increment();
Thread.sleep(50);
}

public List<Integer> get() {
return list;
}

public long getCount() {
return count.longValue();
}
}

@ApplicationScoped
public static class ConsumerAppWithPauser {

@Inject
ChannelRegistry registry;

LongAdder count = new LongAdder();
LongAdder paused = new LongAdder();
List<Integer> list = new CopyOnWriteArrayList<>();

@Incoming("data")
@Blocking
public void consume(Integer message) throws InterruptedException {
list.add(message);
count.increment();
if (count.longValue() % 20 == 0) {
PausableChannel data = registry.getPausable("data");
data.pause();
paused.increment();
}
}

public List<Integer> get() {
return list;
}

public long getCount() {
return count.longValue();
}

public long getPaused() {
return paused.longValue();
}
}

@ApplicationScoped
public static class MyMessageProducer {

List<Integer> produced = new CopyOnWriteArrayList<>();

@Outgoing("out")
public Multi<Message<Integer>> generate() {
return Multi.createFrom().range(0, COUNT)
.map(i -> Message.of(i, () -> {
produced.add(i);
return CompletableFuture.completedFuture(null);
}));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import io.smallrye.reactive.messaging.providers.extension.MutinyEmitterFactoryImpl;
import io.smallrye.reactive.messaging.providers.extension.ObservationDecorator;
import io.smallrye.reactive.messaging.providers.extension.OutgoingObservationDecorator;
import io.smallrye.reactive.messaging.providers.extension.PausableChannelDecorator;
import io.smallrye.reactive.messaging.providers.extension.ReactiveMessagingExtension;
import io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory;
import io.smallrye.reactive.messaging.providers.impl.ConnectorFactories;
Expand Down Expand Up @@ -112,6 +113,7 @@ public void initWeld() {
weld.addBeanClass(ContextDecorator.class);
weld.addBeanClass(ObservationDecorator.class);
weld.addBeanClass(OutgoingObservationDecorator.class);
weld.addBeanClass(PausableChannelDecorator.class);
weld.disableDiscovery();
}

Expand Down
Loading

0 comments on commit 02293ec

Please sign in to comment.