diff --git a/CHANGELOG.md b/CHANGELOG.md index 3e567bb..69a8a15 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -38,6 +38,12 @@ marketplace version bump + update. - **Console inbox — workspace-wide read** — `GET /inbox?all=1` aggregates unread human→agent messages across every scope in one call (peek-only; never advances an ack), so an agent can watch the whole workspace instead of polling each board scope-by-scope (#108). +- **Agent → human replies** — `POST /reply` posts an agent message into a scope's console thread + (token-gated), broadcast live so the human sees what the agent is doing; replies share the inbox + ring but never count toward the agent's own unread (#108). +- **Board subscriptions (takeover)** — `POST /subscribe` / `/unsubscribe` + `GET /subscriptions` + give one owner per scope so two watching sessions don't both act on a board; a fresh subscribe + takes over and the SSE `subscribe` broadcast names the evicted session (#108). ### Themes & visuals - **Theme system** — System / Dark / Light / Midnight / Paper, persisted; React Flow, Vega-Lite, @@ -66,6 +72,11 @@ marketplace version bump + update. - **`inbox --all`** — aggregate unread console messages across every scope in one call (no `--project`/`--agent`); **`inbox --peek`** — read a scope without consuming it (no ack advance), for non-destructive scans (#108). +- **`reply`** — post an agent→human message into a scope's console thread, so the human sees what + the agent is doing in real time (#108). +- **`subscribe` / `unsubscribe` / `subscriptions`** — claim/release exclusive ownership of a board + (`--session `) so multiple watching sessions don't overlap; takeover wins, `subscriptions` + lists who owns what (#108). - **`push`** now requires `--description` (the label shown in `list`) (#56). - **Default theme is `zinc-dark`** — beautiful-mermaid's light-oriented default rendered node labels near-black (invisible on a dark terminal); override with `--theme zinc-light` (#80). diff --git a/packages/cli/src/cli.ts b/packages/cli/src/cli.ts index c6961ca..28795c2 100644 --- a/packages/cli/src/cli.ts +++ b/packages/cli/src/cli.ts @@ -40,6 +40,10 @@ Usage: --follow/-f streams messages as they arrive (resilient long-poll, like tail -f; Ctrl-C to stop) --peek reads without consuming (no ack advance); --all aggregates unread across every scope in one call termchart suggest [flags] Push clickable suggestion chips to the human's console (--project --agent --items '[...]' / --item) + termchart reply [flags] Post an agent→human message into a scope's console thread (--project --agent --message) + termchart subscribe [flags] Claim ownership of a board so sessions coordinate (advisory; --project --agent --session); takeover wins + termchart unsubscribe … Release a board you own (--project --agent --session) + termchart subscriptions List which boards are currently owned (--project optional; [--json]) termchart template Reusable diagram templates: save --project --agent --name | list | get | delete termchart --version @@ -376,6 +380,19 @@ if (isEntryPoint()) { import("./suggest.js") .then(({ suggest }) => suggest(argv.slice(1), { env: process.env }).then((code) => process.exit(code))) .catch((e) => { process.stderr.write(`suggest failed: ${e?.message ?? e}\n`); process.exit(1); }); + } else if (argv[0] === "reply") { + import("./reply.js") + .then(({ reply }) => reply(argv.slice(1), { env: process.env }).then((code) => process.exit(code))) + .catch((e) => { process.stderr.write(`reply failed: ${e?.message ?? e}\n`); process.exit(1); }); + } else if (argv[0] === "subscribe" || argv[0] === "unsubscribe") { + const mode = argv[0]; + import("./subscribe.js") + .then(({ subscribe }) => subscribe(mode, argv.slice(1), { env: process.env }).then((code) => process.exit(code))) + .catch((e) => { process.stderr.write(`${mode} failed: ${e?.message ?? e}\n`); process.exit(1); }); + } else if (argv[0] === "subscriptions") { + import("./subscribe.js") + .then(({ subscriptions }) => subscriptions(argv.slice(1), { env: process.env }).then((code) => process.exit(code))) + .catch((e) => { process.stderr.write(`subscriptions failed: ${e?.message ?? e}\n`); process.exit(1); }); } else if (argv[0] === "template") { import("./template.js") .then(({ template }) => template(argv.slice(1), { env: process.env }).then((code) => process.exit(code))) diff --git a/packages/cli/src/inbox.ts b/packages/cli/src/inbox.ts index 79eb641..e5b1ece 100644 --- a/packages/cli/src/inbox.ts +++ b/packages/cli/src/inbox.ts @@ -34,13 +34,16 @@ function parse(argv: string[]): Args { return a; } -interface InboxEvent { seq: number; ts: number; kind: "message" | "action"; text?: string; ref?: string; } +interface InboxEvent { seq: number; ts: number; kind: "message" | "action" | "reply"; text?: string; ref?: string; } interface InboxResponse { events: InboxEvent[]; cursor: number; } interface UnreadScope { project: string; agent: string; scope: string; unread: number; cursor: number; acked: number; events: InboxEvent[]; } interface InboxAllResponse { scopes: UnreadScope[]; } function fmt(e: InboxEvent): string { if (e.kind === "action") return `[${e.seq}] action ${e.ref ?? ""}`; + // `reply` is the agent's OWN message echoed back in a plain read — label it so the agent never + // mistakes it for new human input (the consume path returns replies; only unread/--all drop them). + if (e.kind === "reply") return `[${e.seq}] reply ${e.text ?? ""}`; return `[${e.seq}] message ${e.text ?? ""}`; } diff --git a/packages/cli/src/reply.ts b/packages/cli/src/reply.ts new file mode 100644 index 0000000..2376d6c --- /dev/null +++ b/packages/cli/src/reply.ts @@ -0,0 +1,56 @@ +import { EXIT_NO_VIEWER, isConnError, missingConfigMessage, unreachableMessage } from "./viewer-detect.js"; + +export interface ReplyDeps { + env: Record; +} + +interface Args { project?: string; agent?: string; message?: string; error?: string; } + +function parse(argv: string[]): Args { + const a: Args = {}; + for (let i = 0; i < argv.length; i++) { + const arg = argv[i]; + if (arg === "--project") a.project = argv[++i]; + else if (arg === "--agent") a.agent = argv[++i]; + else if (arg === "--message" || arg === "-m") a.message = argv[++i]; + else a.error = `Unknown flag: ${arg}`; + } + return a; +} + +/** + * `termchart reply` — post an agent → human message into a scope's console (the agent → human side of + * the EXPERIMENTAL console, issue #108). Token-gated (the agent is posting, like suggest/push). The + * reply lands in the same console thread as the human's messages and shows up live on every open + * console, so the human can see what the agent is doing. Returns an exit code. + */ +export async function reply(argv: string[], deps: ReplyDeps): Promise { + const a = parse(argv); + if (a.error) { process.stderr.write(a.error + "\n"); return 3; } + const base = deps.env.TERMCHART_VIEWER_URL; + const token = deps.env.TERMCHART_VIEWER_TOKEN; + if (!base || !token) { process.stderr.write(missingConfigMessage()); return EXIT_NO_VIEWER; } + if (!a.project || !a.agent) { process.stderr.write("--project and --agent are required.\n"); return 3; } + if (!a.message) { process.stderr.write("--message (-m) is required.\n"); return 3; } + + try { + const r = await fetch(`${base}/reply`, { + method: "POST", + headers: { "content-type": "application/json", authorization: `Bearer ${token}` }, + body: JSON.stringify({ project: a.project, agent: a.agent, text: a.message }), + }); + if (!r.ok) { + const detail = (await r.text().catch(() => "")).trim(); + process.stderr.write(`reply failed: HTTP ${r.status}${detail ? ` — ${detail}` : ""}\n`); + return 1; + } + return 0; + } catch (e) { + if (isConnError(e)) { + process.stderr.write(unreachableMessage(base, e)); + return EXIT_NO_VIEWER; + } + process.stderr.write(`reply failed: ${(e as Error).message}\n`); + return 1; + } +} diff --git a/packages/cli/src/subscribe.ts b/packages/cli/src/subscribe.ts new file mode 100644 index 0000000..80912f3 --- /dev/null +++ b/packages/cli/src/subscribe.ts @@ -0,0 +1,112 @@ +import { EXIT_NO_VIEWER, isConnError, missingConfigMessage, unreachableMessage } from "./viewer-detect.js"; + +export interface SubscribeDeps { + env: Record; +} + +interface Args { project?: string; agent?: string; session?: string; json: boolean; error?: string; } + +function parse(argv: string[]): Args { + const a: Args = { json: false }; + for (let i = 0; i < argv.length; i++) { + const arg = argv[i]; + if (arg === "--project") a.project = argv[++i]; + else if (arg === "--agent") a.agent = argv[++i]; + else if (arg === "--session") a.session = argv[++i]; + else if (arg === "--json") a.json = true; + else a.error = `Unknown flag: ${arg}`; + } + return a; +} + +interface Subscription { scope: string; owner: string; ts: number; } + +/** + * `termchart subscribe` / `unsubscribe` — claim or release ownership of a board so two watching + * sessions coordinate and don't both act on its console messages (issue #108). TAKEOVER model: a + * fresh `subscribe` always wins and evicts the prior owner; the response prints who (if anyone) was + * evicted. `unsubscribe` only releases if you still own it. Token-gated. Pair `--session ` with a + * stable per-session identifier. ADVISORY: ownership is cooperative — the server does not block a + * non-owner's writes, so a session should check `subscriptions` / honor a takeover and stand down. + * Returns an exit code. + */ +export async function subscribe(mode: "subscribe" | "unsubscribe", argv: string[], deps: SubscribeDeps): Promise { + const a = parse(argv); + if (a.error) { process.stderr.write(a.error + "\n"); return 3; } + const base = deps.env.TERMCHART_VIEWER_URL; + const token = deps.env.TERMCHART_VIEWER_TOKEN; + if (!base || !token) { process.stderr.write(missingConfigMessage()); return EXIT_NO_VIEWER; } + if (!a.project || !a.agent) { process.stderr.write("--project and --agent are required.\n"); return 3; } + if (!a.session) { process.stderr.write("--session is required (a stable id for this watching session).\n"); return 3; } + + try { + const r = await fetch(`${base}/${mode}`, { + method: "POST", + headers: { "content-type": "application/json", authorization: `Bearer ${token}` }, + body: JSON.stringify({ project: a.project, agent: a.agent, session: a.session }), + }); + if (!r.ok) { + const detail = (await r.text().catch(() => "")).trim(); + process.stderr.write(`${mode} failed: HTTP ${r.status}${detail ? ` — ${detail}` : ""}\n`); + return 1; + } + if (mode === "subscribe") { + const data = (await r.json().catch(() => null)) as { scope: string; owner: string; tookOverFrom?: string | null } | null; + if (data?.tookOverFrom) process.stderr.write(`subscribed to ${data.scope} — took over from session ${data.tookOverFrom}\n`); + else process.stderr.write(`subscribed to ${a.project}/${a.agent}\n`); + } else { + // 204; the server flags a no-op (you weren't the owner) via a header. + if (r.headers.get("x-termchart-not-owner")) process.stderr.write(`not the owner of ${a.project}/${a.agent} — nothing released\n`); + else process.stderr.write(`unsubscribed from ${a.project}/${a.agent}\n`); + } + return 0; + } catch (e) { + if (isConnError(e)) { + process.stderr.write(unreachableMessage(base, e)); + return EXIT_NO_VIEWER; + } + process.stderr.write(`${mode} failed: ${(e as Error).message}\n`); + return 1; + } +} + +/** + * `termchart subscriptions` — list which boards are currently owned (and by which session) so a new + * session can pick an unclaimed board. Open read (URL-gated, like `list`). `--json` for the raw array. + */ +export async function subscriptions(argv: string[], deps: SubscribeDeps): Promise { + const a = parse(argv); + if (a.error) { process.stderr.write(a.error + "\n"); return 3; } + const base = deps.env.TERMCHART_VIEWER_URL; + const token = deps.env.TERMCHART_VIEWER_TOKEN; + if (!base) { process.stderr.write(missingConfigMessage(false)); return EXIT_NO_VIEWER; } + const headers: Record = token ? { authorization: `Bearer ${token}` } : {}; + + try { + const r = await fetch(`${base}/subscriptions`, { method: "GET", headers }); + if (!r.ok) { + const detail = (await r.text().catch(() => "")).trim(); + process.stderr.write(`subscriptions failed: HTTP ${r.status}${detail ? ` — ${detail}` : ""}\n`); + return 1; + } + const data = (await r.json().catch(() => null)) as { subscriptions: Subscription[] } | null; + if (!data || !Array.isArray(data.subscriptions)) { + process.stderr.write("subscriptions failed: unexpected response from viewer\n"); + return 1; + } + if (a.json) { + process.stdout.write(JSON.stringify(data, null, 2) + "\n"); + return 0; + } + for (const s of data.subscriptions) process.stdout.write(`${s.scope} owned by ${s.owner}\n`); + process.stderr.write(`subscribed boards: ${data.subscriptions.length}\n`); + return 0; + } catch (e) { + if (isConnError(e)) { + process.stderr.write(unreachableMessage(base, e)); + return EXIT_NO_VIEWER; + } + process.stderr.write(`subscriptions failed: ${(e as Error).message}\n`); + return 1; + } +} diff --git a/packages/cli/test/inbox.test.ts b/packages/cli/test/inbox.test.ts index eebffc1..6fc984e 100644 --- a/packages/cli/test/inbox.test.ts +++ b/packages/cli/test/inbox.test.ts @@ -66,6 +66,18 @@ describe("inbox", () => { expect((await received).path).toBe("/w/ws1/inbox?project=p&agent=a&since=5"); }); + it("labels a reply event as `reply`, not `message`, in a plain read", async () => { + const { url } = await stub({ events: [{ seq: 1, ts: 1, kind: "reply", text: "on it" }], cursor: 1 }); + const out: string[] = []; + const so = vi.spyOn(process.stdout, "write").mockImplementation((s) => (out.push(String(s)), true)); + const se = vi.spyOn(process.stderr, "write").mockImplementation(() => true); + const code = await inbox(["--project", "p", "--agent", "a"], { env: env(url) }); + so.mockRestore(); se.mockRestore(); + expect(code).toBe(0); + expect(out.join("")).toContain("[1] reply"); + expect(out.join("")).not.toContain("[1] message"); // an agent's own reply isn't shown as human input + }); + it("--peek forwards peek=1 (a non-consuming read)", async () => { const { url, received } = await stub(resp); const spy = vi.spyOn(process.stdout, "write").mockImplementation(() => true); diff --git a/packages/cli/test/reply.test.ts b/packages/cli/test/reply.test.ts new file mode 100644 index 0000000..4609d08 --- /dev/null +++ b/packages/cli/test/reply.test.ts @@ -0,0 +1,73 @@ +import { afterEach, describe, expect, it, vi } from "vitest"; +import { createServer, type Server } from "node:http"; +import { type AddressInfo } from "node:net"; +import { reply } from "../src/reply.js"; +import { EXIT_NO_VIEWER } from "../src/viewer-detect.js"; + +let server: Server | undefined; +afterEach(() => server?.close()); + +/** A POST stub that records the request (path, auth, body) and replies 200 with the event. */ +function stub(): Promise<{ url: string; received: Promise<{ path: string; auth?: string; body: string }> }> { + return new Promise((resolve) => { + let resolveReq!: (v: { path: string; auth?: string; body: string }) => void; + const received = new Promise<{ path: string; auth?: string; body: string }>((r) => (resolveReq = r)); + server = createServer((req, res) => { + const chunks: Buffer[] = []; + req.on("data", (c) => chunks.push(c)); + req.on("end", () => { + res.writeHead(200, { "content-type": "application/json" }).end(JSON.stringify({ seq: 1, kind: "reply" })); + resolveReq({ path: req.url!, auth: req.headers.authorization, body: Buffer.concat(chunks).toString() }); + }); + }); + server.listen(0, () => resolve({ url: `http://127.0.0.1:${(server!.address() as AddressInfo).port}`, received })); + }); +} + +function errStub(status: number, body: string): Promise { + return new Promise((resolve) => { + server = createServer((_req, res) => res.writeHead(status, { "content-type": "text/plain" }).end(body)); + server.listen(0, () => resolve(`http://127.0.0.1:${(server!.address() as AddressInfo).port}`)); + }); +} + +const env = (url: string) => ({ TERMCHART_VIEWER_URL: `${url}/w/ws1`, TERMCHART_VIEWER_TOKEN: "tok" }); + +describe("reply", () => { + it("POSTs the message to /reply with the bearer token", async () => { + const { url, received } = await stub(); + const code = await reply(["--project", "p", "--agent", "a", "--message", "on it"], { env: env(url) }); + const req = await received; + expect(code).toBe(0); + expect(req.path).toBe("/w/ws1/reply"); + expect(req.auth).toBe("Bearer tok"); + const body = JSON.parse(req.body); + expect(body).toMatchObject({ project: "p", agent: "a", text: "on it" }); + }); + + it("accepts the -m alias", async () => { + const { url, received } = await stub(); + const code = await reply(["--project", "p", "--agent", "a", "-m", "short"], { env: env(url) }); + expect(code).toBe(0); + expect(JSON.parse((await received).body).text).toBe("short"); + }); + + it("requires URL+token (no-viewer code) and project/agent/message (exit 3)", async () => { + expect(await reply(["--project", "p", "--agent", "a", "-m", "x"], { env: {} })).toBe(EXIT_NO_VIEWER); + const { url } = await stub(); + const spy = vi.spyOn(process.stderr, "write").mockImplementation(() => true); + expect(await reply(["--agent", "a", "-m", "x"], { env: env(url) })).toBe(3); // no project + expect(await reply(["--project", "p", "--agent", "a"], { env: env(url) })).toBe(3); // no message + spy.mockRestore(); + }); + + it("surfaces a server error (exit 1)", async () => { + const url = await errStub(401, "unauthorized"); + const errs: string[] = []; + const spy = vi.spyOn(process.stderr, "write").mockImplementation((s) => (errs.push(String(s)), true)); + const code = await reply(["--project", "p", "--agent", "a", "-m", "x"], { env: env(url) }); + spy.mockRestore(); + expect(code).toBe(1); + expect(errs.join("")).toContain("HTTP 401"); + }); +}); diff --git a/packages/cli/test/subscribe.test.ts b/packages/cli/test/subscribe.test.ts new file mode 100644 index 0000000..36da91a --- /dev/null +++ b/packages/cli/test/subscribe.test.ts @@ -0,0 +1,89 @@ +import { afterEach, describe, expect, it, vi } from "vitest"; +import { createServer, type Server } from "node:http"; +import { type AddressInfo } from "node:net"; +import { subscribe, subscriptions } from "../src/subscribe.js"; +import { EXIT_NO_VIEWER } from "../src/viewer-detect.js"; + +let server: Server | undefined; +afterEach(() => server?.close()); + +/** A stub replying with `json` (and optional headers); records path/auth/body. */ +function stub(json: unknown, status = 200, headers: Record = {}): Promise<{ url: string; received: Promise<{ path: string; auth?: string; body: string }> }> { + return new Promise((resolve) => { + let resolveReq!: (v: { path: string; auth?: string; body: string }) => void; + const received = new Promise<{ path: string; auth?: string; body: string }>((r) => (resolveReq = r)); + server = createServer((req, res) => { + const chunks: Buffer[] = []; + req.on("data", (c) => chunks.push(c)); + req.on("end", () => { + res.writeHead(status, { "content-type": "application/json", ...headers }).end(json === undefined ? "" : JSON.stringify(json)); + resolveReq({ path: req.url!, auth: req.headers.authorization, body: Buffer.concat(chunks).toString() }); + }); + }); + server.listen(0, () => resolve({ url: `http://127.0.0.1:${(server!.address() as AddressInfo).port}`, received })); + }); +} + +const env = (url: string) => ({ TERMCHART_VIEWER_URL: `${url}/w/ws1`, TERMCHART_VIEWER_TOKEN: "tok" }); + +describe("subscribe / unsubscribe", () => { + it("POSTs /subscribe with session + token and reports a takeover", async () => { + const { url, received } = await stub({ scope: "p/a", owner: "sessB", tookOverFrom: "sessA" }); + const errs: string[] = []; + const se = vi.spyOn(process.stderr, "write").mockImplementation((s) => (errs.push(String(s)), true)); + const code = await subscribe("subscribe", ["--project", "p", "--agent", "a", "--session", "sessB"], { env: env(url) }); + se.mockRestore(); + const req = await received; + expect(code).toBe(0); + expect(req.path).toBe("/w/ws1/subscribe"); + expect(req.auth).toBe("Bearer tok"); + expect(JSON.parse(req.body)).toMatchObject({ project: "p", agent: "a", session: "sessB" }); + expect(errs.join("")).toContain("took over from session sessA"); + }); + + it("POSTs /unsubscribe and notes a non-owner no-op via the header", async () => { + const { url, received } = await stub(undefined, 204, { "x-termchart-not-owner": "1" }); + const errs: string[] = []; + const se = vi.spyOn(process.stderr, "write").mockImplementation((s) => (errs.push(String(s)), true)); + const code = await subscribe("unsubscribe", ["--project", "p", "--agent", "a", "--session", "sessX"], { env: env(url) }); + se.mockRestore(); + expect(code).toBe(0); + expect((await received).path).toBe("/w/ws1/unsubscribe"); + expect(errs.join("")).toContain("not the owner"); + }); + + it("requires URL+token (no-viewer) and project/agent/session (exit 3)", async () => { + expect(await subscribe("subscribe", ["--project", "p", "--agent", "a", "--session", "s"], { env: {} })).toBe(EXIT_NO_VIEWER); + const { url } = await stub({ scope: "p/a", owner: "s" }); + const spy = vi.spyOn(process.stderr, "write").mockImplementation(() => true); + expect(await subscribe("subscribe", ["--project", "p", "--agent", "a"], { env: env(url) })).toBe(3); // no session + spy.mockRestore(); + }); +}); + +describe("subscriptions", () => { + it("lists owned boards (open read; token sent if present)", async () => { + const { url, received } = await stub({ subscriptions: [{ scope: "p/a", owner: "sessA", ts: 1 }, { scope: "p/b", owner: "sessB", ts: 2 }] }); + const out: string[] = []; + const so = vi.spyOn(process.stdout, "write").mockImplementation((s) => (out.push(String(s)), true)); + const se = vi.spyOn(process.stderr, "write").mockImplementation(() => true); + const code = await subscriptions([], { env: env(url) }); + so.mockRestore(); se.mockRestore(); + expect(code).toBe(0); + expect((await received).path).toBe("/w/ws1/subscriptions"); + const printed = out.join(""); + expect(printed).toContain("p/a owned by sessA"); + expect(printed).toContain("p/b owned by sessB"); + }); + + it("--json prints the raw list", async () => { + const { url } = await stub({ subscriptions: [{ scope: "p/a", owner: "sessA", ts: 1 }] }); + const out: string[] = []; + const so = vi.spyOn(process.stdout, "write").mockImplementation((s) => (out.push(String(s)), true)); + const se = vi.spyOn(process.stderr, "write").mockImplementation(() => true); + const code = await subscriptions(["--json"], { env: env(url) }); + so.mockRestore(); se.mockRestore(); + expect(code).toBe(0); + expect(JSON.parse(out.join("")).subscriptions).toHaveLength(1); + }); +}); diff --git a/packages/viewer/src/client/console.ts b/packages/viewer/src/client/console.ts index 6d07039..119dde6 100644 --- a/packages/viewer/src/client/console.ts +++ b/packages/viewer/src/client/console.ts @@ -11,7 +11,7 @@ const wsid = location.pathname.split("/").filter(Boolean)[1] ?? ""; const STORAGE_KEY = "tc-console"; -interface InboxEvent { seq: number; ts: number; kind: "message" | "action"; text?: string; ref?: string; } +interface InboxEvent { seq: number; ts: number; kind: "message" | "action" | "reply"; text?: string; ref?: string; } interface SuggestItem { id: string; label: string; } let enabled = false; @@ -52,7 +52,7 @@ function escTextNode(parent: HTMLElement, cls: string, text: string): HTMLElemen /** Append a console row (message or action) to the log and scroll to it. Returns the row so the * caller can attach a delivery-status line (see deliver()). */ -function appendRow(kind: "message" | "action" | "sent", text: string): HTMLElement { +function appendRow(kind: "message" | "action" | "sent" | "reply", text: string): HTMLElement { if (!logEl) return document.createElement("div"); const empty = logEl.querySelector(".tc-console-empty"); if (empty) empty.remove(); @@ -229,7 +229,8 @@ function drawEvent(e: InboxEvent): void { const s = sig(e.kind, e.kind === "message" ? (e.text ?? "") : (e.ref ?? "")); const idx = pendingSelf.indexOf(s); if (idx >= 0) { pendingSelf.splice(idx, 1); return; } - if (e.kind === "message") appendRow("message", e.text ?? ""); + if (e.kind === "reply") appendRow("reply", `🤖 ${e.text ?? ""}`); // agent → human reply bubble + else if (e.kind === "message") appendRow("message", e.text ?? ""); else appendRow("action", `→ ${labelForRef(e.ref ?? "")}`); } diff --git a/packages/viewer/src/client/style.css b/packages/viewer/src/client/style.css index 6d65b17..594d6ae 100644 --- a/packages/viewer/src/client/style.css +++ b/packages/viewer/src/client/style.css @@ -543,6 +543,9 @@ body.history-open #tc-history { display: flex; } .tc-console-row--sent { align-self: flex-end; background: var(--focus); color: #fff; border-color: transparent; } .tc-console-row--message { align-self: flex-end; background: var(--focus); color: #fff; border-color: transparent; } .tc-console-row--action { align-self: flex-start; color: var(--muted); font-size: 12px; } +/* Agent → human reply: left-aligned (the agent's side), accent-tinted so it reads as a distinct + voice in the thread vs the human's right-aligned blue bubbles. */ +.tc-console-row--reply { align-self: flex-start; background: var(--surface-2); border-color: var(--focus); border-left-width: 3px; } /* Delivery status under a sent message (sending / sent / failed) — the feedback that the message went through and what you're now waiting on. */ .tc-console-status { display: block; margin-top: 3px; font-size: 10.5px; line-height: 1.3; opacity: 0.9; } diff --git a/packages/viewer/src/client/viewer.ts b/packages/viewer/src/client/viewer.ts index f87d4e9..7376bee 100644 --- a/packages/viewer/src/client/viewer.ts +++ b/packages/viewer/src/client/viewer.ts @@ -509,7 +509,7 @@ function connect() { // EXPERIMENTAL console (issue #108): a human → agent inbox echo, and agent → human suggestions. // The console module ignores these unless its panel is open for the matching scope. es.addEventListener("inbox", (e) => { - const d = JSON.parse((e as MessageEvent).data) as { scope: string; event: { seq: number; ts: number; kind: "message" | "action"; text?: string; ref?: string } }; + const d = JSON.parse((e as MessageEvent).data) as { scope: string; event: { seq: number; ts: number; kind: "message" | "action" | "reply"; text?: string; ref?: string } }; onInboxEvent(d.scope, d.event); }); // The agent read the inbox up to `cursor` — flip the human's sent-message receipts to "read". diff --git a/packages/viewer/src/server.ts b/packages/viewer/src/server.ts index 94501c4..e7fb790 100644 --- a/packages/viewer/src/server.ts +++ b/packages/viewer/src/server.ts @@ -2,7 +2,7 @@ import { createServer, type IncomingMessage, type ServerResponse } from "node:ht import { readFileSync } from "node:fs"; import { gzipSync, brotliCompressSync, constants as zlibConstants } from "node:zlib"; import { join, normalize, sep } from "node:path"; -import { Store, StatusStore, InboxStore, type Payload, type StatusUpdate, type PersistenceBackend, type SuggestItem } from "./state.js"; +import { Store, StatusStore, InboxStore, SubscriptionStore, type Payload, type StatusUpdate, type PersistenceBackend, type SuggestItem } from "./state.js"; import { seedDemo, isReadOnlyWsid } from "./seed.js"; import { SSEHub, type SSEMessage } from "./sse.js"; import { ShareStore, SHARES_WSID } from "./share-store.js"; @@ -23,6 +23,7 @@ export interface ServerOpts { store?: Store; statusStore?: StatusStore; inboxStore?: InboxStore; + subscriptionStore?: SubscriptionStore; hub?: SSEHub; persistence?: PersistenceBackend; // optional durable backing store; unset ⇒ in-memory only } @@ -34,6 +35,7 @@ const MAX_INBOX_REF = 200; // an action's chip id const MAX_SUGGEST_ITEMS = 12; // chips an agent can push at once const MAX_SUGGEST_LABEL = 120; // a chip's label const MAX_INBOX_WAIT = 25000; // long-poll cap (ms) +const MAX_SESSION = 200; // a subscription session id; longer ⇒ 400 const CONTENT_TYPES: Record = { ".html": "text/html; charset=utf-8", @@ -156,6 +158,7 @@ export function createViewerServer(opts: ServerOpts) { seedDemo(store); // populate the read-only /w/demo/ showcase with full-scope fixtures const statusStore = opts.statusStore ?? new StatusStore(); const inboxStore = opts.inboxStore ?? new InboxStore(); + const subscriptionStore = opts.subscriptionStore ?? new SubscriptionStore(); const hub = opts.hub ?? new SSEHub(); const shareStore = new ShareStore(store); const templateStore = new TemplateStore(store); @@ -414,6 +417,7 @@ export function createViewerServer(opts: ServerOpts) { store.clear(wsid); statusStore.clearAll(wsid); inboxStore.clearAll(wsid); // console conversation is scoped to the views; clearing all drops it too + subscriptionStore.clearAll(wsid); // and any board subscriptions for those views historyStore.clear(wsid); // clear-all is a deliberate nuke — drop the action history too await persistRemove(wsid); // clear-all deletes the object outright; no hydrate needed await persistHistoryRemove(wsid); @@ -426,6 +430,7 @@ export function createViewerServer(opts: ServerOpts) { const existed = store.delete(wsid, scope); statusStore.clearScope(wsid, scope); inboxStore.clearScope(wsid, scope); + subscriptionStore.clearScope(wsid, scope); await persistSave(wsid); // Keep the board's history (record a `clear` tombstone) so a cleared/replaced board can still // be rewound + restored — that's the whole point of this feature. @@ -648,6 +653,58 @@ export function createViewerServer(opts: ServerOpts) { return send(res, 204); } + // EXPERIMENTAL agent↔human console — agent → human REPLY (issue #108). Token-gated (the agent is + // posting), like suggest/push. Appends a `reply` event to the SAME per-scope ring as human + // messages so the console renders one chronological thread, and broadcasts an `inbox` event so + // every open console shows it live ("so I can see what it's doing"). Replies never count toward + // the agent's own unread (see InboxStore.unread), so this can't nudge the poster. + if (req.method === "POST" && action === "reply") { + if (!authed(req)) return send(res, 401, "unauthorized"); + const body = await readJson(req).catch(() => null); + const b = (typeof body === "object" && body ? body : {}) as Record; + if (typeof b.project !== "string" || typeof b.agent !== "string" || !b.project || !b.agent) + return send(res, 400, "reply needs { project, agent, text }"); + const text = typeof b.text === "string" ? b.text : ""; + if (!text) return send(res, 400, "reply needs non-empty text"); + if (text.length > MAX_INBOX_TEXT) return send(res, 400, `reply text exceeds ${MAX_INBOX_TEXT} chars`); + const scope = `${b.project}/${b.agent}`; + const event = inboxStore.append(wsid, scope, { kind: "reply", text }); + fanout(wsid, { event: "inbox", data: { scope, event } }); + return send(res, 200, JSON.stringify(event), "application/json"); + } + + // EXPERIMENTAL board subscriptions (issue #108) — track one owner per scope so two watching + // sessions can coordinate and not both act on the same board. Token-gated (an agent + // claims/releases). TAKEOVER model: a fresh subscribe always wins and evicts the prior owner; + // the response carries `tookOverFrom`. ADVISORY only — the server does NOT gate writes on + // ownership; non-overlap relies on each session honoring it (see SubscriptionStore). The + // `subscribe` SSE broadcast is a forward hook for a live "evicted" listener; nothing consumes it yet. + if (req.method === "POST" && (action === "subscribe" || action === "unsubscribe")) { + if (!authed(req)) return send(res, 401, "unauthorized"); + const body = await readJson(req).catch(() => null); + const b = (typeof body === "object" && body ? body : {}) as Record; + if (typeof b.project !== "string" || typeof b.agent !== "string" || !b.project || !b.agent || typeof b.session !== "string" || !b.session) + return send(res, 400, `${action} needs { project, agent, session }`); + if (b.session.length > MAX_SESSION) return send(res, 400, `session exceeds ${MAX_SESSION} chars`); + const scope = `${b.project}/${b.agent}`; + if (action === "subscribe") { + const { sub, previousOwner } = subscriptionStore.subscribe(wsid, scope, b.session); + fanout(wsid, { event: "subscribe", data: { scope, owner: sub.owner, previousOwner } }); + return send(res, 200, JSON.stringify({ scope, owner: sub.owner, tookOverFrom: previousOwner }), "application/json"); + } + const released = subscriptionStore.unsubscribe(wsid, scope, b.session); + if (released) fanout(wsid, { event: "subscribe", data: { scope, owner: null, previousOwner: b.session } }); + // 204 even if not the owner (idempotent); signal "wasn't yours" via a header. + if (!released) res.setHeader("x-termchart-not-owner", "1"); + return send(res, 204); + } + + // Current board subscriptions for the workspace (open read, like list/state) — so a session can + // discover which boards are already owned before claiming one. + if (req.method === "GET" && action === "subscriptions") { + return send(res, 200, JSON.stringify({ subscriptions: subscriptionStore.list(wsid) }), "application/json"); + } + if (req.method === "POST" && (action === "push" || action === "focus")) { if (!authed(req)) return send(res, 401, "unauthorized"); const body = await readJson(req).catch(() => null); @@ -871,5 +928,5 @@ export function createViewerServer(opts: ServerOpts) { const server = createServer((req, res) => { handle(req, res).catch((e) => send(res, 500, String(e))); }); - return { server, store, statusStore, inboxStore, hub, shareStore, shareHub, historyStore }; + return { server, store, statusStore, inboxStore, subscriptionStore, hub, shareStore, shareHub, historyStore }; } diff --git a/packages/viewer/src/state.ts b/packages/viewer/src/state.ts index 730ff8c..2e8fb5b 100644 --- a/packages/viewer/src/state.ts +++ b/packages/viewer/src/state.ts @@ -134,15 +134,16 @@ export class StatusStore { } /** - * A single human → agent console event. Transient conversation, NOT a stored view: a `message` - * the human typed, or an `action` (a click on an agent-pushed suggestion chip, carrying the - * chip's `ref`). Each event gets a monotonic `seq` so a polling agent can read incrementally - * with a `since` cursor. + * A single console event. Mostly human → agent — a `message` the human typed, or an `action` (a + * click on an agent-pushed suggestion chip, carrying the chip's `ref`) — but also the agent → human + * `reply` (a message the agent posts back so the human sees what it's doing). All three share the + * per-scope ring + monotonic `seq` so the console renders one chronological thread; only the human + * kinds (message/action) count toward `unread` (the agent must never nudge itself with its own reply). */ export interface InboxEvent { seq: number; ts: number; - kind: "message" | "action"; + kind: "message" | "action" | "reply"; text?: string; ref?: string; } @@ -209,16 +210,18 @@ export class InboxStore { return { events: s.events.filter((e) => e.seq > since), cursor: s.seq, acked: s.acked }; } - /** How many events have arrived since the agent last read this scope's inbox (`seq − acked`). */ + /** Unread HUMAN events (message/action) since the agent last read this scope. Agent `reply`s are + * excluded — the agent's own replies must never nudge it back to "read the inbox". */ unread(wsid: string, scope: string): number { const s = this.ws.get(wsid)?.get(scope); - return s ? Math.max(0, s.seq - s.acked) : 0; + if (!s) return 0; + return s.events.filter((e) => e.seq > s.acked && e.kind !== "reply").length; } /** - * Every scope in this workspace that has unread events, with its unread events (those with - * `seq > acked`), unread count, cursor, and ack. PEEK semantics — does NOT advance any ack — so - * an agent can watch the WHOLE workspace with one call (GET /inbox?all=1) instead of polling each + * Every scope in this workspace with unread HUMAN events, each carrying those events (`seq > acked`, + * `reply`s excluded), its count, cursor, and ack. PEEK semantics — does NOT advance any ack — so an + * agent can watch the WHOLE workspace with one call (GET /inbox?all=1) instead of polling each * scope, then do a normal per-scope read to consume + act. Scopes with nothing unread are omitted. */ unreadByScope(wsid: string): { scope: string; unread: number; cursor: number; acked: number; events: InboxEvent[] }[] { @@ -226,8 +229,8 @@ export class InboxStore { if (!scopes) return []; const out: { scope: string; unread: number; cursor: number; acked: number; events: InboxEvent[] }[] = []; for (const [scope, s] of scopes) { - const unread = Math.max(0, s.seq - s.acked); - if (unread > 0) out.push({ scope, unread, cursor: s.seq, acked: s.acked, events: s.events.filter((e) => e.seq > s.acked) }); + const events = s.events.filter((e) => e.seq > s.acked && e.kind !== "reply"); + if (events.length > 0) out.push({ scope, unread: events.length, cursor: s.seq, acked: s.acked, events }); } return out; } @@ -283,6 +286,76 @@ export class InboxStore { } } +/** One board subscription: which session owns the right to act on a scope, and when it claimed it. */ +export interface Subscription { + scope: string; + owner: string; // the subscribing session's id + ts: number; +} + +/** + * EXPERIMENTAL board subscriptions (issue #108). Tracks at most ONE "owner" session per scope so two + * watching sessions can coordinate and not both act on the same board's messages. **Takeover model**: + * a fresh `subscribe` always wins and evicts the previous owner. Release is explicit (`unsubscribe`); + * takeover is what reclaims a crashed session's board. Transient + in-memory, like the console inbox — + * never persisted. + * + * ADVISORY, not enforced: ownership is bookkeeping only — the server does NOT gate writes (reply / + * inbox consume / suggest) on it. Non-overlap holds only if each session honors ownership: a session + * checks `subscriptions` (or the `tookOverFrom` it got back) and stands down when it no longer owns + * the board. A `subscribe` SSE event is broadcast on every claim/release as a hook for a future live + * "you've been evicted" listener; nothing consumes it yet. + */ +export class SubscriptionStore { + // wsid -> scope("project/agent") -> subscription + private ws = new Map>(); + + /** Claim a scope for `session`, evicting any prior owner (takeover). Returns the new subscription + * plus `previousOwner` if a *different* session was evicted (null on a fresh claim or self-renew). */ + subscribe(wsid: string, scope: string, session: string): { sub: Subscription; previousOwner: string | null } { + let scopes = this.ws.get(wsid); + if (!scopes) { + scopes = new Map(); + this.ws.set(wsid, scopes); + } + const prior = scopes.get(scope); + const previousOwner = prior && prior.owner !== session ? prior.owner : null; + const sub: Subscription = { scope, owner: session, ts: Date.now() }; + scopes.set(scope, sub); + return { sub, previousOwner }; + } + + /** Release a scope, but only if `session` is the current owner (so a stale session can't drop a + * board that already got taken over). Returns true if it actually released. */ + unsubscribe(wsid: string, scope: string, session: string): boolean { + const scopes = this.ws.get(wsid); + const cur = scopes?.get(scope); + if (!cur || cur.owner !== session) return false; + scopes!.delete(scope); + return true; + } + + /** The session currently owning a scope, or null. */ + owner(wsid: string, scope: string): string | null { + return this.ws.get(wsid)?.get(scope)?.owner ?? null; + } + + /** All current subscriptions in a workspace (for discovery — which boards are already owned). */ + list(wsid: string): Subscription[] { + return [...(this.ws.get(wsid)?.values() ?? [])]; + } + + /** Drop the subscription for one scope. */ + clearScope(wsid: string, scope: string): void { + this.ws.get(wsid)?.delete(scope); + } + + /** Drop all subscriptions for a workspace. */ + clearAll(wsid: string): void { + this.ws.delete(wsid); + } +} + /** In-memory latest-per-scope store, keyed by workspace id then `project/agent`. */ export class Store { private ws = new Map>(); diff --git a/packages/viewer/test/server.test.ts b/packages/viewer/test/server.test.ts index 4eaa7d8..dc43d5c 100644 --- a/packages/viewer/test/server.test.ts +++ b/packages/viewer/test/server.test.ts @@ -80,6 +80,14 @@ const suggestReq = (body: object, token = TOKEN) => body: JSON.stringify(body), }); +// reply / subscribe / unsubscribe are token-gated (the agent posts/claims). Default to valid token. +const postAuthed = (action: string, body: object, token = TOKEN) => + fetch(`${base}/w/ws1/${action}`, { + method: "POST", + headers: { "content-type": "application/json", authorization: `Bearer ${token}` }, + body: JSON.stringify(body), + }); + // A component view with an editable Checklist plus a non-checklist node (an injection-bait id). const checklistTree = JSON.stringify({ type: "Stack", @@ -760,6 +768,101 @@ describe("viewer server", () => { expect((got.body as { events: { kind: string; ref: string }[] }).events[0]).toMatchObject({ kind: "action", ref: "rerun" }); }); }); + + describe("console: reply (agent → human, token-gated)", () => { + it("appends a reply into the same thread (200 + event), readable back, and does NOT nudge the agent", async () => { + await inboxReq({ project: "p", agent: "a", kind: "message", text: "human asks" }); + const r = await postAuthed("reply", { project: "p", agent: "a", text: "on it — investigating" }); + expect(r.status).toBe(200); + expect(await r.json()).toMatchObject({ kind: "reply", text: "on it — investigating" }); + + // Both events live in one chronological thread. + const got = await getJson("inbox?project=p&agent=a&peek=1"); + const kinds = (got.body as { events: { kind: string }[] }).events.map((e) => e.kind); + expect(kinds).toEqual(["message", "reply"]); + + // The reply must not count as unread for the agent → a push reports only the 1 human message. + const p = await push({ project: "p", agent: "a", type: "mermaid", content: "graph LR\n A-->B" }); + expect(p.headers.get("x-termchart-inbox-unread")).toBe("1"); + }); + + it("a reply is excluded from the ?all=1 aggregate (only human messages surface)", async () => { + await postAuthed("reply", { project: "p", agent: "solo", text: "fyi" }); // reply with no human msg + const got = await getJson("inbox?all=1"); + expect((got.body as { scopes: unknown[] }).scopes).toEqual([]); // nothing unread to handle + }); + + it("rejects without a token (401) and 400s an empty reply", async () => { + expect((await postAuthed("reply", { project: "p", agent: "a", text: "x" }, "wrong")).status).toBe(401); + expect((await postAuthed("reply", { project: "p", agent: "a", text: "" })).status).toBe(400); + expect((await postAuthed("reply", { project: "p", agent: "a" })).status).toBe(400); + }); + + it("broadcasts an `inbox` SSE event so open consoles show the reply live", async () => { + const ctrl = new AbortController(); + const res = await fetch(`${base}/w/ws1/events`, { signal: ctrl.signal }); + const reader = res.body!.getReader(); + const dec = new TextDecoder(); + let buf = ""; + while (!buf.includes("event: snapshot")) buf += dec.decode((await reader.read()).value); + await postAuthed("reply", { project: "p", agent: "a", text: "live reply" }); + let live = ""; + while (!live.includes("event: inbox")) live += dec.decode((await reader.read()).value); + expect(live).toContain("live reply"); + expect(live).toContain('"kind":"reply"'); + ctrl.abort(); + }); + }); + + describe("board subscriptions (takeover, token-gated)", () => { + it("subscribe claims a board; a second session takes over and is told who it evicted", async () => { + const r1 = await postAuthed("subscribe", { project: "p", agent: "a", session: "sessA" }); + expect(r1.status).toBe(200); + expect(await r1.json()).toMatchObject({ scope: "p/a", owner: "sessA", tookOverFrom: null }); + + const r2 = await postAuthed("subscribe", { project: "p", agent: "a", session: "sessB" }); + expect(await r2.json()).toMatchObject({ scope: "p/a", owner: "sessB", tookOverFrom: "sessA" }); + + const list = await getJson("subscriptions"); + expect((list.body as { subscriptions: { scope: string; owner: string }[] }).subscriptions) + .toEqual([{ scope: "p/a", owner: "sessB", ts: expect.any(Number) }]); + }); + + it("unsubscribe only releases for the owner (header flags a non-owner no-op)", async () => { + await postAuthed("subscribe", { project: "p", agent: "a", session: "sessA" }); + const notOwner = await postAuthed("unsubscribe", { project: "p", agent: "a", session: "sessB" }); + expect(notOwner.status).toBe(204); + expect(notOwner.headers.get("x-termchart-not-owner")).toBe("1"); + expect((await getJson("subscriptions") as { body: { subscriptions: unknown[] } }).body.subscriptions).toHaveLength(1); + + const owner = await postAuthed("unsubscribe", { project: "p", agent: "a", session: "sessA" }); + expect(owner.status).toBe(204); + expect(owner.headers.get("x-termchart-not-owner")).toBeNull(); + expect((await getJson("subscriptions") as { body: { subscriptions: unknown[] } }).body.subscriptions).toEqual([]); + }); + + it("rejects subscribe without a token (401), a missing session (400), and an oversized session (400)", async () => { + expect((await postAuthed("subscribe", { project: "p", agent: "a", session: "s" }, "wrong")).status).toBe(401); + expect((await postAuthed("subscribe", { project: "p", agent: "a" })).status).toBe(400); + expect((await postAuthed("subscribe", { project: "p", agent: "a", session: "x".repeat(201) })).status).toBe(400); // session cap + }); + + it("broadcasts a `subscribe` SSE event (with the evicted owner) so a session can stand down", async () => { + await postAuthed("subscribe", { project: "p", agent: "a", session: "sessA" }); + const ctrl = new AbortController(); + const res = await fetch(`${base}/w/ws1/events`, { signal: ctrl.signal }); + const reader = res.body!.getReader(); + const dec = new TextDecoder(); + let buf = ""; + while (!buf.includes("event: snapshot")) buf += dec.decode((await reader.read()).value); + await postAuthed("subscribe", { project: "p", agent: "a", session: "sessB" }); + let live = ""; + while (!live.includes("event: subscribe")) live += dec.decode((await reader.read()).value); + expect(live).toContain('"owner":"sessB"'); + expect(live).toContain('"previousOwner":"sessA"'); + ctrl.abort(); + }); + }); }); describe("static asset caching", () => { diff --git a/packages/viewer/test/state.test.ts b/packages/viewer/test/state.test.ts index 63bc457..a75e820 100644 --- a/packages/viewer/test/state.test.ts +++ b/packages/viewer/test/state.test.ts @@ -1,5 +1,5 @@ import { describe, expect, it } from "vitest"; -import { Store, StatusStore, InboxStore, scopeKey, sortBoards } from "../src/state.js"; +import { Store, StatusStore, InboxStore, SubscriptionStore, scopeKey, sortBoards } from "../src/state.js"; describe("Store", () => { it("keys scopes by project/agent", () => { @@ -226,4 +226,63 @@ describe("InboxStore", () => { s.clearAll("w1"); expect(s.read("w1", "p/b").events).toEqual([]); }); + + it("agent `reply` events never count toward the agent's unread (nor the aggregate)", () => { + const s = new InboxStore(); + s.append("w1", "p/a", { kind: "message", text: "human" }); // seq 1 — unread + s.append("w1", "p/a", { kind: "reply", text: "agent reply" }); // seq 2 — must NOT count + expect(s.unread("w1", "p/a")).toBe(1); + const agg = s.unreadByScope("w1"); + expect(agg).toHaveLength(1); + expect(agg[0].unread).toBe(1); + expect(agg[0].events.map((e) => e.kind)).toEqual(["message"]); // reply excluded from the payload + // After the human msg is read, a further reply still doesn't resurface the scope as unread. + s.markReadTo("w1", "p/a", 1); + s.append("w1", "p/a", { kind: "reply", text: "another reply" }); // seq 3 + expect(s.unread("w1", "p/a")).toBe(0); + expect(s.unreadByScope("w1")).toEqual([]); + }); +}); + +describe("SubscriptionStore", () => { + it("subscribe claims a scope; a fresh subscribe by another session takes over (reports previous owner)", () => { + const s = new SubscriptionStore(); + const first = s.subscribe("w1", "p/a", "sessA"); + expect(first).toMatchObject({ sub: { scope: "p/a", owner: "sessA" }, previousOwner: null }); + expect(s.owner("w1", "p/a")).toBe("sessA"); + + const second = s.subscribe("w1", "p/a", "sessB"); // takeover + expect(second.previousOwner).toBe("sessA"); + expect(second.sub.owner).toBe("sessB"); + expect(s.owner("w1", "p/a")).toBe("sessB"); + }); + + it("re-subscribe by the SAME session is a renew, not a takeover (previousOwner null)", () => { + const s = new SubscriptionStore(); + s.subscribe("w1", "p/a", "sessA"); + expect(s.subscribe("w1", "p/a", "sessA").previousOwner).toBeNull(); + }); + + it("unsubscribe only releases if you still own it", () => { + const s = new SubscriptionStore(); + s.subscribe("w1", "p/a", "sessA"); + expect(s.unsubscribe("w1", "p/a", "sessB")).toBe(false); // not the owner → no-op + expect(s.owner("w1", "p/a")).toBe("sessA"); + expect(s.unsubscribe("w1", "p/a", "sessA")).toBe(true); + expect(s.owner("w1", "p/a")).toBeNull(); + }); + + it("list reports owned boards; clearScope/clearAll drop them; workspaces isolated", () => { + const s = new SubscriptionStore(); + s.subscribe("w1", "p/a", "sessA"); + s.subscribe("w1", "p/b", "sessB"); + s.subscribe("w2", "p/a", "sessC"); // other workspace + expect(s.list("w1").map((x) => `${x.scope}:${x.owner}`).sort()).toEqual(["p/a:sessA", "p/b:sessB"]); + s.clearScope("w1", "p/a"); + expect(s.owner("w1", "p/a")).toBeNull(); + expect(s.owner("w1", "p/b")).toBe("sessB"); + s.clearAll("w1"); + expect(s.list("w1")).toEqual([]); + expect(s.owner("w2", "p/a")).toBe("sessC"); // untouched + }); });