diff --git a/.changeset/daemon-poll-staleness.md b/.changeset/daemon-poll-staleness.md new file mode 100644 index 00000000..81610f36 --- /dev/null +++ b/.changeset/daemon-poll-staleness.md @@ -0,0 +1,10 @@ +--- +"@stoneforge/smithy": minor +"@stoneforge/smithy-web": patch +--- + +feat(smithy): detect and surface dispatch daemon poll-loop staleness + +A wedged dispatch daemon — process alive, HTTP responsive, but `runPollCycle` hung — would quietly stop dispatching, scheduling, and recovering with no signal to the operator. The daemon now tracks `lastPollStartedAt` and `lastPollCompletedAt`, and exposes a `pollStale` flag in `getDispatchHealth()` (true when either a cycle is in flight past the threshold, or the last completion is older than the threshold). Default threshold is `max(60_000, 10 × pollIntervalMs)`, configurable via the new `pollStaleThresholdMs` config field. + +The `DispatchHealthBanner` in smithy-web renders a distinct red wedge-daemon banner when `pollStale` is true (vs. the existing amber stuck-queue banner), advising the operator to restart `sf serve smithy`. When both conditions are true, the wedge banner takes priority since a stuck queue is unresolvable while the daemon is dead. diff --git a/.changeset/dispatch-stuck-warning.md b/.changeset/dispatch-stuck-warning.md new file mode 100644 index 00000000..8086b81d --- /dev/null +++ b/.changeset/dispatch-stuck-warning.md @@ -0,0 +1,12 @@ +--- +"@stoneforge/smithy": minor +--- + +Surface a dispatch-stuck warning when ready unassigned tasks have no available workers to take them. + +- New `getDispatchHealth()` method on `DispatchDaemon` returning `{ readyUnassignedTasks, availableWorkers, stuck, hasStuckQueue, computedAt }`. A worker is "available" when it is registered, not disabled, and not terminated. At-capacity workers do not count as stuck (the queue is busy, not stuck). 
+- CLI log on stuck-queue state transitions only (no per-tick repeats): a single warn `[dispatch] STUCK: N task(s) ready, no available workers...` when the queue becomes stuck, and a single `RESUMED` info line when it clears. Warns again if the queue clears and later becomes stuck again. +- `GET /api/daemon/status` includes a `health` field with the snapshot. +- New smithy-web `DispatchHealthBanner` shown on the agents and workspaces pages when the queue is stuck. Dismissible per page-load. + +Closes #59. The pool-routing observation in #59 is filed separately. diff --git a/apps/smithy-web/src/api/hooks/useDaemon.ts b/apps/smithy-web/src/api/hooks/useDaemon.ts index 1ea77c73..80c68254 100644 --- a/apps/smithy-web/src/api/hooks/useDaemon.ts +++ b/apps/smithy-web/src/api/hooks/useDaemon.ts @@ -5,6 +5,7 @@ */ import { useQuery, useMutation, useQueryClient } from '@tanstack/react-query'; +import type { DispatchHealth } from '../types'; // ============================================================================ // Types @@ -28,6 +29,7 @@ export interface DaemonStatusResponse { limits: Array<{ executable: string; resetsAt: string }>; soonestReset?: string; }; + health?: DispatchHealth; } export interface DaemonStartResponse { @@ -85,7 +87,7 @@ export function useDaemonStatus() { return useQuery({ queryKey: ['daemon-status'], queryFn: () => fetchApi('/daemon/status'), - refetchInterval: 10000, // Poll every 10 seconds + refetchInterval: 5000, // Poll every 5 seconds }); } diff --git a/apps/smithy-web/src/api/types.ts b/apps/smithy-web/src/api/types.ts index 2124244a..2d052dd5 100644 --- a/apps/smithy-web/src/api/types.ts +++ b/apps/smithy-web/src/api/types.ts @@ -622,6 +622,24 @@ export interface ApprovalRequestResponse { request: ApprovalRequest; } +// ============================================================================ +// Daemon / Dispatch Types +// ============================================================================ + +export interface DispatchHealth { + readyUnassignedTasks: number; + 
availableWorkers: number; + stuck: boolean; + hasStuckQueue: boolean; + computedAt: string; + /** ISO timestamp when the most recent poll cycle started. Undefined if no cycle has ever started. */ + lastPollStartedAt?: string; + /** ISO timestamp when the most recent poll cycle completed. Undefined if no cycle has completed. */ + lastPollCompletedAt?: string; + /** True when the daemon is responsive but its poll loop is wedged or has not advanced past the staleness threshold. */ + pollStale: boolean; +} + // ============================================================================ // Provider Metrics Types // ============================================================================ diff --git a/apps/smithy-web/src/components/dispatch/DispatchHealthBanner.tsx b/apps/smithy-web/src/components/dispatch/DispatchHealthBanner.tsx new file mode 100644 index 00000000..37aa9cb4 --- /dev/null +++ b/apps/smithy-web/src/components/dispatch/DispatchHealthBanner.tsx @@ -0,0 +1,76 @@ +import { useState } from 'react'; +import { AlertTriangle, X } from 'lucide-react'; +import { useDaemonStatus } from '../../api/hooks/useDaemon'; + +interface DispatchHealthBannerProps { + /** Optional layout classes applied to the outer banner element. Lets each mount site control its own page-specific padding without leaving an empty wrapper when the banner self-hides. */ + className?: string; +} + +export function DispatchHealthBanner({ className }: DispatchHealthBannerProps = {}) { + const { data } = useDaemonStatus(); + const [dismissed, setDismissed] = useState(false); + + if (dismissed) return null; + + const health = data?.health; + // pollStale is the more critical signal: the daemon's poll loop is wedged + // and dispatch/scheduling/recovery have all stopped. Render this banner + // first when both conditions hold; users can't act on a stuck queue if + // the daemon itself is broken. 
+ const pollStale = health?.pollStale === true; + const hasStuckQueue = health?.hasStuckQueue === true; + + if (!pollStale && !hasStuckQueue) return null; + + // Red for poll-stale (daemon wedged); amber for stuck-queue (waiting on + // operator action). Different urgency, different colors. + const baseClasses = pollStale + ? 'mb-4 flex items-start gap-3 px-4 py-3 rounded-md bg-red-50 dark:bg-red-950/30 border border-red-300 dark:border-red-700 text-red-900 dark:text-red-100' + : 'mb-4 flex items-start gap-3 px-4 py-3 rounded-md bg-amber-50 dark:bg-amber-950/30 border border-amber-300 dark:border-amber-700 text-amber-900 dark:text-amber-100'; + + const hoverClasses = pollStale + ? 'p-1 rounded hover:bg-red-100 dark:hover:bg-red-900/50 transition-colors' + : 'p-1 rounded hover:bg-amber-100 dark:hover:bg-amber-900/50 transition-colors'; + + const headline = pollStale ? 'Dispatch daemon is wedged.' : 'Dispatch is stuck.'; + + // Compute approximate stuck duration for the message body. Falls back to + // "for a while" when timestamps are missing or the wedge is under a minute. + // Math.floor matches the "for over N minutes" wording: a 90-second wedge + // reads "for over 1 minute", not "for over 2 minutes". + const lastCompletedAt = health?.lastPollCompletedAt; + let stuckForCopy = 'for a while'; + if (lastCompletedAt) { + const ageMin = Math.floor((Date.now() - new Date(lastCompletedAt).getTime()) / 60000); + if (ageMin >= 1) stuckForCopy = `for over ${ageMin} minute${ageMin === 1 ? '' : 's'}`; + } + + const body = pollStale + ? `Poll loop has not completed a cycle ${stuckForCopy}. The HTTP server is responsive but dispatch, scheduling, and recovery have stopped. Restart with \`sf serve smithy\` to recover.` + : `${health?.readyUnassignedTasks ?? 0} task(s) ready, no available workers. Register or enable a worker to start dispatching.`; + + const testId = pollStale ? 'dispatch-health-banner-poll-stale' : 'dispatch-health-banner'; + + return ( +
+ +
+
{headline}
+
{body}
+
+ +
+ ); +} diff --git a/apps/smithy-web/src/routes/agents/index.tsx b/apps/smithy-web/src/routes/agents/index.tsx index 32ece09a..2971ef01 100644 --- a/apps/smithy-web/src/routes/agents/index.tsx +++ b/apps/smithy-web/src/routes/agents/index.tsx @@ -17,6 +17,7 @@ import { AgentCard, CreateAgentDialog, DeleteAgentDialog, RenameAgentDialog, Sta import { PoolCard, CreatePoolDialog, EditPoolDialog } from '../../components/pool'; import { AgentWorkspaceGraph } from '../../components/agent-graph'; import type { Agent, SessionStatus, AgentRole, StewardFocus } from '../../api/types'; +import { DispatchHealthBanner } from '../../components/dispatch/DispatchHealthBanner'; type TabValue = 'agents' | 'stewards' | 'pools' | 'graph'; @@ -429,6 +430,9 @@ export function AgentsPage() { + {/* Dispatch health banner */} + + {/* Content */} {currentTab === 'graph' ? ( // Graph tab handles its own loading/error states diff --git a/apps/smithy-web/src/routes/workspaces/index.tsx b/apps/smithy-web/src/routes/workspaces/index.tsx index 4ad589d1..892e7e58 100644 --- a/apps/smithy-web/src/routes/workspaces/index.tsx +++ b/apps/smithy-web/src/routes/workspaces/index.tsx @@ -28,6 +28,7 @@ import { type LayoutPreset, } from '../../components/workspace'; import { useAgent, useResumeAgentSession } from '../../api/hooks/useAgents'; +import { DispatchHealthBanner } from '../../components/dispatch/DispatchHealthBanner'; /** Layout preset configuration */ const layoutPresets: { id: LayoutPreset; icon: typeof Square; label: string }[] = [ @@ -331,6 +332,9 @@ export function WorkspacesPage() { + {/* Dispatch health banner */} + + {/* Main content area */}
{hasPanes ? ( diff --git a/packages/smithy/src/server/routes/daemon.test.ts b/packages/smithy/src/server/routes/daemon.test.ts new file mode 100644 index 00000000..8b40fe64 --- /dev/null +++ b/packages/smithy/src/server/routes/daemon.test.ts @@ -0,0 +1,78 @@ +/** + * Daemon Routes Tests — GET /api/daemon/status health field + * + * Tests that the status endpoint includes dispatch health when available, + * omits it when the daemon is not configured, and degrades gracefully when + * getDispatchHealth() throws. + */ + +import { describe, it, expect, vi } from 'vitest'; +import type { Services } from '../services.js'; +import { createDaemonRoutes } from './daemon.js'; + +// ============================================================================ +// Test Fixtures +// ============================================================================ + +function createMockServices(opts: { withDaemon?: boolean; healthThrows?: boolean } = {}) { + const dispatchDaemon = + opts.withDaemon === false + ? undefined + : { + isRunning: vi.fn().mockReturnValue(true), + getConfig: vi.fn().mockReturnValue({ pollIntervalMs: 500 }), + getRateLimitStatus: vi.fn().mockReturnValue({ active: false }), + getDispatchHealth: opts.healthThrows + ? 
vi.fn().mockRejectedValue(new Error('db unreachable')) + : vi.fn().mockResolvedValue({ + readyUnassignedTasks: 3, + availableWorkers: 0, + stuck: true, + hasStuckQueue: true, + computedAt: '2026-05-03T00:00:00.000Z', + }), + }; + + const services = { dispatchDaemon } as unknown as Services; + return { services, dispatchDaemon }; +} + +// ============================================================================ +// Tests +// ============================================================================ + +describe('GET /api/daemon/status — health', () => { + it('includes the health snapshot when the daemon is available', async () => { + const { services } = createMockServices(); + const app = createDaemonRoutes(services); + const res = await app.request('/api/daemon/status'); + const body = await res.json(); + + expect(res.status).toBe(200); + expect(body.health.hasStuckQueue).toBe(true); + expect(body.health.readyUnassignedTasks).toBe(3); + expect(body.health.availableWorkers).toBe(0); + }); + + it('omits the health field when the daemon is unavailable', async () => { + const { services } = createMockServices({ withDaemon: false }); + const app = createDaemonRoutes(services); + const res = await app.request('/api/daemon/status'); + const body = await res.json(); + + expect(res.status).toBe(200); + expect(body.available).toBe(false); + expect(body.health).toBeUndefined(); + }); + + it('still returns 200 with isRunning when getDispatchHealth throws', async () => { + const { services } = createMockServices({ healthThrows: true }); + const app = createDaemonRoutes(services); + const res = await app.request('/api/daemon/status'); + const body = await res.json(); + + expect(res.status).toBe(200); + expect(body.isRunning).toBe(true); + expect(body.health).toBeUndefined(); + }); +}); diff --git a/packages/smithy/src/server/routes/daemon.ts b/packages/smithy/src/server/routes/daemon.ts index 11c18ca9..42c093e7 100644 --- a/packages/smithy/src/server/routes/daemon.ts +++ 
b/packages/smithy/src/server/routes/daemon.ts @@ -30,7 +30,7 @@ export function createDaemonRoutes(services: Services) { const app = new Hono(); // GET /api/daemon/status - app.get('/api/daemon/status', (c) => { + app.get('/api/daemon/status', async (c) => { if (!dispatchDaemon) { return c.json({ isRunning: false, @@ -42,6 +42,14 @@ export function createDaemonRoutes(services: Services) { const config = dispatchDaemon.getConfig(); const rateLimitStatus = dispatchDaemon.getRateLimitStatus(); + + let health: import('../../services/dispatch-daemon.js').DispatchHealth | undefined; + try { + health = await dispatchDaemon.getDispatchHealth(); + } catch (err) { + logger.warn('Failed to compute dispatch health for /api/daemon/status:', err); + } + return c.json({ isRunning: dispatchDaemon.isRunning(), available: true, @@ -55,6 +63,7 @@ export function createDaemonRoutes(services: Services) { directorInboxForwardingEnabled: config.directorInboxForwardingEnabled, }, rateLimit: rateLimitStatus, + health, }); }); diff --git a/packages/smithy/src/services/dispatch-daemon.bun.test.ts b/packages/smithy/src/services/dispatch-daemon.bun.test.ts index d278cf55..c23ff578 100644 --- a/packages/smithy/src/services/dispatch-daemon.bun.test.ts +++ b/packages/smithy/src/services/dispatch-daemon.bun.test.ts @@ -6436,3 +6436,329 @@ describe('assignTaskToWorker - per-director targetBranch propagation', () => { }); }); }); + +// ============================================================================ +// getDispatchHealth harness + tests +// ============================================================================ + +interface DaemonHarness { + api: QuarryAPI; + registry: AgentRegistry; + daemon: DispatchDaemon; + systemEntity: EntityId; + dispose(): Promise; +} + +async function setupDaemonHarness(opts?: { configOverrides?: Partial }): Promise { + const dbPath = `/tmp/dispatch-health-test-${Date.now()}-${Math.random().toString(36).slice(2)}.db`; + const storage = createStorage({ path: 
dbPath, create: true }); + initializeSchema(storage); + + const api = createQuarryAPI(storage); + const inboxService = createInboxService(storage); + const registry = createAgentRegistry(api); + const taskAssignment = createTaskAssignmentService(api); + const dispatchService = createDispatchService(api, taskAssignment, registry); + const sessionManager = createMockSessionManager(); + const worktreeManager = createMockWorktreeManager(); + const stewardScheduler = createMockStewardScheduler(); + + const { createEntity, EntityTypeValue } = await import('@stoneforge/core'); + const systemRaw = await createEntity({ + name: 'health-test-system', + entityType: EntityTypeValue.SYSTEM, + createdBy: 'system:test' as EntityId, + }); + const saved = await api.create(systemRaw as unknown as Record & { createdBy: EntityId }); + const systemEntity = saved.id as unknown as EntityId; + + const config: DispatchDaemonConfig = { + ensureTargetBranchExists: mockEnsureTargetBranchExists, + pollIntervalMs: 100, + workerAvailabilityPollEnabled: false, + inboxPollEnabled: false, + stewardTriggerPollEnabled: false, + workflowTaskPollEnabled: false, + ...opts?.configOverrides, + }; + + const daemon = createDispatchDaemon( + api, + registry, + sessionManager, + dispatchService, + worktreeManager, + taskAssignment, + stewardScheduler, + inboxService, + config + ); + + return { + api, + registry, + daemon, + systemEntity, + async dispose() { + await daemon.stop(); + if (fs.existsSync(dbPath)) { + fs.unlinkSync(dbPath); + } + }, + }; +} + +describe('getDispatchHealth', () => { + test('reports stuck=false when there are no ready tasks', async () => { + const harness = await setupDaemonHarness(); + try { + const health = await harness.daemon.getDispatchHealth(); + expect(health.readyUnassignedTasks).toBe(0); + expect(health.availableWorkers).toBe(0); + expect(health.stuck).toBe(false); + expect(typeof health.computedAt).toBe('string'); + } finally { + await harness.dispose(); + } + }); + + 
test('reports stuck=true when ready unassigned tasks exist and no workers are registered', async () => { + const harness = await setupDaemonHarness(); + try { + const task = await createTask({ + title: 'do the thing', + createdBy: harness.systemEntity, + status: TaskStatus.OPEN, + }); + await harness.api.create(task as unknown as Record & { createdBy: EntityId }); + const health = await harness.daemon.getDispatchHealth(); + expect(health.readyUnassignedTasks).toBe(1); + expect(health.availableWorkers).toBe(0); + expect(health.stuck).toBe(true); + } finally { + await harness.dispose(); + } + }); + + test('reports stuck=false when ready unassigned tasks have at least one available worker', async () => { + const harness = await setupDaemonHarness(); + try { + const task = await createTask({ + title: 'do the thing', + createdBy: harness.systemEntity, + status: TaskStatus.OPEN, + }); + await harness.api.create(task as unknown as Record & { createdBy: EntityId }); + await harness.registry.registerWorker({ + name: 'w1', + workerMode: 'ephemeral', + createdBy: harness.systemEntity, + }); + const health = await harness.daemon.getDispatchHealth(); + expect(health.readyUnassignedTasks).toBe(1); + expect(health.availableWorkers).toBe(1); + expect(health.stuck).toBe(false); + } finally { + await harness.dispose(); + } + }); + + test('treats disabled workers as unavailable for stuck-queue detection', async () => { + const harness = await setupDaemonHarness(); + try { + const task = await createTask({ + title: 'do the thing', + createdBy: harness.systemEntity, + status: TaskStatus.OPEN, + }); + await harness.api.create(task as unknown as Record & { createdBy: EntityId }); + const w = await harness.registry.registerWorker({ + name: 'w1', + workerMode: 'ephemeral', + createdBy: harness.systemEntity, + }); + await harness.registry.updateAgentMetadata(w.id, { disabled: true }); + const health = await harness.daemon.getDispatchHealth(); + expect(health.availableWorkers).toBe(0); + 
expect(health.stuck).toBe(true); + } finally { + await harness.dispose(); + } + }); +}); + +describe('stuck-queue transition logging', () => { + test('emits a STUCK warn on the healthy → stuck transition, then stays silent on subsequent ticks', async () => { + const warnSpy = mock((..._args: unknown[]) => {}); + const infoSpy = mock((..._args: unknown[]) => {}); + const harness = await setupDaemonHarness({ + configOverrides: { logger: { warn: warnSpy, info: infoSpy } }, + }); + try { + const task = await createTask({ + title: 'x', + createdBy: harness.systemEntity, + status: TaskStatus.OPEN, + }); + await harness.api.create(task as unknown as Record & { createdBy: EntityId }); + // No workers registered; queue is stuck. + await harness.daemon.tick(); + expect(warnSpy).toHaveBeenCalledTimes(1); + const message = warnSpy.mock.calls[0][0] as string; + expect(message).toContain('STUCK'); + expect(message).toContain('1 task(s) ready'); + expect(message).toContain('no available workers'); + + // Subsequent stuck ticks must NOT spam the log (transition-only model). + await harness.daemon.tick(); + await harness.daemon.tick(); + expect(warnSpy).toHaveBeenCalledTimes(1); + } finally { + await harness.dispose(); + } + }); + + test('emits a RESUMED info on the stuck → healthy transition', async () => { + const warnSpy = mock((..._args: unknown[]) => {}); + const infoSpy = mock((..._args: unknown[]) => {}); + const harness = await setupDaemonHarness({ + configOverrides: { logger: { warn: warnSpy, info: infoSpy } }, + }); + try { + const task = await createTask({ + title: 'x', + createdBy: harness.systemEntity, + status: TaskStatus.OPEN, + }); + await harness.api.create(task as unknown as Record & { createdBy: EntityId }); + // First tick: queue is stuck (no workers). Triggers STUCK warn. + await harness.daemon.tick(); + expect(warnSpy).toHaveBeenCalledTimes(1); + + // Register a worker; on next tick the queue is healthy. Triggers RESUMED info. 
+ await harness.registry.registerWorker({ name: 'w1', workerMode: 'ephemeral', createdBy: harness.systemEntity, maxConcurrentTasks: 1 }); + await harness.daemon.tick(); + expect(infoSpy).toHaveBeenCalledTimes(1); + const infoMessage = infoSpy.mock.calls[0][0] as string; + expect(infoMessage).toContain('RESUMED'); + } finally { + await harness.dispose(); + } + }); +}); + +// ============================================================================ +// getDispatchHealth — pollStale detection +// ============================================================================ + +describe('getDispatchHealth - pollStale detection', () => { + test('reports lastPollCompletedAt fresh and pollStale=false right after a successful poll cycle', async () => { + const harness = await setupDaemonHarness(); + try { + await harness.daemon.tick(); + const health = await harness.daemon.getDispatchHealth(); + expect(health.lastPollCompletedAt).toBeDefined(); + expect(health.lastPollStartedAt).toBeDefined(); + expect(health.pollStale).toBe(false); + // lastPollCompletedAt should be within the last second + const ageMs = Date.now() - new Date(health.lastPollCompletedAt!).getTime(); + expect(ageMs).toBeLessThan(1000); + } finally { + await harness.dispose(); + } + }); + + test('reports pollStale=false and undefined timestamps when daemon has never polled', async () => { + const harness = await setupDaemonHarness(); + try { + // Don't call tick() — daemon is constructed but never polled. + const health = await harness.daemon.getDispatchHealth(); + expect(health.lastPollStartedAt).toBeUndefined(); + expect(health.lastPollCompletedAt).toBeUndefined(); + // pollStale must be false when there's no baseline — we don't know if + // the daemon has had a chance to poll yet, so we must not false-alarm. 
+ expect(health.pollStale).toBe(false); + } finally { + await harness.dispose(); + } + }); + + test('reports pollStale=true when a poll cycle is wedged past the threshold', async () => { + // Simulate a wedged cycle: stub api.ready (called by both pollWorkerAvailability + // and getDispatchHealth itself) to return a never-resolving promise on the + // first invocation only. Start tick() without awaiting, sleep past the + // (very short) threshold, then probe health via a second api.ready call + // that resolves normally. + const harness = await setupDaemonHarness({ + configOverrides: { + // Override threshold to a tiny value so the test runs fast. + pollStaleThresholdMs: 50, + // Enable the worker-availability poll so api.ready actually gets called. + workerAvailabilityPollEnabled: true, + }, + }); + try { + // Wedge the first runPollCycle by making api.ready hang on the first call. + // Subsequent calls (e.g. from getDispatchHealth) must resolve quickly. + const realReady = harness.api.ready.bind(harness.api); + let callCount = 0; + const neverResolves = new Promise(() => { /* never */ }); + const stubbedApi = harness.api as { ready: typeof harness.api.ready }; + stubbedApi.ready = ((filter?: Parameters[0]) => { + callCount += 1; + if (callCount === 1) return neverResolves; + return realReady(filter); + }) as typeof harness.api.ready; + + // Start tick() but don't await — the cycle wedges inside. + const wedgedTick = harness.daemon.tick(); + void wedgedTick; // prevent unused-var lint + + // Wait past the threshold (50ms) plus a margin. lastPollStartedAt was set + // at the top of runPollCycle, lastPollCompletedAt was NOT set because + // the finally block hasn't been reached yet. 
+ await new Promise(r => setTimeout(r, 150)); + + const health = await harness.daemon.getDispatchHealth(); + expect(health.pollStale).toBe(true); + expect(health.lastPollStartedAt).toBeDefined(); + expect(health.lastPollCompletedAt).toBeUndefined(); + } finally { + await harness.dispose(); + } + }); + + test('honors a custom pollStaleThresholdMs from config', async () => { + // Default threshold is max(60_000, 10 × pollIntervalMs). With a custom + // override, a freshly-completed poll reports pollStale=false, but if we + // artificially push lastPollCompletedAt back past the override, pollStale + // flips true. + const harness = await setupDaemonHarness({ + configOverrides: { pollStaleThresholdMs: 10 }, + }); + try { + await harness.daemon.tick(); + // Right after tick: not stale. + let health = await harness.daemon.getDispatchHealth(); + expect(health.pollStale).toBe(false); + + // Force BOTH timestamps back 100ms (well past the 10ms threshold). In a + // real wedge both move together; pushing only one would simulate + // "cycle in flight" and trigger the in-flight branch instead of the + // completed-but-stale branch we want to exercise here. + // Cast through unknown to access private fields — acceptable for tests. 
+ const daemon = harness.daemon as unknown as { + lastPollStartedAt: string; + lastPollCompletedAt: string; + }; + const past = new Date(Date.now() - 100).toISOString(); + daemon.lastPollStartedAt = past; + daemon.lastPollCompletedAt = past; + + health = await harness.daemon.getDispatchHealth(); + expect(health.pollStale).toBe(true); + } finally { + await harness.dispose(); + } + }); +}); diff --git a/packages/smithy/src/services/dispatch-daemon.ts b/packages/smithy/src/services/dispatch-daemon.ts index 550e7989..299790fd 100644 --- a/packages/smithy/src/services/dispatch-daemon.ts +++ b/packages/smithy/src/services/dispatch-daemon.ts @@ -28,6 +28,7 @@ import type { Document, Plan, Workflow, + Timestamp, } from '@stoneforge/core'; import { InboxStatus, createTimestamp, TaskStatus, asEntityId, asElementId, PlanStatus, canAutoComplete, WorkflowStatus, computeWorkflowStatus, updateWorkflowStatus } from '@stoneforge/core'; import type { QuarryAPI, InboxService } from '@stoneforge/quarry'; @@ -161,6 +162,17 @@ export interface DispatchDaemonConfig { */ readonly pollIntervalMs?: number; + /** + * Threshold in milliseconds beyond which `getDispatchHealth().pollStale` + * flips true if no poll cycle has completed. Detects a daemon whose + * `runPollCycle` is wedged mid-await (HTTP responsive but loop dead). + * Default: max(60000, 10 × pollIntervalMs). + * Set generously above any healthy poll cycle duration; orphan recovery + * + inbox triage + steward triggers can plausibly take 10+ seconds on a + * loaded workspace. + */ + readonly pollStaleThresholdMs?: number; + /** * Whether worker availability polling is enabled. * Default: true @@ -294,6 +306,48 @@ export interface DispatchDaemonConfig { * Useful for testing without triggering real git remote operations. */ readonly ensureTargetBranchExists?: (projectRoot: string, branchName: string) => Promise; + + /** Optional logger override. Defaults to the module-level dispatch-daemon logger. Used for testability. 
*/ + readonly logger?: { warn: (...args: unknown[]) => void; info?: (...args: unknown[]) => void }; +} + +/** + * Snapshot of dispatch health: the worker queue's stuck state plus poll-loop liveness. + * Computed on demand from the agent registry and ready-task list. + * A queue is "stuck" when there are unassigned ready tasks but no worker + * could take them (registered, not disabled, not terminated). At-capacity + * workers still count as available: the queue is busy, not stuck. + */ +export interface DispatchHealth { + /** Count of ready tasks (api.ready result) that have no assignee. */ + readonly readyUnassignedTasks: number; + /** Count of workers eligible to take new work: registered, not disabled, not terminated. */ + readonly availableWorkers: number; + /** True iff readyUnassignedTasks > 0 && availableWorkers === 0. */ + readonly stuck: boolean; + /** Convenience alias for stuck, matching the field name surfaced in the HTTP and UI layers. */ + readonly hasStuckQueue: boolean; + /** Timestamp the snapshot was computed (ISO 8601). */ + readonly computedAt: string; + /** + * Timestamp when the most recent poll cycle entered its try block (set BEFORE + * any await). Undefined if no cycle has ever started. Compared with + * `lastPollCompletedAt` and wall-clock to derive {@link pollStale}. + */ + readonly lastPollStartedAt?: string; + /** + * Timestamp when the most recent poll cycle reached its `finally` block. + * Undefined if no cycle has completed yet. A wedged cycle leaves this old. + */ + readonly lastPollCompletedAt?: string; + /** + * True when the daemon's HTTP server is alive but its poll loop has not + * completed a cycle within the configured staleness threshold. Indicates + * the daemon is wedged: process up, dispatch / scheduling / recovery down. + * Always false until at least one poll cycle has started, so a + * just-started daemon does not false-alarm; a wedged first cycle still flips it. 
+ */ + readonly pollStale: boolean; } /** @@ -319,6 +373,7 @@ export interface PollResult { */ interface NormalizedConfig { pollIntervalMs: number; + pollStaleThresholdMs: number; workerAvailabilityPollEnabled: boolean; inboxPollEnabled: boolean; stewardTriggerPollEnabled: boolean; @@ -338,6 +393,7 @@ interface NormalizedConfig { directorInboxForwardingEnabled: boolean; directorInboxIdleThresholdMs: number; ensureTargetBranchExists: (projectRoot: string, branchName: string) => Promise; + logger?: { warn: (...args: unknown[]) => void; info?: (...args: unknown[]) => void }; } // ============================================================================ @@ -478,7 +534,7 @@ export interface DispatchDaemon { /** * Gets the current configuration. */ - getConfig(): Omit, 'onSessionStarted'> & { onSessionStarted?: OnSessionStartedCallback }; + getConfig(): Omit, 'onSessionStarted' | 'logger'> & { onSessionStarted?: OnSessionStartedCallback; logger?: { warn: (...args: unknown[]) => void } }; /** * Updates the configuration. @@ -486,6 +542,22 @@ export interface DispatchDaemon { */ updateConfig(config: Partial): void; + // ---------------------------------------- + // Health + // ---------------------------------------- + + /** + * Returns a snapshot of dispatch worker-queue health. + * Inexpensive (one ready-task query, one agent list query); safe to call from HTTP routes. + */ + getDispatchHealth(): Promise; + + /** + * Manually triggers one complete poll cycle (same work as the interval-driven cycle). + * Useful for testing and for callers that want to force an immediate dispatch pass. 
+ */ + tick(): Promise; + // ---------------------------------------- // Events // ---------------------------------------- @@ -536,6 +608,25 @@ export class DispatchDaemonImpl implements DispatchDaemon { private currentPollCycle?: Promise; private rateLimitSleepTimer?: NodeJS.Timeout; private lastWakeAt?: number; + /** + * ISO timestamp when the most recent `runPollCycle` invocation entered the + * try block. Set BEFORE awaiting any work, so a wedged-mid-cycle daemon + * leaves this value old. Compared against `lastPollCompletedAt` and + * wall-clock to detect staleness in {@link getDispatchHealth}. + */ + private lastPollStartedAt?: Timestamp; + /** + * ISO timestamp when the most recent `runPollCycle` invocation finished + * (whether it succeeded or threw). Set in the `finally` block. A wedged + * cycle never reaches this assignment, so the value stays frozen at the + * previous completion (or undefined if no cycle has yet + * completed). Note that during a wedge, `lastPollStartedAt` also stops + * advancing because the re-entrancy guard at the top of `runPollCycle` + * bails on every subsequent setInterval tick — only the wedged cycle's + * own start-time is recorded. Surfaced via {@link getDispatchHealth} + * as the canonical "loop is alive" signal. + */ + private lastPollCompletedAt?: Timestamp; /** * Tracks inbox item IDs that are currently being forwarded to persistent agents. @@ -576,6 +667,16 @@ export class DispatchDaemonImpl implements DispatchDaemon { /** TTL for emitted warning deduplication keys (5 minutes). */ private static readonly WARNING_DEDUP_TTL_MS = 5 * 60 * 1000; + /** + * Stuck-queue logging is transition-only; there is no tick counter or + * periodic re-warn. A warn is emitted once when the queue becomes stuck + * and an info line once when it recovers, so a long-running stuck state + * does not spam the log. The flag below records which state was last + * reported. 
+ */ + /** Tracks the last reported stuck state so we log only on transitions, never on every tick. */ + private lastReportedStuck = false; + constructor( api: QuarryAPI, agentRegistry: AgentRegistry, @@ -714,6 +815,99 @@ export class DispatchDaemonImpl implements DispatchDaemon { return this.running; } + // ---------------------------------------- + // Health + // ---------------------------------------- + + async getDispatchHealth(): Promise { + const readyTasks = await this.api.ready({ includeEphemeral: true }); + const readyUnassignedTasks = readyTasks.filter(t => !t.assignee).length; + + const allAgents = await this.agentRegistry.listAgents(); + const availableWorkers = allAgents.filter(a => { + const meta = getAgentMetadata(a); + if (!meta || meta.agentRole !== 'worker') return false; + if (isAgentDisabled(a)) return false; + if (meta.sessionStatus === 'terminated') return false; + return true; + }).length; + + const stuck = readyUnassignedTasks > 0 && availableWorkers === 0; + + // pollStale catches two failure modes: + // 1) A cycle is currently in flight (startedAt exists and is newer than + // completedAt, or completedAt doesn't exist at all) and has been + // running for longer than the threshold. This is the wedged-mid-await + // case observed in production. + // 2) No cycle is in flight, but the last completed cycle is older than + // the threshold (i.e. setInterval stopped firing entirely — rarer, + // since a frozen event loop would also stop responding to HTTP). + // If neither timestamp is set, pollStale stays false to avoid false-firing + // on a just-started daemon that hasn't had a chance to poll yet. + let pollStale = false; + const now = Date.now(); + const threshold = this.config.pollStaleThresholdMs; + const startedAtMs = this.lastPollStartedAt + ? new Date(this.lastPollStartedAt).getTime() + : undefined; + const completedAtMs = this.lastPollCompletedAt + ? 
new Date(this.lastPollCompletedAt).getTime() + : undefined; + const cycleInFlight = startedAtMs !== undefined + && (completedAtMs === undefined || startedAtMs > completedAtMs); + if (cycleInFlight && startedAtMs !== undefined) { + pollStale = now - startedAtMs > threshold; + } else if (completedAtMs !== undefined) { + pollStale = now - completedAtMs > threshold; + } + + return { + readyUnassignedTasks, + availableWorkers, + stuck, + hasStuckQueue: stuck, + computedAt: new Date().toISOString(), + lastPollStartedAt: this.lastPollStartedAt, + lastPollCompletedAt: this.lastPollCompletedAt, + pollStale, + }; + } + + async tick(): Promise { + await this.runPollCycle(); + } + + /** + * Logs ONLY on stuck-queue state transitions, never repeats while the state holds. + * Healthy → Stuck: emits a single STUCK warn line. + * Stuck → Healthy: emits a single RESUMED info line. + * No periodic reminders, so a long-running stuck state does not spam logs. + */ + private async maybeLogStuckQueueWarning(): Promise { + let health: DispatchHealth; + try { + health = await this.getDispatchHealth(); + } catch { + // Health computation must never crash the tick loop. + return; + } + + if (health.stuck && !this.lastReportedStuck) { + const log = this.config.logger ?? logger; + log.warn( + `[dispatch] STUCK: ${health.readyUnassignedTasks} task(s) ready, no available workers. Register or enable a worker so dispatch can proceed.` + ); + this.lastReportedStuck = true; + } else if (!health.stuck && this.lastReportedStuck) { + const log = this.config.logger ?? logger; + const info = log.info ?? 
log.warn; + info( + `[dispatch] RESUMED: ${health.availableWorkers} worker(s) available, dispatch flowing again.` + ); + this.lastReportedStuck = false; + } + } + // ---------------------------------------- // Rate Limiting // ---------------------------------------- @@ -1916,7 +2110,7 @@ export class DispatchDaemonImpl implements DispatchDaemon { // Configuration // ---------------------------------------- - getConfig(): Omit, 'onSessionStarted'> & { onSessionStarted?: OnSessionStartedCallback } { + getConfig(): Omit, 'onSessionStarted' | 'logger'> & { onSessionStarted?: OnSessionStartedCallback; logger?: { warn: (...args: unknown[]) => void } } { return { ...this.config }; } @@ -2006,8 +2200,15 @@ export class DispatchDaemonImpl implements DispatchDaemon { let pollIntervalMs = config?.pollIntervalMs ?? DISPATCH_DAEMON_DEFAULT_POLL_INTERVAL_MS; pollIntervalMs = Math.max(DISPATCH_DAEMON_MIN_POLL_INTERVAL_MS, Math.min(DISPATCH_DAEMON_MAX_POLL_INTERVAL_MS, pollIntervalMs)); + // Default staleness threshold: max(60s, 10× poll interval). 60s is well + // above any healthy cycle's expected duration; 10× protects users running + // with a custom long pollIntervalMs. + const pollStaleThresholdMs = config?.pollStaleThresholdMs + ?? Math.max(60_000, 10 * pollIntervalMs); + return { pollIntervalMs, + pollStaleThresholdMs, workerAvailabilityPollEnabled: config?.workerAvailabilityPollEnabled ?? true, inboxPollEnabled: config?.inboxPollEnabled ?? true, stewardTriggerPollEnabled: config?.stewardTriggerPollEnabled ?? true, @@ -2027,6 +2228,7 @@ export class DispatchDaemonImpl implements DispatchDaemon { directorInboxForwardingEnabled: config?.directorInboxForwardingEnabled ?? true, directorInboxIdleThresholdMs: config?.directorInboxIdleThresholdMs ?? 120_000, ensureTargetBranchExists: config?.ensureTargetBranchExists ?? 
ensureTargetBranchExists, + logger: config?.logger, }; } @@ -2036,6 +2238,7 @@ export class DispatchDaemonImpl implements DispatchDaemon { private async runPollCycle(): Promise { if (this.polling) return; this.polling = true; + this.lastPollStartedAt = createTimestamp(); try { // Check if dispatch is paused due to rate limiting. // When paused, skip dispatch-related polls but still run non-dispatch work. @@ -2133,7 +2336,10 @@ export class DispatchDaemonImpl implements DispatchDaemon { if (this.config.workflowAutoTransitionEnabled) { await this.pollWorkflowAutoTransition(); } + + await this.maybeLogStuckQueueWarning(); } finally { + this.lastPollCompletedAt = createTimestamp(); this.polling = false; } }