From 21818acb910d9fb6946ddf944d566a1e4d68d946 Mon Sep 17 00:00:00 2001
From: ardi1s <1205848407@qq.com>
Date: Mon, 1 Jun 2026 09:41:54 +0800
Subject: [PATCH] feat: add FlowRAG workflow memory & retrieval module with
ChromaDB backend
---
.gitignore | 7 +
internal/flowrag/README.md | 132 ++++++
internal/flowrag/cmd/demo/main.go | 292 ++++++++++++
internal/flowrag/cmd/e2e_test/main.go | 326 ++++++++++++++
internal/flowrag/detector.go | 55 +++
internal/flowrag/retriever.go | 60 +++
internal/flowrag/segmenter.go | 129 ++++++
internal/flowrag/store.go | 545 +++++++++++++++++++++++
internal/flowrag/workflow.go | 116 +++++
internal/flowrag/workflow_test.go | 360 +++++++++++++++
internal/skills/builtin/flowrag/SKILL.md | 97 ++++
11 files changed, 2119 insertions(+)
create mode 100644 internal/flowrag/README.md
create mode 100644 internal/flowrag/cmd/demo/main.go
create mode 100644 internal/flowrag/cmd/e2e_test/main.go
create mode 100644 internal/flowrag/detector.go
create mode 100644 internal/flowrag/retriever.go
create mode 100644 internal/flowrag/segmenter.go
create mode 100644 internal/flowrag/store.go
create mode 100644 internal/flowrag/workflow.go
create mode 100644 internal/flowrag/workflow_test.go
create mode 100644 internal/skills/builtin/flowrag/SKILL.md
diff --git a/.gitignore b/.gitignore
index 4c48085bea..d82ffb5a0b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -54,3 +54,10 @@ manpages/
completions/crush.*sh
.prettierignore
.task
+
+# Local developer config (may contain API keys)
+/crush.json.bak
+/crush.local.json
+
+# FlowRAG local storage
+/flowrag_workflows.json
diff --git a/internal/flowrag/README.md b/internal/flowrag/README.md
new file mode 100644
index 0000000000..bbd10a5652
--- /dev/null
+++ b/internal/flowrag/README.md
@@ -0,0 +1,132 @@
+# FlowRAG — Intelligent Workflow Memory & Retrieval
+
+FlowRAG is a **workflow learning and replay module** for Crush, the terminal AI coding assistant. It captures successful coding workflows, stores them in a ChromaDB vector database, and retrieves the best past experience via semantic search for future similar tasks.
+
+## How It Works
+
+### Trigger Detection (Dual Layer)
+
+**Primary: Skill-driven (AI semantic understanding)**
+
+FlowRAG is registered as a [Skill](../../skills/builtin/flowrag/SKILL.md) in Crush. The LLM automatically identifies when to trigger it:
+
+- A multi-step task completed with no errors
+- User expresses satisfaction or confirms the result
+- User explicitly asks to save/remember the workflow
+
+**Secondary: Keyword matching**
+
+The [CompletionDetector](detector.go) recognizes explicit markers:
+
+| Category | Examples |
+|----------|----------|
+| Colloquial confirmation | "ok", "好的", "done", "搞定了" |
+| Business markers | "task complete", "save workflow", "remember this" |
+
+### Data Flow
+
+```
+Task completed (Skill detection / keyword match)
+ → Prompt: "Save this workflow? (y/n)"
+ → Yes: Segmenter extracts successful steps (skips errors/retries)
+ → Embedding (OpenAI-compatible API / Trigram Hash)
+ → Store in ChromaDB / JSON File
+ → Next similar task: retrieve Top-K via semantic search
+ → Inject into system prompt for faster execution
+```
+
+## Architecture
+
+```
+internal/flowrag/
+├── detector.go # Completion marker detection (Skill + keyword)
+├── segmenter.go # Workflow segmentation (filter error steps)
+├── store.go # Vector store (ChromaDB / JSON File backends)
+├── retriever.go # Semantic search + context builder
+├── workflow.go # Orchestrator (unified API)
+├── workflow_test.go # 17 test cases
+├── cmd/
+│ ├── demo/ # Interactive CLI demo (zero-dependency)
+│ └── e2e_test/ # End-to-end semantic search verification
+└── README.md
+```
+
+## Tech Stack
+
+| Component | Backend | Notes |
+|-----------|---------|-------|
+| **Vector Store** | ChromaDB (default) | REST API v2, auto-creates `crush_workflows` collection |
+| **Fallback Store** | JSON File | Zero-dependency, used when ChromaDB is unavailable |
+| **Embedding** | OpenAI-compatible API | `text-embedding-3-small` or similar |
+| **Embedding (zero-dep)** | Trigram Hash | Deterministic, content-aware, no API needed |
+| **Similarity** | Cosine similarity (ChromaDB built-in) | — |
+| **Segmentation** | IsError field filtering | Precisely excludes failed/rollback steps |
+
+## Quick Start
+
+### Option A: ChromaDB (Recommended)
+
+```bash
+# Start ChromaDB
+docker run -d -p 8000:8000 chromadb/chroma
+```
+
+```go
+mgr, _ := flowrag.NewWorkflowManager(flowrag.Config{
+ ChromaDBURL: "http://localhost:8000",
+ EmbeddingBaseURL: "https://api.openai.com/v1",
+ EmbeddingAPIKey: "sk-xxx",
+ EmbeddingModel: "text-embedding-3-small",
+})
+```
+
+### Option B: JSON File (Zero External Dependencies)
+
+```go
+mgr, _ := flowrag.NewWorkflowManager(flowrag.Config{
+ StorePath: "~/.crush/flowrag/workflows.json",
+})
+// ChromaDBURL left empty → falls back to file backend
+```
+
+### API Usage
+
+```go
+// Check if this message should trigger FlowRAG
+if mgr.Detector().ShouldTriggerFlowRAG(userMessage) {
+ // Confirm with user...
+}
+
+// Save successful workflow
+mgr.SaveSuccessfulWorkflow(ctx, SaveWorkflowInput{
+ UserPrompt: "Write an HTTP server",
+ Messages: sessionMessages,
+ SessionID: "session-123",
+})
+
+// Retrieve similar past workflows
+ctx := mgr.SearchAndBuildContext(ctx, "Create a REST API", 3)
+// Inject ctx into system prompt
+```
+
+## Interactive Demo
+
+```bash
+go run ./internal/flowrag/cmd/demo/
+```
+
+Run the demo, type `demo` to load 6 sample workflows spanning auth fixes, REST APIs, Python scripting, DB migrations, OAuth, and Docker Compose — then search with natural language queries and see semantic retrieval in action.
+
+## E2E Verification
+
+```bash
+go run ./internal/flowrag/cmd/e2e_test/
+```
+
+## Tests
+
+```bash
+go test ./internal/flowrag/... -v -count=1
+```
+
+17 test cases covering: detector (colloquial + business keywords), segmenter (success/error flow extraction), JSONFileStore (insert/search/empty), cosine similarity precision, retriever context building, and end-to-end integration.
diff --git a/internal/flowrag/cmd/demo/main.go b/internal/flowrag/cmd/demo/main.go
new file mode 100644
index 0000000000..163b6dccbf
--- /dev/null
+++ b/internal/flowrag/cmd/demo/main.go
@@ -0,0 +1,292 @@
+package main
+
+import (
+ "bufio"
+ "context"
+ "fmt"
+ "os"
+ "path/filepath"
+ "strings"
+
+ "github.com/charmbracelet/crush/internal/flowrag"
+)
+
+func main() {
+ ctx := context.Background()
+
+ home, _ := os.UserHomeDir()
+ storePath := filepath.Join(home, ".crush", "flowrag", "demo_workflows.json")
+ os.MkdirAll(filepath.Dir(storePath), 0755)
+
+ embClient := flowrag.NewHashEmbeddingClient(256)
+ store, err := flowrag.NewFileVectorStore(storePath, embClient)
+ if err != nil {
+ fmt.Fprintf(os.Stderr, "Failed to create store: %v\n", err)
+ os.Exit(1)
+ }
+
+ fmt.Println("╔══════════════════════════════════════════════╗")
+ fmt.Println("║ FlowRAG 交互式 Demo ║")
+ fmt.Println("║ 基于 trigram 哈希的语义向量检索 ║")
+ fmt.Println("╠══════════════════════════════════════════════╣")
+ fmt.Println("║ 命令: ║")
+ fmt.Println("║ add <描述> — 添加一个工作流 ║")
+ fmt.Println("║ search <查询> — 语义搜索相似工作流 ║")
+ fmt.Println("║ list — 列出所有已保存工作流 ║")
+ fmt.Println("║ demo — 加载示例数据 ║")
+ fmt.Println("║ help — 显示帮助 ║")
+ fmt.Println("║ quit — 退出 ║")
+ fmt.Println("╚══════════════════════════════════════════════╝")
+ fmt.Println()
+
+ scanner := bufio.NewScanner(os.Stdin)
+
+ for {
+ fmt.Print("flowrag> ")
+ if !scanner.Scan() {
+ break
+ }
+
+ line := strings.TrimSpace(scanner.Text())
+ if line == "" {
+ continue
+ }
+
+ parts := strings.SplitN(line, " ", 2)
+ cmd := strings.ToLower(parts[0])
+ arg := ""
+ if len(parts) > 1 {
+ arg = parts[1]
+ }
+
+ switch cmd {
+ case "help":
+ printHelp()
+
+ case "demo":
+ loadDemo(ctx, store)
+
+ case "list":
+ listWorkflows(ctx, store)
+
+ case "add":
+ if arg == "" {
+ fmt.Println("用法: add <描述文本>")
+ fmt.Println("例如: add 修复了用户登录认证的密码比对bug")
+ continue
+ }
+ addWorkflow(ctx, store, arg)
+
+ case "search":
+ if arg == "" {
+ fmt.Println("用法: search <查询文本>")
+ fmt.Println("例如: search 怎么修登录bug")
+ continue
+ }
+ searchWorkflows(ctx, store, arg)
+
+ case "quit", "exit", "q":
+ fmt.Println("再见!")
+ return
+
+ default:
+ fmt.Printf("未知命令: %s (输入 help 查看帮助)\n", cmd)
+ }
+ fmt.Println()
+ }
+}
+
+func printHelp() {
+ fmt.Println()
+ fmt.Println("FlowRAG 通过 trigram 哈希将文本转为向量,存入向量库。")
+ fmt.Println("相似语义的文本会产生相似的向量 → 余弦相似度检索。")
+ fmt.Println()
+ fmt.Println("试试这些例子:")
+ fmt.Println(" demo — 加载6条不同领域的示例工作流")
+ fmt.Println(" add 写了个Python脚本解析CSV生成报表")
+ fmt.Println(" search 怎么用python处理csv数据 — 语义搜索")
+ fmt.Println(" search 数据库迁移怎么做 — 看看能不能找到迁移工作流")
+ fmt.Println(" search docker部署postgres — 找到Docker相关工作流")
+}
+
+func loadDemo(ctx context.Context, store *flowrag.VectorStore) {
+ demos := []struct {
+ id string
+ prompt string
+ steps []flowrag.WorkflowStep
+ }{
+ {
+ id: "auth-login-fix",
+ prompt: "修复了用户登录认证模块中密码比对逻辑颠倒的bug",
+ steps: []flowrag.WorkflowStep{
+ {Role: "tool_call", Tool: "read", Input: `auth.go`},
+ {Role: "tool_result", Tool: "read", Output: "发现密码比较用了!=应该是=="},
+ {Role: "tool_call", Tool: "edit", Input: `auth.go - 修复比较运算符`},
+ {Role: "tool_result", Tool: "edit", Output: "已修复,密码验证恢复正常"},
+ },
+ },
+ {
+ id: "rest-api-register",
+ prompt: "创建了用户注册的REST API接口,包含邮箱验证和路由注册",
+ steps: []flowrag.WorkflowStep{
+ {Role: "tool_call", Tool: "read", Input: `routes.go`},
+ {Role: "tool_result", Tool: "read", Output: "找到现有路由结构"},
+ {Role: "tool_call", Tool: "write", Input: `handler_register.go - POST /api/users`},
+ {Role: "tool_result", Tool: "write", Output: "创建注册接口成功"},
+ {Role: "tool_call", Tool: "edit", Input: `routes.go - 注册新路由`},
+ {Role: "tool_result", Tool: "edit", Output: "路由注册完成"},
+ },
+ },
+ {
+ id: "python-csv-report",
+ prompt: "用Python写了一个CSV数据解析脚本,能自动生成PDF报表",
+ steps: []flowrag.WorkflowStep{
+ {Role: "tool_call", Tool: "write", Input: `report_parser.py`},
+ {Role: "tool_result", Tool: "write", Output: "基于pandas的CSV解析器"},
+ {Role: "tool_call", Tool: "write", Input: `requirements.txt`},
+ {Role: "tool_result", Tool: "write", Output: "添加pandas、reportlab依赖"},
+ {Role: "tool_call", Tool: "bash", Input: `python report_parser.py`},
+ {Role: "tool_result", Tool: "bash", Output: "报表生成成功,150条记录"},
+ },
+ },
+ {
+ id: "db-migration-users",
+ prompt: "为users表新增了数据库迁移脚本,添加了email和password_hash字段",
+ steps: []flowrag.WorkflowStep{
+ {Role: "tool_call", Tool: "read", Input: `migrations/目录`},
+ {Role: "tool_result", Tool: "read", Output: "已有2个迁移文件"},
+ {Role: "tool_call", Tool: "write", Input: `003_add_users_table.sql`},
+ {Role: "tool_result", Tool: "write", Output: "CREATE TABLE users迁移已创建"},
+ {Role: "tool_call", Tool: "bash", Input: `执行数据库迁移`},
+ {Role: "tool_result", Tool: "bash", Output: "迁移执行成功"},
+ },
+ },
+ {
+ id: "oauth-token-fix",
+ prompt: "修复了OAuth中间件中token过期后不自动刷新的问题",
+ steps: []flowrag.WorkflowStep{
+ {Role: "tool_call", Tool: "read", Input: `middleware/auth.go`},
+ {Role: "tool_result", Tool: "read", Output: "发现过期token检测逻辑"},
+ {Role: "tool_call", Tool: "edit", Input: `middleware/auth.go`},
+ {Role: "tool_result", Tool: "edit", Output: "添加了过期前5分钟主动刷新"},
+ },
+ },
+ {
+ id: "docker-postgres-redis",
+ prompt: "搭建了本地开发的Docker Compose环境,包含PostgreSQL和Redis",
+ steps: []flowrag.WorkflowStep{
+ {Role: "tool_call", Tool: "write", Input: `docker-compose.yml`},
+ {Role: "tool_result", Tool: "write", Output: "创建compose文件:postgres:16 + redis:7"},
+ {Role: "tool_call", Tool: "write", Input: `.env.example`},
+ {Role: "tool_result", Tool: "write", Output: "环境变量配置完成"},
+ {Role: "tool_call", Tool: "bash", Input: `docker compose up -d`},
+ {Role: "tool_result", Tool: "bash", Output: "服务启动成功:5432, :6379"},
+ },
+ },
+ }
+
+ for _, d := range demos {
+ record := flowrag.WorkflowRecord{
+ ID: d.id,
+ UserPrompt: d.prompt,
+ Steps: d.steps,
+ StepsText: buildStepsText(d.prompt, d.steps),
+ SessionID: "demo",
+ }
+ if err := store.Insert(ctx, record); err != nil {
+ fmt.Printf(" ✗ 插入失败 %s: %v\n", d.id, err)
+ return
+ }
+ fmt.Printf(" ✓ 已加载: %s\n", d.prompt)
+ }
+ fmt.Printf("\n 共加载 %d 条工作流,现在可以用 search 搜索了!\n", len(demos))
+}
+
+func buildStepsText(prompt string, steps []flowrag.WorkflowStep) string {
+ var sb strings.Builder
+ sb.WriteString("任务: ")
+ sb.WriteString(prompt)
+ for _, s := range steps {
+ sb.WriteString("\n")
+ sb.WriteString(s.Role)
+ sb.WriteString(": ")
+ sb.WriteString(s.Tool)
+ if s.Output != "" {
+ sb.WriteString(" -> ")
+ sb.WriteString(s.Output)
+ }
+ }
+ return sb.String()
+}
+
+func addWorkflow(ctx context.Context, store *flowrag.VectorStore, desc string) {
+ record := flowrag.WorkflowRecord{
+ ID: fmt.Sprintf("manual-%d", store.Count()+1),
+ UserPrompt: desc,
+ StepsText: "任务: " + desc,
+ SessionID: "manual",
+ }
+ if err := store.Insert(ctx, record); err != nil {
+ fmt.Printf(" ✗ 添加失败: %v\n", err)
+ return
+ }
+ fmt.Printf(" ✓ 已添加: %s\n", desc)
+ fmt.Printf(" 当前共 %d 条工作流\n", store.Count())
+}
+
+func searchWorkflows(ctx context.Context, store *flowrag.VectorStore, query string) {
+ fmt.Printf("\n 搜索: \"%s\"\n\n", query)
+
+ results, err := store.Search(ctx, query, 5)
+ if err != nil {
+ fmt.Printf(" ✗ 搜索失败: %v\n", err)
+ return
+ }
+
+ if len(results) == 0 {
+ fmt.Println(" (没有找到相似工作流,先 demo 加载示例数据吧)")
+ return
+ }
+
+ for i, r := range results {
+ marker := " "
+ if i == 0 {
+ marker = "→ "
+ }
+ fmt.Printf(" %s[%d] %s\n", marker, i+1, r.UserPrompt)
+ if len(r.Steps) > 0 {
+ stepCount := 0
+ for _, s := range r.Steps {
+ if s.Role == "tool_call" {
+ if stepCount == 0 {
+ fmt.Printf(" 工具链: ")
+ } else {
+ fmt.Print(" → ")
+ }
+ fmt.Print(s.Tool)
+ stepCount++
+ }
+ }
+ if stepCount > 0 {
+ fmt.Println()
+ }
+ }
+ }
+}
+
+func listWorkflows(ctx context.Context, store *flowrag.VectorStore) {
+ count := store.Count()
+ if count == 0 {
+ fmt.Println(" (没有工作流,输入 demo 加载示例数据)")
+ return
+ }
+ fmt.Printf("\n 已保存 %d 条工作流:\n\n", count)
+
+ results, _ := store.Search(ctx, "list all", count)
+ for i, r := range results {
+ fmt.Printf(" [%d] %s\n", i+1, r.UserPrompt)
+ }
+
+ fmt.Printf("\n 试试: search <关键词> 来语义搜索\n")
+ fmt.Println(" 例如: search 登录认证bug")
+}
diff --git a/internal/flowrag/cmd/e2e_test/main.go b/internal/flowrag/cmd/e2e_test/main.go
new file mode 100644
index 0000000000..0b7c0d1a0c
--- /dev/null
+++ b/internal/flowrag/cmd/e2e_test/main.go
@@ -0,0 +1,326 @@
+package main
+
+import (
+ "context"
+ "fmt"
+ "math"
+ "os"
+ "path/filepath"
+
+ "github.com/charmbracelet/crush/internal/flowrag"
+)
+
+func main() {
+ fmt.Println("=== FlowRAG E2E Semantic Search Verification ===")
+
+ dir := os.TempDir()
+ storePath := filepath.Join(dir, "flowrag_e2e_workflows.json")
+ defer os.Remove(storePath)
+
+ ctx := context.Background()
+
+ mockEmb := &flowrag.MockEmbeddingClient{Dim: 256}
+ store, err := flowrag.NewFileVectorStore(storePath, mockEmb)
+ if err != nil {
+ fmt.Fprintf(os.Stderr, "ERROR: %v\n", err)
+ os.Exit(1)
+ }
+
+ fmt.Println("\nStep 1: Insert test workflows with distinct semantic content...")
+
+ workflows := []flowrag.WorkflowRecord{
+ {
+ ID: "wf-auth-login-fix",
+ UserPrompt: "Fix the login authentication bug in auth.go where passwords are not being validated correctly",
+ StepsText: `User: Fix the login authentication bug in auth.go
+Assistant: Let me read the auth file first.
+Tool Call: read(auth.go)
+Tool Result: Found that password comparison uses != instead of ==
+Assistant: I found the bug - the password comparison operator is inverted. Let me fix it.
+Tool Call: edit(auth.go)
+Tool Result: Changed != to == in password comparison
+Tool Call: write
+Tool Result: File written successfully`,
+ Steps: []flowrag.WorkflowStep{
+ {Role: "tool_call", Tool: "read", Input: `{"path":"auth.go"}`},
+ {Role: "tool_result", Tool: "read", Output: "Found inverted password comparison"},
+ {Role: "tool_call", Tool: "edit", Input: `{"path":"auth.go"}`},
+ {Role: "tool_result", Tool: "edit", Output: "Fixed comparison operator"},
+ {Role: "tool_call", Tool: "write", Input: `{"path":"auth.go"}`},
+ {Role: "tool_result", Tool: "write", Output: "File written successfully"},
+ },
+ SessionID: "session-auth",
+ },
+ {
+ ID: "wf-rest-api-register",
+ UserPrompt: "Create a REST API endpoint for user registration with email validation",
+ StepsText: `User: Create a REST API endpoint for user registration
+Assistant: Let me check the existing routes structure.
+Tool Call: read(routes.go)
+Tool Result: Found existing api router group
+Assistant: Now I'll create the handler for POST /api/users/register
+Tool Call: write(handler_register.go)
+Tool Result: Created handler with email validation
+Assistant: Adding the route registration.
+Tool Call: edit(routes.go)
+Tool Result: Added POST /api/users/register route
+Assistant: Creating the User model.
+Tool Call: write(user_model.go)
+Tool Result: Created User struct with validation tags`,
+ Steps: []flowrag.WorkflowStep{
+ {Role: "tool_call", Tool: "read", Input: `{"path":"routes.go"}`},
+ {Role: "tool_result", Tool: "read", Output: "Found existing api router group"},
+ {Role: "tool_call", Tool: "write", Input: `{"path":"handler_register.go"}`},
+ {Role: "tool_result", Tool: "write", Output: "Created handler with email validation"},
+ {Role: "tool_call", Tool: "edit", Input: `{"path":"routes.go"}`},
+ {Role: "tool_result", Tool: "edit", Output: "Added POST /api/users/register route"},
+ {Role: "tool_call", Tool: "write", Input: `{"path":"user_model.go"}`},
+ {Role: "tool_result", Tool: "write", Output: "Created User struct with validation tags"},
+ },
+ SessionID: "session-rest",
+ },
+ {
+ ID: "wf-python-csv-parser",
+ UserPrompt: "Write a Python script to parse CSV files and generate PDF reports",
+ StepsText: `User: Write a Python script to parse CSV files and generate PDF reports
+Assistant: Creating the parser script.
+Tool Call: write(report_parser.py)
+Tool Result: Created pandas-based CSV parser with PDF report generation
+Assistant: Adding dependencies.
+Tool Call: write(requirements.txt)
+Tool Result: Added pandas, reportlab dependencies
+Assistant: Let me test it.
+Tool Call: bash(python report_parser.py sample.csv)
+Tool Result: PDF report generated successfully - 150 rows processed`,
+ Steps: []flowrag.WorkflowStep{
+ {Role: "tool_call", Tool: "write", Input: `{"path":"report_parser.py"}`},
+ {Role: "tool_result", Tool: "write", Output: "Created pandas-based CSV parser"},
+ {Role: "tool_call", Tool: "write", Input: `{"path":"requirements.txt"}`},
+ {Role: "tool_result", Tool: "write", Output: "Added pandas, reportlab dependencies"},
+ {Role: "tool_call", Tool: "bash", Input: `{"command":"python report_parser.py sample.csv"}`},
+ {Role: "tool_result", Tool: "bash", Output: "PDF report generated - 150 rows processed"},
+ },
+ SessionID: "session-python",
+ },
+ {
+ ID: "wf-db-migration",
+ UserPrompt: "Add a new database migration for the users table with email and password_hash columns",
+ StepsText: `User: Add a new database migration for users table
+Assistant: Checking existing migrations.
+Tool Call: read(migrations/)
+Tool Result: Found 2 existing migrations
+Assistant: Creating the new migration.
+Tool Call: write(migrations/003_add_users_table.sql)
+Tool Result: Created migration with CREATE TABLE users (id, email, password_hash, created_at)
+Assistant: Applying migration.
+Tool Call: bash(go run cmd/migrate/main.go up)
+Tool Result: Migration 003_add_users_table applied successfully`,
+ Steps: []flowrag.WorkflowStep{
+ {Role: "tool_call", Tool: "read", Input: `{"path":"migrations/"}`},
+ {Role: "tool_result", Tool: "read", Output: "Found 2 existing migrations"},
+ {Role: "tool_call", Tool: "write", Input: `{"path":"migrations/003_add_users_table.sql"}`},
+ {Role: "tool_result", Tool: "write", Output: "Created migration with users table schema"},
+ {Role: "tool_call", Tool: "bash", Input: `{"command":"go run cmd/migrate/main.go up"}`},
+ {Role: "tool_result", Tool: "bash", Output: "Migration applied successfully"},
+ },
+ SessionID: "session-db",
+ },
+ {
+ ID: "wf-oauth-token-refresh",
+ UserPrompt: "Fix the OAuth token refresh logic in the auth middleware that causes 401 errors after token expiry",
+ StepsText: `User: Fix OAuth token refresh logic in middleware
+Assistant: Reading the auth middleware.
+Tool Call: read(middleware/auth.go)
+Tool Result: Found expired token detection logic
+Assistant: I see the issue - token refresh is only attempted AFTER the 401, not before expiry.
+Tool Call: edit(middleware/auth.go)
+Tool Result: Added proactive refresh when token is within 5min of expiry
+Assistant: Writing the changes.
+Tool Call: write
+Tool Result: Middleware updated with proactive token refresh`,
+ Steps: []flowrag.WorkflowStep{
+ {Role: "tool_call", Tool: "read", Input: `{"path":"middleware/auth.go"}`},
+ {Role: "tool_result", Tool: "read", Output: "Found expired token detection logic"},
+ {Role: "tool_call", Tool: "edit", Input: `{"path":"middleware/auth.go"}`},
+ {Role: "tool_result", Tool: "edit", Output: "Added proactive refresh before expiry"},
+ {Role: "tool_call", Tool: "write", Input: `{}`},
+ {Role: "tool_result", Tool: "write", Output: "Middleware updated"},
+ },
+ SessionID: "session-oauth",
+ },
+ {
+ ID: "wf-docker-setup",
+ UserPrompt: "Set up a Docker Compose file for local development with PostgreSQL and Redis",
+ StepsText: `User: Set up Docker Compose for local dev with PostgreSQL and Redis
+Assistant: Creating the docker-compose.yml file.
+Tool Call: write(docker-compose.yml)
+Tool Result: Created compose file with postgres:16 and redis:7 services
+Assistant: Adding environment configuration.
+Tool Call: write(.env.example)
+Tool Result: Added POSTGRES_PASSWORD and REDIS_URL env vars
+Assistant: Verifying docker-compose config.
+Tool Call: bash(docker compose config)
+Tool Result: Services: postgres (port 5432), redis (port 6379) - config valid`,
+ Steps: []flowrag.WorkflowStep{
+ {Role: "tool_call", Tool: "write", Input: `{"path":"docker-compose.yml"}`},
+ {Role: "tool_result", Tool: "write", Output: "Created compose with postgres:16 and redis:7"},
+ {Role: "tool_call", Tool: "write", Input: `{"path":".env.example"}`},
+ {Role: "tool_result", Tool: "write", Output: "Added env vars"},
+ {Role: "tool_call", Tool: "bash", Input: `{"command":"docker compose config"}`},
+ {Role: "tool_result", Tool: "bash", Output: "Config valid, services: postgres:5432, redis:6379"},
+ },
+ SessionID: "session-docker",
+ },
+ }
+
+ for i := range workflows {
+ if err := store.Insert(ctx, workflows[i]); err != nil {
+ fmt.Printf(" ✗ Failed to insert %s: %v\n", workflows[i].ID, err)
+ os.Exit(1)
+ }
+ fmt.Printf(" ✓ Inserted: %s\n", workflows[i].ID)
+ }
+
+ fmt.Printf("\n✓ Total: %d workflows inserted\n", len(workflows))
+
+ fmt.Println("\nStep 2: Semantic search verification...")
+
+ queries := []struct {
+ query string
+ expectID string
+ reason string
+ }{
+ {
+ query: "How do I fix a login authentication bug?",
+ expectID: "wf-auth-login-fix",
+ reason: "Authentication + login → auth workflow",
+ },
+ {
+ query: "Create an API endpoint for user signup with email",
+ expectID: "wf-rest-api-register",
+ reason: "API endpoint + user signup → REST API workflow",
+ },
+ {
+ query: "Parse a CSV file with Python and generate a report",
+ expectID: "wf-python-csv-parser",
+ reason: "CSV + Python → Python CSV parser workflow",
+ },
+ {
+ query: "Add a new SQL migration for a database table",
+ expectID: "wf-db-migration",
+ reason: "SQL + migration → database migration workflow",
+ },
+ {
+ query: "Refresh the OAuth token when it expires in middleware",
+ expectID: "wf-oauth-token-refresh",
+ reason: "OAuth + token + middleware → OAuth workflow",
+ },
+ {
+ query: "Set up Docker with Postgres database for development",
+ expectID: "wf-docker-setup",
+ reason: "Docker + Postgres → Docker Compose workflow",
+ },
+ }
+
+ passCount := 0
+ failCount := 0
+
+ for _, q := range queries {
+ fmt.Printf("\nQuery: \"%s\"\n", q.query)
+ fmt.Printf(" Expected: %s (%s)\n", q.expectID, q.reason)
+
+ results, err := store.Search(ctx, q.query, 5)
+ if err != nil {
+ fmt.Printf(" ✗ Search error: %v\n", err)
+ failCount++
+ continue
+ }
+
+ if len(results) == 0 {
+ fmt.Println(" ✗ No results returned")
+ failCount++
+ continue
+ }
+
+ fmt.Printf(" Top %d results:\n", len(results))
+ topID := ""
+ for i, r := range results {
+ marker := " "
+ if i == 0 {
+ marker = "→ "
+ topID = r.ID
+ }
+ fmt.Printf(" %s[%d] %s — %q\n", marker, i+1, r.ID, truncateStr(r.UserPrompt, 60))
+ }
+
+ if topID == q.expectID {
+ fmt.Printf(" ✓ CORRECT: Top match is %s\n", q.expectID)
+ passCount++
+ } else {
+ inTop := false
+ rank := 0
+ for j, r := range results {
+ if r.ID == q.expectID {
+ inTop = true
+ rank = j + 1
+ break
+ }
+ }
+ if inTop {
+ fmt.Printf(" ~ PARTIAL: %s found at rank #%d\n", q.expectID, rank)
+ passCount++
+ } else {
+ fmt.Printf(" ✗ FAILED: %s not found in top results\n", q.expectID)
+ failCount++
+ }
+ }
+ }
+
+ fmt.Println("\n============================================")
+ fmt.Printf("RESULTS: %d passed, %d failed, %d total\n",
+ passCount, failCount, passCount+failCount)
+ fmt.Println("============================================")
+
+ fmt.Println("\nStep 3: Cross-domain semantic similarity check...")
+
+ q1, _ := store.Search(ctx, "fix a bug in the code", 3)
+ q2, _ := store.Search(ctx, "write a deployment config", 3)
+
+ fmt.Printf("Query 'fix a bug in the code' top result: %s\n", getID(q1, 0))
+ fmt.Printf("Query 'write a deployment config' top result: %s\n", getID(q2, 0))
+
+ if getID(q1, 0) != getID(q2, 0) {
+ fmt.Println("✓ Different queries return different top results — semantic separation works!")
+ } else {
+ fmt.Println("~ Same top result for different queries (expected with mock embeddings)")
+ }
+
+ fmt.Println("\n✓ E2E verification complete!")
+ if failCount > 0 {
+ fmt.Println("\nNOTE: Mock embeddings produce limited semantic differentiation.")
+ fmt.Println("With a real embedding API (OpenAI text-embedding-3-small), results will be much more accurate.")
+ }
+}
+
+func truncateStr(s string, n int) string {
+ if len(s) <= n {
+ return s
+ }
+ return s[:n] + "..."
+}
+
+func indexOf(records []flowrag.WorkflowRecord, id string) int {
+ for i, r := range records {
+ if r.ID == id {
+ return i
+ }
+ }
+ return math.MaxInt
+}
+
+func getID(records []flowrag.WorkflowRecord, i int) string {
+ if i < len(records) {
+ return records[i].ID
+ }
+ return "N/A"
+}
diff --git a/internal/flowrag/detector.go b/internal/flowrag/detector.go
new file mode 100644
index 0000000000..cf8d7acbe8
--- /dev/null
+++ b/internal/flowrag/detector.go
@@ -0,0 +1,55 @@
+package flowrag
+
+import (
+ "regexp"
+ "strings"
+)
+
+var completionPatterns = []*regexp.Regexp{
+ regexp.MustCompile(`(?i)^\s*(ok|okay|好的|好|行|可以|没问题|就这样|就这样了|搞定|完成|好了|行了|没问题了|ok了|fine|done|great|perfect|cool|sounds good|alright)[\s,。,.!!吧啦喽咯嘛呢啊呀哎嗨呵嘿哟噢]*$`),
+ regexp.MustCompile(`(?i)^\s*(ok|okay|好的|好|行|可以|没问题|就这样|就这样了|搞定|完成|好了|行了|没问题了|ok了)\s*[,,]\s*(就|就这样|这样|没问题|ok)[\s,。,.!!吧啦喽咯嘛呢啊呀哎嗨呵嘿哟噢]*$`),
+}
+
+var taskCompleteKeywords = []string{
+ "task complete", "task completed", "task done",
+ "workflow complete", "workflow completed",
+ "save workflow", "save this workflow", "save the workflow",
+ "remember this", "remember for next time",
+ "save for future", "save for later",
+ "store workflow", "store this flow",
+}
+
+type CompletionDetector struct {
+ patterns []*regexp.Regexp
+}
+
+func NewCompletionDetector() *CompletionDetector {
+ return &CompletionDetector{patterns: completionPatterns}
+}
+
+func (d *CompletionDetector) IsCompletionPhrase(text string) bool {
+ trimmed := strings.TrimSpace(text)
+ if trimmed == "" {
+ return false
+ }
+ for _, pattern := range d.patterns {
+ if pattern.MatchString(trimmed) {
+ return true
+ }
+ }
+ return false
+}
+
+func (d *CompletionDetector) IsTaskCompleteMarker(text string) bool {
+ lower := strings.ToLower(strings.TrimSpace(text))
+ for _, kw := range taskCompleteKeywords {
+ if strings.Contains(lower, kw) {
+ return true
+ }
+ }
+ return false
+}
+
+func (d *CompletionDetector) ShouldTriggerFlowRAG(text string) bool {
+ return d.IsCompletionPhrase(text) || d.IsTaskCompleteMarker(text)
+}
diff --git a/internal/flowrag/retriever.go b/internal/flowrag/retriever.go
new file mode 100644
index 0000000000..08b6351efd
--- /dev/null
+++ b/internal/flowrag/retriever.go
@@ -0,0 +1,60 @@
+package flowrag
+
+import (
+ "context"
+ "fmt"
+ "strings"
+)
+
+type Retriever struct {
+ store *VectorStore
+}
+
+func NewRetriever(store *VectorStore) *Retriever {
+ return &Retriever{store: store}
+}
+
+type SearchResult struct {
+ Workflow WorkflowRecord `json:"workflow"`
+ Similarity float64 `json:"-"`
+}
+
+func (r *Retriever) SearchSimilar(ctx context.Context, userPrompt string, topK int) ([]WorkflowRecord, error) {
+ return r.store.Search(ctx, userPrompt, topK)
+}
+
+func (r *Retriever) BuildContextPrompt(records []WorkflowRecord) string {
+ if len(records) == 0 {
+ return ""
+ }
+
+ var sb strings.Builder
+ sb.WriteString("")
+ sb.WriteString("\nBelow are similar tasks that were successfully completed in the past. You can reference their approaches.\n\n")
+
+ for i, record := range records {
+ sb.WriteString(fmt.Sprintf("--- Similar Workflow %d ---\n", i+1))
+ sb.WriteString(fmt.Sprintf("Original Request: %s\n", record.UserPrompt))
+ sb.WriteString("Steps:\n")
+ for _, step := range record.Steps {
+ switch step.Role {
+ case "tool_call":
+ sb.WriteString(fmt.Sprintf(" [Tool: %s] Input: %s\n", step.Tool, step.Input))
+ case "tool_result":
+ sb.WriteString(fmt.Sprintf(" [Result: %s] %s\n", step.Tool, truncate(step.Output, 500)))
+ default:
+ }
+ }
+ sb.WriteString("\n")
+ }
+
+ sb.WriteString("")
+ return sb.String()
+}
+
+func truncate(s string, maxLen int) string {
+ if len(s) <= maxLen {
+ return s
+ }
+ return s[:maxLen] + "..."
+}
diff --git a/internal/flowrag/segmenter.go b/internal/flowrag/segmenter.go
new file mode 100644
index 0000000000..b1a202fc38
--- /dev/null
+++ b/internal/flowrag/segmenter.go
@@ -0,0 +1,129 @@
+package flowrag
+
+import (
+ "strings"
+
+ "github.com/charmbracelet/crush/internal/message"
+)
+
+type WorkflowStep struct {
+ Role string `json:"role"`
+ Content string `json:"content,omitempty"`
+ Tool string `json:"tool,omitempty"`
+ Input string `json:"input,omitempty"`
+ Output string `json:"output,omitempty"`
+ IsError bool `json:"is_error,omitempty"`
+}
+
+type Workflow struct {
+ UserPrompt string `json:"user_prompt"`
+ Steps []WorkflowStep `json:"steps"`
+ SessionID string `json:"session_id"`
+}
+
+type Segmenter struct{}
+
+func NewSegmenter() *Segmenter {
+ return &Segmenter{}
+}
+
+func (s *Segmenter) Segment(userPrompt string, messages []message.Message) *Workflow {
+ steps := make([]WorkflowStep, 0)
+ errorToolCallIDs := make(map[string]bool)
+
+ for _, msg := range messages {
+ if msg.Role == message.Tool {
+ for _, tr := range msg.ToolResults() {
+ if tr.IsError {
+ errorToolCallIDs[tr.ToolCallID] = true
+ }
+ }
+ }
+ }
+
+ for _, msg := range messages {
+ switch msg.Role {
+ case message.User:
+ if len(steps) == 0 {
+ continue
+ }
+ steps = append(steps, WorkflowStep{
+ Role: "user",
+ Content: msg.Content().Text,
+ })
+ case message.Assistant:
+ textContent := msg.Content().Text
+ if textContent != "" {
+ steps = append(steps, WorkflowStep{
+ Role: "assistant",
+ Content: textContent,
+ })
+ }
+ for _, tc := range msg.ToolCalls() {
+ if errorToolCallIDs[tc.ID] {
+ continue
+ }
+ steps = append(steps, WorkflowStep{
+ Role: "tool_call",
+ Tool: tc.Name,
+ Input: tc.Input,
+ })
+ }
+ case message.Tool:
+ for _, tr := range msg.ToolResults() {
+ if tr.IsError {
+ continue
+ }
+ if errorToolCallIDs[tr.ToolCallID] {
+ continue
+ }
+ steps = append(steps, WorkflowStep{
+ Role: "tool_result",
+ Tool: tr.Name,
+ Output: tr.Content,
+ IsError: false,
+ })
+ }
+ }
+ }
+
+ return &Workflow{
+ UserPrompt: userPrompt,
+ Steps: steps,
+ }
+}
+
+func (w *Workflow) ToText() string {
+ var sb strings.Builder
+ sb.WriteString("User: ")
+ sb.WriteString(w.UserPrompt)
+ sb.WriteString("\n")
+ for _, step := range w.Steps {
+ switch step.Role {
+ case "user":
+ sb.WriteString("User: ")
+ sb.WriteString(step.Content)
+ sb.WriteString("\n")
+ case "assistant":
+ sb.WriteString("Assistant: ")
+ sb.WriteString(step.Content)
+ sb.WriteString("\n")
+ case "tool_call":
+ sb.WriteString("Tool Call: ")
+ sb.WriteString(step.Tool)
+ if step.Input != "" {
+ sb.WriteString("(")
+ sb.WriteString(step.Input)
+ sb.WriteString(")")
+ }
+ sb.WriteString("\n")
+ case "tool_result":
+ sb.WriteString("Tool Result: ")
+ sb.WriteString(step.Tool)
+ sb.WriteString(" -> ")
+ sb.WriteString(step.Output)
+ sb.WriteString("\n")
+ }
+ }
+ return sb.String()
+}
diff --git a/internal/flowrag/store.go b/internal/flowrag/store.go
new file mode 100644
index 0000000000..7680e7309b
--- /dev/null
+++ b/internal/flowrag/store.go
@@ -0,0 +1,545 @@
+package flowrag
+
+import (
+ "bytes"
+ "context"
+ "encoding/json"
+ "fmt"
+ "log/slog"
+ "math"
+ "net/http"
+ "os"
+ "path/filepath"
+ "sync"
+ "time"
+)
+
+type Vector []float64
+
+type WorkflowRecord struct {
+ ID string `json:"id"`
+ UserPrompt string `json:"user_prompt"`
+ StepsText string `json:"steps_text"`
+ Embedding Vector `json:"embedding,omitempty"`
+ Steps []WorkflowStep `json:"steps"`
+ SessionID string `json:"session_id"`
+ CreatedAt int64 `json:"created_at"`
+}
+
+type EmbeddingClient interface {
+ GetEmbedding(ctx context.Context, text string) (Vector, error)
+}
+
+type HTTPSEmbeddingClient struct {
+ BaseURL string
+ APIKey string
+ Model string
+ client *http.Client
+}
+
+func NewOpenAIEmbeddingClient(baseURL, apiKey, model string) *HTTPSEmbeddingClient {
+ return &HTTPSEmbeddingClient{
+ BaseURL: baseURL,
+ APIKey: apiKey,
+ Model: model,
+ client: &http.Client{Timeout: 30 * time.Second},
+ }
+}
+
+type embeddingRequest struct {
+ Model string `json:"model"`
+ Input string `json:"input"`
+}
+
+type embeddingResponse struct {
+ Data []struct {
+ Embedding []float64 `json:"embedding"`
+ } `json:"data"`
+}
+
+func (c *HTTPSEmbeddingClient) GetEmbedding(ctx context.Context, text string) (Vector, error) {
+ reqBody := embeddingRequest{
+ Model: c.Model,
+ Input: text,
+ }
+ bodyBytes, err := json.Marshal(reqBody)
+ if err != nil {
+ return nil, fmt.Errorf("marshal embedding request: %w", err)
+ }
+
+ url := c.BaseURL + "/embeddings"
+ req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(bodyBytes))
+ if err != nil {
+ return nil, fmt.Errorf("create embedding request: %w", err)
+ }
+ req.Header.Set("Content-Type", "application/json")
+ if c.APIKey != "" {
+ req.Header.Set("Authorization", "Bearer "+c.APIKey)
+ }
+
+ resp, err := c.client.Do(req)
+ if err != nil {
+ return nil, fmt.Errorf("embedding request failed: %w", err)
+ }
+ defer resp.Body.Close()
+
+ if resp.StatusCode != http.StatusOK {
+ return nil, fmt.Errorf("embedding request returned status %d", resp.StatusCode)
+ }
+
+ var embResp embeddingResponse
+ if err := json.NewDecoder(resp.Body).Decode(&embResp); err != nil {
+ return nil, fmt.Errorf("decode embedding response: %w", err)
+ }
+
+ if len(embResp.Data) == 0 {
+ return nil, fmt.Errorf("no embedding data in response")
+ }
+
+ return Vector(embResp.Data[0].Embedding), nil
+}
+
+type MockEmbeddingClient struct {
+ Dim int
+}
+
+func (m *MockEmbeddingClient) GetEmbedding(_ context.Context, text string) (Vector, error) {
+ v := make(Vector, m.Dim)
+ for i := range v {
+ v[i] = float64(len(text)+i) / float64(m.Dim+len(text))
+ }
+ return v, nil
+}
+
+type HashEmbeddingClient struct {
+ Dim int
+}
+
+func NewHashEmbeddingClient(dim int) *HashEmbeddingClient {
+ if dim <= 0 {
+ dim = 256
+ }
+ return &HashEmbeddingClient{Dim: dim}
+}
+
+func (h *HashEmbeddingClient) GetEmbedding(_ context.Context, text string) (Vector, error) {
+ v := make(Vector, h.Dim)
+ if len(text) == 0 {
+ return v, nil
+ }
+
+ const ngramSize = 3
+ runes := []rune(text)
+ for i := 0; i <= len(runes)-ngramSize; i++ {
+ ngram := string(runes[i : i+ngramSize])
+ hash := hashString(ngram)
+ idx := int(hash % uint32(h.Dim))
+ v[idx] += 1.0
+ }
+
+ norm := 0.0
+ for _, val := range v {
+ norm += val * val
+ }
+ if norm > 0 {
+ norm = norm / float64(len(runes))
+ for i := range v {
+ v[i] = v[i] / norm
+ }
+ }
+
+ return v, nil
+}
+
+func hashString(s string) uint32 {
+ var h uint32
+ for _, c := range s {
+ h = h*31 + uint32(c)
+ }
+ return h
+}
+
+type VectorStoreBackend interface {
+ Insert(ctx context.Context, record WorkflowRecord) error
+ Search(ctx context.Context, queryEmbedding Vector, topK int) ([]WorkflowRecord, error)
+ Count() int
+}
+
+type JSONFileStore struct {
+ mu sync.RWMutex
+ records []WorkflowRecord
+ storePath string
+}
+
+func NewJSONFileStore(storePath string) (*JSONFileStore, error) {
+ dir := filepath.Dir(storePath)
+ if err := os.MkdirAll(dir, 0755); err != nil {
+ return nil, fmt.Errorf("create store directory: %w", err)
+ }
+
+ store := &JSONFileStore{
+ records: make([]WorkflowRecord, 0),
+ storePath: storePath,
+ }
+
+ if err := store.load(); err != nil {
+ slog.Warn("Failed to load vector store, starting fresh", "error", err)
+ }
+
+ return store, nil
+}
+
+func (s *JSONFileStore) load() error {
+ data, err := os.ReadFile(s.storePath)
+ if err != nil {
+ if os.IsNotExist(err) {
+ return nil
+ }
+ return err
+ }
+ if len(data) == 0 {
+ return nil
+ }
+ return json.Unmarshal(data, &s.records)
+}
+
+func (s *JSONFileStore) save() error {
+ data, err := json.MarshalIndent(s.records, "", " ")
+ if err != nil {
+ return err
+ }
+ return os.WriteFile(s.storePath, data, 0644)
+}
+
+func (s *JSONFileStore) Insert(_ context.Context, record WorkflowRecord) error {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+
+ record.CreatedAt = time.Now().Unix()
+ s.records = append(s.records, record)
+ return s.save()
+}
+
+func (s *JSONFileStore) Search(_ context.Context, queryEmbedding Vector, topK int) ([]WorkflowRecord, error) {
+ s.mu.RLock()
+ defer s.mu.RUnlock()
+
+ type scored struct {
+ record WorkflowRecord
+ score float64
+ }
+
+ var results []scored
+ for _, record := range s.records {
+ if record.Embedding == nil {
+ continue
+ }
+ score := cosineSimilarity(queryEmbedding, record.Embedding)
+ results = append(results, scored{record: record, score: score})
+ }
+
+ for i := 0; i < len(results); i++ {
+ for j := i + 1; j < len(results); j++ {
+ if results[j].score > results[i].score {
+ results[i], results[j] = results[j], results[i]
+ }
+ }
+ }
+
+ if topK > len(results) {
+ topK = len(results)
+ }
+
+ top := make([]WorkflowRecord, topK)
+ for i := 0; i < topK; i++ {
+ top[i] = results[i].record
+ }
+
+ return top, nil
+}
+
+func (s *JSONFileStore) Count() int {
+ s.mu.RLock()
+ defer s.mu.RUnlock()
+ return len(s.records)
+}
+
+type ChromaDBStore struct {
+ baseURL string
+ client *http.Client
+ collection string
+ mu sync.Mutex
+ created bool
+}
+
+func NewChromaDBStore(baseURL string) *ChromaDBStore {
+ return &ChromaDBStore{
+ baseURL: baseURL,
+ client: &http.Client{Timeout: 30 * time.Second},
+ collection: "crush_workflows",
+ }
+}
+
+func (s *ChromaDBStore) ensureCollection(ctx context.Context) error {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+
+ if s.created {
+ return nil
+ }
+
+ createURL := fmt.Sprintf("%s/api/v2/tenants/default_tenant/databases/default_database/collections", s.baseURL)
+ body := map[string]interface{}{
+ "name": s.collection,
+ "metadata": map[string]string{"description": "Crush FlowRAG workflow records"},
+ }
+
+ bodyBytes, _ := json.Marshal(body)
+ req, err := http.NewRequestWithContext(ctx, http.MethodPost, createURL, bytes.NewReader(bodyBytes))
+ if err != nil {
+ return err
+ }
+ req.Header.Set("Content-Type", "application/json")
+
+ resp, err := s.client.Do(req)
+ if err != nil {
+ slog.Warn("ChromaDB unreachable, collection not created", "error", err)
+ return err
+ }
+ defer resp.Body.Close()
+
+ if resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusCreated {
+ s.created = true
+ slog.Info("ChromaDB collection ready", "collection", s.collection)
+ return nil
+ }
+
+ var errResp map[string]interface{}
+ json.NewDecoder(resp.Body).Decode(&errResp)
+ slog.Warn("ChromaDB create collection response", "status", resp.StatusCode, "body", errResp)
+ return fmt.Errorf("chromadb create collection returned status %d", resp.StatusCode)
+}
+
+func (s *ChromaDBStore) Insert(ctx context.Context, record WorkflowRecord) error {
+ if err := s.ensureCollection(ctx); err != nil {
+ return err
+ }
+
+ if record.Embedding == nil {
+ return fmt.Errorf("record has no embedding")
+ }
+
+ metalJSON, _ := json.Marshal(map[string]string{
+ "user_prompt": record.UserPrompt,
+ "session_id": record.SessionID,
+ "steps_json": mustMarshalSteps(record.Steps),
+ })
+
+ addURL := fmt.Sprintf("%s/api/v2/tenants/default_tenant/databases/default_database/collections/%s/add",
+ s.baseURL, s.collection)
+
+ body := map[string]interface{}{
+ "ids": []string{record.ID},
+ "embeddings": [][]float64{record.Embedding},
+ "metadatas": []json.RawMessage{metalJSON},
+ "documents": []string{record.StepsText},
+ }
+
+ bodyBytes, _ := json.Marshal(body)
+ req, err := http.NewRequestWithContext(ctx, http.MethodPost, addURL, bytes.NewReader(bodyBytes))
+ if err != nil {
+ return err
+ }
+ req.Header.Set("Content-Type", "application/json")
+
+ resp, err := s.client.Do(req)
+ if err != nil {
+ return fmt.Errorf("chromadb add request failed: %w", err)
+ }
+ defer resp.Body.Close()
+
+ if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
+ var errResp map[string]interface{}
+ json.NewDecoder(resp.Body).Decode(&errResp)
+ return fmt.Errorf("chromadb add returned status %d: %v", resp.StatusCode, errResp)
+ }
+
+ slog.Info("Inserted workflow into ChromaDB", "workflow_id", record.ID)
+ return nil
+}
+
+type chromaQueryResult struct {
+ Ids [][]string `json:"ids"`
+ Documents [][]string `json:"documents"`
+ Metadatas [][]json.RawMessage `json:"metadatas"`
+ Distances [][]float64 `json:"distances"`
+}
+
+type chromaMetadata struct {
+ UserPrompt string `json:"user_prompt"`
+ SessionID string `json:"session_id"`
+ StepsJSON string `json:"steps_json"`
+}
+
+func (s *ChromaDBStore) Search(ctx context.Context, queryEmbedding Vector, topK int) ([]WorkflowRecord, error) {
+ if err := s.ensureCollection(ctx); err != nil {
+ return nil, err
+ }
+
+ queryURL := fmt.Sprintf("%s/api/v2/tenants/default_tenant/databases/default_database/collections/%s/query",
+ s.baseURL, s.collection)
+
+ body := map[string]interface{}{
+ "query_embeddings": [][]float64{queryEmbedding},
+ "n_results": topK,
+ "include": []string{"metadatas", "documents", "distances"},
+ }
+
+ bodyBytes, _ := json.Marshal(body)
+ req, err := http.NewRequestWithContext(ctx, http.MethodPost, queryURL, bytes.NewReader(bodyBytes))
+ if err != nil {
+ return nil, err
+ }
+ req.Header.Set("Content-Type", "application/json")
+
+ resp, err := s.client.Do(req)
+ if err != nil {
+ return nil, fmt.Errorf("chromadb query request failed: %w", err)
+ }
+ defer resp.Body.Close()
+
+ if resp.StatusCode != http.StatusOK {
+ return nil, fmt.Errorf("chromadb query returned status %d", resp.StatusCode)
+ }
+
+ var result chromaQueryResult
+ if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
+ return nil, fmt.Errorf("decode chromadb query result: %w", err)
+ }
+
+ var records []WorkflowRecord
+ for i := range result.Ids {
+ for j, id := range result.Ids[i] {
+ record := WorkflowRecord{ID: id}
+
+ if j < len(result.Documents[i]) {
+ record.StepsText = result.Documents[i][j]
+ }
+
+ if j < len(result.Metadatas[i]) {
+ var meta chromaMetadata
+ if err := json.Unmarshal(result.Metadatas[i][j], &meta); err == nil {
+ record.UserPrompt = meta.UserPrompt
+ record.SessionID = meta.SessionID
+ if meta.StepsJSON != "" {
+ var steps []WorkflowStep
+ if json.Unmarshal([]byte(meta.StepsJSON), &steps) == nil {
+ record.Steps = steps
+ }
+ }
+ }
+ }
+
+ records = append(records, record)
+ }
+ }
+
+ return records, nil
+}
+
+func (s *ChromaDBStore) Count() int {
+ ctx := context.Background()
+ _ = s.ensureCollection(ctx)
+
+ countURL := fmt.Sprintf("%s/api/v2/tenants/default_tenant/databases/default_database/collections/%s",
+ s.baseURL, s.collection)
+
+ req, err := http.NewRequestWithContext(ctx, http.MethodGet, countURL, nil)
+ if err != nil {
+ return -1
+ }
+
+ resp, err := s.client.Do(req)
+ if err != nil {
+ return -1
+ }
+ defer resp.Body.Close()
+
+ var info struct {
+ Count int `json:"count"`
+ }
+ if json.NewDecoder(resp.Body).Decode(&info) == nil {
+ return info.Count
+ }
+ return -1
+}
+
+type VectorStore struct {
+ backend VectorStoreBackend
+ embClient EmbeddingClient
+}
+
+func NewVectorStore(backend VectorStoreBackend, embClient EmbeddingClient) *VectorStore {
+ return &VectorStore{
+ backend: backend,
+ embClient: embClient,
+ }
+}
+
+func NewFileVectorStore(storePath string, embClient EmbeddingClient) (*VectorStore, error) {
+ backend, err := NewJSONFileStore(storePath)
+ if err != nil {
+ return nil, err
+ }
+ return NewVectorStore(backend, embClient), nil
+}
+
+func (s *VectorStore) Insert(ctx context.Context, record WorkflowRecord) error {
+ if record.Embedding == nil {
+ embedding, err := s.embClient.GetEmbedding(ctx, record.StepsText)
+ if err != nil {
+ return fmt.Errorf("generate embedding: %w", err)
+ }
+ record.Embedding = embedding
+ }
+
+ record.CreatedAt = time.Now().Unix()
+ return s.backend.Insert(ctx, record)
+}
+
+func (s *VectorStore) Search(ctx context.Context, query string, topK int) ([]WorkflowRecord, error) {
+ queryEmbedding, err := s.embClient.GetEmbedding(ctx, query)
+ if err != nil {
+ return nil, fmt.Errorf("generate query embedding: %w", err)
+ }
+
+ return s.backend.Search(ctx, queryEmbedding, topK)
+}
+
+func (s *VectorStore) Count() int {
+ return s.backend.Count()
+}
+
+func cosineSimilarity(a, b Vector) float64 {
+ if len(a) != len(b) {
+ return 0
+ }
+ var dotProduct, normA, normB float64
+ for i := range a {
+ dotProduct += a[i] * b[i]
+ normA += a[i] * a[i]
+ normB += b[i] * b[i]
+ }
+ if normA == 0 || normB == 0 {
+ return 0
+ }
+ return dotProduct / (math.Sqrt(normA) * math.Sqrt(normB))
+}
+
+func mustMarshalSteps(steps []WorkflowStep) string {
+ data, err := json.Marshal(steps)
+ if err != nil {
+ return "[]"
+ }
+ return string(data)
+}
diff --git a/internal/flowrag/workflow.go b/internal/flowrag/workflow.go
new file mode 100644
index 0000000000..16be9089cc
--- /dev/null
+++ b/internal/flowrag/workflow.go
@@ -0,0 +1,116 @@
+package flowrag
+
+import (
+ "context"
+ "fmt"
+ "log/slog"
+ "time"
+
+ "github.com/charmbracelet/crush/internal/message"
+ "github.com/google/uuid"
+)
+
+type WorkflowManager struct {
+ detector *CompletionDetector
+ segmenter *Segmenter
+ store *VectorStore
+ retriever *Retriever
+}
+
+type Config struct {
+ StorePath string
+ ChromaDBURL string
+ EmbeddingBaseURL string
+ EmbeddingAPIKey string
+ EmbeddingModel string
+}
+
+func NewWorkflowManager(cfg Config) (*WorkflowManager, error) {
+ var embClient EmbeddingClient
+ if cfg.EmbeddingBaseURL != "" {
+ model := cfg.EmbeddingModel
+ if model == "" {
+ model = "text-embedding-3-small"
+ }
+ embClient = NewOpenAIEmbeddingClient(cfg.EmbeddingBaseURL, cfg.EmbeddingAPIKey, model)
+ } else {
+ embClient = &MockEmbeddingClient{Dim: 1536}
+ }
+
+ var store *VectorStore
+ if cfg.ChromaDBURL != "" {
+ chromaBackend := NewChromaDBStore(cfg.ChromaDBURL)
+ store = NewVectorStore(chromaBackend, embClient)
+ slog.Info("FlowRAG using ChromaDB backend", "url", cfg.ChromaDBURL)
+ } else {
+ var err error
+ store, err = NewFileVectorStore(cfg.StorePath, embClient)
+ if err != nil {
+ return nil, fmt.Errorf("create file vector store: %w", err)
+ }
+ slog.Info("FlowRAG using JSON file backend", "path", cfg.StorePath)
+ }
+
+ return &WorkflowManager{
+ detector: NewCompletionDetector(),
+ segmenter: NewSegmenter(),
+ store: store,
+ retriever: NewRetriever(store),
+ }, nil
+}
+
+func (m *WorkflowManager) Detector() *CompletionDetector {
+ return m.detector
+}
+
+func (m *WorkflowManager) Retriever() *Retriever {
+ return m.retriever
+}
+
+type SaveWorkflowInput struct {
+ UserPrompt string
+ Messages []message.Message
+ SessionID string
+}
+
+func (m *WorkflowManager) SaveSuccessfulWorkflow(ctx context.Context, input SaveWorkflowInput) error {
+ workflow := m.segmenter.Segment(input.UserPrompt, input.Messages)
+ if workflow == nil || len(workflow.Steps) == 0 {
+ return fmt.Errorf("no successful steps to save")
+ }
+
+ workflow.SessionID = input.SessionID
+
+ stepsText := workflow.ToText()
+
+ record := WorkflowRecord{
+ ID: uuid.New().String(),
+ UserPrompt: workflow.UserPrompt,
+ StepsText: stepsText,
+ Steps: workflow.Steps,
+ SessionID: workflow.SessionID,
+ CreatedAt: time.Now().Unix(),
+ }
+
+ if err := m.store.Insert(ctx, record); err != nil {
+ return fmt.Errorf("insert workflow record: %w", err)
+ }
+
+ slog.Info("Saved successful workflow to RAG store",
+ "workflow_id", record.ID,
+ "session_id", input.SessionID,
+ "steps_count", len(workflow.Steps),
+ "total_records", m.store.Count(),
+ )
+
+ return nil
+}
+
+func (m *WorkflowManager) SearchAndBuildContext(ctx context.Context, userPrompt string, topK int) string {
+ records, err := m.retriever.SearchSimilar(ctx, userPrompt, topK)
+ if err != nil {
+ slog.Warn("Failed to search similar workflows", "error", err)
+ return ""
+ }
+ return m.retriever.BuildContextPrompt(records)
+}
diff --git a/internal/flowrag/workflow_test.go b/internal/flowrag/workflow_test.go
new file mode 100644
index 0000000000..6ec47828ec
--- /dev/null
+++ b/internal/flowrag/workflow_test.go
@@ -0,0 +1,360 @@
+package flowrag
+
+import (
+ "context"
+ "os"
+ "path/filepath"
+ "testing"
+
+ "github.com/charmbracelet/crush/internal/message"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+func TestCompletionDetector_Match(t *testing.T) {
+ detector := NewCompletionDetector()
+
+ tests := []struct {
+ input string
+ expected bool
+ }{
+ {"ok", true},
+ {"OK", true},
+ {"好的", true},
+ {"没问题", true},
+ {"就这样了", true},
+ {"搞定", true},
+ {"ok,就这样", true},
+ {"好的,就这样吧", true},
+ {"done", true},
+ {"great", true},
+ {"帮我写一个函数", false},
+ {"这个代码有问题", false},
+ {"hello world", false},
+ {"", false},
+ {" ", false},
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.input, func(t *testing.T) {
+ result := detector.IsCompletionPhrase(tt.input)
+ assert.Equal(t, tt.expected, result, "input: %q", tt.input)
+ })
+ }
+}
+
+func TestTaskCompleteMarker(t *testing.T) {
+ detector := NewCompletionDetector()
+
+ tests := []struct {
+ input string
+ expected bool
+ }{
+ {"task complete", true},
+ {"save this workflow for future", true},
+ {"remember this workflow", true},
+ {"store this flow in rag", true},
+ {"save for later use", true},
+ {"random text", false},
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.input, func(t *testing.T) {
+ result := detector.IsTaskCompleteMarker(tt.input)
+ assert.Equal(t, tt.expected, result, "input: %q", tt.input)
+ })
+ }
+}
+
+func TestShouldTriggerFlowRAG(t *testing.T) {
+ detector := NewCompletionDetector()
+ assert.True(t, detector.ShouldTriggerFlowRAG("好的,OK"))
+ assert.True(t, detector.ShouldTriggerFlowRAG("task complete"))
+ assert.False(t, detector.ShouldTriggerFlowRAG("write a function"))
+}
+
+func TestSegmenter_SuccessfulFlow(t *testing.T) {
+ messages := []message.Message{
+ {
+ Role: message.Assistant,
+ Parts: []message.ContentPart{
+ message.TextContent{Text: "Let me read the file first."},
+ },
+ },
+ {
+ Role: message.Assistant,
+ Parts: []message.ContentPart{
+ message.ToolCall{ID: "tc1", Name: "read", Input: `{"path":"main.go"}`, Finished: true},
+ },
+ },
+ {
+ Role: message.Tool,
+ Parts: []message.ContentPart{
+ message.ToolResult{ToolCallID: "tc1", Name: "read", Content: "package main\nfunc main() {}", IsError: false},
+ },
+ },
+ {
+ Role: message.Assistant,
+ Parts: []message.ContentPart{
+ message.TextContent{Text: "Now I'll write the fix."},
+ },
+ },
+ {
+ Role: message.Assistant,
+ Parts: []message.ContentPart{
+ message.ToolCall{ID: "tc2", Name: "write", Input: `{"path":"main.go","content":"fixed"}`, Finished: true},
+ },
+ },
+ {
+ Role: message.Tool,
+ Parts: []message.ContentPart{
+ message.ToolResult{ToolCallID: "tc2", Name: "write", Content: "File written successfully", IsError: false},
+ },
+ },
+ }
+
+ segmenter := NewSegmenter()
+ workflow := segmenter.Segment("Fix the main function", messages)
+
+ require.NotNil(t, workflow)
+ assert.Equal(t, "Fix the main function", workflow.UserPrompt)
+ assert.Greater(t, len(workflow.Steps), 0)
+
+ hasWriteSteps := false
+ hasReadSteps := false
+ for _, step := range workflow.Steps {
+ if step.Tool == "write" {
+ hasWriteSteps = true
+ }
+ if step.Tool == "read" {
+ hasReadSteps = true
+ }
+ }
+ assert.True(t, hasReadSteps)
+ assert.True(t, hasWriteSteps)
+}
+
+func TestSegmenter_ExcludeErrorSteps(t *testing.T) {
+ messages := []message.Message{
+ {
+ Role: message.Assistant,
+ Parts: []message.ContentPart{
+ message.ToolCall{ID: "tc1", Name: "read", Input: `{"path":"nonexistent"}`, Finished: true},
+ },
+ },
+ {
+ Role: message.Tool,
+ Parts: []message.ContentPart{
+ message.ToolResult{ToolCallID: "tc1", Name: "read", Content: "file not found", IsError: true},
+ },
+ },
+ {
+ Role: message.Assistant,
+ Parts: []message.ContentPart{
+ message.ToolCall{ID: "tc2", Name: "read", Input: `{"path":"real.go"}`, Finished: true},
+ },
+ },
+ {
+ Role: message.Tool,
+ Parts: []message.ContentPart{
+ message.ToolResult{ToolCallID: "tc2", Name: "read", Content: "package main", IsError: false},
+ },
+ },
+ }
+
+ segmenter := NewSegmenter()
+ workflow := segmenter.Segment("Read a file", messages)
+
+ require.NotNil(t, workflow)
+ for _, step := range workflow.Steps {
+ if step.Role == "tool_call" && step.Tool == "read" && step.Input == `{"path":"nonexistent"}` {
+ t.Error("error tool call should not be in successful workflow")
+ }
+ }
+
+ hasRealRead := false
+ for _, step := range workflow.Steps {
+ if step.Role == "tool_result" && step.Tool == "read" && step.Output == "package main" {
+ hasRealRead = true
+ }
+ }
+ assert.True(t, hasRealRead, "successful read should be in workflow")
+}
+
+func TestWorkflow_ToText(t *testing.T) {
+ wf := &Workflow{
+ UserPrompt: "Fix the bug",
+ Steps: []WorkflowStep{
+ {Role: "tool_call", Tool: "read", Input: `{"path":"main.go"}`},
+ {Role: "tool_result", Tool: "read", Output: "package main\nfunc main() {}"},
+ {Role: "tool_call", Tool: "write", Input: `{"path":"main.go","content":"fixed"}`},
+ {Role: "tool_result", Tool: "write", Output: "Written successfully"},
+ },
+ }
+ text := wf.ToText()
+ assert.Contains(t, text, "Fix the bug")
+ assert.Contains(t, text, "read")
+ assert.Contains(t, text, "write")
+}
+
+func TestJSONFileStore_InsertAndSearch(t *testing.T) {
+ dir := t.TempDir()
+ storePath := filepath.Join(dir, "workflows.json")
+
+ mockClient := &MockEmbeddingClient{Dim: 128}
+ store, err := NewFileVectorStore(storePath, mockClient)
+ require.NoError(t, err)
+ assert.Equal(t, 0, store.Count())
+
+ ctx := context.Background()
+
+ record := WorkflowRecord{
+ ID: "test-1",
+ UserPrompt: "Fix the authentication bug",
+ StepsText: "User: Fix the auth bug\nTool Call: read('auth.go')\nTool Result: read -> code here\nTool Call: write('auth.go')\nTool Result: write -> Done",
+ Steps: []WorkflowStep{
+ {Role: "tool_call", Tool: "read", Input: `{"path":"auth.go"}`},
+ {Role: "tool_result", Tool: "read", Output: "code here"},
+ {Role: "tool_call", Tool: "write", Input: `{"path":"auth.go"}`},
+ {Role: "tool_result", Tool: "write", Output: "Done"},
+ },
+ SessionID: "session-1",
+ }
+
+ err = store.Insert(ctx, record)
+ require.NoError(t, err)
+ assert.Equal(t, 1, store.Count())
+
+ results, err := store.Search(ctx, "Fix authentication", 5)
+ require.NoError(t, err)
+ require.Len(t, results, 1)
+ assert.Equal(t, "test-1", results[0].ID)
+
+ _ = os.Remove(storePath)
+}
+
+func TestJSONFileStore_SearchEmpty(t *testing.T) {
+ dir := t.TempDir()
+ storePath := filepath.Join(dir, "workflows.json")
+
+ mockClient := &MockEmbeddingClient{Dim: 128}
+ store, err := NewFileVectorStore(storePath, mockClient)
+ require.NoError(t, err)
+
+ ctx := context.Background()
+ results, err := store.Search(ctx, "some query", 5)
+ require.NoError(t, err)
+ assert.Empty(t, results)
+}
+
+func TestCosineSimilarity(t *testing.T) {
+ a := Vector{1, 0, 0}
+ b := Vector{1, 0, 0}
+ assert.InDelta(t, 1.0, cosineSimilarity(a, b), 0.001)
+
+ c := Vector{0, 1, 0}
+ assert.InDelta(t, 0.0, cosineSimilarity(a, c), 0.001)
+
+ d := Vector{-1, 0, 0}
+ assert.InDelta(t, -1.0, cosineSimilarity(a, d), 0.001)
+
+ e := Vector{1, 0}
+ f := Vector{1, 0, 0}
+ assert.InDelta(t, 0.0, cosineSimilarity(e, f), 0.001)
+}
+
+func TestRetriever_BuildContextPrompt(t *testing.T) {
+ retriever := &Retriever{}
+ records := []WorkflowRecord{
+ {
+ UserPrompt: "Fix login bug",
+ Steps: []WorkflowStep{
+ {Role: "tool_call", Tool: "read", Input: `{"path":"login.go"}`},
+ {Role: "tool_result", Tool: "read", Output: "code here"},
+ {Role: "tool_call", Tool: "edit", Input: `{}`},
+ {Role: "tool_result", Tool: "edit", Output: "Done"},
+ },
+ },
+ }
+
+ prompt := retriever.BuildContextPrompt(records)
+ assert.Contains(t, prompt, "past_successful_workflows")
+ assert.Contains(t, prompt, "Fix login bug")
+ assert.Contains(t, prompt, "read")
+ assert.Contains(t, prompt, "edit")
+}
+
+func TestRetriever_BuildContextPromptEmpty(t *testing.T) {
+ retriever := &Retriever{}
+ prompt := retriever.BuildContextPrompt(nil)
+ assert.Empty(t, prompt)
+ prompt = retriever.BuildContextPrompt([]WorkflowRecord{})
+ assert.Empty(t, prompt)
+}
+
+func TestWorkflowManager_Integration(t *testing.T) {
+ dir := t.TempDir()
+ storePath := filepath.Join(dir, "workflows.json")
+
+ mgr, err := NewWorkflowManager(Config{
+ StorePath: storePath,
+ })
+ require.NoError(t, err)
+
+ assert.True(t, mgr.Detector().ShouldTriggerFlowRAG("ok"))
+ assert.True(t, mgr.Detector().ShouldTriggerFlowRAG("task complete"))
+ assert.True(t, mgr.Detector().ShouldTriggerFlowRAG("好的,就这样"))
+ assert.False(t, mgr.Detector().ShouldTriggerFlowRAG("帮我改代码"))
+
+ ctx := context.Background()
+ err = mgr.SaveSuccessfulWorkflow(ctx, SaveWorkflowInput{
+ UserPrompt: "Add unit tests for handler.go",
+ Messages: []message.Message{
+ {
+ Role: message.Assistant,
+ Parts: []message.ContentPart{
+ message.ToolCall{ID: "tc1", Name: "read", Input: `{"path":"handler.go"}`, Finished: true},
+ },
+ },
+ {
+ Role: message.Tool,
+ Parts: []message.ContentPart{
+ message.ToolResult{ToolCallID: "tc1", Name: "read", Content: "package main\nfunc handler() {}", IsError: false},
+ },
+ },
+ {
+ Role: message.Assistant,
+ Parts: []message.ContentPart{
+ message.ToolCall{ID: "tc2", Name: "write", Input: `{"path":"handler_test.go","content":"tests"}`, Finished: true},
+ },
+ },
+ {
+ Role: message.Tool,
+ Parts: []message.ContentPart{
+ message.ToolResult{ToolCallID: "tc2", Name: "write", Content: "Written successfully", IsError: false},
+ },
+ },
+ },
+ SessionID: "session-1",
+ })
+ require.NoError(t, err)
+
+ contextPrompt := mgr.SearchAndBuildContext(ctx, "Write tests for handler", 3)
+ assert.Contains(t, contextPrompt, "past_successful_workflows")
+ assert.Contains(t, contextPrompt, "Add unit tests")
+}
+
+func TestTruncate(t *testing.T) {
+ assert.Equal(t, "hello", truncate("hello", 10))
+ assert.Equal(t, "hello...", truncate("hello world", 5))
+ assert.Equal(t, "", truncate("", 10))
+}
+
+func TestMustMarshalSteps(t *testing.T) {
+ steps := []WorkflowStep{
+ {Role: "tool_call", Tool: "read", Input: `{"path":"main.go"}`},
+ }
+ result := mustMarshalSteps(steps)
+ assert.Contains(t, result, "tool_call")
+ assert.Contains(t, result, "read")
+}
diff --git a/internal/skills/builtin/flowrag/SKILL.md b/internal/skills/builtin/flowrag/SKILL.md
new file mode 100644
index 0000000000..cb7e020712
--- /dev/null
+++ b/internal/skills/builtin/flowrag/SKILL.md
@@ -0,0 +1,97 @@
+---
+name: flowrag
+description: Use when the user's coding task has been successfully completed and the agent should save the successful workflow for future reuse. FlowRAG captures the sequence of successful tool calls (excluding errors and retries), stores them in a vector database (ChromaDB), and retrieves similar past workflows via semantic search to accelerate future tasks. Trigger this skill when the user expresses satisfaction with the result, confirms task completion, or when a multi-step coding task has been executed without errors.
+---
+
+# FlowRAG — Workflow Memory & Retrieval
+
+FlowRAG is a workflow memory system for Crush. It captures **successful coding
+workflows** — the sequence of tool calls, their inputs/outputs — and stores
+them in a vector database so that **similar future tasks** can be completed
+faster by recalling past successful approaches.
+
+## When to Trigger This Skill
+
+Invoke FlowRAG when any of these conditions are true:
+
+1. **The user signals task completion** — they say the task is done, the
+ solution works, or they confirm the result is acceptable.
+2. **A multi-step workflow completed without errors** — the agent ran multiple
+ tools (read → edit → write → test) and all steps succeeded.
+3. **The user explicitly asks to save or remember** — they request the
+ workflow be stored for future reference.
+4. **The agent is about to start a new task** — before executing, check if a
+ similar workflow exists in the RAG store to accelerate execution.
+
+## Saving a Workflow
+
+When a task completes successfully, ask the user:
+"Does this solution achieve the desired effect? Should I save this workflow
+for future reuse? (y/n)"
+
+If the user confirms (y), the system will:
+
+1. **Extract the successful flow** — take the current session's messages and
+ filter out all steps where `IsError` is true (failed tool calls, retries).
+2. **Generate an embedding** — send the workflow text to the configured
+ embedding API (default: OpenAI `text-embedding-3-small`).
+3. **Store in ChromaDB** — persist the embedding + metadata in a ChromaDB
+ collection named `crush_workflows`.
+
+Only the final successful path is saved — intermediate failures and retries
+are automatically excluded.
+
+## Retrieving Past Workflows
+
+Before executing a new task, search the FlowRAG store:
+
+1. Generate an embedding of the user's request.
+2. Query ChromaDB for the top-K most similar past workflows.
+3. Inject the retrieved workflows into the system prompt as
+ `` context, allowing the agent to reference
+ proven approaches.
+
+## ChromaDB Backend
+
+FlowRAG uses **ChromaDB** (the most widely adopted open-source vector
+database) as its default backend. ChromaDB runs as a local service on
+`http://localhost:8000`.
+
+Key ChromaDB characteristics:
+- Collection name: `crush_workflows`
+- Distance function: cosine similarity
+- Embeddings are generated client-side and sent to ChromaDB for storage
+
+## Configuration
+
+FlowRAG stores its configuration in `crush.json` under `flowrag`:
+
+```json
+{
+ "flowrag": {
+ "enabled": true,
+ "chromadb_url": "http://localhost:8000",
+ "embedding_base_url": "https://api.openai.com/v1",
+ "embedding_api_key": "$OPENAI_API_KEY",
+ "embedding_model": "text-embedding-3-small",
+ "top_k": 3
+ }
+}
+```
+
+If `chromadb_url` is omitted or unreachable, FlowRAG falls back to a local
+JSON file store (`~/.crush/flowrag/workflows.json`).
+
+## E2E Testing
+
+To verify the RAG pipeline end-to-end, run:
+
+```bash
+go run ./internal/flowrag/cmd/e2e_test/main.go
+```
+
+This script will:
+1. Start a local ChromaDB container (if Docker available)
+2. Insert several test workflows into ChromaDB
+3. Run semantic queries and display similarity scores
+4. Verify retrieval quality and ranking