diff --git a/api/src/main/java/io/smallrye/reactive/messaging/Messages.java b/api/src/main/java/io/smallrye/reactive/messaging/Messages.java
index 097039028d..a5e1d3fa32 100644
--- a/api/src/main/java/io/smallrye/reactive/messaging/Messages.java
+++ b/api/src/main/java/io/smallrye/reactive/messaging/Messages.java
@@ -1,11 +1,14 @@
package io.smallrye.reactive.messaging;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -13,12 +16,48 @@
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Metadata;
+import io.smallrye.common.annotation.CheckReturnValue;
+
+/**
+ * A class handling coordination between messages.
+ */
public class Messages {
private Messages() {
// Avoid direct instantiation.
}
+ /**
+ * Chains the given message with some other messages.
+ * It coordinates the acknowledgement. When all the other messages are acknowledged successfully, the passed
+ * message is acknowledged. If one of the other messages is acknowledged negatively, the passed message is also
+ * nacked (with the same reason). Subsequent ack/nack will be ignored.
+ *
+ *
+ * @param message the message
+ * @return the chain builder that let you decide how the metadata are passed, and the set of messages.
+ */
+ @CheckReturnValue
+ public static MessageChainBuilder chain(Message> message) {
+ return new MessageChainBuilder(message);
+ }
+
+ /**
+ * Merges multiple messages into a single one.
+ * This is an implementation of a merge pattern: n messages combined into 1.
+ *
+ * Whe resulting message payload is computed using the combinator function.
+ * When the returned message is acked/nacked, the passes messages are acked/nacked accordingly.
+ *
+ * Metadata are also merged. The metadata of all the messages are copied into the resulting message. If, for a given
+ * class, the metadata is already present in the result message, it's either ignored, or merged if the class
+ * implements {@link MergeableMetadata}.
+ *
+ * @param list the list of message, must not be empty, must not be null
+ * @param combinator the combinator method, must not be null
+ * @param the payload type of the produced message
+ * @return the resulting message
+ */
public static Message merge(List> list, Function, T> combinator) {
if (list.isEmpty()) {
return Message.of(combinator.apply(Collections.emptyList()));
@@ -59,6 +98,20 @@ public static Message merge(List> list, Function, T> c
.withMetadata(metadata);
}
+ /**
+ * Merges multiple messages into a single one.
+ *
+ * Whe resulting message payload is computed using the combinator function.
+ * When the returned message is acked/nacked, the passes messages are acked/nacked accordingly.
+ *
+ * Metadata are also merged. The metadata of all the messages are copied into the resulting message. If, for a given
+ * class, the metadata is already present in the result message, it's either ignored, or merged if the class
+ * implements {@link MergeableMetadata}.
+ *
+ * @param list the list of message, must not be empty, must not be null
+ * @param the payload type of the passed messages
+ * @return the resulting message
+ */
public static Message> merge(List> list) {
if (list.isEmpty()) {
return Message.of(Collections.emptyList());
@@ -90,7 +143,7 @@ public static Message> merge(List> list) {
.withMetadata(metadata);
}
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({ "unchecked", "rawtypes" })
private static Metadata merge(Metadata first, Metadata second) {
Metadata result = first;
for (Object meta : second) {
@@ -121,4 +174,103 @@ private static Metadata merge(Metadata first, Metadata second) {
return result;
}
+ /**
+ * The message chain builder allows chaining message and configure metadata propagation.
+ * By default, all the metadata from the given message are copied into the chained messages.
+ */
+ public static class MessageChainBuilder {
+ private final Message> input;
+ private Metadata metadata;
+
+ private MessageChainBuilder(Message> message) {
+ this.input = message;
+ this.metadata = message.getMetadata().copy();
+ }
+
+ /**
+ * Do not copy any metadata from the initial message to the chained message.
+ *
+ * @return the current {@link MessageChainBuilder}
+ */
+ @CheckReturnValue
+ public MessageChainBuilder withoutMetadata() {
+ this.metadata = Metadata.empty();
+ return this;
+ }
+
+ /**
+ * Copy the given metadata of the given classes from the initial message to the chained message, if the initial
+ * message does not include a metadata object of the given class.
+ *
+ * In general, this method must be used after {@link #withoutMetadata()}.
+ *
+ * @return the current {@link MessageChainBuilder}
+ */
+ @CheckReturnValue
+ public MessageChainBuilder withMetadata(Class>... mc) {
+ for (Class> clazz : mc) {
+ Optional> o = input.getMetadata().get(clazz);
+ o.ifPresent(value -> this.metadata = metadata.with(value));
+ }
+ return this;
+ }
+
+ /**
+ * Do not the given metadata of the given classes from the initial message to the chained message, if the initial
+ * message does not include a metadata object of the given class.
+ *
+ * @return the current {@link MessageChainBuilder}
+ */
+ @CheckReturnValue
+ public MessageChainBuilder withoutMetadata(Class>... mc) {
+ for (Class> clazz : mc) {
+ this.metadata = this.metadata.without(clazz);
+ }
+ return this;
+ }
+
+ /**
+ * Passed the chained messages.
+ * The messages are not modified, but should not be used afterward, and should be replaced by the messages contained
+ * in the returned list.
+ * This method preserve the order. So, the first message corresponds to the first message in the returned list.
+ * The message from the returned list have the necessary logic to chain the ack/nack signals and the copied metadata.
+ *
+ * @param messages the chained messages, must not be empty, must not be null, must not contain null
+ * @return the list of modified messages
+ */
+ public List> with(Message>... messages) {
+ AtomicBoolean done = new AtomicBoolean();
+
+ // Must be modifiable
+ List> trackers = Arrays.stream(messages).collect(Collectors.toCollection(CopyOnWriteArrayList::new));
+ List> outcomes = new ArrayList<>();
+ for (Message> message : messages) {
+ Message> tmp = message;
+ for (Object metadatum : metadata) {
+ tmp = tmp.addMetadata(metadatum);
+ }
+ outcomes.add(tmp
+ .withAck(() -> {
+ CompletionStage acked = message.ack();
+ if (trackers.remove(message)) {
+ if (trackers.isEmpty() && done.compareAndSet(false, true)) {
+ return acked.thenCompose(x -> input.ack());
+ }
+ }
+ return acked;
+ })
+ .withNack((reason) -> {
+ CompletionStage nacked = message.nack(reason);
+ if (trackers.remove(message)) {
+ if (done.compareAndSet(false, true)) {
+ return nacked.thenCompose(x -> input.nack(reason));
+ }
+ }
+ return nacked;
+ }));
+ }
+ return outcomes;
+ }
+ }
}
diff --git a/api/src/test/java/io/smallrye/reactive/messaging/MessagesTest.java b/api/src/test/java/io/smallrye/reactive/messaging/MessagesTest.java
index 23bf489f75..aebf2f405e 100644
--- a/api/src/test/java/io/smallrye/reactive/messaging/MessagesTest.java
+++ b/api/src/test/java/io/smallrye/reactive/messaging/MessagesTest.java
@@ -8,6 +8,7 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.junit.jupiter.api.Test;
@@ -216,6 +217,147 @@ void checkWithEmptyList() {
assertThat(Messages.merge(List.of()).getPayload()).isEqualTo(Collections.emptyList());
}
+ @Test
+ void checkSimpleChainAcknowledgement() {
+ AtomicBoolean o1Ack = new AtomicBoolean();
+ AtomicBoolean o2Ack = new AtomicBoolean();
+ AtomicInteger i1Ack = new AtomicInteger();
+ Message o1 = Message.of("foo", () -> {
+ o1Ack.set(true);
+ return CompletableFuture.completedFuture(null);
+ });
+ Message o2 = Message.of("bar", () -> {
+ o2Ack.set(true);
+ return CompletableFuture.completedFuture(null);
+ });
+
+ Message i = Message.of(1, () -> {
+ i1Ack.incrementAndGet();
+ return CompletableFuture.completedFuture(null);
+ });
+
+ List> outcomes = Messages.chain(i).with(o1, o2);
+ assertThat(i1Ack).hasValue(0);
+ assertThat(o1Ack).isFalse();
+ assertThat(o2Ack).isFalse();
+
+ outcomes.get(0).ack();
+ assertThat(i1Ack).hasValue(0);
+ assertThat(o1Ack).isTrue();
+ assertThat(o2Ack).isFalse();
+
+ outcomes.get(1).ack();
+ assertThat(i1Ack).hasValue(1);
+ assertThat(o1Ack).isTrue();
+ assertThat(o1Ack).isTrue();
+
+ outcomes.get(1).ack();
+ outcomes.get(0).ack();
+ assertThat(i1Ack).hasValue(1);
+
+ outcomes.get(1).nack(new Exception("boom"));
+ outcomes.get(0).nack(new Exception("boom"));
+ assertThat(i1Ack).hasValue(1);
+ }
+
+ @Test
+ void checkSimpleChainNegativeAcknowledgement() {
+ AtomicBoolean o1Ack = new AtomicBoolean();
+ AtomicBoolean o2Ack = new AtomicBoolean();
+ AtomicBoolean o1Nack = new AtomicBoolean();
+ AtomicBoolean o2Nack = new AtomicBoolean();
+ AtomicInteger i1Ack = new AtomicInteger();
+ AtomicInteger i1Nack = new AtomicInteger();
+
+ Message o1 = Message.of("foo", () -> {
+ o1Ack.set(true);
+ return CompletableFuture.completedFuture(null);
+ }, t -> {
+ o1Nack.set(true);
+ return CompletableFuture.completedFuture(null);
+ });
+ Message o2 = Message.of("bar", () -> {
+ o2Ack.set(true);
+ return CompletableFuture.completedFuture(null);
+ }, t -> {
+ o2Nack.set(true);
+ return CompletableFuture.completedFuture(null);
+ });
+
+ Message i = Message.of(1, () -> {
+ i1Ack.incrementAndGet();
+ return CompletableFuture.completedFuture(null);
+ }, t -> {
+ i1Nack.incrementAndGet();
+ return CompletableFuture.completedFuture(null);
+ });
+
+ List> outcomes = Messages.chain(i).with(o1, o2);
+ assertThat(i1Ack).hasValue(0);
+ assertThat(o1Ack).isFalse();
+ assertThat(o2Ack).isFalse();
+ assertThat(i1Nack).hasValue(0);
+ assertThat(o1Nack).isFalse();
+ assertThat(o2Nack).isFalse();
+
+ outcomes.get(0).ack();
+ assertThat(i1Ack).hasValue(0);
+ assertThat(o1Ack).isTrue();
+ assertThat(o2Ack).isFalse();
+ assertThat(i1Nack).hasValue(0);
+ assertThat(o1Nack).isFalse();
+ assertThat(o2Nack).isFalse();
+
+ outcomes.get(0).nack(new Exception("boom"));
+ assertThat(i1Ack).hasValue(0);
+ assertThat(i1Nack).hasValue(0);
+
+ outcomes.get(1).nack(new Exception("boom"));
+ assertThat(i1Nack).hasValue(1);
+ assertThat(i1Ack).hasValue(0);
+ assertThat(o2Nack).isTrue();
+
+ outcomes.get(1).ack();
+ assertThat(i1Nack).hasValue(1);
+ assertThat(i1Ack).hasValue(0);
+ }
+
+ @Test
+ void testChainWithMetadataSelection() {
+ Message i = Message.of(1)
+ .withMetadata(List.of(new NonMergeableMetadata("hello"), new MergeableMetadata("hello"),
+ new AnotherMetadata("hello")));
+
+ Message m1 = Message.of("a");
+ AnotherMetadata am = new AnotherMetadata("hello");
+ Message m2 = Message.of("b").addMetadata(am);
+
+ // No metadata copied from the original message
+ List> out = Messages.chain(i).withoutMetadata().with(m1, m2);
+ assertThat(out.get(0).getMetadata()).isEmpty();
+ assertThat(out.get(1).getMetadata()).hasSize(1).containsOnly(am);
+
+ // All metadata are copied from the original message
+ out = Messages.chain(i).with(m1, m2);
+ assertThat(out.get(0).getMetadata()).hasSize(3);
+ assertThat(out.get(1).getMetadata()).hasSize(3).doesNotContain(am);
+
+ // All metadata but MergeableMetadata are copied from the original message
+ out = Messages.chain(i).withoutMetadata(MergeableMetadata.class).with(m1, m2);
+ assertThat(out.get(0).getMetadata()).hasSize(2);
+ assertThat(out.get(1).getMetadata()).hasSize(2).doesNotContain(am);
+
+ // All metadata but AnotherMetadata are copied from the original message
+ out = Messages.chain(i).withoutMetadata(AnotherMetadata.class).with(m1, m2);
+ assertThat(out.get(0).getMetadata()).hasSize(2);
+ assertThat(out.get(1).getMetadata()).hasSize(3).contains(am);
+
+ out = Messages.chain(i).withoutMetadata().withMetadata(AnotherMetadata.class).with(m1, m2);
+ assertThat(out.get(0).getMetadata()).hasSize(1);
+ assertThat(out.get(1).getMetadata()).hasSize(1);
+
+ }
+
public static class NonMergeableMetadata {
String value;