Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: Shutdown hub when transport is gone #25

Merged
merged 9 commits into from
Nov 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/workflows/app.yml
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@ jobs:
matrix:
transport:
- "php://localhost?size=10000"
- "redis://localhost?size=10000&trimInterval=0.5"
- "redis://localhost?size=10000&trimInterval=0.5&pingInterval=0"
- "redis://localhost?size=10000&trimInterval=0.5&pingInterval=0.1"
- "redis://localhost?size=10000&trimInterval=0.5&pingInterval=0.1&readTimeout=2"
steps:
- name: Checkout
uses: actions/checkout@v2
Expand Down
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ To launch the hub with the Redis transport, change the `TRANSPORT_DSN` environme
TRANSPORT_DSN="redis://127.0.0.1:6379" ./bin/freddie
```

Optional parameters you can pass in the DSN's query string:
- `pingInterval` - regularly ping Redis connection, which will help detect outages (default `2.0`)
- `readTimeout` - max duration in seconds of a ping or publish request (default `0.0`: considered disabled)

_Alternatively, you can set this variable into `.env.local`._

## Advantages and limitations
Expand Down
1 change: 1 addition & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
"nyholm/dsn": "^2.0",
"phpdocumentor/reflection-docblock": "^5.3",
"react/async": "^4.0.0",
"react/promise-timer": "^1.10",
"rize/uri-template": "^0.3.4",
"symfony/console": "^5.4.0|^6.0.0",
"symfony/dotenv": "^5.4.0|^6.0.0",
Expand Down
24 changes: 10 additions & 14 deletions composer.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions phpstan.neon
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,5 @@ parameters:

ignoreErrors:
- '#React\\Promise\\PromiseInterface is not generic#'
- '#Unable to resolve the template type T in call to function React\\Promise\\Timer\\timeout#'
- '#Unable to resolve the template type T in call to function Freddie\\maybeTimeout#'
9 changes: 8 additions & 1 deletion src/Hub/Controller/PublishController.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@
use React\Http\Message\Response;
use Symfony\Component\HttpKernel\Exception\AccessDeniedHttpException;
use Symfony\Component\HttpKernel\Exception\BadRequestHttpException;
use Symfony\Component\HttpKernel\Exception\ServiceUnavailableHttpException;
use Symfony\Component\Uid\Ulid;
use Throwable;

use function BenTools\QueryString\query_string;
use function Freddie\is_truthy;
use function Freddie\nullify;
use function React\Async\await;

final class PublishController implements HubControllerInterface
{
Expand Down Expand Up @@ -69,7 +72,11 @@ public function __invoke(ServerRequestInterface $request): ResponseInterface
throw new AccessDeniedHttpException('Your rights are not sufficient to publish this update.');
}

$this->hub->publish($update);
try {
await($this->hub->publish($update));
} catch (Throwable) {
throw new ServiceUnavailableHttpException();
}

return new Response(201, body: (string) $update->message->id);
}
Expand Down
8 changes: 8 additions & 0 deletions src/Hub/Hub.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use React\EventLoop\Loop;
use React\Promise\PromiseInterface;
use Symfony\Component\OptionsResolver\OptionsResolver;
use Throwable;

use function array_key_exists;
use function sprintf;
Expand Down Expand Up @@ -99,4 +100,11 @@ public function getOption(string $name): mixed

return $this->options[$name];
}

public static function die(Throwable $e): never
{
Loop::stop();

throw $e;
}
}
25 changes: 24 additions & 1 deletion src/Hub/Transport/Redis/RedisTransport.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@
use Clue\React\Redis\Client;
use Evenement\EventEmitter;
use Evenement\EventEmitterInterface;
use Freddie\Hub\Hub;
use Freddie\Hub\Transport\TransportInterface;
use Freddie\Message\Update;
use Generator;
use React\EventLoop\Loop;
use React\Promise\PromiseInterface;
use Symfony\Component\OptionsResolver\OptionsResolver;

use function Freddie\maybeTimeout;
use function React\Async\await;
use function React\Promise\resolve;

Expand Down Expand Up @@ -41,8 +43,26 @@ public function __construct(
'trimInterval' => 0.0,
'channel' => 'mercure',
'key' => 'mercureUpdates',
'pingInterval' => 2.0,
'readTimeout' => 0.0,
]);
$this->options = $resolver->resolve($options);
if ($this->options['pingInterval']) {
Loop::addPeriodicTimer($this->options['pingInterval'], fn () => $this->ping());
}
}

/**
* @codeCoverageIgnore
*/
private function ping(): void
{
/** @var PromiseInterface $ping */
$ping = $this->redis->ping(); // @phpstan-ignore-line
$ping = maybeTimeout($ping, $this->options['readTimeout']);
$ping->then(
onRejected: Hub::die(...),
);
}

public function subscribe(callable $callback): void
Expand All @@ -61,7 +81,10 @@ public function publish(Update $update): PromiseInterface
$this->init();
$payload = $this->serializer->serialize($update);

return $this->redis->publish($this->options['channel'], $payload) // @phpstan-ignore-line
/** @var PromiseInterface $promise */
$promise = $this->redis->publish($this->options['channel'], $payload); // @phpstan-ignore-line

return maybeTimeout($promise, $this->options['readTimeout'])
->then(fn () => $this->store($update))
->then(fn () => $update);
}
Expand Down
2 changes: 2 additions & 0 deletions src/Hub/Transport/Redis/RedisTransportFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ public function create(string $dsn): TransportInterface
options: [
'size' => (int) max(0, $parsed->getParameter('size', 0)),
'trimInterval' => (float) max(0, $parsed->getParameter('trimInterval', 0.0)),
'pingInterval' => (float) max(0, $parsed->getParameter('pingInterval', 2.0)),
'readTimeout' => (float) max(0, $parsed->getParameter('readTimeout', 0.0)),
'channel' => (string) $parsed->getParameter('channel', 'mercure'),
'key' => (string) $parsed->getParameter('key', 'mercureUpdates'),
],
Expand Down
13 changes: 13 additions & 0 deletions src/functions.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@
use Freddie\Helper\FlatQueryParser;
use Freddie\Helper\TopicHelper;
use Psr\Http\Message\ServerRequestInterface;
use React\Promise\PromiseInterface;

use function BenTools\QueryString\query_string;
use function in_array;
use function is_string;
use function React\Promise\Timer\timeout;
use function settype;
use function strtolower;
use function trim;
Expand Down Expand Up @@ -53,3 +55,14 @@ function extract_last_event_id(ServerRequestInterface $request): ?string
?? $qs->getParam('LAST-EVENT-ID')
?? null;
}

/**
* @internal
* @template T
* @param PromiseInterface<T> $promise
* @return PromiseInterface<T>
*/
function maybeTimeout(PromiseInterface $promise, float $time = 0.0): PromiseInterface
{
return 0.0 === $time ? $promise : timeout($promise, $time);
}
57 changes: 57 additions & 0 deletions tests/Unit/Hub/Controller/PublishControllerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,23 @@

namespace Freddie\Tests\Unit\Hub\Controller;

use Fig\Http\Message\StatusCodeInterface;
use FrameworkX\App;
use Freddie\Hub\Controller\PublishController;
use Freddie\Hub\Hub;
use Freddie\Hub\Middleware\HttpExceptionConverterMiddleware;
use Freddie\Hub\Middleware\TokenExtractorMiddleware;
use Freddie\Hub\Transport\PHP\PHPTransport;
use Freddie\Hub\Transport\TransportInterface;
use Freddie\Message\Message;
use Freddie\Message\Update;
use Generator;
use Psr\Http\Message\ResponseInterface;
use React\Http\Message\Response;
use React\Http\Message\ServerRequest;
use React\Promise\PromiseInterface;
use ReflectionClass;
use RuntimeException;
use Symfony\Component\HttpKernel\Exception\AccessDeniedHttpException;
use Symfony\Component\HttpKernel\Exception\BadRequestHttpException;
use Symfony\Component\Uid\Ulid;
Expand All @@ -24,6 +29,7 @@
use function Freddie\Tests\handle;
use function Freddie\Tests\jwt_config;
use function Freddie\Tests\with_token;
use function React\Promise\reject;

it('publishes updates to the hub', function (
string $payload,
Expand Down Expand Up @@ -203,3 +209,54 @@
AccessDeniedHttpException::class,
'Your rights are not sufficient to publish this update.'
);

it('throws a service unavailable exception when publishing fails', function () {
$transport = new class implements TransportInterface {
public function publish(Update $update): PromiseInterface
{
return reject(new RuntimeException('☠️'));
}

public function subscribe(callable $callback): void
{
}

public function unsubscribe(callable $callback): void
{
}

public function reconciliate(string $lastEventID): Generator
{
}
};
$controller = new PublishController();
$app = new App(
new TokenExtractorMiddleware(
jwt_config()->parser(),
jwt_config()->validator(),
),
new HttpExceptionConverterMiddleware(),
$controller,
);
$hub = new Hub($app, $transport);
$controller->setHub($hub);

// Given
$jwt = create_jwt(['mercure' => ['publish' => ['*']]]);
$request = new ServerRequest(
'POST',
'/.well-known/mercure',
[
'Authorization' => "Bearer $jwt",
'Content-Type' => 'application/x-www-form-urlencoded',
],
body: 'topic=/foo&topic=/bar&data=foobar&private=true&id=' . Ulid::generate(),
);

// When
$response = handle($app, $request);

// Then
expect($response->getStatusCode())->toBe(StatusCodeInterface::STATUS_SERVICE_UNAVAILABLE)
->and((string) $response->getBody())->toBeEmpty();
});
6 changes: 6 additions & 0 deletions tests/Unit/Hub/HubTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
use Freddie\Subscription\Subscriber;
use Generator;
use InvalidArgumentException;
use React\EventLoop\Loop;
use React\Promise\PromiseInterface;
use RuntimeException;
use Symfony\Component\Uid\Ulid;

use function func_get_args;
Expand Down Expand Up @@ -70,3 +72,7 @@ public function reconciliate(string $lastEventID): Generator
$hub = new Hub();
$hub->getOption('foo');
})->throws(InvalidArgumentException::class, 'Invalid option `foo`.');

it('dies', function () {
Hub::die(new RuntimeException('😵'));
})->throws(RuntimeException::class, '😵');
Loading