Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 23 additions & 18 deletions backend/src/analytics_agent/agent/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@
from typing import Literal

import orjson
from langchain.agents import create_agent
from deepagents import create_deep_agent
from langchain_core.messages import SystemMessage, ToolMessage
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import END, START, StateGraph

from analytics_agent.agent.llm import get_llm
from analytics_agent.agent.state import AgentState
from analytics_agent.config import settings
from analytics_agent.prompts.system import build_system_prompt
from analytics_agent.skills.loader import build_skill_sources

# Write-back skills are opt-in; only included when explicitly enabled by the user
_SKILL_TOOL_NAMES: frozenset[str] = frozenset({"publish_analysis", "save_correction"})
Expand Down Expand Up @@ -45,14 +47,21 @@ def build_graph(
context_tools: list | None = None, # pre-built from DB context platforms at request time
engine_tools: list | None = None, # pre-built for MCP data sources (bypasses QueryEngine)
):
"""Build the agent graph backed by `deepagents.create_deep_agent`.

The inner agent gains a planning tool (`write_todos`) and a virtual
filesystem (`ls`/`read_file`/`write_file`/`edit_file`) — keeping
high-volume turns out of the parent's context window.

The outer `StateGraph` keeps the conditional `chart_node` post-step.
"""
from analytics_agent.agent.chart_generator import chart_node
from analytics_agent.agent.chart_tool import create_chart
from analytics_agent.engines.factory import get_registry

disabled = disabled_tools or set()
llm = get_llm(streaming=True)

from analytics_agent.agent.chart_tool import create_chart

# Context platform tools — built dynamically from DB at request time.
# Falls back to env-var based build only when caller doesn't provide them.
if context_tools is not None:
Expand All @@ -77,23 +86,14 @@ def build_graph(
if not engine:
raise ValueError(f"Engine '{engine_name}' not found.")
engine_tools = [t for t in engine.get_tools() if t.name not in disabled]

chart_tools = [] if "create_chart" in disabled else [create_chart]
all_tools = datahub_tools + skill_tools + engine_tools + chart_tools

if system_prompt_override:
from analytics_agent.skills.loader import (
get_improve_context_prompt_section,
get_search_business_context_section,
get_skill_system_prompt_section,
)

system_prompt = system_prompt_override.format(engine_name=engine_name)
system_prompt += get_search_business_context_section()
system_prompt += get_improve_context_prompt_section()
if enabled_mutations:
system_prompt += get_skill_system_prompt_section(enabled_mutations)
else:
system_prompt = build_system_prompt(engine_name, enabled_skills=enabled_mutations)
system_prompt = build_system_prompt(engine_name)

# Enable per-tool error handling so validation errors (e.g. hallucinated
# arguments like filter= on get_entities) are returned as tool messages
Expand Down Expand Up @@ -127,15 +127,15 @@ def build_graph(
]
)

react_agent = create_agent(
deep_agent = create_deep_agent(
model=llm,
tools=all_tools,
state_schema=AgentState,
system_prompt=system_for_agent,
skills=build_skill_sources(enabled_mutations),
)

graph = StateGraph(AgentState)
graph.add_node("agent", react_agent)
graph.add_node("agent", deep_agent)
graph.add_node("chart", chart_node)
graph.add_edge(START, "agent")
graph.add_conditional_edges(
Expand All @@ -145,4 +145,9 @@ def build_graph(
)
graph.add_edge("chart", END)

return graph.compile()
# An InMemorySaver lets the streaming layer call `aget_state(subgraphs=True)`
# to snapshot the deepagents virtual filesystem (the `files` DeltaChannel
# doesn't reliably propagate to the outer graph mid-step). `build_graph` is
# called fresh per request, so this checkpointer is per-turn — no state
# leaks across turns and history is still replayed from the DB each time.
return graph.compile(checkpointer=InMemorySaver())
104 changes: 101 additions & 3 deletions backend/src/analytics_agent/agent/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,59 @@
re.DOTALL,
)

# Per-conversation snapshot of the deepagents virtual filesystem. Populated
# during streaming whenever write_file/edit_file fires (and at end-of-turn).
# The `/files` HTTP endpoint reads this so the Files panel can survive a reload
# without depending on outer-graph checkpoint propagation, which is unreliable
# for the `files` DeltaChannel.
FILES_SNAPSHOTS: dict[str, dict[str, str]] = {}


def _normalize_files(raw: Any) -> dict[str, str]:
"""Coerce a deepagents `files` channel value into {path: content_str}.

deepagents stores entries as `FileData` dicts (`{content, encoding, ...}`)
in v2 format and plain strings in v1. We surface only the content for the
UI panel — encoding/mtime aren't displayed.
"""
if not isinstance(raw, dict):
return {}
out: dict[str, str] = {}
for path, data in raw.items():
if isinstance(data, str):
out[path] = data
elif isinstance(data, dict):
content = data.get("content")
if isinstance(content, str):
out[path] = content
elif isinstance(content, list):
# v1 stored content as list[str] (lines)
out[path] = "\n".join(str(line) for line in content)
return out


async def _snapshot_virtual_files(graph: Any, cfg: dict) -> dict[str, str]:
"""Read the current virtual-filesystem snapshot from the live graph.

`aget_state(subgraphs=True)` walks into the deepagents subgraph so we pick
up files even when the outer state's `files` channel hasn't received the
subgraph's delta yet (DeltaChannel propagation across StateGraph nodes is
not guaranteed to be synchronous within a single agent step).
"""
try:
snap = await graph.aget_state(cfg, subgraphs=True)
except Exception:
return {}
files = _normalize_files((snap.values or {}).get("files"))
# Walk pending subgraph tasks; deepagents writes files inside the inner
# agent's state and we want those even mid-step.
for task in getattr(snap, "tasks", ()) or ():
sub = getattr(task, "state", None)
sub_values = getattr(sub, "values", None) if sub is not None else None
if isinstance(sub_values, dict):
files.update(_normalize_files(sub_values.get("files")))
return files


def _extract_chart_from_text(text: str) -> dict | None:
"""Extract a Vega-Lite chart spec if the model output it as a JSON code block."""
Expand Down Expand Up @@ -68,12 +121,28 @@ async def stream_graph_events(
final_state: dict[str, Any] = {}
chart_emitted = False # guard against double-emitting CHART

# A per-turn thread id so the (per-request) InMemorySaver checkpoints this
# run's state, letting us snapshot the deepagents virtual filesystem via
# `aget_state`. The graph is rebuilt per request, so this never leaks state.
cfg: dict[str, Any] = {"configurable": {"thread_id": conversation_id}}

def _emit_files(files: dict[str, str]) -> dict | None:
"""Cache + build a FILES_UPDATE event, or None if nothing changed."""
if FILES_SNAPSHOTS.get(conversation_id) == files:
return None
FILES_SNAPSHOTS[conversation_id] = files
return {
"event": "FILES_UPDATE",
"conversation_id": conversation_id,
"message_id": str(uuid.uuid4()),
"payload": {"files": files},
}

try:
from analytics_agent.config import settings as _settings

async for event in graph.astream_events(
inputs, version="v2", config={"recursion_limit": _settings.agent_recursion_limit}
):
cfg["recursion_limit"] = _settings.agent_recursion_limit
async for event in graph.astream_events(inputs, version="v2", config=cfg):
event_type: str = event.get("event", "")
data: dict[str, Any] = event.get("data", {})
name: str = event.get("name", "")
Expand Down Expand Up @@ -117,6 +186,16 @@ async def stream_graph_events(
# create_chart renders as a CHART event — don't show a tool call bubble
if name == "create_chart":
continue
# `write_todos` is the deepagents planning tool — surface as a
# dedicated TODOS event so the UI plan panel can subscribe.
if name == "write_todos":
yield {
"event": "TODOS",
"conversation_id": conversation_id,
"message_id": str(uuid.uuid4()),
"payload": {"todos": tool_input.get("todos", [])},
}
continue
yield {
"event": "TOOL_CALL",
"conversation_id": conversation_id,
Expand All @@ -141,6 +220,18 @@ async def stream_graph_events(

# ── SQL / TOOL_RESULT / CHART ──
elif event_type == "on_tool_end":
# File-mutating deepagents tools — re-snapshot the virtual FS so
# the Files panel updates live as the agent writes scratch files.
if name in ("write_file", "edit_file"):
files_evt = _emit_files(await _snapshot_virtual_files(graph, cfg))
if files_evt is not None:
yield files_evt

if name == "write_todos":
# The TODOS event was already emitted at tool_start with the
# full planned list; the tool_end output is just an ack.
continue

output = data.get("output", "")
if hasattr(output, "content"):
output = output.content
Expand Down Expand Up @@ -311,6 +402,13 @@ async def stream_graph_events(
if clean_text != full_text:
final_text_parts[:] = [clean_text]

# End-of-turn filesystem snapshot — catches files the FilesystemMiddleware
# offloaded large tool results into (e.g. /large_tool_results/<id>) that no
# write_file/edit_file event covered.
files_evt = _emit_files(await _snapshot_virtual_files(graph, cfg))
if files_evt is not None:
yield files_evt

yield {
"event": "COMPLETE",
"conversation_id": conversation_id,
Expand Down
19 changes: 18 additions & 1 deletion backend/src/analytics_agent/api/chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,11 @@ def _broadcast(evt: dict) -> None:
keepalive_interval=keepalive_interval,
history=history,
):
if evt.get("event") not in (None, "KEEPALIVE"):
# FILES_UPDATE carries the full virtual-FS snapshot (potentially
# large, re-sent on every write); don't persist it — the Files
# panel re-fetches via GET /files on reload. Everything else,
# including the small TODOS plan, is persisted for history.
if evt.get("event") not in (None, "KEEPALIVE", "FILES_UPDATE"):
with contextlib.suppress(Exception):
await _persist_message(
session,
Expand Down Expand Up @@ -466,6 +470,19 @@ async def send_message(
)


@router.get("/{conversation_id}/files")
async def get_virtual_files(conversation_id: str) -> dict[str, Any]:
"""Return the deepagents virtual filesystem snapshot for this conversation.

The streaming layer maintains an in-memory snapshot (`FILES_SNAPSHOTS`) as
write_file / edit_file fire and at end-of-turn, so the UI's Files panel can
survive a reload without re-reading the (per-turn, ephemeral) checkpoint.
"""
from analytics_agent.agent.streaming import FILES_SNAPSHOTS

return {"files": FILES_SNAPSHOTS.get(conversation_id, {})}


@router.get("/{conversation_id}/stream")
async def reattach_stream(conversation_id: str):
"""Reattach to an in-progress agent stream after switching conversations."""
Expand Down
25 changes: 7 additions & 18 deletions backend/src/analytics_agent/prompts/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,13 @@ def get_prompt_template() -> str:

def build_system_prompt(
engine_name: str,
enabled_skills: set[str] | None = None,
enabled_skills: set[str] | None = None, # accepted for caller compat; unused
) -> str:
from analytics_agent.skills.loader import (
get_improve_context_prompt_section,
get_search_business_context_section,
get_skill_system_prompt_section,
)
"""Render the base system prompt.

Skill bodies are no longer injected here — `SkillsMiddleware` handles
progressive disclosure. The `enabled_skills` parameter is kept so callers
don't break, but is not consumed.
"""
today = date.today().strftime("%B %d, %Y")
base = get_prompt_template().format(engine_name=engine_name, today=today)

# Always inject always-on meta-skills
base = base + get_search_business_context_section()
base = base + get_improve_context_prompt_section()

if enabled_skills:
skills_section = get_skill_system_prompt_section(enabled_skills)
if skills_section:
base = base + skills_section

return base
return get_prompt_template().format(engine_name=engine_name, today=today)
33 changes: 33 additions & 0 deletions backend/src/analytics_agent/prompts/system_prompt.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,39 @@ Your goal is to answer the user's data questions by:
Use this whenever the user asks for a chart, graph, plot, or visualization.
You can call it with data from DataHub search results, SQL results, or any table you've assembled.

## Planning with write_todos

For any non-trivial question — anything that will take more than one or two
tool calls — start by calling `write_todos` to lay out a short plan, and update
it as steps complete; skip planning only for simple lookups that resolve in a
single call.

## Working filesystem & auto-evicted tool results

You have a per-turn scratch filesystem (`ls`, `read_file`, `write_file`,
`edit_file`, `glob`, `grep`). Use it to stash intermediate results
between tool calls instead of dumping them back into your reply —
e.g. a long entity list before filtering, a SQL draft, notes between
sub-agent calls. The filesystem is **not persistent**: it lives for the
duration of the turn, not across conversations.

**Auto-evicted large tool results.** When a tool returns a payload
larger than the inline threshold (≈ 20K tokens — e.g. a wide
`get_lineage`, a `get_entities` with 50 URNs, a 5K-row SQL result),
the harness automatically writes the full content to
`/large_tool_results/<tool_call_id>` and replaces the inline tool
message with a head/tail preview plus that path. **When you see a path
like `/large_tool_results/tooluse_…` in a tool result, it's real** —
`read_file` it, or `grep` across `/large_tool_results/` to pull only
the bits you need. Don't `read_file` the whole offloaded blob just to
extract one field; `grep` for the field name first, then `read_file`
the matching slice.

This is also a useful pattern even when a result isn't auto-evicted:
if you're about to do multi-step filtering on a big tool response,
`write_file` it once and `grep`/`read_file` for what you need rather
than restating the whole blob in your reasoning.

## Core principles

### Documentation is authoritative about *intent*; the catalog is authoritative about *existence*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
---
name: improve_context
name: improve-context
description: Use this skill when the user types /improve-context or asks to capture learnings, improve documentation, or enrich the knowledge base based on this conversation.
metadata:
author: analytics-agent
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
---
name: publish_analysis
name: publish-analysis
description: >
Use this skill when the user wants to publish, share, save, or preserve a
completed data analysis so others in the org can find it. Saves as a versioned
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
---
name: save_correction
name: save-correction
description: >
Use this skill when the user identifies that knowledge in DataHub is wrong,
incomplete, or missing — whether that's a glossary term definition, a domain
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
---
name: search_business_context
name: search-business-context
description: >
Call this FIRST whenever the user's question names a business concept, metric,
or domain (e.g. revenue, churn, active seller, delivery SLA, marketing,
Expand Down
Loading
Loading