diff --git a/__tests__/index.test.ts b/__tests__/index.test.ts index 3875857..b694021 100644 --- a/__tests__/index.test.ts +++ b/__tests__/index.test.ts @@ -1537,6 +1537,36 @@ describe("WebSockets", () => { expect(await cap.incrementCounter(counter, 9)).toBe(13); } }); + + it("can handle the same high-byte-volume workload in Node", async () => { + let entries = 6000; + let concurrency = 100; + let payload = "0".repeat(500_000); + let url = `ws://${inject("testServerHost")}`; + let cap = newWebSocketRpcSession(url); + let completed = 0; + let pending = new Set>(); + + try { + for (let i = 0; i < entries; i++) { + while (pending.size >= concurrency) { + await Promise.race(pending); + } + + let task = cap.store(`key-${i}`, payload) + .then(() => { completed += 1; }) + .finally(() => pending.delete(task)); + + pending.add(task); + } + + await Promise.all(pending); + } finally { + cap[Symbol.dispose](); + } + + expect(completed).toBe(entries); + }, 60_000); }); describe("MessagePorts", () => { diff --git a/__tests__/test-server-workerd.js b/__tests__/test-server-workerd.js index 45ea0a8..24b2437 100644 --- a/__tests__/test-server-workerd.js +++ b/__tests__/test-server-workerd.js @@ -52,6 +52,9 @@ export class TestTarget extends RpcTarget { this.env = env; } + // Accepts and discards a key/value pair. Used by the stress repro. + store(_key, _value) {} + square(i) { return i * i; } diff --git a/__tests__/test-util.ts b/__tests__/test-util.ts index 1559230..981dc51 100644 --- a/__tests__/test-util.ts +++ b/__tests__/test-util.ts @@ -27,6 +27,8 @@ function throwErrorImpl(): never { } export class TestTarget extends RpcTarget { + store(_key: string, _value: string) {} + square(i: number) { return i * i; } @@ -63,7 +65,13 @@ export class TestTarget extends RpcTarget { return result; } - returnNull() { return null; } - returnUndefined() { return undefined; } - returnNumber(i: number) { return i; } + returnNull() { + return null; + } + returnUndefined() { + return undefined; + } + returnNumber(i: number) { + return i; + } } diff --git a/__tests__/workerd.test.ts b/__tests__/workerd.test.ts index 79f63e8..cc3b083 100644 --- a/__tests__/workerd.test.ts +++ b/__tests__/workerd.test.ts @@ -298,4 +298,71 @@ describe("workerd RPC server", () => { expect(await Promise.all([promise1, promise2, promise3])) .toStrictEqual([36, 5, 9]); }) + + it("passes with a fresh websocket session per batch under the same load", async () => { + let entries = 6000; + let concurrency = 100; + let payload = "0".repeat(500_000); + let completed = 0; + + while (completed < entries) { + let resp = await (env).testServer.fetch("http://foo", {headers: {Upgrade: "websocket"}}); + let ws = resp.webSocket; + expect(ws).toBeTruthy(); + + ws!.accept(); + let cap = newWebSocketRpcSession(ws!); + + try { + let batchSize = Math.min(concurrency, entries - completed); + + await Promise.all( + Array.from({length: batchSize}, (_, index) => + cap.store(`key-${completed + index}`, payload)) + ); + + completed += batchSize; + } finally { + cap[Symbol.dispose](); + } + } + + expect(completed).toBe(entries); + }, 60_000) + + it("fails with a shared websocket session under high byte volume", async () => { + let entries = 3000; + let concurrency = 100; + let payload = "0".repeat(500_000); + let completed = 0; + + let resp = await (env).testServer.fetch("http://foo", {headers: {Upgrade: "websocket"}}); + let ws = resp.webSocket; + expect(ws).toBeTruthy(); + + ws!.accept(); + let cap = newWebSocketRpcSession(ws!); + + let pending = new Set>(); + + try { + for (let i = 0; i < entries; i++) { + while (pending.size >= concurrency) { + await Promise.race(pending); + } + + let task = cap.store(`key-${i}`, payload) + .then(() => { completed += 1; }) + .finally(() => pending.delete(task)); + + pending.add(task); + } + + await Promise.all(pending); + } finally { + cap[Symbol.dispose](); + } + + expect(completed).toBe(entries); + }, 240_000) });