diff --git a/crates/adaptive/src/acg/request_surfaces/mod.rs b/crates/adaptive/src/acg/request_surfaces/mod.rs index 94f3bc71..48172e4a 100644 --- a/crates/adaptive/src/acg/request_surfaces/mod.rs +++ b/crates/adaptive/src/acg/request_surfaces/mod.rs @@ -16,6 +16,7 @@ pub(crate) mod openai_responses; use std::collections::HashSet; use nemo_relay::api::llm::LlmRequest; +use nemo_relay::codec::resolve::{ProviderSurface, detect_request_surface}; use serde_json::Value; use crate::acg::prompt_ir::PromptIR; @@ -40,6 +41,14 @@ pub(crate) trait RequestSurfaceApplier: Send + Sync { } impl RequestSurface { + fn from_provider_surface(surface: ProviderSurface) -> Self { + match surface { + ProviderSurface::OpenAIChat => Self::OpenAIChat, + ProviderSurface::OpenAIResponses => Self::OpenAIResponses, + ProviderSurface::AnthropicMessages => Self::AnthropicMessages, + } + } + pub(crate) fn supports_provider(self, provider: &str) -> bool { match provider { "anthropic" => matches!(self, Self::AnthropicMessages), @@ -71,17 +80,13 @@ impl RequestSurface { pub(crate) fn resolve_request_surface_from_request( request: &LlmRequest, ) -> crate::acg::Result { - if request.content.get("input").is_some() || request.content.get("instructions").is_some() { - Ok(RequestSurface::OpenAIResponses) - } else if request.content.get("system").is_some() { - Ok(RequestSurface::AnthropicMessages) - } else if request.content.get("messages").is_some() { - Ok(RequestSurface::OpenAIChat) - } else { - Err(crate::acg::AcgError::Internal( - "unable to resolve request surface from request shape".to_string(), - )) - } + detect_request_surface(&request.content) + .map(RequestSurface::from_provider_surface) + .ok_or_else(|| { + crate::acg::AcgError::Internal( + "unable to resolve request surface from request shape".to_string(), + ) + }) } #[cfg_attr(not(test), allow(dead_code))] diff --git a/crates/core/src/api/event.rs b/crates/core/src/api/event.rs index fb73eb8a..e228f42f 100644 --- a/crates/core/src/api/event.rs +++ b/crates/core/src/api/event.rs @@ -15,6 +15,7 @@ //! Event types for Agent Trajectory Observability Format (ATOF) runtime events. +use std::borrow::Cow; use std::collections::BTreeMap; use std::sync::Arc; @@ -23,10 +24,11 @@ use serde::{Deserialize, Serialize}; use typed_builder::TypedBuilder; use uuid::Uuid; -use crate::api::llm::LlmAttributes; +use crate::api::llm::{LlmAttributes, LlmRequest}; use crate::api::scope::{HandleAttributes, ScopeAttributes, ScopeType}; use crate::api::tool::ToolAttributes; use crate::codec::request::AnnotatedLlmRequest; +use crate::codec::resolve; use crate::codec::response::AnnotatedLlmResponse; use crate::json::Json; @@ -609,6 +611,32 @@ impl Event { .and_then(|profile| profile.annotated_response.as_ref()) } + /// Normalized LLM request: the codec annotation when present, otherwise a + /// best-effort decode of the start-event input payload. + /// + /// The fallback decode requires the start-event input to be the serialized + /// [`LlmRequest`] wire shape (`{headers, content}`) emitted by the managed + /// LLM pipeline; events whose input is a bare payload or a non-LLM shape + /// yield `None`. + #[must_use] + pub fn normalized_llm_request(&self) -> Option> { + if let Some(annotated) = self.annotated_request() { + return Some(Cow::Borrowed(annotated.as_ref())); + } + let request: LlmRequest = serde_json::from_value(self.input()?.clone()).ok()?; + resolve::normalize_request(&request).map(Cow::Owned) + } + + /// Normalized LLM response: the codec annotation when present, otherwise a + /// best-effort decode of the end-event output payload. + #[must_use] + pub fn normalized_llm_response(&self) -> Option> { + if let Some(annotated) = self.annotated_response() { + return Some(Cow::Borrowed(annotated.as_ref())); + } + resolve::normalize_response(self.output()?).map(Cow::Owned) + } + /// Return true for scope-start events. /// /// # Returns diff --git a/crates/core/src/codec/mod.rs b/crates/core/src/codec/mod.rs index 20653c14..b1c87636 100644 --- a/crates/core/src/codec/mod.rs +++ b/crates/core/src/codec/mod.rs @@ -10,12 +10,16 @@ //! the streaming response codec //! ([`streaming::StreamingCodec`]) used with the managed //! streaming LLM execution pipeline. +//! +//! [`resolve`] is the detect-then-decode entry point for selecting a built-in +//! provider codec from a raw payload when no codec annotation is present. pub mod anthropic; pub mod openai_chat; pub mod openai_responses; pub mod pricing; pub mod request; +pub mod resolve; pub mod response; pub mod streaming; pub mod traits; diff --git a/crates/core/src/codec/openai_responses.rs b/crates/core/src/codec/openai_responses.rs index f2661eef..438aa8ac 100644 --- a/crates/core/src/codec/openai_responses.rs +++ b/crates/core/src/codec/openai_responses.rs @@ -192,6 +192,11 @@ fn collect_output_item( .unwrap_or("") { "message" => collect_message_text_parts(item, text_parts), + "output_text" => { + if let Some(text) = output_text_block(item) { + text_parts.push(text); + } + } "function_call" => tool_calls.push(parse_function_call(item)), _ => {} } @@ -244,6 +249,14 @@ fn message_from_text_parts(text_parts: Vec) -> Option { } } +fn top_level_output_text(response: &Json) -> Option { + response + .get("output_text") + .and_then(|value| value.as_str()) + .filter(|text| !text.is_empty()) + .map(|text| MessageContent::Text(text.to_string())) +} + fn optional_vec(items: Vec) -> Option> { (!items.is_empty()).then_some(items) } @@ -456,7 +469,8 @@ impl LlmResponseCodec for OpenAIResponsesCodec { let all_output_items = raw.output.clone(); let (text_parts, tool_calls) = collect_output_parts(raw.output.as_deref()); - let message = message_from_text_parts(text_parts); + let message = + message_from_text_parts(text_parts).or_else(|| top_level_output_text(response)); let tool_calls = optional_vec(tool_calls); // Map finish reason from status + incomplete_details. diff --git a/crates/core/src/codec/resolve.rs b/crates/core/src/codec/resolve.rs new file mode 100644 index 00000000..2eeaec6f --- /dev/null +++ b/crates/core/src/codec/resolve.rs @@ -0,0 +1,94 @@ +// SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +//! Provider-surface detection and best-effort normalization: the preferred path +//! for turning raw provider JSON into normalized types when no codec annotation +//! is present. + +use crate::api::llm::LlmRequest; +use crate::json::Json; + +use super::anthropic::AnthropicMessagesCodec; +use super::openai_chat::OpenAIChatCodec; +use super::openai_responses::OpenAIResponsesCodec; +use super::request::AnnotatedLlmRequest; +use super::response::AnnotatedLlmResponse; +use super::traits::{LlmCodec, LlmResponseCodec}; + +/// A built-in provider request/response surface. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ProviderSurface { + /// OpenAI Chat Completions. + OpenAIChat, + /// OpenAI Responses. + OpenAIResponses, + /// Anthropic Messages. + AnthropicMessages, +} + +/// Detect the request surface from a raw request body by top-level key. +/// +/// Priority: OpenAI Responses (`input`/`instructions`) > Anthropic Messages +/// (`system`) > OpenAI Chat (`messages`). `None` when no key matches or `body` +/// is not an object. This is a best-effort heuristic: an Anthropic request that +/// omits the optional top-level `system` is indistinguishable from OpenAI Chat +/// and classifies as `OpenAIChat`. +#[must_use] +pub fn detect_request_surface(body: &Json) -> Option { + let obj = body.as_object()?; + if obj.contains_key("input") || obj.contains_key("instructions") { + Some(ProviderSurface::OpenAIResponses) + } else if obj.contains_key("system") { + Some(ProviderSurface::AnthropicMessages) + } else if obj.contains_key("messages") { + Some(ProviderSurface::OpenAIChat) + } else { + None + } +} + +/// Detect the response surface from a raw provider response, classifying only +/// when exactly one built-in shape matches (the built-in codecs accept minimal +/// objects, so decode success alone is not a reliable classifier). +#[must_use] +pub fn detect_response_surface(raw: &Json) -> Option { + let obj = raw.as_object()?; + let is_chat = obj.get("choices").is_some_and(Json::is_array); + let is_responses = obj.get("output").is_some_and(Json::is_array) + || obj.get("output_text").is_some_and(Json::is_string); + let is_anthropic = obj.get("type").and_then(Json::as_str) == Some("message") + && obj.get("content").is_some_and(Json::is_array); + + match (is_chat, is_responses, is_anthropic) { + (true, false, false) => Some(ProviderSurface::OpenAIChat), + (false, true, false) => Some(ProviderSurface::OpenAIResponses), + (false, false, true) => Some(ProviderSurface::AnthropicMessages), + _ => None, + } +} + +/// Best-effort decode of a raw request into [`AnnotatedLlmRequest`] (fail-open). +#[must_use] +pub fn normalize_request(request: &LlmRequest) -> Option { + match detect_request_surface(&request.content)? { + ProviderSurface::OpenAIChat => OpenAIChatCodec.decode(request), + ProviderSurface::OpenAIResponses => OpenAIResponsesCodec.decode(request), + ProviderSurface::AnthropicMessages => AnthropicMessagesCodec.decode(request), + } + .ok() +} + +/// Best-effort decode of a raw response into [`AnnotatedLlmResponse`] (fail-open). +#[must_use] +pub fn normalize_response(raw: &Json) -> Option { + match detect_response_surface(raw)? { + ProviderSurface::OpenAIChat => OpenAIChatCodec.decode_response(raw), + ProviderSurface::OpenAIResponses => OpenAIResponsesCodec.decode_response(raw), + ProviderSurface::AnthropicMessages => AnthropicMessagesCodec.decode_response(raw), + } + .ok() +} + +#[cfg(test)] +#[path = "../../tests/unit/codec/resolve_tests.rs"] +mod tests; diff --git a/crates/core/src/codec/traits.rs b/crates/core/src/codec/traits.rs index 6fe5d189..9da3f10a 100644 --- a/crates/core/src/codec/traits.rs +++ b/crates/core/src/codec/traits.rs @@ -18,8 +18,9 @@ use super::response::AnnotatedLlmResponse; /// structured [`AnnotatedLlmRequest`]. /// /// Codecs are implemented by integration patches (LangChain, LangChain-NVIDIA, -/// LangGraph, etc.) since each SDK has its own request format. They are -/// registered by name in the global codec registry. +/// LangGraph, etc.) since each SDK has its own request format. A codec is +/// supplied per call by the caller; the built-in provider codecs can also be +/// selected from a raw payload via [`crate::codec::resolve`]. /// /// # Design /// diff --git a/crates/core/src/observability/atif.rs b/crates/core/src/observability/atif.rs index 4d4d1f31..311b6e9e 100644 --- a/crates/core/src/observability/atif.rs +++ b/crates/core/src/observability/atif.rs @@ -38,7 +38,8 @@ use uuid::Uuid; use crate::api::event::Event; use crate::api::runtime::EventSubscriberFn; use crate::api::subscriber::flush_subscribers; -use crate::codec::response::{Usage, estimate_cost_for_provider}; +use crate::codec::request::{AnnotatedLlmRequest, Message, MessageContent}; +use crate::codec::response::{AnnotatedLlmResponse, Usage, estimate_cost_for_provider}; use crate::error::Result; use crate::json::Json; @@ -1054,6 +1055,36 @@ fn extract_tool_calls(output: &Json) -> Option> { if calls.is_empty() { None } else { Some(calls) } } +// Annotation adapters: read the normalized message from an annotation, returning +// None for multimodal content so the caller falls back to the raw extractor. + +fn atif_message_from_annotated_request(request: &AnnotatedLlmRequest) -> Option { + let content = request + .messages + .iter() + .rev() + .find_map(|message| match message { + Message::User { content, .. } => Some(content), + _ => None, + })?; + match content { + MessageContent::Text(text) => Some(Json::String(text.clone())), + MessageContent::Parts(_) => None, + } +} + +fn atif_message_from_annotated_response(response: &AnnotatedLlmResponse) -> Option { + match &response.message { + Some(MessageContent::Text(text)) => Some(Json::String(text.clone())), + // Multimodal: defer to the raw extractor (returns None to fall back). + Some(MessageContent::Parts(_)) => None, + // No assistant text (tool-call-only, or reasoning/thinking-only): emit an + // empty message rather than the raw extractor's stringified payload. Tool + // calls and metrics are still recovered from the raw output downstream. + None => Some(empty_message()), + } +} + fn tool_call_array(output: &Json) -> Option<&Vec> { output .as_object() @@ -2212,7 +2243,10 @@ impl StepConversionState { self.steps.push(AtifStep { step_id: 0, source: "user".to_string(), - message: extract_user_messages(&content), + message: event + .annotated_request() + .and_then(|request| atif_message_from_annotated_request(request)) + .unwrap_or_else(|| extract_user_messages(&content)), timestamp: Some(event.timestamp().to_rfc3339()), model_name: None, reasoning_effort: None, @@ -2253,7 +2287,10 @@ impl StepConversionState { self.steps.push(AtifStep { step_id: 0, source: "agent".to_string(), - message: extract_llm_response_message(output), + message: event + .annotated_response() + .and_then(|response| atif_message_from_annotated_response(response)) + .unwrap_or_else(|| extract_llm_response_message(output)), timestamp: Some(event.timestamp().to_rfc3339()), model_name: event.model_name().map(ToOwned::to_owned), reasoning_effort, diff --git a/crates/core/src/observability/manual.rs b/crates/core/src/observability/manual.rs new file mode 100644 index 00000000..0d2977ca --- /dev/null +++ b/crates/core/src/observability/manual.rs @@ -0,0 +1,197 @@ +// SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +//! Shared best-effort extraction for non-provider/manual LLM output, used as the +//! fallback by the OpenTelemetry and OpenInference exporters. + +use serde_json::Map; + +use crate::codec::response::Usage; +use crate::json::Json; + +pub(crate) fn model_name_from_manual_llm_output(output: Option<&Json>) -> Option<&str> { + output?.as_object()?.get("model").and_then(Json::as_str) +} + +pub(crate) fn usage_from_manual_llm_output(output: Option<&Json>) -> Option { + let object = output?.as_object()?; + let usage = object.get("usage").and_then(Json::as_object); + let token_usage = object.get("token_usage").and_then(Json::as_object); + if usage.is_none() && token_usage.is_none() { + return None; + } + + let prompt_tokens = first_u64_from_manual_usage( + usage, + token_usage, + &["prompt_tokens", "input_tokens", "inputTokens", "input"], + ); + let completion_tokens = first_u64_from_manual_usage( + usage, + token_usage, + &[ + "completion_tokens", + "output_tokens", + "completionTokens", + "outputTokens", + "output", + ], + ); + let reported_total_tokens = first_u64_from_manual_usage( + usage, + token_usage, + &["total_tokens", "totalTokens", "total"], + ); + let cache_read_tokens = first_u64_from_manual_usage( + usage, + token_usage, + &[ + "cache_read_tokens", + "cached_tokens", + "cache_read_input_tokens", + "cacheReadTokens", + "cachedTokens", + "cacheReadInputTokens", + "cacheRead", + ], + ) + .or_else(|| { + first_nested_u64_from_manual_usage( + usage, + token_usage, + "input_tokens_details", + "cached_tokens", + ) + }) + .or_else(|| { + first_nested_u64_from_manual_usage( + usage, + token_usage, + "prompt_tokens_details", + "cached_tokens", + ) + }); + let cache_write_tokens = first_u64_from_manual_usage( + usage, + token_usage, + &[ + "cache_write_tokens", + "cache_creation_input_tokens", + "cacheWriteTokens", + "cacheCreationInputTokens", + "cacheWrite", + ], + ); + + if prompt_tokens.is_none() + && completion_tokens.is_none() + && reported_total_tokens.is_none() + && cache_read_tokens.is_none() + && cache_write_tokens.is_none() + { + return None; + } + let total_tokens = + normalize_total_tokens(reported_total_tokens, prompt_tokens, completion_tokens); + + Some(Usage { + prompt_tokens, + completion_tokens, + total_tokens, + cache_read_tokens, + cache_write_tokens, + cost: None, + }) +} + +pub(crate) fn cost_from_manual_llm_output( + output: Option<&Json>, + usd_only: bool, +) -> Option<(f64, String)> { + let object = output?.as_object()?; + let usage = object.get("usage").and_then(Json::as_object); + let token_usage = object.get("token_usage").and_then(Json::as_object); + usage + .and_then(|usage| cost_from_manual_usage(usage, usd_only)) + .or_else(|| token_usage.and_then(|usage| cost_from_manual_usage(usage, usd_only))) +} + +fn cost_from_manual_usage(usage: &Map, usd_only: bool) -> Option<(f64, String)> { + if let Some(total) = usage.get("cost_usd").and_then(Json::as_f64) { + return Some((total, "USD".to_string())); + } + let cost = usage.get("cost")?.as_object()?; + let currency = cost + .get("currency") + .and_then(Json::as_str) + .unwrap_or("USD") + .to_string(); + if usd_only && !currency.eq_ignore_ascii_case("USD") { + return None; + } + let total = cost.get("total").and_then(Json::as_f64).or_else(|| { + let (has_component, component_total) = ["input", "output", "cache_read", "cache_write"] + .iter() + .filter_map(|field| cost.get(*field).and_then(Json::as_f64)) + .fold((false, 0.0), |(_, total), value| (true, total + value)); + has_component.then_some(component_total) + })?; + Some((total, currency)) +} + +// Keep a reported total only when it is internally consistent with the component +// counts. Deriving a total from components is a provider-specific concern owned by +// the provider codecs, not this manual fallback, so an absent total stays absent. +fn normalize_total_tokens( + total_tokens: Option, + prompt_tokens: Option, + completion_tokens: Option, +) -> Option { + let total_tokens = total_tokens?; + let minimum_total = prompt_tokens + .unwrap_or(0) + .saturating_add(completion_tokens.unwrap_or(0)); + if minimum_total == 0 || total_tokens >= minimum_total { + Some(total_tokens) + } else { + None + } +} + +fn first_u64_from_manual_usage( + usage: Option<&Map>, + token_usage: Option<&Map>, + keys: &[&str], +) -> Option { + usage + .and_then(|value| first_u64(value, keys)) + .or_else(|| token_usage.and_then(|value| first_u64(value, keys))) +} + +fn first_nested_u64_from_manual_usage( + usage: Option<&Map>, + token_usage: Option<&Map>, + parent_key: &str, + child_key: &str, +) -> Option { + usage + .and_then(|value| nested_u64(value, parent_key, child_key)) + .or_else(|| token_usage.and_then(|value| nested_u64(value, parent_key, child_key))) +} + +fn first_u64(usage: &Map, keys: &[&str]) -> Option { + keys.iter() + .find_map(|key| usage.get(*key).and_then(Json::as_u64)) +} + +fn nested_u64(usage: &Map, parent_key: &str, child_key: &str) -> Option { + usage + .get(parent_key) + .and_then(Json::as_object) + .and_then(|details| details.get(child_key)) + .and_then(Json::as_u64) +} + +#[cfg(test)] +#[path = "../../tests/unit/observability/manual_tests.rs"] +mod tests; diff --git a/crates/core/src/observability/mod.rs b/crates/core/src/observability/mod.rs index 08aa2de3..dd83c5cf 100644 --- a/crates/core/src/observability/mod.rs +++ b/crates/core/src/observability/mod.rs @@ -13,6 +13,8 @@ pub(crate) fn test_mutex() -> &'static Mutex<()> { pub mod atif; pub mod atof; +#[cfg(any(feature = "otel", feature = "openinference"))] +pub(crate) mod manual; #[cfg(feature = "openinference")] pub mod openinference; #[cfg(feature = "otel")] diff --git a/crates/core/src/observability/openinference.rs b/crates/core/src/observability/openinference.rs index 0af88e72..65a86fa7 100644 --- a/crates/core/src/observability/openinference.rs +++ b/crates/core/src/observability/openinference.rs @@ -20,6 +20,7 @@ use std::collections::{HashMap, VecDeque}; use std::sync::{Arc, Mutex}; use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use super::manual; use crate::api::event::{Event, ScopeCategory}; use crate::api::runtime::EventSubscriberFn; use crate::api::scope::ScopeType; @@ -754,26 +755,54 @@ fn end_attributes(event: &Event) -> Vec { attributes.push(KeyValue::new(oi::output::MIME_TYPE, mime_type)); } let fallback_usage = if is_llm { - usage_from_manual_llm_output(event.output()) + manual::usage_from_manual_llm_output(event.output()) } else { None }; - let usage = event - .annotated_response() - .and_then(|response| response.usage.as_ref()) - .or(fallback_usage.as_ref()); + // Combine codec-normalized usage (which carries provider-derived fields such + // as Anthropic's computed total) with the manual scraper, preferring codec + // values per field so neither source's coverage is lost. + let normalized = if is_llm { + event.normalized_llm_response() + } else { + None + }; + let usage = merge_usage( + normalized + .as_ref() + .and_then(|response| response.usage.as_ref()), + fallback_usage.as_ref(), + ); if is_llm { - push_llm_usage_attributes(&mut attributes, usage); + push_llm_usage_attributes(&mut attributes, usage.as_ref()); } if is_llm && let Some(cost_total) = cost_total_from_llm_event(event, fallback_usage.as_ref()) { attributes.push(KeyValue::new(oi::llm::cost::TOTAL, cost_total)); } if is_llm { - push_llm_response_attributes(&mut attributes, event); + push_llm_response_attributes(&mut attributes, event, normalized.as_deref()); } attributes } +// Merge two usage sources field by field, preferring `primary` (codec-normalized) +// and filling gaps from `secondary` (manual scraper). This keeps provider-derived +// fields without dropping anything either source alone would have reported. +fn merge_usage(primary: Option<&Usage>, secondary: Option<&Usage>) -> Option { + match (primary, secondary) { + (None, None) => None, + (None, Some(usage)) | (Some(usage), None) => Some(usage.clone()), + (Some(primary), Some(secondary)) => Some(Usage { + prompt_tokens: primary.prompt_tokens.or(secondary.prompt_tokens), + completion_tokens: primary.completion_tokens.or(secondary.completion_tokens), + total_tokens: primary.total_tokens.or(secondary.total_tokens), + cache_read_tokens: primary.cache_read_tokens.or(secondary.cache_read_tokens), + cache_write_tokens: primary.cache_write_tokens.or(secondary.cache_write_tokens), + cost: primary.cost.clone().or_else(|| secondary.cost.clone()), + }), + } +} + fn push_llm_usage_attributes(attributes: &mut Vec, usage: Option<&Usage>) { let Some(usage) = usage else { return; @@ -807,28 +836,44 @@ fn push_llm_request_attributes(attributes: &mut Vec, event: &Event) { return; } - let Some(input) = event.input().and_then(replay_llm_payload) else { + // Match replay before codec detection: replay content can look + // provider-shaped (carry `messages`) and would otherwise be misrouted. + if let Some(input) = event.input().and_then(replay_llm_payload) { + if let Some(provider) = input.get("provider").and_then(Json::as_str) { + attributes.push(KeyValue::new(oi::llm::PROVIDER, provider.to_string())); + } + if let Some(system) = input.get("systemPrompt").and_then(display_text_from_json) { + attributes.push(KeyValue::new(oi::llm::SYSTEM, system)); + } + push_replay_input_messages(attributes, input); return; - }; - if let Some(provider) = input.get("provider").and_then(Json::as_str) { - attributes.push(KeyValue::new(oi::llm::PROVIDER, provider.to_string())); } - if let Some(system) = input.get("systemPrompt").and_then(display_text_from_json) { - attributes.push(KeyValue::new(oi::llm::SYSTEM, system)); + + if let Some(request) = event.normalized_llm_request() { + push_annotated_request_attributes(attributes, &request); } - push_replay_input_messages(attributes, input); } -fn push_llm_response_attributes(attributes: &mut Vec, event: &Event) { +fn push_llm_response_attributes( + attributes: &mut Vec, + event: &Event, + normalized: Option<&AnnotatedLlmResponse>, +) { if let Some(response) = event.annotated_response() { push_annotated_response_attributes(attributes, response); return; } - let Some(output) = event.output().and_then(replay_llm_response) else { + if let Some(output) = event.output().and_then(replay_llm_response) { + push_replay_response_attributes(attributes, output); return; - }; - push_replay_response_attributes(attributes, output); + } + + // Reuse the response decoded once in `end_attributes` (annotation-first; + // falls through to codec detection) instead of decoding the payload again. + if let Some(response) = normalized { + push_annotated_response_attributes(attributes, response); + } } fn push_annotated_request_attributes( @@ -1129,17 +1174,10 @@ fn finish_reason_value(reason: &FinishReason) -> String { } } -fn cost_total_from_manual_llm_output(output: Option<&Json>) -> Option { - let object = output?.as_object()?; - let usage = object.get("usage").and_then(Json::as_object); - let token_usage = object.get("token_usage").and_then(Json::as_object); - usage - .and_then(cost_total_from_usage) - .or_else(|| token_usage.and_then(cost_total_from_usage)) -} - fn cost_total_from_llm_event(event: &Event, fallback_usage: Option<&Usage>) -> Option { - if let Some(cost) = cost_total_from_manual_llm_output(event.output()) { + if let Some(cost) = + manual::cost_from_manual_llm_output(event.output(), true).map(|(total, _)| total) + { return Some(cost); } @@ -1158,178 +1196,11 @@ fn cost_total_from_llm_event(event: &Event, fallback_usage: Option<&Usage>) -> O let usage = fallback_usage?; let model_name = event .model_name() - .or_else(|| model_name_from_manual_llm_output(event.output()))?; + .or_else(|| manual::model_name_from_manual_llm_output(event.output()))?; estimate_cost_for_provider(Some(event.name()), model_name, usage) .and_then(|cost| cost.total_for_currency("USD")) } -fn model_name_from_manual_llm_output(output: Option<&Json>) -> Option<&str> { - output?.as_object()?.get("model").and_then(Json::as_str) -} - -fn cost_total_from_usage(usage: &serde_json::Map) -> Option { - usage.get("cost_usd").and_then(Json::as_f64).or_else(|| { - let cost = usage.get("cost")?.as_object()?; - let currency = cost.get("currency").and_then(Json::as_str); - let is_usd_cost = currency.is_none_or(|currency| currency.eq_ignore_ascii_case("USD")); - if !is_usd_cost { - return None; - } - cost.get("total").and_then(Json::as_f64).or_else(|| { - let (has_component, component_total) = ["input", "output", "cache_read", "cache_write"] - .iter() - .filter_map(|field| cost.get(*field).and_then(Json::as_f64)) - .fold((false, 0.0), |(_, total), value| (true, total + value)); - has_component.then_some(component_total) - }) - }) -} - -fn usage_from_manual_llm_output(output: Option<&Json>) -> Option { - let object = output?.as_object()?; - let usage = object.get("usage").and_then(Json::as_object); - let token_usage = object.get("token_usage").and_then(Json::as_object); - if usage.is_none() && token_usage.is_none() { - return None; - } - - let prompt_tokens = first_u64_from_manual_usage( - usage, - token_usage, - &["prompt_tokens", "input_tokens", "inputTokens", "input"], - ); - let completion_tokens = first_u64_from_manual_usage( - usage, - token_usage, - &[ - "completion_tokens", - "output_tokens", - "completionTokens", - "outputTokens", - "output", - ], - ); - let reported_total_tokens = first_u64_from_manual_usage( - usage, - token_usage, - &["total_tokens", "totalTokens", "total"], - ); - let cache_read_tokens = first_u64_from_manual_usage( - usage, - token_usage, - &[ - "cache_read_tokens", - "cached_tokens", - "cache_read_input_tokens", - "cacheReadTokens", - "cachedTokens", - "cacheReadInputTokens", - "cacheRead", - ], - ) - .or_else(|| { - first_nested_u64_from_manual_usage( - usage, - token_usage, - "input_tokens_details", - "cached_tokens", - ) - }) - .or_else(|| { - first_nested_u64_from_manual_usage( - usage, - token_usage, - "prompt_tokens_details", - "cached_tokens", - ) - }); - let cache_write_tokens = first_u64_from_manual_usage( - usage, - token_usage, - &[ - "cache_write_tokens", - "cache_creation_input_tokens", - "cacheWriteTokens", - "cacheCreationInputTokens", - "cacheWrite", - ], - ); - - if prompt_tokens.is_none() - && completion_tokens.is_none() - && reported_total_tokens.is_none() - && cache_read_tokens.is_none() - && cache_write_tokens.is_none() - { - return None; - } - let total_tokens = - normalize_total_tokens(reported_total_tokens, prompt_tokens, completion_tokens); - - Some(Usage { - prompt_tokens, - completion_tokens, - total_tokens, - cache_read_tokens, - cache_write_tokens, - cost: None, - }) -} - -fn normalize_total_tokens( - total_tokens: Option, - prompt_tokens: Option, - completion_tokens: Option, -) -> Option { - let total_tokens = total_tokens?; - let minimum_total = prompt_tokens - .unwrap_or(0) - .saturating_add(completion_tokens.unwrap_or(0)); - if minimum_total == 0 || total_tokens >= minimum_total { - Some(total_tokens) - } else { - None - } -} - -fn first_u64_from_manual_usage( - usage: Option<&serde_json::Map>, - token_usage: Option<&serde_json::Map>, - keys: &[&str], -) -> Option { - usage - .and_then(|value| first_u64(value, keys)) - .or_else(|| token_usage.and_then(|value| first_u64(value, keys))) -} - -fn first_nested_u64_from_manual_usage( - usage: Option<&serde_json::Map>, - token_usage: Option<&serde_json::Map>, - parent_key: &str, - child_key: &str, -) -> Option { - usage - .and_then(|value| nested_u64(value, parent_key, child_key)) - .or_else(|| token_usage.and_then(|value| nested_u64(value, parent_key, child_key))) -} - -fn nested_u64( - usage: &serde_json::Map, - parent_key: &str, - child_key: &str, -) -> Option { - usage - .get(parent_key) - .and_then(Json::as_object) - .and_then(|details| details.get(child_key)) - .and_then(Json::as_u64) -} - -fn first_u64(usage: &serde_json::Map, keys: &[&str]) -> Option { - keys.iter() - .find_map(|key| usage.get(*key).and_then(Json::as_u64)) -} - fn mark_attributes(event: &Event) -> Vec { let handle_attributes = event.attributes(); let mut attributes = vec![ diff --git a/crates/core/src/observability/otel.rs b/crates/core/src/observability/otel.rs index 37628ec4..ea4400ec 100644 --- a/crates/core/src/observability/otel.rs +++ b/crates/core/src/observability/otel.rs @@ -20,14 +20,14 @@ use std::collections::{HashMap, VecDeque}; use std::sync::{Arc, Mutex}; use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use super::manual; use crate::api::event::Event; use crate::api::event::ScopeCategory; use crate::api::runtime::EventSubscriberFn; use crate::api::scope::ScopeType; use crate::api::subscriber::{deregister_subscriber, flush_subscribers, register_subscriber}; -use crate::codec::response::{CostEstimate, Usage, estimate_cost_for_provider}; +use crate::codec::response::{CostEstimate, estimate_cost_for_provider}; use crate::error::FlowError; -use crate::json::Json; use chrono::{DateTime, Utc}; use opentelemetry::trace::{ Span as _, SpanContext, SpanKind, TraceContextExt, Tracer, TracerProvider as _, @@ -713,7 +713,7 @@ fn end_attributes(event: &Event) -> Vec { } fn cost_from_llm_event(event: &Event) -> Option<(f64, String)> { - if let Some(cost) = cost_from_manual_llm_output(event.output()) { + if let Some(cost) = manual::cost_from_manual_llm_output(event.output(), false) { return Some(cost); } if let Some(response) = event.annotated_response() @@ -727,10 +727,10 @@ fn cost_from_llm_event(event: &Event) -> Option<(f64, String)> { .and_then(|cost| cost_total_and_currency(&cost)); } } - let usage = usage_from_manual_llm_output(event.output())?; + let usage = manual::usage_from_manual_llm_output(event.output())?; let model_name = event .model_name() - .or_else(|| model_name_from_manual_llm_output(event.output()))?; + .or_else(|| manual::model_name_from_manual_llm_output(event.output()))?; estimate_cost_for_provider(Some(event.name()), model_name, &usage) .and_then(|cost| cost_total_and_currency(&cost)) } @@ -739,184 +739,6 @@ fn cost_total_and_currency(cost: &CostEstimate) -> Option<(f64, String)> { Some((cost.total_or_component_sum()?, cost.currency.clone())) } -fn cost_from_manual_llm_output(output: Option<&Json>) -> Option<(f64, String)> { - let object = output?.as_object()?; - let usage = object.get("usage").and_then(Json::as_object); - let token_usage = object.get("token_usage").and_then(Json::as_object); - usage - .and_then(cost_from_manual_usage) - .or_else(|| token_usage.and_then(cost_from_manual_usage)) -} - -fn cost_from_manual_usage(usage: &serde_json::Map) -> Option<(f64, String)> { - usage - .get("cost_usd") - .and_then(Json::as_f64) - .map(|total| (total, "USD".to_string())) - .or_else(|| { - let cost = usage.get("cost")?.as_object()?; - let total = cost.get("total").and_then(Json::as_f64).or_else(|| { - let (has_component, component_total) = - ["input", "output", "cache_read", "cache_write"] - .iter() - .filter_map(|field| cost.get(*field).and_then(Json::as_f64)) - .fold((false, 0.0), |(_, total), value| (true, total + value)); - has_component.then_some(component_total) - })?; - Some(( - total, - cost.get("currency") - .and_then(Json::as_str) - .unwrap_or("USD") - .to_string(), - )) - }) -} - -fn usage_from_manual_llm_output(output: Option<&Json>) -> Option { - let object = output?.as_object()?; - let usage = object.get("usage").and_then(Json::as_object); - let token_usage = object.get("token_usage").and_then(Json::as_object); - if usage.is_none() && token_usage.is_none() { - return None; - } - - let prompt_tokens = first_u64_from_manual_usage( - usage, - token_usage, - &["prompt_tokens", "input_tokens", "inputTokens", "input"], - ); - let completion_tokens = first_u64_from_manual_usage( - usage, - token_usage, - &[ - "completion_tokens", - "output_tokens", - "completionTokens", - "outputTokens", - "output", - ], - ); - let reported_total_tokens = first_u64_from_manual_usage( - usage, - token_usage, - &["total_tokens", "totalTokens", "total"], - ); - let cache_read_tokens = first_u64_from_manual_usage( - usage, - token_usage, - &[ - "cache_read_tokens", - "cached_tokens", - "cache_read_input_tokens", - "cacheReadTokens", - "cachedTokens", - "cacheReadInputTokens", - "cacheRead", - ], - ) - .or_else(|| { - first_nested_u64_from_manual_usage( - usage, - token_usage, - "input_tokens_details", - "cached_tokens", - ) - }) - .or_else(|| { - first_nested_u64_from_manual_usage( - usage, - token_usage, - "prompt_tokens_details", - "cached_tokens", - ) - }); - let cache_write_tokens = first_u64_from_manual_usage( - usage, - token_usage, - &[ - "cache_write_tokens", - "cache_creation_input_tokens", - "cacheWriteTokens", - "cacheCreationInputTokens", - "cacheWrite", - ], - ); - - if prompt_tokens.is_none() - && completion_tokens.is_none() - && reported_total_tokens.is_none() - && cache_read_tokens.is_none() - && cache_write_tokens.is_none() - { - return None; - } - - Some(Usage { - prompt_tokens, - completion_tokens, - total_tokens: normalize_total_tokens( - reported_total_tokens, - prompt_tokens, - completion_tokens, - ), - cache_read_tokens, - cache_write_tokens, - cost: None, - }) -} - -fn model_name_from_manual_llm_output(output: Option<&Json>) -> Option<&str> { - output?.as_object()?.get("model").and_then(Json::as_str) -} - -fn first_u64_from_manual_usage( - usage: Option<&serde_json::Map>, - token_usage: Option<&serde_json::Map>, - keys: &[&str], -) -> Option { - keys.iter().find_map(|key| { - usage - .and_then(|usage| usage.get(*key).and_then(Json::as_u64)) - .or_else(|| token_usage.and_then(|usage| usage.get(*key).and_then(Json::as_u64))) - }) -} - -fn first_nested_u64_from_manual_usage( - usage: Option<&serde_json::Map>, - token_usage: Option<&serde_json::Map>, - parent: &str, - key: &str, -) -> Option { - usage - .and_then(|usage| usage.get(parent).and_then(Json::as_object)) - .and_then(|details| details.get(key).and_then(Json::as_u64)) - .or_else(|| { - token_usage - .and_then(|usage| usage.get(parent).and_then(Json::as_object)) - .and_then(|details| details.get(key).and_then(Json::as_u64)) - }) -} - -fn normalize_total_tokens( - reported_total_tokens: Option, - prompt_tokens: Option, - completion_tokens: Option, -) -> Option { - let calculated_total = match (prompt_tokens, completion_tokens) { - (Some(prompt), Some(completion)) => Some(prompt + completion), - (Some(prompt), None) => Some(prompt), - (None, Some(completion)) => Some(completion), - (None, None) => None, - }; - match (reported_total_tokens, calculated_total) { - (Some(reported), Some(calculated)) if reported >= calculated => Some(reported), - (Some(_), Some(calculated)) => Some(calculated), - (Some(reported), None) => Some(reported), - (None, calculated) => calculated, - } -} - fn mark_attributes(event: &Event) -> Vec { let handle_attributes = event.attributes(); let mut attributes = vec![ diff --git a/crates/core/tests/unit/atif_tests.rs b/crates/core/tests/unit/atif_tests.rs index 1bfe2cdd..660fe0ba 100644 --- a/crates/core/tests/unit/atif_tests.rs +++ b/crates/core/tests/unit/atif_tests.rs @@ -8,15 +8,22 @@ use crate::api::event::{ BaseEvent, CategoryProfile, Event, EventCategory, MarkEvent, ScopeCategory, ScopeEvent, llm_attributes_to_strings, scope_attributes_to_strings, tool_attributes_to_strings, }; -use crate::api::llm::LlmAttributes; +use crate::api::llm::{LlmAttributes, LlmRequest}; use crate::api::scope::{HandleAttributes, ScopeAttributes, ScopeType}; use crate::api::tool::ToolAttributes; +use crate::codec::anthropic::AnthropicMessagesCodec; +use crate::codec::openai_chat::OpenAIChatCodec; +use crate::codec::openai_responses::OpenAIResponsesCodec; use crate::codec::pricing::pricing_test_mutex; +use crate::codec::request::AnnotatedLlmRequest; use crate::codec::response::{ - PricingCatalog, PricingResolver, reset_active_pricing_resolver, set_active_pricing_resolver, + AnnotatedLlmResponse, PricingCatalog, PricingResolver, reset_active_pricing_resolver, + set_active_pricing_resolver, }; +use crate::codec::traits::{LlmCodec, LlmResponseCodec}; use serde_json::json; use std::collections::HashSet; +use std::sync::Arc; struct ResetPricingResolverGuard; @@ -73,6 +80,8 @@ struct TestEventBuilder { output: Option, model_name: Option, tool_call_id: Option, + annotated_request: Option>, + annotated_response: Option>, } impl TestEventBuilder { @@ -121,6 +130,16 @@ impl TestEventBuilder { self } + fn annotated_request(mut self, annotated: AnnotatedLlmRequest) -> Self { + self.annotated_request = Some(Arc::new(annotated)); + self + } + + fn annotated_response(mut self, annotated: AnnotatedLlmResponse) -> Self { + self.annotated_response = Some(Arc::new(annotated)); + self + } + fn build(self) -> Event { match (self.event_type, self.scope_type) { (EventType::Mark, _) => Event::Mark(MarkEvent::new( @@ -191,6 +210,7 @@ impl TestEventBuilder { Some( CategoryProfile::builder() .model_name_opt(self.model_name) + .annotated_request_opt(self.annotated_request) .build(), ), )), @@ -211,6 +231,7 @@ impl TestEventBuilder { Some( CategoryProfile::builder() .model_name_opt(self.model_name) + .annotated_response_opt(self.annotated_response) .build(), ), )), @@ -265,6 +286,8 @@ fn event_builder(uuid: Uuid, event_type: EventType) -> TestEventBuilder { output: None, model_name: None, tool_call_id: None, + annotated_request: None, + annotated_response: None, } } @@ -1691,6 +1714,217 @@ fn test_exporter_llm_tool_calls_promoted() { assert_eq!(extra.llm_response.unwrap()["role"], json!("assistant")); } +#[test] +fn test_exporter_uses_annotated_message_but_raw_tool_calls() { + // The annotation supplies the normalized message text (winning over the raw + // content), while tool calls stay on the raw path so provider-specific + // extras are preserved. + let exporter = AtifExporter::new("session-1".to_string(), make_agent_info()); + let annotated = OpenAIChatCodec + .decode_response(&json!({ + "choices": [{ + "message": {"role": "assistant", "content": "from annotation"}, + "finish_reason": "stop" + }] + })) + .unwrap(); + + let end = event_builder(Uuid::now_v7(), EventType::End) + .name("gpt-4") + .scope_type(ScopeType::Llm) + .output(json!({ + "choices": [{ + "message": { + "role": "assistant", + "content": "from RAW", + "tool_calls": [{ + "id": "call_1", + "type": "function", + "provider_data": {"trace_id": "t-1"}, + "function": {"name": "search", "arguments": "{\"q\":\"x\"}"} + }] + } + }] + })) + .annotated_response(annotated) + .build(); + exporter.state.lock().unwrap().events.push(end); + + let trajectory = exporter.export().unwrap(); + let step = &trajectory.steps[0]; + // Message comes from the annotation. + assert_eq!(step.message, json!("from annotation")); + // Tool calls come from the raw payload, with provider extras preserved. + let tool_calls = step.tool_calls.as_ref().unwrap(); + assert_eq!(tool_calls[0].function_name, "search"); + assert_eq!(tool_calls[0].arguments, json!({"q": "x"})); + assert_eq!( + tool_calls[0].extra.as_ref().unwrap()["provider_data"]["trace_id"], + json!("t-1") + ); +} + +#[test] +fn test_exporter_annotated_tool_only_response_renders_empty_message() { + let exporter = AtifExporter::new("session-1".to_string(), make_agent_info()); + let annotated = OpenAIChatCodec + .decode_response(&json!({ + "choices": [{ + "message": { + "role": "assistant", + "content": null, + "tool_calls": [{ + "id": "call_2", + "type": "function", + "function": {"name": "lookup", "arguments": "{}"} + }] + }, + "finish_reason": "tool_calls" + }] + })) + .unwrap(); + + let end = event_builder(Uuid::now_v7(), EventType::End) + .name("gpt-4") + .scope_type(ScopeType::Llm) + .output(json!({ + "choices": [{ + "message": { + "role": "assistant", + "content": null, + "tool_calls": [{ + "id": "call_2", + "type": "function", + "function": {"name": "lookup", "arguments": "{}"} + }] + } + }] + })) + .annotated_response(annotated) + .build(); + exporter.state.lock().unwrap().events.push(end); + + let trajectory = exporter.export().unwrap(); + let step = &trajectory.steps[0]; + assert_eq!(step.message, json!("")); + assert_eq!(step.tool_calls.as_ref().unwrap()[0].function_name, "lookup"); +} + +#[test] +fn test_exporter_annotated_reasoning_only_response_renders_empty_message() { + // An OpenAI Responses reasoning-only output decodes to no assistant text. + // The annotated path emits an empty message rather than the raw extractor's + // stringified payload. + let exporter = AtifExporter::new("session-1".to_string(), make_agent_info()); + let output = json!({ + "model": "gpt-4o", + "output": [{"type": "reasoning", "summary": []}], + "status": "completed" + }); + let annotated = OpenAIResponsesCodec.decode_response(&output).unwrap(); + assert!(annotated.message.is_none()); + + let end = event_builder(Uuid::now_v7(), EventType::End) + .name("gpt-4o") + .scope_type(ScopeType::Llm) + .output(output) + .annotated_response(annotated) + .build(); + exporter.state.lock().unwrap().events.push(end); + + let trajectory = exporter.export().unwrap(); + assert_eq!(trajectory.steps[0].message, json!("")); +} + +#[test] +fn test_exporter_annotated_thinking_only_response_renders_empty_message() { + // An Anthropic thinking-only response decodes to no assistant text; the + // annotated path emits an empty message rather than dumping the raw blocks. + let exporter = AtifExporter::new("session-1".to_string(), make_agent_info()); + let output = json!({ + "type": "message", + "role": "assistant", + "model": "claude-3-5-sonnet", + "content": [{"type": "thinking", "thinking": "hmm", "signature": "sig"}], + "stop_reason": "end_turn" + }); + let annotated = AnthropicMessagesCodec.decode_response(&output).unwrap(); + assert!(annotated.message.is_none()); + + let end = event_builder(Uuid::now_v7(), EventType::End) + .name("claude-3-5-sonnet") + .scope_type(ScopeType::Llm) + .output(output) + .annotated_response(annotated) + .build(); + exporter.state.lock().unwrap().events.push(end); + + let trajectory = exporter.export().unwrap(); + assert_eq!(trajectory.steps[0].message, json!("")); +} + +#[test] +fn test_exporter_prefers_annotated_request_message() { + let exporter = AtifExporter::new("session-1".to_string(), make_agent_info()); + let request = LlmRequest { + headers: serde_json::Map::new(), + content: json!({ + "model": "gpt-4", + "messages": [{"role": "user", "content": "hello from annotation"}] + }), + }; + let annotated = OpenAIChatCodec.decode(&request).unwrap(); + + let start = event_builder(Uuid::now_v7(), EventType::Start) + .name("gpt-4") + .scope_type(ScopeType::Llm) + .input(json!({"messages": [{"role": "user", "content": "from RAW"}]})) + .annotated_request(annotated) + .build(); + exporter.state.lock().unwrap().events.push(start); + + let trajectory = exporter.export().unwrap(); + let step = &trajectory.steps[0]; + assert_eq!(step.source, "user"); + assert_eq!(step.message, json!("hello from annotation")); +} + +#[test] +fn test_exporter_annotated_multimodal_request_falls_back_to_raw() { + // A multimodal (content-part) request message must not be flattened to text: + // the annotation adapter returns None and ATIF preserves the raw content. + let exporter = AtifExporter::new("session-1".to_string(), make_agent_info()); + let multimodal = json!({ + "model": "gpt-4", + "messages": [{"role": "user", "content": [ + {"type": "text", "text": "describe"}, + {"type": "image_url", "image_url": {"url": "https://example/img.png"}} + ]}] + }); + let annotated = OpenAIChatCodec + .decode(&LlmRequest { + headers: serde_json::Map::new(), + content: multimodal.clone(), + }) + .unwrap(); + + let start = event_builder(Uuid::now_v7(), EventType::Start) + .name("gpt-4") + .scope_type(ScopeType::Llm) + .input(multimodal) + .annotated_request(annotated) + .build(); + exporter.state.lock().unwrap().events.push(start); + + let trajectory = exporter.export().unwrap(); + let message = trajectory.steps[0].message.as_str().unwrap(); + // Raw fallback preserves the image part rather than flattening to "describe". + assert!( + message.contains("image_url"), + "multimodal content preserved: {message}" + ); +} + #[test] fn test_exporter_hermes_wrapper_payload_is_atif_v17_compatible() { let exporter = AtifExporter::new("session-1".to_string(), make_agent_info()); diff --git a/crates/core/tests/unit/codec/openai_responses_tests.rs b/crates/core/tests/unit/codec/openai_responses_tests.rs index 2576c8ba..3feafeda 100644 --- a/crates/core/tests/unit/codec/openai_responses_tests.rs +++ b/crates/core/tests/unit/codec/openai_responses_tests.rs @@ -311,6 +311,58 @@ fn test_decode_response_multiple_output_text_items() { ); } +#[test] +fn test_decode_response_item_level_output_text() { + // A top-level `output_text` output item (sibling of `message`/`function_call`). + let codec = OpenAIResponsesCodec; + let response = json!({ + "output": [ + { "type": "output_text", "text": "Item text." } + ] + }); + let resp = codec.decode_response(&response).unwrap(); + assert_eq!( + resp.message, + Some(MessageContent::Text("Item text.".into())) + ); +} + +#[test] +fn test_decode_response_top_level_output_text_fallback() { + // The flattened top-level `output_text` is used when `output` yields no text. + let codec = OpenAIResponsesCodec; + let response = json!({ + "output": [], + "output_text": "Aggregated text." + }); + let resp = codec.decode_response(&response).unwrap(); + assert_eq!( + resp.message, + Some(MessageContent::Text("Aggregated text.".into())) + ); +} + +#[test] +fn test_decode_response_output_items_take_precedence_over_top_level_output_text() { + // Structured `output` message text wins over the flattened `output_text`. + let codec = OpenAIResponsesCodec; + let response = json!({ + "output": [ + { + "type": "message", + "role": "assistant", + "content": [ { "type": "output_text", "text": "Structured." } ] + } + ], + "output_text": "Aggregate that should be ignored." + }); + let resp = codec.decode_response(&response).unwrap(); + assert_eq!( + resp.message, + Some(MessageContent::Text("Structured.".into())) + ); +} + #[test] fn test_decode_response_only_reasoning_items() { let codec = OpenAIResponsesCodec; diff --git a/crates/core/tests/unit/codec/resolve_tests.rs b/crates/core/tests/unit/codec/resolve_tests.rs new file mode 100644 index 00000000..c307f6bd --- /dev/null +++ b/crates/core/tests/unit/codec/resolve_tests.rs @@ -0,0 +1,220 @@ +// SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +//! Unit tests for provider-surface detection and best-effort normalization. + +use super::*; +use crate::api::llm::LlmRequest; +use serde_json::json; + +fn req(content: serde_json::Value) -> LlmRequest { + LlmRequest { + headers: serde_json::Map::new(), + content, + } +} + +// --------------------------------------------------------------------------- +// detect_request_surface (priority order, hoisted from adaptive) +// --------------------------------------------------------------------------- + +#[test] +fn detect_request_responses_by_input_or_instructions() { + assert_eq!( + detect_request_surface(&json!({"input": []})), + Some(ProviderSurface::OpenAIResponses) + ); + assert_eq!( + detect_request_surface(&json!({"instructions": "x"})), + Some(ProviderSurface::OpenAIResponses) + ); +} + +#[test] +fn detect_request_anthropic_by_system() { + assert_eq!( + detect_request_surface(&json!({"system": "x", "messages": []})), + Some(ProviderSurface::AnthropicMessages) + ); +} + +#[test] +fn detect_request_chat_by_messages() { + assert_eq!( + detect_request_surface(&json!({"messages": []})), + Some(ProviderSurface::OpenAIChat) + ); +} + +#[test] +fn detect_request_priority_responses_then_anthropic_then_chat() { + // `input` wins even alongside `system` and `messages`. + assert_eq!( + detect_request_surface(&json!({"input": [], "system": "x", "messages": []})), + Some(ProviderSurface::OpenAIResponses) + ); + // `system` wins over `messages` (Anthropic carries both). + assert_eq!( + detect_request_surface(&json!({"system": "x", "messages": []})), + Some(ProviderSurface::AnthropicMessages) + ); +} + +#[test] +fn detect_request_none_for_unknown_or_non_object() { + assert_eq!(detect_request_surface(&json!({})), None); + assert_eq!(detect_request_surface(&json!({"foo": 1})), None); + assert_eq!(detect_request_surface(&json!([1, 2, 3])), None); + assert_eq!(detect_request_surface(&json!("string")), None); +} + +// --------------------------------------------------------------------------- +// detect_response_surface (strict; ambiguity -> None) +// --------------------------------------------------------------------------- + +#[test] +fn detect_response_chat_by_choices() { + assert_eq!( + detect_response_surface(&json!({"choices": []})), + Some(ProviderSurface::OpenAIChat) + ); +} + +#[test] +fn detect_response_responses_by_output_or_output_text() { + assert_eq!( + detect_response_surface(&json!({"output": []})), + Some(ProviderSurface::OpenAIResponses) + ); + assert_eq!( + detect_response_surface(&json!({"output_text": "hi"})), + Some(ProviderSurface::OpenAIResponses) + ); +} + +#[test] +fn detect_response_output_text_must_be_string() { + // A non-string `output_text` (null/object) is not a Responses match. + assert_eq!(detect_response_surface(&json!({"output_text": null})), None); + assert_eq!( + detect_response_surface(&json!({"output_text": {"nested": 1}})), + None + ); +} + +#[test] +fn detect_response_anthropic_by_type_message_and_content() { + assert_eq!( + detect_response_surface(&json!({"type": "message", "content": []})), + Some(ProviderSurface::AnthropicMessages) + ); +} + +#[test] +fn detect_response_none_for_empty_object_the_decode_trap() { + // The built-in codecs decode `{}` successfully, so detection must NOT rely + // on decode success: an empty object classifies to None. + assert_eq!(detect_response_surface(&json!({})), None); +} + +#[test] +fn detect_response_none_for_ambiguous_choices_and_output() { + assert_eq!( + detect_response_surface(&json!({"choices": [], "output": []})), + None + ); +} + +#[test] +fn detect_response_none_for_partial_anthropic() { + // `type == "message"` without a content array does not classify. + assert_eq!(detect_response_surface(&json!({"type": "message"})), None); + // A content array without `type == "message"` does not classify. + assert_eq!(detect_response_surface(&json!({"content": []})), None); +} + +#[test] +fn detect_response_none_for_non_object() { + assert_eq!(detect_response_surface(&json!([1, 2])), None); +} + +// --------------------------------------------------------------------------- +// normalize_response (detect -> decode, fail-open) +// --------------------------------------------------------------------------- + +#[test] +fn normalize_response_decodes_detected_chat() { + let raw = json!({ + "id": "r1", + "model": "gpt-4o", + "choices": [{ + "message": {"role": "assistant", "content": "hello"}, + "finish_reason": "stop" + }] + }); + let decoded = normalize_response(&raw).expect("chat response decodes"); + assert_eq!(decoded.response_text(), Some("hello")); +} + +#[test] +fn normalize_response_decodes_detected_responses_output_text() { + // Top-level `output_text` (the codec extension) detects + decodes as Responses. + let raw = json!({ + "model": "gpt-4o", + "output": [], + "output_text": "hi there" + }); + let decoded = normalize_response(&raw).expect("responses output_text decodes"); + assert_eq!(decoded.response_text(), Some("hi there")); +} + +#[test] +fn normalize_response_decodes_detected_anthropic() { + let raw = json!({ + "type": "message", + "role": "assistant", + "model": "claude-3-5-sonnet", + "content": [{"type": "text", "text": "hi"}], + "stop_reason": "end_turn" + }); + let decoded = normalize_response(&raw).expect("anthropic response decodes"); + assert_eq!(decoded.response_text(), Some("hi")); +} + +#[test] +fn normalize_response_none_for_unrecognized_shape() { + assert!(normalize_response(&json!({"foo": 1})).is_none()); + // Ambiguous/empty objects do not classify, so they do not decode. + assert!(normalize_response(&json!({})).is_none()); +} + +// --------------------------------------------------------------------------- +// normalize_request (detect -> decode, fail-open) +// --------------------------------------------------------------------------- + +#[test] +fn normalize_request_decodes_detected_chat() { + let request = req(json!({ + "model": "gpt-4o", + "messages": [{"role": "user", "content": "hi"}] + })); + let decoded = normalize_request(&request).expect("chat request decodes"); + assert!(!decoded.messages.is_empty()); +} + +#[test] +fn normalize_request_decodes_detected_anthropic() { + // `system` selects the Anthropic surface (priority over `messages`). + let request = req(json!({ + "model": "claude-3-5-sonnet", + "system": "be terse", + "messages": [{"role": "user", "content": "hi"}] + })); + let decoded = normalize_request(&request).expect("anthropic request decodes"); + assert!(!decoded.messages.is_empty()); +} + +#[test] +fn normalize_request_none_for_unknown_shape() { + assert!(normalize_request(&req(json!({"foo": 1}))).is_none()); +} diff --git a/crates/core/tests/unit/observability/manual_tests.rs b/crates/core/tests/unit/observability/manual_tests.rs new file mode 100644 index 00000000..4e11c54c --- /dev/null +++ b/crates/core/tests/unit/observability/manual_tests.rs @@ -0,0 +1,105 @@ +// SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +//! Unit tests for the shared manual/non-provider fallback helpers, focused on +//! the points where the OpenTelemetry and OpenInference copies diverged. + +use super::*; +use serde_json::json; + +#[test] +fn cost_otel_policy_emits_any_currency() { + let output = json!({"usage": {"cost": {"total": 0.5, "currency": "EUR"}}}); + assert_eq!( + cost_from_manual_llm_output(Some(&output), false), + Some((0.5, "EUR".to_string())) + ); +} + +#[test] +fn cost_openinference_policy_drops_non_usd() { + let output = json!({"usage": {"cost": {"total": 0.5, "currency": "EUR"}}}); + assert_eq!(cost_from_manual_llm_output(Some(&output), true), None); +} + +#[test] +fn cost_component_sum_emits_currency_for_otel() { + let output = json!({"usage": {"cost": {"input": 0.5, "output": 0.375, "currency": "EUR"}}}); + assert_eq!( + cost_from_manual_llm_output(Some(&output), false), + Some((0.875, "EUR".to_string())) + ); +} + +#[test] +fn cost_usd_field_passes_usd_only() { + let output = json!({"usage": {"cost_usd": 1.25}}); + assert_eq!( + cost_from_manual_llm_output(Some(&output), true), + Some((1.25, "USD".to_string())) + ); +} + +#[test] +fn cost_absent_currency_treated_as_usd() { + let output = json!({"usage": {"cost": {"total": 0.9}}}); + assert_eq!( + cost_from_manual_llm_output(Some(&output), true), + Some((0.9, "USD".to_string())) + ); +} + +#[test] +fn cost_per_map_fallthrough_under_usd_only() { + // A non-USD `usage` cost is skipped under usd_only; `token_usage` USD wins. + let output = json!({ + "usage": {"cost": {"total": 0.5, "currency": "EUR"}}, + "token_usage": {"cost_usd": 0.2} + }); + assert_eq!( + cost_from_manual_llm_output(Some(&output), true), + Some((0.2, "USD".to_string())) + ); +} + +#[test] +fn first_u64_is_map_major() { + // `usage`'s `total` (5) wins over `token_usage`'s `total_tokens` (10): + // all keys are tried against `usage` before `token_usage`. + let usage = json!({"total": 5}); + let token_usage = json!({"total_tokens": 10}); + let got = first_u64_from_manual_usage( + usage.as_object(), + token_usage.as_object(), + &["total_tokens", "totalTokens", "total"], + ); + assert_eq!(got, Some(5)); +} + +#[test] +fn normalize_total_strict_drops_absent_and_inconsistent() { + assert_eq!(normalize_total_tokens(None, Some(5), Some(5)), None); + assert_eq!(normalize_total_tokens(Some(3), Some(5), Some(5)), None); // 3 < 10 + assert_eq!(normalize_total_tokens(Some(12), Some(5), Some(5)), Some(12)); + assert_eq!(normalize_total_tokens(Some(7), None, None), Some(7)); // minimum 0 +} + +#[test] +fn usage_extracts_aliases_and_returns_none_without_tokens() { + let output = json!({"usage": {"inputTokens": 3, "outputTokens": 4}}); + let usage = usage_from_manual_llm_output(Some(&output)).expect("has tokens"); + assert_eq!(usage.prompt_tokens, Some(3)); + assert_eq!(usage.completion_tokens, Some(4)); + assert!(usage_from_manual_llm_output(Some(&json!({"usage": {"foo": 1}}))).is_none()); + assert!(usage_from_manual_llm_output(Some(&json!({}))).is_none()); +} + +#[test] +fn model_name_extraction() { + assert_eq!( + model_name_from_manual_llm_output(Some(&json!({"model": "m"}))), + Some("m") + ); + assert_eq!(model_name_from_manual_llm_output(Some(&json!({}))), None); + assert_eq!(model_name_from_manual_llm_output(None), None); +} diff --git a/crates/core/tests/unit/observability/openinference_tests.rs b/crates/core/tests/unit/observability/openinference_tests.rs index b2cac182..b2acb744 100644 --- a/crates/core/tests/unit/observability/openinference_tests.rs +++ b/crates/core/tests/unit/observability/openinference_tests.rs @@ -954,11 +954,199 @@ fn llm_input_value_omits_request_headers() { assert!(!attributes.contains_key("nemo_relay.start.input_json")); assert!(!attributes["input.value"].contains("authorization")); assert!(!attributes["input.value"].contains("secret-token")); - assert!(!attributes.contains_key("llm.input_messages.0.message.role")); + // The provider-shaped request is decoded through the codec layer, so + // structured messages are emitted — without leaking transport headers. + assert_attr(&attributes, "llm.input_messages.0.message.role", "user"); + assert_attr(&attributes, "llm.input_messages.0.message.content", "hi"); assert_no_attr_contains(&attributes, "headers"); assert_no_attr_contains(&attributes, "secret-token"); } +#[test] +fn un_annotated_provider_response_decoded_through_codec() { + // No annotation and no OpenClaw envelope: the raw provider response is + // detected and decoded through the codec layer (tier 3), so OpenInference + // emits structured output messages instead of nothing. + let (provider, exporter) = make_provider(); + let mut processor = + OpenInferenceEventProcessor::new(provider.clone(), "test-scope".to_string()); + let uuid = Uuid::now_v7(); + + processor.process(&make_start_event( + uuid, + None, + "chat", + ScopeType::Llm, + Some(json!({ + "headers": {}, + "content": {"messages": [{"role": "user", "content": "hi"}], "model": "demo-model"} + })), + )); + processor.process(&make_end_event( + uuid, + None, + "chat", + ScopeType::Llm, + Some(json!({ + "choices": [{ + "message": {"role": "assistant", "content": "hello there"}, + "finish_reason": "stop" + }] + })), + )); + + processor.force_flush().unwrap(); + + let spans = exporter.get_finished_spans().unwrap(); + assert_eq!(spans.len(), 1); + let attributes = attr_map(&spans[0].attributes); + assert_attr( + &attributes, + "llm.output_messages.0.message.role", + "assistant", + ); + assert_attr( + &attributes, + "llm.output_messages.0.message.content", + "hello there", + ); +} + +#[test] +fn un_annotated_anthropic_response_emits_codec_computed_total_tokens() { + // Anthropic raw usage carries no total; the codec computes input + output. + // The un-annotated path must surface that codec total rather than dropping + // it the way the manual scraper does. + let (provider, exporter) = make_provider(); + let mut processor = + OpenInferenceEventProcessor::new(provider.clone(), "test-scope".to_string()); + let uuid = Uuid::now_v7(); + + processor.process(&make_start_event( + uuid, + None, + "anthropic", + ScopeType::Llm, + Some(json!({ + "headers": {}, + "content": {"model": "claude-3-5-sonnet", "messages": [{"role": "user", "content": "hi"}]} + })), + )); + processor.process(&make_end_event( + uuid, + None, + "anthropic", + ScopeType::Llm, + Some(json!({ + "type": "message", + "role": "assistant", + "model": "claude-3-5-sonnet", + "content": [{"type": "text", "text": "hello"}], + "stop_reason": "end_turn", + "usage": {"input_tokens": 10, "output_tokens": 20} + })), + )); + + processor.force_flush().unwrap(); + + let spans = exporter.get_finished_spans().unwrap(); + assert_eq!(spans.len(), 1); + let attributes = attr_map(&spans[0].attributes); + assert_attr(&attributes, "llm.token_count.prompt", "10"); + assert_attr(&attributes, "llm.token_count.completion", "20"); + assert_attr(&attributes, "llm.token_count.total", "30"); +} + +#[test] +fn provider_shaped_empty_usage_falls_back_to_manual_token_usage() { + // A provider-shaped response with an empty `usage` object yields an empty + // codec usage; that must not mask the manual scraper's `token_usage`. + let (provider, exporter) = make_provider(); + let mut processor = + OpenInferenceEventProcessor::new(provider.clone(), "test-scope".to_string()); + let uuid = Uuid::now_v7(); + + processor.process(&make_start_event( + uuid, + None, + "chat", + ScopeType::Llm, + Some(json!({ + "headers": {}, + "content": {"model": "gpt-4o", "messages": [{"role": "user", "content": "hi"}]} + })), + )); + processor.process(&make_end_event( + uuid, + None, + "chat", + ScopeType::Llm, + Some(json!({ + "model": "gpt-4o", + "choices": [{ + "message": {"role": "assistant", "content": "hi"}, + "finish_reason": "stop" + }], + "usage": {}, + "token_usage": {"prompt_tokens": 5, "completion_tokens": 7} + })), + )); + + processor.force_flush().unwrap(); + + let spans = exporter.get_finished_spans().unwrap(); + assert_eq!(spans.len(), 1); + let attributes = attr_map(&spans[0].attributes); + assert_attr(&attributes, "llm.token_count.prompt", "5"); + assert_attr(&attributes, "llm.token_count.completion", "7"); +} + +#[test] +fn provider_shaped_partial_usage_merges_with_manual_token_usage() { + // Codec usage covers only prompt; `token_usage` covers completion/total. The + // per-field merge must keep all three rather than letting partial codec usage + // mask the scraper's fields. + let (provider, exporter) = make_provider(); + let mut processor = + OpenInferenceEventProcessor::new(provider.clone(), "test-scope".to_string()); + let uuid = Uuid::now_v7(); + + processor.process(&make_start_event( + uuid, + None, + "chat", + ScopeType::Llm, + Some(json!({ + "headers": {}, + "content": {"model": "gpt-4o", "messages": [{"role": "user", "content": "hi"}]} + })), + )); + processor.process(&make_end_event( + uuid, + None, + "chat", + ScopeType::Llm, + Some(json!({ + "model": "gpt-4o", + "choices": [{ + "message": {"role": "assistant", "content": "hi"}, + "finish_reason": "stop" + }], + "usage": {"prompt_tokens": 5}, + "token_usage": {"completion_tokens": 7, "total_tokens": 12} + })), + )); + + processor.force_flush().unwrap(); + + let spans = exporter.get_finished_spans().unwrap(); + assert_eq!(spans.len(), 1); + let attributes = attr_map(&spans[0].attributes); + assert_attr(&attributes, "llm.token_count.prompt", "5"); + assert_attr(&attributes, "llm.token_count.completion", "7"); + assert_attr(&attributes, "llm.token_count.total", "12"); +} + #[test] fn openclaw_replay_payloads_emit_flattened_openinference_llm_attributes() { let (provider, exporter) = make_provider(); @@ -2271,9 +2459,7 @@ fn helper_functions_cover_additional_openinference_branches() { ), Some("Requested tools: read".to_string()) ); - assert_eq!(normalize_total_tokens(Some(5), None, None), Some(5)); - - let alias_usage = usage_from_manual_llm_output(Some(&json!({ + let alias_usage = crate::observability::manual::usage_from_manual_llm_output(Some(&json!({ "usage": {"inputTokens": 11, "outputTokens": 7, "totalTokens": 18, "cacheReadInputTokens": 5} }))) .unwrap(); diff --git a/crates/core/tests/unit/types_tests.rs b/crates/core/tests/unit/types_tests.rs index f524c30c..46b6e4b7 100644 --- a/crates/core/tests/unit/types_tests.rs +++ b/crates/core/tests/unit/types_tests.rs @@ -601,6 +601,93 @@ fn event_json_value_uses_canonical_subscriber_shape() { assert_eq!(decoded, value); } +fn llm_end_event(data: serde_json::Value, profile: CategoryProfile) -> Event { + Event::Scope(ScopeEvent::new( + BaseEvent::builder().name("llm").data(data).build(), + ScopeCategory::End, + llm_attributes_to_strings(LlmAttributes::empty()), + EventCategory::llm(), + Some(profile), + )) +} + +fn llm_start_event(data: serde_json::Value, profile: CategoryProfile) -> Event { + Event::Scope(ScopeEvent::new( + BaseEvent::builder().name("llm").data(data).build(), + ScopeCategory::Start, + llm_attributes_to_strings(LlmAttributes::empty()), + EventCategory::llm(), + Some(profile), + )) +} + +#[test] +fn normalized_llm_response_prefers_annotation_over_raw_output() { + // Annotation present: returned (borrowed), ignoring the conflicting raw output. + let response = annotated_response("resp-1", "demo-model", "from-annotation"); + let event = llm_end_event( + json!({"choices": [{"message": {"role": "assistant", "content": "from-raw"}}]}), + CategoryProfile::builder() + .annotated_response(Arc::new(response)) + .build(), + ); + let normalized = event.normalized_llm_response().expect("annotation present"); + assert_eq!(normalized.response_text(), Some("from-annotation")); +} + +#[test] +fn normalized_llm_response_falls_back_to_codec_decode() { + // No annotation: best-effort decode of the raw provider output. + let event = llm_end_event( + json!({ + "model": "gpt-4o", + "choices": [{ + "message": {"role": "assistant", "content": "from-raw"}, + "finish_reason": "stop" + }] + }), + CategoryProfile::default(), + ); + let normalized = event + .normalized_llm_response() + .expect("decodes raw chat output"); + assert_eq!(normalized.response_text(), Some("from-raw")); +} + +#[test] +fn normalized_llm_response_none_for_non_provider_output() { + let event = llm_end_event(json!({"answer": "x"}), CategoryProfile::default()); + assert!(event.normalized_llm_response().is_none()); +} + +#[test] +fn normalized_llm_request_decodes_wrapped_request_when_unannotated() { + // No annotation: decode the wrapped LlmRequest from the start-event input. + let event = llm_start_event( + json!({ + "headers": {}, + "content": {"model": "gpt-4o", "messages": [{"role": "user", "content": "hi"}]} + }), + CategoryProfile::default(), + ); + let normalized = event + .normalized_llm_request() + .expect("decodes wrapped chat request"); + assert!(!normalized.messages.is_empty()); +} + +#[test] +fn normalized_llm_request_prefers_annotation() { + let request = annotated_request("demo-model", "annotated"); + let event = llm_start_event( + json!({"headers": {}, "content": {"messages": []}}), + CategoryProfile::builder() + .annotated_request(Arc::new(request)) + .build(), + ); + assert!(event.normalized_llm_request().is_some()); +} + #[test] fn category_profile_wire_empty_accounts_for_annotations() { assert!(CategoryProfile::default().is_wire_empty());