Skip to content

Commit

Permalink
Merge pull request #2342 from ozangunalp/publisher_decorator_emitter
Browse files Browse the repository at this point in the history
Decorate emitters with PublisherDecorators
  • Loading branch information
ozangunalp authored Nov 2, 2023
2 parents 95f6b30 + 2f759a1 commit e40d0a8
Show file tree
Hide file tree
Showing 7 changed files with 204 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand All @@ -26,6 +27,7 @@
import io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging;
import io.smallrye.reactive.messaging.kafka.impl.TopicPartitions;
import io.smallrye.reactive.messaging.providers.extension.MutinyEmitterImpl;
import io.smallrye.reactive.messaging.providers.helpers.VertxContext;
import io.vertx.core.Context;
import io.vertx.core.Vertx;

Expand Down Expand Up @@ -138,6 +140,7 @@ private class Transaction<R> implements TransactionalEmitter<T> {
private final Uni<Void> beforeAbort;
private final Function<Throwable, Uni<R>> afterAbort;

private final List<Uni<Void>> sendUnis = new CopyOnWriteArrayList<>();
private volatile boolean abort;

public Transaction() {
Expand All @@ -156,11 +159,8 @@ Uni<R> execute(Function<TransactionalEmitter<T>, Uni<R>> work) {
currentTransaction = this;
// If run on Vert.x context, `work` is called on the same context.
Context context = Vertx.currentContext();
Uni<Void> beginTx = producer.beginTransaction();
if (context != null) {
beginTx = beginTx.emitOn(runnable -> context.runOnContext(x -> runnable.run()));
}
return beginTx
return producer.beginTransaction()
.plug(u -> context == null ? u : u.emitOn(r -> VertxContext.runOnContext(context, r)))
.chain(() -> executeInTransaction(work))
.eventually(() -> currentTransaction = null);
}
Expand All @@ -169,6 +169,8 @@ private Uni<R> executeInTransaction(Function<TransactionalEmitter<T>, Uni<R>> wo
//noinspection Convert2MethodRef
return Uni.createFrom().nullItem()
.chain(() -> work.apply(this))
// wait until all send operations are completed
.eventually(() -> waitOnSend())
// only flush() if the work completed with no exception
.call(() -> producer.flush())
// in the case of an exception or cancellation
Expand All @@ -183,6 +185,10 @@ private Uni<R> executeInTransaction(Function<TransactionalEmitter<T>, Uni<R>> wo
.onItem().transformToUni(result -> afterCommit.apply(result));
}

private Uni<List<Void>> waitOnSend() {
return sendUnis.isEmpty() ? Uni.createFrom().nullItem() : Uni.join().all(sendUnis).andCollectFailures();
}

private Uni<Void> commit() {
return beforeCommit.call(producer::commitTransaction);
}
Expand All @@ -194,13 +200,18 @@ private Uni<Void> abort() {

@Override
public <M extends Message<? extends T>> void send(M msg) {
KafkaTransactionsImpl.this.send(msg.withNack(throwable -> CompletableFuture.completedFuture(null)));
CompletableFuture<Void> send = KafkaTransactionsImpl.this.sendMessage(msg)
.onFailure().invoke(KafkaLogging.log::unableToSendRecord)
.subscribeAsCompletionStage();
sendUnis.add(Uni.createFrom().completionStage(send));
}

@Override
public void send(T payload) {
KafkaTransactionsImpl.this.send(payload).subscribe().with(unused -> {
}, KafkaLogging.log::unableToSendRecord);
CompletableFuture<Void> send = KafkaTransactionsImpl.this.send(payload)
.onFailure().invoke(KafkaLogging.log::unableToSendRecord)
.subscribeAsCompletionStage();
sendUnis.add(Uni.createFrom().completionStage(send));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.LongAdder;

import jakarta.annotation.PostConstruct;
import jakarta.enterprise.context.ApplicationScoped;
Expand All @@ -19,6 +21,8 @@
import org.eclipse.microprofile.metrics.MetricRegistry;
import org.eclipse.microprofile.metrics.Tag;
import org.eclipse.microprofile.metrics.annotation.RegistryType;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
Expand All @@ -33,10 +37,12 @@

public class SmallRyeMetricDecoratorTest extends WeldTestBase {

public static final List<String> TEST_MESSAGES = Arrays.asList("foo", "bar", "baz");

@Test
void testOnlyMetricDecoratorAvailable() {
weld.addExtensions(MetricCdiInjectionExtension.class);
runApplication(config(), MetricsTestBean.class);
runApplication(config(), MetricsTestEmitterBean.class);
Instance<PublisherDecorator> decorators = container.select(PublisherDecorator.class);
assertThat(decorators.select(MetricDecorator.class).isResolvable()).isTrue();
}
Expand All @@ -45,13 +51,26 @@ void testOnlyMetricDecoratorAvailable() {
public void testMicroProfileMetrics() {
weld.addExtensions(MetricCdiInjectionExtension.class);
MetricsTestBean bean = runApplication(config(), MetricsTestBean.class);

await().until(() -> bean.received().size() == 6);

assertEquals(MetricsTestBean.TEST_MESSAGES.size(), getCounter("source").getCount());
assertEquals(TEST_MESSAGES.size(), getCounter("source").getCount());

// Between source and sink, each message is duplicated so we expect double the count for sink
assertEquals(TEST_MESSAGES.size() * 2, getCounter("sink").getCount());
}

@Test
public void testMicroProfileMetricsWithEmitter() {
weld.addExtensions(MetricCdiInjectionExtension.class);
MetricsTestEmitterBean bean = runApplication(config(), MetricsTestEmitterBean.class);

resetCounters();
bean.sendMessages();
await().until(() -> bean.received().size() == 6);
assertEquals(TEST_MESSAGES.size(), getCounter("source").getCount());

// Between source and sink, each message is duplicated so we expect double the count for sink
assertEquals(MetricsTestBean.TEST_MESSAGES.size() * 2, getCounter("sink").getCount());
assertEquals(TEST_MESSAGES.size() * 2, getCounter("sink").getCount());
}

static MetricID metricID(String channelName) {
Expand All @@ -62,6 +81,20 @@ private MapBasedConfig config() {
return new MapBasedConfig().put("smallrye.messaging.metrics.mp.enabled", true);
}

private void resetCounters() {
MetricRegistry registry = container.select(MetricRegistry.class, RegistryTypeLiteral.BASE).get();
registry.getCounters().forEach((metricID, counter) -> {
try {
Field count = counter.getClass().getDeclaredField("count");
count.setAccessible(true);
LongAdder adder = (LongAdder) count.get(counter);
adder.reset();
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new RuntimeException(e);
}
});
}

private Counter getCounter(String channelName) {
MetricRegistry registry = container.select(MetricRegistry.class, RegistryTypeLiteral.BASE).get();
return registry.counter(metricID(channelName));
Expand All @@ -86,8 +119,6 @@ public MetricRegistry.Type type() {
@ApplicationScoped
public static class MetricsTestBean {

public static final List<String> TEST_MESSAGES = Arrays.asList("foo", "bar", "baz");

final List<String> received = new ArrayList<>();

@Inject
Expand Down Expand Up @@ -120,4 +151,35 @@ public List<String> received() {
return received;
}
}

@ApplicationScoped
public static class MetricsTestEmitterBean {

final List<String> received = new ArrayList<>();

@Inject
@Channel("source")
Emitter<String> emitter;

public void sendMessages() {
for (String msg : TEST_MESSAGES) {
emitter.send(msg);
}
}

@Incoming("source")
@Outgoing("sink")
public PublisherBuilder<String> duplicate(String input) {
return ReactiveStreams.of(input, input);
}

@Incoming("sink")
public void sink(String message) {
received.add(message);
}

public List<String> received() {
return received;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@
import io.smallrye.reactive.messaging.EmitterFactory;
import io.smallrye.reactive.messaging.MediatorConfiguration;
import io.smallrye.reactive.messaging.MessagePublisherProvider;
import io.smallrye.reactive.messaging.PublisherDecorator;
import io.smallrye.reactive.messaging.SubscriberDecorator;
import io.smallrye.reactive.messaging.annotations.EmitterFactoryFor;
import io.smallrye.reactive.messaging.annotations.Merge;
import io.smallrye.reactive.messaging.providers.AbstractMediator;
import io.smallrye.reactive.messaging.providers.extension.*;
import io.smallrye.reactive.messaging.providers.helpers.MultiUtils;
import io.smallrye.reactive.messaging.providers.i18n.ProviderLogging;
import io.smallrye.reactive.messaging.providers.locals.ContextDecorator;

@ApplicationScoped
public class Wiring {
Expand All @@ -55,6 +57,10 @@ public class Wiring {
@Inject
Instance<SubscriberDecorator> subscriberDecorators;

@Any
@Inject
Instance<PublisherDecorator> publisherDecorators;

private final List<Component> components;

private Graph graph;
Expand Down Expand Up @@ -90,7 +96,8 @@ public void prepare(boolean strictMode, ChannelRegistry registry, List<EmitterCo
}

for (EmitterConfiguration emitter : emitters) {
components.add(new EmitterComponent(emitter, emitterFactories, defaultBufferSize, defaultBufferSizeLegacy));
components.add(new EmitterComponent(emitter, publisherDecorators, emitterFactories, defaultBufferSize,
defaultBufferSizeLegacy));
}

// At that point, the registry only contains connectors or managed channels
Expand Down Expand Up @@ -451,15 +458,19 @@ public void validate() throws WiringException {
static class EmitterComponent implements PublishingComponent, NoUpstreamComponent {

private final EmitterConfiguration configuration;
private final Instance<PublisherDecorator> decorators;
private final Instance<EmitterFactory<?>> emitterFactories;
private final Set<Component> downstreams = new LinkedHashSet<>();
private final int defaultBufferSize;
private final int defaultBufferSizeLegacy;

public EmitterComponent(EmitterConfiguration configuration, Instance<EmitterFactory<?>> emitterFactories,
public EmitterComponent(EmitterConfiguration configuration,
Instance<PublisherDecorator> decorators,
Instance<EmitterFactory<?>> emitterFactories,
int defaultBufferSize,
int defaultBufferSizeLegacy) {
this.configuration = configuration;
this.decorators = decorators;
this.emitterFactories = emitterFactories;
this.defaultBufferSize = defaultBufferSize;
this.defaultBufferSizeLegacy = defaultBufferSizeLegacy;
Expand Down Expand Up @@ -504,7 +515,10 @@ public void materialize(ChannelRegistry registry) {
private <T extends MessagePublisherProvider<?>> void registerEmitter(ChannelRegistry registry, int def) {
EmitterFactory<?> emitterFactory = getEmitterFactory(configuration.emitterType());
T emitter = (T) emitterFactory.createEmitter(configuration, def);
Publisher<? extends Message<?>> publisher = emitter.getPublisher();
Multi<? extends Message<?>> publisher = Multi.createFrom().publisher(emitter.getPublisher());
for (PublisherDecorator decorator : getSortedInstances(decorators)) {
publisher = decorator.decorate(publisher, configuration.name(), false);
}
Class<T> type = (Class<T>) configuration.emitterType().value();
registry.register(configuration.name(), type, emitter);
//noinspection ReactiveStreamsUnusedPublisher
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,25 @@ public void testDecorator() {
assertEquals(expected, collector.payloads());
}

@Test
public void testEmitterDecorator() {
addBeanClass(AppendingDecorator.class, EmitterBean.class);
initialize();

EmitterBean emitter = container.select(EmitterBean.class).get();
emitter.sendStrings();

MyCollector collector = container.select(MyCollector.class).get();

// Expect the values in the stream to have "-sink" appended by the decorator
List<String> expected = SimpleProducerBean.TEST_STRINGS.stream()
.map((s) -> s + "-sink")
.collect(Collectors.toList());

await().until(collector::payloads, hasSize(expected.size()));
assertEquals(expected, collector.payloads());
}

@Test
void testDeprecatedPublisherDecorator() {
addBeanClass(AppendingDeprecatedDecorator.class, SimpleProducerBean.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package io.smallrye.reactive.messaging.decorator;

import java.util.Arrays;
import java.util.List;

import jakarta.inject.Inject;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

public class EmitterBean {

public static final List<String> TEST_STRINGS = Arrays.asList("foo", "bar", "baz");

@Inject
@Channel("sink")
Emitter<String> emitter;

public void sendStrings() {
for (String msg : TEST_STRINGS) {
emitter.send(msg);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package io.smallrye.reactive.messaging.providers.metrics;

import java.util.Arrays;
import java.util.List;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;

@ApplicationScoped
public class EmitterMetricsTestBean {

public static final List<String> TEST_MESSAGES = Arrays.asList("foo", "bar", "baz");

@Inject
@Channel("source")
Emitter<String> emitter;

public void sendMessages() {
for (String msg : TEST_MESSAGES) {
emitter.send(msg);
}
}

@Incoming("source")
@Outgoing("sink")
public PublisherBuilder<String> duplicate(String input) {
return ReactiveStreams.of(input, input);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,25 @@ public void testMetrics() {
assertEquals(MetricsTestBean.TEST_MESSAGES.size() * 2, getCounter("sink").count());
}

@Test
public void testMetricsEmitter() {
releaseConfig();
addBeanClass(EmitterMetricsTestBean.class);
initialize();

MyCollector collector = container.select(MyCollector.class).get();
EmitterMetricsTestBean emitter = container.select(EmitterMetricsTestBean.class).get();

emitter.sendMessages();

await().until(() -> collector.messages().size() == 6);

assertEquals(MetricsTestBean.TEST_MESSAGES.size(), getCounter("source").count());

// Between source and sink, each message is duplicated so we expect double the count for sink
assertEquals(MetricsTestBean.TEST_MESSAGES.size() * 2, getCounter("sink").count());
}

private Counter getCounter(String channelName) {
return Metrics.counter("mp.messaging.message.count", "channel", channelName);
}
Expand Down

0 comments on commit e40d0a8

Please sign in to comment.