From 6536a93205dd246b602313b584c337508fe02941 Mon Sep 17 00:00:00 2001 From: Chad Sikorra Date: Sat, 25 Apr 2026 08:39:21 -0400 Subject: [PATCH] When reading data, don't peek. This elimates a costly fread when dealing with lots of data. Update the message queue to correctly use generators for getMessages. Remove the useless generate usage for getMessage. --- composer.json | 5 +- src/FreeDSx/Socket/Queue/MessageQueue.php | 81 ++++++------ src/FreeDSx/Socket/Socket.php | 56 +++++---- .../Socket/Queue/Asn1MessageQueueSpec.php | 41 ++++++ .../FreeDSx/Socket/RequiresUnixTransport.php | 24 ++++ tests/spec/FreeDSx/Socket/SocketPoolSpec.php | 35 +++++- .../spec/FreeDSx/Socket/SocketServerSpec.php | 7 +- tests/spec/FreeDSx/Socket/SocketSpec.php | 119 ++++++++++++++++-- 8 files changed, 289 insertions(+), 79 deletions(-) create mode 100644 tests/spec/FreeDSx/Socket/RequiresUnixTransport.php diff --git a/composer.json b/composer.json index dc8aba5..8a00e5f 100644 --- a/composer.json +++ b/composer.json @@ -31,7 +31,10 @@ "psr-4": {"FreeDSx\\Socket\\": "src/FreeDSx/Socket"} }, "autoload-dev": { - "psr-4": {"fixture\\FreeDSx\\Socket\\": "tests/fixture/FreeDSx/Socket"} + "psr-4": { + "fixture\\FreeDSx\\Socket\\": "tests/fixture/FreeDSx/Socket", + "spec\\FreeDSx\\Socket\\": "tests/spec/FreeDSx/Socket" + } }, "scripts": { "test-coverage": [ diff --git a/src/FreeDSx/Socket/Queue/MessageQueue.php b/src/FreeDSx/Socket/Queue/MessageQueue.php index e17e879..73183fb 100644 --- a/src/FreeDSx/Socket/Queue/MessageQueue.php +++ b/src/FreeDSx/Socket/Queue/MessageQueue.php @@ -13,6 +13,9 @@ use FreeDSx\Socket\Exception\ConnectionException; use FreeDSx\Socket\Exception\PartialMessageException; use FreeDSx\Socket\Socket; +use Generator; +use function strlen; +use function substr; /** * Used to retrieve Messages/PDUs from a socket. @@ -46,43 +49,42 @@ public function __construct(Socket $socket) /** * @param int|null $id - * @return \Generator + * @return Generator * @throws ConnectionException */ - public function getMessages(?int $id = null) + public function getMessages(?int $id = null): Generator { - if (!$this->hasBuffer()) { - $this->addToAvailableBufferOrFail(); + while (true) { + yield $this->getMessage($id); } + } - while ($this->hasBuffer()) { - try { - if ($this->hasAvailableBuffer()) { - $this->addToConsumableBuffer(); - } elseif (!$this->hasConsumableBuffer()) { - $this->addToAvailableBufferOrFail(); - } - } catch (PartialMessageException $exception) { + /** + * @throws ConnectionException + * @throws PartialMessageException + */ + private function readOneMessage(): Message + { + while (true) { + if (!$this->hasConsumableBuffer() && !$this->hasAvailableBuffer()) { $this->addToAvailableBufferOrFail(); } + if ($this->hasAvailableBuffer()) { + $this->addToConsumableBuffer(); + } + try { - while ($this->hasConsumableBuffer()) { - $message = $this->consume(); - if ($message !== null) { - yield $this->constructMessage($message, $id); - } - } - } catch (PartialMessageException $e) { - if ($this->hasAvailableBuffer()) { - $this->addToConsumableBuffer(); - } else { - $this->addToAvailableBufferOrFail(); - } + return $this->consume(); + } catch (PartialMessageException $exception) { + $this->addToAvailableBufferOrFail(); } } } + /** + * @throws ConnectionException + */ protected function addToAvailableBufferOrFail(): void { $bytes = $this->socket->read(); @@ -98,7 +100,7 @@ protected function addToConsumableBuffer(): void { if ($this->hasAvailableBuffer()) { $buffer = $this->unwrap((string)$this->buffer); - $this->buffer = \substr((string)$this->buffer, $buffer->endsAt()); + $this->buffer = substr((string)$this->buffer, $buffer->endsAt()); $this->toConsume .= $buffer->bytes(); } } @@ -110,12 +112,12 @@ protected function hasBuffer(): bool protected function hasAvailableBuffer(): bool { - return \strlen((string)$this->buffer) !== 0; + return strlen((string)$this->buffer) !== 0; } protected function hasConsumableBuffer(): bool { - return \strlen((string)$this->toConsume) !== 0; + return strlen((string)$this->toConsume) !== 0; } /** @@ -129,11 +131,10 @@ protected function consume(): ?Message try { $message = $this->decode($this->toConsume); $lastPos = (int)$message->getLastPosition(); - $this->toConsume = \substr($this->toConsume, $lastPos); - - if ($this->toConsume === '' && ($peek = $this->socket->read(false)) !== false) { - $this->buffer .= $peek; - } + $this->toConsume = substr( + $this->toConsume, + $lastPos + ); } catch (PartialMessageException $exception) { # If we have available buffer, it might have what we need. Attempt to add it. Otherwise let it bubble... if ($this->hasAvailableBuffer()) { @@ -157,7 +158,10 @@ protected function consume(): ?Message */ protected function unwrap($bytes) : Buffer { - return new Buffer($bytes, \strlen($bytes)); + return new Buffer( + $bytes, + strlen($bytes) + ); } /** @@ -177,7 +181,10 @@ protected abstract function decode($bytes) : Message; */ public function getMessage(?int $id = null) { - return $this->getMessages($id)->current(); + return $this->constructMessage( + $this->readOneMessage(), + $id + ); } /** @@ -187,8 +194,10 @@ public function getMessage(?int $id = null) * @param int|null $id * @return mixed */ - protected function constructMessage(Message $message, ?int $id = null) - { + protected function constructMessage( + Message $message, + ?int $id = null + ) { return $message->getMessage(); } } diff --git a/src/FreeDSx/Socket/Socket.php b/src/FreeDSx/Socket/Socket.php index 28b926a..9c28fbf 100644 --- a/src/FreeDSx/Socket/Socket.php +++ b/src/FreeDSx/Socket/Socket.php @@ -11,6 +11,13 @@ namespace FreeDSx\Socket; use FreeDSx\Socket\Exception\ConnectionException; +use function array_merge; +use function fread; +use function in_array; +use function stream_set_blocking; +use function stream_set_timeout; +use function stream_socket_client; +use function stream_socket_enable_crypto; /** * Represents a generic socket. @@ -103,8 +110,8 @@ class Socket public function __construct($resource = null, array $options = []) { $this->socket = $resource; - $this->options = \array_merge($this->options, $options); - if (!\in_array($this->options['transport'], self::TRANSPORTS, true)) { + $this->options = array_merge($this->options, $options); + if (!in_array($this->options['transport'], self::TRANSPORTS, true)) { throw new \RuntimeException(sprintf( 'The transport "%s" is not valid. It must be one of: %s', $this->options['transport'], @@ -122,19 +129,22 @@ public function __construct($resource = null, array $options = []) */ public function read(bool $block = true) { - $data = false; - - \stream_set_blocking($this->socket, $block); - while (\strlen((string) ($buffer = \fread($this->socket, $this->options['buffer_size']))) > 0) { - $data .= $buffer; - if ($block) { - $block = false; - \stream_set_blocking($this->socket, false); - } + stream_set_blocking($this->socket, $block); + $data = fread( + $this->socket, + $this->options['buffer_size'] + ); + + if (!$block) { + stream_set_blocking( + $this->socket, + true + ); } - \stream_set_blocking($this->socket, true); - return $data; + return $data === '' + ? false + : $data; } /** @@ -154,7 +164,7 @@ public function write(string $data) */ public function block(bool $block) { - \stream_set_blocking($this->socket, $block); + stream_set_blocking($this->socket, $block); return $this; } @@ -210,9 +220,9 @@ public function close() */ public function encrypt(bool $encrypt) { - \stream_set_blocking($this->socket, true); - $result = \stream_socket_enable_crypto($this->socket, $encrypt, $this->options['ssl_crypto_method']); - \stream_set_blocking($this->socket, false); + stream_set_blocking($this->socket, true); + $result = stream_socket_enable_crypto($this->socket, $encrypt, $this->options['ssl_crypto_method']); + stream_set_blocking($this->socket, false); if ((bool) $result == false) { throw new ConnectionException(sprintf( @@ -244,7 +254,7 @@ public function connect(string $host) $uri .= ':' . $this->options['port']; } - $socket = @\stream_socket_client( + $socket = @stream_socket_client( $uri, $this->errorNumber, $this->errorMessage, @@ -303,7 +313,7 @@ public static function unix( ): Socket { return self::create( $file, - \array_merge( + array_merge( $options, ['transport' => 'unix'] ) @@ -320,7 +330,7 @@ public static function unix( */ public static function tcp(string $host, array $options = []) : Socket { - return self::create($host, \array_merge($options, ['transport' => 'tcp'])); + return self::create($host, array_merge($options, ['transport' => 'tcp'])); } /** @@ -333,7 +343,7 @@ public static function tcp(string $host, array $options = []) : Socket */ public static function udp(string $host, array $options = []) : Socket { - return self::create($host, \array_merge($options, [ + return self::create($host, array_merge($options, [ 'transport' => 'udp', 'buffer_size' => 65507, ])); @@ -351,7 +361,7 @@ protected function createSocketContext() } } if ($this->options['ssl_validate_cert'] === false) { - $sslOpts = \array_merge($sslOpts, [ + $sslOpts = array_merge($sslOpts, [ 'allow_self_signed' => true, 'verify_peer' => false, 'verify_peer_name' => false, @@ -369,6 +379,6 @@ protected function createSocketContext() */ protected function setStreamOpts() : void { - \stream_set_timeout($this->socket, $this->options['timeout_read']); + stream_set_timeout($this->socket, $this->options['timeout_read']); } } diff --git a/tests/spec/FreeDSx/Socket/Queue/Asn1MessageQueueSpec.php b/tests/spec/FreeDSx/Socket/Queue/Asn1MessageQueueSpec.php index 725a084..5bfdb7b 100644 --- a/tests/spec/FreeDSx/Socket/Queue/Asn1MessageQueueSpec.php +++ b/tests/spec/FreeDSx/Socket/Queue/Asn1MessageQueueSpec.php @@ -58,4 +58,45 @@ function it_should_throw_an_exception_on_get_message_when_there_is_none($socket) $this->shouldThrow(ConnectionException::class)->duringGetMessage(); } + + function it_should_not_peek_the_socket_after_decoding_a_complete_message($socket, $encoder) + { + $socket->read()->willReturn('foo'); + $socket->read(false)->shouldNotBeCalled(); + $encoder->decode('foo')->shouldBeCalled()->willReturn(new IntegerType(100)); + $encoder->getLastPosition()->willReturn(3); + + $this->getMessage()->shouldBeLike(new Pdu(new IntegerType(100))); + } + + function it_should_yield_messages_continuously_from_the_generator($socket, $encoder) + { + $socket->read()->willReturn('foobar', 'baz'); + $encoder->decode('foobar')->willReturn(new IntegerType(1)); + $encoder->decode('bar')->willReturn(new IntegerType(2)); + $encoder->decode('baz')->willReturn(new IntegerType(3)); + $encoder->getLastPosition()->willReturn(3); + + $iter = $this->getMessages()->getWrappedObject(); + + if (!$iter instanceof \Generator) { + throw new \RuntimeException('getMessages() must return a Generator.'); + } + + $first = $iter->current(); + $iter->next(); + $second = $iter->current(); + $iter->next(); + $third = $iter->current(); + + if ($first != new Pdu(new IntegerType(1))) { + throw new \RuntimeException('First yielded message did not match.'); + } + if ($second != new Pdu(new IntegerType(2))) { + throw new \RuntimeException('Second yielded message did not match.'); + } + if ($third != new Pdu(new IntegerType(3))) { + throw new \RuntimeException('Third yielded message did not match.'); + } + } } diff --git a/tests/spec/FreeDSx/Socket/RequiresUnixTransport.php b/tests/spec/FreeDSx/Socket/RequiresUnixTransport.php new file mode 100644 index 0000000..b0550d6 --- /dev/null +++ b/tests/spec/FreeDSx/Socket/RequiresUnixTransport.php @@ -0,0 +1,24 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace spec\FreeDSx\Socket; + +use PhpSpec\Exception\Example\SkippingException; + +trait RequiresUnixTransport +{ + private function requireUnixTransport(): void + { + if (!in_array('unix', stream_get_transports(), true)) { + throw new SkippingException('The "unix" stream transport is not available on this platform.'); + } + } +} diff --git a/tests/spec/FreeDSx/Socket/SocketPoolSpec.php b/tests/spec/FreeDSx/Socket/SocketPoolSpec.php index 494727e..59516d5 100644 --- a/tests/spec/FreeDSx/Socket/SocketPoolSpec.php +++ b/tests/spec/FreeDSx/Socket/SocketPoolSpec.php @@ -11,7 +11,6 @@ namespace spec\FreeDSx\Socket; use FreeDSx\Socket\SocketPool; -use PhpSpec\Exception\Example\SkippingException; use PhpSpec\ObjectBehavior; /** @@ -19,11 +18,34 @@ */ class SocketPoolSpec extends ObjectBehavior { + use RequiresUnixTransport; + + + /** + * @var resource|null + */ + private $unixServer; + + /** + * @var string|null + */ + private $unixPath; + function let() { $this->beConstructedWith(['servers' => ['foo', 'bar']]); } + function letGo(): void + { + if (is_resource($this->unixServer)) { + fclose($this->unixServer); + } + if ($this->unixPath !== null && file_exists($this->unixPath)) { + @unlink($this->unixPath); + } + } + function it_is_initializable() { $this->shouldHaveType(SocketPool::class); @@ -31,12 +53,13 @@ function it_is_initializable() function it_should_respect_the_transport_type_when_connecting() { - if (!file_exists('/var/run/docker.sock')) { - throw new SkippingException('The /var/run/docker.sock file must exist to test unix sockets.'); - } + $this->requireUnixTransport(); + $this->unixPath = sys_get_temp_dir() . '/freedsx_socket_pool_' . uniqid('', true) . '.sock'; + $this->unixServer = stream_socket_server('unix://' . $this->unixPath); + $this->beConstructedWith([ - 'servers' => ['/var/run/docker.sock'], - 'transport' => 'unix', + 'servers' => [$this->unixPath], + 'transport' => 'unix', ]); $this->connect()->isConnected()->shouldBeEqualTo(true); diff --git a/tests/spec/FreeDSx/Socket/SocketServerSpec.php b/tests/spec/FreeDSx/Socket/SocketServerSpec.php index c2a5f6f..ef034fc 100644 --- a/tests/spec/FreeDSx/Socket/SocketServerSpec.php +++ b/tests/spec/FreeDSx/Socket/SocketServerSpec.php @@ -13,11 +13,12 @@ use FreeDSx\Socket\Exception\ConnectionException; use FreeDSx\Socket\Socket; use FreeDSx\Socket\SocketServer; -use PhpSpec\Exception\Example\SkippingException; use PhpSpec\ObjectBehavior; class SocketServerSpec extends ObjectBehavior { + use RequiresUnixTransport; + private $testSocket = ''; function let() @@ -70,9 +71,7 @@ function it_should_construct_a_udp_based_socket_server() function it_should_construct_a_unix_based_socket_server() { - if (stripos(PHP_OS, 'WIN') === 0) { - throw new SkippingException('Unix socket not available in Windows.'); - } + $this->requireUnixTransport(); $this->beConstructedThrough('bindUnix', [$this->testSocket]); diff --git a/tests/spec/FreeDSx/Socket/SocketSpec.php b/tests/spec/FreeDSx/Socket/SocketSpec.php index 5e61feb..fad4a3d 100644 --- a/tests/spec/FreeDSx/Socket/SocketSpec.php +++ b/tests/spec/FreeDSx/Socket/SocketSpec.php @@ -11,11 +11,71 @@ namespace spec\FreeDSx\Socket; use FreeDSx\Socket\Socket; -use PhpSpec\Exception\Example\SkippingException; use PhpSpec\ObjectBehavior; class SocketSpec extends ObjectBehavior { + use RequiresUnixTransport; + + + /** + * @var resource|null + */ + private $local; + + /** + * @var resource|null + */ + private $remote; + + /** + * @var resource|null + */ + private $unixServer; + + /** + * @var string|null + */ + private $unixPath; + + function letGo(): void + { + if (is_resource($this->remote)) { + fclose($this->remote); + } + if (is_resource($this->local)) { + fclose($this->local); + } + if (is_resource($this->unixServer)) { + fclose($this->unixServer); + } + if ($this->unixPath !== null && file_exists($this->unixPath)) { + @unlink($this->unixPath); + } + } + + private function createSocketPair(): void + { + $domain = DIRECTORY_SEPARATOR === '\\' + ? STREAM_PF_INET + : STREAM_PF_UNIX; + + [$this->local, $this->remote] = stream_socket_pair( + $domain, + STREAM_SOCK_STREAM, + STREAM_IPPROTO_IP + ); + } + + private function createUnixServer(): string + { + $this->requireUnixTransport(); + $this->unixPath = sys_get_temp_dir() . '/freedsx_socket_' . uniqid('', true) . '.sock'; + $this->unixServer = stream_socket_server('unix://' . $this->unixPath); + + return $this->unixPath; + } + function it_is_initializable() { $this->shouldHaveType(Socket::class); @@ -46,10 +106,9 @@ function it_should_create_a_socket() function it_should_create_a_unix_based_socket() { - if (!file_exists('/var/run/docker.sock')) { - throw new SkippingException('The /var/run/docker.sock file must exist to test unix sockets.'); - } - $this::unix('/var/run/docker.sock'); + $path = $this->createUnixServer(); + + $this::unix($path)->shouldBeAnInstanceOf(Socket::class); } function it_should_create_a_tcp_based_socket() @@ -87,13 +146,55 @@ function it_should_tell_whether_or_not_it_is_connected_for_udp() function it_should_tell_whether_it_is_connected_for_unix() { - if (!file_exists('/var/run/docker.sock')) { - throw new SkippingException('The /var/run/docker.sock file must exist to test unix sockets.'); - } - $this->beConstructedThrough('unix', ['/var/run/docker.sock']); + $path = $this->createUnixServer(); + $this->beConstructedThrough('unix', [$path]); $this->isConnected()->shouldBeEqualTo(true); $this->close(); $this->isConnected()->shouldBeEqualTo(false); } + + function it_should_return_at_most_buffer_size_bytes_per_read() + { + $this->createSocketPair(); + fwrite($this->remote, '0123456789'); + + $this->beConstructedWith($this->local, ['buffer_size' => 4]); + + $this->read()->shouldBe('0123'); + $this->read()->shouldBe('4567'); + $this->read()->shouldBe('89'); + } + + function it_should_return_false_on_a_non_blocking_read_when_no_data_is_available() + { + $this->createSocketPair(); + + $this->beConstructedWith($this->local); + + $this->read(false)->shouldBe(false); + } + + function it_should_return_false_on_a_blocking_read_when_the_peer_has_closed() + { + $this->createSocketPair(); + fclose($this->remote); + + $this->beConstructedWith($this->local); + + $this->read()->shouldBe(false); + } + + function it_should_leave_the_socket_in_blocking_mode_after_a_non_blocking_read() + { + $this->createSocketPair(); + + $this->beConstructedWith($this->local); + + $this->read(false); + + if (stream_get_meta_data($this->local)['blocked'] !== true) { + throw new \RuntimeException('Socket should be in blocking mode after a non-blocking read.'); + } + } }