diff --git a/README.md b/README.md index f67d6a1..33c8ba4 100644 --- a/README.md +++ b/README.md @@ -13,10 +13,13 @@ Events are first written to a durable store (in-memory or SQL), then dispatched - **Process** queued events sequentially — each event is routed to every registered subscriber by type - **Two stores out of the box** — in-memory for tests/dev, SQL (MySQL / SQLite) for production - **Two processors out of the box** — standard (fail-fast for local/CI) and silent (log-and-continue for production) -- **Outbox pattern** — events are marked `processed` only after they're successfully dequeued +- **Outbox pattern** — events transition `pending → processing → processed`, with the intermediate state surviving worker crashes +- **Per-listener retries** — configurable `RetryPolicy` plus a hybrid in-process + persisted `RedeliveryTracker`; a single failing listener of an event is retried independently while the others continue +- **Ignored exceptions** — pass a list of `Throwable` classes to silently swallow expected domain failures (no retry, no log, no DB row) +- **Status audit trail** — every event status transition is recorded in `event_outbox_status` for ops visibility - **Scheduled delivery** — set `publishAt` in the future; the processor only picks up events whose time has come - **Worker-safe** — MySQL store uses `FOR UPDATE SKIP LOCKED` to allow multiple workers without double-processing -- **Auto schema** — `SqlEventStore` creates its own tables on first boot, no migrations needed +- **Auto schema** — `SqlEventStore` and `SqlRedeliveryTracker` each create their own tables on first boot, no migrations needed - **Clean architecture boundaries** — `EventSerializer` and `EventHydrator` keep `RawEvent` out of your application layer --- @@ -48,16 +51,21 @@ composer require hivesper/php-events | Class | Role | |---|---| | `RawEvent` | Immutable value object representing a single stored event | -| `EventStore` | Interface — a durable FIFO queue of `RawEvent` | +| `EventStore` | Interface — a durable queue of `RawEvent`, with `add()` / `next()` / `markProcessed()` | | `EventSerializer` | Interface — converts a domain event object into a `SerializedEvent` | | `EventHydrator` | Interface — reconstructs a domain event object from a stored name + payload | | `SerializedEvent` | Value object holding the event `name` and `payload` array | | `EventPublisher` | Serializes a domain event and pushes it into the store, returning its ID | | `EventSubscriberMap` | Registry of `name → callable[]` mappings | | `EventProcessor` | Interface — drains the store and dispatches to subscribers | -| `SequentialEventProcessor` | Built-in processor — dispatches events one by one; exceptions propagate (fail-fast) | -| `SilentSequentialEventProcessor` | Built-in processor — same as above but catches per-listener failures and logs them via PSR-3 | +| `SequentialEventProcessor` | Built-in processor — dispatches events one by one with optional retry policy; exceptions propagate after retries are exhausted (fail-fast) | +| `SilentSequentialEventProcessor` | Same as above but catches per-listener failures after retries are exhausted, logs them via PSR-3, and continues | | `EventSubscriberBuilder` | Fluent builder that produces a ready-to-use `EventSubscriberMap` | +| `RetryPolicy` | Interface — decides whether and when to retry a failed listener | +| `NoRetryPolicy` | Default — never retries | +| `ExponentialBackoffRetryPolicy` | Built-in — five attempts with `100ms / 500ms / 1min / 5min` backoff | +| `RedeliveryTracker` | Interface — persists per-(event, listener) retry state and exposes `retryNow()` for admin tooling | +| `InMemoryRedeliveryTracker` / `SqlRedeliveryTracker` | Tracker implementations | --- @@ -303,15 +311,45 @@ $store = new SqlEventStore($pdo); // schema created here if not present CREATE TABLE event_outbox ( id VARCHAR(36) NOT NULL PRIMARY KEY, name VARCHAR(255) NOT NULL, - status VARCHAR(255) NOT NULL, -- 'pending' | 'processed' | 'failed' + status VARCHAR(255) NOT NULL, -- 'pending' | 'processing' | 'processed' | 'failed' payload JSON NOT NULL, - created_at DATETIME NOT NULL, - publish_at DATETIME NOT NULL, - INDEX idx_event_outbox_status_publish (status, publish_at) + created_at DATETIME(6) NOT NULL, + publish_at DATETIME(6) NOT NULL, + INDEX idx_event_outbox_status_publish (status, publish_at), + INDEX idx_event_outbox_created_at (created_at) +); + +-- Audit trail: one row per event status transition (pending → processing → processed) +CREATE TABLE event_outbox_status ( + event_id VARCHAR(36) NOT NULL, + status VARCHAR(255) NOT NULL, + error_message TEXT, + created_at DATETIME(6) NOT NULL, + INDEX idx_event_outbox_status_event_created (event_id, created_at DESC) ); ``` -> MySQL workers use `SELECT … FOR UPDATE SKIP LOCKED` for safe concurrent processing. +> MySQL workers use `SELECT … FOR UPDATE SKIP LOCKED` on the `event_outbox` table for safe +> concurrent processing. + +If you wire up a `RedeliveryTracker` (see [Automatic retry & failure tracking](#automatic-retry--failure-tracking)), +a third table `event_outbox_redelivery` is created on demand for per-listener retry state. + +### Event lifecycle + +The processor advances each event through three states: + +1. `pending` — written by `add()` inside the caller's transaction, alongside a matching audit row. +2. `processing` — set by `next()` when the worker claims the event. The row is *claimed* but not + yet declared finished. The audit table gets a second row. +3. `processed` — set by `markProcessed()` once **every** listener for the event has settled + (succeeded, been persisted to the redelivery queue, been swallowed by the ignored-exceptions + list, or been marked permanently failed). The audit table gets a third row. + +If a worker dies between `next()` and `markProcessed()`, the row stays in `processing` — +intentionally. Any redelivery rows that *did* get persisted before the crash remain durable, so +listener-level retries still fire when their time comes. The stuck `processing` row is detectable +by querying `event_outbox_status` (see [Future work](#future-work)). --- @@ -386,11 +424,35 @@ class MyProcessor implements EventProcessor { while ($event = $store->next()) { // your dispatch logic + $store->markProcessed($event->id); } } } ``` +> Custom processors must call `$store->markProcessed($event->id)` once dispatch for an event is +> complete. `next()` only moves the row from `pending` to `processing`; the final advance to +> `processed` is the processor's responsibility, so it can hold the row in `processing` while it +> drives any per-listener retries. + +### Custom EventStore + +If you implement your own `EventStore`, you need to satisfy the new `markProcessed()` method +alongside `add()` and `next()`. For an in-memory or queue-style store with no persisted status, +this is a one-liner: + +```php +class MyEventStore implements EventStore +{ + public function add(RawEvent $event): void { /* ... */ } + public function next(): ?RawEvent { /* ... */ } + public function markProcessed(string $eventId): void + { + // No-op when there's no persisted status to flip. + } +} +``` + --- ## Silent event processing @@ -443,20 +505,249 @@ $processor = new SilentSequentialEventProcessor($subscribers, app('log')); | Local / CI | `SequentialEventProcessor` | Any listener exception propagates immediately — nothing is hidden | | Production | `SilentSequentialEventProcessor` | A failing listener is logged and skipped; all other listeners and subsequent events continue processing | -> **Coming soon:** Automatic retry and failure tracking will be added to the base processor. `SilentSequentialEventProcessor` is designed to remain the right choice for production once that lands — it already captures everything needed to identify and reprocess a specific failed listener. +Both processors support [Automatic retry & failure tracking](#automatic-retry--failure-tracking) +through the same constructor parameters. The retry policy and the redelivery tracker are opt-in: +default behaviour for both processors is unchanged from earlier versions. + +--- + +## Automatic retry & failure tracking + +A failed listener should not become a silent loss. This library supports automatic retry and +durable failure tracking on a **per-(event, listener)** basis: if a single listener of an event +fails, only that listener is retried — the others continue to run and successful deliveries are +not re-fired. + +The model is **hybrid**: + +1. A small in-process retry burst (sleep + retry) for fast-recovery transient failures. +2. A persisted, scheduled redelivery for slower-recovery failures and for surviving worker crashes. + +Once configured, the same retry behaviour applies to both `SequentialEventProcessor` (which +fails fast after retries are exhausted) and `SilentSequentialEventProcessor` (which logs and +continues). + +### Retry policy + +A `RetryPolicy` decides whether a failed dispatch should be retried, and at what time: + +```php +use Vesper\Tool\Event\Retry\RetryPolicy; + +interface RetryPolicy +{ + /** @return CarbonImmutable|null null when no further retries should be made */ + public function nextRetryAt(int $previousAttempt): ?CarbonImmutable; +} +``` + +Two implementations ship out of the box: + +- **`NoRetryPolicy`** *(default)* — never retries. Same behaviour as before this feature existed, + so wiring up the new processor parameters without choosing a policy is a no-op. +- **`ExponentialBackoffRetryPolicy`** — five total attempts (one initial + four retries) with + delays of `100ms, 500ms, 1min, 5min` by default. The 100ms / 500ms retries happen in-process; + the 1min / 5min retries are persisted and picked up on a future processor run. + +```php +use Vesper\Tool\Event\Infrastructure\Retry\ExponentialBackoffRetryPolicy; + +// Default delays — 100ms, 500ms, 1min, 5min. +$retryPolicy = new ExponentialBackoffRetryPolicy(); + +// Or roll your own delays: +$retryPolicy = new ExponentialBackoffRetryPolicy(delaysMs: [50, 250, 1_000, 30_000]); +``` + +The processor classifies each delay as **in-process** (sleep + retry on the same worker) when it +is less than `inProcessRetryThresholdMs` (default `1000`), and **persisted** otherwise. Persisted +retries require a `RedeliveryTracker` to be configured — without one, a long-delay retry is +treated as exhausted and reported. + +### Redelivery tracker + +The `RedeliveryTracker` interface owns per-listener retry state — when an attempt failed, how +many attempts have been made, when the next one should run, what the last error was. Two +implementations: + +- **`InMemoryRedeliveryTracker`** — array-backed, for tests and dev. +- **`SqlRedeliveryTracker`** — durable, MySQL/SQLite-compatible. Auto-creates its + `event_outbox_redelivery` table on first construction. Worker-safe via `FOR UPDATE SKIP LOCKED` + on MySQL. + +```php +use Vesper\Tool\Event\Infrastructure\SqlRedeliveryTracker; + +$tracker = new SqlRedeliveryTracker($pdo); // schema created here if not present +``` + +### Wiring a processor with retries + +```php +use Vesper\Tool\Event\Infrastructure\Retry\ExponentialBackoffRetryPolicy; +use Vesper\Tool\Event\Infrastructure\SilentSequentialEventProcessor; +use Vesper\Tool\Event\Infrastructure\SqlRedeliveryTracker; + +$processor = new SilentSequentialEventProcessor( + subscribers: $subscribers, + logger: $logger, + retryPolicy: new ExponentialBackoffRetryPolicy(), + redeliveryTracker: new SqlRedeliveryTracker($pdo), + ignoredExceptions: [ + UserNotFoundException::class, + InvalidPayloadException::class, + ], +); + +$processor->process($store); +``` + +A single call to `process()` will: + +1. Drain new pending events from the store and dispatch them to every registered listener. +2. After the main queue is empty, drain any **due** redeliveries — listener failures from earlier + runs whose `next_retry_at` has now passed. + +Long backoffs (e.g. the default 1min / 5min steps) won't be drained until a future `process()` +call after their `next_retry_at` passes — which is exactly how outbox workers already poll on a +schedule. + +### Ignored exceptions (skip-list) + +Some listener failures are not bugs — they're expected domain outcomes that the application +already handles upstream (e.g. `UserNotFoundException`, `OrderAlreadyShipped`). Retrying them +wastes time and reporting them spams the error tracker. + +The `ignoredExceptions` constructor parameter takes a list of `Throwable` class-strings. +Matching is `instanceof`-based, so subclasses are also matched. When a listener throws an +ignored exception: + +- **No retry attempt** — the policy is skipped entirely. +- **No PSR-3 log line** (in `SilentSequentialEventProcessor`). +- **No row written to `event_outbox_redelivery`.** +- **No exception propagation** in `SequentialEventProcessor` either — this is treated as + silent success, the next listener for the same event runs as normal. + +The recommended pattern is to share the same list with whatever already configures your +application's error reporter (Sentry/Bugsnag/etc.) so behaviour stays consistent across the +boundary: anything your app considers "expected and not worth a page" is also considered +expected here. + +### Permanently failed dispatches + +When a listener has exhausted its retries (or `nextRetryAt` returns `null` immediately because +no retry policy is configured), it is recorded in `event_outbox_redelivery` with +`status = 'failed'`. The row stays in the table so operators can inspect it. The behaviour +differs slightly between processors: + +| Processor | On exhaustion | +|---|---| +| `SequentialEventProcessor` | Marks the row `failed`, then **rethrows** the original exception (fail-fast). The event row stays in `processing` — no `markProcessed()` is called. | +| `SilentSequentialEventProcessor` | Marks the row `failed`, **logs** via PSR-3 (same shape as today), and **swallows** the exception so the next listener runs. The event eventually advances to `processed`. | + +### Re-triggering a failed dispatch + +`RedeliveryTracker::retryNow($eventId, $listener)` re-queues a dispatch for immediate retry, +regardless of its current status (including `failed`). The attempt count is preserved — the +retry policy's max-attempts ceiling still applies on subsequent automatic failures. The library +ships no CLI; wire `retryNow()` into whatever admin surface you prefer (admin UI, Slack +command, console script, etc.). + +For listing failures, query `event_outbox_redelivery` directly. + +### Listener identity and closures + +The redelivery row's `listener` column is the class name for class-string subscribers, the +class name for invokable objects (`get_class($obj)`), or the literal string `'Closure'` for +anonymous closures. Class-string and invokable-object listeners can be reliably retried across +processes; closures cannot (their identity is not stable across process boundaries). If your +listener registrations and your retry policy together require closure tracking, use a class +that implements `__invoke()` instead. --- ## RawEventStatus ```php -RawEventStatus::pending // event is waiting to be processed -RawEventStatus::processed // event was successfully dequeued -RawEventStatus::failed // reserved for failed-delivery tracking +RawEventStatus::pending // event is waiting to be processed +RawEventStatus::processing // event has been claimed by a worker; dispatch in flight +RawEventStatus::processed // event was successfully dispatched to all listeners +RawEventStatus::failed // reserved for future event-level fatal use (see "Future work") ``` --- +## Future work + +This section captures known gaps and what we'd want to add. The redelivery layer is the headline +feature shipped now; what follows is intentionally deferred so we can let real usage shape it. + +### Stuck-events monitor + +If a worker dies between `next()` and `markProcessed()`, the row stays in +`event_outbox.status = 'processing'` indefinitely. Redelivery rows that *did* get persisted are +durable and will still fire — but the parent event row is wedged. + +Detection query (works today against existing tables): + +```sql +SELECT id, name, created_at, publish_at +FROM event_outbox +WHERE status = 'processing' + AND id IN ( + SELECT event_id FROM event_outbox_status + WHERE status = 'processing' + AND created_at < NOW() - INTERVAL 30 MINUTE -- pick a threshold that matches your workload + ); +``` + +A future sweeper should support three recovery modes: + +- **Re-claim** (`processing → pending`) — safe **only** if listeners are idempotent. Re-dispatch + may re-fire listeners that already succeeded. +- **Force-complete** (`processing → processed`) — safe only when the dispatch is believed to + have finished but the bookkeeping commit was lost. Should write a `processed` audit row marked + as recovered so dashboards can distinguish organic vs. recovered transitions. +- **Mark dead** (`processing → failed`) — finally puts `RawEventStatus::failed` to use; operator + decides next steps. + +### Schema columns to consider when the monitor lands + +Based on Spring Modulith's `EVENT_PUBLICATION` table (the closest production-grade analogue): + +| Column | Why we'd want it | +|---|---| +| `completion_attempts` (INT) on `event_outbox` | How many times this event has been claimed for processing. Increments on every `next()`. Distinguishes "stuck once" from "repeatedly poisoning workers". | +| `last_resubmission_date` (TIMESTAMP) on `event_outbox` | Distinguishes organic stuck rows from rows an operator has already touched. | +| A `RESUBMITTED` enum case (or a `recovered_at` / `recovered_by` column on `event_outbox_status`) | So dashboards can show "rescued by hand" separately from happy path. | + +The existing `event_outbox_redelivery` table already tracks `attempt_number` and `last_error` at +the *listener* level; the columns above are for *event-level* tracking, which we don't need yet +but will once the monitor lands. + +### Operational signals you can build today + +Without waiting for the in-library monitor, these are queryable from the existing tables: + +- "Events stuck in `processing` for more than T" — the SQL above. +- "Listeners with permanent failures" — `SELECT * FROM event_outbox_redelivery WHERE status = 'failed'`. +- "Average time-between rows in `event_outbox_status` by status pair" — reveals dispatch latency. + +### What `RawEventStatus::failed` is reserved for + +A natural future use: a sweep over `event_outbox_redelivery` that, when **every** listener for +an event has permanently failed and there's nothing left to retry, marks the event row itself +`failed`. Computable from the existing tables; out of scope for this PR. The enum case is +documented as reserved so future readers don't think it's dead code. + +### Reference reading + +- [Spring Modulith — Event Publication Registry](https://docs.spring.io/spring-modulith/reference/events.html) — closest reference shape (per-listener rows, status enum including `PROCESSING` / `RESUBMITTED`, completion attempts). +- [gruelbox/transaction-outbox](https://github.com/gruelbox/transaction-outbox) — alternative model with no `processing` state (uses optimistic-lock `version` column + `nextAttemptTime` lease). Worth understanding for context on why our model needs `processing` and theirs doesn't (their event = single dispatch; ours = multiple listeners). +- [AWS Prescriptive Guidance: Transactional Outbox](https://docs.aws.amazon.com/prescriptive-guidance/latest/cloud-design-patterns/transactional-outbox.html) — high-level pattern doc. + +--- + ## Testing ```bash diff --git a/src/DueRedelivery.php b/src/DueRedelivery.php new file mode 100644 index 0000000..8c898c8 --- /dev/null +++ b/src/DueRedelivery.php @@ -0,0 +1,17 @@ +queue) ?? null; } + + #[Override] public function markProcessed(string $eventId): void + { + // No-op: the in-memory queue discards events on next(); there is no persisted status to flip. + } } diff --git a/src/Infrastructure/InMemoryRedeliveryTracker.php b/src/Infrastructure/InMemoryRedeliveryTracker.php new file mode 100644 index 0000000..448395c --- /dev/null +++ b/src/Infrastructure/InMemoryRedeliveryTracker.php @@ -0,0 +1,125 @@ + */ + private array $rows = []; + + #[Override] + public function schedule( + RawEvent $event, + string $listener, + int $attemptNumber, + CarbonImmutable $nextRetryAt, + Throwable $lastError, + ): void { + $key = self::key($event->id, $listener); + $now = CarbonImmutable::now(); + + $this->rows[$key] = [ + 'event' => $event, + 'listener' => $listener, + 'status' => self::STATUS_PENDING, + 'attempt_number' => $attemptNumber, + 'next_retry_at' => $nextRetryAt, + 'last_error' => self::formatError($lastError), + 'created_at' => $this->rows[$key]['created_at'] ?? $now, + 'updated_at' => $now, + ]; + } + + #[Override] + public function nextDue(): ?DueRedelivery + { + $now = CarbonImmutable::now(); + $candidate = null; + + foreach ($this->rows as $row) { + if ($row['status'] !== self::STATUS_PENDING) { + continue; + } + if ($row['next_retry_at'] === null || $row['next_retry_at']->greaterThan($now)) { + continue; + } + if ($candidate === null || $row['next_retry_at']->lessThan($candidate['next_retry_at'])) { + $candidate = $row; + } + } + + if ($candidate === null) { + return null; + } + + return new DueRedelivery( + event: $candidate['event'], + listener: $candidate['listener'], + attemptNumber: $candidate['attempt_number'], + ); + } + + #[Override] + public function markFailedPermanently(string $eventId, string $listener, Throwable $lastError): void + { + $key = self::key($eventId, $listener); + + if (!isset($this->rows[$key])) { + return; + } + + $this->rows[$key]['status'] = self::STATUS_FAILED; + $this->rows[$key]['next_retry_at'] = null; + $this->rows[$key]['last_error'] = self::formatError($lastError); + $this->rows[$key]['updated_at'] = CarbonImmutable::now(); + } + + #[Override] + public function markSucceeded(string $eventId, string $listener): void + { + $key = self::key($eventId, $listener); + + if (!isset($this->rows[$key])) { + return; + } + + $this->rows[$key]['status'] = self::STATUS_SUCCEEDED; + $this->rows[$key]['next_retry_at'] = null; + $this->rows[$key]['updated_at'] = CarbonImmutable::now(); + } + + #[Override] + public function retryNow(string $eventId, string $listener): void + { + $key = self::key($eventId, $listener); + + if (!isset($this->rows[$key])) { + return; + } + + $this->rows[$key]['status'] = self::STATUS_PENDING; + $this->rows[$key]['next_retry_at'] = CarbonImmutable::now(); + $this->rows[$key]['updated_at'] = CarbonImmutable::now(); + } + + private static function key(string $eventId, string $listener): string + { + return $eventId . '|' . $listener; + } + + private static function formatError(Throwable $error): string + { + return $error::class . ': ' . $error->getMessage(); + } +} diff --git a/src/Infrastructure/Retry/ExponentialBackoffRetryPolicy.php b/src/Infrastructure/Retry/ExponentialBackoffRetryPolicy.php new file mode 100644 index 0000000..71522a5 --- /dev/null +++ b/src/Infrastructure/Retry/ExponentialBackoffRetryPolicy.php @@ -0,0 +1,43 @@ + */ + private array $delaysMs; + + /** + * @param list $delaysMs delay before each retry in milliseconds; the i-th element is the + * delay before attempt (i + 2). Default: 100ms, 500ms, 1min, 5min, + * yielding 5 total attempts (one initial + four retries). + */ + public function __construct( + array $delaysMs = [100, 500, 60_000, 300_000], + ) { + foreach ($delaysMs as $ms) { + if ($ms < 0) { + throw new InvalidArgumentException("Retry delays must be non-negative, got {$ms}."); + } + } + + $this->delaysMs = $delaysMs; + } + + #[Override] + public function nextRetryAt(int $previousAttempt): ?CarbonImmutable + { + $index = $previousAttempt - 1; + + if ($index < 0 || $index >= count($this->delaysMs)) { + return null; + } + + return CarbonImmutable::now()->addMilliseconds($this->delaysMs[$index]); + } +} diff --git a/src/Infrastructure/Retry/NoRetryPolicy.php b/src/Infrastructure/Retry/NoRetryPolicy.php new file mode 100644 index 0000000..eb82acb --- /dev/null +++ b/src/Infrastructure/Retry/NoRetryPolicy.php @@ -0,0 +1,16 @@ +prepare( + <<execute(['table' => $table]); + + if ($stmt->fetchColumn()) { + return; + } + + $connection->exec($creationQuery); + } +} diff --git a/src/Infrastructure/Schema/SqliteRedeliverySchema.php b/src/Infrastructure/Schema/SqliteRedeliverySchema.php new file mode 100644 index 0000000..d4e8725 --- /dev/null +++ b/src/Infrastructure/Schema/SqliteRedeliverySchema.php @@ -0,0 +1,38 @@ +exec( + <<exec( + 'CREATE INDEX IF NOT EXISTS idx_redelivery_due + ON event_outbox_redelivery (status, next_retry_at)', + ); + } +} diff --git a/src/Infrastructure/SequentialEventProcessor.php b/src/Infrastructure/SequentialEventProcessor.php index 3807919..80c19d7 100644 --- a/src/Infrastructure/SequentialEventProcessor.php +++ b/src/Infrastructure/SequentialEventProcessor.php @@ -2,23 +2,45 @@ namespace Vesper\Tool\Event\Infrastructure; +use Carbon\CarbonImmutable; +use Closure; use Override; +use RuntimeException; +use Throwable; use Vesper\Tool\Event\EventHydrator; use Vesper\Tool\Event\EventProcessor; use Vesper\Tool\Event\EventStore; use Vesper\Tool\Event\EventSubscriberMap; use Vesper\Tool\Event\HandlerResolver; +use Vesper\Tool\Event\Infrastructure\Retry\NoRetryPolicy; use Vesper\Tool\Event\RawEvent; +use Vesper\Tool\Event\RedeliveryTracker; +use Vesper\Tool\Event\Retry\RetryPolicy; -readonly class SequentialEventProcessor implements EventProcessor +class SequentialEventProcessor implements EventProcessor { /** - * @param EventSubscriberMap $subscribers + * @param EventSubscriberMap $subscribers + * @param list> $ignoredExceptions exceptions thrown by a + * listener that should be + * silently swallowed: no + * retry, no log, no redelivery + * row + * @param int $inProcessRetryThresholdMs the policy's next-retry + * delay is performed in-process + * (sleep + retry) when ≤ this + * value; otherwise the failure + * is persisted for later + * redelivery */ public function __construct( - private EventSubscriberMap $subscribers, - private HandlerResolver $resolver = new DefaultHandlerResolver(), - private EventHydrator $hydrator = new JacksonHydrator(), + private readonly EventSubscriberMap $subscribers, + private readonly HandlerResolver $resolver = new DefaultHandlerResolver(), + private readonly EventHydrator $hydrator = new JacksonHydrator(), + private readonly RetryPolicy $retryPolicy = new NoRetryPolicy(), + private readonly ?RedeliveryTracker $redeliveryTracker = null, + private readonly array $ignoredExceptions = [], + private readonly int $inProcessRetryThresholdMs = 1000, ) { } @@ -28,15 +50,127 @@ public function __construct( foreach ($this->subscribers->of($event->name) as $subscriber) { $this->dispatch($event, $subscriber); } + $store->markProcessed($event->id); + } + + if ($this->redeliveryTracker !== null) { + while ($due = $this->redeliveryTracker->nextDue()) { + $subscriber = $this->findRegisteredSubscriber($due->event->name, $due->listener); + + if ($subscriber === null) { + $this->redeliveryTracker->markFailedPermanently( + $due->event->id, + $due->listener, + new RuntimeException("Listener '{$due->listener}' is no longer registered for event '{$due->event->name}'."), + ); + continue; + } + + $this->dispatch($due->event, $subscriber, $due->attemptNumber); + } } } - protected function dispatch(RawEvent $event, callable|string $subscriber): void + private function findRegisteredSubscriber(string $eventName, string $listenerKey): callable|string|null { - $callable = $this->resolver->resolve($subscriber); + foreach ($this->subscribers->of($eventName) as $subscriber) { + if ($this->listenerKey($subscriber) === $listenerKey) { + return $subscriber; + } + } + return null; + } + /** + * @param int $attemptsMade attempts already made before this dispatch call (0 for a fresh + * event, ≥1 when called from the redelivery drain) + */ + protected function dispatch(RawEvent $event, callable|string $subscriber, int $attemptsMade = 0): void + { + $callable = $this->resolver->resolve($subscriber); $domainEvent = $this->hydrator->hydrate($event->name, $event->payload, $callable); + $listener = $this->listenerKey($subscriber); + + while (true) { + try { + $callable($domainEvent); + $this->redeliveryTracker?->markSucceeded($event->id, $listener); + return; + } catch (Throwable $e) { + if ($this->isIgnored($e)) { + return; + } + + $attemptsMade++; + $nextRetryAt = $this->retryPolicy->nextRetryAt($attemptsMade); + + if ($nextRetryAt === null) { + $this->onPermanentFailure($event, $subscriber, $e); + throw $e; + } + + $delayMs = $this->msUntil($nextRetryAt); + + if ($delayMs <= $this->inProcessRetryThresholdMs) { + $this->sleep($delayMs); + continue; + } + + if ($this->redeliveryTracker !== null) { + $this->redeliveryTracker->schedule($event, $listener, $attemptsMade, $nextRetryAt, $e); + return; + } + + $this->onPermanentFailure($event, $subscriber, $e); + throw $e; + } + } + } - $callable($domainEvent); + /** + * Hook called once a listener's failure can no longer be retried (policy exhausted, or no + * tracker is configured to persist a long-delay retry). Default: persist the permanent-failure + * marker on the redelivery tracker if one is configured. Subclasses can extend (e.g. log). + */ + protected function onPermanentFailure(RawEvent $event, callable|string $subscriber, Throwable $error): void + { + $this->redeliveryTracker?->markFailedPermanently($event->id, $this->listenerKey($subscriber), $error); + } + + protected function listenerKey(callable|string $subscriber): string + { + if (is_string($subscriber)) { + return $subscriber; + } + + if (is_object($subscriber) && !($subscriber instanceof Closure)) { + return $subscriber::class; + } + + return 'Closure'; + } + + protected function sleep(int $milliseconds): void + { + if ($milliseconds <= 0) { + return; + } + usleep($milliseconds * 1000); + } + + private function isIgnored(Throwable $error): bool + { + foreach ($this->ignoredExceptions as $class) { + if ($error instanceof $class) { + return true; + } + } + return false; + } + + private function msUntil(CarbonImmutable $when): int + { + $diffMs = (int) round(CarbonImmutable::now()->diffInMilliseconds($when, absolute: false)); + return max(0, $diffMs); } } diff --git a/src/Infrastructure/SilentSequentialEventProcessor.php b/src/Infrastructure/SilentSequentialEventProcessor.php index 8e878b5..b20ec77 100644 --- a/src/Infrastructure/SilentSequentialEventProcessor.php +++ b/src/Infrastructure/SilentSequentialEventProcessor.php @@ -4,36 +4,52 @@ use Override; use Psr\Log\LoggerInterface; +use Throwable; use Vesper\Tool\Event\EventHydrator; use Vesper\Tool\Event\EventSubscriberMap; use Vesper\Tool\Event\HandlerResolver; +use Vesper\Tool\Event\Infrastructure\Retry\NoRetryPolicy; use Vesper\Tool\Event\RawEvent; -use Throwable; +use Vesper\Tool\Event\RedeliveryTracker; +use Vesper\Tool\Event\Retry\RetryPolicy; -readonly class SilentSequentialEventProcessor extends SequentialEventProcessor +class SilentSequentialEventProcessor extends SequentialEventProcessor { /** - * @param EventSubscriberMap $subscribers + * @param EventSubscriberMap $subscribers + * @param list> $ignoredExceptions */ public function __construct( EventSubscriberMap $subscribers, - private LoggerInterface $logger, + private readonly LoggerInterface $logger, HandlerResolver $resolver = new DefaultHandlerResolver(), EventHydrator $hydrator = new JacksonHydrator(), + RetryPolicy $retryPolicy = new NoRetryPolicy(), + ?RedeliveryTracker $redeliveryTracker = null, + array $ignoredExceptions = [], + int $inProcessRetryThresholdMs = 1000, ) { - parent::__construct($subscribers, $resolver, $hydrator); + parent::__construct( + subscribers: $subscribers, + resolver: $resolver, + hydrator: $hydrator, + retryPolicy: $retryPolicy, + redeliveryTracker: $redeliveryTracker, + ignoredExceptions: $ignoredExceptions, + inProcessRetryThresholdMs: $inProcessRetryThresholdMs, + ); } #[Override] - protected function dispatch(RawEvent $event, callable|string $subscriber): void + protected function dispatch(RawEvent $event, callable|string $subscriber, int $attemptsMade = 0): void { try { - parent::dispatch($event, $subscriber); + parent::dispatch($event, $subscriber, $attemptsMade); } catch (Throwable $exception) { $this->logger->error('Failed to dispatch event to listener.', [ 'event' => $event->name, 'event_id' => $event->id, - 'listener' => is_string($subscriber) ? $subscriber : 'Closure', + 'listener' => $this->listenerKey($subscriber), 'exception' => $exception, ]); } diff --git a/src/Infrastructure/SqlEventStore.php b/src/Infrastructure/SqlEventStore.php index 254513b..2e05a41 100644 --- a/src/Infrastructure/SqlEventStore.php +++ b/src/Infrastructure/SqlEventStore.php @@ -8,6 +8,7 @@ use PDO; use PDOException; use RuntimeException; +use Throwable; use Vesper\Tool\Event\EventStore; use Vesper\Tool\Event\Infrastructure\Schema\MysqlEventStoreSchema; use Vesper\Tool\Event\Infrastructure\Schema\SqliteEventStoreSchema; @@ -22,7 +23,8 @@ public function __construct(private PDO $connection) } /** - * Add a new event to the outbox + * Add a new event to the outbox. Inserts both the event row and a + * 'pending' audit row in the caller's transaction. * * @throws JsonException * @throws PDOException @@ -45,16 +47,91 @@ public function add(RawEvent $event): void 'created_at' => $event->createdAt->format('Y-m-d H:i:s.u'), 'publish_at' => $event->publishAt->format('Y-m-d H:i:s.u'), ]); + + $this->insertStatusAudit($event->id, $event->status->value); } /** - * Retrieve the next pending event (worker-safe) + * Claim the next pending event for this worker (worker-safe via FOR UPDATE + * SKIP LOCKED on MySQL). Transitions the row from `pending` to `processing` + * inside a single transaction together with the audit row insert. * * @throws JsonException * @throws PDOException */ #[Override] public function next(): ?RawEvent + { + $startedTransaction = $this->beginTransactionIfNeeded(); + + try { + $row = $this->fetchNextPendingRow(); + + if ($row === null) { + $this->commitIfStarted($startedTransaction); + return null; + } + + $this->connection->prepare( + "UPDATE event_outbox SET status = 'processing' WHERE id = :id", + )->execute(['id' => $row['id']]); + + $this->insertStatusAudit($row['id'], RawEventStatus::processing->value); + + $this->commitIfStarted($startedTransaction); + + /** @var array $payload */ + $payload = json_decode($row['payload'], true, flags: JSON_THROW_ON_ERROR); + + return RawEvent::retrieve( + id: $row['id'], + name: $row['name'], + status: RawEventStatus::processing, + payload: $payload, + createdAt: new CarbonImmutable($row['created_at']), + publishAt: new CarbonImmutable($row['publish_at']), + ); + } catch (Throwable $e) { + $this->rollBackIfStarted($startedTransaction); + throw $e; + } + } + + /** + * Advance an event from `processing` to `processed`. Inserts the matching + * audit row in the same transaction. The UPDATE is guarded with + * `status = 'processing'` so a stuck row that's already been recovered by + * a sweep won't be silently overwritten. + * + * @throws PDOException + */ + #[Override] + public function markProcessed(string $eventId): void + { + $startedTransaction = $this->beginTransactionIfNeeded(); + + try { + $this->connection->prepare( + <<execute(['id' => $eventId]); + + $this->insertStatusAudit($eventId, RawEventStatus::processed->value); + + $this->commitIfStarted($startedTransaction); + } catch (Throwable $e) { + $this->rollBackIfStarted($startedTransaction); + throw $e; + } + } + + /** + * @return array{id: string, name: string, status: string, payload: string, created_at: string, publish_at: string}|null + */ + private function fetchNextPendingRow(): ?array { $lockClause = $this->lockingClause(); @@ -76,25 +153,48 @@ public function next(): ?RawEvent /** @var array{id: string, name: string, status: string, payload: string, created_at: string, publish_at: string}|false $row */ $row = $stmt->fetch(PDO::FETCH_ASSOC); - if ($row === false) { - return null; + return $row === false ? null : $row; + } + + private function insertStatusAudit(string $eventId, string $status, ?string $errorMessage = null): void + { + $stmt = $this->connection->prepare( + <<execute([ + 'event_id' => $eventId, + 'status' => $status, + 'error_message' => $errorMessage, + 'created_at' => CarbonImmutable::now()->format('Y-m-d H:i:s.u'), + ]); + } + + private function beginTransactionIfNeeded(): bool + { + if ($this->connection->inTransaction()) { + return false; } - $this->connection->prepare( - "UPDATE event_outbox SET status = 'processed' WHERE id = :id", - )->execute(['id' => $row['id']]); + $this->connection->beginTransaction(); + return true; + } - /** @var array $payload */ - $payload = json_decode($row['payload'], true, flags: JSON_THROW_ON_ERROR); + private function commitIfStarted(bool $started): void + { + if ($started) { + $this->connection->commit(); + } + } - return RawEvent::retrieve( - id: $row['id'], - name: $row['name'], - status: RawEventStatus::from($row['status']), - payload: $payload, - createdAt: new CarbonImmutable($row['created_at']), - publishAt: new CarbonImmutable($row['publish_at']), - ); + private function rollBackIfStarted(bool $started): void + { + if ($started && $this->connection->inTransaction()) { + $this->connection->rollBack(); + } } private function ensureOutboxSchema(): void diff --git a/src/Infrastructure/SqlRedeliveryTracker.php b/src/Infrastructure/SqlRedeliveryTracker.php new file mode 100644 index 0000000..143f599 --- /dev/null +++ b/src/Infrastructure/SqlRedeliveryTracker.php @@ -0,0 +1,246 @@ +ensureRedeliverySchema(); + } + + /** + * @throws PDOException + */ + #[Override] + public function schedule( + RawEvent $event, + string $listener, + int $attemptNumber, + CarbonImmutable $nextRetryAt, + Throwable $lastError, + ): void { + $now = CarbonImmutable::now()->format('Y-m-d H:i:s.u'); + $errorMessage = self::formatError($lastError); + $nextRetryAtSql = $nextRetryAt->format('Y-m-d H:i:s.u'); + + $sql = match ($this->driverName()) { + 'mysql' => << << throw new RuntimeException('Unsupported database driver: ' . $this->driverName()), + }; + + $this->connection->prepare($sql)->execute([ + 'event_id' => $event->id, + 'listener' => $listener, + 'status' => self::STATUS_PENDING, + 'attempt_number' => $attemptNumber, + 'next_retry_at' => $nextRetryAtSql, + 'last_error' => $errorMessage, + 'created_at' => $now, + 'updated_at' => $now, + ]); + } + + /** + * @throws JsonException + * @throws PDOException + */ + #[Override] + public function nextDue(): ?DueRedelivery + { + $lockClause = $this->lockingClause(); + + $stmt = $this->connection->prepare( + <<execute([ + 'status' => self::STATUS_PENDING, + 'now' => CarbonImmutable::now()->format('Y-m-d H:i:s.u'), + ]); + + /** @var array{event_id: string, listener: string, attempt_number: int, event_name: string, event_status: string, event_payload: string, event_created_at: string, event_publish_at: string}|false $row */ + $row = $stmt->fetch(PDO::FETCH_ASSOC); + + if ($row === false) { + return null; + } + + /** @var array $payload */ + $payload = json_decode($row['event_payload'], true, flags: JSON_THROW_ON_ERROR); + + $event = RawEvent::retrieve( + id: $row['event_id'], + name: $row['event_name'], + status: RawEventStatus::from($row['event_status']), + payload: $payload, + createdAt: new CarbonImmutable($row['event_created_at']), + publishAt: new CarbonImmutable($row['event_publish_at']), + ); + + return new DueRedelivery( + event: $event, + listener: $row['listener'], + attemptNumber: (int) $row['attempt_number'], + ); + } + + /** + * @throws PDOException + */ + #[Override] + public function markFailedPermanently(string $eventId, string $listener, Throwable $lastError): void + { + $this->connection->prepare( + <<execute([ + 'status' => self::STATUS_FAILED, + 'last_error' => self::formatError($lastError), + 'updated_at' => CarbonImmutable::now()->format('Y-m-d H:i:s.u'), + 'event_id' => $eventId, + 'listener' => $listener, + ]); + } + + /** + * @throws PDOException + */ + #[Override] + public function markSucceeded(string $eventId, string $listener): void + { + $this->connection->prepare( + <<execute([ + 'status' => self::STATUS_SUCCEEDED, + 'updated_at' => CarbonImmutable::now()->format('Y-m-d H:i:s.u'), + 'event_id' => $eventId, + 'listener' => $listener, + ]); + } + + /** + * @throws PDOException + */ + #[Override] + public function retryNow(string $eventId, string $listener): void + { + $now = CarbonImmutable::now()->format('Y-m-d H:i:s.u'); + + $this->connection->prepare( + <<execute([ + 'status' => self::STATUS_PENDING, + 'next_retry_at' => $now, + 'updated_at' => $now, + 'event_id' => $eventId, + 'listener' => $listener, + ]); + } + + private static function formatError(Throwable $error): string + { + return $error::class . ': ' . $error->getMessage(); + } + + private function ensureRedeliverySchema(): void + { + $driver = $this->driverName(); + + match ($driver) { + 'mysql' => MysqlRedeliverySchema::create($this->connection), + 'sqlite' => SqliteRedeliverySchema::create($this->connection), + default => throw new RuntimeException('Unsupported database driver: ' . $driver), + }; + } + + private function lockingClause(): string + { + return match ($this->driverName()) { + 'mysql' => 'FOR UPDATE SKIP LOCKED', + default => '', + }; + } + + private function driverName(): string + { + $driver = $this->connection->getAttribute(PDO::ATTR_DRIVER_NAME); + assert(is_string($driver)); + + return $driver; + } +} diff --git a/src/RawEventStatus.php b/src/RawEventStatus.php index 0ab40a8..d5eed18 100644 --- a/src/RawEventStatus.php +++ b/src/RawEventStatus.php @@ -5,6 +5,7 @@ enum RawEventStatus: string { case pending = 'pending'; + case processing = 'processing'; case processed = 'processed'; case failed = 'failed'; } diff --git a/src/RedeliveryTracker.php b/src/RedeliveryTracker.php new file mode 100644 index 0000000..325d21e --- /dev/null +++ b/src/RedeliveryTracker.php @@ -0,0 +1,54 @@ +id and + * join event_outbox to look the rest up. + */ + public function schedule( + RawEvent $event, + string $listener, + int $attemptNumber, + CarbonImmutable $nextRetryAt, + Throwable $lastError, + ): void; + + /** + * Pick up the next due redelivery (worker-safe; locks the row on MySQL). + * Returns null when none are due. + */ + public function nextDue(): ?DueRedelivery; + + /** + * Mark a redelivery as permanently failed — no further automatic attempts. + * The row stays in the table so operators can inspect it and call retryNow() + * if appropriate. + */ + public function markFailedPermanently(string $eventId, string $listener, Throwable $lastError): void; + + /** + * Mark a redelivery as succeeded. The row stays in the table for audit; + * future nextDue() calls will not return it. + */ + public function markSucceeded(string $eventId, string $listener): void; + + /** + * Application-side admin API: re-queue a redelivery for immediate retry. + * Sets status to pending_retry and next_retry_at to now(), regardless of the + * row's current status (including 'failed'). Attempt count is preserved so + * the retry policy's max-attempts ceiling still applies on subsequent + * automatic failures. + */ + public function retryNow(string $eventId, string $listener): void; +} diff --git a/src/Retry/RetryPolicy.php b/src/Retry/RetryPolicy.php new file mode 100644 index 0000000..983cca7 --- /dev/null +++ b/src/Retry/RetryPolicy.php @@ -0,0 +1,19 @@ +pdo = new PDO('sqlite::memory:'); + $this->pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION); + + $this->store = new SqlEventStore($this->pdo); + } + + public function test_add_inserts_pending_audit_row(): void + { + $event = self::createEvent('order.placed'); + + $this->store->add($event); + + $rows = $this->fetchAuditRows($event->id); + + self::assertSame([['status' => 'pending', 'error_message' => null]], $rows); + } + + public function test_next_transitions_event_to_processing_and_inserts_audit_row(): void + { + $event = self::createEvent('order.placed'); + $this->store->add($event); + + $retrieved = $this->store->next(); + + self::assertNotNull($retrieved); + self::assertSame(RawEventStatus::processing, $retrieved->status, 'returned RawEvent reflects new status'); + self::assertSame('processing', $this->fetchEventStatus($event->id), 'event_outbox row was flipped to processing'); + + $rows = $this->fetchAuditRows($event->id); + self::assertCount(2, $rows); + self::assertSame('pending', $rows[0]['status']); + self::assertSame('processing', $rows[1]['status']); + } + + public function test_mark_processed_advances_event_and_inserts_audit_row(): void + { + $event = self::createEvent('order.placed'); + $this->store->add($event); + $this->store->next(); + + $this->store->markProcessed($event->id); + + self::assertSame('processed', $this->fetchEventStatus($event->id)); + + $rows = $this->fetchAuditRows($event->id); + self::assertCount(3, $rows); + self::assertSame('pending', $rows[0]['status']); + self::assertSame('processing', $rows[1]['status']); + self::assertSame('processed', $rows[2]['status']); + } + + public function test_next_does_not_pick_up_a_row_already_in_processing(): void + { + $event = self::createEvent('order.placed'); + $this->store->add($event); + $this->store->next(); // pending → processing + + // A second call must not pick up the processing row (it's not pending anymore). + self::assertNull($this->store->next()); + } + + public function test_mark_processed_is_a_noop_when_row_is_not_in_processing(): void + { + $event = self::createEvent('order.placed'); + $this->store->add($event); + + // Skip next(); call markProcessed directly. The guard `WHERE status='processing'` should + // make the UPDATE a no-op for a row still in 'pending'. + $this->store->markProcessed($event->id); + + self::assertSame('pending', $this->fetchEventStatus($event->id), 'pending row is left untouched'); + } + + /** + * @return array + */ + private function fetchAuditRows(string $eventId): array + { + $stmt = $this->pdo->prepare( + 'SELECT status, error_message FROM event_outbox_status WHERE event_id = :id ORDER BY created_at', + ); + $stmt->execute(['id' => $eventId]); + + /** @var array */ + return $stmt->fetchAll(PDO::FETCH_ASSOC); + } + + private function fetchEventStatus(string $eventId): string + { + $stmt = $this->pdo->prepare('SELECT status FROM event_outbox WHERE id = :id'); + $stmt->execute(['id' => $eventId]); + $value = $stmt->fetchColumn(); + self::assertIsString($value); + return $value; + } + + /** + * @param array $payload + */ + private static function createEvent( + string $name, + array $payload = [], + ?CarbonImmutable $publishAt = null, + ): RawEvent { + return RawEvent::create( + name: $name, + payload: $payload, + publishAt: $publishAt ?? CarbonImmutable::now()->subSecond(), + ); + } +} diff --git a/tests/Feature/SqlRedeliveryTrackerTest.php b/tests/Feature/SqlRedeliveryTrackerTest.php new file mode 100644 index 0000000..1058659 --- /dev/null +++ b/tests/Feature/SqlRedeliveryTrackerTest.php @@ -0,0 +1,193 @@ +pdo = new PDO('sqlite::memory:'); + $this->pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION); + + $this->store = new SqlEventStore($this->pdo); + $this->tracker = new SqlRedeliveryTracker($this->pdo); + } + + public function test_schema_is_idempotent_across_multiple_instantiations(): void + { + new SqlRedeliveryTracker($this->pdo); + new SqlRedeliveryTracker($this->pdo); + + self::assertTrue(true); + } + + public function test_next_due_returns_null_when_table_is_empty(): void + { + self::assertNull($this->tracker->nextDue()); + } + + public function test_schedule_then_next_due_round_trip(): void + { + $event = $this->insertEvent(); + + $this->tracker->schedule( + event: $event, + listener: 'App\\Listener', + attemptNumber: 1, + nextRetryAt: CarbonImmutable::now()->subSecond(), + lastError: new RuntimeException('boom'), + ); + + $due = $this->tracker->nextDue(); + + self::assertNotNull($due); + self::assertSame($event->id, $due->event->id); + self::assertSame('App\\Listener', $due->listener); + self::assertSame(1, $due->attemptNumber); + self::assertSame($event->name, $due->event->name); + } + + public function test_next_due_excludes_rows_whose_retry_time_is_in_the_future(): void + { + $event = $this->insertEvent(); + + $this->tracker->schedule( + event: $event, + listener: 'App\\Listener', + attemptNumber: 1, + nextRetryAt: CarbonImmutable::now()->addMinute(), + lastError: new RuntimeException('boom'), + ); + + self::assertNull($this->tracker->nextDue()); + } + + public function test_schedule_is_idempotent_and_updates_on_repeat(): void + { + $event = $this->insertEvent(); + + $this->tracker->schedule( + event: $event, + listener: 'App\\Listener', + attemptNumber: 1, + nextRetryAt: CarbonImmutable::now()->addMinute(), + lastError: new RuntimeException('first'), + ); + $this->tracker->schedule( + event: $event, + listener: 'App\\Listener', + attemptNumber: 2, + nextRetryAt: CarbonImmutable::now()->subSecond(), + lastError: new RuntimeException('second'), + ); + + $due = $this->tracker->nextDue(); + + self::assertNotNull($due); + self::assertSame(2, $due->attemptNumber, 'second schedule overwrote the first row'); + + $rows = $this->fetchAllRedeliveryRows($event->id); + self::assertCount(1, $rows, 'no duplicate rows for (event_id, listener)'); + } + + public function test_mark_succeeded_removes_row_from_due_queue(): void + { + $event = $this->insertEvent(); + + $this->tracker->schedule( + event: $event, + listener: 'App\\Listener', + attemptNumber: 1, + nextRetryAt: CarbonImmutable::now()->subSecond(), + lastError: new RuntimeException('boom'), + ); + + $this->tracker->markSucceeded($event->id, 'App\\Listener'); + + self::assertNull($this->tracker->nextDue()); + self::assertSame('succeeded', $this->fetchRedeliveryStatus($event->id, 'App\\Listener')); + } + + public function test_mark_failed_permanently_removes_row_from_due_queue(): void + { + $event = $this->insertEvent(); + + $this->tracker->schedule( + event: $event, + listener: 'App\\Listener', + attemptNumber: 5, + nextRetryAt: CarbonImmutable::now()->subSecond(), + lastError: new RuntimeException('boom'), + ); + + $this->tracker->markFailedPermanently($event->id, 'App\\Listener', new RuntimeException('final')); + + self::assertNull($this->tracker->nextDue()); + self::assertSame('failed', $this->fetchRedeliveryStatus($event->id, 'App\\Listener')); + } + + public function test_retry_now_re_queues_a_permanently_failed_row_preserving_attempt_count(): void + { + $event = $this->insertEvent(); + + $this->tracker->schedule( + event: $event, + listener: 'App\\Listener', + attemptNumber: 5, + nextRetryAt: CarbonImmutable::now()->subSecond(), + lastError: new RuntimeException('boom'), + ); + $this->tracker->markFailedPermanently($event->id, 'App\\Listener', new RuntimeException('final')); + + $this->tracker->retryNow($event->id, 'App\\Listener'); + + $due = $this->tracker->nextDue(); + self::assertNotNull($due); + self::assertSame(5, $due->attemptNumber, 'attempt count is preserved across retryNow()'); + } + + private function insertEvent(): RawEvent + { + $event = TestEventFactory::retrieveOrderPlaced(['order_id' => 1]); + $this->store->add($event); + return $event; + } + + /** + * @return array> + */ + private function fetchAllRedeliveryRows(string $eventId): array + { + $stmt = $this->pdo->prepare( + 'SELECT * FROM event_outbox_redelivery WHERE event_id = :id', + ); + $stmt->execute(['id' => $eventId]); + + /** @var array> */ + return $stmt->fetchAll(PDO::FETCH_ASSOC); + } + + private function fetchRedeliveryStatus(string $eventId, string $listener): string + { + $stmt = $this->pdo->prepare( + 'SELECT status FROM event_outbox_redelivery WHERE event_id = :id AND listener = :listener', + ); + $stmt->execute(['id' => $eventId, 'listener' => $listener]); + $value = $stmt->fetchColumn(); + self::assertIsString($value); + return $value; + } +} diff --git a/tests/Unit/InMemoryRedeliveryTrackerTest.php b/tests/Unit/InMemoryRedeliveryTrackerTest.php new file mode 100644 index 0000000..39061ba --- /dev/null +++ b/tests/Unit/InMemoryRedeliveryTrackerTest.php @@ -0,0 +1,171 @@ +tracker = new InMemoryRedeliveryTracker(); + $this->event = TestEventFactory::retrieveOrderPlaced(); + } + + public function test_next_due_returns_null_when_empty(): void + { + self::assertNull($this->tracker->nextDue()); + } + + public function test_schedule_makes_redelivery_pickable_when_time_passes(): void + { + $this->tracker->schedule( + event: $this->event, + listener: 'App\\SomeListener', + attemptNumber: 1, + nextRetryAt: CarbonImmutable::now()->subSecond(), + lastError: new RuntimeException('boom'), + ); + + $due = $this->tracker->nextDue(); + + self::assertNotNull($due); + self::assertSame($this->event->id, $due->event->id); + self::assertSame('App\\SomeListener', $due->listener); + self::assertSame(1, $due->attemptNumber); + } + + public function test_next_due_excludes_rows_whose_retry_time_is_in_the_future(): void + { + $this->tracker->schedule( + event: $this->event, + listener: 'App\\SomeListener', + attemptNumber: 1, + nextRetryAt: CarbonImmutable::now()->addMinute(), + lastError: new RuntimeException('boom'), + ); + + self::assertNull($this->tracker->nextDue()); + } + + public function test_next_due_returns_earliest_scheduled_first(): void + { + $eventEarly = TestEventFactory::retrieveOrderPlaced(['n' => 1]); + $eventLate = TestEventFactory::retrieveOrderPlaced(['n' => 2]); + + $this->tracker->schedule( + event: $eventLate, + listener: 'App\\Late', + attemptNumber: 1, + nextRetryAt: CarbonImmutable::now()->subSecond(), + lastError: new RuntimeException('boom'), + ); + $this->tracker->schedule( + event: $eventEarly, + listener: 'App\\Early', + attemptNumber: 1, + nextRetryAt: CarbonImmutable::now()->subMinute(), + lastError: new RuntimeException('boom'), + ); + + $due = $this->tracker->nextDue(); + + self::assertNotNull($due); + self::assertSame('App\\Early', $due->listener); + } + + public function test_mark_succeeded_removes_row_from_due_queue(): void + { + $this->tracker->schedule( + event: $this->event, + listener: 'App\\Listener', + attemptNumber: 1, + nextRetryAt: CarbonImmutable::now()->subSecond(), + lastError: new RuntimeException('boom'), + ); + + $this->tracker->markSucceeded($this->event->id, 'App\\Listener'); + + self::assertNull($this->tracker->nextDue()); + } + + public function test_mark_failed_permanently_removes_row_from_due_queue(): void + { + $this->tracker->schedule( + event: $this->event, + listener: 'App\\Listener', + attemptNumber: 5, + nextRetryAt: CarbonImmutable::now()->subSecond(), + lastError: new RuntimeException('boom'), + ); + + $this->tracker->markFailedPermanently($this->event->id, 'App\\Listener', new RuntimeException('final')); + + self::assertNull($this->tracker->nextDue()); + } + + public function test_retry_now_re_queues_a_permanently_failed_row(): void + { + $this->tracker->schedule( + event: $this->event, + listener: 'App\\Listener', + attemptNumber: 5, + nextRetryAt: CarbonImmutable::now()->subSecond(), + lastError: new RuntimeException('boom'), + ); + $this->tracker->markFailedPermanently($this->event->id, 'App\\Listener', new RuntimeException('final')); + + $this->tracker->retryNow($this->event->id, 'App\\Listener'); + + $due = $this->tracker->nextDue(); + + self::assertNotNull($due); + self::assertSame('App\\Listener', $due->listener); + self::assertSame(5, $due->attemptNumber, 'attempt count is preserved across retryNow()'); + } + + public function test_schedule_is_idempotent_on_event_id_listener(): void + { + $this->tracker->schedule( + event: $this->event, + listener: 'App\\Listener', + attemptNumber: 1, + nextRetryAt: CarbonImmutable::now()->addMinute(), + lastError: new RuntimeException('boom'), + ); + $this->tracker->schedule( + event: $this->event, + listener: 'App\\Listener', + attemptNumber: 2, + nextRetryAt: CarbonImmutable::now()->subSecond(), + lastError: new RuntimeException('again'), + ); + + $due = $this->tracker->nextDue(); + + self::assertNotNull($due); + self::assertSame(2, $due->attemptNumber, 'rescheduling updates attempt count, does not insert a duplicate'); + } + + public function test_mark_succeeded_is_a_noop_for_unknown_row(): void + { + $this->tracker->markSucceeded('unknown-id', 'App\\Listener'); + + self::assertNull($this->tracker->nextDue()); + } + + public function test_retry_now_is_a_noop_for_unknown_row(): void + { + $this->tracker->retryNow('unknown-id', 'App\\Listener'); + + self::assertNull($this->tracker->nextDue()); + } +} diff --git a/tests/Unit/Retry/ExponentialBackoffRetryPolicyTest.php b/tests/Unit/Retry/ExponentialBackoffRetryPolicyTest.php new file mode 100644 index 0000000..c43e647 --- /dev/null +++ b/tests/Unit/Retry/ExponentialBackoffRetryPolicyTest.php @@ -0,0 +1,78 @@ +nextRetryAt(1); + + self::assertNotNull($next); + self::assertSame(100, (int) round(CarbonImmutable::now()->diffInMilliseconds($next))); + } + + public function test_returns_now_plus_second_delay_for_second_attempt(): void + { + $policy = new ExponentialBackoffRetryPolicy(delaysMs: [100, 500]); + + $next = $policy->nextRetryAt(2); + + self::assertNotNull($next); + self::assertSame(500, (int) round(CarbonImmutable::now()->diffInMilliseconds($next))); + } + + public function test_returns_null_when_attempts_exhausted(): void + { + $policy = new ExponentialBackoffRetryPolicy(delaysMs: [100, 500]); + + self::assertNull($policy->nextRetryAt(3)); + self::assertNull($policy->nextRetryAt(99)); + } + + public function test_default_delays_yield_four_retries_with_in_process_then_persisted_durations(): void + { + $policy = new ExponentialBackoffRetryPolicy(); + $now = CarbonImmutable::now(); + + self::assertSame(100, (int) round($now->diffInMilliseconds($policy->nextRetryAt(1)))); + self::assertSame(500, (int) round($now->diffInMilliseconds($policy->nextRetryAt(2)))); + self::assertSame(60_000, (int) round($now->diffInMilliseconds($policy->nextRetryAt(3)))); + self::assertSame(300_000, (int) round($now->diffInMilliseconds($policy->nextRetryAt(4)))); + self::assertNull($policy->nextRetryAt(5)); + } + + public function test_rejects_negative_delays(): void + { + $this->expectException(InvalidArgumentException::class); + + new ExponentialBackoffRetryPolicy(delaysMs: [100, -1]); + } + + public function test_returns_null_for_zero_or_negative_attempt_number(): void + { + $policy = new ExponentialBackoffRetryPolicy(delaysMs: [100]); + + self::assertNull($policy->nextRetryAt(0)); + self::assertNull($policy->nextRetryAt(-1)); + } +} diff --git a/tests/Unit/Retry/NoRetryPolicyTest.php b/tests/Unit/Retry/NoRetryPolicyTest.php new file mode 100644 index 0000000..29e28e9 --- /dev/null +++ b/tests/Unit/Retry/NoRetryPolicyTest.php @@ -0,0 +1,23 @@ +nextRetryAt(1)); + } + + public function test_returns_null_for_arbitrary_attempt(): void + { + $policy = new NoRetryPolicy(); + + self::assertNull($policy->nextRetryAt(42)); + } +} diff --git a/tests/Unit/SequentialEventProcessorTest.php b/tests/Unit/SequentialEventProcessorTest.php index 59cc98e..01bd90a 100644 --- a/tests/Unit/SequentialEventProcessorTest.php +++ b/tests/Unit/SequentialEventProcessorTest.php @@ -2,14 +2,22 @@ namespace Test\Vesper\Tool\Event\Unit; +use Carbon\CarbonImmutable; use PHPUnit\Framework\TestCase; +use RuntimeException; +use Test\Vesper\Tool\Event\_Fixtures\IgnorableExceptionStub; +use Test\Vesper\Tool\Event\_Fixtures\RecordingSequentialEventProcessor; +use Test\Vesper\Tool\Event\_Fixtures\TestEventFactory; +use Test\Vesper\Tool\Event\_Fixtures\TrackingEventStore; +use Test\Vesper\Tool\Event\_Fixtures\TrackingListener; use Vesper\Tool\Event\EventHydrator; use Vesper\Tool\Event\EventSubscriberMap; use Vesper\Tool\Event\HandlerResolver; use Vesper\Tool\Event\Infrastructure\InMemoryEventStore; +use Vesper\Tool\Event\Infrastructure\InMemoryRedeliveryTracker; use Vesper\Tool\Event\Infrastructure\SequentialEventProcessor; -use Test\Vesper\Tool\Event\_Fixtures\TestEventFactory; -use Test\Vesper\Tool\Event\_Fixtures\TrackingListener; +use Vesper\Tool\Event\RedeliveryTracker; +use Vesper\Tool\Event\Retry\RetryPolicy; class SequentialEventProcessorTest extends TestCase { @@ -199,4 +207,243 @@ public function test_hydrates_once_per_subscriber_passing_subscriber_as_context( self::assertSame($subscriberB, $hydrateCalls[1]); } + // ── retry policy / redelivery ────────────────────────────────────────────── + + public function test_calls_mark_processed_after_each_event_succeeds(): void + { + $store = new TrackingEventStore(); + $event = TestEventFactory::retrieveOrderPlaced(); + $store->add($event); + + $this->subscribers->subscribe('order.placed', function () {}); + + $this->processor->process($store); + + self::assertSame([$event->id], $store->markProcessedCalls); + } + + public function test_does_not_call_mark_processed_when_a_listener_throws_in_fail_fast_mode(): void + { + $store = new TrackingEventStore(); + $event = TestEventFactory::retrieveOrderPlaced(); + $store->add($event); + + $this->subscribers->subscribe('order.placed', function () { + throw new RuntimeException('boom'); + }); + + try { + $this->processor->process($store); + self::fail('Expected exception was not thrown'); + } catch (RuntimeException) { + // expected + } + + self::assertSame([], $store->markProcessedCalls, 'event must remain in processing when dispatch propagates'); + } + + public function test_in_process_retry_succeeds_on_second_attempt(): void + { + CarbonImmutable::setTestNow(CarbonImmutable::createFromFormat('Y-m-d H:i:s.u', '2026-04-27 12:00:00.000000')); + try { + $event = TestEventFactory::retrieveOrderPlaced(); + $this->store->add($event); + + $calls = 0; + $this->subscribers->subscribe('order.placed', function () use (&$calls) { + $calls++; + if ($calls === 1) { + throw new RuntimeException('transient'); + } + }); + + $policy = self::policyReturning(retryDelayMs: 50); + $processor = new RecordingSequentialEventProcessor( + $this->subscribers, + retryPolicy: $policy, + inProcessRetryThresholdMs: 100, + ); + + $processor->process($this->store); + + self::assertSame(2, $calls); + self::assertSame([50], $processor->sleeps, 'one in-process sleep happened between the two attempts'); + } finally { + CarbonImmutable::setTestNow(); + } + } + + public function test_in_process_retry_exhausted_propagates_in_fail_fast(): void + { + $this->store->add(TestEventFactory::retrieveOrderPlaced()); + + $exception = new RuntimeException('always fails'); + $this->subscribers->subscribe('order.placed', function () use ($exception) { + throw $exception; + }); + + $policy = self::policyReturning(retryDelayMs: 10, exhaustAfter: 1); + $processor = new RecordingSequentialEventProcessor( + $this->subscribers, + retryPolicy: $policy, + inProcessRetryThresholdMs: 100, + ); + + $this->expectExceptionObject($exception); + $processor->process($this->store); + } + + public function test_persists_to_tracker_when_next_retry_delay_exceeds_threshold(): void + { + $event = TestEventFactory::retrieveOrderPlaced(); + $this->store->add($event); + + $exception = new RuntimeException('boom'); + $this->subscribers->subscribe('order.placed', function () use ($exception) { + throw $exception; + }); + + $tracker = $this->createMock(RedeliveryTracker::class); + $tracker->expects($this->once()) + ->method('schedule') + ->with( + $this->callback(fn($e) => $e->id === $event->id), + 'Closure', + 1, + $this->isInstanceOf(CarbonImmutable::class), + $exception, + ); + $tracker->method('nextDue')->willReturn(null); + + $policy = self::policyReturning(retryDelayMs: 5_000); // way above threshold + $processor = new RecordingSequentialEventProcessor( + $this->subscribers, + retryPolicy: $policy, + redeliveryTracker: $tracker, + inProcessRetryThresholdMs: 100, + ); + + $processor->process($this->store); + + self::assertSame([], $processor->sleeps, 'no in-process sleep when delay exceeds threshold'); + } + + public function test_calls_mark_succeeded_when_listener_succeeds_with_tracker_configured(): void + { + $event = TestEventFactory::retrieveOrderPlaced(); + $this->store->add($event); + + $this->subscribers->subscribe('order.placed', function () {}); + + $tracker = $this->createMock(RedeliveryTracker::class); + $tracker->expects($this->once()) + ->method('markSucceeded') + ->with($event->id, 'Closure'); + $tracker->method('nextDue')->willReturn(null); + + $processor = new SequentialEventProcessor($this->subscribers, redeliveryTracker: $tracker); + $processor->process($this->store); + } + + public function test_ignored_exception_short_circuits_with_no_retry_no_propagation_no_tracker_calls(): void + { + $this->store->add(TestEventFactory::retrieveOrderPlaced()); + + $calls = 0; + $this->subscribers->subscribe('order.placed', function () use (&$calls) { + $calls++; + throw new IgnorableExceptionStub('expected domain failure'); + }); + + $tracker = $this->createMock(RedeliveryTracker::class); + $tracker->expects($this->never())->method('schedule'); + $tracker->expects($this->never())->method('markFailedPermanently'); + $tracker->expects($this->never())->method('markSucceeded'); + $tracker->method('nextDue')->willReturn(null); + + $processor = new SequentialEventProcessor( + $this->subscribers, + retryPolicy: self::policyReturning(retryDelayMs: 10), + redeliveryTracker: $tracker, + ignoredExceptions: [IgnorableExceptionStub::class], + ); + + $processor->process($this->store); + + self::assertSame(1, $calls, 'listener ran exactly once and was not retried'); + } + + public function test_drains_due_redeliveries_after_the_main_queue(): void + { + $event = TestEventFactory::retrieveOrderPlaced(); + + $tracker = new InMemoryRedeliveryTracker(); + $tracker->schedule( + event: $event, + listener: 'Closure', + attemptNumber: 2, + nextRetryAt: CarbonImmutable::now()->subSecond(), + lastError: new RuntimeException('earlier failure'), + ); + + $received = null; + $this->subscribers->subscribe('order.placed', function (object $e) use (&$received) { + $received = $e; + }); + + $processor = new SequentialEventProcessor($this->subscribers, redeliveryTracker: $tracker); + $processor->process($this->store); + + self::assertNotNull($received, 'redelivery dispatch invoked the listener'); + self::assertNull($tracker->nextDue(), 'redelivery row marked succeeded after dispatch'); + } + + public function test_calls_mark_failed_permanently_when_no_more_retries_with_tracker(): void + { + $this->store->add(TestEventFactory::retrieveOrderPlaced()); + + $exception = new RuntimeException('boom'); + $this->subscribers->subscribe('order.placed', function () use ($exception) { + throw $exception; + }); + + $tracker = $this->createMock(RedeliveryTracker::class); + $tracker->expects($this->once())->method('markFailedPermanently'); + $tracker->expects($this->never())->method('schedule'); + $tracker->method('nextDue')->willReturn(null); + + $processor = new SequentialEventProcessor( + $this->subscribers, + redeliveryTracker: $tracker, + ); + + try { + $processor->process($this->store); + self::fail('expected exception'); + } catch (RuntimeException) { + // expected — base class is fail-fast + } + } + + /** + * Build a RetryPolicy stub that returns now()+$retryDelayMs for the first $exhaustAfter + * attempts, then null. + */ + private static function policyReturning(int $retryDelayMs, int $exhaustAfter = 5): RetryPolicy + { + return new readonly class ($retryDelayMs, $exhaustAfter) implements RetryPolicy { + public function __construct( + private int $retryDelayMs, + private int $exhaustAfter, + ) {} + + public function nextRetryAt(int $previousAttempt): ?CarbonImmutable + { + if ($previousAttempt > $this->exhaustAfter) { + return null; + } + return CarbonImmutable::now()->addMilliseconds($this->retryDelayMs); + } + }; + } } diff --git a/tests/Unit/SilentSequentialEventProcessorTest.php b/tests/Unit/SilentSequentialEventProcessorTest.php index 67356f2..1e4f1a1 100644 --- a/tests/Unit/SilentSequentialEventProcessorTest.php +++ b/tests/Unit/SilentSequentialEventProcessorTest.php @@ -5,12 +5,14 @@ use PHPUnit\Framework\TestCase; use Psr\Log\LoggerInterface; use RuntimeException; +use Test\Vesper\Tool\Event\_Fixtures\IgnorableExceptionStub; +use Test\Vesper\Tool\Event\_Fixtures\TestEventFactory; +use Test\Vesper\Tool\Event\_Fixtures\ThrowingListener; use Vesper\Tool\Event\EventHydrator; use Vesper\Tool\Event\EventSubscriberMap; use Vesper\Tool\Event\Infrastructure\InMemoryEventStore; use Vesper\Tool\Event\Infrastructure\SilentSequentialEventProcessor; -use Test\Vesper\Tool\Event\_Fixtures\TestEventFactory; -use Test\Vesper\Tool\Event\_Fixtures\ThrowingListener; +use Vesper\Tool\Event\RedeliveryTracker; class SilentSequentialEventProcessorTest extends TestCase { @@ -171,4 +173,77 @@ public function test_continues_processing_subsequent_events_after_failure(): voi self::assertSame([2], $received); } + + // ── retry / redelivery integration ───────────────────────────────────────── + + public function test_logs_and_marks_failed_permanently_when_retries_are_exhausted(): void + { + $event = TestEventFactory::retrieveOrderPlaced(); + $this->store->add($event); + + $exception = new RuntimeException('always'); + $this->subscribers->subscribe('order.placed', function () use ($exception) { + throw $exception; + }); + + $tracker = $this->createMock(RedeliveryTracker::class); + $tracker->expects($this->once()) + ->method('markFailedPermanently') + ->with($event->id, 'Closure', $exception); + $tracker->method('nextDue')->willReturn(null); + + $this->logger->expects($this->once()) + ->method('error') + ->with( + 'Failed to dispatch event to listener.', + [ + 'event' => 'order.placed', + 'event_id' => $event->id, + 'listener' => 'Closure', + 'exception' => $exception, + ], + ); + + $hydrator = $this->createStub(EventHydrator::class); + $hydrator->method('hydrate')->willReturnCallback(fn(string $name, array $payload) => (object) $payload); + + $processor = new SilentSequentialEventProcessor( + subscribers: $this->subscribers, + logger: $this->logger, + hydrator: $hydrator, + redeliveryTracker: $tracker, + ); + + $processor->process($this->store); + } + + public function test_ignored_exception_is_suppressed_with_no_log_or_tracker_call(): void + { + $this->store->add(TestEventFactory::retrieveOrderPlaced()); + + $this->subscribers->subscribe('order.placed', function () { + throw new IgnorableExceptionStub('expected domain failure'); + }); + + $tracker = $this->createMock(RedeliveryTracker::class); + $tracker->expects($this->never())->method('schedule'); + $tracker->expects($this->never())->method('markFailedPermanently'); + $tracker->expects($this->never())->method('markSucceeded'); + $tracker->method('nextDue')->willReturn(null); + + $this->logger->expects($this->never())->method('error'); + + $hydrator = $this->createStub(EventHydrator::class); + $hydrator->method('hydrate')->willReturnCallback(fn(string $name, array $payload) => (object) $payload); + + $processor = new SilentSequentialEventProcessor( + subscribers: $this->subscribers, + logger: $this->logger, + hydrator: $hydrator, + redeliveryTracker: $tracker, + ignoredExceptions: [IgnorableExceptionStub::class], + ); + + $processor->process($this->store); + } } diff --git a/tests/_Fixtures/IgnorableExceptionStub.php b/tests/_Fixtures/IgnorableExceptionStub.php new file mode 100644 index 0000000..b626b52 --- /dev/null +++ b/tests/_Fixtures/IgnorableExceptionStub.php @@ -0,0 +1,9 @@ + */ + public array $sleeps = []; + + protected function sleep(int $milliseconds): void + { + $this->sleeps[] = $milliseconds; + } +} diff --git a/tests/_Fixtures/TrackingEventStore.php b/tests/_Fixtures/TrackingEventStore.php new file mode 100644 index 0000000..422d881 --- /dev/null +++ b/tests/_Fixtures/TrackingEventStore.php @@ -0,0 +1,21 @@ + */ + public array $markProcessedCalls = []; + + public function markProcessed(string $eventId): void + { + $this->markProcessedCalls[] = $eventId; + } +} diff --git a/tests/_Fixtures/TrackingListener.php b/tests/_Fixtures/TrackingListener.php index 292e44c..03254c2 100644 --- a/tests/_Fixtures/TrackingListener.php +++ b/tests/_Fixtures/TrackingListener.php @@ -1,6 +1,6 @@