Skip to content

Commit

Permalink
Add initial timeout property for KafkaRequestReply when using latest …
Browse files Browse the repository at this point in the history
…offset
  • Loading branch information
Malandril committed Sep 8, 2024
1 parent 68d260d commit 03f05cf
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 3 deletions.
6 changes: 5 additions & 1 deletion documentation/src/main/docs/kafka/request-reply.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,13 @@ A snapshot of the list of pending replies is available through the `KafkaRequest
The requestor can be found in a position where a request is sent, and it's reply is already published to the reply topic,
before the requestor starts and polls the consumer.
In case the reply consumer is configured with `auto.offset.reset=latest`, which is the default value, this can lead to the requestor missing replies.

If `auto.offset.reset` is `latest`, at wiring time, before any request can take place, the `KafkaRequestReply`
finds partitions that the consumer needs to subscribe and waits for their assignment to the consumer.
On other occasons the `KafkaRequestReply#waitForAssignments` method can be used.
The timeout of the initial subscription can be adjusted with `reply.initial-assignment-timeout` which defaults to the `reply.timeout`.
If this timeout fails, `KafkaRequestReply` will enter an invalid state which will require it to be restarted.

On other occasions the `KafkaRequestReply#waitForAssignments` method can be used.

## Correlation Ids

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@ public interface KafkaRequestReply<Req, Rep> extends EmitterType {
*/
String REPLY_TIMEOUT_KEY = "reply.timeout";

/**
* The config key for the initial assignment timeout.
* This timeout is used at start when the {@code auto.offset.reset} is set to {@code latest}.
*/
String REPLY_INITIAL_ASSIGNMENT_TIMEOUT_KEY = "reply.initial-assignment-timeout";

/**
* The config key for the correlation ID handler identifier.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public class KafkaRequestReplyImpl<Req, Rep> extends MutinyEmitterImpl<Req>
private final KafkaSource<Object, Rep> replySource;
private final Set<TopicPartition> waitForPartitions;
private final boolean gracefulShutdown;
private final Duration initialAssignmentTimeout;
private Function<Message<Rep>, Message<Rep>> replyConverter;

public KafkaRequestReplyImpl(EmitterConfiguration config,
Expand Down Expand Up @@ -103,6 +104,8 @@ public KafkaRequestReplyImpl(EmitterConfiguration config,
this.replyTopic = consumerConfig.getTopic().orElse(null);
this.replyPartition = connectorConfig.getOptionalValue(REPLY_PARTITION_KEY, Integer.class).orElse(-1);
this.replyTimeout = Duration.ofMillis(connectorConfig.getOptionalValue(REPLY_TIMEOUT_KEY, Integer.class).orElse(5000));
this.initialAssignmentTimeout = Duration.ofMillis(connectorConfig
.getOptionalValue(REPLY_INITIAL_ASSIGNMENT_TIMEOUT_KEY, Integer.class).orElse((int) replyTimeout.toMillis()));

this.autoOffsetReset = consumerConfig.getAutoOffsetReset();
this.replyCorrelationIdHeader = connectorConfig.getOptionalValue(REPLY_CORRELATION_ID_HEADER_KEY, String.class)
Expand Down Expand Up @@ -151,8 +154,10 @@ private Set<TopicPartition> getWaitForPartitions(KafkaConnectorIncomingConfigura
@Override
public Flow.Publisher<Message<? extends Req>> getPublisher() {
return this.publisher
.plug(m -> "latest".equals(autoOffsetReset)
? m.onSubscription().call(() -> waitForAssignments().ifNoItem().after(replyTimeout).fail())
.plug(m -> "latest".equals(autoOffsetReset) ? m.onSubscription().call(() -> waitForAssignments()
.ifNoItem()
.after(initialAssignmentTimeout)
.fail())
: m)
.onTermination().invoke(this::complete);
}
Expand Down

0 comments on commit 03f05cf

Please sign in to comment.