feat: add encoding levels to RPC transport #144
ashkalor wants to merge 4 commits into cloudflare:main
Having some examples in the README.md would be appreciated 🙏.
Hey just added them, let me know if it looks good. Didn't initially add them because I was expecting some kind of feedback.
kentonv left a comment
Really like where this is going! Getting the types right seems a bit tricky.
README.md (outdated)

### Encoding Levels

Transports can operate at different encoding levels, controlling how messages are serialized:

| Level           | Message Format                  | Use Case                        |
| --------------- | ------------------------------- | ------------------------------- |
| `"stringify"`   | JSON string                     | HTTP batch, WebSocket (default) |
| `"devalue"`     | JS object (JSON-compatible)     | Custom JSON-like encoders       |
| `"partial"`     | JS object with raw `Uint8Array` | CBOR, MessagePack               |
| `"passthrough"` | Structured-clonable object      | MessagePort, `postMessage()`    |

**Default behavior:** Existing code works unchanged. WebSocket and HTTP batch use `"stringify"`. MessagePort automatically uses `"passthrough"` for efficient structured cloning.

```ts
// MessagePort: Uint8Array passed directly via structured clone, no base64 overhead
const channel = new MessageChannel();
newMessagePortRpcSession(channel.port1, new FileService());
const stub = newMessagePortRpcSession<FileService>(channel.port2);
const contents = await stub.getFileContents("/path"); // Uint8Array transferred efficiently
```

**Binary encoding (CBOR/MessagePack):** Use `wrapTransport()` to add encoding at the `"partial"` level:

```ts
import { wrapTransport, RpcSession } from "capnweb";
import * as cbor from "cbor-x";

const rawTransport = createWebSocketTransport(url);
const cborTransport = wrapTransport(
  rawTransport,
  (msg) => cbor.encode(msg),
  (data) => cbor.decode(data),
  "partial" // Keeps Uint8Array raw for CBOR
);

const session = new RpcSession<MyApi>(cborTransport);
```

**Custom transports:** Declare `encodingLevel` to tell the RPC system what format you expect:

```ts
class MyBinaryTransport implements RpcTransport {
  readonly encodingLevel: EncodingLevel = "partial";

  async send(message: object): Promise<void> {
    // message is JS object; Uint8Array values are raw, not base64
    await this.connection.write(myEncoder.encode(message));
  }

  async receive(): Promise<object> {
    return myDecoder.decode(await this.connection.read());
  }
}
```

What happens to `Uint8Array([1, 2, 3])` at each level:
- `"stringify"` → `'["bytes","AQID"]'` (JSON string)
- `"devalue"` → `["bytes", "AQID"]` (JS object)
- `"partial"` → `["bytes", Uint8Array([1,2,3])]` (raw binary)
- `"passthrough"` → `["bytes", Uint8Array([1,2,3])]` (also preserves Date, BigInt, Error)
kentonv:
This section is written more like a change description than documentation. Phrases like "existing code works unchanged" don't really make sense in a README. I also think there's too much information here -- this is an obscure feature most readers don't need to know so much about.
Let's just add this to the end of the previous section ("Custom Transports"):
By default, `send()` accepts a string, and `receive()` returns a string, with Cap'n Web handling the encoding all the way to and from strings. However, transports that want more control over the serialization can declare the property `encodingLevel` to control just how much encoding Cap'n Web does before passing off the message:
* `"string"`: The default. Messages are strings.
* `"json"`: Messages are JSON-compatible objects. The transport is responsible for serializing/deserializing.
* `"jsonWithBytes"`: Like `"json"` except that byte arrays are left as `Uint8Array` instead of base64-encoded. Handy for use with serializations like CBOR or MessagePack that support this efficiently.
* `"structuredClone"`: Messages are structured-clonable objects. Cap'n Web will only implement special handling of RPC stubs. This is useful when the transport is a `MessagePort` or similar.
Also let's change the encodingLevel names to these, I think they are more understandable.
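To make the four proposed levels concrete, here is a minimal sketch of what a transport might be handed for a single `Uint8Array` at each level. The `["bytes", ...]` wrapper and the base64 output follow the README draft quoted in this thread. The names `encodeBytes` and `toBase64` are hypothetical, and treating `"structuredClone"` the same as `"jsonWithBytes"` for byte arrays is an assumption taken from that draft, not confirmed behavior.

```typescript
// Sketch only: these names are illustrative and not part of the Cap'n Web API.
type EncodingLevel = "string" | "json" | "jsonWithBytes" | "structuredClone";

const B64 = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

// Dependency-free base64 encoder so the sketch runs in any JS runtime.
function toBase64(bytes: Uint8Array): string {
  let out = "";
  for (let i = 0; i < bytes.length; i += 3) {
    const b0 = bytes[i];
    const b1 = i + 1 < bytes.length ? bytes[i + 1] : 0;
    const b2 = i + 2 < bytes.length ? bytes[i + 2] : 0;
    out += B64[b0 >> 2];
    out += B64[((b0 & 3) << 4) | (b1 >> 4)];
    out += i + 1 < bytes.length ? B64[((b1 & 15) << 2) | (b2 >> 6)] : "=";
    out += i + 2 < bytes.length ? B64[b2 & 63] : "=";
  }
  return out;
}

// Returns what the transport's send() would receive for one byte array.
function encodeBytes(bytes: Uint8Array, level: EncodingLevel): unknown {
  switch (level) {
    case "string":
      // Fully serialized: the transport only ever sees a string.
      return JSON.stringify(["bytes", toBase64(bytes)]);
    case "json":
      // JSON-compatible object: bytes are still base64 text.
      return ["bytes", toBase64(bytes)];
    case "jsonWithBytes":
    case "structuredClone":
      // Assumption from the README draft: raw bytes are passed through as-is.
      return ["bytes", bytes];
  }
}
```

With `new Uint8Array([1, 2, 3])` as input, the `"string"` level yields the JSON text `["bytes","AQID"]`, matching the README draft, while the last two levels keep the original `Uint8Array` instance.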
ashkalor:
I fixed this. I still think having at least one example for custom encoding levels is needed; otherwise most people might not be able to figure this out at a glance.
src/rpc.ts (outdated)
 * const session = new RpcSession(cborTransport, myApi);
 * ```
 */
export function wrapTransport(
kentonv:
I don't think this wrapping is sound. For use with CBOR, you have to assume that the underlying transport supports Uint8Array and will pass it through, which is true of the WebSocket transport essentially by coincidence, but wouldn't be expected to be supported by an arbitrary transport.
IMO we shouldn't offer this function. It's not really adding much anyway -- it's not that hard to just write out the wrapper manually.
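For reference, a hand-written wrapper along these lines might look like the following minimal sketch. It assumes the underlying transport explicitly declares that it carries bytes (the hypothetical `ByteTransport` interface) instead of relying on an arbitrary transport passing `Uint8Array` through. `EncodedTransport`, `ByteTransport`, and the `encode`/`decode` parameters are illustrative names, and the `encodingLevel` value uses the naming proposed earlier in this review.

```typescript
// Hypothetical: a transport that is explicitly typed to carry raw bytes.
interface ByteTransport {
  send(data: Uint8Array): Promise<void>;
  receive(): Promise<Uint8Array>;
  abort?(reason: unknown): void;
}

// Manual wrapper: the RPC layer hands over objects with raw Uint8Array values,
// and this class turns them into bytes using a caller-supplied codec.
class EncodedTransport {
  readonly encodingLevel = "jsonWithBytes" as const;
  private inner: ByteTransport;
  private encode: (message: unknown) => Uint8Array;
  private decode: (data: Uint8Array) => unknown;

  constructor(
    inner: ByteTransport,
    encode: (message: unknown) => Uint8Array,
    decode: (data: Uint8Array) => unknown,
  ) {
    this.inner = inner;
    this.encode = encode;
    this.decode = decode;
  }

  async send(message: unknown): Promise<void> {
    await this.inner.send(this.encode(message));
  }

  async receive(): Promise<unknown> {
    return this.decode(await this.inner.receive());
  }

  abort(reason: unknown): void {
    // Forward aborts only if the underlying transport supports them.
    this.inner.abort?.(reason);
  }
}
```

A CBOR variant would pass a CBOR library's encode and decode functions as the two codec arguments. Because the byte requirement is part of `ByteTransport`'s type, a string-only transport cannot be wrapped by accident.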
src/rpc.ts (outdated)
   * Sends a message to the other end.
   */
- send(message: string): Promise<void>;
+ send(message: string | object): Promise<void>;
kentonv:
Hmm, it's disappointing that we lose type safety here. It'd be nice to fix that.
One way would be like:
    export type RpcTransport = {
      encodingLevel?: "string";
      send(message: string): Promise<void>;
      receive(): Promise<string>;
      abort?(reason: any): void;
    } | {
      encodingLevel: "json" | "jsonWithBytes" | "structuredClone";
      send(message: unknown): Promise<void>;
      receive(): Promise<unknown>;
      abort?(reason: any): void;
    }
But that breaks anyone who uses `implements RpcTransport` today, because `RpcTransport` is no longer an interface.
Another approach might be:
    export interface RpcTransport<Level extends EncodingLevel = "string"> {
      readonly encodingLevel: Level;
      send(message: Level extends "string" ? string : unknown): Promise<void>;
      receive(): Promise<Level extends "string" ? string : unknown>;
      abort?(reason: any): void;
    }
But this forces the level to be written twice, and there's no apparent way to make `encodingLevel` optional in the "string" case, so this still breaks existing users (forcing them to specify `encodingLevel`).
Ugh.
Maybe the best thing is just to declare two different types:
    export interface RpcTransport {
      readonly encodingLevel?: "string";
      send(message: string): Promise<void>;
      receive(): Promise<string>;
      abort?(reason: any): void;
    }
    export interface RpcTransportWithCustomEncoding {
      readonly encodingLevel: "json" | "jsonWithBytes" | "structuredClone";
      send(message: unknown): Promise<void>;
      receive(): Promise<unknown>;
      abort?(reason: any): void;
    }
And then we just accept `RpcTransport | RpcTransportWithCustomEncoding` in `RpcSession`'s constructor. Ugly but no breakage.
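A minimal sketch of how a session could accept the union of the two interfaces and narrow on `encodingLevel`. The interface declarations repeat the ones proposed above; `AnyRpcTransport`, `usesCustomEncoding`, `LegacyTransport`, and `sendMessage` are hypothetical names used only for illustration, not code from the PR.

```typescript
interface RpcTransport {
  readonly encodingLevel?: "string";
  send(message: string): Promise<void>;
  receive(): Promise<string>;
  abort?(reason: any): void;
}

interface RpcTransportWithCustomEncoding {
  readonly encodingLevel: "json" | "jsonWithBytes" | "structuredClone";
  send(message: unknown): Promise<void>;
  receive(): Promise<unknown>;
  abort?(reason: any): void;
}

type AnyRpcTransport = RpcTransport | RpcTransportWithCustomEncoding;

// Type guard: a missing encodingLevel means the default "string" level.
function usesCustomEncoding(t: AnyRpcTransport): t is RpcTransportWithCustomEncoding {
  return t.encodingLevel !== undefined && t.encodingLevel !== "string";
}

// Existing code that says `implements RpcTransport` keeps compiling unchanged.
class LegacyTransport implements RpcTransport {
  sent: string[] = [];
  async send(message: string): Promise<void> {
    this.sent.push(message);
  }
  async receive(): Promise<string> {
    return this.sent.shift() ?? "";
  }
}

// The session serializes only as far as the transport asks it to.
async function sendMessage(t: AnyRpcTransport, message: unknown[]): Promise<void> {
  if (usesCustomEncoding(t)) {
    await t.send(message); // transport performs its own encoding
  } else {
    await t.send(JSON.stringify(message)); // default: hand over a string
  }
}
```

Inside each branch the compiler knows which `send()` signature applies, so the string path cannot receive an object by mistake.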
ashkalor:
Updated to use two interfaces now as mentioned.
src/rpc.ts (outdated)
// For non-stringify levels, use a rough estimate for flow control.
// Avoid JSON.stringify since it would fail on non-JSON types (Uint8Array, BigInt, etc.)
// and defeats the purpose of not stringifying.
msgLength = Array.isArray(msg) ? msg.length * 100 : 100;
kentonv:
This estimate isn't going to work.
I think we'll need the underlying transport to return the actual encoded size from send(). Maybe this is another argument for having a separate interface. It'll need to return a pair like {size, promise}. Or maybe we can just say that it only returns a size, and it's the responsibility of the transport itself to propagate errors to receive() if any are encountered.
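One way to read the second option, sketched under assumptions: `send()` returns the encoded size synchronously for flow control, and any write failure is remembered and surfaced from a later `receive()`. The names `SizedTransport`, `ByteSink`, and `SizeReportingTransport` are hypothetical, and the codec is supplied by the caller; none of this is taken from the PR's code.

```typescript
// Hypothetical shape for a custom-encoding transport that reports sizes.
interface SizedTransport {
  readonly encodingLevel: "json" | "jsonWithBytes" | "structuredClone";
  send(message: unknown): number; // encoded size in bytes
  receive(): Promise<unknown>;
}

// Hypothetical byte-oriented connection underneath the transport.
interface ByteSink {
  write(data: Uint8Array): Promise<void>;
  read(): Promise<Uint8Array>;
}

class SizeReportingTransport implements SizedTransport {
  readonly encodingLevel = "jsonWithBytes" as const;
  private sink: ByteSink;
  private encode: (message: unknown) => Uint8Array;
  private decode: (data: Uint8Array) => unknown;
  private failed = false;
  private writeError: unknown = undefined;

  constructor(
    sink: ByteSink,
    encode: (message: unknown) => Uint8Array,
    decode: (data: Uint8Array) => unknown,
  ) {
    this.sink = sink;
    this.encode = encode;
    this.decode = decode;
  }

  send(message: unknown): number {
    const data = this.encode(message);
    // The write is not awaited; the first failure is kept for receive().
    this.sink.write(data).catch((err: unknown) => {
      if (!this.failed) {
        this.failed = true;
        this.writeError = err;
      }
    });
    return data.byteLength;
  }

  async receive(): Promise<unknown> {
    // Simplification: only receive() calls made after the failure see it;
    // a real transport would also reject reads that are already pending.
    if (this.failed) throw this.writeError;
    return this.decode(await this.sink.read());
  }
}
```

The caller can feed the returned size straight into its flow-control window without waiting for the write to finish.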
ashkalor:
Yeah, updated this. For structured clone via MessagePort, since it isn't straightforward to estimate the size, I am just returning void so that we can skip flow control altogether.
src/websocket.ts (outdated)
  async send(message: string | object): Promise<void> {
    if (this.#sendQueue === undefined) {
-     this.#webSocket.send(message);
+     this.#webSocket.send(message as string);
kentonv:
Your example using wrapTransport() to wrap the WebSocketTransport and feed it CBOR violates this type assertion, but happens to work in practice because it just so happens WebSocket send() accepts byte arrays, and if given one, will deliver a byte array to the other end.
It does seem like many transports will want to build on top of WebSocketTransport and expect it to support bytes.
Maybe we need to declare WebSocketTransport<T extends string | Uint8Array>. It can't be declared implements RpcTransport, but we can separately assert that WebSocketTransport<string> does in fact implement it. But then other transports can use it directly?
ashkalor:
I made this `WebSocketTransport<T extends string | ArrayBuffer = string>` since it closely aligns with how the browser WebSocket is typed. `Uint8Array` is a view over such a buffer anyway, so I think this could be more correct here.
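A rough sketch of the generic idea discussed here. To stay self-contained it is written against a hypothetical `SocketLike<T>` interface instead of the real `WebSocket` type, and `GenericSocketTransport` is an illustrative name, not the class in `src/websocket.ts`. The payload type is a type parameter defaulting to `string`, so the default instantiation still satisfies the string-based `RpcTransport`, while byte-oriented transports can build on the same class.

```typescript
interface RpcTransport {
  send(message: string): Promise<void>;
  receive(): Promise<string>;
}

// Hypothetical minimal socket surface used only for this sketch.
interface SocketLike<T> {
  send(data: T): void;
  onmessage: ((data: T) => void) | null;
}

class GenericSocketTransport<T extends string | ArrayBuffer = string> {
  private socket: SocketLike<T>;
  private inbox: T[] = [];
  private waiters: ((data: T) => void)[] = [];

  constructor(socket: SocketLike<T>) {
    this.socket = socket;
    socket.onmessage = (data) => {
      // Deliver to a pending receive() if there is one, else buffer.
      const waiter = this.waiters.shift();
      if (waiter) waiter(data);
      else this.inbox.push(data);
    };
  }

  async send(message: T): Promise<void> {
    this.socket.send(message);
  }

  receive(): Promise<T> {
    if (this.inbox.length > 0) return Promise.resolve(this.inbox.shift() as T);
    return new Promise<T>((resolve) => {
      this.waiters.push(resolve);
    });
  }
}

// Compile-time check: the default (string) instantiation is an RpcTransport.
function asRpcTransport(t: GenericSocketTransport<string>): RpcTransport {
  return t;
}
```

Passing a `GenericSocketTransport<ArrayBuffer>` to `asRpcTransport` would be a type error, which is the separation between string and binary transports that the review asks for.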
  sendStream(id: ImportId, path: PropertyPath, args: RpcPayload)
-     : {promise: Promise<void>, size: number} {
+     : {promise: Promise<void>, size?: number} {
kentonv:
I see you changed size to optional here, which passes type checks because it ends up being returned by StubHook.stream(), where it just so happens that size is also declared as optional.
However, StubHook.stream()'s contract is that if no size is returned, then the call is local, and promise does not resolve until said local call actually completes. That is not the contract you are providing here. So this breaks streaming in the case that size isn't available -- the caller will queue up the entire stream in a tight loop, creating excessive buffer bloat for large streams.
I think what we will need to do here is, if the underlying transport doesn't provide a size, then we need to compute an estimate of the size by walking the message. We only need to do this in sendStream, and only when the transport doesn't provide a size.
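A minimal sketch of such an estimate, assuming it only needs to be roughly proportional to the encoded size for flow-control purposes. The function name `estimateMessageSize` and the per-type byte costs are illustrative assumptions, not values from the PR.

```typescript
// Walks a message and returns a rough byte estimate without serializing it.
function estimateMessageSize(value: unknown, seen: Set<object> = new Set<object>()): number {
  if (typeof value === "string") return value.length + 2; // text plus quotes
  if (typeof value === "number") return 8;
  if (typeof value !== "object" || value === null) return 4; // booleans, null, etc.

  // Count shared objects once; this also guards against cycles, which
  // structured-clonable messages are allowed to contain.
  if (seen.has(value)) return 0;
  seen.add(value);

  // Binary payloads dominate large messages, so use their exact length.
  if (value instanceof ArrayBuffer) return value.byteLength;
  if (ArrayBuffer.isView(value)) return value.byteLength;

  let total = 2; // brackets or braces
  if (Array.isArray(value)) {
    for (const item of value) {
      total += estimateMessageSize(item, seen) + 1; // item plus separator
    }
  } else {
    // Note: Map, Set and Date have no enumerable keys and count as 2 here.
    const obj = value as Record<string, unknown>;
    for (const key of Object.keys(obj)) {
      total += key.length + 3 + estimateMessageSize(obj[key], seen) + 1;
    }
  }
  return total;
}
```

The walk touches each value once and never allocates a serialized copy, so it is cheaper than calling `JSON.stringify` just to measure, and it only needs to run in `sendStream` when the transport reports no size.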
ashkalor:
Hey, I put a comment above talking about this. The main reason I made this optional was that it needed to sit well with the structuredClone type of encoding. It's not straightforward to estimate the size there, so I'm not sure how to go about it. If we are planning to compute the size by running JSON.stringify or something similar, wouldn't that be additional overhead that we were aiming to reduce with this implementation? Just thinking out loud here; happy to fix this in whatever way you see fit. The other transports currently do return a size, so maybe we can somehow mandate that while keeping size here optional?
- Note that sessions are entirely symmetric: neither side is defined as the "client" nor the "server". Each side can optionally expose a "main interface" to the other. In typical scenarios with a logical client and server, the server exposes a main interface but the client does not.
+ Note that sessions are entirely symmetric: neither side is defined as the "client" nor the "server". Each side can optionally expose a "main interface" to the other. In typical scenarios with a logical client and server, the server exposes a main interface but the client does not.ś
kentonv:
Seems to be a stray ś added to this line.
But also this file has been rewritten in a different way than what I suggested before, could you please follow the suggestion?
ashkalor:
Hey, I put a comment above to clarify my intent here. Nevertheless, I'll clean this up as per your suggestion if that's what you want here.
I'm happy to take over the PR at this point and finish it up if you prefer -- can be a lot faster than back-and-forth code review.
Hey, I'd really like to contribute here; it's just that I only get free time over the weekend because of work. I can work quickly with you for the next two days if you want to see this through to completion. This would also be my first official open source contribution.
Implements the encoding level architecture suggested by @kentonv in #133, handling serialization at the transport layer rather than through a global WireFormat hook.
Summary
- `EncodingLevel` type with four levels: `stringify`, `devalue`, `partial`, `passthrough`
- `encodingLevel` property to `RpcTransport` interface
- `wrapTransport()` helper for custom binary formats (CBOR, MessagePack, etc.)
- … `stringify` is the default)
Encoding Levels
- `stringify`
- `devalue`
- `partial`
- `passthrough`
Motivation
Binary formats like CBOR can significantly outperform JSON for large payloads by avoiding base64 encoding cycles. This architecture lets transports declare their capabilities and have serialization handled appropriately.