diff --git a/CHANGELOG.md b/CHANGELOG.md index 5bb15f0..3e567bb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -35,6 +35,9 @@ marketplace version bump + update. unset ⇒ in-memory only. Hardened against clobbering the persisted blob on a transient read failure (#57, #60). - **Clear scopes** — remove one scope or all, from the CLI/HTTP or the viewer UI (#49). +- **Console inbox — workspace-wide read** — `GET /inbox?all=1` aggregates unread human→agent + messages across every scope in one call (peek-only; never advances an ack), so an agent can + watch the whole workspace instead of polling each board scope-by-scope (#108). ### Themes & visuals - **Theme system** — System / Dark / Light / Midnight / Paper, persisted; React Flow, Vega-Lite, @@ -60,6 +63,9 @@ marketplace version bump + update. handles a malformed viewer response gracefully (#56, #61). - **`patch`** — apply incremental ops to a live `flow`/`component`/`panes` view (#58, #81, #82). - **`clear`** — remove one scope (`--project`/`--agent`) or `--all` (#49). +- **`inbox --all`** — aggregate unread console messages across every scope in one call (no + `--project`/`--agent`); **`inbox --peek`** — read a scope without consuming it (no ack + advance), for non-destructive scans (#108). - **`push`** now requires `--description` (the label shown in `list`) (#56). - **Default theme is `zinc-dark`** — beautiful-mermaid's light-oriented default rendered node labels near-black (invisible on a dark terminal); override with `--theme zinc-light` (#80). diff --git a/packages/cli/src/cli.ts b/packages/cli/src/cli.ts index 0be1b3d..8b50849 100644 --- a/packages/cli/src/cli.ts +++ b/packages/cli/src/cli.ts @@ -34,8 +34,9 @@ Usage: termchart pull [flags] Fetch a stored view's spec to iterate on (--project --agent [--json]) termchart list [flags] List stored views — scope · type · description (--project [--json]) termchart patch [flags] Update part of a stored flow/component without re-sending it (--project --agent --node --status / --check / / ops JSON) - termchart inbox [flags] Read the human→agent console for a scope (--project --agent [--since ] [--wait] [--follow] [--json]) + termchart inbox [flags] Read the human→agent console for a scope (--project --agent [--since ] [--wait] [--follow] [--peek] [--json]) --follow/-f streams messages as they arrive (resilient long-poll, like tail -f; Ctrl-C to stop) + --peek reads without consuming (no ack advance); --all aggregates unread across every scope in one call termchart suggest [flags] Push clickable suggestion chips to the human's console (--project --agent --items '[...]' / --item) termchart template Reusable diagram templates: save --project --agent --name | list | get | delete termchart board Scheduled boards (auto-refreshing): create | list | show | prompt | scaffold-workflow | delete (try \`board --help\`) diff --git a/packages/cli/src/inbox.ts b/packages/cli/src/inbox.ts index d7662a6..c8f5acf 100644 --- a/packages/cli/src/inbox.ts +++ b/packages/cli/src/inbox.ts @@ -7,7 +7,7 @@ export interface InboxDeps { maxPolls?: number; } -interface Args { project?: string; agent?: string; since?: number; wait: boolean; follow: boolean; json: boolean; error?: string; } +interface Args { project?: string; agent?: string; since?: number; wait: boolean; follow: boolean; json: boolean; peek: boolean; all: boolean; error?: string; } // Long-poll the server holds for up to ~25s; give fetch a little headroom before we treat it as stuck. const WAIT_MS = 25000; @@ -16,7 +16,7 @@ const BACKOFF_START_MS = 1000; const BACKOFF_MAX_MS = 15000; function parse(argv: string[]): Args { - const a: Args = { wait: false, follow: false, json: false }; + const a: Args = { wait: false, follow: false, json: false, peek: false, all: false }; const take = (flag: string, set: (v: string) => void, i: number): void => { const t = takeValue(argv, i, flag); if (t.error) a.error = t.error; else set(t.value!); @@ -32,6 +32,8 @@ function parse(argv: string[]): Args { } else if (arg === "--wait") a.wait = true; else if (arg === "--follow" || arg === "-f") a.follow = true; else if (arg === "--json") a.json = true; + else if (arg === "--peek") a.peek = true; + else if (arg === "--all") a.all = true; else a.error = `Unknown flag: ${arg}`; } return a; @@ -40,6 +42,8 @@ function parse(argv: string[]): Args { interface InboxTarget { kind: string; panePath?: number[]; paneTitle?: string; id?: string; label?: string; excerpt?: string; seq?: number; } interface InboxEvent { seq: number; ts: number; kind: "message" | "action"; text?: string; ref?: string; target?: InboxTarget; } interface InboxResponse { events: InboxEvent[]; cursor: number; } +interface UnreadScope { project: string; agent: string; scope: string; unread: number; cursor: number; acked: number; events: InboxEvent[]; } +interface InboxAllResponse { scopes: UnreadScope[]; } /** One indented context line for a pinned message: where the human pinned it, in spec terms. */ function fmtTarget(t: InboxTarget): string { @@ -65,7 +69,9 @@ const sleep = (ms: number) => new Promise((r) => setTimeout(r, ms)); * console, issue #108). Prints events newer than `--since` and (in the human format) the cursor to * poll with next. `--wait` long-polls once so an agent can block on human input; `--follow`/`-f` * streams events as they arrive in a resilient loop (long-poll + reconnect/backoff, advancing the - * cursor itself) — like `tail -f`, until Ctrl-C. `--json` prints raw events. Read is open (URL only, + * cursor itself) — like `tail -f`, until Ctrl-C. `--json` prints raw events. `--peek` reads WITHOUT + * advancing the ack (a non-consuming scan). `--all` aggregates unread across every scope in one call + * (no --project/--agent needed) so an agent can watch the whole workspace. Read is open (URL only, * like list/pull); the bearer token is sent if present. Returns an exit code. */ export async function inbox(argv: string[], deps: InboxDeps): Promise { @@ -74,9 +80,14 @@ export async function inbox(argv: string[], deps: InboxDeps): Promise { const token = deps.env.TERMCHART_VIEWER_TOKEN; if (a.error) { process.stderr.write(a.error + "\n"); return 3; } if (!base) { process.stderr.write(missingConfigMessage(false)); return EXIT_NO_VIEWER; } - if (!a.project || !a.agent) { process.stderr.write("--project and --agent are required.\n"); return 3; } const headers: Record = token ? { authorization: `Bearer ${token}` } : {}; + + // `--all`: one aggregated call for unread across the whole workspace — the scope flags don't apply. + if (a.all) return inboxAll(base, headers, a); + + if (!a.project || !a.agent) { process.stderr.write("--project and --agent are required (or use --all).\n"); return 3; } + const scopeQs = `project=${encodeURIComponent(a.project)}&agent=${encodeURIComponent(a.agent)}`; if (a.follow) return follow(base, headers, scopeQs, a, deps); @@ -84,6 +95,7 @@ export async function inbox(argv: string[], deps: InboxDeps): Promise { let qs = scopeQs; if (a.since !== undefined) qs += `&since=${a.since}`; if (a.wait) qs += `&wait=${WAIT_MS}`; + if (a.peek) qs += `&peek=1`; try { // When waiting, bound the client a bit beyond the server's hold; a plain read gets the shorter @@ -119,6 +131,45 @@ export async function inbox(argv: string[], deps: InboxDeps): Promise { } } +/** + * `--all`: one call to GET /inbox?all=1 — the unread events across EVERY scope in the workspace, so + * an agent can watch the whole workspace at once instead of polling each scope. Always a peek (never + * advances any ack); the agent then does a normal per-scope read to consume + act. Human output is + * one block per scope with unread; `--json` prints the raw aggregate. + */ +async function inboxAll(base: string, headers: Record, a: Args): Promise { + try { + const r = await fetch(`${base}/inbox?all=1`, { method: "GET", headers }); + if (!r.ok) { + const detail = (await r.text().catch(() => "")).trim(); + process.stderr.write(`inbox failed: HTTP ${r.status}${detail ? ` — ${detail}` : ""}\n`); + return 1; + } + const data = (await r.json().catch(() => null)) as InboxAllResponse | null; + if (!data || !Array.isArray(data.scopes)) { + process.stderr.write("inbox failed: unexpected response from viewer\n"); + return 1; + } + if (a.json) { + process.stdout.write(JSON.stringify(data, null, 2) + "\n"); + return 0; + } + for (const s of data.scopes) { + process.stdout.write(`${s.scope} (${s.unread} unread, cursor ${s.cursor})\n`); + for (const e of s.events) process.stdout.write(` ${fmt(e)}\n`); + } + process.stderr.write(`scopes with unread: ${data.scopes.length}\n`); + return 0; + } catch (e) { + if (isConnError(e)) { + process.stderr.write(unreachableMessage(base, e)); + return EXIT_NO_VIEWER; + } + process.stderr.write(`inbox failed: ${(e as Error).message}\n`); + return 1; + } +} + /** * `--follow`: stream inbox events as they arrive. A resilient long-poll loop — on each iteration it * long-polls from the current cursor; new events print immediately (and advance the cursor), an empty diff --git a/packages/cli/test/inbox.test.ts b/packages/cli/test/inbox.test.ts index de2ad6f..c0f406e 100644 --- a/packages/cli/test/inbox.test.ts +++ b/packages/cli/test/inbox.test.ts @@ -100,6 +100,49 @@ describe("inbox", () => { expect((await received).path).toBe("/w/ws1/inbox?project=p&agent=a&since=5"); }); + it("--peek forwards peek=1 (a non-consuming read)", async () => { + const { url, received } = await stub(resp); + const spy = vi.spyOn(process.stdout, "write").mockImplementation(() => true); + const se = vi.spyOn(process.stderr, "write").mockImplementation(() => true); + await inbox(["--project", "p", "--agent", "a", "--peek"], { env: env(url) }); + spy.mockRestore(); se.mockRestore(); + expect((await received).path).toBe("/w/ws1/inbox?project=p&agent=a&peek=1"); + }); + + it("--all hits /inbox?all=1 (no project/agent) and prints each unread scope + its events", async () => { + const allResp = { + scopes: [ + { project: "p", agent: "a", scope: "p/a", unread: 2, cursor: 2, acked: 0, events: [{ seq: 1, ts: 1, kind: "message", text: "a1" }, { seq: 2, ts: 2, kind: "message", text: "a2" }] }, + { project: "q", agent: "c", scope: "q/c", unread: 1, cursor: 1, acked: 0, events: [{ seq: 1, ts: 1, kind: "action", ref: "rerun" }] }, + ], + }; + const { url, received } = await stub(allResp); + const out: string[] = []; + const so = vi.spyOn(process.stdout, "write").mockImplementation((s) => (out.push(String(s)), true)); + const se = vi.spyOn(process.stderr, "write").mockImplementation(() => true); + const code = await inbox(["--all"], { env: env(url) }); + so.mockRestore(); se.mockRestore(); + expect(code).toBe(0); + expect((await received).path).toBe("/w/ws1/inbox?all=1"); + const printed = out.join(""); + expect(printed).toContain("p/a"); + expect(printed).toContain("a1"); + expect(printed).toContain("q/c"); + expect(printed).toContain("rerun"); + }); + + it("--all --json prints the raw aggregate", async () => { + const allResp = { scopes: [{ project: "p", agent: "a", scope: "p/a", unread: 1, cursor: 1, acked: 0, events: [{ seq: 1, ts: 1, kind: "message", text: "x" }] }] }; + const { url } = await stub(allResp); + const out: string[] = []; + const so = vi.spyOn(process.stdout, "write").mockImplementation((s) => (out.push(String(s)), true)); + const se = vi.spyOn(process.stderr, "write").mockImplementation(() => true); + const code = await inbox(["--all", "--json"], { env: env(url) }); + so.mockRestore(); se.mockRestore(); + expect(code).toBe(0); + expect(JSON.parse(out.join("")).scopes).toHaveLength(1); + }); + it("--wait long-polls (sends wait= and returns the events that arrive)", async () => { const { url, received } = await stub({ events: [{ seq: 3, ts: 9, kind: "message", text: "late" }], cursor: 3 }); const out: string[] = []; diff --git a/packages/viewer/src/server.ts b/packages/viewer/src/server.ts index f2ccc9c..fb4692f 100644 --- a/packages/viewer/src/server.ts +++ b/packages/viewer/src/server.ts @@ -847,9 +847,20 @@ export function createViewerServer(opts: ServerOpts) { // then return (possibly empty). Cleanup is leak-proof: the waiter is unregistered and the timer // cleared on resolve, on the wait timeout, AND on client disconnect (`req.on("close")`). if (req.method === "GET" && action === "inbox") { + // `?all=1`: aggregate unread across EVERY scope in the workspace in one call — so an agent can + // watch the whole workspace instead of polling each scope. Always a PEEK (never advances any + // ack: you can't sensibly consume many scopes at once); the agent then does a normal per-scope + // read to consume + act. Returns only scopes that have unread, each with its unread events. + if (url.searchParams.get("all") === "1") { + const scopes = inboxStore.unreadByScope(wsid).map((s) => { + const sl = s.scope.indexOf("/"); + return { project: s.scope.slice(0, sl), agent: s.scope.slice(sl + 1), ...s }; + }); + return send(res, 200, JSON.stringify({ scopes }), "application/json"); + } const project = url.searchParams.get("project"); const agent = url.searchParams.get("agent"); - if (!project || !agent) return send(res, 400, "inbox needs ?project=&agent="); + if (!project || !agent) return send(res, 400, "inbox needs ?project=&agent= (or ?all=1)"); const scope = `${project}/${agent}`; const since = Math.max(0, Number(url.searchParams.get("since") ?? "0") | 0); const waitRaw = Number(url.searchParams.get("wait") ?? "0"); diff --git a/packages/viewer/src/state.ts b/packages/viewer/src/state.ts index dfdb8b8..db68ef2 100644 --- a/packages/viewer/src/state.ts +++ b/packages/viewer/src/state.ts @@ -254,6 +254,23 @@ export class InboxStore { return s ? Math.max(0, s.seq - s.acked) : 0; } + /** + * Every scope in this workspace that has unread events, with its unread events (those with + * `seq > acked`), unread count, cursor, and ack. PEEK semantics — does NOT advance any ack — so + * an agent can watch the WHOLE workspace with one call (GET /inbox?all=1) instead of polling each + * scope, then do a normal per-scope read to consume + act. Scopes with nothing unread are omitted. + */ + unreadByScope(wsid: string): { scope: string; unread: number; cursor: number; acked: number; events: InboxEvent[] }[] { + const scopes = this.ws.get(wsid); + if (!scopes) return []; + const out: { scope: string; unread: number; cursor: number; acked: number; events: InboxEvent[] }[] = []; + for (const [scope, s] of scopes) { + const unread = Math.max(0, s.seq - s.acked); + if (unread > 0) out.push({ scope, unread, cursor: s.seq, acked: s.acked, events: s.events.filter((e) => e.seq > s.acked) }); + } + return out; + } + /** Unread events the agent hasn't yet been NUDGED about (`seq − max(acked, notified)`). The * push/patch/status nudge uses this so it fires only on NEW messages — not re-announcing the same * stale backlog on every write (the "inbox heartbeat" that was annoying when nothing was new). */ diff --git a/packages/viewer/test/server.test.ts b/packages/viewer/test/server.test.ts index 9960c67..d0e0fdd 100644 --- a/packages/viewer/test/server.test.ts +++ b/packages/viewer/test/server.test.ts @@ -772,6 +772,32 @@ describe("viewer server", () => { expect((await fetch(`${base}/w/ws1/inbox`)).status).toBe(400); }); + it("GET inbox?all=1 aggregates unread across every scope (peek-only — no ack advance)", async () => { + await inboxReq({ project: "p", agent: "a", kind: "message", text: "a1" }); + await inboxReq({ project: "p", agent: "a", kind: "message", text: "a2" }); + await inboxReq({ project: "p", agent: "b", kind: "message", text: "b1" }); + await inboxReq({ project: "q", agent: "c", kind: "action", ref: "rerun" }); + await getJson("inbox?project=q&agent=c"); // consume q/c → it should drop out of the aggregate + + const got = await getJson("inbox?all=1"); + expect(got.status).toBe(200); + const scopes = (got.body as { scopes: { project: string; agent: string; scope: string; unread: number; cursor: number; events: { text?: string }[] }[] }).scopes; + const byScope = Object.fromEntries(scopes.map((s) => [s.scope, s])); + expect(Object.keys(byScope).sort()).toEqual(["p/a", "p/b"]); // q/c was read → omitted + expect(byScope["p/a"]).toMatchObject({ project: "p", agent: "a", unread: 2, cursor: 2 }); + expect(byScope["p/a"].events.map((e) => e.text)).toEqual(["a1", "a2"]); + + // The aggregate is a peek: it must NOT have consumed p/a, so a later push still nudges. + const r = await push({ project: "p", agent: "a", type: "mermaid", content: "graph LR\n A-->B" }); + expect(r.headers.get("x-termchart-inbox-unread")).toBe("2"); + }); + + it("GET inbox?all=1 returns an empty list when nothing is unread", async () => { + const got = await getJson("inbox?all=1"); + expect(got.status).toBe(200); + expect(got.body).toEqual({ scopes: [] }); + }); + it("nudges via x-termchart-inbox-unread ONCE on new messages, not as a heartbeat on every write", async () => { // Fresh scope: the human has typed nothing → no nudge header. const r0 = await push({ project: "p", agent: "a", type: "mermaid", content: "graph LR\n A-->B" }); diff --git a/packages/viewer/test/state.test.ts b/packages/viewer/test/state.test.ts index 87dc3b2..a570d2f 100644 --- a/packages/viewer/test/state.test.ts +++ b/packages/viewer/test/state.test.ts @@ -182,6 +182,37 @@ describe("InboxStore", () => { expect(s.unread("w1", "p/a")).toBe(0); }); + it("unreadByScope aggregates every scope with unread (peek — no ack advance), omitting read ones", () => { + const s = new InboxStore(); + expect(s.unreadByScope("w1")).toEqual([]); // empty workspace + s.append("w1", "p/a", { kind: "message", text: "a1" }); + s.append("w1", "p/a", { kind: "message", text: "a2" }); + s.append("w1", "p/b", { kind: "message", text: "b1" }); + s.append("w1", "p/c", { kind: "message", text: "c1" }); + s.markReadTo("w1", "p/c", 4); // c is fully read → omitted + s.append("w2", "p/a", { kind: "message", text: "other-ws" }); // a different workspace is isolated + + const got = s.unreadByScope("w1"); + expect(got.map((x) => x.scope).sort()).toEqual(["p/a", "p/b"]); + const a = got.find((x) => x.scope === "p/a")!; + expect(a).toMatchObject({ scope: "p/a", unread: 2, cursor: 2, acked: 0 }); + expect(a.events.map((e) => e.text)).toEqual(["a1", "a2"]); + + // Peek must NOT advance the ack — the scope is still unread afterwards. + expect(s.unread("w1", "p/a")).toBe(2); + }); + + it("unreadByScope returns only events past the ack for a partially-read scope", () => { + const s = new InboxStore(); + s.append("w1", "p/a", { kind: "message", text: "old" }); // seq 1 + s.markReadTo("w1", "p/a", 1); + s.append("w1", "p/a", { kind: "message", text: "new" }); // seq 2, unread + const got = s.unreadByScope("w1"); + expect(got).toHaveLength(1); + expect(got[0]).toMatchObject({ scope: "p/a", unread: 1, cursor: 2, acked: 1 }); + expect(got[0].events.map((e) => e.text)).toEqual(["new"]); + }); + it("stores + replaces suggestion chips", () => { const s = new InboxStore(); s.setSuggestions("w1", "p/a", [{ id: "x", label: "X" }]);