Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: Shutdown hub when transport is gone #25

Merged
merged 9 commits into from
Nov 6, 2023
Merged
2 changes: 1 addition & 1 deletion .github/workflows/app.yml
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ jobs:
matrix:
transport:
- "php://localhost?size=10000"
- "redis://localhost?size=10000&trimInterval=0.5"
- "redis://localhost?size=10000&trimInterval=0.5&pingInterval=0"
steps:
- name: Checkout
uses: actions/checkout@v2
Expand Down
9 changes: 8 additions & 1 deletion src/Hub/Controller/PublishController.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@
use React\Http\Message\Response;
use Symfony\Component\HttpKernel\Exception\AccessDeniedHttpException;
use Symfony\Component\HttpKernel\Exception\BadRequestHttpException;
use Symfony\Component\HttpKernel\Exception\ServiceUnavailableHttpException;
use Symfony\Component\Uid\Ulid;
use Throwable;

use function BenTools\QueryString\query_string;
use function Freddie\is_truthy;
use function Freddie\nullify;
use function React\Async\await;

final class PublishController implements HubControllerInterface
{
Expand Down Expand Up @@ -69,7 +72,11 @@ public function __invoke(ServerRequestInterface $request): ResponseInterface
throw new AccessDeniedHttpException('Your rights are not sufficient to publish this update.');
}

$this->hub->publish($update);
try {
await($this->hub->publish($update));
} catch (Throwable) {
throw new ServiceUnavailableHttpException();
}

return new Response(201, body: (string) $update->message->id);
}
Expand Down
8 changes: 8 additions & 0 deletions src/Hub/Hub.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use React\EventLoop\Loop;
use React\Promise\PromiseInterface;
use Symfony\Component\OptionsResolver\OptionsResolver;
use Throwable;

use function array_key_exists;
use function sprintf;
Expand Down Expand Up @@ -99,4 +100,11 @@ public function getOption(string $name): mixed

return $this->options[$name];
}

public static function die(Throwable $e): never
{
Loop::stop();

throw $e;
}
}
19 changes: 19 additions & 0 deletions src/Hub/Transport/Redis/RedisTransport.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@
use Clue\React\Redis\Client;
use Evenement\EventEmitter;
use Evenement\EventEmitterInterface;
use Freddie\Hub\Hub;
use Freddie\Hub\Transport\TransportInterface;
use Freddie\Message\Update;
use Generator;
use React\EventLoop\Loop;
use React\Promise\PromiseInterface;
use RuntimeException;
use Symfony\Component\OptionsResolver\OptionsResolver;
use Throwable;

use function React\Async\await;
use function React\Promise\resolve;
Expand Down Expand Up @@ -41,8 +44,24 @@ public function __construct(
'trimInterval' => 0.0,
'channel' => 'mercure',
'key' => 'mercureUpdates',
'pingInterval' => 2.0,
]);
$this->options = $resolver->resolve($options);
if ($this->options['pingInterval']) {
Loop::addPeriodicTimer($this->options['pingInterval'], fn () => $this->ping());
}
}

/**
* @codeCoverageIgnore
*/
private function ping(): void
{
try {
await($this->redis->ping()); // @phpstan-ignore-line
bpolaszek marked this conversation as resolved.
Show resolved Hide resolved
} catch (Throwable) {
Hub::die(new RuntimeException('Redis connection closed unexpectedly.'));
}
}

public function subscribe(callable $callback): void
Expand Down
1 change: 1 addition & 0 deletions src/Hub/Transport/Redis/RedisTransportFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public function create(string $dsn): TransportInterface
options: [
'size' => (int) max(0, $parsed->getParameter('size', 0)),
'trimInterval' => (float) max(0, $parsed->getParameter('trimInterval', 0.0)),
'pingInterval' => (float) max(0, $parsed->getParameter('pingInterval', 2.0)),
'channel' => (string) $parsed->getParameter('channel', 'mercure'),
'key' => (string) $parsed->getParameter('key', 'mercureUpdates'),
],
Expand Down
57 changes: 57 additions & 0 deletions tests/Unit/Hub/Controller/PublishControllerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,23 @@

namespace Freddie\Tests\Unit\Hub\Controller;

use Fig\Http\Message\StatusCodeInterface;
use FrameworkX\App;
use Freddie\Hub\Controller\PublishController;
use Freddie\Hub\Hub;
use Freddie\Hub\Middleware\HttpExceptionConverterMiddleware;
use Freddie\Hub\Middleware\TokenExtractorMiddleware;
use Freddie\Hub\Transport\PHP\PHPTransport;
use Freddie\Hub\Transport\TransportInterface;
use Freddie\Message\Message;
use Freddie\Message\Update;
use Generator;
use Psr\Http\Message\ResponseInterface;
use React\Http\Message\Response;
use React\Http\Message\ServerRequest;
use React\Promise\PromiseInterface;
use ReflectionClass;
use RuntimeException;
use Symfony\Component\HttpKernel\Exception\AccessDeniedHttpException;
use Symfony\Component\HttpKernel\Exception\BadRequestHttpException;
use Symfony\Component\Uid\Ulid;
Expand All @@ -24,6 +29,7 @@
use function Freddie\Tests\handle;
use function Freddie\Tests\jwt_config;
use function Freddie\Tests\with_token;
use function React\Promise\reject;

it('publishes updates to the hub', function (
string $payload,
Expand Down Expand Up @@ -203,3 +209,54 @@
AccessDeniedHttpException::class,
'Your rights are not sufficient to publish this update.'
);

it('throws a service unavailable exception when publishing fails', function () {
$transport = new class implements TransportInterface {
public function publish(Update $update): PromiseInterface
{
return reject(new RuntimeException('☠️'));
}

public function subscribe(callable $callback): void
{
}

public function unsubscribe(callable $callback): void
{
}

public function reconciliate(string $lastEventID): Generator
{
}
};
$controller = new PublishController();
$app = new App(
new TokenExtractorMiddleware(
jwt_config()->parser(),
jwt_config()->validator(),
),
new HttpExceptionConverterMiddleware(),
$controller,
);
$hub = new Hub($app, $transport);
$controller->setHub($hub);

// Given
$jwt = create_jwt(['mercure' => ['publish' => ['*']]]);
$request = new ServerRequest(
'POST',
'/.well-known/mercure',
[
'Authorization' => "Bearer $jwt",
'Content-Type' => 'application/x-www-form-urlencoded',
],
body: 'topic=/foo&topic=/bar&data=foobar&private=true&id=' . Ulid::generate(),
);

// When
$response = handle($app, $request);

// Then
expect($response->getStatusCode())->toBe(StatusCodeInterface::STATUS_SERVICE_UNAVAILABLE)
->and((string) $response->getBody())->toBeEmpty();
});
6 changes: 6 additions & 0 deletions tests/Unit/Hub/HubTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
use Freddie\Subscription\Subscriber;
use Generator;
use InvalidArgumentException;
use React\EventLoop\Loop;
use React\Promise\PromiseInterface;
use RuntimeException;
use Symfony\Component\Uid\Ulid;

use function func_get_args;
Expand Down Expand Up @@ -70,3 +72,7 @@ public function reconciliate(string $lastEventID): Generator
$hub = new Hub();
$hub->getOption('foo');
})->throws(InvalidArgumentException::class, 'Invalid option `foo`.');

it('dies', function () {
Hub::die(new RuntimeException('😵'));
})->throws(RuntimeException::class, '😵');
Loading