From d2397aea109b3c1c18812580fa64212d232b3173 Mon Sep 17 00:00:00 2001
From: Ozan Gunalp
Date: Thu, 4 Apr 2024 13:19:53 +0200
Subject: [PATCH] Use Kafka channel instead of producer in DLQ and delayed retry topic

---
 .../messaging/aws/sqs/SqsMessage.java         |  1 -
 .../kafka/fault/KafkaDeadLetterQueue.java     | 55 +++++++++++-------
 .../kafka/fault/KafkaDelayedRetryTopic.java   | 57 ++++++++++++-------
 .../kafka/fault/KafkaFailureHandlerTest.java  | 41 ++++++++++++-
 .../messaging/providers/wiring/Wiring.java    | 17 ++++--
 5 files changed, 126 insertions(+), 45 deletions(-)
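Note on the recurring pattern in this patch: instead of sending dead-letter records through a hand-rolled ReactiveKafkaProducer, each failure handler now feeds an in-process outgoing channel, that is a Mutiny UnicastProcessor which is decorated by the registered SubscriberDecorators and consumed by a KafkaSink under the channel name "<channel>-dead-letter-queue". The sketch below shows that shape with plain Mutiny and MicroProfile Messaging types only; DlqWiringSketch and its variable names are illustrative stand-ins, not classes from this patch.

// Illustrative sketch only (not the project's real wiring): the same shape with plain
// Mutiny types. DlqWiringSketch and all variable names are hypothetical.
import org.eclipse.microprofile.reactive.messaging.Message;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.operators.multi.processors.UnicastProcessor;

public class DlqWiringSketch {

    public static void main(String[] args) {
        // Upstream of the ad-hoc channel: the failure handler emits dead-letter messages here.
        UnicastProcessor<Message<?>> dlqSource = UnicastProcessor.create();

        // Decoration step: stands in for the SubscriberDecorator loop in
        // wireOutgoingConnectorToUpstream (metrics, tracing, observation collectors, ...).
        Multi<? extends Message<?>> decorated = dlqSource
                .onItem().invoke(m -> System.out.println("decorator saw: " + m.getPayload()));

        // Downstream subscriber: stands in for KafkaSink.getSink(), which writes the record
        // to Kafka and then acks (or nacks) the message.
        decorated.subscribe().with(Message::ack);

        // The handler side only ever calls onNext(...) on the processor.
        dlqSource.onNext(Message.of("poison-pill"));
    }
}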
diff --git a/smallrye-reactive-messaging-aws-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/SqsMessage.java b/smallrye-reactive-messaging-aws-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/SqsMessage.java
index d944d314b8..ab6206ab87 100644
--- a/smallrye-reactive-messaging-aws-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/SqsMessage.java
+++ b/smallrye-reactive-messaging-aws-sqs/src/main/java/io/smallrye/reactive/messaging/aws/sqs/SqsMessage.java
@@ -14,7 +14,6 @@
 import io.smallrye.reactive.messaging.json.JsonMapping;
 import io.smallrye.reactive.messaging.providers.MetadataInjectableMessage;
 import io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage;
-
 import software.amazon.awssdk.services.sqs.model.Message;
 import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
 
diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/fault/KafkaDeadLetterQueue.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/fault/KafkaDeadLetterQueue.java
index 5800d13991..477722baa6 100644
--- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/fault/KafkaDeadLetterQueue.java
+++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/fault/KafkaDeadLetterQueue.java
@@ -2,6 +2,7 @@
 
 import static io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler.DESERIALIZATION_FAILURE_DLQ;
 import static io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging.log;
+import static io.smallrye.reactive.messaging.providers.wiring.Wiring.wireOutgoingConnectorToUpstream;
 import static org.apache.kafka.clients.CommonClientConfigs.CLIENT_ID_CONFIG;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
@@ -13,6 +14,7 @@
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.function.BiConsumer;
 
 import jakarta.enterprise.context.ApplicationScoped;
@@ -24,21 +26,24 @@
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.eclipse.microprofile.config.Config;
+import org.eclipse.microprofile.reactive.messaging.Message;
 import org.eclipse.microprofile.reactive.messaging.Metadata;
 
+import io.opentelemetry.api.OpenTelemetry;
 import io.smallrye.common.annotation.Identifier;
 import io.smallrye.mutiny.Uni;
+import io.smallrye.mutiny.operators.multi.processors.UnicastProcessor;
+import io.smallrye.reactive.messaging.SubscriberDecorator;
 import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord;
 import io.smallrye.reactive.messaging.kafka.KafkaCDIEvents;
 import io.smallrye.reactive.messaging.kafka.KafkaConnector;
 import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration;
 import io.smallrye.reactive.messaging.kafka.KafkaConnectorOutgoingConfiguration;
 import io.smallrye.reactive.messaging.kafka.KafkaConsumer;
-import io.smallrye.reactive.messaging.kafka.KafkaProducer;
 import io.smallrye.reactive.messaging.kafka.SerializationFailureHandler;
 import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata;
 import io.smallrye.reactive.messaging.kafka.impl.ConfigHelper;
-import io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaProducer;
+import io.smallrye.reactive.messaging.kafka.impl.KafkaSink;
 import io.smallrye.reactive.messaging.providers.impl.ConnectorConfig;
 import io.smallrye.reactive.messaging.providers.impl.OverrideConnectorConfig;
 import io.vertx.mutiny.core.Vertx;
@@ -55,17 +60,18 @@ public class KafkaDeadLetterQueue implements KafkaFailureHandler {
 
     public static final String DEAD_LETTER_OFFSET = "dead-letter-offset";
     public static final String DEAD_LETTER_PARTITION = "dead-letter-partition";
+    public static final String CHANNEL_DLQ_SUFFIX = "dead-letter-queue";
+
     private final String channel;
-    private final KafkaProducer producer;
+    private final KafkaSink dlqSink;
+    private final UnicastProcessor<Message<?>> dlqSource;
     private final String topic;
-    private final BiConsumer reportFailure;
 
-    public KafkaDeadLetterQueue(String channel, String topic, KafkaProducer producer,
-            BiConsumer reportFailure) {
+    public KafkaDeadLetterQueue(String channel, String topic, KafkaSink dlqSink, UnicastProcessor<Message<?>> dlqSource) {
         this.channel = channel;
         this.topic = topic;
-        this.producer = producer;
-        this.reportFailure = reportFailure;
+        this.dlqSink = dlqSink;
+        this.dlqSource = dlqSource;
     }
 
     @ApplicationScoped
@@ -90,6 +96,12 @@ public static class Factory implements KafkaFailureHandler.Factory {
         @Any
         Instance<Map<String, Object>> configurations;
 
+        @Inject
+        Instance<OpenTelemetry> openTelemetryInstance;
+
+        @Inject
+        Instance<SubscriberDecorator> subscriberDecorators;
+
         @Override
         public KafkaFailureHandler create(KafkaConnectorIncomingConfiguration config,
                 Vertx vertx,
@@ -101,7 +113,7 @@ public KafkaFailureHandler create(KafkaConnectorIncomingConfiguration config,
             String consumerClientId = (String) consumer.configuration().get(CLIENT_ID_CONFIG);
 
             ConnectorConfig connectorConfig = new OverrideConnectorConfig(INCOMING_PREFIX, rootConfig.get(),
-                    KafkaConnector.CONNECTOR_NAME, config.getChannel(), "dead-letter-queue",
+                    KafkaConnector.CONNECTOR_NAME, config.getChannel(), CHANNEL_DLQ_SUFFIX,
                     Map.of(KEY_SERIALIZER_CLASS_CONFIG, c -> getMirrorSerializer(keyDeserializer),
                             VALUE_SERIALIZER_CLASS_CONFIG, c -> getMirrorSerializer(valueDeserializer),
                             CLIENT_ID_CONFIG, c -> config.getDeadLetterQueueProducerClientId()
@@ -118,11 +130,12 @@ public KafkaFailureHandler create(KafkaConnectorIncomingConfiguration config,
             log.deadLetterConfig(producerConfig.getTopic().orElse(null), producerConfig.getKeySerializer(),
                     producerConfig.getValueSerializer());
 
-            // fire producer event (e.g. bind metrics)
-            ReactiveKafkaProducer producer = new ReactiveKafkaProducer<>(producerConfig,
-                    serializationFailureHandlers, producerInterceptors, null, (p, c) -> kafkaCDIEvents.producer().fire(p));
-
-            return new KafkaDeadLetterQueue(config.getChannel(), deadQueueTopic, producer, reportFailure);
+            UnicastProcessor<Message<?>> processor = UnicastProcessor.create();
+            KafkaSink kafkaSink = new KafkaSink(producerConfig, kafkaCDIEvents, openTelemetryInstance,
+                    serializationFailureHandlers, producerInterceptors);
+            wireOutgoingConnectorToUpstream(processor, kafkaSink.getSink(), subscriberDecorators,
+                    producerConfig.getChannel() + "-" + CHANNEL_DLQ_SUFFIX);
+            return new KafkaDeadLetterQueue(config.getChannel(), deadQueueTopic, kafkaSink, processor);
         }
     }
 
@@ -182,10 +195,14 @@ public Uni handle(IncomingKafkaRecord record, Throwable reaso
         // remove DESERIALIZATION_FAILURE_DLQ header to prevent unconditional DQL in next consume
         dead.headers().remove(DESERIALIZATION_FAILURE_DLQ);
         log.messageNackedDeadLetter(channel, topic);
-        return producer.send(dead)
-                .onFailure().invoke(t -> reportFailure.accept((Throwable) t, true))
-                .onItem().ignore().andContinueWithNull()
-                .chain(() -> Uni.createFrom().completionStage(record.ack()))
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        dlqSource.onNext(record.withPayload(dead)
+                .withAck(() -> record.ack().thenAccept(__ -> future.complete(null)))
+                .withNack(throwable -> {
+                    future.completeExceptionally(throwable);
+                    return future;
+                }));
+        return Uni.createFrom().completionStage(future)
                 .emitOn(record::runOnMessageContext);
     }
 
@@ -195,6 +212,6 @@ void addHeader(ProducerRecord record, String key, String value) {
 
     @Override
     public void terminate() {
-        producer.close();
+        dlqSink.closeQuietly();
     }
 }
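The new CHANNEL_DLQ_SUFFIX constant reuses the same "dead-letter-queue" segment under which the connector already exposes its per-channel dead-letter options, so existing dead-letter-queue.* settings should keep applying to the producer built by the factory above. A hypothetical channel configuration is shown below; the channel name "orders" and the values are invented, while the attribute names are the connector's documented failure-strategy and dead-letter-queue options.

// Hypothetical configuration sketch, expressed as MicroProfile Config keys.
import java.util.Map;

public class DlqConfigSketch {
    public static void main(String[] args) {
        Map<String, String> properties = Map.of(
                "mp.messaging.incoming.orders.connector", "smallrye-kafka",
                "mp.messaging.incoming.orders.topic", "orders",
                "mp.messaging.incoming.orders.failure-strategy", "dead-letter-queue",
                // The two options below are resolved through the "dead-letter-queue" override
                // prefix (CHANNEL_DLQ_SUFFIX) when the factory builds the DLQ producer config.
                "mp.messaging.incoming.orders.dead-letter-queue.topic", "dead-letter-topic-orders",
                "mp.messaging.incoming.orders.dead-letter-queue.value.serializer",
                "org.apache.kafka.common.serialization.IntegerSerializer");
        properties.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}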
diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/fault/KafkaDelayedRetryTopic.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/fault/KafkaDelayedRetryTopic.java
index 1d482150ba..6aa653d505 100644
--- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/fault/KafkaDelayedRetryTopic.java
+++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/fault/KafkaDelayedRetryTopic.java
@@ -1,7 +1,9 @@
 package io.smallrye.reactive.messaging.kafka.fault;
 
 import static io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler.DESERIALIZATION_FAILURE_DLQ;
+import static io.smallrye.reactive.messaging.kafka.fault.KafkaDeadLetterQueue.CHANNEL_DLQ_SUFFIX;
 import static io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging.log;
+import static io.smallrye.reactive.messaging.providers.wiring.Wiring.wireOutgoingConnectorToUpstream;
 import static org.apache.kafka.clients.CommonClientConfigs.CLIENT_ID_CONFIG;
 import static org.apache.kafka.clients.CommonClientConfigs.GROUP_ID_CONFIG;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG;
@@ -21,6 +23,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
 import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -38,11 +41,15 @@
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.eclipse.microprofile.config.Config;
+import org.eclipse.microprofile.reactive.messaging.Message;
 import org.eclipse.microprofile.reactive.messaging.Metadata;
 
+import io.opentelemetry.api.OpenTelemetry;
 import io.smallrye.common.annotation.Identifier;
 import io.smallrye.mutiny.Multi;
 import io.smallrye.mutiny.Uni;
+import io.smallrye.mutiny.operators.multi.processors.UnicastProcessor;
+import io.smallrye.reactive.messaging.SubscriberDecorator;
 import io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler;
 import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord;
 import io.smallrye.reactive.messaging.kafka.KafkaCDIEvents;
@@ -50,14 +57,13 @@
 import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration;
 import io.smallrye.reactive.messaging.kafka.KafkaConnectorOutgoingConfiguration;
 import io.smallrye.reactive.messaging.kafka.KafkaConsumer;
-import io.smallrye.reactive.messaging.kafka.KafkaProducer;
 import io.smallrye.reactive.messaging.kafka.SerializationFailureHandler;
 import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata;
 import io.smallrye.reactive.messaging.kafka.commit.ContextHolder;
 import io.smallrye.reactive.messaging.kafka.commit.KafkaLatestCommit;
 import io.smallrye.reactive.messaging.kafka.impl.ConfigHelper;
+import io.smallrye.reactive.messaging.kafka.impl.KafkaSink;
 import io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaConsumer;
-import io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaProducer;
 import io.smallrye.reactive.messaging.providers.impl.ConnectorConfig;
 import io.smallrye.reactive.messaging.providers.impl.OverrideConnectorConfig;
 import io.vertx.core.impl.VertxInternal;
@@ -107,6 +113,12 @@ public static class Factory implements KafkaFailureHandler.Factory {
         @Any
         Instance<Map<String, Object>> configurations;
 
+        @Inject
+        Instance<OpenTelemetry> openTelemetryInstance;
+
+        @Inject
+        Instance<SubscriberDecorator> subscriberDecorators;
+
         @Override
         public KafkaFailureHandler create(KafkaConnectorIncomingConfiguration config,
                 Vertx vertx,
@@ -128,7 +140,7 @@ public KafkaFailureHandler create(KafkaConnectorIncomingConfiguration config,
             String consumerClientId = (String) consumer.configuration().get(CLIENT_ID_CONFIG);
 
             ConnectorConfig connectorConfig = new OverrideConnectorConfig(INCOMING_PREFIX, rootConfig.get(),
-                    KafkaConnector.CONNECTOR_NAME, config.getChannel(), "dead-letter-queue",
+                    KafkaConnector.CONNECTOR_NAME, config.getChannel(), CHANNEL_DLQ_SUFFIX,
                     Map.of(KEY_SERIALIZER_CLASS_CONFIG, c -> getMirrorSerializer(keyDeserializer),
                             VALUE_SERIALIZER_CLASS_CONFIG, c -> getMirrorSerializer(valueDeserializer),
                             CLIENT_ID_CONFIG, c -> config.getDeadLetterQueueProducerClientId()
@@ -142,9 +154,11 @@ public KafkaFailureHandler create(KafkaConnectorIncomingConfiguration config,
 
             log.delayedRetryTopic(config.getChannel(), retryTopics, maxRetries, retryTimeout, deadQueueTopic);
 
-            // fire producer event (e.g. bind metrics)
-            ReactiveKafkaProducer producer = new ReactiveKafkaProducer<>(producerConfig,
-                    serializationFailureHandlers, producerInterceptors, null, (p, c) -> kafkaCDIEvents.producer().fire(p));
+            UnicastProcessor<Message<?>> processor = UnicastProcessor.create();
+            KafkaSink kafkaSink = new KafkaSink(producerConfig, kafkaCDIEvents, openTelemetryInstance,
+                    serializationFailureHandlers, producerInterceptors);
+            wireOutgoingConnectorToUpstream(processor, kafkaSink.getSink(), subscriberDecorators,
+                    producerConfig.getChannel() + "-" + CHANNEL_DLQ_SUFFIX);
 
             ConnectorConfig retryConsumerConfig = new OverrideConnectorConfig(INCOMING_PREFIX, rootConfig.get(),
                     KafkaConnector.CONNECTOR_NAME, config.getChannel(), "delayed-retry-topic.consumer",
@@ -153,6 +167,7 @@ public KafkaFailureHandler create(KafkaConnectorIncomingConfiguration config,
                             GROUP_ID_CONFIG, c -> "kafka-delayed-retry-topic-" + consumerClientId));
             Config retryKafkaConfig = ConfigHelper.retrieveChannelConfiguration(configurations, retryConsumerConfig);
             KafkaConnectorIncomingConfiguration retryConfig = new KafkaConnectorIncomingConfiguration(retryKafkaConfig);
+
             ReactiveKafkaConsumer retryConsumer = new ReactiveKafkaConsumer<>(retryConfig,
                     deserializationFailureHandlers,
                     retryConsumerConfig.getValue(GROUP_ID_CONFIG, String.class), -1,
@@ -161,7 +176,7 @@ public KafkaFailureHandler create(KafkaConnectorIncomingConfiguration config,
                     c -> kafkaCDIEvents.consumer().fire(c));
 
             return new KafkaDelayedRetryTopic(config.getChannel(), vertx, config, retryTopics, maxRetries, retryTimeout,
-                    deadQueueTopic, producer, retryConsumer, reportFailure);
+                    deadQueueTopic, processor, kafkaSink, retryConsumer);
         }
     }
 
@@ -169,21 +184,21 @@ public KafkaFailureHandler create(KafkaConnectorIncomingConfiguration config,
     private final Vertx vertx;
     private final KafkaConnectorIncomingConfiguration configuration;
     private final String deadQueueTopic;
-    private final KafkaProducer producer;
+    private final UnicastProcessor<Message<?>> dlqSource;
+    private final KafkaSink dlqSink;
     private final ReactiveKafkaConsumer consumer;
     private final List retryTopics;
     private final int maxRetries;
     private final long retryTimeout;
-    private final BiConsumer reportFailure;
 
     public KafkaDelayedRetryTopic(String channel, Vertx vertx, KafkaConnectorIncomingConfiguration configuration,
             List retryTopics, int maxRetries, long retryTimeout, String deadQueueTopic,
-            KafkaProducer producer,
-            ReactiveKafkaConsumer consumer,
-            BiConsumer reportFailure) {
+            UnicastProcessor<Message<?>> dlqSource,
+            KafkaSink dlqSink,
+            ReactiveKafkaConsumer consumer) {
         super(vertx, configuration.config()
                 .getOptionalValue(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, Integer.class).orElse(60000));
         this.channel = channel;
@@ -193,9 +208,9 @@ public KafkaDelayedRetryTopic(String channel, Vertx vertx, KafkaConnectorIncomin
         this.maxRetries = maxRetries;
         this.retryTimeout = retryTimeout;
         this.deadQueueTopic = deadQueueTopic;
-        this.producer = producer;
+        this.dlqSource = dlqSource;
+        this.dlqSink = dlqSink;
         this.consumer = consumer;
-        this.reportFailure = reportFailure;
     }
 
     public static String getRetryTopic(String topic, int delayMillis) {
@@ -292,10 +307,14 @@ public Uni handle(IncomingKafkaRecord record, Throwable reaso
         // remove DESERIALIZATION_FAILURE_DLQ header to prevent unconditional DQL in next consume
         retry.headers().remove(DESERIALIZATION_FAILURE_DLQ);
         log.delayedRetryNack(channel, topic);
-        return producer.send(retry)
-                .onFailure().invoke(t -> reportFailure.accept((Throwable) t, true))
-                .onItem().ignore().andContinueWithNull()
-                .chain(() -> Uni.createFrom().completionStage(record.ack()))
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        dlqSource.onNext(record.withPayload(retry)
+                .withAck(() -> record.ack().thenAccept(__ -> future.complete(null)))
+                .withNack(throwable -> {
+                    future.completeExceptionally(throwable);
+                    return future;
+                }));
+        return Uni.createFrom().completionStage(future)
                 .emitOn(record::runOnMessageContext);
     }
 
@@ -324,7 +343,7 @@ private static void setTimestampHeader(Headers headers, String key, long timesta
 
     @Override
     public void terminate() {
-        producer.close();
+        dlqSink.closeQuietly();
         consumer.close();
     }
 
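In both handle(...) implementations above, the returned Uni no longer completes when producer.send(...) resolves; it completes when the message emitted on the internal dead-letter channel is acked by the Kafka sink (which first acks the original record) or nacked (which fails the handler). A minimal, self-contained sketch of that ack/nack-to-CompletableFuture bridge follows, with an in-memory subscriber standing in for the sink; AckBridgeSketch and all names are hypothetical.

// Minimal sketch of the bridge used by the new handle() implementations; not project code.
import java.util.concurrent.CompletableFuture;

import org.eclipse.microprofile.reactive.messaging.Message;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.operators.multi.processors.UnicastProcessor;

public class AckBridgeSketch {

    public static void main(String[] args) {
        UnicastProcessor<Message<?>> dlqSource = UnicastProcessor.create();
        // Stand-in for the Kafka sink: ack every message it receives.
        dlqSource.subscribe().with(Message::ack);

        // The original record whose ack should only happen after the DLQ write succeeded.
        Message<String> original = Message.of("failed-payload",
                () -> CompletableFuture.completedFuture(null));

        CompletableFuture<Void> future = new CompletableFuture<>();
        dlqSource.onNext(original.withPayload("dead-letter-payload")
                // ack of the DLQ message -> ack the original record, then complete the future
                .withAck(() -> original.ack().thenAccept(x -> future.complete(null)))
                // nack of the DLQ message -> fail the future, so the handler reports the failure
                .withNack(throwable -> {
                    future.completeExceptionally(throwable);
                    return future;
                }));

        // This is what handle() hands back to the framework.
        Uni<Void> result = Uni.createFrom().completionStage(future);
        result.subscribe().with(x -> System.out.println("original record acked, handler done"));
    }
}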
diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/fault/KafkaFailureHandlerTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/fault/KafkaFailureHandlerTest.java
index 3d1bb0b009..794a651f74 100644
--- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/fault/KafkaFailureHandlerTest.java
+++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/fault/KafkaFailureHandlerTest.java
@@ -42,6 +42,7 @@
 import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.eclipse.microprofile.reactive.messaging.Incoming;
+import org.eclipse.microprofile.reactive.messaging.Message;
 import org.junit.jupiter.api.Test;
 
 import io.smallrye.mutiny.Uni;
@@ -49,6 +50,10 @@
 import io.smallrye.reactive.messaging.kafka.base.KafkaCompanionTestBase;
 import io.smallrye.reactive.messaging.kafka.base.KafkaMapBasedConfig;
 import io.smallrye.reactive.messaging.kafka.companion.ConsumerTask;
+import io.smallrye.reactive.messaging.observation.DefaultMessageObservation;
+import io.smallrye.reactive.messaging.observation.MessageObservation;
+import io.smallrye.reactive.messaging.observation.MessageObservationCollector;
+import io.smallrye.reactive.messaging.observation.ObservationContext;
 
 public class KafkaFailureHandlerTest extends KafkaCompanionTestBase {
 
@@ -125,6 +130,7 @@ public void testIgnoreStrategyWithPayload() {
 
     @Test
     public void testDeadLetterQueueStrategyWithDefaultTopic() {
+        addBeans(MyObservationCollector.class);
         ConsumerTask records = companion.consumeIntegers().fromTopics("dead-letter-topic-kafka", 3);
 
@@ -154,6 +160,35 @@ public void testDeadLetterQueueStrategyWithDefaultTopic() {
 
         assertThat(bean.consumers()).isEqualTo(1);
         assertThat(bean.producers()).isEqualTo(1);
+
+        MyObservationCollector collector = get(MyObservationCollector.class);
+        await().untilAsserted(() -> assertThat(collector.observed()).hasSize(3)
+                .allSatisfy(MessageObservation::isDone));
+    }
+
+    @ApplicationScoped
+    public static class MyObservationCollector implements MessageObservationCollector {
+
+        List observed = new CopyOnWriteArrayList<>();
+
+        @Override
+        public ObservationContext initObservation(String channel, boolean incoming, boolean emitter) {
+            if (incoming) {
+                return null;
+            }
+            return ObservationContext.DEFAULT;
+        }
+
+        @Override
+        public MessageObservation onNewMessage(String channel, Message message, ObservationContext observationContext) {
+            DefaultMessageObservation observation = new DefaultMessageObservation(channel);
+            observed.add(observation);
+            return observation;
+        }
+
+        public List observed() {
+            return observed;
+        }
     }
 
     @Test
@@ -433,7 +468,7 @@ public void testDeadLetterQueueStrategyWithCustomConfig() {
 
     @Test
     public void testDelayedRetryStrategy() {
-        addBeans(KafkaDelayedRetryTopic.Factory.class);
+        addBeans(KafkaDelayedRetryTopic.Factory.class, MyObservationCollector.class);
         List delayedRetryTopics = List.of(getRetryTopic(topic, 2000), getRetryTopic(topic, 4000));
         MyReceiverBean bean = runApplication(getDelayedRetryConfig(topic, delayedRetryTopics), MyReceiverBean.class);
         await().until(this::isReady);
@@ -470,6 +505,10 @@ public void testDelayedRetryStrategy() {
 
         assertThat(bean.consumers()).isEqualTo(2L);
         assertThat(bean.producers()).isEqualTo(1);
+
+        MyObservationCollector collector = get(MyObservationCollector.class);
+        await().untilAsserted(() -> assertThat(collector.observed()).hasSize(9)
+                .allSatisfy(MessageObservation::isDone));
     }
 
     @Test
diff --git a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/wiring/Wiring.java b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/wiring/Wiring.java
index 6dfe66a10c..b7cc4227b8 100644
--- a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/wiring/Wiring.java
+++ b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/wiring/Wiring.java
@@ -432,11 +432,7 @@ public void materialize(ChannelRegistry registry) {
             }
 
             // TODO Improve this.
             Flow.Subscriber connector = registry.getSubscribers(name).get(0);
-            for (SubscriberDecorator decorator : getSortedInstances(subscriberDecorators)) {
-                merged = decorator.decorate(merged, Collections.singletonList(name), true);
-            }
-            // The connector will cancel the subscription.
-            merged.subscribe().withSubscriber(connector);
+            wireOutgoingConnectorToUpstream(merged, connector, subscriberDecorators, name);
         }
 
@@ -1096,4 +1092,15 @@ public String toString() {
         }
     }
 
+    public static void wireOutgoingConnectorToUpstream(Multi<? extends Message<?>> multi,
+            Flow.Subscriber<? super Message<?>> outgoingConnector,
+            Instance<SubscriberDecorator> subscriberDecorators, String channelName) {
+        List<String> channelNames = Collections.singletonList(channelName);
+        for (SubscriberDecorator decorator : getSortedInstances(subscriberDecorators)) {
+            multi = decorator.decorate(multi, channelNames, true);
+        }
+        // The connector will cancel the subscription.
+        multi.subscribe().withSubscriber(outgoingConnector);
+    }
+
 }
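The extracted helper folds every SubscriberDecorator over the upstream Multi and then subscribes the connector's Flow.Subscriber, which owns (and may cancel) the resulting subscription. The stand-alone sketch below mirrors that behaviour with a plain UnaryOperator list in place of the CDI Instance<SubscriberDecorator> and an anonymous Flow.Subscriber in place of KafkaSink.getSink(); WireSketch and its names are illustrative only.

// Self-contained sketch of the fold-and-subscribe shape of wireOutgoingConnectorToUpstream.
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.UnaryOperator;

import org.eclipse.microprofile.reactive.messaging.Message;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.operators.multi.processors.UnicastProcessor;

public class WireSketch {

    public static void main(String[] args) {
        UnicastProcessor<Message<?>> upstream = UnicastProcessor.create();

        // Stand-ins for getSortedInstances(subscriberDecorators): each one wraps the stream.
        List<UnaryOperator<Multi<? extends Message<?>>>> decorators = List.of(
                multi -> multi.onItem().invoke(m -> System.out.println("decorated: " + m.getPayload())));

        // Stand-in for the connector subscriber: it requests demand, acks what it receives,
        // and holds the subscription (the real connector may cancel it when it shuts down).
        Flow.Subscriber<Message<?>> connector = new Flow.Subscriber<>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                this.subscription = s;
                s.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(Message<?> message) {
                message.ack();
            }

            @Override
            public void onError(Throwable failure) {
                failure.printStackTrace();
            }

            @Override
            public void onComplete() {
                // nothing to do
            }
        };

        // The fold + subscribe performed by the helper.
        Multi<? extends Message<?>> decorated = upstream;
        for (UnaryOperator<Multi<? extends Message<?>>> decorator : decorators) {
            decorated = decorator.apply(decorated);
        }
        decorated.subscribe().withSubscriber(connector);

        upstream.onNext(Message.of("hello"));
    }
}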