Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/__tests__/browser-integration/web-v2/control-plane-v2.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,28 @@ const trpc = createTRPCProxyClient<AppRouter>({
});

test('loads the web v2 shell sections', async ({ page }) => {
const eventStreams = new Set<string>();
page.on('request', (request) => {
const url = new URL(request.url());
const eventPath = url.pathname.match(/\/trpc\/(controlPlane\.[^,?/]*Events)/)?.[1];
if (eventPath) {
eventStreams.add(eventPath);
}
});
const sessionEvents = page.waitForResponse((response) => (
response.url().includes('/trpc/controlPlane.sessionEvents') && response.status() === 200
));
await page.goto('/sessions');
await sessionEvents;
await page.waitForTimeout(250);

await expect(page.getByRole('complementary', { name: 'Primary navigation' })).toBeVisible();
await expect(page.getByRole('main')).toBeVisible();
await expect(page.getByRole('complementary', { name: 'Context inspector' })).toBeVisible();
await expect(page.getByTestId('web-v2-surface-sessions')).toBeVisible();
await expect(page.getByTestId('web-v2-workbench-title')).toBeVisible();
await expect(page.getByRole('link', { name: 'Sessions' })).toHaveAttribute('aria-current', 'page');
expect([...eventStreams].sort()).toEqual(['controlPlane.sessionEvents']);
});

test('collapses and expands the sidebar', async ({ page }) => {
Expand Down
43 changes: 43 additions & 0 deletions src/__tests__/unit/client-shared/api-links.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import { afterEach, describe, expect, it, vi } from 'vitest';
import { createControlPlaneRequestFetch } from '@/client-shared/api/links';

describe('control-plane API links', () => {
afterEach(() => {
vi.useRealTimers();
});

it('aborts request/response calls that exceed the configured timeout', async () => {
vi.useFakeTimers();
const fetchImpl = vi.fn((_input: RequestInfo | URL, init?: RequestInit) => new Promise<Response>((_resolve, reject) => {
init?.signal?.addEventListener('abort', () => reject(init.signal?.reason));
}));
const requestFetch = createControlPlaneRequestFetch({ fetchImpl, timeoutMs: 25 });

const request = requestFetch('http://127.0.0.1:8765/trpc/controlPlane.state');
const assertion = expect(request).rejects.toThrow('Control-plane request timed out after 25ms.');

await vi.advanceTimersByTimeAsync(25);
await assertion;
expect(fetchImpl).toHaveBeenCalledWith(
'http://127.0.0.1:8765/trpc/controlPlane.state',
expect.objectContaining({
signal: expect.any(AbortSignal),
}),
);
});

it('preserves caller aborts when tRPC cancels a request', async () => {
const fetchImpl = vi.fn((_input: RequestInfo | URL, init?: RequestInit) => new Promise<Response>((_resolve, reject) => {
init?.signal?.addEventListener('abort', () => reject(init.signal?.reason));
}));
const requestFetch = createControlPlaneRequestFetch({ fetchImpl, timeoutMs: 30_000 });
const controller = new AbortController();

const request = requestFetch('http://127.0.0.1:8765/trpc/controlPlane.state', {
signal: controller.signal,
});
controller.abort(new Error('Caller cancelled request.'));

await expect(request).rejects.toThrow('Caller cancelled request.');
});
});
46 changes: 45 additions & 1 deletion src/client-shared/api/links.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@ export type CreateControlPlaneTrpcLinksOptions = {
url: string;
batch?: boolean;
eventSource?: typeof EventSource;
requestTimeoutMs?: number;
};

const DEFAULT_CONTROL_PLANE_REQUEST_TIMEOUT_MS = 30_000;

/**
* Shared tRPC link service for frontend API consumers.
*
Expand All @@ -25,8 +28,10 @@ export class ClientSharedApiLinkService {
url,
batch = false,
eventSource,
requestTimeoutMs = DEFAULT_CONTROL_PLANE_REQUEST_TIMEOUT_MS,
}: CreateControlPlaneTrpcLinksOptions): TRPCLink<AppRouter>[] {
const requestLink = batch ? httpBatchLink({ url }) : httpLink({ url });
const fetch = createControlPlaneRequestFetch({ timeoutMs: requestTimeoutMs });
const requestLink = batch ? httpBatchLink({ url, fetch }) : httpLink({ url, fetch });

return [
splitLink({
Expand All @@ -37,3 +42,42 @@ export class ClientSharedApiLinkService {
];
}
}

export function createControlPlaneRequestFetch({
fetchImpl = globalThis.fetch,
timeoutMs = DEFAULT_CONTROL_PLANE_REQUEST_TIMEOUT_MS,
}: {
fetchImpl?: typeof fetch;
timeoutMs?: number;
} = {}): typeof fetch {
if (!Number.isFinite(timeoutMs) || timeoutMs <= 0) {
return fetchImpl;
}

return async (input, init) => {
const controller = new AbortController();
const upstreamSignal = init?.signal;
const timeoutId = setTimeout(() => {
controller.abort(new Error(`Control-plane request timed out after ${timeoutMs}ms.`));
}, timeoutMs);
const abortFromUpstream = () => {
controller.abort(upstreamSignal?.reason ?? new Error('Control-plane request aborted.'));
};

if (upstreamSignal?.aborted) {
abortFromUpstream();
} else {
upstreamSignal?.addEventListener('abort', abortFromUpstream, { once: true });
}

try {
return await fetchImpl(input, {
...init,
signal: controller.signal,
});
} finally {
clearTimeout(timeoutId);
upstreamSignal?.removeEventListener('abort', abortFromUpstream);
}
};
}
14 changes: 2 additions & 12 deletions src/web-v2/hooks/shell/useControlPlaneSidebarData.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { useEffect, useMemo, useState } from 'react';
import { skipToken } from '@tanstack/react-query';
import { trpcReact, type ControlPlaneSessionsEventEnvelope } from '@web/api/client';
import { trpcReact } from '@web/api/client';
import type { ControlPlaneState } from '@web/api/client';
import type { useWorkbenchNavigation } from '../useWorkbenchNavigation';
import { applyLiveTaskState } from '../tasks/useControlPlaneTaskLiveState';
Expand Down Expand Up @@ -30,6 +29,7 @@ export function useControlPlaneSidebarData({
workspaceKnown && workspaceId ? { workspaceId } : undefined,
{
enabled: workspaceKnown,
refetchInterval: navigation.activeSurfaceId === 'sessions' ? 10_000 : false,
},
);
const tasksQuery = trpcReact.controlPlane.heartbeatTasks.useQuery(
Expand Down Expand Up @@ -83,16 +83,6 @@ export function useControlPlaneSidebarData({
navigation.selectTask(tasks[0]!.taskId, { replace: true });
}, [navigation, tasks]);

trpcReact.controlPlane.sessionsEvents.useSubscription(workspaceId ? { workspaceId } : skipToken, {
onData: (event: ControlPlaneSessionsEventEnvelope) => {
if (event.type !== 'sessions.updated') {
return;
}

void sessionsQuery.refetch();
},
});

return {
stateQuery,
sessionsQuery,
Expand Down
11 changes: 9 additions & 2 deletions src/web-v2/hooks/tasks/useControlPlaneHeartbeatEvents.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { useCallback, useState } from 'react';
import { skipToken } from '@tanstack/react-query';
import dayjs from 'dayjs';
import { trpcReact, type ControlPlaneHeartbeatEventEnvelope, type ControlPlaneHeartbeatTaskView } from '@web/api/client';

Expand All @@ -15,7 +16,13 @@ export type ControlPlaneLiveTaskState = {
updatedAt: string;
};

export function useControlPlaneHeartbeatEvents(workspaceId?: string) {
export function useControlPlaneHeartbeatEvents({
enabled,
workspaceId,
}: {
enabled: boolean;
workspaceId?: string;
}) {
const utils = trpcReact.useUtils();
const [liveTasks, setLiveTasks] = useState<Record<string, ControlPlaneLiveTaskState>>({});

Expand Down Expand Up @@ -49,7 +56,7 @@ export function useControlPlaneHeartbeatEvents(workspaceId?: string) {
}
}, [utils, workspaceId]);

trpcReact.controlPlane.heartbeatEvents.useSubscription(workspaceId ? { workspaceId } : undefined, {
trpcReact.controlPlane.heartbeatEvents.useSubscription(enabled && workspaceId ? { workspaceId } : skipToken, {
onData: applyHeartbeatEnvelope,
});

Expand Down
5 changes: 4 additions & 1 deletion src/web-v2/hooks/useControlPlaneAppState.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ export function useControlPlaneAppState() {
const workspaceCreateMutation = trpcReact.controlPlane.workspaceCreate.useMutation();
const workspaceRenameMutation = trpcReact.controlPlane.workspaceRename.useMutation();
const workspaceSetActiveMutation = trpcReact.controlPlane.workspaceSetActive.useMutation();
const taskEvents = useControlPlaneHeartbeatEvents(navigation.selectedWorkspaceId);
const taskEvents = useControlPlaneHeartbeatEvents({
enabled: navigation.activeSurfaceId === 'tasks',
workspaceId: navigation.selectedWorkspaceId,
});
const sidebar = useControlPlaneSidebarData({ navigation, taskEvents });
const memoryStatusQuery = trpcReact.controlPlane.memoryStatus.useQuery(sidebar.workspaceId ? { workspaceId: sidebar.workspaceId } : undefined, {
enabled: navigation.settingsOpen && navigation.activeSettingsSectionId === 'memory',
Expand Down
Loading