chore: add websocket replay endpoint based on event store cursor#201
Merged
Jagadeeshftw merged 3 commits intoApr 24, 2026
Merged
Conversation
## Summary
Implements cursor-based event replay across the WebSocket and HTTP layers.
### src/db/types.ts
- Add `afterEventId` field to `StreamEventReplayFilter` — exclusive cursor for
forward-only replay in (ledger ASC, eventId ASC) order
- Add `nextCursor` field to `StreamEventReplayResult` — opaque token for the
next page; absent when no more events remain
### src/indexer/store.ts
- Fix `InMemoryContractEventStore.insertMany` — staged records were never
committed to `this.records` (pre-existing bug)
- Implement cursor logic in `InMemoryContractEventStore.getEvents`:
cursor is applied before other filters so position is stable across
combined filter+cursor queries; unknown cursor returns empty (past-end)
- Implement cursor logic in `PostgresContractEventStore.getEvents`:
translates `afterEventId` into a composite (ledger, event_id) boundary
using a single cursor-row lookup; unknown cursor returns empty
- Both stores emit `nextCursor` when a full page is returned and more
events exist
### src/ws/hub.ts
- Fix double-upgrade handler bug: when `wsAuthRequired=false` the WSS
handles upgrades automatically; manual handler only registered when
auth is required (fixes pre-existing JWT auth test failures)
- Add `eventStore` to `StreamHubOptions` and `StreamHub`
- Add `setEventStore(store)` for post-construction injection
- Add `replayFromCursor(ws, filter)` — pages through the event store and
sends `stream_replay` frames to the target connection; sends
`stream_replay_complete` when done; sends `REPLAY_UNAVAILABLE` error
when no store is configured
- Add `replay` message type to the client→server protocol: clients send
`{ type: "replay", afterEventId?, fromLedger?, toledger?, contractId?,
topic?, limit? }` to trigger server-side replay on their connection
### src/routes/indexer.ts
- Add `GET /internal/indexer/events/replay` — cursor-based HTTP endpoint;
accepts `afterEventId` query param as exclusive cursor; returns
`nextCursor` for multi-page traversal; auth-gated by indexer worker
token; amounts in payloads preserved as decimal strings
### openapi.yaml
- Document `GET /internal/indexer/events/replay` with full parameter,
response, and trust-boundary descriptions
- Add `EventReplayResponse` and `StreamEventRecord` component schemas
### tests/ws.test.ts
- Add `StreamHub.replayFromCursor` integration tests (8 cases):
full replay, cursor advance, end-of-store, empty store,
REPLAY_UNAVAILABLE, decimal-string preservation, metrics, setEventStore
### tests/indexer-replay.test.ts (new)
- Add HTTP endpoint tests for `GET /internal/indexer/events/replay` (12 cases):
empty store, ledger ordering, cursor advance, end-of-store, unknown cursor,
nextCursor pagination, full traversal, limit cap, auth rejection,
decimal-string preservation, fromLedger+cursor, topic+cursor
### package.json / pnpm-lock.yaml
- Add eslint + @typescript-eslint/parser/plugin + eslint-config-prettier
as devDependencies (were missing, causing `pnpm lint` to fail with
"eslint: not found" on the baseline)
- Add @paralleldrive/cuid2 runtime dependency (was missing from package.json)
## Security notes
- Replay endpoint is auth-gated by `x-indexer-worker-token` — same
boundary as the ingest endpoint; public clients cannot access it
- Cursor values are opaque eventIds; no SQL injection surface (parameterised
queries in Postgres store; in-memory store uses JS array operations)
- Decimal-string serialization is preserved end-to-end through replay;
amount fields are never coerced to numbers
|
@devJaja Great news! 🎉 Based on an automated assessment of this PR, the linked Wave issue(s) no longer count against your application limits. You can now already apply to more issues while waiting for a review of this PR. Keep up the great work! 🚀 |
- Remove duplicate pnpm/action-setup step (v4 + v2 conflict caused pnpm v8 to win, making lockfile v9.0 unreadable with --frozen-lockfile) - Keep single pnpm/action-setup@v4 with version 9 - Add node cache: 'pnpm' for faster CI installs - Regenerate pnpm-lock.yaml with pnpm v9 so --frozen-lockfile passes
- Add packageManager field to package.json so pnpm enforces the version - Pin CI workflow to pnpm 9.15.9 (exact) instead of range '9' - Regenerate pnpm-lock.yaml (lockfileVersion 9.0) to match
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Implements cursor-based event replay across the WebSocket and HTTP layers.
src/db/types.ts
afterEventIdfield toStreamEventReplayFilter— exclusive cursor for forward-only replay in (ledger ASC, eventId ASC) ordernextCursorfield toStreamEventReplayResult— opaque token for the next page; absent when no more events remainsrc/indexer/store.ts
InMemoryContractEventStore.insertMany— staged records were never committed tothis.records(pre-existing bug)InMemoryContractEventStore.getEvents: cursor is applied before other filters so position is stable across combined filter+cursor queries; unknown cursor returns empty (past-end)PostgresContractEventStore.getEvents: translatesafterEventIdinto a composite (ledger, event_id) boundary using a single cursor-row lookup; unknown cursor returns emptynextCursorwhen a full page is returned and more events existsrc/ws/hub.ts
wsAuthRequired=falsethe WSS handles upgrades automatically; manual handler only registered when auth is required (fixes pre-existing JWT auth test failures)eventStoretoStreamHubOptionsandStreamHubsetEventStore(store)for post-construction injectionreplayFromCursor(ws, filter)— pages through the event store and sendsstream_replayframes to the target connection; sendsstream_replay_completewhen done; sendsREPLAY_UNAVAILABLEerror when no store is configuredreplaymessage type to the client→server protocol: clients send{ type: "replay", afterEventId?, fromLedger?, toledger?, contractId?, topic?, limit? }to trigger server-side replay on their connectionsrc/routes/indexer.ts
GET /internal/indexer/events/replay— cursor-based HTTP endpoint; acceptsafterEventIdquery param as exclusive cursor; returnsnextCursorfor multi-page traversal; auth-gated by indexer worker token; amounts in payloads preserved as decimal stringsopenapi.yaml
GET /internal/indexer/events/replaywith full parameter, response, and trust-boundary descriptionsEventReplayResponseandStreamEventRecordcomponent schemastests/ws.test.ts
StreamHub.replayFromCursorintegration tests (8 cases): full replay, cursor advance, end-of-store, empty store, REPLAY_UNAVAILABLE, decimal-string preservation, metrics, setEventStoretests/indexer-replay.test.ts (new)
GET /internal/indexer/events/replay(12 cases): empty store, ledger ordering, cursor advance, end-of-store, unknown cursor, nextCursor pagination, full traversal, limit cap, auth rejection, decimal-string preservation, fromLedger+cursor, topic+cursorpackage.json / pnpm-lock.yaml
pnpm lintto fail with "eslint: not found" on the baseline)Security notes
x-indexer-worker-token— same boundary as the ingest endpoint; public clients cannot access itCloses #162