diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index c56071e85adb..6a0df5897063 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -831,6 +831,16 @@ static void setupExternalBuilder(
       // We can expose dynamic read to external build when ReadFromKafkaDoFn is the default
       // implementation.
       builder.setDynamicRead(false);
+
+      if (config.consumerPollingTimeout != null) {
+        if (config.consumerPollingTimeout <= 0) {
+          throw new IllegalArgumentException("consumerPollingTimeout should be > 0.");
+        }
+        builder.setConsumerPollingTimeout(
+            Duration.standardSeconds(config.consumerPollingTimeout));
+      } else {
+        builder.setConsumerPollingTimeout(Duration.standardSeconds(2L));
+      }
     }
 
     private static <T> Coder<T> resolveCoder(Class<Deserializer<T>> deserializer) {
@@ -893,6 +903,7 @@ public static class Configuration {
       private Long maxNumRecords;
       private Long maxReadTime;
       private Boolean commitOffsetInFinalize;
+      private Long consumerPollingTimeout;
       private String timestampPolicy;
 
       public void setConsumerConfig(Map<String, String> consumerConfig) {
@@ -934,6 +945,10 @@ public void setCommitOffsetInFinalize(Boolean commitOffsetInFinalize) {
       public void setTimestampPolicy(String timestampPolicy) {
         this.timestampPolicy = timestampPolicy;
       }
+
+      public void setConsumerPollingTimeout(Long consumerPollingTimeout) {
+        this.consumerPollingTimeout = consumerPollingTimeout;
+      }
     }
   }
 
@@ -1341,8 +1356,9 @@ public Read<K, V> withBadRecordErrorHandler(ErrorHandler<BadRecord, ?> badRecord
     }
 
     /**
-     * Sets the timeout time for Kafka consumer polling request in the {@link ReadFromKafkaDoFn}.
-     * The default is 2 second.
+     * Sets the timeout time for Kafka consumer polling request in the {@link ReadFromKafkaDoFn}. A
+     * lower timeout optimizes for latency. Increase the timeout if the consumer is not fetching
+     * enough (or any) records. The default is 2 seconds.
      */
     public Read<K, V> withConsumerPollingTimeout(Duration duration) {
       checkState(
@@ -2386,8 +2402,9 @@ public ReadSourceDescriptors<K, V> withBadRecordErrorHandler(
     }
 
     /**
-     * Sets the timeout time for Kafka consumer polling request in the {@link ReadFromKafkaDoFn}.
-     * The default is 2 second.
+     * Sets the timeout time for Kafka consumer polling request in the {@link ReadFromKafkaDoFn}. A
+     * lower timeout optimizes for latency. Increase the timeout if the consumer is not fetching
+     * enough (or any) records. The default is 2 seconds.
      */
    public ReadSourceDescriptors<K, V> withConsumerPollingTimeout(@Nullable Duration duration) {
      return toBuilder().setConsumerPollingTimeout(duration).build();
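For reference, a minimal usage sketch of the new Java-side knob. The pipeline, broker address, topic, and deserializers below are illustrative, not part of this change:

```java
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.joda.time.Duration;

// Read from Kafka, raising the consumer polling timeout from the
// 2-second default to 5 seconds for a broker that returns records slowly.
pipeline.apply(
    KafkaIO.<Long, String>read()
        .withBootstrapServers("broker_1:9092") // illustrative address
        .withTopic("my_topic") // illustrative topic
        .withKeyDeserializer(LongDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .withConsumerPollingTimeout(Duration.standardSeconds(5L)));
```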
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
index dd859af50864..246fdd80d739 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
@@ -107,7 +107,8 @@ public void testConstructKafkaRead() throws Exception {
                     Field.of("value_deserializer", FieldType.STRING),
                     Field.of("start_read_time", FieldType.INT64),
                     Field.of("commit_offset_in_finalize", FieldType.BOOLEAN),
-                    Field.of("timestamp_policy", FieldType.STRING)))
+                    Field.of("timestamp_policy", FieldType.STRING),
+                    Field.of("consumer_polling_timeout", FieldType.INT64)))
             .withFieldValue("topics", topics)
             .withFieldValue("consumer_config", consumerConfig)
             .withFieldValue("key_deserializer", keyDeserializer)
@@ -115,6 +116,7 @@ public void testConstructKafkaRead() throws Exception {
             .withFieldValue("start_read_time", startReadTime)
             .withFieldValue("commit_offset_in_finalize", false)
             .withFieldValue("timestamp_policy", "ProcessingTime")
+            .withFieldValue("consumer_polling_timeout", 5L)
             .build());
 
     RunnerApi.Components defaultInstance = RunnerApi.Components.getDefaultInstance();
@@ -265,6 +267,7 @@ public void testConstructKafkaReadWithoutMetadata() throws Exception {
     expansionService.expand(request, observer);
     ExpansionApi.ExpansionResponse result = observer.result;
     RunnerApi.PTransform transform = result.getTransform();
+
     assertThat(
         transform.getSubtransformsList(),
         Matchers.hasItem(MatchesPattern.matchesPattern(".*KafkaIO-Read.*")));
diff --git a/sdks/java/io/kafka/upgrade/src/test/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslationTest.java b/sdks/java/io/kafka/upgrade/src/test/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslationTest.java
index 18708c560018..f69b9c3649b4 100644
--- a/sdks/java/io/kafka/upgrade/src/test/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslationTest.java
+++ b/sdks/java/io/kafka/upgrade/src/test/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslationTest.java
@@ -75,6 +75,7 @@ public class KafkaIOTranslationTest {
     READ_TRANSFORM_SCHEMA_MAPPING.put(
         "getValueDeserializerProvider", "value_deserializer_provider");
     READ_TRANSFORM_SCHEMA_MAPPING.put("getCheckStopReadingFn", "check_stop_reading_fn");
+    READ_TRANSFORM_SCHEMA_MAPPING.put("getConsumerPollingTimeout", "consumer_polling_timeout");
   }
 
   // A mapping from Write transform builder methods to the corresponding schema fields in
diff --git a/sdks/python/apache_beam/io/kafka.py b/sdks/python/apache_beam/io/kafka.py
index b96576b4efb3..e1aeab8d3a8c 100644
--- a/sdks/python/apache_beam/io/kafka.py
+++ b/sdks/python/apache_beam/io/kafka.py
@@ -93,7 +93,8 @@
     ('value_deserializer', str), ('start_read_time', typing.Optional[int]),
     ('max_num_records', typing.Optional[int]),
     ('max_read_time', typing.Optional[int]),
-    ('commit_offset_in_finalize', bool), ('timestamp_policy', str)])
+    ('commit_offset_in_finalize', bool), ('timestamp_policy', str),
+    ('consumer_polling_timeout', typing.Optional[int])])


 def default_io_expansion_service(append_args=None):
@@ -134,6 +135,7 @@ def __init__(
       max_read_time=None,
       commit_offset_in_finalize=False,
       timestamp_policy=processing_time_policy,
+      consumer_polling_timeout=None,
       with_metadata=False,
       expansion_service=None,
   ):
@@ -159,6 +161,10 @@ def __init__(
     :param commit_offset_in_finalize: Whether to commit offsets when finalizing.
     :param timestamp_policy: The built-in timestamp policy which is used for
         extracting timestamp from KafkaRecord.
+    :param consumer_polling_timeout: Kafka client polling request
+        timeout time in seconds. A lower timeout optimizes for latency. Increase
+        the timeout if the consumer is not fetching any records. Default is 2
+        seconds.
     :param with_metadata: whether the returned PCollection should contain
         Kafka related metadata or not. If False (default), elements of the
         returned PCollection will be of type 'bytes' if True, elements of the
@@ -186,7 +192,8 @@ def __init__(
             max_read_time=max_read_time,
             start_read_time=start_read_time,
             commit_offset_in_finalize=commit_offset_in_finalize,
-            timestamp_policy=timestamp_policy)),
+            timestamp_policy=timestamp_policy,
+            consumer_polling_timeout=consumer_polling_timeout)),
         expansion_service or default_io_expansion_service())
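And the corresponding cross-language usage from the Python SDK, as a sketch. The bootstrap server and topic are illustrative:

```python
import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka

# Read via the cross-language Kafka transform, raising the consumer
# polling timeout from the 2-second default to 5 seconds.
with beam.Pipeline() as pipeline:
  records = (
      pipeline
      | ReadFromKafka(
          consumer_config={'bootstrap.servers': 'broker_1:9092'},
          topics=['my_topic'],
          consumer_polling_timeout=5))
```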