diff --git a/__tests__/index.test.ts b/__tests__/index.test.ts index 3875857..969705a 100644 --- a/__tests__/index.test.ts +++ b/__tests__/index.test.ts @@ -1308,6 +1308,36 @@ describe("stub disposal over RPC", () => { }); }); +describe("callback calls", () => { + class CallbackTarget extends RpcTarget { + async callMany(callback: RpcStub<(text: string) => unknown>, count: number) { + let cb = callback.dup(); + try { + for (let i = 0; i < count; ++i) { + cb(`value-${i}`); + await pumpMicrotasks(); + } + } finally { + cb[Symbol.dispose](); + } + } + } + + it("does not retain ignored synchronous callback results", async () => { + await using harness = new TestHarness(new CallbackTarget()); + await harness.stub.callMany((_text: string) => {}, 5); + await pumpMicrotasks(); + harness.checkAllDisposed(); + }); + + it("does not retain ignored synchronous object callback results", async () => { + await using harness = new TestHarness(new CallbackTarget()); + await harness.stub.callMany((text: string) => ({text}), 5); + await pumpMicrotasks(); + harness.checkAllDisposed(); + }); +}); + describe("e-order", () => { it("maintains e-order for concurrent calls on single stub", async () => { let callOrder: number[] = []; diff --git a/src/rpc.ts b/src/rpc.ts index 4801a27..8c2b577 100644 --- a/src/rpc.ts +++ b/src/rpc.ts @@ -323,7 +323,7 @@ class RpcSessionImpl implements Importer, Exporter { private reverseExports: Map = new Map(); private imports: Array = []; private abortReason?: any; - private cancelReadLoop: (error: any) => void; + private cancelReadLoop?: (error: any) => void; // We assign positive numbers to imports we initiate, and negative numbers to exports we // initiate. So the next import ID is just `imports.length`, but the next export ID needs @@ -348,11 +348,7 @@ class RpcSessionImpl implements Importer, Exporter { // Import zero is the other side's bootstrap object. this.imports.push(new ImportTableEntry(this, 0, false)); - let rejectFunc: (error: any) => void;; - let abortPromise = new Promise((resolve, reject) => { rejectFunc = reject; }); - this.cancelReadLoop = rejectFunc!; - - this.readLoop(abortPromise).catch(err => this.abort(err)); + this.readLoop().catch(err => this.abort(err)); } // Should only be called once immediately after construction. @@ -585,6 +581,17 @@ class RpcSessionImpl implements Importer, Exporter { return msgText.length; } + private isDirectFunctionCall(expression: unknown): boolean { + if (!(expression instanceof Array)) return false; + + let [kind, _id, path, args] = expression; + return kind === "pipeline" && + expression.length === 4 && + path instanceof Array && + path.length === 0 && + args instanceof Array; + } + sendCall(id: ImportId, path: PropertyPath, args?: RpcPayload): RpcImportHook { if (this.abortReason) throw this.abortReason; @@ -684,7 +691,8 @@ class RpcSessionImpl implements Importer, Exporter { // Don't double-abort. if (this.abortReason !== undefined) return; - this.cancelReadLoop(error); + this.cancelReadLoop?.(error); + this.cancelReadLoop = undefined; if (trySendAbortMessage) { try { @@ -734,9 +742,22 @@ class RpcSessionImpl implements Importer, Exporter { } } - private async readLoop(abortPromise: Promise) { + private async readLoop() { while (!this.abortReason) { - let msg = JSON.parse(await Promise.race([this.transport.receive(), abortPromise])); + // Each receive needs its own abort promise so Promise.race() doesn't keep old reads. + let readCanceled = Promise.withResolvers(); + this.cancelReadLoop = readCanceled.reject; + + let msgText: string; + try { + msgText = await Promise.race([this.transport.receive(), readCanceled.promise]); + } finally { + if (this.cancelReadLoop === readCanceled.reject) { + this.cancelReadLoop = undefined; + } + } + + let msg = JSON.parse(msgText); if (this.abortReason) break; // check again before processing if (msg instanceof Array) { @@ -751,7 +772,13 @@ class RpcSessionImpl implements Importer, Exporter { // treated as an unhandled rejection on our end. hook.ignoreUnhandledRejections(); + let exportId = this.exports.length; this.exports.push({ hook, refcount: 1 }); + if (this.isDirectFunctionCall(msg[1])) { + // Direct function calls can be resolved immediately. This lets unused results + // be released right away instead of staying alive for the rest of the session. + this.ensureResolvingExport(exportId); + } continue; } break;