diff --git a/.changeset/soft-donuts-wave.md b/.changeset/soft-donuts-wave.md new file mode 100644 index 0000000..fcd1c7c --- /dev/null +++ b/.changeset/soft-donuts-wave.md @@ -0,0 +1,14 @@ +--- +'@contextvm/sdk': minor +--- + +Add CEP-41 open-ended stream transfer support over ContextVM transport. + +This introduces open-stream framing over MCP [`notifications/progress`](docs/cep-41.md:10) using the request `progressToken` as the stream identifier, with support for `start`, `accept`, `chunk`, `ping`, `pong`, `close`, and `abort` frames. + +It also adds SDK support for: + +- client and server open-stream transport handling +- stream session lifecycle management, buffering, and keepalive timeouts +- ergonomic tool streaming via [`callToolStream()`](src/transport/call-tool-stream.ts:28) +- CEP-41 coverage across unit and end-to-end transport tests diff --git a/docs/cep-41-design-plan.md b/docs/cep-41-design-plan.md new file mode 100644 index 0000000..1283a12 --- /dev/null +++ b/docs/cep-41-design-plan.md @@ -0,0 +1,376 @@ +# CEP-41 SDK Design and Implementation Plan + +## Goals + +- Add CEP-41 open-ended streams as a first-class transport feature. +- Keep the public API simple, explicit, and composable. +- Preserve low overhead for users who do not need streaming. +- Support concurrent long-lived streams safely on both client and server. + +## Design Summary + +### Activation model + +CEP-41 support is opt-in at two levels: + +- **Transport feature enablement** on both [`NostrClientTransport`](../src/transport/nostr-client-transport.ts) and [`NostrServerTransport`](../src/transport/nostr-server-transport.ts). +- **Per-request activation** via MCP `progressToken`, as required by [`cep-41.md`](./cep-41.md:64). + +Recommended config shape: + +```ts +openStream?: { + enabled?: boolean; + policy?: { + maxConcurrentStreams?: number; + maxBufferedChunksPerStream?: number; + maxBufferedBytesPerStream?: number; + idleTimeoutMs?: number; + probeTimeoutMs?: number; + closeGracePeriodMs?: number; + }; +}; +``` + +Initial default: + +- `enabled: false` + +When disabled: + +- do not advertise `support_open_stream` +- do not create stream sessions +- do not expose high-level stream helpers as usable + +When enabled but unused: + +- no per-stream state is allocated +- only a lightweight registry/manager exists + +### Public API + +#### Consumer API + +Primary API is a free helper function, not a wrapper client class. + +```ts +const call = await callToolStream({ + client, + transport, + name: 'subscribeToEvents', + arguments: { topic: 'orders' }, +}); + +for await (const chunk of call.stream) { + console.log(chunk); +} + +const result = await call.result; +``` + +Recommended return shape: + +```ts +interface ToolStreamCall { + readonly progressToken: string; + readonly stream: AsyncIterable; + readonly result: Promise; + abort(reason?: string): Promise; +} +``` + +Advanced API remains on [`NostrClientTransport`](../src/transport/nostr-client-transport.ts): + +- low-level stream registry +- waiting for streams by `progressToken` +- observing active streams for diagnostics/tests + +#### Producer API + +Expose a single long-lived stream session in handler context: + +```ts +interface ToolStreamSession { + readonly progressToken: string; + readonly isActive: boolean; + readonly closed: Promise; + write(chunk: string): Promise; + close(): Promise; + abort(reason?: string): Promise; + onClose(handler: () => void | Promise): void; +} + +interface ToolHandlerContext { + stream?: ToolStreamSession; +} +``` + +This same object supports both: + +- inline progressive generation +- detached/live streaming from external async sources + +### Key API principles + +- Async iterators are the primary read abstraction. +- Writers/sessions are the primary write abstraction. +- The stream lifecycle is distinct from the final JSON-RPC response lifecycle. +- Ping/pong keepalive is internal runtime behavior, not user API. +- CEP-41 is implemented as a sibling subsystem to CEP-22, not a variant of it. + +## Runtime Model + +### Internal modules + +Add a new transport subsystem: + +```text +src/transport/open-stream/ +├── constants.ts +├── errors.ts +├── frames.ts +├── index.ts +├── receiver.ts +├── registry.ts +├── session.ts +├── types.ts +└── writer.ts +``` + +### Core responsibilities + +The open-stream subsystem owns: + +- frame validation +- ordered lifecycle handling +- per-stream session state +- idle timeout and ping/pong probing +- local buffering/resource limits +- cleanup on close/abort/disconnect/probe failure +- coordination between stream termination and final JSON-RPC completion + +### Session state + +Each active session is keyed by `progressToken` and tracks: + +- lifecycle state +- last observed `progress` +- next expected `chunkIndex` +- missing/out-of-order chunks within bounded policy +- buffered consumer chunks +- idle/probe timers +- cleanup callbacks +- final response completion state + +## Protocol Mapping + +### Capability advertisement + +When enabled, advertise `support_open_stream` following [`cep-41.md`](./cep-41.md:40). + +Add a new tag constant near [`SUPPORT_OVERSIZED_TRANSFER`](../src/core/constants.ts:118). + +### Request activation + +Open streaming only applies when the initiating request includes `progressToken`, per [`cep-41.md`](./cep-41.md:64). + +### Transport interception points + +Mirror the CEP-22 interception pattern at: + +- [`NostrClientTransport.handleNotification()`](../src/transport/nostr-client-transport.ts:1012) +- [`NostrServerTransport.authorizeAndProcessEvent()`](../src/transport/nostr-server-transport.ts:1190) + +### Correlation + +Reuse existing progress-token routing via [`CorrelationStore.getEventIdByProgressToken()`](../src/transport/nostr-server/correlation-store.ts:194). + +The final JSON-RPC response must only be sent after the stream reaches `close` or `abort`, consistent with [`cep-41.md`](./cep-41.md:334). + +## Usage Patterns + +### 1. Inline progressive generation + +```ts +server.registerTool('generateText', async (args, ctx) => { + await ctx.stream?.write('Hello'); + await ctx.stream?.write(' world'); + await ctx.stream?.close(); + + return { + content: [{ type: 'text', text: 'Done' }], + isError: false, + }; +}); +``` + +### 2. Live subscription backed by websocket/events + +```ts +server.registerTool('subscribeToEvents', async (args, ctx) => { + const stream = ctx.stream; + if (!stream) { + return { + content: [{ type: 'text', text: 'Streaming unavailable' }], + isError: true, + }; + } + + const ws = new WebSocket(args.url); + + ws.onmessage = async (event) => { + if (!stream.isActive) return; + await stream.write(event.data.toString()); + }; + + ws.onerror = async () => { + if (!stream.isActive) return; + await stream.abort('Upstream websocket error'); + }; + + ws.onclose = async () => { + if (!stream.isActive) return; + await stream.close(); + }; + + stream.onClose(() => { + try { + ws.close(); + } catch { + // best effort + } + }); + + return waitForSubscriptionResult(); +}); +``` + +This demonstrates that one `ctx.stream` abstraction supports detached, concurrent, long-lived production. + +## Keepalive Semantics + +Implement keepalive strictly inside the stream session manager per [`cep-41.md`](./cep-41.md:340): + +- any valid open-stream frame resets idle timeout +- on idle timeout, send `ping` +- require matching `pong` before probe timeout +- on probe failure, fail the stream and clean up local resources + +Application code should not manually manage `ping` or `pong`. + +## Performance and Safety + +### Non-streaming users + +When feature is disabled or unused: + +- negligible overhead +- no active session allocations +- no stream timers +- no chunk buffers + +### Streaming users + +Enforce local limits: + +- max concurrent streams +- max buffered chunks/bytes per stream +- bounded out-of-order buffering +- hard idle/probe timeouts +- close-grace timeout for unresolved gaps + +One stream must not block unrelated streams. + +## Implementation Plan + +### Phase 1: types, constants, capability plumbing + +1. Add `support_open_stream` constant in [`constants.ts`](../src/core/constants.ts). +2. Extend discovery parsing in [`discovery-tags.ts`](../src/transport/discovery-tags.ts). +3. Add `openStream` options to: + - [`NostrClientTransport`](../src/transport/nostr-client-transport.ts) + - [`NostrServerTransport`](../src/transport/nostr-server-transport.ts) + +### Phase 2: internal open-stream subsystem + +1. Add frame types and errors. +2. Implement frame builders for `start`, `accept`, `chunk`, `ping`, `pong`, `close`, `abort`. +3. Implement session manager/registry. +4. Implement client-side readable stream session. +5. Implement server-side writable stream session. + +Representative type sketch: + +```ts +export type OpenStreamFrameType = + | 'start' + | 'accept' + | 'chunk' + | 'ping' + | 'pong' + | 'close' + | 'abort'; +``` + +### Phase 3: transport integration + +1. Intercept inbound CEP-41 progress notifications at the same branch points used by CEP-22. +2. Route frames into the open-stream manager. +3. Create server-side stream sessions bound to request `progressToken`. +4. Delay final JSON-RPC completion until stream termination. + +Representative interception sketch: + +```ts +if ( + isJSONRPCNotification(message) && + message.method === 'notifications/progress' && + OpenStreamReceiver.isOpenStreamFrame(message) +) { + await this.openStreamRegistry.processFrame(message); + return; +} +``` + +### Phase 4: public APIs + +1. Add consumer helper, e.g. [`call-tool-stream.ts`](../src/transport/call-tool-stream.ts). +2. Export helper from [`transport/index.ts`](../src/transport/index.ts) and optionally [`index.ts`](../src/index.ts). +3. Expose advanced client registry access on [`NostrClientTransport`](../src/transport/nostr-client-transport.ts). +4. Inject `ctx.stream` into server/tool execution path. + +Representative helper sketch: + +```ts +const call = await callToolStream({ + client, + transport, + name: 'subscribeToEvents', + arguments: { topic: 'orders' }, +}); +``` + +### Phase 5: tests + +Add focused unit and e2e coverage for: + +- capability advertisement and negotiation +- accept-gated bootstrap +- zero-chunk streams +- multiple concurrent streams +- live detached production +- ordered and out-of-order chunks +- `close` with missing chunks +- remote and local aborts +- keepalive ping/pong timeout +- cleanup on disconnect/close +- final response strictly after stream termination + +## Final Decisions + +- CEP-41 is a separate subsystem from CEP-22. +- Consumer primary API is a free helper function. +- Advanced consumer API is a transport-level registry. +- Producer primary API is one long-lived [`ctx.stream`](README.md:64)-style session handle. +- Feature is transport-level opt-in and request-level opt-in. +- Keepalive is internal runtime behavior. diff --git a/docs/cep-41.md b/docs/cep-41.md new file mode 100644 index 0000000..dd4080b --- /dev/null +++ b/docs/cep-41.md @@ -0,0 +1,528 @@ +--- +title: CEP-XX Open-Ended Stream Transfer +description: Open-ended stream transfer for ContextVM using progress-notification framing +--- + +# Open-Ended Stream Transfer + +## Abstract + +This CEP defines an additive transport profile for open-ended streaming over ContextVM. It reuses MCP `notifications/progress` as the transfer envelope and uses the request `progressToken` as the stream identifier. + +Unlike bounded oversized-payload transfer in [`CEP-22`](/src/content/docs/spec/ceps/cep-22.md), this CEP defines a long-lived stream model where ordered fragments may continue until the sender explicitly closes or aborts the stream. The stream payload itself is the primary incremental output, but it does not replace the final JSON-RPC response for the originating request. + +This CEP is intended for cases where data is naturally incremental, long-lived, or unbounded, and where representing the result as one reassembled MCP request or response would be artificial or inefficient. + +## Specification + +### Overview + +ContextVM currently transports MCP JSON-RPC messages through Nostr events. That model fits ordinary request and response exchange well, and [`CEP-22`](/src/content/docs/spec/ceps/cep-22.md) extends it for bounded reassembly of oversized logical messages. + +Some use cases are different in nature: + +- long-running generation that emits useful partial output over time +- event feeds or incremental result sets +- progressive delivery where partial consumption is desirable +- cases where no single final rendered payload is the right abstraction + +This CEP defines an open-ended stream profile that: + +- reuses the existing single-kind ContextVM transport model +- reuses MCP `notifications/progress` as the stream envelope +- uses the request `progressToken` as the stream identifier +- supports ordered `start`, `accept`, `chunk`, `ping`, `pong`, `close`, and `abort` frames +- treats the stream itself as the payload rather than a bounded reassembly artifact +- allows receivers to process fragments incrementally as they arrive + +This CEP is intentionally distinct from the bounded reassembly mechanism in [`CEP-22`](/src/content/docs/spec/ceps/cep-22.md). Implementations MUST NOT treat these two profiles as interchangeable. + +### Capability Advertisement and Negotiation + +Support for open-ended stream transfer MAY be advertised through the same additive discovery surfaces already used by ContextVM capabilities and transport features, following the patterns in [`CEP-35`](/src/content/docs/spec/ceps/informational/cep-35.md). + +Peers MAY advertise support using one or more `support_open_stream` tags. + +Example tags only: + +```json +[["support_open_stream"]] +``` + +Advertisement surfaces: + +- **Public announcements:** Servers MAY advertise support in public server announcements. +- **Initialization:** Clients and servers SHOULD advertise support during MCP initialization when initialization is available. +- **Stateless operation:** Clients and servers MAY advertise support in tags on the first exchanged request or response when no prior initialization occurred. + +Support semantics: + +- `support_open_stream` indicates support for the open-ended stream profile defined by this CEP. + +### Request-Level Activation + +Open-ended stream transfer for a given logical exchange is available only when the initiating request includes a valid MCP `progressToken`. + +Activation rules: + +- Clients that want to permit open-ended streaming for a request MUST include a `progressToken`. +- Servers MUST NOT start an open-ended stream for a request that did not include a `progressToken`. +- When no `progressToken` is present, peers MUST use ordinary non-streaming behavior or fail cleanly. + +The `progressToken` is the stream identifier for the open-ended stream session. + +### Sender Behavior + +When open-ended stream transfer is used, the sender MUST emit an ordered sequence of MCP `notifications/progress` messages containing ContextVM stream frames. + +If the sender already knows the receiver supports this CEP for the exchange, it MAY proceed directly from `start` to `chunk`. Otherwise it MUST wait for `accept` before sending `chunk` frames. + +The sender: + +- MAY emit any number of `chunk` frames after stream startup +- MAY keep the stream open while useful incremental output continues +- MUST terminate the stream with either `close` or `abort` +- MUST NOT silently stop transmission without a terminal frame unless transport failure prevents completion + +Multiple streams MAY exist concurrently between the same peers, but each active stream MUST use a distinct `progressToken`. A sender MUST NOT send a second `start` for a stream that is already active under the same `progressToken`. + +### Progress Notification Framing + +Open-ended stream frames are carried inside MCP `notifications/progress` params. The MCP envelope remains valid and additive; ContextVM defines additional frame semantics inside the params object. + +Example conceptual envelope: + +```json +{ + "jsonrpc": "2.0", + "method": "notifications/progress", + "params": { + "progressToken": "req-123", + "progress": 1, + "message": "(Optional) starting open stream", + "cvm": { + "type": "open-stream", + "frameType": "start" + } + } +} +``` + +The sender MUST use `progress` values that increase monotonically across the stream, consistent with MCP progress rules. `progress` orders all stream frames, including control frames, and MUST NOT be interpreted as a chunk counter. + +### Frame Types + +This CEP defines seven frame types: + +- `start` +- `accept` +- `chunk` +- `ping` +- `pong` +- `close` +- `abort` + +#### Common Fields + +All open-stream frames MUST include a ContextVM-specific transport object with: + +- `type`: MUST be `open-stream` +- `frameType`: one of `start`, `accept`, `chunk`, `ping`, `pong`, `close`, `abort` + +The outer MCP progress params MUST include: + +- `progressToken` +- `progress` + +The outer MCP `total` and `message` fields MAY be used for UX hints or progress reporting, but they do not define stream correctness. + +#### `start` Frame + +The `start` frame begins the stream. + +Optional fields: + +- application-defined advisory payload metadata + +Rules: + +- `start` establishes intent to begin an open-ended stream under the given `progressToken`. +- Applications MAY include additional advisory metadata in `cvm` on `start` when both peers understand it. +- Receivers MUST NOT depend on advisory `start` metadata for stream correctness. + +#### `accept` Frame + +The `accept` frame confirms that the receiver accepts the stream and that the sender may begin transmitting `chunk` frames. + +This frame is primarily intended for bootstrap in stateless sender-to-receiver flows where support is not yet known. + +Rules: + +- A receiver MAY send `accept` after `start`. +- A sender that is required to wait for confirmation MUST NOT send `chunk` frames before receiving `accept`. +- `accept` SHOULD remain minimal and does not negotiate additional stream parameters in v1. + +##### When `accept` Is Required + +`accept` is conditional bootstrap confirmation, not a universal requirement. + +This mirrors the `accept` semantics defined in [`CEP-22`](/src/content/docs/spec/ceps/cep-22.md), so implementations can reuse the same conceptual model for conditional bootstrap confirmation and avoid semantic drift between the two transfer profiles. + +- If the sender already knows that the receiver supports this CEP for the exchange through prior negotiation, explicit capability advertisement, or other valid context for the exchange, it MAY send `chunk` frames immediately after `start`. +- If support is not yet known for the exchange, the sender MUST wait for `accept` before sending the first `chunk` frame. +- In stateless bootstrap flows where no prior support knowledge exists, `accept` is required before the first `chunk`. + +#### `chunk` Frame + +The `chunk` frame carries one ordered fragment of stream payload. + +Required fields: + +- `data`: chunk payload fragment +- `chunkIndex`: contiguous chunk index + +Rules: + +- For open-stream frames, MCP `progress` is the normative stream-ordering field for all frames. +- Each `chunk` frame MUST use a `progress` value greater than the preceding stream frame's `progress` value. +- `chunkIndex` MUST start at `0` for the first `chunk` frame in the stream and increase contiguously by `1` for each subsequent `chunk` frame. +- `data` carries one ordered fragment of the stream payload, following the same chunk-payload semantics as [`CEP-22`](/src/content/docs/spec/ceps/cep-22.md). +- Receivers MUST use `chunkIndex`, not `progress`, to validate chunk contiguity and payload completeness. +- Receivers MAY buffer valid out-of-order `chunk` frames within bounded local limits and process them once the contiguous `chunkIndex` sequence resumes. +- Receivers MAY track missing `chunkIndex` values as provisional gaps while the stream remains active. +- Receivers SHOULD enforce bounded buffering or equivalent local resource policy for unresolved chunk gaps. + +#### `ping` Frame + +The `ping` frame probes whether the peer remains responsive for the active stream. + +Required fields: + +- `nonce` + +Rules: + +- Either peer MAY send `ping` on an active stream. +- `nonce` MUST identify the probe uniquely within the stream. +- Receivers SHOULD enforce a local maximum nonce size of `64 bytes` and MAY reject, ignore, or abort on oversized nonces. +- `ping` carries no stream payload. + +#### `pong` Frame + +The `pong` frame acknowledges a received `ping` for the active stream. + +Required fields: + +- `nonce` + +Rules: + +- A receiver of `ping` MUST respond with `pong` for the same stream unless the stream has already terminated. +- `pong.nonce` MUST match the triggering `ping.nonce`. +- `pong` acknowledges peer responsiveness only and does not acknowledge delivery or processing of stream payload. +- A `pong` with an unknown, duplicate, expired, or already-satisfied `nonce` is invalid for keepalive matching and MUST NOT be treated as evidence of stream liveness. +- Receivers MAY ignore invalid `pong` frames and MAY apply local logging or anti-abuse policy to them. +- Implementations MAY apply local anti-abuse policy to `ping` handling, including ignoring, coalescing, rate-limiting, or aborting on excessive keepalive traffic. + +#### `close` Frame + +The `close` frame signals successful sender-side closure of the stream. + +Optional fields: + +- `lastChunkIndex` + +Rules: + +- `close` is required for successful stream completion. +- `close` indicates that no further `chunk` frames will be sent for the stream. +- When present, `close.lastChunkIndex` MUST equal the greatest `chunkIndex` sent for the stream. +- Senders SHOULD include `close.lastChunkIndex` when they intend `close` to declare a finite chunk-completeness bound for the delivered payload. +- Senders MAY omit `close.lastChunkIndex` for live, event-like, or otherwise open-ended streams where no chunk-completeness bound is being declared. +- If the stream included no `chunk` frames, `close.lastChunkIndex` MUST be omitted. + +#### `abort` Frame + +The `abort` frame signals that the stream did not complete successfully. + +Optional fields: + +- `reason` + +Rules: + +- Either peer MAY send `abort`. +- Receivers MUST treat `abort` as terminal for the stream. +- `reason` is advisory only. +- A peer MAY send `abort` when local policy determines that successful continuation is no longer acceptable or no longer plausible, including resource exhaustion, excessive unresolved gaps, timeout, or anti-abuse conditions. + +### Validation Rules + +#### Ordering and Lifecycle + +Receivers MUST validate stream ordering using MCP `progress`. + +To fail a stream means to treat it as unsuccessfully terminated, release local state for it, and NOT treat it as successfully completed. A peer that fails a stream SHOULD send `abort` with an advisory `reason` when it is still able to transmit. + +Rules: + +- a stream MUST begin with `start` +- if confirmation is required for the stream, `accept` MUST be received before the first `chunk` +- `progress` values for open-stream frames MUST increase monotonically across the stream +- receivers MUST treat `progress` as the canonical frame-ordering field, not as a chunk count +- `chunk` frames MUST include contiguous `chunkIndex` values beginning at `0` +- receivers MAY buffer valid out-of-order `chunk` frames within bounded local limits while awaiting missing earlier `chunkIndex` values +- receivers MAY treat missing `chunkIndex` positions as provisional gaps while the stream remains active +- receivers MUST NOT treat a gap alone as terminal failure while the stream remains active, except under local timeout or resource policy +- `pong` MUST correspond to an earlier `ping` on the same stream +- a second `start` received for an already active `progressToken` MUST cause the stream to fail +- successful completion requires `close` +- if `close.lastChunkIndex` is present, receivers MUST treat it as the completeness bound for the stream payload +- when `close.lastChunkIndex` is present, successful completion requires receipt of every `chunkIndex` from `0` through `lastChunkIndex` +- if gaps remain when `close` is received, receivers MAY wait a bounded local grace period for delayed chunks or MAY fail immediately under local policy +- if `close` arrives after malformed or non-monotonic ordering, the stream MUST fail + +This CEP does not define replay, selective retransmission, or repair. + +#### Post-Close Behavior + +After `close` or `abort`: + +- the stream is terminal +- receivers MUST ignore or reject later frames for the same terminated stream +- senders MUST NOT resume the same stream identifier + +### Receiver Behavior + +Receivers that support this CEP: + +- MUST track stream state by `progressToken` +- MUST process frames in stream order +- MUST reject or fail malformed frame sequences +- MUST treat `abort` as terminal +- MUST allow a valid zero-chunk stream in which `close` follows `start` without any `chunk` frames +- MUST fail a stream if `close` is received before `start` or after malformed ordering +- MAY terminate a stream with `abort` when local timeout, buffering, relay-safety, or anti-abuse policy makes continued processing unacceptable + +Receivers MAY expose stream fragments to applications incrementally as they arrive. + +### Stateless Operation + +This CEP is compatible with stateless ContextVM operation. + +In stateless operation: + +- peers MAY advertise support in tags on the first exchanged request or response +- stream state is correlated by `progressToken` +- receivers MUST NOT rely on a persistent connection-local session beyond temporary stream state + +For stateless client-to-server streaming where the client has not previously learned server support, the client MUST send `start` first and wait for `accept` before sending `chunk` frames. + +### Request Completion Semantics + +Open-ended streaming supplements the lifecycle of the originating JSON-RPC request; it does not replace it. + +Rules: + +- A stream associated with a request MUST still conclude with exactly one final JSON-RPC response for that request. +- `close` indicates that no more stream frames will be sent, but it does not itself satisfy the JSON-RPC request/response lifecycle. +- After sending `close`, the sender MUST send the final JSON-RPC success response for the originating request. +- If a stream associated with a request is terminated with `abort`, the sender SHOULD send a final JSON-RPC error response when it is still able to do so. +- Implementations MUST NOT synthesize successful final JSON-RPC responses locally solely from receipt of `close`. + +### Timeout and Keepalive Semantics + +Receipt of any valid open-stream frame counts as stream activity. + +Implementations MUST maintain an idle timeout for each active stream. + +Rules: + +- receipt of `start`, `accept`, `chunk`, `ping`, `pong`, `close`, or `abort` MUST reset the idle timeout +- if no valid frame is received before the idle timeout expires, the peer MUST send `ping` +- the receiver of `ping` MUST respond with `pong` carrying the same `nonce` +- implementations MAY apply local anti-abuse policy to keepalive traffic, including rate-limiting, coalescing, ignoring, or rejecting excessive `ping` traffic and rejecting oversized `nonce` values +- if the probing peer does not receive a matching `pong` before its probe timeout expires, it MUST treat the stream as failed +- a peer that fails the stream due to probe timeout SHOULD send `abort` if it is still able to transmit +- implementations SHOULD enforce a hard maximum timeout or other resource policy for long-lived streams + +### Relay Rate and Flow-Control Guidance + +Nostr relays may impose different event-rate, buffering, or publication policies. + +Implementations: + +- MUST NOT assume that all relays accept the same sustained event rate +- SHOULD throttle frame emission conservatively enough to respect expected relay policies +- MAY apply local policy to abort, defer, or deprioritize streams that exceed relay-safety limits +- MUST NOT assume that this CEP provides transport-level backpressure signaling in v1 + +### Example: Server-to-Client Open Stream + +Client sends a request with a `progressToken`: + +```json +{ + "jsonrpc": "2.0", + "id": 1, + "method": "tools/call", + "params": { + "name": "streaming_tool", + "arguments": {}, + "_meta": { + "progressToken": "req-123" + } + } +} +``` + +Server starts the stream: + +```json +{ + "jsonrpc": "2.0", + "method": "notifications/progress", + "params": { + "progressToken": "req-123", + "progress": 1, + "message": "starting stream", + "cvm": { + "type": "open-stream", + "frameType": "start" + } + } +} +``` + +Server sends stream fragments: + +```json +{ + "jsonrpc": "2.0", + "method": "notifications/progress", + "params": { + "progressToken": "req-123", + "progress": 2, + "cvm": { + "type": "open-stream", + "frameType": "chunk", + "chunkIndex": 0, + "data": "Hello" + } + } +} +``` + +```json +{ + "jsonrpc": "2.0", + "method": "notifications/progress", + "params": { + "progressToken": "req-123", + "progress": 3, + "cvm": { + "type": "open-stream", + "frameType": "chunk", + "chunkIndex": 1, + "data": " world" + } + } +} +``` + +Server closes the stream: + +```json +{ + "jsonrpc": "2.0", + "method": "notifications/progress", + "params": { + "progressToken": "req-123", + "progress": 4, + "message": "stream complete", + "cvm": { + "type": "open-stream", + "frameType": "close" + } + } +} +``` + +Server returns the final JSON-RPC response for the originating request: + +```json +{ + "jsonrpc": "2.0", + "id": 1, + "result": { + "content": [ + { + "type": "text", + "text": "Stream completed successfully" + } + ], + "isError": false + } +} +``` + +### Example: Stateless Client-to-Server Stream Bootstrap + +The following example shows the stream bootstrap phase only. As in [`Request-Level Activation`](#request-level-activation), the initiating JSON-RPC request for this exchange has already been sent and already supplied the `progressToken`. + +Client announces intent to begin a stream: + +```json +{ + "jsonrpc": "2.0", + "method": "notifications/progress", + "params": { + "progressToken": "req-789", + "progress": 1, + "message": "starting client stream", + "cvm": { + "type": "open-stream", + "frameType": "start" + } + } +} +``` + +Server confirms support: + +```json +{ + "jsonrpc": "2.0", + "method": "notifications/progress", + "params": { + "progressToken": "req-789", + "progress": 2, + "message": "client stream accepted", + "cvm": { + "type": "open-stream", + "frameType": "accept" + } + } +} +``` + +After `accept`, the client sends `chunk` frames and eventually terminates the stream with `close` or `abort`. If the stream is associated with a JSON-RPC request, the exchange still concludes with the final JSON-RPC response for that request. + +## Backward Compatibility + +This CEP introduces no breaking changes: + +- peers that do not advertise support continue using ordinary ContextVM request and response transport +- peers that do not include a `progressToken` on a request do not enable open-ended stream transfer for that exchange +- peers that do not understand the ContextVM-specific open-stream framing continue to interoperate for ordinary non-streaming messages + +## Dependencies + +- [CEP-6: Public Server Announcements](/spec/ceps/cep-6) +- [CEP-19: Ephemeral Gift Wraps](/spec/ceps/cep-19) +- [CEP-22: Oversized Payload Transfer](/spec/ceps/cep-22) +- [CEP-35: Discoverability Patterns for ContextVM Capabilities](/spec/ceps/informational/cep-35) + +## Reference Implementation + +A reference implementation is intended for the ContextVM SDK transport layer. diff --git a/src/core/constants.ts b/src/core/constants.ts index f954dc7..4a4ea17 100644 --- a/src/core/constants.ts +++ b/src/core/constants.ts @@ -116,6 +116,11 @@ export const NOSTR_TAGS = { * Support CEP-22 oversized payload transfer via notifications/progress framing. */ SUPPORT_OVERSIZED_TRANSFER: 'support_oversized_transfer', + + /** + * Support CEP-41 open-ended stream transfer via notifications/progress framing. + */ + SUPPORT_OPEN_STREAM: 'support_open_stream', } as const; export const DEFAULT_LRU_SIZE = 5000; @@ -139,4 +144,3 @@ export const NOTIFICATIONS_INITIALIZED_METHOD = 'notifications/initialized'; * Namespace for CEP-15 common schema metadata in tool definitions. */ export const COMMON_SCHEMA_META_NAMESPACE = 'io.contextvm/common-schema'; - diff --git a/src/core/utils/common-schema.test.ts b/src/core/utils/common-schema.test.ts index f8b14b7..955944c 100644 --- a/src/core/utils/common-schema.test.ts +++ b/src/core/utils/common-schema.test.ts @@ -1,8 +1,5 @@ import { describe, expect, test } from 'bun:test'; -import { - computeCommonSchemaHash, - normalizeSchema, -} from './common-schema.js'; +import { computeCommonSchemaHash, normalizeSchema } from './common-schema.js'; describe('normalizeSchema', () => { test('recursively removes title and description fields', () => { @@ -111,7 +108,7 @@ describe('normalizeSchema', () => { }; expect(() => normalizeSchema(schema)).toThrow( - 'External $ref pointers must be resolved before computing common schema hash' + 'External $ref pointers must be resolved before computing common schema hash', ); }); diff --git a/src/transport/call-tool-stream.e2e.test.ts b/src/transport/call-tool-stream.e2e.test.ts new file mode 100644 index 0000000..1c5347f --- /dev/null +++ b/src/transport/call-tool-stream.e2e.test.ts @@ -0,0 +1,1117 @@ +import { describe, expect, test } from 'bun:test'; +import { Client } from '@modelcontextprotocol/sdk/client/index.js'; +import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js'; +import { generateSecretKey, getPublicKey } from 'nostr-tools/pure'; +import { bytesToHex, hexToBytes } from 'nostr-tools/utils'; +import { z } from 'zod'; +import { waitFor } from '../core/utils/test.utils.js'; +import { EncryptionMode } from '../core/interfaces.js'; +import { MockRelayHub } from '../__mocks__/mock-relay-handler.js'; +import { PrivateKeySigner } from '../signer/private-key-signer.js'; +import { + buildOpenStreamStartFrame, + type OpenStreamWriter, +} from './open-stream/index.js'; +import { callToolStream } from './call-tool-stream.js'; +import { NostrClientTransport } from './nostr-client-transport.js'; +import { NostrServerTransport } from './nostr-server-transport.js'; + +function getOpenStreamWriter(extra: { + _meta?: Record; +}): OpenStreamWriter { + const stream = (extra._meta as { stream?: OpenStreamWriter } | undefined) + ?.stream; + + expect(stream).toBeDefined(); + + return stream as OpenStreamWriter; +} + +function createOpenStreamFixture(options?: { + idleTimeoutMs?: number; + probeTimeoutMs?: number; + closeGracePeriodMs?: number; +}): { + relayHub: MockRelayHub; + server: McpServer; + client: Client; + serverTransport: NostrServerTransport; + clientTransport: NostrClientTransport; +} { + const relayHub = new MockRelayHub(); + const serverPrivateKey = bytesToHex(generateSecretKey()); + const serverPublicKey = getPublicKey(hexToBytes(serverPrivateKey)); + const clientPrivateKey = bytesToHex(generateSecretKey()); + + const server = new McpServer({ + name: 'stream-server', + version: '1.0.0', + }); + + const serverTransport = new NostrServerTransport({ + signer: new PrivateKeySigner(serverPrivateKey), + relayHandler: relayHub.createRelayHandler(), + encryptionMode: EncryptionMode.DISABLED, + openStream: { + enabled: true, + policy: { + idleTimeoutMs: options?.idleTimeoutMs, + probeTimeoutMs: options?.probeTimeoutMs, + closeGracePeriodMs: options?.closeGracePeriodMs, + }, + }, + }); + + const clientTransport = new NostrClientTransport({ + signer: new PrivateKeySigner(clientPrivateKey), + relayHandler: relayHub.createRelayHandler(), + serverPubkey: serverPublicKey, + encryptionMode: EncryptionMode.DISABLED, + openStream: { + enabled: true, + policy: { + idleTimeoutMs: options?.idleTimeoutMs, + probeTimeoutMs: options?.probeTimeoutMs, + closeGracePeriodMs: options?.closeGracePeriodMs, + }, + }, + }); + + const client = new Client({ + name: 'stream-client', + version: '1.0.0', + }); + + return { + relayHub, + server, + client, + serverTransport, + clientTransport, + }; +} + +function getFrameType(event: { content: string }): string | undefined { + try { + const message = JSON.parse(event.content) as { + params?: { + cvm?: { + frameType?: string; + }; + }; + }; + + return message.params?.cvm?.frameType; + } catch { + return undefined; + } +} + +function parseRelayMessage(event: { content: string }): + | { + method?: string; + params?: { + progressToken?: string; + progress?: number; + cvm?: { + frameType?: string; + nonce?: string; + chunkIndex?: number; + data?: string; + lastChunkIndex?: number; + reason?: string; + }; + }; + } + | undefined { + try { + return JSON.parse(event.content) as { + method?: string; + params?: { + progressToken?: string; + progress?: number; + cvm?: { + frameType?: string; + nonce?: string; + chunkIndex?: number; + data?: string; + lastChunkIndex?: number; + reason?: string; + }; + }; + }; + } catch { + return undefined; + } +} + +async function cleanupOpenStreamFixture(params: { + client: Client; + server: McpServer; + relayHub: MockRelayHub; +}): Promise { + await params.client.close(); + await params.server.close(); + params.relayHub.clear(); +} + +describe('callToolStream end-to-end', () => { + test('streams tool output over CEP-41 with an ergonomic client API', async () => { + const { relayHub, server, client, serverTransport, clientTransport } = + createOpenStreamFixture(); + + server.registerTool( + 'subscribeToEvents', + { + title: 'Subscribe To Events', + description: 'Streams mock event notifications to the caller.', + inputSchema: { + topic: z.string(), + }, + }, + async ({ topic }, extra) => { + const stream = getOpenStreamWriter(extra); + + await stream.start(); + await stream.write(`event:1:${topic}`); + await stream.write(`event:2:${topic}`); + await stream.close(); + + return { + content: [{ type: 'text', text: `completed:${topic}` }], + structuredContent: { + topic, + streamed: true, + }, + }; + }, + ); + + await server.connect(serverTransport); + await client.connect(clientTransport); + + const call = await callToolStream({ + client, + transport: clientTransport, + name: 'subscribeToEvents', + arguments: { + topic: 'orders', + }, + }); + + const chunksPromise = (async (): Promise => { + const chunks: string[] = []; + + for await (const chunk of call.stream) { + chunks.push(chunk.value); + } + + return chunks; + })(); + + const [chunks, result] = await Promise.all([chunksPromise, call.result]); + + expect(chunks).toEqual(['event:1:orders', 'event:2:orders']); + expect(result).toMatchObject({ + content: [{ type: 'text', text: 'completed:orders' }], + structuredContent: { + topic: 'orders', + streamed: true, + }, + }); + + await cleanupOpenStreamFixture({ client, server, relayHub }); + }, 15_000); + + test('delays the final tool result until the stream closes', async () => { + const { relayHub, server, client, serverTransport, clientTransport } = + createOpenStreamFixture(); + let releaseClose: (() => void) | undefined; + + server.registerTool( + 'delayedClose', + { + title: 'Delayed Close', + description: 'Waits before closing the stream.', + inputSchema: { + topic: z.string(), + }, + }, + async ({ topic }, extra) => { + const stream = getOpenStreamWriter(extra); + + await stream.start(); + await stream.write(`stream:${topic}:1`); + await new Promise((resolve) => { + releaseClose = resolve; + }); + await stream.close(); + + return { + content: [{ type: 'text', text: `done:${topic}` }], + }; + }, + ); + + await server.connect(serverTransport); + await client.connect(clientTransport); + + const call = await callToolStream({ + client, + transport: clientTransport, + name: 'delayedClose', + arguments: { + topic: 'orders', + }, + }); + + const firstChunk = await call.stream[Symbol.asyncIterator]().next(); + expect(firstChunk.done).toBe(false); + expect(firstChunk.value?.value).toBe('stream:orders:1'); + + let resultSettled = false; + void call.result.then(() => { + resultSettled = true; + }); + + await new Promise((resolve) => setTimeout(resolve, 100)); + expect(resultSettled).toBe(false); + + releaseClose?.(); + + await expect(call.result).resolves.toMatchObject({ + content: [{ type: 'text', text: 'done:orders' }], + }); + + await cleanupOpenStreamFixture({ client, server, relayHub }); + }, 15_000); + + test('rejects the stream iterator when the server aborts mid-stream', async () => { + const { relayHub, server, client, serverTransport, clientTransport } = + createOpenStreamFixture(); + + server.registerTool( + 'abortingStream', + { + title: 'Aborting Stream', + description: 'Aborts after emitting one chunk.', + inputSchema: { + topic: z.string(), + }, + }, + async ({ topic }, extra) => { + const stream = getOpenStreamWriter(extra); + + await stream.start(); + await stream.write(`stream:${topic}:1`); + await stream.abort('server aborted stream'); + + return { + content: [{ type: 'text', text: `aborted:${topic}` }], + }; + }, + ); + + await server.connect(serverTransport); + await client.connect(clientTransport); + + const call = await callToolStream({ + client, + transport: clientTransport, + name: 'abortingStream', + arguments: { + topic: 'orders', + }, + }); + const closedResult = call.stream.closed.catch((error: unknown) => error); + + const iterator = call.stream[Symbol.asyncIterator](); + const firstChunk = await iterator.next(); + expect(firstChunk.done).toBe(false); + expect(firstChunk.value?.value).toBe('stream:orders:1'); + await expect(iterator.next()).rejects.toThrow('server aborted stream'); + expect(await closedResult).toBeInstanceOf(Error); + + await expect(call.result).resolves.toMatchObject({ + content: [{ type: 'text', text: 'aborted:orders' }], + }); + + await cleanupOpenStreamFixture({ client, server, relayHub }); + }, 15_000); + + test('keeps concurrent streams isolated by progress token', async () => { + const { relayHub, server, client, serverTransport, clientTransport } = + createOpenStreamFixture(); + + server.registerTool( + 'subscribeToEvents', + { + title: 'Subscribe To Events', + description: 'Streams topic-specific events to the caller.', + inputSchema: { + topic: z.string(), + }, + }, + async ({ topic }, extra) => { + const stream = getOpenStreamWriter(extra); + + await stream.start(); + await stream.write(`${topic}:1`); + await stream.write(`${topic}:2`); + await stream.close(); + + return { + content: [{ type: 'text', text: `completed:${topic}` }], + structuredContent: { topic }, + }; + }, + ); + + await server.connect(serverTransport); + await client.connect(clientTransport); + + const [ordersCall, invoicesCall] = await Promise.all([ + callToolStream({ + client, + transport: clientTransport, + name: 'subscribeToEvents', + arguments: { topic: 'orders' }, + }), + callToolStream({ + client, + transport: clientTransport, + name: 'subscribeToEvents', + arguments: { topic: 'invoices' }, + }), + ]); + + const [orderChunks, invoiceChunks, orderResult, invoiceResult] = + await Promise.all([ + (async (): Promise => { + const chunks: string[] = []; + for await (const chunk of ordersCall.stream) { + chunks.push(chunk.value); + } + return chunks; + })(), + (async (): Promise => { + const chunks: string[] = []; + for await (const chunk of invoicesCall.stream) { + chunks.push(chunk.value); + } + return chunks; + })(), + ordersCall.result, + invoicesCall.result, + ]); + + expect(orderChunks).toEqual(['orders:1', 'orders:2']); + expect(invoiceChunks).toEqual(['invoices:1', 'invoices:2']); + expect(orderResult).toMatchObject({ + content: [{ type: 'text', text: 'completed:orders' }], + structuredContent: { topic: 'orders' }, + }); + expect(invoiceResult).toMatchObject({ + content: [{ type: 'text', text: 'completed:invoices' }], + structuredContent: { topic: 'invoices' }, + }); + + await cleanupOpenStreamFixture({ client, server, relayHub }); + }, 15_000); + + test('releases server-side pending response state after stream termination', async () => { + const { relayHub, server, client, serverTransport, clientTransport } = + createOpenStreamFixture(); + + server.registerTool( + 'subscribeToEvents', + { + title: 'Subscribe To Events', + description: 'Streams mock event notifications to the caller.', + inputSchema: { + topic: z.string(), + }, + }, + async ({ topic }, extra) => { + const stream = getOpenStreamWriter(extra); + + await stream.start(); + await stream.write(`event:${topic}`); + await stream.close(); + + return { + content: [{ type: 'text', text: `completed:${topic}` }], + }; + }, + ); + + await server.connect(serverTransport); + await client.connect(clientTransport); + + const call = await callToolStream({ + client, + transport: clientTransport, + name: 'subscribeToEvents', + arguments: { + topic: 'orders', + }, + }); + + await Promise.all([ + (async (): Promise => { + for await (const _chunk of call.stream) { + // Drain the stream to completion. + } + })(), + call.result, + ]); + + await waitFor({ + produce: () => { + const state = serverTransport.getInternalStateForTesting(); + return state.correlationStore.eventRouteCount === 0 && + state.openStreamReceiver.size === 0 + ? true + : undefined; + }, + timeoutMs: 5_000, + }); + + expect( + serverTransport.getInternalStateForTesting().correlationStore + .eventRouteCount, + ).toBe(0); + expect( + clientTransport.getOpenStreamSession(call.progressToken), + ).toBeUndefined(); + await expect(call.stream.closed).resolves.toBeUndefined(); + + await cleanupOpenStreamFixture({ client, server, relayHub }); + }, 15_000); + + test('keeps the stream alive across idle timeout ping/pong and continues delivering chunks', async () => { + const { relayHub, server, client, serverTransport, clientTransport } = + createOpenStreamFixture({ + idleTimeoutMs: 40, + probeTimeoutMs: 200, + closeGracePeriodMs: 200, + }); + let releaseSecondChunk: (() => void) | undefined; + let observedPongNonce: string | undefined; + + server.registerTool( + 'keepaliveStream', + { + title: 'Keepalive Stream', + description: 'Waits long enough to require keepalive before resuming.', + inputSchema: { + topic: z.string(), + }, + }, + async ({ topic }, extra) => { + const stream = getOpenStreamWriter(extra); + const originalPong = stream.pong.bind(stream); + + stream.pong = async (nonce: string): Promise => { + observedPongNonce = nonce; + await originalPong(nonce); + }; + + await stream.start(); + await stream.write(`stream:${topic}:1`); + await new Promise((resolve) => { + releaseSecondChunk = resolve; + }); + await stream.write(`stream:${topic}:2`); + await stream.close(); + + return { + content: [{ type: 'text', text: `done:${topic}` }], + }; + }, + ); + + await server.connect(serverTransport); + await client.connect(clientTransport); + + const call = await callToolStream({ + client, + transport: clientTransport, + name: 'keepaliveStream', + arguments: { + topic: 'orders', + }, + }); + + const iterator = call.stream[Symbol.asyncIterator](); + const firstChunk = await iterator.next(); + expect(firstChunk.done).toBe(false); + expect(firstChunk.value?.value).toBe('stream:orders:1'); + + await waitFor({ + produce: () => { + return observedPongNonce; + }, + timeoutMs: 5_000, + }); + + const controlFrames = relayHub + .getEvents() + .map((event) => getFrameType(event)) + .filter( + (frameType): frameType is string => + frameType === 'ping' || frameType === 'pong', + ); + + expect(controlFrames).toContain('ping'); + expect(controlFrames).toContain('pong'); + + releaseSecondChunk?.(); + + const secondChunk = await iterator.next(); + expect(secondChunk.done).toBe(false); + expect(secondChunk.value?.value).toBe('stream:orders:2'); + await expect(iterator.next()).resolves.toEqual({ + done: true, + value: undefined, + }); + + await expect(call.result).resolves.toMatchObject({ + content: [{ type: 'text', text: 'done:orders' }], + }); + await expect(call.stream.closed).resolves.toBeUndefined(); + + await cleanupOpenStreamFixture({ client, server, relayHub }); + }, 15_000); + + test('aborts the server-side stream when the keepalive probe is not acknowledged', async () => { + const { relayHub, server, client, serverTransport, clientTransport } = + createOpenStreamFixture({ + idleTimeoutMs: 40, + probeTimeoutMs: 60, + closeGracePeriodMs: 200, + }); + let abortReason: string | undefined; + let producerReleased = false; + + server.registerTool( + 'probeTimeoutStream', + { + title: 'Probe Timeout Stream', + description: + 'Stays open until the receiver aborts after probe timeout.', + inputSchema: { + topic: z.string(), + }, + }, + async ({ topic }, extra) => { + const stream = getOpenStreamWriter(extra); + const originalPong = stream.pong.bind(stream); + const originalAbort = stream.abort.bind(stream); + + stream.pong = async (_nonce: string): Promise => { + // Suppress pong so the receiver-side keepalive probe times out. + }; + stream.abort = async (reason?: string): Promise => { + abortReason = reason; + await originalAbort(reason); + }; + + await stream.start(); + await stream.write(`stream:${topic}:1`); + + await new Promise((resolve) => { + const poll = (): void => { + if (!stream.isActive) { + producerReleased = true; + resolve(); + return; + } + + setTimeout(poll, 10); + }; + + poll(); + }); + + stream.pong = originalPong; + + return { + content: [{ type: 'text', text: `probe-timeout:${topic}` }], + }; + }, + ); + + await server.connect(serverTransport); + await client.connect(clientTransport); + + const call = await callToolStream({ + client, + transport: clientTransport, + name: 'probeTimeoutStream', + arguments: { + topic: 'orders', + }, + }); + + const iterator = call.stream[Symbol.asyncIterator](); + const firstChunk = await iterator.next(); + expect(firstChunk.done).toBe(false); + expect(firstChunk.value?.value).toBe('stream:orders:1'); + const closedResult = call.stream.closed.catch((error: unknown) => error); + + await expect(iterator.next()).rejects.toThrow('Probe timeout'); + expect(await closedResult).toBeInstanceOf(Error); + + await waitFor({ + produce: () => { + if (producerReleased && abortReason === 'Probe timeout') { + return true; + } + + return undefined; + }, + timeoutMs: 5_000, + }); + + await waitFor({ + produce: () => { + const state = serverTransport.getInternalStateForTesting(); + return state.correlationStore.eventRouteCount === 0 && + state.openStreamReceiver.size === 0 + ? true + : undefined; + }, + timeoutMs: 5_000, + }); + + expect(abortReason).toBe('Probe timeout'); + expect(producerReleased).toBe(true); + await expect(call.result).resolves.toMatchObject({ + content: [{ type: 'text', text: 'probe-timeout:orders' }], + }); + + await cleanupOpenStreamFixture({ client, server, relayHub }); + }, 15_000); + + test('keeps streaming after an interleaved client ping and server pong', async () => { + const { relayHub, server, client, serverTransport, clientTransport } = + createOpenStreamFixture(); + let releaseSecondChunk: (() => void) | undefined; + + server.registerTool( + 'interleavedControlStream', + { + title: 'Interleaved Control Stream', + description: + 'Continues streaming after client-originated keepalive control frames.', + inputSchema: { + topic: z.string(), + }, + }, + async ({ topic }, extra) => { + const stream = getOpenStreamWriter(extra); + + await stream.start(); + await stream.write(`stream:${topic}:1`); + await new Promise((resolve) => { + releaseSecondChunk = resolve; + }); + await stream.write(`stream:${topic}:2`); + await stream.close(); + + return { + content: [{ type: 'text', text: `interleaved:${topic}` }], + }; + }, + ); + + await server.connect(serverTransport); + await client.connect(clientTransport); + + const call = await callToolStream({ + client, + transport: clientTransport, + name: 'interleavedControlStream', + arguments: { + topic: 'orders', + }, + }); + + const iterator = call.stream[Symbol.asyncIterator](); + const firstChunk = await iterator.next(); + expect(firstChunk.done).toBe(false); + expect(firstChunk.value?.value).toBe('stream:orders:1'); + + await clientTransport.send({ + jsonrpc: '2.0', + method: 'notifications/progress', + params: { + progressToken: call.progressToken, + progress: 1, + cvm: { + type: 'open-stream', + frameType: 'ping', + nonce: 'manual-client-ping', + }, + }, + }); + + await waitFor({ + produce: () => + relayHub.getEvents().find((event) => { + const message = parseRelayMessage(event); + return ( + message?.params?.progressToken === call.progressToken && + message.params?.cvm?.frameType === 'pong' && + message.params?.cvm?.nonce === 'manual-client-ping' + ); + }), + timeoutMs: 5_000, + }); + + releaseSecondChunk?.(); + + const secondChunk = await iterator.next(); + expect(secondChunk.done).toBe(false); + expect(secondChunk.value?.value).toBe('stream:orders:2'); + await expect(iterator.next()).resolves.toEqual({ + done: true, + value: undefined, + }); + await expect(call.result).resolves.toMatchObject({ + content: [{ type: 'text', text: 'interleaved:orders' }], + }); + + await cleanupOpenStreamFixture({ client, server, relayHub }); + }, 15_000); + + test('aborts the server-side stream when the client aborts the tool stream call', async () => { + const { relayHub, server, client, serverTransport, clientTransport } = + createOpenStreamFixture(); + let abortReason: string | undefined; + let producerReleased = false; + + server.registerTool( + 'clientAbortableStream', + { + title: 'Client Abortable Stream', + description: 'Keeps the stream open until the client aborts it.', + inputSchema: { + topic: z.string(), + }, + }, + async ({ topic }, extra) => { + const stream = getOpenStreamWriter(extra); + const streamClosed = stream.abort.bind(stream); + + stream.abort = async (reason?: string): Promise => { + abortReason = reason; + await streamClosed(reason); + }; + + await stream.start(); + await stream.write(`stream:${topic}:1`); + + await new Promise((resolve) => { + const poll = (): void => { + if (!stream.isActive) { + producerReleased = true; + resolve(); + return; + } + + setTimeout(poll, 10); + }; + + poll(); + }); + + return { + content: [{ type: 'text', text: `client-aborted:${topic}` }], + }; + }, + ); + + await server.connect(serverTransport); + await client.connect(clientTransport); + + const call = await callToolStream({ + client, + transport: clientTransport, + name: 'clientAbortableStream', + arguments: { + topic: 'orders', + }, + }); + + const iterator = call.stream[Symbol.asyncIterator](); + const firstChunk = await iterator.next(); + expect(firstChunk.done).toBe(false); + expect(firstChunk.value?.value).toBe('stream:orders:1'); + + const closedResult = call.stream.closed.catch((error: unknown) => error); + await call.abort('client cancelled stream'); + + await expect(iterator.next()).rejects.toThrow('client cancelled stream'); + expect(await closedResult).toBeInstanceOf(Error); + + await expect(call.result).resolves.toMatchObject({ + content: [{ type: 'text', text: 'client-aborted:orders' }], + }); + + await waitFor({ + produce: () => { + if (producerReleased && abortReason === 'client cancelled stream') { + return true; + } + + return undefined; + }, + timeoutMs: 5_000, + }); + + await waitFor({ + produce: () => { + const state = serverTransport.getInternalStateForTesting(); + return state.correlationStore.eventRouteCount === 0 && + state.openStreamReceiver.size === 0 + ? true + : undefined; + }, + timeoutMs: 5_000, + }); + + expect(abortReason).toBe('client cancelled stream'); + expect(producerReleased).toBe(true); + expect( + serverTransport.getInternalStateForTesting().correlationStore + .eventRouteCount, + ).toBe(0); + expect( + clientTransport.getOpenStreamSession(call.progressToken), + ).toBeUndefined(); + + await cleanupOpenStreamFixture({ client, server, relayHub }); + }, 15_000); + + test('keeps concurrent streams isolated when one aborts and the other closes', async () => { + const { relayHub, server, client, serverTransport, clientTransport } = + createOpenStreamFixture(); + + server.registerTool( + 'mixedTerminalStreams', + { + title: 'Mixed Terminal Streams', + description: 'Aborts one stream and closes the other.', + inputSchema: { + topic: z.string(), + }, + }, + async ({ topic }, extra) => { + const stream = getOpenStreamWriter(extra); + + await stream.start(); + await stream.write(`${topic}:1`); + + if (topic === 'orders') { + await stream.abort('orders aborted'); + + return { + content: [{ type: 'text', text: `aborted:${topic}` }], + }; + } + + await stream.write(`${topic}:2`); + await stream.close(); + + return { + content: [{ type: 'text', text: `completed:${topic}` }], + }; + }, + ); + + await server.connect(serverTransport); + await client.connect(clientTransport); + + const [ordersCall, invoicesCall] = await Promise.all([ + callToolStream({ + client, + transport: clientTransport, + name: 'mixedTerminalStreams', + arguments: { topic: 'orders' }, + }), + callToolStream({ + client, + transport: clientTransport, + name: 'mixedTerminalStreams', + arguments: { topic: 'invoices' }, + }), + ]); + + const ordersClosed = ordersCall.stream.closed.catch( + (error: unknown) => error, + ); + const ordersIterator = ordersCall.stream[Symbol.asyncIterator](); + const orderFirstChunk = await ordersIterator.next(); + expect(orderFirstChunk.done).toBe(false); + expect(orderFirstChunk.value?.value).toBe('orders:1'); + await expect(ordersIterator.next()).rejects.toThrow('orders aborted'); + expect(await ordersClosed).toBeInstanceOf(Error); + + const invoiceChunks: string[] = []; + for await (const chunk of invoicesCall.stream) { + invoiceChunks.push(chunk.value); + } + + expect(invoiceChunks).toEqual(['invoices:1', 'invoices:2']); + await expect(invoicesCall.stream.closed).resolves.toBeUndefined(); + await expect(ordersCall.result).resolves.toMatchObject({ + content: [{ type: 'text', text: 'aborted:orders' }], + }); + await expect(invoicesCall.result).resolves.toMatchObject({ + content: [{ type: 'text', text: 'completed:invoices' }], + }); + + await cleanupOpenStreamFixture({ client, server, relayHub }); + }, 15_000); + + test('emits an accept frame for client-to-server CEP-41 bootstrap', async () => { + const { relayHub, server, client, serverTransport, clientTransport } = + createOpenStreamFixture(); + + server.registerTool( + 'bootstrapOnly', + { + title: 'Bootstrap Only', + description: 'Stays pending while bootstrap frames are negotiated.', + inputSchema: { + topic: z.string(), + }, + }, + async ({ topic }, extra) => { + void getOpenStreamWriter(extra); + + await new Promise(() => undefined); + + return { + content: [{ type: 'text', text: `unused:${topic}` }], + }; + }, + ); + + await server.connect(serverTransport); + await client.connect(clientTransport); + + const progressToken = 'client-origin-stream'; + void client.callTool({ + name: 'bootstrapOnly', + arguments: { + topic: 'orders', + _meta: { + progressToken, + }, + }, + }); + + await clientTransport.send({ + jsonrpc: '2.0', + method: 'notifications/progress', + params: buildOpenStreamStartFrame({ + progressToken, + progress: 1, + }), + }); + + const acceptEvent = await waitFor({ + produce: () => + relayHub.getEvents().find((event) => getFrameType(event) === 'accept'), + timeoutMs: 5_000, + }); + + expect(acceptEvent).toBeDefined(); + expect(JSON.parse(acceptEvent.content)).toMatchObject({ + method: 'notifications/progress', + params: { + progressToken, + progress: 2, + cvm: { + type: 'open-stream', + frameType: 'accept', + }, + }, + }); + + await cleanupOpenStreamFixture({ client, server, relayHub }); + }, 15_000); + + test('defers the final JSON-RPC response until the stream closes', async () => { + const { relayHub, server, client, serverTransport, clientTransport } = + createOpenStreamFixture(); + let releaseClose: (() => void) | undefined; + + server.registerTool( + 'deferredResponseStream', + { + title: 'Deferred Response Stream', + description: 'Keeps the stream open until explicitly released.', + inputSchema: { + topic: z.string(), + }, + }, + async ({ topic }, extra) => { + const stream = getOpenStreamWriter(extra); + + await stream.start(); + await stream.write(`stream:${topic}:1`); + + await new Promise((resolve) => { + releaseClose = resolve; + }); + + await stream.close(); + + return { + content: [{ type: 'text', text: `deferred:${topic}` }], + }; + }, + ); + + await server.connect(serverTransport); + await client.connect(clientTransport); + + const call = await callToolStream({ + client, + transport: clientTransport, + name: 'deferredResponseStream', + arguments: { + topic: 'orders', + }, + }); + + const iterator = call.stream[Symbol.asyncIterator](); + const firstChunk = await iterator.next(); + expect(firstChunk.done).toBe(false); + expect(firstChunk.value?.value).toBe('stream:orders:1'); + + let resultSettled = false; + void call.result.finally(() => { + resultSettled = true; + }); + + await new Promise((resolve) => setTimeout(resolve, 50)); + expect(resultSettled).toBe(false); + + releaseClose?.(); + + await expect(iterator.next()).resolves.toEqual({ + done: true, + value: undefined, + }); + await expect(call.result).resolves.toMatchObject({ + content: [{ type: 'text', text: 'deferred:orders' }], + }); + + await cleanupOpenStreamFixture({ client, server, relayHub }); + }, 15_000); +}); diff --git a/src/transport/call-tool-stream.test.ts b/src/transport/call-tool-stream.test.ts new file mode 100644 index 0000000..056e838 --- /dev/null +++ b/src/transport/call-tool-stream.test.ts @@ -0,0 +1,54 @@ +import { describe, expect, test } from 'bun:test'; +import { Client } from '@modelcontextprotocol/sdk/client/index.js'; +import type { CallToolRequest } from '@modelcontextprotocol/sdk/types.js'; +import { callToolStream } from './call-tool-stream.js'; +import { OpenStreamSession } from './open-stream/index.js'; + +describe('callToolStream', () => { + test('creates stream session and forwards progress token into tool call', async () => { + const stream = new OpenStreamSession({ + progressToken: 'token-fixed', + maxBufferedChunks: 4, + maxBufferedBytes: 1024, + }); + + let capturedParams: CallToolRequest['params'] | undefined; + const client = new Client({ + name: 'call-tool-stream-test-client', + version: '1.0.0', + }); + + client.callTool = async (params) => { + capturedParams = params; + return { + content: [], + toolResult: { ok: true }, + }; + }; + + const call = await callToolStream({ + client, + transport: { + createOutboundOpenStreamSession: (progressToken: string) => { + expect(progressToken).toBe('token-fixed'); + return stream; + }, + } as unknown as never, + name: 'subscribeToEvents', + arguments: { topic: 'orders' }, + progressToken: 'token-fixed', + }); + + expect(call.progressToken).toBe('token-fixed'); + expect(call.stream).toBe(stream); + await expect(call.result).resolves.toEqual({ + content: [], + toolResult: { ok: true }, + }); + expect(capturedParams).toEqual({ + name: 'subscribeToEvents', + arguments: { topic: 'orders' }, + _meta: { progressToken: 'token-fixed' }, + }); + }); +}); diff --git a/src/transport/call-tool-stream.ts b/src/transport/call-tool-stream.ts new file mode 100644 index 0000000..0b2a57e --- /dev/null +++ b/src/transport/call-tool-stream.ts @@ -0,0 +1,49 @@ +import type { Client } from '@modelcontextprotocol/sdk/client/index.js'; +import type { CallToolRequest } from '@modelcontextprotocol/sdk/types.js'; +import type { OpenStreamSession } from './open-stream/index.js'; +import type { NostrClientTransport } from './nostr-client-transport.js'; + +export interface CallToolStreamParams { + client: Client; + transport: NostrClientTransport; + name: CallToolRequest['params']['name']; + arguments?: CallToolRequest['params']['arguments']; + progressToken?: string; +} + +export interface ToolStreamCall { + readonly progressToken: string; + readonly stream: OpenStreamSession; + readonly result: Promise; + abort(reason?: string): Promise; +} + +function createProgressToken(): string { + return `open-stream-${Date.now()}-${Math.random().toString(36).slice(2, 10)}`; +} + +/** + * Calls an MCP tool with a CEP-41 progress token and returns the paired stream handle. + */ +export async function callToolStream( + params: CallToolStreamParams, +): Promise> { + const { client, transport, name, arguments: toolArguments } = params; + const progressToken = params.progressToken ?? createProgressToken(); + const stream = transport.createOutboundOpenStreamSession(progressToken); + + const result = client.callTool({ + name, + arguments: toolArguments, + _meta: { + progressToken, + }, + } satisfies CallToolRequest['params']) as Promise; + + return { + progressToken, + stream, + result, + abort: (reason?: string): Promise => stream.abort(reason), + }; +} diff --git a/src/transport/discovery-tags.ts b/src/transport/discovery-tags.ts index dab1b98..cbae081 100644 --- a/src/transport/discovery-tags.ts +++ b/src/transport/discovery-tags.ts @@ -11,6 +11,7 @@ export interface DiscoveredPeerCapabilities { supportsEncryption: boolean; supportsEphemeralEncryption: boolean; supportsOversizedTransfer: boolean; + supportsOpenStream: boolean; } /** @@ -20,6 +21,7 @@ export interface PeerCapabilities { supportsEncryption: boolean; supportsEphemeralEncryption: boolean; supportsOversizedTransfer: boolean; + supportsOpenStream: boolean; } function cloneTag(tag: readonly string[]): string[] { @@ -91,5 +93,6 @@ export function learnPeerCapabilities( eventTags, NOSTR_TAGS.SUPPORT_OVERSIZED_TRANSFER, ), + supportsOpenStream: hasSingleTag(eventTags, NOSTR_TAGS.SUPPORT_OPEN_STREAM), }; } diff --git a/src/transport/index.ts b/src/transport/index.ts index ae872b8..1666b43 100644 --- a/src/transport/index.ts +++ b/src/transport/index.ts @@ -3,3 +3,6 @@ export * from './nostr-server-transport.js'; export * from './nostr-server/announcement-manager.js'; export * from './base-nostr-transport.js'; export * from './server-transport-common-schemas.js'; +export * from './open-stream/index.js'; +export * from './open-stream-policy.js'; +export * from './call-tool-stream.js'; diff --git a/src/transport/nostr-client-transport.ts b/src/transport/nostr-client-transport.ts index 3a44d98..975a413 100644 --- a/src/transport/nostr-client-transport.ts +++ b/src/transport/nostr-client-transport.ts @@ -48,12 +48,27 @@ import { OversizedTransferReceiver, type TransferPolicy, } from './oversized-transfer/index.js'; +import { + OpenStreamReceiver, + OpenStreamSession, + buildOpenStreamAbortFrame, + buildOpenStreamPingFrame, + buildOpenStreamPongFrame, +} from './open-stream/index.js'; +import { + DEFAULT_OPEN_STREAM_CLOSE_GRACE_PERIOD_MS, + DEFAULT_OPEN_STREAM_IDLE_TIMEOUT_MS, + DEFAULT_OPEN_STREAM_PROBE_TIMEOUT_MS, + DEFAULT_MAX_BUFFERED_BYTES_PER_STREAM, + DEFAULT_MAX_BUFFERED_CHUNKS_PER_STREAM, +} from './open-stream/constants.js'; import { parseDiscoveredPeerCapabilities } from './discovery-tags.js'; import { DEFAULT_CHUNK_SIZE, DEFAULT_OVERSIZED_THRESHOLD, } from './oversized-transfer/constants.js'; import { sendOversizedClientRequest } from './nostr-client/oversized-client-sender.js'; +import type { OpenStreamTransportPolicy } from './open-stream-policy.js'; /** * Options for configuring the NostrClientTransport. @@ -99,6 +114,13 @@ export interface NostrTransportOptions extends Omit< /** Receiver-side admission policy. */ policy?: TransferPolicy; }; + /** Options controlling CEP-41 open-ended stream transfer. */ + openStream?: { + /** Whether open stream transfer is enabled. @default false */ + enabled?: boolean; + /** Receiver/session policy reserved for CEP-41 stream lifecycle limits. */ + policy?: OpenStreamTransportPolicy; + }; } /** @@ -155,6 +177,9 @@ export class NostrClientTransport /** Whether the server has advertised CEP-22 oversized transfer support. */ private serverSupportsOversizedTransfer: boolean = false; + /** Whether the server has advertised CEP-41 open stream support. */ + private serverSupportsOpenStream: boolean = false; + /** Whether this client has already sent its discovery tags to the server. */ private hasSentDiscoveryTags: boolean = false; @@ -163,10 +188,15 @@ export class NostrClientTransport private readonly oversizedThreshold: number; private readonly oversizedChunkSize: number; private readonly oversizedAcceptTimeoutMs: number; + private readonly openStreamEnabled: boolean; + private readonly openStreamPolicy: OpenStreamTransportPolicy | undefined; /** Receives inbound oversized-transfer frames from the server (server→client responses). */ private readonly oversizedReceiver: OversizedTransferReceiver; + /** Receives inbound open-stream frames from the server (server→client notifications). */ + private readonly openStreamReceiver: OpenStreamReceiver; + /** * Deduplicate inbound events to avoid redundant work. * @@ -211,6 +241,61 @@ export class NostrClientTransport ot?.policy ?? {}, this.logger, ); + this.openStreamEnabled = options.openStream?.enabled ?? false; + this.openStreamPolicy = options.openStream?.policy; + this.openStreamReceiver = new OpenStreamReceiver({ + maxConcurrentStreams: options.openStream?.policy?.maxConcurrentStreams, + maxBufferedChunksPerStream: + options.openStream?.policy?.maxBufferedChunksPerStream, + maxBufferedBytesPerStream: + options.openStream?.policy?.maxBufferedBytesPerStream, + idleTimeoutMs: options.openStream?.policy?.idleTimeoutMs, + probeTimeoutMs: options.openStream?.policy?.probeTimeoutMs, + closeGracePeriodMs: options.openStream?.policy?.closeGracePeriodMs, + getSessionOptions: (progressToken) => { + let progress = 0; + + return { + sendPing: async (nonce: string): Promise => { + progress += 1; + await this.send({ + jsonrpc: '2.0', + method: 'notifications/progress', + params: buildOpenStreamPingFrame({ + progressToken, + progress, + nonce, + }), + }); + }, + sendPong: async (nonce: string): Promise => { + progress += 1; + await this.send({ + jsonrpc: '2.0', + method: 'notifications/progress', + params: buildOpenStreamPongFrame({ + progressToken, + progress, + nonce, + }), + }); + }, + sendAbort: async (reason?: string): Promise => { + progress += 1; + await this.send({ + jsonrpc: '2.0', + method: 'notifications/progress', + params: buildOpenStreamAbortFrame({ + progressToken, + progress, + reason, + }), + }); + }, + }; + }, + logger: this.logger, + }); } /** @@ -240,6 +325,10 @@ export class NostrClientTransport tags.push([NOSTR_TAGS.SUPPORT_OVERSIZED_TRANSFER]); } + if (this.openStreamEnabled) { + tags.push([NOSTR_TAGS.SUPPORT_OPEN_STREAM]); + } + return tags; } @@ -324,6 +413,7 @@ export class NostrClientTransport this.correlationStore.clear(); this.seenEventIds.clear(); this.oversizedReceiver.clear(); + this.openStreamReceiver.clear(); this.onclose?.(); } catch (error) { this.onerror?.(error instanceof Error ? error : new Error(String(error))); @@ -544,6 +634,93 @@ export class NostrClientTransport return this.correlationStore.getPendingRequest(eventId); } + /** + * Returns the CEP-41 stream session for a progress token, creating it lazily if needed. + */ + public getOrCreateOpenStreamSession( + progressToken: string, + ): OpenStreamSession { + return this.openStreamReceiver.getOrCreateSession(progressToken); + } + + /** + * Returns an outbound CEP-41 session whose local abort publishes an abort + * notification to the server. + */ + public createOutboundOpenStreamSession( + progressToken: string, + ): OpenStreamSession { + const existing = this.openStreamReceiver.getSession(progressToken); + if (existing) { + return existing; + } + + let progress = 0; + return this.openStreamReceiver.createSession({ + progressToken, + maxBufferedChunks: + this.openStreamPolicy?.maxBufferedChunksPerStream ?? + DEFAULT_MAX_BUFFERED_CHUNKS_PER_STREAM, + maxBufferedBytes: + this.openStreamPolicy?.maxBufferedBytesPerStream ?? + DEFAULT_MAX_BUFFERED_BYTES_PER_STREAM, + idleTimeoutMs: + this.openStreamPolicy?.idleTimeoutMs ?? + DEFAULT_OPEN_STREAM_IDLE_TIMEOUT_MS, + probeTimeoutMs: + this.openStreamPolicy?.probeTimeoutMs ?? + DEFAULT_OPEN_STREAM_PROBE_TIMEOUT_MS, + closeGracePeriodMs: + this.openStreamPolicy?.closeGracePeriodMs ?? + DEFAULT_OPEN_STREAM_CLOSE_GRACE_PERIOD_MS, + sendPing: async (nonce: string): Promise => { + progress += 1; + await this.send({ + jsonrpc: '2.0', + method: 'notifications/progress', + params: buildOpenStreamPingFrame({ + progressToken, + progress, + nonce, + }), + }); + }, + sendPong: async (nonce: string): Promise => { + progress += 1; + await this.send({ + jsonrpc: '2.0', + method: 'notifications/progress', + params: buildOpenStreamPongFrame({ + progressToken, + progress, + nonce, + }), + }); + }, + sendAbort: async (reason?: string): Promise => { + progress += 1; + await this.send({ + jsonrpc: '2.0', + method: 'notifications/progress', + params: buildOpenStreamAbortFrame({ + progressToken, + progress, + reason, + }), + }); + }, + }); + } + + /** + * Returns the CEP-41 stream session for a progress token when it already exists. + */ + public getOpenStreamSession( + progressToken: string, + ): OpenStreamSession | undefined { + return this.openStreamReceiver.getSession(progressToken); + } + /** * Emulates the server's initialize response for stateless clients. * @param requestId - The ID of the original initialize request @@ -893,6 +1070,7 @@ export class NostrClientTransport discovered.supportsEphemeralEncryption; this.serverSupportsOversizedTransfer ||= discovered.supportsOversizedTransfer; + this.serverSupportsOpenStream ||= discovered.supportsOpenStream; if (!this.serverInitializeEvent) { this.serverInitializeEvent = event; @@ -1010,6 +1188,22 @@ export class NostrClientTransport } // CEP-22: intercept oversized-transfer frames and do NOT forward raw frames. + if ( + isJSONRPCNotification(mcpMessage) && + mcpMessage.method === 'notifications/progress' && + OpenStreamReceiver.isOpenStreamFrame(mcpMessage) + ) { + this.openStreamReceiver + .processFrame(mcpMessage) + .catch((err: unknown) => { + this.logger.error('Open stream error (client)', { + error: err instanceof Error ? err.message : String(err), + }); + this.onerror?.(err instanceof Error ? err : new Error(String(err))); + }); + return; + } + if ( isJSONRPCNotification(mcpMessage) && mcpMessage.method === 'notifications/progress' && @@ -1083,6 +1277,8 @@ export class NostrClientTransport serverResourcesListEvent: this.serverResourcesListEvent, serverResourceTemplatesListEvent: this.serverResourceTemplatesListEvent, serverPromptsListEvent: this.serverPromptsListEvent, + oversizedReceiver: this.oversizedReceiver, + openStreamReceiver: this.openStreamReceiver, }; } } diff --git a/src/transport/nostr-server-transport.dedup-response.test.ts b/src/transport/nostr-server-transport.dedup-response.test.ts index 3c96bc1..146a1d5 100644 --- a/src/transport/nostr-server-transport.dedup-response.test.ts +++ b/src/transport/nostr-server-transport.dedup-response.test.ts @@ -1,7 +1,11 @@ import { describe, it, expect, mock } from 'bun:test'; import type { RelayHandler } from '../core/interfaces.js'; import type { NostrEvent } from 'nostr-tools'; -import { finalizeEvent, generateSecretKey, getPublicKey } from 'nostr-tools/pure'; +import { + finalizeEvent, + generateSecretKey, + getPublicKey, +} from 'nostr-tools/pure'; import type { JSONRPCResponse } from '@modelcontextprotocol/sdk/types.js'; import { NostrServerTransport } from './nostr-server-transport.js'; import { PrivateKeySigner } from '../signer/private-key-signer.js'; diff --git a/src/transport/nostr-server-transport.inner-event-verification.test.ts b/src/transport/nostr-server-transport.inner-event-verification.test.ts index 6681634..3adbe8a 100644 --- a/src/transport/nostr-server-transport.inner-event-verification.test.ts +++ b/src/transport/nostr-server-transport.inner-event-verification.test.ts @@ -1,7 +1,11 @@ import { describe, it, expect, mock } from 'bun:test'; import type { RelayHandler } from '../core/interfaces.js'; import type { NostrEvent } from 'nostr-tools'; -import { finalizeEvent, generateSecretKey, getPublicKey } from 'nostr-tools/pure'; +import { + finalizeEvent, + generateSecretKey, + getPublicKey, +} from 'nostr-tools/pure'; import { NostrServerTransport } from './nostr-server-transport.js'; import { PrivateKeySigner } from '../signer/private-key-signer.js'; import { EncryptionMode } from '../core/interfaces.js'; diff --git a/src/transport/nostr-server-transport.test.ts b/src/transport/nostr-server-transport.test.ts index 8e951b4..b8be45f 100644 --- a/src/transport/nostr-server-transport.test.ts +++ b/src/transport/nostr-server-transport.test.ts @@ -1605,7 +1605,7 @@ describe.serial('NostrServerTransport', () => { const serverPublicKey = getPublicKey(hexToBytes(serverPrivateKey)); const clientPrivateKey = bytesToHex(generateSecretKey()); const uniqueSuffix = Math.random().toString(36).substring(2, 8); - const commonToolName = `translate_text_${uniqueSuffix}`; + const commonToolName = `translate_text_${uniqueSuffix}`; const bespokeToolName = `bespoke_tool_${uniqueSuffix}`; const server = new McpServer({ @@ -1720,9 +1720,9 @@ describe.serial('NostrServerTransport', () => { }, }); expect( - (announcedBespokeTool?.['_meta'] as Record | undefined)?.[ - COMMON_SCHEMA_META_NAMESPACE - ], + ( + announcedBespokeTool?.['_meta'] as Record | undefined + )?.[COMMON_SCHEMA_META_NAMESPACE], ).toBeUndefined(); expect(iTags).toEqual( diff --git a/src/transport/nostr-server-transport.ts b/src/transport/nostr-server-transport.ts index 4531cf8..208b452 100644 --- a/src/transport/nostr-server-transport.ts +++ b/src/transport/nostr-server-transport.ts @@ -56,6 +56,14 @@ import { OversizedTransferReceiver, type TransferPolicy, } from './oversized-transfer/index.js'; +import { + OpenStreamReceiver, + OpenStreamWriter, + buildOpenStreamAcceptFrame, + buildOpenStreamAbortFrame, + buildOpenStreamPingFrame, + buildOpenStreamPongFrame, +} from './open-stream/index.js'; import { DEFAULT_CHUNK_SIZE, DEFAULT_OVERSIZED_THRESHOLD, @@ -65,6 +73,7 @@ import { sendAcceptFrame, sendOversizedServerResponse, } from './nostr-server/oversized-server-handler.js'; +import type { OpenStreamTransportPolicy } from './open-stream-policy.js'; /** * Options for configuring the NostrServerTransport. @@ -148,6 +157,13 @@ export interface NostrServerTransportOptions extends BaseNostrTransportOptions { /** Receiver-side admission policy. */ policy?: TransferPolicy; }; + /** Options controlling CEP-41 open-ended stream transfer. */ + openStream?: { + /** Whether open stream transfer is enabled. @default false */ + enabled?: boolean; + /** Receiver/session policy reserved for CEP-41 stream lifecycle limits. */ + policy?: OpenStreamTransportPolicy; + }; } export type ListToolsResultTransformer = ( @@ -195,8 +211,10 @@ export class NostrServerTransport }) => void | Promise) | undefined; private readonly inboundMiddlewares: InboundMiddlewareFn[] = []; - private readonly listToolsResultTransformers: ListToolsResultTransformer[] = []; - private readonly listToolsAnnouncementTagsProducers: ListToolsAnnouncementTagsProducer[] = []; + private readonly listToolsResultTransformers: ListToolsResultTransformer[] = + []; + private readonly listToolsAnnouncementTagsProducers: ListToolsAnnouncementTagsProducer[] = + []; /** * Deduplicate inbound events to avoid redundant work. @@ -208,10 +226,23 @@ export class NostrServerTransport /** Receives inbound oversized-transfer frames from clients (client→server requests). */ private readonly oversizedReceiver: OversizedTransferReceiver; + /** Receives inbound open-stream frames from clients (client→server notifications). */ + private readonly openStreamReceiver: OpenStreamReceiver; + + /** Pending final responses held until their CEP-41 stream terminates. */ + private readonly pendingOpenStreamResponses = new Map< + string, + JSONRPCResponse + >(); + + /** Active server-side CEP-41 writers keyed by inbound request event id. */ + private readonly openStreamWriters = new Map(); + // Oversized-transfer sender settings (for server→client responses) private readonly oversizedEnabled: boolean; private readonly oversizedThreshold: number; private readonly oversizedChunkSize: number; + private readonly openStreamEnabled: boolean; constructor(options: NostrServerTransportOptions) { super('nostr-server-transport', options); @@ -316,13 +347,73 @@ export class NostrServerTransport ot?.policy ?? {}, this.logger, ); + this.openStreamEnabled = options.openStream?.enabled ?? false; + this.openStreamReceiver = new OpenStreamReceiver({ + maxConcurrentStreams: options.openStream?.policy?.maxConcurrentStreams, + maxBufferedChunksPerStream: + options.openStream?.policy?.maxBufferedChunksPerStream, + maxBufferedBytesPerStream: + options.openStream?.policy?.maxBufferedBytesPerStream, + idleTimeoutMs: options.openStream?.policy?.idleTimeoutMs, + probeTimeoutMs: options.openStream?.policy?.probeTimeoutMs, + closeGracePeriodMs: options.openStream?.policy?.closeGracePeriodMs, + getSessionOptions: (progressToken) => { + let progress = 0; + + return { + sendPing: async (nonce: string): Promise => { + progress += 1; + await this.sendNotification(progressToken, { + jsonrpc: '2.0', + method: 'notifications/progress', + params: buildOpenStreamPingFrame({ + progressToken, + progress, + nonce, + }), + }); + }, + sendPong: async (nonce: string): Promise => { + progress += 1; + await this.sendNotification(progressToken, { + jsonrpc: '2.0', + method: 'notifications/progress', + params: buildOpenStreamPongFrame({ + progressToken, + progress, + nonce, + }), + }); + }, + sendAbort: async (reason?: string): Promise => { + progress += 1; + await this.sendNotification(progressToken, { + jsonrpc: '2.0', + method: 'notifications/progress', + params: buildOpenStreamAbortFrame({ + progressToken, + progress, + reason, + }), + }); + }, + }; + }, + logger: this.logger, + }); // Advertise CEP-22 support so clients can skip the accept handshake. + const internalCommonTags: string[][] = []; + if (this.oversizedEnabled) { - this.announcementManager.setInternalCommonTags([ - [NOSTR_TAGS.SUPPORT_OVERSIZED_TRANSFER], - ]); + internalCommonTags.push([NOSTR_TAGS.SUPPORT_OVERSIZED_TRANSFER]); + } + + if (this.openStreamEnabled) { + internalCommonTags.push([NOSTR_TAGS.SUPPORT_OPEN_STREAM]); } + + this.announcementManager.setInternalCommonTags(internalCommonTags); } /** @@ -430,6 +521,9 @@ export class NostrServerTransport this.correlationStore.clear(); this.seenEventIds.clear(); this.oversizedReceiver.clear(); + this.openStreamReceiver.clear(); + this.pendingOpenStreamResponses.clear(); + this.openStreamWriters.clear(); this.onclose?.(); } catch (error) { this.onerror?.(error instanceof Error ? error : new Error(String(error))); @@ -619,6 +713,40 @@ export class NostrServerTransport wrapKind, this.shouldInjectRequestEventId ? event : undefined, ); + + if (this.openStreamEnabled && progressToken) { + const writer = new OpenStreamWriter({ + progressToken: String(progressToken), + publishFrame: async (frame) => { + await this.sendNotification(clientPubkey, { + jsonrpc: '2.0', + method: 'notifications/progress', + params: frame, + }); + return undefined; + }, + onClose: async (): Promise => { + await this.flushPendingOpenStreamResponse(eventId); + }, + onAbort: async (): Promise => { + await this.flushPendingOpenStreamResponse(eventId); + }, + }); + + this.openStreamWriters.set(eventId, writer); + } + } + + private async flushPendingOpenStreamResponse(eventId: string): Promise { + const pendingResponse = this.pendingOpenStreamResponses.get(eventId); + this.pendingOpenStreamResponses.delete(eventId); + this.openStreamWriters.delete(eventId); + + if (!pendingResponse) { + return; + } + + await this.handleResponse(pendingResponse); } /** @@ -662,9 +790,7 @@ export class NostrServerTransport ); } - private buildListToolsAnnouncementTags( - result: ListToolsResult, - ): string[][] { + private buildListToolsAnnouncementTags(result: ListToolsResult): string[][] { return this.listToolsAnnouncementTagsProducers.flatMap((producer) => producer(result), ); @@ -687,6 +813,12 @@ export class NostrServerTransport // Find the event route using O(1) lookup const nostrEventId = response.id as string; + const existingOpenStreamWriter = this.openStreamWriters.get(nostrEventId); + if (existingOpenStreamWriter && existingOpenStreamWriter.isActive) { + this.pendingOpenStreamResponses.set(nostrEventId, response); + return; + } + const route = this.correlationStore.popEventRoute(nostrEventId); if (!route) { @@ -711,7 +843,9 @@ export class NostrServerTransport const responseToSend = parsedListToolsResult?.success ? { ...response, - result: this.applyListToolsResultTransformers(parsedListToolsResult.data), + result: this.applyListToolsResultTransformers( + parsedListToolsResult.data, + ), } : response; @@ -1070,6 +1204,8 @@ export class NostrServerTransport return; } + const inboundMessage: JSONRPCMessage = mcpMessage; + // Check authorization using the authorization policy const authDecision = await this.authorizationPolicy.authorize( event.pubkey, @@ -1133,6 +1269,8 @@ export class NostrServerTransport session.supportsOversizedTransfer ||= this.oversizedEnabled && discoveredCapabilities.supportsOversizedTransfer; + session.supportsOpenStream ||= + this.openStreamEnabled && discoveredCapabilities.supportsOpenStream; const shouldSendAccept = !hadLearnedOversizedSupport; @@ -1168,43 +1306,152 @@ export class NostrServerTransport return forwarded; }; - if (isJSONRPCRequest(mcpMessage)) { + if (isJSONRPCRequest(inboundMessage)) { this.handleIncomingRequest( event, event.id, - mcpMessage, + inboundMessage, event.pubkey, wrapKind, ); if (this.shouldInjectRequestEventId) { - injectRequestEventId(mcpMessage, event.id); + injectRequestEventId(inboundMessage, event.id); } if (this.injectClientPubkey) { - injectClientPubkey(mcpMessage, event.pubkey); + injectClientPubkey(inboundMessage, event.pubkey); + } + + const openStreamWriter = this.openStreamWriters.get(event.id); + if (openStreamWriter) { + const params = inboundMessage.params ?? {}; + inboundMessage.params = params; + const meta = params._meta ?? {}; + params._meta = meta; + (meta as { stream?: OpenStreamWriter }).stream = openStreamWriter; + } + } else if (isJSONRPCNotification(inboundMessage)) { + this.handleIncomingNotification(event.pubkey, inboundMessage); + + if ( + inboundMessage.method === 'notifications/progress' && + OpenStreamReceiver.isOpenStreamFrame(inboundMessage) + ) { + const frame = inboundMessage.params?.cvm as + | { frameType?: string; reason?: string } + | undefined; + + if (frame?.frameType === 'abort') { + const progressToken = String( + inboundMessage.params?.progressToken ?? '', + ); + const eventId = + this.correlationStore.getEventIdByProgressToken(progressToken); + const writer = eventId + ? this.openStreamWriters.get(eventId) + : undefined; + + if (writer) { + void writer.abort(frame.reason).catch((err: unknown) => { + this.logger.error( + 'Open stream abort propagation failed (server)', + { + error: err instanceof Error ? err.message : String(err), + pubkey: event.pubkey, + progressToken, + }, + ); + this.onerror?.( + err instanceof Error ? err : new Error(String(err)), + ); + }); + } + + return; + } + + if (frame?.frameType === 'ping') { + const progressToken = String( + inboundMessage.params?.progressToken ?? '', + ); + const nonce = + 'nonce' in frame && typeof frame.nonce === 'string' + ? frame.nonce + : ''; + const eventId = + this.correlationStore.getEventIdByProgressToken(progressToken); + const writer = eventId + ? this.openStreamWriters.get(eventId) + : undefined; + + if (writer) { + void writer.pong(nonce).catch((err: unknown) => { + this.logger.error('Open stream ping handling failed (server)', { + error: err instanceof Error ? err.message : String(err), + pubkey: event.pubkey, + progressToken, + }); + this.onerror?.( + err instanceof Error ? err : new Error(String(err)), + ); + }); + + return; + } + } + + this.openStreamReceiver + .processFrame(inboundMessage) + .then(async () => { + const frameType = frame?.frameType; + + if (frameType === 'start' && session.supportsOpenStream) { + await this.sendNotification(event.pubkey, { + jsonrpc: '2.0', + method: 'notifications/progress', + params: buildOpenStreamAcceptFrame({ + progressToken: String( + inboundMessage.params?.progressToken ?? '', + ), + progress: Number(inboundMessage.params?.progress ?? 0) + 1, + }), + }); + } + }) + .catch((err: unknown) => { + this.logger.error('Open stream error (server)', { + error: err instanceof Error ? err.message : String(err), + pubkey: event.pubkey, + }); + this.onerror?.( + err instanceof Error ? err : new Error(String(err)), + ); + }); + return; } - } else if (isJSONRPCNotification(mcpMessage)) { - this.handleIncomingNotification(event.pubkey, mcpMessage); if ( - mcpMessage.method === 'notifications/progress' && - OversizedTransferReceiver.isOversizedFrame(mcpMessage) + inboundMessage.method === 'notifications/progress' && + OversizedTransferReceiver.isOversizedFrame(inboundMessage) ) { this.oversizedReceiver - .processFrame(mcpMessage) + .processFrame(inboundMessage) .then(async (synthetic) => { if (synthetic === null) { if ( - (mcpMessage.params?.cvm as { frameType?: string } | undefined) - ?.frameType === 'start' && + ( + inboundMessage.params?.cvm as + | { frameType?: string } + | undefined + )?.frameType === 'start' && shouldSendAccept ) { await sendAcceptFrame( { clientPubkey: event.pubkey, progressToken: String( - mcpMessage.params?.progressToken ?? '', + inboundMessage.params?.progressToken ?? '', ), }, { @@ -1272,10 +1519,10 @@ export class NostrServerTransport } } - void dispatch(0, mcpMessage) + void dispatch(0, inboundMessage) .then((forwarded) => { if (!forwarded) { - this.cleanupDroppedRequest(mcpMessage); + this.cleanupDroppedRequest(inboundMessage); } }) .catch((err: unknown) => { @@ -1307,6 +1554,8 @@ export class NostrServerTransport return { sessionStore: this.sessionStore, correlationStore: this.correlationStore, + oversizedReceiver: this.oversizedReceiver, + openStreamReceiver: this.openStreamReceiver, }; } } diff --git a/src/transport/nostr-server/announcement-manager.ts b/src/transport/nostr-server/announcement-manager.ts index 4156a05..11fd1a1 100644 --- a/src/transport/nostr-server/announcement-manager.ts +++ b/src/transport/nostr-server/announcement-manager.ts @@ -592,7 +592,8 @@ export class AnnouncementManager { : undefined; const announcementResult = announcementListToolsResult ?? result; const listToolsAnnouncementTags = announcementListToolsResult - ? this.getListToolsAnnouncementTags?.(announcementListToolsResult) ?? [] + ? (this.getListToolsAnnouncementTags?.(announcementListToolsResult) ?? + []) : []; for (const mapping of announcementMapping) { diff --git a/src/transport/nostr-server/session-store.ts b/src/transport/nostr-server/session-store.ts index 59ce12e..e911c1b 100644 --- a/src/transport/nostr-server/session-store.ts +++ b/src/transport/nostr-server/session-store.ts @@ -24,6 +24,8 @@ export interface ClientSession { supportsEphemeralEncryption: boolean; /** Whether the client has advertised CEP-22 oversized transfer support. */ supportsOversizedTransfer: boolean; + /** Whether the client has advertised CEP-41 open stream support. */ + supportsOpenStream: boolean; } /** @@ -84,6 +86,7 @@ export class SessionStore { supportsEncryption: false, supportsEphemeralEncryption: false, supportsOversizedTransfer: false, + supportsOpenStream: false, }; this.sessions.set(clientPubkey, newSession); diff --git a/src/transport/nostr-transport-deduplication.test.ts b/src/transport/nostr-transport-deduplication.test.ts index e8ad80d..4b35924 100644 --- a/src/transport/nostr-transport-deduplication.test.ts +++ b/src/transport/nostr-transport-deduplication.test.ts @@ -1,7 +1,11 @@ import { describe, test, expect } from 'bun:test'; import { EncryptionMode, type RelayHandler } from '../core/interfaces.js'; import type { NostrEvent } from 'nostr-tools'; -import { finalizeEvent, generateSecretKey, getPublicKey } from 'nostr-tools/pure'; +import { + finalizeEvent, + generateSecretKey, + getPublicKey, +} from 'nostr-tools/pure'; import { NostrClientTransport } from './nostr-client-transport.js'; import { PrivateKeySigner } from '../signer/private-key-signer.js'; import { EPHEMERAL_GIFT_WRAP_KIND, GIFT_WRAP_KIND } from '../core/constants.js'; diff --git a/src/transport/open-stream-policy.ts b/src/transport/open-stream-policy.ts new file mode 100644 index 0000000..574a603 --- /dev/null +++ b/src/transport/open-stream-policy.ts @@ -0,0 +1,11 @@ +/** + * Shared policy options for CEP-41 open-stream lifecycle and buffering. + */ +export interface OpenStreamTransportPolicy { + maxConcurrentStreams?: number; + maxBufferedChunksPerStream?: number; + maxBufferedBytesPerStream?: number; + idleTimeoutMs?: number; + probeTimeoutMs?: number; + closeGracePeriodMs?: number; +} diff --git a/src/transport/open-stream/constants.ts b/src/transport/open-stream/constants.ts new file mode 100644 index 0000000..05ed157 --- /dev/null +++ b/src/transport/open-stream/constants.ts @@ -0,0 +1,8 @@ +export const OPEN_STREAM_TYPE = 'open-stream'; + +export const DEFAULT_MAX_CONCURRENT_OPEN_STREAMS = 64; +export const DEFAULT_MAX_BUFFERED_CHUNKS_PER_STREAM = 64; +export const DEFAULT_MAX_BUFFERED_BYTES_PER_STREAM = 512 * 1024; +export const DEFAULT_OPEN_STREAM_IDLE_TIMEOUT_MS = 30_000; +export const DEFAULT_OPEN_STREAM_PROBE_TIMEOUT_MS = 10_000; +export const DEFAULT_OPEN_STREAM_CLOSE_GRACE_PERIOD_MS = 5_000; diff --git a/src/transport/open-stream/errors.ts b/src/transport/open-stream/errors.ts new file mode 100644 index 0000000..5f9b2bf --- /dev/null +++ b/src/transport/open-stream/errors.ts @@ -0,0 +1,36 @@ +/** Base class for all CEP-41 open-stream errors. */ +export class OpenStreamError extends Error { + constructor(message: string) { + super(message); + this.name = 'OpenStreamError'; + } +} + +/** Thrown when a stream is aborted locally or by the remote peer. */ +export class OpenStreamAbortError extends OpenStreamError { + public readonly progressToken: string; + public readonly reason: string | undefined; + + constructor(progressToken: string, reason?: string) { + super(`Open stream aborted${reason ? `: ${reason}` : ''}`); + this.name = 'OpenStreamAbortError'; + this.progressToken = progressToken; + this.reason = reason; + } +} + +/** Thrown when a stream violates local admission or buffering policy. */ +export class OpenStreamPolicyError extends OpenStreamError { + constructor(message: string) { + super(message); + this.name = 'OpenStreamPolicyError'; + } +} + +/** Thrown when CEP-41 lifecycle or ordering rules are violated. */ +export class OpenStreamSequenceError extends OpenStreamError { + constructor(message: string) { + super(message); + this.name = 'OpenStreamSequenceError'; + } +} diff --git a/src/transport/open-stream/frames.ts b/src/transport/open-stream/frames.ts new file mode 100644 index 0000000..ae1ad99 --- /dev/null +++ b/src/transport/open-stream/frames.ts @@ -0,0 +1,145 @@ +import type { + OpenStreamAbortFrame, + OpenStreamAcceptFrame, + OpenStreamChunkFrame, + OpenStreamCloseFrame, + OpenStreamPingFrame, + OpenStreamPongFrame, + OpenStreamProgress, + OpenStreamStartFrame, +} from './types.js'; + +export function buildOpenStreamStartFrame(params: { + progressToken: string; + progress: number; + contentType?: string; +}): OpenStreamProgress { + const cvm: OpenStreamStartFrame = { + type: 'open-stream', + frameType: 'start', + }; + + if (params.contentType !== undefined) { + cvm.contentType = params.contentType; + } + + return { + progressToken: params.progressToken, + progress: params.progress, + cvm, + }; +} + +export function buildOpenStreamAcceptFrame(params: { + progressToken: string; + progress: number; +}): OpenStreamProgress { + const cvm: OpenStreamAcceptFrame = { + type: 'open-stream', + frameType: 'accept', + }; + + return { + progressToken: params.progressToken, + progress: params.progress, + cvm, + }; +} + +export function buildOpenStreamChunkFrame(params: { + progressToken: string; + progress: number; + chunkIndex: number; + data: string; +}): OpenStreamProgress { + const cvm: OpenStreamChunkFrame = { + type: 'open-stream', + frameType: 'chunk', + chunkIndex: params.chunkIndex, + data: params.data, + }; + + return { + progressToken: params.progressToken, + progress: params.progress, + cvm, + }; +} + +export function buildOpenStreamPingFrame(params: { + progressToken: string; + progress: number; + nonce: string; +}): OpenStreamProgress { + const cvm: OpenStreamPingFrame = { + type: 'open-stream', + frameType: 'ping', + nonce: params.nonce, + }; + + return { + progressToken: params.progressToken, + progress: params.progress, + cvm, + }; +} + +export function buildOpenStreamPongFrame(params: { + progressToken: string; + progress: number; + nonce: string; +}): OpenStreamProgress { + const cvm: OpenStreamPongFrame = { + type: 'open-stream', + frameType: 'pong', + nonce: params.nonce, + }; + + return { + progressToken: params.progressToken, + progress: params.progress, + cvm, + }; +} + +export function buildOpenStreamCloseFrame(params: { + progressToken: string; + progress: number; + lastChunkIndex?: number; +}): OpenStreamProgress { + const cvm: OpenStreamCloseFrame = { + type: 'open-stream', + frameType: 'close', + }; + + if (params.lastChunkIndex !== undefined) { + cvm.lastChunkIndex = params.lastChunkIndex; + } + + return { + progressToken: params.progressToken, + progress: params.progress, + cvm, + }; +} + +export function buildOpenStreamAbortFrame(params: { + progressToken: string; + progress: number; + reason?: string; +}): OpenStreamProgress { + const cvm: OpenStreamAbortFrame = { + type: 'open-stream', + frameType: 'abort', + }; + + if (params.reason !== undefined) { + cvm.reason = params.reason; + } + + return { + progressToken: params.progressToken, + progress: params.progress, + cvm, + }; +} diff --git a/src/transport/open-stream/index.ts b/src/transport/open-stream/index.ts new file mode 100644 index 0000000..02ddc2b --- /dev/null +++ b/src/transport/open-stream/index.ts @@ -0,0 +1,8 @@ +export * from './constants.js'; +export * from './errors.js'; +export * from './frames.js'; +export * from './types.js'; +export { OpenStreamSession } from './session.js'; +export { OpenStreamRegistry } from './registry.js'; +export { OpenStreamReceiver } from './receiver.js'; +export { OpenStreamWriter } from './writer.js'; diff --git a/src/transport/open-stream/receiver.ts b/src/transport/open-stream/receiver.ts new file mode 100644 index 0000000..f694adf --- /dev/null +++ b/src/transport/open-stream/receiver.ts @@ -0,0 +1,50 @@ +import type { JSONRPCNotification } from '@modelcontextprotocol/sdk/types.js'; +import { + OpenStreamRegistry, + type OpenStreamRegistryOptions, +} from './registry.js'; +import type { OpenStreamSession, OpenStreamSessionOptions } from './session.js'; +import type { OpenStreamProgress } from './types.js'; + +/** + * Stateful receiver for inbound CEP-41 `notifications/progress` frames. + */ +export class OpenStreamReceiver { + private readonly registry: OpenStreamRegistry; + + constructor(options: OpenStreamRegistryOptions) { + this.registry = new OpenStreamRegistry(options); + } + + public static isOpenStreamFrame(notification: JSONRPCNotification): boolean { + return OpenStreamRegistry.isOpenStreamProgress(notification.params); + } + + public async processFrame( + notification: JSONRPCNotification, + ): Promise { + return this.registry.processFrame( + notification.params as OpenStreamProgress, + ); + } + + public getSession(progressToken: string): OpenStreamSession | undefined { + return this.registry.getSession(progressToken); + } + + public getOrCreateSession(progressToken: string): OpenStreamSession { + return this.registry.getOrCreateSession(progressToken); + } + + public createSession(options: OpenStreamSessionOptions): OpenStreamSession { + return this.registry.createSession(options); + } + + public clear(): void { + this.registry.clear(); + } + + public get size(): number { + return this.registry.size; + } +} diff --git a/src/transport/open-stream/registry.test.ts b/src/transport/open-stream/registry.test.ts new file mode 100644 index 0000000..e66df58 --- /dev/null +++ b/src/transport/open-stream/registry.test.ts @@ -0,0 +1,272 @@ +import { describe, expect, test } from 'bun:test'; +import { createLogger } from '../../core/utils/logger.js'; +import { + DEFAULT_MAX_BUFFERED_BYTES_PER_STREAM, + DEFAULT_MAX_BUFFERED_CHUNKS_PER_STREAM, +} from './constants.js'; +import { OpenStreamPolicyError, OpenStreamSequenceError } from './errors.js'; +import { OpenStreamRegistry } from './registry.js'; +import type { OpenStreamProgress } from './types.js'; + +describe('OpenStreamRegistry', () => { + test('enforces the max concurrent stream policy and reuses slots after close', async () => { + const registry = new OpenStreamRegistry({ + maxConcurrentStreams: 1, + maxBufferedChunksPerStream: 4, + maxBufferedBytesPerStream: 128, + logger: createLogger('test', { level: 'silent' }), + }); + + const first = registry.createSession('token-1'); + + expect(() => registry.createSession('token-2')).toThrow( + OpenStreamPolicyError, + ); + + await first.processFrame(1, { + type: 'open-stream', + frameType: 'start', + }); + await first.processFrame(2, { + type: 'open-stream', + frameType: 'close', + }); + await first.closed; + + const second = registry.createSession('token-2'); + + expect(second.progressToken).toBe('token-2'); + expect(registry.size).toBe(1); + }); + + test('reuses the same session instance for repeated getOrCreate calls', () => { + const registry = new OpenStreamRegistry({ + maxConcurrentStreams: 2, + maxBufferedChunksPerStream: 4, + maxBufferedBytesPerStream: 128, + logger: createLogger('test', { level: 'silent' }), + }); + + const first = registry.getOrCreateSession('token-shared'); + const second = registry.getOrCreateSession('token-shared'); + + expect(second).toBe(first); + expect(registry.size).toBe(1); + }); + + test('rejects non-start frames for unknown progress tokens', async () => { + const registry = new OpenStreamRegistry({ + maxConcurrentStreams: 2, + maxBufferedChunksPerStream: 4, + maxBufferedBytesPerStream: 128, + logger: createLogger('test', { level: 'silent' }), + }); + + await expect( + registry.processFrame({ + progressToken: 'token-missing-start', + progress: 1, + cvm: { + type: 'open-stream', + frameType: 'chunk', + chunkIndex: 0, + data: 'orphan', + }, + }), + ).rejects.toBeInstanceOf(OpenStreamSequenceError); + + expect(registry.getSession('token-missing-start')).toBeUndefined(); + expect(registry.size).toBe(0); + }); + + test('applies default buffering limits when a session is created without overrides', async () => { + const registry = new OpenStreamRegistry({ + maxConcurrentStreams: 2, + logger: createLogger('test', { level: 'silent' }), + }); + + const session = registry.createSession('token-default-bounds'); + + await session.processFrame(1, { + type: 'open-stream', + frameType: 'start', + }); + + for (let i = 0; i < DEFAULT_MAX_BUFFERED_CHUNKS_PER_STREAM; i += 1) { + await session.processFrame(i + 2, { + type: 'open-stream', + frameType: 'chunk', + chunkIndex: i + 1, + data: 'x', + }); + } + + await expect( + session.processFrame(DEFAULT_MAX_BUFFERED_CHUNKS_PER_STREAM + 2, { + type: 'open-stream', + frameType: 'chunk', + chunkIndex: DEFAULT_MAX_BUFFERED_CHUNKS_PER_STREAM + 1, + data: 'x', + }), + ).rejects.toBeInstanceOf(OpenStreamSequenceError); + + const byteLimited = registry.createSession('token-default-bytes'); + await byteLimited.processFrame(1, { + type: 'open-stream', + frameType: 'start', + }); + + await expect( + byteLimited.processFrame(2, { + type: 'open-stream', + frameType: 'chunk', + chunkIndex: 1, + data: 'x'.repeat(DEFAULT_MAX_BUFFERED_BYTES_PER_STREAM + 1), + }), + ).rejects.toBeInstanceOf(OpenStreamSequenceError); + }); + + test('applies default timer limits when a session is created without overrides', async () => { + const pings: string[] = []; + const pongs: string[] = []; + const aborts: Array = []; + const registry = new OpenStreamRegistry({ + logger: createLogger('test', { level: 'silent' }), + getSessionOptions: () => ({ + sendPing: async (nonce: string): Promise => { + pings.push(nonce); + }, + sendPong: async (nonce: string): Promise => { + pongs.push(nonce); + }, + sendAbort: async (reason?: string): Promise => { + aborts.push(reason); + }, + }), + }); + + const session = registry.createSession('token-default-timers'); + + await session.processFrame(1, { + type: 'open-stream', + frameType: 'start', + }); + await session.processFrame(2, { + type: 'open-stream', + frameType: 'ping', + nonce: 'peer-nonce', + }); + + expect(pings).toEqual([]); + expect(pongs).toEqual(['peer-nonce']); + expect(aborts).toEqual([]); + }); + + test('clearing the registry disposes active sessions and cancels pending timers', async () => { + const pings: string[] = []; + const aborts: Array = []; + const registry = new OpenStreamRegistry({ + maxConcurrentStreams: 2, + maxBufferedChunksPerStream: 4, + maxBufferedBytesPerStream: 128, + idleTimeoutMs: 10, + probeTimeoutMs: 10, + logger: createLogger('test', { level: 'silent' }), + getSessionOptions: () => ({ + sendPing: async (nonce: string): Promise => { + pings.push(nonce); + }, + sendAbort: async (reason?: string): Promise => { + aborts.push(reason); + }, + }), + }); + + const session = registry.createSession('token-clear-disposes'); + + await session.processFrame(1, { + type: 'open-stream', + frameType: 'start', + }); + + registry.clear(); + + await expect(session.closed).resolves.toBeUndefined(); + + await new Promise((resolve) => setTimeout(resolve, 35)); + + expect(registry.size).toBe(0); + expect(pings).toEqual([]); + expect(aborts).toEqual([]); + }); + + test('accepts a start frame with advisory metadata omitted', async () => { + const registry = new OpenStreamRegistry({ + maxConcurrentStreams: 2, + maxBufferedChunksPerStream: 4, + maxBufferedBytesPerStream: 128, + logger: createLogger('test', { level: 'silent' }), + }); + + const session = await registry.processFrame({ + progressToken: 'token-advisory-start', + progress: 1, + cvm: { + type: 'open-stream', + frameType: 'start', + }, + }); + + expect(session.progressToken).toBe('token-advisory-start'); + expect(registry.getSession('token-advisory-start')).toBe(session); + + registry.clear(); + await expect(session.closed).resolves.toBeUndefined(); + }); + + test('rejects malformed progress payloads that are not CEP-41 frames', () => { + const malformedPayloads: unknown[] = [ + null, + {}, + { progressToken: 'missing-cvm', progress: 1 }, + { + progressToken: 'wrong-type', + progress: 1, + cvm: { type: 'other', frameType: 'start' }, + }, + { + progressToken: 'missing-frame-type', + progress: 1, + cvm: { type: 'open-stream' }, + }, + ]; + + expect( + malformedPayloads.every( + (payload) => !OpenStreamRegistry.isOpenStreamProgress(payload), + ), + ).toBe(true); + }); + + test('rejects accept as the first frame for an unknown token', async () => { + const registry = new OpenStreamRegistry({ + maxConcurrentStreams: 2, + maxBufferedChunksPerStream: 4, + maxBufferedBytesPerStream: 128, + logger: createLogger('test', { level: 'silent' }), + }); + const acceptFrame: OpenStreamProgress = { + progressToken: 'token-orphan-accept', + progress: 1, + cvm: { + type: 'open-stream', + frameType: 'accept', + }, + }; + + await expect(registry.processFrame(acceptFrame)).rejects.toBeInstanceOf( + OpenStreamSequenceError, + ); + expect(registry.getSession('token-orphan-accept')).toBeUndefined(); + }); +}); diff --git a/src/transport/open-stream/registry.ts b/src/transport/open-stream/registry.ts new file mode 100644 index 0000000..f33191c --- /dev/null +++ b/src/transport/open-stream/registry.ts @@ -0,0 +1,191 @@ +import type { Logger } from '../../core/utils/logger.js'; +import { + DEFAULT_MAX_BUFFERED_BYTES_PER_STREAM, + DEFAULT_MAX_BUFFERED_CHUNKS_PER_STREAM, + DEFAULT_MAX_CONCURRENT_OPEN_STREAMS, + DEFAULT_OPEN_STREAM_CLOSE_GRACE_PERIOD_MS, + DEFAULT_OPEN_STREAM_IDLE_TIMEOUT_MS, + DEFAULT_OPEN_STREAM_PROBE_TIMEOUT_MS, +} from './constants.js'; +import { OpenStreamPolicyError, OpenStreamSequenceError } from './errors.js'; +import { OpenStreamSession, type OpenStreamSessionOptions } from './session.js'; +import type { OpenStreamFrame, OpenStreamProgress } from './types.js'; + +export interface OpenStreamRegistryOptions { + maxConcurrentStreams?: number; + maxBufferedChunksPerStream?: number; + maxBufferedBytesPerStream?: number; + idleTimeoutMs?: number; + probeTimeoutMs?: number; + closeGracePeriodMs?: number; + getSessionOptions?: ( + progressToken: string, + ) => Partial>; + logger: Logger; +} + +function isOpenStreamFrame(value: unknown): value is OpenStreamFrame { + return ( + typeof value === 'object' && + value !== null && + (value as OpenStreamFrame).type === 'open-stream' && + typeof (value as OpenStreamFrame).frameType === 'string' + ); +} + +/** + * Registry of active CEP-41 sessions keyed by progress token. + */ +export class OpenStreamRegistry { + private readonly logger: Logger; + private readonly maxConcurrentStreams: number; + private readonly maxBufferedChunksPerStream: number; + private readonly maxBufferedBytesPerStream: number; + private readonly idleTimeoutMs: number; + private readonly probeTimeoutMs: number; + private readonly closeGracePeriodMs: number; + private readonly getSessionOptions: + | (( + progressToken: string, + ) => Partial>) + | undefined; + private readonly sessions = new Map(); + + constructor(options: OpenStreamRegistryOptions) { + this.logger = options.logger; + this.maxConcurrentStreams = + options.maxConcurrentStreams ?? DEFAULT_MAX_CONCURRENT_OPEN_STREAMS; + this.maxBufferedChunksPerStream = + options.maxBufferedChunksPerStream ?? + DEFAULT_MAX_BUFFERED_CHUNKS_PER_STREAM; + this.maxBufferedBytesPerStream = + options.maxBufferedBytesPerStream ?? + DEFAULT_MAX_BUFFERED_BYTES_PER_STREAM; + this.idleTimeoutMs = + options.idleTimeoutMs ?? DEFAULT_OPEN_STREAM_IDLE_TIMEOUT_MS; + this.probeTimeoutMs = + options.probeTimeoutMs ?? DEFAULT_OPEN_STREAM_PROBE_TIMEOUT_MS; + this.closeGracePeriodMs = + options.closeGracePeriodMs ?? DEFAULT_OPEN_STREAM_CLOSE_GRACE_PERIOD_MS; + this.getSessionOptions = options.getSessionOptions; + } + + public static isOpenStreamProgress( + value: unknown, + ): value is OpenStreamProgress { + return ( + typeof value === 'object' && + value !== null && + isOpenStreamFrame((value as OpenStreamProgress).cvm) + ); + } + + public getSession(progressToken: string): OpenStreamSession | undefined { + return this.sessions.get(progressToken); + } + + public createSession( + options: + | string + | (Pick & + Partial>), + ): OpenStreamSession { + const sessionOptions = + typeof options === 'string' ? { progressToken: options } : options; + const { progressToken } = sessionOptions; + const derivedSessionOptions = this.getSessionOptions?.(progressToken) ?? {}; + + if (this.sessions.has(progressToken)) { + throw new OpenStreamSequenceError( + `Stream session already exists for ${progressToken}`, + ); + } + + if (this.sessions.size >= this.maxConcurrentStreams) { + throw new OpenStreamPolicyError( + 'Maximum concurrent open streams exceeded', + ); + } + + const session = new OpenStreamSession({ + progressToken, + maxBufferedChunks: + sessionOptions.maxBufferedChunks ?? + derivedSessionOptions.maxBufferedChunks ?? + this.maxBufferedChunksPerStream, + maxBufferedBytes: + sessionOptions.maxBufferedBytes ?? + derivedSessionOptions.maxBufferedBytes ?? + this.maxBufferedBytesPerStream, + idleTimeoutMs: + sessionOptions.idleTimeoutMs ?? + derivedSessionOptions.idleTimeoutMs ?? + this.idleTimeoutMs, + probeTimeoutMs: + sessionOptions.probeTimeoutMs ?? + derivedSessionOptions.probeTimeoutMs ?? + this.probeTimeoutMs, + closeGracePeriodMs: + sessionOptions.closeGracePeriodMs ?? + derivedSessionOptions.closeGracePeriodMs ?? + this.closeGracePeriodMs, + sendPing: sessionOptions.sendPing ?? derivedSessionOptions.sendPing, + sendPong: sessionOptions.sendPong ?? derivedSessionOptions.sendPong, + sendAbort: sessionOptions.sendAbort ?? derivedSessionOptions.sendAbort, + onClose: async () => { + await sessionOptions.onClose?.(); + this.sessions.delete(progressToken); + }, + onAbort: async (reason?: string) => { + await sessionOptions.onAbort?.(reason); + this.sessions.delete(progressToken); + }, + }); + + this.sessions.set(progressToken, session); + return session; + } + + public getOrCreateSession(progressToken: string): OpenStreamSession { + return this.getSession(progressToken) ?? this.createSession(progressToken); + } + + public async processFrame( + frame: OpenStreamProgress, + ): Promise { + const progressToken = String(frame.progressToken); + const existingSession = this.getSession(progressToken); + + if (!existingSession) { + if (frame.cvm.frameType !== 'start') { + throw new OpenStreamSequenceError( + `Received ${frame.cvm.frameType} frame before start for ${progressToken}`, + ); + } + } + + const session = existingSession ?? this.createSession(progressToken); + await session.processFrame(frame.progress, frame.cvm); + return session; + } + + public deleteSession(progressToken: string): boolean { + return this.sessions.delete(progressToken); + } + + public clear(): void { + this.logger.debug('Clearing open stream registry', { + count: this.sessions.size, + }); + + for (const session of this.sessions.values()) { + session.dispose(); + } + + this.sessions.clear(); + } + + public get size(): number { + return this.sessions.size; + } +} diff --git a/src/transport/open-stream/session.test.ts b/src/transport/open-stream/session.test.ts new file mode 100644 index 0000000..2a6a426 --- /dev/null +++ b/src/transport/open-stream/session.test.ts @@ -0,0 +1,529 @@ +import { describe, expect, test } from 'bun:test'; +import { OpenStreamAbortError, OpenStreamSequenceError } from './errors.js'; +import { OpenStreamSession } from './session.js'; + +describe('OpenStreamSession', () => { + test('yields ordered chunks and finishes after close', async () => { + const session = new OpenStreamSession({ + progressToken: 'token-1', + maxBufferedChunks: 8, + maxBufferedBytes: 1024, + }); + + await session.processFrame(1, { + type: 'open-stream', + frameType: 'start', + }); + await session.processFrame(2, { + type: 'open-stream', + frameType: 'chunk', + chunkIndex: 0, + data: 'hello', + }); + await session.processFrame(3, { + type: 'open-stream', + frameType: 'chunk', + chunkIndex: 1, + data: ' world', + }); + await session.processFrame(4, { + type: 'open-stream', + frameType: 'close', + }); + + const chunks: string[] = []; + for await (const chunk of session) { + chunks.push(chunk.value); + } + + expect(chunks).toEqual(['hello', ' world']); + await expect(session.closed).resolves.toBeUndefined(); + }); + + test('buffers out-of-order chunks until contiguous', async () => { + const session = new OpenStreamSession({ + progressToken: 'token-2', + maxBufferedChunks: 8, + maxBufferedBytes: 1024, + }); + + await session.processFrame(1, { + type: 'open-stream', + frameType: 'start', + }); + await session.processFrame(2, { + type: 'open-stream', + frameType: 'chunk', + chunkIndex: 1, + data: 'world', + }); + await session.processFrame(3, { + type: 'open-stream', + frameType: 'chunk', + chunkIndex: 0, + data: 'hello ', + }); + await session.processFrame(4, { + type: 'open-stream', + frameType: 'close', + }); + + const received: string[] = []; + for await (const chunk of session) { + received.push(chunk.value); + } + + expect(received).toEqual(['hello ', 'world']); + }); + + test('fails when progress does not increase', async () => { + const session = new OpenStreamSession({ + progressToken: 'token-3', + maxBufferedChunks: 8, + maxBufferedBytes: 1024, + }); + + await session.processFrame(1, { + type: 'open-stream', + frameType: 'start', + }); + + await expect( + session.processFrame(1, { + type: 'open-stream', + frameType: 'chunk', + chunkIndex: 0, + data: 'repeat', + }), + ).rejects.toBeInstanceOf(OpenStreamSequenceError); + }); + + test('fails waiting readers when stream aborts', async () => { + const session = new OpenStreamSession({ + progressToken: 'token-4', + maxBufferedChunks: 8, + maxBufferedBytes: 1024, + }); + + await session.processFrame(1, { + type: 'open-stream', + frameType: 'start', + }); + + const iterator = session[Symbol.asyncIterator](); + const nextChunk = iterator.next().catch((error: unknown) => error); + const closed = session.closed.catch((error: unknown) => error); + + await session.processFrame(2, { + type: 'open-stream', + frameType: 'abort', + reason: 'boom', + }); + + expect(await nextChunk).toBeInstanceOf(OpenStreamAbortError); + expect(await closed).toBeInstanceOf(OpenStreamAbortError); + }); + + test('fails when buffered chunk count exceeds the configured limit', async () => { + const session = new OpenStreamSession({ + progressToken: 'token-buffer-count', + maxBufferedChunks: 1, + maxBufferedBytes: 1024, + }); + + await session.processFrame(1, { + type: 'open-stream', + frameType: 'start', + }); + await session.processFrame(2, { + type: 'open-stream', + frameType: 'chunk', + chunkIndex: 1, + data: 'late', + }); + + await expect( + session.processFrame(3, { + type: 'open-stream', + frameType: 'chunk', + chunkIndex: 2, + data: 'later', + }), + ).rejects.toBeInstanceOf(OpenStreamSequenceError); + }); + + test('fails when buffered byte count exceeds the configured limit', async () => { + const session = new OpenStreamSession({ + progressToken: 'token-buffer-bytes', + maxBufferedChunks: 4, + maxBufferedBytes: 4, + }); + + await session.processFrame(1, { + type: 'open-stream', + frameType: 'start', + }); + + await expect( + session.processFrame(2, { + type: 'open-stream', + frameType: 'chunk', + chunkIndex: 1, + data: 'hello', + }), + ).rejects.toBeInstanceOf(OpenStreamSequenceError); + }); + + test('rejects frames after close', async () => { + const session = new OpenStreamSession({ + progressToken: 'token-post-close', + maxBufferedChunks: 8, + maxBufferedBytes: 1024, + }); + + await session.processFrame(1, { + type: 'open-stream', + frameType: 'start', + }); + await session.processFrame(2, { + type: 'open-stream', + frameType: 'close', + }); + await expect(session.closed).resolves.toBeUndefined(); + + await expect( + session.processFrame(3, { + type: 'open-stream', + frameType: 'chunk', + chunkIndex: 0, + data: 'late', + }), + ).rejects.toBeInstanceOf(OpenStreamSequenceError); + }); + + test('rejects frames after abort', async () => { + const session = new OpenStreamSession({ + progressToken: 'token-post-abort', + maxBufferedChunks: 8, + maxBufferedBytes: 1024, + }); + + await session.processFrame(1, { + type: 'open-stream', + frameType: 'start', + }); + await session.processFrame(2, { + type: 'open-stream', + frameType: 'abort', + reason: 'boom', + }); + await expect(session.closed).rejects.toBeInstanceOf(OpenStreamAbortError); + + await expect( + session.processFrame(3, { + type: 'open-stream', + frameType: 'chunk', + chunkIndex: 0, + data: 'late', + }), + ).rejects.toBeInstanceOf(OpenStreamSequenceError); + + await expect( + session.processFrame(4, { + type: 'open-stream', + frameType: 'close', + }), + ).rejects.toBeInstanceOf(OpenStreamSequenceError); + }); + + test('calls only onAbort when the stream aborts', async () => { + const calls: string[] = []; + const session = new OpenStreamSession({ + progressToken: 'token-abort-callbacks', + maxBufferedChunks: 8, + maxBufferedBytes: 1024, + onClose: async (): Promise => { + calls.push('close'); + }, + onAbort: async (): Promise => { + calls.push('abort'); + }, + }); + + await session.processFrame(1, { + type: 'open-stream', + frameType: 'start', + }); + + const closed = session.closed.catch((error: unknown) => error); + + await session.processFrame(2, { + type: 'open-stream', + frameType: 'abort', + reason: 'boom', + }); + + expect(calls).toEqual(['abort']); + expect(await closed).toBeInstanceOf(OpenStreamAbortError); + }); + + test('rejects stale chunk indexes that were already flushed', async () => { + const session = new OpenStreamSession({ + progressToken: 'token-stale-chunk', + maxBufferedChunks: 8, + maxBufferedBytes: 1024, + }); + + await session.processFrame(1, { + type: 'open-stream', + frameType: 'start', + }); + await session.processFrame(2, { + type: 'open-stream', + frameType: 'chunk', + chunkIndex: 0, + data: 'hello', + }); + + await expect( + session.processFrame(3, { + type: 'open-stream', + frameType: 'chunk', + chunkIndex: 0, + data: 'late-duplicate', + }), + ).rejects.toBeInstanceOf(OpenStreamSequenceError); + }); + + test('requires all chunks through close.lastChunkIndex before finishing', async () => { + const session = new OpenStreamSession({ + progressToken: 'token-last-chunk-index', + maxBufferedChunks: 8, + maxBufferedBytes: 1024, + }); + + await session.processFrame(1, { + type: 'open-stream', + frameType: 'start', + }); + await session.processFrame(2, { + type: 'open-stream', + frameType: 'chunk', + chunkIndex: 0, + data: 'hello', + }); + + await expect( + session.processFrame(3, { + type: 'open-stream', + frameType: 'close', + lastChunkIndex: 1, + }), + ).rejects.toBeInstanceOf(OpenStreamSequenceError); + }); + + test('allows graceful close when close.lastChunkIndex matches received chunks', async () => { + const session = new OpenStreamSession({ + progressToken: 'token-last-chunk-complete', + maxBufferedChunks: 8, + maxBufferedBytes: 1024, + idleTimeoutMs: 1000, + probeTimeoutMs: 1000, + closeGracePeriodMs: 1000, + }); + + await session.processFrame(1, { + type: 'open-stream', + frameType: 'start', + }); + await session.processFrame(2, { + type: 'open-stream', + frameType: 'chunk', + chunkIndex: 0, + data: 'hello', + }); + await session.processFrame(3, { + type: 'open-stream', + frameType: 'close', + lastChunkIndex: 0, + }); + + await expect(session.closed).resolves.toBeUndefined(); + }); + + test('responds to ping frames with a matching pong', async () => { + const pongs: string[] = []; + const session = new OpenStreamSession({ + progressToken: 'token-ping-pong', + maxBufferedChunks: 8, + maxBufferedBytes: 1024, + idleTimeoutMs: 100, + probeTimeoutMs: 100, + closeGracePeriodMs: 100, + sendPong: async (nonce: string): Promise => { + pongs.push(nonce); + }, + }); + + await session.processFrame(1, { + type: 'open-stream', + frameType: 'start', + }); + await session.processFrame(2, { + type: 'open-stream', + frameType: 'ping', + nonce: 'nonce-1', + }); + + expect(pongs).toEqual(['nonce-1']); + + session.dispose(); + await expect(session.closed).resolves.toBeUndefined(); + }); + + test('sends ping after idle timeout and aborts after probe timeout', async () => { + const pings: string[] = []; + const aborts: Array = []; + const session = new OpenStreamSession({ + progressToken: 'token-probe-timeout', + maxBufferedChunks: 8, + maxBufferedBytes: 1024, + idleTimeoutMs: 10, + probeTimeoutMs: 10, + closeGracePeriodMs: 100, + sendPing: async (nonce: string): Promise => { + pings.push(nonce); + }, + sendAbort: async (reason?: string): Promise => { + aborts.push(reason); + }, + }); + const closed = session.closed.catch((error: unknown) => error); + + await session.processFrame(1, { + type: 'open-stream', + frameType: 'start', + }); + + await new Promise((resolve) => setTimeout(resolve, 35)); + + expect(pings).toHaveLength(1); + expect(aborts).toEqual(['Probe timeout']); + expect(await closed).toBeInstanceOf(OpenStreamAbortError); + }); + + test('clears the probe timeout when a matching pong arrives', async () => { + const pings: string[] = []; + const aborts: Array = []; + const session = new OpenStreamSession({ + progressToken: 'token-probe-success', + maxBufferedChunks: 8, + maxBufferedBytes: 1024, + idleTimeoutMs: 10, + probeTimeoutMs: 20, + closeGracePeriodMs: 100, + sendPing: async (nonce: string): Promise => { + pings.push(nonce); + }, + sendAbort: async (reason?: string): Promise => { + aborts.push(reason); + }, + }); + + await session.processFrame(1, { + type: 'open-stream', + frameType: 'start', + }); + + await new Promise((resolve) => setTimeout(resolve, 15)); + expect(pings).toHaveLength(1); + + await session.processFrame(2, { + type: 'open-stream', + frameType: 'pong', + nonce: pings[0]!, + }); + + await new Promise((resolve) => setTimeout(resolve, 15)); + expect(aborts).toEqual([]); + expect(session.isActive).toBe(true); + + await session.abort('test cleanup'); + await session.closed.catch(() => undefined); + }); + + test('ignores unexpected pong frames for liveness tracking', async () => { + const pings: string[] = []; + const aborts: Array = []; + const session = new OpenStreamSession({ + progressToken: 'token-invalid-pong', + maxBufferedChunks: 8, + maxBufferedBytes: 1024, + idleTimeoutMs: 10, + probeTimeoutMs: 10, + closeGracePeriodMs: 100, + sendPing: async (nonce: string): Promise => { + pings.push(nonce); + }, + sendAbort: async (reason?: string): Promise => { + aborts.push(reason); + }, + }); + const closed = session.closed.catch((error: unknown) => error); + + await session.processFrame(1, { + type: 'open-stream', + frameType: 'start', + }); + + await new Promise((resolve) => setTimeout(resolve, 15)); + expect(pings).toHaveLength(1); + + await session.processFrame(2, { + type: 'open-stream', + frameType: 'pong', + nonce: 'unexpected-pong', + }); + + await new Promise((resolve) => setTimeout(resolve, 15)); + + expect(aborts).toEqual(['Probe timeout']); + expect(await closed).toBeInstanceOf(OpenStreamAbortError); + }); + + test('aborts when close grace period expires with missing chunks', async () => { + const aborts: Array = []; + const session = new OpenStreamSession({ + progressToken: 'token-close-grace-timeout', + maxBufferedChunks: 8, + maxBufferedBytes: 1024, + idleTimeoutMs: 100, + probeTimeoutMs: 100, + closeGracePeriodMs: 10, + sendAbort: async (reason?: string): Promise => { + aborts.push(reason); + }, + }); + const closed = session.closed.catch((error: unknown) => error); + + await session.processFrame(1, { + type: 'open-stream', + frameType: 'start', + }); + await session.processFrame(2, { + type: 'open-stream', + frameType: 'chunk', + chunkIndex: 1, + data: 'late', + }); + await session.processFrame(3, { + type: 'open-stream', + frameType: 'close', + }); + + await new Promise((resolve) => setTimeout(resolve, 25)); + + expect(aborts).toEqual(['Close grace period expired']); + expect(await closed).toBeInstanceOf(OpenStreamAbortError); + }); +}); diff --git a/src/transport/open-stream/session.ts b/src/transport/open-stream/session.ts new file mode 100644 index 0000000..f37f113 --- /dev/null +++ b/src/transport/open-stream/session.ts @@ -0,0 +1,501 @@ +import { + DEFAULT_OPEN_STREAM_CLOSE_GRACE_PERIOD_MS, + DEFAULT_OPEN_STREAM_IDLE_TIMEOUT_MS, + DEFAULT_OPEN_STREAM_PROBE_TIMEOUT_MS, +} from './constants.js'; +import { OpenStreamAbortError, OpenStreamSequenceError } from './errors.js'; +import type { + OpenStreamChunkFrame, + OpenStreamFrame, + OpenStreamPingFrame, + OpenStreamReadResult, + OpenStreamSessionLike, +} from './types.js'; + +type Deferred = { + resolve: (value: T | PromiseLike) => void; + reject: (reason?: unknown) => void; + promise: Promise; +}; + +function createDeferred(): Deferred { + let resolve!: (value: T | PromiseLike) => void; + let reject!: (reason?: unknown) => void; + const promise = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); + + return { resolve, reject, promise }; +} + +type PendingChunk = OpenStreamReadResult; + +export interface OpenStreamSessionOptions { + progressToken: string; + maxBufferedChunks: number; + maxBufferedBytes: number; + idleTimeoutMs?: number; + probeTimeoutMs?: number; + closeGracePeriodMs?: number; + sendPing?: (nonce: string) => Promise; + sendPong?: (nonce: string) => Promise; + sendAbort?: (reason?: string) => Promise; + onAbort?: (reason?: string) => Promise; + onClose?: () => Promise; +} + +type CloseState = { + expectedLastChunkIndex?: number; +}; + +/** + * Readable client-side/session-side view of a CEP-41 stream. + */ +export class OpenStreamSession implements OpenStreamSessionLike { + public readonly progressToken: string; + public readonly closed: Promise; + + private readonly onAbort?: (reason?: string) => Promise; + private readonly onClose?: () => Promise; + private readonly closeDeferred = createDeferred(); + private readonly waiters: Array>> = []; + private readonly queue: PendingChunk[] = []; + private readonly bufferedChunks = new Map(); + private readonly maxBufferedChunks: number; + private readonly maxBufferedBytes: number; + private readonly idleTimeoutMs: number; + private readonly probeTimeoutMs: number; + private readonly closeGracePeriodMs: number; + private readonly sendPing?: (nonce: string) => Promise; + private readonly sendPong?: (nonce: string) => Promise; + private readonly sendAbort?: (reason?: string) => Promise; + private bufferedBytes = 0; + private active = true; + private started = false; + private closedRemotely = false; + private closeState: CloseState | undefined; + private nextExpectedChunkIndex = 0; + private lastProgress = -1; + private terminalError: Error | undefined; + private controlNonce = 0; + private pendingProbeNonce: string | undefined; + private idleTimer: ReturnType | undefined; + private probeTimer: ReturnType | undefined; + private closeGraceTimer: ReturnType | undefined; + + constructor(options: OpenStreamSessionOptions) { + this.progressToken = options.progressToken; + this.maxBufferedChunks = options.maxBufferedChunks; + this.maxBufferedBytes = options.maxBufferedBytes; + this.idleTimeoutMs = + options.idleTimeoutMs ?? DEFAULT_OPEN_STREAM_IDLE_TIMEOUT_MS; + this.probeTimeoutMs = + options.probeTimeoutMs ?? DEFAULT_OPEN_STREAM_PROBE_TIMEOUT_MS; + this.closeGracePeriodMs = + options.closeGracePeriodMs ?? DEFAULT_OPEN_STREAM_CLOSE_GRACE_PERIOD_MS; + this.sendPing = options.sendPing; + this.sendPong = options.sendPong; + this.sendAbort = options.sendAbort; + this.onAbort = options.onAbort; + this.onClose = options.onClose; + this.closed = this.closeDeferred.promise; + } + + public get isActive(): boolean { + return this.active; + } + + public async abort(reason?: string): Promise { + if (!this.active) { + return; + } + + const error = new OpenStreamAbortError(this.progressToken, reason); + await this.finishAborted(error, reason, true); + } + + public dispose(): void { + this.finalize(); + } + + public async processFrame( + progress: number, + frame: OpenStreamFrame, + ): Promise { + this.assertActive(); + this.assertProgress(progress); + + switch (frame.frameType) { + case 'start': + if (this.started) { + throw new OpenStreamSequenceError( + `Duplicate start frame for stream ${this.progressToken}`, + ); + } + this.started = true; + this.refreshIdleTimer(); + return; + case 'accept': + this.refreshIdleTimer(); + return; + case 'ping': + this.assertStarted(); + this.refreshIdleTimer(); + await this.handlePing(frame); + return; + case 'pong': + this.assertStarted(); + this.refreshIdleTimer(); + this.handlePong(frame.nonce); + return; + case 'chunk': + this.assertStarted(); + this.bufferChunk(frame); + this.flushContiguousChunks(); + this.refreshIdleTimer(); + return; + case 'close': + this.assertStarted(); + this.closedRemotely = true; + this.closeState = { + expectedLastChunkIndex: frame.lastChunkIndex, + }; + this.flushContiguousChunks(); + this.refreshIdleTimer(); + this.maybeFinishGracefully(); + this.armCloseGraceTimer(); + return; + case 'abort': + this.refreshIdleTimer(); + await this.finishAborted( + new OpenStreamAbortError(this.progressToken, frame.reason), + frame.reason, + false, + ); + return; + default: + return; + } + } + + public [Symbol.asyncIterator](): AsyncIterator { + return { + next: async (): Promise> => { + if (this.queue.length > 0) { + const value = this.queue.shift(); + if (!value) { + return { done: true, value: undefined }; + } + + return { done: false, value }; + } + + if (!this.active) { + if (this.terminalError) { + throw this.terminalError; + } + + return { done: true, value: undefined }; + } + + const waiter = createDeferred>(); + this.waiters.push(waiter); + return waiter.promise; + }, + }; + } + + private assertActive(): void { + if (!this.active) { + throw new OpenStreamSequenceError( + `Received frame for inactive stream ${this.progressToken}`, + ); + } + } + + private assertStarted(): void { + if (!this.started) { + throw new OpenStreamSequenceError( + `Received non-start frame before start for ${this.progressToken}`, + ); + } + } + + private assertProgress(progress: number): void { + if (!Number.isFinite(progress) || progress <= this.lastProgress) { + throw new OpenStreamSequenceError( + `Non-increasing progress for stream ${this.progressToken}`, + ); + } + + this.lastProgress = progress; + } + + private bufferChunk(frame: OpenStreamChunkFrame): void { + if (!Number.isInteger(frame.chunkIndex) || frame.chunkIndex < 0) { + throw new OpenStreamSequenceError( + `Invalid chunkIndex for stream ${this.progressToken}`, + ); + } + + if (frame.chunkIndex < this.nextExpectedChunkIndex) { + throw new OpenStreamSequenceError( + `Stale chunkIndex ${frame.chunkIndex} for ${this.progressToken}`, + ); + } + + if (this.bufferedChunks.has(frame.chunkIndex)) { + throw new OpenStreamSequenceError( + `Duplicate chunkIndex ${frame.chunkIndex} for ${this.progressToken}`, + ); + } + + const chunkBytes = Buffer.byteLength(frame.data, 'utf8'); + if ( + this.bufferedChunks.size + this.queue.length >= + this.maxBufferedChunks + ) { + throw new OpenStreamSequenceError( + `Buffered chunk limit exceeded for stream ${this.progressToken}`, + ); + } + + if (this.bufferedBytes + chunkBytes > this.maxBufferedBytes) { + throw new OpenStreamSequenceError( + `Buffered byte limit exceeded for stream ${this.progressToken}`, + ); + } + + this.bufferedChunks.set(frame.chunkIndex, frame.data); + this.bufferedBytes += chunkBytes; + } + + private flushContiguousChunks(): void { + while (this.bufferedChunks.has(this.nextExpectedChunkIndex)) { + const data = this.bufferedChunks.get(this.nextExpectedChunkIndex); + if (typeof data !== 'string') { + break; + } + + this.bufferedChunks.delete(this.nextExpectedChunkIndex); + this.bufferedBytes -= Buffer.byteLength(data, 'utf8'); + this.emit({ value: data, chunkIndex: this.nextExpectedChunkIndex }); + this.nextExpectedChunkIndex += 1; + } + + if (this.closedRemotely) { + this.maybeFinishGracefully(); + } + } + + private emit(value: PendingChunk): void { + const waiter = this.waiters.shift(); + if (waiter) { + waiter.resolve({ done: false, value }); + return; + } + + this.queue.push(value); + } + + private maybeFinishGracefully(): void { + if (!this.closedRemotely || this.bufferedChunks.size > 0) { + return; + } + + const expectedLastChunkIndex = this.closeState?.expectedLastChunkIndex; + if (expectedLastChunkIndex !== undefined) { + if ( + !Number.isInteger(expectedLastChunkIndex) || + expectedLastChunkIndex < 0 + ) { + throw new OpenStreamSequenceError( + `Invalid lastChunkIndex for stream ${this.progressToken}`, + ); + } + + if (this.nextExpectedChunkIndex !== expectedLastChunkIndex + 1) { + throw new OpenStreamSequenceError( + `Incomplete stream for ${this.progressToken}: expected chunks through ${expectedLastChunkIndex}`, + ); + } + } + + void this.finishClosed(); + } + + private async handlePing(frame: OpenStreamPingFrame): Promise { + await this.sendPong?.(frame.nonce); + } + + private handlePong(nonce: string): void { + if (this.pendingProbeNonce !== nonce) { + return; + } + + this.pendingProbeNonce = undefined; + this.clearProbeTimer(); + } + + private refreshIdleTimer(): void { + if (!this.active || this.closedRemotely) { + return; + } + + this.clearIdleTimer(); + this.idleTimer = setTimeout(() => { + this.handleIdleTimeout().catch(() => undefined); + }, this.idleTimeoutMs); + } + + private async handleIdleTimeout(): Promise { + if (!this.active || this.closedRemotely || this.pendingProbeNonce) { + return; + } + + const nonce = this.nextControlNonce(); + this.pendingProbeNonce = nonce; + + try { + await this.sendPing?.(nonce); + } catch (error) { + await this.finishAborted( + error instanceof Error ? error : new Error(String(error)), + 'Failed to send keepalive ping', + false, + ); + return; + } + + this.clearProbeTimer(); + this.probeTimer = setTimeout(() => { + this.handleProbeTimeout(nonce).catch(() => undefined); + }, this.probeTimeoutMs); + } + + private async handleProbeTimeout(nonce: string): Promise { + if (!this.active || this.pendingProbeNonce !== nonce) { + return; + } + + await this.finishAborted( + new OpenStreamAbortError(this.progressToken, 'Probe timeout'), + 'Probe timeout', + true, + ); + } + + private armCloseGraceTimer(): void { + if ( + !this.active || + !this.closedRemotely || + this.bufferedChunks.size === 0 + ) { + return; + } + + this.clearCloseGraceTimer(); + this.closeGraceTimer = setTimeout(() => { + this.handleCloseGraceTimeout().catch(() => undefined); + }, this.closeGracePeriodMs); + } + + private async handleCloseGraceTimeout(): Promise { + if ( + !this.active || + !this.closedRemotely || + this.bufferedChunks.size === 0 + ) { + return; + } + + await this.finishAborted( + new OpenStreamAbortError( + this.progressToken, + 'Close grace period expired', + ), + 'Close grace period expired', + true, + ); + } + + private nextControlNonce(): string { + this.controlNonce += 1; + return `${this.progressToken}:${this.controlNonce}`; + } + + private clearIdleTimer(): void { + if (this.idleTimer) { + clearTimeout(this.idleTimer); + this.idleTimer = undefined; + } + } + + private clearProbeTimer(): void { + if (this.probeTimer) { + clearTimeout(this.probeTimer); + this.probeTimer = undefined; + } + } + + private clearCloseGraceTimer(): void { + if (this.closeGraceTimer) { + clearTimeout(this.closeGraceTimer); + this.closeGraceTimer = undefined; + } + } + + private clearTimers(): void { + this.clearIdleTimer(); + this.clearProbeTimer(); + this.clearCloseGraceTimer(); + this.pendingProbeNonce = undefined; + } + + private finalize(error?: Error): void { + if (!this.active) { + return; + } + + this.clearTimers(); + this.active = false; + this.terminalError = error; + + while (this.waiters.length > 0) { + const waiter = this.waiters.shift(); + if (!waiter) { + continue; + } + + if (error) { + waiter.reject(error); + } else { + waiter.resolve({ done: true, value: undefined }); + } + } + + if (error) { + this.closeDeferred.reject(error); + } else { + this.closeDeferred.resolve(undefined); + } + } + + private async finishClosed(): Promise { + this.finalize(); + await this.onClose?.(); + } + + private async finishAborted( + error: Error, + reason?: string, + publishAbort: boolean = false, + ): Promise { + this.finalize(error); + if (publishAbort) { + await this.sendAbort?.(reason); + } + await this.onAbort?.(reason); + } +} diff --git a/src/transport/open-stream/types.ts b/src/transport/open-stream/types.ts new file mode 100644 index 0000000..3fd6e2b --- /dev/null +++ b/src/transport/open-stream/types.ts @@ -0,0 +1,79 @@ +export type OpenStreamFrameType = + | 'start' + | 'accept' + | 'chunk' + | 'ping' + | 'pong' + | 'close' + | 'abort'; + +export type OpenStreamCommon = { + type: 'open-stream'; + frameType: OpenStreamFrameType; +}; + +export type OpenStreamStartFrame = OpenStreamCommon & { + frameType: 'start'; + contentType?: string; +}; + +export type OpenStreamAcceptFrame = OpenStreamCommon & { + frameType: 'accept'; +}; + +export type OpenStreamChunkFrame = OpenStreamCommon & { + frameType: 'chunk'; + chunkIndex: number; + data: string; +}; + +export type OpenStreamPingFrame = OpenStreamCommon & { + frameType: 'ping'; + nonce: string; +}; + +export type OpenStreamPongFrame = OpenStreamCommon & { + frameType: 'pong'; + nonce: string; +}; + +export type OpenStreamCloseFrame = OpenStreamCommon & { + frameType: 'close'; + lastChunkIndex?: number; +}; + +export type OpenStreamAbortFrame = OpenStreamCommon & { + frameType: 'abort'; + reason?: string; +}; + +export type OpenStreamFrame = + | OpenStreamStartFrame + | OpenStreamAcceptFrame + | OpenStreamChunkFrame + | OpenStreamPingFrame + | OpenStreamPongFrame + | OpenStreamCloseFrame + | OpenStreamAbortFrame; + +export type OpenStreamProgress = { + progressToken: string | number; + progress: number; + message?: string; + total?: number; + cvm: OpenStreamFrame; +}; + +export interface OpenStreamReadResult { + readonly value: TChunk; + readonly chunkIndex: number; +} + +export interface OpenStreamSessionLike extends AsyncIterable< + OpenStreamReadResult +> { + readonly progressToken: string; + readonly isActive: boolean; + readonly closed: Promise; + abort(reason?: string): Promise; +} diff --git a/src/transport/open-stream/writer.test.ts b/src/transport/open-stream/writer.test.ts new file mode 100644 index 0000000..7221214 --- /dev/null +++ b/src/transport/open-stream/writer.test.ts @@ -0,0 +1,140 @@ +import { describe, expect, test } from 'bun:test'; +import type { OpenStreamProgress } from './types.js'; +import { OpenStreamWriter } from './writer.js'; + +describe('OpenStreamWriter', () => { + test('emits ping and pong frames with matching nonce values', async () => { + const frames: OpenStreamProgress[] = []; + const writer = new OpenStreamWriter({ + progressToken: 'token-keepalive', + publishFrame: async (frame): Promise => { + frames.push(frame); + return undefined; + }, + }); + + await writer.start(); + await writer.ping(); + await writer.pong('keepalive-nonce'); + + expect(frames).toHaveLength(3); + expect(frames[1]).toMatchObject({ + progressToken: 'token-keepalive', + progress: 2, + cvm: { + type: 'open-stream', + frameType: 'ping', + nonce: 'token-keepalive:1', + }, + }); + expect(frames[2]).toMatchObject({ + progressToken: 'token-keepalive', + progress: 3, + cvm: { + type: 'open-stream', + frameType: 'pong', + nonce: 'keepalive-nonce', + }, + }); + }); + + test('omits lastChunkIndex on close when no chunks were written', async () => { + const frames: OpenStreamProgress[] = []; + const writer = new OpenStreamWriter({ + progressToken: 'token-empty-close', + publishFrame: async (frame): Promise => { + frames.push(frame); + return undefined; + }, + }); + + await writer.close(); + + expect(frames).toHaveLength(2); + expect(frames[1]).toMatchObject({ + cvm: { + type: 'open-stream', + frameType: 'close', + }, + }); + if (frames[1]?.cvm.frameType !== 'close') { + throw new Error('Expected close frame'); + } + expect('lastChunkIndex' in frames[1].cvm).toBe(false); + expect(frames[1].cvm.lastChunkIndex).toBeUndefined(); + }); + + test('includes lastChunkIndex on close after chunk writes', async () => { + const frames: OpenStreamProgress[] = []; + const writer = new OpenStreamWriter({ + progressToken: 'token-chunk-close', + publishFrame: async (frame): Promise => { + frames.push(frame); + return undefined; + }, + }); + + await writer.write('hello'); + await writer.write('world'); + await writer.close(); + + expect(frames).toHaveLength(4); + expect(frames[3]).toMatchObject({ + cvm: { + type: 'open-stream', + frameType: 'close', + lastChunkIndex: 1, + }, + }); + }); + + test('runs lifecycle hooks after terminal frames are published', async () => { + const lifecycle: string[] = []; + const frames: OpenStreamProgress[] = []; + const writer = new OpenStreamWriter({ + progressToken: 'token-hooks', + publishFrame: async (frame): Promise => { + frames.push(frame); + return undefined; + }, + onClose: async (): Promise => { + lifecycle.push('close'); + }, + onAbort: async (reason?: string): Promise => { + lifecycle.push(`abort:${reason ?? ''}`); + }, + }); + + await writer.close(); + + expect(frames[frames.length - 1]).toMatchObject({ + cvm: { + type: 'open-stream', + frameType: 'close', + }, + }); + expect(lifecycle).toEqual(['close']); + + const abortWriter = new OpenStreamWriter({ + progressToken: 'token-hooks-abort', + publishFrame: async (frame): Promise => { + frames.push(frame); + return undefined; + }, + onAbort: async (reason?: string): Promise => { + lifecycle.push(`abort:${reason ?? ''}`); + }, + }); + + await abortWriter.abort('done'); + + expect(frames[frames.length - 1]).toMatchObject({ + cvm: { + type: 'open-stream', + frameType: 'abort', + reason: 'done', + }, + }); + expect(lifecycle).toEqual(['close', 'abort:done']); + }); +}); diff --git a/src/transport/open-stream/writer.ts b/src/transport/open-stream/writer.ts new file mode 100644 index 0000000..cd1aee6 --- /dev/null +++ b/src/transport/open-stream/writer.ts @@ -0,0 +1,154 @@ +import type { OpenStreamProgress } from './types.js'; +import { + buildOpenStreamAbortFrame, + buildOpenStreamChunkFrame, + buildOpenStreamCloseFrame, + buildOpenStreamPingFrame, + buildOpenStreamPongFrame, + buildOpenStreamStartFrame, +} from './frames.js'; + +export type OpenStreamFramePublisher = ( + frame: OpenStreamProgress, +) => Promise; + +export interface OpenStreamWriterOptions { + progressToken: string; + publishFrame: OpenStreamFramePublisher; + contentType?: string; + onClose?: () => Promise; + onAbort?: (reason?: string) => Promise; +} + +/** + * Minimal CEP-41 writer/session for server-side production. + */ +export class OpenStreamWriter { + public readonly progressToken: string; + + private readonly publishFrame: OpenStreamFramePublisher; + private readonly contentType: string | undefined; + private readonly onClose?: () => Promise; + private readonly onAbort?: (reason?: string) => Promise; + private progress = 0; + private chunkIndex = 0; + private controlNonce = 0; + private started = false; + private active = true; + + constructor(options: OpenStreamWriterOptions) { + this.progressToken = options.progressToken; + this.publishFrame = options.publishFrame; + this.contentType = options.contentType; + this.onClose = options.onClose; + this.onAbort = options.onAbort; + } + + public get isActive(): boolean { + return this.active; + } + + public async start(): Promise { + if (this.started || !this.active) { + return; + } + + this.started = true; + await this.publishFrame( + buildOpenStreamStartFrame({ + progressToken: this.progressToken, + progress: this.nextProgress(), + contentType: this.contentType, + }), + ); + } + + public async write(data: string): Promise { + await this.start(); + if (!this.active) { + return; + } + + await this.publishFrame( + buildOpenStreamChunkFrame({ + progressToken: this.progressToken, + progress: this.nextProgress(), + chunkIndex: this.chunkIndex, + data, + }), + ); + this.chunkIndex += 1; + } + + public async ping(): Promise { + if (!this.active) { + return; + } + + const progress = this.nextProgress(); + await this.publishFrame( + buildOpenStreamPingFrame({ + progressToken: this.progressToken, + progress, + nonce: this.nextControlNonce(), + }), + ); + } + + public async pong(nonce: string): Promise { + if (!this.active) { + return; + } + + await this.publishFrame( + buildOpenStreamPongFrame({ + progressToken: this.progressToken, + progress: this.nextProgress(), + nonce, + }), + ); + } + + public async close(): Promise { + if (!this.active) { + return; + } + + await this.start(); + this.active = false; + await this.publishFrame( + buildOpenStreamCloseFrame({ + progressToken: this.progressToken, + progress: this.nextProgress(), + lastChunkIndex: this.chunkIndex > 0 ? this.chunkIndex - 1 : undefined, + }), + ); + await this.onClose?.(); + } + + public async abort(reason?: string): Promise { + if (!this.active) { + return; + } + + this.active = false; + await this.publishFrame( + buildOpenStreamAbortFrame({ + progressToken: this.progressToken, + progress: this.nextProgress(), + reason, + }), + ); + await this.onAbort?.(reason); + } + + private nextProgress(): number { + this.progress += 1; + return this.progress; + } + + private nextControlNonce(): string { + this.controlNonce += 1; + return `${this.progressToken}:${this.controlNonce}`; + } +} diff --git a/src/transport/server-transport-common-schemas.test.ts b/src/transport/server-transport-common-schemas.test.ts index 943bada..a1e4ab0 100644 --- a/src/transport/server-transport-common-schemas.test.ts +++ b/src/transport/server-transport-common-schemas.test.ts @@ -70,8 +70,6 @@ describe('createCommonSchemaToolsResultTransformer', () => { expect(bespokeTool?._meta?.[COMMON_SCHEMA_META_NAMESPACE]).toBeUndefined(); }); - - test('returns the original result when opted-in tools already carry the matching schema hash', () => { const schemaHash = computeCommonSchemaHash({ name: 'translate_text', @@ -185,8 +183,6 @@ describe('createCommonSchemaAnnouncementTagsProducer', () => { ]); }); - - test('reuses existing schemaHash metadata when producing announcement tags', () => { const result: ListToolsResult = { tools: [