-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathorchestrator.py
More file actions
276 lines (232 loc) · 10.7 KB
/
orchestrator.py
File metadata and controls
276 lines (232 loc) · 10.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
from fastapi import FastAPI, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import os
import json
import uvicorn
import httpx
import time
import asyncio
# ==========================================
# SETUP & CONFIGURATION
# ==========================================
# The Groq key comes from the environment. Surrounding whitespace is removed
# first, then any wrapping double quotes, then any wrapping single quotes.
raw_key = os.environ.get("GROQ_API_KEY", "")
API_KEY = (
    raw_key
    .strip()
    .strip('"')
    .strip("'")
)

app = FastAPI(title="Smart City - Agentic Orchestrator")

# CORS is fully open: every origin, method and header is accepted.
# NOTE(review): FastAPI's CORS documentation advises against combining
# wildcard values with allow_credentials=True -- confirm whether credentials
# are actually needed by the dashboard.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# ==========================================
# STATE MANAGEMENT (Batching & Per-Service Cooldown)
# ==========================================
# Minimum gap, in seconds, between two accepted alerts from the same source.
SERVICE_COOLDOWN_SECONDS = 60

# How long, in seconds, to wait so that cascading alerts are grouped together.
BATCH_WINDOW_SECONDS = 5.0

# source_worker -> time.time() of the last accepted alert from that source.
SERVICE_COOLDOWNS = {}

# Payloads received while the gathering window is open.
ACTIVE_BATCH = []

# True while a gathering window is open (a batch task has been scheduled).
BATCH_TIMER_ACTIVE = False

# Most recent report parsed from the LLM; empty when there is none.
LATEST_INCIDENT_REPORT = {}
class IncidentPayload(BaseModel):
    # Request body for POST /api/orchestrate; one instance per sensor alert.

    # Identifier of the reporting worker. The code in this file recognises
    # "traffic_flow", "substation_monitor" and "air_quality_node".
    source_worker: str

    # Human-readable explanation of why the worker raised the alert;
    # forwarded to the LLM as "reason".
    trigger_reason: str

    # The reading that caused the alert; forwarded to the LLM unchanged as
    # "exact_trigger_data".
    trigger_data: dict

    # Earlier readings from the same worker; condensed into a one-line
    # baseline by slim_historical_context before being sent to the LLM.
    historical_context: list[dict]
# ==========================================
# PROMPT ENGINEERING
# ==========================================
# System message placed first in the "messages" list that
# process_incident_batch sends to the LLM. It fixes the reply schema
# (incident_title, severity_level, root_cause_analysis, recommended_actions)
# so the reply can be parsed with json.loads. The text is sent verbatim, so
# any edit here changes what the model is asked to do.
SYSTEM_PROMPT = """
You are an autonomous Smart City Incident Commander AI.
Analyze the provided multi-sensor telemetry data. You may receive MULTIPLE concurrent triggering events if a cascading failure is occurring.
Your goal is to identify the root cause connecting these events and formulate an emergency response.
You MUST respond ONLY in valid JSON format matching exactly this schema:
{
"incident_title": "Short descriptive title",
"severity_level": "CRITICAL" | "HIGH" | "MODERATE",
"root_cause_analysis": "1-2 sentences explaining the root event and how it caused the other alerts.",
"recommended_actions": ["Action 1", "Action 2", "Action 3"]
}
Do NOT wrap the output in markdown code blocks. Output raw, parseable JSON only.
"""
def slim_historical_context(context_list: list, source: str) -> str:
    """Condense a worker's historical readings into a one-sentence baseline.

    Args:
        context_list: Past readings, each a dict of metric name to number.
            A metric missing from a reading is counted as 0.
        source: Worker identifier; "traffic_flow", "substation_monitor" and
            "air_quality_node" have a summary format.

    Returns:
        A baseline sentence for a recognised source, a fixed placeholder
        when there are no readings or the source is not recognised, or an
        "Error summarizing context: ..." message if averaging fails.
    """
    if not context_list:
        return "No historical data available."

    def mean_of(metric):
        # Arithmetic mean of one metric across every reading.
        return sum(reading.get(metric, 0) for reading in context_list) / len(context_list)

    try:
        if source == "traffic_flow":
            speed = mean_of("avg_speed_kmh")
            cars = mean_of("cars_per_minute")
            return f"Over the last {len(context_list)} readings, baseline average speed was {speed:.1f} km/h with {cars:.0f} cars/min."
        if source == "substation_monitor":
            load = mean_of("load_mw")
            return f"Over the last {len(context_list)} readings, baseline power load was {load:.1f} MW."
        if source == "air_quality_node":
            aqi = mean_of("aqi_level")
            return f"Over the last {len(context_list)} readings, baseline AQI was {aqi:.1f}."
    except Exception as e:
        # Best effort: malformed readings must not abort the whole batch, so
        # the failure is reported in place of the summary.
        return f"Error summarizing context: {e}"

    # Recognised sources return above; anything else has no summary format.
    return "Summary unavailable."
# ==========================================
# CORE AI LOGIC (BATCH PROCESSING)
# ==========================================
async def process_incident_batch():
    """Close the current gathering window and turn its alerts into one report.

    Sleeps for ``BATCH_WINDOW_SECONDS`` so alerts that arrive close together
    accumulate in ``ACTIVE_BATCH``. It then snapshots and clears the batch,
    reopens the window (``BATCH_TIMER_ACTIVE = False``) and sends every
    batched alert to the Groq chat-completions endpoint in a single request.
    On success the parsed JSON object is stored in ``LATEST_INCIDENT_REPORT``.

    Nothing is returned and nothing is raised to the caller: an empty batch,
    a missing API key, a non-200 response, unusable LLM output or a transport
    error is printed and the batch is dropped.
    """
    global ACTIVE_BATCH, BATCH_TIMER_ACTIVE, LATEST_INCIDENT_REPORT

    # 1. Wait for the window to gather any concurrent alerts
    print(f"\n⏳ [WINDOW OPEN] Waiting {BATCH_WINDOW_SECONDS}s to bundle cascading alerts...")
    await asyncio.sleep(BATCH_WINDOW_SECONDS)

    # 2. Snapshot the batch and reset the window so future alerts can trigger a new batch
    batched_payloads = list(ACTIVE_BATCH)
    ACTIVE_BATCH.clear()
    BATCH_TIMER_ACTIVE = False
    print(f"📦 [WINDOW CLOSED] Synthesizing {len(batched_payloads)} concurrent alerts...")

    # The batch can be emptied while the window is open (the reset endpoint
    # replaces it). Without alerts there is nothing to analyse, and calling
    # the LLM would publish a report that is not based on any event.
    if not batched_payloads:
        print("📭 Batch is empty; skipping LLM call.")
        return

    if not API_KEY:
        print("❌ ERROR: GROQ_API_KEY is empty!")
        return

    # 3. Format ALL triggers into a single prompt context
    triggering_events_context = [
        {
            "sensor_type": payload.source_worker,
            "reason": payload.trigger_reason,
            "exact_trigger_data": payload.trigger_data,
            "historical_baseline": slim_historical_context(
                payload.historical_context, payload.source_worker
            ),
        }
        for payload in batched_payloads
    ]

    # Placeholder city-wide snapshot: each subsystem is reported as abnormal
    # only when one of the batched alerts came from it.
    alerting_sources = {payload.source_worker for payload in batched_payloads}
    mock_global_state = {
        "traffic": "Speed: 4.2 km/h" if "traffic_flow" in alerting_sources else "Speed Nominal",
        "power": "Load: 0 MW" if "substation_monitor" in alerting_sources else "Load Nominal",
        "environment": "AQI Spiking" if "air_quality_node" in alerting_sources else "AQI Nominal",
    }

    ai_context = {
        "concurrent_triggering_events": triggering_events_context,
        "current_city_wide_state": mock_global_state,
    }
    context_string = json.dumps(ai_context, indent=2)

    print("📡 Sending Bundled Context to LLM...")
    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "Content-Type": "application/json",
    }
    request_data = {
        "model": "llama-3.3-70b-versatile",
        "messages": [
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": f"CITY SENSOR DATA:\n{context_string}"},
        ],
        "temperature": 0.2,
        "response_format": {"type": "json_object"},
    }

    try:
        async with httpx.AsyncClient() as client:
            response = await client.post(
                "https://api.groq.com/openai/v1/chat/completions",
                headers=headers,
                json=request_data,
                timeout=15.0,
            )

        if response.status_code != 200:
            print(f"❌ API Error {response.status_code}: {response.text}")
            return

        result = response.json()
        llm_output = result['choices'][0]['message']['content']
        incident_report = json.loads(llm_output)

        # The latest-incident endpoint hands this value to the UI as the
        # report, so only a JSON object is accepted.
        if not isinstance(incident_report, dict):
            print(f"❌ Orchestration Failed: LLM returned non-object JSON: {llm_output!r}")
            return

        LATEST_INCIDENT_REPORT = incident_report
        print("\n🚨 AI INCIDENT REPORT GENERATED 🚨")
        print(json.dumps(incident_report, indent=4))
        print("=" * 60 + "\n")
    except Exception as e:
        # Boundary of a background task: log the failure instead of letting
        # it propagate, since there is no caller to handle it.
        print(f"❌ Orchestration Failed: {e}")
# ==========================================
# ENDPOINTS
# ==========================================
@app.post("/api/orchestrate")
async def handle_incident(payload: IncidentPayload, background_tasks: BackgroundTasks):
    """Accept a sensor alert, rate-limit it per source and queue it for batching.

    An alert from a source that was accepted less than
    ``SERVICE_COOLDOWN_SECONDS`` ago is ignored. Otherwise the source's
    cooldown timestamp is refreshed, the payload is appended to
    ``ACTIVE_BATCH`` and, if no gathering window is open, one is started by
    scheduling ``process_incident_batch`` as a background task.

    Returns:
        ``{"status": "ignored", "reason": "<source>_cooldown"}`` when the
        alert is dropped, otherwise
        ``{"status": "Alert batched for processing."}``.
    """
    global BATCH_TIMER_ACTIVE, ACTIVE_BATCH, SERVICE_COOLDOWNS

    source = payload.source_worker
    current_time = time.time()

    # Per-service cooldown: a source never seen before counts as last seen
    # at time 0.0.
    seconds_since_last = current_time - SERVICE_COOLDOWNS.get(source, 0.0)
    if seconds_since_last < SERVICE_COOLDOWN_SECONDS:
        print(f"🛡️ SPAM FILTER: Ignoring {source} trigger. Service is on {SERVICE_COOLDOWN_SECONDS}s cooldown.")
        return {"status": "ignored", "reason": f"{source}_cooldown"}

    # Accepted: restart the cooldown for this source only, then queue it.
    SERVICE_COOLDOWNS[source] = current_time
    ACTIVE_BATCH.append(payload)
    print(f"📥 Added {source} to current incident batch.")

    # Only the first alert of a window schedules the batch task; later ones
    # simply join the batch that task will pick up.
    window_already_open = BATCH_TIMER_ACTIVE
    if not window_already_open:
        BATCH_TIMER_ACTIVE = True
        background_tasks.add_task(process_incident_batch)

    return {"status": "Alert batched for processing."}
@app.get("/api/latest-incident")
async def get_latest_incident():
    """Return the newest AI report for the React UI, or a nominal status."""
    report = LATEST_INCIDENT_REPORT
    if not report:
        # Nothing stored (empty or cleared): report the system as nominal.
        return {"status": "system_nominal", "report": None}
    return {"status": "active_incident", "report": report}
# ==========================================
# TRIGGER CHAOS - Dashboard Demo Button
# Sends REAL crisis payloads through the full LLM pipeline
# ==========================================
@app.post("/api/trigger-chaos")
async def trigger_chaos(background_tasks: BackgroundTasks):
    """Inject a canned power-outage and traffic-gridlock pair into the pipeline.

    Clears the per-service cooldowns and the stored report, replaces the
    contents of ``ACTIVE_BATCH`` with two hard-coded crisis payloads and, if
    no gathering window is open, schedules ``process_incident_batch``.

    Returns:
        ``{"status": "chaos_triggered", "payloads_injected": 2}``.
    """
    global BATCH_TIMER_ACTIVE, ACTIVE_BATCH, SERVICE_COOLDOWNS, LATEST_INCIDENT_REPORT

    # 1. Start from a clean slate: no cooldowns, no previous report.
    SERVICE_COOLDOWNS = {}
    LATEST_INCIDENT_REPORT = {}

    # 2. Build the substation outage payload.
    power_reading = {
        "timestamp": time.strftime("%Y-%m-%dT%H:%M:%S"),
        "sensor_type": "substation_monitor",
        "location": "Substation A - Main Grid",
        "load_mw": 0.0,
        "transformer_temp_c": 95.0,
        "voltage_kv": 0.0,
        "power_status": "OUTAGE",
    }
    power_history = [
        {"load_mw": 45.2, "transformer_temp_c": 61.0, "voltage_kv": 220.0},
        {"load_mw": 44.8, "transformer_temp_c": 60.5, "voltage_kv": 219.8},
        {"load_mw": 45.0, "transformer_temp_c": 60.0, "voltage_kv": 220.1},
    ]
    power_crisis = IncidentPayload(
        source_worker="substation_monitor",
        trigger_reason="ABSOLUTE GRID FAILURE (OUTAGE)",
        trigger_data=power_reading,
        historical_context=power_history,
    )

    # 3. Build the traffic gridlock payload.
    traffic_reading = {
        "timestamp": time.strftime("%Y-%m-%dT%H:%M:%S"),
        "sensor_type": "traffic_flow",
        "location": "Intersection Main St / 5th Ave",
        "cars_per_minute": 0,
        "avg_speed_kmh": 0.0,
    }
    traffic_history = [
        {"cars_per_minute": 35, "avg_speed_kmh": 42.0},
        {"cars_per_minute": 38, "avg_speed_kmh": 40.5},
        {"cars_per_minute": 33, "avg_speed_kmh": 43.0},
    ]
    traffic_crisis = IncidentPayload(
        source_worker="traffic_flow",
        trigger_reason="ML Model identified severe traffic gridlock or anomaly",
        trigger_data=traffic_reading,
        historical_context=traffic_history,
    )

    # 4. Replace whatever was pending with exactly these two payloads.
    crisis_payloads = [power_crisis, traffic_crisis]
    ACTIVE_BATCH.clear()
    ACTIVE_BATCH.extend(crisis_payloads)

    # 5. Open a gathering window unless one is already running; an existing
    #    window will pick the payloads up when it closes.
    if not BATCH_TIMER_ACTIVE:
        BATCH_TIMER_ACTIVE = True
        background_tasks.add_task(process_incident_batch)

    print("\n🎯 [TRIGGER-CHAOS] Dashboard button pressed! Sending real crisis data to LLM pipeline...")
    return {"status": "chaos_triggered", "payloads_injected": len(crisis_payloads)}
# Reset endpoint so you can clear the UI during a presentation
@app.post("/api/reset")
async def reset_system():
    """Clear the stored report, all cooldowns and any pending alerts.

    The report is reset to an empty dict, the same "no report" value used
    at module start-up and by the trigger-chaos endpoint, so the global keeps
    a single type. ``BATCH_TIMER_ACTIVE`` is deliberately left alone: if a
    gathering window is open, its background task is still running and
    clears the flag itself when the window closes.

    Returns:
        ``{"status": "reset"}``.
    """
    global LATEST_INCIDENT_REPORT, SERVICE_COOLDOWNS, ACTIVE_BATCH
    LATEST_INCIDENT_REPORT = {}
    SERVICE_COOLDOWNS = {}
    ACTIVE_BATCH = []
    return {"status": "reset"}
if __name__ == "__main__":
    # Run as a script: serve the app on every interface, port 8000.
    uvicorn.run(app, host="0.0.0.0", port=8000)