A new strategy for understanding your business
```bash
# Create python environment
uv venv

# Sync packages
uv sync

# Activate environment
source .venv/bin/activate
```

- Create a `.env` file in the project root with the required variables:

```
OPENAI_API_KEY=your_openai_api_key_here
```

Optional environment variables:

- `MILVUS_HOST` (default: `localhost`)
- `MILVUS_PORT` (default: `19530`)
- `MILVUS_DB` (default: `None`)
- `MILVUS_COLLECTION` (default: `documents`)
- `EMBEDDING_MODEL` (default: `text-embedding-3-small`)
- `SQLITE_DB_PATH` (default: `db/app.db`)
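The defaults above can be read with a small helper. A minimal sketch using `os.getenv` (the `load_config` function name is illustrative, not the project's actual code):

```python
import os

def load_config() -> dict:
    """Read optional settings from the environment, falling back to the documented defaults."""
    return {
        "milvus_host": os.getenv("MILVUS_HOST", "localhost"),
        "milvus_port": int(os.getenv("MILVUS_PORT", "19530")),
        "milvus_db": os.getenv("MILVUS_DB"),  # default: None
        "milvus_collection": os.getenv("MILVUS_COLLECTION", "documents"),
        "embedding_model": os.getenv("EMBEDDING_MODEL", "text-embedding-3-small"),
        "sqlite_db_path": os.getenv("SQLITE_DB_PATH", "db/app.db"),
    }
```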
Start Milvus vector database and dependencies with Docker Compose:
```bash
docker compose up
```

This starts:
- Milvus (vector database) on port 19530
- etcd (Milvus metadata) on port 2379
- MinIO (Milvus storage) on ports 9000/9001
Using Make:
```bash
make run-api
```

Equivalent command:

```bash
uvicorn api.app:app --reload
```

POST /agent/chat/stream - Stream chat responses using the CrewAI pipeline or the OpenAI fallback
- Body: `{ messages: [{ role, content }], model?: string }`
- Returns: Server-Sent Events (SSE) stream
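A client consumes this as a standard SSE stream. A hedged sketch of extracting `data:` payloads from a raw chunk (the server's exact event framing is an assumption; SSE in general prefixes payloads with `data:` and separates events with blank lines):

```python
def parse_sse_chunk(raw: str) -> list[str]:
    """Extract the payloads of `data:` lines from a raw SSE chunk."""
    payloads = []
    for line in raw.splitlines():
        if line.startswith("data:"):
            payloads.append(line[len("data:"):].strip())
    return payloads
```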
POST /uploads - Upload and process documents (PDF, Excel, CSV)
- Extracts, chunks, embeds into Milvus
- CSV/Excel rows loaded into SQLite tables
- Returns: `{ filename, chunk_count, inserted_ids, collection, doc_id, table?, row_count? }`
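The extract-then-chunk step can be pictured with a simple fixed-size splitter. This is a sketch only; the project's real chunker and its parameters are not shown here, and the chunk size and overlap values below are illustrative:

```python
def chunk_text(text: str, chunk_size: int = 500, overlap: int = 50) -> list[str]:
    """Split text into overlapping fixed-size chunks before embedding."""
    if chunk_size <= overlap:
        raise ValueError("chunk_size must exceed overlap")
    chunks = []
    start = 0
    while start < len(text):
        chunks.append(text[start:start + chunk_size])
        start += chunk_size - overlap  # step forward, keeping `overlap` chars of context
    return chunks
```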
GET /documents - List all documents from SQLite and Milvus (merged by doc_id)
- Returns: `{ documents: [...], count: number, sqlite_tables: [...] }`
- Includes metadata for all SQLite tables
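The merge by `doc_id` can be sketched as a dictionary join (field names other than `doc_id` are illustrative, not the endpoint's actual schema):

```python
def merge_documents(sqlite_docs: list[dict], milvus_docs: list[dict]) -> list[dict]:
    """Merge document records from SQLite and Milvus on doc_id.

    Records sharing a doc_id are combined into one entry; fields from
    either store fill in alongside each other.
    """
    merged: dict[str, dict] = {}
    for doc in sqlite_docs + milvus_docs:
        merged.setdefault(doc["doc_id"], {}).update(doc)
    return list(merged.values())
```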
DELETE /documents/{doc_id} - Delete document from both SQLite and Milvus
- Returns: `{ status, doc_id, deleted_from_sqlite, deleted_from_milvus }`
POST /search - Semantic search in Milvus
- Body: `{ query: string, top_k?: number, score_threshold?: number, document_ids_filter?: string[] }`
- Returns: `{ results: [{ content, metadata }] }`
POST /milvus/reset - Drop Milvus collection (destructive)
- Returns: `{ status, dropped_collection }`
GET /health - Health check
- Returns: `{ status: "ok" }`
- Create a `.env.local` file in the `ui/` directory:

```
NEXT_PUBLIC_API_URL=http://127.0.0.1:8000
```

Navigate to the `ui/` directory and start the development server:
```bash
cd ui
pnpm dev
# or
npm run dev
```

Open http://localhost:3000 to access the UI.
The UI provides:
- Knowledge Tab: Upload documents (PDF, Excel, CSV) with drag-and-drop interface, view all uploaded documents and SQLite tables, delete documents
- Chat Tab: Interactive chat interface with streaming responses, response duration display, and loading indicators
This section explains the end-to-end agent and tool workflow from request to response, how components interact, and where key logic lives.
Traditional RAG relies primarily on a single vector store to retrieve passages and then draft an answer. Our approach intentionally combines three complementary stores and fuses them during reasoning:
- Relational store (SQLite): precise, schema-aware answers via NL→SQL for counts, aggregations, and table-grounded facts. Enables verifiable numbers and constraints.
- Vector store (Milvus): semantic recall of unstructured content (PDFs/CSVs/XLS) to capture context and evidence beyond tables.
- ER graph (from `metadata/schema.mermaid`): structural context over entities/relationships to inform retrieval scope and summarization.
This multi-signal design lets the agent both “look up” exact values (SQL) and “look around” for context (vectors/graph), improving faithfulness and coverage over vanilla RAG.
Implemented in api/agents/crew.py using CrewAI agents and tasks:
- Query understanding
  - Agent: `QueryUnderstandingAgent`
  - Tool: `UnderstandQueryTool`
  - Output: normalized plan JSON with keys like `intent`, `entities`, `targets`, `granularity`, `filters`, `source_table`, `count_like`
- Plan validation / guard
  - Agent: `ContextGuardAgent`
  - Tool: `EnsureValidPlanTool`
  - Output: `{ plan_json, needs_clarification, message }`. If `needs_clarification` is true, the pipeline may short-circuit with a clarification message.
- Retrieval and reasoning
  - Agent: `RetrieverAgent`
  - Tools invoked in this step:
    - `DbIntrospectTool` to fetch the live SQLite schema as JSON
    - `SqlQueryTool` to generate and execute a safe SELECT when applicable
    - `RetrieveVectorTool` to gather semantic evidence from Milvus
    - `RetrieveGraphTool` to build an ER subgraph context from `metadata/schema.mermaid`
    - `SynthesizeTool` to merge plan, graph, and evidence into a draft
    - `VerifyTool` to compute confidence and support flags
    - `SummarizeTool` to produce the final text
- Finalization
  - Agent: `ReasoningAgent` (no-op placeholder; the prior step already summarizes)
  - `CrewOrchestrator` appends the final result to the execution log.
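The stages above can be sketched as a linear pipeline with an early exit on clarification. The stage functions here are passed in as stubs, not the CrewAI implementation:

```python
def run_pipeline(question: str, understand, guard, retrieve_and_reason) -> str:
    """Run the staged pipeline: understand -> guard -> retrieve/reason -> finalize."""
    plan = understand(question)
    check = guard(question, plan)
    if check["needs_clarification"]:
        # Short-circuit: ask the user instead of guessing.
        return check["message"]
    return retrieve_and_reason(check["plan_json"])
```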
Defined in api/agents/tools.py with Pydantic arg schemas and return values. Each tool appends an event to an execution log.
- `understand_query` (`UnderstandQueryTool`)
  - Input: `{ user_question }`
  - Output: plan JSON
  - Uses: `api/agents/query_understanding.py` (loads `metadata/schema.mermaid` and `metadata/terms.json`)
- `ensure_valid_plan` (`EnsureValidPlanTool`)
  - Input: `{ user_question, plan_json, schema_data? }`
  - Output: `{ plan_json, needs_clarification, message }`
  - Uses: dynamic entity-to-table resolution via `DbIntrospectTool` and `metadata/terms.json`
- `db_introspect` (`DbIntrospectTool`)
  - Input: `{ include_columns }`
  - Output: `{ tables: [{ name, columns? }] }` from live SQLite
- `retrieve_vector` (`RetrieveVectorTool`)
  - Input: `{ user_question, top_k }`
  - Output: list of `{ content, metadata }` from Milvus
- `retrieve_graph` (`RetrieveGraphTool`)
  - Input: `{ plan_json }`
  - Output: `{ graph: { entities, relationships }, entities, source_table, targets, filters }`
- `synthesize` (`SynthesizeTool`)
  - Input: `{ plan_json, vector_results_json, graph_context_json }`
  - Output: draft JSON `{ plan, graph, evidence }`
- `verify` (`VerifyTool`)
  - Input: `{ draft_json }`
  - Output: `{ draft, confidence, supported }`
- `summarize` (`SummarizeTool`)
  - Input: `{ verification_json }`
  - Output: final text
- `sql_query` (`SqlQueryTool`)
  - Input: `{ question, plan_json?, schema_data? }`
  - Output: `{ sql, columns, rows }`
  - Uses: `api/agents/sql_agent.py` for LLM SQL generation and read-only execution; guards write operations and caps results.
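The read-only guard in `sql_query` can be illustrated with a simple check. This is a sketch; the actual guard in `api/agents/sql_agent.py` may differ:

```python
import re

def ensure_readonly(sql: str, max_rows: int = 100) -> str:
    """Reject anything but a SELECT and cap result size with a LIMIT."""
    stripped = sql.strip().rstrip(";")
    # Tokenize on identifier characters so e.g. "created_at" does not match "create"
    tokens = re.findall(r"[a-z_]+", stripped.lower())
    forbidden = {"insert", "update", "delete", "drop", "alter", "create", "pragma", "attach"}
    if not tokens or tokens[0] != "select" or forbidden & set(tokens):
        raise ValueError("only read-only SELECT statements are allowed")
    if "limit" not in tokens:
        stripped += f" LIMIT {max_rows}"
    return stripped
```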
api/agents/tools.py manages a per-execution session:
- `_start_execution_session()` starts a session buffer.
- `_append_eval_log(event)` records tool inputs/outputs with timestamps.
- `_end_execution_session()` persists the buffered session into `eval/logs.json` and clears the buffer.
```mermaid
sequenceDiagram
  autonumber
  participant Client
  participant FastAPI as FastAPI (api/agent.py)
  participant Crew as CrewOrchestrator
  participant Tools as Tools (tools.py)
  participant SQLite as SQLite
  participant Milvus as Milvus
  participant OpenAI as OpenAI
  Client->>FastAPI: POST /agent/chat/stream
  FastAPI->>Crew: run_stream(question)
  Crew->>Tools: understand_query(question)
  Tools-->>Crew: plan JSON
  Crew->>Tools: ensure_valid_plan(question, plan)
  Tools-->>Crew: { plan_json, needs_clarification, message }
  alt needs_clarification
    Crew-->>FastAPI: clarification message
  else proceed
    Crew->>Tools: db_introspect()
    Tools->>SQLite: PRAGMA, sqlite_master
    SQLite-->>Tools: schema JSON
    Crew->>Tools: sql_query(question, plan, schema)
    Tools->>SQLite: SELECT (read-only)
    SQLite-->>Tools: { columns, rows }
    Crew->>Tools: retrieve_vector(question)
    Tools->>OpenAI: embed_texts
    Tools->>Milvus: search_by_vector
    Milvus-->>Tools: docs
    Crew->>Tools: retrieve_graph(plan)
    Tools-->>Crew: graph context
    Crew->>Tools: synthesize(plan, vector, graph)
    Tools-->>Crew: draft
    Crew->>Tools: verify(draft)
    Tools-->>Crew: verification
    Crew->>Tools: summarize(verification)
    Tools-->>Crew: final text
    Crew-->>FastAPI: final text
  end
  note over FastAPI: If Crew fails → fallback to OpenAI streaming, else non-streaming
```
- Primary: Crew pipeline (multi-agent tools) via `CrewOrchestrator`.
- Secondary: OpenAI Chat Completions streaming with the provided history.
- Tertiary: Non-streaming single-shot completion to avoid total failure.
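This three-tier fallback can be sketched as nested try/except; the handler functions are passed in as stubs here, not the project's actual handlers:

```python
def answer_with_fallback(question: str, crew_run, openai_stream, openai_once) -> str:
    """Try the Crew pipeline first, then streaming OpenAI, then a single-shot call."""
    try:
        return crew_run(question)
    except Exception:
        try:
            return openai_stream(question)
        except Exception:
            # Last resort: non-streaming completion to avoid total failure
            return openai_once(question)
```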
https://docs.google.com/presentation/d/1rvQRH_ex0IGg7xhUj9WUlba5_O-bcS6aYDFVa0_LH6s/edit?usp=sharing