From 796f8fa3b1bc2e6068b35e0d02a9c6d10e40ad40 Mon Sep 17 00:00:00 2001 From: donatoaz Date: Sat, 2 May 2026 19:43:34 -0300 Subject: [PATCH] feat: add Strands Agents runtime support Adds native agent-flow support for AWS Strands Agents via a Python HookProvider that emits AgentEvent JSONL and a TypeScript session watcher that auto-discovers and tails those files. - strands-agentflow: Python HookProvider package that plugs into any Strands Agent via hooks=[AgentFlowHookProvider()] and writes JSONL to ~/.strands/agent-flow/.jsonl - extension/src/strands-session-watcher.ts: session discovery + JSONL tailing (same pattern as Codex runtime) - extension/src/strands-runtime.ts: runtime factory - scripts/relay.ts: Strands watcher integration + fix SSE replay to send all sessions (enables tab-switching) - Unit tests for StrandsSessionWatcher (4 test cases) Co-Authored-By: Claude Opus 4.6 --- .gitignore | 2 + extension/src/extension.ts | 8 +- extension/src/session-runtime.ts | 2 +- extension/src/strands-runtime.ts | 40 +++ extension/src/strands-session-watcher.ts | 294 ++++++++++++++++ .../fixtures/strands-session-sample.jsonl | 10 + .../test/strands-session-watcher.test.ts | 173 ++++++++++ scripts/relay.ts | 40 ++- strands-agentflow/README.md | 14 + strands-agentflow/pyproject.toml | 19 ++ .../strands_agentflow/__init__.py | 314 ++++++++++++++++++ 11 files changed, 899 insertions(+), 17 deletions(-) create mode 100644 extension/src/strands-runtime.ts create mode 100644 extension/src/strands-session-watcher.ts create mode 100644 extension/test/fixtures/strands-session-sample.jsonl create mode 100644 extension/test/strands-session-watcher.test.ts create mode 100644 strands-agentflow/README.md create mode 100644 strands-agentflow/pyproject.toml create mode 100644 strands-agentflow/strands_agentflow/__init__.py diff --git a/.gitignore b/.gitignore index 669f6bd..b695118 100644 --- a/.gitignore +++ b/.gitignore @@ -27,3 +27,5 @@ extension/.vscode/ extension/package-lock.json .DS_Store +__pycache__/ +validation/ diff --git a/extension/src/extension.ts b/extension/src/extension.ts index db54657..e57a305 100644 --- a/extension/src/extension.ts +++ b/extension/src/extension.ts @@ -4,6 +4,7 @@ import { JsonlEventSource } from './event-source' import { WebviewToExtensionMessage } from './protocol' import { startClaudeRuntime } from './claude-runtime' import { startCodexRuntime } from './codex-runtime' +import { startStrandsRuntime } from './strands-runtime' import { promptHookSetupIfNeeded, configureClaudeHooks, isDisable1MContext } from './hooks-config' import { createLogger } from './logger' import type { AgentRuntime, AgentRuntimeMode } from './session-runtime' @@ -17,7 +18,7 @@ let runtimes: AgentRuntime[] = [] function readConfiguredMode(): ConfiguredRuntimeMode { const raw = vscode.workspace.getConfiguration('agentVisualizer').get('runtime', 'auto') - return raw === 'claude' || raw === 'codex' ? raw : 'auto' + return raw === 'claude' || raw === 'codex' || raw === 'strands' ? raw : 'auto' } interface StartRuntimesResult { @@ -41,6 +42,11 @@ async function startRuntimes( try { runtimes.push(startCodexRuntime(context)) } catch (err) { log.error('Codex runtime failed to start:', err); failures.push('codex') } } + if (mode === 'strands' || mode === 'auto') { + log.info('Starting Strands runtime...') + try { runtimes.push(startStrandsRuntime(context)) } + catch (err) { log.error('Strands runtime failed to start:', err); failures.push('strands') } + } return { runtimes, failures } } diff --git a/extension/src/session-runtime.ts b/extension/src/session-runtime.ts index fb3bde1..96bf93f 100644 --- a/extension/src/session-runtime.ts +++ b/extension/src/session-runtime.ts @@ -15,7 +15,7 @@ import { VisualizerPanel } from './webview-provider' import { SESSION_ID_DISPLAY, STATUS_MESSAGE_DURATION_MS } from './constants' import type { TypedDisposable, TypedEvent } from './typed-event-emitter' -export type AgentRuntimeMode = 'claude' | 'codex' +export type AgentRuntimeMode = 'claude' | 'codex' | 'strands' export interface SessionLifecycleEvent { type: 'started' | 'ended' | 'updated' diff --git a/extension/src/strands-runtime.ts b/extension/src/strands-runtime.ts new file mode 100644 index 0000000..f9e9bb2 --- /dev/null +++ b/extension/src/strands-runtime.ts @@ -0,0 +1,40 @@ +/** + * Strands runtime. + * + * Strands agents emit agent-flow events via the AgentFlowHookProvider, which + * writes native AgentEvent JSONL to ~/.strands/agent-flow/. This runtime wires + * StrandsSessionWatcher to the visualizer panel. + */ + +import * as vscode from 'vscode' +import * as os from 'os' +import { StrandsSessionWatcher } from './strands-session-watcher' +import { createLogger } from './logger' +import { wireWatcherToPanel } from './session-runtime' +import type { AgentRuntime } from './session-runtime' + +const log = createLogger('StrandsRuntime') + +export function startStrandsRuntime(context: vscode.ExtensionContext): AgentRuntime { + const workspace = vscode.workspace.workspaceFolders?.[0]?.uri.fsPath ?? null + const watcher = new StrandsSessionWatcher(workspace) + context.subscriptions.push(watcher) + + const wiring = wireWatcherToPanel(watcher, { + sessionLabelPrefix: 'Strands', + }) + + watcher.start() + + const homeLabel = process.env.STRANDS_HOME + ? process.env.STRANDS_HOME.replace(os.homedir(), '~') + : '~/.strands' + + const connectionStatus = (): string => `Strands session watcher (${homeLabel}/agent-flow)` + + const dispose = (): void => { wiring.dispose(); watcher.dispose() } + + log.info(`Strands runtime started (home: ${homeLabel})`) + + return { mode: 'strands', watcher, connectionStatus, dispose } +} diff --git a/extension/src/strands-session-watcher.ts b/extension/src/strands-session-watcher.ts new file mode 100644 index 0000000..c9a5641 --- /dev/null +++ b/extension/src/strands-session-watcher.ts @@ -0,0 +1,294 @@ +/** + * Watches Strands agent-flow JSONL files at ~/.strands/agent-flow/.jsonl + * + * The Strands AgentFlowHookProvider writes one JSONL file per agent session. + * Each line is a complete AgentEvent in agent-flow's native schema, so no + * parser/translation layer is needed — just JSON.parse + validate. + * + * Discovery scans the agent-flow directory for recent .jsonl files, optionally + * filtering by workspace cwd (read from the first agent_spawn payload). + * Respects STRANDS_HOME for non-default installs. + */ + +import * as fs from 'fs' +import * as path from 'path' +import * as os from 'os' +import { AgentEvent, SessionInfo } from './protocol' +import { + ACTIVE_SESSION_AGE_S, INACTIVITY_TIMEOUT_MS, ORCHESTRATOR_NAME, + POLL_FALLBACK_MS, SCAN_INTERVAL_MS, SESSION_ID_DISPLAY, +} from './constants' +import { readNewFileLines } from './fs-utils' +import { createLogger } from './logger' +import type { AgentSessionWatcher, SessionLifecycleEvent } from './session-runtime' +import { TypedEventEmitter } from './typed-event-emitter' + +const log = createLogger('StrandsSessionWatcher') + +/** Extract session ID from filename: .jsonl */ +const SESSION_ID_FROM_FILENAME = /^([0-9a-f-]{36})\.jsonl$/ + +interface WatchedStrandsSession { + sessionId: string + filePath: string + fileWatcher: fs.FSWatcher | null + pollTimer: NodeJS.Timeout | null + inactivityTimer: NodeJS.Timeout | null + fileSize: number + fileTail: string + sessionStartTime: number + lastActivityTime: number + sessionDetected: boolean + sessionCompleted: boolean + label: string +} + +function strandsHome(): string { + return process.env.STRANDS_HOME || path.join(os.homedir(), '.strands') +} + +function agentFlowDir(): string { + return path.join(strandsHome(), 'agent-flow') +} + +/** Read the first line of a session file to extract cwd from agent_spawn payload. */ +function readSessionCwd(filePath: string): string | null { + try { + const fd = fs.openSync(filePath, 'r') + try { + const buf = Buffer.alloc(4096) + const read = fs.readSync(fd, buf, 0, buf.length, 0) + const firstNewline = buf.subarray(0, read).indexOf(0x0a) + const end = firstNewline >= 0 ? firstNewline : read + const line = buf.slice(0, end).toString('utf-8') + const parsed = JSON.parse(line) as { type?: string; payload?: { cwd?: string } } + if (parsed.type !== 'agent_spawn') return null + return typeof parsed.payload?.cwd === 'string' ? parsed.payload.cwd : null + } finally { fs.closeSync(fd) } + } catch { return null } +} + +export class StrandsSessionWatcher implements AgentSessionWatcher { + private dirWatcher: fs.FSWatcher | null = null + private sessions = new Map() + private workspacePath: string | null = null + private scanInterval: NodeJS.Timeout | null = null + + private readonly _onEvent = new TypedEventEmitter() + private readonly _onSessionDetected = new TypedEventEmitter() + private readonly _onSessionLifecycle = new TypedEventEmitter() + + readonly onEvent = this._onEvent.event + readonly onSessionDetected = this._onSessionDetected.event + readonly onSessionLifecycle = this._onSessionLifecycle.event + + constructor(private readonly workspace?: string | null) {} + + isActive(): boolean { + for (const s of this.sessions.values()) { + if (s.sessionDetected && !s.sessionCompleted) return true + } + return false + } + + isSessionActive(sessionId: string): boolean { + const s = this.sessions.get(sessionId) + return !!s && s.sessionDetected && !s.sessionCompleted + } + + getActiveSessions(): SessionInfo[] { + return Array.from(this.sessions.values()).map(s => ({ + id: s.sessionId, + label: s.label, + status: s.sessionCompleted ? 'completed' : 'active', + startTime: s.sessionStartTime, + lastActivityTime: s.lastActivityTime, + })) + } + + replaySessionStart(sessionIds?: string[]): void { + for (const [id, session] of this.sessions) { + if (!session.sessionDetected) continue + if (sessionIds && !sessionIds.includes(id)) continue + this._onSessionLifecycle.fire({ type: 'started', sessionId: id, label: session.label }) + } + } + + start(): void { + if (this.workspace) { + try { this.workspacePath = fs.realpathSync(this.workspace) } + catch { this.workspacePath = this.workspace } + } + + const dir = agentFlowDir() + if (!fs.existsSync(dir)) { + try { fs.mkdirSync(dir, { recursive: true }) } + catch (err) { log.debug('Failed to create agent-flow dir:', err) } + } + + this.scanForSessions() + this.scanInterval = setInterval(() => this.scanForSessions(), SCAN_INTERVAL_MS) + + if (fs.existsSync(dir)) { + try { + this.dirWatcher = fs.watch(dir, () => this.scanForSessions()) + } catch (err) { log.debug('Dir watch failed:', err) } + } + + log.info(`Watching ${dir} for workspace ${this.workspacePath ?? ''}`) + } + + private scanForSessions(): void { + const dir = agentFlowDir() + if (!fs.existsSync(dir)) return + + let entries: string[] + try { entries = fs.readdirSync(dir) } + catch { return } + + for (const name of entries) { + if (!name.endsWith('.jsonl')) continue + const m = name.match(SESSION_ID_FROM_FILENAME) + const sessionId = m ? m[1] : name.replace('.jsonl', '') + if (this.sessions.has(sessionId)) continue + + const filePath = path.join(dir, name) + + let stat: fs.Stats + try { stat = fs.statSync(filePath) } catch { continue } + if (stat.size === 0) continue + const ageS = (Date.now() - stat.mtimeMs) / 1000 + if (ageS > ACTIVE_SESSION_AGE_S) continue + + if (this.workspacePath) { + const cwd = readSessionCwd(filePath) + if (cwd !== null) { + const resolvedCwd = this.resolvePath(cwd) + if (resolvedCwd && !this.pathMatchesWorkspace(resolvedCwd)) continue + } + } + + this.attachSession(sessionId, filePath, stat) + } + } + + private resolvePath(p: string): string | null { + try { return fs.realpathSync(p) } catch { return p } + } + + private pathMatchesWorkspace(p: string): boolean { + if (!this.workspacePath) return true + if (p === this.workspacePath) return true + return p.startsWith(this.workspacePath + path.sep) + } + + private attachSession(sessionId: string, filePath: string, stat: fs.Stats): void { + const label = `Strands ${sessionId.slice(0, SESSION_ID_DISPLAY)}` + + const session: WatchedStrandsSession = { + sessionId, + filePath, + fileWatcher: null, + pollTimer: null, + inactivityTimer: null, + fileSize: 0, + fileTail: '', + sessionStartTime: stat.birthtimeMs || stat.mtimeMs, + lastActivityTime: stat.mtimeMs, + sessionDetected: false, + sessionCompleted: false, + label, + } + this.sessions.set(sessionId, session) + + this.readNewLines(sessionId) + + session.sessionDetected = true + this._onSessionDetected.fire(sessionId) + this._onSessionLifecycle.fire({ type: 'started', sessionId, label }) + + try { + session.fileWatcher = fs.watch(filePath, () => this.readNewLines(sessionId)) + } catch (err) { log.debug('File watch failed:', filePath, err) } + + session.pollTimer = setInterval(() => this.readNewLines(sessionId), POLL_FALLBACK_MS) + this.resetInactivityTimer(sessionId) + + log.info(`Attached to session ${sessionId.slice(0, SESSION_ID_DISPLAY)} at ${filePath}`) + } + + private readNewLines(sessionId: string): void { + const session = this.sessions.get(sessionId) + if (!session) return + + const result = readNewFileLines(session.filePath, session.fileSize, session.fileTail) + if (!result) return + session.fileSize = result.newSize + session.fileTail = result.tail + session.lastActivityTime = Date.now() + + if (session.sessionCompleted) { + session.sessionCompleted = false + this._onSessionLifecycle.fire({ type: 'started', sessionId, label: session.label }) + log.info(`Session ${sessionId.slice(0, SESSION_ID_DISPLAY)} re-activated after idle`) + } + + for (const line of result.lines) { + const event = this.parseLine(line) + if (!event) continue + this._onEvent.fire({ ...event, sessionId }) + + // Update label from first user message + if (event.type === 'message' && event.payload?.role === 'user' && session.label.startsWith('Strands ')) { + const content = String(event.payload.content || '').slice(0, 40) + if (content) { + session.label = content + this._onSessionLifecycle.fire({ type: 'updated', sessionId, label: content }) + } + } + } + + this.resetInactivityTimer(sessionId) + } + + private parseLine(line: string): AgentEvent | null { + try { + const parsed = JSON.parse(line.trim()) + if (parsed && typeof parsed.type === 'string' && typeof parsed.time === 'number') { + return parsed as AgentEvent + } + return null + } catch { return null } + } + + private resetInactivityTimer(sessionId: string): void { + const session = this.sessions.get(sessionId) + if (!session) return + if (session.inactivityTimer) { clearTimeout(session.inactivityTimer) } + session.inactivityTimer = setTimeout(() => { + if (session.sessionCompleted) return + session.sessionCompleted = true + this._onEvent.fire({ + time: (Date.now() - session.sessionStartTime) / 1000, + type: 'agent_complete', + payload: { name: ORCHESTRATOR_NAME, sessionEnd: true }, + sessionId, + }) + this._onSessionLifecycle.fire({ type: 'ended', sessionId, label: session.label }) + }, INACTIVITY_TIMEOUT_MS) + } + + dispose(): void { + if (this.scanInterval) { clearInterval(this.scanInterval) } + this.dirWatcher?.close() + for (const s of this.sessions.values()) { + s.fileWatcher?.close() + if (s.pollTimer) clearInterval(s.pollTimer) + if (s.inactivityTimer) clearTimeout(s.inactivityTimer) + } + this.sessions.clear() + this._onEvent.dispose() + this._onSessionDetected.dispose() + this._onSessionLifecycle.dispose() + } +} diff --git a/extension/test/fixtures/strands-session-sample.jsonl b/extension/test/fixtures/strands-session-sample.jsonl new file mode 100644 index 0000000..9460257 --- /dev/null +++ b/extension/test/fixtures/strands-session-sample.jsonl @@ -0,0 +1,10 @@ +{"time":0,"type":"agent_spawn","payload":{"name":"orchestrator","isMain":true,"task":"Strands session","runtime":"strands","cwd":"/tmp/test-workspace"},"sessionId":"test-strands-session"} +{"time":0.5,"type":"agent_idle","payload":{"name":"orchestrator"},"sessionId":"test-strands-session"} +{"time":2.1,"type":"tool_call_start","payload":{"agent":"orchestrator","tool":"list_files","args":".","preview":"list_files: .","inputData":{"directory":"."}},"sessionId":"test-strands-session"} +{"time":2.6,"type":"tool_call_end","payload":{"agent":"orchestrator","tool":"list_files","result":"file1.py\nfile2.py","tokenCost":0,"discovery":{"id":"list_files-.","type":"file","label":".","content":"file1.py\nfile2.py"}},"sessionId":"test-strands-session"} +{"time":2.8,"type":"agent_idle","payload":{"name":"orchestrator"},"sessionId":"test-strands-session"} +{"time":4.5,"type":"tool_call_start","payload":{"agent":"orchestrator","tool":"read_file","args":"file1.py","preview":"read_file: file1.py","inputData":{"file_path":"file1.py"},"filePath":"file1.py"},"sessionId":"test-strands-session"} +{"time":5.0,"type":"tool_call_end","payload":{"agent":"orchestrator","tool":"read_file","result":"print('hello')","tokenCost":0,"discovery":{"id":"read_file-file1.py","type":"file","label":"file1.py","content":"print('hello')"}},"sessionId":"test-strands-session"} +{"time":5.2,"type":"context_update","payload":{"agent":"orchestrator","tokens":15000,"breakdown":{"systemPrompt":0,"userMessages":0,"toolResults":0,"reasoning":0,"subagentResults":0},"tokensMax":200000,"isAuthoritative":true},"sessionId":"test-strands-session"} +{"time":5.4,"type":"message","payload":{"agent":"orchestrator","role":"user","content":"List and read files"},"sessionId":"test-strands-session"} +{"time":8.0,"type":"agent_complete","payload":{"name":"orchestrator","sessionEnd":true},"sessionId":"test-strands-session"} diff --git a/extension/test/strands-session-watcher.test.ts b/extension/test/strands-session-watcher.test.ts new file mode 100644 index 0000000..2532eb3 --- /dev/null +++ b/extension/test/strands-session-watcher.test.ts @@ -0,0 +1,173 @@ +/** + * Unit tests for StrandsSessionWatcher. + * + * Writes a sample JSONL fixture to a temp directory, starts the watcher + * pointing at it, and verifies the emitted events match expectations. + */ + +import { describe, it, beforeEach, afterEach } from 'node:test' +import assert from 'node:assert/strict' +import * as fs from 'node:fs' +import * as path from 'node:path' +import * as os from 'os' +import { StrandsSessionWatcher } from '../src/strands-session-watcher' +import type { AgentEvent } from '../src/protocol' + +const FIXTURE = path.join(__dirname, 'fixtures', 'strands-session-sample.jsonl') + +let tempDir: string + +beforeEach(() => { + tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'strands-test-')) +}) + +afterEach(() => { + fs.rmSync(tempDir, { recursive: true, force: true }) +}) + +function copyFixtureToTemp(sessionId: string): string { + const dest = path.join(tempDir, `${sessionId}.jsonl`) + fs.copyFileSync(FIXTURE, dest) + return dest +} + +describe('StrandsSessionWatcher', () => { + it('discovers and parses a JSONL session file', async () => { + const sessionId = 'test-strands-session' + copyFixtureToTemp(sessionId) + + // Point watcher at our temp dir via STRANDS_HOME + process.env.STRANDS_HOME = path.dirname(tempDir) + // Rename tempDir to "agent-flow" since that's what the watcher expects + const agentFlowDir = path.join(path.dirname(tempDir), 'agent-flow') + fs.renameSync(tempDir, agentFlowDir) + tempDir = agentFlowDir // so afterEach cleans it up + + const events: AgentEvent[] = [] + const watcher = new StrandsSessionWatcher(null) + watcher.onEvent((e) => events.push(e)) + + watcher.start() + + // Give the watcher time to scan and read + await new Promise(resolve => setTimeout(resolve, 200)) + + watcher.dispose() + delete process.env.STRANDS_HOME + + // Verify events were emitted + assert.ok(events.length >= 8, `Expected at least 8 events, got ${events.length}`) + + // First event should be agent_spawn + assert.equal(events[0].type, 'agent_spawn') + assert.equal(events[0].payload.name, 'orchestrator') + assert.equal(events[0].payload.runtime, 'strands') + assert.equal(events[0].payload.isMain, true) + + // Should have tool_call_start events + const toolStarts = events.filter(e => e.type === 'tool_call_start') + assert.equal(toolStarts.length, 2) + assert.equal(toolStarts[0].payload.tool, 'list_files') + assert.equal(toolStarts[1].payload.tool, 'read_file') + + // Should have tool_call_end events + const toolEnds = events.filter(e => e.type === 'tool_call_end') + assert.equal(toolEnds.length, 2) + + // Should have context_update + const ctxUpdates = events.filter(e => e.type === 'context_update') + assert.ok(ctxUpdates.length >= 1) + assert.equal(ctxUpdates[0].payload.tokens, 15000) + + // Should have agent_complete + const completes = events.filter(e => e.type === 'agent_complete') + assert.ok(completes.length >= 1) + + // All events should have the sessionId attached + for (const e of events) { + assert.equal(e.sessionId, sessionId) + } + }) + + it('reports session lifecycle events', async () => { + const sessionId = 'test-strands-session' + copyFixtureToTemp(sessionId) + + process.env.STRANDS_HOME = path.dirname(tempDir) + const agentFlowDir = path.join(path.dirname(tempDir), 'agent-flow') + if (fs.existsSync(agentFlowDir)) fs.rmSync(agentFlowDir, { recursive: true }) + fs.renameSync(tempDir, agentFlowDir) + tempDir = agentFlowDir + + const detected: string[] = [] + const watcher = new StrandsSessionWatcher(null) + watcher.onSessionDetected((id) => detected.push(id)) + + watcher.start() + await new Promise(resolve => setTimeout(resolve, 200)) + + watcher.dispose() + delete process.env.STRANDS_HOME + + assert.ok(detected.includes(sessionId), 'Session should be detected') + assert.ok(watcher.getActiveSessions().length === 0, 'After dispose, no active sessions') + }) + + it('filters by workspace cwd from first agent_spawn', async () => { + const sessionId = 'test-strands-session' + copyFixtureToTemp(sessionId) + + process.env.STRANDS_HOME = path.dirname(tempDir) + const agentFlowDir = path.join(path.dirname(tempDir), 'agent-flow') + if (fs.existsSync(agentFlowDir)) fs.rmSync(agentFlowDir, { recursive: true }) + fs.renameSync(tempDir, agentFlowDir) + tempDir = agentFlowDir + + const events: AgentEvent[] = [] + // Use a workspace that does NOT match the fixture's cwd (/tmp/test-workspace) + const watcher = new StrandsSessionWatcher('/nonexistent/workspace') + watcher.onEvent((e) => events.push(e)) + + watcher.start() + await new Promise(resolve => setTimeout(resolve, 200)) + + watcher.dispose() + delete process.env.STRANDS_HOME + + // Session should be filtered out — no events emitted + assert.equal(events.length, 0, 'Session with non-matching cwd should be filtered') + }) + + it('skips empty and malformed lines gracefully', async () => { + process.env.STRANDS_HOME = path.dirname(tempDir) + const agentFlowDir = path.join(path.dirname(tempDir), 'agent-flow') + if (fs.existsSync(agentFlowDir)) fs.rmSync(agentFlowDir, { recursive: true }) + fs.renameSync(tempDir, agentFlowDir) + tempDir = agentFlowDir + + const sessionId = 'malformed-test' + const dest = path.join(agentFlowDir, `${sessionId}.jsonl`) + fs.writeFileSync(dest, [ + '', + 'not json at all', + '{"no_type_field": true}', + '{"type": "agent_spawn", "time": 0, "payload": {"name": "orchestrator", "isMain": true, "runtime": "strands"}}', + '{"type": "agent_complete", "time": 1, "payload": {"name": "orchestrator"}}', + ].join('\n') + '\n') + + const events: AgentEvent[] = [] + const watcher = new StrandsSessionWatcher(null) + watcher.onEvent((e) => events.push(e)) + + watcher.start() + await new Promise(resolve => setTimeout(resolve, 200)) + + watcher.dispose() + delete process.env.STRANDS_HOME + + // Only valid lines should produce events + assert.equal(events.length, 2) + assert.equal(events[0].type, 'agent_spawn') + assert.equal(events[1].type, 'agent_complete') + }) +}) diff --git a/scripts/relay.ts b/scripts/relay.ts index bcf0d32..c56ee6d 100644 --- a/scripts/relay.ts +++ b/scripts/relay.ts @@ -15,6 +15,7 @@ import { readNewFileLines } from '../extension/src/fs-utils' import { scanSubagentsDir, readSubagentNewLines } from '../extension/src/subagent-watcher' import { handlePermissionDetection } from '../extension/src/permission-detection' import { CodexSessionWatcher } from '../extension/src/codex-session-watcher' +import { StrandsSessionWatcher } from '../extension/src/strands-session-watcher' import { INACTIVITY_TIMEOUT_MS, SCAN_INTERVAL_MS, ACTIVE_SESSION_AGE_S, POLL_FALLBACK_MS, SESSION_ID_DISPLAY, SYSTEM_PROMPT_BASE_TOKENS, ORCHESTRATOR_NAME, @@ -353,7 +354,7 @@ export interface Relay { dispose: () => void } -export type RelayRuntimeMode = 'claude' | 'codex' | 'auto' +export type RelayRuntimeMode = 'claude' | 'codex' | 'strands' | 'auto' export interface RelayOptions { workspace: string @@ -366,9 +367,9 @@ export interface RelayOptions { } function resolveRuntimeMode(explicit?: RelayRuntimeMode): RelayRuntimeMode { - if (explicit === 'claude' || explicit === 'codex' || explicit === 'auto') return explicit + if (explicit === 'claude' || explicit === 'codex' || explicit === 'strands' || explicit === 'auto') return explicit const raw = process.env.AGENT_FLOW_RUNTIME - return raw === 'claude' || raw === 'codex' ? raw : 'auto' + return raw === 'claude' || raw === 'codex' || raw === 'strands' ? raw : 'auto' } export async function createRelay(options: RelayOptions): Promise { @@ -383,7 +384,8 @@ export async function createRelay(options: RelayOptions): Promise { const mode = resolveRuntimeMode(options.runtime) const wantClaude = mode === 'claude' || mode === 'auto' const wantCodex = mode === 'codex' || mode === 'auto' - log(`[relay] Runtime mode: ${mode} (watching: ${[wantClaude && 'claude', wantCodex && 'codex'].filter(Boolean).join(', ')})`) + const wantStrands = mode === 'strands' || mode === 'auto' + log(`[relay] Runtime mode: ${mode} (watching: ${[wantClaude && 'claude', wantCodex && 'codex', wantStrands && 'strands'].filter(Boolean).join(', ')})`) let hookServer: HookServer | null = null let scanInterval: NodeJS.Timeout | null = null @@ -433,6 +435,17 @@ export async function createRelay(options: RelayOptions): Promise { codexWatcher.start() } + // ─── Strands runtime ─────────────────────────────────────────────────────── + let strandsWatcher: StrandsSessionWatcher | null = null + if (wantStrands) { + strandsWatcher = new StrandsSessionWatcher(workspace) + strandsWatcher.onEvent((event) => broadcastEvent(event)) + strandsWatcher.onSessionLifecycle((lifecycle) => { + broadcastSessionLifecycle(lifecycle.type, lifecycle.sessionId, lifecycle.label) + }) + strandsWatcher.start() + } + const telemetry = options.telemetry const sessionStart = Date.now() let relayDisposed = false @@ -490,20 +503,16 @@ export async function createRelay(options: RelayOptions): Promise { }) } if (codexWatcher) sessionList.push(...codexWatcher.getActiveSessions()) + if (strandsWatcher) sessionList.push(...strandsWatcher.getActiveSessions()) if (sessionList.length > 0) { sendSSE(res, { type: 'session-list', sessions: sessionList }) } - // Replay buffered events for the most recent active session - const sorted = [...sessionList].sort((a, b) => { - const aActive = a.status === 'active' ? 1 : 0 - const bActive = b.status === 'active' ? 1 : 0 - if (aActive !== bActive) return bActive - aActive - return b.lastActivityTime - a.lastActivityTime - }) - if (sorted.length > 0) { - const buffered = eventBuffer.get(sorted[0].id) - if (buffered) { + // Replay buffered events for all sessions so tab-switching works. + // Send each session's events as a separate batch to avoid one giant payload. + for (const session of sessionList) { + const buffered = eventBuffer.get(session.id) + if (buffered && buffered.length > 0) { sendSSE(res, { type: 'agent-event-batch', events: buffered }) } } @@ -515,7 +524,7 @@ export async function createRelay(options: RelayOptions): Promise { if (relayDisposed) return relayDisposed = true const models = [...observedModels].sort().join(',').slice(0, 128) - const runtimes = [wantClaude && 'claude', wantCodex && 'codex'].filter(Boolean).join(',') + const runtimes = [wantClaude && 'claude', wantCodex && 'codex', wantStrands && 'strands'].filter(Boolean).join(',') telemetry?.emit({ ...baseEvent(), event_type: 'session_end', @@ -536,6 +545,7 @@ export async function createRelay(options: RelayOptions): Promise { } } codexWatcher?.dispose() + strandsWatcher?.dispose() }, } } diff --git a/strands-agentflow/README.md b/strands-agentflow/README.md new file mode 100644 index 0000000..bbdc3bd --- /dev/null +++ b/strands-agentflow/README.md @@ -0,0 +1,14 @@ +# strands-agentflow + +Agent-Flow visualizer integration for Strands Agents. + +## Usage + +```python +from strands import Agent +from strands_agentflow import AgentFlowHookProvider + +agent = Agent(hooks=[AgentFlowHookProvider()]) +``` + +This writes real-time events to `~/.strands/agent-flow/.jsonl` which the agent-flow visualizer auto-discovers and renders. diff --git a/strands-agentflow/pyproject.toml b/strands-agentflow/pyproject.toml new file mode 100644 index 0000000..73b45d8 --- /dev/null +++ b/strands-agentflow/pyproject.toml @@ -0,0 +1,19 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "strands-agentflow" +version = "0.1.0" +description = "Agent-Flow visualizer integration for Strands Agents" +readme = "README.md" +requires-python = ">=3.10" +license = "Apache-2.0" +dependencies = [ + "strands-agents>=0.1.0", +] + +[project.optional-dependencies] +dev = [ + "pytest>=7.0", +] diff --git a/strands-agentflow/strands_agentflow/__init__.py b/strands-agentflow/strands_agentflow/__init__.py new file mode 100644 index 0000000..33abe84 --- /dev/null +++ b/strands-agentflow/strands_agentflow/__init__.py @@ -0,0 +1,314 @@ +"""Agent-Flow visualizer integration for Strands Agents. + +Provides a HookProvider that emits agent-flow AgentEvent JSONL, enabling +real-time visualization of Strands agent activity in the agent-flow UI. + +Usage: + from strands import Agent + from strands_agentflow import AgentFlowHookProvider + + agent = Agent(hooks=[AgentFlowHookProvider()]) +""" + +from __future__ import annotations + +import json +import os +import time +import uuid +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any + +from strands.hooks import HookProvider, HookRegistry +from strands.hooks.events import ( + AfterInvocationEvent, + AfterModelCallEvent, + AfterNodeCallEvent, + AfterToolCallEvent, + BeforeInvocationEvent, + BeforeModelCallEvent, + BeforeNodeCallEvent, + BeforeToolCallEvent, + MessageAddedEvent, +) + + +def _agent_flow_dir() -> Path: + home = os.environ.get("STRANDS_HOME", os.path.join(os.path.expanduser("~"), ".strands")) + return Path(home) / "agent-flow" + + +def _summarize_tool_input(tool_input: Any) -> str: + if tool_input is None: + return "" + if isinstance(tool_input, str): + return tool_input[:80] + if isinstance(tool_input, dict): + if "file_path" in tool_input: + return str(tool_input["file_path"]) + if "command" in tool_input: + return str(tool_input["command"])[:80] + if "query" in tool_input: + return str(tool_input["query"])[:80] + return json.dumps(tool_input, default=str)[:80] + return str(tool_input)[:80] + + +def _summarize_result(result: Any) -> str: + if result is None: + return "" + if isinstance(result, str): + return result[:200] + if isinstance(result, Exception): + return str(result)[:200] + if isinstance(result, dict): + output = result.get("output", result.get("content", "")) + if isinstance(output, str): + return output[:200] + return str(result)[:200] + + +def _extract_file_path(tool_name: str, tool_input: Any) -> str | None: + if not isinstance(tool_input, dict): + return None + for key in ("file_path", "path", "filePath", "filename"): + if key in tool_input: + return str(tool_input[key]) + return None + + +@dataclass +class AgentFlowHookProvider(HookProvider): + """Strands HookProvider that writes agent-flow AgentEvent JSONL. + + Creates a JSONL file at ~/.strands/agent-flow/.jsonl and + appends one event per line as the agent executes. The agent-flow visualizer + auto-discovers and tails these files. + + Args: + session_id: Optional session ID. Generated if not provided. + agent_name: Name for the orchestrator node in the visualizer. + """ + + session_id: str = field(default_factory=lambda: str(uuid.uuid4())) + agent_name: str = "orchestrator" + _start_time: float = field(default=0.0, init=False, repr=False) + _file_path: Path | None = field(default=None, init=False, repr=False) + _file: Any = field(default=None, init=False, repr=False) + _spawned: bool = field(default=False, init=False, repr=False) + + def register_hooks(self, registry: HookRegistry) -> None: + registry.add_callback(BeforeInvocationEvent, self._on_before_invocation) + registry.add_callback(AfterInvocationEvent, self._on_after_invocation) + registry.add_callback(BeforeModelCallEvent, self._on_before_model_call) + registry.add_callback(AfterModelCallEvent, self._on_after_model_call) + registry.add_callback(BeforeToolCallEvent, self._on_before_tool_call) + registry.add_callback(AfterToolCallEvent, self._on_after_tool_call) + registry.add_callback(MessageAddedEvent, self._on_message_added) + registry.add_callback(BeforeNodeCallEvent, self._on_before_node_call) + registry.add_callback(AfterNodeCallEvent, self._on_after_node_call) + + def _elapsed(self) -> float: + if self._start_time == 0.0: + return 0.0 + return time.time() - self._start_time + + def _ensure_file(self) -> None: + if self._file is not None: + return + directory = _agent_flow_dir() + directory.mkdir(parents=True, exist_ok=True) + self._file_path = directory / f"{self.session_id}.jsonl" + self._file = open(self._file_path, "a", encoding="utf-8") + + def _emit(self, event_type: str, payload: dict[str, Any]) -> None: + self._ensure_file() + event = { + "time": round(self._elapsed(), 3), + "type": event_type, + "payload": payload, + "sessionId": self.session_id, + } + self._file.write(json.dumps(event, default=str) + "\n") + self._file.flush() + + def _ensure_spawned(self) -> None: + if self._spawned: + return + self._spawned = True + self._emit("agent_spawn", { + "name": self.agent_name, + "isMain": True, + "task": "Strands session", + "runtime": "strands", + "cwd": os.getcwd(), + }) + + def _on_before_invocation(self, event: BeforeInvocationEvent) -> None: + self._start_time = time.time() + self._ensure_spawned() + + def _on_before_model_call(self, event: BeforeModelCallEvent) -> None: + self._ensure_spawned() + self._emit("agent_idle", {"name": self.agent_name}) + + def _on_after_invocation(self, event: AfterInvocationEvent) -> None: + self._emit("agent_complete", { + "name": self.agent_name, + "sessionEnd": True, + }) + if self._file: + self._file.close() + self._file = None + + def _on_before_tool_call(self, event: BeforeToolCallEvent) -> None: + self._ensure_spawned() + tool_use = event.tool_use + tool_name = tool_use.get("name", "unknown") if isinstance(tool_use, dict) else "unknown" + tool_input = tool_use.get("input", {}) if isinstance(tool_use, dict) else {} + + args_summary = _summarize_tool_input(tool_input) + file_path = _extract_file_path(tool_name, tool_input) + + payload: dict[str, Any] = { + "agent": self.agent_name, + "tool": tool_name, + "args": args_summary, + "preview": f"{tool_name}: {args_summary}"[:60], + } + if tool_input and isinstance(tool_input, dict): + payload["inputData"] = {k: str(v)[:200] for k, v in tool_input.items()} + if file_path: + payload["filePath"] = file_path + + self._emit("tool_call_start", payload) + + def _on_after_tool_call(self, event: AfterToolCallEvent) -> None: + tool_use = event.tool_use + tool_name = tool_use.get("name", "unknown") if isinstance(tool_use, dict) else "unknown" + + result = event.result + is_error = isinstance(result, Exception) + result_summary = _summarize_result(result) + + payload: dict[str, Any] = { + "agent": self.agent_name, + "tool": tool_name, + "result": result_summary, + "tokenCost": 0, + } + if is_error: + payload["isError"] = True + payload["errorMessage"] = result_summary + + file_path = None + tool_input = tool_use.get("input", {}) if isinstance(tool_use, dict) else {} + if isinstance(tool_input, dict): + file_path = _extract_file_path(tool_name, tool_input) + + if file_path: + payload["discovery"] = { + "id": f"{tool_name}-{file_path}", + "type": "file", + "label": os.path.basename(file_path), + "content": result_summary[:100], + } + + self._emit("tool_call_end", payload) + + def _on_after_model_call(self, event: AfterModelCallEvent) -> None: + self._ensure_spawned() + stop_data = getattr(event, "stop_data", None) or getattr(event, "stopData", None) + usage = getattr(stop_data, "usage", None) if stop_data else None + + tokens = 0 + tokens_max = 0 + if usage: + tokens = getattr(usage, "inputTokens", 0) or getattr(usage, "input_tokens", 0) or 0 + tokens_max = getattr(usage, "totalTokens", 0) or getattr(usage, "total_tokens", 0) or 0 + + if tokens > 0: + self._emit("context_update", { + "agent": self.agent_name, + "tokens": tokens, + "breakdown": { + "systemPrompt": 0, + "userMessages": 0, + "toolResults": 0, + "reasoning": 0, + "subagentResults": 0, + }, + **({"tokensMax": tokens_max} if tokens_max > 0 else {}), + "isAuthoritative": True, + }) + + def _on_message_added(self, event: MessageAddedEvent) -> None: + self._ensure_spawned() + message = getattr(event, "message", None) + if not message: + return + + role = message.get("role", "") if isinstance(message, dict) else getattr(message, "role", "") + if role not in ("user", "assistant"): + return + + content = "" + raw_content = message.get("content", "") if isinstance(message, dict) else getattr(message, "content", "") + if isinstance(raw_content, str): + content = raw_content + elif isinstance(raw_content, list): + parts = [] + for block in raw_content: + if isinstance(block, dict) and block.get("type") == "text": + parts.append(block.get("text", "")) + elif isinstance(block, str): + parts.append(block) + content = "".join(parts) + + if not content.strip(): + return + + self._emit("message", { + "agent": self.agent_name, + "role": role, + "content": content[:2000], + }) + + def _on_before_node_call(self, event: BeforeNodeCallEvent) -> None: + self._ensure_spawned() + node_id = getattr(event, "node_id", None) or getattr(event, "nodeId", "subagent") + child_name = str(node_id) + + self._emit("subagent_dispatch", { + "parent": self.agent_name, + "child": child_name, + "task": f"Node: {child_name}", + }) + self._emit("agent_spawn", { + "name": child_name, + "parent": self.agent_name, + "task": f"Node: {child_name}", + }) + + def _on_after_node_call(self, event: AfterNodeCallEvent) -> None: + node_id = getattr(event, "node_id", None) or getattr(event, "nodeId", "subagent") + child_name = str(node_id) + + self._emit("subagent_return", { + "child": child_name, + "parent": self.agent_name, + "summary": "completed", + }) + self._emit("agent_complete", { + "name": child_name, + }) + + def close(self) -> None: + """Explicitly close the JSONL file (called automatically on AfterInvocation).""" + if self._file: + self._file.close() + self._file = None + + def __del__(self) -> None: + self.close()