Skip to content

Commit

Permalink
Fix: Shutdown hub when transport is gone (#25)
Browse files Browse the repository at this point in the history
---------

Co-authored-by: Mickaël Isaert <mickael.isaert@gmail.com>
  • Loading branch information
bpolaszek and misaert committed Nov 6, 2023
1 parent 6fc70c7 commit a868e3b
Show file tree
Hide file tree
Showing 12 changed files with 138 additions and 17 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/app.yml
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@ jobs:
matrix:
transport:
- "php://localhost?size=10000"
- "redis://localhost?size=10000&trimInterval=0.5"
- "redis://localhost?size=10000&trimInterval=0.5&pingInterval=0"
- "redis://localhost?size=10000&trimInterval=0.5&pingInterval=0.1"
- "redis://localhost?size=10000&trimInterval=0.5&pingInterval=0.1&readTimeout=2"
steps:
- name: Checkout
uses: actions/checkout@v2
Expand Down
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ To launch the hub with the Redis transport, change the `TRANSPORT_DSN` environme
TRANSPORT_DSN="redis://127.0.0.1:6379" ./bin/freddie
```

Optional parameters you can pass in the DSN's query string:
- `pingInterval` - regularly ping Redis connection, which will help detect outages (default `2.0`)
- `readTimeout` - max duration in seconds of a ping or publish request (default `0.0`: considered disabled)

_Alternatively, you can set this variable into `.env.local`._

## Advantages and limitations
Expand Down
1 change: 1 addition & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
"nyholm/dsn": "^2.0",
"phpdocumentor/reflection-docblock": "^5.3",
"react/async": "^4.0.0",
"react/promise-timer": "^1.10",
"rize/uri-template": "^0.3.4",
"symfony/console": "^5.4.0|^6.0.0",
"symfony/dotenv": "^5.4.0|^6.0.0",
Expand Down
24 changes: 10 additions & 14 deletions composer.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions phpstan.neon
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,5 @@ parameters:

ignoreErrors:
- '#React\\Promise\\PromiseInterface is not generic#'
- '#Unable to resolve the template type T in call to function React\\Promise\\Timer\\timeout#'
- '#Unable to resolve the template type T in call to function Freddie\\maybeTimeout#'
9 changes: 8 additions & 1 deletion src/Hub/Controller/PublishController.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@
use React\Http\Message\Response;
use Symfony\Component\HttpKernel\Exception\AccessDeniedHttpException;
use Symfony\Component\HttpKernel\Exception\BadRequestHttpException;
use Symfony\Component\HttpKernel\Exception\ServiceUnavailableHttpException;
use Symfony\Component\Uid\Ulid;
use Throwable;

use function BenTools\QueryString\query_string;
use function Freddie\is_truthy;
use function Freddie\nullify;
use function React\Async\await;

final class PublishController implements HubControllerInterface
{
Expand Down Expand Up @@ -69,7 +72,11 @@ public function __invoke(ServerRequestInterface $request): ResponseInterface
throw new AccessDeniedHttpException('Your rights are not sufficient to publish this update.');
}

$this->hub->publish($update);
try {
await($this->hub->publish($update));
} catch (Throwable) {
throw new ServiceUnavailableHttpException();
}

return new Response(201, body: (string) $update->message->id);
}
Expand Down
8 changes: 8 additions & 0 deletions src/Hub/Hub.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use React\EventLoop\Loop;
use React\Promise\PromiseInterface;
use Symfony\Component\OptionsResolver\OptionsResolver;
use Throwable;

use function array_key_exists;
use function sprintf;
Expand Down Expand Up @@ -99,4 +100,11 @@ public function getOption(string $name): mixed

return $this->options[$name];
}

public static function die(Throwable $e): never
{
Loop::stop();

throw $e;
}
}
25 changes: 24 additions & 1 deletion src/Hub/Transport/Redis/RedisTransport.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@
use Clue\React\Redis\Client;
use Evenement\EventEmitter;
use Evenement\EventEmitterInterface;
use Freddie\Hub\Hub;
use Freddie\Hub\Transport\TransportInterface;
use Freddie\Message\Update;
use Generator;
use React\EventLoop\Loop;
use React\Promise\PromiseInterface;
use Symfony\Component\OptionsResolver\OptionsResolver;

use function Freddie\maybeTimeout;
use function React\Async\await;
use function React\Promise\resolve;

Expand Down Expand Up @@ -41,8 +43,26 @@ public function __construct(
'trimInterval' => 0.0,
'channel' => 'mercure',
'key' => 'mercureUpdates',
'pingInterval' => 2.0,
'readTimeout' => 0.0,
]);
$this->options = $resolver->resolve($options);
if ($this->options['pingInterval']) {
Loop::addPeriodicTimer($this->options['pingInterval'], fn () => $this->ping());
}
}

/**
* @codeCoverageIgnore
*/
private function ping(): void
{
/** @var PromiseInterface $ping */
$ping = $this->redis->ping(); // @phpstan-ignore-line
$ping = maybeTimeout($ping, $this->options['readTimeout']);
$ping->then(
onRejected: Hub::die(...),
);
}

public function subscribe(callable $callback): void
Expand All @@ -61,7 +81,10 @@ public function publish(Update $update): PromiseInterface
$this->init();
$payload = $this->serializer->serialize($update);

return $this->redis->publish($this->options['channel'], $payload) // @phpstan-ignore-line
/** @var PromiseInterface $promise */
$promise = $this->redis->publish($this->options['channel'], $payload); // @phpstan-ignore-line

return maybeTimeout($promise, $this->options['readTimeout'])
->then(fn () => $this->store($update))
->then(fn () => $update);
}
Expand Down
2 changes: 2 additions & 0 deletions src/Hub/Transport/Redis/RedisTransportFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ public function create(string $dsn): TransportInterface
options: [
'size' => (int) max(0, $parsed->getParameter('size', 0)),
'trimInterval' => (float) max(0, $parsed->getParameter('trimInterval', 0.0)),
'pingInterval' => (float) max(0, $parsed->getParameter('pingInterval', 2.0)),
'readTimeout' => (float) max(0, $parsed->getParameter('readTimeout', 0.0)),
'channel' => (string) $parsed->getParameter('channel', 'mercure'),
'key' => (string) $parsed->getParameter('key', 'mercureUpdates'),
],
Expand Down
13 changes: 13 additions & 0 deletions src/functions.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@
use Freddie\Helper\FlatQueryParser;
use Freddie\Helper\TopicHelper;
use Psr\Http\Message\ServerRequestInterface;
use React\Promise\PromiseInterface;

use function BenTools\QueryString\query_string;
use function in_array;
use function is_string;
use function React\Promise\Timer\timeout;
use function settype;
use function strtolower;
use function trim;
Expand Down Expand Up @@ -53,3 +55,14 @@ function extract_last_event_id(ServerRequestInterface $request): ?string
?? $qs->getParam('LAST-EVENT-ID')
?? null;
}

/**
* @internal
* @template T
* @param PromiseInterface<T> $promise
* @return PromiseInterface<T>
*/
function maybeTimeout(PromiseInterface $promise, float $time = 0.0): PromiseInterface
{
return 0.0 === $time ? $promise : timeout($promise, $time);
}
57 changes: 57 additions & 0 deletions tests/Unit/Hub/Controller/PublishControllerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,23 @@

namespace Freddie\Tests\Unit\Hub\Controller;

use Fig\Http\Message\StatusCodeInterface;
use FrameworkX\App;
use Freddie\Hub\Controller\PublishController;
use Freddie\Hub\Hub;
use Freddie\Hub\Middleware\HttpExceptionConverterMiddleware;
use Freddie\Hub\Middleware\TokenExtractorMiddleware;
use Freddie\Hub\Transport\PHP\PHPTransport;
use Freddie\Hub\Transport\TransportInterface;
use Freddie\Message\Message;
use Freddie\Message\Update;
use Generator;
use Psr\Http\Message\ResponseInterface;
use React\Http\Message\Response;
use React\Http\Message\ServerRequest;
use React\Promise\PromiseInterface;
use ReflectionClass;
use RuntimeException;
use Symfony\Component\HttpKernel\Exception\AccessDeniedHttpException;
use Symfony\Component\HttpKernel\Exception\BadRequestHttpException;
use Symfony\Component\Uid\Ulid;
Expand All @@ -24,6 +29,7 @@
use function Freddie\Tests\handle;
use function Freddie\Tests\jwt_config;
use function Freddie\Tests\with_token;
use function React\Promise\reject;

it('publishes updates to the hub', function (
string $payload,
Expand Down Expand Up @@ -203,3 +209,54 @@
AccessDeniedHttpException::class,
'Your rights are not sufficient to publish this update.'
);

it('throws a service unavailable exception when publishing fails', function () {
$transport = new class implements TransportInterface {
public function publish(Update $update): PromiseInterface
{
return reject(new RuntimeException('☠️'));
}

public function subscribe(callable $callback): void
{
}

public function unsubscribe(callable $callback): void
{
}

public function reconciliate(string $lastEventID): Generator
{
}
};
$controller = new PublishController();
$app = new App(
new TokenExtractorMiddleware(
jwt_config()->parser(),
jwt_config()->validator(),
),
new HttpExceptionConverterMiddleware(),
$controller,
);
$hub = new Hub($app, $transport);
$controller->setHub($hub);

// Given
$jwt = create_jwt(['mercure' => ['publish' => ['*']]]);
$request = new ServerRequest(
'POST',
'/.well-known/mercure',
[
'Authorization' => "Bearer $jwt",
'Content-Type' => 'application/x-www-form-urlencoded',
],
body: 'topic=/foo&topic=/bar&data=foobar&private=true&id=' . Ulid::generate(),
);

// When
$response = handle($app, $request);

// Then
expect($response->getStatusCode())->toBe(StatusCodeInterface::STATUS_SERVICE_UNAVAILABLE)
->and((string) $response->getBody())->toBeEmpty();
});
6 changes: 6 additions & 0 deletions tests/Unit/Hub/HubTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
use Freddie\Subscription\Subscriber;
use Generator;
use InvalidArgumentException;
use React\EventLoop\Loop;
use React\Promise\PromiseInterface;
use RuntimeException;
use Symfony\Component\Uid\Ulid;

use function func_get_args;
Expand Down Expand Up @@ -70,3 +72,7 @@ public function reconciliate(string $lastEventID): Generator
$hub = new Hub();
$hub->getOption('foo');
})->throws(InvalidArgumentException::class, 'Invalid option `foo`.');

it('dies', function () {
Hub::die(new RuntimeException('😵'));
})->throws(RuntimeException::class, '😵');

0 comments on commit a868e3b

Please sign in to comment.