Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support delay stamp #66

Merged
merged 3 commits into from
Oct 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,23 @@ on:

jobs:
test:
uses: zenstruck/.github/.github/workflows/php-test-symfony.yml@main
name: PHP ${{ matrix.php }}, SF ${{ matrix.symfony }} - ${{ matrix.deps }}
runs-on: ubuntu-latest
strategy:
matrix:
php: [8.1, 8.2, 8.3]
deps: [highest]
symfony: [5.4.*, 6.3.*, 6.4.*]
include:
- php: 8.1
deps: lowest
symfony: '*'
steps:
- uses: zenstruck/.github@php-test-symfony
with:
php: ${{ matrix.php }}
symfony: ${{ matrix.symfony }}
deps: ${{ matrix.deps }}

code-coverage:
uses: zenstruck/.github/.github/workflows/php-coverage-codecov.yml@main
Expand Down
129 changes: 112 additions & 17 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -304,23 +304,6 @@ when@test:
async: test://?test_serialization=false
```

### Enable Retries

By default, the `TestTransport` does not retry failed messages (your retry settings
are ignored). This behavior can be disabled with the transport dsn:

```yaml
# config/packages/messenger.yaml

# ...

when@test:
framework:
messenger:
transports:
async: test://?disable_retries=false
```

### Multiple Transports

If you have multiple transports you'd like to test, change all their dsn's to
Expand Down Expand Up @@ -357,6 +340,118 @@ class MyTest extends KernelTestCase // or WebTestCase
}
```

### Support of `DelayStamp`

Support of `DelayStamp` could be enabled per transport, within its dsn:

```yaml
# config/packages/messenger.yaml

when@test:
framework:
messenger:
transports:
async: test://?support_delay_stamp=true
```

> [!NOTE]
> Support of delay stamp was added in version 1.8.0.

#### Usage of a clock

> [!WARNING]
> Support of delay stamp needs an implementation of [PSR-20 Clock](https://www.php-fig.org/psr/psr-20/).

You can, for example use Symfony's clock component:
```bash
composer require symfony/clock
```

When using Symfony's clock component, the service will be automatically configured.
Otherwise, you need to configure it manually:

```yaml
# config/services.yaml
services:
app.clock:
class: Some\Clock\Implementation
Psr\Clock\ClockInterface: '@app.clock'
```

#### Example of code supporting `DelayStamp`

> [!NOTE]
> This example uses `symfony/clock` component, but you can use any other implementation of `Psr\Clock\ClockInterface`.

```php

// Let's say somewhere in your app, you register some actions that should occur in the future:

$bus->dispatch(new Enevelope(new TakeSomeAction1(), [DelayStamp::delayFor(new \DateInterval('P1D'))])); // will be handled in 1 day
$bus->dispatch(new Enevelope(new TakeSomeAction2(), [DelayStamp::delayFor(new \DateInterval('P3D'))])); // will be handled in 3 days

// In your test, you can check that the action is not yet performed:

class TestDelayedActions extends KernelTestCase
{
use InteractsWithMessenger;
use ClockSensitiveTrait;

public function testDelayedActions(): void
{
// 1. mock the clock, in order to perform sleeps
$clock = self::mockTime();

// 2. trigger the action that will dispatch the two messages

// ...

// 3. assert nothing happens yet
$transport=$this->transport('async');

$transport->process();
$transport->queue()->assertCount(2);
$transport->acknowledged()->assertCount(0);

// 4. sleep, process queue, and assert some messages have been handled
$clock->sleep(60 * 60 * 24); // wait one day
nikophil marked this conversation as resolved.
Show resolved Hide resolved
$transport->process()->acknowledged()->assertContains(TakeSomeAction1::class);
$this->asssertTakeSomeAction1IsHandled();

// TakeSomeAction2 is still in the queue
$transport->queue()->assertCount(1);

$clock->sleep(60 * 60 * 24 * 2); // wait two other days
$transport->process()->acknowledged()->assertContains(TakeSomeAction2::class);
$this->asssertTakeSomeAction2IsHandled();
}
}
```

#### `DelayStamp` and unblock mode

"delayed" messages cannot be handled by the unblocking mechanism, `$transport->process()` must be called after a
`sleep()` has been made.

### Enable Retries

By default, the `TestTransport` does not retry failed messages (your retry settings
are ignored). This behavior can be disabled with the transport dsn:

```yaml
# config/packages/messenger.yaml

when@test:
framework:
messenger:
transports:
async: test://?disable_retries=false
```

> [!NOTE]
> When using retries along with `support_delay_stamp` you must mock the time to sleep between retries.


## Bus

In addition to transport testing you also can make assertions on the bus. You can test message
Expand Down
8 changes: 6 additions & 2 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,23 @@
}
],
"require": {
"php": ">=8.0",
"php": ">=8.1",
"symfony/deprecation-contracts": "^2.2|^3.0",
"symfony/framework-bundle": "^5.4|^6.0",
"symfony/messenger": "^5.4|^6.0",
"zenstruck/assert": "^1.0"
},
"require-dev": {
"phpstan/phpstan": "^1.4",
"phpunit/phpunit": "^9.5.0",
"phpunit/phpunit": "^9.6.0",
"symfony/browser-kit": "^5.4|^6.0",
"symfony/clock": "^6.3",
"symfony/phpunit-bridge": "^5.4|^6.0",
"symfony/yaml": "^5.4|^6.0"
},
"suggest": {
"symfony/clock": "A PSR-20 clock implementation in order to support DelayStamp."
},
"config": {
"preferred-install": "dist",
"sort-packages": true
Expand Down
29 changes: 29 additions & 0 deletions src/Stamp/AvailableAtStamp.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
<?php


namespace Zenstruck\Messenger\Test\Stamp;

use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Stamp\StampInterface;

/**
* @internal
*/
final class AvailableAtStamp implements StampInterface
kbond marked this conversation as resolved.
Show resolved Hide resolved
{
public function __construct(private \DateTimeImmutable $availableAt)
{
}

public static function fromDelayStamp(DelayStamp $delayStamp, \DateTimeImmutable $now): self
{
return new self(
$now->modify(sprintf('+%d seconds', $delayStamp->getDelay() / 1000))
);
}

public function getAvailableAt(): \DateTimeImmutable
{
return $this->availableAt;
}
}
50 changes: 47 additions & 3 deletions src/Transport/TestTransport.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,20 @@

namespace Zenstruck\Messenger\Test\Transport;

use Psr\Clock\ClockInterface;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
use Symfony\Component\Messenger\Event\WorkerRunningEvent;
use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
use Symfony\Component\Messenger\Worker;
use Zenstruck\Assert;
use Zenstruck\Messenger\Test\Stamp\AvailableAtStamp;

/**
* @author Kevin Bond <kevinbond@gmail.com>
Expand All @@ -33,12 +36,14 @@ final class TestTransport implements TransportInterface
'catch_exceptions' => true,
'test_serialization' => true,
'disable_retries' => true,
'support_delay_stamp' => false,
];

private string $name;
private EventDispatcherInterface $dispatcher;
private MessageBusInterface $bus;
private SerializerInterface $serializer;
private ClockInterface|null $clock;

/** @var array<string, bool> */
private static array $intercept = [];
Expand All @@ -52,6 +57,9 @@ final class TestTransport implements TransportInterface
/** @var array<string, bool> */
private static array $disableRetries = [];

/** @var array<string, bool> */
private static array $supportDelayStamp = [];

/** @var array<string, Envelope[]> */
private static array $dispatched = [];

Expand All @@ -72,19 +80,27 @@ final class TestTransport implements TransportInterface
*
* @param array<string,bool> $options
*/
public function __construct(string $name, MessageBusInterface $bus, EventDispatcherInterface $dispatcher, SerializerInterface $serializer, array $options = [])
public function __construct(string $name, MessageBusInterface $bus, EventDispatcherInterface $dispatcher, SerializerInterface $serializer, ClockInterface|null $clock = null, array $options = [])
{
$options = \array_merge(self::DEFAULT_OPTIONS, $options);

$this->name = $name;
$this->dispatcher = $dispatcher;
$this->bus = $bus;
$this->serializer = $serializer;
$this->clock = $clock;

self::$intercept[$name] ??= $options['intercept'];
self::$catchExceptions[$name] ??= $options['catch_exceptions'];
self::$testSerialization[$name] ??= $options['test_serialization'];
self::$disableRetries[$name] ??= $options['disable_retries'];
self::$supportDelayStamp[$name] ??= $options['support_delay_stamp'];

if (!self::$supportDelayStamp[$name]) {
trigger_deprecation('zenstruck/messenger-test', '1.8.0', 'Not supporting DelayStamp is deprecated, support will be removed in 2.0.');
} elseif(!$this->clock) {
throw new \InvalidArgumentException(sprintf('A service aliased "%s" must be available in order to support DelayStamp. You can install for instance symfony/clock (composer require symfony/clock).', ClockInterface::class));
}
}

/**
Expand Down Expand Up @@ -228,7 +244,23 @@ public function get(): iterable
return [];
}

return [\array_shift(self::$queue[$this->name])];
if (!$this->supportsDelayStamp()) {
return [\array_shift(self::$queue[$this->name])];
}

$now = $this->clock->now();

foreach (self::$queue[$this->name] as $i => $envelope) {
if (($availableAtStamp = $envelope->last(AvailableAtStamp::class)) && $now < $availableAtStamp->getAvailableAt()) {
continue;
}

unset(self::$queue[$this->name][$i]);

return [$envelope];
}

return [];
}

/**
Expand Down Expand Up @@ -263,6 +295,10 @@ public function send($what): Envelope

$envelope = Envelope::wrap($what);

if ($this->supportsDelayStamp() && $delayStamp = $envelope->last(DelayStamp::class)) {
$envelope = $envelope->with(AvailableAtStamp::fromDelayStamp($delayStamp, $this->clock->now()));
nikophil marked this conversation as resolved.
Show resolved Hide resolved
}

if ($this->isRetriesDisabled() && $envelope->last(RedeliveryStamp::class)) {
// message is being retried, don't process
return $envelope;
Expand Down Expand Up @@ -303,7 +339,7 @@ public static function resetAll(): void

public static function initialize(): void
{
self::$intercept = self::$catchExceptions = self::$testSerialization = self::$disableRetries = [];
self::$intercept = self::$catchExceptions = self::$testSerialization = self::$disableRetries = self::$supportDelayStamp = [];
}

public static function enableMessagesCollection(): void
Expand Down Expand Up @@ -349,6 +385,14 @@ private function isRetriesDisabled(): bool
return self::$disableRetries[$this->name];
}

/**
* @phpstan-assert-if-true !null $this->clock
*/
private function supportsDelayStamp(): bool
{
return $this->clock && self::$supportDelayStamp[$this->name];
}

private function hasMessagesToProcess(): bool
{
return !empty(self::$queue[$this->name] ?? []);
Expand Down
11 changes: 4 additions & 7 deletions src/Transport/TestTransportFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

namespace Zenstruck\Messenger\Test\Transport;

use Psr\Clock\ClockInterface;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
Expand All @@ -24,18 +25,13 @@
*/
final class TestTransportFactory implements TransportFactoryInterface
{
private MessageBusInterface $bus;
private EventDispatcherInterface $dispatcher;

public function __construct(MessageBusInterface $bus, EventDispatcherInterface $dispatcher)
public function __construct(private MessageBusInterface $bus, private EventDispatcherInterface $dispatcher, private ClockInterface|null $clock = null)
{
$this->bus = $bus;
$this->dispatcher = $dispatcher;
}

public function createTransport(string $dsn, array $options, SerializerInterface $serializer): TransportInterface // @phpstan-ignore-line
{
return new TestTransport($options['transport_name'], $this->bus, $this->dispatcher, $serializer, $this->parseDsn($dsn));
return new TestTransport($options['transport_name'], $this->bus, $this->dispatcher, $serializer, $this->clock, $this->parseDsn($dsn));
}

public function supports(string $dsn, array $options): bool // @phpstan-ignore-line
Expand All @@ -59,6 +55,7 @@ private function parseDsn(string $dsn): array
'catch_exceptions' => \filter_var($query['catch_exceptions'] ?? true, \FILTER_VALIDATE_BOOLEAN),
'test_serialization' => \filter_var($query['test_serialization'] ?? true, \FILTER_VALIDATE_BOOLEAN),
'disable_retries' => \filter_var($query['disable_retries'] ?? true, \FILTER_VALIDATE_BOOLEAN),
'support_delay_stamp' => \filter_var($query['support_delay_stamp'] ?? true, \FILTER_VALIDATE_BOOLEAN),
];
}
}
Loading