Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .agent/packages/idempotency.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,24 @@
## Concrete classes

- `ApcuStore` _(final)_ — implements `IdempotencyStoreInterface`
- `IdempotencyKeyMiddleware` _(final)_ — implements `MiddlewareInterface`
- `InMemoryStore` _(final)_ — implements `IdempotencyStoreInterface`
- `RedisStore` _(final)_ — implements `IdempotencyStoreInterface`
- `RequestBodyHasher`
- `StoredResponse` _(final)_

## Tests as documentation

- `tests/Idempotency/Hash/RequestBodyHasherTest.php`
- `tests/Idempotency/Middleware/IdempotencyKeyMiddlewareTest.php`
- `tests/Idempotency/Storage/ApcuStoreTest.php`
- `tests/Idempotency/Storage/InMemoryStoreTest.php`
- `tests/Idempotency/Storage/RedisStoreTest.php`
- `tests/Idempotency/Storage/StoredResponseTest.php`

## Related packages

- `psr/http-factory`
- `psr/http-message`
- `psr/http-server-handler`
- `psr/http-server-middleware`
49 changes: 49 additions & 0 deletions src/Altair/Idempotency/Hash/RequestBodyHasher.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
<?php

declare(strict_types=1);

/*
* This file is part of the univeros/framework
*
* For the full copyright and license information, please view
* the LICENSE file that was distributed with this source code.
*/

namespace Altair\Idempotency\Hash;

use Psr\Http\Message\ServerRequestInterface;

/**
* Hashes the raw request body bytes for use as an idempotency
* payload-fingerprint. SHA-256 by default; the algorithm is fixed
* because cross-environment consistency matters more than
* configurability.
*
* The hash is over the raw bytes, not parsed JSON — so semantically
* equivalent representations (attribute reordering, whitespace
* differences) produce different hashes. Applications that want
* JSON-canonical hashing add an upstream middleware that canonicalises
* the body first.
*
* The hasher rewinds the body stream after reading so downstream
* middleware / handlers still see the same content from position 0.
*/
class RequestBodyHasher
{
public function hash(ServerRequestInterface $request): string
{
$body = $request->getBody();

if ($body->isSeekable()) {
$body->rewind();
}

$contents = (string) $body;

if ($body->isSeekable()) {
$body->rewind();
}

return hash('sha256', $contents);
}
}
240 changes: 240 additions & 0 deletions src/Altair/Idempotency/Middleware/IdempotencyKeyMiddleware.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
<?php

declare(strict_types=1);

/*
* This file is part of the univeros/framework
*
* For the full copyright and license information, please view
* the LICENSE file that was distributed with this source code.
*/

namespace Altair\Idempotency\Middleware;

use Altair\Idempotency\Contracts\IdempotencyStoreInterface;
use Altair\Idempotency\Hash\RequestBodyHasher;
use Altair\Idempotency\Storage\StoredResponse;
use Override;
use Psr\Http\Message\ResponseFactoryInterface;
use Psr\Http\Message\ResponseInterface;
use Psr\Http\Message\ServerRequestInterface;
use Psr\Http\Message\StreamFactoryInterface;
use Psr\Http\Server\MiddlewareInterface;
use Psr\Http\Server\RequestHandlerInterface;
use Throwable;

/**
* Stripe-style Idempotency-Key middleware.
*
* Reads `Idempotency-Key` from the request, hashes the request body,
* and coordinates with an {@see IdempotencyStoreInterface} so that a
* replayed request returns the original response and a key reused
* with a different payload is rejected with 409.
*
* Behaviour matrix:
*
* | Situation | Response |
* |------------------------------------------------|---------------------------------------|
* | GET / HEAD / OPTIONS | Pass through; no caching. |
* | Header absent, mode=`optional` | Pass through; no caching. |
* | Header absent, mode=`required` | 400 with `{error}` envelope. |
* | Header malformed (>255 chars or ctrl/ws) | 400 with `{error}` envelope. |
* | Key unseen | Claim; execute; cache; return. |
* | Key seen, same hash, completed | Replay + `Idempotency-Replayed: true` |
* | Key seen, same hash, in-progress (≤ maxWait) | Wait + retry; replay when ready. |
* | Key seen, same hash, in-progress (> maxWait) | 409 conflict. |
* | Key seen, different hash | 409 conflict. |
* | Handler throws | Release claim; re-throw. |
* | Response is streaming | Pass through without caching. |
*
* Response headers are stored on an allow-list basis (default
* `Content-Type`, `Location`, `Link`) so that sensitive headers
* (`Set-Cookie`, `Authorization`) never end up in shared storage.
*/
final readonly class IdempotencyKeyMiddleware implements MiddlewareInterface
{
public const string MODE_OPTIONAL = 'optional';

public const string MODE_REQUIRED = 'required';

public const string HEADER_KEY = 'Idempotency-Key';

public const string HEADER_REPLAYED = 'Idempotency-Replayed';

private const array SAFE_METHODS = ['GET', 'HEAD', 'OPTIONS'];

private const array DEFAULT_ALLOWED_HEADERS = ['Content-Type', 'Location', 'Link'];

/**
* @param list<string> $allowedResponseHeaders Subset of response headers that should be replayed verbatim.
*/
public function __construct(
private IdempotencyStoreInterface $store,
private ResponseFactoryInterface $responseFactory,
private StreamFactoryInterface $streamFactory,
private int $ttlSeconds,
private string $mode = self::MODE_OPTIONAL,
private RequestBodyHasher $hasher = new RequestBodyHasher(),
private array $allowedResponseHeaders = self::DEFAULT_ALLOWED_HEADERS,
private int $maxWaitMs = 500,
private int $waitIntervalMs = 50,
) {}

#[Override]
public function process(ServerRequestInterface $request, RequestHandlerInterface $handler): ResponseInterface
{
if (\in_array(strtoupper($request->getMethod()), self::SAFE_METHODS, true)) {
return $handler->handle($request);
}

$key = $request->getHeaderLine(self::HEADER_KEY);
if ($key === '') {
if ($this->mode === self::MODE_REQUIRED) {
return $this->errorResponse(400, 'Idempotency-Key header required for this endpoint.');
}

return $handler->handle($request);
}

if (!$this->isValidKey($key)) {
return $this->errorResponse(400, 'Idempotency-Key header is malformed.');
}

$hash = $this->hasher->hash($request);
$existing = $this->store->claim($key, $hash, $this->ttlSeconds);

if (!$existing instanceof StoredResponse) {
return $this->executeAndCache($key, $hash, $request, $handler);
}

if ($existing->requestHash !== $hash) {
return $this->errorResponse(409, 'Idempotency-Key reused with a different payload.');
}

if (!$existing->inProgress) {
return $this->replay($existing);
}

return $this->waitForInProgress($key);
}

private function executeAndCache(
string $key,
string $hash,
ServerRequestInterface $request,
RequestHandlerInterface $handler,
): ResponseInterface {
try {
$response = $handler->handle($request);
} catch (Throwable $throwable) {
$this->store->release($key);

throw $throwable;
}

if ($this->isStreaming($response)) {
$this->store->release($key);

return $response;
}

$bodyContents = (string) $response->getBody();
$stored = StoredResponse::completed(
requestHash: $hash,
status: $response->getStatusCode(),
headers: $this->filterHeaders($response),
body: $bodyContents,
createdAt: time(),
);
$this->store->complete($key, $stored, $this->ttlSeconds);

// Rebuild the body so downstream output is not consumed.
return $response->withBody($this->streamFactory->createStream($bodyContents));
}

private function waitForInProgress(string $key): ResponseInterface
{
$waited = 0;
while ($waited < $this->maxWaitMs) {
usleep($this->waitIntervalMs * 1000);
$waited += $this->waitIntervalMs;

$current = $this->store->get($key);
if (!$current instanceof StoredResponse) {
return $this->errorResponse(409, 'Idempotency-Key claim was released; retry the request.');
}

if (!$current->inProgress) {
return $this->replay($current);
}
}

return $this->errorResponse(409, 'Idempotency-Key claim is still in progress; retry later.');
}

private function replay(StoredResponse $stored): ResponseInterface
{
$response = $this->responseFactory->createResponse($stored->status);
foreach ($stored->headers as $name => $values) {
foreach ($values as $value) {
$response = $response->withAddedHeader($name, $value);
}
}

$response = $response->withHeader(self::HEADER_REPLAYED, 'true');

return $response->withBody($this->streamFactory->createStream($stored->body));
}

/**
* @return array<string, list<string>>
*/
private function filterHeaders(ResponseInterface $response): array
{
$kept = [];
foreach ($this->allowedResponseHeaders as $name) {
if (!$response->hasHeader($name)) {
continue;
}

$kept[$name] = array_values($response->getHeader($name));
}

return $kept;
}

private function isStreaming(ResponseInterface $response): bool
{
if (strtolower($response->getHeaderLine('Transfer-Encoding')) === 'chunked') {
return true;
}

$contentType = strtolower($response->getHeaderLine('Content-Type'));

return str_starts_with($contentType, 'text/event-stream');
}

private function isValidKey(string $key): bool
{
if ($key === '' || \strlen($key) > 255) {
return false;
}

// Reject ASCII control characters (including tab/newline) and any
// whitespace; the spec leaves the rest of the printable set alone.
return preg_match('/[\x00-\x20\x7F\s]/', $key) !== 1;
}

private function errorResponse(int $status, string $message): ResponseInterface
{
$body = json_encode(['error' => $message], JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE);
if ($body === false) {
$body = '{"error":"idempotency error"}';
}

$response = $this->responseFactory->createResponse($status);
$response = $response->withHeader('Content-Type', 'application/json');

return $response->withBody($this->streamFactory->createStream($body));
}
}
6 changes: 5 additions & 1 deletion src/Altair/Idempotency/composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@
"source": "https://github.com/univeros/framework"
},
"require": {
"php": ">=8.3"
"php": ">=8.3",
"psr/http-factory": "^1.1",
"psr/http-message": "^1.1 || ^2.0",
"psr/http-server-handler": "^1.0",
"psr/http-server-middleware": "^1.0"
},
"suggest": {
"ext-apcu": "Required for the single-host ApcuStore adapter.",
Expand Down
69 changes: 69 additions & 0 deletions tests/Idempotency/Hash/RequestBodyHasherTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
<?php

declare(strict_types=1);

namespace Altair\Tests\Idempotency\Hash;

use Altair\Idempotency\Hash\RequestBodyHasher;
use Laminas\Diactoros\ServerRequest;
use Laminas\Diactoros\Stream;
use PHPUnit\Framework\TestCase;

final class RequestBodyHasherTest extends TestCase
{
public function testHashesBodyBytes(): void
{
$request = $this->requestWithBody('{"a":1}');

$hash = (new RequestBodyHasher())->hash($request);

self::assertSame(hash('sha256', '{"a":1}'), $hash);
}

public function testDifferentBodiesProduceDifferentHashes(): void
{
$a = (new RequestBodyHasher())->hash($this->requestWithBody('{"a":1}'));
$b = (new RequestBodyHasher())->hash($this->requestWithBody('{"a":2}'));

self::assertNotSame($a, $b);
}

public function testWhitespaceCountsTowardsHash(): void
{
// Semantically equivalent JSON bodies produce different hashes.
// Applications that want canonical hashing add a canonicalising
// middleware upstream of this one.
$a = (new RequestBodyHasher())->hash($this->requestWithBody('{"a":1}'));
$b = (new RequestBodyHasher())->hash($this->requestWithBody('{"a": 1}'));

self::assertNotSame($a, $b);
}

public function testRewindsBodyForDownstreamConsumers(): void
{
$request = $this->requestWithBody('{"a":1}');
$hasher = new RequestBodyHasher();

$hasher->hash($request);

$body = $request->getBody();
$body->rewind();
self::assertSame('{"a":1}', (string) $body);
}

public function testEmptyBodyHashesDeterministically(): void
{
$hash = (new RequestBodyHasher())->hash($this->requestWithBody(''));

self::assertSame(hash('sha256', ''), $hash);
}

private function requestWithBody(string $contents): ServerRequest
{
$stream = new Stream('php://temp', 'r+');
$stream->write($contents);
$stream->rewind();

return (new ServerRequest())->withBody($stream);
}
}
Loading
Loading