**File:** `.claude/agents/architecture-decisions.md` (new file, +81 lines)
# Architecture Decisions — SOLVRO MCP

## Core Architecture: Four-Component System

```
[Azure Blob] → [Prefect Pipeline] → [Neo4j Graph DB] ← [MCP Server] ← [ToPWR API] ← [Users]
                                                           │
                                                     [LangGraph RAG]
```

1. **MCP Server** — single source of graph intelligence; stateless; exposed as FastMCP tool
2. **ToPWR API** — user-facing HTTP API; owns sessions and conversation history; delegates intelligence to MCP
3. **Data Pipeline** — one-way ETL; documents become graph nodes/relations via LLM-generated Cypher
4. **MCP Client** — CLI; same protocol as API; not used in production path

## Key Patterns to Preserve

### Singleton Config
`src/config/config.py` loads `graph_config.yaml` once and caches it. All components call `get_config()`. **Do not load config elsewhere.** Changes to `graph_config.yaml` require: `just generate-models` to regenerate `config_models.py`, then restart services.

### LangGraph State Machine (RAG)
The RAG pipeline is a typed state machine — not a chain. Each node is a pure function: `State → State`. Routing decisions (guardrails → cypher vs. end) happen via conditional edges. When adding pipeline steps:
1. Add field to `State` TypedDict in `state.py`
2. Add node method in `rag.py`
3. Wire into graph with `add_node()` + `add_edge()` or `add_conditional_edges()`

**Do not bypass LangGraph** by calling nodes directly — the graph handles routing, observability, and error recovery.
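The three steps can be pictured with a dependency-free sketch (this is NOT langgraph itself — node names, fields, and the router are illustrative stand-ins for the pattern: pure `State → State` nodes plus a conditional edge out of guardrails):

```python
from typing import Callable, Dict, Optional, TypedDict

class State(TypedDict, total=False):
    user_question: str
    generated_cypher: Optional[str]
    next_node: str

def guardrails(state: State) -> State:
    # Conditional routing decision: continue to cypher, or end.
    on_topic = bool(state.get("user_question", "").strip())
    return {**state, "next_node": "cypher" if on_topic else "end"}

def cypher(state: State) -> State:
    # Stand-in for LLM Cypher generation.
    return {**state, "generated_cypher": "MATCH (n) RETURN n LIMIT 10", "next_node": "end"}

NODES: Dict[str, Callable[[State], State]] = {"guardrails": guardrails, "cypher": cypher}

def run(state: State, entry: str = "guardrails") -> State:
    current = entry
    while current != "end":
        state = NODES[current](state)
        current = state["next_node"]  # the conditional edge
    return state
```

In the real pipeline, `StateGraph` owns this loop — which is precisely why nodes must not be called directly.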

### Optional Langfuse
Langfuse is never a hard dependency. All observability code must guard on `LANGFUSE_SECRET_KEY` being set. The pattern is: check env var → conditionally create `CallbackHandler` → pass to LLM invoke config. **Never make Langfuse required.**
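A minimal sketch of the guard (the `CallbackHandler` import path varies across Langfuse versions, so treat it as an assumption; the deferred import keeps Langfuse soft even at import time):

```python
import os

def langfuse_callbacks() -> list:
    """Return Langfuse callbacks only when tracing is configured."""
    if not os.getenv("LANGFUSE_SECRET_KEY"):
        return []  # Langfuse absent or unconfigured: trace nothing, break nothing
    # Deferred import -- only runs when the env var is set; path may differ by version.
    from langfuse.callback import CallbackHandler
    return [CallbackHandler()]

# Usage: llm.ainvoke(prompt, config={"callbacks": langfuse_callbacks()})
```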

### Cypher Generation Rules
The LLM Cypher generation prompts (in `graph_config.yaml` under `prompts.cypher_insert` and `prompts.cypher_search`) encode critical constraints:
- LIMIT on all MATCH queries
- Unique variable names per statement
- Polish character normalization
- Pipe (`|`) delimiter for pipeline output

**Do not change these prompts casually** — they directly affect Neo4j safety and data quality.
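As an illustration of why the LIMIT rule matters, a defensive guard like the following can back up the prompt (hypothetical helper — the project's actual enforcement lives in the prompts and pipeline, not necessarily in this form):

```python
import re

def ensure_limit(cypher: str, max_rows: int = 25) -> str:
    """Append a LIMIT clause if the generated query lacks one (illustrative guard)."""
    if re.search(r"\bLIMIT\s+\d+\b", cypher, re.IGNORECASE):
        return cypher
    return f"{cypher.rstrip().rstrip(';')} LIMIT {max_rows}"
```

Prompt-level LIMIT plus a guard like this keeps a single bad generation from dragging an unbounded result set out of Neo4j.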

## What NOT to Change

| Item | Why |
|---|---|
| `src/config/config_models.py` | Auto-generated — edit `graph_config.yaml` instead |
| State machine edges in `rag.py` | Core routing logic — test thoroughly before changing |
| Cypher LIMIT enforcement | Safety guard — Neo4j can return unbounded results |
| Session in-memory storage | Intentional simplicity — if persistence is needed, add a proper DB layer |
| Non-root Docker users (`mcpuser`, `apiuser`) | Security requirement |

## Extension Points

### Add a new MCP tool
→ Create `src/mcp_server/tools/<tool_name>/` package with `__init__.py` and main module. Register with `@mcp.tool()` in `server.py`.

### Add a new RAG pipeline node
→ Add field to `State`, add node method to `RAG` class in `rag.py`, wire into `StateGraph`.

### Add a new API endpoint
→ Add route to `src/topwr_api/server.py`, add Pydantic models to `models.py` if needed.

### Add a new data source (not Azure)
→ Create new flow in `src/data_pipeline/flows/data_acquisition.py` or add a new flow file. Update `pipeline.py` orchestrator to call it.

### Add a new graph node/relation type
→ Add to `graph_config.yaml` under `graph.nodes` / `graph.relations`. Run `just generate-models`. Update Cypher generation prompts if needed.

### Add a new LLM provider
→ Add config under `llm` in `graph_config.yaml`. Add detection logic in `rag.py` where LLM is instantiated (currently checks for OpenAI/DeepSeek → Google fallback).

## Intentional Simplifications

- **In-memory sessions:** Single-instance deployment. If horizontal scaling is needed, sessions must move to Redis or a DB.
- **Synchronous Prefect pipeline:** Pipeline runs sequentially (acquire → extract → generate → populate). Parallelism is possible with Prefect `.map()` but not currently used.
- **LLM-only Cypher generation:** No pre-built graph schema enforcement beyond the prompt. Generated Cypher is executed as-is (with LIMIT injection).

## Known Inconsistencies

1. **Prefect version mismatch:** `Dockerfile.prefect` installs `prefect==2.*` but `pyproject.toml` requires `prefect>=3.6.7`. Resolve before extending the pipeline.
2. **graph_visualizer.py** appears unused in the production path — exists for debugging/docs generation.
3. **`scripts/data_pipeline/`** was deleted; ensure no references remain.
---

**File:** `.claude/agents/coding-standards.md` (new file, +156 lines)
# Coding Standards — SOLVRO MCP

## File Structure Conventions

```
src/
├── config/ # Config loading only — no business logic
├── mcp_server/ # FastMCP server + RAG tools
│ └── tools/<name>/ # Each tool in its own package: __init__.py, main module, state.py
├── topwr_api/ # FastAPI app — server.py, models.py, session_manager.py
├── mcp_client/ # CLI clients
├── data_pipeline/ # Prefect flows
│ ├── pipeline.py # Orchestrating @flow only — no logic
│ └── flows/ # Individual @task/@flow modules
└── scripts/ # One-off scripts (codegen, etc.)
```

New tool packages go in `src/mcp_server/tools/<tool_name>/`. New pipeline stages go in `src/data_pipeline/flows/`.

## Import Order (enforced by Ruff isort)

```python
# 1. Standard library
import asyncio
import os
from typing import Dict, Any, Optional, List

# 2. Third-party
from langchain_openai import ChatOpenAI
from langfuse import observe
from pydantic import BaseModel

# 3. Local (absolute paths from src/)
from src.config.config import get_config
from src.mcp_server.tools.knowledge_graph.state import State
```

## Type Hints

Always annotate all function parameters and return types. Use `Optional[X]` rather than `X | None` — the codebase standardizes on the `typing` module spellings for consistency (note the Ruff target is Python 3.13, so this is a style convention, not a compatibility requirement).

```python
# Correct
async def process(query: str, limit: int = 10) -> List[Dict[str, Any]]:
...

# Wrong — missing annotations
async def process(query, limit=10):
...
```

## Docstrings — Google Style

Required for all public functions and classes:

```python
async def query_graph(user_input: str, session_id: str = "default") -> Dict[str, Any]:
"""
Query the knowledge graph with natural language.

Args:
user_input: User's natural language question
session_id: Session identifier for grouping queries

Returns:
Dictionary containing answer and metadata

Raises:
Neo4jQueryError: If database query fails
"""
```

## Async Patterns

- Use `async`/`await` for ALL I/O (Neo4j, LLM calls, HTTP)
- Prefer `asyncio.gather()` for concurrent calls over sequential `await`
- Use async context managers for Neo4j sessions

```python
# Good
results = await asyncio.gather(llm1.ainvoke(p1), llm2.ainvoke(p2), return_exceptions=True)

# Avoid
r1 = await llm1.ainvoke(p1)
r2 = await llm2.ainvoke(p2)
```

## Error Handling

Wrap all external I/O in try-except. Raise custom exceptions from the `KnowledgeGraphError` hierarchy. Log before re-raising:

```python
try:
result = await session.run(cypher_query)
except Exception as e:
logger.error(f"Neo4j query failed: {e}", extra={"query": cypher_query})
raise Neo4jQueryError(f"Query execution failed: {e}") from e
```

The custom exception hierarchy lives in the relevant tool package. Never raise a bare `Exception`.

## Pydantic Models

- Use `pydantic.BaseModel` for all external data shapes (API request/response, config)
- Config models: use `graph_config.yaml` as source of truth; run `just generate-models` to regenerate `config_models.py`
- **Never edit `src/config/config_models.py` by hand**
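A hypothetical request shape, for illustration only (the real models live in `src/topwr_api/models.py` and may differ):

```python
from pydantic import BaseModel, Field

class QueryRequest(BaseModel):
    """External data shape: validated at the API boundary, not deeper in."""
    question: str = Field(min_length=1)  # reject empty questions before they reach the RAG
    session_id: str = "default"
```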

## LangGraph State

- State is a `TypedDict` (or `MessagesState` subclass)
- All state fields must be explicitly typed
- Optional fields use `Optional[T] = None`
- Each node takes `State` and returns updated `State`

```python
class State(MessagesState):
user_question: str
generated_cypher: Optional[str] = None
next_node: str
```

## FastMCP Tools

```python
@mcp.tool()
async def tool_name(param1: str, param2: int = 10) -> str:
"""
One-line description (shown to AI clients in tool catalog).

Args:
param1: Description
param2: Description (default: 10)

Returns:
JSON-serializable string result
"""
```

- snake_case tool names
- Clear docstrings (visible to AI clients)
- Return JSON-serializable types

## Prefect Pipeline

- Top-level `@flow` in `pipeline.py` only orchestrates — no logic
- Logic lives in `@flow` or `@task` functions in `flows/` submodules
- Use `log_prints=True` on flows for Prefect UI visibility
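The orchestration-only rule, sketched without Prefect so it stays self-contained (in the real code `run_pipeline` is the top-level `@flow` and each stage is a `@flow`/`@task` in `src/data_pipeline/flows/`; the stage bodies here are stand-ins):

```python
def acquire_documents() -> list[str]:
    return ["doc-1", "doc-2"]  # stand-in for the Azure Blob download

def extract_text(docs: list[str]) -> list[str]:
    return [f"text of {d}" for d in docs]

def generate_cypher(texts: list[str]) -> list[str]:
    return [f"CREATE (:Doc {{src: '{t}'}})" for t in texts]

def populate_graph(statements: list[str]) -> int:
    return len(statements)  # stand-in for Neo4j writes

def run_pipeline() -> int:
    # Orchestration only: every line delegates; no business logic lives here.
    docs = acquire_documents()
    texts = extract_text(docs)
    statements = generate_cypher(texts)
    return populate_graph(statements)
```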

## Formatting (Ruff — do not override)

- Line length: 100
- Quotes: double (`"`)
- Indent: 4 spaces
- Target: Python 3.13

Run `just lint` before committing.
---

**File:** `.claude/agents/debugging-guide.md` (new file, +126 lines)
# Debugging Guide — SOLVRO MCP

## Logging

The project uses Python's standard `logging` module with structured extras:

```python
import logging
logger = logging.getLogger(__name__)

logger.info("Processing query", extra={"session_id": session_id, "trace_id": trace_id})
logger.error(f"Query failed: {e}", extra={"trace_id": trace_id})
```

Log level is configured at startup. Set `DEBUG` for verbose output.

## Langfuse Tracing (Primary Observability)

When `LANGFUSE_SECRET_KEY` is set, all LLM calls are traced:

1. Each `@observe`-decorated function appears as a span
2. `CallbackHandler` traces LangChain chain calls
3. Traces are grouped by `session_id` and identified by `trace_id`
4. Tags like `["cypher_generation", "rag"]` allow filtering in the Langfuse UI

**Debug LLM calls:** Check Langfuse UI → filter by `session_id` or `trace_id`.

**If Langfuse is not configured:** Traces are silently skipped. Check `LANGFUSE_SECRET_KEY` env var.

## Debug Mode (RAG Pipeline)

Enable `enable_debug: true` in `graph_config.yaml` under `rag:` to activate the `debug_print` node in the LangGraph pipeline. This prints intermediate state to stdout.
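Assuming the nesting described above, the toggle looks like:

```yaml
rag:
  enable_debug: true   # activates the debug_print node in the LangGraph pipeline
```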

## Docker Log Commands

```bash
just logs # tail all services
just logs-mcp # MCP server only
just logs-api # FastAPI only
just logs-neo4j # Neo4j only
just prefect-logs # Prefect pipeline
```

Or directly:
```bash
docker compose -f docker/compose.stack.yml logs -f mcp-server
docker compose -f docker/compose.stack.yml logs -f topwr-api
```

## Health Checks

```bash
# API health
curl http://localhost:8000/health

# Stats endpoint
curl http://localhost:8000/api/stats

# Neo4j browser
open http://localhost:7474

# Prefect UI
open http://localhost:4200
```

## Common Failure Modes

### Neo4j connection fails
- Check `NEO4J_URI`, `NEO4J_USER`, `NEO4J_PASSWORD` in `.env`
- Verify Neo4j container is healthy: `docker compose ps`
- Check Neo4j logs: `just logs-neo4j`
- RAG pipeline falls back to config schema if DB is empty (not an error)

### LLM API errors
- `OPENAI_API_KEY` not set → check which model is configured in `graph_config.yaml`
- DeepSeek: token limit errors → enforced at 65536 in pipeline config
- Check `llm.fast_model.model` and `llm.accurate_model.model` in `graph_config.yaml`

### Cypher generation fails
- Enable `enable_debug: true` in `graph_config.yaml`
- Check the generated Cypher in Langfuse traces
- Common issues: Polish characters not normalized, variable name collisions
- Pipeline: verify `|` delimiter splitting in `llm_cypher_generation.py`
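When chasing normalization bugs, a quick stdlib check of what "normalized" should look like can help (illustrative — the pipeline's own normalization may differ; note `ł`/`Ł` have no Unicode decomposition and must be special-cased):

```python
import unicodedata

def normalize_polish(text: str) -> str:
    """Strip Polish diacritics: NFKD-decompose, drop combining marks, special-case l-stroke."""
    text = text.replace("ł", "l").replace("Ł", "L")  # NFKD leaves these intact
    decomposed = unicodedata.normalize("NFKD", text)
    return "".join(ch for ch in decomposed if not unicodedata.combining(ch))
```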

### MCP server not reachable from API
- Check `MCP_HOST` and `MCP_PORT` match the running server
- In Docker: services communicate via service names (`mcp-server:8005`)
- Health check: `docker compose ps` → mcp-server should show `healthy`

### Session not found (API)
- Sessions are **in-memory only** — lost on API restart
- Use `/api/sessions/{session_id}` to verify session exists before sending messages

### Prefect pipeline stuck
- Check `prefect-entrypoint.sh` — it starts Prefect server first, waits for health, then runs pipeline
- Prefect UI at port 4200 shows flow run status
- Check env: `AZURE_STORAGE_CONNECTION_STRING` must be set for data acquisition

## Useful Debug Commands

```bash
# Test knowledge graph query directly (requires running MCP server)
uv run kg "Kto wykłada analizę matematyczną?"  # Polish: "Who teaches mathematical analysis?"

# Run pipeline locally without Docker
uv run prefect_pipeline

# Run API integration tests against live server
uv run test-topwr-api

# Check graph schema cached in config
python -c "from src.config.config import get_config; c = get_config(); print(c.graph.nodes)"

# Verify Ruff is happy
just lint
```

## Graph Visualizer

Generate a Mermaid diagram of the RAG state machine:
```python
from src.mcp_server.tools.knowledge_graph.graph_visualizer import visualize_graph
# produces Mermaid JS markup for the LangGraph pipeline
```

Paste output into mermaid.live or any Mermaid renderer to visualize the pipeline.