From 5263f23d5bbdb8c4da514f57a74dbbe098b0d552 Mon Sep 17 00:00:00 2001 From: meidad Date: Wed, 17 Jun 2026 19:27:24 -0700 Subject: [PATCH 1/2] feat(mobile): card events, consumer Tasks/cron, elicitation answer, in-loop teams - AgentEvent surfaces for Plan (TodoWrite) and Ask (ask_user elicitation); stream the ask to mobile/terminal clients via a per-source emitter - AnswerQuestion RPC (NomosAgent + MobileApi) -> ElicitationManager.resolveById, so ask_user answers arrive out-of-band without deadlocking the session queue - Consumer Tasks: NomosAgent ListTasks/UpdateTask/DeleteTask; curateConsumerTasks filters infra loops by source; prettifyTaskName for friendly display names - prettifySchedule renders weekdays/weekly/monthly cron forms - delegate_to_team MCP tool: spin up parallel sub-agent teams in-loop with no /team prefix, in both hosted + power-user modes, with a worker recursion guard - eval: runTasks coverage + manifest updates (scheduled-tasks effect flipped to a hard check, new ask-user-elicitation, multi-agent-teams tool); proto + tests Co-Authored-By: Claude Opus 4.8 (1M context) --- eval/agent-eval.ts | 60 +++++++++++++ eval/feature-manifest.ts | 50 +++++++++-- proto/nomos.proto | 19 +++++ src/cron/loop-view.test.ts | 10 ++- src/cron/schedule-format.test.ts | 34 ++++++++ src/cron/schedule-format.ts | 18 +++- src/cron/task-view.test.ts | 37 +++++++- src/cron/task-view.ts | 43 +++++++++- src/daemon/agent-runtime.ts | 136 +++++++++++++++++++++++++----- src/daemon/elicitation-manager.ts | 43 +++++++++- src/daemon/gateway.ts | 1 + src/daemon/grpc-server.ts | 108 ++++++++++++++++++++++++ src/daemon/mobile-api.ts | 18 ++++ src/daemon/types.ts | 12 +++ src/sdk/team-mcp.ts | 86 +++++++++++++++++++ 15 files changed, 634 insertions(+), 41 deletions(-) create mode 100644 src/cron/schedule-format.test.ts create mode 100644 src/sdk/team-mcp.ts diff --git a/eval/agent-eval.ts b/eval/agent-eval.ts index f0ec7856..b7b0c8c1 100644 --- a/eval/agent-eval.ts +++ b/eval/agent-eval.ts @@ -91,6 +91,7 @@ import { } from "../src/memory/magic-docs.ts"; import { SessionStore } from "../src/sessions/store.ts"; import { CronStore } from "../src/cron/store.ts"; +import { curateConsumerTasks } from "../src/cron/task-view.ts"; import { createDraft, getDraft, @@ -1086,6 +1087,64 @@ async function runCron(): Promise { if (!KEEP) await store.deleteJob(jobId); // cron_runs cascade via FK } +async function runTasks(): Promise { + // scheduled-tasks: the consumer Tasks surface (curateConsumerTasks, served behind + // BOTH NomosAgent.ListTasks and MobileApi.ListTasks). A reminder the assistant + // scheduled via schedule_task (source='agent') must surface; the instance's + // always-on system/bundled infra loops -- which collapse onto the owner's user_id + // in power-user mode -- must be filtered OUT by curateConsumerTasks' source guard. + // Deterministic (no LLM): the live agent->schedule_task->Tasks path is covered by + // the iOS XCUITest; here we guard the durable effect + the curation/filter. + const store = new CronStore(); + const owner = "eval-tasks-a"; + // cron_jobs.name is globally UNIQUE (idx_cron_name), so namespace both rows to avoid + // colliding with seeded loops in nomos_eval. + const reminderId = await store.createJob({ + userId: owner, + name: "eval-tasks-call-dentist", + schedule: "2026-06-18T09:00:00", + scheduleType: "at", + sessionTarget: "isolated", + deliveryMode: "none", + prompt: "Remind the user to call the dentist", + enabled: true, + errorCount: 0, + source: "agent", // what schedule_task stamps for a user-owned task + }); + const infraId = await store.createJob({ + userId: owner, // same user_id as the reminder (power-user collapses system onto the owner) + name: "eval-tasks-infra-loop", + schedule: "6h", + scheduleType: "every", + sessionTarget: "isolated", + deliveryMode: "none", + prompt: "consolidate", + enabled: true, + errorCount: 0, + source: "system", // an infra loop -- must NOT appear on Tasks + }); + + const tasks = curateConsumerTasks(await store.listJobs({ userId: owner })); + check( + "[tasks] a schedule_task reminder (source=agent) surfaces on the consumer Tasks view", + tasks.some((t) => t.id === reminderId && t.source === "agent" && t.scheduleType === "at"), + `tasks=${tasks.map((t) => `${t.name}:${t.source}`).join(", ")}`, + ); + check( + "[tasks] system/bundled infra loops are filtered out of Tasks (curateConsumerTasks source guard)", + !tasks.some((t) => t.id === infraId) && + tasks.every((t) => t.source !== "system" && t.source !== "bundled"), + ); + + // Under --audit (KEEP) the source=agent reminder is intentionally left in nomos_eval + // so the scheduled-tasks effect SQL (source IN ('agent','user')) is exercised, not + // just declared. A plain run cleans both up. + if (!KEEP) { + await store.deleteJob(reminderId); + await store.deleteJob(infraId); + } +} + async function runDrafts(): Promise { // draft_messages: consent-aware outgoing drafts. Per-user at the LIST layer; // status state machine is pending -> approved -> sent, or pending -> rejected. @@ -3207,6 +3266,7 @@ async function runEval(): Promise { await runCommitments(); await runSessionResume(); await runCron(); + await runTasks(); await runDrafts(); await runAutoLinkerGuard(); await runRelationshipStats(); diff --git a/eval/feature-manifest.ts b/eval/feature-manifest.ts index ae3677e8..b704ef92 100644 --- a/eval/feature-manifest.ts +++ b/eval/feature-manifest.ts @@ -819,15 +819,19 @@ export const FEATURES: FeatureSpec[] = [ { id: "multi-agent-teams", summary: - "Coordinator/worker orchestration via the /team prefix; parallel workers, synthesized result.", + "Coordinator/worker orchestration: a coordinator decomposes a task into parallel workers and synthesizes one result. Triggered EITHER by the `/team` prefix (fast path) OR by the in-loop `delegate_to_team` tool (buildTeamMcpServer) the agent calls when the user asks in natural language ('research X from three angles', 'spin up a team') — both work in hosted + power-user modes (both converge on AgentRuntime.runAgent). Gated on teamMode. Workers receive only the BASE mcp set (no nomos-team), so they can never recurse into delegation.", trigger: { kind: "turn", gate: "teamMode" }, - entry: ["stripTeamPrefix", "TeamRuntime"], + entry: ["stripTeamPrefix", "TeamRuntime", "buildTeamMcpServer"], effects: [ { claim: "spawns parallel workers + synthesizes (transient, no durable DB state)", notExercised: true, }, ], + invariants: [ + "invokable without the /team prefix via delegate_to_team, in both hosted + power-user modes", + "workers get only the base mcp set, so a worker can never spawn a nested team", + ], }, // ── Self-improvement (the learning loop) ── @@ -983,25 +987,59 @@ export const FEATURES: FeatureSpec[] = [ { id: "scheduled-tasks", summary: - "Consumer Tasks surface (MobileApi.ListTasks/UpdateTask/DeleteTask). ListTasks returns the user's own scheduled cron_jobs (one-off 'at' reminders + recurring jobs created via schedule_task/loop_create), owner-scoped by user_id so the instance's system-owned background loops never appear. UpdateTask reschedules/renames/edits the instruction/enables; DeleteTask removes one. Both assert ownership before mutating.", + "Consumer Tasks surface, served by BOTH NomosAgent.ListTasks/UpdateTask/DeleteTask (grpc-server, local power-user) and MobileApi.ListTasks/UpdateTask/DeleteTask (hosted, auth-gated); GetToday reuses curateConsumerTasks for its task strip. A 'task' is any cron_jobs row the user/assistant scheduled (one-off 'at' reminders + recurring 'every'/'cron' jobs created via schedule_task/loop_create). curateConsumerTasks(jobs) filters out INFRA_SOURCES (source in {system,bundled}) so the instance's always-on system loops + bundled templates never appear on Tasks even in power-user mode, where systemTenant() collapses onto the owner so they share the owner's user_id; sorts enabled-first then alphabetical. toConsumerTask shapes each row + prettifies the schedule. UpdateTask reschedules/renames/edits the instruction/enables; DeleteTask removes one; both assert ownership before mutating.", trigger: { kind: "turn" }, entry: ["curateConsumerTasks", "toConsumerTask"], effects: [ { - claim: "the user's scheduled tasks are stored as owner-scoped cron_jobs", + // Exercised by runTasks: a schedule_task-style reminder (source='agent') + // is created + survives under --audit (KEEP), so the count is nonzero. + claim: + "user/assistant-scheduled tasks are stored as owner-scoped cron_jobs (source='agent')", sql: { query: "SELECT count(*) FROM cron_jobs WHERE source IN ('agent','user')", expect: "nonzero", }, - notExercised: true, + }, + { + // The complement: infra loops genuinely EXIST in cron_jobs (so "Tasks hides + // them" is a non-vacuous claim); curateConsumerTasks filters them out of the + // view (asserted by runTasks' check() + the task-view unit test). + claim: + "infra loops (system/bundled) exist as cron_jobs but are filtered out of the Tasks view", + sql: { + query: "SELECT count(*) FROM cron_jobs WHERE source IN ('system','bundled')", + expect: "nonzero", + }, }, ], invariants: [ - "Tasks are owner-scoped (user_id); managed/system loops never appear on this surface", + "Tasks are owner-scoped (user_id); system/bundled infra loops are filtered out by INFRA_SOURCES and never appear on this surface, even when they share the owner's user_id in power-user mode", + "served by both NomosAgent (local) and MobileApi (hosted) ListTasks/UpdateTask/DeleteTask off the same curateConsumerTasks view", "UpdateTask/DeleteTask assert job.userId === the resolved owner before mutating", "schedule_task stamps source='agent' (a user-owned task, not infra)", ], }, + { + id: "ask-user-elicitation", + summary: + "MCP-native ask_user round-trip: an in-process tool raises an elicitation/create request; the SDK relays it to AgentRuntime's onElicitation callback (handleElicitation), which renders the question on the user's active channel. Slack gets Block Kit buttons, any text channel matches a numbered/label reply, and channel-less clients (mobile/terminal) get an 'ask' AgentEvent over the open stream via a per-source registered emitter (registerEmitter/unregisterEmitter), with the answer returned OUT-OF-BAND through the AnswerQuestion RPC (NomosAgent + MobileApi) -> resolveById. Answering out-of-band (not as a new chat turn) avoids deadlocking the per-session FIFO queue behind the suspended turn. A pending entry is keyed by elicitation id with a TTL auto-decline.", + trigger: { kind: "turn" }, + entry: ["handleElicitation", "registerEmitter", "unregisterEmitter", "resolveById"], + effects: [ + { + claim: + "ask_user renders the question on the active channel / over the open stream and resolves the agent's suspended promise from the out-of-band answer (in-memory pending map + transient 'ask' stream event; no durable table)", + notExercised: true, + }, + ], + invariants: [ + "the elicitation manager is created at gateway boot and handed to both the runtime and the gRPC server (setElicitationManager)", + "AnswerQuestion is served by BOTH NomosAgent (grpc-server) and MobileApi (auth-gated) and both resolve via resolveById", + "mobile/terminal sources (no channel adapter) register a per-source emitter that pushes an 'ask' AgentEvent and is torn down when the turn ends", + "the answer arrives out-of-band (a dedicated RPC), never as a new chat message, so the suspended turn never deadlocks the session queue", + ], + }, { id: "brain-overview", summary: diff --git a/proto/nomos.proto b/proto/nomos.proto index 271e5639..48ac33ad 100644 --- a/proto/nomos.proto +++ b/proto/nomos.proto @@ -26,6 +26,14 @@ service NomosAgent { rpc SetLoopEnabled (SetLoopEnabledRequest) returns (LoopActionResponse); rpc DeleteLoop (LoopDeleteRequest) returns (LoopActionResponse); + // Scheduled tasks (the local owner's cron_jobs) — mirrors MobileApi for local mode. + rpc ListTasks (Empty) returns (MTasksResponse); + rpc UpdateTask (MTaskUpdateRequest) returns (MAck); + rpc DeleteTask (MTaskDeleteRequest) returns (MAck); + + // Answer a pending ask_user elicitation out-of-band (does not enqueue a turn). + rpc AnswerQuestion (MAnswerRequest) returns (MAck); + // Health check rpc Ping (Empty) returns (PongResponse); } @@ -208,6 +216,9 @@ service MobileApi { rpc UpdateTask (MTaskUpdateRequest) returns (MAck); rpc DeleteTask (MTaskDeleteRequest) returns (MAck); + // Answer a pending ask_user elicitation out-of-band (does not enqueue a turn). + rpc AnswerQuestion (MAnswerRequest) returns (MAck); + // Brain tab (the user's knowledge graph + learned facts, for the feed + map) rpc GetBrain (Empty) returns (MBrainResponse); @@ -362,6 +373,14 @@ message MAck { string message = 2; } +// Answer to a pending ask_user elicitation, delivered out-of-band so it does not +// queue behind the suspended turn that is awaiting it. +message MAnswerRequest { + string session_key = 1; + string question_id = 2; + string answer = 3; +} + // Vault (long-term memory / knowledge base) message MVaultListRequest { string prefix = 1; diff --git a/src/cron/loop-view.test.ts b/src/cron/loop-view.test.ts index 3c9f7d98..cf56824a 100644 --- a/src/cron/loop-view.test.ts +++ b/src/cron/loop-view.test.ts @@ -98,12 +98,16 @@ describe("prettifySchedule", () => { expect(prettifySchedule("15m", "every")).toBe("Every 15 minutes"); }); - it("renders daily cron expressions", () => { + it("renders daily + weekday/weekly/monthly cron expressions", () => { expect(prettifySchedule("0 8 * * *", "cron")).toBe("Daily at 8:00 AM"); expect(prettifySchedule("30 17 * * *", "cron")).toBe("Daily at 5:30 PM"); + expect(prettifySchedule("0 9 * * 1-5", "cron")).toBe("Weekdays at 9:00 AM"); + expect(prettifySchedule("0 9 * * 1", "cron")).toBe("Weekly on Mon at 9:00 AM"); + expect(prettifySchedule("0 8 15 * *", "cron")).toBe("Monthly on day 15 at 8:00 AM"); }); - it("falls back to the raw string for unrecognized shapes", () => { - expect(prettifySchedule("0 9 * * 1", "cron")).toBe("0 9 * * 1"); + it("falls back to the raw string for genuinely unrecognized shapes", () => { + expect(prettifySchedule("*/5 * * * *", "cron")).toBe("*/5 * * * *"); + expect(prettifySchedule("0 9 1 1 *", "cron")).toBe("0 9 1 1 *"); // specific month, not modeled }); }); diff --git a/src/cron/schedule-format.test.ts b/src/cron/schedule-format.test.ts new file mode 100644 index 00000000..7ea0b2ec --- /dev/null +++ b/src/cron/schedule-format.test.ts @@ -0,0 +1,34 @@ +import { describe, it, expect } from "vitest"; +import { prettifySchedule } from "./schedule-format.ts"; + +describe("prettifySchedule", () => { + it("renders interval ('every') forms", () => { + expect(prettifySchedule("1h", "every")).toBe("Hourly"); + expect(prettifySchedule("24h", "every")).toBe("Daily"); + expect(prettifySchedule("15m", "every")).toBe("Every 15 minutes"); + expect(prettifySchedule("2d", "every")).toBe("Every 2 days"); + }); + + it("renders the friendly cron forms the consumer editor produces", () => { + expect(prettifySchedule("0 9 * * *", "cron")).toBe("Daily at 9:00 AM"); + expect(prettifySchedule("30 17 * * 1-5", "cron")).toBe("Weekdays at 5:30 PM"); + expect(prettifySchedule("0 18 * * 1", "cron")).toBe("Weekly on Mon at 6:00 PM"); + expect(prettifySchedule("0 18 * * 1,3,5", "cron")).toBe("Weekly on Mon, Wed, Fri at 6:00 PM"); + expect(prettifySchedule("0 8 15 * *", "cron")).toBe("Monthly on day 15 at 8:00 AM"); + }); + + it("treats Sunday as 0 or 7", () => { + expect(prettifySchedule("0 9 * * 0", "cron")).toBe("Weekly on Sun at 9:00 AM"); + expect(prettifySchedule("0 9 * * 7", "cron")).toBe("Weekly on Sun at 9:00 AM"); + }); + + it("falls back to the raw expression for unmappable cron", () => { + expect(prettifySchedule("*/5 * * * *", "cron")).toBe("*/5 * * * *"); + expect(prettifySchedule("0 9 1 1 *", "cron")).toBe("0 9 1 1 *"); // specific month, not handled + }); + + it("renders one-off 'at' times", () => { + expect(prettifySchedule("2026-06-18T09:00:00Z", "at").startsWith("Once,")).toBe(true); + expect(prettifySchedule("not-a-date", "at")).toBe("not-a-date"); + }); +}); diff --git a/src/cron/schedule-format.ts b/src/cron/schedule-format.ts index 9e3acd8a..532e11b2 100644 --- a/src/cron/schedule-format.ts +++ b/src/cron/schedule-format.ts @@ -4,6 +4,8 @@ * and toggle/delete key off the id/name); this is display-only. */ +const DOW = ["Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat"]; + export function prettifySchedule(schedule: string, scheduleType: string): string { const s = schedule.trim(); @@ -28,9 +30,19 @@ export function prettifySchedule(schedule: string, scheduleType: string): string if (scheduleType === "cron") { const parts = s.split(/\s+/); if (parts.length === 5) { - const [min, hour, dom, , dow] = parts; - if (/^\d+$/.test(min) && /^\d+$/.test(hour) && dom === "*" && dow === "*") { - return `Daily at ${formatClock(Number(hour), min)}`; + const [min, hour, dom, mon, dow] = parts; + if (/^\d+$/.test(min) && /^\d+$/.test(hour) && mon === "*") { + const clock = formatClock(Number(hour), min); + if (dom === "*" && dow === "*") return `Daily at ${clock}`; + if (dom === "*" && dow === "1-5") return `Weekdays at ${clock}`; + if (dom === "*" && /^[0-7](,[0-7])*$/.test(dow)) { + const names = dow + .split(",") + .map((d) => DOW[Number(d) === 7 ? 0 : Number(d)]) + .join(", "); + return `Weekly on ${names} at ${clock}`; + } + if (dow === "*" && /^\d+$/.test(dom)) return `Monthly on day ${dom} at ${clock}`; } } return s; diff --git a/src/cron/task-view.test.ts b/src/cron/task-view.test.ts index 7f124875..fa0b4bd4 100644 --- a/src/cron/task-view.test.ts +++ b/src/cron/task-view.test.ts @@ -1,5 +1,5 @@ import { describe, it, expect } from "vitest"; -import { curateConsumerTasks, toConsumerTask } from "./task-view.ts"; +import { curateConsumerTasks, prettifyTaskName, toConsumerTask } from "./task-view.ts"; import { prettifySchedule } from "./schedule-format.ts"; import type { CronJob } from "./types.ts"; @@ -36,6 +36,23 @@ describe("toConsumerTask", () => { }); }); +describe("prettifyTaskName", () => { + it("humanizes kebab/snake/camel slugs to Title Case", () => { + expect(prettifyTaskName("call-dentist")).toBe("Call Dentist"); + expect(prettifyTaskName("water_plants")).toBe("Water Plants"); + expect(prettifyTaskName("checkUrgentEmails")).toBe("Check Urgent Emails"); + }); + + it("leaves real prose alone (only capitalizes the first letter)", () => { + expect(prettifyTaskName("Check my inbox")).toBe("Check my inbox"); + expect(prettifyTaskName("review the PR diff")).toBe("Review the PR diff"); + }); + + it("is applied by toConsumerTask so both transports show a friendly name", () => { + expect(toConsumerTask(job({ name: "call-dentist" })).name).toBe("Call Dentist"); + }); +}); + describe("curateConsumerTasks", () => { it("sorts enabled first, then alphabetical", () => { const out = curateConsumerTasks([ @@ -43,13 +60,25 @@ describe("curateConsumerTasks", () => { job({ name: "apple", enabled: false }), job({ name: "mango", enabled: true }), ]); - expect(out.map((t) => t.name)).toEqual(["mango", "zebra", "apple"]); + expect(out.map((t) => t.name)).toEqual(["Mango", "Zebra", "Apple"]); }); - it("passes through every owned job (filtering is done by the per-user query)", () => { - const out = curateConsumerTasks([job({ name: "a" }), job({ name: "b" })]); + it("passes through user/agent-scheduled jobs", () => { + const out = curateConsumerTasks([ + job({ name: "a", source: "agent" }), + job({ name: "b", source: "user" }), + ]); expect(out).toHaveLength(2); }); + + it("hides infra loops (system/bundled) that share the owner's user_id in power-user mode", () => { + const out = curateConsumerTasks([ + job({ name: "call-dentist", source: "agent" }), + job({ name: "auto-dream", source: "system" }), + job({ name: "calendar-prep", source: "bundled" }), + ]); + expect(out.map((t) => t.name)).toEqual(["Call Dentist"]); + }); }); describe("prettifySchedule (shared)", () => { diff --git a/src/cron/task-view.ts b/src/cron/task-view.ts index b3a0261b..e00988c7 100644 --- a/src/cron/task-view.ts +++ b/src/cron/task-view.ts @@ -11,6 +11,28 @@ import type { CronJob } from "./types.ts"; import { prettifySchedule } from "./schedule-format.ts"; +/** + * Humanize a stored task name for display. The agent's `schedule_task` tool coins + * slug-style names ("call-dentist", "water_plants", "checkUrgentEmails"); this turns + * them into a Title-Cased label ("Call Dentist", "Water Plants") without touching the + * stored value -- it stays display-only so name-keyed lookups (getJobByName, + * isLoopUserDisabled) and the id-keyed edit round-trip are unaffected. A name that's + * already a real sentence (contains a space) is left alone except for capitalization. + */ +export function prettifyTaskName(name: string): string { + const raw = name.trim(); + if (!raw) return raw; + // Already prose (has whitespace): only ensure the first letter is capitalized. + if (/\s/.test(raw)) return raw.charAt(0).toUpperCase() + raw.slice(1); + return raw + .replace(/[-_]+/g, " ") // kebab/snake -> spaces + .replace(/([a-z0-9])([A-Z])/g, "$1 $2") // camelCase -> words + .trim() + .split(/\s+/) + .map((w) => w.charAt(0).toUpperCase() + w.slice(1)) + .join(" "); +} + export interface ConsumerTask { id: string; name: string; @@ -26,10 +48,21 @@ export interface ConsumerTask { lastRun: string; } +/** + * Infra-owned cron sources that are NEVER user "tasks": the always-on background + * loops (`system`) and the shipped templates (`bundled`). In hosted mode these + * live under the synthetic `system` tenant so a per-user query already excludes + * them -- but in power-user mode `systemTenant()` collapses onto the owner + * (`local`), so they share the user's `user_id` and MUST be filtered by source. + * Tasks = what the user or the assistant (`source: "agent"`/`"user"`) scheduled; + * the curated infra loops live on the Loops surface (see cron/loop-view.ts). + */ +const INFRA_SOURCES = new Set(["system", "bundled"]); + export function toConsumerTask(j: CronJob): ConsumerTask { return { id: j.id, - name: j.name, + name: prettifyTaskName(j.name), prompt: j.prompt, schedule: j.schedule, scheduleType: j.scheduleType, @@ -40,9 +73,15 @@ export function toConsumerTask(j: CronJob): ConsumerTask { }; } -/** The user's scheduled tasks, enabled first then alphabetical. */ +/** + * The user's scheduled tasks: every reminder/job the user or assistant created, + * with the instance's infra loops (`system`/`bundled`) filtered out so they never + * leak onto Tasks in power-user mode (where they share `user_id`). Enabled first, + * then alphabetical. + */ export function curateConsumerTasks(jobs: CronJob[]): ConsumerTask[] { return jobs + .filter((j) => !INFRA_SOURCES.has(j.source ?? "user")) .map(toConsumerTask) .sort((a, b) => Number(b.enabled) - Number(a.enabled) || a.name.localeCompare(b.name)); } diff --git a/src/daemon/agent-runtime.ts b/src/daemon/agent-runtime.ts index 3b052a18..2d23d9c5 100644 --- a/src/daemon/agent-runtime.ts +++ b/src/daemon/agent-runtime.ts @@ -33,6 +33,7 @@ import { buildStudioMcpServer } from "../sdk/studio-mcp.ts"; import { buildVaultMcpServer } from "../sdk/vault-mcp.ts"; import { buildThinkMcpServer } from "../sdk/think-mcp.ts"; import { buildLoopMcpServer } from "../sdk/loop-mcp.ts"; +import { buildTeamMcpServer } from "../sdk/team-mcp.ts"; import { buildMemoryDigest } from "../memory/digest.ts"; import { captureMoodFromTurn } from "../memory/mood-log.ts"; import { getRelevantArticles } from "../memory/wiki-reader.ts"; @@ -64,6 +65,53 @@ function formatElapsedSince(date: Date): string { const months = Math.floor(days / 30); return `${months} month${months === 1 ? "" : "s"}`; } + +/** A short, human one-liner describing a tool call, derived from its input. */ +function summarizeToolInput(name: string, input: unknown): string { + const o = (input ?? {}) as Record; + const str = (v: unknown): string => (typeof v === "string" ? v : ""); + switch (name) { + case "WebSearch": + return str(o.query); + case "WebFetch": + return str(o.url); + case "Read": + case "Edit": + case "Write": + return str(o.file_path); + case "Bash": + return str(o.command); + case "Grep": + case "Glob": + return str(o.pattern); + default: { + const pick = + str(o.query) || str(o.prompt) || str(o.path) || str(o.message) || str(o.description); + if (pick) return pick; + try { + const j = JSON.stringify(input); + return j && j.length > 2 ? (j.length > 120 ? `${j.slice(0, 117)}…` : j) : ""; + } catch { + return ""; + } + } + } +} + +/** Convert a TodoWrite tool input into a `plan` event for clients (PlanCard). */ +function todoWriteToPlan(input: unknown): Extract | null { + const todos = (input as { todos?: unknown })?.todos; + if (!Array.isArray(todos) || todos.length === 0) return null; + const items = todos.map((t) => { + const o = (t ?? {}) as Record; + const status = typeof o.status === "string" ? o.status : "pending"; + const state = status === "completed" ? "done" : status === "in_progress" ? "active" : "todo"; + const content = typeof o.content === "string" ? o.content : ""; + const activeForm = typeof o.activeForm === "string" ? o.activeForm : undefined; + return { title: content, sub: state === "active" ? activeForm : undefined, state } as const; + }); + return { type: "plan", title: "Plan", items }; +} import { classifyQuery } from "../routing/classifier.ts"; import { loadUserProfile, @@ -1028,6 +1076,34 @@ export class AgentRuntime { } } + // Team delegation as an in-loop tool: the agent spins up a parallel sub-agent + // team when the user asks in natural language ("research X from three angles", + // "spin up a team") — no `/team` prefix needed, in BOTH hosted and power-user + // modes (both converge here). The TeamTask carries only the BASE mcp set (which + // excludes `nomos-team`), so workers can never receive the delegate tool and + // recurse. Gated on teamMode; the `/team` prefix stays as a fast path. + if (this.config.teamMode && this.teamRuntime) { + const teamRuntime = this.teamRuntime; + const turnModel = model ?? this.config.model; + const teamServers: Record> = { + "nomos-team": buildTeamMcpServer({ + runTeam: (t, e) => teamRuntime.runTeam(t, e), + teamTaskBase: () => ({ + systemPromptAppend: this.systemPromptAppend, + mcpServers: this.mcpServers, + permissionMode: "bypassPermissions", + allowedTools: Object.keys(this.mcpServers).map((n) => `mcp__${n}`), + disallowedTools: getDisallowedTools(), + model: turnModel, + plugins: this.plugins, + }), + isWorkerContext: false, + onProgress: (m) => emit({ type: "system", subtype: "status", message: m }), + }), + }; + mcpServers = { ...mcpServers, ...teamServers }; + } + // Reasoning-first: always-inject what the agent already knows about the user, // so it stays continuous without having to call a recall tool first. const memoryDigest = await buildMemoryDigest(vaultUserId).catch(() => ""); @@ -1076,6 +1152,14 @@ export class AgentRuntime { // Inject team context from a previous /team turn (if any) let systemPromptAppend = this.systemPromptAppend; + // Tell the agent it can delegate to a parallel sub-agent team in-loop, just by + // the user asking — no `/team` prefix needed (the tool is registered above when + // teamMode is on, in both hosted and power-user modes). + if (this.config.teamMode && this.teamRuntime) { + systemPromptAppend = + systemPromptAppend + + "\n\n## Working as a team\nWhen a request is genuinely parallelizable — research from several angles at once, compare multiple options, or the user explicitly asks for a team / parallel work — call `delegate_to_team` with a self-contained task (and optional `angles`). It runs independent sub-agents and hands you back one synthesized result to weave into your reply. Reserve it for multi-angle or heavy work; don't use it for simple single-step tasks. (Power users can also start a message with `/team` for the same thing.)"; + } if (sessionKey) { const teamCtx = this.teamContext.get(sessionKey); if (teamCtx) { @@ -1142,6 +1226,12 @@ export class AgentRuntime { mgr && source ? (request, opts) => mgr.handleElicitation(request, source, opts.signal) : undefined; + // Mobile / local-terminal clients have no channel adapter, so render ask_user over + // THIS open chat stream via `emit` (the answer returns out-of-band via AnswerQuestion). + const elicitationOnStream = Boolean( + mgr && source && (source.platform === "mobile" || source.platform === "terminal"), + ); + if (elicitationOnStream && mgr && source) mgr.registerEmitter(source, emit); const sdkQuery = runSession({ prompt, @@ -1187,6 +1277,28 @@ export class AgentRuntime { if (block.type === "text" && block.text) { if (fullText && !fullText.endsWith("\n")) fullText += "\n"; fullText += block.text; + } else if (block.type === "tool_use" || block.type === "server_tool_use") { + // The model called a tool. Surface it as a tool_use_summary event so + // clients (CLI, mobile) can render a tool-use card. The SDK never sends + // a standalone "tool_use_summary" message -- tool calls only arrive as + // content blocks on the assistant turn, so this is the one place to + // catch them (incl. server tools like web_search). + const toolName = (block as { name?: string }).name ?? "unknown"; + const summary = summarizeToolInput(toolName, (block as { input?: unknown }).input); + emit({ type: "tool_use_summary", tool_name: toolName, summary }); + // TodoWrite also drives a richer Plan card (clients suppress its tool card). + if (toolName === "TodoWrite") { + const plan = todoWriteToPlan((block as { input?: unknown }).input); + if (plan) emit(plan); + } + // Shadow mode: record tool usage observation. + if (this.shadowObserver?.isEnabled() && sessionKey) { + this.shadowObserver.recordToolUse(toolName, summary, sessionKey); + if (["Read", "Edit", "Write"].includes(toolName) && summary) { + const action = toolName.toLowerCase() as "read" | "edit" | "write"; + this.shadowObserver.recordFileAccess(summary, action); + } + } } } emit({ type: "stream_event", event: msg }); @@ -1198,28 +1310,6 @@ export class AgentRuntime { break; } - case "tool_use_summary": { - const toolName = (msg as { tool_name?: string }).tool_name ?? "unknown"; - emit({ - type: "tool_use_summary", - tool_name: toolName, - summary: msg.summary, - }); - // Shadow mode: record tool usage observation - if (this.shadowObserver?.isEnabled() && sessionKey) { - this.shadowObserver.recordToolUse(toolName, msg.summary, sessionKey); - // Record file access for Read/Edit/Write tools - if (["Read", "Edit", "Write"].includes(toolName) && typeof msg.summary === "string") { - const pathMatch = msg.summary.match(/(?:Read|Edit|Write)\s+(\S+)/); - if (pathMatch) { - const action = toolName.toLowerCase() as "read" | "edit" | "write"; - this.shadowObserver.recordFileAccess(pathMatch[1]!, action); - } - } - } - break; - } - case "result": { sessionId = msg.session_id; costUsd = msg.total_cost_usd ?? 0; @@ -1268,6 +1358,8 @@ export class AgentRuntime { } } + if (elicitationOnStream && mgr && source) mgr.unregisterEmitter(source); + return { text: fullText, sessionId, costUsd, inputTokens, outputTokens }; } } diff --git a/src/daemon/elicitation-manager.ts b/src/daemon/elicitation-manager.ts index 2c4aab17..b28cce57 100644 --- a/src/daemon/elicitation-manager.ts +++ b/src/daemon/elicitation-manager.ts @@ -21,7 +21,7 @@ import { randomUUID } from "node:crypto"; import type { ElicitationRequest, ElicitationResult } from "@anthropic-ai/claude-agent-sdk"; import type { ChannelManager } from "./channel-manager.ts"; -import type { OutgoingMessage } from "./types.ts"; +import type { AgentEvent, OutgoingMessage } from "./types.ts"; import { createLogger } from "../lib/logger.ts"; const log = createLogger("elicitation-manager"); @@ -69,9 +69,32 @@ export class ElicitationManager { private pending = new Map(); /** Reverse index: channelId → pending id, for fast text-reply lookup. */ private byChannel = new Map(); + /** Per-channel event emitters for clients without a channel adapter (mobile/terminal): + * render the question over the open Chat stream and accept the answer out-of-band. */ + private emitters = new Map void>(); constructor(private readonly channelManager: ChannelManager) {} + /** Register an event emitter for a source so renderQuestion can push an `ask` event. */ + registerEmitter(source: ElicitationSource, emit: (e: AgentEvent) => void): void { + this.emitters.set(channelKeyFor(source), emit); + } + + unregisterEmitter(source: ElicitationSource): void { + this.emitters.delete(channelKeyFor(source)); + } + + /** Resolve a pending elicitation by id (out-of-band answer via a client RPC). */ + resolveById(id: string, answer: string): boolean { + const entry = this.pending.get(id); + if (!entry) return false; + const matched = matchOption(answer, entry.options); + const label = + matched !== null && matched !== "ambiguous" ? entry.options[matched].label : answer; + this.resolvePending(entry, { action: "accept", content: { [ANSWER_PROPERTY]: label } }); + return true; + } + /** * Handle an MCP elicitation request from the agent SDK. Resolves with * the user's answer when they click a button or reply with a matching @@ -230,6 +253,24 @@ export class ElicitationManager { options: Array<{ label: string; description?: string }>, source: ElicitationSource, ): Promise { + // Mobile / terminal clients have no channel adapter — render the question over the + // open Chat stream via the registered emitter, and accept the answer out-of-band. + const emit = this.emitters.get(channelKeyFor(source)); + if (emit) { + emit({ + type: "ask", + id, + prompt: message, + options: options.map((o, i) => ({ + label: o.label, + desc: o.description, + key: String(i + 1), + })), + multiSelect: false, + }); + return; + } + const adapter = this.channelManager.getAdapter(source.platform); if (!adapter) { throw new Error(`No adapter for platform ${source.platform}`); diff --git a/src/daemon/gateway.ts b/src/daemon/gateway.ts index 2199295c..0173c675 100644 --- a/src/daemon/gateway.ts +++ b/src/daemon/gateway.ts @@ -138,6 +138,7 @@ export class Gateway { // its `onElicitation` callback can dispatch through here. this.elicitationManager = new ElicitationManager(this.channelManager); this.runtime.setElicitationManager(this.elicitationManager); + this.grpcServer.setElicitationManager(this.elicitationManager); // 6. Create cron engine with broadcast to connected clients this.cronEngine = new CronEngine(this.messageQueue, this.channelManager, (event) => { diff --git a/src/daemon/grpc-server.ts b/src/daemon/grpc-server.ts index 3cf4f566..079d7570 100644 --- a/src/daemon/grpc-server.ts +++ b/src/daemon/grpc-server.ts @@ -43,6 +43,7 @@ export class GrpcServer { private draftManager: DraftManager | null; private port: number; private commandHandler?: (command: string) => Promise; + private elicitationManager: import("./elicitation-manager.ts").ElicitationManager | null = null; constructor(messageQueue: MessageQueue, port: number = 8766, draftManager?: DraftManager) { this.messageQueue = messageQueue; @@ -50,6 +51,11 @@ export class GrpcServer { this.port = port; } + /** Wire in the elicitation manager so AnswerQuestion can resolve pending questions. */ + setElicitationManager(mgr: import("./elicitation-manager.ts").ElicitationManager): void { + this.elicitationManager = mgr; + } + /** Register a handler for Command RPCs. */ onCommand(handler: (command: string) => Promise): void { this.commandHandler = handler; @@ -88,6 +94,10 @@ export class GrpcServer { ListLoops: this.handleListLoops.bind(this), SetLoopEnabled: this.handleSetLoopEnabled.bind(this), DeleteLoop: this.handleDeleteLoop.bind(this), + ListTasks: this.handleListTasks.bind(this), + UpdateTask: this.handleUpdateTask.bind(this), + DeleteTask: this.handleDeleteTask.bind(this), + AnswerQuestion: this.handleAnswerQuestion.bind(this), Ping: this.handlePing.bind(this), }); this.server.addService(OAuthDepositService, { @@ -98,6 +108,7 @@ export class GrpcServer { buildMobileApiHandlers({ messageQueue: this.messageQueue, draftManager: this.draftManager, + getElicitationManager: () => this.elicitationManager, }) as unknown as grpc.UntypedServiceImplementation, ); @@ -367,6 +378,103 @@ export class GrpcServer { } } + /** ListTasks RPC -- the local owner's scheduled tasks (cron_jobs), curated. */ + private async handleListTasks( + _call: grpc.ServerUnaryCall, + callback: grpc.sendUnaryData<{ tasks: unknown[] }>, + ): Promise { + try { + const { CronStore } = await import("../cron/store.ts"); + const { curateConsumerTasks } = await import("../cron/task-view.ts"); + const jobs = await new CronStore().listJobs({ userId: "local" }); + callback(null, { tasks: curateConsumerTasks(jobs) }); + } catch (err) { + callback(err as grpc.ServiceError, null); + } + } + + /** UpdateTask RPC -- full-state edit of a local task (toggle/rename/reschedule). */ + private async handleUpdateTask( + call: grpc.ServerUnaryCall< + { + id?: string; + name?: string; + prompt?: string; + schedule?: string; + scheduleType?: string; + enabled?: boolean; + }, + unknown + >, + callback: grpc.sendUnaryData<{ success: boolean; message: string }>, + ): Promise { + try { + const req = call.request ?? {}; + if (!req.id) { + callback(null, { success: false, message: "missing_id" }); + return; + } + const { CronStore } = await import("../cron/store.ts"); + const store = new CronStore(); + const job = await store.getJob(req.id); + if (!job || job.userId !== "local") { + callback(null, { success: false, message: "task_not_found" }); + return; + } + const updates: Record = { enabled: Boolean(req.enabled) }; + if (req.name?.trim()) updates.name = req.name.trim(); + if (req.prompt?.trim()) updates.prompt = req.prompt; + if (req.schedule?.trim()) { + updates.schedule = req.schedule.trim(); + updates.scheduleType = req.scheduleType || job.scheduleType; + } + await store.updateJob(job.id, updates); + process.emit("cron:refresh" as never); + callback(null, { success: true, message: "updated" }); + } catch (err) { + callback(err as grpc.ServiceError, null); + } + } + + /** DeleteTask RPC -- delete a local task. */ + private async handleDeleteTask( + call: grpc.ServerUnaryCall<{ id?: string }, unknown>, + callback: grpc.sendUnaryData<{ success: boolean; message: string }>, + ): Promise { + try { + const id = call.request?.id; + if (!id) { + callback(null, { success: false, message: "missing_id" }); + return; + } + const { CronStore } = await import("../cron/store.ts"); + const store = new CronStore(); + const job = await store.getJob(id); + if (!job || job.userId !== "local") { + callback(null, { success: false, message: "task_not_found" }); + return; + } + await store.deleteJob(job.id); + process.emit("cron:refresh" as never); + callback(null, { success: true, message: "deleted" }); + } catch (err) { + callback(err as grpc.ServiceError, null); + } + } + + /** AnswerQuestion RPC -- resolve a pending ask_user elicitation out-of-band. */ + private handleAnswerQuestion( + call: grpc.ServerUnaryCall<{ questionId?: string; answer?: string }, unknown>, + callback: grpc.sendUnaryData<{ success: boolean; message: string }>, + ): void { + const { questionId, answer } = call.request ?? {}; + const ok = + questionId && answer != null + ? (this.elicitationManager?.resolveById(questionId, answer) ?? false) + : false; + callback(null, { success: Boolean(ok), message: ok ? "answered" : "no_pending_question" }); + } + /** Handle ApproveDraft RPC. */ private handleApproveDraft( call: grpc.ServerUnaryCall<{ draftId: string }, unknown>, diff --git a/src/daemon/mobile-api.ts b/src/daemon/mobile-api.ts index 027f6b6a..56472ca6 100644 --- a/src/daemon/mobile-api.ts +++ b/src/daemon/mobile-api.ts @@ -89,6 +89,8 @@ const AUTH_BASE_URL = process.env.AUTH_BASE_URL ?? "https://auth.mynomos.ai"; export interface MobileApiDeps { messageQueue: MessageQueue; draftManager: DraftManager | null; + /** Late-bound so AnswerQuestion can resolve a pending ask_user elicitation. */ + getElicitationManager?: () => import("./elicitation-manager.ts").ElicitationManager | null; } /** @@ -193,6 +195,9 @@ export function buildMobileApiHandlers(deps: MobileApiDeps) { DeleteTask: withAuthUnary("/nomos.MobileApi/DeleteTask", (call, ctx) => handleDeleteTask(call, ctx), ), + AnswerQuestion: withAuthUnary("/nomos.MobileApi/AnswerQuestion", (call, ctx) => + handleAnswerQuestion(deps, call, ctx), + ), GetBrain: withAuthUnary("/nomos.MobileApi/GetBrain", (_, ctx) => handleGetBrain(ctx)), GetInbox: withAuthUnary("/nomos.MobileApi/GetInbox", (_, ctx) => handleGetInbox(ctx)), GetToday: withAuthUnary("/nomos.MobileApi/GetToday", (_, ctx) => handleGetToday(ctx)), @@ -1172,6 +1177,19 @@ async function handleDeleteTask( return { success: true, message: "deleted" }; } +async function handleAnswerQuestion( + deps: MobileApiDeps, + call: grpc.ServerUnaryCall, + _ctx: TenantContext, +): Promise<{ success: boolean; message: string }> { + const req = call.request as { questionId?: string; answer?: string }; + if (!req.questionId || req.answer == null) return { success: false, message: "missing_args" }; + const ok = deps.getElicitationManager?.()?.resolveById(req.questionId, req.answer) ?? false; + return ok + ? { success: true, message: "answered" } + : { success: false, message: "no_pending_question" }; +} + // ──────────── Brain (knowledge graph + learned facts) ──────────── async function handleGetBrain(ctx: TenantContext): Promise<{ diff --git a/src/daemon/types.ts b/src/daemon/types.ts index 53f85446..b96adc41 100644 --- a/src/daemon/types.ts +++ b/src/daemon/types.ts @@ -47,6 +47,18 @@ export interface OutgoingMessage { export type AgentEvent = | { type: "stream_event"; event: SDKMessage } | { type: "tool_use_summary"; tool_name: string; summary?: string } + | { + type: "plan"; + title: string; + items: { title: string; sub?: string; state: "done" | "active" | "todo" }[]; + } + | { + type: "ask"; + id: string; + prompt: string; + options: { label: string; desc?: string; key: string }[]; + multiSelect: boolean; + } | { type: "result"; result: string; diff --git a/src/sdk/team-mcp.ts b/src/sdk/team-mcp.ts new file mode 100644 index 00000000..ca670f2b --- /dev/null +++ b/src/sdk/team-mcp.ts @@ -0,0 +1,86 @@ +/** + * In-process MCP server that lets the agent delegate a complex, parallelizable + * task to a team of sub-agents IN-LOOP — no `/team` prefix required. The model + * calls `delegate_to_team` when the user asks to tackle something from multiple + * angles, compare several options at once, or explicitly asks for a team, in BOTH + * hosted and power-user modes (both converge on AgentRuntime.runAgent). The + * synthesized result returns as the tool result, so the agent weaves it into its + * reply — strictly better than the `/team` early-return, which bypasses the loop. + * + * Recursion guard: the TeamTask handed to runTeam carries only the BASE mcp set + * (which excludes this server), so workers physically never receive the delegate + * tool and cannot fan out. `isWorkerContext` is a belt-and-suspenders refusal. + */ + +import { + createSdkMcpServer, + tool, + type McpSdkServerConfigWithInstance, +} from "@anthropic-ai/claude-agent-sdk"; +import { z } from "zod/v4"; +import type { TeamTask } from "../daemon/team-runtime.ts"; + +export interface TeamMcpDeps { + /** Run the team end-to-end (decompose → parallel workers → synthesize). */ + runTeam: ( + task: TeamTask, + emit?: (e: { type: string; message: string }) => void, + ) => Promise; + /** Everything-but-the-prompt the team needs (BASE mcp set, perms, model, plugins). */ + teamTaskBase: () => Omit; + /** True when THIS turn is itself a team worker — block recursive delegation. */ + isWorkerContext: boolean; + /** Forward team progress to the turn's event stream. */ + onProgress?: (message: string) => void; +} + +const ok = (text: string) => ({ content: [{ type: "text" as const, text }] }); +const fail = (text: string) => ({ content: [{ type: "text" as const, text }], isError: true }); + +export function buildTeamMcpServer(deps: TeamMcpDeps): McpSdkServerConfigWithInstance { + const delegateToTeam = tool( + "delegate_to_team", + "Delegate a complex, parallelizable task to a team of parallel sub-agents that research/work independently; their results are synthesized into one answer returned to you. Use it when the user asks to tackle something from multiple angles, compare several options at once, or explicitly asks for a team / parallel work. This is heavyweight — reserve it for genuinely multi-angle or large tasks, not simple single-step ones. The synthesized result comes back to you: weave it into your reply, don't just paste it.", + { + task: z + .string() + .describe( + "A complete, self-contained description of the work for the team. Workers cannot see this conversation, so include every bit of context they need.", + ), + angles: z + .array(z.string()) + .optional() + .describe( + "Optional explicit sub-angles/perspectives to seed the decomposition (e.g. ['technical feasibility','user demand','competitive landscape']).", + ), + }, + async (args) => { + if (deps.isWorkerContext) { + return fail( + "You are a team worker and cannot spawn another team. Complete your assigned subtask directly.", + ); + } + const angles = + args.angles && args.angles.length > 0 + ? `\n\nApproach this from these distinct angles, one per worker:\n${args.angles + .map((a, i) => `${i + 1}. ${a}`) + .join("\n")}` + : ""; + const prompt = `${args.task}${angles}`; + try { + const result = await deps.runTeam({ ...deps.teamTaskBase(), prompt }, (e) => + deps.onProgress?.(e.message), + ); + return ok(result || "The team finished but produced no output."); + } catch (err) { + return fail(`Team delegation failed: ${err instanceof Error ? err.message : String(err)}`); + } + }, + ); + + return createSdkMcpServer({ + name: "nomos-team", + version: "0.1.0", + tools: [delegateToTeam], + }); +} From 8e59d52fa983fccf5c895dd384fab71167631dbf Mon Sep 17 00:00:00 2001 From: meidad Date: Wed, 17 Jun 2026 20:10:34 -0700 Subject: [PATCH 2/2] fix(mobile): route "spin up a team" to delegate_to_team, not the SDK Workflow tool The SDK's generic Workflow orchestration tool spawns its own sub-agents outside the Nomos team runtime (no memory/persona, an async "notify when done" model that doesn't fit a single chat turn) and leaks a raw script into the UI. Disallow it so team requests route to the in-loop delegate_to_team tool. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/daemon/agent-runtime.ts | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/daemon/agent-runtime.ts b/src/daemon/agent-runtime.ts index 2d23d9c5..fc872e95 100644 --- a/src/daemon/agent-runtime.ts +++ b/src/daemon/agent-runtime.ts @@ -46,7 +46,11 @@ import { resolveMemoryUserId } from "../auth/tenant-context.ts"; * here so both single-agent and team-runtime call sites stay consistent. */ function getDisallowedTools(): string[] { - const blocked: string[] = []; + // The SDK's generic `Workflow` orchestration tool spawns its OWN sub-agents + // outside Nomos's team runtime (no memory/persona, an async "notify when done" + // model that doesn't fit a single chat turn) and leaks a raw script into the UI. + // Block it so "spin up a team" routes to the Nomos-native `delegate_to_team`. + const blocked: string[] = ["Workflow"]; if (!FEATURES.bashTool()) { blocked.push("Bash", "BashOutput", "KillBash"); }