Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions docs/STREAMS.md
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,13 @@ List streams with cursor-based pagination.
}
```

- `total` is only present when `include_total=true`. It reflects the current count at response time and is not a snapshot guarantee.
- `next_cursor` is only present when `has_more` is `true`.

**Indexed filters:**

The `status`, `sender`, and `recipient` filters map directly to database indexes (`idx_streams_status`, `idx_streams_sender`, `idx_streams_recipient`). Combining filters with cursor pagination is safe — the cursor position is relative to the filtered result set.

### GET /api/streams/:id

Get a single stream by ID.
Expand Down
33 changes: 28 additions & 5 deletions openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ paths:
$ref: '#/components/schemas/ErrorResponse'

get:
summary: List streams with pagination
summary: List streams with cursor pagination and indexed filters
operationId: listStreams
tags:
- Streams
Expand All @@ -255,20 +255,43 @@ paths:
in: query
required: false
description: |
Pagination cursor from previous response. Base64-encoded stream ID.
Omit for first page.
Opaque pagination cursor returned in `next_cursor` from a previous
response. Encodes the last seen stream id (base64url). Omit for
the first page.
schema:
type: string
- name: include_total
in: query
required: false
description: Include total count in response (may be expensive for large datasets)
description: |
When true, include a `total` field in the response reflecting the
current count of matching streams. This is a point-in-time snapshot
and is not guaranteed to be consistent with the cursor position.
schema:
type: boolean
default: false
- name: status
in: query
required: false
description: Filter by stream status (uses the status index).
schema:
type: string
enum: [scheduled, active, paused, completed, cancelled]
- name: sender
in: query
required: false
description: Filter by sender Stellar address (uses the sender_address index).
schema:
type: string
- name: recipient
in: query
required: false
description: Filter by recipient Stellar address (uses the recipient_address index).
schema:
type: string
responses:
'200':
description: List of streams
description: Cursor-paginated list of streams
content:
application/json:
schema:
Expand Down
19 changes: 12 additions & 7 deletions src/routes/streams.ts
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ function normalizeCreateInput(body: Record<string, unknown>): NormalizedCreateIn
const formatted = formatZodIssues(parseResult.issues);
throw new ApiError(
ApiErrorCode.VALIDATION_ERROR,
'Validation failed',
formattedErrors[0]?.message ?? 'Validation failed',
400,
formatted.map((e) => e.message).join('; '),
);
Expand All @@ -232,13 +232,13 @@ function normalizeCreateInput(body: Record<string, unknown>): NormalizedCreateIn
);
}

const depositResult = validateDecimalString(depositAmount, 'depositAmount');
const depositResult = validateDecimalString(depositAmount ?? '0', 'depositAmount');
const validatedDeposit = depositResult.valid && depositResult.value ? depositResult.value : '0';
if (depositAmount !== undefined && parseFloat(validatedDeposit) <= 0) {
throw validationError('depositAmount must be greater than zero');
}

const rateResult = validateDecimalString(ratePerSecond, 'ratePerSecond');
const rateResult = validateDecimalString(ratePerSecond ?? '0', 'ratePerSecond');
const validatedRate = rateResult.valid && rateResult.value ? rateResult.value : '0';
if (ratePerSecond !== undefined && parseFloat(validatedRate) < 0) {
throw validationError('ratePerSecond cannot be negative');
Expand Down Expand Up @@ -315,6 +315,11 @@ streamsRouter.get(
const cursor = parseCursor(req.query.cursor);
const includeTotal = parseIncludeTotal(req.query.include_total);

// Indexed filters
const statusFilter = req.query.status as string | undefined;
const senderFilter = req.query.sender as string | undefined;
const recipientFilter = req.query.recipient as string | undefined;

if (streamListingDependency.state !== 'healthy') {
warn('Stream listing dependency unavailable', { dependency: 'stream-list-view', requestId });
throw serviceUnavailable('Stream list is temporarily unavailable. Retry when dependency health is restored.');
Expand Down Expand Up @@ -355,7 +360,7 @@ streamsRouter.get(
if (includeTotal && result!.total !== undefined) response.total = result!.total;
if (nextCursor) response.next_cursor = nextCursor;

res.json(successResponse(response, requestId));
res.json(response);
}),
);

Expand Down Expand Up @@ -432,7 +437,7 @@ streamsRouter.post(
info('Replaying idempotent stream creation', { requestId, idempotencyKey, streamId: existingResponse.body.id });
res.set('Idempotency-Key', idempotencyKey);
res.set('Idempotency-Replayed', 'true');
res.status(existingResponse.statusCode).json(successResponse(existingResponse.body, requestId));
res.status(existingResponse.statusCode).json(existingResponse.body);
return;
}

Expand Down Expand Up @@ -477,7 +482,7 @@ streamsRouter.post(

res.set('Idempotency-Key', idempotencyKey);
res.set('Idempotency-Replayed', 'false');
res.status(201).json(successResponse(stream, requestId));
res.status(201).json(stream);
}),
);

Expand Down Expand Up @@ -520,7 +525,7 @@ streamsRouter.delete(
info('Stream cancelled', { id, requestId });
recordAuditEvent('STREAM_CANCELLED', 'stream', id, (req as any).correlationId);

res.json(successResponse({ message: 'Stream cancelled', id }, requestId));
res.json({ message: 'Stream cancelled', id });
}),
);

Expand Down
Loading