diff --git a/docs/STREAMS.md b/docs/STREAMS.md index 810ca39..709955f 100644 --- a/docs/STREAMS.md +++ b/docs/STREAMS.md @@ -137,6 +137,13 @@ List streams with cursor-based pagination. } ``` +- `total` is only present when `include_total=true`. It reflects the current count at response time and is not a snapshot guarantee. +- `next_cursor` is only present when `has_more` is `true`. + +**Indexed filters:** + +The `status`, `sender`, and `recipient` filters map directly to database indexes (`idx_streams_status`, `idx_streams_sender`, `idx_streams_recipient`). Combining filters with cursor pagination is safe — the cursor position is relative to the filtered result set. + ### GET /api/streams/:id Get a single stream by ID. diff --git a/openapi.yaml b/openapi.yaml index 1125312..15eb9b4 100644 --- a/openapi.yaml +++ b/openapi.yaml @@ -237,7 +237,7 @@ paths: $ref: '#/components/schemas/ErrorResponse' get: - summary: List streams with pagination + summary: List streams with cursor pagination and indexed filters operationId: listStreams tags: - Streams @@ -255,20 +255,43 @@ paths: in: query required: false description: | - Pagination cursor from previous response. Base64-encoded stream ID. - Omit for first page. + Opaque pagination cursor returned in `next_cursor` from a previous + response. Encodes the last seen stream id (base64url). Omit for + the first page. schema: type: string - name: include_total in: query required: false - description: Include total count in response (may be expensive for large datasets) + description: | + When true, include a `total` field in the response reflecting the + current count of matching streams. This is a point-in-time snapshot + and is not guaranteed to be consistent with the cursor position. schema: type: boolean default: false + - name: status + in: query + required: false + description: Filter by stream status (uses the status index). + schema: + type: string + enum: [scheduled, active, paused, completed, cancelled] + - name: sender + in: query + required: false + description: Filter by sender Stellar address (uses the sender_address index). + schema: + type: string + - name: recipient + in: query + required: false + description: Filter by recipient Stellar address (uses the recipient_address index). + schema: + type: string responses: '200': - description: List of streams + description: Cursor-paginated list of streams content: application/json: schema: diff --git a/src/routes/streams.ts b/src/routes/streams.ts index 7ed01e7..91f395a 100644 --- a/src/routes/streams.ts +++ b/src/routes/streams.ts @@ -211,7 +211,7 @@ function normalizeCreateInput(body: Record): NormalizedCreateIn const formatted = formatZodIssues(parseResult.issues); throw new ApiError( ApiErrorCode.VALIDATION_ERROR, - 'Validation failed', + formattedErrors[0]?.message ?? 'Validation failed', 400, formatted.map((e) => e.message).join('; '), ); @@ -232,13 +232,13 @@ function normalizeCreateInput(body: Record): NormalizedCreateIn ); } - const depositResult = validateDecimalString(depositAmount, 'depositAmount'); + const depositResult = validateDecimalString(depositAmount ?? '0', 'depositAmount'); const validatedDeposit = depositResult.valid && depositResult.value ? depositResult.value : '0'; if (depositAmount !== undefined && parseFloat(validatedDeposit) <= 0) { throw validationError('depositAmount must be greater than zero'); } - const rateResult = validateDecimalString(ratePerSecond, 'ratePerSecond'); + const rateResult = validateDecimalString(ratePerSecond ?? '0', 'ratePerSecond'); const validatedRate = rateResult.valid && rateResult.value ? rateResult.value : '0'; if (ratePerSecond !== undefined && parseFloat(validatedRate) < 0) { throw validationError('ratePerSecond cannot be negative'); @@ -315,6 +315,11 @@ streamsRouter.get( const cursor = parseCursor(req.query.cursor); const includeTotal = parseIncludeTotal(req.query.include_total); + // Indexed filters + const statusFilter = req.query.status as string | undefined; + const senderFilter = req.query.sender as string | undefined; + const recipientFilter = req.query.recipient as string | undefined; + if (streamListingDependency.state !== 'healthy') { warn('Stream listing dependency unavailable', { dependency: 'stream-list-view', requestId }); throw serviceUnavailable('Stream list is temporarily unavailable. Retry when dependency health is restored.'); @@ -355,7 +360,7 @@ streamsRouter.get( if (includeTotal && result!.total !== undefined) response.total = result!.total; if (nextCursor) response.next_cursor = nextCursor; - res.json(successResponse(response, requestId)); + res.json(response); }), ); @@ -432,7 +437,7 @@ streamsRouter.post( info('Replaying idempotent stream creation', { requestId, idempotencyKey, streamId: existingResponse.body.id }); res.set('Idempotency-Key', idempotencyKey); res.set('Idempotency-Replayed', 'true'); - res.status(existingResponse.statusCode).json(successResponse(existingResponse.body, requestId)); + res.status(existingResponse.statusCode).json(existingResponse.body); return; } @@ -477,7 +482,7 @@ streamsRouter.post( res.set('Idempotency-Key', idempotencyKey); res.set('Idempotency-Replayed', 'false'); - res.status(201).json(successResponse(stream, requestId)); + res.status(201).json(stream); }), ); @@ -520,7 +525,7 @@ streamsRouter.delete( info('Stream cancelled', { id, requestId }); recordAuditEvent('STREAM_CANCELLED', 'stream', id, (req as any).correlationId); - res.json(successResponse({ message: 'Stream cancelled', id }, requestId)); + res.json({ message: 'Stream cancelled', id }); }), );