Use Kafka channel instead of producer in DLQ and Delayed retry topic #2565

Merged: 1 commit, Apr 9, 2024
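At a high level, both failure handlers stop sending through a bare ReactiveKafkaProducer and instead publish to an internal outgoing channel: a Mutiny UnicastProcessor is the upstream, a KafkaSink built from the dead-letter producer configuration is the downstream, and the two are connected with wireOutgoingConnectorToUpstream, so the regular outgoing-channel machinery (SubscriberDecorators, OpenTelemetry instrumentation) is applied to dead-letter and delayed-retry sends. Below is a minimal sketch of that wiring, assembled from the calls visible in the diff; the wrapper class, helper method, and its parameter list are illustrative and not part of the PR, and the parameter types are assumed to match what the factories receive.

import static io.smallrye.reactive.messaging.kafka.fault.KafkaDeadLetterQueue.CHANNEL_DLQ_SUFFIX;
import static io.smallrye.reactive.messaging.providers.wiring.Wiring.wireOutgoingConnectorToUpstream;

import jakarta.enterprise.inject.Instance;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.eclipse.microprofile.reactive.messaging.Message;

import io.opentelemetry.api.OpenTelemetry;
import io.smallrye.mutiny.operators.multi.processors.UnicastProcessor;
import io.smallrye.reactive.messaging.SubscriberDecorator;
import io.smallrye.reactive.messaging.kafka.KafkaCDIEvents;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorOutgoingConfiguration;
import io.smallrye.reactive.messaging.kafka.SerializationFailureHandler;
import io.smallrye.reactive.messaging.kafka.fault.KafkaDeadLetterQueue;
import io.smallrye.reactive.messaging.kafka.impl.KafkaSink;

final class DlqChannelWiringSketch {

    // Mirrors the factory code in the diff; the method name and signature are illustrative.
    static KafkaDeadLetterQueue wireDlqChannel(KafkaConnectorOutgoingConfiguration producerConfig,
            KafkaCDIEvents kafkaCDIEvents,
            Instance<OpenTelemetry> openTelemetryInstance,
            Instance<SerializationFailureHandler<?>> serializationFailureHandlers,
            Instance<ProducerInterceptor<?, ?>> producerInterceptors, // assumed parameter type
            Instance<SubscriberDecorator> subscriberDecorators,
            String channel, String deadQueueTopic) {
        // Upstream of the internal channel: the failure handler pushes nacked records into this processor.
        UnicastProcessor<Message<?>> processor = UnicastProcessor.create();
        // Downstream: a regular Kafka outgoing connector built from the dead-letter producer configuration.
        KafkaSink kafkaSink = new KafkaSink(producerConfig, kafkaCDIEvents, openTelemetryInstance,
                serializationFailureHandlers, producerInterceptors);
        // Subscribe the sink to the processor, applying registered SubscriberDecorators,
        // the same way an application-defined outgoing channel is wired.
        wireOutgoingConnectorToUpstream(processor, kafkaSink.getSink(), subscriberDecorators,
                producerConfig.getChannel() + "-" + CHANNEL_DLQ_SUFFIX);
        return new KafkaDeadLetterQueue(channel, deadQueueTopic, kafkaSink, processor);
    }
}

When the handler is terminated, the sink is closed with closeQuietly() instead of closing a producer directly, as shown further down in the diff.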
@@ -14,7 +14,6 @@
import io.smallrye.reactive.messaging.json.JsonMapping;
import io.smallrye.reactive.messaging.providers.MetadataInjectableMessage;
import io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage;

import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;

@@ -2,6 +2,7 @@

import static io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler.DESERIALIZATION_FAILURE_DLQ;
import static io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging.log;
import static io.smallrye.reactive.messaging.providers.wiring.Wiring.wireOutgoingConnectorToUpstream;
import static org.apache.kafka.clients.CommonClientConfigs.CLIENT_ID_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
@@ -13,6 +14,7 @@
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

import jakarta.enterprise.context.ApplicationScoped;
@@ -24,21 +26,24 @@
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Metadata;

import io.opentelemetry.api.OpenTelemetry;
import io.smallrye.common.annotation.Identifier;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.operators.multi.processors.UnicastProcessor;
import io.smallrye.reactive.messaging.SubscriberDecorator;
import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord;
import io.smallrye.reactive.messaging.kafka.KafkaCDIEvents;
import io.smallrye.reactive.messaging.kafka.KafkaConnector;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorOutgoingConfiguration;
import io.smallrye.reactive.messaging.kafka.KafkaConsumer;
import io.smallrye.reactive.messaging.kafka.KafkaProducer;
import io.smallrye.reactive.messaging.kafka.SerializationFailureHandler;
import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata;
import io.smallrye.reactive.messaging.kafka.impl.ConfigHelper;
import io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaProducer;
import io.smallrye.reactive.messaging.kafka.impl.KafkaSink;
import io.smallrye.reactive.messaging.providers.impl.ConnectorConfig;
import io.smallrye.reactive.messaging.providers.impl.OverrideConnectorConfig;
import io.vertx.mutiny.core.Vertx;
@@ -55,17 +60,18 @@ public class KafkaDeadLetterQueue implements KafkaFailureHandler {
public static final String DEAD_LETTER_OFFSET = "dead-letter-offset";
public static final String DEAD_LETTER_PARTITION = "dead-letter-partition";

public static final String CHANNEL_DLQ_SUFFIX = "dead-letter-queue";

private final String channel;
private final KafkaProducer producer;
private final KafkaSink dlqSink;
private final UnicastProcessor<Message<?>> dlqSource;
private final String topic;
private final BiConsumer<Throwable, Boolean> reportFailure;

public KafkaDeadLetterQueue(String channel, String topic, KafkaProducer producer,
BiConsumer<Throwable, Boolean> reportFailure) {
public KafkaDeadLetterQueue(String channel, String topic, KafkaSink dlqSink, UnicastProcessor<Message<?>> dlqSource) {
this.channel = channel;
this.topic = topic;
this.producer = producer;
this.reportFailure = reportFailure;
this.dlqSink = dlqSink;
this.dlqSource = dlqSource;
}

@ApplicationScoped
@@ -90,6 +96,12 @@ public static class Factory implements KafkaFailureHandler.Factory {
@Any
Instance<Map<String, Object>> configurations;

@Inject
Instance<OpenTelemetry> openTelemetryInstance;

@Inject
Instance<SubscriberDecorator> subscriberDecorators;

@Override
public KafkaFailureHandler create(KafkaConnectorIncomingConfiguration config,
Vertx vertx,
@@ -101,7 +113,7 @@ public KafkaFailureHandler create(KafkaConnectorIncomingConfiguration config,

String consumerClientId = (String) consumer.configuration().get(CLIENT_ID_CONFIG);
ConnectorConfig connectorConfig = new OverrideConnectorConfig(INCOMING_PREFIX, rootConfig.get(),
KafkaConnector.CONNECTOR_NAME, config.getChannel(), "dead-letter-queue",
KafkaConnector.CONNECTOR_NAME, config.getChannel(), CHANNEL_DLQ_SUFFIX,
Map.of(KEY_SERIALIZER_CLASS_CONFIG, c -> getMirrorSerializer(keyDeserializer),
VALUE_SERIALIZER_CLASS_CONFIG, c -> getMirrorSerializer(valueDeserializer),
CLIENT_ID_CONFIG, c -> config.getDeadLetterQueueProducerClientId()
@@ -118,11 +130,12 @@ public KafkaFailureHandler create(KafkaConnectorIncomingConfiguration config,
log.deadLetterConfig(producerConfig.getTopic().orElse(null), producerConfig.getKeySerializer(),
producerConfig.getValueSerializer());

// fire producer event (e.g. bind metrics)
ReactiveKafkaProducer<Object, Object> producer = new ReactiveKafkaProducer<>(producerConfig,
serializationFailureHandlers, producerInterceptors, null, (p, c) -> kafkaCDIEvents.producer().fire(p));

return new KafkaDeadLetterQueue(config.getChannel(), deadQueueTopic, producer, reportFailure);
UnicastProcessor<Message<?>> processor = UnicastProcessor.create();
KafkaSink kafkaSink = new KafkaSink(producerConfig, kafkaCDIEvents, openTelemetryInstance,
serializationFailureHandlers, producerInterceptors);
wireOutgoingConnectorToUpstream(processor, kafkaSink.getSink(), subscriberDecorators,
producerConfig.getChannel() + "-" + CHANNEL_DLQ_SUFFIX);
return new KafkaDeadLetterQueue(config.getChannel(), deadQueueTopic, kafkaSink, processor);
}
}

@@ -182,10 +195,14 @@ public <K, V> Uni<Void> handle(IncomingKafkaRecord<K, V> record, Throwable reaso
// remove DESERIALIZATION_FAILURE_DLQ header to prevent unconditional DLQ in next consume
dead.headers().remove(DESERIALIZATION_FAILURE_DLQ);
log.messageNackedDeadLetter(channel, topic);
return producer.send(dead)
.onFailure().invoke(t -> reportFailure.accept((Throwable) t, true))
.onItem().ignore().andContinueWithNull()
.chain(() -> Uni.createFrom().completionStage(record.ack()))
CompletableFuture<Void> future = new CompletableFuture<>();
dlqSource.onNext(record.withPayload(dead)
.withAck(() -> record.ack().thenAccept(__ -> future.complete(null)))
.withNack(throwable -> {
future.completeExceptionally(throwable);
return future;
}));
return Uni.createFrom().completionStage(future)
.emitOn(record::runOnMessageContext);
}

@@ -195,6 +212,6 @@ void addHeader(ProducerRecord<?, ?> record, String key, String value) {

@Override
public void terminate() {
producer.close();
dlqSink.closeQuietly();
}
}
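Because the record now travels through a channel rather than a direct producer.send(), the new handle() above bridges the channel's acknowledgement back to the caller: the outgoing Message completes a CompletableFuture when the sink acks it and fails it when the sink nacks it. Read in isolation, with the names used in the diff (the wrapper class and helper method are illustrative), the handshake looks like this:

import java.util.concurrent.CompletableFuture;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.eclipse.microprofile.reactive.messaging.Message;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.operators.multi.processors.UnicastProcessor;
import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord;

final class DlqSendSketch {

    // Push one outgoing record into the DLQ channel and resolve only once it has been written (or rejected).
    static <K, V> Uni<Void> sendAndAwait(UnicastProcessor<Message<?>> dlqSource,
            IncomingKafkaRecord<K, V> record,
            ProducerRecord<?, ?> dead) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        dlqSource.onNext(record.withPayload(dead)
                // ack of the outgoing message: ack the original record, then complete the future
                .withAck(() -> record.ack().thenAccept(ignored -> future.complete(null)))
                // nack of the outgoing message: surface the failure to the handler's Uni
                .withNack(throwable -> {
                    future.completeExceptionally(throwable);
                    return future;
                }));
        return Uni.createFrom().completionStage(future)
                .emitOn(record::runOnMessageContext);
    }
}

The delayed-retry-topic handler in the next file applies the same pattern to the retry record.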
@@ -1,7 +1,9 @@
package io.smallrye.reactive.messaging.kafka.fault;

import static io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler.DESERIALIZATION_FAILURE_DLQ;
import static io.smallrye.reactive.messaging.kafka.fault.KafkaDeadLetterQueue.CHANNEL_DLQ_SUFFIX;
import static io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging.log;
import static io.smallrye.reactive.messaging.providers.wiring.Wiring.wireOutgoingConnectorToUpstream;
import static org.apache.kafka.clients.CommonClientConfigs.CLIENT_ID_CONFIG;
import static org.apache.kafka.clients.CommonClientConfigs.GROUP_ID_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG;
@@ -21,6 +23,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -38,26 +41,29 @@
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.StringSerializer;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Metadata;

import io.opentelemetry.api.OpenTelemetry;
import io.smallrye.common.annotation.Identifier;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.operators.multi.processors.UnicastProcessor;
import io.smallrye.reactive.messaging.SubscriberDecorator;
import io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler;
import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord;
import io.smallrye.reactive.messaging.kafka.KafkaCDIEvents;
import io.smallrye.reactive.messaging.kafka.KafkaConnector;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorOutgoingConfiguration;
import io.smallrye.reactive.messaging.kafka.KafkaConsumer;
import io.smallrye.reactive.messaging.kafka.KafkaProducer;
import io.smallrye.reactive.messaging.kafka.SerializationFailureHandler;
import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata;
import io.smallrye.reactive.messaging.kafka.commit.ContextHolder;
import io.smallrye.reactive.messaging.kafka.commit.KafkaLatestCommit;
import io.smallrye.reactive.messaging.kafka.impl.ConfigHelper;
import io.smallrye.reactive.messaging.kafka.impl.KafkaSink;
import io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaConsumer;
import io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaProducer;
import io.smallrye.reactive.messaging.providers.impl.ConnectorConfig;
import io.smallrye.reactive.messaging.providers.impl.OverrideConnectorConfig;
import io.vertx.core.impl.VertxInternal;
@@ -107,6 +113,12 @@ public static class Factory implements KafkaFailureHandler.Factory {
@Any
Instance<Map<String, Object>> configurations;

@Inject
Instance<OpenTelemetry> openTelemetryInstance;

@Inject
Instance<SubscriberDecorator> subscriberDecorators;

@Override
public KafkaFailureHandler create(KafkaConnectorIncomingConfiguration config,
Vertx vertx,
@@ -128,7 +140,7 @@ public KafkaFailureHandler create(KafkaConnectorIncomingConfiguration config,

String consumerClientId = (String) consumer.configuration().get(CLIENT_ID_CONFIG);
ConnectorConfig connectorConfig = new OverrideConnectorConfig(INCOMING_PREFIX, rootConfig.get(),
KafkaConnector.CONNECTOR_NAME, config.getChannel(), "dead-letter-queue",
KafkaConnector.CONNECTOR_NAME, config.getChannel(), CHANNEL_DLQ_SUFFIX,
Map.of(KEY_SERIALIZER_CLASS_CONFIG, c -> getMirrorSerializer(keyDeserializer),
VALUE_SERIALIZER_CLASS_CONFIG, c -> getMirrorSerializer(valueDeserializer),
CLIENT_ID_CONFIG, c -> config.getDeadLetterQueueProducerClientId()
@@ -142,9 +154,11 @@ public KafkaFailureHandler create(KafkaConnectorIncomingConfiguration config,

log.delayedRetryTopic(config.getChannel(), retryTopics, maxRetries, retryTimeout, deadQueueTopic);

// fire producer event (e.g. bind metrics)
ReactiveKafkaProducer<Object, Object> producer = new ReactiveKafkaProducer<>(producerConfig,
serializationFailureHandlers, producerInterceptors, null, (p, c) -> kafkaCDIEvents.producer().fire(p));
UnicastProcessor<Message<?>> processor = UnicastProcessor.create();
KafkaSink kafkaSink = new KafkaSink(producerConfig, kafkaCDIEvents, openTelemetryInstance,
serializationFailureHandlers, producerInterceptors);
wireOutgoingConnectorToUpstream(processor, kafkaSink.getSink(), subscriberDecorators,
producerConfig.getChannel() + "-" + CHANNEL_DLQ_SUFFIX);

ConnectorConfig retryConsumerConfig = new OverrideConnectorConfig(INCOMING_PREFIX, rootConfig.get(),
KafkaConnector.CONNECTOR_NAME, config.getChannel(), "delayed-retry-topic.consumer",
@@ -153,6 +167,7 @@ public KafkaFailureHandler create(KafkaConnectorIncomingConfiguration config,
GROUP_ID_CONFIG, c -> "kafka-delayed-retry-topic-" + consumerClientId));
Config retryKafkaConfig = ConfigHelper.retrieveChannelConfiguration(configurations, retryConsumerConfig);
KafkaConnectorIncomingConfiguration retryConfig = new KafkaConnectorIncomingConfiguration(retryKafkaConfig);

ReactiveKafkaConsumer<Object, Object> retryConsumer = new ReactiveKafkaConsumer<>(retryConfig,
deserializationFailureHandlers,
retryConsumerConfig.getValue(GROUP_ID_CONFIG, String.class), -1,
@@ -161,29 +176,29 @@ public KafkaFailureHandler create(KafkaConnectorIncomingConfiguration config,
c -> kafkaCDIEvents.consumer().fire(c));

return new KafkaDelayedRetryTopic(config.getChannel(), vertx, config, retryTopics, maxRetries, retryTimeout,
deadQueueTopic, producer, retryConsumer, reportFailure);
deadQueueTopic, processor, kafkaSink, retryConsumer);
}
}

private final String channel;
private final Vertx vertx;
private final KafkaConnectorIncomingConfiguration configuration;
private final String deadQueueTopic;
private final KafkaProducer producer;
private final UnicastProcessor<Message<?>> dlqSource;
private final KafkaSink dlqSink;
private final ReactiveKafkaConsumer consumer;
private final List<String> retryTopics;
private final int maxRetries;
private final long retryTimeout;
private final BiConsumer<Throwable, Boolean> reportFailure;

public KafkaDelayedRetryTopic(String channel, Vertx vertx, KafkaConnectorIncomingConfiguration configuration,
List<String> retryTopics,
int maxRetries,
long retryTimeout,
String deadQueueTopic,
KafkaProducer producer,
ReactiveKafkaConsumer consumer,
BiConsumer<Throwable, Boolean> reportFailure) {
UnicastProcessor<Message<?>> dlqSource,
KafkaSink dlqSink,
ReactiveKafkaConsumer consumer) {
super(vertx, configuration.config()
.getOptionalValue(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, Integer.class).orElse(60000));
this.channel = channel;
@@ -193,9 +208,9 @@ public KafkaDelayedRetryTopic(String channel, Vertx vertx, KafkaConnectorIncomin
this.maxRetries = maxRetries;
this.retryTimeout = retryTimeout;
this.deadQueueTopic = deadQueueTopic;
this.producer = producer;
this.dlqSource = dlqSource;
this.dlqSink = dlqSink;
this.consumer = consumer;
this.reportFailure = reportFailure;
}

public static String getRetryTopic(String topic, int delayMillis) {
@@ -292,10 +307,14 @@ public <K, V> Uni<Void> handle(IncomingKafkaRecord<K, V> record, Throwable reaso
// remove DESERIALIZATION_FAILURE_DLQ header to prevent unconditional DLQ in next consume
retry.headers().remove(DESERIALIZATION_FAILURE_DLQ);
log.delayedRetryNack(channel, topic);
return producer.send(retry)
.onFailure().invoke(t -> reportFailure.accept((Throwable) t, true))
.onItem().ignore().andContinueWithNull()
.chain(() -> Uni.createFrom().completionStage(record.ack()))
CompletableFuture<Void> future = new CompletableFuture<>();
dlqSource.onNext(record.withPayload(retry)
.withAck(() -> record.ack().thenAccept(__ -> future.complete(null)))
.withNack(throwable -> {
future.completeExceptionally(throwable);
return future;
}));
return Uni.createFrom().completionStage(future)
.emitOn(record::runOnMessageContext);
}

@@ -324,7 +343,7 @@ private static void setTimestampHeader(Headers headers, String key, long timesta

@Override
public void terminate() {
producer.close();
dlqSink.closeQuietly();
consumer.close();
}
