diff --git a/container/agent-runner/src/formatter.test.ts b/container/agent-runner/src/formatter.test.ts index 9121366f8a2..9fecc730f63 100644 --- a/container/agent-runner/src/formatter.test.ts +++ b/container/agent-runner/src/formatter.test.ts @@ -165,6 +165,115 @@ describe('XML escaping', () => { }); }); +describe('attachments rendering', () => { + it('renders a plain attachment with localPath', () => { + insertMessage('m1', 'chat', { + sender: 'Alice', + text: 'here is a doc', + attachments: [{ type: 'document', name: 'spec.pdf', localPath: 'inbox/m1/spec.pdf' }], + }); + const result = formatMessages(getPendingMessages()); + expect(result).toContain('[document: spec.pdf — saved to /workspace/inbox/m1/spec.pdf]'); + }); + + it('renders inline transcription when a voice attachment has it', () => { + insertMessage('m1', 'chat', { + sender: 'Alice', + text: '', + attachments: [ + { + type: 'voice', + name: 'voice.ogg', + localPath: 'inbox/m1/voice.ogg', + mimeType: 'audio/ogg', + transcription: 'Hello, can you check the deploy?', + }, + ], + }); + const result = formatMessages(getPendingMessages()); + expect(result).toContain('[voice: voice.ogg'); + expect(result).toContain('Transcription: Hello, can you check the deploy?'); + }); + + it('renders transcription error message when whisper failed', () => { + insertMessage('m1', 'chat', { + sender: 'Alice', + text: '', + attachments: [ + { + type: 'voice', + name: 'voice.ogg', + mimeType: 'audio/ogg', + localPath: 'inbox/m1/voice.ogg', + transcriptionError: 'OPENAI_API_KEY not set', + }, + ], + }); + const result = formatMessages(getPendingMessages()); + expect(result).toContain('Transcription failed: OPENAI_API_KEY not set'); + }); + + it('renders extracted PDF text inside a CDATA-wrapped block', () => { + insertMessage('m1', 'chat', { + sender: 'Alice', + text: 'see the PDF', + attachments: [ + { + type: 'document', + name: 'spec.pdf', + mimeType: 'application/pdf', + localPath: 'inbox/m1/spec.pdf', + extractedText: 
'Chapter 1\nIntroduction\n\nThis document describes the system.', + }, + ], + }); + const result = formatMessages(getPendingMessages()); + expect(result).toContain('[pdf: spec.pdf'); + expect(result).toContain(''); + }); + + it('escapes embedded "]]>" sequences in PDF text so CDATA stays well-formed', () => { + insertMessage('m1', 'chat', { + sender: 'Alice', + text: '', + attachments: [ + { + type: 'document', + name: 'evil.pdf', + mimeType: 'application/pdf', + localPath: 'inbox/m1/evil.pdf', + extractedText: 'before ]]> after', + }, + ], + }); + const result = formatMessages(getPendingMessages()); + // The literal "]]>" inside the CDATA body must be neutralised; the + // closing CDATA terminator at the end of the block is still present. + expect(result).toContain('before ]]> after'); + expect(result).toContain(']]>'); + }); + + it('renders PDF extraction error inline when extraction failed', () => { + insertMessage('m1', 'chat', { + sender: 'Alice', + text: '', + attachments: [ + { + type: 'document', + name: 'spec.pdf', + mimeType: 'application/pdf', + localPath: 'inbox/m1/spec.pdf', + pdfExtractionError: 'pdftotext not installed', + }, + ], + }); + const result = formatMessages(getPendingMessages()); + expect(result).toContain('PDF extraction failed: pdftotext not installed'); + }); +}); + describe('stripInternalTags', () => { it('strips single-line internal tags and trims', () => { expect(stripInternalTags('hello secret world')).toBe('hello world'); diff --git a/container/agent-runner/src/formatter.ts b/container/agent-runner/src/formatter.ts index 590875c9e3f..e46adace499 100644 --- a/container/agent-runner/src/formatter.ts +++ b/container/agent-runner/src/formatter.ts @@ -247,6 +247,36 @@ function formatAttachments(attachments: any[] | undefined): string { const type = a.type || 'file'; const localPath = a.localPath ? 
`/workspace/${a.localPath}` : ''; const url = a.url || ''; + + // Voice attachments: prefer the host-preprocessed Whisper transcription + // when present. Renders inline so the agent reads the text directly + // rather than having to Read+decode the audio file (no whisper in the + // container). On transcription failure, `a.transcriptionError` carries + // the reason so the agent can surface it instead of staying silent. + if (typeof a.transcription === 'string' && a.transcription.length > 0) { + const head = `[voice: ${escapeXml(name)}` + (localPath ? ` — saved to ${escapeXml(localPath)}]` : `]`); + return `${head}\nTranscription: ${escapeXml(a.transcription)}`; + } + if (typeof a.transcriptionError === 'string' && a.transcriptionError.length > 0) { + const head = `[voice: ${escapeXml(name)}` + (localPath ? ` — saved to ${escapeXml(localPath)}]` : `]`); + return `${head}\nTranscription failed: ${escapeXml(a.transcriptionError)}`; + } + + // PDF attachments: prefer the host-preprocessed extracted text when + // present (host runs `pdftotext` on the spilled file). Keeps the path + // available too so the agent can fetch raw bytes if it needs to. + if (typeof a.extractedText === 'string' && a.extractedText.length > 0) { + const head = `[pdf: ${escapeXml(name)}` + (localPath ? ` — saved to ${escapeXml(localPath)}]` : `]`); + // Don't escapeXml the body — agents read it as the literal text; + // host already constrained it to UTF-8 text from pdftotext. + const body = a.extractedText.replace(/]]>/g, ']]>'); + return `${head}\n`; + } + if (typeof a.pdfExtractionError === 'string' && a.pdfExtractionError.length > 0) { + const head = `[pdf: ${escapeXml(name)}` + (localPath ? 
` — saved to ${escapeXml(localPath)}]` : `]`); + return `${head}\nPDF extraction failed: ${escapeXml(a.pdfExtractionError)}`; + } + if (localPath) { return `[${type}: ${escapeXml(name)} — saved to ${escapeXml(localPath)}]`; } diff --git a/container/agent-runner/src/mcp-tools/core.test.ts b/container/agent-runner/src/mcp-tools/core.test.ts index 4cef950fb9f..f197702e2b7 100644 --- a/container/agent-runner/src/mcp-tools/core.test.ts +++ b/container/agent-runner/src/mcp-tools/core.test.ts @@ -10,7 +10,7 @@ import { describe, it, expect, beforeEach, afterEach } from 'bun:test'; import { initTestSessionDb, closeSessionDb, getInboundDb } from '../db/connection.js'; import { getUndeliveredMessages } from '../db/messages-out.js'; import { setCurrentInReplyTo, clearCurrentInReplyTo } from '../current-batch.js'; -import { sendMessage } from './core.js'; +import { queryReactions, sendMessage } from './core.js'; beforeEach(() => { initTestSessionDb(); @@ -48,3 +48,119 @@ describe('send_message MCP tool — in_reply_to plumbing', () => { expect(out[0].in_reply_to).toBeNull(); }); }); + +function insertReactionRow( + id: string, + timestamp: string, + reaction: { emoji: string; rawEmoji: string; added: boolean; targetMessageId: string; userId: string }, + sender: string = 'John', +): void { + const content = JSON.stringify({ + text: `[${sender} reacted ${reaction.emoji} on message ${reaction.targetMessageId}]`, + sender, + senderId: reaction.userId, + reaction, + }); + getInboundDb() + .prepare( + `INSERT INTO messages_in (id, kind, timestamp, status, content) + VALUES (?, 'chat-sdk', ?, 'pending', ?)`, + ) + .run(id, timestamp, content); +} + +describe('query_reactions MCP tool', () => { + it('returns "no reactions" when the session has none', async () => { + const result = await queryReactions.handler({}); + expect(result.content[0].type).toBe('text'); + expect(result.content[0].text).toContain('No reactions'); + }); + + it('lists all reactions in the session when no filter is 
given', async () => { + insertReactionRow('rxn-1', '2026-05-22T10:00:00Z', { + emoji: '👍', + rawEmoji: '+1', + added: true, + targetMessageId: 'ts-1', + userId: 'U1', + }); + insertReactionRow('rxn-2', '2026-05-22T11:00:00Z', { + emoji: '❤️', + rawEmoji: 'heart', + added: true, + targetMessageId: 'ts-2', + userId: 'U2', + }); + const result = await queryReactions.handler({}); + const text = result.content[0].text as string; + expect(text).toContain('"count": 2'); + expect(text).toContain('"emoji": "👍"'); + expect(text).toContain('"emoji": "❤️"'); + }); + + it('filters by target_message_id', async () => { + insertReactionRow('rxn-1', '2026-05-22T10:00:00Z', { + emoji: '👍', + rawEmoji: '+1', + added: true, + targetMessageId: 'ts-A', + userId: 'U1', + }); + insertReactionRow('rxn-2', '2026-05-22T11:00:00Z', { + emoji: '❤️', + rawEmoji: 'heart', + added: true, + targetMessageId: 'ts-B', + userId: 'U2', + }); + const result = await queryReactions.handler({ target_message_id: 'ts-A' }); + const text = result.content[0].text as string; + expect(text).toContain('"count": 1'); + expect(text).toContain('ts-A'); + expect(text).not.toContain('ts-B'); + }); + + it('preserves added=false (reaction removals)', async () => { + insertReactionRow('rxn-1', '2026-05-22T10:00:00Z', { + emoji: '👍', + rawEmoji: '+1', + added: false, + targetMessageId: 'ts-1', + userId: 'U1', + }); + const result = await queryReactions.handler({}); + const text = result.content[0].text as string; + expect(text).toContain('"added": false'); + }); + + it('honors limit (orders newest-first)', async () => { + for (let i = 0; i < 5; i++) { + const ts = `2026-05-22T10:0${i}:00Z`; + insertReactionRow(`rxn-${i}`, ts, { + emoji: '⭐', + rawEmoji: 'star', + added: true, + targetMessageId: `ts-${i}`, + userId: 'U1', + }); + } + const result = await queryReactions.handler({ limit: 2 }); + const text = result.content[0].text as string; + expect(text).toContain('"count": 2'); + // newest first: ts-4 and ts-3 + 
expect(text).toContain('ts-4'); + expect(text).toContain('ts-3'); + expect(text).not.toContain('ts-0'); + }); + + it('ignores non-reaction chat-sdk rows', async () => { + getInboundDb() + .prepare( + `INSERT INTO messages_in (id, kind, timestamp, status, content) + VALUES (?, 'chat-sdk', ?, 'pending', ?)`, + ) + .run('plain-1', '2026-05-22T09:00:00Z', JSON.stringify({ text: 'hi', sender: 'Alice' })); + const result = await queryReactions.handler({}); + expect(result.content[0].text).toContain('No reactions'); + }); +}); diff --git a/container/agent-runner/src/mcp-tools/core.ts b/container/agent-runner/src/mcp-tools/core.ts index 48f87d596e3..a6d8b5c30eb 100644 --- a/container/agent-runner/src/mcp-tools/core.ts +++ b/container/agent-runner/src/mcp-tools/core.ts @@ -11,6 +11,7 @@ import path from 'path'; import { getCurrentInReplyTo } from '../current-batch.js'; import { findByName, getAllDestinations } from '../destinations.js'; +import { getInboundDb } from '../db/connection.js'; import { getMessageIdBySeq, getRoutingBySeq, writeMessageOut } from '../db/messages-out.js'; import { getSessionRouting } from '../db/session-routing.js'; import { registerTools } from './server.js'; @@ -260,4 +261,109 @@ export const addReaction: McpToolDefinition = { }, }; -registerTools([sendMessage, sendFile, editMessage, addReaction]); +/** + * Read reactions out of the session's inbound DB. Reactions arrive as + * chat-sdk rows synthesised by `chat-sdk-bridge.buildReactionInbound`; we + * filter by inspecting `content.reaction.*`. The query is per-session by + * construction (only reactions on messages in this session's thread are + * routed here by the host), so there's no cross-session leakage. + * + * Optional filter: `target_message_id` returns only reactions on that + * specific platform message id (e.g. the agent's own reply ts on Slack). + * Without it, returns the most recent N reactions across the session. 
+ * + * Each row's `added` field is preserved verbatim so callers can recover + * the live emoji set (an add followed by a remove for the same emoji is + * a net-zero). Callers requiring "currently set" semantics should fold + * the list themselves; the SDK can't infer it without re-fetching state + * from the platform. + */ +export const queryReactions: McpToolDefinition = { + tool: { + name: 'query_reactions', + description: + 'List reactions the bridge has captured in this session. Optionally filter by `target_message_id` to inspect a single message.', + inputSchema: { + type: 'object' as const, + properties: { + target_message_id: { + type: 'string', + description: 'Platform message id (e.g. a Slack `ts`) to filter on. Omit to list across the session.', + }, + limit: { + type: 'integer', + description: 'Max rows to return (default 50, max 500).', + }, + }, + }, + }, + async handler(args) { + const targetMessageId = typeof args.target_message_id === 'string' ? args.target_message_id : undefined; + const limitArg = typeof args.limit === 'number' ? args.limit : 50; + const limit = Math.max(1, Math.min(500, Math.floor(limitArg))); + + let rows: Array<{ id: string; timestamp: string; content: string }>; + try { + const db = getInboundDb(); + const sql = `SELECT id, timestamp, content + FROM messages_in + WHERE kind = 'chat-sdk' + AND content LIKE '%"reaction":%' + ORDER BY timestamp DESC + LIMIT ?`; + rows = db.prepare(sql).all(limit * 4) as typeof rows; + } catch (e) { + return err(`Failed to read inbound DB: ${e instanceof Error ? 
e.message : String(e)}`); + } + + interface ReactionEntry { + id: string; + timestamp: string; + emoji: string; + rawEmoji: string; + added: boolean; + targetMessageId: string; + userId: string; + userName: string; + } + const entries: ReactionEntry[] = []; + + for (const r of rows) { + let parsed: Record; + try { + parsed = JSON.parse(r.content) as Record; + } catch { + continue; + } + const reaction = parsed.reaction as + | { emoji?: string; rawEmoji?: string; added?: boolean; targetMessageId?: string; userId?: string } + | undefined; + if (!reaction || typeof reaction !== 'object') continue; + if (typeof reaction.targetMessageId !== 'string') continue; + if (targetMessageId && reaction.targetMessageId !== targetMessageId) continue; + + entries.push({ + id: r.id, + timestamp: r.timestamp, + emoji: String(reaction.emoji ?? ''), + rawEmoji: String(reaction.rawEmoji ?? reaction.emoji ?? ''), + added: reaction.added !== false, + targetMessageId: reaction.targetMessageId, + userId: String(reaction.userId ?? ''), + userName: typeof parsed.sender === 'string' ? parsed.sender : String(reaction.userId ?? ''), + }); + if (entries.length >= limit) break; + } + + if (entries.length === 0) { + return ok( + targetMessageId + ? `No reactions recorded for message ${targetMessageId} in this session.` + : 'No reactions recorded in this session.', + ); + } + return ok(JSON.stringify({ count: entries.length, reactions: entries }, null, 2)); + }, +}; + +registerTools([sendMessage, sendFile, editMessage, addReaction, queryReactions]); diff --git a/container/agent-runner/src/multimodal.test.ts b/container/agent-runner/src/multimodal.test.ts new file mode 100644 index 00000000000..14e0a7768db --- /dev/null +++ b/container/agent-runner/src/multimodal.test.ts @@ -0,0 +1,196 @@ +/** + * Tests for image attachment → content block extraction. 
+ * + * Uses `setWorkspaceRootForTests` to redirect multimodal.ts's filesystem + * reads at a temp directory under os.tmpdir(), so the test works without + * `/workspace` being writable on the host. + */ +import { afterEach, beforeEach, describe, expect, it } from 'bun:test'; +import fs from 'fs'; +import path from 'path'; +import os from 'os'; + +import type { MessageInRow } from './db/messages-in.js'; +import { extractImageBlocks, setWorkspaceRootForTests } from './multimodal.js'; + +let workspaceDir: string; + +beforeEach(() => { + workspaceDir = fs.mkdtempSync(path.join(os.tmpdir(), 'nanoclaw-mm-')); + setWorkspaceRootForTests(workspaceDir); +}); + +afterEach(() => { + setWorkspaceRootForTests(null); + try { + fs.rmSync(workspaceDir, { recursive: true, force: true }); + } catch { + /* ignore */ + } +}); + +function makeMsg(content: object, kind: string = 'chat'): MessageInRow { + return { + id: 'msg-test', + seq: 1, + kind, + timestamp: new Date().toISOString(), + status: 'pending', + process_after: null, + recurrence: null, + tries: 0, + trigger: 1, + platform_id: null, + channel_type: null, + thread_id: null, + content: JSON.stringify(content), + } as MessageInRow; +} + +function writeWorkspaceFile(relPath: string, bytes: Buffer): void { + const abs = path.resolve(workspaceDir, relPath); + fs.mkdirSync(path.dirname(abs), { recursive: true }); + fs.writeFileSync(abs, bytes); +} + +describe('extractImageBlocks', () => { + it('returns [] when messages have no attachments', () => { + const blocks = extractImageBlocks([makeMsg({ text: 'hello' })]); + expect(blocks).toEqual([]); + }); + + it('returns [] when messages have non-image attachments only', () => { + const blocks = extractImageBlocks([ + makeMsg({ + text: 'see attached doc', + attachments: [{ type: 'document', mimeType: 'application/pdf', localPath: 'inbox/m/doc.pdf', name: 'doc.pdf' }], + }), + ]); + expect(blocks).toEqual([]); + }); + + it('skips images without a localPath', () => { + const blocks = 
extractImageBlocks([ + makeMsg({ + text: 'broken image', + attachments: [{ type: 'image', mimeType: 'image/png', name: 'broken.png' }], + }), + ]); + expect(blocks).toEqual([]); + }); + + it('skips unsupported image mime types', () => { + const blocks = extractImageBlocks([ + makeMsg({ + text: 'fancy format', + attachments: [{ type: 'image', mimeType: 'image/heic', localPath: 'inbox/m/x.heic' }], + }), + ]); + expect(blocks).toEqual([]); + }); + + it('refuses unsafe localPaths that escape the workspace root', () => { + const blocks = extractImageBlocks([ + makeMsg({ + text: 'escape', + attachments: [{ type: 'image', mimeType: 'image/png', localPath: '../../etc/passwd' }], + }), + ]); + expect(blocks).toEqual([]); + }); + + it('refuses absolute localPaths', () => { + const blocks = extractImageBlocks([ + makeMsg({ + text: 'abs', + attachments: [{ type: 'image', mimeType: 'image/png', localPath: '/etc/passwd' }], + }), + ]); + expect(blocks).toEqual([]); + }); + + it('honors skipMultimodal=true even when the image exists', () => { + writeWorkspaceFile('inbox/skip/img.png', Buffer.from([0x89, 0x50, 0x4e, 0x47])); + const blocks = extractImageBlocks([ + makeMsg({ + text: 'opted-out', + attachments: [ + { type: 'image', mimeType: 'image/png', localPath: 'inbox/skip/img.png', skipMultimodal: true, name: 'img.png' }, + ], + }), + ]); + expect(blocks).toEqual([]); + }); + + it('emits a base64 image block when a supported image is present on disk', () => { + const bytes = Buffer.from([0x89, 0x50, 0x4e, 0x47, 0x0d, 0x0a]); + writeWorkspaceFile('inbox/happy/img.png', bytes); + const blocks = extractImageBlocks([ + makeMsg({ + text: 'see image', + attachments: [{ type: 'image', mimeType: 'image/png', localPath: 'inbox/happy/img.png', name: 'img.png' }], + }), + ]); + expect(blocks.length).toBe(1); + const b = blocks[0]; + expect(b.type).toBe('image'); + if (b.type === 'image') { + expect(b.source.type).toBe('base64'); + expect(b.source.media_type).toBe('image/png'); + 
expect(b.source.data).toBe(bytes.toString('base64')); + } + }); + + it('emits multiple blocks across messages and across attachments per message', () => { + const b1 = Buffer.from([0xff, 0xd8, 0xff, 0xe0]); + const b2 = Buffer.from([0xff, 0xd8, 0xff, 0xe1]); + writeWorkspaceFile('inbox/multi-a/a.jpg', b1); + writeWorkspaceFile('inbox/multi-b/b.jpg', b2); + const blocks = extractImageBlocks([ + makeMsg({ + attachments: [ + { type: 'image', mimeType: 'image/jpeg', localPath: 'inbox/multi-a/a.jpg', name: 'a.jpg' }, + { type: 'image', mimeType: 'image/jpeg', localPath: 'inbox/multi-b/b.jpg', name: 'b.jpg' }, + ], + }), + ]); + expect(blocks.length).toBe(2); + }); + + it('ignores task/webhook/system messages — only chat/chat-sdk count', () => { + writeWorkspaceFile('inbox/task/x.png', Buffer.from([0x89, 0x50, 0x4e, 0x47])); + const blocks = extractImageBlocks([ + makeMsg( + { attachments: [{ type: 'image', mimeType: 'image/png', localPath: 'inbox/task/x.png' }] }, + 'task', + ), + ]); + expect(blocks).toEqual([]); + }); + + it('skips oversize images (over 4MB byte cap)', () => { + // A 5MB image — over the 4MB cap. 
+ const big = Buffer.alloc(5 * 1024 * 1024, 0xff); + writeWorkspaceFile('inbox/big/x.jpg', big); + const blocks = extractImageBlocks([ + makeMsg({ + attachments: [{ type: 'image', mimeType: 'image/jpeg', localPath: 'inbox/big/x.jpg' }], + }), + ]); + expect(blocks).toEqual([]); + }); + + it('continues past a missing file and emits blocks for the rest', () => { + const bytes = Buffer.from([0x89, 0x50, 0x4e, 0x47]); + writeWorkspaceFile('inbox/mix/ok.png', bytes); + const blocks = extractImageBlocks([ + makeMsg({ + attachments: [ + { type: 'image', mimeType: 'image/png', localPath: 'inbox/mix/missing.png' }, + { type: 'image', mimeType: 'image/png', localPath: 'inbox/mix/ok.png' }, + ], + }), + ]); + expect(blocks.length).toBe(1); + }); +}); diff --git a/container/agent-runner/src/multimodal.ts b/container/agent-runner/src/multimodal.ts new file mode 100644 index 00000000000..4b74dfba2db --- /dev/null +++ b/container/agent-runner/src/multimodal.ts @@ -0,0 +1,159 @@ +/** + * Image attachment → content block extraction. + * + * The host's chat-sdk-bridge writes attachment metadata into messages_in + * rows with a `localPath` field (relative to `/workspace/`) and strips the + * base64 `data` field — the actual bytes are spilled to + * `/inbox//` to keep the DB small. Inside + * the container that file is mounted at `/workspace/`. + * + * For multimodal-capable providers (Claude), we load each image attachment's + * bytes back into a base64 content block here. The block is delivered as a + * separate user-turn message after the text prompt — mirrors v1's + * `pushMultimodal` pattern. + * + * Non-image attachments (audio, pdf) are NOT handled here — those are + * host-side preprocessed into inline text (`transcription` for voice, + * extracted `text` for PDFs) by the chat-sdk-bridge, and rendered by the + * formatter alongside the message body. 
+ */ + +import fs from 'fs'; +import path from 'path'; + +import type { MessageInRow } from './db/messages-in.js'; +import type { ContentBlock, ImageContentBlock, ImageMediaType } from './providers/types.js'; + +const DEFAULT_WORKSPACE_ROOT = '/workspace'; + +/** + * Test-only override of the workspace root. Production uses the constant + * `/workspace`; tests inject a tempdir via `setWorkspaceRootForTests`. + */ +let WORKSPACE_ROOT = DEFAULT_WORKSPACE_ROOT; + +export function setWorkspaceRootForTests(root: string | null): void { + WORKSPACE_ROOT = root ?? DEFAULT_WORKSPACE_ROOT; +} + +/** + * Max image bytes to forward as a content block. Anthropic's Messages API + * tops out at 5MB per image; we leave a small buffer for base64 expansion + * and aggregate guard the call site. If an attachment is over the cap, the + * formatter's text rendering still references it via `localPath` so the + * agent can `Read` it as a fallback. + */ +const MAX_IMAGE_BYTES = 4 * 1024 * 1024; + +/** Mime types the Messages API accepts for `type: 'image'`. */ +const SUPPORTED_IMAGE_MIME = new Set([ + 'image/jpeg', + 'image/png', + 'image/gif', + 'image/webp', +]); + +function isSupportedImageMime(mime: string): mime is ImageMediaType { + return (SUPPORTED_IMAGE_MIME as Set).has(mime); +} + +function log(msg: string): void { + console.error(`[multimodal] ${msg}`); +} + +interface AttachmentLike { + type?: string; + mimeType?: string; + localPath?: string; + skipMultimodal?: boolean; + name?: string; +} + +interface ParsedContent { + attachments?: AttachmentLike[]; +} + +function safeParse(json: string): ParsedContent { + try { + return JSON.parse(json) as ParsedContent; + } catch { + return {}; + } +} + +/** + * Resolve a host-relative attachment localPath into an in-container absolute + * path under `/workspace/` and confirm it stays inside the workspace root. + * Returns null if the path tries to escape (e.g. via `..`). 
+ */ +function resolveContainerPath(localPath: string): string | null { + if (!localPath) return null; + if (path.isAbsolute(localPath)) return null; + const abs = path.resolve(WORKSPACE_ROOT, localPath); + const rel = path.relative(WORKSPACE_ROOT, abs); + if (rel.startsWith('..') || path.isAbsolute(rel)) return null; + return abs; +} + +/** + * Read every image attachment out of `messages` and return a content-block + * array suitable for `AgentQuery.pushBlocks`. Skips: non-image types, + * unsupported mime types, files that don't exist on disk, files that exceed + * `MAX_IMAGE_BYTES`, and any attachment whose `skipMultimodal` flag is set + * (host-side opt-out for groups configured with `skipImageMultimodal`). + */ +export function extractImageBlocks(messages: MessageInRow[]): ContentBlock[] { + const blocks: ContentBlock[] = []; + + for (const msg of messages) { + if (msg.kind !== 'chat' && msg.kind !== 'chat-sdk') continue; + const content = safeParse(msg.content); + const atts = content.attachments; + if (!Array.isArray(atts) || atts.length === 0) continue; + + for (const att of atts) { + const mime = (att.mimeType || '').toLowerCase(); + if (!mime.startsWith('image/')) continue; + if (att.skipMultimodal === true) continue; + if (!isSupportedImageMime(mime)) { + log(`Skip unsupported image mime: ${mime} (${att.name || '?'})`); + continue; + } + if (!att.localPath) { + log(`Skip image without localPath: ${att.name || '?'}`); + continue; + } + + const abs = resolveContainerPath(att.localPath); + if (!abs) { + log(`Refused unsafe localPath: ${att.localPath}`); + continue; + } + + let bytes: Buffer; + try { + const stat = fs.statSync(abs); + if (stat.size > MAX_IMAGE_BYTES) { + log(`Skip oversize image (${stat.size}B > ${MAX_IMAGE_BYTES}B): ${att.localPath}`); + continue; + } + bytes = fs.readFileSync(abs); + } catch (err) { + log(`Failed to read image at ${abs}: ${err instanceof Error ? 
err.message : String(err)}`); + continue; + } + + const block: ImageContentBlock = { + type: 'image', + source: { + type: 'base64', + media_type: mime, + data: bytes.toString('base64'), + }, + }; + blocks.push(block); + } + } + + return blocks; +} diff --git a/container/agent-runner/src/poll-loop.ts b/container/agent-runner/src/poll-loop.ts index 1b7d181a3c0..c792b6a2fe7 100644 --- a/container/agent-runner/src/poll-loop.ts +++ b/container/agent-runner/src/poll-loop.ts @@ -13,6 +13,7 @@ import { stripInternalTags, type RoutingContext, } from './formatter.js'; +import { extractImageBlocks } from './multimodal.js'; import type { AgentProvider, AgentQuery, ProviderEvent } from './providers/types.js'; const POLL_INTERVAL_MS = 1000; @@ -211,6 +212,17 @@ export async function runPollLoop(config: PollLoopConfig): Promise { systemContext: config.systemContext, }); + // Deliver image attachments as multimodal content blocks on a separate + // user turn (Claude SDK accepts a content-block array on the user + // message). Mirrors v1's `pushMultimodal` after the initial text prompt. + if (config.provider.supportsMultimodalContent) { + const initialBlocks = extractImageBlocks(keep); + if (initialBlocks.length > 0) { + log(`Pushing ${initialBlocks.length} initial image block(s) to the active query`); + query.pushBlocks(initialBlocks); + } + } + // Process the query while concurrently polling for new messages const skippedSet = new Set(skipped); const processingIds = ids.filter((id) => !commandIds.includes(id) && !skippedSet.has(id)); @@ -218,7 +230,13 @@ export async function runPollLoop(config: PollLoopConfig): Promise { // can stamp it on outbound rows — needed for a2a return-path routing. 
setCurrentInReplyTo(routing.inReplyTo); try { - const result = await processQuery(query, routing, processingIds, config.providerName); + const result = await processQuery( + query, + routing, + processingIds, + config.providerName, + config.provider.supportsMultimodalContent, + ); if (result.continuation && result.continuation !== continuation) { continuation = result.continuation; setContinuation(config.providerName, continuation); @@ -299,6 +317,7 @@ async function processQuery( routing: RoutingContext, initialBatchIds: string[], providerName: string, + supportsMultimodal: boolean, ): Promise { let queryContinuation: string | undefined; let done = false; @@ -379,6 +398,16 @@ async function processQuery( log(`Pushing ${keep.length} follow-up message(s) into active query`); unwrappedNudged = false; query.push(prompt); + // Follow-up images: same pattern as the initial batch — separate + // user-turn message with the content-block array. Guarded by the + // provider capability flag so non-multimodal providers no-op. 
+ if (supportsMultimodal) { + const followUpBlocks = extractImageBlocks(keep); + if (followUpBlocks.length > 0) { + log(`Pushing ${followUpBlocks.length} follow-up image block(s) into active query`); + query.pushBlocks(followUpBlocks); + } + } markCompleted(keptIds); } catch (err) { // Without this catch the rejection escapes the void IIFE and Node diff --git a/container/agent-runner/src/providers/claude.ts b/container/agent-runner/src/providers/claude.ts index 0653d606538..6b0c2e97815 100644 --- a/container/agent-runner/src/providers/claude.ts +++ b/container/agent-runner/src/providers/claude.ts @@ -6,7 +6,15 @@ import { query as sdkQuery, type HookCallback, type PreCompactHookInput } from ' import { clearContainerToolInFlight, setContainerToolInFlight } from '../db/connection.js'; import { registerProvider } from './provider-registry.js'; -import type { AgentProvider, AgentQuery, McpServerConfig, ProviderEvent, ProviderOptions, QueryInput } from './types.js'; +import type { + AgentProvider, + AgentQuery, + ContentBlock, + McpServerConfig, + ProviderEvent, + ProviderOptions, + QueryInput, +} from './types.js'; function log(msg: string): void { console.error(`[claude-provider] ${msg}`); @@ -70,13 +78,18 @@ function mcpAllowPattern(serverName: string): string { interface SDKUserMessage { type: 'user'; - message: { role: 'user'; content: string }; + message: { role: 'user'; content: string | ContentBlock[] }; parent_tool_use_id: null; session_id: string; } /** * Push-based async iterable for streaming user messages to the Claude SDK. + * + * `push()` sends a plain-text turn; `pushBlocks()` sends a content-block + * turn (used for image attachments). Mirrors the v1 pattern of one text + * SDKUserMessage followed by a separate multimodal SDKUserMessage so the + * SDK delivers them as two consecutive user turns. 
*/ class MessageStream { private queue: SDKUserMessage[] = []; @@ -93,6 +106,17 @@ class MessageStream { this.waiting?.(); } + pushBlocks(blocks: ContentBlock[]): void { + if (blocks.length === 0) return; + this.queue.push({ + type: 'user', + message: { role: 'user', content: blocks }, + parent_tool_use_id: null, + session_id: '', + }); + this.waiting?.(); + } + end(): void { this.done = true; this.waiting?.(); @@ -330,6 +354,7 @@ const STALE_SESSION_RE = /no conversation found|ENOENT.*\.jsonl|session.*not fou export class ClaudeProvider implements AgentProvider { readonly supportsNativeSlashCommands = true; + readonly supportsMultimodalContent = true; private assistantName?: string; private mcpServers: Record; @@ -460,6 +485,7 @@ export class ClaudeProvider implements AgentProvider { return { push: (msg) => stream.push(msg), + pushBlocks: (blocks) => stream.pushBlocks(blocks), end: () => stream.end(), events: translateEvents(), abort: () => { diff --git a/container/agent-runner/src/providers/mock.ts b/container/agent-runner/src/providers/mock.ts index f941f09005e..3cd268e5fb0 100644 --- a/container/agent-runner/src/providers/mock.ts +++ b/container/agent-runner/src/providers/mock.ts @@ -1,5 +1,5 @@ import { registerProvider } from './provider-registry.js'; -import type { AgentProvider, AgentQuery, ProviderEvent, ProviderOptions, QueryInput } from './types.js'; +import type { AgentProvider, AgentQuery, ContentBlock, ProviderEvent, ProviderOptions, QueryInput } from './types.js'; /** * Mock provider for testing. Returns canned responses. 
@@ -7,6 +7,7 @@ import type { AgentProvider, AgentQuery, ProviderEvent, ProviderOptions, QueryIn */ export class MockProvider implements AgentProvider { readonly supportsNativeSlashCommands = false; + readonly supportsMultimodalContent = true; private responseFactory: (prompt: string) => string; @@ -61,6 +62,16 @@ export class MockProvider implements AgentProvider { pending.push(message); waiting?.(); }, + pushBlocks(blocks: ContentBlock[]) { + // Mirror the v1 pattern: surface a synthetic text turn that summarizes + // the block array so tests can assert against the prompt path without + // pulling base64 binary data through the mock. + const summary = blocks + .map((b) => (b.type === 'image' ? `[image:${b.source.media_type}]` : b.text)) + .join(' '); + pending.push(`__BLOCKS__ ${summary}`); + waiting?.(); + }, end() { ended = true; waiting?.(); diff --git a/container/agent-runner/src/providers/types.ts b/container/agent-runner/src/providers/types.ts index d906a8ce988..e5fbeb6a910 100644 --- a/container/agent-runner/src/providers/types.ts +++ b/container/agent-runner/src/providers/types.ts @@ -6,6 +6,13 @@ export interface AgentProvider { */ readonly supportsNativeSlashCommands: boolean; + /** + * True if this provider can accept image / multimodal content blocks via + * `AgentQuery.pushBlocks`. When false, the poll-loop will fall back to + * text-only attachment rendering and skip block extraction. + */ + readonly supportsMultimodalContent: boolean; + /** Start a new query. Returns a handle for streaming input and output. */ query(input: QueryInput): AgentQuery; @@ -31,6 +38,29 @@ export interface AgentProvider { maybeRotateContinuation?(continuation: string, cwd: string): string | null; } +/** + * `ContentBlock` (declared below) is an Anthropic Messages-API-shaped content + * block. We only generate the two variants the agent-runner actually + * constructs (text + base64 image) — the SDK happily accepts a richer + * union, but adding the rest unused would just widen our test surface.
+ */ +/** + * Media types allowed in an Anthropic Messages API base64-image source. The + * SDK narrows `media_type` to this fixed union; mirroring it here lets the + * Claude provider hand the block to the SDK without a structural cast. + */ +export type ImageMediaType = 'image/jpeg' | 'image/png' | 'image/gif' | 'image/webp'; + +export interface ImageContentBlock { + type: 'image'; + source: { type: 'base64'; media_type: ImageMediaType; data: string }; +} +export interface TextContentBlock { + type: 'text'; + text: string; +} +export type ContentBlock = ImageContentBlock | TextContentBlock; + /** * Options passed to provider constructors. Fields are common to most * providers; individual providers may ignore any they don't need. @@ -81,9 +111,21 @@ export interface McpServerConfig { } export interface AgentQuery { - /** Push a follow-up message into the active query. */ + /** Push a follow-up text message into the active query. */ push(message: string): void; + /** + * Push a multimodal user message (content-block array) into the active + * query. Used to deliver image attachments alongside the text prompt. The + * SDK treats this as a separate user-turn message; senders are expected to + * call `push()` for the text turn first and then `pushBlocks()` for the + * image turn, mirroring v1's pattern. + * + * Always present; providers without multimodal support throw or no-op, so + * callers must guard with `AgentProvider.supportsMultimodalContent`. + */ + pushBlocks(blocks: ContentBlock[]): void; + /** Signal that no more input will be sent.
*/ end(): void; diff --git a/src/channels/chat-sdk-bridge.test.ts b/src/channels/chat-sdk-bridge.test.ts index 3049c29de37..cd12d92d9a2 100644 --- a/src/channels/chat-sdk-bridge.test.ts +++ b/src/channels/chat-sdk-bridge.test.ts @@ -2,7 +2,7 @@ import { describe, expect, it } from 'vitest'; import type { Adapter, AdapterPostableMessage, RawMessage } from 'chat'; -import { createChatSdkBridge, splitForLimit } from './chat-sdk-bridge.js'; +import { buildReactionInbound, createChatSdkBridge, splitForLimit } from './chat-sdk-bridge.js'; function stubAdapter(partial: Partial): Adapter { return { name: 'stub', ...partial } as unknown as Adapter; @@ -205,3 +205,69 @@ describe('createChatSdkBridge.deliver — display cards (send_card)', () => { expect(msg.markdown).toBe('plain hello'); }); }); + +describe('buildReactionInbound', () => { + // Reactions are routed via chat.onReaction → buildReactionInbound → + // setupConfig.onInbound. The inbound shape must: (a) produce a + // human-readable `text` so the formatter renders verbatim, (b) preserve + // the structured reaction payload so a future query_reactions tool can + // filter on targetMessageId/added, and (c) carry isMention=false so + // mention-required channels store-as-context without waking the agent. 
+ + const base = { + emoji: '👍', + rawEmoji: '+1', + added: true, + targetMessageId: '1700000000.000300', + threadId: 'C-CHAN-1', + userId: 'U01HJOHN', + userName: 'John', + now: () => new Date('2026-05-22T12:34:56Z'), + idGen: () => 'rxn-test-1', + } as const; + + it('produces a chat-sdk message with isMention=false and isGroup=true', () => { + const inbound = buildReactionInbound({ ...base }); + expect(inbound.kind).toBe('chat-sdk'); + expect(inbound.isMention).toBe(false); + expect(inbound.isGroup).toBe(true); + expect(inbound.id).toBe('rxn-test-1'); + expect(inbound.timestamp).toBe('2026-05-22T12:34:56.000Z'); + }); + + it('renders the added text with the emoji, reactor, and target id', () => { + const inbound = buildReactionInbound({ ...base }); + const content = inbound.content as { text: string }; + expect(content.text).toBe('[John reacted 👍 on message 1700000000.000300]'); + }); + + it('renders the removed text when added=false', () => { + const inbound = buildReactionInbound({ ...base, added: false }); + const content = inbound.content as { text: string }; + expect(content.text).toBe('[John removed reaction 👍 on message 1700000000.000300]'); + }); + + it('embeds the structured reaction payload under content.reaction', () => { + const inbound = buildReactionInbound({ ...base }); + const content = inbound.content as { + reaction: { + emoji: string; + rawEmoji: string; + added: boolean; + targetMessageId: string; + threadId: string; + userId: string; + }; + sender: string; + senderId: string; + }; + expect(content.reaction.emoji).toBe('👍'); + expect(content.reaction.rawEmoji).toBe('+1'); + expect(content.reaction.added).toBe(true); + expect(content.reaction.targetMessageId).toBe('1700000000.000300'); + expect(content.reaction.threadId).toBe('C-CHAN-1'); + expect(content.reaction.userId).toBe('U01HJOHN'); + expect(content.sender).toBe('John'); + expect(content.senderId).toBe('U01HJOHN'); + }); +}); diff --git a/src/channels/chat-sdk-bridge.ts 
b/src/channels/chat-sdk-bridge.ts index efeb32f162a..95ad234798c 100644 --- a/src/channels/chat-sdk-bridge.ts +++ b/src/channels/chat-sdk-bridge.ts @@ -22,6 +22,8 @@ import { log } from '../log.js'; import { SqliteStateAdapter } from '../state-sqlite.js'; import { registerWebhookAdapter } from '../webhook-server.js'; import { getAskQuestionRender } from '../db/sessions.js'; +import { isPdfMime, extractPdfText, PdfExtractionError } from '../pdf-extract.js'; +import { isTranscribableMime, transcribeAudio, TranscriptionError } from '../transcription.js'; import { normalizeOptions, type NormalizedOption } from './ask-question.js'; import type { ChannelAdapter, ChannelSetup, InboundMessage } from './adapter.js'; @@ -103,6 +105,96 @@ function resolveSelectedOption( return candidate; } +/** + * Synthetic chat-sdk inbound for a reaction event. Mirrors v1's + * pattern (src/v1/index.ts:1077) where a reaction lands as a chat-like row + * the agent reads. The text is human-readable; the structured `reaction` + * payload is preserved so a future `mcp__nanoclaw__query_reactions` tool + * can filter on `targetMessageId` + `added`. + * + * `isMention=false` deliberately — reactions can't @-mention. Channels + * that require a trigger pattern therefore do NOT wake on bare reactions; + * the router stores them as `trigger=0` context and they ride along the + * next real wake. Channels in always-engage mode (e.g. main) treat them + * as normal inbound and may wake the agent. + */ +export interface ReactionInboundInput { + emoji: string; + rawEmoji: string; + added: boolean; + targetMessageId: string; + threadId: string; + userId: string; + userName: string; + now: () => Date; + idGen: () => string; +} + +export function buildReactionInbound(input: ReactionInboundInput): InboundMessage { + const verb = input.added ? 
'reacted' : 'removed reaction'; + const text = `[${input.userName} ${verb} ${input.emoji} on message ${input.targetMessageId}]`; + return { + id: input.idGen(), + kind: 'chat-sdk', + content: { + text, + sender: input.userName, + senderId: input.userId, + reaction: { + emoji: input.emoji, + rawEmoji: input.rawEmoji, + added: input.added, + targetMessageId: input.targetMessageId, + threadId: input.threadId, + userId: input.userId, + }, + }, + timestamp: input.now().toISOString(), + isMention: false, + isGroup: true, + }; +} + +/** + * Run host-side Whisper transcription on a voice attachment and stamp the + * result onto the attachment entry. Mutates `entry` in place. Failures are + * captured on `entry.transcriptionError` instead of throwing — the agent + * gets to see why transcription was skipped rather than receiving a silent + * voice note with no text. + */ +// eslint-disable-next-line @typescript-eslint/no-explicit-any +async function maybeTranscribe(entry: Record, buffer: Buffer): Promise { + const mime = typeof entry.mimeType === 'string' ? entry.mimeType : ''; + if (!isTranscribableMime(mime)) return; + const filename = (typeof entry.name === 'string' && entry.name) || 'voice'; + try { + const result = await transcribeAudio(buffer, filename, mime); + entry.transcription = result.text; + } catch (err) { + const msg = err instanceof TranscriptionError ? err.message : err instanceof Error ? err.message : String(err); + entry.transcriptionError = msg; + log.warn('Voice transcription failed', { filename, mime, err: msg }); + } +} + +/** + * Run host-side `pdftotext` on a PDF attachment and stamp the result onto + * the attachment entry. Same error contract as `maybeTranscribe`. + */ +// eslint-disable-next-line @typescript-eslint/no-explicit-any +async function maybePdfExtract(entry: Record, buffer: Buffer): Promise { + const mime = typeof entry.mimeType === 'string' ? 
entry.mimeType : ''; + if (!isPdfMime(mime)) return; + try { + const text = await extractPdfText(buffer); + entry.extractedText = text; + } catch (err) { + const msg = err instanceof PdfExtractionError ? err.message : err instanceof Error ? err.message : String(err); + entry.pdfExtractionError = msg; + log.warn('PDF extraction failed', { filename: entry.name, mime, err: msg }); + } +} + export function splitForLimit(text: string, limit: number): string[] { if (text.length <= limit) return [text]; const chunks: string[] = []; @@ -135,7 +227,11 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter // eslint-disable-next-line @typescript-eslint/no-explicit-any const serialized = message.toJSON() as Record; - // Download attachment data before serialization loses fetchData() + // Download attachment data before serialization loses fetchData(). + // For voice notes and PDFs we also pre-process the bytes on the host — + // voice → Whisper transcription, PDF → pdftotext text — and stamp the + // result back onto the entry. The container's formatter renders these + // inline so the agent reads the spoken/written words directly. 
if (message.attachments && message.attachments.length > 0) { const enriched = []; for (const att of message.attachments) { @@ -149,12 +245,17 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter height: (att as unknown as Record).height, }; if (att.fetchData) { + let buffer: Buffer | null = null; try { - const buffer = await att.fetchData(); + buffer = await att.fetchData(); entry.data = buffer.toString('base64'); } catch (err) { log.warn('Failed to download attachment', { type: att.type, err }); } + if (buffer) { + await maybeTranscribe(entry, buffer); + await maybePdfExtract(entry, buffer); + } } enriched.push(entry); } @@ -265,6 +366,55 @@ export function createChatSdkBridge(config: ChatSdkBridgeConfig): ChannelAdapter await setupConfig.onInbound(channelId, thread.id, await messageToInbound(message, false, true)); }); + // Reactions. Mirrors v1's onReaction (src/v1/index.ts:1077): the + // reaction lands as a chat-sdk inbound row so it threads through the + // router exactly like any other message. The router decides whether + // to wake the agent based on per-channel engage settings; bare + // reactions don't carry an @-mention, so trigger-required channels + // accumulate them as context and only ride along when something else + // wakes the agent. The agent's own reactions are filtered here + // because reactor === bot would otherwise produce an inbound on the + // bot's own outbound, looping if the agent ever reacts. + if (typeof chat.onReaction === 'function') { + chat.onReaction(async (event) => { + try { + const reactor = event.user as { userId?: string; fullName?: string; userName?: string } | undefined; + const userId = reactor?.userId || ''; + // Best-effort self-reaction filter: chat-sdk doesn't expose a + // bot-identity field uniformly across adapters, so we rely on + // the SDK to NOT fire onReaction for the bot's own reactions + // (which it does on Slack — `event.user` is always the human + // reactor). 
Belt-and-braces: skip rows where userId is empty. + if (!userId) return; + const userName = reactor?.fullName ?? reactor?.userName ?? userId; + const channelId = adapter.channelIdFromThreadId(event.threadId); + const inbound = buildReactionInbound({ + emoji: String(event.emoji), + rawEmoji: event.rawEmoji ?? String(event.emoji), + added: event.added !== false, + targetMessageId: event.messageId, + threadId: event.threadId, + userId, + userName, + now: () => new Date(), + idGen: () => `rxn-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`, + }); + await setupConfig.onInbound(channelId, event.threadId, inbound); + } catch (err) { + log.error('Failed to route reaction inbound', { + adapter: adapter.name, + messageId: event.messageId, + emoji: String(event.emoji), + err, + }); + } + }); + } else { + log.info('Adapter does not expose chat.onReaction; skipping reaction subscription', { + adapter: adapter.name, + }); + } + // Handle button clicks (ask_user_question) chat.onAction(async (event) => { if (!event.actionId.startsWith('ncq:')) return; diff --git a/src/pdf-extract.test.ts b/src/pdf-extract.test.ts new file mode 100644 index 00000000000..88b7f8eab56 --- /dev/null +++ b/src/pdf-extract.test.ts @@ -0,0 +1,159 @@ +import { describe, expect, it } from 'vitest'; +import { EventEmitter } from 'events'; +import { Readable, Writable } from 'stream'; +import type { ChildProcess } from 'child_process'; + +import { PdfExtractionError, extractPdfText, isPdfMime } from './pdf-extract.js'; + +/** + * Build a fake ChildProcess-like object that: + * - exposes stdout / stderr Readable streams that emit the given chunks + * - exposes stdin as a Writable that collects pushed bytes (no-op sink) + * - emits 'close' with the given exit code shortly after the test calls end() + * - emits 'error' if `errorCode` is provided (e.g. 
'ENOENT') + */ +function fakeChild(opts: { + stdout?: string; + stderr?: string; + exitCode?: number | null; + errorCode?: NodeJS.ErrnoException['code']; + emitOnSpawn?: boolean; +}): ChildProcess { + const child = new EventEmitter() as EventEmitter & { + stdout: Readable; + stderr: Readable; + stdin: Writable; + kill: (signal?: string) => boolean; + }; + + child.stdout = new Readable({ + read() { + /* push happens in setImmediate below */ + }, + }); + child.stderr = new Readable({ + read() { + /* push happens in setImmediate below */ + }, + }); + child.stdin = new Writable({ + write(_chunk, _enc, cb) { + cb(); + }, + final(cb) { + cb(); + }, + }); + child.kill = () => true; + + setImmediate(() => { + if (opts.errorCode) { + const err: NodeJS.ErrnoException = new Error('spawn failed'); + err.code = opts.errorCode; + child.emit('error', err); + return; + } + if (opts.stdout) child.stdout.push(opts.stdout); + child.stdout.push(null); + if (opts.stderr) child.stderr.push(opts.stderr); + child.stderr.push(null); + setImmediate(() => child.emit('close', opts.exitCode ?? 
0)); + }); + + return child as unknown as ChildProcess; +} + +describe('isPdfMime', () => { + it('accepts application/pdf and application/x-pdf', () => { + expect(isPdfMime('application/pdf')).toBe(true); + expect(isPdfMime('Application/PDF')).toBe(true); + expect(isPdfMime('application/x-pdf')).toBe(true); + }); + + it('rejects unrelated mime types', () => { + expect(isPdfMime(undefined)).toBe(false); + expect(isPdfMime('image/png')).toBe(false); + expect(isPdfMime('audio/ogg')).toBe(false); + expect(isPdfMime('text/plain')).toBe(false); + }); +}); + +describe('extractPdfText', () => { + it('throws too-large for empty buffer', async () => { + await expect( + extractPdfText(Buffer.alloc(0), { + spawnImpl: (() => fakeChild({ stdout: 'never' })) as unknown as typeof import('child_process').spawn, + }), + ).rejects.toMatchObject({ kind: 'too-large' }); + }); + + it('throws too-large when input exceeds 50MB cap', async () => { + const big = Buffer.alloc(51 * 1024 * 1024, 0x00); + await expect( + extractPdfText(big, { + spawnImpl: (() => fakeChild({ stdout: 'never' })) as unknown as typeof import('child_process').spawn, + }), + ).rejects.toMatchObject({ kind: 'too-large' }); + }); + + it('returns trimmed text on a normal exit', async () => { + const out = await extractPdfText(Buffer.from('%PDF-1.4 etc'), { + spawnImpl: (() => fakeChild({ stdout: ' Hello world \n' })) as unknown as typeof import('child_process').spawn, + }); + expect(out).toBe('Hello world'); + }); + + it('throws binary-missing when spawn emits ENOENT', async () => { + await expect( + extractPdfText(Buffer.from('%PDF-1.4 etc'), { + spawnImpl: (() => fakeChild({ errorCode: 'ENOENT' })) as unknown as typeof import('child_process').spawn, + }), + ).rejects.toMatchObject({ kind: 'binary-missing' }); + }); + + it('throws spawn-failed when spawn synchronously throws', async () => { + const throwing = (() => { + throw new Error('boom'); + }) as unknown as typeof import('child_process').spawn; + await 
expect(extractPdfText(Buffer.from('%PDF-1.4 etc'), { spawnImpl: throwing })).rejects.toMatchObject({ + kind: 'spawn-failed', + }); + }); + + it('throws nonzero-exit when pdftotext returns a non-zero status', async () => { + await expect( + extractPdfText(Buffer.from('not a pdf'), { + spawnImpl: (() => + fakeChild({ + exitCode: 2, + stderr: 'Syntax Error: Document missing', + })) as unknown as typeof import('child_process').spawn, + }), + ).rejects.toMatchObject({ kind: 'nonzero-exit' }); + }); + + it('throws empty-output when pdftotext succeeds with no text', async () => { + await expect( + extractPdfText(Buffer.from('%PDF-1.4 etc'), { + spawnImpl: (() => fakeChild({ stdout: ' ' })) as unknown as typeof import('child_process').spawn, + }), + ).rejects.toMatchObject({ kind: 'empty-output' }); + }); + + it('TruncatesText output past the byte cap', async () => { + // 300KB of "X" — over the 250_000 byte extracted-text cap. Should + // truncate silently without throwing. + const huge = 'X'.repeat(300_000); + const out = await extractPdfText(Buffer.from('%PDF-1.4 etc'), { + spawnImpl: (() => fakeChild({ stdout: huge })) as unknown as typeof import('child_process').spawn, + }); + expect(out.length).toBeLessThanOrEqual(250_000); + expect(out.length).toBeGreaterThan(100_000); + }); + + it('export: PdfExtractionError carries a kind', () => { + const e = new PdfExtractionError('binary-missing', 'pdftotext not installed'); + expect(e.kind).toBe('binary-missing'); + expect(e.message).toContain('pdftotext'); + }); +}); diff --git a/src/pdf-extract.ts b/src/pdf-extract.ts new file mode 100644 index 00000000000..62a857fbab1 --- /dev/null +++ b/src/pdf-extract.ts @@ -0,0 +1,169 @@ +/** + * PDF text extraction via the `pdftotext` system binary. + * + * Host-side preprocessing — called by the chat-sdk-bridge after downloading + * a PDF attachment, before the row is written into the session inbound DB. 
+ * Extracted text is stored on the attachment as `extractedText` for the
+ * formatter to render inline; on failure `pdfExtractionError` carries the
+ * reason.
+ *
+ * Doing this on the host (vs. inside the container) keeps the agent's tool
+ * surface small — every container would otherwise have to either spawn
+ * pdftotext itself or run a JS PDF parser. Both cost more than running it
+ * once on the host and shipping plain text.
+ *
+ * Soft-fails when pdftotext isn't installed; the formatter will render the
+ * error message instead of just dropping the attachment.
+ */
+import { spawn } from 'child_process';
+import { Writable } from 'stream';
+
+import { log } from './log.js';
+
+/** Cap on extracted text size — keeps very large PDFs from bloating prompts. */
+const MAX_EXTRACTED_BYTES = 250_000; // ~ 60-80 pages of dense text
+
+/** Cap on input PDF size — guards against huge files exhausting RAM. */
+const MAX_PDF_INPUT_BYTES = 50 * 1024 * 1024;
+
+export class PdfExtractionError extends Error {
+  readonly kind: 'too-large' | 'binary-missing' | 'spawn-failed' | 'nonzero-exit' | 'empty-output';
+  constructor(kind: PdfExtractionError['kind'], message: string) {
+    super(message);
+    this.kind = kind;
+    this.name = 'PdfExtractionError';
+  }
+}
+
+export interface PdfExtractOptions {
+  /** Override for the binary path (defaults to `pdftotext` on $PATH). */
+  binary?: string;
+  /**
+   * Override for the `spawn` implementation. Tests inject a stub so we
+   * don't have to actually have pdftotext installed to exercise the
+   * branching logic.
+   */
+  spawnImpl?: typeof spawn;
+  /** Hard timeout for the spawn (ms). */
+  timeoutMs?: number;
+}
+
+const DEFAULT_TIMEOUT_MS = 15_000;
+
+/**
+ * Drop a trailing, incomplete UTF-8 sequence from `buf`.
+ *
+ * The output cap is applied in bytes, so the cut can land in the middle of a
+ * multi-byte character; decoding that tail would append U+FFFD to the
+ * extracted text. A buffer whose last character is complete is returned
+ * unchanged.
+ */
+function dropPartialUtf8Tail(buf: Buffer): Buffer {
+  // A UTF-8 sequence is at most 4 bytes long, so the lead byte of the final
+  // character (if there is one) sits within the last 4 bytes.
+  for (let back = 1; back <= 4 && back <= buf.length; back++) {
+    const byte = buf.readUInt8(buf.length - back);
+    if ((byte & 0xc0) === 0x80) continue; // continuation byte — keep scanning
+    const expected = byte >= 0xf0 ? 4 : byte >= 0xe0 ? 3 : byte >= 0xc0 ? 2 : 1;
+    return expected > back ? buf.subarray(0, buf.length - back) : buf;
+  }
+  return buf;
+}
+
+/**
+ * Run `pdftotext -layout -nopgbrk -enc UTF-8 - -` with the PDF bytes piped
+ * in on stdin and the extracted text read back from stdout. Returns the
+ * text on success; throws `PdfExtractionError` on failure.
+ *
+ * `-layout` preserves the visual flow (columns, tables) better than the
+ * default reflow mode, which collapses everything into one stream. Better
+ * for the agent reading code or structured documents.
+ *
+ * @param pdf  Raw PDF bytes. Must be non-empty and at most
+ *             `MAX_PDF_INPUT_BYTES` long.
+ * @param opts Optional binary path, `spawn` override and timeout.
+ * @returns The trimmed extracted text, capped at `MAX_EXTRACTED_BYTES` bytes
+ *          of pdftotext output and cut on a character boundary.
+ * @throws PdfExtractionError with `kind`:
+ *         `too-large` (empty or oversized input), `binary-missing` (ENOENT),
+ *         `spawn-failed`, `nonzero-exit` (non-zero status or timeout), or
+ *         `empty-output`.
+ */
+export async function extractPdfText(pdf: Buffer, opts: PdfExtractOptions = {}): Promise<string> {
+  if (pdf.length === 0) {
+    throw new PdfExtractionError('too-large', 'Empty PDF buffer');
+  }
+  if (pdf.length > MAX_PDF_INPUT_BYTES) {
+    throw new PdfExtractionError(
+      'too-large',
+      `PDF exceeds extraction limit (${pdf.length} > ${MAX_PDF_INPUT_BYTES} bytes)`,
+    );
+  }
+
+  const spawnImpl = opts.spawnImpl ?? spawn;
+  const binary = opts.binary ?? 'pdftotext';
+  const timeoutMs = opts.timeoutMs ?? DEFAULT_TIMEOUT_MS;
+
+  return new Promise<string>((resolve, reject) => {
+    let child;
+    try {
+      child = spawnImpl(binary, ['-layout', '-nopgbrk', '-enc', 'UTF-8', '-', '-'], {
+        stdio: ['pipe', 'pipe', 'pipe'],
+      });
+    } catch (err) {
+      const msg = err instanceof Error ? err.message : String(err);
+      reject(new PdfExtractionError('spawn-failed', `Failed to spawn ${binary}: ${msg}`));
+      return;
+    }
+
+    const chunks: Buffer[] = [];
+    let stderr = '';
+    let bytesWritten = 0;
+    let truncated = false;
+    let timedOut = false;
+
+    const timer = setTimeout(() => {
+      timedOut = true;
+      try {
+        child.kill('SIGKILL');
+      } catch {
+        /* ignore */
+      }
+    }, timeoutMs);
+
+    child.on('error', (err: NodeJS.ErrnoException) => {
+      clearTimeout(timer);
+      if (err.code === 'ENOENT') {
+        reject(new PdfExtractionError('binary-missing', `${binary} not installed`));
+      } else {
+        reject(new PdfExtractionError('spawn-failed', `pdftotext spawn error: ${err.message}`));
+      }
+    });
+
+    child.stdout?.on('data', (chunk: Buffer) => {
+      // Hard-cap the extracted output to avoid prompt blowup on massive docs.
+      // Bytes past the cap are dropped; we still let pdftotext run to
+      // completion so we get a clean exit code rather than EPIPE.
+      const remaining = MAX_EXTRACTED_BYTES - bytesWritten;
+      if (chunk.length <= remaining) {
+        chunks.push(chunk);
+        bytesWritten += chunk.length;
+        return;
+      }
+      // Remember that output was cut so the tail can be repaired before
+      // decoding (the cut is byte-based and may split a character).
+      truncated = true;
+      if (remaining > 0) {
+        chunks.push(chunk.subarray(0, remaining));
+        bytesWritten = MAX_EXTRACTED_BYTES;
+      }
+    });
+
+    child.stderr?.on('data', (chunk: Buffer) => {
+      stderr += chunk.toString('utf8');
+    });
+
+    child.on('close', (code: number | null) => {
+      clearTimeout(timer);
+      if (timedOut) {
+        reject(new PdfExtractionError('nonzero-exit', `pdftotext timed out after ${timeoutMs}ms`));
+        return;
+      }
+      if (code !== 0) {
+        reject(
+          new PdfExtractionError('nonzero-exit', `pdftotext exited ${code}: ${stderr.slice(0, 300) || '(no stderr)'}`),
+        );
+        return;
+      }
+      const raw = Buffer.concat(chunks);
+      const text = (truncated ? dropPartialUtf8Tail(raw) : raw).toString('utf8').trim();
+      if (!text) {
+        reject(new PdfExtractionError('empty-output', 'pdftotext produced no text'));
+        return;
+      }
+      // `bytesOut` is measured in UTF-8 bytes so it is comparable to `bytesIn`.
+      log.debug('Extracted PDF text', { bytesIn: pdf.length, bytesOut: Buffer.byteLength(text, 'utf8'), truncated });
+      resolve(text);
+    });
+
+    // Pipe the PDF bytes to stdin. Catch EPIPE in case pdftotext exits
+    // early (corrupt file, missing magic header) before we finish writing.
+    const stdin = child.stdin as Writable | null;
+    if (stdin) {
+      stdin.on('error', () => {
+        /* swallow EPIPE — the close handler reports the real error */
+      });
+      stdin.end(pdf);
+    }
+  });
+}
+
+export function isPdfMime(mime: string | undefined): boolean {
+  if (!mime) return false;
+  const m = mime.toLowerCase();
+  return m === 'application/pdf' || m === 'application/x-pdf';
+}
diff --git a/src/transcription.test.ts b/src/transcription.test.ts
new file mode 100644
index 00000000000..c46715c7b39
--- /dev/null
+++ b/src/transcription.test.ts
@@ -0,0 +1,133 @@
+import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
+
+// Mock readEnvFile BEFORE importing transcription so the cached singleton
+// inside transcription.ts sees a controlled stub.
The .env file in the +// project root contains a real OPENAI_API_KEY which would otherwise leak +// into tests that try to assert the missing-key branch. +vi.mock('./env.js', () => ({ + readEnvFile: vi.fn(() => ({})), +})); + +import { + TranscriptionError, + isTranscribableMime, + resetTranscriptionCacheForTests, + transcribeAudio, +} from './transcription.js'; + +const ORIGINAL_KEY = process.env.OPENAI_API_KEY; + +beforeEach(() => { + process.env.OPENAI_API_KEY = 'sk-test-key'; + resetTranscriptionCacheForTests(); +}); + +afterEach(() => { + if (ORIGINAL_KEY === undefined) delete process.env.OPENAI_API_KEY; + else process.env.OPENAI_API_KEY = ORIGINAL_KEY; + resetTranscriptionCacheForTests(); +}); + +function fakeJsonResponse(body: unknown, ok: boolean = true, status: number = 200): Response { + return new Response(JSON.stringify(body), { + status, + statusText: ok ? 'OK' : 'Bad', + headers: { 'content-type': 'application/json' }, + }); +} + +describe('isTranscribableMime', () => { + it('accepts audio/*', () => { + expect(isTranscribableMime('audio/ogg')).toBe(true); + expect(isTranscribableMime('audio/mpeg')).toBe(true); + expect(isTranscribableMime('AUDIO/WAV')).toBe(true); + }); + + it('accepts video/mp4 and video/webm (chat-platform voice variants)', () => { + expect(isTranscribableMime('video/mp4')).toBe(true); + expect(isTranscribableMime('video/webm')).toBe(true); + }); + + it('rejects unknown / undefined / image / pdf mime types', () => { + expect(isTranscribableMime(undefined)).toBe(false); + expect(isTranscribableMime('')).toBe(false); + expect(isTranscribableMime('image/png')).toBe(false); + expect(isTranscribableMime('application/pdf')).toBe(false); + expect(isTranscribableMime('text/plain')).toBe(false); + }); +}); + +describe('transcribeAudio', () => { + it('throws TranscriptionError(no-key) when OPENAI_API_KEY is missing', async () => { + delete process.env.OPENAI_API_KEY; + resetTranscriptionCacheForTests(); + await expect( + 
+ transcribeAudio(Buffer.from('audio'), 'voice.ogg', 'audio/ogg', { + fetchImpl: async () => fakeJsonResponse({ text: 'never' }), + }), + ).rejects.toMatchObject({ kind: 'no-key' }); + }); + + it('throws TranscriptionError(too-large) for empty buffers', async () => { + await expect( + transcribeAudio(Buffer.alloc(0), 'voice.ogg', 'audio/ogg', { + fetchImpl: async () => fakeJsonResponse({ text: 'never' }), + }), + ).rejects.toMatchObject({ kind: 'too-large' }); + }); + + it('throws TranscriptionError(too-large) for over-cap buffers', async () => { + const big = Buffer.alloc(25 * 1024 * 1024 + 1, 0xab); // 25MB+1, over the 24MB cap + await expect( + transcribeAudio(big, 'voice.ogg', 'audio/ogg', { + fetchImpl: async () => fakeJsonResponse({ text: 'never' }), + }), + ).rejects.toMatchObject({ kind: 'too-large' }); + }); + + it('returns the parsed text on a 200 OK response', async () => { + let captured: string | null = null; + const fetchImpl: typeof fetch = async (input, init) => { + captured = typeof input === 'string' ? input : ((input as URL).toString?.() ?? ''); + const headers = (init?.headers ??
{}) as Record; + expect(headers.Authorization).toBe('Bearer sk-test-key'); + return fakeJsonResponse({ text: ' hello there ' }); + }; + const out = await transcribeAudio(Buffer.from('audio'), 'voice.ogg', 'audio/ogg', { fetchImpl }); + expect(out.text).toBe('hello there'); + expect(captured).toContain('/audio/transcriptions'); + }); + + it('throws TranscriptionError(http) on non-OK response', async () => { + const fetchImpl: typeof fetch = async () => + new Response('rate limited', { status: 429, statusText: 'Too Many Requests' }); + const promise = transcribeAudio(Buffer.from('audio'), 'voice.ogg', 'audio/ogg', { fetchImpl }); + await expect(promise).rejects.toMatchObject({ kind: 'http' }); + await promise.catch((e: TranscriptionError) => { + expect(e.message).toContain('429'); + }); + }); + + it('throws TranscriptionError(network) when fetch itself rejects', async () => { + const fetchImpl: typeof fetch = async () => { + throw new Error('connection reset'); + }; + await expect(transcribeAudio(Buffer.from('audio'), 'voice.ogg', 'audio/ogg', { fetchImpl })).rejects.toMatchObject({ + kind: 'network', + }); + }); + + it('throws TranscriptionError(malformed-response) when JSON has no "text" field', async () => { + const fetchImpl: typeof fetch = async () => fakeJsonResponse({ other: 'data' }); + await expect(transcribeAudio(Buffer.from('audio'), 'voice.ogg', 'audio/ogg', { fetchImpl })).rejects.toMatchObject({ + kind: 'malformed-response', + }); + }); + + it('throws TranscriptionError(malformed-response) when text is empty', async () => { + const fetchImpl: typeof fetch = async () => fakeJsonResponse({ text: ' ' }); + await expect(transcribeAudio(Buffer.from('audio'), 'voice.ogg', 'audio/ogg', { fetchImpl })).rejects.toMatchObject({ + kind: 'malformed-response', + }); + }); +}); diff --git a/src/transcription.ts b/src/transcription.ts new file mode 100644 index 00000000000..5ae69f6bf45 --- /dev/null +++ b/src/transcription.ts @@ -0,0 +1,150 @@ +/** + * Voice-message 
transcription via the OpenAI Whisper API. + * + * Host-side preprocessing — called by the chat-sdk-bridge after downloading + * a voice attachment, before the row is written into the session inbound DB. + * The resulting text is stored on the attachment as `transcription` so the + * container-side formatter can render it inline; on failure + * `transcriptionError` carries the reason instead. + * + * The container does NOT have access to the OpenAI API key — running Whisper + * inside the sandboxed agent would require leaking a host secret and would + * waste tokens on already-spoken content. Transcribing on the host once, + * then sending the text to the agent, is much cheaper. + * + * Reads the key from the host's `.env` (via `readEnvFile`) on first use; + * missing key returns a structured "OPENAI_API_KEY not set" error so the + * formatter can surface it to the agent. + */ +import { readEnvFile } from './env.js'; +import { log } from './log.js'; + +const WHISPER_ENDPOINT = 'https://api.openai.com/v1/audio/transcriptions'; +const WHISPER_MODEL = 'whisper-1'; + +/** + * Maximum audio byte size accepted by Whisper. OpenAI's documented cap is + * 25 MB; we round down slightly so we don't get rejected on the boundary + * after multipart-form overhead. + */ +const MAX_AUDIO_BYTES = 24 * 1024 * 1024; + +let _cachedKey: string | null | undefined; +function getOpenAIKey(): string | null { + if (_cachedKey !== undefined) return _cachedKey; + const fromProcess = process.env.OPENAI_API_KEY; + if (fromProcess && fromProcess.trim().length > 0) { + _cachedKey = fromProcess.trim(); + return _cachedKey; + } + const fromFile = readEnvFile(['OPENAI_API_KEY']).OPENAI_API_KEY; + _cachedKey = fromFile && fromFile.trim().length > 0 ? fromFile.trim() : null; + return _cachedKey; +} + +/** Test-only reset of the cached key (forces re-read on next call). 
*/
export function resetTranscriptionCacheForTests(): void {
  _cachedKey = undefined;
}

/** Optional knobs for {@link transcribeAudio}. */
export interface TranscribeOptions {
  /** Optional override for the model name (defaults to `whisper-1`). */
  model?: string;
  /** Optional fetch implementation (tests inject this). */
  fetchImpl?: typeof fetch;
}

/** Successful transcription outcome. */
export interface TranscribeResult {
  /** Transcribed text, trimmed and guaranteed non-empty. */
  text: string;
}

/**
 * Failure raised by {@link transcribeAudio}. `kind` lets callers branch on
 * the failure class without parsing `message`.
 */
export class TranscriptionError extends Error {
  readonly kind: 'no-key' | 'too-large' | 'http' | 'network' | 'malformed-response';

  constructor(kind: TranscriptionError['kind'], message: string) {
    super(message);
    this.kind = kind;
    this.name = 'TranscriptionError';
  }
}

/**
 * Transcribe an audio buffer via Whisper. Returns the text on success;
 * throws `TranscriptionError` with a structured `kind` on failure. The
 * caller is expected to convert the error message into
 * `attachment.transcriptionError`.
 *
 * @param audio - Raw audio bytes. Only the bytes this Buffer views are sent.
 * @param filename - File name reported in the multipart upload.
 * @param mimeType - MIME type attached to the uploaded file part.
 * @param opts - Optional model override and fetch implementation.
 * @returns The trimmed, non-empty transcription.
 * @throws TranscriptionError
 *   - `no-key`: no API key is configured.
 *   - `too-large`: the buffer is empty or exceeds the size cap.
 *   - `network`: the fetch call itself rejected.
 *   - `http`: the endpoint answered with a non-OK status.
 *   - `malformed-response`: the body is not JSON, lacks a string `text`
 *     field, or the text is blank.
 */
export async function transcribeAudio(
  audio: Buffer,
  filename: string,
  mimeType: string,
  opts: TranscribeOptions = {},
): Promise<TranscribeResult> {
  const key = getOpenAIKey();
  if (!key) throw new TranscriptionError('no-key', 'OPENAI_API_KEY not set');
  // There is no dedicated kind for an empty buffer; it is reported under
  // 'too-large' so the set of kinds callers switch on stays unchanged.
  if (audio.length === 0) throw new TranscriptionError('too-large', 'Empty audio buffer');
  if (audio.length > MAX_AUDIO_BYTES) {
    throw new TranscriptionError(
      'too-large',
      `Audio exceeds Whisper limit (${audio.length} > ${MAX_AUDIO_BYTES} bytes)`,
    );
  }

  // A Node Buffer is frequently a view into a larger, shared ArrayBuffer
  // (small Buffers come from an allocation pool; slices share their parent).
  // Passing `audio.buffer` to Blob would upload the entire backing store —
  // wrong audio and potentially unrelated memory. Copy exactly the viewed
  // bytes into a standalone array instead.
  const audioBytes = new Uint8Array(audio);

  const form = new FormData();
  form.append('file', new Blob([audioBytes], { type: mimeType }), filename);
  form.append('model', opts.model ?? WHISPER_MODEL);
  form.append('response_format', 'json');

  const fetchImpl = opts.fetchImpl ?? fetch;
  let res: Response;
  try {
    res = await fetchImpl(WHISPER_ENDPOINT, {
      method: 'POST',
      headers: { Authorization: `Bearer ${key}` },
      body: form,
    });
  } catch (err) {
    const msg = err instanceof Error ? err.message : String(err);
    throw new TranscriptionError('network', `Whisper network error: ${msg}`);
  }

  if (!res.ok) {
    // Best-effort read of the error body; cap it so a large error page does
    // not end up verbatim in the attachment's error text.
    const body = await res.text().catch(() => '');
    throw new TranscriptionError('http', `Whisper HTTP ${res.status}: ${body.slice(0, 300) || res.statusText}`);
  }

  let parsed: unknown;
  try {
    parsed = await res.json();
  } catch (err) {
    const msg = err instanceof Error ? err.message : String(err);
    throw new TranscriptionError('malformed-response', `Whisper response not JSON: ${msg}`);
  }

  // Validate the shape before trusting it: the payload must be an object
  // carrying a string `text` property.
  const rawText =
    typeof parsed === 'object' && parsed !== null ? (parsed as { text?: unknown }).text : undefined;
  if (typeof rawText !== 'string') {
    throw new TranscriptionError('malformed-response', 'Whisper response missing "text" field');
  }

  const text = rawText.trim();
  if (!text) throw new TranscriptionError('malformed-response', 'Whisper returned empty text');

  log.debug('Transcribed audio attachment', { filename, mimeType, length: text.length });
  return { text };
}

/**
 * True if the given mime type is one we send to Whisper. Used by the bridge
 * to gate the transcription call. We don't try to transcode unsupported
 * formats — better to let the agent see the file with a clear "no
 * transcription available" note than to silently produce bad text.
 *
 * Matching is case-insensitive and ignores MIME parameters, so
 * `video/webm; codecs=opus` is treated the same as `video/webm`.
 *
 * @param mime - MIME type as reported by the chat platform, if any.
 * @returns `true` for any `audio/*` type and for the listed video containers.
 */
export function isTranscribableMime(mime: string | undefined): boolean {
  if (!mime) return false;
  // Drop any `; param=value` suffix before comparing the type/subtype.
  const paramStart = mime.indexOf(';');
  const essence = (paramStart === -1 ? mime : mime.slice(0, paramStart)).trim().toLowerCase();
  // Conservative whitelist; Whisper actually accepts more but these are the
  // ones the chat platforms we use produce in practice.
  return (
    essence.startsWith('audio/') ||
    essence === 'video/mp4' || // some Slack voice notes
    essence === 'video/webm' // some Telegram round-video voice notes
  );
}