From 03f05cf82dc8bc4f550c4fb5f840ce89a923352d Mon Sep 17 00:00:00 2001 From: Thomas Canava Date: Sun, 8 Sep 2024 11:59:04 +0200 Subject: [PATCH] Add initial timeout property for KafkaRequestReply when using latest offset --- documentation/src/main/docs/kafka/request-reply.md | 6 +++++- .../messaging/kafka/reply/KafkaRequestReply.java | 6 ++++++ .../messaging/kafka/reply/KafkaRequestReplyImpl.java | 9 +++++++-- 3 files changed, 18 insertions(+), 3 deletions(-) diff --git a/documentation/src/main/docs/kafka/request-reply.md b/documentation/src/main/docs/kafka/request-reply.md index 22ba07c8ae..8cf79f719f 100644 --- a/documentation/src/main/docs/kafka/request-reply.md +++ b/documentation/src/main/docs/kafka/request-reply.md @@ -89,9 +89,13 @@ A snapshot of the list of pending replies is available through the `KafkaRequest The requestor can be found in a position where a request is sent, and it's reply is already published to the reply topic, before the requestor starts and polls the consumer. In case the reply consumer is configured with `auto.offset.reset=latest`, which is the default value, this can lead to the requestor missing replies. + If `auto.offset.reset` is `latest`, at wiring time, before any request can take place, the `KafkaRequestReply` finds partitions that the consumer needs to subscribe and waits for their assignment to the consumer. -On other occasons the `KafkaRequestReply#waitForAssignments` method can be used. +The timeout of the initial subscription can be adjusted with `reply.initial-assignment-timeout` which defaults to the `reply.timeout`. +If this timeout fails, `KafkaRequestReply` will enter an invalid state which will require it to be restarted. + +On other occasions the `KafkaRequestReply#waitForAssignments` method can be used. ## Correlation Ids diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReply.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReply.java index 23af6eb92e..651b002670 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReply.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReply.java @@ -73,6 +73,12 @@ public interface KafkaRequestReply extends EmitterType { */ String REPLY_TIMEOUT_KEY = "reply.timeout"; + /** + * The config key for the initial assignment timeout. + * This timeout is used at start when the {@code auto.offset.reset} is set to {@code latest}. + */ + String REPLY_INITIAL_ASSIGNMENT_TIMEOUT_KEY = "reply.initial-assignment-timeout"; + /** * The config key for the correlation ID handler identifier. *

diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReplyImpl.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReplyImpl.java index 240fef87f8..13c001f113 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReplyImpl.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReplyImpl.java @@ -75,6 +75,7 @@ public class KafkaRequestReplyImpl extends MutinyEmitterImpl private final KafkaSource replySource; private final Set waitForPartitions; private final boolean gracefulShutdown; + private final Duration initialAssignmentTimeout; private Function, Message> replyConverter; public KafkaRequestReplyImpl(EmitterConfiguration config, @@ -103,6 +104,8 @@ public KafkaRequestReplyImpl(EmitterConfiguration config, this.replyTopic = consumerConfig.getTopic().orElse(null); this.replyPartition = connectorConfig.getOptionalValue(REPLY_PARTITION_KEY, Integer.class).orElse(-1); this.replyTimeout = Duration.ofMillis(connectorConfig.getOptionalValue(REPLY_TIMEOUT_KEY, Integer.class).orElse(5000)); + this.initialAssignmentTimeout = Duration.ofMillis(connectorConfig + .getOptionalValue(REPLY_INITIAL_ASSIGNMENT_TIMEOUT_KEY, Integer.class).orElse((int) replyTimeout.toMillis())); this.autoOffsetReset = consumerConfig.getAutoOffsetReset(); this.replyCorrelationIdHeader = connectorConfig.getOptionalValue(REPLY_CORRELATION_ID_HEADER_KEY, String.class) @@ -151,8 +154,10 @@ private Set getWaitForPartitions(KafkaConnectorIncomingConfigura @Override public Flow.Publisher> getPublisher() { return this.publisher - .plug(m -> "latest".equals(autoOffsetReset) - ? m.onSubscription().call(() -> waitForAssignments().ifNoItem().after(replyTimeout).fail()) + .plug(m -> "latest".equals(autoOffsetReset) ? m.onSubscription().call(() -> waitForAssignments() + .ifNoItem() + .after(initialAssignmentTimeout) + .fail()) : m) .onTermination().invoke(this::complete); }