Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Incoming channel concurrency #2382

Merged
merged 10 commits into from
Nov 23, 2023
1 change: 1 addition & 0 deletions documentation/Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ mkdocs-macros-plugin = ">=0.6.3"
pyyaml = ">=6.0"
mike = ">=1.1.2"
mkdocs-material = ">=8.1.4"
mkdocs-print-site-plugin = ">=2.3.6"

[dev-packages]

Expand Down
383 changes: 221 additions & 162 deletions documentation/Pipfile.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions documentation/mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ nav:
- 'Channel Decorators': concepts/decorators.md
- 'Broadcast' : concepts/broadcast.md
- 'Merge channels' : concepts/merge.md
- 'Incoming Channel Concurrency' : concepts/incoming-concurrency.md
- '@Incomings' : concepts/incomings.md
- '@Outgoings' : concepts/outgoings.md
- 'Testing' : concepts/testing.md
Expand Down Expand Up @@ -102,6 +103,7 @@ plugins:
version_selector: true
css_dir: css
javascript_dir: javascript
- print-site
ozangunalp marked this conversation as resolved.
Show resolved Hide resolved

copyright: >-
Sponsored by <a href="https://www.redhat.com"><img style="vertical-align: middle; height: 2.5em;" alt="Red Hat" src="https://github.com/jbossorg/website/raw/master/docs/img/redhat_reversed.svg"/></a> <br/>
Expand Down
56 changes: 56 additions & 0 deletions documentation/src/main/docs/concepts/incoming-concurrency.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# Incoming Channel Concurrency

!!!warning "Experimental"
Incoming channel `concurrency` config is an experimental feature.

The `concurrency` attribute for incoming channels provides a mechanism to enable concurrent non-blocking processing of incoming messages.
When applied to a channel, this attribute specifies the number of copies of that channel to be created and wired to the processing method,
allowing multiple messages to be processed concurrently.

For example, concurrency configuration for a Kafka incoming channel the configuration will look like:

```properties
mp.messaging.incoming.my-channel.connector=smallrye-kafka
mp.messaging.incoming.my-channel.topic=orders
mp.messaging.incoming.my-channel.value.deserializer=org.apache.kafka.common.serialization.DoubleDeserializer
mp.messaging.incoming.my-channel.concurrency=4
```

In this example, there will be 4 copies of the `my-channel` running concurrently, with distinctive internal channel names,
`my-channel$1`, `my-channel$2`, etc. but all registered with the name `my-channel` to the `ChannelRegistry`.

!!!info "Kafka connector `partitions`"
This is essentially very similar to the Kafka connector `partitions` configuration, but addresses some its limitations.
Using `partitions` config in Kafka connector, channels are merged into the downstream message processor
(method annotated with `@Incoming` or an injected channel) which is therefore called sequentially.
This prevents concurrently processing messages from multiple partitions.

The `concurrency` mechanism effectively allows polling Kafka partitions from separate clients
and concurrently processing records while preserving the in-partition order.

Copy channels inherit all configuration attributes of the main channel config.
Per-copy channel attributes can be configured separately using the `$` separated channel names: `mp.messaging.incoming.my-channel$1.attribute`.

For example, the following AMQP 1.0 channel defines 3 channels each with a different selector:

```properties
mp.messaging.incoming.data.connector=smallrye-amqp
mp.messaging.incoming.data.address=address
mp.messaging.incoming.data.durable=false
mp.messaging.incoming.data.concurrency=3
mp.messaging.incoming.data$1.selector=x='foo'
mp.messaging.incoming.data$2.selector=x='bar'
mp.messaging.incoming.data$3.selector=x='baz'
```

While the `concurrency` attribute is applicable to channels of any connector type,
the channel implementation may need to take this configuration into account and adjust the threading accordingly.
Connectors based on Vert.x event loop create a new event loop context per copy-channel to dispatch messages on distinct contexts.

!!!important "Non-blocking processing"
Note that while this allows concurrent processing, messages are still dispatched on Vert.x event loop threads, and should not be blocked.

Otherwise, connectors treat copy channels as independent channels.
For example, health check reports are registered separately for each copy-channel.


30 changes: 30 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@
<mockito.version>4.8.1</mockito.version>
<junit-pioneer.version>2.2.0</junit-pioneer.version>
<junit-platform-commons.version>1.9.3</junit-platform-commons.version>
<system-stubs-jupiter.version>2.1.5</system-stubs-jupiter.version>

<jakarta.jms.api.version>3.1.0</jakarta.jms.api.version>
<jackson.version>2.16.0</jackson.version>
Expand Down Expand Up @@ -319,6 +320,11 @@
<artifactId>opencsv</artifactId>
<version>${opencsv.version}</version>
</dependency>
<dependency>
<groupId>uk.org.webcompere</groupId>
<artifactId>system-stubs-jupiter</artifactId>
<version>${system-stubs-jupiter.version}</version>
</dependency>

<!-- OpenTelemetry libs not on BOM -->
<dependency>
Expand Down Expand Up @@ -457,6 +463,11 @@
<version>${junit-pioneer.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>uk.org.webcompere</groupId>
<artifactId>system-stubs-jupiter</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down Expand Up @@ -741,6 +752,25 @@
<argLine>--illegal-access=permit</argLine>
</properties>
</profile>
<profile>
<id>byte-buddy-experimental</id>
<activation>
<jdk>[21, )</jdk>
</activation>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<systemPropertyVariables>
<net.bytebuddy.experimental>true</net.bytebuddy.experimental>
</systemPropertyVariables>
</configuration>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>ci-tests</id>
<activation>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,16 @@
import io.smallrye.reactive.messaging.health.HealthReporter;
import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder;
import io.smallrye.reactive.messaging.providers.helpers.MultiUtils;
import io.smallrye.reactive.messaging.providers.helpers.VertxContext;
import io.smallrye.reactive.messaging.providers.impl.ConcurrencyConnectorConfig;
import io.vertx.amqp.AmqpClientOptions;
import io.vertx.amqp.AmqpReceiverOptions;
import io.vertx.amqp.AmqpSenderOptions;
import io.vertx.core.impl.VertxInternal;
import io.vertx.mutiny.amqp.AmqpClient;
import io.vertx.mutiny.amqp.AmqpReceiver;
import io.vertx.mutiny.amqp.AmqpSender;
import io.vertx.mutiny.core.Context;
import io.vertx.mutiny.core.Vertx;
import io.vertx.proton.ProtonSender;

Expand Down Expand Up @@ -175,6 +179,7 @@ private Multi<? extends Message<?>> getStreamOfMessages(AmqpReceiver receiver,
return Multi.createFrom().deferred(
() -> {
Multi<Message<?>> stream = receiver.toMulti()
.emitOn(c -> VertxContext.runOnContext(holder.getContext().getDelegate(), c))
.onItem().transformToUniAndConcatenate(m -> {
try {
return Uni.createFrom().item(new AmqpMessage<>(m, holder.getContext(), onNack,
Expand Down Expand Up @@ -215,7 +220,11 @@ public Flow.Publisher<? extends Message<?>> getPublisher(Config config) {

AmqpClient client = AmqpClientHelper.createClient(this, ic, clientOptions, clientSslContexts);

ConnectionHolder holder = new ConnectionHolder(client, ic, getVertx());
Context root = null;
if (ConcurrencyConnectorConfig.getConcurrency(config).filter(i -> i > 1).isPresent()) {
root = Context.newInstance(((VertxInternal) getVertx().getDelegate()).createEventLoopContext());
}
ConnectionHolder holder = new ConnectionHolder(client, ic, getVertx(), root);
holders.put(ic.getChannel(), holder);

AmqpFailureHandler onNack = createFailureHandler(ic);
Expand Down Expand Up @@ -258,7 +267,7 @@ public Flow.Subscriber<? extends Message<?>> getSubscriber(Config config) {
AtomicReference<AmqpSender> sender = new AtomicReference<>();
AmqpClient client = AmqpClientHelper.createClient(this, oc, clientOptions, clientSslContexts);
String link = oc.getLinkName().orElseGet(oc::getChannel);
ConnectionHolder holder = new ConnectionHolder(client, oc, getVertx());
ConnectionHolder holder = new ConnectionHolder(client, oc, getVertx(), null);

Uni<AmqpSender> getSender = Uni.createFrom().deferred(() -> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,16 @@ public class ConnectionHolder {
private final AtomicReference<CurrentConnection> holder = new AtomicReference<>();

private final Vertx vertx;
private final Context root;
private Consumer<Throwable> callback;

public ConnectionHolder(AmqpClient client,
AmqpConnectorCommonConfiguration configuration,
Vertx vertx) {
Vertx vertx, Context root) {
this.client = client;
this.configuration = configuration;
this.vertx = vertx;
this.root = root;
}

public Context getContext() {
Expand Down Expand Up @@ -134,7 +136,7 @@ public Uni<AmqpConnection> getOrEstablishConnection() {
.onSubscription().invoke(s -> log.establishingConnection())
.onItem().transform(conn -> {
log.connectionEstablished();
holder.set(new CurrentConnection(conn, Vertx.currentContext()));
holder.set(new CurrentConnection(conn, root == null ? Vertx.currentContext() : root));
conn
.exceptionHandler(t -> {
holder.set(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,15 @@ public IncomingAmqpMetadata(AmqpMessage message) {
this.message = message;
}

/**
* The AMQP message.
*
* @return the AMQP message
*/
public AmqpMessage getMessage() {
return message;
}

/**
* The AMQP address of the message.
* <p>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package io.smallrye.reactive.messaging.amqp.converters;

import java.lang.reflect.Type;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Message;

import io.smallrye.reactive.messaging.MessageConverter;
import io.smallrye.reactive.messaging.amqp.IncomingAmqpMetadata;
import io.smallrye.reactive.messaging.providers.helpers.TypeUtils;

@ApplicationScoped
public class AmqpMessageConverter implements MessageConverter {
@Override
public boolean canConvert(Message<?> in, Type target) {
return in.getMetadata(IncomingAmqpMetadata.class).isPresent()
&& TypeUtils.isAssignable(target, io.vertx.amqp.AmqpMessage.class);
}

@Override
public Message<?> convert(Message<?> in, Type target) {
IncomingAmqpMetadata metadata = in.getMetadata(IncomingAmqpMetadata.class)
.orElseThrow(() -> new IllegalStateException("No AMQP 1.0 metadata"));
return in.withPayload(metadata.getMessage());
}
}
Loading
Loading