Skip to content

Commit

Permalink
#2519 Fix: PulsarOutgoingChannel tracing properties propagation
Browse files Browse the repository at this point in the history
  • Loading branch information
michalcukierman committed Mar 13, 2024
1 parent 756a3e8 commit f694fab
Showing 1 changed file with 25 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadata;
import io.smallrye.reactive.messaging.pulsar.PulsarConnector;
import io.smallrye.reactive.messaging.pulsar.PulsarOutgoingMessageMetadata;
import io.smallrye.reactive.messaging.pulsar.base.WeldTestBase;
import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;
import io.vertx.core.json.Json;
Expand Down Expand Up @@ -82,17 +83,26 @@ static void shutdown() {
GlobalOpenTelemetry.resetForTest();
}

@SuppressWarnings("ConstantConditions")
@Test
public void testFromAppToPulsar() throws PulsarClientException {
public void testFromAppGeneratingDataToPulsar() throws PulsarClientException {
testFromAppToPulsar(MyAppGeneratingData.class);
}

@Test
public void testFromAppGeneratingPulsarDataToPulsar() throws PulsarClientException {
testFromAppToPulsar(MyAppGeneratingPulsarData.class);
}

@SuppressWarnings("ConstantConditions")
public void testFromAppToPulsar(Class<?> applicationClass) throws PulsarClientException {
List<org.apache.pulsar.client.api.Message<Integer>> messages = new ArrayList<>();
receive(client.newConsumer(Schema.INT32)
.topic(topic)
.subscriptionName(topic + "-consumer")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe(), 10, messages::add);

runApplication(getConfigForMyAppGeneratingData(), MyAppGeneratingData.class);
runApplication(getConfigForMyAppGeneratingData(), applicationClass);

await().until(() -> messages.size() >= 10);
List<Integer> values = new ArrayList<>();
Expand Down Expand Up @@ -379,6 +389,18 @@ public Flow.Publisher<Message<String>> source() {
}
}

@ApplicationScoped
public static class MyAppGeneratingPulsarData {
@Outgoing("pulsar")
public Flow.Publisher<Message<Integer>> source() {
return Multi.createFrom().range(0, 10)
.map(Message::of)
.map(m -> m.addMetadata(PulsarOutgoingMessageMetadata.builder()
.withEventTime(1)
.build()));
}
}

@ApplicationScoped
public static class MyAppProcessingData {
@Incoming("source")
Expand Down

0 comments on commit f694fab

Please sign in to comment.