Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions __tests__/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1308,6 +1308,36 @@ describe("stub disposal over RPC", () => {
});
});

describe("callback calls", () => {
class CallbackTarget extends RpcTarget {
async callMany(callback: RpcStub<(text: string) => unknown>, count: number) {
let cb = callback.dup();
try {
for (let i = 0; i < count; ++i) {
cb(`value-${i}`);
await pumpMicrotasks();
}
} finally {
cb[Symbol.dispose]();
}
}
}

it("does not retain ignored synchronous callback results", async () => {
await using harness = new TestHarness(new CallbackTarget());
await harness.stub.callMany((_text: string) => {}, 5);
await pumpMicrotasks();
harness.checkAllDisposed();
});

it("does not retain ignored synchronous object callback results", async () => {
await using harness = new TestHarness(new CallbackTarget());
await harness.stub.callMany((text: string) => ({text}), 5);
await pumpMicrotasks();
harness.checkAllDisposed();
});
});

describe("e-order", () => {
it("maintains e-order for concurrent calls on single stub", async () => {
let callOrder: number[] = [];
Expand Down
45 changes: 36 additions & 9 deletions src/rpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ class RpcSessionImpl implements Importer, Exporter {
private reverseExports: Map<StubHook, ExportId> = new Map();
private imports: Array<ImportTableEntry> = [];
private abortReason?: any;
private cancelReadLoop: (error: any) => void;
private cancelReadLoop?: (error: any) => void;

// We assign positive numbers to imports we initiate, and negative numbers to exports we
// initiate. So the next import ID is just `imports.length`, but the next export ID needs
Expand All @@ -348,11 +348,7 @@ class RpcSessionImpl implements Importer, Exporter {
// Import zero is the other side's bootstrap object.
this.imports.push(new ImportTableEntry(this, 0, false));

let rejectFunc: (error: any) => void;;
let abortPromise = new Promise<never>((resolve, reject) => { rejectFunc = reject; });
this.cancelReadLoop = rejectFunc!;

this.readLoop(abortPromise).catch(err => this.abort(err));
this.readLoop().catch(err => this.abort(err));
}

// Should only be called once immediately after construction.
Expand Down Expand Up @@ -585,6 +581,17 @@ class RpcSessionImpl implements Importer, Exporter {
return msgText.length;
}

private isDirectFunctionCall(expression: unknown): boolean {
if (!(expression instanceof Array)) return false;

let [kind, _id, path, args] = expression;
return kind === "pipeline" &&
expression.length === 4 &&
path instanceof Array &&
path.length === 0 &&
args instanceof Array;
}

sendCall(id: ImportId, path: PropertyPath, args?: RpcPayload): RpcImportHook {
if (this.abortReason) throw this.abortReason;

Expand Down Expand Up @@ -684,7 +691,8 @@ class RpcSessionImpl implements Importer, Exporter {
// Don't double-abort.
if (this.abortReason !== undefined) return;

this.cancelReadLoop(error);
this.cancelReadLoop?.(error);
this.cancelReadLoop = undefined;

if (trySendAbortMessage) {
try {
Expand Down Expand Up @@ -734,9 +742,22 @@ class RpcSessionImpl implements Importer, Exporter {
}
}

private async readLoop(abortPromise: Promise<never>) {
private async readLoop() {
while (!this.abortReason) {
let msg = JSON.parse(await Promise.race([this.transport.receive(), abortPromise]));
// Each receive needs its own abort promise so Promise.race() doesn't keep old reads.
let readCanceled = Promise.withResolvers<never>();
this.cancelReadLoop = readCanceled.reject;

let msgText: string;
try {
msgText = await Promise.race([this.transport.receive(), readCanceled.promise]);
} finally {
if (this.cancelReadLoop === readCanceled.reject) {
this.cancelReadLoop = undefined;
}
}

let msg = JSON.parse(msgText);
if (this.abortReason) break; // check again before processing

if (msg instanceof Array) {
Expand All @@ -751,7 +772,13 @@ class RpcSessionImpl implements Importer, Exporter {
// treated as an unhandled rejection on our end.
hook.ignoreUnhandledRejections();

let exportId = this.exports.length;
this.exports.push({ hook, refcount: 1 });
if (this.isDirectFunctionCall(msg[1])) {
// Direct function calls can be resolved immediately. This lets unused results
// be released right away instead of staying alive for the rest of the session.
this.ensureResolvingExport(exportId);
}
continue;
}
break;
Expand Down
Loading