From 40ba53bf706ce13d1aa9254bbc302b87f0abb71a Mon Sep 17 00:00:00 2001 From: Andy Braren Date: Fri, 23 Jan 2026 09:04:44 -0500 Subject: [PATCH] Enhance event handling by ensuring timestamps are set for AGUI events - Added logic to set a timestamp for AGUI events in `agui_proxy.go` and `agui.go` if not already present, ensuring accurate message tracking. - Updated `compaction.go` to preserve the timestamp from events when creating messages. - Modified frontend components to utilize the timestamp from backend messages, providing a fallback for legacy messages. This change improves the reliability of message timestamp tracking across the system. --- components/backend/websocket/agui.go | 11 ++++++++++- components/backend/websocket/agui_proxy.go | 6 +++++- components/backend/websocket/compaction.go | 17 ++++++++++++++--- .../[name]/sessions/[sessionName]/page.tsx | 3 +++ .../frontend/src/hooks/use-agui-stream.ts | 6 ++++++ 5 files changed, 38 insertions(+), 5 deletions(-) diff --git a/components/backend/websocket/agui.go b/components/backend/websocket/agui.go index 9a6681797..666fe2f78 100644 --- a/components/backend/websocket/agui.go +++ b/components/backend/websocket/agui.go @@ -123,6 +123,11 @@ func RouteAGUIEvent(sessionID string, event map[string]interface{}) { // If no active run found, check if event has a runId we should create if activeRunState == nil { + // Ensure timestamp is set before any early returns + if event["timestamp"] == nil || event["timestamp"] == "" { + event["timestamp"] = time.Now().UTC().Format(time.RFC3339Nano) + } + // Don't create lazy runs for terminal events - they should only apply to existing runs if isTerminalEventType(eventType) { go persistAGUIEventMap(sessionID, "", event) @@ -163,13 +168,17 @@ func RouteAGUIEvent(sessionID string, event map[string]interface{}) { threadID = eventThreadID } - // Fill in missing IDs only if not present + // Fill in missing IDs and timestamp if event["threadId"] == nil || event["threadId"] == "" { event["threadId"] = threadID } if event["runId"] == nil || event["runId"] == "" { event["runId"] = runID } + // Add timestamp if not present - critical for message timestamp tracking + if event["timestamp"] == nil || event["timestamp"] == "" { + event["timestamp"] = time.Now().UTC().Format(time.RFC3339Nano) + } // Broadcast to run-specific SSE subscribers activeRunState.BroadcastFull(event) diff --git a/components/backend/websocket/agui_proxy.go b/components/backend/websocket/agui_proxy.go index 076cfb3d9..106d8320b 100644 --- a/components/backend/websocket/agui_proxy.go +++ b/components/backend/websocket/agui_proxy.go @@ -274,13 +274,17 @@ func handleStreamedEvent(sessionID, runID, threadID, jsonData string, runState * eventType, _ := event["type"].(string) - // Ensure threadId and runId are set + // Ensure threadId, runId, and timestamp are set if _, ok := event["threadId"]; !ok { event["threadId"] = threadID } if _, ok := event["runId"]; !ok { event["runId"] = runID } + // Add timestamp if not present - critical for message timestamp tracking + if _, ok := event["timestamp"]; !ok { + event["timestamp"] = time.Now().UTC().Format(time.RFC3339Nano) + } // Check for terminal events switch eventType { diff --git a/components/backend/websocket/compaction.go b/components/backend/websocket/compaction.go index d03f77f7c..caa45add9 100644 --- a/components/backend/websocket/compaction.go +++ b/components/backend/websocket/compaction.go @@ -118,11 +118,14 @@ func (c *MessageCompactor) handleTextMessageStart(event map[string]interface{}) if role == "" { role = types.RoleAssistant } + // Preserve timestamp from the event + timestamp, _ := event["timestamp"].(string) c.currentMessage = &types.Message{ - ID: messageID, - Role: role, - Content: "", + ID: messageID, + Role: role, + Content: "", + Timestamp: timestamp, } } @@ -228,17 +231,25 @@ func (c *MessageCompactor) handleToolCallEnd(event map[string]interface{}) { tc.Status = "error" } + // Preserve timestamp from the event + timestamp, _ := event["timestamp"].(string) + // Add to message // Check if we need to create a new message or add to current if c.currentMessage != nil && c.currentMessage.Role == types.RoleAssistant { // Add to current message c.currentMessage.ToolCalls = append(c.currentMessage.ToolCalls, tc) + // Update timestamp if not already set + if c.currentMessage.Timestamp == "" && timestamp != "" { + c.currentMessage.Timestamp = timestamp + } } else { // Create new message for this tool call c.messages = append(c.messages, types.Message{ ID: uuid.New().String(), Role: types.RoleAssistant, ToolCalls: []types.ToolCall{tc}, + Timestamp: timestamp, }) } diff --git a/components/frontend/src/app/projects/[name]/sessions/[sessionName]/page.tsx b/components/frontend/src/app/projects/[name]/sessions/[sessionName]/page.tsx index c16310682..d32373fd1 100644 --- a/components/frontend/src/app/projects/[name]/sessions/[sessionName]/page.tsx +++ b/components/frontend/src/app/projects/[name]/sessions/[sessionName]/page.tsx @@ -759,6 +759,8 @@ export default function ProjectSessionDetailPage({ const allToolCalls = new Map(); for (const msg of aguiState.messages) { + // Use msg.timestamp from backend, fallback to current time for legacy messages + // Note: After backend fix, new messages will have proper timestamps const timestamp = msg.timestamp || new Date().toISOString(); if (msg.toolCalls && Array.isArray(msg.toolCalls)) { @@ -852,6 +854,7 @@ export default function ProjectSessionDetailPage({ // Phase C: Process messages and build hierarchical structure for (const msg of aguiState.messages) { + // Use msg.timestamp from backend, fallback to current time for legacy messages const timestamp = msg.timestamp || new Date().toISOString(); // Handle text content by role diff --git a/components/frontend/src/hooks/use-agui-stream.ts b/components/frontend/src/hooks/use-agui-stream.ts index 9b08c45a9..84dfd3e3c 100644 --- a/components/frontend/src/hooks/use-agui-stream.ts +++ b/components/frontend/src/hooks/use-agui-stream.ts @@ -142,6 +142,7 @@ export function useAGUIStream(options: UseAGUIStreamOptions): UseAGUIStreamRetur id: newState.currentMessage.id || crypto.randomUUID(), role: newState.currentMessage.role || AGUIRole.ASSISTANT, content: newState.currentMessage.content, + timestamp: event.timestamp, } newState.messages = [...newState.messages, msg] onMessage?.(msg) @@ -214,6 +215,7 @@ export function useAGUIStream(options: UseAGUIStreamOptions): UseAGUIStreamRetur id: messageId, role: newState.currentMessage.role || AGUIRole.ASSISTANT, content: newState.currentMessage.content, + timestamp: event.timestamp, } newState.messages = [...newState.messages, msg] onMessage?.(msg) @@ -415,6 +417,7 @@ export function useAGUIStream(options: UseAGUIStreamOptions): UseAGUIStreamRetur toolCallId: toolCallId, name: toolCallName, toolCalls: [completedToolCall], + timestamp: event.timestamp, } messages.push(toolMessage) } @@ -546,6 +549,7 @@ export function useAGUIStream(options: UseAGUIStreamOptions): UseAGUIStreamRetur thinking: actualRawData.thinking as string, signature: actualRawData.signature as string, }, + timestamp: event.timestamp, } newState.messages = [...newState.messages, msg] onMessage?.(msg) @@ -562,6 +566,7 @@ export function useAGUIStream(options: UseAGUIStreamOptions): UseAGUIStreamRetur id: messageId, role: AGUIRole.USER, content: actualRawData.content as string, + timestamp: event.timestamp, } newState.messages = [...newState.messages, msg] onMessage?.(msg) @@ -575,6 +580,7 @@ export function useAGUIStream(options: UseAGUIStreamOptions): UseAGUIStreamRetur id: (actualRawData.id as string) || crypto.randomUUID(), role: actualRawData.role as AGUIMessage['role'], content: actualRawData.content as string, + timestamp: event.timestamp, } newState.messages = [...newState.messages, msg] onMessage?.(msg)