From 2a48ad69c3afd7466f882cd1b8072107be1ee887 Mon Sep 17 00:00:00 2001 From: Jay/Fienna Liang Date: Wed, 3 Jun 2026 17:57:39 +0800 Subject: [PATCH 1/2] Refactor heartbeat commands onto control plane --- .../control-plane-heartbeat-mutations.test.ts | 43 ++++ .../unit/cli-v2/chat-v2-runtime.test.ts | 12 +- src/__tests__/unit/tui/heartbeat-cli.test.ts | 48 ++++- src/cli-v2/commands/README.md | 8 +- src/cli-v2/commands/chat-v2-command.ts | 124 +---------- .../commands/control-plane-command-runtime.ts | 127 +++++++++++ src/cli-v2/commands/heartbeat-command.ts | 69 ++++++ .../commands}/heartbeat/args.ts | 0 src/cli-v2/commands/heartbeat/duration.ts | 50 +++++ src/cli-v2/commands/heartbeat/output.ts | 22 ++ src/cli-v2/commands/heartbeat/run-commands.ts | 141 ++++++++++++ .../commands}/heartbeat/summary.ts | 2 +- .../commands/heartbeat/task-commands.ts | 204 ++++++++++++++++++ src/cli-v2/commands/heartbeat/types.ts | 21 ++ src/cli-v2/commands/heartbeat/worker.ts | 107 +++++++++ src/cli/heartbeat.ts | 52 +---- src/cli/heartbeat/duration.ts | 36 ---- src/cli/heartbeat/output.ts | 94 -------- src/cli/heartbeat/run-commands.ts | 75 ------- src/cli/heartbeat/store.ts | 8 - src/cli/heartbeat/task-commands.ts | 167 -------------- src/cli/heartbeat/types.ts | 12 -- src/cli/heartbeat/worker.ts | 114 ---------- src/cli/main.ts | 23 +- src/core/heartbeat/README.md | 8 + .../trpc/control-plane/heartbeat.ts | 24 +++ src/server/routes/trpc/control-plane.ts | 19 ++ src/server/routes/trpc/schema.ts | 10 + 28 files changed, 917 insertions(+), 703 deletions(-) create mode 100644 src/cli-v2/commands/control-plane-command-runtime.ts create mode 100644 src/cli-v2/commands/heartbeat-command.ts rename src/{cli => cli-v2/commands}/heartbeat/args.ts (100%) create mode 100644 src/cli-v2/commands/heartbeat/duration.ts create mode 100644 src/cli-v2/commands/heartbeat/output.ts create mode 100644 src/cli-v2/commands/heartbeat/run-commands.ts rename src/{cli => cli-v2/commands}/heartbeat/summary.ts (87%) create mode 100644 src/cli-v2/commands/heartbeat/task-commands.ts create mode 100644 src/cli-v2/commands/heartbeat/types.ts create mode 100644 src/cli-v2/commands/heartbeat/worker.ts delete mode 100644 src/cli/heartbeat/duration.ts delete mode 100644 src/cli/heartbeat/output.ts delete mode 100644 src/cli/heartbeat/run-commands.ts delete mode 100644 src/cli/heartbeat/store.ts delete mode 100644 src/cli/heartbeat/task-commands.ts delete mode 100644 src/cli/heartbeat/types.ts delete mode 100644 src/cli/heartbeat/worker.ts diff --git a/src/__tests__/integration/control-plane/control-plane-heartbeat-mutations.test.ts b/src/__tests__/integration/control-plane/control-plane-heartbeat-mutations.test.ts index 65ad8370..d29d89ef 100644 --- a/src/__tests__/integration/control-plane/control-plane-heartbeat-mutations.test.ts +++ b/src/__tests__/integration/control-plane/control-plane-heartbeat-mutations.test.ts @@ -327,6 +327,49 @@ describe('control-plane heartbeat mutations', () => { const runDetail = await caller.heartbeatRun({ taskId: 'repo-check', runId: 'run_heartbeat_1' }); expect(runDetail.run).toMatchObject({ taskId: 'repo-check', runId: 'run_heartbeat_1', loadedCheckpoint: true }); }); + + it('exposes due-task execution through the control-plane router', async () => { + const workspaceRoot = mkdtempSync(join(tmpdir(), 'heddle-cp-heartbeat-due-workspace-')); + const stateRoot = mkdtempSync(join(tmpdir(), 'heddle-cp-heartbeat-due-router-')); + const store = new FileHeartbeatTaskService({ dir: join(stateRoot, 'heartbeat') }); + await store.saveTask(createTask({ + enabled: false, + schedule: { + intervalMs: 60_000, + nextRunAt: undefined, + }, + state: { + status: 'idle', + }, + })); + const catalog = RuntimeWorkspaceService.ensureCatalog({ workspaceRoot, stateRoot }); + const activeWorkspace = catalog.workspaces[0]; + if (!activeWorkspace) { + throw new Error('expected default workspace'); + } + + const caller = controlPlaneRouter.createCaller({ + workspaceRoot, + stateRoot, + activeWorkspaceId: activeWorkspace.id, + activeWorkspace, + workspaces: catalog.workspaces, + runtimeHost: null, + logger: pino({ level: 'silent' }), + }); + + const result = await caller.heartbeatRunDueTasks({ + workspaceId: activeWorkspace.id, + model: 'gpt-5.4', + }); + + expect(result).toMatchObject({ + checked: 0, + ran: 0, + failed: 0, + records: [], + }); + }); }); function createHeartbeatResult( diff --git a/src/__tests__/unit/cli-v2/chat-v2-runtime.test.ts b/src/__tests__/unit/cli-v2/chat-v2-runtime.test.ts index e8655f99..e8f82333 100644 --- a/src/__tests__/unit/cli-v2/chat-v2-runtime.test.ts +++ b/src/__tests__/unit/cli-v2/chat-v2-runtime.test.ts @@ -1,5 +1,5 @@ import { describe, expect, it, vi } from 'vitest'; -import { formatChatV2RuntimeNotice, resolveChatV2Runtime } from '@/cli-v2/commands/chat-v2-command.js'; +import { ControlPlaneCommandRuntimeService } from '@/cli-v2/commands/control-plane-command-runtime.js'; import type { ResolvedRuntimeHost } from '@/core/runtime/daemon/index.js'; import type { HeddleControlPlaneServerHandle } from '@/server/index.js'; @@ -12,7 +12,7 @@ describe('chat-v2 runtime bootstrap', () => { it('attaches to a fresh live control-plane server', async () => { const startServer = vi.fn(); - const runtime = await resolveChatV2Runtime({ + const runtime = await ControlPlaneCommandRuntimeService.resolve({ workspaceRoot: '/repo', stateDir: '.heddle', preferApiKey: false, @@ -26,7 +26,7 @@ describe('chat-v2 runtime bootstrap', () => { trpcUrl: 'http://127.0.0.1:8765/trpc', serverId: 'server-1', }); - expect(formatChatV2RuntimeNotice(runtime)).toContain('attaching chat-v2'); + expect(ControlPlaneCommandRuntimeService.formatNotice(runtime, 'chat-v2')).toContain('attaching chat-v2'); }); it('starts an embedded control-plane server when no live server exists', async () => { @@ -38,7 +38,7 @@ describe('chat-v2 runtime bootstrap', () => { close, })); - const runtime = await resolveChatV2Runtime({ + const runtime = await ControlPlaneCommandRuntimeService.resolve({ workspaceRoot: '/repo', stateDir: '.heddle-test', preferApiKey: true, @@ -65,7 +65,7 @@ describe('chat-v2 runtime bootstrap', () => { }); await runtime.close(); expect(close).toHaveBeenCalledTimes(1); - expect(formatChatV2RuntimeNotice(runtime)).toContain('browser=http://127.0.0.1:8123'); + expect(ControlPlaneCommandRuntimeService.formatNotice(runtime, 'chat-v2')).toContain('browser=http://127.0.0.1:8123'); }); it('starts embedded when force-owner-conflict bypasses a live server', async () => { @@ -75,7 +75,7 @@ describe('chat-v2 runtime bootstrap', () => { port: 8765, })); - const runtime = await resolveChatV2Runtime({ + const runtime = await ControlPlaneCommandRuntimeService.resolve({ workspaceRoot: '/repo', stateDir: '.heddle', preferApiKey: false, diff --git a/src/__tests__/unit/tui/heartbeat-cli.test.ts b/src/__tests__/unit/tui/heartbeat-cli.test.ts index 34df1aff..1a5887c5 100644 --- a/src/__tests__/unit/tui/heartbeat-cli.test.ts +++ b/src/__tests__/unit/tui/heartbeat-cli.test.ts @@ -1,4 +1,7 @@ -import { describe, expect, it } from 'vitest'; +import { describe, expect, it, vi } from 'vitest'; +import { ClientSharedProxyApiService } from '@/client-shared/api/proxy.js'; +import { ControlPlaneCommandRuntimeService } from '@/cli-v2/commands/control-plane-command-runtime.js'; +import { runHeartbeatCli } from '@/cli-v2/commands/heartbeat-command.js'; import { formatDurationMs, parseDurationMs, parseHeartbeatArgs } from '../../../cli/heartbeat.js'; describe('heartbeat CLI helpers', () => { @@ -80,4 +83,47 @@ describe('heartbeat CLI helpers', () => { expect(() => parseDurationMs('soon')).toThrow('Invalid duration'); expect(() => parseDurationMs('0s')).toThrow('Invalid duration'); }); + + it('routes heartbeat task listing through the control-plane API', async () => { + const query = vi.fn(async () => ({ workspaceId: 'workspace-1', tasks: [] })); + const runtime = { + kind: 'attached' as const, + trpcUrl: 'http://127.0.0.1:8765/trpc', + endpoint: { + host: '127.0.0.1', + port: 8765, + }, + serverId: 'server-1', + close: vi.fn(async () => undefined), + }; + const resolve = vi.spyOn(ControlPlaneCommandRuntimeService, 'resolve').mockResolvedValue(runtime); + const createClient = vi.spyOn(ClientSharedProxyApiService, 'createClient').mockReturnValue({ + controlPlane: { + heartbeatTasks: { + query, + }, + }, + } as never); + const stdout = vi.spyOn(process.stdout, 'write').mockImplementation(() => true); + + try { + await runHeartbeatCli(['task', 'list'], { + workspaceRoot: '/repo', + activeWorkspaceId: 'workspace-1', + stateDir: '.heddle', + preferApiKey: true, + runtimeHost: { + kind: 'none', + registryPath: '/registry.json', + }, + }); + } finally { + resolve.mockRestore(); + createClient.mockRestore(); + stdout.mockRestore(); + } + + expect(query).toHaveBeenCalledWith({ workspaceId: 'workspace-1' }); + expect(runtime.close).toHaveBeenCalledTimes(1); + }); }); diff --git a/src/cli-v2/commands/README.md b/src/cli-v2/commands/README.md index 52f95a00..997c3c28 100644 --- a/src/cli-v2/commands/README.md +++ b/src/cli-v2/commands/README.md @@ -47,10 +47,10 @@ domain should have: | `heddle init` | `cli-v2` command adapter delegates `.heddle/config.json` path/default/template behavior to `ProjectConfigService`. | Direct management adapter over the core project-config service/schema contract. | | `heddle memory status/list/read/search` | Direct memory visibility/catalog service calls implemented inline in `src/cli/main.ts`. | Direct management adapter calling documented memory service contracts. | | `heddle memory init/validate/maintain` | Direct core memory services; maintenance resolves model/credentials locally. | Direct management adapter, once memory README/public methods explicitly cover validation, repair, backlog, and credential expectations. | -| `heddle heartbeat task ...` | Legacy CLI constructs and mutates heartbeat task objects directly. | Use existing control-plane heartbeat APIs or explicit heartbeat service methods; command code must not own task/schedule mutation policy. | -| `heddle heartbeat runs ...` | Legacy CLI reads run records through heartbeat store abstractions. | Read through the same API/service shape as web-v2. | -| `heddle heartbeat run` | Legacy CLI worker executes due tasks locally. | Server-backed runtime command. CLI requests execution from the live/embedded control-plane server. | -| `heddle heartbeat start` | Legacy CLI starts a long-running scheduler loop. | Server-backed lifecycle command. Start/attach to the control-plane server and report scheduler status; do not run a separate CLI scheduler loop. | +| `heddle heartbeat task ...` | `cli-v2` command adapter attaches/embeds the control-plane server and calls heartbeat task API procedures. | API-backed management command; command code must not own task/schedule mutation policy. | +| `heddle heartbeat runs ...` | `cli-v2` command adapter reads heartbeat run views through control-plane API procedures. | API-backed read command using the same run view shape as web-v2. | +| `heddle heartbeat run` | `cli-v2` command adapter requests task execution or due-task execution through the live/embedded control-plane server. | Server-backed runtime command; no local CLI scheduler worker. | +| `heddle heartbeat start` | `cli-v2` command adapter creates/updates a task through API and reports the server-backed scheduler. Embedded mode keeps the control-plane server alive until Ctrl+C. | Server-backed lifecycle command; do not run a separate CLI scheduler loop. | | `heddle eval` | Local eval harness adapter over core eval modules. | Direct dev/management adapter unless remote/API evals become a product goal. | ## Migration Order diff --git a/src/cli-v2/commands/chat-v2-command.ts b/src/cli-v2/commands/chat-v2-command.ts index e2f7518a..77bcbf93 100644 --- a/src/cli-v2/commands/chat-v2-command.ts +++ b/src/cli-v2/commands/chat-v2-command.ts @@ -1,11 +1,6 @@ -import { resolve } from 'node:path'; import type { ResolvedRuntimeHost } from '@/core/runtime/daemon/index.js'; -import type { HeddleControlPlaneServerHandle, HeddleControlPlaneServerOptions } from '@/server/index.js'; -import { createServerLogger, startHeddleControlPlaneServer } from '@/server/index.js'; import { startChatCliV2 } from '../index.js'; - -const DEFAULT_CONTROL_PLANE_HOST = '127.0.0.1'; -const DEFAULT_CONTROL_PLANE_PORT = 8765; +import { ControlPlaneCommandRuntimeService } from './control-plane-command-runtime.js'; export type ChatCliV2CommandOptions = { workspaceRoot: string; @@ -20,35 +15,11 @@ export type ChatCliV2CommandOptions = { forceOwnerConflict: boolean; }; -export type ChatV2RuntimeInput = { - workspaceRoot: string; - stateDir: string; - preferApiKey: boolean; - runtimeHost: ResolvedRuntimeHost; - forceOwnerConflict: boolean; -}; - -export type ChatV2Runtime = { - kind: 'attached' | 'embedded'; - trpcUrl: string; - endpoint: { - host: string; - port: number; - }; - serverId: string; - close: () => Promise; -}; - -type ChatV2RuntimeDependencies = { - startServer?: (options: HeddleControlPlaneServerOptions) => Promise; - createLogger?: (stateRoot: string) => HeddleControlPlaneServerOptions['logger']; -}; - export async function runChatCliV2Command(options: ChatCliV2CommandOptions): Promise { - const runtime = await resolveChatV2Runtime(options); - process.stdout.write(`${formatChatV2RuntimeNotice(runtime)}\n`); + const runtime = await ControlPlaneCommandRuntimeService.resolve(options); + process.stdout.write(`${ControlPlaneCommandRuntimeService.formatNotice(runtime, 'chat-v2')}\n`); const uninstallRuntimeShutdown = - runtime.kind === 'embedded' ? installChatV2EmbeddedRuntimeShutdown(runtime) : () => undefined; + runtime.kind === 'embedded' ? ControlPlaneCommandRuntimeService.installEmbeddedShutdown(runtime, 'chat-v2') : () => undefined; const app = startChatCliV2({ trpcUrl: runtime.trpcUrl, workspaceId: options.activeWorkspaceId, @@ -65,90 +36,3 @@ export async function runChatCliV2Command(options: ChatCliV2CommandOptions): Pro await runtime.close(); } } - -export async function resolveChatV2Runtime( - input: ChatV2RuntimeInput, - dependencies: ChatV2RuntimeDependencies = {}, -): Promise { - if (!input.forceOwnerConflict && input.runtimeHost.kind === 'server' && !input.runtimeHost.stale) { - return { - kind: 'attached', - trpcUrl: buildTrpcUrl(input.runtimeHost.endpoint), - endpoint: input.runtimeHost.endpoint, - serverId: input.runtimeHost.serverId, - close: async () => undefined, - }; - } - - const startServer = dependencies.startServer ?? startHeddleControlPlaneServer; - const stateRoot = resolve(input.workspaceRoot, input.stateDir); - const logger = dependencies.createLogger?.(stateRoot) ?? createServerLogger({ - stateRoot, - console: false, - }); - const handle = await startServer({ - mode: 'embedded-chat', - workspaceRoot: input.workspaceRoot, - stateRoot, - preferApiKey: input.preferApiKey, - host: DEFAULT_CONTROL_PLANE_HOST, - port: DEFAULT_CONTROL_PLANE_PORT, - logger, - }); - - return { - kind: 'embedded', - trpcUrl: buildTrpcUrl(handle.endpoint), - endpoint: handle.endpoint, - serverId: handle.serverId, - close: handle.close, - }; -} - -export function formatChatV2RuntimeNotice(runtime: ChatV2Runtime): string { - if (runtime.kind === 'attached') { - return [ - 'Heddle notice: attaching chat-v2 to the live control-plane server.', - `server=http://${runtime.endpoint.host}:${runtime.endpoint.port}`, - `serverId=${runtime.serverId}`, - ].join(' '); - } - - return [ - 'Heddle notice: started embedded chat-v2 control-plane server.', - `server=http://${runtime.endpoint.host}:${runtime.endpoint.port}`, - `browser=http://${runtime.endpoint.host}:${runtime.endpoint.port}`, - `serverId=${runtime.serverId}`, - ].join(' '); -} - -export function installChatV2EmbeddedRuntimeShutdown(runtime: ChatV2Runtime): () => void { - let shuttingDown = false; - const shutdown = (signal: NodeJS.Signals) => { - if (shuttingDown) { - return; - } - - shuttingDown = true; - runtime.close() - .catch((error) => { - process.stderr.write(`Heddle embedded chat-v2 server failed during ${signal} shutdown: ${String(error)}\n`); - process.exitCode = 1; - }) - .finally(() => { - process.exit(); - }); - }; - - process.once('SIGINT', shutdown); - process.once('SIGTERM', shutdown); - - return () => { - process.off('SIGINT', shutdown); - process.off('SIGTERM', shutdown); - }; -} - -function buildTrpcUrl(endpoint: { host: string; port: number }): string { - return `http://${endpoint.host}:${endpoint.port}/trpc`; -} diff --git a/src/cli-v2/commands/control-plane-command-runtime.ts b/src/cli-v2/commands/control-plane-command-runtime.ts new file mode 100644 index 00000000..f31750af --- /dev/null +++ b/src/cli-v2/commands/control-plane-command-runtime.ts @@ -0,0 +1,127 @@ +import { resolve } from 'node:path'; +import type { ResolvedRuntimeHost } from '@/core/runtime/daemon/index.js'; +import type { HeddleControlPlaneServerHandle, HeddleControlPlaneServerOptions } from '@/server/index.js'; +import { createServerLogger, startHeddleControlPlaneServer } from '@/server/index.js'; + +const DEFAULT_CONTROL_PLANE_HOST = '127.0.0.1'; +const DEFAULT_CONTROL_PLANE_PORT = 8765; + +export type ControlPlaneCommandRuntimeInput = { + workspaceRoot: string; + stateDir: string; + preferApiKey: boolean; + runtimeHost: ResolvedRuntimeHost; + forceOwnerConflict: boolean; +}; + +export type ControlPlaneCommandRuntime = { + kind: 'attached' | 'embedded'; + trpcUrl: string; + endpoint: { + host: string; + port: number; + }; + serverId: string; + close: () => Promise; +}; + +type ControlPlaneCommandRuntimeDependencies = { + startServer?: (options: HeddleControlPlaneServerOptions) => Promise; + createLogger?: (stateRoot: string) => HeddleControlPlaneServerOptions['logger']; +}; + +/** + * Owns command-edge control-plane transport bootstrap. + * + * Runtime commands attach to a live server when available, or start an embedded + * control-plane server when needed. After this bootstrap step, commands should + * use the shared control-plane API rather than core runtime services. + */ +export class ControlPlaneCommandRuntimeService { + static async resolve( + input: ControlPlaneCommandRuntimeInput, + dependencies: ControlPlaneCommandRuntimeDependencies = {}, + ): Promise { + if (!input.forceOwnerConflict && input.runtimeHost.kind === 'server' && !input.runtimeHost.stale) { + return { + kind: 'attached', + trpcUrl: ControlPlaneCommandRuntimeService.buildTrpcUrl(input.runtimeHost.endpoint), + endpoint: input.runtimeHost.endpoint, + serverId: input.runtimeHost.serverId, + close: async () => undefined, + }; + } + + const startServer = dependencies.startServer ?? startHeddleControlPlaneServer; + const stateRoot = resolve(input.workspaceRoot, input.stateDir); + const logger = dependencies.createLogger?.(stateRoot) ?? createServerLogger({ + stateRoot, + console: false, + }); + const handle = await startServer({ + mode: 'embedded-chat', + workspaceRoot: input.workspaceRoot, + stateRoot, + preferApiKey: input.preferApiKey, + host: DEFAULT_CONTROL_PLANE_HOST, + port: DEFAULT_CONTROL_PLANE_PORT, + logger, + }); + + return { + kind: 'embedded', + trpcUrl: ControlPlaneCommandRuntimeService.buildTrpcUrl(handle.endpoint), + endpoint: handle.endpoint, + serverId: handle.serverId, + close: handle.close, + }; + } + + static formatNotice(runtime: ControlPlaneCommandRuntime, surface: string): string { + if (runtime.kind === 'attached') { + return [ + `Heddle notice: attaching ${surface} to the live control-plane server.`, + `server=http://${runtime.endpoint.host}:${runtime.endpoint.port}`, + `serverId=${runtime.serverId}`, + ].join(' '); + } + + return [ + `Heddle notice: started embedded ${surface} control-plane server.`, + `server=http://${runtime.endpoint.host}:${runtime.endpoint.port}`, + `browser=http://${runtime.endpoint.host}:${runtime.endpoint.port}`, + `serverId=${runtime.serverId}`, + ].join(' '); + } + + static installEmbeddedShutdown(runtime: ControlPlaneCommandRuntime, label: string): () => void { + let shuttingDown = false; + const shutdown = (signal: NodeJS.Signals) => { + if (shuttingDown) { + return; + } + + shuttingDown = true; + runtime.close() + .catch((error) => { + process.stderr.write(`Heddle embedded ${label} server failed during ${signal} shutdown: ${String(error)}\n`); + process.exitCode = 1; + }) + .finally(() => { + process.exit(); + }); + }; + + process.once('SIGINT', shutdown); + process.once('SIGTERM', shutdown); + + return () => { + process.off('SIGINT', shutdown); + process.off('SIGTERM', shutdown); + }; + } + + private static buildTrpcUrl(endpoint: { host: string; port: number }): string { + return `http://${endpoint.host}:${endpoint.port}/trpc`; + } +} diff --git a/src/cli-v2/commands/heartbeat-command.ts b/src/cli-v2/commands/heartbeat-command.ts new file mode 100644 index 00000000..55c85429 --- /dev/null +++ b/src/cli-v2/commands/heartbeat-command.ts @@ -0,0 +1,69 @@ +import { parseHeartbeatArgs } from './heartbeat/args.js'; +import { printHeartbeatHelp } from './heartbeat/output.js'; +import { runHeartbeatRunsCli } from './heartbeat/run-commands.js'; +import { runHeartbeatTaskCli } from './heartbeat/task-commands.js'; +import { runHeartbeatWorkerCli, startHeartbeatCli } from './heartbeat/worker.js'; +import type { HeartbeatCliOptions } from './heartbeat/types.js'; +import { ClientSharedProxyApiService } from '@/client-shared/api/proxy.js'; +import { ControlPlaneCommandRuntimeService } from './control-plane-command-runtime.js'; + +export type { HeartbeatCliOptions } from './heartbeat/types.js'; +export { parseHeartbeatArgs } from './heartbeat/args.js'; +export { formatDurationMs, parseDurationMs } from './heartbeat/duration.js'; + +export async function runHeartbeatCli(args: string[], options: HeartbeatCliOptions = {}) { + const parsed = parseHeartbeatArgs(args); + + if (!parsed.command || parsed.command === 'help' || parsed.command === '--help' || parsed.command === '-h') { + printHeartbeatHelp(); + return; + } + + const runtime = await ControlPlaneCommandRuntimeService.resolve({ + workspaceRoot: options.workspaceRoot ?? process.cwd(), + stateDir: options.stateDir ?? '.heddle', + preferApiKey: Boolean(options.preferApiKey), + runtimeHost: options.runtimeHost ?? { kind: 'none', registryPath: '' }, + forceOwnerConflict: Boolean(options.forceOwnerConflict), + }); + process.stdout.write(`${ControlPlaneCommandRuntimeService.formatNotice(runtime, 'heartbeat')}\n`); + const context = { + client: ClientSharedProxyApiService.createClient({ url: runtime.trpcUrl }), + workspaceId: options.activeWorkspaceId ?? 'default', + options, + }; + + const uninstallRuntimeShutdown = + runtime.kind === 'embedded' ? ControlPlaneCommandRuntimeService.installEmbeddedShutdown(runtime, 'heartbeat') : () => undefined; + + try { + if (parsed.command === 'task') { + await runHeartbeatTaskCli(parsed, context); + return; + } + + if (parsed.command === 'run') { + await runHeartbeatWorkerCli(parsed, context); + return; + } + + if (parsed.command === 'runs') { + await runHeartbeatRunsCli(parsed, context); + return; + } + + if (parsed.command === 'start') { + await startHeartbeatCli(parsed, context); + if (runtime.kind === 'embedded' && !parsed.flags.once) { + process.stdout.write('Embedded heartbeat server running. Press Ctrl+C to stop.\n'); + await new Promise(() => undefined); + } + return; + } + + throw new Error(`Unknown heartbeat command: ${parsed.command}`); + } finally { + uninstallRuntimeShutdown(); + await runtime.close(); + } +} diff --git a/src/cli/heartbeat/args.ts b/src/cli-v2/commands/heartbeat/args.ts similarity index 100% rename from src/cli/heartbeat/args.ts rename to src/cli-v2/commands/heartbeat/args.ts diff --git a/src/cli-v2/commands/heartbeat/duration.ts b/src/cli-v2/commands/heartbeat/duration.ts new file mode 100644 index 00000000..8c512ed6 --- /dev/null +++ b/src/cli-v2/commands/heartbeat/duration.ts @@ -0,0 +1,50 @@ +import dayjs from 'dayjs'; +import duration from 'dayjs/plugin/duration.js'; + +dayjs.extend(duration); + +const DURATION_UNITS = { + ms: 'millisecond', + s: 'second', + m: 'minute', + h: 'hour', + d: 'day', +} as const; + +export function parseDurationMs(raw: string): number { + const match = raw.trim().match(/^(\d+)(ms|s|m|h|d)?$/); + if (!match) { + throw new Error(`Invalid duration: ${raw}`); + } + + const value = Number.parseInt(match[1] ?? '', 10); + if (!Number.isFinite(value) || value <= 0) { + throw new Error(`Invalid duration: ${raw}`); + } + + const unit = parseDurationUnit(match[2]); + return dayjs.duration(value, DURATION_UNITS[unit]).asMilliseconds(); +} + +export function formatDurationMs(value: number): string { + const interval = dayjs.duration(value); + const units = [ + ['day', 'd'], + ['hour', 'h'], + ['minute', 'm'], + ['second', 's'], + ] as const; + const readable = units.find(([unit]) => value % dayjs.duration(1, unit).asMilliseconds() === 0); + if (readable) { + const [unit, suffix] = readable; + return `${interval.as(unit)}${suffix}`; + } + return `${value}ms`; +} + +function parseDurationUnit(value: string | undefined): keyof typeof DURATION_UNITS { + if (!value || value in DURATION_UNITS) { + return (value ?? 'ms') as keyof typeof DURATION_UNITS; + } + throw new Error(`Invalid duration unit: ${value}`); +} diff --git a/src/cli-v2/commands/heartbeat/output.ts b/src/cli-v2/commands/heartbeat/output.ts new file mode 100644 index 00000000..1b736fd8 --- /dev/null +++ b/src/cli-v2/commands/heartbeat/output.ts @@ -0,0 +1,22 @@ +export function printHeartbeatHelp() { + process.stdout.write([ + 'Usage: heddle heartbeat ', + '', + 'Manage and run heartbeat tasks', + '', + 'Commands:', + ' task add add a heartbeat task', + ' task list list heartbeat tasks', + ' task show show a heartbeat task', + ' task enable enable a heartbeat task', + ' task disable disable a heartbeat task', + ' start create/update a task and use the server-backed scheduler', + ' run ask the server to run due tasks or one task now', + ' runs list list heartbeat run records', + ' runs show show a heartbeat run record', + '', + 'Duration examples:', + ' 30s, 15m, 1h, 2d', + '', + ].join('\n')); +} diff --git a/src/cli-v2/commands/heartbeat/run-commands.ts b/src/cli-v2/commands/heartbeat/run-commands.ts new file mode 100644 index 00000000..a7fbbc7c --- /dev/null +++ b/src/cli-v2/commands/heartbeat/run-commands.ts @@ -0,0 +1,141 @@ +import dayjs from 'dayjs'; +import type { ControlPlaneHeartbeatRunView } from '@/client-shared/api/types.js'; +import { truncate } from '@/core/utils/text.js'; +import type { ParsedHeartbeatArgs } from './args.js'; +import { parsePositiveInt, stringFlag } from './args.js'; +import { firstLine, stripHeartbeatDecisionLine } from './summary.js'; +import type { HeartbeatCliContext } from './types.js'; + +export async function runHeartbeatRunsCli( + parsed: ParsedHeartbeatArgs, + context: HeartbeatCliContext, +) { + switch (parsed.subcommand) { + case 'list': + case undefined: + await listHeartbeatRuns(parsed, context); + return; + case 'show': + await showHeartbeatRun(parsed, context); + return; + default: + throw new Error(`Unknown heartbeat runs command: ${parsed.subcommand}`); + } +} + +async function listHeartbeatRuns( + parsed: ParsedHeartbeatArgs, + context: HeartbeatCliContext, +) { + const taskId = stringFlag(parsed.flags, 'task') ?? parsed.rest[0]; + const limit = parsePositiveInt(stringFlag(parsed.flags, 'limit')) ?? 10; + const { runs } = await context.client.controlPlane.heartbeatRuns.query({ + workspaceId: context.workspaceId, + taskId, + limit, + }); + if (!runs.length) { + process.stdout.write(taskId ? `No heartbeat runs found for task ${taskId}.\n` : 'No heartbeat runs found.\n'); + return; + } + + process.stdout.write(`${runs.map(formatRunListItem).join('\n\n')}\n`); +} + +async function showHeartbeatRun( + parsed: ParsedHeartbeatArgs, + context: HeartbeatCliContext, +) { + const id = parsed.rest[0] ?? stringFlag(parsed.flags, 'id') ?? 'latest'; + const taskId = stringFlag(parsed.flags, 'task'); + const run = + taskId ? + (await context.client.controlPlane.heartbeatRun.query({ + workspaceId: context.workspaceId, + taskId, + runId: id, + })).run + : await findHeartbeatRunById(context, id); + + if (!run || (taskId && run.taskId !== taskId)) { + throw new Error(taskId ? `Heartbeat run not found for task ${taskId}: ${id}` : `Heartbeat run not found: ${id}`); + } + + process.stdout.write(formatRunDetail(run)); +} + +export function formatRunListItem(run: ControlPlaneHeartbeatRunView): string { + return [ + `Run ${run.runId}`, + ` task: ${run.taskId}`, + ` result: ${run.result.decision}, ${run.result.outcome}`, + ` finished: ${formatTimestamp(run.createdAt)}`, + run.result.usage ? ` usage: ${formatUsage(run.result.usage)}` : undefined, + ` summary: ${truncate(firstLine(stripHeartbeatDecisionLine(run.result.summary)), 140)}`, + ].filter((line): line is string => Boolean(line)).join('\n'); +} + +export function formatRunDetail(run: ControlPlaneHeartbeatRunView): string { + return [ + `Heartbeat run ${run.runId}`, + '', + formatSection('Overview', [ + `task: ${run.taskId}`, + `result: ${run.result.decision}, ${run.result.outcome}`, + `finished: ${formatTimestamp(run.createdAt)}`, + `checkpoint: ${run.loadedCheckpoint ? 'loaded' : 'not loaded'}`, + run.result.usage ? `usage: ${formatUsage(run.result.usage)}` : undefined, + ]), + formatSection('Task', [run.task.task]), + formatSection('Summary', [ + stripHeartbeatDecisionLine(run.result.summary).trim() || run.result.summary.trim(), + ]), + '', + ].filter((line): line is string => Boolean(line)).join('\n\n'); +} + +function formatTimestamp(value: string): string { + const timestamp = dayjs(value); + return timestamp.isValid() ? timestamp.format('YYYY-MM-DD HH:mm:ss') : value; +} + +function formatUsage(usage: { + inputTokens: number; + outputTokens: number; + totalTokens: number; + requests?: number; +}): string { + return [ + `input ${usage.inputTokens}`, + `output ${usage.outputTokens}`, + `total ${usage.totalTokens}`, + usage.requests === undefined ? undefined : `requests ${usage.requests}`, + ].filter((part): part is string => Boolean(part)).join(', '); +} + +function formatSection(title: string, lines: Array): string | undefined { + const body = lines.filter((line): line is string => Boolean(line)); + if (!body.length) { + return undefined; + } + return [`${title}:`, ...body.map((line) => indentBlock(line))].join('\n'); +} + +function indentBlock(value: string): string { + return value.split(/\r?\n/).map((line) => ` ${line}`).join('\n'); +} + +async function findHeartbeatRunById( + context: HeartbeatCliContext, + id: string, +): Promise { + const { runs } = await context.client.controlPlane.heartbeatRuns.query({ + workspaceId: context.workspaceId, + limit: 100, + }); + if (id === 'latest') { + return runs[0] ?? null; + } + + return runs.find((run) => run.id === id || run.runId === id) ?? null; +} diff --git a/src/cli/heartbeat/summary.ts b/src/cli-v2/commands/heartbeat/summary.ts similarity index 87% rename from src/cli/heartbeat/summary.ts rename to src/cli-v2/commands/heartbeat/summary.ts index 88095a02..b8915127 100644 --- a/src/cli/heartbeat/summary.ts +++ b/src/cli-v2/commands/heartbeat/summary.ts @@ -1,4 +1,4 @@ -import { truncate } from '../../core/utils/text.js'; +import { truncate } from '@/core/utils/text.js'; export function stripHeartbeatDecisionLine(summary: string): string { return summary.replace(/\n?\s*HEARTBEAT_DECISION:\s*(continue|pause|complete|escalate)\s*$/i, ''); diff --git a/src/cli-v2/commands/heartbeat/task-commands.ts b/src/cli-v2/commands/heartbeat/task-commands.ts new file mode 100644 index 00000000..6e6bf26b --- /dev/null +++ b/src/cli-v2/commands/heartbeat/task-commands.ts @@ -0,0 +1,204 @@ +import dayjs from 'dayjs'; +import type { ControlPlaneHeartbeatTaskView } from '@/client-shared/api/types.js'; +import { truncate } from '@/core/utils/text.js'; +import type { ParsedHeartbeatArgs } from './args.js'; +import { booleanFlag, parsePositiveInt, stringFlag } from './args.js'; +import { formatDurationMs, parseDurationMs } from './duration.js'; +import type { HeartbeatCliContext } from './types.js'; + +export async function runHeartbeatTaskCli( + parsed: ParsedHeartbeatArgs, + context: HeartbeatCliContext, +) { + switch (parsed.subcommand) { + case 'add': + await addHeartbeatTask(parsed, context); + return; + case 'list': + case undefined: + await listHeartbeatTasks(context); + return; + case 'enable': + await setHeartbeatTaskEnabled(parsed, context, true); + return; + case 'disable': + await setHeartbeatTaskEnabled(parsed, context, false); + return; + case 'show': + await showHeartbeatTask(parsed, context); + return; + default: + throw new Error(`Unknown heartbeat task command: ${parsed.subcommand}`); + } +} + +async function addHeartbeatTask( + parsed: ParsedHeartbeatArgs, + context: HeartbeatCliContext, +) { + const id = stringFlag(parsed.flags, 'id') ?? parsed.rest[0]; + const taskText = stringFlag(parsed.flags, 'task') ?? stringFlag(parsed.flags, 'goal') ?? parsed.rest.slice(id ? 1 : 0).join(' '); + if (!id || !taskText.trim()) { + throw new Error('Usage: heddle heartbeat task add --id --task "" [--every 15m]'); + } + + const result = await context.client.controlPlane.heartbeatTaskCreate.mutate({ + workspaceId: context.workspaceId, + id, + name: stringFlag(parsed.flags, 'name'), + task: taskText.trim(), + enabled: !booleanFlag(parsed.flags, 'disabled'), + continuationMode: parseContinuationMode(stringFlag(parsed.flags, 'continuation')), + intervalMs: parseDurationMs(stringFlag(parsed.flags, 'every') ?? stringFlag(parsed.flags, 'interval') ?? '1h'), + defer: booleanFlag(parsed.flags, 'defer'), + model: stringFlag(parsed.flags, 'model') ?? context.options.model, + maxSteps: parsePositiveInt(stringFlag(parsed.flags, 'max-steps')) ?? context.options.maxSteps, + searchIgnoreDirs: context.options.searchIgnoreDirs, + systemContext: context.options.systemContext, + }); + + process.stdout.write(`Saved heartbeat task ${result.task.taskId} (${formatDurationMs(result.task.schedule.intervalMs)} interval)\n`); +} + +async function listHeartbeatTasks(context: HeartbeatCliContext) { + const { tasks } = await context.client.controlPlane.heartbeatTasks.query({ + workspaceId: context.workspaceId, + }); + if (!tasks.length) { + process.stdout.write('No heartbeat tasks found.\n'); + return; + } + + process.stdout.write(`${tasks.map(formatTaskListItem).join('\n\n')}\n`); +} + +async function setHeartbeatTaskEnabled( + parsed: ParsedHeartbeatArgs, + context: HeartbeatCliContext, + enabled: boolean, +) { + const id = stringFlag(parsed.flags, 'id') ?? parsed.rest[0]; + if (!id) { + throw new Error(`Usage: heddle heartbeat task ${enabled ? 'enable' : 'disable'} `); + } + + await ( + enabled ? + context.client.controlPlane.heartbeatTaskEnable.mutate({ workspaceId: context.workspaceId, taskId: id }) + : context.client.controlPlane.heartbeatTaskDisable.mutate({ workspaceId: context.workspaceId, taskId: id }) + ); + process.stdout.write(`${enabled ? 'Enabled' : 'Disabled'} heartbeat task ${id}\n`); +} + +async function showHeartbeatTask( + parsed: ParsedHeartbeatArgs, + context: HeartbeatCliContext, +) { + const id = stringFlag(parsed.flags, 'id') ?? parsed.rest[0]; + if (!id) { + throw new Error('Usage: heddle heartbeat task show '); + } + + const { task } = await context.client.controlPlane.heartbeatTask.query({ + workspaceId: context.workspaceId, + taskId: id, + runLimit: 3, + }); + process.stdout.write(formatTaskDetail(task)); +} + +export function formatTaskListItem(task: ControlPlaneHeartbeatTaskView): string { + const state = task.state; + return [ + `Task ${task.taskId}`, + task.name ? ` name: ${task.name}` : undefined, + ` state: ${task.enabled ? 'enabled' : 'disabled'}, ${state.status}`, + ` schedule: every ${formatDurationMs(task.schedule.intervalMs)}, next ${formatTimestamp(task.schedule.nextRunAt)}`, + ` model: ${task.runtime?.model ?? 'default'}`, + ` prompt: ${truncate(firstTextLine(task.task), 140)}`, + state.progress ? ` progress: ${truncate(state.progress, 140)}` : undefined, + state.runId ? ` run: ${state.runId} (${state.resumable === false ? 'not resumable' : 'resumable'}, checkpoint ${state.loadedCheckpoint ? 'loaded' : 'not loaded'})` : undefined, + state.result ? ` last: ${state.result.decision}, ${state.result.outcome} at ${formatTimestamp(state.runAt)}` : undefined, + state.result?.usage ? ` usage: ${formatUsage(state.result.usage)}` : undefined, + state.error ? ` error: ${truncate(state.error, 140)}` : undefined, + ].filter((line): line is string => Boolean(line)).join('\n'); +} + +export function formatTaskDetail(task: ControlPlaneHeartbeatTaskView): string { + const state = task.state; + return [ + `Heartbeat task ${task.taskId}`, + '', + formatSection('Overview', [ + task.name ? `name: ${task.name}` : undefined, + `state: ${task.enabled ? 'enabled' : 'disabled'}, ${state.status}`, + `model: ${task.runtime?.model ?? 'default'}`, + ]), + formatSection('Schedule', [ + `every: ${formatDurationMs(task.schedule.intervalMs)}`, + `next: ${formatTimestamp(task.schedule.nextRunAt)}`, + ]), + formatSection('Last Run', [ + state.result ? `decision: ${state.result.decision}` : 'decision: none', + state.result ? `outcome: ${state.result.outcome}` : undefined, + state.runAt ? `time: ${formatTimestamp(state.runAt)}` : undefined, + state.runId ? `run id: ${state.runId}` : undefined, + state.runId ? `resumable: ${state.resumable === false ? 'no' : 'yes'}` : undefined, + state.runId ? `checkpoint: ${state.loadedCheckpoint ? 'loaded' : 'not loaded'}` : undefined, + state.result?.usage ? `usage: ${formatUsage(state.result.usage)}` : undefined, + state.progress ? `progress: ${state.progress}` : undefined, + state.error ? `error: ${state.error}` : undefined, + ]), + formatSection('Task', [task.task]), + state.result ? formatSection('Last Summary', [state.result.summary]) : undefined, + '', + ].filter((line): line is string => Boolean(line)).join('\n\n'); +} + +function formatTimestamp(value: string | undefined): string { + if (!value) { + return 'now'; + } + const timestamp = dayjs(value); + return timestamp.isValid() ? timestamp.format('YYYY-MM-DD HH:mm:ss') : value; +} + +function formatUsage(usage: { + inputTokens: number; + outputTokens: number; + totalTokens: number; + requests?: number; +}): string { + return [ + `input ${usage.inputTokens}`, + `output ${usage.outputTokens}`, + `total ${usage.totalTokens}`, + usage.requests === undefined ? undefined : `requests ${usage.requests}`, + ].filter((part): part is string => Boolean(part)).join(', '); +} + +function firstTextLine(value: string): string { + return value.split(/\r?\n/).find((line) => line.trim().length > 0)?.trim() ?? ''; +} + +function formatSection(title: string, lines: Array): string | undefined { + const body = lines.filter((line): line is string => Boolean(line)); + if (!body.length) { + return undefined; + } + return [`${title}:`, ...body.map((line) => indentBlock(line))].join('\n'); +} + +function indentBlock(value: string): string { + return value.split(/\r?\n/).map((line) => ` ${line}`).join('\n'); +} + +function parseContinuationMode(value: string | undefined): 'operator' | 'agent' | undefined { + if (!value) { + return undefined; + } + if (value === 'operator' || value === 'agent') { + return value; + } + throw new Error('Usage: --continuation '); +} diff --git a/src/cli-v2/commands/heartbeat/types.ts b/src/cli-v2/commands/heartbeat/types.ts new file mode 100644 index 00000000..80204101 --- /dev/null +++ b/src/cli-v2/commands/heartbeat/types.ts @@ -0,0 +1,21 @@ +import type { ControlPlaneProxyClient } from '@/client-shared/api/proxy.js'; +import type { ResolvedRuntimeHost } from '@/core/runtime/daemon/index.js'; + +export type HeartbeatCliOptions = { + model?: string; + maxSteps?: number; + workspaceRoot?: string; + activeWorkspaceId?: string; + stateDir?: string; + searchIgnoreDirs?: string[]; + systemContext?: string; + preferApiKey?: boolean; + runtimeHost?: ResolvedRuntimeHost; + forceOwnerConflict?: boolean; +}; + +export type HeartbeatCliContext = { + client: ControlPlaneProxyClient; + workspaceId: string; + options: HeartbeatCliOptions; +}; diff --git a/src/cli-v2/commands/heartbeat/worker.ts b/src/cli-v2/commands/heartbeat/worker.ts new file mode 100644 index 00000000..009576ed --- /dev/null +++ b/src/cli-v2/commands/heartbeat/worker.ts @@ -0,0 +1,107 @@ +import type { ParsedHeartbeatArgs } from './args.js'; +import { booleanFlag, parsePositiveInt, stringFlag } from './args.js'; +import { formatDurationMs, parseDurationMs } from './duration.js'; +import type { HeartbeatCliContext } from './types.js'; + +const DEFAULT_HEARTBEAT_TASK_ID = 'default'; +const DEFAULT_HEARTBEAT_TASK = [ + 'Run a periodic autonomous heartbeat for this workspace.', + 'Read HEARTBEAT.md if it exists, inspect recent project state when useful, continue safe low-risk maintenance work, and escalate when human input is needed.', +].join(' '); + +export async function runHeartbeatWorkerCli( + parsed: ParsedHeartbeatArgs, + context: HeartbeatCliContext, +) { + const taskId = stringFlag(parsed.flags, 'task') ?? parsed.rest[0]; + if (taskId) { + const result = await context.client.controlPlane.heartbeatTaskRunNow.mutate({ + workspaceId: context.workspaceId, + taskId, + model: stringFlag(parsed.flags, 'model') ?? context.options.model, + maxSteps: parsePositiveInt(stringFlag(parsed.flags, 'max-steps')) ?? context.options.maxSteps, + preferApiKey: context.options.preferApiKey, + searchIgnoreDirs: context.options.searchIgnoreDirs, + systemContext: context.options.systemContext, + }); + process.stdout.write(`accepted=${result.accepted} task=${result.task.taskId} status=${result.task.state.status}\n`); + return; + } + + const result = await context.client.controlPlane.heartbeatRunDueTasks.mutate({ + workspaceId: context.workspaceId, + model: stringFlag(parsed.flags, 'model') ?? context.options.model, + maxSteps: parsePositiveInt(stringFlag(parsed.flags, 'max-steps')) ?? context.options.maxSteps, + preferApiKey: context.options.preferApiKey, + searchIgnoreDirs: context.options.searchIgnoreDirs, + systemContext: context.options.systemContext, + }); + process.stdout.write(`checked=${result.checked} ran=${result.ran} failed=${result.failed}\n`); +} + +export async function startHeartbeatCli( + parsed: ParsedHeartbeatArgs, + context: HeartbeatCliContext, +) { + const id = stringFlag(parsed.flags, 'id') ?? parsed.subcommand ?? DEFAULT_HEARTBEAT_TASK_ID; + const intervalMs = parseDurationMs(stringFlag(parsed.flags, 'every') ?? stringFlag(parsed.flags, 'interval') ?? '30m'); + const pollIntervalMs = parseDurationMs(stringFlag(parsed.flags, 'poll') ?? '60s'); + const existing = await findExistingTask(context, id); + const taskText = stringFlag(parsed.flags, 'task') ?? stringFlag(parsed.flags, 'goal') ?? existing?.task ?? DEFAULT_HEARTBEAT_TASK; + const task = + existing ? + (await context.client.controlPlane.heartbeatTaskUpdate.mutate({ + workspaceId: context.workspaceId, + taskId: id, + name: stringFlag(parsed.flags, 'name') ?? existing.name, + task: taskText.trim(), + enabled: true, + intervalMs, + model: stringFlag(parsed.flags, 'model') ?? existing.runtime?.model ?? context.options.model, + maxSteps: parsePositiveInt(stringFlag(parsed.flags, 'max-steps')) ?? existing.runtime?.maxSteps ?? context.options.maxSteps, + searchIgnoreDirs: context.options.searchIgnoreDirs, + systemContext: context.options.systemContext, + })).task + : (await context.client.controlPlane.heartbeatTaskCreate.mutate({ + workspaceId: context.workspaceId, + id, + name: stringFlag(parsed.flags, 'name'), + task: taskText.trim(), + enabled: true, + intervalMs, + defer: booleanFlag(parsed.flags, 'defer'), + model: stringFlag(parsed.flags, 'model') ?? context.options.model, + maxSteps: parsePositiveInt(stringFlag(parsed.flags, 'max-steps')) ?? context.options.maxSteps, + searchIgnoreDirs: context.options.searchIgnoreDirs, + systemContext: context.options.systemContext, + })).task; + + if (booleanFlag(parsed.flags, 'once')) { + await runHeartbeatWorkerCli({ + command: 'run', + subcommand: undefined, + rest: [], + flags: { + ...parsed.flags, + task: id, + }, + }, context); + return; + } + + process.stdout.write([ + `Heartbeat scheduler is server-backed for workspace ${context.workspaceId}.`, + `task=${task.taskId} status=${task.state.status} every=${formatDurationMs(intervalMs)} poll=${formatDurationMs(pollIntervalMs)}`, + 'Use `heddle daemon` for a standalone long-running server, or keep this embedded command running.', + ].join('\n') + '\n'); +} + +async function findExistingTask( + context: HeartbeatCliContext, + taskId: string, +) { + const { tasks } = await context.client.controlPlane.heartbeatTasks.query({ + workspaceId: context.workspaceId, + }); + return tasks.find((task) => task.taskId === taskId); +} diff --git a/src/cli/heartbeat.ts b/src/cli/heartbeat.ts index 6a4281d7..fb30f537 100644 --- a/src/cli/heartbeat.ts +++ b/src/cli/heartbeat.ts @@ -1,43 +1,9 @@ -import { parseHeartbeatArgs } from './heartbeat/args.js'; -import { createHeartbeatCliStore } from './heartbeat/store.js'; -import { printHeartbeatHelp } from './heartbeat/output.js'; -import { runHeartbeatRunsCli } from './heartbeat/run-commands.js'; -import { runHeartbeatTaskCli } from './heartbeat/task-commands.js'; -import { runHeartbeatWorkerCli, startHeartbeatCli } from './heartbeat/worker.js'; -import type { HeartbeatCliOptions } from './heartbeat/types.js'; - -export type { HeartbeatCliOptions } from './heartbeat/types.js'; -export { parseHeartbeatArgs } from './heartbeat/args.js'; -export { formatDurationMs, parseDurationMs } from './heartbeat/duration.js'; - -export async function runHeartbeatCli(args: string[], options: HeartbeatCliOptions = {}) { - const parsed = parseHeartbeatArgs(args); - const store = createHeartbeatCliStore(options); - - if (!parsed.command || parsed.command === 'help' || parsed.command === '--help' || parsed.command === '-h') { - printHeartbeatHelp(); - return; - } - - if (parsed.command === 'task') { - await runHeartbeatTaskCli(parsed, store, options); - return; - } - - if (parsed.command === 'run') { - await runHeartbeatWorkerCli(parsed, store, options); - return; - } - - if (parsed.command === 'runs') { - await runHeartbeatRunsCli(parsed, store); - return; - } - - if (parsed.command === 'start') { - await startHeartbeatCli(parsed, store, options); - return; - } - - throw new Error(`Unknown heartbeat command: ${parsed.command}`); -} +// Legacy v1 compatibility export. Remove when the old `src/cli` command +// entrypoint is retired and heartbeat is imported only from `src/cli-v2`. +export { + formatDurationMs, + parseDurationMs, + parseHeartbeatArgs, + runHeartbeatCli, + type HeartbeatCliOptions, +} from '@/cli-v2/commands/heartbeat-command.js'; diff --git a/src/cli/heartbeat/duration.ts b/src/cli/heartbeat/duration.ts deleted file mode 100644 index 859d841f..00000000 --- a/src/cli/heartbeat/duration.ts +++ /dev/null @@ -1,36 +0,0 @@ -export function parseDurationMs(raw: string): number { - const match = raw.trim().match(/^(\d+)(ms|s|m|h|d)?$/); - if (!match) { - throw new Error(`Invalid duration: ${raw}`); - } - - const value = Number.parseInt(match[1] ?? '', 10); - if (!Number.isFinite(value) || value <= 0) { - throw new Error(`Invalid duration: ${raw}`); - } - - const unit = match[2] ?? 'ms'; - const multiplier = - unit === 'ms' ? 1 - : unit === 's' ? 1_000 - : unit === 'm' ? 60_000 - : unit === 'h' ? 60 * 60_000 - : 24 * 60 * 60_000; - return value * multiplier; -} - -export function formatDurationMs(value: number): string { - if (value % (24 * 60 * 60_000) === 0) { - return `${value / (24 * 60 * 60_000)}d`; - } - if (value % (60 * 60_000) === 0) { - return `${value / (60 * 60_000)}h`; - } - if (value % 60_000 === 0) { - return `${value / 60_000}m`; - } - if (value % 1_000 === 0) { - return `${value / 1_000}s`; - } - return `${value}ms`; -} diff --git a/src/cli/heartbeat/output.ts b/src/cli/heartbeat/output.ts deleted file mode 100644 index 9ea24386..00000000 --- a/src/cli/heartbeat/output.ts +++ /dev/null @@ -1,94 +0,0 @@ -import type { AgentHeartbeatEvent, HeartbeatSchedulerEvent } from '@/core/heartbeat/index.js'; -import { stripHeartbeatDecisionLine } from './summary.js'; - -export function printAgentLoopEvent(event: AgentHeartbeatEvent) { - switch (event.type) { - case 'loop.started': - process.stdout.write(`[heartbeat] agent started run=${event.runId} model=${event.model}\n`); - break; - case 'loop.resumed': - process.stdout.write(`[heartbeat] agent resumed from=${event.fromCheckpoint} priorTraceEvents=${event.priorTraceEvents}\n`); - break; - case 'tool.calling': - process.stdout.write(`[heartbeat] tool calling step=${event.step} tool=${event.tool}${event.requiresApproval ? ' approval=true' : ''}\n`); - break; - case 'tool.completed': - process.stdout.write(`[heartbeat] tool completed step=${event.step} tool=${event.tool} ok=${event.result.ok} durationMs=${event.durationMs}\n`); - break; - case 'assistant.stream': - if (event.done) { - process.stdout.write(`[heartbeat] assistant response complete step=${event.step}\n`); - } - break; - case 'heartbeat.decision': - process.stdout.write(`[heartbeat] decision=${event.decision} outcome=${event.outcome}\n`); - break; - case 'checkpoint.saved': - process.stdout.write(`[heartbeat] checkpoint saved step=${event.step}\n`); - break; - case 'escalation.required': - process.stdout.write(`[heartbeat] escalation required outcome=${event.outcome}\n`); - break; - case 'loop.finished': - process.stdout.write(`[heartbeat] agent finished outcome=${event.outcome}\n`); - break; - case 'trace': - break; - } -} - -export function printSchedulerEvent(event: HeartbeatSchedulerEvent) { - switch (event.type) { - case 'heartbeat.scheduler.started': - process.stdout.write('[heartbeat] scheduler started\n'); - break; - case 'heartbeat.scheduler.stopped': - process.stdout.write(`[heartbeat] scheduler stopped reason=${event.reason}\n`); - break; - case 'heartbeat.task.due': - process.stdout.write(`[heartbeat] task due id=${event.taskId}\n`); - break; - case 'heartbeat.task.started': - process.stdout.write(`[heartbeat] task started id=${event.taskId} loadedCheckpoint=${event.loadedCheckpoint} status=${event.status} progress=${event.progress}\n`); - break; - case 'heartbeat.task.finished': { - const { task, result } = event.record; - process.stdout.write([ - `[heartbeat] task finished id=${event.taskId} decision=${result.decision} outcome=${result.state.outcome} status=${task.state?.status ?? 'waiting'} enabled=${task.enabled} next=${task.schedule.nextRunAt ?? 'none'}`, - `[heartbeat] progress ${task.state?.progress ?? ''}`, - result.state.usage ? `[heartbeat] usage input=${result.state.usage.inputTokens} output=${result.state.usage.outputTokens} total=${result.state.usage.totalTokens} requests=${result.state.usage.requests}` : undefined, - '', - 'Heartbeat summary:', - stripHeartbeatDecisionLine(result.summary).trim() || result.summary.trim(), - '', - ].filter((line): line is string => line !== undefined).join('\n')); - break; - } - case 'heartbeat.task.failed': - process.stdout.write(`[heartbeat] task failed id=${event.taskId} status=${event.status} error=${event.error} next=${event.nextRunAt ?? 'none'}\n[heartbeat] progress ${event.progress}\n`); - break; - } -} - -export function printHeartbeatHelp() { - process.stdout.write([ - 'Usage: heddle heartbeat ', - '', - 'Manage and run heartbeat tasks', - '', - 'Commands:', - ' task add add a heartbeat task', - ' task list list heartbeat tasks', - ' task show show a heartbeat task', - ' task enable enable a heartbeat task', - ' task disable disable a heartbeat task', - ' start start the heartbeat scheduler convenience flow', - ' run run due heartbeat tasks once or in a poll loop', - ' runs list list heartbeat run records', - ' runs show show a heartbeat run record', - '', - 'Duration examples:', - ' 30s, 15m, 1h, 2d', - '', - ].join('\n')); -} diff --git a/src/cli/heartbeat/run-commands.ts b/src/cli/heartbeat/run-commands.ts deleted file mode 100644 index 87bb441f..00000000 --- a/src/cli/heartbeat/run-commands.ts +++ /dev/null @@ -1,75 +0,0 @@ -import type { ParsedHeartbeatArgs } from './args.js'; -import { parsePositiveInt, stringFlag } from './args.js'; -import { firstLine, stripHeartbeatDecisionLine } from './summary.js'; -import type { HeartbeatCliStore } from './types.js'; - -export async function runHeartbeatRunsCli( - parsed: ParsedHeartbeatArgs, - store: HeartbeatCliStore, -) { - switch (parsed.subcommand) { - case 'list': - case undefined: - await listHeartbeatRuns(parsed, store); - return; - case 'show': - await showHeartbeatRun(parsed, store); - return; - default: - throw new Error(`Unknown heartbeat runs command: ${parsed.subcommand}`); - } -} - -async function listHeartbeatRuns( - parsed: ParsedHeartbeatArgs, - store: HeartbeatCliStore, -) { - const taskId = stringFlag(parsed.flags, 'task') ?? parsed.rest[0]; - const limit = parsePositiveInt(stringFlag(parsed.flags, 'limit')) ?? 10; - const runs = await store.listRunRecords?.({ taskId, limit }); - if (!runs?.length) { - process.stdout.write(taskId ? `No heartbeat runs found for task ${taskId}.\n` : 'No heartbeat runs found.\n'); - return; - } - - for (const run of runs) { - const { result } = run.record; - process.stdout.write([ - `${run.id}`, - ` task=${run.taskId} run=${run.runId} decision=${result.decision} outcome=${result.state.outcome} finished=${run.createdAt}`, - result.state.usage ? ` usage input=${result.state.usage.inputTokens} output=${result.state.usage.outputTokens} total=${result.state.usage.totalTokens} requests=${result.state.usage.requests}` : undefined, - ` summary=${firstLine(stripHeartbeatDecisionLine(result.summary))}`, - ].filter((line): line is string => Boolean(line)).join('\n') + '\n'); - } -} - -async function showHeartbeatRun( - parsed: ParsedHeartbeatArgs, - store: HeartbeatCliStore, -) { - const id = parsed.rest[0] ?? stringFlag(parsed.flags, 'id') ?? 'latest'; - const taskId = stringFlag(parsed.flags, 'task'); - const run = - id === 'latest' ? - (await store.listRunRecords?.({ taskId, limit: 1 }))?.[0] - : await store.loadRunRecord?.(id); - - if (!run || (taskId && run.taskId !== taskId)) { - throw new Error(taskId ? `Heartbeat run not found for task ${taskId}: ${id}` : `Heartbeat run not found: ${id}`); - } - - const { result, loadedCheckpoint } = run.record; - process.stdout.write([ - `Heartbeat run ${run.id}`, - `task=${run.taskId} run=${run.runId} loadedCheckpoint=${loadedCheckpoint}`, - `decision=${result.decision} outcome=${result.state.outcome} finished=${run.createdAt}`, - result.state.usage ? `usage input=${result.state.usage.inputTokens} output=${result.state.usage.outputTokens} total=${result.state.usage.totalTokens} requests=${result.state.usage.requests}` : undefined, - '', - 'Task:', - run.record.task.task, - '', - 'Summary:', - stripHeartbeatDecisionLine(result.summary).trim() || result.summary.trim(), - '', - ].filter((line): line is string => line !== undefined).join('\n')); -} diff --git a/src/cli/heartbeat/store.ts b/src/cli/heartbeat/store.ts deleted file mode 100644 index 24b657e1..00000000 --- a/src/cli/heartbeat/store.ts +++ /dev/null @@ -1,8 +0,0 @@ -import { FileHeartbeatTaskService } from '@/core/heartbeat/index.js'; -import type { HeartbeatCliOptions, HeartbeatCliStore } from './types.js'; - -export function createHeartbeatCliStore(options: HeartbeatCliOptions): HeartbeatCliStore { - const workspaceRoot = options.workspaceRoot ?? process.cwd(); - const stateDir = options.stateDir ?? '.heddle'; - return new FileHeartbeatTaskService({ workspaceRoot, stateDir }); -} diff --git a/src/cli/heartbeat/task-commands.ts b/src/cli/heartbeat/task-commands.ts deleted file mode 100644 index c51c42ef..00000000 --- a/src/cli/heartbeat/task-commands.ts +++ /dev/null @@ -1,167 +0,0 @@ -import type { HeartbeatTask } from '@/core/heartbeat/index.js'; -import { resolve } from 'node:path'; -import type { ParsedHeartbeatArgs } from './args.js'; -import { booleanFlag, parsePositiveInt, stringFlag } from './args.js'; -import { formatDurationMs, parseDurationMs } from './duration.js'; -import type { HeartbeatCliOptions, HeartbeatCliStore } from './types.js'; -import { RuntimeWorkspaceService } from '@/core/runtime/workspaces/index.js'; - -export async function runHeartbeatTaskCli( - parsed: ParsedHeartbeatArgs, - store: HeartbeatCliStore, - options: HeartbeatCliOptions, -) { - switch (parsed.subcommand) { - case 'add': - await addHeartbeatTask(parsed, store, options); - return; - case 'list': - case undefined: - await listHeartbeatTasks(store); - return; - case 'enable': - await setHeartbeatTaskEnabled(parsed, store, true); - return; - case 'disable': - await setHeartbeatTaskEnabled(parsed, store, false); - return; - case 'show': - await showHeartbeatTask(parsed, store); - return; - default: - throw new Error(`Unknown heartbeat task command: ${parsed.subcommand}`); - } -} - -async function addHeartbeatTask( - parsed: ParsedHeartbeatArgs, - store: HeartbeatCliStore, - options: HeartbeatCliOptions, -) { - const id = stringFlag(parsed.flags, 'id') ?? parsed.rest[0]; - const taskText = stringFlag(parsed.flags, 'task') ?? stringFlag(parsed.flags, 'goal') ?? parsed.rest.slice(id ? 1 : 0).join(' '); - if (!id || !taskText.trim()) { - throw new Error('Usage: heddle heartbeat task add --id --task "" [--every 15m]'); - } - - const intervalMs = parseDurationMs(stringFlag(parsed.flags, 'every') ?? stringFlag(parsed.flags, 'interval') ?? '1h'); - const now = new Date(); - const workspaceRoot = options.workspaceRoot ?? process.cwd(); - const stateDir = options.stateDir ?? '.heddle'; - const workspace = RuntimeWorkspaceService.resolveContext({ - workspaceRoot, - stateRoot: resolve(workspaceRoot, stateDir), - }).activeWorkspace; - const task: HeartbeatTask = { - id, - workspaceId: workspace.id, - name: stringFlag(parsed.flags, 'name'), - task: taskText.trim(), - enabled: !booleanFlag(parsed.flags, 'disabled'), - schedule: { - intervalMs, - nextRunAt: booleanFlag(parsed.flags, 'defer') ? new Date(now.getTime() + intervalMs).toISOString() : new Date(now.getTime() - 1_000).toISOString(), - }, - runtime: { - model: stringFlag(parsed.flags, 'model') ?? options.model, - maxSteps: parsePositiveInt(stringFlag(parsed.flags, 'max-steps')) ?? options.maxSteps, - workspaceRoot, - stateDir: options.stateDir, - searchIgnoreDirs: options.searchIgnoreDirs, - systemContext: options.systemContext, - }, - }; - - await store.saveTask(task); - process.stdout.write(`Saved heartbeat task ${task.id} (${formatDurationMs(task.schedule.intervalMs)} interval)\n`); -} - -async function listHeartbeatTasks(store: HeartbeatCliStore) { - const tasks = await store.listTasks(); - if (!tasks.length) { - process.stdout.write('No heartbeat tasks found.\n'); - return; - } - - for (const task of tasks) { - const state = task.state; - process.stdout.write([ - `${task.enabled ? 'enabled ' : 'disabled'} ${task.id}${task.name ? ` (${task.name})` : ''}`, - ` status=${state?.status ?? 'idle'} every=${formatDurationMs(task.schedule.intervalMs)} next=${task.schedule.nextRunAt ?? 'now'} model=${task.runtime?.model ?? 'default'}`, - ` task=${task.task}`, - state?.progress ? ` progress=${state.progress}` : undefined, - state?.runId ? ` run=${state.runId} resumable=${state.resumable === false ? 'no' : 'yes'} loadedCheckpoint=${state.loadedCheckpoint ? 'yes' : 'no'}` : undefined, - state?.result?.state.usage ? ` usage input=${state.result.state.usage.inputTokens} output=${state.result.state.usage.outputTokens} total=${state.result.state.usage.totalTokens} requests=${state.result.state.usage.requests}` : undefined, - state?.result ? ` last=${state.result.decision} outcome=${state.result.state.outcome} runAt=${state.runAt ?? 'unknown'}` : undefined, - state?.error ? ` error=${state.error}` : undefined, - ].filter((line): line is string => Boolean(line)).join('\n') + '\n'); - } -} - -async function setHeartbeatTaskEnabled( - parsed: ParsedHeartbeatArgs, - store: HeartbeatCliStore, - enabled: boolean, -) { - const id = stringFlag(parsed.flags, 'id') ?? parsed.rest[0]; - if (!id) { - throw new Error(`Usage: heddle heartbeat task ${enabled ? 'enable' : 'disable'} `); - } - - const tasks = await store.listTasks(); - const task = tasks.find((candidate) => candidate.id === id); - if (!task) { - throw new Error(`Heartbeat task not found: ${id}`); - } - - await store.saveTask({ - ...task, - enabled, - schedule: { - ...task.schedule, - nextRunAt: enabled && !task.schedule.nextRunAt ? new Date(Date.now() - 1_000).toISOString() : task.schedule.nextRunAt, - }, - state: { - ...task.state, - updatedAt: new Date().toISOString(), - }, - }); - process.stdout.write(`${enabled ? 'Enabled' : 'Disabled'} heartbeat task ${id}\n`); -} - -async function showHeartbeatTask( - parsed: ParsedHeartbeatArgs, - store: HeartbeatCliStore, -) { - const id = stringFlag(parsed.flags, 'id') ?? parsed.rest[0]; - if (!id) { - throw new Error('Usage: heddle heartbeat task show '); - } - - const tasks = await store.listTasks(); - const task = tasks.find((candidate) => candidate.id === id); - if (!task) { - throw new Error(`Heartbeat task not found: ${id}`); - } - - const state = task.state; - process.stdout.write([ - `${task.enabled ? 'enabled ' : 'disabled'} ${task.id}${task.name ? ` (${task.name})` : ''}`, - `status=${state?.status ?? 'idle'} every=${formatDurationMs(task.schedule.intervalMs)} next=${task.schedule.nextRunAt ?? 'none'} model=${task.runtime?.model ?? 'default'}`, - '', - 'Task:', - task.task, - '', - state?.progress ? `Progress: ${state.progress}` : undefined, - state?.result ? `Last decision: ${state.result.decision}` : 'Last decision: none', - state?.result ? `Last outcome: ${state.result.state.outcome}` : undefined, - state?.runAt ? `Last run: ${state.runAt}` : undefined, - state?.runId ? `Last run id: ${state.runId}` : undefined, - state?.runId ? `Resumable: ${state.resumable === false ? 'no' : 'yes'}` : undefined, - state?.runId ? `Loaded checkpoint: ${state.loadedCheckpoint ? 'yes' : 'no'}` : undefined, - state?.result?.state.usage ? `Usage: input=${state.result.state.usage.inputTokens} output=${state.result.state.usage.outputTokens} total=${state.result.state.usage.totalTokens} requests=${state.result.state.usage.requests}` : undefined, - state?.error ? `Last error: ${state.error}` : undefined, - state?.result ? ['', 'Last summary:', state.result.summary].join('\n') : undefined, - '', - ].filter((line): line is string => Boolean(line)).join('\n')); -} diff --git a/src/cli/heartbeat/types.ts b/src/cli/heartbeat/types.ts deleted file mode 100644 index 0222ab0b..00000000 --- a/src/cli/heartbeat/types.ts +++ /dev/null @@ -1,12 +0,0 @@ -import type { HeartbeatTaskStore } from '@/core/heartbeat/index.js'; - -export type HeartbeatCliOptions = { - model?: string; - maxSteps?: number; - workspaceRoot?: string; - stateDir?: string; - searchIgnoreDirs?: string[]; - systemContext?: string; -}; - -export type HeartbeatCliStore = HeartbeatTaskStore; diff --git a/src/cli/heartbeat/worker.ts b/src/cli/heartbeat/worker.ts deleted file mode 100644 index 7404ff51..00000000 --- a/src/cli/heartbeat/worker.ts +++ /dev/null @@ -1,114 +0,0 @@ -import { resolve } from 'node:path'; -import { - FileHeartbeatTaskService, - HeartbeatSchedulerService, - type HeartbeatTask, -} from '@/core/heartbeat/index.js'; -import type { ParsedHeartbeatArgs } from './args.js'; -import { booleanFlag, parsePositiveInt, stringFlag } from './args.js'; -import { formatDurationMs, parseDurationMs } from './duration.js'; -import { printAgentLoopEvent, printSchedulerEvent } from './output.js'; -import type { HeartbeatCliOptions, HeartbeatCliStore } from './types.js'; -import { RuntimeWorkspaceService } from '@/core/runtime/workspaces/index.js'; - -const DEFAULT_HEARTBEAT_TASK_ID = 'default'; -const DEFAULT_HEARTBEAT_TASK = [ - 'Run a periodic autonomous heartbeat for this workspace.', - 'Read HEARTBEAT.md if it exists, inspect recent project state when useful, continue safe low-risk maintenance work, and escalate when human input is needed.', -].join(' '); - -export async function runHeartbeatWorkerCli( - parsed: ParsedHeartbeatArgs, - _store: HeartbeatCliStore, - options: HeartbeatCliOptions, -) { - const workspaceRoot = options.workspaceRoot ?? process.cwd(); - const stateRoot = resolve(workspaceRoot, options.stateDir ?? '.heddle'); - const store = new FileHeartbeatTaskService({ stateRoot }); - const runtime = { - workspaceRoot, - stateDir: stateRoot, - model: stringFlag(parsed.flags, 'model') ?? options.model, - maxSteps: parsePositiveInt(stringFlag(parsed.flags, 'max-steps')) ?? options.maxSteps, - searchIgnoreDirs: options.searchIgnoreDirs, - systemContext: options.systemContext, - onAgentEvent: printAgentLoopEvent, - }; - - if (booleanFlag(parsed.flags, 'once')) { - const result = await HeartbeatSchedulerService.runDueTasks({ - store, - runtime, - onEvent: printSchedulerEvent, - }); - process.stdout.write(`checked=${result.checked} ran=${result.ran} failed=${result.failed}\n`); - return; - } - - const controller = new AbortController(); - process.on('SIGINT', () => controller.abort()); - process.on('SIGTERM', () => controller.abort()); - await HeartbeatSchedulerService.runLoop({ - store, - runtime, - pollIntervalMs: parseDurationMs(stringFlag(parsed.flags, 'poll') ?? '60s'), - signal: controller.signal, - onEvent: printSchedulerEvent, - }); -} - -export async function startHeartbeatCli( - parsed: ParsedHeartbeatArgs, - store: HeartbeatCliStore, - options: HeartbeatCliOptions, -) { - const id = stringFlag(parsed.flags, 'id') ?? parsed.subcommand ?? DEFAULT_HEARTBEAT_TASK_ID; - const intervalMs = parseDurationMs(stringFlag(parsed.flags, 'every') ?? stringFlag(parsed.flags, 'interval') ?? '30m'); - const pollIntervalMs = parseDurationMs(stringFlag(parsed.flags, 'poll') ?? '60s'); - const existing = (await store.listTasks()).find((task) => task.id === id); - const now = new Date(); - const taskText = stringFlag(parsed.flags, 'task') ?? stringFlag(parsed.flags, 'goal') ?? existing?.task ?? DEFAULT_HEARTBEAT_TASK; - const workspaceRoot = options.workspaceRoot ?? process.cwd(); - const workspace = RuntimeWorkspaceService.resolveContext({ - workspaceRoot, - stateRoot: resolve(workspaceRoot, options.stateDir ?? '.heddle'), - }).activeWorkspace; - const task: HeartbeatTask = { - ...existing, - id, - workspaceId: existing?.workspaceId ?? workspace.id, - name: stringFlag(parsed.flags, 'name') ?? existing?.name, - task: taskText.trim(), - enabled: true, - schedule: { - intervalMs, - nextRunAt: booleanFlag(parsed.flags, 'defer') ? new Date(now.getTime() + intervalMs).toISOString() : new Date(now.getTime() - 1_000).toISOString(), - }, - runtime: { - ...existing?.runtime, - model: stringFlag(parsed.flags, 'model') ?? existing?.runtime?.model ?? options.model, - maxSteps: parsePositiveInt(stringFlag(parsed.flags, 'max-steps')) ?? existing?.runtime?.maxSteps ?? options.maxSteps, - workspaceRoot, - stateDir: options.stateDir, - searchIgnoreDirs: options.searchIgnoreDirs, - systemContext: options.systemContext, - }, - state: { - ...existing?.state, - updatedAt: now.toISOString(), - }, - }; - - await store.saveTask(task); - process.stdout.write(`Started heartbeat task ${task.id} (${formatDurationMs(intervalMs)} interval, poll ${formatDurationMs(pollIntervalMs)}). Press Ctrl+C to stop.\n`); - - await runHeartbeatWorkerCli({ - command: 'run', - subcommand: undefined, - rest: [], - flags: { - ...parsed.flags, - poll: formatDurationMs(pollIntervalMs), - }, - }, store, options); -} diff --git a/src/cli/main.ts b/src/cli/main.ts index aa3d29a3..0b1ec443 100644 --- a/src/cli/main.ts +++ b/src/cli/main.ts @@ -15,10 +15,10 @@ import { AuthCliController } from '@/cli-v2/commands/auth-command.js'; import { AskCliHost } from './ask.js'; import { startChatCli } from './chat/index.js'; import { runChatCliV2Command } from '@/cli-v2/commands/chat-v2-command.js'; +import { runHeartbeatCli } from '@/cli-v2/commands/heartbeat-command.js'; import { runInitCliV2Command } from '@/cli-v2/commands/init-command.js'; import { runDaemonCli } from './daemon.js'; import { runEvalCli } from './eval/index.js'; -import { parseHeartbeatArgs, runHeartbeatCli } from './heartbeat.js'; import { loadProjectAgentContext, resolveAgentContextPaths } from './project-agent-context.js'; import { RuntimeHostMessages, RuntimeHostResolver, type ResolvedRuntimeHost } from '@/core/runtime/daemon/index.js'; import { FileDaemonRegistryRepository, RuntimeDaemonRegistryService } from '@/core/runtime/daemon/index.js'; @@ -259,8 +259,6 @@ async function main() { .action(async (args: string[]) => { const resolved = resolveCliOptions(program.opts()); chdir(resolved.workspaceRoot); - enforceHeartbeatOwnership(args ?? [], resolved.runtimeHost, resolved.forceOwnerConflict); - writeRuntimeHostNotice('heartbeat', resolved.runtimeHost); await runHeartbeatCli(args ?? [], resolved); }); @@ -519,25 +517,6 @@ function writeRuntimeHostNotice(command: string, runtimeHost: ResolvedRuntimeHos process.stdout.write(`${notice}\n`); } -function enforceHeartbeatOwnership(args: string[], runtimeHost: ResolvedRuntimeHost, forceOwnerConflict: boolean) { - if (forceOwnerConflict) { - return; - } - - const parsed = parseHeartbeatArgs(args); - if (parsed.command === 'help' || parsed.command === 'runs') { - return; - } - if (parsed.command === 'task' && (parsed.subcommand === 'list' || parsed.subcommand === 'show')) { - return; - } - - const message = RuntimeHostMessages.embeddedCommandConflict('heartbeat', runtimeHost); - if (message) { - throw new Error(message); - } -} - function readCliVersion(): string { for (const candidatePath of resolvePackageJsonCandidates()) { if (!existsSync(candidatePath)) { diff --git a/src/core/heartbeat/README.md b/src/core/heartbeat/README.md index b9d7b11a..ca894e27 100644 --- a/src/core/heartbeat/README.md +++ b/src/core/heartbeat/README.md @@ -46,6 +46,14 @@ operator-facing heartbeat views. - Keep scheduler/task persistence concerns here, not in runtime. - Heartbeat may depend on runtime's public `AgentLoopRuntimeService.run` and checkpoint types. Runtime should not import heartbeat. +- Interface adapters should use `FileHeartbeatTaskService` methods or the + control-plane heartbeat API as the public task/run contract. Terminal command + code should not construct `HeartbeatTask` objects, write heartbeat JSON, or run + its own scheduler loop. +- `heddle heartbeat run` and `heddle heartbeat start` are server-backed command + paths. The control-plane server owns recurring scheduler lifetime; CLI + commands may request due-task execution or keep an embedded server alive, but + should not own recurring heartbeat execution policy. - When this domain is refactored further, follow the `src/core/chat/engine` pattern: class-backed owning services/repositories, local `types.ts` contracts, schema-owned persistence validation, and no compatibility wrappers. diff --git a/src/server/controllers/trpc/control-plane/heartbeat.ts b/src/server/controllers/trpc/control-plane/heartbeat.ts index e2f33a17..a1a7d6a9 100644 --- a/src/server/controllers/trpc/control-plane/heartbeat.ts +++ b/src/server/controllers/trpc/control-plane/heartbeat.ts @@ -1,5 +1,6 @@ import { FileHeartbeatTaskService, + HeartbeatSchedulerService, HeartbeatTaskRunnerService, type HeartbeatSchedulerEvent, type HeartbeatTaskRunner, @@ -48,6 +49,8 @@ type RunHeartbeatTaskNowArgs = { onEvent?: (event: HeartbeatSchedulerEvent) => void; }; +type RunDueHeartbeatTasksArgs = Omit; + export class ControlPlaneHeartbeatController { static async listTasks(stateRoot: string) { return await new FileHeartbeatTaskService({ stateRoot }).listTaskViews(); @@ -143,4 +146,25 @@ export class ControlPlaneHeartbeatController { run: run ?? null, }; } + + static async runDueTasks( + stateRoot: string, + args: RunDueHeartbeatTasksArgs, + ) { + return await HeartbeatSchedulerService.runDueTasks({ + store: new FileHeartbeatTaskService({ stateRoot }), + runtime: { + apiKey: args.apiKey, + apiKeyProvider: args.apiKey ? 'explicit' : undefined, + model: args.model, + maxSteps: args.maxSteps, + workspaceRoot: args.workspaceRoot, + stateDir: args.stateDir ?? stateRoot, + searchIgnoreDirs: args.searchIgnoreDirs, + systemContext: args.systemContext, + preferApiKey: args.preferApiKey, + }, + onEvent: args.onEvent, + }); + } } diff --git a/src/server/routes/trpc/control-plane.ts b/src/server/routes/trpc/control-plane.ts index 27b01316..bb9cc8f2 100644 --- a/src/server/routes/trpc/control-plane.ts +++ b/src/server/routes/trpc/control-plane.ts @@ -25,6 +25,7 @@ import { createSessionInputSchema, fileSearchInputSchema, heartbeatRunInputSchema, + heartbeatRunDueTasksInputSchema, heartbeatRunsInputSchema, heartbeatTaskCreateInputSchema, heartbeatTaskDetailInputSchema, @@ -459,6 +460,24 @@ export const controlPlaneRouter = router({ run: null, }; }), + heartbeatRunDueTasks: controlPlaneWorkspaceProcedure.input(heartbeatRunDueTasksInputSchema).mutation(async ({ ctx, input }) => { + const { logger, workspace } = ctx.requestWorkspace; + try { + return await ControlPlaneHeartbeatController.runDueTasks(workspace.stateRoot, { + ...(input ?? {}), + workspaceRoot: workspace.workspaceRoot, + stateDir: workspace.stateRoot, + preferApiKey: input?.preferApiKey ?? ctx.preferApiKey, + onEvent: (event) => controlPlaneHeartbeatEventsController.publish({ + workspaceId: workspace.id, + event, + }), + }); + } catch (error) { + logger.error({ error }, 'Failed to run due heartbeat tasks from control plane'); + throw error; + } + }), workspaceFileSearch: controlPlaneWorkspaceProcedure.input(fileSearchInputSchema).query(async ({ ctx, input }) => { const { workspace } = ctx.requestWorkspace; return { diff --git a/src/server/routes/trpc/schema.ts b/src/server/routes/trpc/schema.ts index d8441ca2..0781f074 100644 --- a/src/server/routes/trpc/schema.ts +++ b/src/server/routes/trpc/schema.ts @@ -194,6 +194,16 @@ export const heartbeatTaskRunNowInputSchema = z.object({ systemContext: z.string().min(1).optional(), }); +export const heartbeatRunDueTasksInputSchema = z.object({ + workspaceId: z.string().min(1).optional(), + model: z.string().min(1).optional(), + maxSteps: z.number().int().min(1).max(500).optional(), + apiKey: z.string().min(1).optional(), + preferApiKey: z.boolean().optional(), + searchIgnoreDirs: z.array(z.string().min(1)).optional(), + systemContext: z.string().min(1).optional(), +}).optional(); + export const heartbeatRunInputSchema = z.object({ workspaceId: z.string().min(1).optional(), taskId: z.string().min(1), From b7e4f224ccd7ebe0991daff87efca0906218f059 Mon Sep 17 00:00:00 2001 From: Jay/Fienna Liang Date: Wed, 3 Jun 2026 22:58:24 +0800 Subject: [PATCH 2/2] Fix heartbeat command scheduler semantics --- .../server/server-lifecycle.test.ts | 36 +++++- src/__tests__/unit/tui/heartbeat-cli.test.ts | 105 +++++++++++++++++- .../commands/control-plane-command-runtime.ts | 8 +- src/cli-v2/commands/heartbeat-command.ts | 42 +++++-- src/cli-v2/commands/heartbeat/args.ts | 62 ++++++----- src/cli-v2/commands/heartbeat/output.ts | 22 ---- src/cli-v2/commands/heartbeat/worker.ts | 4 +- src/server/index.ts | 1 + src/server/lifecycle.ts | 13 ++- src/server/types.ts | 6 + 10 files changed, 232 insertions(+), 67 deletions(-) delete mode 100644 src/cli-v2/commands/heartbeat/output.ts diff --git a/src/__tests__/integration/server/server-lifecycle.test.ts b/src/__tests__/integration/server/server-lifecycle.test.ts index d7d374e8..bcd32dc4 100644 --- a/src/__tests__/integration/server/server-lifecycle.test.ts +++ b/src/__tests__/integration/server/server-lifecycle.test.ts @@ -1,8 +1,9 @@ import { mkdtempSync } from 'node:fs'; import { tmpdir } from 'node:os'; import { join } from 'node:path'; -import { describe, expect, it } from 'vitest'; +import { describe, expect, it, vi } from 'vitest'; import { FileDaemonRegistryRepository, RuntimeDaemonRegistryService } from '@/core/runtime/daemon/index.js'; +import { HeddleHeartbeatSchedulerHost } from '@/server/heartbeat-scheduler-host.js'; import { createServerLogger, startHeddleControlPlaneServer } from '@/server/index.js'; describe('control-plane server lifecycle', () => { @@ -60,6 +61,39 @@ describe('control-plane server lifecycle', () => { } }); + it('can start an embedded server without starting the heartbeat scheduler host', async () => { + const paths = createTestPaths('heddle-server-lifecycle-no-scheduler-'); + const startScheduler = vi.spyOn(HeddleHeartbeatSchedulerHost.prototype, 'start'); + const syncScheduler = vi.spyOn(HeddleHeartbeatSchedulerHost.prototype, 'sync'); + const stopScheduler = vi.spyOn(HeddleHeartbeatSchedulerHost.prototype, 'stop'); + const server = await startHeddleControlPlaneServer({ + mode: 'embedded-chat', + serverId: 'embedded-no-scheduler', + host: '127.0.0.1', + port: 0, + workspaceRoot: paths.workspaceRoot, + stateRoot: paths.stateRoot, + daemonRegistryPath: paths.registryPath, + heartbeatScheduler: { + enabled: false, + }, + serveAssets: false, + logger: createTestLogger(paths.stateRoot), + }); + + try { + expect(startScheduler).not.toHaveBeenCalled(); + expect(syncScheduler).not.toHaveBeenCalled(); + } finally { + await server.close(); + } + + expect(stopScheduler).not.toHaveBeenCalled(); + startScheduler.mockRestore(); + syncScheduler.mockRestore(); + stopScheduler.mockRestore(); + }); + it('does not clear a newer live server record when an older lifecycle shuts down', async () => { const paths = createTestPaths('heddle-server-lifecycle-owner-'); const server = await startHeddleControlPlaneServer({ diff --git a/src/__tests__/unit/tui/heartbeat-cli.test.ts b/src/__tests__/unit/tui/heartbeat-cli.test.ts index 1a5887c5..9e773a3f 100644 --- a/src/__tests__/unit/tui/heartbeat-cli.test.ts +++ b/src/__tests__/unit/tui/heartbeat-cli.test.ts @@ -117,13 +117,114 @@ describe('heartbeat CLI helpers', () => { registryPath: '/registry.json', }, }); + expect(resolve).toHaveBeenCalledWith(expect.objectContaining({ + heartbeatScheduler: { + enabled: false, + }, + })); + expect(query).toHaveBeenCalledWith({ workspaceId: 'workspace-1' }); + expect(runtime.close).toHaveBeenCalledTimes(1); } finally { resolve.mockRestore(); createClient.mockRestore(); stdout.mockRestore(); } + }); + + it('passes embedded scheduler config for heartbeat start', async () => { + const runtime = { + kind: 'attached' as const, + trpcUrl: 'http://127.0.0.1:8765/trpc', + endpoint: { + host: '127.0.0.1', + port: 8765, + }, + serverId: 'server-1', + close: vi.fn(async () => undefined), + }; + const resolve = vi.spyOn(ControlPlaneCommandRuntimeService, 'resolve').mockResolvedValue(runtime); + const createClient = vi.spyOn(ClientSharedProxyApiService, 'createClient').mockReturnValue({ + controlPlane: { + heartbeatTasks: { + query: vi.fn(async () => ({ tasks: [] })), + }, + heartbeatTaskCreate: { + mutate: vi.fn(async () => ({ task: { taskId: 'repo-gardener', state: { status: 'idle' } } })), + }, + }, + } as never); + const stdout = vi.spyOn(process.stdout, 'write').mockImplementation(() => true); - expect(query).toHaveBeenCalledWith({ workspaceId: 'workspace-1' }); - expect(runtime.close).toHaveBeenCalledTimes(1); + try { + await runHeartbeatCli(['start', '--id', 'repo-gardener', '--task', 'Maintain the repo', '--poll', '5s'], { + workspaceRoot: '/repo', + activeWorkspaceId: 'workspace-1', + stateDir: '.heddle', + preferApiKey: true, + runtimeHost: { + kind: 'none', + registryPath: '/registry.json', + }, + }).catch(() => undefined); + expect(resolve).toHaveBeenCalledWith(expect.objectContaining({ + heartbeatScheduler: { + enabled: true, + pollIntervalMs: 5_000, + }, + })); + } finally { + resolve.mockRestore(); + createClient.mockRestore(); + stdout.mockRestore(); + } + }); + + it('rejects --poll when heartbeat start attaches to a live server', async () => { + const runtime = { + kind: 'attached' as const, + trpcUrl: 'http://127.0.0.1:8765/trpc', + endpoint: { + host: '127.0.0.1', + port: 8765, + }, + serverId: 'server-1', + close: vi.fn(async () => undefined), + }; + const resolve = vi.spyOn(ControlPlaneCommandRuntimeService, 'resolve').mockResolvedValue(runtime); + const createClient = vi.spyOn(ClientSharedProxyApiService, 'createClient'); + const stdout = vi.spyOn(process.stdout, 'write').mockImplementation(() => true); + + try { + await expect(runHeartbeatCli(['start', '--poll', '5s'], { + workspaceRoot: '/repo', + activeWorkspaceId: 'workspace-1', + stateDir: '.heddle', + preferApiKey: true, + runtimeHost: freshRuntimeHost(), + })).rejects.toThrow('--poll only applies when heartbeat start launches an embedded control-plane server.'); + expect(createClient).not.toHaveBeenCalled(); + expect(runtime.close).toHaveBeenCalledTimes(1); + } finally { + resolve.mockRestore(); + createClient.mockRestore(); + stdout.mockRestore(); + } }); }); + +function freshRuntimeHost() { + return { + kind: 'server' as const, + registryPath: '/registry.json', + serverId: 'server-1', + mode: 'daemon' as const, + endpoint: { + host: '127.0.0.1', + port: 8765, + }, + startedAt: '2026-06-02T00:00:00.000Z', + lastSeenAt: '2026-06-02T00:00:01.000Z', + stale: false, + ageMs: 100, + }; +} diff --git a/src/cli-v2/commands/control-plane-command-runtime.ts b/src/cli-v2/commands/control-plane-command-runtime.ts index f31750af..6d816776 100644 --- a/src/cli-v2/commands/control-plane-command-runtime.ts +++ b/src/cli-v2/commands/control-plane-command-runtime.ts @@ -1,6 +1,10 @@ import { resolve } from 'node:path'; import type { ResolvedRuntimeHost } from '@/core/runtime/daemon/index.js'; -import type { HeddleControlPlaneServerHandle, HeddleControlPlaneServerOptions } from '@/server/index.js'; +import type { + HeddleControlPlaneServerHandle, + HeddleControlPlaneServerOptions, + HeddleHeartbeatSchedulerSettings, +} from '@/server/index.js'; import { createServerLogger, startHeddleControlPlaneServer } from '@/server/index.js'; const DEFAULT_CONTROL_PLANE_HOST = '127.0.0.1'; @@ -12,6 +16,7 @@ export type ControlPlaneCommandRuntimeInput = { preferApiKey: boolean; runtimeHost: ResolvedRuntimeHost; forceOwnerConflict: boolean; + heartbeatScheduler?: HeddleHeartbeatSchedulerSettings; }; export type ControlPlaneCommandRuntime = { @@ -63,6 +68,7 @@ export class ControlPlaneCommandRuntimeService { workspaceRoot: input.workspaceRoot, stateRoot, preferApiKey: input.preferApiKey, + heartbeatScheduler: input.heartbeatScheduler, host: DEFAULT_CONTROL_PLANE_HOST, port: DEFAULT_CONTROL_PLANE_PORT, logger, diff --git a/src/cli-v2/commands/heartbeat-command.ts b/src/cli-v2/commands/heartbeat-command.ts index 55c85429..06b9c62d 100644 --- a/src/cli-v2/commands/heartbeat-command.ts +++ b/src/cli-v2/commands/heartbeat-command.ts @@ -1,5 +1,6 @@ -import { parseHeartbeatArgs } from './heartbeat/args.js'; -import { printHeartbeatHelp } from './heartbeat/output.js'; +import type { ParsedHeartbeatArgs } from './heartbeat/args.js'; +import { buildHeartbeatCommand, parseHeartbeatArgs, stringFlag } from './heartbeat/args.js'; +import { parseDurationMs } from './heartbeat/duration.js'; import { runHeartbeatRunsCli } from './heartbeat/run-commands.js'; import { runHeartbeatTaskCli } from './heartbeat/task-commands.js'; import { runHeartbeatWorkerCli, startHeartbeatCli } from './heartbeat/worker.js'; @@ -15,28 +16,34 @@ export async function runHeartbeatCli(args: string[], options: HeartbeatCliOptio const parsed = parseHeartbeatArgs(args); if (!parsed.command || parsed.command === 'help' || parsed.command === '--help' || parsed.command === '-h') { - printHeartbeatHelp(); + process.stdout.write(`${buildHeartbeatCommand().helpInformation()}\n`); return; } + const heartbeatScheduler = resolveHeartbeatScheduler(parsed); const runtime = await ControlPlaneCommandRuntimeService.resolve({ workspaceRoot: options.workspaceRoot ?? process.cwd(), stateDir: options.stateDir ?? '.heddle', preferApiKey: Boolean(options.preferApiKey), runtimeHost: options.runtimeHost ?? { kind: 'none', registryPath: '' }, forceOwnerConflict: Boolean(options.forceOwnerConflict), + heartbeatScheduler, }); - process.stdout.write(`${ControlPlaneCommandRuntimeService.formatNotice(runtime, 'heartbeat')}\n`); - const context = { - client: ClientSharedProxyApiService.createClient({ url: runtime.trpcUrl }), - workspaceId: options.activeWorkspaceId ?? 'default', - options, - }; - const uninstallRuntimeShutdown = runtime.kind === 'embedded' ? ControlPlaneCommandRuntimeService.installEmbeddedShutdown(runtime, 'heartbeat') : () => undefined; try { + process.stdout.write(`${ControlPlaneCommandRuntimeService.formatNotice(runtime, 'heartbeat')}\n`); + if (runtime.kind === 'attached' && heartbeatScheduler.enabled && stringFlag(parsed.flags, 'poll')) { + throw new Error('--poll only applies when heartbeat start launches an embedded control-plane server.'); + } + + const context = { + client: ClientSharedProxyApiService.createClient({ url: runtime.trpcUrl }), + workspaceId: options.activeWorkspaceId ?? 'default', + options, + }; + if (parsed.command === 'task') { await runHeartbeatTaskCli(parsed, context); return; @@ -67,3 +74,18 @@ export async function runHeartbeatCli(args: string[], options: HeartbeatCliOptio await runtime.close(); } } + +function resolveHeartbeatScheduler(parsed: ParsedHeartbeatArgs): { + enabled?: boolean; + pollIntervalMs?: number; +} { + if (parsed.command !== 'start' || parsed.flags.once) { + return { enabled: false }; + } + + const poll = stringFlag(parsed.flags, 'poll'); + return { + enabled: true, + pollIntervalMs: poll ? parseDurationMs(poll) : undefined, + }; +} diff --git a/src/cli-v2/commands/heartbeat/args.ts b/src/cli-v2/commands/heartbeat/args.ts index dc4c3605..93e0d4e7 100644 --- a/src/cli-v2/commands/heartbeat/args.ts +++ b/src/cli-v2/commands/heartbeat/args.ts @@ -7,79 +7,91 @@ export type ParsedHeartbeatArgs = { flags: Record; }; -export function parseHeartbeatArgs(args: string[]): ParsedHeartbeatArgs { - if (args[0] === 'help' || args[0] === '--help' || args[0] === '-h') { - return { - command: args[0], - subcommand: undefined, - rest: args.slice(1), - flags: {}, - }; - } - +export function buildHeartbeatCommand(onParsed?: (parsed: ParsedHeartbeatArgs) => void): Command { const root = new Command(); root .exitOverride() .allowUnknownOption(true) .allowExcessArguments(true) .name('heddle heartbeat') + .description('Manage and run heartbeat tasks') .addHelpText('after', ['', 'Duration examples:', ' 30s, 15m, 1h, 2d', ''].join('\n')); - let parsed: ParsedHeartbeatArgs = { - command: undefined, - subcommand: undefined, - rest: [], - flags: {}, - }; - root .command('task [subcommand] [rest...]') + .description('manage heartbeat tasks') .allowUnknownOption(true) .action((subcommand: string | undefined, rest: string[] = [], command: Command) => { - parsed = { + onParsed?.({ command: 'task', subcommand, rest, flags: collectUnknownFlags(command), - }; + }); }); root .command('run [rest...]') + .description('ask the server to run due tasks or one task now') .allowUnknownOption(true) .action((rest: string[] = [], command: Command) => { - parsed = { + onParsed?.({ command: 'run', subcommand: undefined, rest, flags: collectUnknownFlags(command), - }; + }); }); root .command('runs [subcommand] [rest...]') + .description('inspect heartbeat run records') .allowUnknownOption(true) .action((subcommand: string | undefined, rest: string[] = [], command: Command) => { - parsed = { + onParsed?.({ command: 'runs', subcommand, rest, flags: collectUnknownFlags(command), - }; + }); }); root .command('start [rest...]') + .description('create or update a task and keep the server-backed scheduler running') .allowUnknownOption(true) .action((rest: string[] = [], command: Command) => { - parsed = { + onParsed?.({ command: 'start', subcommand: undefined, rest, flags: collectUnknownFlags(command), - }; + }); }); + return root; +} + +export function parseHeartbeatArgs(args: string[]): ParsedHeartbeatArgs { + if (args[0] === 'help' || args[0] === '--help' || args[0] === '-h') { + return { + command: args[0], + subcommand: undefined, + rest: args.slice(1), + flags: {}, + }; + } + + let parsed: ParsedHeartbeatArgs = { + command: undefined, + subcommand: undefined, + rest: [], + flags: {}, + }; + const root = buildHeartbeatCommand((next) => { + parsed = next; + }); + try { root.parse(['node', 'heddle-heartbeat', ...args], { from: 'node' }); } catch { diff --git a/src/cli-v2/commands/heartbeat/output.ts b/src/cli-v2/commands/heartbeat/output.ts deleted file mode 100644 index 1b736fd8..00000000 --- a/src/cli-v2/commands/heartbeat/output.ts +++ /dev/null @@ -1,22 +0,0 @@ -export function printHeartbeatHelp() { - process.stdout.write([ - 'Usage: heddle heartbeat ', - '', - 'Manage and run heartbeat tasks', - '', - 'Commands:', - ' task add add a heartbeat task', - ' task list list heartbeat tasks', - ' task show show a heartbeat task', - ' task enable enable a heartbeat task', - ' task disable disable a heartbeat task', - ' start create/update a task and use the server-backed scheduler', - ' run ask the server to run due tasks or one task now', - ' runs list list heartbeat run records', - ' runs show show a heartbeat run record', - '', - 'Duration examples:', - ' 30s, 15m, 1h, 2d', - '', - ].join('\n')); -} diff --git a/src/cli-v2/commands/heartbeat/worker.ts b/src/cli-v2/commands/heartbeat/worker.ts index 009576ed..8d287134 100644 --- a/src/cli-v2/commands/heartbeat/worker.ts +++ b/src/cli-v2/commands/heartbeat/worker.ts @@ -45,7 +45,6 @@ export async function startHeartbeatCli( ) { const id = stringFlag(parsed.flags, 'id') ?? parsed.subcommand ?? DEFAULT_HEARTBEAT_TASK_ID; const intervalMs = parseDurationMs(stringFlag(parsed.flags, 'every') ?? stringFlag(parsed.flags, 'interval') ?? '30m'); - const pollIntervalMs = parseDurationMs(stringFlag(parsed.flags, 'poll') ?? '60s'); const existing = await findExistingTask(context, id); const taskText = stringFlag(parsed.flags, 'task') ?? stringFlag(parsed.flags, 'goal') ?? existing?.task ?? DEFAULT_HEARTBEAT_TASK; const task = @@ -89,9 +88,10 @@ export async function startHeartbeatCli( return; } + const poll = stringFlag(parsed.flags, 'poll'); process.stdout.write([ `Heartbeat scheduler is server-backed for workspace ${context.workspaceId}.`, - `task=${task.taskId} status=${task.state.status} every=${formatDurationMs(intervalMs)} poll=${formatDurationMs(pollIntervalMs)}`, + `task=${task.taskId} status=${task.state.status} every=${formatDurationMs(intervalMs)}${poll ? ` poll=${poll}` : ''}`, 'Use `heddle daemon` for a standalone long-running server, or keep this embedded command running.', ].join('\n') + '\n'); } diff --git a/src/server/index.ts b/src/server/index.ts index 97cd4cbb..6d419732 100644 --- a/src/server/index.ts +++ b/src/server/index.ts @@ -1,5 +1,6 @@ export type { HeddleControlPlaneServerHandle, + HeddleHeartbeatSchedulerSettings, HeddleControlPlaneServerOptions, HeddleServerOptions, } from './types.js'; diff --git a/src/server/lifecycle.ts b/src/server/lifecycle.ts index e7c03edb..071710ae 100644 --- a/src/server/lifecycle.ts +++ b/src/server/lifecycle.ts @@ -38,7 +38,9 @@ export async function startHeddleControlPlaneServer( const registryPath = options.daemonRegistryPath ?? FileDaemonRegistryRepository.resolvePath(); const serverId = options.serverId ?? `${options.mode}-${process.pid}-${Date.now()}`; const startedAt = dayjs().toISOString(); - const heartbeatSchedulerHost = createHeartbeatSchedulerHost(options); + const heartbeatSchedulerSettings = options.heartbeatScheduler ?? {}; + const heartbeatSchedulerEnabled = heartbeatSchedulerSettings.enabled !== false; + const heartbeatSchedulerHost = heartbeatSchedulerEnabled ? createHeartbeatSchedulerHost(options) : null; const endpoint = { host: options.host, @@ -92,7 +94,7 @@ export async function startHeddleControlPlaneServer( if (lifecycleTimers.heartbeat) { clearInterval(lifecycleTimers.heartbeat); } - heartbeatSchedulerHost.stop(); + heartbeatSchedulerHost?.stop(); RuntimeDaemonRegistryService.clearLiveServer({ registryPath, serverId, @@ -108,7 +110,7 @@ export async function startHeddleControlPlaneServer( try { registerServer(startedAt); - heartbeatSchedulerHost.start(); + heartbeatSchedulerHost?.start(); } catch (error) { await closeServer(server).catch((closeError) => { logger.error({ error: closeError, serverId }, 'Failed to close Heddle server after startup error'); @@ -119,7 +121,7 @@ export async function startHeddleControlPlaneServer( lifecycleTimers.heartbeat = setInterval(() => { try { registerServer(); - heartbeatSchedulerHost.sync(); + heartbeatSchedulerHost?.sync(); } catch (error) { logger.warn({ error }, 'Failed to refresh Heddle server registry heartbeat'); } @@ -144,6 +146,8 @@ export async function startHeddleControlPlaneServer( registryPath, serverId, mode: options.mode, + heartbeatSchedulerEnabled, + heartbeatSchedulerPollIntervalMs: heartbeatSchedulerSettings.pollIntervalMs, }, 'Heddle server started'); return { @@ -165,6 +169,7 @@ function createHeartbeatSchedulerHost(options: HeddleControlPlaneServerOptions): workspaceRoot: options.workspaceRoot, stateRoot: options.stateRoot, preferApiKey: options.preferApiKey, + pollIntervalMs: options.heartbeatScheduler?.pollIntervalMs, onEvent: (workspace, event) => { logHeartbeatSchedulerEvent(getWorkspaceOperationLogger(workspace.stateRoot), workspace, event); controlPlaneHeartbeatEventsController.publish({ diff --git a/src/server/types.ts b/src/server/types.ts index 5e0d094c..bd163453 100644 --- a/src/server/types.ts +++ b/src/server/types.ts @@ -18,6 +18,12 @@ export type HeddleControlPlaneServerOptions = Omit