Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/blob-rpc-support.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"capnweb": minor
---

Add `Blob` as a serializable type over RPC. `Blob` objects can now be passed as call arguments and return values. The MIME type (`blob.type`) is preserved across the wire. `File` is not yet supported.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ The following types can be passed over RPC (in arguments or return values), and
* `Date`
* `Uint8Array`
* `Error` and its well-known subclasses
* `Blob`
* `ReadableStream` and `WritableStream`, with automatic flow control.
* `Headers`, `Request`, and `Response` from the Fetch API.

Expand Down
389 changes: 389 additions & 0 deletions __tests__/index.test.ts

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions __tests__/test-util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,9 @@ export class TestTarget extends RpcTarget {
returnNull() { return null; }
returnUndefined() { return undefined; }
returnNumber(i: number) { return i; }

async echoBlob(blob: Blob): Promise<Blob> {
let bytes = await blob.arrayBuffer();
return new Blob([bytes], {type: blob.type});
}
}
38 changes: 34 additions & 4 deletions src/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ export let RpcTarget = workersModule ? workersModule.RpcTarget : class {};
export type PropertyPath = (string | number)[];

type TypeForRpc = "unsupported" | "primitive" | "object" | "function" | "array" | "date" |
"bigint" | "bytes" | "stub" | "rpc-promise" | "rpc-target" | "rpc-thenable" | "error" |
"undefined" | "writable" | "readable" | "headers" | "request" | "response";
"bigint" | "bytes" | "blob" | "stub" | "rpc-promise" | "rpc-target" | "rpc-thenable" |
"error" | "undefined" | "writable" | "readable" | "headers" | "request" | "response";

const AsyncFunction = (async function () {}).constructor;

Expand All @@ -48,6 +48,11 @@ const AsyncFunction = (async function () {}).constructor;
let BUFFER_PROTOTYPE: object | undefined =
typeof Buffer !== "undefined" ? Buffer.prototype : undefined;

// Blob.prototype, or undefined in environments that don't have Blob. Same lazy-init pattern as
// BUFFER_PROTOTYPE above.
let BLOB_PROTOTYPE: object | undefined =
typeof Blob !== "undefined" ? Blob.prototype : undefined;

export function typeForRpc(value: unknown): TypeForRpc {
switch (typeof value) {
case "boolean":
Expand Down Expand Up @@ -112,6 +117,9 @@ export function typeForRpc(value: unknown): TypeForRpc {
case Response.prototype:
return "response";

case BLOB_PROTOTYPE:
return "blob";

// TODO: All other structured clone types.

case RpcStub.prototype:
Expand Down Expand Up @@ -630,6 +638,7 @@ async function pullPromise(promise: RpcPromise): Promise<unknown> {
// RpcPayload

export type LocatedPromise = {parent: object, property: string | number, promise: RpcPromise};
export type LocatedBlobPromise = {parent: object, property: string | number, promise: Promise<Blob>};

// Represents the params to an RPC call, or the resolution of an RPC promise, as it passes
// through the system.
Expand Down Expand Up @@ -753,8 +762,11 @@ export class RpcPayload {
// When done, the payload takes ownership of the final value and all the stubs within. It may
// modify the value in preparation for delivery, and may deliver the value directly to the app
// without copying.
public static forEvaluate(hooks: StubHook[], promises: LocatedPromise[]) {
return new RpcPayload(null, "owned", hooks, promises);
public static forEvaluate(
hooks: StubHook[], promises: LocatedPromise[], blobPromises?: LocatedBlobPromise[]) {
let p = new RpcPayload(null, "owned", hooks, promises);
p.blobPromises = blobPromises;
return p;
}

// Deep-copy the given value, including dup()ing all stubs.
Expand Down Expand Up @@ -797,6 +809,11 @@ export class RpcPayload {
private promises?: LocatedPromise[]
) {}

// Pending Blob assembly promises: each entry holds the location of a placeholder Blob and a
// promise that resolves to the real Blob once bytes have been collected from its pipe stream.
// Only populated in "owned" payloads created by Evaluator (deserialized from wire).
private blobPromises?: LocatedBlobPromise[];

// For `source === "return"` payloads only, this tracks any StubHooks created around RpcTargets
// or WritableStreams found in the payload at the time that it is serialized (or deep-copied) for
// return, so that we can make sure they are not disposed before the pipeline ends.
Expand Down Expand Up @@ -947,6 +964,7 @@ export class RpcPayload {
case "bigint":
case "date":
case "bytes":
case "blob":
case "error":
case "undefined":
// immutable, no need to copy
Expand Down Expand Up @@ -1131,6 +1149,14 @@ export class RpcPayload {
// the promise's eventual payload.
RpcPayload.deliverRpcPromiseTo(record.promise, record.parent, record.property, promises);
}

// Await any pending Blob assemblies (bytes being collected from pipe streams) and substitute
// the real Blob for the placeholder before the payload is delivered to user code.
for (let record of this.blobPromises ?? []) {
promises.push(record.promise.then(blob => {
(<any>record.parent)[record.property] = blob;
}));
}
}
}

Expand Down Expand Up @@ -1261,6 +1287,7 @@ export class RpcPayload {
this.source = "owned";
this.hooks = [];
this.promises = [];
this.blobPromises = [];
}

// Recursive dispose, called only when `source` is "return".
Expand All @@ -1271,6 +1298,7 @@ export class RpcPayload {
case "primitive":
case "bigint":
case "bytes":
case "blob":
case "date":
case "error":
case "undefined":
Expand Down Expand Up @@ -1409,6 +1437,7 @@ export class RpcPayload {
case "primitive":
case "bigint":
case "bytes":
case "blob":
case "date":
case "error":
case "undefined":
Expand Down Expand Up @@ -1566,6 +1595,7 @@ function followPath(value: unknown, parent: object | undefined,
case "primitive":
case "bigint":
case "bytes":
case "blob":
case "date":
case "error":
case "headers":
Expand Down
38 changes: 30 additions & 8 deletions src/rpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
// Licensed under the MIT license found in the LICENSE.txt file or at:
// https://opensource.org/license/mit

import { StubHook, RpcPayload, RpcStub, PropertyPath, PayloadStubHook, ErrorStubHook, RpcTarget, unwrapStubAndPath, streamImpl } from "./core.js";
import { Devaluator, Evaluator, ExportId, ImportId, Exporter, Importer, serialize } from "./serialize.js";
import { StubHook, RpcPayload, RpcStub, PropertyPath, PayloadStubHook, ErrorStubHook, PromiseStubHook, RpcTarget, unwrapStubAndPath, streamImpl } from "./core.js";
import { Devaluator, Evaluator, ExportId, ImportId, Exporter, Importer, serialize, BLOB_INLINE_THRESHOLD } from "./serialize.js";

/**
* Interface for an RPC transport, which is a simple bidirectional message stream. Implement this
Expand Down Expand Up @@ -200,9 +200,27 @@ class RpcImportHook extends StubHook {
let entry = this.getEntry();
if (entry.resolution) {
return entry.resolution.call(path, args);
} else {
return entry.session.sendCall(entry.importId, path, args);
}

// If args contain small blobs, pre-read bytes before sendCall() so they can be encoded inline
// as ["bytes", base64]. We must NOT pre-create the import entry: the push message and import
// entry creation must be atomic (protocol constraint — IDs are positional). Returning a
// PromiseStubHook defers both until bytes are ready, matching the existing behavior of calls
// pipelined on unresolved promises. Send-side e-order vs subsequent calls is not preserved.
//
// Skip this path if args also contain streams: ensureDeepCopied() cannot deep-copy
// ReadableStream/WritableStream. In that case, the small blob falls through to the pipe
// approach which is synchronous and doesn't require deep-copying.
if (args && Devaluator.hasSmallBlob(args.value, BLOB_INLINE_THRESHOLD)
&& !Devaluator.hasStream(args.value)) {
args.ensureDeepCopied(); // must deep-copy before yielding to event loop
return new PromiseStubHook(
Devaluator.preReadBlobs(args.value, BLOB_INLINE_THRESHOLD)
.then(blobBytes => entry.session.sendCall(entry.importId, path, args, blobBytes))
);
}

return entry.session.sendCall(entry.importId, path, args);
}

stream(path: PropertyPath, args: RpcPayload): {promise: Promise<void>, size?: number} {
Expand Down Expand Up @@ -457,10 +475,13 @@ class RpcSessionImpl implements Importer, Exporter {

++this.pullCount;
exp.pull = resolve().then(
payload => {
async payload => {
// We don't transfer ownership of stubs in the payload since the payload
// belongs to the hook which sticks around to handle pipelined requests.
let value = Devaluator.devaluate(payload.value, undefined, this, payload);
// Pre-read small blobs so they are encoded inline as ["bytes", base64].
let blobBytes = await Devaluator.preReadBlobs(
payload.value, BLOB_INLINE_THRESHOLD);
let value = Devaluator.devaluate(payload.value, undefined, this, payload, blobBytes);
this.send(["resolve", exportId, value]);
if (autoRelease) this.releaseExport(exportId, 1);
},
Expand Down Expand Up @@ -585,12 +606,13 @@ class RpcSessionImpl implements Importer, Exporter {
return msgText.length;
}

sendCall(id: ImportId, path: PropertyPath, args?: RpcPayload): RpcImportHook {
sendCall(id: ImportId, path: PropertyPath, args?: RpcPayload,
blobBytes?: ReadonlyMap<Blob, Uint8Array>): RpcImportHook {
if (this.abortReason) throw this.abortReason;

let value: Array<any> = ["pipeline", id, path];
if (args) {
let devalue = Devaluator.devaluate(args.value, undefined, this, args);
let devalue = Devaluator.devaluate(args.value, undefined, this, args, blobBytes);

// HACK: Since the args is an array, devaluator will wrap in a second array. Need to unwrap.
// TODO: Clean this up somehow.
Expand Down
Loading
Loading