diff --git a/.github/workflows/app.yml b/.github/workflows/app.yml index b695304..a580649 100644 --- a/.github/workflows/app.yml +++ b/.github/workflows/app.yml @@ -72,7 +72,9 @@ jobs: matrix: transport: - "php://localhost?size=10000" - - "redis://localhost?size=10000&trimInterval=0.5" + - "redis://localhost?size=10000&trimInterval=0.5&pingInterval=0" + - "redis://localhost?size=10000&trimInterval=0.5&pingInterval=0.1" + - "redis://localhost?size=10000&trimInterval=0.5&pingInterval=0.1&readTimeout=2" steps: - name: Checkout uses: actions/checkout@v2 diff --git a/README.md b/README.md index 723fce1..249cff5 100644 --- a/README.md +++ b/README.md @@ -101,6 +101,10 @@ To launch the hub with the Redis transport, change the `TRANSPORT_DSN` environme TRANSPORT_DSN="redis://127.0.0.1:6379" ./bin/freddie ``` +Optional parameters you can pass in the DSN's query string: +- `pingInterval` - regularly ping Redis connection, which will help detect outages (default `2.0`) +- `readTimeout` - max duration in seconds of a ping or publish request (default `0.0`: considered disabled) + _Alternatively, you can set this variable into `.env.local`._ ## Advantages and limitations diff --git a/composer.json b/composer.json index c8e2733..70d2e1d 100644 --- a/composer.json +++ b/composer.json @@ -21,6 +21,7 @@ "nyholm/dsn": "^2.0", "phpdocumentor/reflection-docblock": "^5.3", "react/async": "^4.0.0", + "react/promise-timer": "^1.10", "rize/uri-template": "^0.3.4", "symfony/console": "^5.4.0|^6.0.0", "symfony/dotenv": "^5.4.0|^6.0.0", diff --git a/composer.lock b/composer.lock index 4971071..c8906e5 100644 --- a/composer.lock +++ b/composer.lock @@ -4,7 +4,7 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "eeba0dba7917b1e895df84333d9ff56f", + "content-hash": "24e81b3c078bb0ca6b90b4f661d16fd2", "packages": [ { "name": "bentools/querystring", @@ -1790,16 +1790,16 @@ }, { "name": "react/promise-timer", - "version": "v1.9.0", + "version": "v1.10.0", "source": { "type": "git", "url": "https://github.com/reactphp/promise-timer.git", - "reference": "aa7a73c74b8d8c0f622f5982ff7b0351bc29e495" + "reference": "4cb85c1c2125390748e3908120bb82feb99fe766" }, "dist": { "type": "zip", - "url": "https://github.com/gitapi/repos/reactphp/promise-timer/zipball/aa7a73c74b8d8c0f622f5982ff7b0351bc29e495", - "reference": "aa7a73c74b8d8c0f622f5982ff7b0351bc29e495", + "url": "https://github.com/gitapi/repos/reactphp/promise-timer/zipball/4cb85c1c2125390748e3908120bb82feb99fe766", + "reference": "4cb85c1c2125390748e3908120bb82feb99fe766", "shasum": "" }, "require": { @@ -1808,7 +1808,7 @@ "react/promise": "^3.0 || ^2.7.0 || ^1.2.1" }, "require-dev": { - "phpunit/phpunit": "^9.3 || ^5.7 || ^4.8.35" + "phpunit/phpunit": "^9.5 || ^5.7 || ^4.8.35" }, "type": "library", "autoload": { @@ -1857,19 +1857,15 @@ ], "support": { "issues": "https://github.com/reactphp/promise-timer/issues", - "source": "https://github.com/reactphp/promise-timer/tree/v1.9.0" + "source": "https://github.com/reactphp/promise-timer/tree/v1.10.0" }, "funding": [ { - "url": "https://github.com/WyriHaximus", - "type": "github" - }, - { - "url": "https://github.com/clue", - "type": "github" + "url": "https://opencollective.com/reactphp", + "type": "open_collective" } ], - "time": "2022-06-13T13:41:03+00:00" + "time": "2023-07-20T15:40:28+00:00" }, { "name": "react/socket", diff --git a/phpstan.neon b/phpstan.neon index 9f62798..292a908 100644 --- a/phpstan.neon +++ b/phpstan.neon @@ -5,3 +5,5 @@ parameters: ignoreErrors: - '#React\\Promise\\PromiseInterface is not generic#' + - '#Unable to resolve the template type T in call to function React\\Promise\\Timer\\timeout#' + - '#Unable to resolve the template type T in call to function Freddie\\maybeTimeout#' diff --git a/src/Hub/Controller/PublishController.php b/src/Hub/Controller/PublishController.php index ad4b073..92bd09b 100644 --- a/src/Hub/Controller/PublishController.php +++ b/src/Hub/Controller/PublishController.php @@ -15,11 +15,14 @@ use React\Http\Message\Response; use Symfony\Component\HttpKernel\Exception\AccessDeniedHttpException; use Symfony\Component\HttpKernel\Exception\BadRequestHttpException; +use Symfony\Component\HttpKernel\Exception\ServiceUnavailableHttpException; use Symfony\Component\Uid\Ulid; +use Throwable; use function BenTools\QueryString\query_string; use function Freddie\is_truthy; use function Freddie\nullify; +use function React\Async\await; final class PublishController implements HubControllerInterface { @@ -69,7 +72,11 @@ public function __invoke(ServerRequestInterface $request): ResponseInterface throw new AccessDeniedHttpException('Your rights are not sufficient to publish this update.'); } - $this->hub->publish($update); + try { + await($this->hub->publish($update)); + } catch (Throwable) { + throw new ServiceUnavailableHttpException(); + } return new Response(201, body: (string) $update->message->id); } diff --git a/src/Hub/Hub.php b/src/Hub/Hub.php index 0cadef0..8134ddb 100644 --- a/src/Hub/Hub.php +++ b/src/Hub/Hub.php @@ -15,6 +15,7 @@ use React\EventLoop\Loop; use React\Promise\PromiseInterface; use Symfony\Component\OptionsResolver\OptionsResolver; +use Throwable; use function array_key_exists; use function sprintf; @@ -99,4 +100,11 @@ public function getOption(string $name): mixed return $this->options[$name]; } + + public static function die(Throwable $e): never + { + Loop::stop(); + + throw $e; + } } diff --git a/src/Hub/Transport/Redis/RedisTransport.php b/src/Hub/Transport/Redis/RedisTransport.php index c8a7584..6d3e35d 100644 --- a/src/Hub/Transport/Redis/RedisTransport.php +++ b/src/Hub/Transport/Redis/RedisTransport.php @@ -7,6 +7,7 @@ use Clue\React\Redis\Client; use Evenement\EventEmitter; use Evenement\EventEmitterInterface; +use Freddie\Hub\Hub; use Freddie\Hub\Transport\TransportInterface; use Freddie\Message\Update; use Generator; @@ -14,6 +15,7 @@ use React\Promise\PromiseInterface; use Symfony\Component\OptionsResolver\OptionsResolver; +use function Freddie\maybeTimeout; use function React\Async\await; use function React\Promise\resolve; @@ -41,8 +43,26 @@ public function __construct( 'trimInterval' => 0.0, 'channel' => 'mercure', 'key' => 'mercureUpdates', + 'pingInterval' => 2.0, + 'readTimeout' => 0.0, ]); $this->options = $resolver->resolve($options); + if ($this->options['pingInterval']) { + Loop::addPeriodicTimer($this->options['pingInterval'], fn () => $this->ping()); + } + } + + /** + * @codeCoverageIgnore + */ + private function ping(): void + { + /** @var PromiseInterface $ping */ + $ping = $this->redis->ping(); // @phpstan-ignore-line + $ping = maybeTimeout($ping, $this->options['readTimeout']); + $ping->then( + onRejected: Hub::die(...), + ); } public function subscribe(callable $callback): void @@ -61,7 +81,10 @@ public function publish(Update $update): PromiseInterface $this->init(); $payload = $this->serializer->serialize($update); - return $this->redis->publish($this->options['channel'], $payload) // @phpstan-ignore-line + /** @var PromiseInterface $promise */ + $promise = $this->redis->publish($this->options['channel'], $payload); // @phpstan-ignore-line + + return maybeTimeout($promise, $this->options['readTimeout']) ->then(fn () => $this->store($update)) ->then(fn () => $update); } diff --git a/src/Hub/Transport/Redis/RedisTransportFactory.php b/src/Hub/Transport/Redis/RedisTransportFactory.php index fb59c74..1b63362 100644 --- a/src/Hub/Transport/Redis/RedisTransportFactory.php +++ b/src/Hub/Transport/Redis/RedisTransportFactory.php @@ -38,6 +38,8 @@ public function create(string $dsn): TransportInterface options: [ 'size' => (int) max(0, $parsed->getParameter('size', 0)), 'trimInterval' => (float) max(0, $parsed->getParameter('trimInterval', 0.0)), + 'pingInterval' => (float) max(0, $parsed->getParameter('pingInterval', 2.0)), + 'readTimeout' => (float) max(0, $parsed->getParameter('readTimeout', 0.0)), 'channel' => (string) $parsed->getParameter('channel', 'mercure'), 'key' => (string) $parsed->getParameter('key', 'mercureUpdates'), ], diff --git a/src/functions.php b/src/functions.php index 087a806..0d4b197 100644 --- a/src/functions.php +++ b/src/functions.php @@ -7,10 +7,12 @@ use Freddie\Helper\FlatQueryParser; use Freddie\Helper\TopicHelper; use Psr\Http\Message\ServerRequestInterface; +use React\Promise\PromiseInterface; use function BenTools\QueryString\query_string; use function in_array; use function is_string; +use function React\Promise\Timer\timeout; use function settype; use function strtolower; use function trim; @@ -53,3 +55,14 @@ function extract_last_event_id(ServerRequestInterface $request): ?string ?? $qs->getParam('LAST-EVENT-ID') ?? null; } + +/** + * @internal + * @template T + * @param PromiseInterface $promise + * @return PromiseInterface + */ +function maybeTimeout(PromiseInterface $promise, float $time = 0.0): PromiseInterface +{ + return 0.0 === $time ? $promise : timeout($promise, $time); +} diff --git a/tests/Unit/Hub/Controller/PublishControllerTest.php b/tests/Unit/Hub/Controller/PublishControllerTest.php index 98d9a9f..41cfb1e 100644 --- a/tests/Unit/Hub/Controller/PublishControllerTest.php +++ b/tests/Unit/Hub/Controller/PublishControllerTest.php @@ -4,18 +4,23 @@ namespace Freddie\Tests\Unit\Hub\Controller; +use Fig\Http\Message\StatusCodeInterface; use FrameworkX\App; use Freddie\Hub\Controller\PublishController; use Freddie\Hub\Hub; use Freddie\Hub\Middleware\HttpExceptionConverterMiddleware; use Freddie\Hub\Middleware\TokenExtractorMiddleware; use Freddie\Hub\Transport\PHP\PHPTransport; +use Freddie\Hub\Transport\TransportInterface; use Freddie\Message\Message; use Freddie\Message\Update; +use Generator; use Psr\Http\Message\ResponseInterface; use React\Http\Message\Response; use React\Http\Message\ServerRequest; +use React\Promise\PromiseInterface; use ReflectionClass; +use RuntimeException; use Symfony\Component\HttpKernel\Exception\AccessDeniedHttpException; use Symfony\Component\HttpKernel\Exception\BadRequestHttpException; use Symfony\Component\Uid\Ulid; @@ -24,6 +29,7 @@ use function Freddie\Tests\handle; use function Freddie\Tests\jwt_config; use function Freddie\Tests\with_token; +use function React\Promise\reject; it('publishes updates to the hub', function ( string $payload, @@ -203,3 +209,54 @@ AccessDeniedHttpException::class, 'Your rights are not sufficient to publish this update.' ); + +it('throws a service unavailable exception when publishing fails', function () { + $transport = new class implements TransportInterface { + public function publish(Update $update): PromiseInterface + { + return reject(new RuntimeException('☠️')); + } + + public function subscribe(callable $callback): void + { + } + + public function unsubscribe(callable $callback): void + { + } + + public function reconciliate(string $lastEventID): Generator + { + } + }; + $controller = new PublishController(); + $app = new App( + new TokenExtractorMiddleware( + jwt_config()->parser(), + jwt_config()->validator(), + ), + new HttpExceptionConverterMiddleware(), + $controller, + ); + $hub = new Hub($app, $transport); + $controller->setHub($hub); + + // Given + $jwt = create_jwt(['mercure' => ['publish' => ['*']]]); + $request = new ServerRequest( + 'POST', + '/.well-known/mercure', + [ + 'Authorization' => "Bearer $jwt", + 'Content-Type' => 'application/x-www-form-urlencoded', + ], + body: 'topic=/foo&topic=/bar&data=foobar&private=true&id=' . Ulid::generate(), + ); + + // When + $response = handle($app, $request); + + // Then + expect($response->getStatusCode())->toBe(StatusCodeInterface::STATUS_SERVICE_UNAVAILABLE) + ->and((string) $response->getBody())->toBeEmpty(); +}); diff --git a/tests/Unit/Hub/HubTest.php b/tests/Unit/Hub/HubTest.php index 5fa7d89..b536df6 100644 --- a/tests/Unit/Hub/HubTest.php +++ b/tests/Unit/Hub/HubTest.php @@ -11,7 +11,9 @@ use Freddie\Subscription\Subscriber; use Generator; use InvalidArgumentException; +use React\EventLoop\Loop; use React\Promise\PromiseInterface; +use RuntimeException; use Symfony\Component\Uid\Ulid; use function func_get_args; @@ -70,3 +72,7 @@ public function reconciliate(string $lastEventID): Generator $hub = new Hub(); $hub->getOption('foo'); })->throws(InvalidArgumentException::class, 'Invalid option `foo`.'); + +it('dies', function () { + Hub::die(new RuntimeException('😵')); +})->throws(RuntimeException::class, '😵');