From e1600c6632e739210c546b9607b9b65cbc2461a1 Mon Sep 17 00:00:00 2001 From: azjezz Date: Wed, 17 Nov 2021 16:15:18 +0100 Subject: [PATCH] feat(udp/tcp): queue operations instead of throwing Signed-off-by: azjezz --- docs/component/io.md | 6 +- docs/component/tcp.md | 2 +- docs/component/unix.md | 2 +- examples/io/benchmark.php | 12 ++-- examples/tcp/basic-http-server.php | 22 +++--- src/Psl/Filesystem/read_file.php | 6 +- src/Psl/IO/Internal/ResourceHandle.php | 51 +++++-------- src/Psl/IO/MemoryHandle.php | 57 ++++++--------- .../IO/ReadHandleConvenienceMethodsTrait.php | 9 ++- src/Psl/IO/ReadHandleInterface.php | 8 ++- src/Psl/IO/Reader.php | 71 +++++------------- src/Psl/IO/SeekHandleInterface.php | 7 +- src/Psl/IO/WriteHandleInterface.php | 4 +- src/Psl/TCP/Internal/Socket.php | 72 ++++++++++++++++--- src/Psl/TCP/Server.php | 56 ++++++++------- src/Psl/Unix/Internal/Socket.php | 72 ++++++++++++++++--- src/Psl/Unix/Server.php | 49 +++++++------ tests/unit/IO/MemoryHandleTest.php | 4 -- tests/unit/TCP/ServerTest.php | 20 ------ tests/unit/Unix/ServerTest.php | 25 ------- 20 files changed, 280 insertions(+), 275 deletions(-) diff --git a/docs/component/io.md b/docs/component/io.md index 324a7abf5..632322d44 100644 --- a/docs/component/io.md +++ b/docs/component/io.md @@ -30,7 +30,7 @@ - [HandleInterface](./../../src/Psl/IO/HandleInterface.php#L21) - [ReadHandleInterface](./../../src/Psl/IO/ReadHandleInterface.php#L10) - [ReadWriteHandleInterface](./../../src/Psl/IO/ReadWriteHandleInterface.php#L7) -- [SeekHandleInterface](./../../src/Psl/IO/SeekHandleInterface.php#L12) +- [SeekHandleInterface](./../../src/Psl/IO/SeekHandleInterface.php#L10) - [SeekReadHandleInterface](./../../src/Psl/IO/SeekReadHandleInterface.php#L7) - [SeekReadWriteHandleInterface](./../../src/Psl/IO/SeekReadWriteHandleInterface.php#L7) - [SeekWriteHandleInterface](./../../src/Psl/IO/SeekWriteHandleInterface.php#L7) @@ -38,8 +38,8 @@ #### `Classes` -- [MemoryHandle](./../../src/Psl/IO/MemoryHandle.php#L15) -- [Reader](./../../src/Psl/IO/Reader.php#L17) +- [MemoryHandle](./../../src/Psl/IO/MemoryHandle.php#L14) +- [Reader](./../../src/Psl/IO/Reader.php#L16) #### `Traits` diff --git a/docs/component/tcp.md b/docs/component/tcp.md index cea677a0b..23f19ecb7 100644 --- a/docs/component/tcp.md +++ b/docs/component/tcp.md @@ -21,7 +21,7 @@ #### `Classes` - [ConnectOptions](./../../src/Psl/TCP/ConnectOptions.php#L7) -- [Server](./../../src/Psl/TCP/Server.php#L17) +- [Server](./../../src/Psl/TCP/Server.php#L18) - [ServerOptions](./../../src/Psl/TCP/ServerOptions.php#L9) diff --git a/docs/component/unix.md b/docs/component/unix.md index 36a68d69e..c4c6f04ee 100644 --- a/docs/component/unix.md +++ b/docs/component/unix.md @@ -20,6 +20,6 @@ #### `Classes` -- [Server](./../../src/Psl/Unix/Server.php#L16) +- [Server](./../../src/Psl/Unix/Server.php#L17) diff --git a/examples/io/benchmark.php b/examples/io/benchmark.php index 0d7f65d8c..c11f60062 100644 --- a/examples/io/benchmark.php +++ b/examples/io/benchmark.php @@ -12,14 +12,13 @@ use function memory_get_peak_usage; use function microtime; use function round; -use function strlen; use const PHP_OS_FAMILY; require __DIR__ . '/../../vendor/autoload.php'; Async\main(static function (): int { if (PHP_OS_FAMILY === 'Windows') { - IO\output_handle()->writeAll('This example requires does not support Windows.'); + IO\output_handle()->writeAll('This example does not support Windows.'); return 0; } @@ -42,12 +41,11 @@ Async\Scheduler::delay($seconds, $input->close(...)); $start = microtime(true); - $bytes = 0; - + $i = 0; try { - while (($chunk = $input->readAll(65536))) { + while ($chunk = $input->readFixedSize(65536)) { $output->writeAll($chunk); - $bytes += strlen($chunk); + $i++; Async\later(); } @@ -55,7 +53,7 @@ } $seconds = microtime(true) - $start; - + $bytes = $i * 65536; $bytes_formatted = round($bytes / 1024 / 1024 / $seconds, 1); $stdout->writeAll('read ' . $bytes . ' byte(s) in ' . round($seconds, 3) . ' second(s) => ' . $bytes_formatted . ' MiB/s' . PHP_EOL); diff --git a/examples/tcp/basic-http-server.php b/examples/tcp/basic-http-server.php index b60cb8b3f..b0da4b15a 100644 --- a/examples/tcp/basic-http-server.php +++ b/examples/tcp/basic-http-server.php @@ -7,23 +7,18 @@ use Psl\Async; use Psl\IO; use Psl\Network\Exception\AlreadyStoppedException; -use Psl\Str; use Psl\TCP; use Throwable; require __DIR__ . '/../../vendor/autoload.php'; Async\main(static function (): int { - $output = IO\output_handle(); $server = TCP\Server::create('localhost', 3030); - $output->writeAll("Server is listening on http://localhost:3030\n"); + IO\output_handle()->writeAll("Server is listening on http://localhost:3030\n"); - Async\Scheduler::defer(static function () use ($server, $output) { + Async\Scheduler::defer(static function () use ($server) { Async\await_signal(SIGINT); - - $output->writeAll("\nGoodbye 👋\n"); - $server->stopListening(); }); @@ -31,18 +26,17 @@ while (true) { $connection = $server->nextConnection(); - Async\Scheduler::defer(static function() use ($connection, $output) { + Async\Scheduler::defer(static function() use ($connection) { try { - $request = $connection->read(); + $stream = $connection->getStream(); - $output->writeAll("[" . round(memory_get_peak_usage(true) / 1024 / 1024, 1) . "MiB] " . Str\split($request, "\n")[0] . "\n"); + Async\await_readable($stream); + $connection->read(); $connection->writeAll("HTTP/1.1 200 OK\n"); - - $connection->writeAll("Server: PSL\n"); + $connection->writeAll("Server: PHP+PSL\n"); $connection->writeAll("Connection: close\n"); $connection->writeAll("Content-Type: text/plain\n\n"); - $connection->writeAll("Hello, World!"); $connection->close(); } catch (Throwable) { @@ -50,7 +44,7 @@ }); } } catch (AlreadyStoppedException) { - // server stopped. + IO\output_handle()->writeAll("\nGoodbye 👋\n"); } return 0; diff --git a/src/Psl/Filesystem/read_file.php b/src/Psl/Filesystem/read_file.php index 4cebf56e7..fbde7fed7 100644 --- a/src/Psl/Filesystem/read_file.php +++ b/src/Psl/Filesystem/read_file.php @@ -12,9 +12,9 @@ /** * Reads entire file into a string. * - * @param int $offset The offset where the reading starts. - * @param null|int $length Maximum length of data read. The default is to read - * until end of file is reached. + * @param positive-int|0 $offset The offset where the reading starts. + * @param positive-int|null $length Maximum length of data read. + * The default is to read until end of file is reached. * * @throws Psl\Exception\InvariantViolationException If the file specified by * $file does not exist, or is not readable. diff --git a/src/Psl/IO/Internal/ResourceHandle.php b/src/Psl/IO/Internal/ResourceHandle.php index c1bff2200..d9d8f2044 100644 --- a/src/Psl/IO/Internal/ResourceHandle.php +++ b/src/Psl/IO/Internal/ResourceHandle.php @@ -6,7 +6,6 @@ use Psl; use Psl\Async; -use Psl\Exception\InvariantViolationException; use Psl\IO; use Psl\IO\Exception; use Psl\Type; @@ -115,17 +114,10 @@ public function __construct(mixed $stream, bool $read, bool $write, bool $seek) } /** - * @throws Exception\AlreadyClosedException If the handle has been already closed. - * @throws Exception\RuntimeException If an error occurred during the operation. - * @throws InvariantViolationException If $timeout is negative. + * {@inheritDoc} */ public function write(string $bytes, ?float $timeout = null): int { - Psl\invariant( - $timeout === null || $timeout > 0, - '$timeout must be null, or > 0', - ); - $written = $this->writeImmediately($bytes); if ($this->blocks || $written === strlen($bytes)) { return $written; @@ -139,6 +131,7 @@ public function write(string $bytes, ?float $timeout = null): int EventLoop::enable($this->writeWatcher); $delay_watcher = null; if (null !== $timeout) { + $timeout = $timeout < 0.0 ? 0.0 : $timeout; $delay_watcher = EventLoop::delay( $timeout, static function () use (&$deferred) { @@ -165,10 +158,7 @@ static function () use (&$deferred) { } /** - * @throws Exception\AlreadyClosedException If the handle has been already closed. - * @throws Exception\RuntimeException If an error occurred during the operation. - * - * @psalm-suppress MissingThrowsDocblock + * {@inheritDoc} */ public function writeImmediately(string $bytes): int { @@ -191,12 +181,11 @@ public function writeImmediately(string $bytes): int throw new Exception\RuntimeException($error['message'] ?? 'unknown error.'); } - return $result; + return $result >= 0 ? $result : 0; } /** - * @throws Exception\AlreadyClosedException If the handle has been already closed. - * @throws Exception\RuntimeException If an error occurred during the operation. + * {@inheritDoc} */ public function seek(int $offset): void { @@ -204,7 +193,8 @@ public function seek(int $offset): void throw new Exception\AlreadyClosedException('Handle has already been closed.'); } - Psl\invariant($offset >= 0, '$offset must be a positive-int.'); + /** @psalm-suppress MissingThrowsDocblock */ + Psl\invariant($offset >= 0, '$offset must be a 0 or positive-int.'); /** @psalm-suppress PossiblyInvalidArgument */ $result = @fseek($this->stream, $offset); @@ -214,8 +204,7 @@ public function seek(int $offset): void } /** - * @throws Exception\AlreadyClosedException If the handle has been already closed. - * @throws Exception\RuntimeException If an error occurred during the operation. + * {@inheritDoc} */ public function tell(): int { @@ -231,22 +220,14 @@ public function tell(): int throw new Exception\RuntimeException($error['message'] ?? 'unknown error.'); } - return $result; + return $result >= 0 ? $result : 0; } /** - * @throws Exception\AlreadyClosedException If the handle has been already closed. - * @throws Exception\RuntimeException If an error occurred during the operation. - * @throws Exception\TimeoutException If reached timeout. - * @throws InvariantViolationException If $max_bytes is 0, or $timeout is negative. + * {@inheritDoc} */ public function read(?int $max_bytes = null, ?float $timeout = null): string { - Psl\invariant( - $timeout === null || $timeout > 0, - '$timeout must be null, or > 0', - ); - $chunk = $this->readImmediately($max_bytes); if ('' !== $chunk || $this->blocks) { return $chunk; @@ -258,6 +239,7 @@ public function read(?int $max_bytes = null, ?float $timeout = null): string EventLoop::enable($this->readWatcher); $delay_watcher = null; if (null !== $timeout) { + $timeout = $timeout < 0.0 ? 0.0 : $timeout; $delay_watcher = EventLoop::delay( $timeout, static function () use (&$deferred) { @@ -284,9 +266,7 @@ static function () use (&$deferred) { } /** - * @throws Exception\AlreadyClosedException If the handle has been already closed. - * @throws Exception\RuntimeException If an error occurred during the operation. - * @throws InvariantViolationException If $max_bytes is 0. + * {@inheritDoc} */ public function readImmediately(?int $max_bytes = null): string { @@ -302,12 +282,13 @@ public function readImmediately(?int $max_bytes = null): string throw new Exception\AlreadyClosedException('Handle has already been closed.'); } - Psl\invariant($max_bytes === null || $max_bytes > 0, '$max_bytes must be null, or > 0'); - if ($max_bytes === null) { $max_bytes = self::DEFAULT_READ_BUFFER_SIZE; } elseif ($max_bytes > self::MAXIMUM_READ_BUFFER_SIZE) { $max_bytes = self::MAXIMUM_READ_BUFFER_SIZE; + } else { + /** @psalm-suppress MissingThrowsDocblock */ + Psl\invariant($max_bytes > 0, '$max_bytes must be null, or > 0'); } /** @psalm-suppress PossiblyInvalidArgument */ @@ -323,7 +304,7 @@ public function readImmediately(?int $max_bytes = null): string } /** - * @throws Exception\RuntimeException If unable to close the handle. + * {@inheritDoc} */ public function close(): void { diff --git a/src/Psl/IO/MemoryHandle.php b/src/Psl/IO/MemoryHandle.php index de0e2a113..99a2151fe 100644 --- a/src/Psl/IO/MemoryHandle.php +++ b/src/Psl/IO/MemoryHandle.php @@ -5,7 +5,6 @@ namespace Psl\IO; use Psl; -use Psl\Exception\InvariantViolationException; use Psl\Math; use function str_repeat; @@ -17,8 +16,11 @@ final class MemoryHandle implements CloseSeekReadWriteHandleInterface use WriteHandleConvenienceMethodsTrait; use ReadHandleConvenienceMethodsTrait; - private string $buffer; + /** + * @var 0|positive-int + */ private int $offset = 0; + private string $buffer; private bool $closed = false; public function __construct(string $buffer = '') @@ -29,10 +31,9 @@ public function __construct(string $buffer = '') /** * Read from the handle. * - * @param int|null $max_bytes the maximum number of bytes to read. + * @param positive-int|null $max_bytes the maximum number of bytes to read. * * @throws Exception\AlreadyClosedException If the handle has been already closed. - * @throws InvariantViolationException If $max_bytes is 0. * * @return string the read data on success, or an empty string if the end of file is reached. */ @@ -40,9 +41,13 @@ public function readImmediately(?int $max_bytes = null): string { $this->assertHandleIsOpen(); - $max_bytes ??= Math\INT64_MAX; + if (null === $max_bytes) { + $max_bytes = Math\INT64_MAX; + } else { + /** @psalm-suppress MissingThrowsDocblock */ + Psl\invariant($max_bytes > 0, '$max_bytes must be null or positive.'); + } - Psl\invariant($max_bytes > 0, '$max_bytes must be null or positive.'); $length = strlen($this->buffer); if ($this->offset >= $length) { return ''; @@ -51,7 +56,7 @@ public function readImmediately(?int $max_bytes = null): string $length -= $this->offset; $length = $length > $max_bytes ? $max_bytes : $length; $result = substr($this->buffer, $this->offset, $length); - $this->offset += $length; + $this->offset = ($offset = $this->offset + $length) >= 0 ? $offset : 0; return $result; } @@ -59,10 +64,9 @@ public function readImmediately(?int $max_bytes = null): string /** * Read from the handle. * - * @param int|null $max_bytes the maximum number of bytes to read. + * @param positive-int|null $max_bytes the maximum number of bytes to read. * * @throws Exception\AlreadyClosedException If the handle has been already closed. - * @throws InvariantViolationException If $max_bytes is 0. * * @return string the read data on success, or an empty string if the end of file is reached. */ @@ -72,26 +76,19 @@ public function read(?int $max_bytes = null, ?float $timeout = null): string } /** - * Move to a specific offset within a handle. - * - * Offset is relative to the start of the handle - so, the beginning of the - * handle is always offset 0. - * - * @throws Exception\AlreadyClosedException If the handle has been already closed. - * @throws InvariantViolationException If $offset is negative. + * {@inheritDoc} */ public function seek(int $offset): void { $this->assertHandleIsOpen(); + /** @psalm-suppress MissingThrowsDocblock */ Psl\invariant($offset >= 0, '$offset must be a positive-int.'); $this->offset = $offset; } /** - * Get the current pointer position within a handle. - * - * @throws Exception\AlreadyClosedException If the handle has been already closed. + * {@inheritDoc} */ public function tell(): int { @@ -101,11 +98,7 @@ public function tell(): int } /** - * Write data. - * - * @throws Exception\AlreadyClosedException If the handle has been already closed. - * - * @return int the number of bytes written on success. + * {@inheritDoc} */ public function writeImmediately(string $bytes, ?float $timeout = null): int { @@ -116,6 +109,7 @@ public function writeImmediately(string $bytes, ?float $timeout = null): int $length = $this->offset; } + /** @var positive-int|0 $bytes_length */ $bytes_length = strlen($bytes); $new = substr($this->buffer, 0, $this->offset) . $bytes; if ($this->offset < $length) { @@ -125,16 +119,12 @@ public function writeImmediately(string $bytes, ?float $timeout = null): int } $this->buffer = $new; - $this->offset += $bytes_length; + $this->offset = ($offset = $this->offset + $bytes_length) >= 0 ? $offset : 0; return $bytes_length; } /** - * Write data. - * - * @throws Exception\AlreadyClosedException If the handle has been already closed. - * - * @return int the number of bytes written on success. + * {@inheritDoc} */ public function write(string $bytes, ?float $timeout = null): int { @@ -142,16 +132,11 @@ public function write(string $bytes, ?float $timeout = null): int } /** - * Close the handle. - * - * @throws Exception\AlreadyClosedException If the handle has been already closed. + * {@inheritDoc} */ public function close(): void { - $this->assertHandleIsOpen(); - $this->closed = true; - $this->offset = -1; } public function getBuffer(): string diff --git a/src/Psl/IO/ReadHandleConvenienceMethodsTrait.php b/src/Psl/IO/ReadHandleConvenienceMethodsTrait.php index d4a5e5560..ab44d1b0f 100644 --- a/src/Psl/IO/ReadHandleConvenienceMethodsTrait.php +++ b/src/Psl/IO/ReadHandleConvenienceMethodsTrait.php @@ -24,6 +24,8 @@ trait ReadHandleConvenienceMethodsTrait * Up to `$max_bytes` may be allocated in a buffer; large values may lead to * unnecessarily hitting the request memory limit. * + * @param ?positive-int $max_bytes the maximum number of bytes to read + * * @throws Exception\AlreadyClosedException If the handle has been already closed. * @throws Exception\RuntimeException If an error occurred during the operation. * @throws Exception\TimeoutException If $timeout is reached before being able to read from the handle. @@ -48,7 +50,10 @@ static function () use ($data): void { do { $chunk_size = $to_read; - /** @psalm-suppress MissingThrowsDocblock */ + /** + * @var positive-int|null $chunk_size + * @psalm-suppress UnnecessaryVarAnnotation + */ $chunk = $this->read($chunk_size, $timer->getRemaining()); $data->value .= $chunk; if ($to_read !== null) { @@ -66,6 +71,8 @@ static function () use ($data): void { * or socket which the other end keeps open forever. Set a timeout if you * do not want this to happen. * + * @param positive-int $size the number of bytes to read. + * * @throws Exception\AlreadyClosedException If the handle has been already closed. * @throws Exception\RuntimeException If an error occurred during the operation. * @throws Exception\TimeoutException If $timeout is reached before being able to read from the handle. diff --git a/src/Psl/IO/ReadHandleInterface.php b/src/Psl/IO/ReadHandleInterface.php index d3040aec2..8ed155a28 100644 --- a/src/Psl/IO/ReadHandleInterface.php +++ b/src/Psl/IO/ReadHandleInterface.php @@ -15,7 +15,7 @@ interface ReadHandleInterface extends HandleInterface * Up to `$max_bytes` may be allocated in a buffer; large values may lead to * unnecessarily hitting the request memory limit. * - * @param ?int $max_bytes the maximum number of bytes to read + * @param ?positive-int $max_bytes the maximum number of bytes to read * * @throws Exception\AlreadyClosedException If the handle has been already closed. * @throws Exception\RuntimeException If an error occurred during the operation. @@ -30,7 +30,7 @@ public function readImmediately(?int $max_bytes = null): string; /** * Read from the handle, waiting for data if necessary. * - * @param ?int $max_bytes the maximum number of bytes to read + * @param ?positive-int $max_bytes the maximum number of bytes to read * * @throws Exception\AlreadyClosedException If the handle has been already closed. * @throws Exception\RuntimeException If an error occurred during the operation. @@ -53,6 +53,8 @@ public function read(?int $max_bytes = null, ?float $timeout = null): string; * Up to `$max_bytes` may be allocated in a buffer; large values may lead to * unnecessarily hitting the request memory limit. * + * @param ?positive-int $max_bytes the maximum number of bytes to read + * * @throws Exception\AlreadyClosedException If the handle has been already closed. * @throws Exception\RuntimeException If an error occurred during the operation. * @throws Exception\TimeoutException If $timeout is reached before being able to read from the handle. @@ -66,6 +68,8 @@ public function readAll(?int $max_bytes = null, ?float $timeout = null): string; * or socket which the other end keeps open forever. Set a timeout if you * do not want this to happen. * + * @param positive-int $size the number of bytes to read. + * * @throws Exception\AlreadyClosedException If the handle has been already closed. * @throws Exception\RuntimeException If an error occurred during the operation. * @throws Exception\TimeoutException If $timeout is reached before being able to read from the handle. diff --git a/src/Psl/IO/Reader.php b/src/Psl/IO/Reader.php index 32088078f..c80d67131 100644 --- a/src/Psl/IO/Reader.php +++ b/src/Psl/IO/Reader.php @@ -5,7 +5,6 @@ namespace Psl\IO; use Psl; -use Psl\Exception\InvariantViolationException; use Psl\Str; use function strlen; @@ -29,14 +28,7 @@ public function __construct(ReadHandleInterface $handle) } /** - * Read fixed amount of bytes specified by $size. - * - * @param int $size The number of bytes to read. - * - * @throws Exception\AlreadyClosedException If the handle has been already closed. - * @throws Exception\RuntimeException If an error occurred during the operation, - * or reached end of file before requested size. - * @throws InvariantViolationException If $size is not positive. + * {@inheritDoc} */ public function readFixedSize(int $size, ?float $timeout = null): string { @@ -53,11 +45,10 @@ function (): void { }, ); - while (strlen($this->buffer) < $size && !$this->eof) { - $this->fillBuffer( - $size - strlen($this->buffer), - $timer->getRemaining(), - ); + while (($length = strlen($this->buffer)) < $size && !$this->eof) { + /** @var positive-int $to_read */ + $to_read = $size - $length; + $this->fillBuffer($to_read, $timer->getRemaining()); } if ($this->eof) { @@ -65,6 +56,7 @@ function (): void { } $buffer_size = strlen($this->buffer); + /** @psalm-suppress MissingThrowsDocblock */ Psl\invariant($buffer_size >= $size, "Should have read the requested data or reached EOF"); if ($size === $buffer_size) { @@ -79,6 +71,8 @@ function (): void { } /** + * @param null|positive-int $desired_bytes + * * @throws Exception\AlreadyClosedException If the handle has been already closed. * @throws Exception\RuntimeException If an error occurred during the operation. * @throws Exception\TimeoutException If $timeout is reached before being able to read from the handle. @@ -98,14 +92,11 @@ private function fillBuffer(?int $desired_bytes, ?float $timeout): void * * @throws Exception\AlreadyClosedException If the handle has been already closed. * @throws Exception\RuntimeException If an error occurred during the operation, or reached end of file. - * @throws Psl\Exception\InvariantViolationException If $timeout is negative. */ public function readByte(?float $timeout = null): string { - Psl\invariant( - $timeout === null || $timeout > 0, - '$timeout must be null, or > 0', - ); + /** @psalm-suppress MissingThrowsDocblock */ + Psl\invariant($timeout === null || $timeout > 0.0, '$timeout must be null, or > 0'); if ($this->buffer === '' && !$this->eof) { // @codeCoverageIgnoreStart @@ -195,24 +186,12 @@ public function readUntil(string $suffix): ?string } /** - * Read from the handle, waiting for data if necessary. - * - * @param ?int $max_bytes the maximum number of bytes to read - * - * @throws Exception\RuntimeException If an error occurred during the operation. - * @throws Exception\TimeoutException If $timeout is reached before being able to read from the handle. - * @throws InvariantViolationException If $max_bytes is 0. - * @throws Exception\AlreadyClosedException If the handle has been already closed. - * - * @return string the read data on success, or an empty string if the end of file is reached. - * - * Up to `$max_bytes` may be allocated in a buffer; large values may lead to - * unnecessarily hitting the request memory limit. + * {@inheritDoc} */ public function read(?int $max_bytes = null, ?float $timeout = null): string { - Psl\invariant($max_bytes === null || $max_bytes >= 0, '$max_bytes must be null, or >= 0'); - Psl\invariant($timeout === null || $timeout > 0, '$timeout must be null, or > 0'); + /** @psalm-suppress MissingThrowsDocblock */ + Psl\invariant($timeout === null || $timeout > 0.0, '$timeout must be null, or > 0'); if ($this->eof) { return ''; @@ -228,28 +207,14 @@ public function read(?int $max_bytes = null, ?float $timeout = null): string } /** - * An immediate, unordered read. - * - * Up to `$max_bytes` may be allocated in a buffer; large values may lead to - * unnecessarily hitting the request memory limit. - * - * @param ?int $max_bytes the maximum number of bytes to read - * - * @throws Exception\RuntimeException If an error occurred during the operation. - * @throws InvariantViolationException If $max_bytes is 0. - * @throws Exception\AlreadyClosedException If the handle has been already closed. - * - * @return string the read data on success, or an empty string if the end of file is reached. - * - * @see ReadHandleInterface::read() - * @see ReadHandleInterface::readAll() + * {@inheritDoc} */ public function readImmediately(?int $max_bytes = null): string { - Psl\invariant( - $max_bytes === null || $max_bytes > 0, - '$max_bytes must be null, or greater than 0', - ); + if (null !== $max_bytes) { + /** @psalm-suppress MissingThrowsDocblock */ + Psl\invariant($max_bytes > 0, '$max_bytes must be null, or greater than 0'); + } if ($this->eof) { return ''; diff --git a/src/Psl/IO/SeekHandleInterface.php b/src/Psl/IO/SeekHandleInterface.php index 9f3f9b501..3f8d39754 100644 --- a/src/Psl/IO/SeekHandleInterface.php +++ b/src/Psl/IO/SeekHandleInterface.php @@ -4,8 +4,6 @@ namespace Psl\IO; -use Psl; - /** * A handle that can have its' position changed. */ @@ -17,7 +15,8 @@ interface SeekHandleInterface extends HandleInterface * Offset is relative to the start of the handle - so, the beginning of the * handle is always offset 0. * - * @throws Psl\Exception\InvariantViolationException If $offset is negative. + * @param positive-int|0 $offset + * * @throws Exception\AlreadyClosedException If the handle has been already closed. * @throws Exception\RuntimeException If an error occurred during the operation. */ @@ -28,6 +27,8 @@ public function seek(int $offset): void; * * @throws Exception\AlreadyClosedException If the handle has been already closed. * @throws Exception\RuntimeException If an error occurred during the operation. + * + * @return positive-int|0 */ public function tell(): int; } diff --git a/src/Psl/IO/WriteHandleInterface.php b/src/Psl/IO/WriteHandleInterface.php index 83c7a712c..4fcfc75ba 100644 --- a/src/Psl/IO/WriteHandleInterface.php +++ b/src/Psl/IO/WriteHandleInterface.php @@ -15,7 +15,7 @@ interface WriteHandleInterface extends HandleInterface * @throws Exception\AlreadyClosedException If the handle has been already closed. * @throws Exception\RuntimeException If an error occurred during the operation. * - * @return int the number of bytes written on success, which may be 0. + * @return positive-int|0 the number of bytes written on success, which may be 0. * * @see WriteHandleInterface::write() * @see WriteHandleInterface::writeAll() @@ -32,7 +32,7 @@ public function writeImmediately(string $bytes): int; * @throws Exception\RuntimeException If an error occurred during the operation. * @throws Exception\TimeoutException If reached timeout before completing the operation. * - * @return int the number of bytes written, which may be less than the length of input string. + * @return positive-int|0 the number of bytes written, which may be less than the length of input string. */ public function write(string $bytes, ?float $timeout = null): int; diff --git a/src/Psl/TCP/Internal/Socket.php b/src/Psl/TCP/Internal/Socket.php index d82c38626..bb16386a0 100644 --- a/src/Psl/TCP/Internal/Socket.php +++ b/src/Psl/TCP/Internal/Socket.php @@ -4,25 +4,73 @@ namespace Psl\TCP\Internal; +use Psl\IO; use Psl\IO\Exception; use Psl\IO\Internal; use Psl\Network; use Psl\Network\Address; use Psl\TCP; +use function is_resource; + /** * @internal * * @codeCoverageIgnore */ -final class Socket extends Internal\ResourceHandle implements TCP\SocketInterface +final class Socket implements IO\Stream\CloseReadWriteHandleInterface, TCP\SocketInterface { + use IO\WriteHandleConvenienceMethodsTrait; + use IO\ReadHandleConvenienceMethodsTrait; + + private Internal\ResourceHandle $handle; + /** * @param resource $stream */ public function __construct($stream) { - parent::__construct($stream, read: true, write: true, seek: false); + $this->handle = new Internal\ResourceHandle($stream, read: true, write: true, seek: false); + } + + /** + * {@inheritDoc} + */ + public function readImmediately(?int $max_bytes = null): string + { + return $this->handle->readImmediately($max_bytes); + } + + /** + * {@inheritDoc} + */ + public function read(?int $max_bytes = null, ?float $timeout = null): string + { + return $this->handle->read($max_bytes, $timeout); + } + + /** + * {@inheritDoc} + */ + public function writeImmediately(string $bytes): int + { + return $this->handle->writeImmediately($bytes); + } + + /** + * {@inheritDoc} + */ + public function write(string $bytes, ?float $timeout = null): int + { + return $this->handle->write($bytes, $timeout); + } + + /** + * {@inheritDoc} + */ + public function getStream(): mixed + { + return $this->handle->getStream(); } /** @@ -30,12 +78,12 @@ public function __construct($stream) */ public function getLocalAddress(): Address { - if (null === $this->stream) { + $stream = $this->handle->getStream(); + if (!is_resource($stream)) { throw new Exception\AlreadyClosedException('Socket handle has already been closed.'); } - /** @psalm-suppress PossiblyInvalidArgument */ - return Network\Internal\get_sock_name($this->stream); + return Network\Internal\get_sock_name($stream); } /** @@ -43,11 +91,19 @@ public function getLocalAddress(): Address */ public function getPeerAddress(): Address { - if (null === $this->stream) { + $stream = $this->handle->getStream(); + if (!is_resource($stream)) { throw new Exception\AlreadyClosedException('Socket handle has already been closed.'); } - /** @psalm-suppress PossiblyInvalidArgument */ - return Network\Internal\get_peer_name($this->stream); + return Network\Internal\get_peer_name($stream); + } + + /** + * {@inheritDoc} + */ + public function close(): void + { + $this->handle->close(); } } diff --git a/src/Psl/TCP/Server.php b/src/Psl/TCP/Server.php index cd59e6057..efc60b995 100644 --- a/src/Psl/TCP/Server.php +++ b/src/Psl/TCP/Server.php @@ -5,6 +5,7 @@ namespace Psl\TCP; use Psl; +use Psl\Async; use Psl\Network; use Revolt\EventLoop; @@ -20,7 +21,11 @@ final class Server implements Network\ServerInterface * @var resource|null $impl */ private mixed $impl; - private ?EventLoop\Suspension $suspension = null; + + /** + * @var null|Async\Deferred + */ + private ?Async\Deferred $deferred = null; private string $watcher; /** @@ -29,22 +34,18 @@ final class Server implements Network\ServerInterface private function __construct(mixed $impl) { $this->impl = $impl; - $suspension = &$this->suspension; + $deferred = &$this->deferred; $this->watcher = EventLoop::onReadable( $this->impl, /** * @param resource|object $resource */ - static function (string $_watcher, mixed $resource) use (&$suspension): void { - /** - * @var resource $resource - */ + static function (string $_watcher, mixed $resource) use (&$deferred): void { + /** @var resource $resource */ $sock = @stream_socket_accept($resource, timeout: 0.0); - /** @var \Revolt\EventLoop\Suspension|null $tmp */ - $tmp = $suspension; - $suspension = null; if ($sock !== false) { - $tmp?->resume($sock); + /** @var Async\Deferred|null $deferred */ + $deferred?->complete($sock); return; } @@ -52,7 +53,8 @@ static function (string $_watcher, mixed $resource) use (&$suspension): void { // @codeCoverageIgnoreStart /** @var array{file: string, line: int, message: string, type: int} $err */ $err = error_get_last(); - $tmp?->throw(new Network\Exception\RuntimeException('Failed to accept incoming connection: ' . $err['message'], $err['type'])); + /** @var Async\Deferred|null $deferred */ + $deferred?->error(new Network\Exception\RuntimeException('Failed to accept incoming connection: ' . $err['message'], $err['type'])); // @codeCoverageIgnoreEnd }, ); @@ -94,27 +96,29 @@ public static function create( */ public function nextConnection(): SocketInterface { - if (null !== $this->suspension) { - throw new Network\Exception\RuntimeException('Pending operation.'); - } + $this->deferred + ?->getAwaitable() + ->then(static fn() => null, static fn() => null) + ->ignore() + ->await(); if (null === $this->impl) { throw new Network\Exception\AlreadyStoppedException('Server socket has already been stopped.'); } - $this->suspension = $suspension = EventLoop::createSuspension(); + /** @var Async\Deferred */ + $this->deferred = new Async\Deferred(); + /** @psalm-suppress MissingThrowsDocblock */ EventLoop::enable($this->watcher); try { - /** @var resource $socket */ - $socket = $suspension->suspend(); + /** @psalm-suppress PossiblyNullReference */ + return new Internal\Socket($this->deferred->getAwaitable()->await()); } finally { EventLoop::disable($this->watcher); + $this->deferred = null; } - - /** @psalm-suppress MissingThrowsDocblock */ - return new Internal\Socket($socket); } /** @@ -143,18 +147,18 @@ public function stopListening(): void return; } - $suspension = null; + $resource = $this->impl; + + $deferred = null; if (null !== $this->watcher) { EventLoop::cancel($this->watcher); - $suspension = $this->suspension; - $this->suspension = null; + $deferred = $this->deferred; + $this->deferred = null; } - $resource = $this->impl; $this->impl = null; - /** @psalm-suppress PossiblyNullArgument */ fclose($resource); - $suspension?->throw(new Network\Exception\AlreadyStoppedException('Server socket has already been stopped.')); + $deferred?->error(new Network\Exception\AlreadyStoppedException('Server socket has already been stopped.')); } } diff --git a/src/Psl/Unix/Internal/Socket.php b/src/Psl/Unix/Internal/Socket.php index 5e1eaefa7..79c500e0b 100644 --- a/src/Psl/Unix/Internal/Socket.php +++ b/src/Psl/Unix/Internal/Socket.php @@ -4,25 +4,73 @@ namespace Psl\Unix\Internal; +use Psl\IO; use Psl\IO\Exception; use Psl\IO\Internal; use Psl\Network; use Psl\Network\Address; use Psl\Unix; +use function is_resource; + /** * @internal * * @codeCoverageIgnore */ -final class Socket extends Internal\ResourceHandle implements Unix\SocketInterface +final class Socket implements IO\Stream\CloseReadWriteHandleInterface, Unix\SocketInterface { + use IO\WriteHandleConvenienceMethodsTrait; + use IO\ReadHandleConvenienceMethodsTrait; + + private Internal\ResourceHandle $handle; + /** * @param resource $stream */ public function __construct($stream) { - parent::__construct($stream, read: true, write: true, seek: false); + $this->handle = new Internal\ResourceHandle($stream, read: true, write: true, seek: false); + } + + /** + * {@inheritDoc} + */ + public function readImmediately(?int $max_bytes = null): string + { + return $this->handle->readImmediately($max_bytes); + } + + /** + * {@inheritDoc} + */ + public function read(?int $max_bytes = null, ?float $timeout = null): string + { + return $this->handle->read($max_bytes, $timeout); + } + + /** + * {@inheritDoc} + */ + public function writeImmediately(string $bytes): int + { + return $this->handle->writeImmediately($bytes); + } + + /** + * {@inheritDoc} + */ + public function write(string $bytes, ?float $timeout = null): int + { + return $this->handle->write($bytes, $timeout); + } + + /** + * {@inheritDoc} + */ + public function getStream(): mixed + { + return $this->handle->getStream(); } /** @@ -30,12 +78,12 @@ public function __construct($stream) */ public function getLocalAddress(): Address { - if (null === $this->stream) { + $stream = $this->handle->getStream(); + if (!is_resource($stream)) { throw new Exception\AlreadyClosedException('Socket handle has already been closed.'); } - /** @psalm-suppress PossiblyInvalidArgument */ - return Network\Internal\get_sock_name($this->stream); + return Network\Internal\get_sock_name($stream); } /** @@ -43,11 +91,19 @@ public function getLocalAddress(): Address */ public function getPeerAddress(): Address { - if (null === $this->stream) { + $stream = $this->handle->getStream(); + if (!is_resource($stream)) { throw new Exception\AlreadyClosedException('Socket handle has already been closed.'); } - /** @psalm-suppress PossiblyInvalidArgument */ - return Network\Internal\get_peer_name($this->stream); + return Network\Internal\get_peer_name($stream); + } + + /** + * {@inheritDoc} + */ + public function close(): void + { + $this->handle->close(); } } diff --git a/src/Psl/Unix/Server.php b/src/Psl/Unix/Server.php index 7d4de06c8..8f5d54d54 100644 --- a/src/Psl/Unix/Server.php +++ b/src/Psl/Unix/Server.php @@ -4,6 +4,7 @@ namespace Psl\Unix; +use Psl\Async; use Psl\Network; use Revolt\EventLoop; @@ -19,7 +20,10 @@ final class Server implements Network\ServerInterface * @var resource|null $impl */ private mixed $impl; - private ?EventLoop\Suspension $suspension = null; + /** + * @var null|Async\Deferred + */ + private ?Async\Deferred $deferred = null; private string $watcher; /** @@ -28,22 +32,20 @@ final class Server implements Network\ServerInterface private function __construct(mixed $impl) { $this->impl = $impl; - $suspension = &$this->suspension; + $deferred = &$this->deferred; $this->watcher = EventLoop::onReadable( $this->impl, /** * @param resource|object $resource */ - static function (string $_watcher, mixed $resource) use (&$suspension): void { + static function (string $_watcher, mixed $resource) use (&$deferred): void { /** * @var resource $resource */ $sock = @stream_socket_accept($resource, timeout: 0.0); - /** @var \Revolt\EventLoop\Suspension|null $tmp */ - $tmp = $suspension; - $suspension = null; if ($sock !== false) { - $tmp?->resume($sock); + /** @var Async\Deferred|null $deferred */ + $deferred?->complete($sock); return; } @@ -51,7 +53,8 @@ static function (string $_watcher, mixed $resource) use (&$suspension): void { // @codeCoverageIgnoreStart /** @var array{file: string, line: int, message: string, type: int} $err */ $err = error_get_last(); - $tmp?->throw(new Network\Exception\RuntimeException('Failed to accept incoming connection: ' . $err['message'], $err['type'])); + /** @var Async\Deferred|null $deferred */ + $deferred?->error(new Network\Exception\RuntimeException('Failed to accept incoming connection: ' . $err['message'], $err['type'])); // @codeCoverageIgnoreEnd }, ); @@ -83,27 +86,28 @@ public static function create(string $file): self */ public function nextConnection(): SocketInterface { - if (null !== $this->suspension) { - throw new Network\Exception\RuntimeException('Pending operation.'); - } + $this->deferred + ?->getAwaitable() + ->then(static fn() => null, static fn() => null) + ->ignore() + ->await(); if (null === $this->impl) { throw new Network\Exception\AlreadyStoppedException('Server socket has already been stopped.'); } - $this->suspension = $suspension = EventLoop::createSuspension(); + /** @var Async\Deferred */ + $this->deferred = new Async\Deferred(); /** @psalm-suppress MissingThrowsDocblock */ EventLoop::enable($this->watcher); try { - /** @var resource $socket */ - $socket = $suspension->suspend(); + /** @psalm-suppress PossiblyNullReference */ + return new Internal\Socket($this->deferred->getAwaitable()->await()); } finally { EventLoop::disable($this->watcher); + $this->deferred = null; } - - /** @psalm-suppress MissingThrowsDocblock */ - return new Internal\Socket($socket); } /** @@ -127,19 +131,18 @@ public function stopListening(): void return; } - $suspension = null; + $resource = $this->impl; + $deferred = null; if (null !== $this->watcher) { EventLoop::cancel($this->watcher); - $suspension = $this->suspension; - $this->suspension = null; + $deferred = $this->deferred; + $this->deferred = null; } - $resource = $this->impl; $this->impl = null; - /** @psalm-suppress PossiblyNullArgument */ fclose($resource); - $suspension?->throw(new Network\Exception\AlreadyStoppedException('Server socket has already been stopped.')); + $deferred?->error(new Network\Exception\AlreadyStoppedException('Server socket has already been stopped.')); } public function __destruct() diff --git a/tests/unit/IO/MemoryHandleTest.php b/tests/unit/IO/MemoryHandleTest.php index a5661fa91..cb0809f37 100644 --- a/tests/unit/IO/MemoryHandleTest.php +++ b/tests/unit/IO/MemoryHandleTest.php @@ -59,10 +59,6 @@ public function provideOperations(): iterable yield [ static fn(IO\ReadHandleInterface $handle) => $handle->readImmediately(), ]; - - yield [ - static fn(IO\CloseHandleInterface $handle) => $handle->close(), - ]; } public function testMemoryHandle(): void diff --git a/tests/unit/TCP/ServerTest.php b/tests/unit/TCP/ServerTest.php index e28030a7a..d06a212aa 100644 --- a/tests/unit/TCP/ServerTest.php +++ b/tests/unit/TCP/ServerTest.php @@ -46,24 +46,4 @@ public function testGetLocalAddressOnStoppedServer(): void $server->getLocalAddress(); } - - public function testThrowsForPendingOperation(): void - { - $server = TCP\Server::create('127.0.0.1'); - - $first = Async\run(static fn() => $server->nextConnection()); - $second = Async\run(static fn() => $server->nextConnection()); - - try { - $second->await(); - } catch (Throwable $exception) { - static::assertInstanceOf(Network\Exception\RuntimeException::class, $exception); - static::assertStringContainsStringIgnoringCase('pending', $exception->getMessage()); - } - - $first->ignore(); - $second->ignore(); - - $server->stopListening(); - } } diff --git a/tests/unit/Unix/ServerTest.php b/tests/unit/Unix/ServerTest.php index a17cdbc46..1878aeb7b 100644 --- a/tests/unit/Unix/ServerTest.php +++ b/tests/unit/Unix/ServerTest.php @@ -46,29 +46,4 @@ public function testGetLocalAddressOnStoppedServer(): void $server->getLocalAddress(); } - - public function testThrowsForPendingOperation(): void - { - if (PHP_OS_FAMILY === 'Windows') { - static::markTestSkipped('Unix Server is not supported on Windows platform.'); - } - - $sock = Filesystem\create_temporary_file(prefix: 'psl-examples') . ".sock"; - $server = Unix\Server::create($sock); - - $first = Async\run(static fn() => $server->nextConnection()); - $second = Async\run(static fn() => $server->nextConnection()); - - try { - $second->await(); - } catch (Throwable $exception) { - static::assertInstanceOf(Exception\RuntimeException::class, $exception); - static::assertStringContainsStringIgnoringCase('pending', $exception->getMessage()); - } - - $first->ignore(); - $second->ignore(); - - $server->stopListening(); - } }