Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 53 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -692,27 +692,36 @@ Note that you should not use a `Window` object itself as a port for RPC -- you s

### Custom transports

You can implement a custom RPC transport across any bidirectional stream. To do so, implement the interface `RpcTransport`, which is defined as follows:
You can implement a custom RPC transport across any bidirectional stream. For string-based transports, implement `RpcTransport`:

```ts
// Interface for an RPC transport, which is a simple bidirectional message stream.
export interface RpcTransport {
// Sends a message to the other end.
send(message: string): Promise<void>;
// Sends a JSON string message. Returns the byte size of the message.
// Transport errors should be propagated via receive() rejecting.
send(message: string): number;

// Receives a message sent by the other end.
//
// If and when the transport becomes disconnected, this will reject. The thrown error will be
// propagated to all outstanding calls and future calls on any stubs associated with the session.
// If there are no outstanding calls (and none are made in the future), then the error does not
// propagate anywhere -- this is considered a "clean" shutdown.
receive(): Promise<string>;

// Indicates that the RPC system has suffered an error that prevents the session from continuing.
// The transport should ideally try to send any queued messages if it can, and then close the
// connection. (It's not strictly necessary to deliver queued messages, but the last message sent
// before abort() is called is often an "abort" message, which communicates the error to the
// peer, so if that is dropped, the peer may have less information about what happened.)
// Called when the RPC system needs to abort the session.
abort?(reason: any): void;
}
```

For transports with custom encoding (CBOR, MessagePack, structured clone, etc.), implement `RpcTransportWithCustomEncoding`:

```ts
export interface RpcTransportWithCustomEncoding {
// Declares what encoding level this transport uses.
readonly encodingLevel: "json" | "jsonWithBytes" | "structuredClone";

// Encodes and sends a message. Returns the encoded byte size if known
// (for flow control), or void if unavailable (e.g. structured clone).
send(message: unknown): number | void;

// Receives and decodes a message.
receive(): Promise<unknown>;

abort?(reason: any): void;
}
```
Expand All @@ -735,4 +744,33 @@ let stub: RemoteMainInterface = session.getRemoteMain();
// Now we can call methods on the stub.
```

Note that sessions are entirely symmetric: neither side is defined as the "client" nor the "server". Each side can optionally expose a "main interface" to the other. In typical scenarios with a logical client and server, the server exposes a main interface but the client does not.
Note that sessions are entirely symmetric: neither side is defined as the "client" nor the "server". Each side can optionally expose a "main interface" to the other. In typical scenarios with a logical client and server, the server exposes a main interface but the client does not.ś
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems to be a stray ś added to this line.

But also this file has been rewritten in a different way than what I suggested before, could you please follow the suggestion?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey I put a comment above to clarify my intent here , nevertheless I ll clean this up as per your suggestion if thats what you want here.


#### Custom encoding levels

By default, `RpcTransport` sends and receives JSON strings. To use a binary format like CBOR or MessagePack, implement `RpcTransportWithCustomEncoding` instead, which declares an `encodingLevel` so the RPC system knows how much serialization to do before handing messages to the transport:

```ts
import { RpcTransportWithCustomEncoding, RpcSession } from "capnweb";
import * as cbor from "cbor-x";

class CborTransport implements RpcTransportWithCustomEncoding {
readonly encodingLevel = "jsonWithBytes"; // Uint8Array stays raw for CBOR

send(msg: unknown): number {
const encoded = cbor.encode(msg);
this.ws.send(encoded);
return encoded.byteLength;
}

async receive(): Promise<unknown> {
return cbor.decode(new Uint8Array(await this.nextMessage()));
}

abort(reason: any) { this.ws.close(3000, String(reason)); }
}

const session = new RpcSession<MyApi>(new CborTransport(ws));
```

The available encoding levels are `"json"` (JSON-compatible object tree), `"jsonWithBytes"` (same but `Uint8Array` stays raw), and `"structuredClone"` (native types like `Date`, `BigInt`, `Error` also pass through). The built-in `MessagePort` transport uses `"structuredClone"` automatically.
6 changes: 3 additions & 3 deletions __tests__/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ class TestTransport implements RpcTransport {
public log = false;
private fenced = false;

async send(message: string): Promise<void> {
send(message: string): void {
// HACK: If the string "$remove$" appears in the message, remove it. This is used in some
// tests to hack the RPC protocol.
message = message.replaceAll("$remove$", "");
Expand Down Expand Up @@ -1846,15 +1846,15 @@ describe("WritableStream over RPC", () => {
// Collect all messages sent by the server (which appear in the client's queue).
let serverMessages: any[] = [];
let origServerSend = harness.serverTransport.send;
harness.serverTransport.send = async function(message: string) {
harness.serverTransport.send = function(message: string) {
serverMessages.push(JSON.parse(message));
return origServerSend.call(this, message);
};

// Collect all messages sent by the client (which appear in the server's queue).
let clientMessages: any[] = [];
let origClientSend = harness.clientTransport.send;
harness.clientTransport.send = async function(message: string) {
harness.clientTransport.send = function(message: string) {
clientMessages.push(JSON.parse(message));
return origClientSend.call(this, message);
};
Expand Down
5 changes: 3 additions & 2 deletions src/batch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class BatchClientTransport implements RpcTransport {
#batchToSend: string[] | null = [];
#batchToReceive: string[] | null = null;

async send(message: string): Promise<void> {
send(message: string): void {
// If the batch was already sent, we just ignore the message, because throwing may cause the
// RPC system to abort prematurely. Once the last receive() is done then we'll throw an error
// that aborts the RPC system at the right time and will propagate to all other requests.
Expand Down Expand Up @@ -98,8 +98,9 @@ class BatchServerTransport implements RpcTransport {
#batchToReceive: string[];
#allReceived: PromiseWithResolvers<void> = Promise.withResolvers<void>();

async send(message: string): Promise<void> {
send(message: string): number {
this.#batchToSend.push(message);
return message.length;
}

async receive(): Promise<string> {
Expand Down
9 changes: 5 additions & 4 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
// https://opensource.org/license/mit

import { RpcTarget as RpcTargetImpl, RpcStub as RpcStubImpl, RpcPromise as RpcPromiseImpl } from "./core.js";
import { serialize, deserialize } from "./serialize.js";
import { RpcTransport, RpcSession as RpcSessionImpl, RpcSessionOptions } from "./rpc.js";
import { serialize, deserialize, EncodingLevel } from "./serialize.js";
import { RpcTransport, RpcTransportWithCustomEncoding, AnyRpcTransport, RpcSession as RpcSessionImpl, RpcSessionOptions } from "./rpc.js";
import { RpcTargetBranded, RpcCompatible, Stub, Stubify, __RPC_TARGET_BRAND } from "./types.js";
import { newWebSocketRpcSession as newWebSocketRpcSessionImpl,
newWorkersWebSocketRpcResponse } from "./websocket.js";
Expand All @@ -20,7 +20,8 @@ forceInitStreams();
// Re-export public API types.
export { serialize, deserialize, newWorkersWebSocketRpcResponse, newHttpBatchRpcResponse,
nodeHttpBatchRpcResponse };
export type { RpcTransport, RpcSessionOptions, RpcCompatible };
export type { RpcTransport, RpcTransportWithCustomEncoding, AnyRpcTransport,
RpcSessionOptions, RpcCompatible, EncodingLevel };

// Hack the type system to make RpcStub's types work nicely!
/**
Expand Down Expand Up @@ -76,7 +77,7 @@ export interface RpcSession<T extends RpcCompatible<T> = undefined> {
}
export const RpcSession: {
new <T extends RpcCompatible<T> = undefined>(
transport: RpcTransport, localMain?: any, options?: RpcSessionOptions): RpcSession<T>;
transport: AnyRpcTransport, localMain?: any, options?: RpcSessionOptions): RpcSession<T>;
} = <any>RpcSessionImpl;

// RpcTarget needs some hackage too to brand it properly and account for the implementation
Expand Down
25 changes: 13 additions & 12 deletions src/messageport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// https://opensource.org/license/mit

import { RpcStub } from "./core.js";
import { RpcTransport, RpcSession, RpcSessionOptions } from "./rpc.js";
import { RpcTransportWithCustomEncoding, RpcSession, RpcSessionOptions } from "./rpc.js";

// Start a MessagePort session given a MessagePort or a pair of MessagePorts.
//
Expand All @@ -16,7 +16,9 @@ export function newMessagePortRpcSession(
return rpc.getRemoteMain();
}

class MessagePortTransport implements RpcTransport {
class MessagePortTransport implements RpcTransportWithCustomEncoding {
readonly encodingLevel = "structuredClone" as const;

constructor (port: MessagePort) {
this.#port = port;

Expand All @@ -29,16 +31,15 @@ class MessagePortTransport implements RpcTransport {
} else if (event.data === null) {
// Peer is signaling that they're closing the connection
this.#receivedError(new Error("Peer closed MessagePort connection."));
} else if (typeof event.data === "string") {
} else {
// Accept any structured-clonable data
if (this.#receiveResolver) {
this.#receiveResolver(event.data);
this.#receiveResolver = undefined;
this.#receiveRejecter = undefined;
} else {
this.#receiveQueue.push(event.data);
}
} else {
this.#receivedError(new TypeError("Received non-string message from MessagePort."));
}
});

Expand All @@ -48,32 +49,32 @@ class MessagePortTransport implements RpcTransport {
}

#port: MessagePort;
#receiveResolver?: (message: string) => void;
#receiveResolver?: (message: unknown) => void;
#receiveRejecter?: (err: any) => void;
#receiveQueue: string[] = [];
#receiveQueue: unknown[] = [];
#error?: any;

async send(message: string): Promise<void> {
send(message: unknown): void {
if (this.#error) {
throw this.#error;
}
this.#port.postMessage(message);
}

async receive(): Promise<string> {
async receive(): Promise<unknown> {
if (this.#receiveQueue.length > 0) {
return this.#receiveQueue.shift()!;
} else if (this.#error) {
throw this.#error;
} else {
return new Promise<string>((resolve, reject) => {
return new Promise<unknown>((resolve, reject) => {
this.#receiveResolver = resolve;
this.#receiveRejecter = reject;
});
}
}

abort?(reason: any): void {
abort(reason: any): void {
// Send close signal to peer before closing
try {
this.#port.postMessage(null);
Expand All @@ -99,4 +100,4 @@ class MessagePortTransport implements RpcTransport {
}
}
}
}
}
Loading