Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(io): add streaming function #335

Merged
merged 3 commits into from
May 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/component/io.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
- [input_handle](./../../src/Psl/IO/input_handle.php#L20)
- [output_handle](./../../src/Psl/IO/output_handle.php#L20)
- [pipe](./../../src/Psl/IO/pipe.php#L24)
- [streaming](./../../src/Psl/IO/streaming.php#L38)
- [write](./../../src/Psl/IO/write.php#L21)
- [write_error](./../../src/Psl/IO/write_error.php#L23)
- [write_error_line](./../../src/Psl/IO/write_error_line.php#L23)
Expand Down
2 changes: 1 addition & 1 deletion docs/component/shell.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

#### `Functions`

- [execute](./../../src/Psl/Shell/execute.php#L42)
- [execute](./../../src/Psl/Shell/execute.php#L41)
- [unpack](./../../src/Psl/Shell/unpack.php#L20)

#### `Enums`
Expand Down
109 changes: 109 additions & 0 deletions src/Psl/IO/streaming.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
<?php

declare(strict_types=1);

namespace Psl\IO;

use Generator;
use Psl;
use Psl\Channel;
use Psl\Result;
use Psl\Str;
use Revolt\EventLoop;

/**
* Streaming the output of the given read stream handles using a generator.
*
* Example:
*
* $handles = [
* 'foo' => get_read_stream('foo'),
* 'bar' => get_read_stream('bar'),
* ];
*
* foreach(IO\streaming($handles) as $type => $chunk) {
* IO\write_line('received chunk "%s" from "%s" stream', $chunk, $type);
* }
*
* @template T of array-key
*
* @param iterable<T, ReadStreamHandleInterface> $handles
*
* @throws Exception\AlreadyClosedException If one of the handles has been already closed.
* @throws Exception\RuntimeException If an error occurred during the operation.
* @throws Exception\TimeoutException If $timeout is reached before being able to read all the handles until the end.
*
* @return Generator<T, string, mixed, void>
*/
function streaming(iterable $handles, ?float $timeout = null): Generator
{
/**
* @psalm-suppress UnnecessaryVarAnnotation
*
* @var Channel\ReceiverInterface<array{T|null, Result\ResultInterface<string>}> $receiver
*
* @psalm-suppress UnnecessaryVarAnnotation
*
* @var Channel\SenderInterface<array{T|null, Result\ResultInterface<string>}> $sender
*/
[$receiver, $sender] = Channel\unbounded();

/** @var Psl\Ref<array<T, string>> $watchers */
$watchers = new Psl\Ref([]);
foreach ($handles as $index => $handle) {
$stream = $handle->getStream();
if ($stream === null) {
throw new Exception\AlreadyClosedException(Str\format('Handle "%s" is already closed.', (string) $index));
}

$watchers->value[$index] = EventLoop::onReadable($stream, static function (string $watcher) use ($index, $handle, $sender, $watchers): void {
try {
$result = Result\wrap($handle->tryRead(...));
if ($result->isFailed() || ($result->isSucceeded() && $result->getResult() === '')) {
EventLoop::cancel($watcher);
unset($watchers->value[$index]);
}

$sender->send([$index, $result]);
} finally {
if ($watchers->value === []) {
$sender->close();
}
}
});
}

$timeout_watcher = null;
if ($timeout !== null) {
$timeout_watcher = EventLoop::delay($timeout, static function () use ($sender): void {
/** @var Result\ResultInterface<string> $failure */
$failure = new Result\Failure(
new Exception\TimeoutException('Reached timeout before being able to read all the handles until the end.')
);

$sender->send([null, $failure]);
});
}

try {
while (true) {
[$index, $result] = $receiver->receive();
if (null === $index || $result->isFailed()) {
throw $result->getThrowable();
}

yield $index => $result->getResult();
}
} catch (Channel\Exception\ClosedChannelException) {
// completed.
return;
} finally {
if ($timeout_watcher !== null) {
EventLoop::cancel($timeout_watcher);
}

foreach ($watchers->value as $watcher) {
EventLoop::cancel($watcher);
}
}
}
1 change: 1 addition & 0 deletions src/Psl/Internal/Loader.php
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,7 @@ final class Loader
'Psl\Unix\connect',
'Psl\Channel\bounded',
'Psl\Channel\unbounded',
'Psl\IO\streaming',
'Psl\IO\write',
'Psl\IO\write_line',
'Psl\IO\write_error',
Expand Down
44 changes: 11 additions & 33 deletions src/Psl/Shell/execute.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

namespace Psl\Shell;

use Psl\Async;
use Psl\Dict;
use Psl\Env;
use Psl\Filesystem;
Expand Down Expand Up @@ -156,25 +155,15 @@ static function (array $m) use (
$stderr = new IO\CloseReadStreamHandle($pipes[2]);

try {
[$stdout_content, $stderr_content] = Async\concurrently([
static fn(): string => $stdout->readAll(timeout: $timeout),
static fn(): string => $stderr->readAll(timeout: $timeout),
]);
// @codeCoverageIgnoreStart
} catch (Async\Exception\CompositeException $exception) {
$reasons = $exception->getReasons();
if ($reasons[0] instanceof IO\Exception\TimeoutException) {
throw new Exception\TimeoutException('reached timeout while the process output is still not readable.', 0, $reasons[0]);
}

if ($reasons[1] instanceof IO\Exception\TimeoutException) {
throw new Exception\TimeoutException('reached timeout while the process output is still not readable.', 0, $reasons[1]);
$result = '';
/** @psalm-suppress MissingThrowsDocblock */
foreach (IO\streaming([1 => $stdout, 2 => $stderr], $timeout) as $type => $chunk) {
if ($chunk) {
$result .= pack('C1N1', $type, Str\Byte\length($chunk)) . $chunk;
}
}

throw new Exception\RuntimeException('Failed to reach process output.', 0, $exception ?? null);
} catch (IO\Exception\TimeoutException $previous) {
throw new Exception\TimeoutException('reached timeout while the process output is still not readable.', 0, $previous);
// @codeCoverageIgnoreEnd
} finally {
/** @psalm-suppress MissingThrowsDocblock */
$stdout->close();
Expand All @@ -185,29 +174,18 @@ static function (array $m) use (
}

if ($code !== 0) {
/** @psalm-suppress MissingThrowsDocblock */
[$stdout_content, $stderr_content] = namespace\unpack($result);

throw new Exception\FailedExecutionException($commandline, $stdout_content, $stderr_content, $code);
}

if (ErrorOutputBehavior::Packed === $error_output_behavior) {
$result = '';
$stdout_length = Str\Byte\length($stdout_content);
$stderr_length = Str\Byte\length($stderr_content);

if ($stdout_length) {
$stdout_header = pack('C1N1', 1, $stdout_length);

$result .= $stdout_header . $stdout_content;
}

if ($stderr_length) {
$stderr_header = pack('C1N1', 2, $stderr_length);

$result .= $stderr_header . $stderr_content;
}

return $result;
}

/** @psalm-suppress MissingThrowsDocblock */
[$stdout_content, $stderr_content] = namespace\unpack($result);
return match ($error_output_behavior) {
ErrorOutputBehavior::Prepend => $stderr_content . $stdout_content,
ErrorOutputBehavior::Append => $stdout_content . $stderr_content,
Expand Down