From adeb2ec97785f345b2d77b9d6c87df5987d97fa4 Mon Sep 17 00:00:00 2001 From: Jay/Fienna Liang Date: Tue, 2 Jun 2026 19:26:17 +0800 Subject: [PATCH] Fix web control-plane connection starvation --- .../web-v2/control-plane-v2.spec.ts | 10 ++++ .../unit/client-shared/api-links.test.ts | 43 +++++++++++++++++ src/client-shared/api/links.ts | 46 ++++++++++++++++++- .../hooks/shell/useControlPlaneSidebarData.ts | 14 +----- .../tasks/useControlPlaneHeartbeatEvents.ts | 11 ++++- src/web-v2/hooks/useControlPlaneAppState.ts | 5 +- 6 files changed, 113 insertions(+), 16 deletions(-) create mode 100644 src/__tests__/unit/client-shared/api-links.test.ts diff --git a/src/__tests__/browser-integration/web-v2/control-plane-v2.spec.ts b/src/__tests__/browser-integration/web-v2/control-plane-v2.spec.ts index 6c981801..9bd39b01 100644 --- a/src/__tests__/browser-integration/web-v2/control-plane-v2.spec.ts +++ b/src/__tests__/browser-integration/web-v2/control-plane-v2.spec.ts @@ -12,11 +12,20 @@ const trpc = createTRPCProxyClient({ }); test('loads the web v2 shell sections', async ({ page }) => { + const eventStreams = new Set(); + page.on('request', (request) => { + const url = new URL(request.url()); + const eventPath = url.pathname.match(/\/trpc\/(controlPlane\.[^,?/]*Events)/)?.[1]; + if (eventPath) { + eventStreams.add(eventPath); + } + }); const sessionEvents = page.waitForResponse((response) => ( response.url().includes('/trpc/controlPlane.sessionEvents') && response.status() === 200 )); await page.goto('/sessions'); await sessionEvents; + await page.waitForTimeout(250); await expect(page.getByRole('complementary', { name: 'Primary navigation' })).toBeVisible(); await expect(page.getByRole('main')).toBeVisible(); @@ -24,6 +33,7 @@ test('loads the web v2 shell sections', async ({ page }) => { await expect(page.getByTestId('web-v2-surface-sessions')).toBeVisible(); await expect(page.getByTestId('web-v2-workbench-title')).toBeVisible(); await expect(page.getByRole('link', { name: 'Sessions' })).toHaveAttribute('aria-current', 'page'); + expect([...eventStreams].sort()).toEqual(['controlPlane.sessionEvents']); }); test('collapses and expands the sidebar', async ({ page }) => { diff --git a/src/__tests__/unit/client-shared/api-links.test.ts b/src/__tests__/unit/client-shared/api-links.test.ts new file mode 100644 index 00000000..aa727529 --- /dev/null +++ b/src/__tests__/unit/client-shared/api-links.test.ts @@ -0,0 +1,43 @@ +import { afterEach, describe, expect, it, vi } from 'vitest'; +import { createControlPlaneRequestFetch } from '@/client-shared/api/links'; + +describe('control-plane API links', () => { + afterEach(() => { + vi.useRealTimers(); + }); + + it('aborts request/response calls that exceed the configured timeout', async () => { + vi.useFakeTimers(); + const fetchImpl = vi.fn((_input: RequestInfo | URL, init?: RequestInit) => new Promise((_resolve, reject) => { + init?.signal?.addEventListener('abort', () => reject(init.signal?.reason)); + })); + const requestFetch = createControlPlaneRequestFetch({ fetchImpl, timeoutMs: 25 }); + + const request = requestFetch('http://127.0.0.1:8765/trpc/controlPlane.state'); + const assertion = expect(request).rejects.toThrow('Control-plane request timed out after 25ms.'); + + await vi.advanceTimersByTimeAsync(25); + await assertion; + expect(fetchImpl).toHaveBeenCalledWith( + 'http://127.0.0.1:8765/trpc/controlPlane.state', + expect.objectContaining({ + signal: expect.any(AbortSignal), + }), + ); + }); + + it('preserves caller aborts when tRPC cancels a request', async () => { + const fetchImpl = vi.fn((_input: RequestInfo | URL, init?: RequestInit) => new Promise((_resolve, reject) => { + init?.signal?.addEventListener('abort', () => reject(init.signal?.reason)); + })); + const requestFetch = createControlPlaneRequestFetch({ fetchImpl, timeoutMs: 30_000 }); + const controller = new AbortController(); + + const request = requestFetch('http://127.0.0.1:8765/trpc/controlPlane.state', { + signal: controller.signal, + }); + controller.abort(new Error('Caller cancelled request.')); + + await expect(request).rejects.toThrow('Caller cancelled request.'); + }); +}); diff --git a/src/client-shared/api/links.ts b/src/client-shared/api/links.ts index 45a111e3..cb7430a0 100644 --- a/src/client-shared/api/links.ts +++ b/src/client-shared/api/links.ts @@ -12,8 +12,11 @@ export type CreateControlPlaneTrpcLinksOptions = { url: string; batch?: boolean; eventSource?: typeof EventSource; + requestTimeoutMs?: number; }; +const DEFAULT_CONTROL_PLANE_REQUEST_TIMEOUT_MS = 30_000; + /** * Shared tRPC link service for frontend API consumers. * @@ -25,8 +28,10 @@ export class ClientSharedApiLinkService { url, batch = false, eventSource, + requestTimeoutMs = DEFAULT_CONTROL_PLANE_REQUEST_TIMEOUT_MS, }: CreateControlPlaneTrpcLinksOptions): TRPCLink[] { - const requestLink = batch ? httpBatchLink({ url }) : httpLink({ url }); + const fetch = createControlPlaneRequestFetch({ timeoutMs: requestTimeoutMs }); + const requestLink = batch ? httpBatchLink({ url, fetch }) : httpLink({ url, fetch }); return [ splitLink({ @@ -37,3 +42,42 @@ export class ClientSharedApiLinkService { ]; } } + +export function createControlPlaneRequestFetch({ + fetchImpl = globalThis.fetch, + timeoutMs = DEFAULT_CONTROL_PLANE_REQUEST_TIMEOUT_MS, +}: { + fetchImpl?: typeof fetch; + timeoutMs?: number; +} = {}): typeof fetch { + if (!Number.isFinite(timeoutMs) || timeoutMs <= 0) { + return fetchImpl; + } + + return async (input, init) => { + const controller = new AbortController(); + const upstreamSignal = init?.signal; + const timeoutId = setTimeout(() => { + controller.abort(new Error(`Control-plane request timed out after ${timeoutMs}ms.`)); + }, timeoutMs); + const abortFromUpstream = () => { + controller.abort(upstreamSignal?.reason ?? new Error('Control-plane request aborted.')); + }; + + if (upstreamSignal?.aborted) { + abortFromUpstream(); + } else { + upstreamSignal?.addEventListener('abort', abortFromUpstream, { once: true }); + } + + try { + return await fetchImpl(input, { + ...init, + signal: controller.signal, + }); + } finally { + clearTimeout(timeoutId); + upstreamSignal?.removeEventListener('abort', abortFromUpstream); + } + }; +} diff --git a/src/web-v2/hooks/shell/useControlPlaneSidebarData.ts b/src/web-v2/hooks/shell/useControlPlaneSidebarData.ts index ccf2e794..650b82c0 100644 --- a/src/web-v2/hooks/shell/useControlPlaneSidebarData.ts +++ b/src/web-v2/hooks/shell/useControlPlaneSidebarData.ts @@ -1,6 +1,5 @@ import { useEffect, useMemo, useState } from 'react'; -import { skipToken } from '@tanstack/react-query'; -import { trpcReact, type ControlPlaneSessionsEventEnvelope } from '@web/api/client'; +import { trpcReact } from '@web/api/client'; import type { ControlPlaneState } from '@web/api/client'; import type { useWorkbenchNavigation } from '../useWorkbenchNavigation'; import { applyLiveTaskState } from '../tasks/useControlPlaneTaskLiveState'; @@ -30,6 +29,7 @@ export function useControlPlaneSidebarData({ workspaceKnown && workspaceId ? { workspaceId } : undefined, { enabled: workspaceKnown, + refetchInterval: navigation.activeSurfaceId === 'sessions' ? 10_000 : false, }, ); const tasksQuery = trpcReact.controlPlane.heartbeatTasks.useQuery( @@ -83,16 +83,6 @@ export function useControlPlaneSidebarData({ navigation.selectTask(tasks[0]!.taskId, { replace: true }); }, [navigation, tasks]); - trpcReact.controlPlane.sessionsEvents.useSubscription(workspaceId ? { workspaceId } : skipToken, { - onData: (event: ControlPlaneSessionsEventEnvelope) => { - if (event.type !== 'sessions.updated') { - return; - } - - void sessionsQuery.refetch(); - }, - }); - return { stateQuery, sessionsQuery, diff --git a/src/web-v2/hooks/tasks/useControlPlaneHeartbeatEvents.ts b/src/web-v2/hooks/tasks/useControlPlaneHeartbeatEvents.ts index c1e6449f..a965a9a0 100644 --- a/src/web-v2/hooks/tasks/useControlPlaneHeartbeatEvents.ts +++ b/src/web-v2/hooks/tasks/useControlPlaneHeartbeatEvents.ts @@ -1,4 +1,5 @@ import { useCallback, useState } from 'react'; +import { skipToken } from '@tanstack/react-query'; import dayjs from 'dayjs'; import { trpcReact, type ControlPlaneHeartbeatEventEnvelope, type ControlPlaneHeartbeatTaskView } from '@web/api/client'; @@ -15,7 +16,13 @@ export type ControlPlaneLiveTaskState = { updatedAt: string; }; -export function useControlPlaneHeartbeatEvents(workspaceId?: string) { +export function useControlPlaneHeartbeatEvents({ + enabled, + workspaceId, +}: { + enabled: boolean; + workspaceId?: string; +}) { const utils = trpcReact.useUtils(); const [liveTasks, setLiveTasks] = useState>({}); @@ -49,7 +56,7 @@ export function useControlPlaneHeartbeatEvents(workspaceId?: string) { } }, [utils, workspaceId]); - trpcReact.controlPlane.heartbeatEvents.useSubscription(workspaceId ? { workspaceId } : undefined, { + trpcReact.controlPlane.heartbeatEvents.useSubscription(enabled && workspaceId ? { workspaceId } : skipToken, { onData: applyHeartbeatEnvelope, }); diff --git a/src/web-v2/hooks/useControlPlaneAppState.ts b/src/web-v2/hooks/useControlPlaneAppState.ts index e63a7d76..8a857809 100644 --- a/src/web-v2/hooks/useControlPlaneAppState.ts +++ b/src/web-v2/hooks/useControlPlaneAppState.ts @@ -28,7 +28,10 @@ export function useControlPlaneAppState() { const workspaceCreateMutation = trpcReact.controlPlane.workspaceCreate.useMutation(); const workspaceRenameMutation = trpcReact.controlPlane.workspaceRename.useMutation(); const workspaceSetActiveMutation = trpcReact.controlPlane.workspaceSetActive.useMutation(); - const taskEvents = useControlPlaneHeartbeatEvents(navigation.selectedWorkspaceId); + const taskEvents = useControlPlaneHeartbeatEvents({ + enabled: navigation.activeSurfaceId === 'tasks', + workspaceId: navigation.selectedWorkspaceId, + }); const sidebar = useControlPlaneSidebarData({ navigation, taskEvents }); const memoryStatusQuery = trpcReact.controlPlane.memoryStatus.useQuery(sidebar.workspaceId ? { workspaceId: sidebar.workspaceId } : undefined, { enabled: navigation.settingsOpen && navigation.activeSettingsSectionId === 'memory',