diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index b435bdc48..f068a23b6 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -34,6 +34,7 @@ /packages/http/ @brendt @aidan-casey /packages/http-client/ @aidan-casey /packages/icon/ @innocenzi +/packages/idempotency/ @xHeaven /packages/kv-store/ @innocenzi /packages/log/ @brendt /packages/mail/ @innocenzi diff --git a/composer.json b/composer.json index 41627bf64..51ede72ca 100644 --- a/composer.json +++ b/composer.json @@ -110,6 +110,7 @@ "tempest/http": "self.version", "tempest/http-client": "self.version", "tempest/icon": "self.version", + "tempest/idempotency": "self.version", "tempest/intl": "self.version", "tempest/kv-store": "self.version", "tempest/log": "self.version", @@ -151,6 +152,7 @@ "Tempest\\HttpClient\\": "packages/http-client/src", "Tempest\\Http\\": "packages/http/src", "Tempest\\Icon\\": "packages/icon/src", + "Tempest\\Idempotency\\": "packages/idempotency/src", "Tempest\\Intl\\": "packages/intl/src", "Tempest\\KeyValue\\": "packages/kv-store/src", "Tempest\\Log\\": "packages/log/src", @@ -220,6 +222,7 @@ "Tempest\\HttpClient\\Tests\\": "packages/http-client/tests", "Tempest\\Http\\Tests\\": "packages/http/tests", "Tempest\\Icon\\Tests\\": "packages/icon/tests", + "Tempest\\Idempotency\\Tests\\": "packages/idempotency/tests", "Tempest\\Intl\\Tests\\": "packages/intl/tests", "Tempest\\KeyValue\\Tests\\": "packages/kv-store/tests", "Tempest\\Log\\Tests\\": "packages/log/tests", diff --git a/docs/1-essentials/01-routing.md b/docs/1-essentials/01-routing.md index 968319af4..cd447d2ed 100644 --- a/docs/1-essentials/01-routing.md +++ b/docs/1-essentials/01-routing.md @@ -678,6 +678,39 @@ final readonly class Auth implements RouteDecorator } ``` +## Idempotent routes + +For operations like payment processing or order creation, retrying the same request should not produce duplicate side effects. Tempest provides the {b`Tempest\Idempotency\Attributes\Idempotent`} route decorator to handle this. Clients send an `Idempotency-Key` header; the first request executes normally and caches the response, while subsequent requests with the same key replay the cached response. + +```php app/OrderController.php +use Tempest\Router\Post; +use Tempest\Http\Response; +use Tempest\Http\GenericResponse; +use Tempest\Http\Status; +use Tempest\Idempotency\Attributes\Idempotent; + +final readonly class OrderController +{ + #[Post('/orders')] + #[Idempotent] + public function create(CreateOrderRequest $request): Response + { + $order = $this->orderService->create($request); + + return new GenericResponse( + status: Status::CREATED, + body: ['id' => $order->id], + ); + } +} +``` + +Idempotency is only supported for `POST` and `PATCH` routes. The attribute can be applied at the class level to make all routes in a controller idempotent, and accepts optional TTL parameters. Route-specific settings like key requirement and header name can be configured with the `#[IdempotentRoute]` attribute. + +:::info +Read the full [idempotency documentation](../2-features/19-idempotency.md) for details on scope resolvers, configuration, response behavior, and command bus idempotency. +::: + ## Responses All requests to a controller action expect a response to be returned to the client. This is done by returning a {b`Tempest\View\View`} or a {b`Tempest\Http\Response`} object. diff --git a/docs/2-features/10-command-bus.md b/docs/2-features/10-command-bus.md index 2a21ded5b..800eed136 100644 --- a/docs/2-features/10-command-bus.md +++ b/docs/2-features/10-command-bus.md @@ -115,6 +115,70 @@ In order to _run_ an asynchronous command, you'll have to run the `tempest comma Note that async command handling is still an early feature, and will receive many improvements over time. +## Idempotent commands + +Commands that should not be processed more than once—such as payment processing or invoice imports—can be marked with {b`Tempest\Idempotency\Attributes\Idempotent`}. The attribute can be placed on the command class or on the handler method. Duplicate dispatches with the same payload are silently skipped. + +```php +// app/ImportInvoicesCommand.php + +use Tempest\Idempotency\Attributes\Idempotent; + +#[Idempotent] +final readonly class ImportInvoicesCommand +{ + public function __construct( + public string $vendorId, + public string $month, + ) {} +} +``` + +Alternatively, the attribute can be placed on the handler method instead: + +```php +// app/ImportInvoicesHandler.php + +use Tempest\CommandBus\CommandHandler; +use Tempest\Idempotency\Attributes\Idempotent; + +final class ImportInvoicesHandler +{ + #[Idempotent] + #[CommandHandler] + public function handle(ImportInvoicesCommand $command): void { /* … */ } +} +``` + +By default, the deduplication key is derived from the command's properties. Two commands with identical property values are considered duplicates. For explicit control over the key, implement the {b`Tempest\Idempotency\HasIdempotencyKey`} interface: + +```php +// app/ProcessPaymentCommand.php + +use Tempest\Idempotency\Attributes\Idempotent; +use Tempest\Idempotency\HasIdempotencyKey; + +#[Idempotent] +final readonly class ProcessPaymentCommand implements HasIdempotencyKey +{ + public function __construct( + public string $paymentId, + public int $amount, + ) {} + + public function getIdempotencyKey(): string + { + return $this->paymentId; + } +} +``` + +When using explicit keys, the payload fingerprint is still verified. Dispatching the same key with a different payload throws {b`Tempest\Idempotency\Exceptions\IdempotencyKeyWasAlreadyUsed`}. + +:::info +Read the full [idempotency documentation](./19-idempotency.md) for details on configuration, TTL overrides, custom stores, and HTTP route idempotency. +::: + ## Command bus middleware Whenever commands are dispatched, they are passed to the command bus, which will pass the command along to each of its handlers. Similar to web requests and console commands, this command bus supports middleware. Command bus middleware can be used to, for example, do logging for specific commands, add metadata to commands, or anything else. Command bus middleware are classes that implement the `CommandBusMiddleware` interface, and look like this: diff --git a/docs/2-features/19-idempotency.md b/docs/2-features/19-idempotency.md new file mode 100644 index 000000000..faf92de0a --- /dev/null +++ b/docs/2-features/19-idempotency.md @@ -0,0 +1,287 @@ +--- +title: Idempotency +description: "Prevent duplicate side effects for HTTP routes and command bus commands by storing and replaying the result of the first execution." +--- + +## Overview + +Payment processing, order creation, resource provisioning - any operation where retrying the same request should not produce duplicate side effects. Timeouts, client retries, and accidental double clicks all cause the same problem: the server cannot distinguish a retry from a new request. + +The `tempest/idempotency` package solves this by storing the result of the first execution and replaying it for subsequent requests with the same idempotency key. It supports both [HTTP routes](#idempotent-routes) and [command bus commands](#idempotent-commands). + +## Idempotent routes + +Add the {b`Tempest\Idempotency\Attributes\Idempotent`} attribute to a controller method. Clients send an `Idempotency-Key` header with a unique value (typically a UUID). The first request executes normally and caches the response. Subsequent requests with the same key replay the cached response without re-executing the handler. + +```php app/OrderController.php +use Tempest\Router\Post; +use Tempest\Http\Response; +use Tempest\Http\GenericResponse; +use Tempest\Http\Status; +use Tempest\Idempotency\Attributes\Idempotent; + +final readonly class OrderController +{ + #[Post('/orders')] + #[Idempotent] + public function create(CreateOrderRequest $request): Response + { + $order = $this->orderService->create($request); + + return new GenericResponse( + status: Status::CREATED, + body: ['id' => $order->id], + ); + } +} +``` + +The client must include the idempotency key as a header: + +``` +POST /orders HTTP/1.1 +Idempotency-Key: 550e8400-e29b-41d4-a716-446655440000 +Content-Type: application/json + +{"product": "widget", "quantity": 3} +``` + +When a cached response is replayed, the response includes an `idempotency-replayed: true` header so the client can distinguish replays from original executions. + +### Supported methods + +Idempotency is only supported for `POST` and `PATCH` routes. Applying `#[Idempotent]` to a `GET`, `PUT`, `DELETE`, or other method will throw an {b`Tempest\Idempotency\Exceptions\IdempotencyMethodWasNotSupported`} exception. `GET` is inherently idempotent, `PUT` and `DELETE` are idempotent by definition in HTTP semantics, and only `POST` and `PATCH` produce non-idempotent side effects. + +### Scope resolver + +Idempotency keys must be scoped per user or client to prevent key collisions across different actors. This is done by implementing the {b`Tempest\Idempotency\IdempotencyScopeResolver`} interface and registering it in the container. + +The `resolve()` method receives the current request and must return a string that uniquely identifies the caller - such as a user ID, session ID, or API key: + +```php app/UserIdempotencyScopeResolver.php +use Tempest\Http\Request; +use Tempest\Idempotency\IdempotencyScopeResolver; + +final readonly class UserIdempotencyScopeResolver implements IdempotencyScopeResolver +{ + public function __construct( + private AuthManager $auth, + ) {} + + public function resolve(Request $request): string + { + return (string) $this->auth->currentUser()->id; + } +} +``` + +:::warning +A scope resolver is required. If no implementation of {b`Tempest\Idempotency\IdempotencyScopeResolver`} is registered in the container, the middleware will fail at construction time. +::: + +### Per-route overrides + +The `#[Idempotent]` attribute accepts optional TTL parameters to override the global configuration on a per-route basis. For route-specific settings like key requirement and header name, use the {b`Tempest\Idempotency\Attributes\IdempotentRoute`} attribute alongside `#[Idempotent]`: + +```php app/PaymentController.php +use Tempest\Router\Post; +use Tempest\Http\Response; +use Tempest\Idempotency\Attributes\Idempotent; +use Tempest\Idempotency\Attributes\IdempotentRoute; + +final readonly class PaymentController +{ + #[Post('/payments')] + #[Idempotent(ttlInSeconds: 172_800)] + #[IdempotentRoute(requireKey: true)] + public function charge(ChargeRequest $request): Response + { + // Cached response persists for 48 hours instead of the default 24 + } +} +``` + +#### `#[Idempotent]` parameters + +| Parameter | Type | Description | +|---|---|---| +| `ttlInSeconds` | `?int` | How long a completed response is cached. Defaults to the config value (86400 / 24 hours). | +| `pendingTtlInSeconds` | `?int` | How long a pending (in-progress) record is considered active. Defaults to the config value (60 seconds). | + +#### `#[IdempotentRoute]` parameters + +| Parameter | Type | Description | +|---|---|---| +| `requireKey` | `?bool` | Whether requests without the idempotency key header should be rejected with a 400 response. Defaults to `true`. | +| `header` | `?string` | The header name to read the idempotency key from. Defaults to `Idempotency-Key`. | + +When `requireKey` is set to `false`, requests without the header bypass idempotency protection entirely and execute normally. + +### Class-level application + +The `#[Idempotent]` attribute can be applied at the class level to make all routes in a controller idempotent: + +```php app/ApiOrderController.php +use Tempest\Router\Post; +use Tempest\Router\Patch; +use Tempest\Http\Response; +use Tempest\Idempotency\Attributes\Idempotent; + +#[Idempotent] +final readonly class ApiOrderController +{ + #[Post('/api/orders')] + public function create(CreateOrderRequest $request): Response { /* … */ } + + #[Patch('/api/orders/{id}')] + public function update(int $id, UpdateOrderRequest $request): Response { /* … */ } +} +``` + +### Response behavior + +The middleware produces different responses depending on the state of the idempotency key: + +| Scenario | Status | Description | +|---|---|---| +| No existing record | - | The request executes normally and the response is cached. | +| Completed record, same payload | Original status | The cached response is replayed with an `idempotency-replayed: true` header. | +| Completed record, different payload | 422 | The key was already used with a different request body. | +| Pending record (in progress) | 409 | Another request with the same key is currently being processed. A `retry-after: 1` header is included. | +| Missing key (when required) | 400 | The `Idempotency-Key` header was not provided. | + +### How it works + +The `#[Idempotent]` attribute is a [route decorator](../1-essentials/01-routing.md#route-decorators) that adds {b`Tempest\Idempotency\Middleware\IdempotencyMiddleware`} to the route's middleware stack. The middleware: + +1. Reads the idempotency key from the request header. +2. Computes a fingerprint of the request (method, URI, body, and query parameters). +3. Acquires a cache lock to prevent concurrent processing of the same key. +4. Checks for an existing record in the idempotency store. +5. If no record exists, saves a pending record, executes the handler, and stores the completed response. +6. If the handler throws an exception, the pending record is deleted so the request can be retried. + +A heartbeat mechanism keeps pending records alive during long-running requests, preventing other processes from incorrectly taking over an operation that is still in progress. + +## Idempotent commands + +Add the {b`Tempest\Idempotency\Attributes\Idempotent`} attribute to prevent duplicate dispatches. When the same command is dispatched more than once, the duplicate is silently skipped. The attribute can be placed on the command class or on the handler method. + +On the command class: + +```php app/ImportInvoicesCommand.php +use Tempest\Idempotency\Attributes\Idempotent; + +#[Idempotent] +final readonly class ImportInvoicesCommand +{ + public function __construct( + public string $vendorId, + public string $month, + ) {} +} +``` + +Or on the handler method: + +```php app/ImportInvoicesHandler.php +use Tempest\CommandBus\CommandHandler; +use Tempest\Idempotency\Attributes\Idempotent; + +final class ImportInvoicesHandler +{ + #[Idempotent] + #[CommandHandler] + public function handleImportInvoices(ImportInvoicesCommand $command): void + { + // Only executes once per unique command payload. + // Duplicate dispatches are silently skipped. + } +} +``` + +When placed on both the command class and the handler, the command class takes precedence. + +By default, the idempotency key is derived from a fingerprint of the command's properties. Two commands with identical property values produce the same fingerprint and are considered duplicates. + +### Explicit idempotency keys + +Commands can provide an explicit key by implementing the {b`Tempest\Idempotency\HasIdempotencyKey`} interface. This is useful when the deduplication key should be a specific business identifier rather than the full payload: + +```php app/ProcessPaymentCommand.php +use Tempest\Idempotency\Attributes\Idempotent; +use Tempest\Idempotency\HasIdempotencyKey; + +#[Idempotent] +final readonly class ProcessPaymentCommand implements HasIdempotencyKey +{ + public function __construct( + public string $paymentId, + public int $amount, + ) {} + + public function getIdempotencyKey(): string + { + return $this->paymentId; + } +} +``` + +When using explicit keys, the fingerprint of the command payload is still verified. If the same key is dispatched with a different payload, an {b`Tempest\Idempotency\Exceptions\IdempotencyKeyWasAlreadyUsed`} exception is thrown. + +### Per-command TTL overrides + +The `#[Idempotent]` attribute accepts the same optional TTL parameters for commands as it does for routes: + +```php +#[Idempotent(ttlInSeconds: 3600, pendingTtlInSeconds: 30)] +final readonly class ProcessPaymentCommand { /* … */ } +``` + +| Parameter | Type | Description | +|---|---|---| +| `ttlInSeconds` | `?int` | How long the completed record is cached. Defaults to the config value (86400 / 24 hours). | +| `pendingTtlInSeconds` | `?int` | How long a pending record is considered active. Defaults to the config value (60 seconds). | + +## Configuration + +The idempotency package is configured by creating an `idempotency.config.php` file. All settings have sensible defaults: + +```php app/idempotency.config.php +use Tempest\Idempotency\Config\IdempotencyConfig; + +return new IdempotencyConfig( + header: 'Idempotency-Key', + requireKey: true, + ttlInSeconds: 86_400, + pendingTtlInSeconds: 60, + cachePrefix: 'idempotency', +); +``` + +| Parameter | Default | Description | +|---|---|---| +| `header` | `Idempotency-Key` | The HTTP header name to read the idempotency key from. | +| `requireKey` | `true` | Whether to reject requests that do not include the idempotency key header. | +| `ttlInSeconds` | `86400` (24h) | How long a completed response is cached. | +| `pendingTtlInSeconds` | `60` | How long a pending record is considered active before it can be taken over. | +| `cachePrefix` | `idempotency` | Prefix for cache keys in the idempotency store. | +| `storeClass` | `CacheIdempotencyStore` | The {b`Tempest\Idempotency\Store\IdempotencyStore`} implementation to use. | + +### Custom stores + +The default store uses Tempest's [cache](./06-cache.md) component. A custom store can be created by implementing the {b`Tempest\Idempotency\Store\IdempotencyStore`} interface and setting the `storeClass` in the configuration: + +```php app/idempotency.config.php +use Tempest\Idempotency\Config\IdempotencyConfig; +use App\RedisIdempotencyStore; + +return new IdempotencyConfig( + storeClass: RedisIdempotencyStore::class, +); +``` + +## Limitations + +- **Windows is not supported.** The heartbeat mechanism relies on `pcntl_alarm` and `pcntl_signal`, which are not available on Windows. Attempting to use idempotency on Windows will throw an {b`Tempest\Idempotency\Exceptions\IdempotencyPlatformWasNotSupported`} exception. +- **Stored responses must be serializable.** Response bodies are stored using PHP serialization or JSON encoding. Non-serializable bodies (such as generators or views) are stored as type name strings and will not reproduce the original output on replay. diff --git a/packages/idempotency/.gitattributes b/packages/idempotency/.gitattributes new file mode 100644 index 000000000..3f7775660 --- /dev/null +++ b/packages/idempotency/.gitattributes @@ -0,0 +1,14 @@ +# Exclude build/test files from the release +.github/ export-ignore +tests/ export-ignore +.gitattributes export-ignore +.gitignore export-ignore +phpunit.xml export-ignore +README.md export-ignore + +# Configure diff output +*.view.php diff=html +*.php diff=php +*.css diff=css +*.html diff=html +*.md diff=markdown diff --git a/packages/idempotency/LICENSE.md b/packages/idempotency/LICENSE.md new file mode 100644 index 000000000..54215b726 --- /dev/null +++ b/packages/idempotency/LICENSE.md @@ -0,0 +1,9 @@ +The MIT License (MIT) + +Copyright (c) 2024 Brent Roose brendt@stitcher.io + +Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/packages/idempotency/composer.json b/packages/idempotency/composer.json new file mode 100644 index 000000000..2d3159398 --- /dev/null +++ b/packages/idempotency/composer.json @@ -0,0 +1,28 @@ +{ + "name": "tempest/idempotency", + "description": "Idempotency support for Tempest applications.", + "type": "library", + "require": { + "php": "^8.5", + "tempest/cache": "3.x-dev", + "tempest/container": "3.x-dev", + "tempest/datetime": "3.x-dev", + "tempest/http": "3.x-dev", + "tempest/router": "3.x-dev", + "tempest/support": "3.x-dev", + "tempest/command-bus": "3.x-dev", + "tempest/core": "3.x-dev" + }, + "license": "MIT", + "autoload": { + "psr-4": { + "Tempest\\Idempotency\\": "src/" + } + }, + "autoload-dev": { + "psr-4": { + "Tempest\\Idempotency\\Tests\\": "tests" + } + }, + "minimum-stability": "dev" +} diff --git a/packages/idempotency/phpunit.xml b/packages/idempotency/phpunit.xml new file mode 100644 index 000000000..e92d239e0 --- /dev/null +++ b/packages/idempotency/phpunit.xml @@ -0,0 +1,23 @@ + + + + + tests + + + + + src + + + diff --git a/packages/idempotency/src/Attributes/Idempotent.php b/packages/idempotency/src/Attributes/Idempotent.php new file mode 100644 index 000000000..73c302f63 --- /dev/null +++ b/packages/idempotency/src/Attributes/Idempotent.php @@ -0,0 +1,33 @@ +middleware, true)) { + return $route; + } + + $route->middleware = [ + ...$route->middleware, + IdempotencyMiddleware::class, + ]; + + return $route; + } +} diff --git a/packages/idempotency/src/Attributes/IdempotentRoute.php b/packages/idempotency/src/Attributes/IdempotentRoute.php new file mode 100644 index 000000000..58746a524 --- /dev/null +++ b/packages/idempotency/src/Attributes/IdempotentRoute.php @@ -0,0 +1,16 @@ + */ + public string $storeClass = CacheIdempotencyStore::class, + ) {} +} diff --git a/packages/idempotency/src/Config/idempotency.config.php b/packages/idempotency/src/Config/idempotency.config.php new file mode 100644 index 000000000..d93f034d4 --- /dev/null +++ b/packages/idempotency/src/Config/idempotency.config.php @@ -0,0 +1,7 @@ +value)); + } +} diff --git a/packages/idempotency/src/Exceptions/IdempotencyPlatformWasNotSupported.php b/packages/idempotency/src/Exceptions/IdempotencyPlatformWasNotSupported.php new file mode 100644 index 000000000..86feff930 --- /dev/null +++ b/packages/idempotency/src/Exceptions/IdempotencyPlatformWasNotSupported.php @@ -0,0 +1,13 @@ +normalize($command, $seen))); + } + + private function normalize(mixed $value, array &$seen): mixed + { + if ($value instanceof UnitEnum) { + return $value::class . '::' . $value->name; + } + + if (is_array($value)) { + if (array_is_list($value)) { + foreach ($value as $index => $item) { + $value[$index] = $this->normalize($item, $seen); + } + + return $value; + } + + ksort($value); + + foreach ($value as $key => $item) { + $value[$key] = $this->normalize($item, $seen); + } + + return $value; + } + + if (is_object($value)) { + $objectId = spl_object_id($value); + + if (isset($seen[$objectId])) { + throw new RuntimeException('Circular reference detected while generating command fingerprint.'); + } + + $seen[$objectId] = true; + $normalized = ['@class' => $value::class]; + + foreach ((array) $value as $property => $item) { + $normalized[$this->normalizePropertyName($property)] = $this->normalize($item, $seen); + } + + unset($seen[$objectId]); + + ksort($normalized); + + return $normalized; + } + + if (is_resource($value)) { + return get_resource_type($value); + } + + return $value; + } + + private function normalizePropertyName(string $property): string + { + if (! str_contains($property, "\0")) { + return $property; + } + + $chunks = explode("\0", $property); + + return $chunks[count($chunks) - 1]; + } +} diff --git a/packages/idempotency/src/Fingerprint/RequestFingerprintGenerator.php b/packages/idempotency/src/Fingerprint/RequestFingerprintGenerator.php new file mode 100644 index 000000000..6267df8e7 --- /dev/null +++ b/packages/idempotency/src/Fingerprint/RequestFingerprintGenerator.php @@ -0,0 +1,43 @@ + $request->method->value, + 'path' => $request->path, + 'query' => $this->normalize($request->query), + 'body' => $request->raw ?? $this->normalize($request->body), + ]; + + return hash('sha256', encode($payload)); + } + + private function normalize(mixed $value): mixed + { + if (! is_array($value)) { + return $value; + } + + if (array_is_list($value)) { + return array_map($this->normalize(...), $value); + } + + ksort($value); + + foreach ($value as $key => $item) { + $value[$key] = $this->normalize($item); + } + + return $value; + } +} diff --git a/packages/idempotency/src/HasIdempotencyKey.php b/packages/idempotency/src/HasIdempotencyKey.php new file mode 100644 index 000000000..7115a2767 --- /dev/null +++ b/packages/idempotency/src/HasIdempotencyKey.php @@ -0,0 +1,10 @@ +get(ObjectFingerprintGenerator::class); + } +} diff --git a/packages/idempotency/src/Initializers/IdempotencyFingerprintGeneratorInitializer.php b/packages/idempotency/src/Initializers/IdempotencyFingerprintGeneratorInitializer.php new file mode 100644 index 000000000..1cef8270d --- /dev/null +++ b/packages/idempotency/src/Initializers/IdempotencyFingerprintGeneratorInitializer.php @@ -0,0 +1,20 @@ +get(RequestFingerprintGenerator::class); + } +} diff --git a/packages/idempotency/src/Initializers/IdempotencyInitializer.php b/packages/idempotency/src/Initializers/IdempotencyInitializer.php new file mode 100644 index 000000000..4246b1e34 --- /dev/null +++ b/packages/idempotency/src/Initializers/IdempotencyInitializer.php @@ -0,0 +1,22 @@ +get(IdempotencyConfig::class)->storeClass; + + return $container->get($storeClass); + } +} diff --git a/packages/idempotency/src/Middleware/IdempotencyMiddleware.php b/packages/idempotency/src/Middleware/IdempotencyMiddleware.php new file mode 100644 index 000000000..5127a9601 --- /dev/null +++ b/packages/idempotency/src/Middleware/IdempotencyMiddleware.php @@ -0,0 +1,238 @@ +method)) { + throw IdempotencyMethodWasNotSupported::forMethod($request->method); + } + + $options = $this->resolveOptions(); + $key = trim($request->headers->get($options['header'], default: '') ?? ''); + + if ($key === '') { + if (! $options['requireKey']) { + return $next($request); + } + + return new GenericResponse( + status: Status::BAD_REQUEST, + body: ['error' => 'Missing idempotency key.'], + ); + } + + $scope = $this->resolveScope($request); + $fingerprint = $this->fingerprintGenerator->generate($request); + $pendingTtlInSeconds = $options['pendingTtlInSeconds']; + $pendingRecordTtlInSeconds = max($options['ttlInSeconds'], $pendingTtlInSeconds); + $owner = $this->processingOwner->create(); + + $lock = $this->cache->lock( + key: $this->keyResolver->lockKey($scope, $key), + duration: Duration::seconds($pendingTtlInSeconds), + owner: $owner, + ); + + if (! $lock->acquire()) { + return $this->inProgressResponse(); + } + + $pendingRecordCreated = false; + + try { + if (($existingAfterLock = $this->store->find($scope, $key)) instanceof IdempotencyRecord) { + if (! $this->shouldTakeOverPendingRecord($existingAfterLock, $fingerprint, $pendingTtlInSeconds)) { + return $this->resolveExistingRecord($existingAfterLock, $fingerprint); + } + + $this->store->delete($scope, $key); + } + + $this->store->savePending( + scope: $scope, + key: $key, + fingerprint: $fingerprint, + ttlInSeconds: $pendingRecordTtlInSeconds, + pendingOwner: $owner, + pendingHeartbeatAt: time(), + ); + + $pendingRecordCreated = true; + + $heartbeat = new HeartbeatRenewer(); + $heartbeat->start( + store: $this->store, + scope: $scope, + key: $key, + owner: $owner, + intervalInSeconds: max(1, intdiv($pendingTtlInSeconds, 3)), + recordTtlInSeconds: $pendingRecordTtlInSeconds, + ); + + try { + $response = $next($request); + } finally { + $heartbeat->stop(); + } + + $this->store->saveCompleted( + scope: $scope, + key: $key, + fingerprint: $fingerprint, + response: StoredResponse::fromResponse($response), + ttlInSeconds: $options['ttlInSeconds'], + ); + + return $response; + } catch (Throwable $throwable) { + if ($pendingRecordCreated) { + $this->store->delete($scope, $key); + } + + throw $throwable; + } finally { + $lock->release(); + } + } + + private function resolveExistingRecord(IdempotencyRecord $record, string $fingerprint): Response + { + if ($record->fingerprint !== $fingerprint) { + return new GenericResponse( + status: Status::UNPROCESSABLE_CONTENT, + body: ['error' => 'The idempotency key has already been used for another payload.'], + ); + } + + if ($record->state === IdempotencyState::PENDING) { + return $this->inProgressResponse(); + } + + $response = $record->response?->toResponse() ?? new GenericResponse(status: Status::NO_CONTENT); + + return $response->addHeader('idempotency-replayed', 'true'); + } + + private function shouldTakeOverPendingRecord(IdempotencyRecord $record, string $fingerprint, int $pendingTtlInSeconds): bool + { + if ($record->fingerprint !== $fingerprint) { + return false; + } + + if ($record->state !== IdempotencyState::PENDING) { + return false; + } + + return ( + $this->processingOwner->resolveLiveness( + owner: $record->pendingOwner, + heartbeatAt: $record->pendingHeartbeatAt, + staleAfterInSeconds: $pendingTtlInSeconds, + ) === ProcessingOwnerLiveness::DEAD + ); + } + + private function inProgressResponse(): Response + { + return new GenericResponse( + status: Status::CONFLICT, + body: ['error' => 'Request is already being processed for this idempotency key.'], + )->addHeader('retry-after', '1'); + } + + /** @return array{header: string, requireKey: bool, ttlInSeconds: int, pendingTtlInSeconds: int} */ + private function resolveOptions(): array + { + $idempotent = $this->resolveAttribute(Idempotent::class); + $route = $this->resolveAttribute(IdempotentRoute::class); + + return [ + 'header' => $route->header ?? $this->config->header, + 'requireKey' => $route->requireKey ?? $this->config->requireKey, + 'ttlInSeconds' => $idempotent->ttlInSeconds ?? $this->config->ttlInSeconds, + 'pendingTtlInSeconds' => $idempotent->pendingTtlInSeconds ?? $this->config->pendingTtlInSeconds, + ]; + } + + /** + * @template T of object + * @param class-string $attributeClass + * @return T|null + */ + private function resolveAttribute(string $attributeClass): ?object + { + $attribute = $this->matchedRoute + ->route + ->handler + ->getAttribute($attributeClass); + + return $attribute ?? $this->matchedRoute + ->route + ->handler + ->getDeclaringClass() + ->getAttribute($attributeClass); + } + + private function resolveScope(Request $request): string + { + $handler = $this->matchedRoute->route->handler; + + return sprintf( + '%s::%s:%s:%s:%s', + $handler->getDeclaringClass()->getName(), + $handler->getName(), + $request->method->value, + $this->matchedRoute->route->uri, + $this->scopeResolver->resolve($request), + ); + } +} diff --git a/packages/idempotency/src/Middleware/IdempotentCommandMiddleware.php b/packages/idempotency/src/Middleware/IdempotentCommandMiddleware.php new file mode 100644 index 000000000..a227cb4b5 --- /dev/null +++ b/packages/idempotency/src/Middleware/IdempotentCommandMiddleware.php @@ -0,0 +1,186 @@ +resolveOptions($command); + + if ($options === null) { + $next($command); + + return; + } + + if (PHP_OS_FAMILY === 'Windows') { + throw IdempotencyPlatformWasNotSupported::forWindows(); + } + + $scope = $this->resolveScope($command); + $fingerprint = $this->fingerprintGenerator->generate($command); + $key = $this->resolveKey($command, $fingerprint); + $pendingTtlInSeconds = $options['pendingTtlInSeconds']; + $pendingRecordTtlInSeconds = max($options['ttlInSeconds'], $pendingTtlInSeconds); + $owner = $this->processingOwner->create(); + + $lock = $this->cache->lock( + key: $this->keyResolver->lockKey($scope, $key), + duration: Duration::seconds($pendingTtlInSeconds), + owner: $owner, + ); + + $lock->execute( + function () use ($scope, $key, $fingerprint, $options, $pendingRecordTtlInSeconds, $owner, $next, $command): void { + $record = $this->store->find($scope, $key); + + if ($record instanceof IdempotencyRecord) { + if (! $this->shouldTakeOverPendingRecord($record, $fingerprint, $scope, $key, $options['pendingTtlInSeconds'])) { + return; + } + + $this->store->delete($scope, $key); + } + + $this->store->savePending( + scope: $scope, + key: $key, + fingerprint: $fingerprint, + ttlInSeconds: $pendingRecordTtlInSeconds, + pendingOwner: $owner, + pendingHeartbeatAt: time(), + ); + + $heartbeat = new HeartbeatRenewer(); + $heartbeat->start( + store: $this->store, + scope: $scope, + key: $key, + owner: $owner, + intervalInSeconds: max(1, intdiv($options['pendingTtlInSeconds'], 3)), + recordTtlInSeconds: $pendingRecordTtlInSeconds, + ); + + try { + $next($command); + + $this->store->saveCompleted( + scope: $scope, + key: $key, + fingerprint: $fingerprint, + response: null, + ttlInSeconds: $options['ttlInSeconds'], + ); + } catch (Throwable $throwable) { + $this->store->delete($scope, $key); + + throw $throwable; + } finally { + $heartbeat->stop(); + } + }, + wait: Duration::seconds($pendingTtlInSeconds), + ); + } + + private function assertRecordMatches(IdempotencyRecord $record, string $fingerprint, string $scope, string $key): void + { + if ($record->fingerprint !== $fingerprint) { + throw IdempotencyKeyWasAlreadyUsed::forScope($scope, $key); + } + } + + private function resolveKey(object $command, string $fingerprint): string + { + if (! $command instanceof HasIdempotencyKey) { + return $fingerprint; + } + + $key = trim($command->getIdempotencyKey()); + + if ($key === '') { + return $fingerprint; + } + + return $key; + } + + /** @return null|array{ttlInSeconds: int, pendingTtlInSeconds: int} */ + private function resolveOptions(object $command): ?array + { + $instance = new ClassReflector($command)->getAttribute(Idempotent::class); + + if ($instance === null) { + $instance = ($this->commandBusConfig->handlers[$command::class] ?? null) + ?->handler + ->getAttribute(Idempotent::class); + } + + if ($instance === null) { + return null; + } + + return [ + 'ttlInSeconds' => $instance->ttlInSeconds ?? $this->config->ttlInSeconds, + 'pendingTtlInSeconds' => $instance->pendingTtlInSeconds ?? $this->config->pendingTtlInSeconds, + ]; + } + + private function shouldTakeOverPendingRecord(IdempotencyRecord $record, string $fingerprint, string $scope, string $key, int $pendingTtlInSeconds): bool + { + $this->assertRecordMatches($record, $fingerprint, $scope, $key); + + if ($record->state !== IdempotencyState::PENDING) { + return false; + } + + return ( + $this->processingOwner->resolveLiveness( + owner: $record->pendingOwner, + heartbeatAt: $record->pendingHeartbeatAt, + staleAfterInSeconds: $pendingTtlInSeconds, + ) === ProcessingOwnerLiveness::DEAD + ); + } + + private function resolveScope(object $command): string + { + return 'command:' . $command::class; + } +} diff --git a/packages/idempotency/src/Store/CacheIdempotencyStore.php b/packages/idempotency/src/Store/CacheIdempotencyStore.php new file mode 100644 index 000000000..cbcad5b9d --- /dev/null +++ b/packages/idempotency/src/Store/CacheIdempotencyStore.php @@ -0,0 +1,81 @@ +cache->get($this->keyResolver->recordKey($scope, $key)); + + if (! $record instanceof IdempotencyRecord) { + return null; + } + + return $record; + } + + public function savePending(string $scope, string $key, string $fingerprint, int $ttlInSeconds, ?string $pendingOwner = null, ?int $pendingHeartbeatAt = null): void + { + $this->cache->put( + key: $this->keyResolver->recordKey($scope, $key), + value: new IdempotencyRecord( + fingerprint: $fingerprint, + state: IdempotencyState::PENDING, + pendingOwner: $pendingOwner, + pendingHeartbeatAt: $pendingHeartbeatAt, + ), + expiration: Duration::seconds($ttlInSeconds), + ); + } + + public function updateHeartbeat(string $scope, string $key, string $owner, int $heartbeatAt, int $ttlInSeconds): void + { + $record = $this->find($scope, $key); + + if ($record === null || $record->state !== IdempotencyState::PENDING || $record->pendingOwner !== $owner) { + return; + } + + $this->cache->put( + key: $this->keyResolver->recordKey($scope, $key), + value: new IdempotencyRecord( + fingerprint: $record->fingerprint, + state: $record->state, + response: $record->response, + pendingOwner: $record->pendingOwner, + pendingHeartbeatAt: $heartbeatAt, + ), + expiration: Duration::seconds($ttlInSeconds), + ); + } + + public function saveCompleted(string $scope, string $key, string $fingerprint, ?StoredResponse $response, int $ttlInSeconds): void + { + $this->cache->put( + key: $this->keyResolver->recordKey($scope, $key), + value: new IdempotencyRecord( + fingerprint: $fingerprint, + state: IdempotencyState::COMPLETED, + response: $response, + ), + expiration: Duration::seconds($ttlInSeconds), + ); + } + + public function delete(string $scope, string $key): void + { + $this->cache->remove($this->keyResolver->recordKey($scope, $key)); + } +} diff --git a/packages/idempotency/src/Store/IdempotencyRecord.php b/packages/idempotency/src/Store/IdempotencyRecord.php new file mode 100644 index 000000000..246cf7d56 --- /dev/null +++ b/packages/idempotency/src/Store/IdempotencyRecord.php @@ -0,0 +1,16 @@ +> */ + public array $headers, + public ?string $serializedBody, + ) {} + + public static function fromResponse(Response $response): self + { + $headers = []; + + foreach ($response->headers as $header) { + $headers[$header->name] = array_map( + callback: static fn (mixed $value): string => (string) $value, + array: $header->values, + ); + } + + return new self( + statusCode: $response->status->value, + headers: $headers, + serializedBody: self::serializeBody($response->body), + ); + } + + public function toResponse(): Response + { + return new GenericResponse( + status: Status::fromCode($this->statusCode), + body: $this->unserializeBody(), + headers: $this->headers, + ); + } + + private static function serializeBody(mixed $body): ?string + { + if ($body === null) { + return null; + } + + if ($body instanceof JsonSerializable) { + $encodedBody = self::tryJsonEncode($body); + + if ($encodedBody !== null) { + return 'json:' . base64_encode($encodedBody); + } + } + + if ($body instanceof Generator || $body instanceof View) { + return 'string:' . base64_encode(get_debug_type($body)); + } + + try { + return 'php:' . base64_encode(serialize($body)); + } catch (Throwable) { + $encodedBody = self::tryJsonEncode($body); + + if ($encodedBody !== null) { + return 'json:' . base64_encode($encodedBody); + } + + return 'string:' . base64_encode(get_debug_type($body)); + } + } + + private function unserializeBody(): mixed + { + if ($this->serializedBody === null) { + return null; + } + + if (str_starts_with($this->serializedBody, 'php:')) { + try { + $decoded = base64_decode(substr($this->serializedBody, 4), true); + + if ($decoded === false) { + return null; + } + + return $this->normalizeBody(unserialize($decoded, ['allowed_classes' => false])); + } catch (Throwable) { + return null; + } + } + + if (str_starts_with($this->serializedBody, 'json:')) { + try { + $decoded = base64_decode(substr($this->serializedBody, 5), true); + + if ($decoded === false) { + return null; + } + + return $this->normalizeBody(decode($decoded)); + } catch (Throwable) { + return null; + } + } + + if (str_starts_with($this->serializedBody, 'string:')) { + $decoded = base64_decode(substr($this->serializedBody, 7), true); + + return $decoded === false ? null : $decoded; + } + + try { + return $this->normalizeBody(unserialize($this->serializedBody, ['allowed_classes' => false])); + } catch (Throwable) { + return null; + } + } + + private function normalizeBody(mixed $body): Generator|View|string|array|JsonSerializable|null + { + if ($body === null || is_string($body) || is_array($body) || $body instanceof Generator || $body instanceof View || $body instanceof JsonSerializable) { + return $body; + } + + if (is_bool($body)) { + return $body ? 'true' : 'false'; + } + + if (is_int($body) || is_float($body)) { + return (string) $body; + } + + return get_debug_type($body); + } + + private static function tryJsonEncode(mixed $body): ?string + { + try { + return encode($body); + } catch (Throwable) { + return null; + } + } +} diff --git a/packages/idempotency/src/Support/HeartbeatRenewer.php b/packages/idempotency/src/Support/HeartbeatRenewer.php new file mode 100644 index 000000000..a1a67d378 --- /dev/null +++ b/packages/idempotency/src/Support/HeartbeatRenewer.php @@ -0,0 +1,81 @@ +active || ! self::supported()) { + return; + } + + $this->previousAsyncSignals = pcntl_async_signals(true); + $this->previousHandler = pcntl_signal_get_handler(SIGALRM); + + pcntl_signal(SIGALRM, static function () use ($store, $scope, $key, $owner, $intervalInSeconds, $recordTtlInSeconds): void { + try { + $store->updateHeartbeat($scope, $key, $owner, time(), $recordTtlInSeconds); + } catch (Throwable) { // @mago-expect lint:no-empty-catch-clause + } + + pcntl_alarm($intervalInSeconds); + }); + + $this->previousAlarmRemaining = pcntl_alarm($intervalInSeconds); + $this->active = true; + } + + /** + * Stops the recurring heartbeat alarm and restores any previous signal configuration. + */ + public function stop(): void + { + if (! $this->active) { + return; + } + + pcntl_alarm(0); + pcntl_signal(SIGALRM, $this->previousHandler); + + if ($this->previousAlarmRemaining > 0 && is_callable($this->previousHandler)) { + pcntl_alarm($this->previousAlarmRemaining); + } + + pcntl_async_signals($this->previousAsyncSignals); + $this->active = false; + } + + public static function supported(): bool + { + return function_exists('pcntl_alarm') && function_exists('pcntl_signal') && function_exists('pcntl_async_signals') && function_exists('pcntl_signal_get_handler'); + } +} diff --git a/packages/idempotency/src/Support/IdempotencyKeyResolver.php b/packages/idempotency/src/Support/IdempotencyKeyResolver.php new file mode 100644 index 000000000..f9ad34e76 --- /dev/null +++ b/packages/idempotency/src/Support/IdempotencyKeyResolver.php @@ -0,0 +1,30 @@ +config->cachePrefix) ?? 'idempotency'; + + return sprintf( + '%s_%s', + $prefix, + hash('sha256', $scope . ':' . $key), + ); + } + + public function lockKey(string $scope, string $key): string + { + return $this->recordKey($scope, $key) . '_lock'; + } +} diff --git a/packages/idempotency/src/Support/ProcessingOwner.php b/packages/idempotency/src/Support/ProcessingOwner.php new file mode 100644 index 000000000..e4d3acfa6 --- /dev/null +++ b/packages/idempotency/src/Support/ProcessingOwner.php @@ -0,0 +1,74 @@ +isHeartbeatStale($heartbeatAt, $staleAfterInSeconds) + ? ProcessingOwnerLiveness::DEAD + : ProcessingOwnerLiveness::UNKNOWN; + } + + if (function_exists('posix_getpgid')) { + return posix_getpgid($pidAsInt) !== false + ? ProcessingOwnerLiveness::ALIVE + : ProcessingOwnerLiveness::DEAD; + } + + return $this->isHeartbeatStale($heartbeatAt, $staleAfterInSeconds) + ? ProcessingOwnerLiveness::DEAD + : ProcessingOwnerLiveness::UNKNOWN; + } + + private function isHeartbeatStale(?int $heartbeatAt, int $staleAfterInSeconds): bool + { + if ($heartbeatAt === null) { + return false; + } + + return (time() - $heartbeatAt) >= max(1, $staleAfterInSeconds); + } +} diff --git a/packages/idempotency/src/Support/ProcessingOwnerLiveness.php b/packages/idempotency/src/Support/ProcessingOwnerLiveness.php new file mode 100644 index 000000000..842d6d6b9 --- /dev/null +++ b/packages/idempotency/src/Support/ProcessingOwnerLiveness.php @@ -0,0 +1,12 @@ +value) !== null; + } + + public static function allowed(): string + { + return implode(', ', array_column(self::cases(), 'value')); + } +} diff --git a/packages/idempotency/tests/Fixtures/FixedScopeResolver.php b/packages/idempotency/tests/Fixtures/FixedScopeResolver.php new file mode 100644 index 000000000..505347c19 --- /dev/null +++ b/packages/idempotency/tests/Fixtures/FixedScopeResolver.php @@ -0,0 +1,20 @@ +scope; + } +} diff --git a/packages/idempotency/tests/Fixtures/RecordingCache.php b/packages/idempotency/tests/Fixtures/RecordingCache.php new file mode 100644 index 000000000..cd16d4360 --- /dev/null +++ b/packages/idempotency/tests/Fixtures/RecordingCache.php @@ -0,0 +1,86 @@ +cache = new GenericCache(new ArrayAdapter()); + } + + public function lock(Stringable|string $key, null|Duration|DateTimeInterface $duration = null, null|Stringable|string $owner = null): Lock + { + $this->lastLockDuration = $duration instanceof Duration ? $duration : null; + + return $this->cache->lock($key, $duration, $owner); + } + + public function has(Stringable|string $key): bool + { + return $this->cache->has($key); + } + + public function put(Stringable|string $key, mixed $value, null|Duration|DateTimeInterface $expiration = null): CacheItemInterface + { + return $this->cache->put($key, $value, $expiration); + } + + public function putMany(iterable $values, null|Duration|DateTimeInterface $expiration = null): array + { + return $this->cache->putMany($values, $expiration); + } + + public function increment(Stringable|string $key, int $by = 1): int + { + return $this->cache->increment($key, $by); + } + + public function decrement(Stringable|string $key, int $by = 1): int + { + return $this->cache->decrement($key, $by); + } + + public function get(Stringable|string $key): mixed + { + return $this->cache->get($key); + } + + public function getMany(iterable $key): array + { + return $this->cache->getMany($key); + } + + public function resolve(Stringable|string $key, Closure $callback, null|Duration|DateTimeInterface $expiration = null, ?Duration $stale = null): mixed + { + return $this->cache->resolve($key, $callback, $expiration, $stale); + } + + public function remove(Stringable|string $key): void + { + $this->cache->remove($key); + } + + public function clear(): void + { + $this->cache->clear(); + } +} diff --git a/packages/idempotency/tests/Fixtures/RecordingStore.php b/packages/idempotency/tests/Fixtures/RecordingStore.php new file mode 100644 index 000000000..07009c674 --- /dev/null +++ b/packages/idempotency/tests/Fixtures/RecordingStore.php @@ -0,0 +1,58 @@ +store = new CacheIdempotencyStore($cache, $resolver); + } + + public function find(string $scope, string $key): ?IdempotencyRecord + { + return $this->store->find($scope, $key); + } + + public function savePending(string $scope, string $key, string $fingerprint, int $ttlInSeconds, ?string $pendingOwner = null, ?int $pendingHeartbeatAt = null): void + { + $this->lastSavePending = [ + 'scope' => $scope, + 'key' => $key, + 'fingerprint' => $fingerprint, + 'ttlInSeconds' => $ttlInSeconds, + 'pendingOwner' => $pendingOwner, + 'pendingHeartbeatAt' => $pendingHeartbeatAt, + ]; + + $this->store->savePending($scope, $key, $fingerprint, $ttlInSeconds, $pendingOwner, $pendingHeartbeatAt); + } + + public function updateHeartbeat(string $scope, string $key, string $owner, int $heartbeatAt, int $ttlInSeconds): void + { + $this->store->updateHeartbeat($scope, $key, $owner, $heartbeatAt, $ttlInSeconds); + } + + public function saveCompleted(string $scope, string $key, string $fingerprint, ?StoredResponse $response, int $ttlInSeconds): void + { + $this->store->saveCompleted($scope, $key, $fingerprint, $response, $ttlInSeconds); + } + + public function delete(string $scope, string $key): void + { + $this->store->delete($scope, $key); + } +} diff --git a/packages/idempotency/tests/HeartbeatRenewerTest.php b/packages/idempotency/tests/HeartbeatRenewerTest.php new file mode 100644 index 000000000..9a821eb79 --- /dev/null +++ b/packages/idempotency/tests/HeartbeatRenewerTest.php @@ -0,0 +1,166 @@ +markTestSkipped('Idempotency tests are not supported on Windows.'); + } + } + + #[Test] + public function update_heartbeat_refreshes_pending_record_timestamp(): void + { + $cache = new GenericCache(new ArrayAdapter()); + $config = new IdempotencyConfig(); + $resolver = new IdempotencyKeyResolver($config); + $store = new CacheIdempotencyStore($cache, $resolver); + + $store->savePending( + scope: 'test-scope', + key: 'test-key', + fingerprint: 'abc123', + ttlInSeconds: 120, + pendingOwner: 'host|123|token', + pendingHeartbeatAt: 1000, + ); + + $store->updateHeartbeat('test-scope', 'test-key', 'host|123|token', 2000, 120); + + $record = $store->find('test-scope', 'test-key'); + + $this->assertNotNull($record); + $this->assertSame(IdempotencyState::PENDING, $record->state); + $this->assertSame(2000, $record->pendingHeartbeatAt); + $this->assertSame('abc123', $record->fingerprint); + $this->assertSame('host|123|token', $record->pendingOwner); + } + + #[Test] + public function update_heartbeat_does_nothing_when_record_does_not_exist(): void + { + $cache = new GenericCache(new ArrayAdapter()); + $config = new IdempotencyConfig(); + $resolver = new IdempotencyKeyResolver($config); + $store = new CacheIdempotencyStore($cache, $resolver); + + $store->updateHeartbeat('test-scope', 'missing-key', 'host|123|token', 2000, 120); + + $this->assertNull($store->find('test-scope', 'missing-key')); + } + + #[Test] + public function update_heartbeat_does_nothing_when_record_is_completed(): void + { + $cache = new GenericCache(new ArrayAdapter()); + $config = new IdempotencyConfig(); + $resolver = new IdempotencyKeyResolver($config); + $store = new CacheIdempotencyStore($cache, $resolver); + + $store->saveCompleted( + scope: 'test-scope', + key: 'test-key', + fingerprint: 'abc123', + response: null, + ttlInSeconds: 120, + ); + + $store->updateHeartbeat('test-scope', 'test-key', 'host|123|token', 2000, 120); + + $record = $store->find('test-scope', 'test-key'); + + $this->assertNotNull($record); + $this->assertSame(IdempotencyState::COMPLETED, $record->state); + $this->assertNull($record->pendingHeartbeatAt); + } + + #[Test] + public function update_heartbeat_does_nothing_when_owner_does_not_match(): void + { + $cache = new GenericCache(new ArrayAdapter()); + $config = new IdempotencyConfig(); + $resolver = new IdempotencyKeyResolver($config); + $store = new CacheIdempotencyStore($cache, $resolver); + + $store->savePending( + scope: 'test-scope', + key: 'test-key', + fingerprint: 'abc123', + ttlInSeconds: 120, + pendingOwner: 'host|123|token', + pendingHeartbeatAt: 1000, + ); + + $store->updateHeartbeat('test-scope', 'test-key', 'other-host|456|other', 2000, 120); + + $record = $store->find('test-scope', 'test-key'); + + $this->assertNotNull($record); + $this->assertSame(1000, $record->pendingHeartbeatAt); + } + + #[Test] + public function renewer_updates_heartbeat_during_processing(): void + { + $cache = new GenericCache(new ArrayAdapter()); + $config = new IdempotencyConfig(); + $resolver = new IdempotencyKeyResolver($config); + $store = new CacheIdempotencyStore($cache, $resolver); + + $initialTime = time() - 10; + + $store->savePending( + scope: 'test-scope', + key: 'test-key', + fingerprint: 'abc123', + ttlInSeconds: 120, + pendingOwner: 'host|123|token', + pendingHeartbeatAt: $initialTime, + ); + + $renewer = new HeartbeatRenewer(); + $renewer->start( + store: $store, + scope: 'test-scope', + key: 'test-key', + owner: 'host|123|token', + intervalInSeconds: 1, + recordTtlInSeconds: 120, + ); + + sleep(2); + + $renewer->stop(); + + $record = $store->find('test-scope', 'test-key'); + + $this->assertNotNull($record); + $this->assertGreaterThan($initialTime, $record->pendingHeartbeatAt); + } + + #[Test] + public function renewer_stop_is_safe_when_not_started(): void + { + $this->expectNotToPerformAssertions(); + + $renewer = new HeartbeatRenewer(); + $renewer->stop(); + } +} diff --git a/packages/idempotency/tests/IdempotencyMiddlewareTest.php b/packages/idempotency/tests/IdempotencyMiddlewareTest.php new file mode 100644 index 000000000..1abdaf995 --- /dev/null +++ b/packages/idempotency/tests/IdempotencyMiddlewareTest.php @@ -0,0 +1,772 @@ +markTestSkipped('Idempotency tests are not supported on Windows.'); + } + } + + #[Test] + public function idempotency_middleware_is_not_globally_discovered(): void + { + $reflector = new ClassReflector(IdempotencyMiddleware::class); + + $this->assertTrue($reflector->hasAttribute(SkipDiscovery::class)); + } + + #[Test] + public function requires_an_idempotency_key_by_default(): void + { + $middleware = $this->createMiddleware('create'); + $calls = 0; + + $response = $middleware( + new GenericRequest(Method::POST, '/orders', body: ['amount' => 100]), + new HttpMiddlewareCallable(function (Request $_) use (&$calls): Response { + $calls++; + + return new GenericResponse(Status::CREATED, ['ok' => true]); + }), + ); + + $this->assertSame(Status::BAD_REQUEST, $response->status); + $this->assertSame(0, $calls); + } + + #[Test] + public function replays_the_original_response_for_the_same_key_and_payload(): void + { + $middleware = $this->createMiddleware('create'); + $calls = 0; + + $request = new GenericRequest( + Method::POST, + '/orders', + body: ['amount' => 100], + headers: ['Idempotency-Key' => 'order-100'], + ); + + $next = new HttpMiddlewareCallable(function (Request $_) use (&$calls): Response { + $calls++; + + return new GenericResponse(Status::CREATED, ['id' => 'order-1']); + }); + + $firstResponse = $middleware($request, $next); + $secondResponse = $middleware($request, $next); + + $this->assertSame(Status::CREATED, $firstResponse->status); + $this->assertSame(Status::CREATED, $secondResponse->status); + $this->assertSame(1, $calls); + $this->assertSame('true', $secondResponse->getHeader('idempotency-replayed')?->first()); + $this->assertSame(['id' => 'order-1'], $secondResponse->body); + } + + #[Test] + public function rejects_the_same_key_when_the_payload_changes(): void + { + $middleware = $this->createMiddleware('create'); + $calls = 0; + + $next = new HttpMiddlewareCallable(function (Request $_) use (&$calls): Response { + $calls++; + + return new GenericResponse(Status::CREATED, ['id' => 'order-1']); + }); + + $middleware( + new GenericRequest( + Method::POST, + '/orders', + body: ['amount' => 100], + headers: ['Idempotency-Key' => 'same-key'], + ), + $next, + ); + + $secondResponse = $middleware( + new GenericRequest( + Method::POST, + '/orders', + body: ['amount' => 101], + headers: ['Idempotency-Key' => 'same-key'], + ), + $next, + ); + + $this->assertSame(Status::UNPROCESSABLE_CONTENT, $secondResponse->status); + $this->assertSame(1, $calls); + } + + #[Test] + public function scopes_idempotency_keys_per_route_when_a_handler_has_multiple_routes(): void + { + $cache = new GenericCache(new ArrayAdapter()); + $config = new IdempotencyConfig(); + $keyResolver = new IdempotencyKeyResolver($config); + $store = new CacheIdempotencyStore($cache, $keyResolver); + + $firstMiddleware = new IdempotencyMiddleware( + cache: $cache, + store: $store, + keyResolver: $keyResolver, + fingerprintGenerator: new RequestFingerprintGenerator(), + config: $config, + matchedRoute: $this->createMatchedRouteForUri('createForMultipleRoutes', '/bulk-orders'), + processingOwner: new ProcessingOwner(), + scopeResolver: new FixedScopeResolver(), + ); + + $secondMiddleware = new IdempotencyMiddleware( + cache: $cache, + store: $store, + keyResolver: $keyResolver, + fingerprintGenerator: new RequestFingerprintGenerator(), + config: $config, + matchedRoute: $this->createMatchedRouteForUri('createForMultipleRoutes', '/bulk-orders/import'), + processingOwner: new ProcessingOwner(), + scopeResolver: new FixedScopeResolver(), + ); + + $calls = 0; + $next = new HttpMiddlewareCallable(function (Request $_) use (&$calls): Response { + $calls++; + + return new GenericResponse(Status::CREATED, ['id' => 'order-' . $calls]); + }); + + $firstResponse = $firstMiddleware( + new GenericRequest( + Method::POST, + '/bulk-orders', + body: ['amount' => 100], + headers: ['Idempotency-Key' => 'shared-key'], + ), + $next, + ); + + $secondResponse = $secondMiddleware( + new GenericRequest( + Method::POST, + '/bulk-orders/import', + body: ['amount' => 100], + headers: ['Idempotency-Key' => 'shared-key'], + ), + $next, + ); + + $this->assertSame(Status::CREATED, $firstResponse->status); + $this->assertSame(Status::CREATED, $secondResponse->status); + $this->assertSame(2, $calls); + } + + #[Test] + public function isolates_idempotency_keys_per_scope_resolver_identity(): void + { + $cache = new GenericCache(new ArrayAdapter()); + $config = new IdempotencyConfig(); + $keyResolver = new IdempotencyKeyResolver($config); + $store = new CacheIdempotencyStore($cache, $keyResolver); + + $userAMiddleware = new IdempotencyMiddleware( + cache: $cache, + store: $store, + keyResolver: $keyResolver, + fingerprintGenerator: new RequestFingerprintGenerator(), + config: $config, + matchedRoute: $this->createMatchedRoute('create'), + processingOwner: new ProcessingOwner(), + scopeResolver: new FixedScopeResolver('user-a'), + ); + + $userBMiddleware = new IdempotencyMiddleware( + cache: $cache, + store: $store, + keyResolver: $keyResolver, + fingerprintGenerator: new RequestFingerprintGenerator(), + config: $config, + matchedRoute: $this->createMatchedRoute('create'), + processingOwner: new ProcessingOwner(), + scopeResolver: new FixedScopeResolver('user-b'), + ); + + $calls = 0; + $next = new HttpMiddlewareCallable(function (Request $_) use (&$calls): Response { + $calls++; + + return new GenericResponse(Status::CREATED, ['id' => 'order-' . $calls]); + }); + + $request = new GenericRequest( + Method::POST, + '/orders', + body: ['amount' => 100], + headers: ['Idempotency-Key' => 'shared-key'], + ); + + $firstResponse = $userAMiddleware($request, $next); + $secondResponse = $userBMiddleware($request, $next); + + $this->assertSame(Status::CREATED, $firstResponse->status); + $this->assertSame(Status::CREATED, $secondResponse->status); + $this->assertSame(2, $calls); + $this->assertNull($firstResponse->getHeader('idempotency-replayed')); + $this->assertNull($secondResponse->getHeader('idempotency-replayed')); + } + + #[Test] + public function can_skip_key_requirement_for_specific_routes(): void + { + $middleware = $this->createMiddleware('createWithoutKeyRequirement'); + $calls = 0; + + $response = $middleware( + new GenericRequest(Method::POST, '/drafts', body: ['draft' => true]), + new HttpMiddlewareCallable(function (Request $_) use (&$calls): Response { + $calls++; + + return new GenericResponse(Status::CREATED, ['ok' => true]); + }), + ); + + $this->assertSame(Status::CREATED, $response->status); + $this->assertSame(1, $calls); + } + + #[Test] + public function idempotent_decorator_adds_the_idempotency_middleware(): void + { + $route = new FakeRoute(); + $attribute = new Idempotent(); + + $attribute->decorate($route); + + $this->assertContains(IdempotencyMiddleware::class, $route->middleware); + } + + #[Test] + public function idempotent_decorator_does_not_add_duplicate_middleware_entries(): void + { + $route = new FakeRoute(); + $attribute = new Idempotent(); + + $attribute->decorate($route); + $attribute->decorate($route); + + $this->assertCount( + 1, + array_filter( + $route->middleware, + static fn (string $middleware): bool => $middleware === IdempotencyMiddleware::class, + ), + ); + } + + #[Test] + public function throws_for_non_post_and_patch_methods(): void + { + $middleware = $this->createMiddleware('create'); + + $this->expectException(IdempotencyMethodWasNotSupported::class); + + $middleware( + new GenericRequest( + Method::PUT, + '/orders', + body: ['amount' => 100], + headers: ['Idempotency-Key' => 'order-100'], + ), + new HttpMiddlewareCallable(static fn (Request $_): Response => new GenericResponse(Status::OK, ['ok' => true])), + ); + } + + #[Test] + public function uses_pending_ttl_for_lock_and_completion_ttl_for_pending_record(): void + { + $cache = new RecordingCache(); + $config = new IdempotencyConfig(ttlInSeconds: 120, pendingTtlInSeconds: 5); + $keyResolver = new IdempotencyKeyResolver($config); + $store = new RecordingStore($cache, $keyResolver); + $middleware = new IdempotencyMiddleware( + cache: $cache, + store: $store, + keyResolver: $keyResolver, + fingerprintGenerator: new RequestFingerprintGenerator(), + config: $config, + matchedRoute: $this->createMatchedRoute('create'), + processingOwner: new ProcessingOwner(), + scopeResolver: new FixedScopeResolver(), + ); + + $response = $middleware( + new GenericRequest( + Method::POST, + '/orders', + body: ['amount' => 100], + headers: ['Idempotency-Key' => 'order-ttl'], + ), + new HttpMiddlewareCallable(static fn (Request $_): Response => new GenericResponse(Status::CREATED, ['id' => 'order-1'])), + ); + + $this->assertSame(Status::CREATED, $response->status); + $this->assertSame(5.0, $cache->lastLockDuration?->getTotalSeconds()); + $this->assertSame(120, $store->lastSavePending['ttlInSeconds'] ?? null); + $this->assertIsString($store->lastSavePending['pendingOwner'] ?? null); + $this->assertIsInt($store->lastSavePending['pendingHeartbeatAt'] ?? null); + } + + #[Test] + public function replays_a_non_serializable_body_as_fallback_text_instead_of_null(): void + { + $middleware = $this->createMiddleware('create'); + $calls = 0; + + $request = new GenericRequest( + Method::POST, + '/orders', + body: ['amount' => 100], + headers: ['Idempotency-Key' => 'non-serializable-body'], + ); + + $next = new HttpMiddlewareCallable(function (Request $_) use (&$calls): Response { + $calls++; + + return new GenericResponse( + Status::CREATED, + (static function (): Generator { + yield 'chunk'; + })(), + ); + }); + + $middleware($request, $next); + $replayedResponse = $middleware($request, $next); + + $this->assertSame(1, $calls); + $this->assertSame(Status::CREATED, $replayedResponse->status); + $this->assertNotNull($replayedResponse->body); + $this->assertSame('true', $replayedResponse->getHeader('idempotency-replayed')?->first()); + } + + #[Test] + public function replays_a_json_serializable_body(): void + { + $middleware = $this->createMiddleware('create'); + $calls = 0; + + $request = new GenericRequest( + Method::POST, + '/orders', + body: ['amount' => 100], + headers: ['Idempotency-Key' => 'json-serializable-body'], + ); + + $next = new HttpMiddlewareCallable(function (Request $_) use (&$calls): Response { + $calls++; + + return new GenericResponse(Status::CREATED, new SerializableBody('order-1')); + }); + + $middleware($request, $next); + $replayedResponse = $middleware($request, $next); + + $this->assertSame(1, $calls); + $this->assertSame(Status::CREATED, $replayedResponse->status); + $this->assertSame(['id' => 'order-1'], $replayedResponse->body); + $this->assertSame('true', $replayedResponse->getHeader('idempotency-replayed')?->first()); + } + + #[Test] + public function takes_over_a_pending_record_owned_by_a_dead_process(): void + { + $cache = new GenericCache(new ArrayAdapter()); + $config = new IdempotencyConfig(); + $keyResolver = new IdempotencyKeyResolver($config); + $store = new CacheIdempotencyStore($cache, $keyResolver); + $scopeResolver = new FixedScopeResolver(); + $middleware = new IdempotencyMiddleware( + cache: $cache, + store: $store, + keyResolver: $keyResolver, + fingerprintGenerator: new RequestFingerprintGenerator(), + config: $config, + matchedRoute: $this->createMatchedRoute('create'), + processingOwner: new ProcessingOwner(), + scopeResolver: $scopeResolver, + ); + + $request = new GenericRequest( + Method::POST, + '/orders', + body: ['amount' => 100], + headers: ['Idempotency-Key' => 'stale-order'], + ); + + $store->savePending( + scope: sprintf('%s::%s:%s:%s:%s', IdempotencyTestController::class, 'create', Method::POST->value, '/orders', $scopeResolver->resolve($request)), + key: 'stale-order', + fingerprint: new RequestFingerprintGenerator()->generate($request), + ttlInSeconds: 120, + pendingOwner: sprintf('%s|%d|%s', php_uname('n'), 99999999, 'stale-owner'), + pendingHeartbeatAt: time(), + ); + + $calls = 0; + $response = $middleware( + $request, + new HttpMiddlewareCallable(function (Request $_) use (&$calls): Response { + $calls++; + + return new GenericResponse(Status::CREATED, ['id' => 'order-1']); + }), + ); + + $this->assertSame(1, $calls); + $this->assertSame(Status::CREATED, $response->status); + } + + #[Test] + public function takes_over_a_pending_record_owned_by_another_host_when_the_heartbeat_is_stale(): void + { + $cache = new GenericCache(new ArrayAdapter()); + $config = new IdempotencyConfig(); + $keyResolver = new IdempotencyKeyResolver($config); + $store = new CacheIdempotencyStore($cache, $keyResolver); + $scopeResolver = new FixedScopeResolver(); + $middleware = new IdempotencyMiddleware( + cache: $cache, + store: $store, + keyResolver: $keyResolver, + fingerprintGenerator: new RequestFingerprintGenerator(), + config: $config, + matchedRoute: $this->createMatchedRoute('create'), + processingOwner: new ProcessingOwner(), + scopeResolver: $scopeResolver, + ); + + $request = new GenericRequest( + Method::POST, + '/orders', + body: ['amount' => 100], + headers: ['Idempotency-Key' => 'stale-order'], + ); + + $store->savePending( + scope: sprintf('%s::%s:%s:%s:%s', IdempotencyTestController::class, 'create', Method::POST->value, '/orders', $scopeResolver->resolve($request)), + key: 'stale-order', + fingerprint: new RequestFingerprintGenerator()->generate($request), + ttlInSeconds: 120, + pendingOwner: 'remote-host|12345|stale-owner', + pendingHeartbeatAt: time() - 120, + ); + + $calls = 0; + $response = $middleware( + $request, + new HttpMiddlewareCallable(function (Request $_) use (&$calls): Response { + $calls++; + + return new GenericResponse(Status::CREATED, ['id' => 'order-1']); + }), + ); + + $this->assertSame(1, $calls); + $this->assertSame(Status::CREATED, $response->status); + } + + #[Test] + public function does_not_take_over_a_pending_record_owned_by_another_host_when_the_heartbeat_is_fresh(): void + { + $cache = new GenericCache(new ArrayAdapter()); + $config = new IdempotencyConfig(); + $keyResolver = new IdempotencyKeyResolver($config); + $store = new CacheIdempotencyStore($cache, $keyResolver); + $scopeResolver = new FixedScopeResolver(); + $middleware = new IdempotencyMiddleware( + cache: $cache, + store: $store, + keyResolver: $keyResolver, + fingerprintGenerator: new RequestFingerprintGenerator(), + config: $config, + matchedRoute: $this->createMatchedRoute('create'), + processingOwner: new ProcessingOwner(), + scopeResolver: $scopeResolver, + ); + + $request = new GenericRequest( + Method::POST, + '/orders', + body: ['amount' => 100], + headers: ['Idempotency-Key' => 'live-order'], + ); + + $store->savePending( + scope: sprintf('%s::%s:%s:%s:%s', IdempotencyTestController::class, 'create', Method::POST->value, '/orders', $scopeResolver->resolve($request)), + key: 'live-order', + fingerprint: new RequestFingerprintGenerator()->generate($request), + ttlInSeconds: 120, + pendingOwner: 'remote-host|12345|alive-owner', + pendingHeartbeatAt: time(), + ); + + $calls = 0; + $response = $middleware( + $request, + new HttpMiddlewareCallable(function (Request $_) use (&$calls): Response { + $calls++; + + return new GenericResponse(Status::CREATED, ['id' => 'order-1']); + }), + ); + + $this->assertSame(0, $calls); + $this->assertSame(Status::CONFLICT, $response->status); + } + + #[Test] + public function does_not_delete_existing_completed_record_when_lookup_fails_before_pending_is_saved(): void + { + $cache = new GenericCache(new ArrayAdapter()); + $config = new IdempotencyConfig(); + $keyResolver = new IdempotencyKeyResolver($config); + $baseStore = new CacheIdempotencyStore($cache, $keyResolver); + $store = new ThrowingFindStore($baseStore); + $scopeResolver = new FixedScopeResolver(); + + $middleware = new IdempotencyMiddleware( + cache: $cache, + store: $store, + keyResolver: $keyResolver, + fingerprintGenerator: new RequestFingerprintGenerator(), + config: $config, + matchedRoute: $this->createMatchedRoute('create'), + processingOwner: new ProcessingOwner(), + scopeResolver: $scopeResolver, + ); + + $request = new GenericRequest( + Method::POST, + '/orders', + body: ['amount' => 100], + headers: ['Idempotency-Key' => 'order-123'], + ); + + $scope = sprintf('%s::%s:%s:%s:%s', IdempotencyTestController::class, 'create', Method::POST->value, '/orders', $scopeResolver->resolve($request)); + $fingerprint = new RequestFingerprintGenerator()->generate($request); + + $baseStore->saveCompleted( + scope: $scope, + key: 'order-123', + fingerprint: $fingerprint, + response: StoredResponse::fromResponse(new GenericResponse(Status::CREATED, ['id' => 'order-1'])), + ttlInSeconds: 120, + ); + + try { + $middleware( + $request, + new HttpMiddlewareCallable(static fn (Request $_): Response => new GenericResponse(Status::CREATED, ['id' => 'order-1'])), + ); + + $this->fail('Expected RuntimeException to be thrown.'); + } catch (RuntimeException) { // @mago-expect lint:no-empty-catch-clause + } + + $this->assertFalse($store->deleteCalled); + + $record = $baseStore->find($scope, 'order-123'); + + $this->assertNotNull($record); + $this->assertSame(IdempotencyState::COMPLETED, $record->state); + } + + private function createMiddleware(string $method, ?FixedScopeResolver $scopeResolver = null): IdempotencyMiddleware + { + $cache = new GenericCache(new ArrayAdapter()); + $config = new IdempotencyConfig(); + $resolver = new IdempotencyKeyResolver($config); + + return new IdempotencyMiddleware( + cache: $cache, + store: new CacheIdempotencyStore($cache, $resolver), + keyResolver: $resolver, + fingerprintGenerator: new RequestFingerprintGenerator(), + config: $config, + matchedRoute: $this->createMatchedRoute($method), + processingOwner: new ProcessingOwner(), + scopeResolver: $scopeResolver ?? new FixedScopeResolver(), + ); + } + + private function createMatchedRoute(string $method): MatchedRoute + { + $methodReflector = MethodReflector::fromParts(IdempotencyTestController::class, $method); + $route = $methodReflector->getAttribute(Route::class); + + if ($route === null) { + throw new RuntimeException(sprintf('No route found for `%s`.', $method)); + } + + return $this->createMatchedRouteFromRoute($methodReflector, $route); + } + + private function createMatchedRouteForUri(string $method, string $uri): MatchedRoute + { + $methodReflector = MethodReflector::fromParts(IdempotencyTestController::class, $method); + $route = array_find( + $methodReflector->getAttributes(Route::class), + static fn (Route $route): bool => $route->uri === $uri, + ); + + if ($route === null) { + throw new RuntimeException(sprintf('No route `%s` found for `%s`.', $uri, $method)); + } + + return $this->createMatchedRouteFromRoute($methodReflector, $route); + } + + private function createMatchedRouteFromRoute(MethodReflector $methodReflector, Route $route): MatchedRoute + { + $decorators = [ + ...$methodReflector->getAttributes(RouteDecorator::class), + ...$methodReflector->getDeclaringClass()->getAttributes(RouteDecorator::class), + ]; + + return new MatchedRoute( + route: DiscoveredRoute::fromRoute($route, $decorators, $methodReflector), + params: [], + ); + } +} + +final class ThrowingFindStore implements IdempotencyStore +{ + public bool $deleteCalled = false; + + public function __construct( + private readonly IdempotencyStore $store, + ) {} + + public function find(string $scope, string $key): ?\Tempest\Idempotency\Store\IdempotencyRecord + { + throw new RuntimeException('Simulated store read failure.'); + } + + public function savePending(string $scope, string $key, string $fingerprint, int $ttlInSeconds, ?string $pendingOwner = null, ?int $pendingHeartbeatAt = null): void + { + $this->store->savePending($scope, $key, $fingerprint, $ttlInSeconds, $pendingOwner, $pendingHeartbeatAt); + } + + public function updateHeartbeat(string $scope, string $key, string $owner, int $heartbeatAt, int $ttlInSeconds): void + { + $this->store->updateHeartbeat($scope, $key, $owner, $heartbeatAt, $ttlInSeconds); + } + + public function saveCompleted(string $scope, string $key, string $fingerprint, ?StoredResponse $response, int $ttlInSeconds): void + { + $this->store->saveCompleted($scope, $key, $fingerprint, $response, $ttlInSeconds); + } + + public function delete(string $scope, string $key): void + { + $this->deleteCalled = true; + + $this->store->delete($scope, $key); + } +} + +final class FakeRoute implements Route +{ + public Method $method = Method::POST; + + public string $uri = '/orders'; + + public array $middleware = []; + + public array $without = []; +} + +final class IdempotencyTestController +{ + #[Post('/orders')] + #[Idempotent] + public function create(): Response + { + return new GenericResponse(Status::CREATED, ['id' => 'order-1']); + } + + #[Post('/drafts')] + #[Idempotent] + #[IdempotentRoute(requireKey: false)] + public function createWithoutKeyRequirement(): Response + { + return new GenericResponse(Status::CREATED, ['ok' => true]); + } + + #[Post('/bulk-orders')] + #[Post('/bulk-orders/import')] + #[Idempotent] + public function createForMultipleRoutes(): Response + { + return new GenericResponse(Status::CREATED, ['id' => 'bulk-order-1']); + } +} + +final readonly class SerializableBody implements JsonSerializable +{ + public function __construct( + private string $id, + ) {} + + public function jsonSerialize(): array + { + return ['id' => $this->id]; + } +} diff --git a/packages/idempotency/tests/IdempotentCommandMiddlewareTest.php b/packages/idempotency/tests/IdempotentCommandMiddlewareTest.php new file mode 100644 index 000000000..52c9c11f4 --- /dev/null +++ b/packages/idempotency/tests/IdempotentCommandMiddlewareTest.php @@ -0,0 +1,344 @@ +markTestSkipped('Idempotency tests are not supported on Windows.'); + } + } + + #[Test] + public function ignores_commands_without_idempotent_attribute(): void + { + $middleware = $this->createMiddleware(); + $calls = 0; + $command = new CreateDraftCommand('A'); + + $next = new CommandBusMiddlewareCallable(function (object $_) use (&$calls): void { + $calls++; + }); + + $middleware($command, $next); + $middleware($command, $next); + + $this->assertSame(2, $calls); + } + + #[Test] + public function supports_idempotent_attribute_on_handler_method(): void + { + $commandBusConfig = new CommandBusConfig(); + $handler = new CommandHandler(); + $commandBusConfig->addHandler( + $handler, + SyncInventoryCommand::class, + MethodReflector::fromParts(SyncInventoryHandler::class, 'handle'), + ); + + $middleware = $this->createMiddleware($commandBusConfig); + $calls = 0; + $command = new SyncInventoryCommand(warehouse: 'east', sku: 'WIDGET-1'); + + $next = new CommandBusMiddlewareCallable(function (object $_) use (&$calls): void { + $calls++; + }); + + $middleware($command, $next); + $middleware($command, $next); + + $this->assertSame(1, $calls); + } + + #[Test] + public function executes_an_idempotent_command_only_once_for_equal_payloads(): void + { + $middleware = $this->createMiddleware(); + $calls = 0; + $command = new ImportInvoicesCommand(tenant: 'acme', month: '2026-01'); + + $next = new CommandBusMiddlewareCallable(function (object $_) use (&$calls): void { + $calls++; + }); + + $middleware($command, $next); + $middleware($command, $next); + + $this->assertSame(1, $calls); + } + + #[Test] + public function throws_when_the_same_explicit_key_is_used_for_different_payloads(): void + { + $middleware = $this->createMiddleware(); + $calls = 0; + + $next = new CommandBusMiddlewareCallable(function (object $_) use (&$calls): void { + $calls++; + }); + + $middleware(new CreatePayoutCommand('payout-1', 100), $next); + + $this->expectException(IdempotencyKeyWasAlreadyUsed::class); + + $middleware(new CreatePayoutCommand('payout-1', 101), $next); + } + + #[Test] + public function allows_replay_for_same_explicit_key_and_same_payload(): void + { + $middleware = $this->createMiddleware(); + $calls = 0; + $command = new CreatePayoutCommand('payout-1', 100); + + $next = new CommandBusMiddlewareCallable(function (object $_) use (&$calls): void { + $calls++; + }); + + $middleware($command, $next); + $middleware($command, $next); + + $this->assertSame(1, $calls); + } + + #[Test] + public function uses_pending_ttl_for_lock_and_completion_ttl_for_pending_record(): void + { + $cache = new RecordingCache(); + $config = new IdempotencyConfig(ttlInSeconds: 120, pendingTtlInSeconds: 5); + $resolver = new IdempotencyKeyResolver($config); + $store = new RecordingStore($cache, $resolver); + $middleware = new IdempotentCommandMiddleware( + cache: $cache, + store: $store, + keyResolver: $resolver, + fingerprintGenerator: new ObjectFingerprintGenerator(), + config: $config, + commandBusConfig: new CommandBusConfig(), + processingOwner: new ProcessingOwner(), + ); + + $middleware( + new ImportInvoicesCommand(tenant: 'acme', month: '2026-01'), + new CommandBusMiddlewareCallable(static function (object $_): void {}), + ); + + $this->assertSame(5.0, $cache->lastLockDuration?->getTotalSeconds()); + $this->assertSame(120, $store->lastSavePending['ttlInSeconds'] ?? null); + $this->assertIsString($store->lastSavePending['pendingOwner'] ?? null); + $this->assertIsInt($store->lastSavePending['pendingHeartbeatAt'] ?? null); + } + + #[Test] + public function takes_over_a_pending_record_owned_by_a_dead_process(): void + { + $cache = new GenericCache(new ArrayAdapter()); + $config = new IdempotencyConfig(); + $resolver = new IdempotencyKeyResolver($config); + $store = new CacheIdempotencyStore($cache, $resolver); + $command = new ImportInvoicesCommand(tenant: 'acme', month: '2026-01'); + $fingerprintGenerator = new ObjectFingerprintGenerator(); + $fingerprint = $fingerprintGenerator->generate($command); + $middleware = new IdempotentCommandMiddleware( + cache: $cache, + store: $store, + keyResolver: $resolver, + fingerprintGenerator: $fingerprintGenerator, + config: $config, + commandBusConfig: new CommandBusConfig(), + processingOwner: new ProcessingOwner(), + ); + + $store->savePending( + scope: 'command:' . $command::class, + key: $fingerprint, + fingerprint: $fingerprint, + ttlInSeconds: 120, + pendingOwner: sprintf('%s|%d|%s', php_uname('n'), 99999999, 'stale-owner'), + pendingHeartbeatAt: time(), + ); + + $calls = 0; + $middleware( + $command, + new CommandBusMiddlewareCallable(function (object $_) use (&$calls): void { + $calls++; + }), + ); + + $this->assertSame(1, $calls); + } + + #[Test] + public function takes_over_a_pending_record_owned_by_another_host_when_the_heartbeat_is_stale(): void + { + $cache = new GenericCache(new ArrayAdapter()); + $config = new IdempotencyConfig(); + $resolver = new IdempotencyKeyResolver($config); + $store = new CacheIdempotencyStore($cache, $resolver); + $command = new ImportInvoicesCommand(tenant: 'acme', month: '2026-01'); + $fingerprintGenerator = new ObjectFingerprintGenerator(); + $fingerprint = $fingerprintGenerator->generate($command); + + $middleware = new IdempotentCommandMiddleware( + cache: $cache, + store: $store, + keyResolver: $resolver, + fingerprintGenerator: $fingerprintGenerator, + config: $config, + commandBusConfig: new CommandBusConfig(), + processingOwner: new ProcessingOwner(), + ); + + $store->savePending( + scope: 'command:' . $command::class, + key: $fingerprint, + fingerprint: $fingerprint, + ttlInSeconds: 120, + pendingOwner: 'remote-host|12345|stale-owner', + pendingHeartbeatAt: time() - 120, + ); + + $calls = 0; + $middleware( + $command, + new CommandBusMiddlewareCallable(function (object $_) use (&$calls): void { + $calls++; + }), + ); + + $this->assertSame(1, $calls); + } + + #[Test] + public function does_not_take_over_a_pending_record_owned_by_another_host_when_the_heartbeat_is_fresh(): void + { + $cache = new GenericCache(new ArrayAdapter()); + $config = new IdempotencyConfig(); + $resolver = new IdempotencyKeyResolver($config); + $store = new CacheIdempotencyStore($cache, $resolver); + $command = new ImportInvoicesCommand(tenant: 'acme', month: '2026-01'); + $fingerprintGenerator = new ObjectFingerprintGenerator(); + $fingerprint = $fingerprintGenerator->generate($command); + + $middleware = new IdempotentCommandMiddleware( + cache: $cache, + store: $store, + keyResolver: $resolver, + fingerprintGenerator: $fingerprintGenerator, + config: $config, + commandBusConfig: new CommandBusConfig(), + processingOwner: new ProcessingOwner(), + ); + + $store->savePending( + scope: 'command:' . $command::class, + key: $fingerprint, + fingerprint: $fingerprint, + ttlInSeconds: 120, + pendingOwner: 'remote-host|12345|alive-owner', + pendingHeartbeatAt: time(), + ); + + $calls = 0; + $middleware( + $command, + new CommandBusMiddlewareCallable(function (object $_) use (&$calls): void { + $calls++; + }), + ); + + $this->assertSame(0, $calls); + } + + private function createMiddleware(?CommandBusConfig $commandBusConfig = null): IdempotentCommandMiddleware + { + $cache = new GenericCache(new ArrayAdapter()); + $config = new IdempotencyConfig(); + $resolver = new IdempotencyKeyResolver($config); + + return new IdempotentCommandMiddleware( + cache: $cache, + store: new CacheIdempotencyStore($cache, $resolver), + keyResolver: $resolver, + fingerprintGenerator: new ObjectFingerprintGenerator(), + config: $config, + commandBusConfig: $commandBusConfig ?? new CommandBusConfig(), + processingOwner: new ProcessingOwner(), + ); + } +} + +final readonly class CreateDraftCommand +{ + public function __construct( + public string $title, + ) {} +} + +#[Idempotent] +final readonly class ImportInvoicesCommand +{ + public function __construct( + public string $tenant, + public string $month, + ) {} +} + +#[Idempotent] +final readonly class CreatePayoutCommand implements HasIdempotencyKey +{ + public function __construct( + public string $idempotencyKey, + public int $amount, + ) {} + + public function getIdempotencyKey(): string + { + return $this->idempotencyKey; + } +} + +final readonly class SyncInventoryCommand +{ + public function __construct( + public string $warehouse, + public string $sku, + ) {} +} + +final class SyncInventoryHandler +{ + #[Idempotent] + #[CommandHandler] + public function handle(SyncInventoryCommand $command): void + { + } +} diff --git a/packages/idempotency/tests/ObjectFingerprintGeneratorTest.php b/packages/idempotency/tests/ObjectFingerprintGeneratorTest.php new file mode 100644 index 000000000..20501dde5 --- /dev/null +++ b/packages/idempotency/tests/ObjectFingerprintGeneratorTest.php @@ -0,0 +1,48 @@ +markTestSkipped('Idempotency tests are not supported on Windows.'); + } + } + + #[Test] + #[RunInSeparateProcess] + public function throws_for_circular_references(): void + { + $node = new CircularNode(); + $node->next = $node; + + $this->expectException(RuntimeException::class); + $this->expectExceptionMessage('Circular reference detected while generating command fingerprint.'); + + new ObjectFingerprintGenerator()->generate(new CircularCommand($node)); + } +} + +final class CircularNode +{ + public ?CircularNode $next = null; +} + +final readonly class CircularCommand +{ + public function __construct( + public CircularNode $node, + ) {} +} diff --git a/packages/idempotency/tests/ProcessingOwnerTest.php b/packages/idempotency/tests/ProcessingOwnerTest.php new file mode 100644 index 000000000..e5ce58a62 --- /dev/null +++ b/packages/idempotency/tests/ProcessingOwnerTest.php @@ -0,0 +1,34 @@ +markTestSkipped('Idempotency tests are not supported on Windows.'); + } + } + + #[Test] + public function resolves_liveness_for_hosts_with_colons(): void + { + $liveness = new ProcessingOwner()->resolveLiveness( + owner: '2001:db8::1|99999999|token', + heartbeatAt: time() - 120, + staleAfterInSeconds: 5, + ); + + $this->assertSame(ProcessingOwnerLiveness::DEAD, $liveness); + } +}