-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathevent_monitor.py
More file actions
executable file
Β·249 lines (200 loc) Β· 8.41 KB
/
event_monitor.py
File metadata and controls
executable file
Β·249 lines (200 loc) Β· 8.41 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
#!/usr/bin/env python3
"""
Agent Event Monitor - View real-time agent events
"""
import requests
import time
import sys
from datetime import datetime
# Base URL of the bot API server; the request URLs below are built from it.
BOT_API = "http://localhost:8765"
def format_timestamp(ts):
    """Format an ISO 8601 timestamp as a readable ``HH:MM:SS`` time.

    ``Z`` is rewritten to ``+00:00`` before parsing because
    ``datetime.fromisoformat`` does not accept the ``Z`` suffix on Python
    versions before 3.11. No timezone conversion is performed: the time is
    shown in whatever offset the timestamp itself carries.

    Args:
        ts: ISO 8601 timestamp string, e.g. ``2024-01-01T12:34:56Z``.

    Returns:
        The time of day as ``HH:MM:SS``. If ``ts`` is not a parseable
        timestamp string (including ``None`` or an empty string) it is
        returned unchanged so the caller still has something to display.
    """
    try:
        dt = datetime.fromisoformat(ts.replace('Z', '+00:00'))
    except (AttributeError, TypeError, ValueError):
        # AttributeError/TypeError: ts is not a str; ValueError: not ISO format.
        return ts
    return dt.strftime('%H:%M:%S')
def _int_coords(pos):
    """Format a position mapping as ``(x, y, z)`` with int-truncated axes (missing axes become 0)."""
    return f"({int(pos.get('x', 0))}, {int(pos.get('y', 0))}, {int(pos.get('z', 0))})"


def format_event(event):
    """Render one agent event as a single display line.

    Args:
        event: Mapping with optional keys ``timestamp`` (passed to
            ``format_timestamp``), ``event`` (the type name; any ``agent:``
            substring is stripped) and ``data`` (event-specific payload,
            from which the ``agent`` name and other fields are read).

    Returns:
        A string of the form ``[<time>] <icon> <agent> ...``. Event types
        without a dedicated format fall back to ``[<time>] <type>: <agent>``.
    """
    time_str = format_timestamp(event.get('timestamp', ''))
    event_type = event.get('event', '').replace('agent:', '')
    # `or {}` also covers an explicit null payload, not just a missing key.
    data = event.get('data') or {}
    agent = data.get('agent', '?')

    # Format based on event type
    if event_type == 'joined':
        pos = data.get('position') or {}
        return f"[{time_str}] ✅ {agent} joined at {_int_coords(pos)}"
    elif event_type == 'left':
        reason = data.get('reason', 'unknown')
        return f"[{time_str}] ❌ {agent} left: {reason}"
    elif event_type == 'spoke':
        msg = data.get('message', '')
        return f"[{time_str}] 💬 {agent}: \"{msg}\""
    elif event_type == 'moved':
        dist = data.get('distance', 0)
        return f"[{time_str}] 🚶 {agent} moved {dist:.1f} blocks"
    elif event_type == 'mode_changed':
        old = data.get('oldMode', '')
        new = data.get('newMode', '')
        return f"[{time_str}] 🔄 {agent} mode: {old} → {new}"
    elif event_type == 'spawn_set':
        pos = data.get('position') or {}
        method = data.get('method', 'unknown')
        return f"[{time_str}] 📍 {agent} spawn set at {_int_coords(pos)} via {method}"
    elif event_type == 'started_building':
        struct = data.get('structure', '')
        return f"[{time_str}] 🏗️ {agent} started building {struct}"
    elif event_type == 'finished_building':
        struct = data.get('structure', '')
        blocks = data.get('blocksPlaced', 0)
        return f"[{time_str}] ✅ {agent} finished {struct} ({blocks} blocks)"
    elif event_type == 'placed_block':
        block = data.get('blockType', '')
        pos = data.get('position') or {}
        # Coordinates are shown as received here (no int() truncation).
        return f"[{time_str}] 🧱 {agent} placed {block} at ({pos.get('x')}, {pos.get('y')}, {pos.get('z')})"
    elif event_type == 'mined_block':
        block = data.get('blockType', '')
        pos = data.get('position') or {}
        return f"[{time_str}] ⛏️ {agent} mined {block} at {_int_coords(pos)}"
    elif event_type == 'started_mining':
        resource = data.get('resourceType', 'area')
        return f"[{time_str}] ⛏️ {agent} started mining {resource}"
    elif event_type == 'finished_mining':
        blocks = data.get('blocksMined', 0)
        return f"[{time_str}] ✅ {agent} finished mining ({blocks} blocks)"
    elif event_type == 'error':
        err = data.get('error', '')
        return f"[{time_str}] ⚠️ {agent} error: {err}"
    else:
        return f"[{time_str}] {event_type}: {agent}"
def show_stats():
    """Fetch ``/events/stats`` from the bot API and print a summary.

    Prints the total event count, per-type and per-agent counts (highest
    first) and the last five entries of the recent-activity list. Any
    failure (network error, HTTP error status, unexpected payload) is
    reported as a single ``Error: ...`` line instead of being raised.
    """
    try:
        response = requests.get(f"{BOT_API}/events/stats", timeout=5)
        # Surface HTTP failures as such rather than as a KeyError on the error body.
        response.raise_for_status()
        stats = response.json()

        print("\n" + "=" * 60)
        print(" 📊 AGENT EVENT STATISTICS")
        print("=" * 60)
        print(f"\nTotal Events: {stats['totalEvents']}")

        print("\n📈 Events by Type:")
        for event_type, count in sorted(stats['eventTypes'].items(), key=lambda x: x[1], reverse=True):
            print(f" {event_type:25s} {count:5d}")

        print("\n🤖 Events by Agent:")
        for agent, count in sorted(stats['agentActivity'].items(), key=lambda x: x[1], reverse=True):
            print(f" {agent:15s} {count:5d}")

        print("\n🕐 Recent Activity:")
        for event in stats['recentActivity'][-5:]:
            print(f" {format_event(event)}")
        print()
    except Exception as e:
        # Top-level CLI boundary: report and return rather than show a traceback.
        print(f"Error: {e}")
def _events_after_overlap(previous, current):
    """Return the tail of ``current`` that follows its overlap with the end of ``previous``."""
    # Try the largest overlap first so already-seen events are never re-reported.
    for size in range(min(len(previous), len(current)), 0, -1):
        if previous[-size:] == current[:size]:
            return current[size:]
    return current


def _trigger_rejoin(agent):
    """POST a rejoin request for ``agent`` to the bot API and print the outcome."""
    print(f" 🔄 Triggering rejoin for {agent}...")
    try:
        rejoin_response = requests.post(
            f"{BOT_API}/bot/rejoin",
            json={"bot_name": agent},
            timeout=5
        )
        # An HTTP error status must not be reported as a successful trigger.
        rejoin_response.raise_for_status()
        print(f" ✅ Rejoin triggered: {rejoin_response.json().get('message', 'OK')}")
    except Exception as e:
        print(f" ❌ Rejoin failed: {e}")


def watch_events(auto_rejoin=False):
    """Poll the bot API once per second and print newly arrived events.

    Each poll requests the 50 most recent events. New events are found by
    matching the end of the previous batch against the start of the current
    one; comparing list lengths stops working as soon as the log holds 50
    events, because the returned length then never grows.
    NOTE(review): this assumes the endpoint returns events oldest-first,
    as the length-based slicing it replaces also did - confirm against the API.

    Args:
        auto_rejoin: When true, an ``agent:left`` event triggers one POST to
            ``/bot/rejoin`` for that agent; no further rejoin is triggered
            for the same agent until an ``agent:joined`` event for it is seen.

    The loop runs until Ctrl+C or the first error (for example a failed
    poll), which is printed before the function returns.
    """
    print("=" * 60)
    print(" 👁️ AGENT EVENT MONITOR")
    if auto_rejoin:
        print(" 🔄 AUTO-REJOIN ENABLED")
    print("=" * 60)
    print("Watching for agent events... (Ctrl+C to stop)\n")

    previous_events = []
    disconnected_agents = set()

    try:
        while True:
            response = requests.get(f"{BOT_API}/events/recent?limit=50", timeout=5)
            response.raise_for_status()
            events = response.json()

            # Show new events only
            for event in _events_after_overlap(previous_events, events):
                print(format_event(event))
                kind = event.get('event')
                agent = (event.get('data') or {}).get('agent')

                if kind == 'agent:left':
                    # Auto-rejoin on disconnect, at most once per disconnect.
                    if auto_rejoin and agent and agent not in disconnected_agents:
                        disconnected_agents.add(agent)
                        _trigger_rejoin(agent)
                elif kind == 'agent:joined':
                    # Agent is back; allow a future disconnect to trigger again.
                    disconnected_agents.discard(agent)

            previous_events = events
            time.sleep(1)  # Poll every second
    except KeyboardInterrupt:
        print("\n\n✋ Stopped monitoring")
    except Exception as e:
        print(f"\n❌ Error: {e}")
def show_agent_events(agent_name, limit=20):
    """Fetch and print the events recorded for one agent.

    Args:
        agent_name: Name of the agent; it is percent-encoded before being
            placed in the URL path so names containing spaces, ``/``, ``?``
            or ``#`` still address the intended agent.
        limit: Maximum number of events to request (sent as the ``limit``
            query parameter).

    Any failure (network error, HTTP error status, unexpected payload) is
    reported as a single ``Error: ...`` line instead of being raised.
    """
    from urllib.parse import quote

    try:
        response = requests.get(
            f"{BOT_API}/events/agent/{quote(str(agent_name), safe='')}",
            params={"limit": limit},
            timeout=5,
        )
        response.raise_for_status()
        events = response.json()

        print(f"\n📋 Events for {agent_name} (last {limit}):")
        print("-" * 60)
        for event in events:
            print(format_event(event))
        print()
    except Exception as e:
        # Top-level CLI boundary: report and return rather than show a traceback.
        print(f"Error: {e}")
def show_help():
    """Print the command-line usage text: commands, options, examples and API endpoints."""
    print("""
📊 AGENT EVENT MONITOR
=====================
USAGE:
python event_monitor.py [command] [options]
COMMANDS:
watch - Watch events in real-time (default)
stats - Show event statistics
agent NAME - Show events for specific agent
clear - Clear event log
OPTIONS:
--auto-rejoin - Automatically trigger rejoin when agents disconnect
EXAMPLES:
python event_monitor.py
python event_monitor.py watch --auto-rejoin
python event_monitor.py stats
python event_monitor.py agent Agent1
python event_monitor.py clear
AUTO-REJOIN MODE:
When enabled, the monitor will automatically call the /bot/rejoin endpoint
whenever an agent:left event is detected. This provides resilient agent
connections that automatically recover from disconnects, timeouts, and errors.
Example:
python event_monitor.py watch --auto-rejoin
API ENDPOINTS:
GET /events/recent - Get recent events
GET /events/stats - Get statistics
GET /events/agent/:name - Get events by agent
GET /events/type/:type - Get events by type
POST /events/clear - Clear event log
POST /bot/rejoin - Trigger agent rejoin
""")
if __name__ == "__main__":
    # `--auto-rejoin` may appear anywhere on the command line; every other
    # `--option` is dropped so that only positional arguments remain.
    auto_rejoin = '--auto-rejoin' in sys.argv
    args = [arg for arg in sys.argv[1:] if not arg.startswith('--')]

    # With no positional argument the monitor defaults to watching.
    cmd = args[0].lower() if args else "watch"

    if cmd == "stats":
        show_stats()
    elif cmd == "watch":
        watch_events(auto_rejoin=auto_rejoin)
    elif cmd == "agent" and len(args) >= 2:
        show_agent_events(args[1])
    elif cmd == "clear":
        try:
            response = requests.post(f"{BOT_API}/events/clear", timeout=5)
            # Only report success when the server actually accepted the request.
            response.raise_for_status()
        except requests.RequestException as e:
            print(f"Error: {e}")
            sys.exit(1)
        print("✅ Event log cleared")
    else:
        # Unknown command, or `agent` without a name.
        show_help()