From c053b0606dfee8cc853c76c362e32b2eb12754ce Mon Sep 17 00:00:00 2001 From: VastBlast <48421698+VastBlast@users.noreply.github.com> Date: Thu, 19 Mar 2026 03:35:44 -0400 Subject: [PATCH 1/3] fix: websocket session promise retention leak --- src/rpc.ts | 36 +++++++++++++++++++++++++++--------- 1 file changed, 27 insertions(+), 9 deletions(-) diff --git a/src/rpc.ts b/src/rpc.ts index 4801a27..0850a70 100644 --- a/src/rpc.ts +++ b/src/rpc.ts @@ -323,7 +323,7 @@ class RpcSessionImpl implements Importer, Exporter { private reverseExports: Map = new Map(); private imports: Array = []; private abortReason?: any; - private cancelReadLoop: (error: any) => void; + private cancelReadLoop?: (error: any) => void; // We assign positive numbers to imports we initiate, and negative numbers to exports we // initiate. So the next import ID is just `imports.length`, but the next export ID needs @@ -348,11 +348,7 @@ class RpcSessionImpl implements Importer, Exporter { // Import zero is the other side's bootstrap object. this.imports.push(new ImportTableEntry(this, 0, false)); - let rejectFunc: (error: any) => void;; - let abortPromise = new Promise((resolve, reject) => { rejectFunc = reject; }); - this.cancelReadLoop = rejectFunc!; - - this.readLoop(abortPromise).catch(err => this.abort(err)); + this.readLoop().catch(err => this.abort(err)); } // Should only be called once immediately after construction. @@ -684,7 +680,8 @@ class RpcSessionImpl implements Importer, Exporter { // Don't double-abort. if (this.abortReason !== undefined) return; - this.cancelReadLoop(error); + this.cancelReadLoop?.(error); + this.cancelReadLoop = undefined; if (trySendAbortMessage) { try { @@ -734,9 +731,9 @@ class RpcSessionImpl implements Importer, Exporter { } } - private async readLoop(abortPromise: Promise) { + private async readLoop() { while (!this.abortReason) { - let msg = JSON.parse(await Promise.race([this.transport.receive(), abortPromise])); + let msg = JSON.parse(await this.receiveOrAbort()); if (this.abortReason) break; // check again before processing if (msg instanceof Array) { @@ -848,6 +845,27 @@ class RpcSessionImpl implements Importer, Exporter { } } + // Use a fresh cancellation promise for each read. Reusing one session-long promise here causes + // Promise.race() to accumulate reactions until the session is shut down. + private receiveOrAbort(): Promise { + let readCanceled = Promise.withResolvers(); + this.cancelReadLoop = readCanceled.reject; + + let receivePromise: Promise; + try { + receivePromise = this.transport.receive(); + } catch (err) { + this.cancelReadLoop = undefined; + return Promise.reject(err); + } + + return Promise.race([receivePromise, readCanceled.promise]).finally(() => { + if (this.cancelReadLoop === readCanceled.reject) { + this.cancelReadLoop = undefined; + } + }); + } + async drain(): Promise { if (this.abortReason) { throw this.abortReason; From 0f6c59d0fa2b8d09f39e302a4fb1d23a8d177849 Mon Sep 17 00:00:00 2001 From: VastBlast <48421698+VastBlast@users.noreply.github.com> Date: Thu, 19 Mar 2026 15:50:13 -0400 Subject: [PATCH 2/3] fix: eagerly resolve callback call results to avoid memory leak --- __tests__/index.test.ts | 30 ++++++++++++++++++++++++++++++ src/rpc.ts | 17 +++++++++++++++++ 2 files changed, 47 insertions(+) diff --git a/__tests__/index.test.ts b/__tests__/index.test.ts index 3875857..969705a 100644 --- a/__tests__/index.test.ts +++ b/__tests__/index.test.ts @@ -1308,6 +1308,36 @@ describe("stub disposal over RPC", () => { }); }); +describe("callback calls", () => { + class CallbackTarget extends RpcTarget { + async callMany(callback: RpcStub<(text: string) => unknown>, count: number) { + let cb = callback.dup(); + try { + for (let i = 0; i < count; ++i) { + cb(`value-${i}`); + await pumpMicrotasks(); + } + } finally { + cb[Symbol.dispose](); + } + } + } + + it("does not retain ignored synchronous callback results", async () => { + await using harness = new TestHarness(new CallbackTarget()); + await harness.stub.callMany((_text: string) => {}, 5); + await pumpMicrotasks(); + harness.checkAllDisposed(); + }); + + it("does not retain ignored synchronous object callback results", async () => { + await using harness = new TestHarness(new CallbackTarget()); + await harness.stub.callMany((text: string) => ({text}), 5); + await pumpMicrotasks(); + harness.checkAllDisposed(); + }); +}); + describe("e-order", () => { it("maintains e-order for concurrent calls on single stub", async () => { let callOrder: number[] = []; diff --git a/src/rpc.ts b/src/rpc.ts index 0850a70..51ec94c 100644 --- a/src/rpc.ts +++ b/src/rpc.ts @@ -581,6 +581,17 @@ class RpcSessionImpl implements Importer, Exporter { return msgText.length; } + private isDirectFunctionCall(expression: unknown): boolean { + if (!(expression instanceof Array)) return false; + + let [kind, _id, path, args] = expression; + return kind === "pipeline" && + expression.length === 4 && + path instanceof Array && + path.length === 0 && + args instanceof Array; + } + sendCall(id: ImportId, path: PropertyPath, args?: RpcPayload): RpcImportHook { if (this.abortReason) throw this.abortReason; @@ -748,7 +759,13 @@ class RpcSessionImpl implements Importer, Exporter { // treated as an unhandled rejection on our end. hook.ignoreUnhandledRejections(); + let exportId = this.exports.length; this.exports.push({ hook, refcount: 1 }); + if (this.isDirectFunctionCall(msg[1])) { + // Direct function calls can be resolved immediately. This lets unused results + // be released right away instead of staying alive for the rest of the session. + this.ensureResolvingExport(exportId); + } continue; } break; From 4fc13b0eeca6056de500720a5d7b37674817eb26 Mon Sep 17 00:00:00 2001 From: VastBlast <48421698+VastBlast@users.noreply.github.com> Date: Sun, 22 Mar 2026 18:05:05 -0400 Subject: [PATCH 3/3] rewrite logic to fix changes causing test failures, inline rpc read cancellation without delaying cleanup --- src/rpc.ts | 36 ++++++++++++++---------------------- 1 file changed, 14 insertions(+), 22 deletions(-) diff --git a/src/rpc.ts b/src/rpc.ts index 51ec94c..8c2b577 100644 --- a/src/rpc.ts +++ b/src/rpc.ts @@ -744,7 +744,20 @@ class RpcSessionImpl implements Importer, Exporter { private async readLoop() { while (!this.abortReason) { - let msg = JSON.parse(await this.receiveOrAbort()); + // Each receive needs its own abort promise so Promise.race() doesn't keep old reads. + let readCanceled = Promise.withResolvers(); + this.cancelReadLoop = readCanceled.reject; + + let msgText: string; + try { + msgText = await Promise.race([this.transport.receive(), readCanceled.promise]); + } finally { + if (this.cancelReadLoop === readCanceled.reject) { + this.cancelReadLoop = undefined; + } + } + + let msg = JSON.parse(msgText); if (this.abortReason) break; // check again before processing if (msg instanceof Array) { @@ -862,27 +875,6 @@ class RpcSessionImpl implements Importer, Exporter { } } - // Use a fresh cancellation promise for each read. Reusing one session-long promise here causes - // Promise.race() to accumulate reactions until the session is shut down. - private receiveOrAbort(): Promise { - let readCanceled = Promise.withResolvers(); - this.cancelReadLoop = readCanceled.reject; - - let receivePromise: Promise; - try { - receivePromise = this.transport.receive(); - } catch (err) { - this.cancelReadLoop = undefined; - return Promise.reject(err); - } - - return Promise.race([receivePromise, readCanceled.promise]).finally(() => { - if (this.cancelReadLoop === readCanceled.reject) { - this.cancelReadLoop = undefined; - } - }); - } - async drain(): Promise { if (this.abortReason) { throw this.abortReason;