diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 8f83c5e..10b8226 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -24,6 +24,9 @@ jobs: node-version: "22" cache: "npm" + - name: Setup Bun + run: npm install -g bun + - name: Install dependencies run: npm ci @@ -31,7 +34,7 @@ jobs: run: npm run build - name: Run tests - run: npm test + run: npm run test:ci - name: Run type tests run: npm run test:types diff --git a/README.md b/README.md index 3f436fc..c7f0126 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,7 @@ Cap'n Web is a spiritual sibling to [Cap'n Proto](https://capnproto.org) (and is * That said, it integrates nicely with TypeScript. * Also unlike Cap'n Proto, Cap'n Web's underlying serialization is human-readable. In fact, it's just JSON, with a little pre-/post-processing. * It works over HTTP, WebSocket, and postMessage() out-of-the-box, with the ability to extend it to other transports easily. -* It works in all major browsers, Cloudflare Workers, Node.js, and other modern JavaScript runtimes. +* It works in all major browsers, Cloudflare Workers, Node.js, Bun, Deno, and other modern JavaScript runtimes. The whole thing compresses (minify+gzip) to under 10kB with no dependencies. Cap'n Web is more expressive than almost every other RPC system, because it implements an object-capability RPC model. That means it: @@ -630,6 +630,73 @@ Deno.serve(async (req) => { }); ``` +### HTTP server on Bun + +Bun's server-side WebSocket API uses [callback-based handlers](https://bun.sh/docs/runtime/http/websockets) instead of the standard `addEventListener` interface. Cap'n Web provides `newBunWebSocketRpcHandler()` which returns a handler object you can pass directly to `Bun.serve()`. + +> **Note:** The Bun-specific helpers (`newBunWebSocketRpcHandler`, `newBunWebSocketRpcSession`, and `BunWebSocketTransport`) are shipped in a separate build entry point that Bun resolves automatically via the `"bun"` [conditional export](https://nodejs.org/api/packages.html#conditional-exports) in `package.json`. Other runtimes never load this code, keeping bundle size minimal. + +```ts +import { RpcTarget, newBunWebSocketRpcHandler, newHttpBatchRpcResponse } from "capnweb"; + +class MyApiImpl extends RpcTarget implements MyApi { + // ... define API, same as above ... +} + +// Create a WebSocket handler that manages RPC sessions automatically. +// The callback is invoked once per connection to create a fresh API instance. +let rpcHandler = newBunWebSocketRpcHandler(() => new MyApiImpl()); + +Bun.serve({ + async fetch(req, server) { + let url = new URL(req.url); + if (url.pathname === "/api") { + // Upgrade WebSocket requests. + if (req.headers.get("upgrade")?.toLowerCase() === "websocket") { + if (server.upgrade(req)) return; + return new Response("WebSocket upgrade failed", { status: 500 }); + } + + // Handle HTTP batch requests. + let response = await newHttpBatchRpcResponse(req, new MyApiImpl()); + response.headers.set("Access-Control-Allow-Origin", "*"); + return response; + } + + return new Response("Not Found", { status: 404 }); + }, + + // Pass the handler directly — no manual wiring needed. + websocket: rpcHandler, +}); +``` + +If you need to attach custom data to connections or add your own logic to the WebSocket handlers, use `newBunWebSocketRpcSession()` instead, which gives you direct access to the transport: + +```ts +import { newBunWebSocketRpcSession, newHttpBatchRpcResponse, RpcTarget } from "capnweb"; + +class MyApiImpl extends RpcTarget implements MyApi { + // ... define API, same as above ... +} + +Bun.serve({ + fetch(req, server) { + let userId = authenticate(req); + server.upgrade(req, { data: { userId } }); + }, + websocket: { + open(ws) { + let { stub, transport } = newBunWebSocketRpcSession(ws, new MyApiImpl()); + ws.data.transport = transport; + }, + message(ws, msg) { ws.data.transport.dispatchMessage(msg); }, + close(ws, code, reason) { ws.data.transport.dispatchClose(code, reason); }, + error(ws, err) { ws.data.transport.dispatchError(err); }, + }, +}); +``` + ### HTTP server on other runtimes Every runtime does HTTP handling and WebSockets a little differently, although most modern runtimes use the standard `Request` and `Response` types from the Fetch API, as well as the standard `WebSocket` API. You should be able to use these two functions (exported by `capnweb`) to implement both HTTP batch and WebSocket handling on all platforms: @@ -654,6 +721,8 @@ function newWebSocketRpcSession( : Disposable; ``` +Note: Bun's `ServerWebSocket` does not implement the standard `WebSocket` `addEventListener` interface. If you are using Bun, use the dedicated `newBunWebSocketRpcHandler()` or `newBunWebSocketRpcSession()` described above. + ### HTTP server using Hono If your app is built on [Hono](https://hono.dev/) (on any runtime it supports), check out [`@hono/capnweb`](https://github.com/honojs/middleware/tree/main/packages/capnweb). @@ -735,4 +804,25 @@ let stub: RemoteMainInterface = session.getRemoteMain(); // Now we can call methods on the stub. ``` +Cap'n Web's built-in transports (WebSocket, MessagePort, HTTP batch, and Bun ServerWebSocket) are all implemented on top of this interface. + Note that sessions are entirely symmetric: neither side is defined as the "client" nor the "server". Each side can optionally expose a "main interface" to the other. In typical scenarios with a logical client and server, the server exposes a main interface but the client does not. + +## Development + +```bash +npm install +npm run build +``` + +`npm test` runs the vitest suite, which covers Node, browsers, and Cloudflare Workers. The Bun transport tests require the Bun runtime and are run separately: + +```bash +npm run test:bun +``` + +To run both suites together (as CI does): + +```bash +npm run test:ci +``` diff --git a/__tests__/bun.test.ts b/__tests__/bun.test.ts new file mode 100644 index 0000000..36fd047 --- /dev/null +++ b/__tests__/bun.test.ts @@ -0,0 +1,300 @@ +// Copyright (c) 2025 Cloudflare, Inc. +// Licensed under the MIT license found in the LICENSE.txt file or at: +// https://opensource.org/license/mit + +import { expect, it, describe, afterEach } from "bun:test" +import { RpcSession, RpcTarget, RpcStub } from "../src/index.js" +import { BunWebSocketTransport, newBunWebSocketRpcSession, newBunWebSocketRpcHandler, + newWebSocketRpcSession } from "../src/index-bun.js" +import { Counter, TestTarget } from "./test-util.js"; +import type { Server } from "bun"; + +let servers: Server[] = []; + +afterEach(() => { + for (let server of servers) { + server.stop(true); + } + servers = []; +}); + +// Start a Bun server with the given WebSocket handler and return the URL. +function startServer(handler: ReturnType): string { + let server = Bun.serve({ + port: 0, + fetch(req, server) { + if (server.upgrade(req)) return undefined as any; + return new Response("Not Found", { status: 404 }); + }, + websocket: handler, + }); + servers.push(server); + return `ws://${server.hostname}:${server.port}`; +} + +// Open a WebSocket client and wait for the connection to be established. +function connect(url: string): Promise { + return new Promise((resolve, reject) => { + let ws = new WebSocket(url); + ws.addEventListener("open", () => resolve(ws)); + ws.addEventListener("error", reject); + }); +} + +// Start a server with the handler helper and return a connected RPC stub. +async function startRpcSession(createMain: () => any) { + let handler = newBunWebSocketRpcHandler(createMain); + let url = startServer(handler); + let stub = newWebSocketRpcSession(url); + // Wait for the connection to be ready by making a throwaway property access. + // newWebSocketRpcSession returns immediately; the connection opens in the + // background. Awaiting any call ensures the handshake has completed. + return stub as any; +} + +describe("BunWebSocketTransport", () => { + it("sends and receives messages through a real Bun ServerWebSocket", async () => { + // Stand up a simple echo server that bounces messages back through the + // transport's dispatch path, exercising real send/receive. + let serverTransport: BunWebSocketTransport | undefined; + let handler = { + open(ws: any) { + serverTransport = new BunWebSocketTransport(ws); + }, + message(ws: any, msg: string | Buffer) { + serverTransport!.dispatchMessage(msg); + }, + close(ws: any, code: number, reason: string) { + serverTransport!.dispatchClose(code, reason); + }, + error(ws: any, err: Error) { + serverTransport!.dispatchError(err); + }, + }; + + let url = startServer(handler as any); + let clientWs = await connect(url); + + // Send a message from the client; the server handler dispatches it into the + // transport. Then read it back from the server transport to verify the real + // socket delivered it. + clientWs.send("hello from client"); + let received = await serverTransport!.receive(); + expect(received).toBe("hello from client"); + + // Send a second message to confirm queuing works. + clientWs.send("second"); + expect(await serverTransport!.receive()).toBe("second"); + + clientWs.close(); + }); + + it("receive() rejects when the client closes the connection", async () => { + let serverTransport: BunWebSocketTransport | undefined; + let handler = { + open(ws: any) { serverTransport = new BunWebSocketTransport(ws); }, + message(ws: any, msg: string | Buffer) { serverTransport!.dispatchMessage(msg); }, + close(ws: any, code: number, reason: string) { serverTransport!.dispatchClose(code, reason); }, + error(ws: any, err: Error) { serverTransport!.dispatchError(err); }, + }; + + let url = startServer(handler as any); + let clientWs = await connect(url); + + let receivePromise = serverTransport!.receive(); + clientWs.close(1000, "bye"); + await expect(receivePromise).rejects.toThrow("Peer closed WebSocket"); + }); + + it("abort() closes the real socket with code 3000", async () => { + let serverTransport: BunWebSocketTransport | undefined; + let closed = new Promise<{ code: number; reason: string }>((resolve) => { + let handler = { + open(ws: any) { serverTransport = new BunWebSocketTransport(ws); }, + message() {}, + close(ws: any, code: number, reason: string) { resolve({ code, reason }); }, + error() {}, + }; + startServer(handler as any); + }); + + // The server was started inside the promise — grab the URL from the + // last registered server. + let url = `ws://${servers[servers.length - 1].hostname}:${servers[servers.length - 1].port}`; + await connect(url); + + serverTransport!.abort!(new Error("test abort")); + let result = await closed; + expect(result.code).toBe(3000); + expect(result.reason).toBe("test abort"); + }); +}); + +describe("newBunWebSocketRpcSession", () => { + it("returns { stub, transport } over a real connection", async () => { + let serverResult: { stub: any; transport: BunWebSocketTransport } | undefined; + let handler = { + open(ws: any) { + serverResult = newBunWebSocketRpcSession(ws, new TestTarget()); + }, + message(ws: any, msg: string | Buffer) { serverResult!.transport.dispatchMessage(msg); }, + close(ws: any, code: number, reason: string) { serverResult!.transport.dispatchClose(code, reason); }, + error(ws: any, err: Error) { serverResult!.transport.dispatchError(err); }, + }; + + let url = startServer(handler as any); + let clientWs = await connect(url); + + expect(serverResult).toBeDefined(); + expect(serverResult!.stub).toBeDefined(); + expect(serverResult!.transport).toBeInstanceOf(BunWebSocketTransport); + + clientWs.close(); + }); + + it("full RPC round-trip with manual wiring", async () => { + let serverTransport: BunWebSocketTransport | undefined; + let handler = { + open(ws: any) { + let result = newBunWebSocketRpcSession(ws, new TestTarget()); + serverTransport = result.transport; + }, + message(ws: any, msg: string | Buffer) { serverTransport!.dispatchMessage(msg); }, + close(ws: any, code: number, reason: string) { serverTransport!.dispatchClose(code, reason); }, + error(ws: any, err: Error) { serverTransport!.dispatchError(err); }, + }; + + let url = startServer(handler as any); + let stub: any = newWebSocketRpcSession(url); + + expect(await stub.square(5)).toBe(25); + expect(await stub.square(3)).toBe(9); + + stub[Symbol.dispose](); + }); + + it("error propagation over a real connection", async () => { + let serverTransport: BunWebSocketTransport | undefined; + let handler = { + open(ws: any) { + let result = newBunWebSocketRpcSession(ws, new TestTarget()); + serverTransport = result.transport; + }, + message(ws: any, msg: string | Buffer) { serverTransport!.dispatchMessage(msg); }, + close(ws: any, code: number, reason: string) { serverTransport!.dispatchClose(code, reason); }, + error(ws: any, err: Error) { serverTransport!.dispatchError(err); }, + }; + + let url = startServer(handler as any); + let stub: any = newWebSocketRpcSession(url); + + await expect(Promise.resolve(stub.throwError())).rejects.toThrow("test error"); + + stub[Symbol.dispose](); + }); + + it("bi-directional RPC (server calls client callback)", async () => { + let serverTransport: BunWebSocketTransport | undefined; + let handler = { + open(ws: any) { + let result = newBunWebSocketRpcSession(ws, new TestTarget()); + serverTransport = result.transport; + }, + message(ws: any, msg: string | Buffer) { serverTransport!.dispatchMessage(msg); }, + close(ws: any, code: number, reason: string) { serverTransport!.dispatchClose(code, reason); }, + error(ws: any, err: Error) { serverTransport!.dispatchError(err); }, + }; + + let url = startServer(handler as any); + let stub: any = newWebSocketRpcSession(url); + + let double = new RpcStub((x: number) => x * 2); + let result = await stub.callFunction(double, 7); + expect(await result.result).toBe(14); + + stub[Symbol.dispose](); + }); +}); + +describe("newBunWebSocketRpcHandler", () => { + it("returns object with open/message/close/error", () => { + let handler = newBunWebSocketRpcHandler(() => new TestTarget()); + expect(typeof handler.open).toBe("function"); + expect(typeof handler.message).toBe("function"); + expect(typeof handler.close).toBe("function"); + expect(typeof handler.error).toBe("function"); + }); + + it("serves RPC over a real Bun server", async () => { + let stub: any = await startRpcSession(() => new TestTarget()); + + expect(await stub.square(7)).toBe(49); + expect(await stub.square(3)).toBe(9); + + stub[Symbol.dispose](); + }); + + it("creates a fresh API instance per connection", async () => { + let connectionCount = 0; + let handler = newBunWebSocketRpcHandler(() => { + connectionCount++; + return new TestTarget(); + }); + let url = startServer(handler); + + let stub1: any = newWebSocketRpcSession(url); + await stub1.square(1); + + let stub2: any = newWebSocketRpcSession(url); + await stub2.square(2); + + expect(connectionCount).toBe(2); + + stub1[Symbol.dispose](); + stub2[Symbol.dispose](); + }); +}); + +describe("Bun transport: full RPC integration", () => { + it("square(i)", async () => { + let stub: any = await startRpcSession(() => new TestTarget()); + expect(await stub.square(4)).toBe(16); + stub[Symbol.dispose](); + }); + + it("makeCounter + incrementCounter", async () => { + let stub: any = await startRpcSession(() => new TestTarget()); + let counter = stub.makeCounter(10); + expect(await stub.incrementCounter(counter, 5)).toBe(15); + expect(await stub.incrementCounter(counter)).toBe(16); + stub[Symbol.dispose](); + }); + + it("throwError", async () => { + let stub: any = await startRpcSession(() => new TestTarget()); + await expect(Promise.resolve(stub.throwError())).rejects.toThrow("test error"); + stub[Symbol.dispose](); + }); + + it("callSquare(self, i)", async () => { + let stub: any = await startRpcSession(() => new TestTarget()); + let result = await stub.callSquare(stub, 6); + expect(await result.result).toBe(36); + stub[Symbol.dispose](); + }); + + it("generateFibonacci(10)", async () => { + let stub: any = await startRpcSession(() => new TestTarget()); + let fib = await stub.generateFibonacci(10); + expect(fib).toEqual([0, 1, 1, 2, 3, 5, 8, 13, 21, 34]); + stub[Symbol.dispose](); + }); + + it("returnNull / returnUndefined / returnNumber", async () => { + let stub: any = await startRpcSession(() => new TestTarget()); + expect(await stub.returnNull()).toBe(null); + expect(await stub.returnUndefined()).toBe(undefined); + expect(await stub.returnNumber(42)).toBe(42); + stub[Symbol.dispose](); + }); +}); diff --git a/package.json b/package.json index 6309385..f6e6a2d 100644 --- a/package.json +++ b/package.json @@ -14,10 +14,12 @@ "types": "./dist/index.d.ts", "import": { "workerd": "./dist/index-workers.js", + "bun": "./dist/index-bun.js", "default": "./dist/index.js" }, "require": { "workerd": "./dist/index-workers.cjs", + "bun": "./dist/index-bun.cjs", "default": "./dist/index.cjs" } } @@ -30,6 +32,8 @@ "build": "tsup", "build:watch": "tsup --watch", "test": "vitest run", + "test:bun": "bun test __tests__/bun.test.ts", + "test:ci": "vitest run && bun test __tests__/bun.test.ts", "test:watch": "vitest", "test:types": "tsc -p __type-tests__/tsconfig.json --noEmit", "prepublishOnly": "npm run build" diff --git a/src/bun.ts b/src/bun.ts new file mode 100644 index 0000000..124ea33 --- /dev/null +++ b/src/bun.ts @@ -0,0 +1,140 @@ +// Copyright (c) 2025 Cloudflare, Inc. +// Licensed under the MIT license found in the LICENSE.txt file or at: +// https://opensource.org/license/mit + +import { RpcStub } from "./core.js"; +import { RpcTransport, RpcSession, RpcSessionOptions } from "./rpc.js"; +import { RpcTargetBranded } from "./types.js"; + +// Minimal interface matching Bun's ServerWebSocket. Avoids a hard dependency on @types/bun. +export interface BunServerWebSocket { + send(data: string | ArrayBuffer | Uint8Array): number; + close(code?: number, reason?: string): void; + readonly readyState: number; + data: any; +} + +/** + * Start an RPC session over a Bun ServerWebSocket. + * + * Returns both the stub and the transport. The transport must be wired to Bun's + * `WebSocketHandler` callbacks (`message`, `close`, `error`) by calling its + * `dispatchMessage`, `dispatchClose`, and `dispatchError` methods. + * + * For a zero-wiring alternative, see `newBunWebSocketRpcHandler`. + */ +export function newBunWebSocketRpcSession( + ws: BunServerWebSocket, localMain?: any, + options?: RpcSessionOptions): { stub: RpcStub, transport: BunWebSocketTransport } { + let transport = new BunWebSocketTransport(ws); + let rpc = new RpcSession(transport, localMain, options); + return { stub: rpc.getRemoteMain(), transport }; +} + +/** + * Create a Bun `WebSocketHandler` object that manages RPC sessions automatically. + * + * The returned object can be passed directly as the `websocket` option to `Bun.serve()`. + * A fresh `localMain` is created for each connection via the `createMain` callback. + * The transport is stored on `ws.data.__capnwebTransport`. + * + * @param createMain Called once per connection to create the main RPC interface for that client. + * @param options Optional RPC session options applied to every connection. + */ +export function newBunWebSocketRpcHandler(createMain: () => RpcTargetBranded, options?: RpcSessionOptions) { + return { + open(ws: BunServerWebSocket) { + let transport = new BunWebSocketTransport(ws); + let rpc = new RpcSession(transport, createMain(), options); + ws.data = { __capnwebTransport: transport, __capnwebStub: rpc.getRemoteMain() }; + }, + message(ws: BunServerWebSocket, message: string | Buffer) { + ws.data.__capnwebTransport.dispatchMessage(message); + }, + close(ws: BunServerWebSocket, code: number, reason: string) { + ws.data.__capnwebTransport.dispatchClose(code, reason); + }, + error(ws: BunServerWebSocket, error: Error) { + ws.data.__capnwebTransport.dispatchError(error); + }, + }; +} + +export class BunWebSocketTransport implements RpcTransport { + constructor (ws: BunServerWebSocket) { + this.#ws = ws; + } + + #ws: BunServerWebSocket; + #receiveResolver?: (message: string) => void; + #receiveRejecter?: (err: any) => void; + #receiveQueue: string[] = []; + #error?: any; + + async send(message: string): Promise { + this.#ws.send(message); + } + + async receive(): Promise { + if (this.#receiveQueue.length > 0) { + return this.#receiveQueue.shift()!; + } else if (this.#error) { + throw this.#error; + } else { + return new Promise((resolve, reject) => { + this.#receiveResolver = resolve; + this.#receiveRejecter = reject; + }); + } + } + + abort?(reason: any): void { + let message: string; + if (reason instanceof Error) { + message = reason.message; + } else { + message = `${reason}`; + } + this.#ws.close(3000, message); + + if (!this.#error) { + this.#error = reason; + // No need to call receiveRejecter(); RPC implementation will stop listening anyway. + } + } + + dispatchMessage(data: string | Buffer): void { + if (this.#error) { + return; + } + + let strData = typeof data === "string" ? data : data.toString("utf-8"); + + if (this.#receiveResolver) { + this.#receiveResolver(strData); + this.#receiveResolver = undefined; + this.#receiveRejecter = undefined; + } else { + this.#receiveQueue.push(strData); + } + } + + dispatchClose(code: number, reason: string): void { + this.#receivedError(new Error(`Peer closed WebSocket: ${code} ${reason}`)); + } + + dispatchError(error: Error): void { + this.#receivedError(new Error(`WebSocket connection failed.`)); + } + + #receivedError(reason: any) { + if (!this.#error) { + this.#error = reason; + if (this.#receiveRejecter) { + this.#receiveRejecter(reason); + this.#receiveResolver = undefined; + this.#receiveRejecter = undefined; + } + } + } +} diff --git a/src/index-bun.ts b/src/index-bun.ts new file mode 100644 index 0000000..2314946 --- /dev/null +++ b/src/index-bun.ts @@ -0,0 +1,34 @@ +// Copyright (c) 2025 Cloudflare, Inc. +// Licensed under the MIT license found in the LICENSE.txt file or at: +// https://opensource.org/license/mit + +export * from "./index.js"; + +import { RpcTransport, RpcSessionOptions } from "./rpc.js"; +import { RpcCompatible } from "./types.js"; +import { newBunWebSocketRpcSession as newBunWebSocketRpcSessionImpl, + newBunWebSocketRpcHandler, + BunWebSocketTransport } from "./bun.js"; + +export { newBunWebSocketRpcHandler, BunWebSocketTransport }; +export type { BunServerWebSocket } from "./bun.js"; + +// Re-declare with the proper type so callers get full typing. +import type { RpcStub } from "./index.js"; + +interface Empty {} + +/** + * Start an RPC session over a Bun ServerWebSocket. + * + * Returns both the RPC stub and the transport. The transport exposes `dispatchMessage`, + * `dispatchClose`, and `dispatchError` methods that must be wired to Bun's `WebSocketHandler` + * callbacks. For a zero-wiring alternative, use `newBunWebSocketRpcHandler` instead. + * + * @param ws The Bun ServerWebSocket from the `open` callback. + * @param localMain The main RPC interface to expose to the peer. + */ +export let newBunWebSocketRpcSession: = Empty> + (ws: import("./bun.js").BunServerWebSocket, localMain?: any, + options?: RpcSessionOptions) => { stub: RpcStub, transport: BunWebSocketTransport } = + newBunWebSocketRpcSessionImpl; diff --git a/tsup.config.ts b/tsup.config.ts index a09c96d..240f467 100644 --- a/tsup.config.ts +++ b/tsup.config.ts @@ -5,7 +5,7 @@ import { defineConfig } from 'tsup' export default defineConfig({ - entry: ['src/index.ts', 'src/index-workers.ts'], + entry: ['src/index.ts', 'src/index-workers.ts', 'src/index-bun.ts'], format: ['esm', 'cjs'], external: ['cloudflare:workers'], dts: true,