Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(async): introduce more async helper functions #291

Merged
merged 1 commit into from
Dec 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions docs/component/async.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,21 @@

#### `Functions`

- [all](./../../src/Psl/Async/all.php#L22)
- [all](./../../src/Psl/Async/all.php#L26)
- [any](./../../src/Psl/Async/any.php#L25)
- [await](./../../src/Psl/Async/await.php#L18)
- [await_readable](./../../src/Psl/Async/await_readable.php#L23)
- [await_signal](./../../src/Psl/Async/await_signal.php#L18)
- [await_writable](./../../src/Psl/Async/await_writable.php#L21)
- [concurrent](./../../src/Psl/Async/concurrent.php#L21)
- [first](./../../src/Psl/Async/first.php#L24)
- [first](./../../src/Psl/Async/first.php#L22)
- [later](./../../src/Psl/Async/later.php#L14)
- [main](./../../src/Psl/Async/main.php#L16)
- [parallel](./../../src/Psl/Async/parallel.php#L64)
- [reflect](./../../src/Psl/Async/reflect.php#L25)
- [run](./../../src/Psl/Async/run.php#L20)
- [series](./../../src/Psl/Async/series.php#L21)
- [sleep](./../../src/Psl/Async/sleep.php#L10)
- [wrap](./../../src/Psl/Async/wrap.php#L21)

#### `Classes`

Expand Down
2 changes: 1 addition & 1 deletion examples/async/usleep.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
Async\main(static function (): int {
$start = time();

Async\concurrent([
Async\parallel([
static fn() => Async\sleep(2.0),
static fn() => Async\sleep(2.0),
static fn() => Async\sleep(2.0),
Expand Down
2 changes: 1 addition & 1 deletion examples/io/pipe.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

$output = IO\output_handle();

Async\concurrent([
Async\parallel([
static function() use($read, $output): void {
$output->writeAll("< sleeping.\n");

Expand Down
2 changes: 1 addition & 1 deletion examples/shell/concurrent.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
Async\main(static function (): int {
$start = time();

Async\concurrent([
Async\parallel([
static fn() => Shell\execute(PHP_BINARY, ['-r', '$t = time(); while(time() < ($t+1)) { echo "."; }']),
static fn() => Shell\execute(PHP_BINARY, ['-r', '$t = time(); while(time() < ($t+1)) { echo "."; }']),
static fn() => Shell\execute(PHP_BINARY, ['-r', '$t = time(); while(time() < ($t+1)) { echo "."; }']),
Expand Down
2 changes: 1 addition & 1 deletion examples/tcp/concurrent.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
Async\main(static function (): int {
$output = IO\output_handle();

Async\concurrent([
Async\parallel([
'server' => static function () use ($output): void {
$server = TCP\Server::create('localhost', 91337);
$output->writeAll("< server is listening\n");
Expand Down
2 changes: 1 addition & 1 deletion examples/unix/concurrent.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

$output = IO\output_handle();

Async\concurrent([
Async\parallel([
'server' => static function () use ($file, $output): void {
$server = Unix\Server::create($file);
$output->writeAll("< server is listening\n");
Expand Down
35 changes: 27 additions & 8 deletions src/Psl/Async/all.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@
/**
* Awaits all awaitables to complete concurrently.
*
* If one or more awaitables fail, all awaitables will be completed before throwing.
* If one awaitable fails, the exception will be thrown immediately, and the result of the callables will be ignored.
*
* If multiple awaitables failed at once, a {@see Exception\CompositeException} will be thrown.
*
* Once the awaitables have completed, an array containing the results will be returned preserving the original awaitables order.
*
* @template Tk of array-key
* @template Tv
Expand All @@ -22,20 +26,35 @@
function all(iterable $awaitables): array
{
$values = [];
$errors = [];

// Awaitable::iterate() to throw the first error based on completion order instead of argument order
foreach (Awaitable::iterate($awaitables) as $index => $awaitable) {
try {
$values[$index] = $awaitable->await();
} catch (Throwable $throwable) {
$errors[] = $throwable;
}
}
$errors = [];
foreach ($awaitables as $original) {
if ($original === $awaitable) {
continue;
}

if ($errors !== []) {
/** @psalm-suppress MissingThrowsDocblock */
throw $errors[0];
if (!$original->isComplete()) {
$original->ignore();
} else {
try {
$original->await();
} catch (Throwable $error) {
$errors[] = $error;
}
}
}

if ($errors === []) {
throw $throwable;
}

throw new Exception\CompositeException([$throwable, ...$errors], 'Multiple exceptions thrown while waiting.');
}
}

return Dict\map_with_key(
Expand Down
9 changes: 8 additions & 1 deletion src/Psl/Async/any.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,14 @@ function any(iterable $awaitables): mixed
$errors = [];
foreach (Awaitable::iterate($awaitables) as $first) {
try {
return $first->await();
$result = $first->await();
foreach ($awaitables as $awaitable) {
if ($awaitable !== $first) {
$awaitable->ignore();
}
}

return $result;
} catch (Throwable $throwable) {
$errors[] = $throwable;
}
Expand Down
34 changes: 0 additions & 34 deletions src/Psl/Async/concurrent.php

This file was deleted.

8 changes: 6 additions & 2 deletions src/Psl/Async/first.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,16 @@
* @throws Psl\Exception\InvariantViolationException If $awaitables is empty.
*
* @return T
*
* @codeCoverageIgnore
*/
function first(iterable $awaitables): mixed
{
foreach (Awaitable::iterate($awaitables) as $first) {
foreach ($awaitables as $awaitable) {
if ($awaitable !== $first) {
$awaitable->ignore();
}
}

return $first->await();
}

Expand Down
77 changes: 77 additions & 0 deletions src/Psl/Async/parallel.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
<?php

declare(strict_types=1);

namespace Psl\Async;

use Psl\Dict;

/**
* Run the tasks iterable of functions in parallel, without waiting until the previous function has completed.
*
* If one tasks fails, the exception will be thrown immediately, and the result of the callables will be ignored.
*
* If multiple tasks failed at once, a {@see Exception\CompositeException} will be thrown.
*
* Once the tasks have completed, an array containing the results will be returned preserving the original awaitables order.
*
* <code>
* use Psl\Async;
*
* // execute `getOne()` and `getTwo()` functions in parallel.
*
* [$one, $two] = Async\parallel([
* getOne(...),
* getTwo(...),
* ]);
* </code>
*
* @note {@see parallel()} is about kicking-off I/O tasks in parallel, not about parallel execution of code.
* If your tasks do not use any timers or perform any I/O, they will actually be executed in series.
*
* <code>
* use Psl\Async;
*
* // the following runs in series.
*
* [$one, $two] = Async\parallel([
* fn() => file_get_contents('path/to/file1.txt'),
* fn() => file_get_contents('path/to/file2.txt'),
* ]);
* </code>
*
* @note Use {@see reflect()} to continue the execution of other tasks when a task fails.
*
* <code>
* use Psl\Async;
*
* // execute `getOne()` and `getTwo()` functions in parallel.
* // if either one of the given tasks fails, the other will continue execution.
*
* [$one, $two] = Async\parallel([
* Async\reflect(getOne(...)),
* Async\reflect(getTwo(...)),
* ]);
* </code>
*
* @template Tk of array-key
* @template Tv
*
* @param iterable<Tk, (callable(): Tv)> $tasks
*
* @return array<Tk, Tv>
*/
function parallel(iterable $tasks): array
{
$awaitables = Dict\map(
$tasks,
/**
* @param callable(): Tv $callable
*
* @return Awaitable<Tv>
*/
static fn(callable $callable): Awaitable => run($callable),
);

return namespace\all($awaitables);
}
28 changes: 28 additions & 0 deletions src/Psl/Async/reflect.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<?php

declare(strict_types=1);

namespace Psl\Async;

use Closure;
use Exception;
use Psl\Result;

/**
* Wraps the given task in another task that always completes with a {@see Result\Success},
* or {@see Result\Failure} if the callable throws an {@see Exception}.
*
* @template T
*
* @param (callable(): T) $task
*
* @return (Closure(): Result\ResultInterface<T>)
*
* @see Result\wrap()
*
* @pure
*/
function reflect(callable $task): Closure
{
return static fn() => Result\wrap($task);
}
32 changes: 32 additions & 0 deletions src/Psl/Async/series.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
<?php

declare(strict_types=1);

namespace Psl\Async;

use Psl\Dict;

/**
* Run the functions in the tasks collection in series, each one running once the previous function has completed.
*
* If any functions in the series throws, no more functions are run, and the exception is immediately thrown.
*
* @template Tk of array-key
* @template Tv
*
* @param iterable<Tk, (callable(): Tv)> $tasks
*
* @return array<Tk, Tv>
*/
function series(iterable $tasks): array
{
return Dict\map(
$tasks,
/**
* @param callable(): Tv $callable
*
* @return Tv
*/
static fn(callable $callable): mixed => run($callable)->await(),
);
}
24 changes: 24 additions & 0 deletions src/Psl/Async/wrap.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
<?php

declare(strict_types=1);

namespace Psl\Async;

use Psl\Result;

/**
* Wraps the async function in an awaitable that completes with a {@see Result\Success},
* or {@see Result\Failure} if the task throws an {@see Exception}.
*
* @template T
*
* @param (callable(): T) $task
*
* @return Result\ResultInterface<T>
*
* @see reflect()
*/
function wrap(callable $task): Result\ResultInterface
{
return run(reflect($task))->await();
}
5 changes: 4 additions & 1 deletion src/Psl/Internal/Loader.php
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,10 @@ final class Loader
'Psl\Trait\defined',
'Psl\Async\main',
'Psl\Async\run',
'Psl\Async\concurrent',
'Psl\Async\parallel',
'Psl\Async\reflect',
'Psl\Async\wrap',
'Psl\Async\series',
'Psl\Async\await',
'Psl\Async\any',
'Psl\Async\all',
Expand Down
2 changes: 1 addition & 1 deletion src/Psl/Shell/execute.php
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ static function (array $m) use (
$stderr = new Stream\CloseReadHandle($pipes[2]);

try {
[$stdout_content, $stderr_content] = Async\concurrent([
[$stdout_content, $stderr_content] = Async\parallel([
static fn(): string => $stdout->readAll(timeout: $timeout),
static fn(): string => $stderr->readAll(timeout: $timeout),
]);
Expand Down
Loading