Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
319 changes: 305 additions & 14 deletions README.md

Large diffs are not rendered by default.

17 changes: 17 additions & 0 deletions src/DueRedelivery.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?php

namespace Vesper\Tool\Event;

readonly class DueRedelivery
{
/**
* @param string $listener class-string of the listener, or "Closure" for anonymous
* @param int $attemptNumber attempts already made; the upcoming retry will be (this + 1)
*/
public function __construct(
public RawEvent $event,
public string $listener,
public int $attemptNumber,
) {
}
}
22 changes: 22 additions & 0 deletions src/EventStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,29 @@

interface EventStore
{
/**
* Persist a new event in the outbox. Should be called inside the caller's
* transaction so the event commits or rolls back together with the
* business operation that produced it.
*/
public function add(RawEvent $event): void;

/**
* Claim the next pending event for this worker, transitioning it from
* `pending` to `processing`. Returns null when nothing is due.
*
* The returned event remains in `processing` state until the processor
* calls markProcessed() once every listener has settled. A worker that
* dies between next() and markProcessed() leaves the row in `processing`,
* which is intentional — it lets a future stuck-events monitor recover it.
*/
public function next(): ?RawEvent;

/**
* Advance the given event from `processing` to `processed`, signalling that
* every listener has either succeeded, been persisted to the redelivery
* queue, been swallowed by the ignored-exceptions list, or been marked
* permanently failed.
*/
public function markProcessed(string $eventId): void;
}
5 changes: 5 additions & 0 deletions src/Infrastructure/InMemoryEventStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,9 @@ class InMemoryEventStore implements EventStore
{
return array_shift($this->queue) ?? null;
}

#[Override] public function markProcessed(string $eventId): void
{
// No-op: the in-memory queue discards events on next(); there is no persisted status to flip.
}
}
125 changes: 125 additions & 0 deletions src/Infrastructure/InMemoryRedeliveryTracker.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
<?php

namespace Vesper\Tool\Event\Infrastructure;

use Carbon\CarbonImmutable;
use Override;
use Throwable;
use Vesper\Tool\Event\DueRedelivery;
use Vesper\Tool\Event\RawEvent;
use Vesper\Tool\Event\RedeliveryTracker;

class InMemoryRedeliveryTracker implements RedeliveryTracker
{
public const string STATUS_PENDING = 'pending_retry';
public const string STATUS_FAILED = 'failed';
public const string STATUS_SUCCEEDED = 'succeeded';

/** @var array<string, array{event: RawEvent, listener: string, status: string, attempt_number: int, next_retry_at: ?CarbonImmutable, last_error: ?string, created_at: CarbonImmutable, updated_at: CarbonImmutable}> */
private array $rows = [];

#[Override]
public function schedule(
RawEvent $event,
string $listener,
int $attemptNumber,
CarbonImmutable $nextRetryAt,
Throwable $lastError,
): void {
$key = self::key($event->id, $listener);
$now = CarbonImmutable::now();

$this->rows[$key] = [
'event' => $event,
'listener' => $listener,
'status' => self::STATUS_PENDING,
'attempt_number' => $attemptNumber,
'next_retry_at' => $nextRetryAt,
'last_error' => self::formatError($lastError),
'created_at' => $this->rows[$key]['created_at'] ?? $now,
'updated_at' => $now,
];
}

#[Override]
public function nextDue(): ?DueRedelivery
{
$now = CarbonImmutable::now();
$candidate = null;

foreach ($this->rows as $row) {
if ($row['status'] !== self::STATUS_PENDING) {
continue;
}
if ($row['next_retry_at'] === null || $row['next_retry_at']->greaterThan($now)) {
continue;
}
if ($candidate === null || $row['next_retry_at']->lessThan($candidate['next_retry_at'])) {
$candidate = $row;
}
}

if ($candidate === null) {
return null;
}

return new DueRedelivery(
event: $candidate['event'],
listener: $candidate['listener'],
attemptNumber: $candidate['attempt_number'],
);
}

#[Override]
public function markFailedPermanently(string $eventId, string $listener, Throwable $lastError): void
{
$key = self::key($eventId, $listener);

if (!isset($this->rows[$key])) {
return;
}

$this->rows[$key]['status'] = self::STATUS_FAILED;
$this->rows[$key]['next_retry_at'] = null;
$this->rows[$key]['last_error'] = self::formatError($lastError);
$this->rows[$key]['updated_at'] = CarbonImmutable::now();
}

#[Override]
public function markSucceeded(string $eventId, string $listener): void
{
$key = self::key($eventId, $listener);

if (!isset($this->rows[$key])) {
return;
}

$this->rows[$key]['status'] = self::STATUS_SUCCEEDED;
$this->rows[$key]['next_retry_at'] = null;
$this->rows[$key]['updated_at'] = CarbonImmutable::now();
}

#[Override]
public function retryNow(string $eventId, string $listener): void
{
$key = self::key($eventId, $listener);

if (!isset($this->rows[$key])) {
return;
}

$this->rows[$key]['status'] = self::STATUS_PENDING;
$this->rows[$key]['next_retry_at'] = CarbonImmutable::now();
$this->rows[$key]['updated_at'] = CarbonImmutable::now();
}

private static function key(string $eventId, string $listener): string
{
return $eventId . '|' . $listener;
}

private static function formatError(Throwable $error): string
{
return $error::class . ': ' . $error->getMessage();
}
}
43 changes: 43 additions & 0 deletions src/Infrastructure/Retry/ExponentialBackoffRetryPolicy.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
<?php

namespace Vesper\Tool\Event\Infrastructure\Retry;

use Carbon\CarbonImmutable;
use InvalidArgumentException;
use Override;
use Vesper\Tool\Event\Retry\RetryPolicy;

final readonly class ExponentialBackoffRetryPolicy implements RetryPolicy
{
/** @var list<int> */
private array $delaysMs;

/**
* @param list<int> $delaysMs delay before each retry in milliseconds; the i-th element is the
* delay before attempt (i + 2). Default: 100ms, 500ms, 1min, 5min,
* yielding 5 total attempts (one initial + four retries).
*/
public function __construct(
array $delaysMs = [100, 500, 60_000, 300_000],
) {
foreach ($delaysMs as $ms) {
if ($ms < 0) {
throw new InvalidArgumentException("Retry delays must be non-negative, got {$ms}.");
}
}

$this->delaysMs = $delaysMs;
}

#[Override]
public function nextRetryAt(int $previousAttempt): ?CarbonImmutable
{
$index = $previousAttempt - 1;

if ($index < 0 || $index >= count($this->delaysMs)) {
return null;
}

return CarbonImmutable::now()->addMilliseconds($this->delaysMs[$index]);
}
}
16 changes: 16 additions & 0 deletions src/Infrastructure/Retry/NoRetryPolicy.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<?php

namespace Vesper\Tool\Event\Infrastructure\Retry;

use Carbon\CarbonImmutable;
use Override;
use Vesper\Tool\Event\Retry\RetryPolicy;

final readonly class NoRetryPolicy implements RetryPolicy
{
#[Override]
public function nextRetryAt(int $previousAttempt): ?CarbonImmutable
{
return null;
}
}
56 changes: 56 additions & 0 deletions src/Infrastructure/Schema/MysqlRedeliverySchema.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
<?php

namespace Vesper\Tool\Event\Infrastructure\Schema;

use PDO;
use PDOException;

class MysqlRedeliverySchema
{
/**
* Ensure the redelivery table and indexes exist.
*
* @throws PDOException
*/
public static function create(PDO $connection): void
{
self::createIfNeeded(
connection: $connection,
table: 'event_outbox_redelivery',
creationQuery: <<<SQL
CREATE TABLE event_outbox_redelivery (
event_id VARCHAR(36) NOT NULL,
listener VARCHAR(255) NOT NULL,
status VARCHAR(32) NOT NULL,
attempt_number INT NOT NULL,
next_retry_at DATETIME(6) NULL,
last_error TEXT NULL,
created_at DATETIME(6) NOT NULL,
updated_at DATETIME(6) NOT NULL,

PRIMARY KEY (event_id, listener),
INDEX idx_redelivery_due (status, next_retry_at),
CONSTRAINT fk_redelivery_event FOREIGN KEY (event_id) REFERENCES event_outbox(id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
SQL,
);
}

private static function createIfNeeded(PDO $connection, string $table, string $creationQuery): void
{
$stmt = $connection->prepare(
<<<MYSQL
SELECT COUNT(*) FROM information_schema.tables
WHERE table_schema = DATABASE()
AND table_name = :table
MYSQL,
);
$stmt->execute(['table' => $table]);

if ($stmt->fetchColumn()) {
return;
}

$connection->exec($creationQuery);
}
}
38 changes: 38 additions & 0 deletions src/Infrastructure/Schema/SqliteRedeliverySchema.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
<?php

namespace Vesper\Tool\Event\Infrastructure\Schema;

use PDO;
use PDOException;

class SqliteRedeliverySchema
{
/**
* Ensure the redelivery table and indexes exist.
*
* @throws PDOException
*/
public static function create(PDO $connection): void
{
$connection->exec(
<<<SQL
CREATE TABLE IF NOT EXISTS event_outbox_redelivery (
event_id TEXT NOT NULL,
listener TEXT NOT NULL,
status TEXT NOT NULL,
attempt_number INTEGER NOT NULL,
next_retry_at TEXT,
last_error TEXT,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
PRIMARY KEY (event_id, listener)
)
SQL,
);

$connection->exec(
'CREATE INDEX IF NOT EXISTS idx_redelivery_due
ON event_outbox_redelivery (status, next_retry_at)',
);
}
}
Loading