diff --git a/tests/E2E/SubscribeTest.php b/tests/E2E/SubscribeTest.php index 4b86aa1..35d0ba9 100644 --- a/tests/E2E/SubscribeTest.php +++ b/tests/E2E/SubscribeTest.php @@ -4,18 +4,26 @@ namespace CrazyGoat\RabbitStream\Tests\E2E; +use CrazyGoat\RabbitStream\Client\OsirisChunkParser; use CrazyGoat\RabbitStream\Request\CreateRequestV1; +use CrazyGoat\RabbitStream\Request\CreditRequestV1; +use CrazyGoat\RabbitStream\Request\DeclarePublisherRequestV1; +use CrazyGoat\RabbitStream\Request\DeletePublisherRequestV1; use CrazyGoat\RabbitStream\Request\OpenRequestV1; use CrazyGoat\RabbitStream\Request\PeerPropertiesRequestV1; +use CrazyGoat\RabbitStream\Request\PublishRequestV1; use CrazyGoat\RabbitStream\Request\SaslAuthenticateRequestV1; use CrazyGoat\RabbitStream\Request\SaslHandshakeRequestV1; use CrazyGoat\RabbitStream\Request\SubscribeRequestV1; use CrazyGoat\RabbitStream\Request\TuneRequestV1; use CrazyGoat\RabbitStream\Response\CreateResponseV1; +use CrazyGoat\RabbitStream\Response\DeclarePublisherResponseV1; +use CrazyGoat\RabbitStream\Response\DeliverResponseV1; use CrazyGoat\RabbitStream\Response\SubscribeResponseV1; use CrazyGoat\RabbitStream\Response\TuneResponseV1; use CrazyGoat\RabbitStream\StreamConnection; use CrazyGoat\RabbitStream\VO\OffsetSpec; +use CrazyGoat\RabbitStream\VO\PublishedMessage; use PHPUnit\Framework\TestCase; class SubscribeTest extends TestCase @@ -135,4 +143,211 @@ public function testDuplicateSubscriptionIdThrows(): void $connection->close(); } + + public function testSubscribeFromSpecificOffset(): void + { + $connection = $this->connectAndOpen(); + $streamName = 'test-subscribe-offset-' . uniqid(); + + $connection->sendMessage(new CreateRequestV1($streamName)); + $createResponse = $connection->readMessage(); + $this->assertInstanceOf(CreateResponseV1::class, $createResponse); + + $connection->sendMessage(new DeclarePublisherRequestV1(1, null, $streamName)); + $declareResponse = $connection->readMessage(); + $this->assertInstanceOf(DeclarePublisherResponseV1::class, $declareResponse); + + $messages = []; + for ($i = 0; $i < 10; $i++) { + $messages[] = new PublishedMessage($i, "Message $i"); + } + + $connection->sendMessage(new PublishRequestV1(1, ...$messages)); + + $confirmedIds = []; + $connection->registerPublisher( + 1, + function (array $ids) use (&$confirmedIds): void { + $confirmedIds = array_merge($confirmedIds, $ids); + }, + function (): void { + } + ); + + $connection->readLoop(maxFrames: 1); + $this->assertCount(10, $confirmedIds, 'All 10 messages should be confirmed'); + + $receivedEntries = []; + $connection->registerSubscriber(1, function (DeliverResponseV1 $deliver) use (&$receivedEntries): void { + $chunkBytes = $deliver->getChunkBytes(); + $entries = OsirisChunkParser::parse($chunkBytes); + foreach ($entries as $entry) { + $receivedEntries[] = $entry; + } + }); + + $connection->sendMessage(new SubscribeRequestV1(1, $streamName, OffsetSpec::offset(5), 10)); + $subscribeResponse = $connection->readMessage(); + $this->assertInstanceOf(SubscribeResponseV1::class, $subscribeResponse); + + $connection->sendMessage(new CreditRequestV1(1, 10)); + $connection->readLoop(maxFrames: 1); + + $this->assertCount(5, $receivedEntries, 'Should receive exactly 5 messages (offsets 5-9)'); + + $getOffset = fn(\CrazyGoat\RabbitStream\Client\ChunkEntry $e): int => $e->getOffset(); + $offsets = array_map($getOffset, $receivedEntries); + $this->assertSame([5, 6, 7, 8, 9], $offsets, 'Should receive messages with offsets 5, 6, 7, 8, 9'); + + try { + if ($connection->isConnected()) { + $connection->sendMessage(new DeletePublisherRequestV1(1)); + $connection->readMessage(); + } + } catch (\Throwable) { + } + + $connection->close(); + } + + public function testSubscribeFromOffsetZero(): void + { + $connection = $this->connectAndOpen(); + $streamName = 'test-subscribe-offset-zero-' . uniqid(); + + $connection->sendMessage(new CreateRequestV1($streamName)); + $createResponse = $connection->readMessage(); + $this->assertInstanceOf(CreateResponseV1::class, $createResponse); + + $connection->sendMessage(new DeclarePublisherRequestV1(1, null, $streamName)); + $declareResponse = $connection->readMessage(); + $this->assertInstanceOf(DeclarePublisherResponseV1::class, $declareResponse); + + $messages = []; + for ($i = 0; $i < 10; $i++) { + $messages[] = new PublishedMessage($i, "Message $i"); + } + + $connection->sendMessage(new PublishRequestV1(1, ...$messages)); + + $confirmedIds = []; + $connection->registerPublisher( + 1, + function (array $ids) use (&$confirmedIds): void { + $confirmedIds = array_merge($confirmedIds, $ids); + }, + function (): void { + } + ); + + $connection->readLoop(maxFrames: 1); + $this->assertCount(10, $confirmedIds, 'All 10 messages should be confirmed'); + + $receivedEntries = []; + $connection->registerSubscriber(1, function (DeliverResponseV1 $deliver) use (&$receivedEntries): void { + $chunkBytes = $deliver->getChunkBytes(); + $entries = OsirisChunkParser::parse($chunkBytes); + foreach ($entries as $entry) { + $receivedEntries[] = $entry; + } + }); + + $connection->sendMessage(new SubscribeRequestV1(1, $streamName, OffsetSpec::offset(0), 10)); + $subscribeResponse = $connection->readMessage(); + $this->assertInstanceOf(SubscribeResponseV1::class, $subscribeResponse); + + $connection->sendMessage(new CreditRequestV1(1, 10)); + $connection->readLoop(maxFrames: 1); + + $this->assertCount(10, $receivedEntries, 'Should receive all 10 messages when subscribing from offset 0'); + + $getOffset = fn(\CrazyGoat\RabbitStream\Client\ChunkEntry $e): int => $e->getOffset(); + $offsets = array_map($getOffset, $receivedEntries); + $this->assertSame([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], $offsets, 'Should receive messages with offsets 0-9'); + + try { + if ($connection->isConnected()) { + $connection->sendMessage(new DeletePublisherRequestV1(1)); + $connection->readMessage(); + } + } catch (\Throwable) { + } + + $connection->close(); + } + + public function testSubscribeFromOffsetBeyondEnd(): void + { + $connection = $this->connectAndOpen(); + $streamName = 'test-subscribe-offset-beyond-' . uniqid(); + + $connection->sendMessage(new CreateRequestV1($streamName)); + $createResponse = $connection->readMessage(); + $this->assertInstanceOf(CreateResponseV1::class, $createResponse); + + $connection->sendMessage(new DeclarePublisherRequestV1(1, null, $streamName)); + $declareResponse = $connection->readMessage(); + $this->assertInstanceOf(DeclarePublisherResponseV1::class, $declareResponse); + + $messages = []; + for ($i = 0; $i < 5; $i++) { + $messages[] = new PublishedMessage($i, "Message $i"); + } + + $connection->sendMessage(new PublishRequestV1(1, ...$messages)); + + $confirmedIds = []; + $connection->registerPublisher( + 1, + function (array $ids) use (&$confirmedIds): void { + $confirmedIds = array_merge($confirmedIds, $ids); + }, + function (): void { + } + ); + + $connection->readLoop(maxFrames: 1); + $this->assertCount(5, $confirmedIds, 'All 5 messages should be confirmed'); + + $receivedEntries = []; + $connection->registerSubscriber(1, function (DeliverResponseV1 $deliver) use (&$receivedEntries): void { + $chunkBytes = $deliver->getChunkBytes(); + $entries = OsirisChunkParser::parse($chunkBytes); + foreach ($entries as $entry) { + $receivedEntries[] = $entry; + } + }); + + $connection->sendMessage(new SubscribeRequestV1(1, $streamName, OffsetSpec::offset(100), 10)); + $subscribeResponse = $connection->readMessage(); + $this->assertInstanceOf(SubscribeResponseV1::class, $subscribeResponse); + + $connection->sendMessage(new CreditRequestV1(1, 10)); + + $connection->readLoop(maxFrames: 1, timeout: 1.0); + + $this->assertCount( + 0, + $receivedEntries, + 'Should receive no messages when subscribing from offset beyond stream end' + ); + + $connection->sendMessage(new PublishRequestV1(1, new PublishedMessage(5, 'Message 5'))); + $connection->readLoop(maxFrames: 1); + + $connection->readLoop(maxFrames: 1); + + $this->assertCount(1, $receivedEntries, 'Should receive 1 message after publishing to offset 5'); + $this->assertSame(5, $receivedEntries[0]->getOffset(), 'Received message should have offset 5'); + + try { + if ($connection->isConnected()) { + $connection->sendMessage(new DeletePublisherRequestV1(1)); + $connection->readMessage(); + } + } catch (\Throwable) { + } + + $connection->close(); + } }