diff --git a/config/kafka.php b/config/kafka.php index 3e7f0764..4ffe7459 100644 --- a/config/kafka.php +++ b/config/kafka.php @@ -44,6 +44,10 @@ // An array of middlewares applied only for this consumer_group 'middlewares' => [], + + // A max interval for consumer to make poll calls. That means: how much + // time we need to wait for poll calls until consider the consumer has inactive. + 'max_poll_interval_ms' => 300000, ], 'producer' => [ diff --git a/src/Connectors/Consumer/HighLevel.php b/src/Connectors/Consumer/HighLevel.php index 9c832ae0..ebdd09de 100644 --- a/src/Connectors/Consumer/HighLevel.php +++ b/src/Connectors/Consumer/HighLevel.php @@ -14,7 +14,7 @@ class HighLevel implements ConnectorInterface public function getConsumer(bool $autoCommit, ConfigOptions $configOptions): ConsumerInterface { $conf = $this->getConf($configOptions); - $maxPollIntervalMs = (int) $configOptions->getTimeout(); + $maxPollIntervalMs = $configOptions->getMaxPollInterval(); $conf->set('group.id', $configOptions->getConsumerGroup()); $conf->set('auto.offset.reset', $configOptions->getOffsetReset()); if (!$autoCommit) { @@ -22,7 +22,7 @@ public function getConsumer(bool $autoCommit, ConfigOptions $configOptions): Con } $conf->set( 'max.poll.interval.ms', - $maxPollIntervalMs ?: 300000 + $maxPollIntervalMs ); $consumer = app(KafkaConsumer::class, ['conf' => $conf]); diff --git a/src/Connectors/Consumer/LowLevel.php b/src/Connectors/Consumer/LowLevel.php index 30f4c277..da3c1360 100644 --- a/src/Connectors/Consumer/LowLevel.php +++ b/src/Connectors/Consumer/LowLevel.php @@ -15,10 +15,10 @@ class LowLevel implements ConnectorInterface public function getConsumer(bool $autoCommit, ConfigOptions $configOptions): ConsumerInterface { $conf = $this->getConf(); - $maxPollIntervalMs = (int) $configOptions->getTimeout(); + $maxPollIntervalMs = $configOptions->getMaxPollInterval(); $conf->set( 'max.poll.interval.ms', - $maxPollIntervalMs ?: 300000 + $maxPollIntervalMs ); $conf->set('group.id', $configOptions->getConsumerGroup()); if (!$autoCommit) { diff --git a/src/TopicHandler/ConfigOptions/Consumer.php b/src/TopicHandler/ConfigOptions/Consumer.php index e2caa016..2019ed68 100644 --- a/src/TopicHandler/ConfigOptions/Consumer.php +++ b/src/TopicHandler/ConfigOptions/Consumer.php @@ -38,6 +38,8 @@ class Consumer private ?int $offset = null; + private int $maxPollInterval = 300000; + public function __construct( string $topicId, Broker $broker, @@ -50,7 +52,8 @@ public function __construct( int $timeout = 1000, bool $autoCommit = true, bool $commitASync = true, - string $offsetReset = 'smallest' + string $offsetReset = 'smallest', + int $maxPollInterval = 300000 ) { $this->broker = $broker; $this->middlewares = $middlewares; @@ -64,6 +67,7 @@ public function __construct( $this->autoCommit = $autoCommit; $this->commitASync = $commitASync; $this->offsetReset = $offsetReset; + $this->maxPollInterval = $maxPollInterval; } public function getTimeout(): int @@ -147,4 +151,13 @@ public function getOffset(): ?int { return $this->offset; } + public function getMaxPollInterval(): int + { + return $this->maxPollInterval; + } + + public function setMaxPollInterval(int $maxPollInterval): void + { + $this->maxPollInterval = $maxPollInterval; + } } diff --git a/src/TopicHandler/ConfigOptions/Factories/ConsumerFactory.php b/src/TopicHandler/ConfigOptions/Factories/ConsumerFactory.php index 2d28a28c..2f0896fb 100644 --- a/src/TopicHandler/ConfigOptions/Factories/ConsumerFactory.php +++ b/src/TopicHandler/ConfigOptions/Factories/ConsumerFactory.php @@ -53,6 +53,11 @@ private static function convertConfigAttributes(array $consumerConfig): array $consumerConfig['offsetReset'] = $consumerConfig['offset_reset']; } + + if (isset($consumerConfig['max_poll_interval_ms'])) { + $consumerConfig['maxPollInterval'] = $consumerConfig['max_poll_interval_ms']; + } + return $consumerConfig; } } diff --git a/tests/Unit/Console/ConsumerCommandTest.php b/tests/Unit/Console/ConsumerCommandTest.php index c4737533..fd26cb27 100644 --- a/tests/Unit/Console/ConsumerCommandTest.php +++ b/tests/Unit/Console/ConsumerCommandTest.php @@ -131,7 +131,6 @@ public function testItOverridesBrokerConnectionWhenCallingCommand(): void $this->artisan($command, $parameters); } - protected function setUp(): void { parent::setUp(); @@ -146,6 +145,7 @@ protected function setUp(): void 'offset_reset' => 'earliest', 'handler' => ConsumerHandlerDummy::class, 'timeout' => 123, + 'max_poll_interval_ms' => 300000, ], ], ],