diff --git a/crates/core/src/observability/atif.rs b/crates/core/src/observability/atif.rs index 5ebdae8e..f5801551 100644 --- a/crates/core/src/observability/atif.rs +++ b/crates/core/src/observability/atif.rs @@ -654,6 +654,63 @@ fn extract_metrics(output: &Json) -> Option { }) } +fn merge_metrics( + primary: Option, + supplemental: Option<&AtifMetrics>, +) -> Option { + match (primary, supplemental) { + (None, None) => None, + (Some(metrics), None) => Some(metrics), + (None, Some(supplemental)) => Some(supplemental.clone()), + (Some(mut metrics), Some(supplemental)) => { + merge_metrics_fields(&mut metrics, supplemental); + Some(metrics) + } + } +} + +fn merge_metrics_fields(target: &mut AtifMetrics, supplemental: &AtifMetrics) { + if target.prompt_tokens.is_none() { + target.prompt_tokens = supplemental.prompt_tokens; + } + if target.completion_tokens.is_none() { + target.completion_tokens = supplemental.completion_tokens; + } + if target.cached_tokens.is_none() { + target.cached_tokens = supplemental.cached_tokens; + } + if target.cost_usd.is_none() { + target.cost_usd = supplemental.cost_usd; + } + if target.prompt_token_ids.is_none() { + target.prompt_token_ids = supplemental.prompt_token_ids.clone(); + } + if target.completion_token_ids.is_none() { + target.completion_token_ids = supplemental.completion_token_ids.clone(); + } + if target.logprobs.is_none() { + target.logprobs = supplemental.logprobs.clone(); + } + merge_metrics_extra(&mut target.extra, &supplemental.extra); +} + +fn merge_metrics_extra(target: &mut Option, supplemental: &Option) { + let Some(supplemental) = supplemental else { + return; + }; + match (target.as_mut(), supplemental) { + (Some(Json::Object(target_object)), Json::Object(supplemental_object)) => { + for (key, value) in supplemental_object { + target_object + .entry(key.clone()) + .or_insert_with(|| value.clone()); + } + } + (None, _) => *target = Some(supplemental.clone()), + _ => {} + } +} + fn token_usage_object(output: &Json) -> Option<&serde_json::Map> { let output = output.as_object()?; output @@ -1071,25 +1128,28 @@ struct EventLookupMaps { name_map: std::collections::HashMap, start_ts_map: std::collections::HashMap>, tool_call_ids: std::collections::HashMap, + suppressed_llm_events: HashSet, + supplemental_llm_metrics: HashMap, } impl EventLookupMaps { fn from_events(events: &[&Event]) -> Self { - Self::from_events_with_correlation_events(events, events) + Self::from_events_with_correlation_events(events, events, events) } fn from_events_for_agent(events: &[&Event], tree: &AgentScopeTree, agent_uuid: Uuid) -> Self { - let correlation_events = events + let tool_correlation_events = events .iter() .copied() .filter(|event| tree.owner_agent(event) == Some(agent_uuid)) .collect::>(); - Self::from_events_with_correlation_events(events, &correlation_events) + Self::from_events_with_correlation_events(events, events, &tool_correlation_events) } fn from_events_with_correlation_events( events: &[&Event], - correlation_events: &[&Event], + llm_dedupe_events: &[&Event], + tool_correlation_events: &[&Event], ) -> Self { let mut name_map = std::collections::HashMap::new(); let mut start_ts_map = std::collections::HashMap::new(); @@ -1099,12 +1159,239 @@ impl EventLookupMaps { start_ts_map.insert(event.uuid(), *event.timestamp()); } } + let llm_dedupe = build_llm_dedupe(llm_dedupe_events); Self { name_map, start_ts_map, - tool_call_ids: build_tool_call_correlations(correlation_events), + tool_call_ids: build_tool_call_correlations(tool_correlation_events), + suppressed_llm_events: llm_dedupe.suppressed_events, + supplemental_llm_metrics: llm_dedupe.supplemental_metrics, + } + } + + fn should_suppress_llm_event(&self, event: &Event) -> bool { + event.category().map(|category| category.as_str()) == Some("llm") + && self.suppressed_llm_events.contains(&event.uuid()) + } +} + +#[derive(Default)] +struct LlmDedupeLookups { + suppressed_events: HashSet, + supplemental_metrics: HashMap, +} + +#[derive(Default)] +struct LlmSpanParts<'a> { + start: Option<&'a Event>, + end: Option<&'a Event>, +} + +#[derive(Debug, Clone)] +struct LlmSpanCandidate { + uuid: Uuid, + parent_uuid: Option, + start_ts: DateTime, + end_ts: DateTime, + request_signature: String, + response_signature: String, + model_name: Option, + fidelity_score: u8, + end_metrics: Option, + hook_instrumentation: bool, + gateway_instrumentation: bool, + non_exact_provider_payload: bool, +} + +fn build_llm_dedupe(events: &[&Event]) -> LlmDedupeLookups { + let candidates = collect_llm_span_candidates(events); + let mut lookups = LlmDedupeLookups::default(); + + for (left_idx, left) in candidates.iter().enumerate() { + for right in candidates.iter().skip(left_idx + 1) { + if same_physical_llm_request(left, right) { + suppress_lower_fidelity_llm_span(left, right, &mut lookups); + } } } + + lookups +} + +fn collect_llm_span_candidates(events: &[&Event]) -> Vec { + let mut spans: HashMap> = HashMap::new(); + for event in events { + if event.category().map(|category| category.as_str()) != Some("llm") { + continue; + } + let parts = spans.entry(event.uuid()).or_default(); + match event.scope_category() { + Some(crate::api::event::ScopeCategory::Start) => parts.start = Some(event), + Some(crate::api::event::ScopeCategory::End) => parts.end = Some(event), + None => {} + } + } + + spans + .into_iter() + .filter_map(|(uuid, parts)| LlmSpanCandidate::from_events(uuid, parts.start?, parts.end?)) + .collect() +} + +impl LlmSpanCandidate { + fn from_events(uuid: Uuid, start: &Event, end: &Event) -> Option { + let request_signature = start.data().map(llm_request_signature)?; + let response_signature = end.data().map(llm_response_signature)?; + Some(Self { + uuid, + parent_uuid: start.parent_uuid().or_else(|| end.parent_uuid()), + start_ts: *start.timestamp(), + end_ts: *end.timestamp(), + request_signature, + response_signature, + model_name: start + .model_name() + .or_else(|| end.model_name()) + .map(ToOwned::to_owned), + fidelity_score: llm_event_fidelity_score(start).max(llm_event_fidelity_score(end)), + end_metrics: end.data().and_then(extract_metrics), + hook_instrumentation: is_hook_instrumented_llm_event(start) + || is_hook_instrumented_llm_event(end), + gateway_instrumentation: is_gateway_instrumented_llm_event(start) + || is_gateway_instrumented_llm_event(end), + non_exact_provider_payload: has_non_exact_provider_payload(start) + || has_non_exact_provider_payload(end), + }) + } +} + +fn llm_request_signature(input: &Json) -> String { + let content = unwrap_llm_request(input); + json_to_string(&extract_user_messages(&content)) +} + +fn llm_response_signature(output: &Json) -> String { + json_to_string(&extract_llm_response_message(output)) +} + +fn same_physical_llm_request(left: &LlmSpanCandidate, right: &LlmSpanCandidate) -> bool { + same_parent(left, right) + && compatible_model_names(left, right) + && llm_spans_overlap(left, right) + && (same_llm_payload_signatures(left, right) + || complementary_hook_and_gateway_spans(left, right)) +} + +fn same_llm_payload_signatures(left: &LlmSpanCandidate, right: &LlmSpanCandidate) -> bool { + left.request_signature == right.request_signature + && left.response_signature == right.response_signature +} + +fn complementary_hook_and_gateway_spans(left: &LlmSpanCandidate, right: &LlmSpanCandidate) -> bool { + (left.non_exact_provider_payload && left.hook_instrumentation && right.gateway_instrumentation) + || (right.non_exact_provider_payload + && right.hook_instrumentation + && left.gateway_instrumentation) +} + +fn same_parent(left: &LlmSpanCandidate, right: &LlmSpanCandidate) -> bool { + left.parent_uuid.is_some() && left.parent_uuid == right.parent_uuid +} + +fn compatible_model_names(left: &LlmSpanCandidate, right: &LlmSpanCandidate) -> bool { + match (&left.model_name, &right.model_name) { + (Some(left_model), Some(right_model)) => left_model == right_model, + _ => true, + } +} + +fn llm_spans_overlap(left: &LlmSpanCandidate, right: &LlmSpanCandidate) -> bool { + left.start_ts <= right.end_ts && right.start_ts <= left.end_ts +} + +fn suppress_lower_fidelity_llm_span( + left: &LlmSpanCandidate, + right: &LlmSpanCandidate, + lookups: &mut LlmDedupeLookups, +) { + match left.fidelity_score.cmp(&right.fidelity_score) { + std::cmp::Ordering::Greater => suppress_llm_span(right, left, lookups), + std::cmp::Ordering::Less => suppress_llm_span(left, right, lookups), + std::cmp::Ordering::Equal => {} + } +} + +fn suppress_llm_span( + suppressed: &LlmSpanCandidate, + canonical: &LlmSpanCandidate, + lookups: &mut LlmDedupeLookups, +) { + lookups.suppressed_events.insert(suppressed.uuid); + if let Some(metrics) = &suppressed.end_metrics { + let entry = lookups + .supplemental_metrics + .entry(canonical.uuid) + .or_default(); + merge_metrics_fields(entry, metrics); + } +} + +fn llm_event_fidelity_score(event: &Event) -> u8 { + let Some(metadata) = event.metadata().and_then(Json::as_object) else { + return 50; + }; + if metadata + .get("projection") + .and_then(Json::as_bool) + .unwrap_or(false) + { + return 10; + } + if has_non_exact_provider_payload(event) { + return 30; + } + if metadata + .get("provider_payload_exact") + .and_then(Json::as_bool) + .unwrap_or(false) + { + return 100; + } + if metadata.contains_key("fidelity_source") || metadata.contains_key("api_call_id") { + return 95; + } + if metadata.contains_key("hook_event_name") { + return 90; + } + if is_gateway_instrumented_llm_event(event) { + return 50; + } + 50 +} + +fn is_hook_instrumented_llm_event(event: &Event) -> bool { + event + .metadata() + .and_then(Json::as_object) + .is_some_and(|metadata| metadata.contains_key("hook_event_name")) +} + +fn is_gateway_instrumented_llm_event(event: &Event) -> bool { + event + .metadata() + .and_then(Json::as_object) + .is_some_and(|metadata| { + metadata.contains_key("gateway_path") || metadata.contains_key("llm_correlation_source") + }) +} + +fn has_non_exact_provider_payload(event: &Event) -> bool { + event + .metadata() + .and_then(Json::as_object) + .and_then(|metadata| metadata.get("provider_payload_exact")) + .and_then(Json::as_bool) + == Some(false) } #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -1416,6 +1703,9 @@ struct DeferredToolObservation { impl StepConversionState { fn handle_event(&mut self, event: &Event, lookups: &EventLookupMaps) { + if lookups.should_suppress_llm_event(event) { + return; + } match ( event.kind(), event.scope_category(), @@ -1712,6 +2002,11 @@ impl StepConversionState { "nemo_relay", ); + let metrics = merge_metrics( + extract_metrics(output), + lookups.supplemental_llm_metrics.get(&event.uuid()), + ); + self.steps.push(AtifStep { step_id: 0, source: "agent".to_string(), @@ -1722,7 +2017,7 @@ impl StepConversionState { reasoning_content, tool_calls, observation: None, - metrics: extract_metrics(output), + metrics, llm_call_count: Some(1), is_copied_context: None, extra: None, diff --git a/crates/core/tests/unit/atif_tests.rs b/crates/core/tests/unit/atif_tests.rs index cd36970c..c42d4333 100644 --- a/crates/core/tests/unit/atif_tests.rs +++ b/crates/core/tests/unit/atif_tests.rs @@ -1809,6 +1809,271 @@ fn test_exporter_skips_llm_chunk_marks() { assert_eq!(trajectory.steps[0].message, json!("agent.status")); } +#[test] +fn test_exporter_dedupes_overlapping_hook_and_gateway_llm_spans() { + let exporter = AtifExporter::new("session-1".to_string(), make_agent_info()); + let base = base_timestamp(); + let parent_uuid = Uuid::now_v7(); + let hook_uuid = Uuid::now_v7(); + let gateway_uuid = Uuid::now_v7(); + let request = json!({ + "messages": [{"role": "user", "content": "Reply with exactly dedupe_ok"}], + "model": "test-model" + }); + + let mut hook_start = event_builder(hook_uuid, EventType::Start) + .name("openrouter") + .parent_uuid(parent_uuid) + .scope_type(ScopeType::Llm) + .model_name("test-model") + .metadata(json!({ + "hook_event_name": "pre_api_request", + "api_call_id": "session:task:abcd:api:1", + "provider_payload_exact": true + })) + .input(json!({"content": request.clone(), "headers": {}})) + .build(); + let mut gateway_start = event_builder(gateway_uuid, EventType::Start) + .name("openai.chat_completions") + .parent_uuid(parent_uuid) + .scope_type(ScopeType::Llm) + .model_name("test-model") + .metadata(json!({ + "gateway_path": "/v1/chat/completions", + "llm_correlation_source": "pre_llm_call" + })) + .input(request.clone()) + .build(); + let mut gateway_end = event_builder(gateway_uuid, EventType::End) + .name("openai.chat_completions") + .parent_uuid(parent_uuid) + .scope_type(ScopeType::Llm) + .model_name("test-model") + .metadata(json!({ + "gateway_path": "/v1/chat/completions", + "llm_correlation_source": "pre_llm_call" + })) + .output(json!({ + "choices": [{"message": {"content": "dedupe_ok"}}], + "usage": {"prompt_tokens": 7, "completion_tokens": 3, "total_tokens": 10} + })) + .build(); + let mut hook_end = event_builder(hook_uuid, EventType::End) + .name("openrouter") + .parent_uuid(parent_uuid) + .scope_type(ScopeType::Llm) + .model_name("test-model") + .metadata(json!({ + "hook_event_name": "post_api_request", + "api_call_id": "session:task:abcd:api:1", + "provider_payload_exact": true + })) + .output(json!({"content": "dedupe_ok"})) + .build(); + + for (idx, event) in [ + &mut hook_start, + &mut gateway_start, + &mut gateway_end, + &mut hook_end, + ] + .into_iter() + .enumerate() + { + set_event_timestamp(event, base + chrono::Duration::milliseconds(idx as i64)); + } + + { + let mut state = exporter.state.lock().unwrap(); + state + .events + .extend([hook_start, gateway_start, gateway_end, hook_end]); + } + + let trajectory = exporter.export().unwrap(); + assert_eq!(trajectory.steps.len(), 2); + assert_eq!( + trajectory.steps[0].message, + json!("Reply with exactly dedupe_ok") + ); + assert_eq!(trajectory.steps[1].message, json!("dedupe_ok")); + assert_eq!( + trajectory.steps[0].extra.as_ref().unwrap()["ancestry"]["function_name"], + "openrouter" + ); + assert_eq!( + trajectory.steps[1].extra.as_ref().unwrap()["ancestry"]["function_name"], + "openrouter" + ); + assert!(!trajectory.steps.iter().any(|step| { + step.extra + .as_ref() + .is_some_and(|extra| extra["ancestry"]["function_name"] == "openai.chat_completions") + })); + let metrics = trajectory.steps[1].metrics.as_ref().unwrap(); + assert_eq!(metrics.prompt_tokens, Some(7)); + assert_eq!(metrics.completion_tokens, Some(3)); + assert_eq!(metrics.extra.as_ref().unwrap()["total_tokens"], json!(10)); +} + +#[test] +fn test_exporter_prefers_gateway_span_over_non_exact_hook_summary() { + let exporter = AtifExporter::new("session-1".to_string(), make_agent_info()); + let base = base_timestamp(); + let parent_uuid = Uuid::now_v7(); + let hook_uuid = Uuid::now_v7(); + let gateway_uuid = Uuid::now_v7(); + let request = json!({ + "messages": [{"role": "user", "content": "Reply with exactly gateway_ok"}], + "model": "test-model" + }); + + let mut hook_start = event_builder(hook_uuid, EventType::Start) + .name("openrouter") + .parent_uuid(parent_uuid) + .scope_type(ScopeType::Llm) + .model_name("test-model") + .metadata(json!({ + "hook_event_name": "pre_api_request", + "fidelity_source": "agent_api_hooks", + "provider_payload_exact": false + })) + .input(json!({ + "content": {"message_count": 2, "request_char_count": 128}, + "headers": {} + })) + .build(); + let mut gateway_start = event_builder(gateway_uuid, EventType::Start) + .name("openai.chat_completions") + .parent_uuid(parent_uuid) + .scope_type(ScopeType::Llm) + .model_name("test-model") + .metadata(json!({"gateway_path": "/v1/chat/completions"})) + .input(json!({"content": request.clone(), "headers": {}})) + .build(); + let mut gateway_end = event_builder(gateway_uuid, EventType::End) + .name("openai.chat_completions") + .parent_uuid(parent_uuid) + .scope_type(ScopeType::Llm) + .model_name("test-model") + .metadata(json!({"gateway_path": "/v1/chat/completions"})) + .output(json!({"choices": [{"message": {"content": "gateway_ok"}}]})) + .build(); + let mut hook_end = event_builder(hook_uuid, EventType::End) + .name("openrouter") + .parent_uuid(parent_uuid) + .scope_type(ScopeType::Llm) + .model_name("test-model") + .metadata(json!({ + "hook_event_name": "post_api_request", + "fidelity_source": "agent_api_hooks", + "provider_payload_exact": false + })) + .output(json!({"assistant_content_chars": 10, "finish_reason": "stop"})) + .build(); + + for (idx, event) in [ + &mut hook_start, + &mut gateway_start, + &mut gateway_end, + &mut hook_end, + ] + .into_iter() + .enumerate() + { + set_event_timestamp(event, base + chrono::Duration::milliseconds(idx as i64)); + } + + { + let mut state = exporter.state.lock().unwrap(); + state + .events + .extend([hook_start, gateway_start, gateway_end, hook_end]); + } + + let trajectory = exporter.export().unwrap(); + assert_eq!(trajectory.steps.len(), 2); + assert_eq!( + trajectory.steps[0].message, + json!("Reply with exactly gateway_ok") + ); + assert_eq!(trajectory.steps[1].message, json!("gateway_ok")); + assert!(trajectory.steps.iter().all(|step| { + step.extra + .as_ref() + .is_some_and(|extra| extra["ancestry"]["function_name"] == "openai.chat_completions") + })); +} + +#[test] +fn test_exporter_keeps_sequential_same_content_llm_spans() { + let exporter = AtifExporter::new("session-1".to_string(), make_agent_info()); + let base = base_timestamp(); + let parent_uuid = Uuid::now_v7(); + let hook_uuid = Uuid::now_v7(); + let gateway_uuid = Uuid::now_v7(); + let request = json!({ + "messages": [{"role": "user", "content": "Reply with exactly repeat_ok"}], + "model": "test-model" + }); + + let mut hook_start = event_builder(hook_uuid, EventType::Start) + .name("openrouter") + .parent_uuid(parent_uuid) + .scope_type(ScopeType::Llm) + .metadata(json!({"hook_event_name": "pre_api_request"})) + .input(request.clone()) + .build(); + let mut hook_end = event_builder(hook_uuid, EventType::End) + .name("openrouter") + .parent_uuid(parent_uuid) + .scope_type(ScopeType::Llm) + .metadata(json!({"hook_event_name": "post_api_request"})) + .output(json!({"content": "repeat_ok"})) + .build(); + let mut gateway_start = event_builder(gateway_uuid, EventType::Start) + .name("openai.chat_completions") + .parent_uuid(parent_uuid) + .scope_type(ScopeType::Llm) + .metadata(json!({"gateway_path": "/v1/chat/completions"})) + .input(request) + .build(); + let mut gateway_end = event_builder(gateway_uuid, EventType::End) + .name("openai.chat_completions") + .parent_uuid(parent_uuid) + .scope_type(ScopeType::Llm) + .metadata(json!({"gateway_path": "/v1/chat/completions"})) + .output(json!({"choices": [{"message": {"content": "repeat_ok"}}]})) + .build(); + + for (idx, event) in [ + &mut hook_start, + &mut hook_end, + &mut gateway_start, + &mut gateway_end, + ] + .into_iter() + .enumerate() + { + set_event_timestamp(event, base + chrono::Duration::milliseconds(idx as i64)); + } + + { + let mut state = exporter.state.lock().unwrap(); + state + .events + .extend([hook_start, hook_end, gateway_start, gateway_end]); + } + + let trajectory = exporter.export().unwrap(); + assert_eq!(trajectory.steps.len(), 4); + assert!(trajectory.steps.iter().any(|step| { + step.extra + .as_ref() + .is_some_and(|extra| extra["ancestry"]["function_name"] == "openai.chat_completions") + })); +} + #[test] fn test_trajectory_serde_roundtrip() { let trajectory = AtifTrajectory {