Skip to content

Commit

Permalink
chore: add max poll interval on configOptions
Browse files Browse the repository at this point in the history
  • Loading branch information
hcdias committed Oct 16, 2023
1 parent c0c368e commit 0428873
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 6 deletions.
4 changes: 4 additions & 0 deletions config/kafka.php
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@

// An array of middlewares applied only for this consumer_group
'middlewares' => [],

// A max interval for consumer to make poll calls. That means: how much
// time we need to wait for poll calls until consider the consumer has inactive.
'max_poll_interval_ms' => 300000,
],

'producer' => [
Expand Down
4 changes: 2 additions & 2 deletions src/Connectors/Consumer/HighLevel.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@ class HighLevel implements ConnectorInterface
public function getConsumer(bool $autoCommit, ConfigOptions $configOptions): ConsumerInterface
{
$conf = $this->getConf($configOptions);
$maxPollIntervalMs = (int) $configOptions->getTimeout();
$maxPollIntervalMs = $configOptions->getMaxPollInterval();
$conf->set('group.id', $configOptions->getConsumerGroup());
$conf->set('auto.offset.reset', $configOptions->getOffsetReset());
if (!$autoCommit) {
$conf->set('enable.auto.commit', 'false');
}
$conf->set(
'max.poll.interval.ms',
$maxPollIntervalMs ?: 300000
$maxPollIntervalMs
);

$consumer = app(KafkaConsumer::class, ['conf' => $conf]);
Expand Down
4 changes: 2 additions & 2 deletions src/Connectors/Consumer/LowLevel.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ class LowLevel implements ConnectorInterface
public function getConsumer(bool $autoCommit, ConfigOptions $configOptions): ConsumerInterface
{
$conf = $this->getConf();
$maxPollIntervalMs = (int) $configOptions->getTimeout();
$maxPollIntervalMs = $configOptions->getMaxPollInterval();
$conf->set(
'max.poll.interval.ms',
$maxPollIntervalMs ?: 300000
$maxPollIntervalMs
);
$conf->set('group.id', $configOptions->getConsumerGroup());
if (!$autoCommit) {
Expand Down
15 changes: 14 additions & 1 deletion src/TopicHandler/ConfigOptions/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ class Consumer

private ?int $offset = null;

private int $maxPollInterval = 300000;

public function __construct(
string $topicId,
Broker $broker,
Expand All @@ -50,7 +52,8 @@ public function __construct(
int $timeout = 1000,
bool $autoCommit = true,
bool $commitASync = true,
string $offsetReset = 'smallest'
string $offsetReset = 'smallest',
int $maxPollInterval = 300000
) {
$this->broker = $broker;
$this->middlewares = $middlewares;
Expand All @@ -64,6 +67,7 @@ public function __construct(
$this->autoCommit = $autoCommit;
$this->commitASync = $commitASync;
$this->offsetReset = $offsetReset;
$this->maxPollInterval = $maxPollInterval;
}

public function getTimeout(): int
Expand Down Expand Up @@ -147,4 +151,13 @@ public function getOffset(): ?int
{
return $this->offset;
}
public function getMaxPollInterval(): int
{
return $this->maxPollInterval;
}

public function setMaxPollInterval(int $maxPollInterval): void
{
$this->maxPollInterval = $maxPollInterval;
}
}
5 changes: 5 additions & 0 deletions src/TopicHandler/ConfigOptions/Factories/ConsumerFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ private static function convertConfigAttributes(array $consumerConfig): array
$consumerConfig['offsetReset'] = $consumerConfig['offset_reset'];
}


if (isset($consumerConfig['max_poll_interval_ms'])) {
$consumerConfig['maxPollInterval'] = $consumerConfig['max_poll_interval_ms'];
}

return $consumerConfig;
}
}
2 changes: 1 addition & 1 deletion tests/Unit/Console/ConsumerCommandTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@ public function testItOverridesBrokerConnectionWhenCallingCommand(): void

$this->artisan($command, $parameters);
}

protected function setUp(): void
{
parent::setUp();
Expand All @@ -146,6 +145,7 @@ protected function setUp(): void
'offset_reset' => 'earliest',
'handler' => ConsumerHandlerDummy::class,
'timeout' => 123,
'max_poll_interval_ms' => 300000,
],
],
],
Expand Down

0 comments on commit 0428873

Please sign in to comment.