From b2b57536ba5e9b71935c8a673b5d7376f27a9a2d Mon Sep 17 00:00:00 2001 From: Daree_dev Date: Thu, 23 Apr 2026 18:53:19 +0100 Subject: [PATCH] docs: define ABI stability and breaking-change rules for integrators --- API_BEHAVIOR.md | 48 +- docs/ABI_STABILITY.md | 680 ++++++++++++++++++++++++++++ docs/upgrade.md | 182 ++++++++ openapi.yaml | 53 ++- src/middleware/requestProtection.ts | 72 ++- src/routes/dlq.ts | 3 +- src/routes/streams.ts | 98 ++-- src/utils/response.ts | 21 + tests/routes/streams.test.ts | 598 +++++++++++++----------- 9 files changed, 1439 insertions(+), 316 deletions(-) create mode 100644 docs/ABI_STABILITY.md create mode 100644 docs/upgrade.md diff --git a/API_BEHAVIOR.md b/API_BEHAVIOR.md index 96ae92b..454f477 100644 --- a/API_BEHAVIOR.md +++ b/API_BEHAVIOR.md @@ -217,8 +217,14 @@ All successful responses follow this standardized structure: - **Trigger**: POST /api/streams without Idempotency-Key header - **Status**: 400 - **Code**: `VALIDATION_ERROR` -- **Message**: "Idempotency-Key header is required" -- **Recovery**: Add Idempotency-Key header with unique value +- **Message**: "Idempotency-Key header is required and must be a single string value" +- **Recovery**: Add Idempotency-Key header with a unique value matching `[A-Za-z0-9:_-]`, 1–128 chars + +#### Malformed Idempotency-Key +- **Trigger**: Key contains disallowed characters, or exceeds 128 / is under 1 character +- **Status**: 400 +- **Code**: `VALIDATION_ERROR` +- **Recovery**: Use a valid key (UUID v4 recommended) ### Dependency Outages @@ -348,21 +354,43 @@ try { ### Exactly-Once Semantics - **Scope**: POST /api/streams (stream creation) -- **Mechanism**: Idempotency-Key header + request fingerprint -- **Duration**: 24 hours (idempotency key stored in cache) -- **Guarantee**: Same Idempotency-Key + same body = same response +- **Mechanism**: `Idempotency-Key` request header + SHA-256 fingerprint of normalised body +- **Duration**: Process lifetime (in-memory store); Redis-backed store recommended for production (24-hour TTL) +- **Guarantee**: Same `Idempotency-Key` + same body = same response, served from cache ### Idempotency-Key Format -- **Required**: Yes (for POST /api/streams) -- **Format**: UUID or unique string (20+ chars recommended) -- **Example**: `550e8400-e29b-41d4-a716-446655440000` -- **Validation**: Must be non-empty string +- **Required**: Yes — missing or malformed key returns `400 VALIDATION_ERROR` +- **Length**: 1–128 characters +- **Charset**: `[A-Za-z0-9:_-]` — letters, digits, colon, underscore, hyphen +- **Recommended**: UUID v4 (`550e8400-e29b-41d4-a716-446655440000`) +- **Validation**: Enforced by `requireIdempotencyKey` middleware before the handler runs + +### Response Headers +| Header | Value | Meaning | +|--------|-------|---------| +| `Idempotency-Key` | Echoed from request | Confirms which key was processed | +| `Idempotency-Replayed` | `true` / `false` | `true` = served from cache; `false` = fresh creation | + +### Response Body Signal +The `meta` object in every `201` response carries `idempotencyReplayed`: +- **Fresh creation**: `meta.idempotencyReplayed` is absent +- **Replay**: `meta.idempotencyReplayed: true` + +### Collision Behaviour (Same Key, Different Body) +- **Status**: `409 CONFLICT` +- **Code**: `CONFLICT` +- **Message**: "Idempotency-Key has already been used for a different request payload" +- **Details**: `{ hint: "Use a new Idempotency-Key or retry with the original request body" }` +- **Security**: The raw key value is **never** included in the error response body or server logs ### Retry Semantics -- **Safe to Retry**: 201, 400, 409, 413, 422, 503 +- **Safe to Retry**: 201 (with same key+body), 400, 409, 413, 422, 503 - **Unsafe to Retry**: 401, 403, 500 - **Recommended Strategy**: Exponential backoff with jitter (1s, 2s, 4s, 8s, 16s) +### Failure Atomicity +If the database upsert fails (e.g. pool exhausted → 503), the idempotency key is **not** stored. The client may safely retry with the same key and body once the dependency recovers. + --- ## Error Response Format diff --git a/docs/ABI_STABILITY.md b/docs/ABI_STABILITY.md new file mode 100644 index 0000000..056f40c --- /dev/null +++ b/docs/ABI_STABILITY.md @@ -0,0 +1,680 @@ +# ABI Stability Guarantees + +> **Audience:** Indexer operators, wallet developers, and any integrator building against the Fluxora HTTP API or consuming chain-derived events. +> +> **Scope:** HTTP entrypoints, error codes, event schemas, storage discriminants, and the decimal-string serialization contract. +> +> **Current version:** `0.1.0` + +--- + +## Table of Contents + +1. [What "ABI Stability" Means Here](#1-what-abi-stability-means-here) +2. [Versioning Policy](#2-versioning-policy) +3. [Stable Surfaces](#3-stable-surfaces) + - [3.1 HTTP Entrypoints](#31-http-entrypoints) + - [3.2 Request and Response Schemas](#32-request-and-response-schemas) + - [3.3 Error Codes](#33-error-codes) + - [3.4 Event Schemas](#34-event-schemas) + - [3.5 Storage Discriminants](#35-storage-discriminants) + - [3.6 Decimal-String Serialization Contract](#36-decimal-string-serialization-contract) + - [3.7 Idempotency Contract](#37-idempotency-contract) + - [3.8 Webhook Signature Contract](#38-webhook-signature-contract) +4. [Breaking vs. Non-Breaking Changes](#4-breaking-vs-non-breaking-changes) + - [4.1 Breaking Changes (require major version bump)](#41-breaking-changes-require-major-version-bump) + - [4.2 Non-Breaking Changes (allowed in minor/patch)](#42-non-breaking-changes-allowed-in-minorpatch) +5. [Deprecation Process](#5-deprecation-process) +6. [Internal and Admin Surfaces](#6-internal-and-admin-surfaces) +7. [Stability by Surface Summary](#7-stability-by-surface-summary) + +--- + +## 1. What "ABI Stability" Means Here + +Fluxora's "ABI" is the set of observable contracts that external integrators depend on: + +- **HTTP entrypoints** — URL paths, HTTP methods, required headers, and status codes. +- **Request/response schemas** — field names, types, and required vs. optional semantics. +- **Error codes** — the `code` string inside every `{ error: { code, message } }` envelope. +- **Event schemas** — the shape of `ContractEventRecord` batches submitted to the indexer and the `StreamEvent` types emitted to WebSocket consumers. +- **Storage discriminants** — enum values used as type tags in the database (`status`, `store`, `dependency`). +- **Serialization invariants** — the decimal-string rule for all monetary amounts. + +A change is **breaking** if it forces an integrator to update their code to avoid a runtime error or data-integrity failure. A change is **non-breaking** if existing integrators can ignore it without consequence. + +--- + +## 2. Versioning Policy + +Fluxora follows [Semantic Versioning 2.0.0](https://semver.org/): + +| Version component | When it changes | +|---|---| +| **MAJOR** (`1.x.x`) | Any breaking change to a stable surface listed in §3 | +| **MINOR** (`x.1.x`) | New stable surfaces, new optional fields, new non-breaking error codes | +| **PATCH** (`x.x.1`) | Bug fixes, documentation corrections, internal refactors with no observable change | + +The current version is `0.1.0`. While the major version is `0`, breaking changes may occur in minor releases but will always be documented in [`docs/upgrade.md`](./upgrade.md) with a migration path. + +Once the API reaches `1.0.0`, all breaking changes require a major version bump and a minimum **90-day deprecation window**. + +--- + +## 3. Stable Surfaces + +### 3.1 HTTP Entrypoints + +The following routes are **stable** and will not be removed or have their HTTP method changed without a major version bump. + +#### Public Routes + +| Method | Path | Auth | Description | +|---|---|---|---| +| `GET` | `/` | None | API metadata | +| `GET` | `/health` | None | Liveness + indexer health | +| `GET` | `/health/ready` | None | Readiness probe | +| `GET` | `/health/live` | None | Detailed health report | +| `GET` | `/api/streams` | None | List streams (cursor pagination) | +| `GET` | `/api/streams/:id` | None | Get stream by ID | +| `POST` | `/api/streams` | JWT Bearer | Create stream | +| `DELETE` | `/api/streams/:id` | JWT Bearer | Cancel stream | +| `PATCH` | `/api/streams/:id/status` | JWT Bearer | Transition stream status | +| `POST` | `/api/auth/session` | None | Issue JWT from Stellar address | + +#### Internal Routes + +| Method | Path | Auth | Description | +|---|---|---|---| +| `POST` | `/internal/indexer/contract-events` | `x-indexer-worker-token` | Ingest contract event batch | + +> **Note:** Routes under `/api/admin/`, `/admin/dlq/`, and `/api/audit/` are operator-facing and carry a **best-effort** stability guarantee. They may change in minor versions with notice in the changelog. + +#### Path Parameter Formats + +| Parameter | Format | Example | Stable | +|---|---|---|---| +| `:id` (stream) | `stream-{txHash}-{eventIndex}` | `stream-abc123-0` | ✅ Yes | +| `:streamId` | Same as `:id` | `stream-abc123-0` | ✅ Yes | + +The `stream-{txHash}-{eventIndex}` ID format is a **stable discriminant**. Integrators may parse and store it. The separator character (`-`) and prefix (`stream-`) will not change without a major version bump. + +--- + +### 3.2 Request and Response Schemas + +#### `POST /api/streams` — Create Stream + +**Request body** (`application/json`): + +```typescript +{ + sender: string; // Stellar public key — G followed by 55 base32 chars + recipient: string; // Stellar public key — G followed by 55 base32 chars + depositAmount?: string; // Decimal string, e.g. "1000" or "0.0000116" + ratePerSecond?: string; // Decimal string, must be > 0 + startTime?: number; // Unix timestamp (integer, non-negative) + endTime?: number; // Unix timestamp (integer, non-negative; 0 = indefinite) +} +``` + +**Required headers:** + +| Header | Format | Example | +|---|---|---| +| `Authorization` | `Bearer ` | `Bearer eyJ...` | +| `Idempotency-Key` | 1–128 chars, `[A-Za-z0-9:_-]` | `550e8400-e29b-41d4-a716-446655440000` | + +**Response** (`201 Created`): + +```typescript +{ + success: true, + data: { + id: string; // stream-{txHash}-{eventIndex} + sender: string; // Stellar public key + recipient: string; // Stellar public key + depositAmount: string; // Decimal string + ratePerSecond: string; // Decimal string + startTime: number; // Unix timestamp + endTime: number; // Unix timestamp (0 = indefinite) + status: "active" | "paused" | "completed" | "cancelled"; + } +} +``` + +**Response headers:** + +| Header | Value | +|---|---| +| `Idempotency-Key` | Echoes the client-supplied key | +| `Idempotency-Replayed` | `"true"` on cache replay, `"false"` on first creation | + +#### `GET /api/streams` — List Streams + +**Query parameters:** + +| Parameter | Type | Default | Description | +|---|---|---|---| +| `limit` | integer 1–100 | `50` | Page size | +| `cursor` | string | — | Opaque base64 pagination token | +| `status` | `active\|paused\|completed\|cancelled` | — | Filter by status | +| `sender` | Stellar public key | — | Filter by sender address | +| `recipient` | Stellar public key | — | Filter by recipient address | +| `includeTotal` | boolean | `false` | Include total count | + +**Response** (`200 OK`): + +```typescript +{ + success: true, + data: { + streams: Stream[]; // Array of stream objects (same shape as POST response) + has_more: boolean; // Whether more pages exist + next_cursor?: string; // Opaque base64 token; absent when has_more is false + total?: number; // Present only when includeTotal=true + } +} +``` + +> **Cursor stability:** The cursor format is `base64(JSON({ v: 1, lastId: string }))`. The `v: 1` version tag is a stable discriminant. If the cursor format changes, the version tag will increment and old cursors will return a `400 VALIDATION_ERROR` rather than silently returning wrong results. + +#### `GET /api/streams/:id` — Get Stream + +**Response** (`200 OK`): Same `Stream` shape as above. + +#### `DELETE /api/streams/:id` — Cancel Stream + +**Response** (`200 OK`): + +```typescript +{ + success: true, + data: { + message: "Stream cancelled"; + id: string; + } +} +``` + +#### `GET /health` — Health Check + +**Response** (`200 OK` or `503 Service Unavailable`): + +```typescript +{ + success: true, + data: { + status: "ok" | "degraded" | "shutting_down"; + service: "fluxora-backend"; + network: string; + contractAddresses: Record; + timestamp: string; // ISO-8601 + indexer: IndexerHealthSnapshot; + } +} +``` + +--- + +### 3.3 Error Codes + +Every error response uses this envelope: + +```typescript +{ + success: false, + error: { + code: ApiErrorCode; // Machine-readable string — stable + message: string; // Human-readable — NOT stable, do not parse + details?: unknown; // Optional structured context — shape may change + requestId?: string; // Correlation ID for log lookup + } +} +``` + +> **Rule:** Integrators MUST key on `error.code`, never on `error.message`. Messages are for humans and may be reworded in any release. + +#### Stable Error Codes (`ApiErrorCode`) + +| Code | HTTP Status | Meaning | +|---|---|---| +| `VALIDATION_ERROR` | 400 | Malformed input — missing field, wrong type, invalid format | +| `DECIMAL_ERROR` | 400 | Amount field failed decimal-string validation | +| `NOT_FOUND` | 404 | Resource does not exist | +| `CONFLICT` | 409 | Idempotency-Key collision, duplicate cancel, or invalid state transition | +| `UNAUTHORIZED` | 401 | Missing or invalid authentication token | +| `FORBIDDEN` | 403 | Authenticated but insufficient permissions | +| `PAYLOAD_TOO_LARGE` | 413 | Request body exceeds 256 KiB | +| `TOO_MANY_REQUESTS` | 429 | Rate limit exceeded | +| `METHOD_NOT_ALLOWED` | 405 | HTTP method not supported on this path | +| `INTERNAL_ERROR` | 500 | Unexpected server error | +| `SERVICE_UNAVAILABLE` | 503 | Dependency (DB, Redis, Stellar RPC) is down | + +#### Stable Decimal Error Codes (`DecimalErrorCode`) + +These appear in `error.details.decimalErrorCode` when `error.code === "DECIMAL_ERROR"`: + +| Code | Meaning | +|---|---| +| `DECIMAL_INVALID_TYPE` | Amount was not a string | +| `DECIMAL_INVALID_FORMAT` | String did not match `/^[+-]?\d+(\.\d+)?$/` | +| `DECIMAL_OUT_OF_RANGE` | Value exceeds `Number.MAX_SAFE_INTEGER` | +| `DECIMAL_EMPTY_VALUE` | Amount was `null`, `undefined`, or empty string | +| `DECIMAL_PRECISION_EXCEEDED` | More than 7 decimal places (Stellar precision limit) | +| `DECIMAL_PRECISION_LOSS` | Floating-point precision loss detected | + +--- + +### 3.4 Event Schemas + +#### Contract Event Record (Indexer Ingest) + +Submitted to `POST /internal/indexer/contract-events` as `{ events: ContractEventRecord[] }`. + +```typescript +interface ContractEventRecord { + eventId: string; // Unique event ID, max 128 chars — stable discriminant + ledger: number; // Non-negative integer ledger sequence + contractId: string; // Soroban contract ID, max 128 chars + topic: string; // Event topic string, max 128 chars + txHash: string; // Transaction hash, max 128 chars + txIndex: number; // Non-negative integer + operationIndex: number; // Non-negative integer + eventIndex: number; // Non-negative integer + payload: Record; // Arbitrary JSON object + happenedAt: string; // ISO-8601 timestamp + ledgerHash: string; // Ledger hash for reorg detection, max 128 chars +} +``` + +**Batch constraints (stable):** + +| Constraint | Value | +|---|---| +| Minimum events per batch | 1 | +| Maximum events per batch | 100 | +| Duplicate `eventId` within a batch | Rejected with `409 CONFLICT` | + +#### Stream Events (WebSocket / Internal) + +These are the typed events emitted by `streamEventService` and broadcast over `ws:///ws/streams`. + +```typescript +// A new stream was created on-chain +interface StreamCreatedEvent { + type: "StreamCreated"; + contractId: string; + transactionHash: string; + eventIndex: number; + sender: string; + recipient: string; + amount: string; // Decimal string + ratePerSecond: string; // Decimal string + startTime: number; + endTime: number; +} + +// An existing stream was updated on-chain +interface StreamUpdatedEvent { + type: "StreamUpdated"; + contractId: string; + transactionHash: string; + eventIndex: number; + streamId: string; + streamedAmount?: string; // Decimal string + remainingAmount?: string; // Decimal string + status?: StreamStatus; + endTime?: number; +} + +// A stream was cancelled on-chain +interface StreamCancelledEvent { + type: "StreamCancelled"; + contractId: string; + transactionHash: string; + eventIndex: number; + streamId: string; +} +``` + +**WebSocket message envelope** (broadcast to all connected clients): + +```typescript +{ + event: "stream.created" | "stream.updated" | "stream.cancelled" | "service.degraded"; + streamId: string; + payload: Record; + timestamp: string; // ISO-8601 +} +``` + +The `event` string values (`stream.created`, `stream.updated`, `stream.cancelled`, `service.degraded`) are **stable discriminants**. Consumers MUST switch on `event` to determine how to handle a message. + +#### Indexer Health Snapshot + +Returned inside `GET /health` under the `indexer` key: + +```typescript +interface IndexerHealthSnapshot { + dependency: "healthy" | "degraded" | "unavailable"; // stable discriminant + store: "memory" | "postgres"; // stable discriminant + lastSuccessfulIngestAt: string | null; // ISO-8601 or null + lastFailureAt: string | null; // ISO-8601 or null + lastFailureReason: string | null; // Human-readable — NOT stable + acceptedBatchCount: number; + acceptedEventCount: number; + duplicateEventCount: number; + lastSafeLedger: number; + reorgDetected: boolean; + reorgHeight?: number; +} +``` + +--- + +### 3.5 Storage Discriminants + +These enum values are persisted to the database and used as type tags. Changing them is a **breaking change** because existing rows would become unreadable without a migration. + +#### `StreamStatus` — `streams.status` column + +``` +"active" | "paused" | "completed" | "cancelled" +``` + +**State machine (stable):** + +``` +active ──► paused ──► active + │ │ + ▼ ▼ +completed cancelled ← terminal states +``` + +| Transition | Allowed | +|---|---| +| `active` → `paused` | ✅ | +| `active` → `completed` | ✅ | +| `active` → `cancelled` | ✅ | +| `paused` → `active` | ✅ | +| `paused` → `cancelled` | ✅ | +| `completed` → any | ❌ terminal | +| `cancelled` → any | ❌ terminal | + +Any invalid transition returns `409 CONFLICT` with `error.code === "CONFLICT"`. + +#### `IndexerDependencyState` — health snapshot `dependency` field + +``` +"healthy" | "degraded" | "unavailable" +``` + +#### `IndexerStoreKind` — health snapshot `store` field + +``` +"memory" | "postgres" +``` + +#### Stream ID Format + +``` +stream-{transactionHash}-{eventIndex} +``` + +This format is derived deterministically from chain data. The prefix `stream-` and the `-` separator are stable. Integrators may store and parse stream IDs. + +--- + +### 3.6 Decimal-String Serialization Contract + +All monetary amounts cross the chain/API boundary as **decimal strings**. This is a hard invariant. + +**Stable rules:** + +1. All amount fields in JSON responses are `string`, never `number`. +2. The accepted format is `/^[+-]?\d+(\.\d+)?$/`. +3. Stellar precision is 7 decimal places (`0.0000001` = 1 stroop = `10^-7` XLM). +4. Zero serializes as `"0"`, never omitted or `null`. +5. Sending a JSON `number` for any amount field returns `400 DECIMAL_ERROR`. +6. The `STROOPS_PER_UNIT` constant is `10_000_000` (stable). + +**Affected fields:** + +| Field | Location | +|---|---| +| `depositAmount` | `POST /api/streams` request and all stream responses | +| `ratePerSecond` | `POST /api/streams` request and all stream responses | +| `amount` | `StreamCreatedEvent`, `StreamRecord.amount` | +| `streamed_amount` | `StreamRecord`, `StreamUpdatedEvent.streamedAmount` | +| `remaining_amount` | `StreamRecord`, `StreamUpdatedEvent.remainingAmount` | +| `rate_per_second` | `StreamRecord` | + +--- + +### 3.7 Idempotency Contract + +`POST /api/streams` requires an `Idempotency-Key` header. The following behaviors are stable: + +| Scenario | Behavior | +|---|---| +| Same key + same body | `201` with `Idempotency-Replayed: true` | +| Same key + different body | `409 CONFLICT` | +| Missing key | `400 VALIDATION_ERROR` | +| Malformed key (bad charset or length) | `400 VALIDATION_ERROR` | + +**Key format (stable):** + +- Length: 1–128 characters +- Charset: `[A-Za-z0-9:_-]` +- UUID v4 is recommended but not required + +The key value is never echoed in error response bodies or server logs. + +--- + +### 3.8 Webhook Signature Contract + +Fluxora webhook deliveries carry these headers. The signing algorithm is stable: + +| Header | Description | +|---|---| +| `x-fluxora-delivery-id` | Stable delivery ID for deduplication | +| `x-fluxora-timestamp` | Unix timestamp in seconds (string) | +| `x-fluxora-signature` | `HMAC-SHA256(secret, timestamp + "." + rawBody)` — 64-char hex | +| `x-fluxora-event` | Event name, e.g. `stream.created` | + +**Signing payload:** + +``` +${timestamp}.${rawRequestBody} +``` + +**Stable verification rules:** + +- Use raw request bytes exactly as received. +- Reject payloads larger than 256 KiB. +- Reject timestamps outside a 300-second tolerance window. +- Compare signatures with a constant-time equality check. +- Deduplicate on `x-fluxora-delivery-id`. + +--- + +## 4. Breaking vs. Non-Breaking Changes + +### 4.1 Breaking Changes (require major version bump) + +The following changes are **always breaking** and require a major version bump plus a deprecation window: + +#### Entrypoints + +- Removing a stable route (path + method combination). +- Changing the HTTP method of a stable route (e.g., `POST` → `PUT`). +- Changing a path parameter format (e.g., renaming `:id` or changing the `stream-` prefix). +- Adding a new **required** header to an existing route. +- Changing the base path of a stable route. + +#### Request Schemas + +- Removing a previously accepted request field. +- Making an optional field required. +- Narrowing the accepted type of a field (e.g., accepting `string | number` → `string` only). +- Changing the validation regex for a stable field (e.g., `STELLAR_PUBLIC_KEY_REGEX`, `DECIMAL_STRING_REGEX`, `IDEMPOTENCY_KEY_REGEX`). + +#### Response Schemas + +- Removing a field from a response object. +- Renaming a field in a response object. +- Changing the type of a field (e.g., `string` → `number`). +- Changing the decimal-string invariant (e.g., returning numbers for amount fields). +- Changing the stream ID format. +- Changing the cursor format without incrementing the `v` version tag. + +#### Error Codes + +- Removing a stable `ApiErrorCode` value. +- Renaming a stable `ApiErrorCode` value. +- Changing the HTTP status code associated with a stable error code. +- Removing a `DecimalErrorCode` value. + +#### Event Schemas + +- Removing a field from `ContractEventRecord`. +- Renaming a field in `ContractEventRecord`. +- Changing the type of a field in `ContractEventRecord`. +- Removing a `StreamEvent` type (`StreamCreated`, `StreamUpdated`, `StreamCancelled`). +- Renaming the `type` discriminant of a `StreamEvent`. +- Removing a WebSocket event name (`stream.created`, `stream.updated`, `stream.cancelled`). +- Changing the WebSocket message envelope shape. +- Reducing the maximum batch size below 100. + +#### Storage Discriminants + +- Removing a `StreamStatus` value. +- Renaming a `StreamStatus` value. +- Changing the stream status state machine (removing a valid transition). +- Removing an `IndexerDependencyState` value. +- Removing an `IndexerStoreKind` value. + +#### Serialization + +- Changing the decimal-string format regex. +- Changing `STELLAR_DECIMALS` (7) or `STROOPS_PER_UNIT` (10,000,000). +- Returning numeric JSON for any amount field. + +#### Idempotency + +- Changing the `Idempotency-Key` charset or length limits. +- Changing the behavior of same-key + same-body (must remain `201` replay). +- Changing the behavior of same-key + different-body (must remain `409`). + +#### Webhook Signatures + +- Changing the HMAC algorithm (currently SHA-256). +- Changing the signing payload format (`timestamp.rawBody`). +- Renaming any of the four `x-fluxora-*` headers. +- Changing the timestamp tolerance window (currently 300 seconds). + +--- + +### 4.2 Non-Breaking Changes (allowed in minor/patch) + +The following changes are **safe** and do not require a major version bump: + +#### Entrypoints + +- Adding a new route. +- Adding a new optional query parameter to an existing `GET` route. +- Adding a new optional response header. + +#### Request Schemas + +- Adding a new optional request field. +- Widening the accepted type of a field (e.g., accepting additional formats). +- Relaxing a validation constraint (e.g., increasing a max-length limit). + +#### Response Schemas + +- Adding a new optional field to a response object. +- Adding a new value to an open-ended `string` field that is not a discriminant. + +#### Error Codes + +- Adding a new `ApiErrorCode` value. +- Adding a new `DecimalErrorCode` value. +- Adding new fields to `error.details`. +- Rewording `error.message` (messages are not stable). + +#### Event Schemas + +- Adding a new optional field to `ContractEventRecord`. +- Adding a new `StreamEvent` type with a new `type` discriminant. +- Adding a new WebSocket event name. +- Increasing the maximum batch size above 100. + +#### Storage Discriminants + +- Adding a new `StreamStatus` value (with a documented migration path). +- Adding a new `IndexerDependencyState` value. +- Adding a new `IndexerStoreKind` value. + +#### Internal Behavior + +- Changing log message text. +- Changing internal metric names. +- Changing the in-memory idempotency store to Redis (same observable behavior). +- Changing database indexes (no observable API change). +- Changing rate-limit defaults (documented in changelog). + +--- + +## 5. Deprecation Process + +1. **Announce** — The deprecated surface is marked in the changelog and in this document with a `⚠️ DEPRECATED` notice and a target removal version. +2. **Grace period** — A minimum of **90 days** (or one major release cycle, whichever is longer) before removal. +3. **Response header** — Deprecated endpoints return a `Deprecation: true` header and a `Sunset: ` header (RFC 8594). +4. **Remove** — The surface is removed in the next major version. The upgrade guide in [`docs/upgrade.md`](./upgrade.md) documents the migration path. + +--- + +## 6. Internal and Admin Surfaces + +The following surfaces are **not covered** by the stability guarantees in this document: + +| Surface | Stability | +|---|---| +| `/api/admin/*` | Best-effort; may change in minor versions | +| `/admin/dlq/*` | Best-effort; may change in minor versions | +| `/api/audit/*` | Best-effort; may change in minor versions | +| `/api/rate-limits/*` | Best-effort; may change in minor versions | +| `/__test/*` | No stability guarantee; test-only | +| Environment variable names | Documented in README; may change in minor versions with notice | +| Database schema internals | Internal; only the discriminant values listed in §3.5 are stable | +| Log message text | No stability guarantee | +| Internal TypeScript types not exported in a public module | No stability guarantee | + +--- + +## 7. Stability by Surface Summary + +| Surface | Stable Since | Guarantee Level | +|---|---|---| +| Public HTTP routes (§3.1) | `0.1.0` | Breaking change → major bump | +| Stream request/response schema (§3.2) | `0.1.0` | Breaking change → major bump | +| Error codes `ApiErrorCode` (§3.3) | `0.1.0` | Breaking change → major bump | +| Decimal error codes `DecimalErrorCode` (§3.3) | `0.1.0` | Breaking change → major bump | +| `ContractEventRecord` schema (§3.4) | `0.1.0` | Breaking change → major bump | +| `StreamEvent` types and discriminants (§3.4) | `0.1.0` | Breaking change → major bump | +| WebSocket event names (§3.4) | `0.1.0` | Breaking change → major bump | +| `StreamStatus` values and state machine (§3.5) | `0.1.0` | Breaking change → major bump | +| `IndexerDependencyState` values (§3.5) | `0.1.0` | Breaking change → major bump | +| `IndexerStoreKind` values (§3.5) | `0.1.0` | Breaking change → major bump | +| Stream ID format (§3.5) | `0.1.0` | Breaking change → major bump | +| Decimal-string serialization rules (§3.6) | `0.1.0` | Breaking change → major bump | +| Idempotency contract (§3.7) | `0.1.0` | Breaking change → major bump | +| Webhook signature algorithm (§3.8) | `0.1.0` | Breaking change → major bump | +| Admin/internal routes (§6) | — | Best-effort | + +--- + +*See [`docs/upgrade.md`](./upgrade.md) for migration guides between versions.* diff --git a/docs/upgrade.md b/docs/upgrade.md new file mode 100644 index 0000000..74263b3 --- /dev/null +++ b/docs/upgrade.md @@ -0,0 +1,182 @@ +# Upgrade Guide + +This document provides migration instructions for every version that introduces breaking changes to the [ABI stability contract](./ABI_STABILITY.md). + +--- + +## Table of Contents + +- [How to Read This Guide](#how-to-read-this-guide) +- [Unreleased / `main`](#unreleased--main) +- [v0.1.0 — Initial stable surface](#v010--initial-stable-surface) +- [Checklist for Integrators](#checklist-for-integrators) + +--- + +## How to Read This Guide + +Each section covers one version and is structured as: + +1. **What changed** — a concise description of the breaking change. +2. **Who is affected** — which integrator type (indexer, wallet, webhook consumer, operator). +3. **Migration steps** — the exact code or configuration change required. +4. **Rollback** — how to revert if the migration causes issues. + +If a change is non-breaking it will not appear here. Refer to the [CHANGELOG](../CHANGES_DETAILED.md) for the full list of changes per release. + +--- + +## Unreleased / `main` + +No breaking changes pending. + +--- + +## v0.1.0 — Initial stable surface + +**Released:** 2026-04-23 + +This is the first release that defines a stable ABI. There are no migrations from a prior version. All surfaces described in [`docs/ABI_STABILITY.md`](./ABI_STABILITY.md) are stable from this point forward. + +### What integrators should do before going to production + +#### 1. Pin to `error.code`, not `error.message` + +Error messages are human-readable prose and will be reworded without notice. Your error-handling code must branch on `error.code`: + +```typescript +// ✅ Correct — stable +if (response.error.code === 'NOT_FOUND') { ... } + +// ❌ Wrong — will break on any patch release +if (response.error.message.includes('not found')) { ... } +``` + +#### 2. Treat amount fields as strings everywhere + +All monetary amounts are decimal strings. Never coerce them to `number` before storing or comparing: + +```typescript +// ✅ Correct +const rate: string = stream.ratePerSecond; // "0.0000116" + +// ❌ Wrong — loses precision for values > 2^53 +const rate: number = parseFloat(stream.ratePerSecond); +``` + +#### 3. Switch on `event.type` for stream events + +The `StreamEvent` union is discriminated by the `type` field. Always handle the exhaustive set and include a default branch for forward compatibility: + +```typescript +switch (event.type) { + case 'StreamCreated': handleCreated(event); break; + case 'StreamUpdated': handleUpdated(event); break; + case 'StreamCancelled': handleCancelled(event); break; + default: + // A future minor release may add new event types. + // Log and ignore rather than throwing. + console.warn('Unknown stream event type:', (event as any).type); +} +``` + +#### 4. Switch on `event` for WebSocket messages + +```typescript +ws.on('message', (raw) => { + const msg = JSON.parse(raw); + switch (msg.event) { + case 'stream.created': ...; break; + case 'stream.updated': ...; break; + case 'stream.cancelled': ...; break; + case 'service.degraded': ...; break; + default: + // Forward-compatible: ignore unknown event names + break; + } +}); +``` + +#### 5. Handle new optional response fields gracefully + +Non-breaking releases may add new optional fields to response objects. Use optional chaining and do not fail if an unexpected field is present: + +```typescript +// ✅ Correct — ignores unknown fields +const { id, sender, recipient, depositAmount, ratePerSecond, status } = stream; + +// ❌ Wrong — strict schema validation will reject new optional fields +assert(Object.keys(stream).length === 8); +``` + +#### 6. Store and forward stream IDs as opaque strings + +The stream ID format (`stream-{txHash}-{eventIndex}`) is stable, but treat it as an opaque string in your storage layer. Do not parse the components for business logic — use the dedicated fields (`transactionHash`, `eventIndex`) from the event payload instead. + +#### 7. Validate `StreamStatus` with a known-values check + +The state machine is stable, but new status values may be added in minor releases. Guard against unknown statuses: + +```typescript +const KNOWN_STATUSES = new Set(['active', 'paused', 'completed', 'cancelled']); + +function isKnownStatus(s: string): s is StreamStatus { + return KNOWN_STATUSES.has(s); +} + +// Handle unknown status gracefully rather than throwing +if (!isKnownStatus(stream.status)) { + console.warn('Unrecognised stream status:', stream.status); +} +``` + +#### 8. Indexer: validate `eventId` uniqueness before submitting + +The ingest endpoint rejects batches containing duplicate `eventId` values with `409 CONFLICT`. Deduplicate within each batch before submission: + +```typescript +const uniqueEvents = [...new Map(events.map(e => [e.eventId, e])).values()]; +``` + +#### 9. Indexer: handle reorg signals in health + +When `GET /health` returns `indexer.reorgDetected === true`, treat all stream state as potentially stale until `reorgDetected` returns to `false`. Do not surface chain-derived data to end users during this window. + +#### 10. Webhook consumers: verify signatures before processing + +```typescript +import { verifyWebhookSignature } from './src/webhooks/signature.js'; + +const result = verifyWebhookSignature({ + secret: process.env.FLUXORA_WEBHOOK_SECRET, + deliveryId: req.header('x-fluxora-delivery-id'), + timestamp: req.header('x-fluxora-timestamp'), + signature: req.header('x-fluxora-signature'), + rawBody, + isDuplicateDelivery: (id) => seenIds.has(id), +}); + +if (!result.ok) { + return res.status(result.status).json({ error: result.code }); +} +``` + +--- + +## Checklist for Integrators + +Use this checklist when upgrading to any new Fluxora version: + +- [ ] Read the relevant section of this guide for the target version. +- [ ] Check [`docs/ABI_STABILITY.md`](./ABI_STABILITY.md) for any surfaces marked `⚠️ DEPRECATED`. +- [ ] Search your codebase for any use of `error.message` in conditional logic — replace with `error.code`. +- [ ] Search your codebase for `parseFloat` or `Number()` applied to amount fields — replace with string handling. +- [ ] Confirm your `StreamEvent` switch statement has a `default` branch. +- [ ] Confirm your WebSocket message handler has a `default` branch. +- [ ] Confirm your `StreamStatus` handling is forward-compatible with unknown values. +- [ ] Run your integration test suite against the new version before promoting to production. +- [ ] If you maintain a local copy of the OpenAPI spec, regenerate your client from `openapi.yaml`. + +--- + +*For the full list of stable surfaces and what counts as a breaking change, see [`docs/ABI_STABILITY.md`](./ABI_STABILITY.md).* diff --git a/openapi.yaml b/openapi.yaml index d110fea..40dd802 100644 --- a/openapi.yaml +++ b/openapi.yaml @@ -198,12 +198,26 @@ paths: in: header required: true description: | - Unique identifier for this request. Used to prevent duplicate submissions. - Must be a UUID or similar unique value. Same key with identical request body - will return the same response (idempotent). + Unique key that makes this request idempotent. Clients MUST supply a + fresh value per logical operation and MUST reuse the same value when + retrying after a network failure. + + **Format**: 1–128 characters; allowed charset: `[A-Za-z0-9:_-]`. + UUID v4 is recommended but any value matching the charset is accepted. + + **Behaviour**: + - Same key + same body → 201 (cached response, `Idempotency-Replayed: true`) + - Same key + different body → 409 CONFLICT + - Missing or malformed key → 400 VALIDATION_ERROR + + **Security**: The key value is never echoed in error response bodies + or server logs to prevent accidental secret leakage. schema: type: string - format: uuid + minLength: 1 + maxLength: 128 + pattern: '^[A-Za-z0-9:_-]+$' + example: '550e8400-e29b-41d4-a716-446655440000' requestBody: required: true content: @@ -212,13 +226,33 @@ paths: $ref: '#/components/schemas/CreateStreamRequest' responses: '201': - description: Stream created successfully + description: | + Stream created (or replayed from cache). + Check `Idempotency-Replayed` response header and `meta.idempotencyReplayed` + to distinguish a fresh creation from a cached replay. + headers: + Idempotency-Key: + description: Echoes the key supplied by the client. + schema: + type: string + Idempotency-Replayed: + description: | + `true` when the response was served from the idempotency cache; + `false` on first creation. + schema: + type: string + enum: ['true', 'false'] content: application/json: schema: $ref: '#/components/schemas/Stream' '400': - description: Invalid request (malformed JSON, invalid address format, etc.) + description: | + Invalid request. Common causes: + - Missing or malformed `Idempotency-Key` header + - Invalid JSON body + - Invalid Stellar address format + - Non-string amount field (must be decimal string) content: application/json: schema: @@ -231,8 +265,9 @@ paths: $ref: '#/components/schemas/ErrorResponse' '409': description: | - Duplicate submission detected. Same Idempotency-Key with different request body. - Retry with same body or use new Idempotency-Key. + Idempotency-Key collision — the same key was previously used with a + different request body. Use a new Idempotency-Key or retry with the + original body. content: application/json: schema: @@ -254,7 +289,7 @@ paths: schema: $ref: '#/components/schemas/ErrorResponse' '503': - description: Service unavailable (database, Stellar RPC, or workers down) + description: Service unavailable (database, Stellar RPC, or idempotency store down) content: application/json: schema: diff --git a/src/middleware/requestProtection.ts b/src/middleware/requestProtection.ts index 85f8240..ad2f817 100644 --- a/src/middleware/requestProtection.ts +++ b/src/middleware/requestProtection.ts @@ -5,19 +5,35 @@ * 1. Body size enforcement — Content-Length fast path + raw stream byte counting * 2. JSON depth validation — applied after express.json() * 3. Request timeout protection + * 4. Idempotency-Key header validation — format + character-set enforcement * - * All 413 responses use the same { error: { code, message } } envelope as the + * All error responses use the same { error: { code, message } } envelope as the * rest of the app (via ApiError / errorHandler). * * Wire-up order in app.ts: * app.use(bodySizeLimitMiddleware) ← before express.json() * app.use(express.json(...)) * app.use(jsonDepthMiddleware) ← after express.json() + * + * Idempotency-Key rules (RFC-aligned): + * - Required on POST /api/streams (enforced at route level via requireIdempotencyKey) + * - 1–128 characters + * - Allowed charset: A-Z a-z 0-9 : _ - + * - Keys are treated as opaque strings; UUID format is recommended but not required */ import type { Request, Response, NextFunction } from 'express'; import { ApiErrorCode, payloadTooLarge, validationError } from './errorHandler.js'; +// ── Idempotency-Key constants ───────────────────────────────────────────────── + +/** Minimum and maximum byte length for an Idempotency-Key value. */ +export const IDEMPOTENCY_KEY_MIN_LENGTH = 1; +export const IDEMPOTENCY_KEY_MAX_LENGTH = 128; + +/** Allowed characters: alphanumeric, colon, underscore, hyphen. */ +export const IDEMPOTENCY_KEY_REGEX = /^[A-Za-z0-9:_-]+$/; + /** 256 KiB — matches the webhook contract and express.json limit. */ export const BODY_LIMIT_BYTES = 256 * 1024; @@ -87,6 +103,60 @@ function checkDepth(value: unknown, max: number, current: number): void { } } +// ── Idempotency-Key validation ──────────────────────────────────────────────── + +/** + * Parse and validate an Idempotency-Key header value. + * + * Returns the trimmed key on success, or throws an ApiError (400) on failure. + * This is a pure helper — it does NOT read from req directly so it can be + * unit-tested without an Express context. + */ +export function parseIdempotencyKeyHeader(headerValue: unknown): string { + if (Array.isArray(headerValue) || typeof headerValue !== 'string') { + throw validationError( + 'Idempotency-Key header is required and must be a single string value', + ); + } + const trimmed = headerValue.trim(); + if (trimmed.length < IDEMPOTENCY_KEY_MIN_LENGTH || trimmed.length > IDEMPOTENCY_KEY_MAX_LENGTH) { + throw validationError( + `Idempotency-Key must be between ${IDEMPOTENCY_KEY_MIN_LENGTH} and ${IDEMPOTENCY_KEY_MAX_LENGTH} characters`, + ); + } + if (!IDEMPOTENCY_KEY_REGEX.test(trimmed)) { + throw validationError( + 'Idempotency-Key must contain only letters, digits, colon, underscore, or hyphen', + ); + } + return trimmed; +} + +/** + * Express middleware that enforces the presence and format of the + * Idempotency-Key header on the current route. + * + * Usage — apply directly to any route that requires idempotency: + * + * router.post('/', requireIdempotencyKey, asyncHandler(async (req, res) => { … })) + * + * On success the validated key is attached to `res.locals.idempotencyKey` + * so downstream handlers can read it without re-parsing. + */ +export function requireIdempotencyKey( + req: Request, + res: Response, + next: NextFunction, +): void { + try { + const key = parseIdempotencyKeyHeader(req.headers['idempotency-key']); + res.locals['idempotencyKey'] = key; + next(); + } catch (err) { + next(err); + } +} + /** * Enforce a socket-level request timeout. * Responds 408 if the socket is idle for longer than timeoutMs. diff --git a/src/routes/dlq.ts b/src/routes/dlq.ts index b62275c..9f2b9e3 100644 --- a/src/routes/dlq.ts +++ b/src/routes/dlq.ts @@ -82,6 +82,7 @@ import { authenticate, requireAuth } from '../middleware/auth.js'; import { asyncHandler, validationError } from '../middleware/errorHandler.js'; import { info, warn } from '../utils/logger.js'; import { recordAuditEvent } from '../lib/auditLog.js'; +import { successResponse, errorResponse } from '../utils/response.js'; /** Shape of a dead-letter entry */ export interface DlqEntry { @@ -188,7 +189,7 @@ dlqRouter.get( { total: filtered.length, returned: page.length, offset, limit, topicFilter } ); - res.json({ + res.json(successResponse({ entries: page, total: filtered.length, limit, diff --git a/src/routes/streams.ts b/src/routes/streams.ts index a55c0aa..2029de3 100644 --- a/src/routes/streams.ts +++ b/src/routes/streams.ts @@ -16,16 +16,32 @@ * - Public internet clients: may list and read streams without authentication. * - Authenticated partners: may create and cancel streams with valid JWT. * + * Idempotency + * ----------- + * POST /api/streams requires an Idempotency-Key header (1–128 chars, + * [A-Za-z0-9:_-]). The key is validated by requireIdempotencyKey middleware + * before the handler runs. A SHA-256 fingerprint of the normalised request + * body is stored alongside the cached response so that: + * - Same key + same body → 201 replay (Idempotency-Replayed: true) + * - Same key + diff body → 409 CONFLICT + * - Missing / bad key → 400 VALIDATION_ERROR + * + * The idempotency store is in-memory (Map) for this iteration. In production + * it should be backed by Redis with a 24-hour TTL. + * * Failure modes * ------------- - * - Invalid decimal string → 400 VALIDATION_ERROR - * - Missing required field → 400 VALIDATION_ERROR - * - Missing authentication → 401 UNAUTHORIZED - * - Invalid token → 401 UNAUTHORIZED - * - Stream not found → 404 NOT_FOUND - * - Duplicate cancel → 409 CONFLICT - * - DB unavailable → 503 SERVICE_UNAVAILABLE - * - Idempotency store down → 503 SERVICE_UNAVAILABLE + * - Missing Idempotency-Key → 400 VALIDATION_ERROR + * - Invalid Idempotency-Key → 400 VALIDATION_ERROR + * - Invalid decimal string → 400 VALIDATION_ERROR + * - Missing required field → 400 VALIDATION_ERROR + * - Missing authentication → 401 UNAUTHORIZED + * - Invalid token → 401 UNAUTHORIZED + * - Stream not found → 404 NOT_FOUND + * - Key reuse / diff payload → 409 CONFLICT + * - Duplicate cancel → 409 CONFLICT + * - DB unavailable → 503 SERVICE_UNAVAILABLE + * - Idempotency store down → 503 SERVICE_UNAVAILABLE * * @module routes/streams */ @@ -44,10 +60,11 @@ import { serviceUnavailable, asyncHandler, } from '../middleware/errorHandler.js'; +import { requireIdempotencyKey } from '../middleware/requestProtection.js'; import { SerializationLogger, info, debug, warn } from '../utils/logger.js'; import { recordAuditEvent } from '../lib/auditLog.js'; import { authenticate, requireAuth } from '../middleware/auth.js'; -import { successResponse } from '../utils/response.js'; +import { successResponse, idempotentReplayResponse } from '../utils/response.js'; import { streamRepository } from '../db/repositories/streamRepository.js'; import { PoolExhaustedError } from '../db/pool.js'; import { @@ -101,6 +118,14 @@ const idempotencyDependency = { state: 'healthy' as DependencyState }; // In-memory idempotency store (Redis-backed in production; sufficient for now) const idempotencyStore = new Map(); +/** + * Legacy shim — audit.test.ts and streams.test.ts reference this array. + * The DB-backed implementation no longer uses it for storage; it is kept + * as an empty array so existing test imports do not break. + * @deprecated Use streamRepository directly. + */ +export const streams: Stream[] = []; + export function setStreamListingDependencyState(state: DependencyState): void { streamListingDependency.state = state; } @@ -188,20 +213,6 @@ function parseIncludeTotal(includeTotalParam: unknown): boolean { throw validationError('include_total must be true or false'); } -function parseIdempotencyKey(headerValue: unknown): string { - if (Array.isArray(headerValue) || typeof headerValue !== 'string') { - throw validationError('Idempotency-Key header is required for unsafe POST operations'); - } - const trimmed = headerValue.trim(); - if (trimmed.length < 1 || trimmed.length > 128) { - throw validationError('Idempotency-Key header must be between 1 and 128 characters'); - } - if (!/^[A-Za-z0-9:_-]+$/.test(trimmed)) { - throw validationError('Idempotency-Key header must use only letters, numbers, colon, underscore, or hyphen'); - } - return trimmed; -} - // ── Body normaliser ─────────────────────────────────────────────────────────── function normalizeCreateInput(body: Record): NormalizedCreateInput { @@ -384,22 +395,33 @@ streamsRouter.get( /** * POST /api/streams - * Create a new stream. Requires authentication. + * Create a new stream. Requires authentication + Idempotency-Key header. + * + * Idempotency-Key is validated by requireIdempotencyKey before this handler + * runs; the validated key is available on res.locals.idempotencyKey. */ streamsRouter.post( '/', authenticate, requireAuth, + requireIdempotencyKey, asyncHandler(async (req: Request, res: Response) => { const requestId = (req as any).id as string | undefined; - const idempotencyKey = parseIdempotencyKey(req.header('Idempotency-Key')); + const correlationId = (req as any).correlationId as string | undefined; + // Key already validated and trimmed by requireIdempotencyKey middleware + const idempotencyKey = res.locals['idempotencyKey'] as string; if (idempotencyDependency.state !== 'healthy') { - warn('Idempotency dependency unavailable', { dependency: 'idempotency-store', requestId, idempotencyKey }); + warn('Idempotency dependency unavailable', { + dependency: 'idempotency-store', + requestId, + // Never log the key value at warn/error level — it could be a secret + idempotencyKeyLength: idempotencyKey.length, + }); throw serviceUnavailable('Idempotency processing is temporarily unavailable. Retry after dependency health is restored.'); } - info('Creating new stream', { requestId, idempotencyKey }); + info('Creating new stream', { requestId, correlationId }); let normalizedInput: NormalizedCreateInput; try { @@ -422,17 +444,29 @@ streamsRouter.post( if (existingResponse) { if (existingResponse.requestFingerprint !== requestFingerprint) { + // Log the decision without leaking the key value itself + warn('Idempotency-Key reused with different payload', { + requestId, + correlationId, + idempotencyKeyLength: idempotencyKey.length, + action: 'conflict', + }); throw new ApiError( ApiErrorCode.CONFLICT, 'Idempotency-Key has already been used for a different request payload', 409, - { idempotencyKey }, + { hint: 'Use a new Idempotency-Key or retry with the original request body' }, ); } - info('Replaying idempotent stream creation', { requestId, idempotencyKey, streamId: existingResponse.body.id }); + info('Replaying idempotent stream creation', { + requestId, + correlationId, + streamId: existingResponse.body.id, + action: 'replay', + }); res.set('Idempotency-Key', idempotencyKey); res.set('Idempotency-Replayed', 'true'); - res.status(existingResponse.statusCode).json(successResponse(existingResponse.body, requestId)); + res.status(existingResponse.statusCode).json(idempotentReplayResponse(existingResponse.body, requestId)); return; } @@ -467,8 +501,8 @@ streamsRouter.post( idempotencyStore.set(idempotencyKey, { requestFingerprint, statusCode: 201, body: stream }); SerializationLogger.amountSerialized(2, requestId); - info('Stream created', { id: stream.id, requestId, idempotencyKey }); - recordAuditEvent('STREAM_CREATED', 'stream', stream.id, (req as any).correlationId, { + info('Stream created', { id: stream.id, requestId, correlationId, action: 'created' }); + recordAuditEvent('STREAM_CREATED', 'stream', stream.id, correlationId, { depositAmount: normalizedInput.depositAmount, ratePerSecond: normalizedInput.ratePerSecond, sender: normalizedInput.sender, diff --git a/src/utils/response.ts b/src/utils/response.ts index 7782ee0..a93dbc0 100644 --- a/src/utils/response.ts +++ b/src/utils/response.ts @@ -15,6 +15,8 @@ export interface ResponseMeta { timestamp: string; /** Opaque request identifier for log correlation */ requestId?: string; + /** Present on idempotent replays — true when the response was served from cache */ + idempotencyReplayed?: boolean; } export interface SuccessEnvelope { @@ -49,6 +51,25 @@ export function successResponse(data: T, requestId?: string): SuccessEnvelope }; } +/** + * Build a success envelope for an idempotent replay response. + * + * Identical to successResponse but stamps `meta.idempotencyReplayed = true` + * so clients can distinguish a fresh creation from a cached replay without + * inspecting the Idempotency-Replayed response header. + */ +export function idempotentReplayResponse(data: T, requestId?: string): SuccessEnvelope { + return { + success: true, + data, + meta: { + timestamp: new Date().toISOString(), + ...(requestId ? { requestId } : {}), + idempotencyReplayed: true, + }, + }; +} + /** * Build an error envelope. */ diff --git a/tests/routes/streams.test.ts b/tests/routes/streams.test.ts index 3e1fcca..2155121 100644 --- a/tests/routes/streams.test.ts +++ b/tests/routes/streams.test.ts @@ -4,31 +4,41 @@ * The PostgreSQL repository is fully mocked so no real database is required. * Tests cover all routes, validation, idempotency, state-machine transitions, * and error envelopes. + * + * Idempotency coverage (§ "replay/collision tests"): + * - Missing Idempotency-Key → 400 + * - Malformed key (bad chars, too long) → 400 + * - First creation → 201, Idempotency-Replayed: false + * - Replay (same key + same body) → 201, Idempotency-Replayed: true, meta.idempotencyReplayed: true + * - Collision (same key + different body) → 409 CONFLICT, no key leaked in body + * - Independent keys → independent streams + * - Idempotency store unavailable → 503 + * - DB pool exhausted during upsert → 503 (key NOT stored) + * - Decimal-string precision preserved through replay */ import { describe, it, expect, beforeEach, vi, afterEach } from 'vitest'; import request from 'supertest'; // ── Mock the repository before importing the app ────────────────────────────── -const mockGetById = vi.fn(); -const mockUpsertStream = vi.fn(); -const mockUpdateStream = vi.fn(); -const mockFindWithCursor = vi.fn(); +const mockGetById = vi.fn(); +const mockUpsertStream = vi.fn(); +const mockUpdateStream = vi.fn(); +const mockFindWithCursor = vi.fn(); vi.mock('../../src/db/repositories/streamRepository.js', () => ({ streamRepository: { - getById: (...a: unknown[]) => mockGetById(...a), - upsertStream: (...a: unknown[]) => mockUpsertStream(...a), - updateStream: (...a: unknown[]) => mockUpdateStream(...a), - findWithCursor: (...a: unknown[]) => mockFindWithCursor(...a), - countByStatus: vi.fn().mockResolvedValue({ active: 0, paused: 0, completed: 0, cancelled: 0 }), + getById: (...a: unknown[]) => mockGetById(...a), + upsertStream: (...a: unknown[]) => mockUpsertStream(...a), + updateStream: (...a: unknown[]) => mockUpdateStream(...a), + findWithCursor: (...a: unknown[]) => mockFindWithCursor(...a), + countByStatus: vi.fn().mockResolvedValue({ active: 0, paused: 0, completed: 0, cancelled: 0 }), }, })); -// Mock pool so PoolExhaustedError is importable vi.mock('../../src/db/pool.js', () => ({ - getPool: vi.fn(() => ({})), - query: vi.fn(), - PoolExhaustedError: class PoolExhaustedError extends Error { + getPool: vi.fn(() => ({})), + query: vi.fn(), + PoolExhaustedError: class PoolExhaustedError extends Error { constructor() { super('pool exhausted'); this.name = 'PoolExhaustedError'; } }, DuplicateEntryError: class DuplicateEntryError extends Error { @@ -37,13 +47,23 @@ vi.mock('../../src/db/pool.js', () => ({ })); import { createApp } from '../../src/app.js'; -import { _resetStreams, setStreamListingDependencyState, setIdempotencyDependencyState } from '../../src/routes/streams.js'; +import { + _resetStreams, + setStreamListingDependencyState, + setIdempotencyDependencyState, +} from '../../src/routes/streams.js'; +import { initializeConfig } from '../../src/config/env.js'; +import { generateToken } from '../../src/lib/auth.js'; + +// Initialize config before importing anything that needs it +initializeConfig(); + +// ── Constants ───────────────────────────────────────────────────────────────── const VALID_SENDER = 'GAAZI4TCR3TY5OJHCTJC2A4QSY6CJWJH5IAJTGKIN2ER7LBNVKOCCWN7'; const VALID_RECIPIENT = 'GBDEVU63Y6NTHJQQZIKVTC23NWLQVP3WJ2RI2OTSJTNYOIGICST6DUXR'; -const INVALID_KEY_SHORT = 'GABC123'; -const INVALID_KEY_WRONG_PREFIX = 'AAAZI4TCR3TY5OJHCTJC2A4QSY6CJWJH5IAJTGKIN2ER7LBNVKOCCWN7'; -const INVALID_KEY_INVALID_CHARS = 'G1111111111111111111111111111111111111111111111111111111'; + +const TEST_TOKEN = generateToken({ address: VALID_SENDER, role: 'operator' }); const app = createApp(); @@ -77,6 +97,22 @@ const validBody = { ratePerSecond: '10', }; +// ── Helpers ─────────────────────────────────────────────────────────────────── + +let _keyCounter = 0; +function uniqueKey(prefix = 'key'): string { + return `${prefix}-${++_keyCounter}`; +} + +function post(body: Record, key?: string) { + const req = request(app) + .post('/api/streams') + .set('Authorization', `Bearer ${TEST_TOKEN}`) + .send(body); + if (key !== undefined) req.set('Idempotency-Key', key); + return req; +} + // ── Setup ───────────────────────────────────────────────────────────────────── describe('streams routes', () => { @@ -86,7 +122,6 @@ describe('streams routes', () => { setStreamListingDependencyState('healthy'); setIdempotencyDependencyState('healthy'); - // Default happy-path mocks mockFindWithCursor.mockResolvedValue({ streams: [], hasMore: false }); mockGetById.mockResolvedValue(undefined); mockUpsertStream.mockResolvedValue({ created: true, stream: makeDbRecord() }); @@ -101,7 +136,7 @@ describe('streams routes', () => { vi.restoreAllMocks(); }); - // ── GET /api/streams ──────────────────────────────────────────────────────── + // ── GET /api/streams ────────────────────────────────────────────────────── describe('GET /api/streams', () => { it('returns an empty list when no streams exist', async () => { @@ -113,27 +148,18 @@ describe('streams routes', () => { }); it('returns mapped streams from the repository', async () => { - mockFindWithCursor.mockResolvedValue({ - streams: [makeDbRecord()], - hasMore: false, - }); - + mockFindWithCursor.mockResolvedValue({ streams: [makeDbRecord()], hasMore: false }); const res = await request(app).get('/api/streams'); expect(res.status).toBe(200); expect(res.body.data.streams).toHaveLength(1); const s = res.body.data.streams[0]; expect(s.sender).toBe(VALID_SENDER); - expect(s.recipient).toBe(VALID_RECIPIENT); expect(s.depositAmount).toBe('1000'); expect(s.ratePerSecond).toBe('10'); }); it('includes next_cursor when hasMore=true', async () => { - mockFindWithCursor.mockResolvedValue({ - streams: [makeDbRecord({ id: 'stream-abc-0' })], - hasMore: true, - }); - + mockFindWithCursor.mockResolvedValue({ streams: [makeDbRecord({ id: 'stream-abc-0' })], hasMore: true }); const res = await request(app).get('/api/streams?limit=1'); expect(res.status).toBe(200); expect(res.body.data.next_cursor).toBeDefined(); @@ -142,7 +168,6 @@ describe('streams routes', () => { it('includes total when include_total=true', async () => { mockFindWithCursor.mockResolvedValue({ streams: [], hasMore: false, total: 42 }); - const res = await request(app).get('/api/streams?include_total=true'); expect(res.status).toBe(200); expect(res.body.data.total).toBe(42); @@ -155,18 +180,15 @@ describe('streams routes', () => { }); it('rejects limit > 100', async () => { - const res = await request(app).get('/api/streams?limit=101'); - expect(res.status).toBe(400); + expect((await request(app).get('/api/streams?limit=101')).status).toBe(400); }); it('rejects invalid cursor', async () => { - const res = await request(app).get('/api/streams?cursor=!!!invalid!!!'); - expect(res.status).toBe(400); + expect((await request(app).get('/api/streams?cursor=!!!invalid!!!')).status).toBe(400); }); it('rejects invalid include_total value', async () => { - const res = await request(app).get('/api/streams?include_total=maybe'); - expect(res.status).toBe(400); + expect((await request(app).get('/api/streams?include_total=maybe')).status).toBe(400); }); it('returns 503 when listing dependency is unavailable', async () => { @@ -179,28 +201,20 @@ describe('streams routes', () => { it('returns 503 when pool is exhausted', async () => { const { PoolExhaustedError } = await import('../../src/db/pool.js'); mockFindWithCursor.mockRejectedValue(new PoolExhaustedError()); - - const res = await request(app).get('/api/streams'); - expect(res.status).toBe(503); + expect((await request(app).get('/api/streams')).status).toBe(503); }); - it('accepts a valid cursor and passes afterId to repository', async () => { + it('passes afterId to repository from a valid cursor', async () => { mockFindWithCursor.mockResolvedValue({ streams: [], hasMore: false }); - - // Build a valid cursor const cursor = Buffer.from(JSON.stringify({ v: 1, lastId: 'stream-abc-0' })).toString('base64url'); - const res = await request(app).get(`/api/streams?cursor=${cursor}`); - expect(res.status).toBe(200); + await request(app).get(`/api/streams?cursor=${cursor}`); expect(mockFindWithCursor).toHaveBeenCalledWith( - expect.anything(), - expect.any(Number), - 'stream-abc-0', - expect.any(Boolean), + expect.anything(), expect.any(Number), 'stream-abc-0', expect.any(Boolean), ); }); }); - // ── GET /api/streams/:id ──────────────────────────────────────────────────── + // ── GET /api/streams/:id ────────────────────────────────────────────────── describe('GET /api/streams/:id', () => { it('returns 404 for a non-existent stream', async () => { @@ -208,34 +222,21 @@ describe('streams routes', () => { const res = await request(app).get('/api/streams/stream-nonexistent'); expect(res.status).toBe(404); expect(res.body.success).toBe(false); - expect(res.body.error.message).toContain('Stream'); }); it('returns the stream when found', async () => { mockGetById.mockResolvedValue(makeDbRecord({ id: 'stream-abc-0' })); - const res = await request(app).get('/api/streams/stream-abc-0'); expect(res.status).toBe(200); - expect(res.body.success).toBe(true); expect(res.body.data.stream.id).toBe('stream-abc-0'); - expect(res.body.data.stream.sender).toBe(VALID_SENDER); expect(res.body.data.stream.depositAmount).toBe('1000'); }); it('maps DB snake_case to API camelCase', async () => { mockGetById.mockResolvedValue(makeDbRecord({ - sender_address: VALID_SENDER, - recipient_address: VALID_RECIPIENT, - amount: '500', - rate_per_second: '5', - start_time: 1700000000, - end_time: 1800000000, + amount: '500', rate_per_second: '5', start_time: 1700000000, end_time: 1800000000, })); - - const res = await request(app).get('/api/streams/stream-abc-0'); - const s = res.body.data.stream; - expect(s.sender).toBe(VALID_SENDER); - expect(s.recipient).toBe(VALID_RECIPIENT); + const s = (await request(app).get('/api/streams/stream-abc-0')).body.data.stream; expect(s.depositAmount).toBe('500'); expect(s.ratePerSecond).toBe('5'); expect(s.startTime).toBe(1700000000); @@ -245,276 +246,375 @@ describe('streams routes', () => { it('returns 503 when pool is exhausted', async () => { const { PoolExhaustedError } = await import('../../src/db/pool.js'); mockGetById.mockRejectedValue(new PoolExhaustedError()); - - const res = await request(app).get('/api/streams/stream-x'); - expect(res.status).toBe(503); + expect((await request(app).get('/api/streams/stream-x')).status).toBe(503); }); }); - // ── POST /api/streams ─────────────────────────────────────────────────────── - describe('POST /api/streams', () => { - it('creates a stream with valid input', async () => { - const res = await request(app) - .post('/api/streams') - .send(validBody); + // ── POST /api/streams — idempotency ────────────────────────────────────── + + describe('POST /api/streams — idempotency', () => { + + // ── Missing / malformed key ───────────────────────────────────────────── + + it('returns 400 when Idempotency-Key header is absent', async () => { + const res = await post(validBody); // no key + expect(res.status).toBe(400); + expect(res.body.success).toBe(false); + expect(res.body.error.code).toBe('VALIDATION_ERROR'); + expect(res.body.error.message).toMatch(/Idempotency-Key/); + }); + + it('returns 400 when Idempotency-Key is an empty string', async () => { + const res = await post(validBody, ''); + expect(res.status).toBe(400); + expect(res.body.error.code).toBe('VALIDATION_ERROR'); + }); + + it('returns 400 when Idempotency-Key exceeds 128 characters', async () => { + const res = await post(validBody, 'a'.repeat(129)); + expect(res.status).toBe(400); + expect(res.body.error.code).toBe('VALIDATION_ERROR'); + }); + it('accepts an Idempotency-Key of exactly 128 characters', async () => { + const res = await post(validBody, 'a'.repeat(128)); expect(res.status).toBe(201); - expect(res.body.success).toBe(true); - expect(res.body.data.sender).toBe(VALID_SENDER); - expect(res.body.data.recipient).toBe(VALID_RECIPIENT); - expect(res.body.data.depositAmount).toBe('1000'); - expect(res.body.data.ratePerSecond).toBe('10'); - expect(res.body.data.status).toBe('active'); - expect(res.body.data.id).toMatch(/^stream-/); }); - it('sets Idempotency-Replayed: false on first creation', async () => { - const res = await request(app) - .post('/api/streams') - .set('Idempotency-Key', 'key-001') - .send(validBody); + it('accepts an Idempotency-Key of exactly 1 character', async () => { + const res = await post(validBody, 'z'); + expect(res.status).toBe(201); + }); + + it('returns 400 when Idempotency-Key contains disallowed characters (space)', async () => { + const res = await post(validBody, 'key with spaces'); + expect(res.status).toBe(400); + expect(res.body.error.code).toBe('VALIDATION_ERROR'); + }); + + it('returns 400 when Idempotency-Key contains disallowed characters (slash)', async () => { + const res = await post(validBody, 'key/slash'); + expect(res.status).toBe(400); + expect(res.body.error.code).toBe('VALIDATION_ERROR'); + }); + it('accepts keys with allowed special characters (colon, underscore, hyphen)', async () => { + const res = await post(validBody, 'my-key_v1:2024'); + expect(res.status).toBe(201); + }); + + it('accepts a UUID-formatted key', async () => { + const res = await post(validBody, '550e8400-e29b-41d4-a716-446655440000'); expect(res.status).toBe(201); - expect(res.headers['idempotency-replayed']).toBe('false'); }); - it('replays idempotent request with same key and body', async () => { - await request(app) - .post('/api/streams') - .set('Idempotency-Key', 'key-replay') - .send(validBody); + // ── First creation ────────────────────────────────────────────────────── - const res2 = await request(app) - .post('/api/streams') - .set('Idempotency-Key', 'key-replay') - .send(validBody); + it('returns 201 on first creation with Idempotency-Replayed: false', async () => { + const res = await post(validBody, uniqueKey()); + expect(res.status).toBe(201); + expect(res.headers['idempotency-replayed']).toBe('false'); + expect(res.body.success).toBe(true); + expect(res.body.meta.idempotencyReplayed).toBeUndefined(); + }); + + it('echoes the Idempotency-Key header in the response', async () => { + const key = uniqueKey('echo'); + const res = await post(validBody, key); + expect(res.status).toBe(201); + expect(res.headers['idempotency-key']).toBe(key); + }); - expect(res2.status).toBe(201); - expect(res2.headers['idempotency-replayed']).toBe('true'); - // Repository should only be called once + it('calls upsertStream exactly once on first creation', async () => { + await post(validBody, uniqueKey()); expect(mockUpsertStream).toHaveBeenCalledTimes(1); }); - it('returns 409 when same key is used with different body', async () => { - await request(app) - .post('/api/streams') - .set('Idempotency-Key', 'key-conflict') - .send(validBody); + // ── Replay (same key + same body) ─────────────────────────────────────── + + it('returns 201 on replay with Idempotency-Replayed: true', async () => { + const key = uniqueKey('replay'); + await post(validBody, key).expect(201); + const res = await post(validBody, key); + expect(res.status).toBe(201); + expect(res.headers['idempotency-replayed']).toBe('true'); + }); + + it('sets meta.idempotencyReplayed=true on replay response', async () => { + const key = uniqueKey('meta-replay'); + await post(validBody, key).expect(201); + const res = await post(validBody, key).expect(201); + expect(res.body.meta.idempotencyReplayed).toBe(true); + }); - const res2 = await request(app) - .post('/api/streams') - .set('Idempotency-Key', 'key-conflict') - .send({ ...validBody, depositAmount: '9999' }); + it('returns identical data body on replay', async () => { + const key = uniqueKey('data-replay'); + const first = await post(validBody, key).expect(201); + const second = await post(validBody, key).expect(201); + expect(second.body.data).toEqual(first.body.data); + }); - expect(res2.status).toBe(409); - expect(res2.body.error.code).toBe('CONFLICT'); + it('does NOT call upsertStream on replay', async () => { + const key = uniqueKey('no-upsert'); + await post(validBody, key).expect(201); + vi.clearAllMocks(); + await post(validBody, key).expect(201); + expect(mockUpsertStream).not.toHaveBeenCalled(); }); - it('accepts an explicit startTime', async () => { + it('preserves decimal-string precision through replay', async () => { + const preciseBody = { ...validBody, depositAmount: '1000000.0000007', ratePerSecond: '0.0000116' }; mockUpsertStream.mockResolvedValue({ created: true, - stream: makeDbRecord({ start_time: 1700000000 }), + stream: makeDbRecord({ amount: '1000000.0000007', rate_per_second: '0.0000116' }), }); + const key = uniqueKey('decimal-replay'); + await post(preciseBody, key).expect(201); + const res = await post(preciseBody, key).expect(201); + expect(res.body.data.depositAmount).toBe('1000000.0000007'); + expect(res.body.data.ratePerSecond).toBe('0.0000116'); + }); - const res = await request(app) - .post('/api/streams') - .send({ ...validBody, startTime: 1700000000 }); + it('handles concurrent replays safely (sequential simulation)', async () => { + const key = uniqueKey('concurrent'); + await post(validBody, key).expect(201); + const [r1, r2, r3] = await Promise.all([ + post(validBody, key), + post(validBody, key), + post(validBody, key), + ]); + expect(r1.status).toBe(201); + expect(r2.status).toBe(201); + expect(r3.status).toBe(201); + // Only the original upsert call + expect(mockUpsertStream).toHaveBeenCalledTimes(1); + }); - expect(res.status).toBe(201); - expect(res.body.data.startTime).toBe(1700000000); + // ── Collision (same key + different body) ─────────────────────────────── + + it('returns 409 CONFLICT when same key is reused with a different body', async () => { + const key = uniqueKey('conflict'); + await post(validBody, key).expect(201); + const res = await post({ ...validBody, depositAmount: '9999' }, key); + expect(res.status).toBe(409); + expect(res.body.success).toBe(false); + expect(res.body.error.code).toBe('CONFLICT'); }); - it('rejects missing sender', async () => { - const { sender: _, ...body } = validBody; - const res = await request(app).post('/api/streams').send(body); - expect(res.status).toBe(400); - expect(res.body.error.details).toContain('sender is required'); + it('409 conflict body does NOT contain the raw Idempotency-Key value', async () => { + const key = uniqueKey('no-key-leak'); + await post(validBody, key).expect(201); + const res = await post({ ...validBody, depositAmount: '9999' }, key).expect(409); + // The key must not appear anywhere in the serialised response body + expect(JSON.stringify(res.body)).not.toContain(key); }); - it('rejects empty sender', async () => { - const res = await request(app).post('/api/streams').send({ ...validBody, sender: '' }); - expect(res.status).toBe(400); - expect(res.body.error.details).toContain('sender must be a valid Stellar public key (G...)'); + it('409 conflict body contains a hint for recovery', async () => { + const key = uniqueKey('hint'); + await post(validBody, key).expect(201); + const res = await post({ ...validBody, depositAmount: '1' }, key).expect(409); + expect(res.body.error.details).toBeDefined(); }); - it('rejects invalid sender - too short', async () => { - const res = await request(app).post('/api/streams').send({ ...validBody, sender: INVALID_KEY_SHORT }); - expect(res.status).toBe(400); - expect(res.body.error.details).toContain('sender must be a valid Stellar public key (G...)'); + it('different keys for different bodies create independent streams', async () => { + const key1 = uniqueKey('ind-a'); + const key2 = uniqueKey('ind-b'); + const r1 = await post(validBody, key1).expect(201); + const r2 = await post({ ...validBody, depositAmount: '2000' }, key2).expect(201); + expect(r1.body.data.id).toBeDefined(); + expect(r2.body.data.id).toBeDefined(); + expect(mockUpsertStream).toHaveBeenCalledTimes(2); }); - it('rejects invalid sender - wrong prefix', async () => { - const res = await request(app).post('/api/streams').send({ ...validBody, sender: INVALID_KEY_WRONG_PREFIX }); - expect(res.status).toBe(400); - expect(res.body.error.details).toContain('sender must be a valid Stellar public key (G...)'); + it('same body with different keys creates two separate upsert calls', async () => { + await post(validBody, uniqueKey('sep-a')).expect(201); + await post(validBody, uniqueKey('sep-b')).expect(201); + expect(mockUpsertStream).toHaveBeenCalledTimes(2); }); - it('rejects invalid sender - invalid characters', async () => { - const res = await request(app).post('/api/streams').send({ ...validBody, sender: INVALID_KEY_INVALID_CHARS }); - expect(res.status).toBe(400); - expect(res.body.error.details).toContain('sender must be a valid Stellar public key (G...)'); + // ── Dependency / infrastructure failures ──────────────────────────────── + + it('returns 503 when idempotency dependency is unavailable', async () => { + setIdempotencyDependencyState('unavailable'); + const res = await post(validBody, uniqueKey()); + expect(res.status).toBe(503); + expect(res.body.error.code).toBe('SERVICE_UNAVAILABLE'); }); - it('rejects invalid sender - generic string', async () => { - const res = await request(app).post('/api/streams').send({ ...validBody, sender: 'not-a-stellar-key' }); - expect(res.status).toBe(400); + it('returns 503 when pool is exhausted during upsert', async () => { + const { PoolExhaustedError } = await import('../../src/db/pool.js'); + mockUpsertStream.mockRejectedValue(new PoolExhaustedError()); + const res = await post(validBody, uniqueKey()); + expect(res.status).toBe(503); }); - it('rejects missing recipient', async () => { - const { recipient: _, ...body } = validBody; - const res = await request(app).post('/api/streams').send(body); - expect(res.status).toBe(400); - expect(res.body.error.details).toContain('recipient is required'); + it('does NOT cache the response when upsert fails (no phantom replay)', async () => { + const { PoolExhaustedError } = await import('../../src/db/pool.js'); + const key = uniqueKey('no-cache-on-fail'); + mockUpsertStream.mockRejectedValueOnce(new PoolExhaustedError()); + await post(validBody, key).expect(503); + + // Second attempt with DB healthy — should attempt upsert again, not replay + mockUpsertStream.mockResolvedValueOnce({ created: true, stream: makeDbRecord() }); + const res = await post(validBody, key).expect(201); + expect(res.headers['idempotency-replayed']).toBe('false'); + expect(mockUpsertStream).toHaveBeenCalledTimes(2); + }); + + // ── Security: no secret leakage in logs ───────────────────────────────── + + it('does not log raw Stellar addresses after creation', async () => { + const warnSpy = vi.spyOn(console, 'warn'); + const errorSpy = vi.spyOn(console, 'error'); + await post(validBody, uniqueKey('pii-check')).expect(201); + const allOutput = [ + ...warnSpy.mock.calls, + ...errorSpy.mock.calls, + ].map((c) => String(c[0])).join(' '); + expect(allOutput).not.toContain(VALID_SENDER); + expect(allOutput).not.toContain(VALID_RECIPIENT); + }); + + it('does not log the raw Idempotency-Key value on conflict', async () => { + const key = uniqueKey('key-not-logged'); + const warnSpy = vi.spyOn(console, 'warn'); + await post(validBody, key).expect(201); + await post({ ...validBody, depositAmount: '1' }, key).expect(409); + const allWarn = warnSpy.mock.calls.map((c) => String(c[0])).join(' '); + expect(allWarn).not.toContain(key); + }); + }); + + + // ── POST /api/streams — validation ─────────────────────────────────────── + + describe('POST /api/streams — validation', () => { + it('creates a stream with valid input', async () => { + const res = await post(validBody, uniqueKey()); + expect(res.status).toBe(201); + expect(res.body.data.sender).toBe(VALID_SENDER); + expect(res.body.data.depositAmount).toBe('1000'); + expect(res.body.data.status).toBe('active'); }); - it('rejects empty recipient', async () => { - const res = await request(app).post('/api/streams').send({ ...validBody, recipient: '' }); + it('rejects missing sender', async () => { + const { sender: _, ...body } = validBody; + const res = await post(body, uniqueKey()); expect(res.status).toBe(400); - expect(res.body.error.details).toContain('recipient must be a valid Stellar public key (G...)'); + expect(res.body.error.code).toBe('VALIDATION_ERROR'); }); - it('rejects invalid recipient - too short', async () => { - const res = await request(app).post('/api/streams').send({ ...validBody, recipient: INVALID_KEY_SHORT }); + it('rejects invalid sender (too short)', async () => { + const res = await post({ ...validBody, sender: 'GABC123' }, uniqueKey()); expect(res.status).toBe(400); - expect(res.body.error.details).toContain('recipient must be a valid Stellar public key (G...)'); }); - it('rejects invalid recipient - wrong prefix', async () => { - const res = await request(app).post('/api/streams').send({ ...validBody, recipient: INVALID_KEY_WRONG_PREFIX }); + it('rejects invalid sender (wrong prefix)', async () => { + const res = await post({ ...validBody, sender: 'AAAZI4TCR3TY5OJHCTJC2A4QSY6CJWJH5IAJTGKIN2ER7LBNVKOCCWN7' }, uniqueKey()); expect(res.status).toBe(400); - expect(res.body.error.details).toContain('recipient must be a valid Stellar public key (G...)'); }); - it('rejects non-positive depositAmount', async () => { - const res = await request(app).post('/api/streams').send({ ...validBody, depositAmount: '0' }); + it('rejects missing recipient', async () => { + const { recipient: _, ...body } = validBody; + const res = await post(body, uniqueKey()); expect(res.status).toBe(400); - expect(res.body.error.details).toContain('depositAmount must be a positive numeric string'); + expect(res.body.error.code).toBe('VALIDATION_ERROR'); }); - it('rejects non-numeric depositAmount', async () => { - const res = await request(app).post('/api/streams').send({ ...validBody, depositAmount: 'abc' }); + it('rejects non-positive depositAmount', async () => { + const res = await post({ ...validBody, depositAmount: '0' }, uniqueKey()); expect(res.status).toBe(400); }); it('rejects numeric depositAmount (must be string)', async () => { - const res = await request(app).post('/api/streams').send({ ...validBody, depositAmount: 1000 }); + const res = await post({ ...validBody, depositAmount: 1000 }, uniqueKey()); expect(res.status).toBe(400); }); it('rejects negative ratePerSecond', async () => { - const res = await request(app).post('/api/streams').send({ ...validBody, ratePerSecond: '-5' }); + const res = await post({ ...validBody, ratePerSecond: '-5' }, uniqueKey()); expect(res.status).toBe(400); - expect(res.body.error.details).toContain('ratePerSecond must be a positive numeric string'); }); it('rejects negative startTime', async () => { - const res = await request(app).post('/api/streams').send({ ...validBody, startTime: -1 }); + const res = await post({ ...validBody, startTime: -1 }, uniqueKey()); expect(res.status).toBe(400); - expect(res.body.error.details).toContain('startTime must be a non-negative number'); }); it('returns all validation errors at once', async () => { - const res = await request(app).post('/api/streams').send({}); + const res = await post({}, uniqueKey()); expect(res.status).toBe(400); - expect(res.body.success).toBe(false); - // At minimum sender and recipient errors expect(res.body.error.details.length).toBeGreaterThanOrEqual(2); }); - it('does not log raw Stellar keys after creation', async () => { - const logSpy = vi.spyOn(console, 'log'); - await request(app).post('/api/streams').send(validBody); - const allOutput = logSpy.mock.calls.map((c) => String(c[0])).join(' '); - expect(allOutput).not.toContain(VALID_SENDER); - expect(allOutput).not.toContain(VALID_RECIPIENT); - }); - - it('returns 503 when idempotency dependency is unavailable', async () => { - setIdempotencyDependencyState('unavailable'); - const res = await request(app).post('/api/streams').send(validBody); - expect(res.status).toBe(503); - }); - - it('returns 503 when pool is exhausted during upsert', async () => { - const { PoolExhaustedError } = await import('../../src/db/pool.js'); - mockUpsertStream.mockRejectedValue(new PoolExhaustedError()); - - const res = await request(app).post('/api/streams').send(validBody); - expect(res.status).toBe(503); - }); - it('preserves decimal-string precision for amounts', async () => { mockUpsertStream.mockResolvedValue({ created: true, stream: makeDbRecord({ amount: '0.0000001', rate_per_second: '0.0000116' }), }); - - const res = await request(app).post('/api/streams').send({ - ...validBody, - depositAmount: '0.0000001', - ratePerSecond: '0.0000116', - }); - + const res = await post({ ...validBody, depositAmount: '0.0000001', ratePerSecond: '0.0000116' }, uniqueKey()); expect(res.status).toBe(201); expect(res.body.data.depositAmount).toBe('0.0000001'); expect(res.body.data.ratePerSecond).toBe('0.0000116'); }); }); - // ── DELETE /api/streams/:id ───────────────────────────────────────────────── + // ── DELETE /api/streams/:id ─────────────────────────────────────────────── describe('DELETE /api/streams/:id', () => { it('cancels an active stream', async () => { mockGetById.mockResolvedValue(makeDbRecord({ status: 'active' })); mockUpdateStream.mockResolvedValue(makeDbRecord({ status: 'cancelled' })); - - const res = await request(app).delete('/api/streams/stream-abc-0'); + const res = await request(app) + .delete('/api/streams/stream-abc-0') + .set('Authorization', `Bearer ${TEST_TOKEN}`); expect(res.status).toBe(200); - expect(res.body.success).toBe(true); expect(res.body.data.message).toBe('Stream cancelled'); - expect(res.body.data.id).toBe('stream-abc-0'); }); it('returns 404 for a non-existent stream', async () => { mockGetById.mockResolvedValue(undefined); - const res = await request(app).delete('/api/streams/stream-nonexistent'); - expect(res.status).toBe(404); + expect((await request(app) + .delete('/api/streams/stream-nonexistent') + .set('Authorization', `Bearer ${TEST_TOKEN}`) + ).status).toBe(404); }); it('returns 409 when stream is already cancelled', async () => { mockGetById.mockResolvedValue(makeDbRecord({ status: 'cancelled' })); - const res = await request(app).delete('/api/streams/stream-abc-0'); + const res = await request(app) + .delete('/api/streams/stream-abc-0') + .set('Authorization', `Bearer ${TEST_TOKEN}`); expect(res.status).toBe(409); expect(res.body.error.code).toBe('CONFLICT'); }); it('returns 409 when stream is already completed', async () => { mockGetById.mockResolvedValue(makeDbRecord({ status: 'completed' })); - const res = await request(app).delete('/api/streams/stream-abc-0'); - expect(res.status).toBe(409); + expect((await request(app) + .delete('/api/streams/stream-abc-0') + .set('Authorization', `Bearer ${TEST_TOKEN}`) + ).status).toBe(409); }); it('returns 503 when pool is exhausted', async () => { const { PoolExhaustedError } = await import('../../src/db/pool.js'); mockGetById.mockRejectedValue(new PoolExhaustedError()); - - const res = await request(app).delete('/api/streams/stream-abc-0'); - expect(res.status).toBe(503); + expect((await request(app) + .delete('/api/streams/stream-abc-0') + .set('Authorization', `Bearer ${TEST_TOKEN}`) + ).status).toBe(503); }); }); - // ── PATCH /api/streams/:id/status ────────────────────────────────────────── + // ── PATCH /api/streams/:id/status ──────────────────────────────────────── describe('PATCH /api/streams/:id/status', () => { it('transitions active → paused', async () => { mockGetById.mockResolvedValue(makeDbRecord({ status: 'active' })); mockUpdateStream.mockResolvedValue(makeDbRecord({ status: 'paused' })); - - const res = await request(app) - .patch('/api/streams/stream-abc-0/status') - .send({ status: 'paused' }); - + const res = await request(app).patch('/api/streams/stream-abc-0/status').send({ status: 'paused' }); expect(res.status).toBe(200); expect(res.body.data.status).toBe('paused'); }); @@ -522,78 +622,39 @@ describe('streams routes', () => { it('transitions paused → active', async () => { mockGetById.mockResolvedValue(makeDbRecord({ status: 'paused' })); mockUpdateStream.mockResolvedValue(makeDbRecord({ status: 'active' })); - - const res = await request(app) - .patch('/api/streams/stream-abc-0/status') - .send({ status: 'active' }); - - expect(res.status).toBe(200); - expect(res.body.data.status).toBe('active'); - }); - - it('transitions active → completed', async () => { - mockGetById.mockResolvedValue(makeDbRecord({ status: 'active' })); - mockUpdateStream.mockResolvedValue(makeDbRecord({ status: 'completed' })); - - const res = await request(app) - .patch('/api/streams/stream-abc-0/status') - .send({ status: 'completed' }); - + const res = await request(app).patch('/api/streams/stream-abc-0/status').send({ status: 'active' }); expect(res.status).toBe(200); }); it('returns 409 for invalid transition: completed → active', async () => { mockGetById.mockResolvedValue(makeDbRecord({ status: 'completed' })); - - const res = await request(app) - .patch('/api/streams/stream-abc-0/status') - .send({ status: 'active' }); - + const res = await request(app).patch('/api/streams/stream-abc-0/status').send({ status: 'active' }); expect(res.status).toBe(409); expect(res.body.error.code).toBe('CONFLICT'); }); it('returns 409 for invalid transition: cancelled → paused', async () => { mockGetById.mockResolvedValue(makeDbRecord({ status: 'cancelled' })); - - const res = await request(app) - .patch('/api/streams/stream-abc-0/status') - .send({ status: 'paused' }); - - expect(res.status).toBe(409); + expect((await request(app).patch('/api/streams/stream-abc-0/status').send({ status: 'paused' })).status).toBe(409); }); it('returns 400 for unknown status value', async () => { - const res = await request(app) - .patch('/api/streams/stream-abc-0/status') - .send({ status: 'unknown-status' }); - - expect(res.status).toBe(400); + expect((await request(app).patch('/api/streams/stream-abc-0/status').send({ status: 'unknown-status' })).status).toBe(400); }); it('returns 404 when stream not found', async () => { mockGetById.mockResolvedValue(undefined); - - const res = await request(app) - .patch('/api/streams/stream-nonexistent/status') - .send({ status: 'paused' }); - - expect(res.status).toBe(404); + expect((await request(app).patch('/api/streams/stream-nonexistent/status').send({ status: 'paused' })).status).toBe(404); }); it('returns 503 when pool is exhausted', async () => { const { PoolExhaustedError } = await import('../../src/db/pool.js'); mockGetById.mockRejectedValue(new PoolExhaustedError()); - - const res = await request(app) - .patch('/api/streams/stream-abc-0/status') - .send({ status: 'paused' }); - - expect(res.status).toBe(503); + expect((await request(app).patch('/api/streams/stream-abc-0/status').send({ status: 'paused' })).status).toBe(503); }); }); - // ── Response envelope ─────────────────────────────────────────────────────── + // ── Response envelope ───────────────────────────────────────────────────── describe('response envelope', () => { it('success responses have { success: true, data, meta }', async () => { @@ -601,7 +662,6 @@ describe('streams routes', () => { const res = await request(app).get('/api/streams'); expect(res.body.success).toBe(true); expect(res.body.data).toBeDefined(); - expect(res.body.meta).toBeDefined(); expect(res.body.meta.timestamp).toBeDefined(); }); @@ -611,5 +671,17 @@ describe('streams routes', () => { expect(res.body.error.code).toBeDefined(); expect(res.body.error.message).toBeDefined(); }); + + it('replay responses have meta.idempotencyReplayed=true', async () => { + const key = uniqueKey('envelope-replay'); + await post(validBody, key).expect(201); + const res = await post(validBody, key).expect(201); + expect(res.body.meta.idempotencyReplayed).toBe(true); + }); + + it('fresh creation responses do NOT have meta.idempotencyReplayed', async () => { + const res = await post(validBody, uniqueKey('envelope-fresh')).expect(201); + expect(res.body.meta.idempotencyReplayed).toBeUndefined(); + }); }); });