Skip to content

Commit

Permalink
fix: add catch to any throwable in consumer manager
Browse files Browse the repository at this point in the history
  • Loading branch information
ssgoncalves authored and hcdias committed Oct 10, 2023
1 parent 1de2aaf commit 1da7001
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 47 deletions.
7 changes: 3 additions & 4 deletions src/Connectors/Consumer/Manager.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@

namespace Metamorphosis\Connectors\Consumer;

use Exception;
use Metamorphosis\Consumers\ConsumerInterface;
use Metamorphosis\Exceptions\ResponseTimeoutException;
use Metamorphosis\Exceptions\ResponseWarningException;
use Metamorphosis\Middlewares\Handler\Dispatcher;
use Metamorphosis\Record\ConsumerRecord;
use Metamorphosis\TopicHandler\Consumer\Handler as ConsumerHandler;
use RdKafka\Message;
use Throwable;

class Manager
{
Expand Down Expand Up @@ -65,9 +65,8 @@ public function handleMessage(): void
$this->consumerHandler->warning($exception);

return;
} catch (Exception $exception) {
$this->consumerHandler->failed($exception);

} catch (Throwable $throwable) {
$this->consumerHandler->failed($throwable);
return;
}

Expand Down
5 changes: 2 additions & 3 deletions src/TopicHandler/Consumer/AbstractHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

namespace Metamorphosis\TopicHandler\Consumer;

use Exception;
use Metamorphosis\Exceptions\ResponseWarningException;
use Metamorphosis\TopicHandler\ConfigOptions\Consumer as ConsumerConfigOptions;
use Throwable;

abstract class AbstractHandler implements Handler
{
Expand All @@ -25,11 +25,10 @@ public function __construct(?ConsumerConfigOptions $configOptions = null)
public function warning(ResponseWarningException $exception): void
{
}

/**
* @phpcsSuppress SlevomatCodingStandard.Functions.UnusedParameter.UnusedParameter
*/
public function failed(Exception $exception): void
public function failed(Throwable $exception): void
{
}

Expand Down
4 changes: 2 additions & 2 deletions src/TopicHandler/Consumer/Handler.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

namespace Metamorphosis\TopicHandler\Consumer;

use Exception;
use Metamorphosis\Exceptions\ResponseWarningException;
use Metamorphosis\Record\RecordInterface;
use Throwable;

interface Handler
{
Expand All @@ -26,5 +26,5 @@ public function finished(): void;
/**
* Handle failure process.
*/
public function failed(Exception $exception): void;
public function failed(Throwable $throwable): void;
}
6 changes: 3 additions & 3 deletions tests/Integration/Dummies/MessageConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@

namespace Tests\Integration\Dummies;

use Exception;
use Illuminate\Support\Facades\Log;
use Metamorphosis\Exceptions\ResponseWarningException;
use Metamorphosis\Record\RecordInterface;
use Metamorphosis\TopicHandler\Consumer\AbstractHandler;
use Throwable;

class MessageConsumer extends AbstractHandler
{
Expand All @@ -24,10 +24,10 @@ public function warning(ResponseWarningException $exception): void
]);
}

public function failed(Exception $exception): void
public function failed(Throwable $throwable): void
{
Log::error('Failed to handle kafka record for sku.', [
'exception' => $exception,
'exception' => $throwable,
]);
}
}
63 changes: 30 additions & 33 deletions tests/Unit/Connectors/Consumer/ManagerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

namespace Tests\Unit\Connectors\Consumer;

use Error;
use Exception;
use InvalidArgumentException;
use Metamorphosis\Connectors\Consumer\Manager;
use Metamorphosis\Consumers\ConsumerInterface;
use Metamorphosis\Exceptions\ResponseTimeoutException;
Expand All @@ -13,6 +15,9 @@
use Mockery as m;
use RdKafka\Message as KafkaMessage;
use Tests\LaravelTestCase;
use Tests\Unit\Dummies\ConsumerHandlerDummy;
use Throwable;
use TypeError;

class ManagerTest extends LaravelTestCase
{
Expand Down Expand Up @@ -48,27 +53,11 @@ public function testShouldHandleMultiplesMessages(): void
$kafkaMessage3->payload = 'original message 3';
$kafkaMessage3->err = RD_KAFKA_RESP_ERR_NO_ERROR;

$messages = [$kafkaMessage1, $kafkaMessage2, $kafkaMessage3];
$count = 0;
$exception = new Exception('Exception occurs when consuming.');

// Expectations
$consumer->shouldReceive('consume')
->times(4)
->andReturnUsing(
function () use ($messages, &$count, $exception) {
$message = $messages[$count] ?? null;
if (!$message) {
throw $exception;
}
$count++;

return $message;
}
);

$consumerHandler->expects()
->failed($exception);
$consumer->shouldReceive()
->consume()
->times(3)
->andReturn($kafkaMessage1, $kafkaMessage2, $kafkaMessage3);

$dispatcher->expects()
->handle($consumerRecord)
Expand All @@ -78,7 +67,6 @@ function () use ($messages, &$count, $exception) {
$runner->handleMessage();
$runner->handleMessage();
$runner->handleMessage();
$runner->handleMessage();
}

public function testShouldCallWarningWhenErrorOccurs(): void
Expand Down Expand Up @@ -149,19 +137,10 @@ public function testShouldHandleAsyncCommit(): void
);

// Expectations
$consumer->shouldReceive('consume')
$consumer->shouldReceive()
->consume()
->times(3)
->andReturnUsing(
function () use ($messages, &$count, $exception) {
$message = $messages[$count] ?? null;
if (!$message) {
throw $exception;
}
$count++;

return $message;
}
);
->andReturn($kafkaMessage1, $kafkaMessage2);

$consumer->expects()
->commitAsync()
Expand All @@ -184,4 +163,22 @@ function () use ($messages, &$count, $exception) {
$runner->handleMessage();
$runner->handleMessage();
}

public function getThrowableScenarios(): array
{
return [
'Exception' => [
'throwable' => new Exception(),
],
'Error' => [
'throwable' => new Error(),
],
'InvalidArgumentException' => [
'throwable' => new InvalidArgumentException(),
],
'TypeError' => [
'throwable' => new TypeError(),
],
];
}
}
4 changes: 2 additions & 2 deletions tests/Unit/Dummies/ConsumerHandlerDummy.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,17 @@

namespace Tests\Unit\Dummies;

use Exception;
use Metamorphosis\Record\RecordInterface;
use Metamorphosis\TopicHandler\Consumer\AbstractHandler;
use Throwable;

class ConsumerHandlerDummy extends AbstractHandler
{
public function handle(RecordInterface $data): void
{
}

public function failed(Exception $exception): void
public function failed(Throwable $throwable): void
{
}
}

0 comments on commit 1da7001

Please sign in to comment.