-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathworkflow.py
More file actions
113 lines (92 loc) · 3.6 KB
/
workflow.py
File metadata and controls
113 lines (92 loc) · 3.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
"""
LangGraph workflow for the TSqualityAgent pipeline.
Graph topology:
perceiver → inspector → adjudicator
↑ (needs_replan) ────────────┘
↑ (needs_recheck) ───┘
"""
from langgraph.graph import StateGraph, END
from models.state import AgentState
from models.llm import BaseLLM
from agents.perceiver import run_perceiver
from agents.inspector import run_inspector
from agents.adjudicator import run_adjudicator
from config import Config
def build_workflow(llm: BaseLLM, config: Config = None,
                   perceiver_llm: BaseLLM = None) -> StateGraph:
    """
    Assemble and compile the perceiver → inspector → adjudicator graph.

    Parameters
    ----------
    llm : BaseLLM
        Model passed to the inspector and adjudicator agents.  It is also
        used for the perceiver whenever ``perceiver_llm`` is falsy.
    config : Config, optional
        Supplies ``max_steps_per_dimension``, ``max_recheck`` and
        ``max_replan``.  A default ``Config()`` is created when omitted.
    perceiver_llm : BaseLLM, optional
        Dedicated model for the perceiver node.

    Returns
    -------
    The object produced by calling ``compile()`` on the assembled graph.
    NOTE(review): the signature annotates the return as ``StateGraph``,
    but the value is whatever ``compile()`` returns — confirm the intended
    annotation against the installed langgraph version.
    """
    settings = Config() if config is None else config
    perception_model = perceiver_llm if perceiver_llm else llm

    # ── Agent wrappers: bind the models and limits to each agent ──────────────
    def _perceive(state: AgentState) -> dict:
        return run_perceiver(state, perception_model)

    def _inspect(state: AgentState) -> dict:
        step_budget = settings.max_steps_per_dimension
        return run_inspector(state, llm, max_steps=step_budget)

    def _adjudicate(state: AgentState) -> dict:
        return run_adjudicator(
            state,
            llm,
            max_recheck=settings.max_recheck,
            max_replan=settings.max_replan,
        )

    # ── Where to go once the adjudicator has reflected ────────────────────────
    def route_after_adjudicator(state: AgentState) -> str:
        verdict = state.get("reflection_type")
        if verdict == "needs_recheck":
            return "inspector"
        if verdict == "needs_replan":
            return "perceiver"
        # Anything else (e.g. "done" or a missing value) terminates the run.
        return END

    # ── Wire everything together ──────────────────────────────────────────────
    workflow = StateGraph(AgentState)

    for node_name, node_fn in (
        ("perceiver", _perceive),
        ("inspector", _inspect),
        ("adjudicator", _adjudicate),
    ):
        workflow.add_node(node_name, node_fn)

    workflow.set_entry_point("perceiver")

    # Fixed forward path; the only branching happens after the adjudicator.
    for upstream, downstream in (
        ("perceiver", "inspector"),
        ("inspector", "adjudicator"),
    ):
        workflow.add_edge(upstream, downstream)

    # Every routing result maps to the node (or END) of the same name.
    workflow.add_conditional_edges(
        "adjudicator",
        route_after_adjudicator,
        {target: target for target in ("inspector", "perceiver", END)},
    )

    return workflow.compile()
def run_pipeline(input_data: dict, llm: BaseLLM, config: Config = None,
                 perceiver_llm: BaseLLM = None) -> dict:
    """
    Build the workflow and execute it once on ``input_data``.

    Parameters
    ----------
    input_data : dict
        Stored as-is under the ``"input"`` key of the initial state (it is
        not validated or copied here).  Expected layout:
        {
            "dataset_description": str,
            "series_A": list[float],
            "series_B": list[float],
            "timestamps": list (optional),
            "external_variables": dict (optional),
        }
    llm : BaseLLM
        Forwarded to ``build_workflow``.
    config : Config, optional
        Forwarded to ``build_workflow``.
    perceiver_llm : BaseLLM, optional
        Forwarded to ``build_workflow``.

    Returns
    -------
    dict
        The value returned by ``invoke`` on the compiled workflow, i.e. the
        state after the run rather than only the verdict.
        NOTE(review): the { winner, confidence, explanation } verdict is
        presumably found under the ``"final_result"`` key — confirm against
        the adjudicator.
    """
    compiled = build_workflow(llm, config, perceiver_llm=perceiver_llm)

    seed_state: AgentState = {
        # Caller-supplied payload.
        "input": input_data,
        # Perception / planning outputs start empty.
        "planned_dimensions": [],
        "perception_summary": "",
        # Inspection outputs start empty.
        "dimension_results": [],
        # Reflection fields are unset until the adjudicator writes them.
        "reflection_type": None,
        "reflection_feedback": None,
        "recheck_dimensions": None,
        # Loop counters begin at zero.
        "recheck_count": 0,
        "replan_count": 0,
        # No verdict yet.
        "final_result": None,
        # Per-agent message histories start empty.
        "perceiver_messages": [],
        "adjudicator_messages": [],
    }

    return compiled.invoke(seed_state)