diff --git a/.ai/context.md b/.ai/context.md index fdd5ac4..e51e0ad 100644 --- a/.ai/context.md +++ b/.ai/context.md @@ -349,8 +349,19 @@ canvas-QR card and `vesper connections pair`. **WhatsApp-Web (personal account)* `@vesper/channel-whatsapp-web` package — the SOLE runtime dependency (Baileys; see the opt-in carve-out under Stack), lazy-registered by the daemon, with rotating-QR pairing into a vault-backed session. +**Pipeline notify (`ctx.notify`) SHIPPED.** The outbound, pipeline-initiated complement to the inbound +chatbot flow (`specs/pipeline-notify.md`): a running pipeline can push a notification to the user out a +connected channel. `PipelineContext.notify(text, opts?)` is gated by `NETWORK_FETCH` and backed by a +`NotifyFn` injected through `BuildContextDeps` + `SchedulerOptions` exactly where `complete` is threaded +(top-level + sub-agent). Core stays decoupled (`channel?: string`, not the connections `ChannelId`); the +host `makeNotifyFn` (CLI) resolves the channel + the pairing-persisted owner `defaultChatId`, sends +through the daemon's already-authenticated running handler, and audits each send on the `events` table +(`notification_sent`/`notification_failed`, body/chat id never logged) — no migration, no new capability, +no new dependency. A missing channel/destination/resolver is graceful (`{delivered:false, reason}`); only +a capability violation throws. Issue-capped: the record is the spec + `cycle-log.md` + the commit (Rule 11). + **Agent docs** — single-source `.ai/` drives Claude Code, opencode, Codex, Gemini, and Cursor via -`bun run sync:ai` (`scripts/sync-ai-docs.ts`). Suite: **870 tests / 0 fail**; Biome clean; no +`bun run sync:ai` (`scripts/sync-ai-docs.ts`). Suite: **890 tests / 0 fail**; Biome clean; no provider SDKs (the lone runtime dep is the isolated, opt-in Baileys in `@vesper/channel-whatsapp-web`). **Next:** the Vesper World UI redesign (Omar dislikes the current look — a design prompt is in hand); diff --git a/.ai/generated/rules.mdc b/.ai/generated/rules.mdc index 3e1c884..042ec48 100644 --- a/.ai/generated/rules.mdc +++ b/.ai/generated/rules.mdc @@ -357,8 +357,19 @@ canvas-QR card and `vesper connections pair`. **WhatsApp-Web (personal account)* `@vesper/channel-whatsapp-web` package — the SOLE runtime dependency (Baileys; see the opt-in carve-out under Stack), lazy-registered by the daemon, with rotating-QR pairing into a vault-backed session. +**Pipeline notify (`ctx.notify`) SHIPPED.** The outbound, pipeline-initiated complement to the inbound +chatbot flow (`specs/pipeline-notify.md`): a running pipeline can push a notification to the user out a +connected channel. `PipelineContext.notify(text, opts?)` is gated by `NETWORK_FETCH` and backed by a +`NotifyFn` injected through `BuildContextDeps` + `SchedulerOptions` exactly where `complete` is threaded +(top-level + sub-agent). Core stays decoupled (`channel?: string`, not the connections `ChannelId`); the +host `makeNotifyFn` (CLI) resolves the channel + the pairing-persisted owner `defaultChatId`, sends +through the daemon's already-authenticated running handler, and audits each send on the `events` table +(`notification_sent`/`notification_failed`, body/chat id never logged) — no migration, no new capability, +no new dependency. A missing channel/destination/resolver is graceful (`{delivered:false, reason}`); only +a capability violation throws. Issue-capped: the record is the spec + `cycle-log.md` + the commit (Rule 11). + **Agent docs** — single-source `.ai/` drives Claude Code, opencode, Codex, Gemini, and Cursor via -`bun run sync:ai` (`scripts/sync-ai-docs.ts`). Suite: **870 tests / 0 fail**; Biome clean; no +`bun run sync:ai` (`scripts/sync-ai-docs.ts`). Suite: **890 tests / 0 fail**; Biome clean; no provider SDKs (the lone runtime dep is the isolated, opt-in Baileys in `@vesper/channel-whatsapp-web`). **Next:** the Vesper World UI redesign (Omar dislikes the current look — a design prompt is in hand); diff --git a/AGENTS.md b/AGENTS.md index 0fc6592..be8bdbb 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -351,8 +351,19 @@ canvas-QR card and `vesper connections pair`. **WhatsApp-Web (personal account)* `@vesper/channel-whatsapp-web` package — the SOLE runtime dependency (Baileys; see the opt-in carve-out under Stack), lazy-registered by the daemon, with rotating-QR pairing into a vault-backed session. +**Pipeline notify (`ctx.notify`) SHIPPED.** The outbound, pipeline-initiated complement to the inbound +chatbot flow (`specs/pipeline-notify.md`): a running pipeline can push a notification to the user out a +connected channel. `PipelineContext.notify(text, opts?)` is gated by `NETWORK_FETCH` and backed by a +`NotifyFn` injected through `BuildContextDeps` + `SchedulerOptions` exactly where `complete` is threaded +(top-level + sub-agent). Core stays decoupled (`channel?: string`, not the connections `ChannelId`); the +host `makeNotifyFn` (CLI) resolves the channel + the pairing-persisted owner `defaultChatId`, sends +through the daemon's already-authenticated running handler, and audits each send on the `events` table +(`notification_sent`/`notification_failed`, body/chat id never logged) — no migration, no new capability, +no new dependency. A missing channel/destination/resolver is graceful (`{delivered:false, reason}`); only +a capability violation throws. Issue-capped: the record is the spec + `cycle-log.md` + the commit (Rule 11). + **Agent docs** — single-source `.ai/` drives Claude Code, opencode, Codex, Gemini, and Cursor via -`bun run sync:ai` (`scripts/sync-ai-docs.ts`). Suite: **870 tests / 0 fail**; Biome clean; no +`bun run sync:ai` (`scripts/sync-ai-docs.ts`). Suite: **890 tests / 0 fail**; Biome clean; no provider SDKs (the lone runtime dep is the isolated, opt-in Baileys in `@vesper/channel-whatsapp-web`). **Next:** the Vesper World UI redesign (Omar dislikes the current look — a design prompt is in hand); diff --git a/cycle-log.md b/cycle-log.md index 9eb0d72..d9298b3 100644 --- a/cycle-log.md +++ b/cycle-log.md @@ -969,3 +969,46 @@ Backend->Client->Review workflow; the review's 2 real HIGH gaps were then fixed (the plugin is registered only in the daemon — the UI/daemon is the source of truth; the CLI doesn't load Baileys for a list); the compiled `vesper-desktop` binary omits whatsapp-web (dynamic import not bundled) until Launch wires it; re-pairing an already-live whatsapp-web opens a second socket (rare edge). Signal (signal-cli) still open. + +## Pipeline notify (`ctx.notify` — proactive channel delivery) — SHIPPED +- The outbound, pipeline-initiated complement to the shipped inbound `ChatSink` flow. A running pipeline can now + push a notification to the user out a connected channel. `OutboundIntent.kind:"notify"` + `ChannelHandler.send` + already existed (used only by the operator `vesper connections send`); the gap was a pipeline-facing seam. + Spec: `specs/pipeline-notify.md`. Issue-capped: this entry + the commit are the record (Rule 11). Omar approved + SPEC + PLAN at the advancement gates; chose graceful-degradation + reuse-`NETWORK_FETCH` over throw + new cap. +- DESIGN (mirror `complete`, stay decoupled): `ctx.notify(text, opts?)` on `PipelineContext`, gated by + `NETWORK_FETCH` (the egress cap `send` already needs). A `NotifyFn` is injected through `BuildContextDeps` + + `SchedulerOptions` exactly where `complete` is threaded (top-level run AND the `subagent.ts` child context). + KEY DECISION: the core `NotifyIntent`/`NotifyOutcome` use `channel?: string`, NOT the connections `ChannelId` + union — so `vesper-core/scheduler` keeps ZERO dependency on the connections feature layer (the import is + cycle-safe either way; decoupling is the better architecture). The host (`makeNotifyFn`, CLI) owns channel + identity. DIVERGENCE from `complete`: a missing resolver is GRACEFUL (`{delivered:false, reason:"unavailable"}`), + never throws — a side-channel must not crash a pipeline; only a capability violation throws. +- HOST RESOLUTION (`packages/vesper-cli/src/make-notify.ts`): channel = explicit `intent.channel` (must be running) + -> `config.notify.defaultChannel` (if running) -> first running channel with a paired owner. chatId = explicit + -> `config.connections..params.defaultChatId` (the destination scan-to-connect ALREADY persists at pairing, + `pairing-coordinator.ts#persistLinked`) — so a pipeline never handles a chat id. Sends through the daemon's + ALREADY-AUTHENTICATED running handler (`registry.list().find`), never a fresh handler (that stays the operator + `sendVia` path). Audits every actual send attempt on the `events` table (`notification_sent`/`notification_failed`, + reusing `recordConnectionEvent`, which strips `text`/body) — NO migration, payload is `{channel}` only (never the + body or chat id; a test asserts neither serializes). +- DAEMON WIRING: the Scheduler is constructed BEFORE `buildChannelRegistry`, so `makeNotifyFn` late-binds the + registry through a `getRegistry: () => channelRegistry` getter read only at notify time (`channelRegistry` is a + `let` assigned right after the registry builds). Avoided reordering the whole startup; `uiStore` was moved a few + lines up so it can be the notify-audit sink passed into the constructor. +- SPEC DELTA (the one deviation): the spec's acceptance said `normalizeNotify` "SHALL surface a dropped-record + warning". The codebase has NO warnings channel in `config.ts` — `normalizePresence`/`normalizeConnection` all + SILENTLY drop malformed input. Matched that precedent (drop, never throw) rather than invent a one-off warning + path; behavior is otherwise identical (unknown/non-string `defaultChannel` dropped). Reconcile the contract + wording if a warnings channel is ever added. +- GOTCHA: adding `notify` to the `PipelineContext` interface broke 5 hand-rolled context mocks in pipeline + + subagent tests (tsc: "Property 'notify' is missing") — they had no notify stub. Fixed with a one-line + `notify: async () => ({ delivered:false })` per mock. A reminder that widening a core interface ripples into + every hand-rolled test double; a shared `fakeContext` factory would localize this (follow-up). +- Verified: 890 tests / 0 fail (+20: 5 context + 2 scheduler-context + 4 config + 9 make-notify); 100% line+func + coverage on the two new units; biome clean (exit 0); tsc adds 0 NEW errors (the 5 mock errors fixed; pre-existing + exactOptional/`as`-cast errors in unchanged code remain, CI skips tsc); NO new dependency; NO migration; NO new + capability; transport mocked end-to-end (suite sends to nothing). NOT exercised against a live channel. +- FOLLOW-UPS: rate-limiting/anti-spam on notifications (declared out-of-scope; every send is audited so abuse is + visible); rich/structured messages (plain text only in v1); a shared `fakeContext` test factory; downstream + consumers can now wire delivery (`pipeline-career.md`, `pipeline-secretary.md`) onto `ctx.notify`. diff --git a/packages/pipelines/orchestrator-demo/handler.test.ts b/packages/pipelines/orchestrator-demo/handler.test.ts index 8b14e15..41d8170 100644 --- a/packages/pipelines/orchestrator-demo/handler.test.ts +++ b/packages/pipelines/orchestrator-demo/handler.test.ts @@ -130,6 +130,7 @@ describe("orchestrator-demo pipeline", () => { readSignals: () => { throw new Error("unused"); }, + notify: async () => ({ delivered: false }), }); expect(recorded).toEqual([{ status: "ok", summary: "research complete" }]); diff --git a/packages/pipelines/router/handler.test.ts b/packages/pipelines/router/handler.test.ts index 966f368..cacdea8 100644 --- a/packages/pipelines/router/handler.test.ts +++ b/packages/pipelines/router/handler.test.ts @@ -101,6 +101,9 @@ function makeFakeContext(options: { readSignals() { throw new Error("readSignals is not supported in this fake context"); }, + async notify() { + return { delivered: false }; + }, }; return { ctx, completePrompts, spawned, recordedRuns, progress }; diff --git a/packages/pipelines/selftest/handler.test.ts b/packages/pipelines/selftest/handler.test.ts index 3752f7e..6fe8c3e 100644 --- a/packages/pipelines/selftest/handler.test.ts +++ b/packages/pipelines/selftest/handler.test.ts @@ -84,6 +84,9 @@ function makeFakeContext(options: { readSignals() { throw new Error("readSignals is not supported in this fake context"); }, + async notify() { + return { delivered: false }; + }, }; return { ctx, completeCalls, recordedRuns }; diff --git a/packages/pipelines/skill-train/handler.test.ts b/packages/pipelines/skill-train/handler.test.ts index 1a78be2..a4b6217 100644 --- a/packages/pipelines/skill-train/handler.test.ts +++ b/packages/pipelines/skill-train/handler.test.ts @@ -47,6 +47,9 @@ function makeCtx(params: Record): { readSignals() { throw new Error("readSignals is not supported in this fake context"); }, + async notify() { + return { delivered: false }; + }, }; return { ctx, recorded }; } diff --git a/packages/vesper-cli/src/commands/daemon-run.ts b/packages/vesper-cli/src/commands/daemon-run.ts index 79004b0..ce6be92 100644 --- a/packages/vesper-cli/src/commands/daemon-run.ts +++ b/packages/vesper-cli/src/commands/daemon-run.ts @@ -2,6 +2,7 @@ import { Database } from "bun:sqlite"; import { mkdir } from "node:fs/promises"; import { ApprovalTokenStore, + type ChannelRegistry, channelStates, DEFAULT_AGENT_MATCHERS, detectAvailableCLIs, @@ -19,6 +20,7 @@ import { loadConfig, saveConfig } from "../config.ts"; import { buildChannelRegistry, makeChannelSink } from "../connections-wiring.ts"; import { removePidFile, resolveDaemonState, writePidFile } from "../daemon-lifecycle.ts"; import type { Command } from "../dispatch.ts"; +import { makeNotifyFn } from "../make-notify.ts"; import { loadOptionalChannels } from "../optional-channels.ts"; import { PairingCoordinator } from "../pairing-coordinator.ts"; import { dbPath, pidPath, runDir, socketPath, uiPort } from "../paths.ts"; @@ -62,20 +64,29 @@ export const daemonRunCommand: Command = { const installed = await detectAvailableCLIs(); const complete = makeCompleteFn(config, installed); + // The UI store also serves the router's editable template default_params (#4) + // and is the audit sink for `ctx.notify`; opened before the scheduler so the + // notify resolver can be wired into the constructor. + const uiStore = openStore(dbPath()); + + // `ctx.notify` resolver: delivers a pipeline notification out a connected + // channel. The channel registry is built further below (after the scheduler), so + // the resolver late-binds it through a getter read only at notify time. + let channelRegistry: ChannelRegistry | undefined; + const notify = makeNotifyFn({ getRegistry: () => channelRegistry, config, store: uiStore }); + // Construct the Scheduler granting only the capabilities the built-in - // pipelines actually declare (deny-by-default), with the CLI resolver, then - // register the pipelines so their handlers + tasks are available to the tick loop. + // pipelines actually declare (deny-by-default), with the CLI + notify resolvers, + // then register the pipelines so their handlers + tasks are available to the tick loop. const registry = new HandlerRegistry(); const scheduler = new Scheduler({ db, registry, grants: grantedCapabilities(), complete, + notify, redactSummaries: config.storage?.redactRunSummaries === true, }); - // The UI store is opened first so the router can read editable template - // default_params through it (#4) — an edited template then affects its runs. - const uiStore = openStore(dbPath()); registerPipelines(scheduler, registry, { getDefaultParams: (handlerId) => uiStore.getTemplate(handlerId)?.defaultParams ?? {}, }); @@ -104,6 +115,8 @@ export const daemonRunCommand: Command = { vault, store: uiStore, }); + // Late-bind the live registry into the notify resolver (declared before the scheduler). + channelRegistry = channels.registry; // Pairing (scan-to-connect): the coordinator multiplexes the daemon's single // inbound stream into active QR/link pairing sessions and persists the captured // chat id on link. Exposed to the UI's POST /api/connections/:id/pair route and diff --git a/packages/vesper-cli/src/config.test.ts b/packages/vesper-cli/src/config.test.ts index 0670e8d..4d15e48 100644 --- a/packages/vesper-cli/src/config.test.ts +++ b/packages/vesper-cli/src/config.test.ts @@ -155,6 +155,28 @@ describe("normalizeConfig — connections", () => { }); }); +describe("normalizeConfig — notify", () => { + test("keeps a defaultChannel that names a catalog channel", () => { + expect(normalizeConfig({ notify: { defaultChannel: "telegram" } }).notify).toEqual({ + defaultChannel: "telegram", + }); + }); + + test("drops a defaultChannel that is not a catalog channel", () => { + expect(normalizeConfig({ notify: { defaultChannel: "slack" } }).notify).toBeUndefined(); + }); + + test("drops a non-string defaultChannel", () => { + expect(normalizeConfig({ notify: { defaultChannel: 5 } }).notify).toBeUndefined(); + }); + + test("omits notify entirely when absent or malformed", () => { + expect(normalizeConfig({ cli: { adapters: {} } }).notify).toBeUndefined(); + expect(normalizeConfig({ notify: "nope" }).notify).toBeUndefined(); + expect(normalizeConfig({ notify: {} }).notify).toBeUndefined(); + }); +}); + describe("loadConfig / saveConfig", () => { test("missing file yields the default", async () => { expect(await loadConfig(tempConfigPath())).toEqual(DEFAULT_CONFIG); diff --git a/packages/vesper-cli/src/config.ts b/packages/vesper-cli/src/config.ts index f4cc63c..b085ca4 100644 --- a/packages/vesper-cli/src/config.ts +++ b/packages/vesper-cli/src/config.ts @@ -47,6 +47,15 @@ export interface VesperConfig { }; /** Per-channel messaging wiring (Connections). Secrets stay in the vault. */ readonly connections?: Readonly>; + /** Proactive-notification routing for `ctx.notify` (pipeline -> connected channel). */ + readonly notify?: { + /** + * Catalog channel id `ctx.notify` delivers through when a pipeline names none. + * When unset the host resolves the first enabled+running channel with a paired + * destination. An unknown id is dropped during normalization. + */ + readonly defaultChannel?: string; + }; } /** A fresh config with no default and no overrides. */ @@ -161,6 +170,19 @@ function normalizeConnections(raw: unknown): VesperConfig["connections"] | undef return Object.keys(result).length > 0 ? result : undefined; } +/** + * Coerce untrusted `notify` config. Keeps `defaultChannel` only when it names a + * known catalog channel (mirrors `normalizeConnection`'s catalog gate); an unknown + * or non-string id is dropped, never thrown. Returns undefined when nothing valid + * remains so the host falls back to first-eligible resolution. + */ +function normalizeNotify(raw: unknown): VesperConfig["notify"] | undefined { + if (!isObject(raw)) return undefined; + const defaultChannel = asString(raw.defaultChannel); + if (defaultChannel === undefined || channelById(defaultChannel) === undefined) return undefined; + return { defaultChannel }; +} + /** Coerce untrusted `ui` config; keeps only a string `theme`. */ function normalizeUi(raw: unknown): VesperConfig["ui"] | undefined { if (!isObject(raw)) return undefined; @@ -195,6 +217,8 @@ export function normalizeConfig(raw: unknown): VesperConfig { if (ui !== undefined) result = { ...result, ui }; const connections = normalizeConnections(raw.connections); if (connections !== undefined) result = { ...result, connections }; + const notify = normalizeNotify(raw.notify); + if (notify !== undefined) result = { ...result, notify }; return result; } diff --git a/packages/vesper-cli/src/make-notify.test.ts b/packages/vesper-cli/src/make-notify.test.ts new file mode 100644 index 0000000..4ca0d75 --- /dev/null +++ b/packages/vesper-cli/src/make-notify.test.ts @@ -0,0 +1,215 @@ +import { describe, expect, test } from "bun:test"; +import { + type ChannelHandler, + type ChannelId, + ChannelRegistry, + type OutboundIntent, + openStore, + type Store, +} from "@vesper/core"; +import type { VesperConfig } from "./config.ts"; +import { makeNotifyFn } from "./make-notify.ts"; + +/** A fake channel handler that records outbound sends and may throw on send. */ +function fakeHandler( + id: ChannelId, + opts: { throwOnSend?: boolean } = {}, +): { handler: ChannelHandler; sends: OutboundIntent[] } { + const sends: OutboundIntent[] = []; + const handler: ChannelHandler = { + descriptor: { + id, + displayName: id, + transport: "long-poll", + allowedHosts: ["example.com"], + vaultKeys: [], + docsUrl: "https://example.com", + status: "ready", + }, + authenticate: async () => {}, + send: async (intent) => { + if (opts.throwOnSend) throw new Error("transport down"); + sends.push(intent); + }, + receive: () => ({ stop() {} }), + }; + return { handler, sends }; +} + +function registryWith(...handlers: ChannelHandler[]): ChannelRegistry { + return new ChannelRegistry(handlers); +} + +/** Minimal config with a connections block keyed by channel. */ +function config(opts: { + connections?: VesperConfig["connections"]; + notify?: VesperConfig["notify"]; +}): VesperConfig { + return { + cli: { adapters: {} }, + ...(opts.connections !== undefined ? { connections: opts.connections } : {}), + ...(opts.notify !== undefined ? { notify: opts.notify } : {}), + }; +} + +function memStore(): Store { + const store = openStore(":memory:"); + store.migrate(); + return store; +} + +describe("makeNotifyFn — resolution", () => { + test("no_channel when the registry is not yet built", async () => { + const notify = makeNotifyFn({ getRegistry: () => undefined, config: config({}) }); + expect(await notify({ text: "hi" })).toEqual({ delivered: false, reason: "no_channel" }); + }); + + test("no_channel when no channel is running", async () => { + const notify = makeNotifyFn({ getRegistry: () => registryWith(), config: config({}) }); + expect(await notify({ text: "hi" })).toEqual({ delivered: false, reason: "no_channel" }); + }); + + test("no_channel when an explicitly requested channel is not running", async () => { + const notify = makeNotifyFn({ + getRegistry: () => registryWith(fakeHandler("telegram").handler), + config: config({ + connections: { + telegram: { + enabled: true, + vaultKey: "k", + allowedHosts: [], + params: { defaultChatId: "1" }, + }, + }, + }), + }); + expect(await notify({ text: "hi", channel: "discord" })).toEqual({ + delivered: false, + reason: "no_channel", + }); + }); + + test("no_destination when the resolved channel has no paired chat id", async () => { + const notify = makeNotifyFn({ + getRegistry: () => registryWith(fakeHandler("telegram").handler), + config: config({ + connections: { telegram: { enabled: true, vaultKey: "k", allowedHosts: [] } }, + notify: { defaultChannel: "telegram" }, + }), + }); + expect(await notify({ text: "hi" })).toEqual({ + delivered: false, + channel: "telegram", + reason: "no_destination", + }); + }); + + test("prefers config.notify.defaultChannel when running", async () => { + const tg = fakeHandler("telegram"); + const dc = fakeHandler("discord"); + const notify = makeNotifyFn({ + getRegistry: () => registryWith(tg.handler, dc.handler), + config: config({ + connections: { + telegram: { + enabled: true, + vaultKey: "k", + allowedHosts: [], + params: { defaultChatId: "111" }, + }, + discord: { + enabled: true, + vaultKey: "k", + allowedHosts: [], + params: { defaultChatId: "222" }, + }, + }, + notify: { defaultChannel: "discord" }, + }), + }); + const outcome = await notify({ text: "hi" }); + expect(outcome).toEqual({ delivered: true, channel: "discord" }); + expect(dc.sends).toEqual([{ kind: "notify", chatId: "222", text: "hi" }]); + expect(tg.sends).toHaveLength(0); + }); + + test("falls back to the first running channel with a paired destination", async () => { + const tg = fakeHandler("telegram"); + const notify = makeNotifyFn({ + getRegistry: () => registryWith(tg.handler), + config: config({ + connections: { + telegram: { + enabled: true, + vaultKey: "k", + allowedHosts: [], + params: { defaultChatId: "111" }, + }, + }, + }), + }); + expect(await notify({ text: "yo" })).toEqual({ delivered: true, channel: "telegram" }); + expect(tg.sends).toEqual([{ kind: "notify", chatId: "111", text: "yo" }]); + }); + + test("an explicit chatId overrides the paired default", async () => { + const tg = fakeHandler("telegram"); + const notify = makeNotifyFn({ + getRegistry: () => registryWith(tg.handler), + config: config({ + connections: { + telegram: { + enabled: true, + vaultKey: "k", + allowedHosts: [], + params: { defaultChatId: "111" }, + }, + }, + }), + }); + await notify({ text: "yo", channel: "telegram", chatId: "999" }); + expect(tg.sends).toEqual([{ kind: "notify", chatId: "999", text: "yo" }]); + }); +}); + +describe("makeNotifyFn — delivery + audit", () => { + const conns: VesperConfig["connections"] = { + telegram: { enabled: true, vaultKey: "k", allowedHosts: [], params: { defaultChatId: "111" } }, + }; + + test("send_failed (never throws) when the handler throws, and audits the failure", async () => { + const store = memStore(); + const notify = makeNotifyFn({ + getRegistry: () => registryWith(fakeHandler("telegram", { throwOnSend: true }).handler), + config: config({ connections: conns }), + store, + }); + expect(await notify({ text: "boom" })).toEqual({ + delivered: false, + channel: "telegram", + reason: "send_failed", + }); + const events = store.listEvents({ source: "connections" }); + expect(events.map((e) => e.kind)).toEqual(["notification_failed"]); + store.close(); + }); + + test("audits notification_sent with the channel but never the body or chat id", async () => { + const store = memStore(); + const notify = makeNotifyFn({ + getRegistry: () => registryWith(fakeHandler("telegram").handler), + config: config({ connections: conns }), + store, + }); + await notify({ text: "a private message body" }); + const events = store.listEvents({ source: "connections" }); + expect(events).toHaveLength(1); + expect(events[0]?.kind).toBe("notification_sent"); + expect(events[0]?.payload).toEqual({ channel: "telegram" }); + // The body and the destination must not appear anywhere in the serialized row. + const serialized = JSON.stringify(events[0]); + expect(serialized).not.toContain("a private message body"); + expect(serialized).not.toContain("111"); + store.close(); + }); +}); diff --git a/packages/vesper-cli/src/make-notify.ts b/packages/vesper-cli/src/make-notify.ts new file mode 100644 index 0000000..13f0126 --- /dev/null +++ b/packages/vesper-cli/src/make-notify.ts @@ -0,0 +1,94 @@ +/** + * Host-side resolver for `ctx.notify` (the pipeline-facing proactive-notification + * seam). The scheduler core exposes `ctx.notify(text)` but stays decoupled from the + * connections feature layer; this factory closes the gap: it resolves WHICH channel + * and WHICH destination a notification goes to, then delivers it through the daemon's + * already-authenticated running handler — no re-auth, no second socket. + * + * Destination resolution mirrors the spec: an explicit `intent.channel` wins, then + * `config.notify.defaultChannel`, then the first running channel that has a paired + * owner (`config.connections..params.defaultChatId`, persisted at pairing). The + * owner chat id comes from that same pairing-captured `defaultChatId`, so a pipeline + * never handles a chat id. Every actual send attempt is audited on the `events` + * table (`notification_sent` / `notification_failed`); the body/chat id never land in + * the audit payload (the connections audit helper strips them). + */ + +import { + type ChannelRegistry, + type NotifyFn, + type NotifyIntent, + type NotifyOutcome, + recordConnectionEvent, + type Store, +} from "@vesper/core"; +import type { VesperConfig } from "./config.ts"; + +/** Dependencies for {@link makeNotifyFn}. */ +export interface MakeNotifyFnOpts { + /** + * Late-bound getter for the daemon's live channel registry. A getter (not the + * registry itself) because the daemon constructs the scheduler BEFORE it builds + * the registry; the getter is read at notify time, by which point it is set. + */ + readonly getRegistry: () => ChannelRegistry | undefined; + /** Non-secret wiring: `notify.defaultChannel` + per-channel `params.defaultChatId`. */ + readonly config: VesperConfig; + /** Audit sink (the daemon store). Omitted in unit tests that do not assert audit. */ + readonly store?: Store; +} + +/** + * Resolve which channel a notify delivers through. An explicit request wins (only + * if it is actually running); then a configured `defaultChannel` (if running); else + * the first running channel that has a paired owner destination. Returns undefined + * when nothing is eligible. + */ +function resolveChannel( + requested: string | undefined, + config: VesperConfig, + registry: ChannelRegistry, +): string | undefined { + const running = new Set(registry.list().map((h) => h.descriptor.id)); + if (requested !== undefined) return running.has(requested) ? requested : undefined; + const preferred = config.notify?.defaultChannel; + if (preferred !== undefined && running.has(preferred)) return preferred; + for (const handler of registry.list()) { + const id = handler.descriptor.id; + if (config.connections?.[id]?.params?.defaultChatId !== undefined) return id; + } + return undefined; +} + +/** Build the {@link NotifyFn} injected into the daemon's {@link import("@vesper/core").Scheduler}. */ +export function makeNotifyFn(opts: MakeNotifyFnOpts): NotifyFn { + return async (intent: NotifyIntent): Promise => { + const registry = opts.getRegistry(); + if (registry === undefined) return { delivered: false, reason: "no_channel" }; + + const channel = resolveChannel(intent.channel, opts.config, registry); + if (channel === undefined) return { delivered: false, reason: "no_channel" }; + + const chatId = intent.chatId ?? opts.config.connections?.[channel]?.params?.defaultChatId; + if (chatId === undefined) return { delivered: false, channel, reason: "no_destination" }; + + const handler = registry.list().find((h) => h.descriptor.id === channel); + if (handler === undefined) return { delivered: false, channel, reason: "no_channel" }; + + try { + await handler.send({ kind: "notify", chatId, text: intent.text }); + } catch { + if (opts.store !== undefined) { + recordConnectionEvent(opts.store, "notification_failed", { + channel, + reason: "send_failed", + }); + } + return { delivered: false, channel, reason: "send_failed" }; + } + if (opts.store !== undefined) { + recordConnectionEvent(opts.store, "notification_sent", { channel }); + } + return { delivered: true, channel }; + }; +} diff --git a/packages/vesper-core/src/connections/audit.ts b/packages/vesper-core/src/connections/audit.ts index 8611e86..6ea75eb 100644 --- a/packages/vesper-core/src/connections/audit.ts +++ b/packages/vesper-core/src/connections/audit.ts @@ -17,6 +17,8 @@ export type ConnectionEventKind = | "connection_pairing_started" | "connection_paired" | "connection_pairing_failed" + | "notification_sent" + | "notification_failed" | "mcp_enabled" | "mcp_disabled"; diff --git a/packages/vesper-core/src/scheduler/context.test.ts b/packages/vesper-core/src/scheduler/context.test.ts index 2a51357..77e9899 100644 --- a/packages/vesper-core/src/scheduler/context.test.ts +++ b/packages/vesper-core/src/scheduler/context.test.ts @@ -14,6 +14,8 @@ import { EventBus, RUN_EVENT } from "./events.ts"; import type { Capability, CompleteFn, + NotifyFn, + NotifyIntent, PipelineContext, ScheduledTask, SubAgentHandle, @@ -580,6 +582,65 @@ describe("buildPipelineContext.readSignals", () => { }); }); +describe("buildPipelineContext.notify", () => { + test("throws CapabilityError when NETWORK_FETCH is not declared", async () => { + const { store } = makeStore(); + const ctx = build({ + task: makeTask(["WRITE_STORAGE"]), + now: NOW, + store, + notify: async () => ({ delivered: true }), + }); + await expect(ctx.notify("hi")).rejects.toBeInstanceOf(CapabilityError); + }); + + test("does not invoke the resolver when the capability is denied", async () => { + const { store } = makeStore(); + let calls = 0; + const notify: NotifyFn = async () => { + calls++; + return { delivered: true }; + }; + const ctx = build({ task: makeTask([]), now: NOW, store, notify }); + await expect(ctx.notify("hi")).rejects.toBeInstanceOf(CapabilityError); + expect(calls).toBe(0); + }); + + test("returns unavailable (never throws) when no resolver is configured", async () => { + const { store } = makeStore(); + const ctx = build({ task: makeTask(["NETWORK_FETCH"]), now: NOW, store }); + expect(await ctx.notify("hi")).toEqual({ delivered: false, reason: "unavailable" }); + }); + + test("delegates to the resolver and returns its outcome", async () => { + const { store } = makeStore(); + let seen: NotifyIntent | undefined; + const notify: NotifyFn = async (intent) => { + seen = intent; + return { delivered: true, channel: "telegram" }; + }; + const ctx = build({ task: makeTask(["NETWORK_FETCH"]), now: NOW, store, notify }); + const outcome = await ctx.notify("done", { channel: "telegram", chatId: "42" }); + expect(seen).toEqual({ text: "done", channel: "telegram", chatId: "42" }); + expect(outcome).toEqual({ delivered: true, channel: "telegram" }); + }); + + test("omits channel/chatId from the intent when not supplied", async () => { + const { store } = makeStore(); + let seen: NotifyIntent | undefined; + const notify: NotifyFn = async (intent) => { + seen = intent; + return { delivered: false, reason: "no_channel" }; + }; + const ctx = build({ task: makeTask(["NETWORK_FETCH"]), now: NOW, store, notify }); + const outcome = await ctx.notify("ping"); + expect(seen).toEqual({ text: "ping" }); + expect(Object.hasOwn(seen ?? {}, "channel")).toBe(false); + expect(Object.hasOwn(seen ?? {}, "chatId")).toBe(false); + expect(outcome.delivered).toBe(false); + }); +}); + describe("redactSummary", () => { test("replaces content with a size-only marker", () => { expect(redactSummary("hello")).toBe("[redacted: 5 chars]"); diff --git a/packages/vesper-core/src/scheduler/context.ts b/packages/vesper-core/src/scheduler/context.ts index 951d816..6060121 100644 --- a/packages/vesper-core/src/scheduler/context.ts +++ b/packages/vesper-core/src/scheduler/context.ts @@ -11,6 +11,7 @@ import type { TaskPersistence } from "./persistence.ts"; import type { HandlerRegistry } from "./registry.ts"; import type { CompleteFn, + NotifyFn, PipelineContext, RunOptions, ScheduledTask, @@ -61,6 +62,13 @@ export interface BuildContextDeps { * {@link PipelineContext.complete} throws a clear {@link CLIError}. */ readonly complete?: CompleteFn; + /** + * Resolver that delivers a pipeline notification out a connected channel. + * Injected by the host. When absent, `ctx.notify` resolves to + * `{ delivered:false, reason:"unavailable" }` — a missing side-channel must not + * crash a pipeline. + */ + readonly notify?: NotifyFn; /** Per-run overrides (manual run): transient CLI override + params. */ readonly options?: RunOptions; /** @@ -215,6 +223,20 @@ export function buildPipelineContext(deps: BuildContextDeps): PipelineContext { { sinceMs }, ); }, + + async notify(text, opts) { + assertCapabilities(["NETWORK_FETCH"], task.required_capabilities); + // A missing resolver is graceful: a notification is a side-channel, not the + // pipeline's reason to exist (contrast `complete`, which throws when unset). + if (deps.notify === undefined) { + return { delivered: false, reason: "unavailable" }; + } + return deps.notify({ + text, + ...(opts?.channel !== undefined ? { channel: opts.channel } : {}), + ...(opts?.chatId !== undefined ? { chatId: opts.chatId } : {}), + }); + }, }; return self; diff --git a/packages/vesper-core/src/scheduler/index.ts b/packages/vesper-core/src/scheduler/index.ts index ac5128e..ceca2aa 100644 --- a/packages/vesper-core/src/scheduler/index.ts +++ b/packages/vesper-core/src/scheduler/index.ts @@ -18,6 +18,10 @@ export { remainingBudgetMs, withTimeout } from "./timeout.ts"; export type { CompleteFn, FailedTask, + NotifyFailReason, + NotifyFn, + NotifyIntent, + NotifyOutcome, PipelineContext, ProgressEvent, ProgressKind, diff --git a/packages/vesper-core/src/scheduler/scheduler-context.test.ts b/packages/vesper-core/src/scheduler/scheduler-context.test.ts index d93e0a9..357d0ee 100644 --- a/packages/vesper-core/src/scheduler/scheduler-context.test.ts +++ b/packages/vesper-core/src/scheduler/scheduler-context.test.ts @@ -7,7 +7,7 @@ import { RUN_COMPLETED } from "./events.ts"; import { TaskPersistence } from "./persistence.ts"; import { HandlerRegistry } from "./registry.ts"; import { Scheduler } from "./scheduler.ts"; -import type { CompleteFn, RunOutcome } from "./types.ts"; +import type { CompleteFn, NotifyIntent, NotifyOutcome, RunOutcome } from "./types.ts"; // --------------------------------------------------------------------------- // Helpers — an in-memory DB with migrations applied, plus a recording resolver. @@ -94,6 +94,53 @@ describe("Scheduler — pipeline runtime context", () => { expect(task?.last_error).toBeNull(); }); + test("ctx.notify reaches the injected resolver and returns its outcome", async () => { + const seen: NotifyIntent[] = []; + const notify = async (intent: NotifyIntent): Promise => { + seen.push(intent); + return { delivered: true, channel: "telegram" }; + }; + let handlerOutcome: NotifyOutcome | undefined; + registry.register("notifier", async (ctx) => { + handlerOutcome = await ctx.notify("run finished"); + ctx.recordRun({ status: "ok", summary: "notified" }); + }); + + const scheduler = new Scheduler({ db, registry, grants: CAPABILITIES, notify }); + scheduler.register({ + id: "notifier", + kind: "manual", + schedule_expr: "", + handler_id: "notifier", + required_capabilities: ["NETWORK_FETCH", "WRITE_STORAGE"], + }); + + await scheduler.run("notifier"); + + expect(seen).toEqual([{ text: "run finished" }]); + expect(handlerOutcome).toEqual({ delivered: true, channel: "telegram" }); + }); + + test("ctx.notify yields unavailable (never throws) when no resolver is injected", async () => { + let handlerOutcome: NotifyOutcome | undefined; + registry.register("notifier", async (ctx) => { + handlerOutcome = await ctx.notify("hello"); + ctx.recordRun({ status: "ok", summary: "" }); + }); + + const scheduler = new Scheduler({ db, registry, grants: CAPABILITIES }); + scheduler.register({ + id: "notifier", + kind: "manual", + schedule_expr: "", + handler_id: "notifier", + required_capabilities: ["NETWORK_FETCH", "WRITE_STORAGE"], + }); + + await scheduler.run("notifier"); + expect(handlerOutcome).toEqual({ delivered: false, reason: "unavailable" }); + }); + test("a handler that records a run then throws keeps the recorded status (no error clobber)", async () => { registry.register("recorder", async (ctx) => { ctx.recordRun({ status: "partial", summary: "committed before failure" }); diff --git a/packages/vesper-core/src/scheduler/scheduler.ts b/packages/vesper-core/src/scheduler/scheduler.ts index b6a1f4c..50fd66d 100644 --- a/packages/vesper-core/src/scheduler/scheduler.ts +++ b/packages/vesper-core/src/scheduler/scheduler.ts @@ -15,6 +15,7 @@ import { runSubAgent } from "./subagent.ts"; import { withTimeout } from "./timeout.ts"; import type { CompleteFn, + NotifyFn, PipelineContext, RegisterTaskInput, RunOptions, @@ -49,6 +50,12 @@ export interface SchedulerOptions { * a clear {@link import("../cli/errors.ts").CLIError}. */ readonly complete?: CompleteFn; + /** + * Resolver used by `ctx.notify` to deliver a proactive message out a connected + * channel. Injected by the host (CLI layer). If omitted, `ctx.notify` resolves + * to `{ delivered:false, reason:"unavailable" }` (never throws). + */ + readonly notify?: NotifyFn; /** * When true, run summaries are persisted as size-only metadata (raw CLI output * is never stored in cleartext). Host policy from `~/.vesper/config.json`. @@ -110,6 +117,7 @@ export class Scheduler { readonly #grants: readonly Capability[]; readonly #store: Store; readonly #complete: CompleteFn | undefined; + readonly #notify: NotifyFn | undefined; readonly #redactSummaries: boolean; readonly #maxFanout: number; @@ -139,6 +147,7 @@ export class Scheduler { this.#grants = options.grants ?? []; this.#store = new SqliteStore(options.db); this.#complete = options.complete; + this.#notify = options.notify; this.#redactSummaries = options.redactSummaries ?? false; this.#maxFanout = Math.max(1, options.maxFanout ?? DEFAULT_MAX_FANOUT); @@ -503,6 +512,7 @@ export class Scheduler { }, redactSummaries: this.#redactSummaries, ...(this.#complete !== undefined ? { complete: this.#complete } : {}), + ...(this.#notify !== undefined ? { notify: this.#notify } : {}), ...(options !== undefined ? { options } : {}), }); @@ -642,6 +652,7 @@ export class Scheduler { grants: this.#grants, parentTaskCapabilities: parentTask.required_capabilities, ...(this.#complete !== undefined ? { complete: this.#complete } : {}), + ...(this.#notify !== undefined ? { notify: this.#notify } : {}), redactSummaries: this.#redactSummaries, parentRemainingMs, depth: 0, diff --git a/packages/vesper-core/src/scheduler/subagent.test.ts b/packages/vesper-core/src/scheduler/subagent.test.ts index 6e04569..77d6e15 100644 --- a/packages/vesper-core/src/scheduler/subagent.test.ts +++ b/packages/vesper-core/src/scheduler/subagent.test.ts @@ -157,6 +157,7 @@ describe("ctx.spawn — sub-agent orchestration", () => { readSignals: () => { throw new Error("unused"); }, + notify: async () => ({ delivered: false }), }; expect(() => diff --git a/packages/vesper-core/src/scheduler/subagent.ts b/packages/vesper-core/src/scheduler/subagent.ts index 097176d..6895cff 100644 --- a/packages/vesper-core/src/scheduler/subagent.ts +++ b/packages/vesper-core/src/scheduler/subagent.ts @@ -27,6 +27,7 @@ import type { HandlerRegistry } from "./registry.ts"; import { remainingBudgetMs, withTimeout } from "./timeout.ts"; import type { CompleteFn, + NotifyFn, PipelineContext, RunOutcome, ScheduledTask, @@ -47,6 +48,7 @@ export interface RunSubAgentArgs { /** Parent task's grant — descriptor caps must be a subset of this. */ readonly parentTaskCapabilities: readonly Capability[]; readonly complete?: CompleteFn; + readonly notify?: NotifyFn; readonly redactSummaries: boolean; /** Time the parent still has before ITS cap fires; null = unbounded. */ readonly parentRemainingMs: number | null; @@ -72,6 +74,7 @@ export function runSubAgent(args: RunSubAgentArgs): SubAgentHandle { grants, parentTaskCapabilities, complete, + notify, redactSummaries, parentRemainingMs, depth, @@ -155,6 +158,7 @@ export function runSubAgent(args: RunSubAgentArgs): SubAgentHandle { parentTaskCapabilities: descriptorCaps, maxFanout, ...(complete !== undefined ? { complete } : {}), + ...(notify !== undefined ? { notify } : {}), // Thread the descriptor's params through to the child's `ctx.params`, so a // parent can parameterize each sub-agent it fans out. ...(descriptor.params !== undefined ? { options: { params: descriptor.params } } : {}), diff --git a/packages/vesper-core/src/scheduler/types.ts b/packages/vesper-core/src/scheduler/types.ts index 828c5c9..d8721e2 100644 --- a/packages/vesper-core/src/scheduler/types.ts +++ b/packages/vesper-core/src/scheduler/types.ts @@ -75,6 +75,41 @@ export type CompleteFn = ( opts?: { readonly cli?: string }, ) => Promise; +/** + * A proactive notification a pipeline asks the host to deliver out a connected + * messaging channel (the outbound complement to the inbound chatbot flow). + * `channel`/`chatId` are host concerns: when omitted the host resolves the + * configured default channel and the paired owner destination. `channel` is a + * plain string here so `vesper-core/scheduler` stays decoupled from the + * connections feature layer — the host validates it against the channel catalog. + */ +export interface NotifyIntent { + readonly text: string; + readonly channel?: string; + readonly chatId?: string; +} + +/** Why a {@link NotifyOutcome} did not deliver (`delivered === false`). */ +export type NotifyFailReason = "unavailable" | "no_channel" | "no_destination" | "send_failed"; + +/** The result of a {@link PipelineContext.notify} call. */ +export interface NotifyOutcome { + readonly delivered: boolean; + /** The channel id the host resolved/used, when one was chosen. */ + readonly channel?: string; + /** Set only when `delivered` is false. */ + readonly reason?: NotifyFailReason; +} + +/** + * Resolver that delivers a pipeline notification through a connected channel. + * Injected into the {@link import("./scheduler.ts").Scheduler} by the host (CLI + * layer) so `vesper-core` never imports channel/registry/config code. It returns + * an outcome and NEVER throws for a missing channel/destination — a side-channel + * must not crash a pipeline (contrast {@link CompleteFn}, which is load-bearing). + */ +export type NotifyFn = (intent: NotifyIntent) => Promise; + /** Overrides for a single manual run (transient — not stored on the task). */ export interface RunOptions { /** Per-run CLI override (highest priority during adapter resolution). */ @@ -152,6 +187,7 @@ export interface ProgressEvent { * - `emitProgress` (`WRITE_STORAGE`) — persists a live-trace step and publishes it * - `spawn` (`SPAWN_SUBAGENT`) — runs a registered handler as an in-process child * - `readSignals` (`READ_STORAGE`) — returns a frozen runtime-health snapshot + * - `notify` (`NETWORK_FETCH`) — delivers a proactive message out a connected channel */ export interface PipelineContext { readonly task: ScheduledTask; @@ -188,6 +224,19 @@ export interface PipelineContext { * live store — a handler cannot read past its window or write through it. */ readSignals(opts?: { readonly windowMs?: number }): EvolveSignals; + /** + * Deliver a proactive notification to the user through a connected messaging + * channel. Requires the task to declare `NETWORK_FETCH` (the egress capability + * `ChannelHandler.send` already requires). The channel and destination are + * resolved by the host — `opts.channel`/`opts.chatId` override, otherwise the + * configured default channel and the paired owner are used. A missing channel, + * destination, or host resolver yields `{ delivered:false, reason }`; only a + * capability violation throws. + */ + notify( + text: string, + opts?: { readonly channel?: string; readonly chatId?: string }, + ): Promise; } /**