Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ marketplace version bump + update.
unset ⇒ in-memory only. Hardened against clobbering the persisted blob on a transient read
failure (#57, #60).
- **Clear scopes** — remove one scope or all, from the CLI/HTTP or the viewer UI (#49).
- **Console inbox — workspace-wide read** — `GET /inbox?all=1` aggregates unread human→agent
messages across every scope in one call (peek-only; never advances an ack), so an agent can
watch the whole workspace instead of polling each board scope-by-scope (#108).

### Themes & visuals
- **Theme system** — System / Dark / Light / Midnight / Paper, persisted; React Flow, Vega-Lite,
Expand All @@ -60,6 +63,9 @@ marketplace version bump + update.
handles a malformed viewer response gracefully (#56, #61).
- **`patch`** — apply incremental ops to a live `flow`/`component`/`panes` view (#58, #81, #82).
- **`clear`** — remove one scope (`--project`/`--agent`) or `--all` (#49).
- **`inbox --all`** — aggregate unread console messages across every scope in one call (no
`--project`/`--agent`); **`inbox --peek`** — read a scope without consuming it (no ack
advance), for non-destructive scans (#108).
- **`push`** now requires `--description` (the label shown in `list`) (#56).
- **Default theme is `zinc-dark`** — beautiful-mermaid's light-oriented default rendered node
labels near-black (invisible on a dark terminal); override with `--theme zinc-light` (#80).
Expand Down
3 changes: 2 additions & 1 deletion packages/cli/src/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ Usage:
termchart pull [flags] Fetch a stored view's spec to iterate on (--project --agent [--json])
termchart list [flags] List stored views — scope · type · description (--project [--json])
termchart patch [flags] Update part of a stored flow/component without re-sending it (--project --agent --node --status / --check <id>/<item> / ops JSON)
termchart inbox [flags] Read the human→agent console for a scope (--project --agent [--since <seq>] [--wait] [--follow] [--json])
termchart inbox [flags] Read the human→agent console for a scope (--project --agent [--since <seq>] [--wait] [--follow] [--peek] [--json])
--follow/-f streams messages as they arrive (resilient long-poll, like tail -f; Ctrl-C to stop)
--peek reads without consuming (no ack advance); --all aggregates unread across every scope in one call
termchart suggest [flags] Push clickable suggestion chips to the human's console (--project --agent --items '[...]' / --item)
termchart template <cmd> Reusable diagram templates: save --project --agent --name | list | get <id> | delete <id>
termchart --version
Expand Down
59 changes: 55 additions & 4 deletions packages/cli/src/inbox.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ export interface InboxDeps {
maxPolls?: number;
}

interface Args { project?: string; agent?: string; since?: number; wait: boolean; follow: boolean; json: boolean; error?: string; }
interface Args { project?: string; agent?: string; since?: number; wait: boolean; follow: boolean; json: boolean; peek: boolean; all: boolean; error?: string; }

// Long-poll the server holds for up to ~25s; give fetch a little headroom before we treat it as stuck.
const WAIT_MS = 25000;
Expand All @@ -15,7 +15,7 @@ const BACKOFF_START_MS = 1000;
const BACKOFF_MAX_MS = 15000;

function parse(argv: string[]): Args {
const a: Args = { wait: false, follow: false, json: false };
const a: Args = { wait: false, follow: false, json: false, peek: false, all: false };
for (let i = 0; i < argv.length; i++) {
const arg = argv[i];
if (arg === "--project") a.project = argv[++i];
Expand All @@ -27,13 +27,17 @@ function parse(argv: string[]): Args {
} else if (arg === "--wait") a.wait = true;
else if (arg === "--follow" || arg === "-f") a.follow = true;
else if (arg === "--json") a.json = true;
else if (arg === "--peek") a.peek = true;
else if (arg === "--all") a.all = true;
else a.error = `Unknown flag: ${arg}`;
}
return a;
}

interface InboxEvent { seq: number; ts: number; kind: "message" | "action"; text?: string; ref?: string; }
interface InboxResponse { events: InboxEvent[]; cursor: number; }
interface UnreadScope { project: string; agent: string; scope: string; unread: number; cursor: number; acked: number; events: InboxEvent[]; }
interface InboxAllResponse { scopes: UnreadScope[]; }

function fmt(e: InboxEvent): string {
if (e.kind === "action") return `[${e.seq}] action ${e.ref ?? ""}`;
Expand All @@ -47,7 +51,9 @@ const sleep = (ms: number) => new Promise<void>((r) => setTimeout(r, ms));
* console, issue #108). Prints events newer than `--since` and (in the human format) the cursor to
* poll with next. `--wait` long-polls once so an agent can block on human input; `--follow`/`-f`
* streams events as they arrive in a resilient loop (long-poll + reconnect/backoff, advancing the
* cursor itself) — like `tail -f`, until Ctrl-C. `--json` prints raw events. Read is open (URL only,
* cursor itself) — like `tail -f`, until Ctrl-C. `--json` prints raw events. `--peek` reads WITHOUT
* advancing the ack (a non-consuming scan). `--all` aggregates unread across every scope in one call
* (no --project/--agent needed) so an agent can watch the whole workspace. Read is open (URL only,
* like list/pull); the bearer token is sent if present. Returns an exit code.
*/
export async function inbox(argv: string[], deps: InboxDeps): Promise<number> {
Expand All @@ -56,16 +62,22 @@ export async function inbox(argv: string[], deps: InboxDeps): Promise<number> {
const token = deps.env.TERMCHART_VIEWER_TOKEN;
if (a.error) { process.stderr.write(a.error + "\n"); return 3; }
if (!base) { process.stderr.write(missingConfigMessage(false)); return EXIT_NO_VIEWER; }
if (!a.project || !a.agent) { process.stderr.write("--project and --agent are required.\n"); return 3; }

const headers: Record<string, string> = token ? { authorization: `Bearer ${token}` } : {};

// `--all`: one aggregated call for unread across the whole workspace — the scope flags don't apply.
if (a.all) return inboxAll(base, headers, a);

if (!a.project || !a.agent) { process.stderr.write("--project and --agent are required (or use --all).\n"); return 3; }

const scopeQs = `project=${encodeURIComponent(a.project)}&agent=${encodeURIComponent(a.agent)}`;

if (a.follow) return follow(base, headers, scopeQs, a, deps);

let qs = scopeQs;
if (a.since !== undefined) qs += `&since=${a.since}`;
if (a.wait) qs += `&wait=${WAIT_MS}`;
if (a.peek) qs += `&peek=1`;

try {
// When waiting, bound the client a bit beyond the server's hold so a dead server doesn't hang us.
Expand Down Expand Up @@ -100,6 +112,45 @@ export async function inbox(argv: string[], deps: InboxDeps): Promise<number> {
}
}

/**
* `--all`: one call to GET /inbox?all=1 — the unread events across EVERY scope in the workspace, so
* an agent can watch the whole workspace at once instead of polling each scope. Always a peek (never
* advances any ack); the agent then does a normal per-scope read to consume + act. Human output is
* one block per scope with unread; `--json` prints the raw aggregate.
*/
async function inboxAll(base: string, headers: Record<string, string>, a: Args): Promise<number> {
try {
const r = await fetch(`${base}/inbox?all=1`, { method: "GET", headers });
if (!r.ok) {
const detail = (await r.text().catch(() => "")).trim();
process.stderr.write(`inbox failed: HTTP ${r.status}${detail ? ` — ${detail}` : ""}\n`);
return 1;
}
const data = (await r.json().catch(() => null)) as InboxAllResponse | null;
if (!data || !Array.isArray(data.scopes)) {
process.stderr.write("inbox failed: unexpected response from viewer\n");
return 1;
}
if (a.json) {
process.stdout.write(JSON.stringify(data, null, 2) + "\n");
return 0;
}
for (const s of data.scopes) {
process.stdout.write(`${s.scope} (${s.unread} unread, cursor ${s.cursor})\n`);
for (const e of s.events) process.stdout.write(` ${fmt(e)}\n`);
}
process.stderr.write(`scopes with unread: ${data.scopes.length}\n`);
return 0;
} catch (e) {
if (isConnError(e)) {
process.stderr.write(unreachableMessage(base, e));
return EXIT_NO_VIEWER;
}
process.stderr.write(`inbox failed: ${(e as Error).message}\n`);
return 1;
}
}

/**
* `--follow`: stream inbox events as they arrive. A resilient long-poll loop — on each iteration it
* long-polls from the current cursor; new events print immediately (and advance the cursor), an empty
Expand Down
43 changes: 43 additions & 0 deletions packages/cli/test/inbox.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,49 @@ describe("inbox", () => {
expect((await received).path).toBe("/w/ws1/inbox?project=p&agent=a&since=5");
});

it("--peek forwards peek=1 (a non-consuming read)", async () => {
const { url, received } = await stub(resp);
const spy = vi.spyOn(process.stdout, "write").mockImplementation(() => true);
const se = vi.spyOn(process.stderr, "write").mockImplementation(() => true);
await inbox(["--project", "p", "--agent", "a", "--peek"], { env: env(url) });
spy.mockRestore(); se.mockRestore();
expect((await received).path).toBe("/w/ws1/inbox?project=p&agent=a&peek=1");
});

it("--all hits /inbox?all=1 (no project/agent) and prints each unread scope + its events", async () => {
const allResp = {
scopes: [
{ project: "p", agent: "a", scope: "p/a", unread: 2, cursor: 2, acked: 0, events: [{ seq: 1, ts: 1, kind: "message", text: "a1" }, { seq: 2, ts: 2, kind: "message", text: "a2" }] },
{ project: "q", agent: "c", scope: "q/c", unread: 1, cursor: 1, acked: 0, events: [{ seq: 1, ts: 1, kind: "action", ref: "rerun" }] },
],
};
const { url, received } = await stub(allResp);
const out: string[] = [];
const so = vi.spyOn(process.stdout, "write").mockImplementation((s) => (out.push(String(s)), true));
const se = vi.spyOn(process.stderr, "write").mockImplementation(() => true);
const code = await inbox(["--all"], { env: env(url) });
so.mockRestore(); se.mockRestore();
expect(code).toBe(0);
expect((await received).path).toBe("/w/ws1/inbox?all=1");
const printed = out.join("");
expect(printed).toContain("p/a");
expect(printed).toContain("a1");
expect(printed).toContain("q/c");
expect(printed).toContain("rerun");
});

it("--all --json prints the raw aggregate", async () => {
const allResp = { scopes: [{ project: "p", agent: "a", scope: "p/a", unread: 1, cursor: 1, acked: 0, events: [{ seq: 1, ts: 1, kind: "message", text: "x" }] }] };
const { url } = await stub(allResp);
const out: string[] = [];
const so = vi.spyOn(process.stdout, "write").mockImplementation((s) => (out.push(String(s)), true));
const se = vi.spyOn(process.stderr, "write").mockImplementation(() => true);
const code = await inbox(["--all", "--json"], { env: env(url) });
so.mockRestore(); se.mockRestore();
expect(code).toBe(0);
expect(JSON.parse(out.join("")).scopes).toHaveLength(1);
});

it("--wait long-polls (sends wait= and returns the events that arrive)", async () => {
const { url, received } = await stub({ events: [{ seq: 3, ts: 9, kind: "message", text: "late" }], cursor: 3 });
const out: string[] = [];
Expand Down
13 changes: 12 additions & 1 deletion packages/viewer/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -775,9 +775,20 @@ export function createViewerServer(opts: ServerOpts) {
// then return (possibly empty). Cleanup is leak-proof: the waiter is unregistered and the timer
// cleared on resolve, on the wait timeout, AND on client disconnect (`req.on("close")`).
if (req.method === "GET" && action === "inbox") {
// `?all=1`: aggregate unread across EVERY scope in the workspace in one call — so an agent can
// watch the whole workspace instead of polling each scope. Always a PEEK (never advances any
// ack: you can't sensibly consume many scopes at once); the agent then does a normal per-scope
// read to consume + act. Returns only scopes that have unread, each with its unread events.
if (url.searchParams.get("all") === "1") {
const scopes = inboxStore.unreadByScope(wsid).map((s) => {
const sl = s.scope.indexOf("/");
return { project: s.scope.slice(0, sl), agent: s.scope.slice(sl + 1), ...s };
});
return send(res, 200, JSON.stringify({ scopes }), "application/json");
}
const project = url.searchParams.get("project");
const agent = url.searchParams.get("agent");
if (!project || !agent) return send(res, 400, "inbox needs ?project=&agent=");
if (!project || !agent) return send(res, 400, "inbox needs ?project=&agent= (or ?all=1)");
const scope = `${project}/${agent}`;
const since = Math.max(0, Number(url.searchParams.get("since") ?? "0") | 0);
const waitRaw = Number(url.searchParams.get("wait") ?? "0");
Expand Down
17 changes: 17 additions & 0 deletions packages/viewer/src/state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,23 @@ export class InboxStore {
return s ? Math.max(0, s.seq - s.acked) : 0;
}

/**
* Every scope in this workspace that has unread events, with its unread events (those with
* `seq > acked`), unread count, cursor, and ack. PEEK semantics — does NOT advance any ack — so
* an agent can watch the WHOLE workspace with one call (GET /inbox?all=1) instead of polling each
* scope, then do a normal per-scope read to consume + act. Scopes with nothing unread are omitted.
*/
unreadByScope(wsid: string): { scope: string; unread: number; cursor: number; acked: number; events: InboxEvent[] }[] {
const scopes = this.ws.get(wsid);
if (!scopes) return [];
const out: { scope: string; unread: number; cursor: number; acked: number; events: InboxEvent[] }[] = [];
for (const [scope, s] of scopes) {
const unread = Math.max(0, s.seq - s.acked);
if (unread > 0) out.push({ scope, unread, cursor: s.seq, acked: s.acked, events: s.events.filter((e) => e.seq > s.acked) });
}
return out;
}

/** Advance the read-ack to (at most) `uptoSeq` — the highest seq the agent was actually shown.
* Monotonic and clamped to the real `seq`, so an overshot cursor can't mark undelivered events
* "read". Returns the new ack watermark. */
Expand Down
26 changes: 26 additions & 0 deletions packages/viewer/test/server.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,32 @@ describe("viewer server", () => {
expect((await fetch(`${base}/w/ws1/inbox`)).status).toBe(400);
});

it("GET inbox?all=1 aggregates unread across every scope (peek-only — no ack advance)", async () => {
await inboxReq({ project: "p", agent: "a", kind: "message", text: "a1" });
await inboxReq({ project: "p", agent: "a", kind: "message", text: "a2" });
await inboxReq({ project: "p", agent: "b", kind: "message", text: "b1" });
await inboxReq({ project: "q", agent: "c", kind: "action", ref: "rerun" });
await getJson("inbox?project=q&agent=c"); // consume q/c → it should drop out of the aggregate

const got = await getJson("inbox?all=1");
expect(got.status).toBe(200);
const scopes = (got.body as { scopes: { project: string; agent: string; scope: string; unread: number; cursor: number; events: { text?: string }[] }[] }).scopes;
const byScope = Object.fromEntries(scopes.map((s) => [s.scope, s]));
expect(Object.keys(byScope).sort()).toEqual(["p/a", "p/b"]); // q/c was read → omitted
expect(byScope["p/a"]).toMatchObject({ project: "p", agent: "a", unread: 2, cursor: 2 });
expect(byScope["p/a"].events.map((e) => e.text)).toEqual(["a1", "a2"]);

// The aggregate is a peek: it must NOT have consumed p/a, so a later push still nudges.
const r = await push({ project: "p", agent: "a", type: "mermaid", content: "graph LR\n A-->B" });
expect(r.headers.get("x-termchart-inbox-unread")).toBe("2");
});

it("GET inbox?all=1 returns an empty list when nothing is unread", async () => {
const got = await getJson("inbox?all=1");
expect(got.status).toBe(200);
expect(got.body).toEqual({ scopes: [] });
});

it("nudges via x-termchart-inbox-unread on push/status until the agent reads the inbox", async () => {
// Fresh scope: the human has typed nothing → no nudge header.
const r0 = await push({ project: "p", agent: "a", type: "mermaid", content: "graph LR\n A-->B" });
Expand Down
31 changes: 31 additions & 0 deletions packages/viewer/test/state.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,37 @@ describe("InboxStore", () => {
expect(s.unread("w1", "p/a")).toBe(0);
});

it("unreadByScope aggregates every scope with unread (peek — no ack advance), omitting read ones", () => {
const s = new InboxStore();
expect(s.unreadByScope("w1")).toEqual([]); // empty workspace
s.append("w1", "p/a", { kind: "message", text: "a1" });
s.append("w1", "p/a", { kind: "message", text: "a2" });
s.append("w1", "p/b", { kind: "message", text: "b1" });
s.append("w1", "p/c", { kind: "message", text: "c1" });
s.markReadTo("w1", "p/c", 4); // c is fully read → omitted
s.append("w2", "p/a", { kind: "message", text: "other-ws" }); // a different workspace is isolated

const got = s.unreadByScope("w1");
expect(got.map((x) => x.scope).sort()).toEqual(["p/a", "p/b"]);
const a = got.find((x) => x.scope === "p/a")!;
expect(a).toMatchObject({ scope: "p/a", unread: 2, cursor: 2, acked: 0 });
expect(a.events.map((e) => e.text)).toEqual(["a1", "a2"]);

// Peek must NOT advance the ack — the scope is still unread afterwards.
expect(s.unread("w1", "p/a")).toBe(2);
});

it("unreadByScope returns only events past the ack for a partially-read scope", () => {
const s = new InboxStore();
s.append("w1", "p/a", { kind: "message", text: "old" }); // seq 1
s.markReadTo("w1", "p/a", 1);
s.append("w1", "p/a", { kind: "message", text: "new" }); // seq 2, unread
const got = s.unreadByScope("w1");
expect(got).toHaveLength(1);
expect(got[0]).toMatchObject({ scope: "p/a", unread: 1, cursor: 2, acked: 1 });
expect(got[0].events.map((e) => e.text)).toEqual(["new"]);
});

it("stores + replaces suggestion chips", () => {
const s = new InboxStore();
s.setSuggestions("w1", "p/a", [{ id: "x", label: "X" }]);
Expand Down
Loading