Skip to content

Commit

Permalink
Fix(redis): Detect connection outages (#29)
Browse files Browse the repository at this point in the history
  • Loading branch information
bpolaszek committed Mar 20, 2024
1 parent 36f4e00 commit 7104712
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 12 deletions.
3 changes: 3 additions & 0 deletions src/Hub/Transport/Redis/RedisTransport.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
use Generator;
use React\EventLoop\Loop;
use React\Promise\PromiseInterface;
use RuntimeException;
use Symfony\Component\OptionsResolver\OptionsResolver;

use function Freddie\maybeTimeout;
Expand Down Expand Up @@ -48,8 +49,10 @@ public function __construct(
]);
$this->options = $resolver->resolve($options);
if ($this->options['pingInterval']) {
$this->ping();
Loop::addPeriodicTimer($this->options['pingInterval'], fn () => $this->ping());
}
$this->subscriber->on('unsubscribe', fn () => Hub::die(new RuntimeException('Redis connection lost')));
}

/**
Expand Down
19 changes: 10 additions & 9 deletions tests/Unit/Hub/Transport/Redis/RedisTransportFactoryTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
$factory = new RedisTransportFactory();
expect($factory->supports($dsn))->toBe($expected);
})->with(function () {
yield ['redis://localhost', true];
yield ['redis://localhost?pingInterval=0.0', true];
yield ['rediss://some.secure.place.com', true];
yield ['notredis://shrug', false];
});
Expand All @@ -23,21 +23,22 @@
expect($factory->create($dsn))->toEqual($expected);
})->with(function () {
$redisFactory = new Factory();
yield ['redis://localhost?foo=bar', new RedisTransport(
$redisFactory->createLazyClient('redis://localhost?foo=bar'),
$redisFactory->createLazyClient('redis://localhost?foo=bar'),
yield ['redis://localhost?foo=bar&pingInterval=0.0', new RedisTransport(
$redisFactory->createLazyClient('redis://localhost?foo=bar&pingInterval=0.0'),
$redisFactory->createLazyClient('redis://localhost?foo=bar&pingInterval=0.0'),
options: ['pingInterval' => 0.0],
)];
yield ['redis://localhost?size=1000&trimInterval=2.5', new RedisTransport(
$redisFactory->createLazyClient('redis://localhost?size=1000&trimInterval=2.5'),
$redisFactory->createLazyClient('redis://localhost?size=1000&trimInterval=2.5'),
options: ['size' => 1000, 'trimInterval' => 2.5],
yield ['redis://localhost?size=1000&trimInterval=2.5&pingInterval=0.0', new RedisTransport(
$redisFactory->createLazyClient('redis://localhost?size=1000&trimInterval=2.5&pingInterval=0.0'),
$redisFactory->createLazyClient('redis://localhost?size=1000&trimInterval=2.5&pingInterval=0.0'),
options: ['size' => 1000, 'trimInterval' => 2.5, 'pingInterval' => 0.0],
)];
});

it('instantiates 2 different clients', function () {
$factory = new RedisTransportFactory();
/** @var RedisTransport $transport */
$transport = $factory->create('redis://localhost?size=1000');
$transport = $factory->create('redis://localhost?size=1000&pingInterval=0.0');
expect($transport->redis)->toBeInstanceOf(Client::class);
expect($transport->subscriber)->toBeInstanceOf(Client::class);
expect($transport->redis)->not()->toBe($transport->subscriber);
Expand Down
11 changes: 8 additions & 3 deletions tests/Unit/Hub/Transport/Redis/RedisTransportTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
$eventEmitter = new EventEmitter();
$transport = new RedisTransport(
new RedisClientStub($storage, $eventEmitter),
new RedisClientStub($storage, $eventEmitter)
new RedisClientStub($storage, $eventEmitter),
options: ['pingInterval' => 0.0],
);

// Given
Expand All @@ -40,7 +41,7 @@

it('performs state reconciliation', function () {
$client = new RedisClientStub();
$transport = new RedisTransport($client, clone $client, options: ['size' => 3]);
$transport = new RedisTransport($client, clone $client, options: ['pingInterval' => 0.0, 'size' => 3]);

// Given
$updates = [
Expand Down Expand Up @@ -81,7 +82,11 @@

it('periodically trims the database', function () {
$client = new RedisClientStub();
$transport = new RedisTransport($client, clone $client, options: ['size' => 3, 'trimInterval' => 0.01]);
$transport = new RedisTransport($client, clone $client, options: [
'pingInterval' => 0.0,
'size' => 3,
'trimInterval' => 0.01
]);

// Given
$updates = [
Expand Down

0 comments on commit 7104712

Please sign in to comment.