Skip to content

Commit

Permalink
feat(udp/tcp): queue operations instead of throwing
Browse files Browse the repository at this point in the history
Signed-off-by: azjezz <azjezz@protonmail.com>
  • Loading branch information
azjezz committed Nov 17, 2021
1 parent 4d1da9e commit e1600c6
Show file tree
Hide file tree
Showing 20 changed files with 280 additions and 275 deletions.
6 changes: 3 additions & 3 deletions docs/component/io.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,16 @@
- [HandleInterface](./../../src/Psl/IO/HandleInterface.php#L21)
- [ReadHandleInterface](./../../src/Psl/IO/ReadHandleInterface.php#L10)
- [ReadWriteHandleInterface](./../../src/Psl/IO/ReadWriteHandleInterface.php#L7)
- [SeekHandleInterface](./../../src/Psl/IO/SeekHandleInterface.php#L12)
- [SeekHandleInterface](./../../src/Psl/IO/SeekHandleInterface.php#L10)
- [SeekReadHandleInterface](./../../src/Psl/IO/SeekReadHandleInterface.php#L7)
- [SeekReadWriteHandleInterface](./../../src/Psl/IO/SeekReadWriteHandleInterface.php#L7)
- [SeekWriteHandleInterface](./../../src/Psl/IO/SeekWriteHandleInterface.php#L7)
- [WriteHandleInterface](./../../src/Psl/IO/WriteHandleInterface.php#L10)

#### `Classes`

- [MemoryHandle](./../../src/Psl/IO/MemoryHandle.php#L15)
- [Reader](./../../src/Psl/IO/Reader.php#L17)
- [MemoryHandle](./../../src/Psl/IO/MemoryHandle.php#L14)
- [Reader](./../../src/Psl/IO/Reader.php#L16)

#### `Traits`

Expand Down
2 changes: 1 addition & 1 deletion docs/component/tcp.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
#### `Classes`

- [ConnectOptions](./../../src/Psl/TCP/ConnectOptions.php#L7)
- [Server](./../../src/Psl/TCP/Server.php#L17)
- [Server](./../../src/Psl/TCP/Server.php#L18)
- [ServerOptions](./../../src/Psl/TCP/ServerOptions.php#L9)


2 changes: 1 addition & 1 deletion docs/component/unix.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,6 @@

#### `Classes`

- [Server](./../../src/Psl/Unix/Server.php#L16)
- [Server](./../../src/Psl/Unix/Server.php#L17)


12 changes: 5 additions & 7 deletions examples/io/benchmark.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,13 @@
use function memory_get_peak_usage;
use function microtime;
use function round;
use function strlen;
use const PHP_OS_FAMILY;

require __DIR__ . '/../../vendor/autoload.php';

Async\main(static function (): int {
if (PHP_OS_FAMILY === 'Windows') {
IO\output_handle()->writeAll('This example requires does not support Windows.');
IO\output_handle()->writeAll('This example does not support Windows.');

return 0;
}
Expand All @@ -42,20 +41,19 @@
Async\Scheduler::delay($seconds, $input->close(...));

$start = microtime(true);
$bytes = 0;

$i = 0;
try {
while (($chunk = $input->readAll(65536))) {
while ($chunk = $input->readFixedSize(65536)) {
$output->writeAll($chunk);
$bytes += strlen($chunk);
$i++;

Async\later();
}
} catch (IO\Exception\AlreadyClosedException) {
}

$seconds = microtime(true) - $start;

$bytes = $i * 65536;
$bytes_formatted = round($bytes / 1024 / 1024 / $seconds, 1);

$stdout->writeAll('read ' . $bytes . ' byte(s) in ' . round($seconds, 3) . ' second(s) => ' . $bytes_formatted . ' MiB/s' . PHP_EOL);
Expand Down
22 changes: 8 additions & 14 deletions examples/tcp/basic-http-server.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,50 +7,44 @@
use Psl\Async;
use Psl\IO;
use Psl\Network\Exception\AlreadyStoppedException;
use Psl\Str;
use Psl\TCP;
use Throwable;

require __DIR__ . '/../../vendor/autoload.php';

Async\main(static function (): int {
$output = IO\output_handle();
$server = TCP\Server::create('localhost', 3030);

$output->writeAll("Server is listening on http://localhost:3030\n");
IO\output_handle()->writeAll("Server is listening on http://localhost:3030\n");

Async\Scheduler::defer(static function () use ($server, $output) {
Async\Scheduler::defer(static function () use ($server) {
Async\await_signal(SIGINT);

$output->writeAll("\nGoodbye 👋\n");

$server->stopListening();
});

try {
while (true) {
$connection = $server->nextConnection();

Async\Scheduler::defer(static function() use ($connection, $output) {
Async\Scheduler::defer(static function() use ($connection) {
try {
$request = $connection->read();
$stream = $connection->getStream();

$output->writeAll("[" . round(memory_get_peak_usage(true) / 1024 / 1024, 1) . "MiB] " . Str\split($request, "\n")[0] . "\n");
Async\await_readable($stream);
$connection->read();

$connection->writeAll("HTTP/1.1 200 OK\n");

$connection->writeAll("Server: PSL\n");
$connection->writeAll("Server: PHP+PSL\n");
$connection->writeAll("Connection: close\n");
$connection->writeAll("Content-Type: text/plain\n\n");

$connection->writeAll("Hello, World!");
$connection->close();
} catch (Throwable) {
}
});
}
} catch (AlreadyStoppedException) {
// server stopped.
IO\output_handle()->writeAll("\nGoodbye 👋\n");
}

return 0;
Expand Down
6 changes: 3 additions & 3 deletions src/Psl/Filesystem/read_file.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
/**
* Reads entire file into a string.
*
* @param int $offset The offset where the reading starts.
* @param null|int $length Maximum length of data read. The default is to read
* until end of file is reached.
* @param positive-int|0 $offset The offset where the reading starts.
* @param positive-int|null $length Maximum length of data read.
* The default is to read until end of file is reached.
*
* @throws Psl\Exception\InvariantViolationException If the file specified by
* $file does not exist, or is not readable.
Expand Down
51 changes: 16 additions & 35 deletions src/Psl/IO/Internal/ResourceHandle.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

use Psl;
use Psl\Async;
use Psl\Exception\InvariantViolationException;
use Psl\IO;
use Psl\IO\Exception;
use Psl\Type;
Expand Down Expand Up @@ -115,17 +114,10 @@ public function __construct(mixed $stream, bool $read, bool $write, bool $seek)
}

/**
* @throws Exception\AlreadyClosedException If the handle has been already closed.
* @throws Exception\RuntimeException If an error occurred during the operation.
* @throws InvariantViolationException If $timeout is negative.
* {@inheritDoc}
*/
public function write(string $bytes, ?float $timeout = null): int
{
Psl\invariant(
$timeout === null || $timeout > 0,
'$timeout must be null, or > 0',
);

$written = $this->writeImmediately($bytes);
if ($this->blocks || $written === strlen($bytes)) {
return $written;
Expand All @@ -139,6 +131,7 @@ public function write(string $bytes, ?float $timeout = null): int
EventLoop::enable($this->writeWatcher);
$delay_watcher = null;
if (null !== $timeout) {
$timeout = $timeout < 0.0 ? 0.0 : $timeout;
$delay_watcher = EventLoop::delay(
$timeout,
static function () use (&$deferred) {
Expand All @@ -165,10 +158,7 @@ static function () use (&$deferred) {
}

/**
* @throws Exception\AlreadyClosedException If the handle has been already closed.
* @throws Exception\RuntimeException If an error occurred during the operation.
*
* @psalm-suppress MissingThrowsDocblock
* {@inheritDoc}
*/
public function writeImmediately(string $bytes): int
{
Expand All @@ -191,20 +181,20 @@ public function writeImmediately(string $bytes): int
throw new Exception\RuntimeException($error['message'] ?? 'unknown error.');
}

return $result;
return $result >= 0 ? $result : 0;
}

/**
* @throws Exception\AlreadyClosedException If the handle has been already closed.
* @throws Exception\RuntimeException If an error occurred during the operation.
* {@inheritDoc}
*/
public function seek(int $offset): void
{
if (!is_resource($this->stream)) {
throw new Exception\AlreadyClosedException('Handle has already been closed.');
}

Psl\invariant($offset >= 0, '$offset must be a positive-int.');
/** @psalm-suppress MissingThrowsDocblock */
Psl\invariant($offset >= 0, '$offset must be a 0 or positive-int.');

/** @psalm-suppress PossiblyInvalidArgument */
$result = @fseek($this->stream, $offset);
Expand All @@ -214,8 +204,7 @@ public function seek(int $offset): void
}

/**
* @throws Exception\AlreadyClosedException If the handle has been already closed.
* @throws Exception\RuntimeException If an error occurred during the operation.
* {@inheritDoc}
*/
public function tell(): int
{
Expand All @@ -231,22 +220,14 @@ public function tell(): int
throw new Exception\RuntimeException($error['message'] ?? 'unknown error.');
}

return $result;
return $result >= 0 ? $result : 0;
}

/**
* @throws Exception\AlreadyClosedException If the handle has been already closed.
* @throws Exception\RuntimeException If an error occurred during the operation.
* @throws Exception\TimeoutException If reached timeout.
* @throws InvariantViolationException If $max_bytes is 0, or $timeout is negative.
* {@inheritDoc}
*/
public function read(?int $max_bytes = null, ?float $timeout = null): string
{
Psl\invariant(
$timeout === null || $timeout > 0,
'$timeout must be null, or > 0',
);

$chunk = $this->readImmediately($max_bytes);
if ('' !== $chunk || $this->blocks) {
return $chunk;
Expand All @@ -258,6 +239,7 @@ public function read(?int $max_bytes = null, ?float $timeout = null): string
EventLoop::enable($this->readWatcher);
$delay_watcher = null;
if (null !== $timeout) {
$timeout = $timeout < 0.0 ? 0.0 : $timeout;
$delay_watcher = EventLoop::delay(
$timeout,
static function () use (&$deferred) {
Expand All @@ -284,9 +266,7 @@ static function () use (&$deferred) {
}

/**
* @throws Exception\AlreadyClosedException If the handle has been already closed.
* @throws Exception\RuntimeException If an error occurred during the operation.
* @throws InvariantViolationException If $max_bytes is 0.
* {@inheritDoc}
*/
public function readImmediately(?int $max_bytes = null): string
{
Expand All @@ -302,12 +282,13 @@ public function readImmediately(?int $max_bytes = null): string
throw new Exception\AlreadyClosedException('Handle has already been closed.');
}

Psl\invariant($max_bytes === null || $max_bytes > 0, '$max_bytes must be null, or > 0');

if ($max_bytes === null) {
$max_bytes = self::DEFAULT_READ_BUFFER_SIZE;
} elseif ($max_bytes > self::MAXIMUM_READ_BUFFER_SIZE) {
$max_bytes = self::MAXIMUM_READ_BUFFER_SIZE;
} else {
/** @psalm-suppress MissingThrowsDocblock */
Psl\invariant($max_bytes > 0, '$max_bytes must be null, or > 0');
}

/** @psalm-suppress PossiblyInvalidArgument */
Expand All @@ -323,7 +304,7 @@ public function readImmediately(?int $max_bytes = null): string
}

/**
* @throws Exception\RuntimeException If unable to close the handle.
* {@inheritDoc}
*/
public function close(): void
{
Expand Down
Loading

0 comments on commit e1600c6

Please sign in to comment.