Skip to content

Commit

Permalink
Merge pull request #2231 from cescoffier/acknowledgement-coordination
Browse files Browse the repository at this point in the history
Provide a utility method to coordinate (negative) acknowledgement when multiple messages are created from one
  • Loading branch information
ozangunalp authored Jul 28, 2023
2 parents 4ab36e9 + dfb79f8 commit 7856940
Show file tree
Hide file tree
Showing 2 changed files with 295 additions and 1 deletion.
154 changes: 153 additions & 1 deletion api/src/main/java/io/smallrye/reactive/messaging/Messages.java
Original file line number Diff line number Diff line change
@@ -1,24 +1,63 @@
package io.smallrye.reactive.messaging;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Metadata;

import io.smallrye.common.annotation.CheckReturnValue;

/**
* A class handling coordination between messages.
*/
public class Messages {

private Messages() {
// Avoid direct instantiation.
}

/**
* Chains the given message with some other messages.
* It coordinates the acknowledgement. When all the other messages are acknowledged successfully, the passed
* message is acknowledged. If one of the other messages is acknowledged negatively, the passed message is also
* nacked (with the same reason). Subsequent ack/nack will be ignored.
* <p>
*
* @param message the message
* @return the chain builder that let you decide how the metadata are passed, and the set of messages.
*/
@CheckReturnValue
public static MessageChainBuilder chain(Message<?> message) {
return new MessageChainBuilder(message);
}

/**
* Merges multiple messages into a single one.
* This is an implementation of a <em>merge pattern</em>: n messages combined into 1.
* <p>
* Whe resulting message payload is computed using the combinator function.
* When the returned message is acked/nacked, the passes messages are acked/nacked accordingly.
* <p>
* Metadata are also merged. The metadata of all the messages are copied into the resulting message. If, for a given
* class, the metadata is already present in the result message, it's either ignored, or merged if the class
* implements {@link MergeableMetadata}.
*
* @param list the list of message, must not be empty, must not be null
* @param combinator the combinator method, must not be null
* @param <T> the payload type of the produced message
* @return the resulting message
*/
public static <T> Message<T> merge(List<Message<?>> list, Function<List<?>, T> combinator) {
if (list.isEmpty()) {
return Message.of(combinator.apply(Collections.emptyList()));
Expand Down Expand Up @@ -59,6 +98,20 @@ public static <T> Message<T> merge(List<Message<?>> list, Function<List<?>, T> c
.withMetadata(metadata);
}

/**
* Merges multiple messages into a single one.
* <p>
* Whe resulting message payload is computed using the combinator function.
* When the returned message is acked/nacked, the passes messages are acked/nacked accordingly.
* <p>
* Metadata are also merged. The metadata of all the messages are copied into the resulting message. If, for a given
* class, the metadata is already present in the result message, it's either ignored, or merged if the class
* implements {@link MergeableMetadata}.
*
* @param list the list of message, must not be empty, must not be null
* @param <T> the payload type of the passed messages
* @return the resulting message
*/
public static <T> Message<List<T>> merge(List<Message<T>> list) {
if (list.isEmpty()) {
return Message.of(Collections.emptyList());
Expand Down Expand Up @@ -90,7 +143,7 @@ public static <T> Message<List<T>> merge(List<Message<T>> list) {
.withMetadata(metadata);
}

@SuppressWarnings("unchecked")
@SuppressWarnings({ "unchecked", "rawtypes" })
private static Metadata merge(Metadata first, Metadata second) {
Metadata result = first;
for (Object meta : second) {
Expand Down Expand Up @@ -121,4 +174,103 @@ private static Metadata merge(Metadata first, Metadata second) {
return result;
}

/**
* The message chain builder allows chaining message and configure metadata propagation.
* By default, all the metadata from the given message are copied into the chained messages.
*/
public static class MessageChainBuilder {
private final Message<?> input;
private Metadata metadata;

private MessageChainBuilder(Message<?> message) {
this.input = message;
this.metadata = message.getMetadata().copy();
}

/**
* Do not copy any metadata from the initial message to the chained message.
*
* @return the current {@link MessageChainBuilder}
*/
@CheckReturnValue
public MessageChainBuilder withoutMetadata() {
this.metadata = Metadata.empty();
return this;
}

/**
* Copy the given metadata of the given classes from the initial message to the chained message, if the initial
* message does not include a metadata object of the given class.
*
* In general, this method must be used after {@link #withoutMetadata()}.
*
* @return the current {@link MessageChainBuilder}
*/
@CheckReturnValue
public MessageChainBuilder withMetadata(Class<?>... mc) {
for (Class<?> clazz : mc) {
Optional<?> o = input.getMetadata().get(clazz);
o.ifPresent(value -> this.metadata = metadata.with(value));
}
return this;
}

/**
* Do not the given metadata of the given classes from the initial message to the chained message, if the initial
* message does not include a metadata object of the given class.
*
* @return the current {@link MessageChainBuilder}
*/
@CheckReturnValue
public MessageChainBuilder withoutMetadata(Class<?>... mc) {
for (Class<?> clazz : mc) {
this.metadata = this.metadata.without(clazz);
}
return this;
}

/**
* Passed the chained messages.
* The messages are not modified, but should not be used afterward, and should be replaced by the messages contained
* in the returned list.
* This method preserve the order. So, the first message corresponds to the first message in the returned list.
* The message from the returned list have the necessary logic to chain the ack/nack signals and the copied metadata.
*
* @param messages the chained messages, must not be empty, must not be null, must not contain null
* @return the list of modified messages
*/
public List<Message<?>> with(Message<?>... messages) {
AtomicBoolean done = new AtomicBoolean();

// Must be modifiable
List<Message<?>> trackers = Arrays.stream(messages).collect(Collectors.toCollection(CopyOnWriteArrayList::new));
List<Message<?>> outcomes = new ArrayList<>();
for (Message<?> message : messages) {
Message<?> tmp = message;
for (Object metadatum : metadata) {
tmp = tmp.addMetadata(metadatum);
}
outcomes.add(tmp
.withAck(() -> {
CompletionStage<Void> acked = message.ack();
if (trackers.remove(message)) {
if (trackers.isEmpty() && done.compareAndSet(false, true)) {
return acked.thenCompose(x -> input.ack());
}
}
return acked;
})
.withNack((reason) -> {
CompletionStage<Void> nacked = message.nack(reason);
if (trackers.remove(message)) {
if (done.compareAndSet(false, true)) {
return nacked.thenCompose(x -> input.nack(reason));
}
}
return nacked;
}));
}
return outcomes;
}
}
}
142 changes: 142 additions & 0 deletions api/src/test/java/io/smallrye/reactive/messaging/MessagesTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.eclipse.microprofile.reactive.messaging.Message;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -216,6 +217,147 @@ void checkWithEmptyList() {
assertThat(Messages.merge(List.of()).getPayload()).isEqualTo(Collections.emptyList());
}

@Test
void checkSimpleChainAcknowledgement() {
AtomicBoolean o1Ack = new AtomicBoolean();
AtomicBoolean o2Ack = new AtomicBoolean();
AtomicInteger i1Ack = new AtomicInteger();
Message<String> o1 = Message.of("foo", () -> {
o1Ack.set(true);
return CompletableFuture.completedFuture(null);
});
Message<String> o2 = Message.of("bar", () -> {
o2Ack.set(true);
return CompletableFuture.completedFuture(null);
});

Message<Integer> i = Message.of(1, () -> {
i1Ack.incrementAndGet();
return CompletableFuture.completedFuture(null);
});

List<Message<?>> outcomes = Messages.chain(i).with(o1, o2);
assertThat(i1Ack).hasValue(0);
assertThat(o1Ack).isFalse();
assertThat(o2Ack).isFalse();

outcomes.get(0).ack();
assertThat(i1Ack).hasValue(0);
assertThat(o1Ack).isTrue();
assertThat(o2Ack).isFalse();

outcomes.get(1).ack();
assertThat(i1Ack).hasValue(1);
assertThat(o1Ack).isTrue();
assertThat(o1Ack).isTrue();

outcomes.get(1).ack();
outcomes.get(0).ack();
assertThat(i1Ack).hasValue(1);

outcomes.get(1).nack(new Exception("boom"));
outcomes.get(0).nack(new Exception("boom"));
assertThat(i1Ack).hasValue(1);
}

@Test
void checkSimpleChainNegativeAcknowledgement() {
AtomicBoolean o1Ack = new AtomicBoolean();
AtomicBoolean o2Ack = new AtomicBoolean();
AtomicBoolean o1Nack = new AtomicBoolean();
AtomicBoolean o2Nack = new AtomicBoolean();
AtomicInteger i1Ack = new AtomicInteger();
AtomicInteger i1Nack = new AtomicInteger();

Message<String> o1 = Message.of("foo", () -> {
o1Ack.set(true);
return CompletableFuture.completedFuture(null);
}, t -> {
o1Nack.set(true);
return CompletableFuture.completedFuture(null);
});
Message<String> o2 = Message.of("bar", () -> {
o2Ack.set(true);
return CompletableFuture.completedFuture(null);
}, t -> {
o2Nack.set(true);
return CompletableFuture.completedFuture(null);
});

Message<Integer> i = Message.of(1, () -> {
i1Ack.incrementAndGet();
return CompletableFuture.completedFuture(null);
}, t -> {
i1Nack.incrementAndGet();
return CompletableFuture.completedFuture(null);
});

List<Message<?>> outcomes = Messages.chain(i).with(o1, o2);
assertThat(i1Ack).hasValue(0);
assertThat(o1Ack).isFalse();
assertThat(o2Ack).isFalse();
assertThat(i1Nack).hasValue(0);
assertThat(o1Nack).isFalse();
assertThat(o2Nack).isFalse();

outcomes.get(0).ack();
assertThat(i1Ack).hasValue(0);
assertThat(o1Ack).isTrue();
assertThat(o2Ack).isFalse();
assertThat(i1Nack).hasValue(0);
assertThat(o1Nack).isFalse();
assertThat(o2Nack).isFalse();

outcomes.get(0).nack(new Exception("boom"));
assertThat(i1Ack).hasValue(0);
assertThat(i1Nack).hasValue(0);

outcomes.get(1).nack(new Exception("boom"));
assertThat(i1Nack).hasValue(1);
assertThat(i1Ack).hasValue(0);
assertThat(o2Nack).isTrue();

outcomes.get(1).ack();
assertThat(i1Nack).hasValue(1);
assertThat(i1Ack).hasValue(0);
}

@Test
void testChainWithMetadataSelection() {
Message<Integer> i = Message.of(1)
.withMetadata(List.of(new NonMergeableMetadata("hello"), new MergeableMetadata("hello"),
new AnotherMetadata("hello")));

Message<String> m1 = Message.of("a");
AnotherMetadata am = new AnotherMetadata("hello");
Message<String> m2 = Message.of("b").addMetadata(am);

// No metadata copied from the original message
List<Message<?>> out = Messages.chain(i).withoutMetadata().with(m1, m2);
assertThat(out.get(0).getMetadata()).isEmpty();
assertThat(out.get(1).getMetadata()).hasSize(1).containsOnly(am);

// All metadata are copied from the original message
out = Messages.chain(i).with(m1, m2);
assertThat(out.get(0).getMetadata()).hasSize(3);
assertThat(out.get(1).getMetadata()).hasSize(3).doesNotContain(am);

// All metadata but MergeableMetadata are copied from the original message
out = Messages.chain(i).withoutMetadata(MergeableMetadata.class).with(m1, m2);
assertThat(out.get(0).getMetadata()).hasSize(2);
assertThat(out.get(1).getMetadata()).hasSize(2).doesNotContain(am);

// All metadata but AnotherMetadata are copied from the original message
out = Messages.chain(i).withoutMetadata(AnotherMetadata.class).with(m1, m2);
assertThat(out.get(0).getMetadata()).hasSize(2);
assertThat(out.get(1).getMetadata()).hasSize(3).contains(am);

out = Messages.chain(i).withoutMetadata().withMetadata(AnotherMetadata.class).with(m1, m2);
assertThat(out.get(0).getMetadata()).hasSize(1);
assertThat(out.get(1).getMetadata()).hasSize(1);

}

public static class NonMergeableMetadata {
String value;

Expand Down

0 comments on commit 7856940

Please sign in to comment.