From a709af0b3c975ff4da508e8c028a1566d386f6c4 Mon Sep 17 00:00:00 2001 From: ashKalor <74497194+ashkalor@users.noreply.github.com> Date: Wed, 4 Mar 2026 14:24:53 +0530 Subject: [PATCH 1/4] feat: add encoding levels (stringify/devalue/partial/passthrough) to RpcTransport --- src/batch.ts | 17 ++++--- src/index.ts | 8 ++-- src/messageport.ts | 18 ++++---- src/rpc.ts | 113 +++++++++++++++++++++++++++++++++++++-------- src/serialize.ts | 76 ++++++++++++++++++++++++++++-- src/websocket.ts | 11 +++-- 6 files changed, 196 insertions(+), 47 deletions(-) diff --git a/src/batch.ts b/src/batch.ts index 4f423c7..1fd4ace 100644 --- a/src/batch.ts +++ b/src/batch.ts @@ -4,11 +4,14 @@ import { RpcStub } from "./core.js"; import { RpcTransport, RpcSession, RpcSessionOptions } from "./rpc.js"; +import type { EncodingLevel } from "./serialize.js"; import type { IncomingMessage, ServerResponse, OutgoingHttpHeader, OutgoingHttpHeaders } from "node:http"; type SendBatchFunc = (batch: string[]) => Promise; class BatchClientTransport implements RpcTransport { + readonly encodingLevel: EncodingLevel = "stringify"; + constructor(sendBatch: SendBatchFunc) { this.#promise = this.#scheduleBatch(sendBatch); } @@ -19,16 +22,16 @@ class BatchClientTransport implements RpcTransport { #batchToSend: string[] | null = []; #batchToReceive: string[] | null = null; - async send(message: string): Promise { + async send(message: string | object): Promise { // If the batch was already sent, we just ignore the message, because throwing may cause the // RPC system to abort prematurely. Once the last receive() is done then we'll throw an error // that aborts the RPC system at the right time and will propagate to all other requests. if (this.#batchToSend !== null) { - this.#batchToSend.push(message); + this.#batchToSend.push(message as string); } } - async receive(): Promise { + async receive(): Promise { if (!this.#batchToReceive) { await this.#promise; } @@ -90,6 +93,8 @@ export function newHttpBatchRpcSession( } class BatchServerTransport implements RpcTransport { + readonly encodingLevel: EncodingLevel = "stringify"; + constructor(batch: string[]) { this.#batchToReceive = batch; } @@ -98,11 +103,11 @@ class BatchServerTransport implements RpcTransport { #batchToReceive: string[]; #allReceived: PromiseWithResolvers = Promise.withResolvers(); - async send(message: string): Promise { - this.#batchToSend.push(message); + async send(message: string | object): Promise { + this.#batchToSend.push(message as string); } - async receive(): Promise { + async receive(): Promise { let msg = this.#batchToReceive!.shift(); if (msg !== undefined) { return msg; diff --git a/src/index.ts b/src/index.ts index 10e4b7a..dc75882 100644 --- a/src/index.ts +++ b/src/index.ts @@ -3,8 +3,8 @@ // https://opensource.org/license/mit import { RpcTarget as RpcTargetImpl, RpcStub as RpcStubImpl, RpcPromise as RpcPromiseImpl } from "./core.js"; -import { serialize, deserialize } from "./serialize.js"; -import { RpcTransport, RpcSession as RpcSessionImpl, RpcSessionOptions } from "./rpc.js"; +import { serialize, deserialize, EncodingLevel } from "./serialize.js"; +import { RpcTransport, RpcSession as RpcSessionImpl, RpcSessionOptions, wrapTransport } from "./rpc.js"; import { RpcTargetBranded, RpcCompatible, Stub, Stubify, __RPC_TARGET_BRAND } from "./types.js"; import { newWebSocketRpcSession as newWebSocketRpcSessionImpl, newWorkersWebSocketRpcResponse } from "./websocket.js"; @@ -19,8 +19,8 @@ forceInitStreams(); // Re-export public API types. export { serialize, deserialize, newWorkersWebSocketRpcResponse, newHttpBatchRpcResponse, - nodeHttpBatchRpcResponse }; -export type { RpcTransport, RpcSessionOptions, RpcCompatible }; + nodeHttpBatchRpcResponse, wrapTransport }; +export type { RpcTransport, RpcSessionOptions, RpcCompatible, EncodingLevel }; // Hack the type system to make RpcStub's types work nicely! /** diff --git a/src/messageport.ts b/src/messageport.ts index f9b5a3c..79e684f 100644 --- a/src/messageport.ts +++ b/src/messageport.ts @@ -4,6 +4,7 @@ import { RpcStub } from "./core.js"; import { RpcTransport, RpcSession, RpcSessionOptions } from "./rpc.js"; +import type { EncodingLevel } from "./serialize.js"; // Start a MessagePort session given a MessagePort or a pair of MessagePorts. // @@ -17,6 +18,8 @@ export function newMessagePortRpcSession( } class MessagePortTransport implements RpcTransport { + readonly encodingLevel: EncodingLevel = "passthrough"; + constructor (port: MessagePort) { this.#port = port; @@ -29,7 +32,8 @@ class MessagePortTransport implements RpcTransport { } else if (event.data === null) { // Peer is signaling that they're closing the connection this.#receivedError(new Error("Peer closed MessagePort connection.")); - } else if (typeof event.data === "string") { + } else { + // Accept any structured-clonable data if (this.#receiveResolver) { this.#receiveResolver(event.data); this.#receiveResolver = undefined; @@ -37,8 +41,6 @@ class MessagePortTransport implements RpcTransport { } else { this.#receiveQueue.push(event.data); } - } else { - this.#receivedError(new TypeError("Received non-string message from MessagePort.")); } }); @@ -48,25 +50,25 @@ class MessagePortTransport implements RpcTransport { } #port: MessagePort; - #receiveResolver?: (message: string) => void; + #receiveResolver?: (message: string | object) => void; #receiveRejecter?: (err: any) => void; - #receiveQueue: string[] = []; + #receiveQueue: (string | object)[] = []; #error?: any; - async send(message: string): Promise { + async send(message: string | object): Promise { if (this.#error) { throw this.#error; } this.#port.postMessage(message); } - async receive(): Promise { + async receive(): Promise { if (this.#receiveQueue.length > 0) { return this.#receiveQueue.shift()!; } else if (this.#error) { throw this.#error; } else { - return new Promise((resolve, reject) => { + return new Promise((resolve, reject) => { this.#receiveResolver = resolve; this.#receiveRejecter = reject; }); diff --git a/src/rpc.ts b/src/rpc.ts index 4801a27..f542f4d 100644 --- a/src/rpc.ts +++ b/src/rpc.ts @@ -3,17 +3,27 @@ // https://opensource.org/license/mit import { StubHook, RpcPayload, RpcStub, PropertyPath, PayloadStubHook, ErrorStubHook, RpcTarget, unwrapStubAndPath, streamImpl } from "./core.js"; -import { Devaluator, Evaluator, ExportId, ImportId, Exporter, Importer, serialize } from "./serialize.js"; +import { Devaluator, Evaluator, ExportId, ImportId, Exporter, Importer, serialize, EncodingLevel } from "./serialize.js"; /** * Interface for an RPC transport, which is a simple bidirectional message stream. Implement this * interface if the built-in transports (e.g. for HTTP batch and WebSocket) don't meet your needs. */ export interface RpcTransport { + /** + * The encoding level this transport works with. Defaults to "stringify" if not specified. + * + * - "stringify": Transport receives/sends JSON strings. + * - "devalue": Transport receives/sends JS objects (JSON-compatible). + * - "partial": Transport receives/sends JS objects with raw Uint8Array. + * - "passthrough": Transport receives/sends structured-clonable objects. + */ + readonly encodingLevel?: EncodingLevel; + /** * Sends a message to the other end. */ - send(message: string): Promise; + send(message: string | object): Promise; /** * Receives a message sent by the other end. @@ -23,7 +33,7 @@ export interface RpcTransport { * If there are no outstanding calls (and none are made in the future), then the error does not * propagate anywhere -- this is considered a "clean" shutdown. */ - receive(): Promise; + receive(): Promise; /** * Indicates that the RPC system has suffered an error that prevents the session from continuing. @@ -35,6 +45,44 @@ export interface RpcTransport { abort?(reason: any): void; } +/** + * Wraps a transport with custom encode/decode functions, allowing you to use binary formats + * like CBOR or MessagePack. + * + * @param inner The underlying transport to wrap. + * @param encode Function to encode outgoing messages. + * @param decode Function to decode incoming messages. + * @param level The encoding level this wrapped transport operates at. + * + * @example + * ```ts + * import * as cbor from "cbor-x"; + * + * // Wrap a WebSocket transport with CBOR encoding + * const cborTransport = wrapTransport( + * rawWebSocketTransport, + * (msg) => cbor.encode(msg), + * (data) => cbor.decode(data as Uint8Array), + * "partial" // Uint8Array stays raw for CBOR + * ); + * + * const session = new RpcSession(cborTransport, myApi); + * ``` + */ +export function wrapTransport( + inner: RpcTransport, + encode: (value: string | object) => string | object, + decode: (data: string | object) => string | object, + level: EncodingLevel +): RpcTransport { + return { + encodingLevel: level, + async send(message: string | object) { await inner.send(encode(message)); }, + async receive() { return decode(await inner.receive()); }, + abort: inner.abort?.bind(inner), + }; +} + // Entry on the exports table. type ExportTableEntry = { hook: StubHook, @@ -340,8 +388,12 @@ class RpcSessionImpl implements Importer, Exporter { // may be deleted from the middle (hence leaving the array sparse). onBrokenCallbacks: ((error: any) => void)[] = []; + // Encoding level from the transport (defaults to "stringify") + private encodingLevel: EncodingLevel; + constructor(private transport: RpcTransport, mainHook: StubHook, private options: RpcSessionOptions) { + this.encodingLevel = transport.encodingLevel ?? "stringify"; // Export zero is automatically the bootstrap object. this.exports.push({hook: mainHook, refcount: 1}); @@ -460,12 +512,12 @@ class RpcSessionImpl implements Importer, Exporter { payload => { // We don't transfer ownership of stubs in the payload since the payload // belongs to the hook which sticks around to handle pipelined requests. - let value = Devaluator.devaluate(payload.value, undefined, this, payload); + let value = Devaluator.devaluate(payload.value, undefined, this, payload, this.encodingLevel); this.send(["resolve", exportId, value]); if (autoRelease) this.releaseExport(exportId, 1); }, error => { - this.send(["reject", exportId, Devaluator.devaluate(error, undefined, this)]); + this.send(["reject", exportId, Devaluator.devaluate(error, undefined, this, undefined, this.encodingLevel)]); if (autoRelease) this.releaseExport(exportId, 1); } ).catch( @@ -473,7 +525,7 @@ class RpcSessionImpl implements Importer, Exporter { // If serialization failed, report the serialization error, which should // itself always be serializable. try { - this.send(["reject", exportId, Devaluator.devaluate(error, undefined, this)]); + this.send(["reject", exportId, Devaluator.devaluate(error, undefined, this, undefined, this.encodingLevel)]); if (autoRelease) this.releaseExport(exportId, 1); } catch (error2) { // TODO: Shouldn't happen, now what? @@ -567,9 +619,26 @@ class RpcSessionImpl implements Importer, Exporter { return 0; } - let msgText: string; + let encoded: string | object; + let msgLength: number; try { - msgText = JSON.stringify(msg); + // Only stringify at "stringify" level; otherwise pass object directly + if (this.encodingLevel === "stringify") { + let msgText = JSON.stringify(msg); + encoded = msgText; + msgLength = msgText.length; + } else { + encoded = msg; + // For non-stringify levels, estimate size for flow control. + // This may fail if msg contains non-JSON types (Uint8Array, BigInt, etc.) + // In that case, use a rough estimate. + try { + msgLength = JSON.stringify(msg).length; + } catch { + // Rough estimate: 100 bytes per top-level message element + msgLength = Array.isArray(msg) ? msg.length * 100 : 100; + } + } } catch (err) { // If JSON stringification failed, there's something wrong with the devaluator, as it should // not allow non-JSONable values to be injected in the first place. @@ -577,12 +646,12 @@ class RpcSessionImpl implements Importer, Exporter { throw err; } - this.transport.send(msgText) + this.transport.send(encoded) // If send fails, abort the connection, but don't try to send an abort message since // that'll probably also fail. .catch(err => this.abort(err, false)); - return msgText.length; + return msgLength; } sendCall(id: ImportId, path: PropertyPath, args?: RpcPayload): RpcImportHook { @@ -590,7 +659,7 @@ class RpcSessionImpl implements Importer, Exporter { let value: Array = ["pipeline", id, path]; if (args) { - let devalue = Devaluator.devaluate(args.value, undefined, this, args); + let devalue = Devaluator.devaluate(args.value, undefined, this, args, this.encodingLevel); // HACK: Since the args is an array, devaluator will wrap in a second array. Need to unwrap. // TODO: Clean this up somehow. @@ -611,7 +680,7 @@ class RpcSessionImpl implements Importer, Exporter { if (this.abortReason) throw this.abortReason; let value: Array = ["pipeline", id, path]; - let devalue = Devaluator.devaluate(args.value, undefined, this, args); + let devalue = Devaluator.devaluate(args.value, undefined, this, args, this.encodingLevel); // HACK: Since the args is an array, devaluator will wrap in a second array. Need to unwrap. // TODO: Clean this up somehow. @@ -688,8 +757,9 @@ class RpcSessionImpl implements Importer, Exporter { if (trySendAbortMessage) { try { - this.transport.send(JSON.stringify(["abort", Devaluator - .devaluate(error, undefined, this)])) + let abortMsg = ["abort", Devaluator.devaluate(error, undefined, this, undefined, this.encodingLevel)]; + let encoded: string | object = this.encodingLevel === "stringify" ? JSON.stringify(abortMsg) : abortMsg; + this.transport.send(encoded) .catch(err => {}); } catch (err) { // ignore, probably the whole reason we're aborting is because the transport is broken @@ -736,14 +806,17 @@ class RpcSessionImpl implements Importer, Exporter { private async readLoop(abortPromise: Promise) { while (!this.abortReason) { - let msg = JSON.parse(await Promise.race([this.transport.receive(), abortPromise])); + let raw = await Promise.race([this.transport.receive(), abortPromise]); if (this.abortReason) break; // check again before processing + // Only parse JSON at "stringify" level; otherwise message is already an object + let msg = this.encodingLevel === "stringify" ? JSON.parse(raw as string) : raw; + if (msg instanceof Array) { switch (msg[0]) { case "push": // ["push", Expression] if (msg.length > 1) { - let payload = new Evaluator(this).evaluate(msg[1]); + let payload = new Evaluator(this, this.encodingLevel).evaluate(msg[1]); let hook = new PayloadStubHook(payload); // It's possible for a rejection to occur before the client gets a chance to send @@ -802,11 +875,11 @@ class RpcSessionImpl implements Importer, Exporter { let imp = this.imports[importId]; if (imp) { if (msg[0] == "resolve") { - imp.resolve(new PayloadStubHook(new Evaluator(this).evaluate(msg[2]))); + imp.resolve(new PayloadStubHook(new Evaluator(this, this.encodingLevel).evaluate(msg[2]))); } else { // HACK: We expect errors are always simple values (no stubs) so we can just // pull the value out of the payload. - let payload = new Evaluator(this).evaluate(msg[2]); + let payload = new Evaluator(this, this.encodingLevel).evaluate(msg[2]); payload.dispose(); // just in case -- should be no-op imp.resolve(new ErrorStubHook(payload.value)); } @@ -817,7 +890,7 @@ class RpcSessionImpl implements Importer, Exporter { if (msg[0] == "resolve") { // We need to evaluate the resolution and immediately dispose it so that we // release any stubs it contains. - new Evaluator(this).evaluate(msg[2]).dispose(); + new Evaluator(this, this.encodingLevel).evaluate(msg[2]).dispose(); } } continue; @@ -836,7 +909,7 @@ class RpcSessionImpl implements Importer, Exporter { } case "abort": { - let payload = new Evaluator(this).evaluate(msg[1]); + let payload = new Evaluator(this, this.encodingLevel).evaluate(msg[1]); payload.dispose(); // just in case -- should be no-op this.abort(payload, false); break; diff --git a/src/serialize.ts b/src/serialize.ts index bea1282..37b9c4c 100644 --- a/src/serialize.ts +++ b/src/serialize.ts @@ -7,6 +7,26 @@ import { StubHook, RpcPayload, typeForRpc, RpcStub, RpcPromise, LocatedPromise, export type ImportId = number; export type ExportId = number; +/** + * Encoding levels determine how much pre-processing the RPC system does before handing + * messages to the transport. + * + * - `"stringify"`: Full JSON encoding (string output). Default, used by HTTP batch. + * - `"devalue"`: JS object tree with all types encoded (JSON-compatible). For custom encoders. + * - `"partial"`: Like devalue but Uint8Array stays raw. For CBOR/MessagePack. + * - `"passthrough"`: Only encode stubs/functions, pass native types through. For MessagePort. + * + * @example + * ```ts + * // What happens to Uint8Array([1, 2, 3]) at each level: + * "stringify" → '["bytes","AQID"]' // JSON string with base64 + * "devalue" → ["bytes", "AQID"] // JS array with base64 + * "partial" → ["bytes", Uint8Array] // JS array with raw bytes + * "passthrough" → ["bytes", Uint8Array] // + Date, BigInt, Error stay native + * ``` + */ +export type EncodingLevel = "stringify" | "devalue" | "partial" | "passthrough"; + // ======================================================================================= export interface Exporter { @@ -73,7 +93,11 @@ interface FromBase64 { // actually converting to a string. (The name is meant to be the opposite of "Evaluator", which // implements the opposite direction.) export class Devaluator { - private constructor(private exporter: Exporter, private source: RpcPayload | undefined) {} + private constructor( + private exporter: Exporter, + private source: RpcPayload | undefined, + private encodingLevel: EncodingLevel + ) {} // Devaluate the given value. // * value: The value to devaluate. @@ -81,12 +105,15 @@ export class Devaluator { // as a function. // * exporter: Callbacks to the RPC session for exporting capabilities found in this message. // * source: The RpcPayload which contains the value, and therefore owns stubs within. + // * encodingLevel: How much encoding to apply (default "stringify"). // - // Returns: The devaluated value, ready to be JSON-serialized. + // Returns: The devaluated value, ready to be JSON-serialized (or passed to transport directly + // for non-stringify levels). public static devaluate( - value: unknown, parent?: object, exporter: Exporter = NULL_EXPORTER, source?: RpcPayload) + value: unknown, parent?: object, exporter: Exporter = NULL_EXPORTER, source?: RpcPayload, + encodingLevel: EncodingLevel = "stringify") : unknown { - let devaluator = new Devaluator(exporter, source); + let devaluator = new Devaluator(exporter, source, encodingLevel); try { return devaluator.devaluateImpl(value, parent, 0); } catch (err) { @@ -123,6 +150,10 @@ export class Devaluator { case "primitive": if (typeof value === "number" && !isFinite(value)) { + // At passthrough level, keep Infinity/NaN as native values + if (this.encodingLevel === "passthrough") { + return value; + } if (value === Infinity) { return ["inf"]; } else if (value === -Infinity) { @@ -156,13 +187,26 @@ export class Devaluator { } case "bigint": + // At passthrough level, keep BigInt as native value + if (this.encodingLevel === "passthrough") { + return value; + } return ["bigint", (value).toString()]; case "date": + // At passthrough level, keep Date as native value + if (this.encodingLevel === "passthrough") { + return value; + } return ["date", (value).getTime()]; case "bytes": { let bytes = value as Uint8Array; + // At passthrough or partial level, keep Uint8Array raw + if (this.encodingLevel === "passthrough" || this.encodingLevel === "partial") { + return ["bytes", bytes]; + } + // Otherwise encode as base64 if (bytes.toBase64) { return ["bytes", bytes.toBase64({omitPadding: true})]; } else { @@ -311,6 +355,11 @@ export class Devaluator { e = rewritten; } + // At passthrough level, keep Error as native value (still call onSendError above) + if (this.encodingLevel === "passthrough") { + return rewritten || value; + } + let result = ["error", e.name, e.message]; if (rewritten && rewritten.stack) { result.push(rewritten.stack); @@ -319,6 +368,10 @@ export class Devaluator { } case "undefined": + // At passthrough level, keep undefined as native value + if (this.encodingLevel === "passthrough") { + return undefined; + } return ["undefined"]; case "stub": @@ -465,7 +518,7 @@ function fixBrokenRequestBody(request: Request, body: ReadableStream): RpcPromis // delivery to the app. This is used to implement deserialization, except that it doesn't actually // start from a raw string. export class Evaluator { - constructor(private importer: Importer) {} + constructor(private importer: Importer, private encodingLevel: EncodingLevel = "stringify") {} private hooks: StubHook[] = []; private promises: LocatedPromise[] = []; @@ -487,6 +540,14 @@ export class Evaluator { } private evaluateImpl(value: unknown, parent: object, property: string | number): unknown { + // At passthrough level, native types come through directly + if (this.encodingLevel === "passthrough" || this.encodingLevel === "partial") { + if (value instanceof Date || value instanceof Uint8Array || + value instanceof Error || typeof value === "bigint") { + return value; + } + } + if (value instanceof Array) { if (value.length == 1 && value[0] instanceof Array) { // Escaped array. Evaluate the contents. @@ -507,6 +568,11 @@ export class Evaluator { } break; case "bytes": { + // At partial/passthrough level, bytes may already be a Uint8Array + if (value[1] instanceof Uint8Array) { + return value[1]; + } + // Otherwise decode from base64 let b64 = Uint8Array as FromBase64; if (typeof value[1] == "string") { if (b64.fromBase64) { diff --git a/src/websocket.ts b/src/websocket.ts index 32dbefa..37e4666 100644 --- a/src/websocket.ts +++ b/src/websocket.ts @@ -6,6 +6,7 @@ import { RpcStub } from "./core.js"; import { RpcTransport, RpcSession, RpcSessionOptions } from "./rpc.js"; +import type { EncodingLevel } from "./serialize.js"; export function newWebSocketRpcSession( webSocket: WebSocket | string, localMain?: any, options?: RpcSessionOptions): RpcStub { @@ -39,6 +40,8 @@ export function newWorkersWebSocketRpcResponse( } class WebSocketTransport implements RpcTransport { + readonly encodingLevel: EncodingLevel = "stringify"; + constructor (webSocket: WebSocket) { this.#webSocket = webSocket; @@ -88,16 +91,16 @@ class WebSocketTransport implements RpcTransport { #receiveQueue: string[] = []; #error?: any; - async send(message: string): Promise { + async send(message: string | object): Promise { if (this.#sendQueue === undefined) { - this.#webSocket.send(message); + this.#webSocket.send(message as string); } else { // Not open yet, queue for later. - this.#sendQueue.push(message); + this.#sendQueue.push(message as string); } } - async receive(): Promise { + async receive(): Promise { if (this.#receiveQueue.length > 0) { return this.#receiveQueue.shift()!; } else if (this.#error) { From c8e4bdbf8535da41bc94bf2d88076a9bdce60569 Mon Sep 17 00:00:00 2001 From: ashKalor <74497194+ashkalor@users.noreply.github.com> Date: Wed, 4 Mar 2026 14:37:08 +0530 Subject: [PATCH 2/4] fix: Add missing encoding level to stream message handling, Remove redundant JSON.stringify --- src/rpc.ts | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/src/rpc.ts b/src/rpc.ts index f542f4d..082c741 100644 --- a/src/rpc.ts +++ b/src/rpc.ts @@ -629,15 +629,10 @@ class RpcSessionImpl implements Importer, Exporter { msgLength = msgText.length; } else { encoded = msg; - // For non-stringify levels, estimate size for flow control. - // This may fail if msg contains non-JSON types (Uint8Array, BigInt, etc.) - // In that case, use a rough estimate. - try { - msgLength = JSON.stringify(msg).length; - } catch { - // Rough estimate: 100 bytes per top-level message element - msgLength = Array.isArray(msg) ? msg.length * 100 : 100; - } + // For non-stringify levels, use a rough estimate for flow control. + // Avoid JSON.stringify since it would fail on non-JSON types (Uint8Array, BigInt, etc.) + // and defeats the purpose of not stringifying. + msgLength = Array.isArray(msg) ? msg.length * 100 : 100; } } catch (err) { // If JSON stringification failed, there's something wrong with the devaluator, as it should @@ -835,7 +830,7 @@ class RpcSessionImpl implements Importer, Exporter { // - The export is automatically considered "pulled". // - Once the "resolve" is sent, the export is implicitly released. if (msg.length > 1) { - let payload = new Evaluator(this).evaluate(msg[1]); + let payload = new Evaluator(this, this.encodingLevel).evaluate(msg[1]); let hook = new PayloadStubHook(payload); hook.ignoreUnhandledRejections(); From eddff0467074e93d34c35b72bbe43f025d3a320e Mon Sep 17 00:00:00 2001 From: ashKalor <74497194+ashkalor@users.noreply.github.com> Date: Sat, 7 Mar 2026 21:48:32 +0530 Subject: [PATCH 3/4] chore: Update readme to include a section for encoding levels --- README.md | 61 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/README.md b/README.md index 3f436fc..2e033f4 100644 --- a/README.md +++ b/README.md @@ -736,3 +736,64 @@ let stub: RemoteMainInterface = session.getRemoteMain(); ``` Note that sessions are entirely symmetric: neither side is defined as the "client" nor the "server". Each side can optionally expose a "main interface" to the other. In typical scenarios with a logical client and server, the server exposes a main interface but the client does not. + +### Encoding Levels + +Transports can operate at different encoding levels, controlling how messages are serialized: + +| Level | Message Format | Use Case | +| --------------- | ------------------------------- | ------------------------------- | +| `"stringify"` | JSON string | HTTP batch, WebSocket (default) | +| `"devalue"` | JS object (JSON-compatible) | Custom JSON-like encoders | +| `"partial"` | JS object with raw `Uint8Array` | CBOR, MessagePack | +| `"passthrough"` | Structured-clonable object | MessagePort, `postMessage()` | + +**Default behavior:** Existing code works unchanged. WebSocket and HTTP batch use `"stringify"`. MessagePort automatically uses `"passthrough"` for efficient structured cloning. + +```ts +// MessagePort: Uint8Array passed directly via structured clone, no base64 overhead +const channel = new MessageChannel(); +newMessagePortRpcSession(channel.port1, new FileService()); +const stub = newMessagePortRpcSession(channel.port2); +const contents = await stub.getFileContents("/path"); // Uint8Array transferred efficiently +``` + +**Binary encoding (CBOR/MessagePack):** Use `wrapTransport()` to add encoding at the `"partial"` level: + +```ts +import { wrapTransport, RpcSession } from "capnweb"; +import * as cbor from "cbor-x"; + +const rawTransport = createWebSocketTransport(url); +const cborTransport = wrapTransport( + rawTransport, + (msg) => cbor.encode(msg), + (data) => cbor.decode(data), + "partial" // Keeps Uint8Array raw for CBOR +); + +const session = new RpcSession(cborTransport); +``` + +**Custom transports:** Declare `encodingLevel` to tell the RPC system what format you expect: + +```ts +class MyBinaryTransport implements RpcTransport { + readonly encodingLevel: EncodingLevel = "partial"; + + async send(message: object): Promise { + // message is JS object; Uint8Array values are raw, not base64 + await this.connection.write(myEncoder.encode(message)); + } + + async receive(): Promise { + return myDecoder.decode(await this.connection.read()); + } +} +``` + +What happens to `Uint8Array([1, 2, 3])` at each level: +- `"stringify"` → `'["bytes","AQID"]'` (JSON string) +- `"devalue"` → `["bytes", "AQID"]` (JS object) +- `"partial"` → `["bytes", Uint8Array([1,2,3])]` (raw binary) +- `"passthrough"` → `["bytes", Uint8Array([1,2,3])]` (also preserves Date, BigInt, Error) From 519c2aea3173bba793964bfdeab23eddbcc9e03b Mon Sep 17 00:00:00 2001 From: ashKalor <74497194+ashkalor@users.noreply.github.com> Date: Sun, 8 Mar 2026 17:57:05 +0530 Subject: [PATCH 4/4] fix: Update encodinglevels + retain type safety --- README.md | 103 +++++++++++---------------- __tests__/index.test.ts | 6 +- src/batch.ts | 18 ++--- src/index.ts | 9 +-- src/messageport.ts | 21 +++--- src/rpc.ts | 152 +++++++++++++++++++--------------------- src/serialize.ts | 54 +++++++------- src/websocket.ts | 45 +++++++----- 8 files changed, 190 insertions(+), 218 deletions(-) diff --git a/README.md b/README.md index 2e033f4..fc9958a 100644 --- a/README.md +++ b/README.md @@ -692,27 +692,36 @@ Note that you should not use a `Window` object itself as a port for RPC -- you s ### Custom transports -You can implement a custom RPC transport across any bidirectional stream. To do so, implement the interface `RpcTransport`, which is defined as follows: +You can implement a custom RPC transport across any bidirectional stream. For string-based transports, implement `RpcTransport`: ```ts -// Interface for an RPC transport, which is a simple bidirectional message stream. export interface RpcTransport { - // Sends a message to the other end. - send(message: string): Promise; + // Sends a JSON string message. Returns the byte size of the message. + // Transport errors should be propagated via receive() rejecting. + send(message: string): number; // Receives a message sent by the other end. - // - // If and when the transport becomes disconnected, this will reject. The thrown error will be - // propagated to all outstanding calls and future calls on any stubs associated with the session. - // If there are no outstanding calls (and none are made in the future), then the error does not - // propagate anywhere -- this is considered a "clean" shutdown. receive(): Promise; - // Indicates that the RPC system has suffered an error that prevents the session from continuing. - // The transport should ideally try to send any queued messages if it can, and then close the - // connection. (It's not strictly necessary to deliver queued messages, but the last message sent - // before abort() is called is often an "abort" message, which communicates the error to the - // peer, so if that is dropped, the peer may have less information about what happened.) + // Called when the RPC system needs to abort the session. + abort?(reason: any): void; +} +``` + +For transports with custom encoding (CBOR, MessagePack, structured clone, etc.), implement `RpcTransportWithCustomEncoding`: + +```ts +export interface RpcTransportWithCustomEncoding { + // Declares what encoding level this transport uses. + readonly encodingLevel: "json" | "jsonWithBytes" | "structuredClone"; + + // Encodes and sends a message. Returns the encoded byte size if known + // (for flow control), or void if unavailable (e.g. structured clone). + send(message: unknown): number | void; + + // Receives and decodes a message. + receive(): Promise; + abort?(reason: any): void; } ``` @@ -735,65 +744,33 @@ let stub: RemoteMainInterface = session.getRemoteMain(); // Now we can call methods on the stub. ``` -Note that sessions are entirely symmetric: neither side is defined as the "client" nor the "server". Each side can optionally expose a "main interface" to the other. In typical scenarios with a logical client and server, the server exposes a main interface but the client does not. - -### Encoding Levels - -Transports can operate at different encoding levels, controlling how messages are serialized: +Note that sessions are entirely symmetric: neither side is defined as the "client" nor the "server". Each side can optionally expose a "main interface" to the other. In typical scenarios with a logical client and server, the server exposes a main interface but the client does not.ś -| Level | Message Format | Use Case | -| --------------- | ------------------------------- | ------------------------------- | -| `"stringify"` | JSON string | HTTP batch, WebSocket (default) | -| `"devalue"` | JS object (JSON-compatible) | Custom JSON-like encoders | -| `"partial"` | JS object with raw `Uint8Array` | CBOR, MessagePack | -| `"passthrough"` | Structured-clonable object | MessagePort, `postMessage()` | +#### Custom encoding levels -**Default behavior:** Existing code works unchanged. WebSocket and HTTP batch use `"stringify"`. MessagePort automatically uses `"passthrough"` for efficient structured cloning. +By default, `RpcTransport` sends and receives JSON strings. To use a binary format like CBOR or MessagePack, implement `RpcTransportWithCustomEncoding` instead, which declares an `encodingLevel` so the RPC system knows how much serialization to do before handing messages to the transport: ```ts -// MessagePort: Uint8Array passed directly via structured clone, no base64 overhead -const channel = new MessageChannel(); -newMessagePortRpcSession(channel.port1, new FileService()); -const stub = newMessagePortRpcSession(channel.port2); -const contents = await stub.getFileContents("/path"); // Uint8Array transferred efficiently -``` - -**Binary encoding (CBOR/MessagePack):** Use `wrapTransport()` to add encoding at the `"partial"` level: - -```ts -import { wrapTransport, RpcSession } from "capnweb"; +import { RpcTransportWithCustomEncoding, RpcSession } from "capnweb"; import * as cbor from "cbor-x"; -const rawTransport = createWebSocketTransport(url); -const cborTransport = wrapTransport( - rawTransport, - (msg) => cbor.encode(msg), - (data) => cbor.decode(data), - "partial" // Keeps Uint8Array raw for CBOR -); - -const session = new RpcSession(cborTransport); -``` +class CborTransport implements RpcTransportWithCustomEncoding { + readonly encodingLevel = "jsonWithBytes"; // Uint8Array stays raw for CBOR -**Custom transports:** Declare `encodingLevel` to tell the RPC system what format you expect: - -```ts -class MyBinaryTransport implements RpcTransport { - readonly encodingLevel: EncodingLevel = "partial"; - - async send(message: object): Promise { - // message is JS object; Uint8Array values are raw, not base64 - await this.connection.write(myEncoder.encode(message)); + send(msg: unknown): number { + const encoded = cbor.encode(msg); + this.ws.send(encoded); + return encoded.byteLength; } - async receive(): Promise { - return myDecoder.decode(await this.connection.read()); + async receive(): Promise { + return cbor.decode(new Uint8Array(await this.nextMessage())); } + + abort(reason: any) { this.ws.close(3000, String(reason)); } } + +const session = new RpcSession(new CborTransport(ws)); ``` -What happens to `Uint8Array([1, 2, 3])` at each level: -- `"stringify"` → `'["bytes","AQID"]'` (JSON string) -- `"devalue"` → `["bytes", "AQID"]` (JS object) -- `"partial"` → `["bytes", Uint8Array([1,2,3])]` (raw binary) -- `"passthrough"` → `["bytes", Uint8Array([1,2,3])]` (also preserves Date, BigInt, Error) +The available encoding levels are `"json"` (JSON-compatible object tree), `"jsonWithBytes"` (same but `Uint8Array` stays raw), and `"structuredClone"` (native types like `Date`, `BigInt`, `Error` also pass through). The built-in `MessagePort` transport uses `"structuredClone"` automatically. diff --git a/__tests__/index.test.ts b/__tests__/index.test.ts index 35c68c2..f82767b 100644 --- a/__tests__/index.test.ts +++ b/__tests__/index.test.ts @@ -144,7 +144,7 @@ class TestTransport implements RpcTransport { public log = false; private fenced = false; - async send(message: string): Promise { + send(message: string): void { // HACK: If the string "$remove$" appears in the message, remove it. This is used in some // tests to hack the RPC protocol. message = message.replaceAll("$remove$", ""); @@ -1846,7 +1846,7 @@ describe("WritableStream over RPC", () => { // Collect all messages sent by the server (which appear in the client's queue). let serverMessages: any[] = []; let origServerSend = harness.serverTransport.send; - harness.serverTransport.send = async function(message: string) { + harness.serverTransport.send = function(message: string) { serverMessages.push(JSON.parse(message)); return origServerSend.call(this, message); }; @@ -1854,7 +1854,7 @@ describe("WritableStream over RPC", () => { // Collect all messages sent by the client (which appear in the server's queue). let clientMessages: any[] = []; let origClientSend = harness.clientTransport.send; - harness.clientTransport.send = async function(message: string) { + harness.clientTransport.send = function(message: string) { clientMessages.push(JSON.parse(message)); return origClientSend.call(this, message); }; diff --git a/src/batch.ts b/src/batch.ts index 1fd4ace..81927cf 100644 --- a/src/batch.ts +++ b/src/batch.ts @@ -4,14 +4,11 @@ import { RpcStub } from "./core.js"; import { RpcTransport, RpcSession, RpcSessionOptions } from "./rpc.js"; -import type { EncodingLevel } from "./serialize.js"; import type { IncomingMessage, ServerResponse, OutgoingHttpHeader, OutgoingHttpHeaders } from "node:http"; type SendBatchFunc = (batch: string[]) => Promise; class BatchClientTransport implements RpcTransport { - readonly encodingLevel: EncodingLevel = "stringify"; - constructor(sendBatch: SendBatchFunc) { this.#promise = this.#scheduleBatch(sendBatch); } @@ -22,16 +19,16 @@ class BatchClientTransport implements RpcTransport { #batchToSend: string[] | null = []; #batchToReceive: string[] | null = null; - async send(message: string | object): Promise { + send(message: string): void { // If the batch was already sent, we just ignore the message, because throwing may cause the // RPC system to abort prematurely. Once the last receive() is done then we'll throw an error // that aborts the RPC system at the right time and will propagate to all other requests. if (this.#batchToSend !== null) { - this.#batchToSend.push(message as string); + this.#batchToSend.push(message); } } - async receive(): Promise { + async receive(): Promise { if (!this.#batchToReceive) { await this.#promise; } @@ -93,8 +90,6 @@ export function newHttpBatchRpcSession( } class BatchServerTransport implements RpcTransport { - readonly encodingLevel: EncodingLevel = "stringify"; - constructor(batch: string[]) { this.#batchToReceive = batch; } @@ -103,11 +98,12 @@ class BatchServerTransport implements RpcTransport { #batchToReceive: string[]; #allReceived: PromiseWithResolvers = Promise.withResolvers(); - async send(message: string | object): Promise { - this.#batchToSend.push(message as string); + send(message: string): number { + this.#batchToSend.push(message); + return message.length; } - async receive(): Promise { + async receive(): Promise { let msg = this.#batchToReceive!.shift(); if (msg !== undefined) { return msg; diff --git a/src/index.ts b/src/index.ts index dc75882..5dfc35a 100644 --- a/src/index.ts +++ b/src/index.ts @@ -4,7 +4,7 @@ import { RpcTarget as RpcTargetImpl, RpcStub as RpcStubImpl, RpcPromise as RpcPromiseImpl } from "./core.js"; import { serialize, deserialize, EncodingLevel } from "./serialize.js"; -import { RpcTransport, RpcSession as RpcSessionImpl, RpcSessionOptions, wrapTransport } from "./rpc.js"; +import { RpcTransport, RpcTransportWithCustomEncoding, AnyRpcTransport, RpcSession as RpcSessionImpl, RpcSessionOptions } from "./rpc.js"; import { RpcTargetBranded, RpcCompatible, Stub, Stubify, __RPC_TARGET_BRAND } from "./types.js"; import { newWebSocketRpcSession as newWebSocketRpcSessionImpl, newWorkersWebSocketRpcResponse } from "./websocket.js"; @@ -19,8 +19,9 @@ forceInitStreams(); // Re-export public API types. export { serialize, deserialize, newWorkersWebSocketRpcResponse, newHttpBatchRpcResponse, - nodeHttpBatchRpcResponse, wrapTransport }; -export type { RpcTransport, RpcSessionOptions, RpcCompatible, EncodingLevel }; + nodeHttpBatchRpcResponse }; +export type { RpcTransport, RpcTransportWithCustomEncoding, AnyRpcTransport, + RpcSessionOptions, RpcCompatible, EncodingLevel }; // Hack the type system to make RpcStub's types work nicely! /** @@ -76,7 +77,7 @@ export interface RpcSession = undefined> { } export const RpcSession: { new = undefined>( - transport: RpcTransport, localMain?: any, options?: RpcSessionOptions): RpcSession; + transport: AnyRpcTransport, localMain?: any, options?: RpcSessionOptions): RpcSession; } = RpcSessionImpl; // RpcTarget needs some hackage too to brand it properly and account for the implementation diff --git a/src/messageport.ts b/src/messageport.ts index 79e684f..2086e40 100644 --- a/src/messageport.ts +++ b/src/messageport.ts @@ -3,8 +3,7 @@ // https://opensource.org/license/mit import { RpcStub } from "./core.js"; -import { RpcTransport, RpcSession, RpcSessionOptions } from "./rpc.js"; -import type { EncodingLevel } from "./serialize.js"; +import { RpcTransportWithCustomEncoding, RpcSession, RpcSessionOptions } from "./rpc.js"; // Start a MessagePort session given a MessagePort or a pair of MessagePorts. // @@ -17,8 +16,8 @@ export function newMessagePortRpcSession( return rpc.getRemoteMain(); } -class MessagePortTransport implements RpcTransport { - readonly encodingLevel: EncodingLevel = "passthrough"; +class MessagePortTransport implements RpcTransportWithCustomEncoding { + readonly encodingLevel = "structuredClone" as const; constructor (port: MessagePort) { this.#port = port; @@ -50,32 +49,32 @@ class MessagePortTransport implements RpcTransport { } #port: MessagePort; - #receiveResolver?: (message: string | object) => void; + #receiveResolver?: (message: unknown) => void; #receiveRejecter?: (err: any) => void; - #receiveQueue: (string | object)[] = []; + #receiveQueue: unknown[] = []; #error?: any; - async send(message: string | object): Promise { + send(message: unknown): void { if (this.#error) { throw this.#error; } this.#port.postMessage(message); } - async receive(): Promise { + async receive(): Promise { if (this.#receiveQueue.length > 0) { return this.#receiveQueue.shift()!; } else if (this.#error) { throw this.#error; } else { - return new Promise((resolve, reject) => { + return new Promise((resolve, reject) => { this.#receiveResolver = resolve; this.#receiveRejecter = reject; }); } } - abort?(reason: any): void { + abort(reason: any): void { // Send close signal to peer before closing try { this.#port.postMessage(null); @@ -101,4 +100,4 @@ class MessagePortTransport implements RpcTransport { } } } -} \ No newline at end of file +} diff --git a/src/rpc.ts b/src/rpc.ts index 082c741..dcc3a31 100644 --- a/src/rpc.ts +++ b/src/rpc.ts @@ -6,24 +6,15 @@ import { StubHook, RpcPayload, RpcStub, PropertyPath, PayloadStubHook, ErrorStub import { Devaluator, Evaluator, ExportId, ImportId, Exporter, Importer, serialize, EncodingLevel } from "./serialize.js"; /** - * Interface for an RPC transport, which is a simple bidirectional message stream. Implement this - * interface if the built-in transports (e.g. for HTTP batch and WebSocket) don't meet your needs. + * Interface for a string-based RPC transport. This is the default transport type — no + * `encodingLevel` field is needed. Messages are JSON strings. Implement this interface if the + * built-in transports (e.g. for HTTP batch and WebSocket) don't meet your needs. */ export interface RpcTransport { - /** - * The encoding level this transport works with. Defaults to "stringify" if not specified. - * - * - "stringify": Transport receives/sends JSON strings. - * - "devalue": Transport receives/sends JS objects (JSON-compatible). - * - "partial": Transport receives/sends JS objects with raw Uint8Array. - * - "passthrough": Transport receives/sends structured-clonable objects. - */ - readonly encodingLevel?: EncodingLevel; - /** * Sends a message to the other end. */ - send(message: string | object): Promise; + send(message: string): void; /** * Receives a message sent by the other end. @@ -33,7 +24,7 @@ export interface RpcTransport { * If there are no outstanding calls (and none are made in the future), then the error does not * propagate anywhere -- this is considered a "clean" shutdown. */ - receive(): Promise; + receive(): Promise; /** * Indicates that the RPC system has suffered an error that prevents the session from continuing. @@ -46,43 +37,51 @@ export interface RpcTransport { } /** - * Wraps a transport with custom encode/decode functions, allowing you to use binary formats - * like CBOR or MessagePack. - * - * @param inner The underlying transport to wrap. - * @param encode Function to encode outgoing messages. - * @param decode Function to decode incoming messages. - * @param level The encoding level this wrapped transport operates at. - * - * @example - * ```ts - * import * as cbor from "cbor-x"; - * - * // Wrap a WebSocket transport with CBOR encoding - * const cborTransport = wrapTransport( - * rawWebSocketTransport, - * (msg) => cbor.encode(msg), - * (data) => cbor.decode(data as Uint8Array), - * "partial" // Uint8Array stays raw for CBOR - * ); - * - * const session = new RpcSession(cborTransport, myApi); - * ``` + * Interface for a transport with custom binary encoding (e.g. CBOR, MessagePack). The transport + * is responsible for encoding/decoding messages and reporting the encoded byte size for flow + * control. */ -export function wrapTransport( - inner: RpcTransport, - encode: (value: string | object) => string | object, - decode: (data: string | object) => string | object, - level: EncodingLevel -): RpcTransport { - return { - encodingLevel: level, - async send(message: string | object) { await inner.send(encode(message)); }, - async receive() { return decode(await inner.receive()); }, - abort: inner.abort?.bind(inner), - }; +export interface RpcTransportWithCustomEncoding { + /** + * The encoding level this transport works with. + * + * - "json": Transport encodes/decodes JS objects (JSON-compatible). + * - "jsonWithBytes": Like "json" but Uint8Array values are left raw (not base64-encoded). + * - "structuredClone": Native types like Date, BigInt, Error pass through (e.g. MessagePort). + */ + readonly encodingLevel: "json" | "jsonWithBytes" | "structuredClone"; + + /** + * Encodes and sends a message to the other end. Returns the encoded byte size if known + * (for flow control), or void if the size is unavailable (e.g. structured clone transports). + * When void is returned, stream writes are serialized (no overlapping) instead of using + * window-based flow control. Send errors should be propagated via `receive()` rejecting. + */ + send(message: unknown): number | void; + + /** + * Receives and decodes a message sent by the other end. + * + * If and when the transport becomes disconnected, this will reject. The thrown error will be + * propagated to all outstanding calls and future calls on any stubs associated with the session. + * If there are no outstanding calls (and none are made in the future), then the error does not + * propagate anywhere -- this is considered a "clean" shutdown. + */ + receive(): Promise; + + /** + * Indicates that the RPC system has suffered an error that prevents the session from continuing. + * The transport should ideally try to send any queued messages if it can, and then close the + * connection. (It's not strictly necessary to deliver queued messages, but the last message sent + * before abort() is called is often an "abort" message, which communicates the error to the + * peer, so if that is dropped, the peer may have less information about what happened.) + */ + abort?(reason: any): void; } +/** Any supported transport type. */ +export type AnyRpcTransport = RpcTransport | RpcTransportWithCustomEncoding; + // Entry on the exports table. type ExportTableEntry = { hook: StubHook, @@ -388,12 +387,12 @@ class RpcSessionImpl implements Importer, Exporter { // may be deleted from the middle (hence leaving the array sparse). onBrokenCallbacks: ((error: any) => void)[] = []; - // Encoding level from the transport (defaults to "stringify") + // Encoding level from the transport (defaults to "string") private encodingLevel: EncodingLevel; - constructor(private transport: RpcTransport, mainHook: StubHook, + constructor(private transport: AnyRpcTransport, mainHook: StubHook, private options: RpcSessionOptions) { - this.encodingLevel = transport.encodingLevel ?? "stringify"; + this.encodingLevel = 'encodingLevel' in transport ? transport.encodingLevel : "string"; // Export zero is automatically the bootstrap object. this.exports.push({hook: mainHook, refcount: 1}); @@ -612,41 +611,32 @@ class RpcSessionImpl implements Importer, Exporter { return importId; } - // Serializes and sends a message. Returns the byte length of the serialized message. - private send(msg: any): number { + // Serializes and sends a message. Returns the byte length of the serialized message, + // or undefined if the transport doesn't report size (e.g. structured clone). + private send(msg: any): number | void { if (this.abortReason !== undefined) { // Ignore sends after we've aborted. return 0; } - let encoded: string | object; - let msgLength: number; try { - // Only stringify at "stringify" level; otherwise pass object directly - if (this.encodingLevel === "stringify") { + if (this.encodingLevel === "string") { + // Stringify and send via string transport. We know the size from the string length. let msgText = JSON.stringify(msg); - encoded = msgText; - msgLength = msgText.length; + (this.transport as RpcTransport).send(msgText); + return msgText.length; } else { - encoded = msg; - // For non-stringify levels, use a rough estimate for flow control. - // Avoid JSON.stringify since it would fail on non-JSON types (Uint8Array, BigInt, etc.) - // and defeats the purpose of not stringifying. - msgLength = Array.isArray(msg) ? msg.length * 100 : 100; + // Custom encoding transport encodes and returns the actual encoded size, + // or void if size is unavailable (e.g. structured clone). + return (this.transport as RpcTransportWithCustomEncoding).send(msg); } } catch (err) { // If JSON stringification failed, there's something wrong with the devaluator, as it should - // not allow non-JSONable values to be injected in the first place. + // not allow non-JSONable values to be injected in the first place. If send() threw, the + // transport is broken. Either way, abort the session. try { this.abort(err); } catch (err2) {} throw err; } - - this.transport.send(encoded) - // If send fails, abort the connection, but don't try to send an abort message since - // that'll probably also fail. - .catch(err => this.abort(err, false)); - - return msgLength; } sendCall(id: ImportId, path: PropertyPath, args?: RpcPayload): RpcImportHook { @@ -671,7 +661,7 @@ class RpcSessionImpl implements Importer, Exporter { } sendStream(id: ImportId, path: PropertyPath, args: RpcPayload) - : {promise: Promise, size: number} { + : {promise: Promise, size?: number} { if (this.abortReason) throw this.abortReason; let value: Array = ["pipeline", id, path]; @@ -681,7 +671,7 @@ class RpcSessionImpl implements Importer, Exporter { // TODO: Clean this up somehow. value.push((>devalue)[0]); - let size = this.send(["stream", value]); + let size = this.send(["stream", value]) ?? undefined; // Create the import entry in "already pulling" state (pulling=true), since stream messages // are automatically pulled. Set remoteRefcount to 0 so that resolve() won't send a release @@ -753,9 +743,11 @@ class RpcSessionImpl implements Importer, Exporter { if (trySendAbortMessage) { try { let abortMsg = ["abort", Devaluator.devaluate(error, undefined, this, undefined, this.encodingLevel)]; - let encoded: string | object = this.encodingLevel === "stringify" ? JSON.stringify(abortMsg) : abortMsg; - this.transport.send(encoded) - .catch(err => {}); + if (this.encodingLevel === "string") { + (this.transport as RpcTransport).send(JSON.stringify(abortMsg)); + } else { + (this.transport as RpcTransportWithCustomEncoding).send(abortMsg); + } } catch (err) { // ignore, probably the whole reason we're aborting is because the transport is broken } @@ -804,8 +796,8 @@ class RpcSessionImpl implements Importer, Exporter { let raw = await Promise.race([this.transport.receive(), abortPromise]); if (this.abortReason) break; // check again before processing - // Only parse JSON at "stringify" level; otherwise message is already an object - let msg = this.encodingLevel === "stringify" ? JSON.parse(raw as string) : raw; + // Only parse JSON at "string" level; otherwise message is already an object + let msg = this.encodingLevel === "string" ? JSON.parse(raw as string) : raw; if (msg instanceof Array) { switch (msg[0]) { @@ -947,7 +939,7 @@ export class RpcSession { #session: RpcSessionImpl; #mainStub: RpcStub; - constructor(transport: RpcTransport, localMain?: any, options: RpcSessionOptions = {}) { + constructor(transport: AnyRpcTransport, localMain?: any, options: RpcSessionOptions = {}) { let mainHook: StubHook; if (localMain) { mainHook = new PayloadStubHook(RpcPayload.fromAppReturn(localMain)); diff --git a/src/serialize.ts b/src/serialize.ts index 37b9c4c..c9608ef 100644 --- a/src/serialize.ts +++ b/src/serialize.ts @@ -11,21 +11,21 @@ export type ExportId = number; * Encoding levels determine how much pre-processing the RPC system does before handing * messages to the transport. * - * - `"stringify"`: Full JSON encoding (string output). Default, used by HTTP batch. - * - `"devalue"`: JS object tree with all types encoded (JSON-compatible). For custom encoders. - * - `"partial"`: Like devalue but Uint8Array stays raw. For CBOR/MessagePack. - * - `"passthrough"`: Only encode stubs/functions, pass native types through. For MessagePort. + * - `"string"`: Full JSON encoding (string output). Default, used by HTTP batch. + * - `"json"`: JS object tree with all types encoded (JSON-compatible). For custom encoders. + * - `"jsonWithBytes"`: Like json but Uint8Array stays raw. For CBOR/MessagePack. + * - `"structuredClone"`: Only encode stubs/functions, pass native types through. For MessagePort. * * @example * ```ts * // What happens to Uint8Array([1, 2, 3]) at each level: - * "stringify" → '["bytes","AQID"]' // JSON string with base64 - * "devalue" → ["bytes", "AQID"] // JS array with base64 - * "partial" → ["bytes", Uint8Array] // JS array with raw bytes - * "passthrough" → ["bytes", Uint8Array] // + Date, BigInt, Error stay native + * "string" → '["bytes","AQID"]' // JSON string with base64 + * "json" → ["bytes", "AQID"] // JS array with base64 + * "jsonWithBytes" → ["bytes", Uint8Array] // JS array with raw bytes + * "structuredClone" → ["bytes", Uint8Array] // + Date, BigInt, Error stay native * ``` */ -export type EncodingLevel = "stringify" | "devalue" | "partial" | "passthrough"; +export type EncodingLevel = "string" | "json" | "jsonWithBytes" | "structuredClone"; // ======================================================================================= @@ -105,13 +105,13 @@ export class Devaluator { // as a function. // * exporter: Callbacks to the RPC session for exporting capabilities found in this message. // * source: The RpcPayload which contains the value, and therefore owns stubs within. - // * encodingLevel: How much encoding to apply (default "stringify"). + // * encodingLevel: How much encoding to apply (default "string"). // // Returns: The devaluated value, ready to be JSON-serialized (or passed to transport directly - // for non-stringify levels). + // for non-string levels). public static devaluate( value: unknown, parent?: object, exporter: Exporter = NULL_EXPORTER, source?: RpcPayload, - encodingLevel: EncodingLevel = "stringify") + encodingLevel: EncodingLevel = "string") : unknown { let devaluator = new Devaluator(exporter, source, encodingLevel); try { @@ -151,7 +151,7 @@ export class Devaluator { case "primitive": if (typeof value === "number" && !isFinite(value)) { // At passthrough level, keep Infinity/NaN as native values - if (this.encodingLevel === "passthrough") { + if (this.encodingLevel === "structuredClone") { return value; } if (value === Infinity) { @@ -187,23 +187,23 @@ export class Devaluator { } case "bigint": - // At passthrough level, keep BigInt as native value - if (this.encodingLevel === "passthrough") { + // At structuredClone level, keep BigInt as native value + if (this.encodingLevel === "structuredClone") { return value; } return ["bigint", (value).toString()]; case "date": - // At passthrough level, keep Date as native value - if (this.encodingLevel === "passthrough") { + // At structuredClone level, keep Date as native value + if (this.encodingLevel === "structuredClone") { return value; } return ["date", (value).getTime()]; case "bytes": { let bytes = value as Uint8Array; - // At passthrough or partial level, keep Uint8Array raw - if (this.encodingLevel === "passthrough" || this.encodingLevel === "partial") { + // At structuredClone or jsonWithBytes level, keep Uint8Array raw + if (this.encodingLevel === "structuredClone" || this.encodingLevel === "jsonWithBytes") { return ["bytes", bytes]; } // Otherwise encode as base64 @@ -355,8 +355,8 @@ export class Devaluator { e = rewritten; } - // At passthrough level, keep Error as native value (still call onSendError above) - if (this.encodingLevel === "passthrough") { + // At structuredClone level, keep Error as native value (still call onSendError above) + if (this.encodingLevel === "structuredClone") { return rewritten || value; } @@ -368,8 +368,8 @@ export class Devaluator { } case "undefined": - // At passthrough level, keep undefined as native value - if (this.encodingLevel === "passthrough") { + // At structuredClone level, keep undefined as native value + if (this.encodingLevel === "structuredClone") { return undefined; } return ["undefined"]; @@ -518,7 +518,7 @@ function fixBrokenRequestBody(request: Request, body: ReadableStream): RpcPromis // delivery to the app. This is used to implement deserialization, except that it doesn't actually // start from a raw string. export class Evaluator { - constructor(private importer: Importer, private encodingLevel: EncodingLevel = "stringify") {} + constructor(private importer: Importer, private encodingLevel: EncodingLevel = "string") {} private hooks: StubHook[] = []; private promises: LocatedPromise[] = []; @@ -540,8 +540,8 @@ export class Evaluator { } private evaluateImpl(value: unknown, parent: object, property: string | number): unknown { - // At passthrough level, native types come through directly - if (this.encodingLevel === "passthrough" || this.encodingLevel === "partial") { + // At structuredClone level, native types come through directly + if (this.encodingLevel === "structuredClone" || this.encodingLevel === "jsonWithBytes") { if (value instanceof Date || value instanceof Uint8Array || value instanceof Error || typeof value === "bigint") { return value; @@ -568,7 +568,7 @@ export class Evaluator { } break; case "bytes": { - // At partial/passthrough level, bytes may already be a Uint8Array + // At jsonWithBytes/structuredClone level, bytes may already be a Uint8Array if (value[1] instanceof Uint8Array) { return value[1]; } diff --git a/src/websocket.ts b/src/websocket.ts index 37e4666..b081766 100644 --- a/src/websocket.ts +++ b/src/websocket.ts @@ -6,7 +6,6 @@ import { RpcStub } from "./core.js"; import { RpcTransport, RpcSession, RpcSessionOptions } from "./rpc.js"; -import type { EncodingLevel } from "./serialize.js"; export function newWebSocketRpcSession( webSocket: WebSocket | string, localMain?: any, options?: RpcSessionOptions): RpcStub { @@ -39,12 +38,17 @@ export function newWorkersWebSocketRpcResponse( }); } -class WebSocketTransport implements RpcTransport { - readonly encodingLevel: EncodingLevel = "stringify"; - +/** + * Generic WebSocket transport. Default `T = string` is backward-compatible and satisfies + * `RpcTransport`. Use `T = ArrayBuffer` as a building block for binary transports. + */ +export class WebSocketTransport { constructor (webSocket: WebSocket) { this.#webSocket = webSocket; + // Always set binaryType — harmless for string mode, required for ArrayBuffer mode. + webSocket.binaryType = "arraybuffer"; + if (webSocket.readyState === WebSocket.CONNECTING) { this.#sendQueue = []; webSocket.addEventListener("open", event => { @@ -62,16 +66,16 @@ class WebSocketTransport implements RpcTransport { webSocket.addEventListener("message", (event: MessageEvent) => { if (this.#error) { // Ignore further messages. - } else if (typeof event.data === "string") { + } else if (typeof event.data === "string" || event.data instanceof ArrayBuffer) { if (this.#receiveResolver) { - this.#receiveResolver(event.data); + this.#receiveResolver(event.data as T); this.#receiveResolver = undefined; this.#receiveRejecter = undefined; } else { - this.#receiveQueue.push(event.data); + this.#receiveQueue.push(event.data as T); } } else { - this.#receivedError(new TypeError("Received non-string message from WebSocket.")); + this.#receivedError(new TypeError("Received unexpected message type from WebSocket.")); } }); @@ -85,35 +89,35 @@ class WebSocketTransport implements RpcTransport { } #webSocket: WebSocket; - #sendQueue?: string[]; // only if not opened yet - #receiveResolver?: (message: string) => void; + #sendQueue?: T[]; // only if not opened yet + #receiveResolver?: (message: T) => void; #receiveRejecter?: (err: any) => void; - #receiveQueue: string[] = []; + #receiveQueue: T[] = []; #error?: any; - async send(message: string | object): Promise { + send(message: T): void { if (this.#sendQueue === undefined) { - this.#webSocket.send(message as string); + this.#webSocket.send(message); } else { // Not open yet, queue for later. - this.#sendQueue.push(message as string); + this.#sendQueue.push(message); } } - async receive(): Promise { + receive(): Promise { if (this.#receiveQueue.length > 0) { - return this.#receiveQueue.shift()!; + return Promise.resolve(this.#receiveQueue.shift()!); } else if (this.#error) { - throw this.#error; + return Promise.reject(this.#error); } else { - return new Promise((resolve, reject) => { + return new Promise((resolve, reject) => { this.#receiveResolver = resolve; this.#receiveRejecter = reject; }); } } - abort?(reason: any): void { + abort(reason: any): void { let message: string; if (reason instanceof Error) { message = reason.message; @@ -139,3 +143,6 @@ class WebSocketTransport implements RpcTransport { } } } + +// WebSocketTransport satisfies RpcTransport (can't use `implements` on generic class). +const _typeCheck: RpcTransport = null! as WebSocketTransport;