Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:

services:
throttr:
image: ghcr.io/throttr/throttr:5.0.8-debug-${{ matrix.size }}-AMD64-metrics-enabled
image: ghcr.io/throttr/throttr:5.0.9-debug-${{ matrix.size }}-AMD64-metrics-enabled
ports:
- 9000:9000

Expand Down
63 changes: 49 additions & 14 deletions src/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
use Throttr\SDK\Enum\RequestType;
use Throttr\SDK\Enum\ValueSize;
use Throttr\SDK\Responses\GetResponse;
use Throttr\SDK\Responses\ListResponse;
use Throttr\SDK\Responses\QueryResponse;
use Throttr\SDK\Responses\StatusResponse;

Expand Down Expand Up @@ -113,8 +114,12 @@ public function send(array $requests): array
}

$channel = new Channel(1);
$this->queue->push([$buffer, $operations, $channel]);
$result = $channel->pop();
$this->queue->push([
"buffer" => $buffer,
"operations" => $operations,
"channel" => $channel
]);
$result = $channel->pop(60);
$channel->close();
return $result;
}
Expand All @@ -133,8 +138,11 @@ private function processQueue(): void
break;
}

$this->client->send($job[0]);
$this->pendingChannels->push([$job[1], $job[2]]);
$this->client->send($job["buffer"]);
$this->pendingChannels->push([
"operations" => $job["operations"],
"channel" => $job["channel"]
]);
}
}

Expand All @@ -145,23 +153,50 @@ private function processQueue(): void
*/
private function processResponses(): void
{
$buffer = '';
$currentResult = null;
$pendingOperations = [];
$resolvedResponses = [];

while ($this->connected) {
$result = $this->pendingChannels->pop(60);
if ($currentResult === null) {
$currentResult = $this->pendingChannels->pop(60);
$pendingOperations = $currentResult["operations"];
$resolvedResponses = [];
}

$responses = [];
$buffer .= $this->client->recv(60); // Leer más datos y acumular

$data = $this->client->recv();

foreach ($result[0] as $operation) {
/* @var RequestType $operation */
$responses[] = match ($operation) {
RequestType::INSERT, RequestType::UPDATE, RequestType::PURGE, RequestType::SET => StatusResponse::fromBytes($data, $this->size),
RequestType::QUERY => QueryResponse::fromBytes($data, $this->size),
RequestType::GET => GetResponse::fromBytes($data, $this->size),
while (!empty($pendingOperations)) {
$operation = $pendingOperations[0];
$response = match ($operation) {
RequestType::INSERT, RequestType::UPDATE, RequestType::PURGE, RequestType::SET => StatusResponse::fromBytes($buffer, $this->size),
RequestType::QUERY => QueryResponse::fromBytes($buffer, $this->size),
RequestType::GET => GetResponse::fromBytes($buffer, $this->size),
RequestType::LIST => ListResponse::fromBytes($buffer, $this->size),
};

if ($response === null) {
break;
}

array_shift($pendingOperations);

$resolvedResponses[] = $response;
}

$result[1]->push($responses);
if (empty($pendingOperations) && $currentResult !== null) {

/* @var Channel $channel */
$channel = $currentResult["channel"];

$channel->push($resolvedResponses);

$buffer = "";
$currentResult = null;
$resolvedResponses = [];
}
}
}

Expand Down
34 changes: 34 additions & 0 deletions src/Enum/KeyType.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
<?php declare(strict_types=1);

// Copyright (C) 2025 Ian Torres
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

namespace Throttr\SDK\Enum;

/**
* Key type
*/
enum KeyType: int
{
/**
* Buffer
*/
case BUFFER = 0x01;

/**
* Counter
*/
case COUNTER = 0x00;
}
7 changes: 6 additions & 1 deletion src/Enum/RequestType.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
namespace Throttr\SDK\Enum;

/**
* BaseRequest type
* Request type
*/
enum RequestType: int
{
Expand Down Expand Up @@ -51,4 +51,9 @@ enum RequestType: int
* Get
*/
case GET = 0x06;

/**
* LIST
*/
case LIST = 0x07;
}
37 changes: 37 additions & 0 deletions src/Requests/ListRequest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
<?php

namespace Throttr\SDK\Requests;

use Throttr\SDK\Enum\RequestType;
use Throttr\SDK\Enum\ValueSize;

/**
* List request
*/
class ListRequest extends BaseRequest
{
/**
* Type
*
* @var RequestType
*/
public RequestType $type = RequestType::LIST;

/**
* Constructor
*/
public function __construct()
{
}

/**
* To bytes
*
* @param ValueSize $size
* @return string
*/
public function toBytes(ValueSize $size): string
{
return pack(static::pack(ValueSize::UINT8), $this->type->value);
}
}
23 changes: 21 additions & 2 deletions src/Responses/GetResponse.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,40 @@ public function __construct(public string $data, public bool $status,public TTLT
*
* @param string $data
* @param ValueSize $size
* @return GetResponse
* @return GetResponse|null
*/
public static function fromBytes(string $data, ValueSize $size) : GetResponse {
public static function fromBytes(string $data, ValueSize $size) : GetResponse|null {
$valueSize = $size->value;
$offset = 0;

// Less than 1 byte? not enough for status.
if (strlen($data) < 1) return null;

$status = ord($data[$offset]) === 1;
$offset++;

if ($status) {
// Less than 2 bytes? not enough for ttl type.
if (strlen($data) < 2) return null;

$ttl_type = TTLType::from(ord($data[$offset]));
$offset++;

// Less than 2 + N bytes? not enough for ttl.
if (strlen($data) < 2 + $valueSize) return null;

$ttl = unpack(BaseRequest::pack($size), substr($data, $offset, $valueSize))[1];
$offset += $valueSize;

// Less than 2 + 2 * N bytes? not enough for value size.
if (strlen($data) < 2 + ($valueSize * 2)) return null;

$value_sized = unpack(BaseRequest::pack($size), substr($data, $offset, $valueSize))[1];
$offset += $valueSize;

// Less than 2 + 2 * N + O bytes? not enough for value.
if (strlen($data) < 2 + ($valueSize * 2) + $value_sized) return null;

$value = substr($data, $offset, $value_sized);
return new GetResponse($data, true, $ttl_type, $ttl, $value);
}
Expand Down
4 changes: 2 additions & 2 deletions src/Responses/IResponse.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ interface IResponse
*
* @param string $data
* @param ValueSize $size
* @return IResponse
* @return IResponse|null
*/
public static function fromBytes(string $data, ValueSize $size) : IResponse;
public static function fromBytes(string $data, ValueSize $size) : IResponse|null;
}
127 changes: 127 additions & 0 deletions src/Responses/ListResponse.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
<?php

namespace Throttr\SDK\Responses;

use Throttr\SDK\Enum\KeyType;
use Throttr\SDK\Enum\TTLType;
use Throttr\SDK\Enum\ValueSize;
use Throttr\SDK\Requests\BaseRequest;

/**
* QueryResponse
*/
class ListResponse extends Response implements IResponse {
/**
* Constructor
*
* @param string $data
* @param bool $status
* @param array $keys
*/
public function __construct(public string $data, public bool $status, public array $keys) {}

/**
* From bytes
*
* @param string $data
* @param ValueSize $size
* @return ListResponse|null
*/
public static function fromBytes(string $data, ValueSize $size) : ListResponse|null {
$valueSize = $size->value;
$offset = 0;

// Less than 1 byte? not enough for status.
if (strlen($data) < 1) return null;

$status = ord($data[$offset]) === 1;
$offset++;

if ($status) {
// Less than 1 + N bytes? not enough for quota.
if (strlen($data) < 1 + 8) return null;

$fragments = unpack(BaseRequest::pack(ValueSize::UINT64), substr($data, $offset, ValueSize::UINT64->value))[1];
$offset += ValueSize::UINT64->value;

if ($fragments === 0) return new ListResponse($data, true, []);

$keys_container = [];

for ($i = 0; $i < $fragments; ++$i) {
// Less than offset + 8 bytes? not enough for fragment index.
if (strlen($data) < $offset + ValueSize::UINT64->value) return null;

$fragment = unpack(BaseRequest::pack(ValueSize::UINT64), substr($data, $offset, ValueSize::UINT64->value))[1];
$offset += ValueSize::UINT64->value;

// Less than offset + 8 bytes? not enough for fragment keys count.
if (strlen($data) < $offset + ValueSize::UINT64->value) return null;

$number_of_keys = unpack(BaseRequest::pack(ValueSize::UINT64), substr($data, $offset, ValueSize::UINT64->value))[1];
$offset += ValueSize::UINT64->value;

$keys_in_fragment = [];

// Per key in fragment
for ($e = 0; $e < $number_of_keys; ++$e) {
// Less than offset + 1 byte? not enough for key size.
if (strlen($data) < $offset + ValueSize::UINT8->value) return null;

$key_size = unpack(BaseRequest::pack(ValueSize::UINT8), substr($data, $offset, ValueSize::UINT8->value))[1];
$offset += ValueSize::UINT8->value;

// Less than offset + 1 byte? not enough for key type.
if (strlen($data) < $offset + ValueSize::UINT8->value) return null;

$key_type = KeyType::from(unpack(BaseRequest::pack(ValueSize::UINT8), substr($data, $offset, ValueSize::UINT8->value))[1]);
$offset += ValueSize::UINT8->value;

// Less than offset + 1 byte? not enough for ttl type.
if (strlen($data) < $offset + ValueSize::UINT8->value) return null;

$ttl_type = TTLType::from(unpack(BaseRequest::pack(ValueSize::UINT8), substr($data, $offset, ValueSize::UINT8->value))[1]);
$offset += ValueSize::UINT8->value;

// Less than offset + 8 bytes? not enough for ttl.
if (strlen($data) < $offset + ValueSize::UINT64->value) return null;

$ttl = unpack(BaseRequest::pack(ValueSize::UINT64), substr($data, $offset, ValueSize::UINT64->value))[1];
$offset += ValueSize::UINT64->value;

// Less than offset + N bytes? not enough for bytes used.
if (strlen($data) < $offset + $size->value) return null;

$bytes_used = unpack(BaseRequest::pack( $size), substr($data, $offset, $size->value))[1];
$offset += $size->value;

$keys_in_fragment[] = [
"size" => $key_size,
"type" => $key_type,
"ttl_type" => $ttl_type,
"ttl" => $ttl,
"bytes_used" => $bytes_used,
];
}

$total = array_sum(array_column($keys_in_fragment, 'size'));

// Less than offset + total keys bytes? not enough for name parsing
if (strlen($data) < $offset + $total) return null;

for ($e = 0; $e < $number_of_keys; ++$e) {
$keys_in_fragment[$e]["key"] = substr($data, $offset, $keys_in_fragment[$e]["size"]);
$offset += $keys_in_fragment[$e]["size"];
unset($keys_in_fragment[$e]["size"]);
}

$keys_container = array_merge($keys_container, $keys_in_fragment);
}

return new ListResponse($data, true, $keys_container);
}

return new ListResponse($data, false, []);
}
}

Loading