Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(udp/tcp): queue operations instead of throwing #279

Merged
merged 1 commit into from
Nov 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions docs/component/io.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,16 @@
- [HandleInterface](./../../src/Psl/IO/HandleInterface.php#L21)
- [ReadHandleInterface](./../../src/Psl/IO/ReadHandleInterface.php#L10)
- [ReadWriteHandleInterface](./../../src/Psl/IO/ReadWriteHandleInterface.php#L7)
- [SeekHandleInterface](./../../src/Psl/IO/SeekHandleInterface.php#L12)
- [SeekHandleInterface](./../../src/Psl/IO/SeekHandleInterface.php#L10)
- [SeekReadHandleInterface](./../../src/Psl/IO/SeekReadHandleInterface.php#L7)
- [SeekReadWriteHandleInterface](./../../src/Psl/IO/SeekReadWriteHandleInterface.php#L7)
- [SeekWriteHandleInterface](./../../src/Psl/IO/SeekWriteHandleInterface.php#L7)
- [WriteHandleInterface](./../../src/Psl/IO/WriteHandleInterface.php#L10)

#### `Classes`

- [MemoryHandle](./../../src/Psl/IO/MemoryHandle.php#L15)
- [Reader](./../../src/Psl/IO/Reader.php#L17)
- [MemoryHandle](./../../src/Psl/IO/MemoryHandle.php#L14)
- [Reader](./../../src/Psl/IO/Reader.php#L16)

#### `Traits`

Expand Down
2 changes: 1 addition & 1 deletion docs/component/tcp.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
#### `Classes`

- [ConnectOptions](./../../src/Psl/TCP/ConnectOptions.php#L7)
- [Server](./../../src/Psl/TCP/Server.php#L17)
- [Server](./../../src/Psl/TCP/Server.php#L18)
- [ServerOptions](./../../src/Psl/TCP/ServerOptions.php#L9)


2 changes: 1 addition & 1 deletion docs/component/unix.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,6 @@

#### `Classes`

- [Server](./../../src/Psl/Unix/Server.php#L16)
- [Server](./../../src/Psl/Unix/Server.php#L17)


12 changes: 5 additions & 7 deletions examples/io/benchmark.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,13 @@
use function memory_get_peak_usage;
use function microtime;
use function round;
use function strlen;
use const PHP_OS_FAMILY;

require __DIR__ . '/../../vendor/autoload.php';

Async\main(static function (): int {
if (PHP_OS_FAMILY === 'Windows') {
IO\output_handle()->writeAll('This example requires does not support Windows.');
IO\output_handle()->writeAll('This example does not support Windows.');

return 0;
}
Expand All @@ -42,20 +41,19 @@
Async\Scheduler::delay($seconds, $input->close(...));

$start = microtime(true);
$bytes = 0;

$i = 0;
try {
while (($chunk = $input->readAll(65536))) {
while ($chunk = $input->readFixedSize(65536)) {
$output->writeAll($chunk);
$bytes += strlen($chunk);
$i++;

Async\later();
}
} catch (IO\Exception\AlreadyClosedException) {
}

$seconds = microtime(true) - $start;

$bytes = $i * 65536;
$bytes_formatted = round($bytes / 1024 / 1024 / $seconds, 1);

$stdout->writeAll('read ' . $bytes . ' byte(s) in ' . round($seconds, 3) . ' second(s) => ' . $bytes_formatted . ' MiB/s' . PHP_EOL);
Expand Down
22 changes: 8 additions & 14 deletions examples/tcp/basic-http-server.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,50 +7,44 @@
use Psl\Async;
use Psl\IO;
use Psl\Network\Exception\AlreadyStoppedException;
use Psl\Str;
use Psl\TCP;
use Throwable;

require __DIR__ . '/../../vendor/autoload.php';

Async\main(static function (): int {
$output = IO\output_handle();
$server = TCP\Server::create('localhost', 3030);

$output->writeAll("Server is listening on http://localhost:3030\n");
IO\output_handle()->writeAll("Server is listening on http://localhost:3030\n");

Async\Scheduler::defer(static function () use ($server, $output) {
Async\Scheduler::defer(static function () use ($server) {
Async\await_signal(SIGINT);

$output->writeAll("\nGoodbye 👋\n");

$server->stopListening();
});

try {
while (true) {
$connection = $server->nextConnection();

Async\Scheduler::defer(static function() use ($connection, $output) {
Async\Scheduler::defer(static function() use ($connection) {
try {
$request = $connection->read();
$stream = $connection->getStream();

$output->writeAll("[" . round(memory_get_peak_usage(true) / 1024 / 1024, 1) . "MiB] " . Str\split($request, "\n")[0] . "\n");
Async\await_readable($stream);
$connection->read();

$connection->writeAll("HTTP/1.1 200 OK\n");

$connection->writeAll("Server: PSL\n");
$connection->writeAll("Server: PHP+PSL\n");
$connection->writeAll("Connection: close\n");
$connection->writeAll("Content-Type: text/plain\n\n");

$connection->writeAll("Hello, World!");
$connection->close();
} catch (Throwable) {
}
});
}
} catch (AlreadyStoppedException) {
// server stopped.
IO\output_handle()->writeAll("\nGoodbye 👋\n");
}

return 0;
Expand Down
6 changes: 3 additions & 3 deletions src/Psl/Filesystem/read_file.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
/**
* Reads entire file into a string.
*
* @param int $offset The offset where the reading starts.
* @param null|int $length Maximum length of data read. The default is to read
* until end of file is reached.
* @param positive-int|0 $offset The offset where the reading starts.
* @param positive-int|null $length Maximum length of data read.
* The default is to read until end of file is reached.
*
* @throws Psl\Exception\InvariantViolationException If the file specified by
* $file does not exist, or is not readable.
Expand Down
51 changes: 16 additions & 35 deletions src/Psl/IO/Internal/ResourceHandle.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

use Psl;
use Psl\Async;
use Psl\Exception\InvariantViolationException;
use Psl\IO;
use Psl\IO\Exception;
use Psl\Type;
Expand Down Expand Up @@ -115,17 +114,10 @@ public function __construct(mixed $stream, bool $read, bool $write, bool $seek)
}

/**
* @throws Exception\AlreadyClosedException If the handle has been already closed.
* @throws Exception\RuntimeException If an error occurred during the operation.
* @throws InvariantViolationException If $timeout is negative.
* {@inheritDoc}
*/
public function write(string $bytes, ?float $timeout = null): int
{
Psl\invariant(
$timeout === null || $timeout > 0,
'$timeout must be null, or > 0',
);

$written = $this->writeImmediately($bytes);
if ($this->blocks || $written === strlen($bytes)) {
return $written;
Expand All @@ -139,6 +131,7 @@ public function write(string $bytes, ?float $timeout = null): int
EventLoop::enable($this->writeWatcher);
$delay_watcher = null;
if (null !== $timeout) {
$timeout = $timeout < 0.0 ? 0.0 : $timeout;
$delay_watcher = EventLoop::delay(
$timeout,
static function () use (&$deferred) {
Expand All @@ -165,10 +158,7 @@ static function () use (&$deferred) {
}

/**
* @throws Exception\AlreadyClosedException If the handle has been already closed.
* @throws Exception\RuntimeException If an error occurred during the operation.
*
* @psalm-suppress MissingThrowsDocblock
* {@inheritDoc}
*/
public function writeImmediately(string $bytes): int
{
Expand All @@ -191,20 +181,20 @@ public function writeImmediately(string $bytes): int
throw new Exception\RuntimeException($error['message'] ?? 'unknown error.');
}

return $result;
return $result >= 0 ? $result : 0;
}

/**
* @throws Exception\AlreadyClosedException If the handle has been already closed.
* @throws Exception\RuntimeException If an error occurred during the operation.
* {@inheritDoc}
*/
public function seek(int $offset): void
{
if (!is_resource($this->stream)) {
throw new Exception\AlreadyClosedException('Handle has already been closed.');
}

Psl\invariant($offset >= 0, '$offset must be a positive-int.');
/** @psalm-suppress MissingThrowsDocblock */
Psl\invariant($offset >= 0, '$offset must be a 0 or positive-int.');

/** @psalm-suppress PossiblyInvalidArgument */
$result = @fseek($this->stream, $offset);
Expand All @@ -214,8 +204,7 @@ public function seek(int $offset): void
}

/**
* @throws Exception\AlreadyClosedException If the handle has been already closed.
* @throws Exception\RuntimeException If an error occurred during the operation.
* {@inheritDoc}
*/
public function tell(): int
{
Expand All @@ -231,22 +220,14 @@ public function tell(): int
throw new Exception\RuntimeException($error['message'] ?? 'unknown error.');
}

return $result;
return $result >= 0 ? $result : 0;
}

/**
* @throws Exception\AlreadyClosedException If the handle has been already closed.
* @throws Exception\RuntimeException If an error occurred during the operation.
* @throws Exception\TimeoutException If reached timeout.
* @throws InvariantViolationException If $max_bytes is 0, or $timeout is negative.
* {@inheritDoc}
*/
public function read(?int $max_bytes = null, ?float $timeout = null): string
{
Psl\invariant(
$timeout === null || $timeout > 0,
'$timeout must be null, or > 0',
);

$chunk = $this->readImmediately($max_bytes);
if ('' !== $chunk || $this->blocks) {
return $chunk;
Expand All @@ -258,6 +239,7 @@ public function read(?int $max_bytes = null, ?float $timeout = null): string
EventLoop::enable($this->readWatcher);
$delay_watcher = null;
if (null !== $timeout) {
$timeout = $timeout < 0.0 ? 0.0 : $timeout;
$delay_watcher = EventLoop::delay(
$timeout,
static function () use (&$deferred) {
Expand All @@ -284,9 +266,7 @@ static function () use (&$deferred) {
}

/**
* @throws Exception\AlreadyClosedException If the handle has been already closed.
* @throws Exception\RuntimeException If an error occurred during the operation.
* @throws InvariantViolationException If $max_bytes is 0.
* {@inheritDoc}
*/
public function readImmediately(?int $max_bytes = null): string
{
Expand All @@ -302,12 +282,13 @@ public function readImmediately(?int $max_bytes = null): string
throw new Exception\AlreadyClosedException('Handle has already been closed.');
}

Psl\invariant($max_bytes === null || $max_bytes > 0, '$max_bytes must be null, or > 0');

if ($max_bytes === null) {
$max_bytes = self::DEFAULT_READ_BUFFER_SIZE;
} elseif ($max_bytes > self::MAXIMUM_READ_BUFFER_SIZE) {
$max_bytes = self::MAXIMUM_READ_BUFFER_SIZE;
} else {
/** @psalm-suppress MissingThrowsDocblock */
Psl\invariant($max_bytes > 0, '$max_bytes must be null, or > 0');
}

/** @psalm-suppress PossiblyInvalidArgument */
Expand All @@ -323,7 +304,7 @@ public function readImmediately(?int $max_bytes = null): string
}

/**
* @throws Exception\RuntimeException If unable to close the handle.
* {@inheritDoc}
*/
public function close(): void
{
Expand Down
Loading