-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsupervisor.py
More file actions
87 lines (64 loc) · 2.58 KB
/
supervisor.py
File metadata and controls
87 lines (64 loc) · 2.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# supervisor.py
from __future__ import annotations

import time
from typing import Any, Dict

from readline import backend  # NOTE(review): unused — looks like an accidental IDE auto-import; confirm and remove

from agents.analyst import AnalystAgent
from agents.data_engineer import DataEngineerAgent
from agents.eval_agent import EvalAgent
from core.state import new_state, log_event, update_state
from llm_backend import GeminiBackend
class Supervisor:
    """
    Supervisor = orchestrator + shared-state owner.

    Runs the agent pipeline sequentially over a single shared ``state`` dict:
    DataEngineer -> Analyst -> Eval, then asks the LLM for a supervisor
    acknowledgement message.

    MVP stage:
        - prove the loop works
        - produce logs for the UI
        - later: sequence DataEngineer -> Analyst -> Storyteller deterministically
    """

    def __init__(self, model: str = "gemini-2.5-flash-lite") -> None:
        """Create the supervisor with a single backend shared by all LLM agents.

        Args:
            model: Gemini model identifier passed through to the backend.
        """
        self.backend = GeminiBackend(model=model)

    def run(self, user_prompt: str) -> Dict[str, Any]:
        """Execute the full agent pipeline for one user request.

        Args:
            user_prompt: The user's question/request driving the run.

        Returns:
            The shared state dict, augmented with agent outputs, an event log,
            ``finished_at_unix``, and ``supervisor_message`` (the LLM reply).
        """
        # --- Create shared state (schema-aligned)
        state = new_state(user_prompt)
        log_event(state, "run.start", {"run_id": state["run_id"]})

        # --- Agent 1: Data Engineer (loads the local dataset into state)
        data_agent = DataEngineerAgent(local_filename="space_missions.csv")
        data_agent.run(state)

        # --- Agent 2: Analyst (LLM planner + runtime executor)
        analyst = AnalystAgent(backend=self.backend)
        analyst.run(state)

        # --- Agent 3: Evaluator (LLM-based evaluation of the analysis)
        eval_agent = EvalAgent(backend=self.backend)
        eval_agent.run(state)

        # --- Call the LLM for the supervisor acknowledgement
        log_event(
            state,
            "llm.call",
            {"provider": "gemini", "model": self.backend.model},
        )
        response_text = self.backend.chat(self._build_messages(user_prompt))
        log_event(state, "llm.response", {"chars": len(response_text)})

        # Overwrite with the actual finish time (kept explicit for clarity).
        # Fixed: uses the module-level `time` import instead of the previous
        # `__import__("time")` hack.
        update_state(
            state,
            {"finished_at_unix": int(time.time())},
            reason="run.complete",
        )

        # Supervisor message is not part of shared agent state yet.
        state["supervisor_message"] = response_text
        log_event(state, "run.end", {"run_id": state["run_id"]})
        return state

    @staticmethod
    def _build_messages(user_prompt: str) -> list[dict[str, str]]:
        """Build the chat message list sent to the LLM for this MVP run."""
        return [
            {
                "role": "system",
                "content": (
                    "You are the Supervisor in an agentic workflow demo. "
                    "For now, briefly acknowledge the user's question and outline "
                    "the next steps: (1) data preparation, "
                    "(2) analysis + charts, "
                    "(3) storytelling + publish report."
                ),
            },
            {"role": "user", "content": user_prompt},
        ]