From 06714c1874601516eb54b155cbd88034c350573f Mon Sep 17 00:00:00 2001 From: Zygimantas <5236121+Zygimantass@users.noreply.github.com> Date: Mon, 22 Jun 2026 15:17:21 +0300 Subject: [PATCH 01/12] add claude amp laminar usage traces --- crates/harness-server/src/amp.rs | 7 +- crates/harness-server/src/anthropic.rs | 122 ++++++++- crates/harness-server/src/claude.rs | 5 + crates/harness-server/src/lib.rs | 2 +- crates/harness-server/src/otel.rs | 365 ++++++++++++++++++++++++- crates/harness-server/src/server.rs | 64 ++++- crates/harness-server/src/traits.rs | 32 +++ crates/harness-server/src/turn.rs | 1 + 8 files changed, 585 insertions(+), 13 deletions(-) diff --git a/crates/harness-server/src/amp.rs b/crates/harness-server/src/amp.rs index 65562a4d3..63ecd93cc 100644 --- a/crates/harness-server/src/amp.rs +++ b/crates/harness-server/src/amp.rs @@ -24,8 +24,9 @@ pub struct AmpEventNormalizer { impl AmpEventNormalizer { fn normalize(&mut self, event: AnthropicStreamEvent) -> Vec { + let token_usage = event.token_usage(); let normalized = self.anthropic.normalize(event); - match normalized { + let mut out = match normalized { NormalizedEvent::AgentTextDelta { ref item_id, delta } => { if !delta.is_empty() { self.pre_final_text_items.insert(item_id.clone()); @@ -49,7 +50,11 @@ impl AmpEventNormalizer { content, } => self.chunk_final_assistant_message(stop_reason, content), event => vec![event], + }; + if let Some(usage) = token_usage { + out.insert(0, NormalizedEvent::TokenUsage { usage }); } + out } fn chunk_final_assistant_message( diff --git a/crates/harness-server/src/anthropic.rs b/crates/harness-server/src/anthropic.rs index 99177bb37..ea2dd476e 100644 --- a/crates/harness-server/src/anthropic.rs +++ b/crates/harness-server/src/anthropic.rs @@ -3,7 +3,10 @@ use std::collections::HashMap; use serde::Deserialize; use serde_json::Value; -use crate::{NormalizedContent, NormalizedEvent, NormalizedToolResult, Result, stable_id}; +use crate::{ + NormalizedContent, NormalizedEvent, NormalizedTokenUsage, NormalizedToolResult, Result, + stable_id, +}; #[derive(Debug, Clone, Deserialize)] #[serde(tag = "type", rename_all = "snake_case")] @@ -31,6 +34,7 @@ pub enum AnthropicStreamEvent { is_error: bool, error: Option, message: Option, + usage: Option, }, Error { error: Option, @@ -61,6 +65,16 @@ impl AnthropicStreamEvent { _ => None, } } + + pub fn token_usage(&self) -> Option { + match self { + Self::Assistant { message, .. } => { + token_usage_from_value(message.usage.as_ref(), message.model.clone()) + } + Self::Result { usage, .. } => token_usage_from_value(usage.as_ref(), None), + _ => None, + } + } } #[derive(Debug, Clone, Deserialize)] @@ -227,6 +241,7 @@ impl AnthropicEventNormalizer { is_error, error, message, + usage: _, } => NormalizedEvent::Result { error: result_error_text(subtype, is_error, error, message, result), }, @@ -254,7 +269,9 @@ impl From for NormalizedEvent { #[derive(Debug, Clone, Deserialize)] pub struct AnthropicMessage { pub id: Option, + pub model: Option, pub stop_reason: Option, + pub usage: Option, #[serde(default)] pub content: Vec, } @@ -439,6 +456,75 @@ fn runtime_exit_code(value: Option<&Value>) -> Option { .and_then(|code| i32::try_from(code).ok()) } +fn token_usage_from_value( + value: Option<&Value>, + model: Option, +) -> Option { + let value = value?; + if value.is_null() { + return None; + } + let usage = NormalizedTokenUsage { + model: model.or_else(|| token_string(value, &["model"])), + input_tokens: token_count(value, &["input_tokens", "inputTokens", "inputTokenCount"]), + output_tokens: token_count( + value, + &["output_tokens", "outputTokens", "outputTokenCount"], + ), + cache_creation_input_tokens: token_count( + value, + &[ + "cache_creation_input_tokens", + "cacheCreationInputTokens", + "cacheCreationTokens", + ], + ), + cache_read_input_tokens: token_count( + value, + &[ + "cache_read_input_tokens", + "cached_input_tokens", + "cacheReadInputTokens", + "cachedInputTokens", + ], + ), + reasoning_output_tokens: token_count( + value, + &[ + "reasoning_output_tokens", + "reasoning_tokens", + "reasoningOutputTokens", + "reasoningTokens", + ], + ), + total_tokens: token_count(value, &["total_tokens", "totalTokens", "totalTokenCount"]), + }; + usage.has_counts().then_some(usage) +} + +fn token_count(value: &Value, keys: &[&str]) -> Option { + keys.iter() + .filter_map(|key| value.get(*key)) + .find_map(value_as_i64) +} + +fn token_string(value: &Value, keys: &[&str]) -> Option { + keys.iter() + .filter_map(|key| value.get(*key)) + .filter_map(Value::as_str) + .map(str::trim) + .find(|value| !value.is_empty()) + .map(str::to_owned) +} + +fn value_as_i64(value: &Value) -> Option { + value + .as_i64() + .or_else(|| value.as_u64().and_then(|value| i64::try_from(value).ok())) + .or_else(|| value.as_str()?.trim().parse().ok()) + .filter(|value| *value >= 0) +} + fn parse_json_tool_result(content: &str) -> Option<(String, Option)> { let value: Value = serde_json::from_str(content).ok()?; let object = value.as_object()?; @@ -459,3 +545,37 @@ fn exit_code_from_prefix(content: &str) -> Option { let code_text = rest.split_once('\n').map_or(rest, |(code, _)| code); code_text.trim().parse().ok() } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn assistant_event_exposes_token_usage() { + let event: AnthropicStreamEvent = serde_json::from_str( + r#"{"type":"assistant","message":{"model":"claude-fable-5","id":"msg_1","content":[{"type":"text","text":"hi"}],"usage":{"input_tokens":2,"cache_creation_input_tokens":3,"cache_read_input_tokens":5,"output_tokens":7}}}"#, + ) + .expect("assistant event"); + let usage = event.token_usage().expect("usage"); + + assert_eq!(usage.model.as_deref(), Some("claude-fable-5")); + assert_eq!(usage.input_tokens, Some(2)); + assert_eq!(usage.cache_creation_input_tokens, Some(3)); + assert_eq!(usage.cache_read_input_tokens, Some(5)); + assert_eq!(usage.output_tokens, Some(7)); + } + + #[test] + fn result_event_exposes_camel_case_token_usage() { + let event: AnthropicStreamEvent = serde_json::from_str( + r#"{"type":"result","subtype":"success","usage":{"inputTokens":11,"cachedInputTokens":13,"outputTokens":17,"totalTokens":41}}"#, + ) + .expect("result event"); + let usage = event.token_usage().expect("usage"); + + assert_eq!(usage.input_tokens, Some(11)); + assert_eq!(usage.cache_read_input_tokens, Some(13)); + assert_eq!(usage.output_tokens, Some(17)); + assert_eq!(usage.total_tokens, Some(41)); + } +} diff --git a/crates/harness-server/src/claude.rs b/crates/harness-server/src/claude.rs index 5e9e36e34..e4a897cc0 100644 --- a/crates/harness-server/src/claude.rs +++ b/crates/harness-server/src/claude.rs @@ -44,9 +44,13 @@ impl PendingAgentMessage { impl ClaudeEventNormalizer { pub fn normalize(&mut self, event: AnthropicStreamEvent) -> Vec { + let token_usage = event.token_usage(); let message_stop_reason = event.message_stop_reason().map(str::to_string); let normalized = self.inner.normalize(event); let mut out = Vec::new(); + if let Some(usage) = token_usage { + out.push(NormalizedEvent::TokenUsage { usage }); + } if let Some(stop_reason) = message_stop_reason { self.flush_pending(Some(stop_reason), &mut out); } @@ -73,6 +77,7 @@ impl ClaudeEventNormalizer { self.flush_pending(None, &mut out); out.push(event); } + NormalizedEvent::TokenUsage { .. } => {} NormalizedEvent::Ignored => {} event => out.push(event), } diff --git a/crates/harness-server/src/lib.rs b/crates/harness-server/src/lib.rs index ac223e4d4..cb177725b 100644 --- a/crates/harness-server/src/lib.rs +++ b/crates/harness-server/src/lib.rs @@ -15,7 +15,7 @@ pub use error::{HarnessServerError, Result}; pub use server::{run_blocks_server, run_harness_server, run_validate_jsonrpc, server_for}; pub use traits::{ AppServerNormalizer, AppServerRuntime, HarnessKind, HarnessServer, NormalizedContent, - NormalizedEvent, NormalizedToolResult, ThreadState, + NormalizedEvent, NormalizedTokenUsage, NormalizedToolResult, ThreadState, }; pub use turn::{BridgeConfig, CodexTurnNormalizer}; pub use validation::run_validate_agent_deltas; diff --git a/crates/harness-server/src/otel.rs b/crates/harness-server/src/otel.rs index 730d16390..b2cba5e4f 100644 --- a/crates/harness-server/src/otel.rs +++ b/crates/harness-server/src/otel.rs @@ -6,17 +6,18 @@ use std::net::{TcpListener, TcpStream}; use std::path::PathBuf; use std::sync::OnceLock; use std::thread; -use std::time::Duration; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; -use opentelemetry_proto::tonic::common::v1::{AnyValue, KeyValue, any_value}; -use opentelemetry_proto::tonic::trace::v1::Span; +use opentelemetry_proto::tonic::common::v1::{AnyValue, InstrumentationScope, KeyValue, any_value}; +use opentelemetry_proto::tonic::resource::v1::Resource; +use opentelemetry_proto::tonic::trace::v1::{ResourceSpans, ScopeSpans, Span, span}; use prost::Message as _; use serde_json::Value; use url::Url; use uuid::Uuid; -use crate::{HarnessServerError, Result}; +use crate::{HarnessKind, HarnessServerError, NormalizedTokenUsage, Result}; const CODEX_SPAN_PREFIX: &str = "codex."; const LAMINAR_METADATA_PREFIX: &str = "lmnr.association.properties.metadata."; @@ -56,7 +57,7 @@ pub(crate) fn configure_codex_otel_for_startup(trace: &TraceContext) -> Result<( let Some(trace_id) = trace.effective_trace_id() else { return Ok(()); }; - let Some(endpoint) = codex_otel_endpoint() else { + let Some(endpoint) = otlp_traces_endpoint() else { return Ok(()); }; if !trace.metadata.is_empty() { @@ -91,6 +92,46 @@ pub(crate) fn configure_codex_otel_for_startup(trace: &TraceContext) -> Result<( Ok(()) } +pub(crate) fn unix_time_nanos() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_nanos() + .min(u128::from(u64::MAX)) as u64 +} + +#[derive(Clone, Debug)] +pub(crate) struct HarnessUsageSpan<'a> { + pub(crate) harness: HarnessKind, + pub(crate) model: &'a str, + pub(crate) model_provider: &'a str, + pub(crate) turn_id: &'a str, + pub(crate) start_unix_nano: u64, + pub(crate) end_unix_nano: u64, +} + +pub(crate) fn export_harness_usage_span( + trace: &TraceContext, + span: HarnessUsageSpan<'_>, + usage: &NormalizedTokenUsage, +) -> Result<()> { + if !usage.has_counts() { + return Ok(()); + } + let Some(endpoint) = otlp_traces_endpoint() else { + return Ok(()); + }; + let request = harness_usage_trace_request(trace, span, usage)?; + let mut headers = otel_forward_headers(); + if let Some(trace_id) = trace.effective_trace_id() { + headers.insert("x-trace-id".to_string(), trace_id); + } + if let Some(thread_key) = clean_optional(trace.thread_key.as_deref()) { + headers.insert("x-centaur-thread-key".to_string(), thread_key); + } + post_otlp_trace_payload(&endpoint, &headers, &request.encode_to_vec()) +} + fn codex_config_path() -> Option { env::var_os("CODEX_HOME") .map(PathBuf::from) @@ -98,7 +139,7 @@ fn codex_config_path() -> Option { .map(|home| home.join("config.toml")) } -fn codex_otel_endpoint() -> Option { +fn otlp_traces_endpoint() -> Option { let traces_endpoint = clean_optional( env::var("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT") .ok() @@ -214,6 +255,13 @@ fn otel_authorization_token() -> Option { .or(Some(authorization)) } +fn otel_forward_headers() -> BTreeMap { + otel_headers() + .into_iter() + .filter(|(name, _)| name == "authorization") + .collect() +} + fn otel_environment() -> String { if let Ok(raw) = env::var("OTEL_RESOURCE_ATTRIBUTES") { for item in raw.split(',') { @@ -319,6 +367,56 @@ impl OtlpTarget { } } +fn post_otlp_trace_payload( + endpoint: &str, + headers: &BTreeMap, + body: &[u8], +) -> Result<()> { + let target = OtlpTarget::parse(endpoint)?; + let mut upstream = TcpStream::connect((target.host.as_str(), target.port))?; + upstream.set_read_timeout(Some(Duration::from_secs(10)))?; + upstream.set_write_timeout(Some(Duration::from_secs(10)))?; + write!( + upstream, + "POST {} HTTP/1.1\r\nHost: {}\r\nContent-Type: application/x-protobuf\r\nContent-Length: {}\r\nConnection: close\r\n", + target.path, + target.host_header, + body.len() + )?; + for (name, value) in headers { + if matches!( + name.as_str(), + "authorization" | "x-trace-id" | "x-centaur-thread-key" + ) { + write!(upstream, "{name}: {value}\r\n")?; + } + } + upstream.write_all(b"\r\n")?; + upstream.write_all(body)?; + upstream.flush()?; + + let mut response = Vec::new(); + upstream.read_to_end(&mut response)?; + let status = http_status_code(&response).unwrap_or(0); + if (200..300).contains(&status) { + Ok(()) + } else { + Err(HarnessServerError::Protocol(format!( + "OTLP trace export failed with HTTP status {status}" + ))) + } +} + +fn http_status_code(response: &[u8]) -> Option { + let line = String::from_utf8_lossy(response) + .lines() + .next()? + .to_string(); + let mut parts = line.split_whitespace(); + let _version = parts.next()?; + parts.next()?.parse().ok() +} + fn run_otlp_proxy(listener: TcpListener, target: OtlpTarget) { for stream in listener.incoming() { let Ok(stream) = stream else { @@ -470,6 +568,188 @@ fn write_http_response( stream.write_all(body) } +fn harness_usage_trace_request( + trace: &TraceContext, + span_context: HarnessUsageSpan<'_>, + usage: &NormalizedTokenUsage, +) -> Result { + let traceparent = trace.effective_traceparent().ok_or_else(|| { + HarnessServerError::Protocol("missing trace id for harness usage span".to_string()) + })?; + let (trace_id, parent_span_id) = trace_ids_from_traceparent(&traceparent).ok_or_else(|| { + HarnessServerError::Protocol("invalid traceparent for harness usage span".to_string()) + })?; + let mut attributes = Vec::new(); + let harness_name = harness_name(span_context.harness); + let system = gen_ai_system(span_context.harness, span_context.model_provider); + let model = usage + .model + .as_deref() + .filter(|value| !value.trim().is_empty()) + .unwrap_or(span_context.model); + let total_tokens = usage.total_tokens.or_else(|| { + [ + usage.input_tokens, + usage.output_tokens, + usage.cache_creation_input_tokens, + usage.cache_read_input_tokens, + usage.reasoning_output_tokens, + ] + .into_iter() + .flatten() + .try_fold(0_i64, |acc, value| acc.checked_add(value)) + }); + + set_attribute_string(&mut attributes, "gen_ai.operation.name", "chat"); + set_attribute_string(&mut attributes, "gen_ai.system", system); + set_attribute_string(&mut attributes, "gen_ai.request.model", model); + set_attribute_string(&mut attributes, "gen_ai.response.model", model); + set_attribute_int( + &mut attributes, + "gen_ai.usage.input_tokens", + usage.input_tokens, + ); + set_attribute_int( + &mut attributes, + "gen_ai.usage.output_tokens", + usage.output_tokens, + ); + set_attribute_int( + &mut attributes, + "gen_ai.usage.cache_creation_input_tokens", + usage.cache_creation_input_tokens, + ); + set_attribute_int( + &mut attributes, + "gen_ai.usage.cache_read_input_tokens", + usage.cache_read_input_tokens, + ); + set_attribute_int( + &mut attributes, + "gen_ai.usage.reasoning_tokens", + usage.reasoning_output_tokens, + ); + set_attribute_int(&mut attributes, "gen_ai.usage.total_tokens", total_tokens); + set_attribute_string(&mut attributes, "centaur.harness", harness_name); + set_attribute_string( + &mut attributes, + "centaur.model_provider", + span_context.model_provider, + ); + set_attribute_string(&mut attributes, "centaur.turn_id", span_context.turn_id); + if let Some(thread_key) = trace.thread_key.as_deref() { + set_attribute_string(&mut attributes, "centaur.thread_key", thread_key); + } + apply_laminar_trace_metadata_to_attributes(&mut attributes, &trace.metadata); + + let start = span_context.start_unix_nano.min(span_context.end_unix_nano); + let end = span_context.end_unix_nano.max(start); + Ok(ExportTraceServiceRequest { + resource_spans: vec![ResourceSpans { + resource: Some(Resource { + attributes: vec![ + kv_string("service.name", "harness-server"), + kv_string("deployment.environment", &otel_environment()), + ], + ..Default::default() + }), + scope_spans: vec![ScopeSpans { + scope: Some(InstrumentationScope { + name: "centaur.harness-server".to_string(), + version: env!("CARGO_PKG_VERSION").to_string(), + ..Default::default() + }), + spans: vec![Span { + trace_id, + span_id: random_span_id(), + parent_span_id, + name: format!("{harness_name}.session_task.turn"), + kind: span::SpanKind::Internal as i32, + start_time_unix_nano: start, + end_time_unix_nano: end, + attributes, + flags: 1, + ..Default::default() + }], + ..Default::default() + }], + ..Default::default() + }], + }) +} + +fn apply_laminar_trace_metadata_to_attributes( + attributes: &mut Vec, + metadata: &BTreeMap, +) { + for (key, value) in metadata { + let key = key.trim(); + if !key.is_empty() { + set_attribute_json( + attributes, + &format!("{LAMINAR_METADATA_PREFIX}{key}"), + value, + ); + } + } +} + +fn trace_ids_from_traceparent(traceparent: &str) -> Option<(Vec, Vec)> { + let parts = validate_traceparent(traceparent)? + .split('-') + .collect::>(); + Some((hex_bytes(parts[1])?, hex_bytes(parts[2])?)) +} + +fn hex_bytes(value: &str) -> Option> { + if value.len() % 2 != 0 { + return None; + } + let bytes = value.as_bytes(); + let mut out = Vec::with_capacity(value.len() / 2); + for pair in bytes.chunks_exact(2) { + let hi = hex_value(pair[0])?; + let lo = hex_value(pair[1])?; + out.push((hi << 4) | lo); + } + Some(out) +} + +fn random_span_id() -> Vec { + Uuid::new_v4().as_bytes()[..8].to_vec() +} + +fn harness_name(kind: HarnessKind) -> &'static str { + match kind { + HarnessKind::Codex => "codex", + HarnessKind::ClaudeCode => "claude", + HarnessKind::Amp => "amp", + } +} + +fn gen_ai_system(kind: HarnessKind, model_provider: &str) -> &'static str { + let provider = model_provider.trim().to_ascii_lowercase(); + if provider.contains("anthropic") || matches!(kind, HarnessKind::ClaudeCode) { + "anthropic" + } else if provider.contains("openai") || matches!(kind, HarnessKind::Codex) { + "openai" + } else if provider.contains("amp") || matches!(kind, HarnessKind::Amp) { + "amp" + } else { + "unknown" + } +} + +fn kv_string(key: &str, value: &str) -> KeyValue { + KeyValue { + key: key.to_string(), + value: Some(AnyValue { + value: Some(any_value::Value::StringValue(value.to_string())), + }), + ..Default::default() + } +} + pub(crate) fn rewrite_otlp_trace_payload(payload: &[u8]) -> std::result::Result, String> { let mut request = ExportTraceServiceRequest::decode(payload) .map_err(|error| format!("invalid OTLP trace payload: {error}"))?; @@ -761,6 +1041,79 @@ trust_level = "trusted" ); } + #[test] + fn harness_usage_trace_request_builds_laminar_priced_span() { + let trace = TraceContext { + thread_key: Some("slack:C123:123.456".to_string()), + trace_id: None, + traceparent: Some( + "00-0123456789abcdef0123456789abcdef-1111111111111111-01".to_string(), + ), + metadata: BTreeMap::from([( + "execution_id".to_string(), + Value::String("exe_123".to_string()), + )]), + }; + let usage = NormalizedTokenUsage { + model: Some("claude-fable-5".to_string()), + input_tokens: Some(2), + output_tokens: Some(7), + cache_creation_input_tokens: Some(3), + cache_read_input_tokens: Some(5), + reasoning_output_tokens: None, + total_tokens: None, + }; + let request = harness_usage_trace_request( + &trace, + HarnessUsageSpan { + harness: HarnessKind::ClaudeCode, + model: "fallback-model", + model_provider: "anthropic", + turn_id: "turn-1", + start_unix_nano: 100, + end_unix_nano: 200, + }, + &usage, + ) + .expect("usage trace request"); + let span = &request.resource_spans[0].scope_spans[0].spans[0]; + + assert_eq!(span.name, "claude.session_task.turn"); + assert_eq!(span.trace_id.len(), 16); + assert_eq!(span.parent_span_id.len(), 8); + assert_eq!( + attribute_string(&span.attributes, "gen_ai.system"), + "anthropic" + ); + assert_eq!( + attribute_string(&span.attributes, "gen_ai.response.model"), + "claude-fable-5" + ); + assert_eq!( + attribute_int(&span.attributes, "gen_ai.usage.input_tokens"), + Some(2) + ); + assert_eq!( + attribute_int(&span.attributes, "gen_ai.usage.cache_creation_input_tokens"), + Some(3) + ); + assert_eq!( + attribute_int(&span.attributes, "gen_ai.usage.cache_read_input_tokens"), + Some(5) + ); + assert_eq!( + attribute_int(&span.attributes, "gen_ai.usage.total_tokens"), + Some(17) + ); + assert_eq!( + attribute_string( + &span.attributes, + "lmnr.association.properties.metadata.execution_id" + ), + "exe_123" + ); + } + fn kv_string(key: &str, value: &str) -> KeyValue { KeyValue { key: key.to_string(), diff --git a/crates/harness-server/src/server.rs b/crates/harness-server/src/server.rs index 3c0f2419e..299385e8d 100644 --- a/crates/harness-server/src/server.rs +++ b/crates/harness-server/src/server.rs @@ -23,10 +23,10 @@ use uuid::Uuid; use crate::amp::AmpHarness; use crate::claude::ClaudeCodeHarness; use crate::codex::CodexHarnessServer; -use crate::otel::TraceContext; +use crate::otel::{self, HarnessUsageSpan, TraceContext}; use crate::traits::{ AppServerNormalizer, AppServerRuntime, HarnessChild, HarnessKind, HarnessServer, - NormalizedEvent, ThreadState, + NormalizedEvent, NormalizedTokenUsage, ThreadState, }; use crate::turn::{BridgeConfig, CodexTurnNormalizer}; use crate::util::{absolute_path, default_codex_home, write_value}; @@ -98,7 +98,7 @@ pub(crate) fn run_blocks_app_server(harness: &H) -> Result<()> // knob (its provider is fixed at thread start from session params). provider: _, reasoning: _, - trace_context: _, + trace_context, }) => { if let Some(model) = model { state.model = model; @@ -108,6 +108,7 @@ pub(crate) fn run_blocks_app_server(harness: &H) -> Result<()> &mut state, input, client_user_message_id, + &trace_context, &mut stdout, &request_rx, ) { @@ -181,6 +182,7 @@ fn run_blocks_turn( state: &mut ThreadState, input: Vec, client_user_message_id: Option, + trace_context: &TraceContext, stdout: &mut W, request_rx: &Receiver, ) -> Result<()> { @@ -191,6 +193,7 @@ fn run_blocks_turn( state, &input, client_user_message_id, + Some(trace_context), &mut normalizer, stdout, request_rx, @@ -789,6 +792,7 @@ fn handle_request( state, ¶ms.input, params.client_user_message_id.clone(), + None, &mut normalizer, stdout, request_rx, @@ -964,6 +968,7 @@ fn run_normalized_turn( state: &mut ThreadState, input: &[UserInput], client_user_message_id: Option, + trace_context: Option<&TraceContext>, normalizer: &mut CodexTurnNormalizer, stdout: &mut W, request_rx: &Receiver, @@ -978,7 +983,15 @@ fn run_normalized_turn( write_value(stdout, ¬ification_to_wire_value(¬ification)?)?; } - match run_harness_turn(harness, state, input, normalizer, stdout, request_rx) { + match run_harness_turn( + harness, + state, + input, + trace_context, + normalizer, + stdout, + request_rx, + ) { Ok(Some(turn)) => state.completed_turns.push(turn), Ok(None) => {} Err(error) => finish_turn_with_error(state, normalizer, stdout, error)?, @@ -1012,10 +1025,15 @@ fn run_harness_turn( harness: &H, state: &mut ThreadState, input: &[UserInput], + trace_context: Option<&TraceContext>, normalizer: &mut CodexTurnNormalizer, stdout: &mut W, request_rx: &Receiver, ) -> Result> { + let usage_span_start = otel::unix_time_nanos(); + let usage_span_model = state.model.clone(); + let usage_span_model_provider = state.model_provider.clone(); + let usage_span_turn_id = normalizer.turn_id().to_string(); ensure_harness_process(harness, state)?; let process = state .process @@ -1027,6 +1045,7 @@ fn run_harness_turn( let mut last_session_id = state.harness_session_id.clone(); let mut event_normalizer = H::EventNormalizer::default(); let mut completed_turn = None; + let mut latest_usage = None; loop { while let Ok(request) = request_rx.try_recv() { handle_active_turn_request(harness, process, normalizer, request, stdout)?; @@ -1052,6 +1071,9 @@ fn run_harness_turn( let normalized_events = harness.normalize_events(&mut event_normalizer, event)?; let mut terminal = false; for normalized in normalized_events { + if let Some(usage) = normalized.token_usage() { + latest_usage = Some(usage.clone()); + } if let Some(session_id) = normalized.session_id() { last_session_id = Some(session_id.to_string()); state.harness_session_id = Some(session_id.to_string()); @@ -1064,6 +1086,15 @@ fn run_harness_turn( && normalized.is_assistant_end_turn()); } if terminal { + export_harness_usage_if_available( + trace_context, + harness.kind(), + &usage_span_model, + &usage_span_model_provider, + &usage_span_turn_id, + usage_span_start, + latest_usage.as_ref(), + ); if let Some(notification) = normalizer.finish_turn(None)? { if let ServerNotification::TurnCompleted(completed) = ¬ification { completed_turn = Some(completed.turn.clone()); @@ -1086,6 +1117,31 @@ fn run_harness_turn( Ok(completed_turn) } +fn export_harness_usage_if_available( + trace_context: Option<&TraceContext>, + harness: HarnessKind, + model: &str, + model_provider: &str, + turn_id: &str, + start_unix_nano: u64, + usage: Option<&NormalizedTokenUsage>, +) { + let (Some(trace_context), Some(usage)) = (trace_context, usage) else { + return; + }; + let span = HarnessUsageSpan { + harness, + model, + model_provider, + turn_id, + start_unix_nano, + end_unix_nano: otel::unix_time_nanos(), + }; + if let Err(error) = otel::export_harness_usage_span(trace_context, span, usage) { + eprintln!("harness usage OTLP export failed: {error:#}"); + } +} + fn ensure_harness_process(harness: &H, state: &mut ThreadState) -> Result<()> { if state.process.is_some() { return Ok(()); diff --git a/crates/harness-server/src/traits.rs b/crates/harness-server/src/traits.rs index a39868b94..f7c736de9 100644 --- a/crates/harness-server/src/traits.rs +++ b/crates/harness-server/src/traits.rs @@ -113,6 +113,9 @@ pub enum NormalizedEvent { delta: String, }, ToolResults(Vec), + TokenUsage { + usage: NormalizedTokenUsage, + }, Result { error: Option, }, @@ -136,6 +139,13 @@ impl NormalizedEvent { matches!(self, Self::Result { .. } | Self::Error { .. }) } + pub(crate) fn token_usage(&self) -> Option<&NormalizedTokenUsage> { + match self { + Self::TokenUsage { usage } => Some(usage), + _ => None, + } + } + pub(crate) fn is_assistant_end_turn(&self) -> bool { matches!( self, @@ -173,6 +183,28 @@ pub struct NormalizedToolResult { pub exit_code: Option, } +#[derive(Debug, Clone, Default, PartialEq, Eq)] +pub struct NormalizedTokenUsage { + pub model: Option, + pub input_tokens: Option, + pub output_tokens: Option, + pub cache_creation_input_tokens: Option, + pub cache_read_input_tokens: Option, + pub reasoning_output_tokens: Option, + pub total_tokens: Option, +} + +impl NormalizedTokenUsage { + pub(crate) fn has_counts(&self) -> bool { + self.input_tokens.is_some() + || self.output_tokens.is_some() + || self.cache_creation_input_tokens.is_some() + || self.cache_read_input_tokens.is_some() + || self.reasoning_output_tokens.is_some() + || self.total_tokens.is_some() + } +} + #[derive(Debug)] pub struct AppServerNormalizer { harness: D, diff --git a/crates/harness-server/src/turn.rs b/crates/harness-server/src/turn.rs index 7413970cb..d5463934b 100644 --- a/crates/harness-server/src/turn.rs +++ b/crates/harness-server/src/turn.rs @@ -195,6 +195,7 @@ impl CodexTurnNormalizer { self.emit_tool_result(result, &mut out)?; } } + NormalizedEvent::TokenUsage { .. } => {} NormalizedEvent::Result { error } => { if let Some(error) = error { self.last_error = Some(error.clone()); From e04ce2850845aa236f595f1cf6204e3a073d7a83 Mon Sep 17 00:00:00 2001 From: Zygimantas <5236121+Zygimantass@users.noreply.github.com> Date: Tue, 23 Jun 2026 13:39:05 +0300 Subject: [PATCH 02/12] Estimate Claude and Amp trace costs --- crates/harness-server/src/otel.rs | 332 ++++++++++++++++++++++++++++++ 1 file changed, 332 insertions(+) diff --git a/crates/harness-server/src/otel.rs b/crates/harness-server/src/otel.rs index b2cba5e4f..d2a5a7e42 100644 --- a/crates/harness-server/src/otel.rs +++ b/crates/harness-server/src/otel.rs @@ -630,6 +630,37 @@ fn harness_usage_trace_request( usage.reasoning_output_tokens, ); set_attribute_int(&mut attributes, "gen_ai.usage.total_tokens", total_tokens); + if let Some(cost) = estimate_usage_cost(span_context.harness, system, model, usage) { + set_attribute_double( + &mut attributes, + "gen_ai.usage.input_cost", + cost.input_cost, + ); + set_attribute_double( + &mut attributes, + "gen_ai.usage.output_cost", + cost.output_cost, + ); + set_attribute_double(&mut attributes, "gen_ai.usage.cost", cost.total_cost()); + set_attribute_string(&mut attributes, "gen_ai.usage.cost_currency", "USD"); + set_attribute_double( + &mut attributes, + "centaur.usage.input_cost_usd", + cost.input_cost, + ); + set_attribute_double( + &mut attributes, + "centaur.usage.output_cost_usd", + cost.output_cost, + ); + set_attribute_double( + &mut attributes, + "centaur.usage.estimated_cost_usd", + cost.total_cost(), + ); + set_attribute_string(&mut attributes, "centaur.usage.cost_source", cost.source); + set_attribute_bool(&mut attributes, "centaur.usage.cost_estimated", true); + } set_attribute_string(&mut attributes, "centaur.harness", harness_name); set_attribute_string( &mut attributes, @@ -740,6 +771,186 @@ fn gen_ai_system(kind: HarnessKind, model_provider: &str) -> &'static str { } } +#[derive(Clone, Copy, Debug)] +struct TokenPricing { + input_per_mtok: f64, + cache_creation_per_mtok: f64, + cache_read_per_mtok: f64, + output_per_mtok: f64, + source: &'static str, +} + +#[derive(Clone, Copy, Debug)] +struct UsageCost { + input_cost: f64, + output_cost: f64, + source: &'static str, +} + +impl UsageCost { + fn total_cost(self) -> f64 { + self.input_cost + self.output_cost + } +} + +fn estimate_usage_cost( + harness: HarnessKind, + system: &str, + model: &str, + usage: &NormalizedTokenUsage, +) -> Option { + let pricing = pricing_for_usage(harness, system, model)?; + let input_tokens = positive_tokens(usage.input_tokens); + let cache_creation_tokens = positive_tokens(usage.cache_creation_input_tokens); + let cache_read_tokens = positive_tokens(usage.cache_read_input_tokens); + let output_tokens = positive_tokens(usage.output_tokens); + let cache_tokens = cache_creation_tokens + cache_read_tokens; + let non_cached_input_tokens = if input_tokens >= cache_tokens { + input_tokens - cache_tokens + } else { + input_tokens + }; + + let input_cost = mtok_cost(non_cached_input_tokens, pricing.input_per_mtok) + + mtok_cost( + cache_creation_tokens, + pricing.cache_creation_per_mtok, + ) + + mtok_cost(cache_read_tokens, pricing.cache_read_per_mtok); + let output_cost = mtok_cost(output_tokens, pricing.output_per_mtok); + + Some(UsageCost { + input_cost, + output_cost, + source: pricing.source, + }) +} + +fn pricing_for_usage(harness: HarnessKind, system: &str, model: &str) -> Option { + let normalized = normalize_model_name(model); + match system { + "anthropic" => anthropic_pricing(&normalized), + "openai" => openai_pricing(&normalized), + "amp" => amp_pricing(harness, &normalized), + _ => None, + } +} + +fn normalize_model_name(model: &str) -> String { + model + .trim() + .to_ascii_lowercase() + .replace(['_', '.'], "-") +} + +fn anthropic_pricing(model: &str) -> Option { + if model.contains("fable-5") || model.contains("mythos-5") { + return Some(TokenPricing { + input_per_mtok: 10.0, + cache_creation_per_mtok: 12.5, + cache_read_per_mtok: 1.0, + output_per_mtok: 50.0, + source: "centaur_estimate:anthropic:fable-mythos-5:5m-cache-write", + }); + } + if model.contains("opus-4-8") + || model.contains("opus-4-7") + || model.contains("opus-4-6") + || model.contains("opus-4-5") + { + return Some(TokenPricing { + input_per_mtok: 5.0, + cache_creation_per_mtok: 6.25, + cache_read_per_mtok: 0.5, + output_per_mtok: 25.0, + source: "centaur_estimate:anthropic:opus-4.5-plus:5m-cache-write", + }); + } + if model.contains("opus-4-1") || model.contains("opus-4") { + return Some(TokenPricing { + input_per_mtok: 15.0, + cache_creation_per_mtok: 18.75, + cache_read_per_mtok: 1.5, + output_per_mtok: 75.0, + source: "centaur_estimate:anthropic:opus-4-deprecated:5m-cache-write", + }); + } + if model.contains("sonnet-4-6") + || model.contains("sonnet-4-5") + || model.contains("sonnet-4") + { + return Some(TokenPricing { + input_per_mtok: 3.0, + cache_creation_per_mtok: 3.75, + cache_read_per_mtok: 0.3, + output_per_mtok: 15.0, + source: "centaur_estimate:anthropic:sonnet-4:5m-cache-write", + }); + } + if model.contains("haiku-4-5") { + return Some(TokenPricing { + input_per_mtok: 1.0, + cache_creation_per_mtok: 1.25, + cache_read_per_mtok: 0.1, + output_per_mtok: 5.0, + source: "centaur_estimate:anthropic:haiku-4.5:5m-cache-write", + }); + } + None +} + +fn openai_pricing(model: &str) -> Option { + if model.contains("gpt-5-5") { + return Some(TokenPricing { + input_per_mtok: 5.0, + cache_creation_per_mtok: 5.0, + cache_read_per_mtok: 0.5, + output_per_mtok: 30.0, + source: "centaur_estimate:openai:gpt-5.5", + }); + } + if model.contains("gpt-5-4") { + return Some(TokenPricing { + input_per_mtok: 2.5, + cache_creation_per_mtok: 2.5, + cache_read_per_mtok: 0.25, + output_per_mtok: 15.0, + source: "centaur_estimate:openai:gpt-5.4", + }); + } + None +} + +fn amp_pricing(_harness: HarnessKind, model: &str) -> Option { + if model == "deep" || model.starts_with("deep-") || model == "rush" || model.starts_with("rush-") { + return Some(TokenPricing { + input_per_mtok: 5.0, + cache_creation_per_mtok: 5.0, + cache_read_per_mtok: 0.5, + output_per_mtok: 30.0, + source: "centaur_estimate:amp:gpt-5.5", + }); + } + if model == "smart" || model.starts_with("smart-") { + return Some(TokenPricing { + input_per_mtok: 5.0, + cache_creation_per_mtok: 6.25, + cache_read_per_mtok: 0.5, + output_per_mtok: 25.0, + source: "centaur_estimate:amp:claude-opus-4.8:5m-cache-write", + }); + } + openai_pricing(model).or_else(|| anthropic_pricing(model)) +} + +fn positive_tokens(value: Option) -> f64 { + value.unwrap_or_default().max(0) as f64 +} + +fn mtok_cost(tokens: f64, price_per_mtok: f64) -> f64 { + tokens * price_per_mtok / 1_000_000.0 +} + fn kv_string(key: &str, value: &str) -> KeyValue { KeyValue { key: key.to_string(), @@ -884,6 +1095,29 @@ fn set_attribute_int(attributes: &mut Vec, key: &str, value: Option, key: &str, value: f64) { + if !value.is_finite() { + return; + } + set_attribute_value( + attributes, + key, + AnyValue { + value: Some(any_value::Value::DoubleValue(value)), + }, + ); +} + +fn set_attribute_bool(attributes: &mut Vec, key: &str, value: bool) { + set_attribute_value( + attributes, + key, + AnyValue { + value: Some(any_value::Value::BoolValue(value)), + }, + ); +} + fn set_attribute_json(attributes: &mut Vec, key: &str, value: &Value) { let any_value = match value { Value::Bool(value) => AnyValue { @@ -1105,6 +1339,30 @@ trust_level = "trusted" attribute_int(&span.attributes, "gen_ai.usage.total_tokens"), Some(17) ); + assert_eq!( + attribute_double(&span.attributes, "gen_ai.usage.input_cost"), + Some(0.0000625) + ); + assert_eq!( + attribute_double(&span.attributes, "gen_ai.usage.output_cost"), + Some(0.00035) + ); + assert_eq!( + attribute_double(&span.attributes, "gen_ai.usage.cost"), + Some(0.0004125) + ); + assert_eq!( + attribute_double(&span.attributes, "centaur.usage.estimated_cost_usd"), + Some(0.0004125) + ); + assert_eq!( + attribute_string(&span.attributes, "gen_ai.usage.cost_currency"), + "USD" + ); + assert_eq!( + attribute_string(&span.attributes, "centaur.usage.cost_source"), + "centaur_estimate:anthropic:fable-mythos-5:5m-cache-write" + ); assert_eq!( attribute_string( &span.attributes, @@ -1114,6 +1372,67 @@ trust_level = "trusted" ); } + #[test] + fn harness_usage_trace_request_estimates_amp_deep_cost() { + let trace = TraceContext { + thread_key: Some("slack:C123:123.456".to_string()), + trace_id: None, + traceparent: Some( + "00-0123456789abcdef0123456789abcdef-1111111111111111-01".to_string(), + ), + metadata: BTreeMap::new(), + }; + let usage = NormalizedTokenUsage { + model: Some("deep".to_string()), + input_tokens: Some(150_681), + output_tokens: Some(1_456), + cache_creation_input_tokens: Some(27_289), + cache_read_input_tokens: Some(123_392), + reasoning_output_tokens: None, + total_tokens: Some(152_137), + }; + let request = harness_usage_trace_request( + &trace, + HarnessUsageSpan { + harness: HarnessKind::Amp, + model: "deep", + model_provider: "amp", + turn_id: "turn-1", + start_unix_nano: 100, + end_unix_nano: 200, + }, + &usage, + ) + .expect("usage trace request"); + let span = &request.resource_spans[0].scope_spans[0].spans[0]; + + assert_eq!(span.name, "amp.session_task.turn"); + assert_eq!( + attribute_string(&span.attributes, "gen_ai.system"), + "amp" + ); + assert_eq!( + attribute_double(&span.attributes, "gen_ai.usage.input_cost"), + Some(0.198141) + ); + assert_eq!( + attribute_double(&span.attributes, "gen_ai.usage.output_cost"), + Some(0.04368) + ); + assert_eq!( + attribute_double(&span.attributes, "gen_ai.usage.cost"), + Some(0.241821) + ); + assert_eq!( + attribute_double(&span.attributes, "centaur.usage.estimated_cost_usd"), + Some(0.241821) + ); + assert_eq!( + attribute_string(&span.attributes, "centaur.usage.cost_source"), + "centaur_estimate:amp:gpt-5.5" + ); + } + fn kv_string(key: &str, value: &str) -> KeyValue { KeyValue { key: key.to_string(), @@ -1133,4 +1452,17 @@ trust_level = "trusted" ..Default::default() } } + + fn attribute_double(attributes: &[KeyValue], key: &str) -> Option { + attributes + .iter() + .find(|attribute| attribute.key == key) + .and_then(|attribute| attribute.value.as_ref()) + .and_then(|value| match value.value.as_ref()? { + any_value::Value::DoubleValue(value) => Some(*value), + any_value::Value::IntValue(value) => Some(*value as f64), + any_value::Value::StringValue(value) => value.parse().ok(), + _ => None, + }) + } } From fdbba80fdc101dd349182fba6b06838458de011c Mon Sep 17 00:00:00 2001 From: Zygimantas <5236121+Zygimantass@users.noreply.github.com> Date: Tue, 23 Jun 2026 14:53:34 +0300 Subject: [PATCH 03/12] Add harness trace input output payloads --- crates/harness-server/src/otel.rs | 118 +++++++++++++++++++----- crates/harness-server/src/server.rs | 133 +++++++++++++++++++++++++++- 2 files changed, 227 insertions(+), 24 deletions(-) diff --git a/crates/harness-server/src/otel.rs b/crates/harness-server/src/otel.rs index d2a5a7e42..bc011e918 100644 --- a/crates/harness-server/src/otel.rs +++ b/crates/harness-server/src/otel.rs @@ -13,7 +13,7 @@ use opentelemetry_proto::tonic::common::v1::{AnyValue, InstrumentationScope, Key use opentelemetry_proto::tonic::resource::v1::Resource; use opentelemetry_proto::tonic::trace::v1::{ResourceSpans, ScopeSpans, Span, span}; use prost::Message as _; -use serde_json::Value; +use serde_json::{Value, json}; use url::Url; use uuid::Uuid; @@ -106,6 +106,8 @@ pub(crate) struct HarnessUsageSpan<'a> { pub(crate) model: &'a str, pub(crate) model_provider: &'a str, pub(crate) turn_id: &'a str, + pub(crate) input: Option<&'a str>, + pub(crate) output: Option<&'a str>, pub(crate) start_unix_nano: u64, pub(crate) end_unix_nano: u64, } @@ -601,9 +603,11 @@ fn harness_usage_trace_request( }); set_attribute_string(&mut attributes, "gen_ai.operation.name", "chat"); + set_attribute_string(&mut attributes, "lmnr.span.type", "LLM"); set_attribute_string(&mut attributes, "gen_ai.system", system); set_attribute_string(&mut attributes, "gen_ai.request.model", model); set_attribute_string(&mut attributes, "gen_ai.response.model", model); + set_harness_span_io_attributes(&mut attributes, span_context.input, span_context.output); set_attribute_int( &mut attributes, "gen_ai.usage.input_tokens", @@ -631,11 +635,7 @@ fn harness_usage_trace_request( ); set_attribute_int(&mut attributes, "gen_ai.usage.total_tokens", total_tokens); if let Some(cost) = estimate_usage_cost(span_context.harness, system, model, usage) { - set_attribute_double( - &mut attributes, - "gen_ai.usage.input_cost", - cost.input_cost, - ); + set_attribute_double(&mut attributes, "gen_ai.usage.input_cost", cost.input_cost); set_attribute_double( &mut attributes, "gen_ai.usage.output_cost", @@ -709,6 +709,54 @@ fn harness_usage_trace_request( }) } +fn set_harness_span_io_attributes( + attributes: &mut Vec, + input: Option<&str>, + output: Option<&str>, +) { + if let Some(input) = clean_optional(input) { + set_attribute_string(attributes, "input.value", &input); + set_attribute_string( + attributes, + "lmnr.span.input", + &legacy_chat_message_json("user", &input), + ); + set_attribute_string( + attributes, + "gen_ai.input.messages", + &gen_ai_message_json("user", &input), + ); + } + if let Some(output) = clean_optional(output) { + set_attribute_string(attributes, "output.value", &output); + set_attribute_string( + attributes, + "lmnr.span.output", + &legacy_chat_message_json("assistant", &output), + ); + set_attribute_string( + attributes, + "gen_ai.output.messages", + &gen_ai_message_json("assistant", &output), + ); + } +} + +fn legacy_chat_message_json(role: &str, content: &str) -> String { + serde_json::to_string(&json!([{ "role": role, "content": content }])) + .unwrap_or_else(|_| "[]".to_string()) +} + +fn gen_ai_message_json(role: &str, content: &str) -> String { + serde_json::to_string(&json!([ + { + "role": role, + "parts": [{ "type": "text", "content": content }] + } + ])) + .unwrap_or_else(|_| "[]".to_string()) +} + fn apply_laminar_trace_metadata_to_attributes( attributes: &mut Vec, metadata: &BTreeMap, @@ -812,10 +860,7 @@ fn estimate_usage_cost( }; let input_cost = mtok_cost(non_cached_input_tokens, pricing.input_per_mtok) - + mtok_cost( - cache_creation_tokens, - pricing.cache_creation_per_mtok, - ) + + mtok_cost(cache_creation_tokens, pricing.cache_creation_per_mtok) + mtok_cost(cache_read_tokens, pricing.cache_read_per_mtok); let output_cost = mtok_cost(output_tokens, pricing.output_per_mtok); @@ -837,10 +882,7 @@ fn pricing_for_usage(harness: HarnessKind, system: &str, model: &str) -> Option< } fn normalize_model_name(model: &str) -> String { - model - .trim() - .to_ascii_lowercase() - .replace(['_', '.'], "-") + model.trim().to_ascii_lowercase().replace(['_', '.'], "-") } fn anthropic_pricing(model: &str) -> Option { @@ -875,10 +917,7 @@ fn anthropic_pricing(model: &str) -> Option { source: "centaur_estimate:anthropic:opus-4-deprecated:5m-cache-write", }); } - if model.contains("sonnet-4-6") - || model.contains("sonnet-4-5") - || model.contains("sonnet-4") - { + if model.contains("sonnet-4-6") || model.contains("sonnet-4-5") || model.contains("sonnet-4") { return Some(TokenPricing { input_per_mtok: 3.0, cache_creation_per_mtok: 3.75, @@ -922,7 +961,11 @@ fn openai_pricing(model: &str) -> Option { } fn amp_pricing(_harness: HarnessKind, model: &str) -> Option { - if model == "deep" || model.starts_with("deep-") || model == "rush" || model.starts_with("rush-") { + if model == "deep" + || model.starts_with("deep-") + || model == "rush" + || model.starts_with("rush-") + { return Some(TokenPricing { input_per_mtok: 5.0, cache_creation_per_mtok: 5.0, @@ -1304,6 +1347,8 @@ trust_level = "trusted" model: "fallback-model", model_provider: "anthropic", turn_id: "turn-1", + input: Some("say hi"), + output: Some("hi there"), start_unix_nano: 100, end_unix_nano: 200, }, @@ -1323,6 +1368,34 @@ trust_level = "trusted" attribute_string(&span.attributes, "gen_ai.response.model"), "claude-fable-5" ); + assert_eq!(attribute_string(&span.attributes, "lmnr.span.type"), "LLM"); + assert_eq!(attribute_string(&span.attributes, "input.value"), "say hi"); + assert_eq!( + serde_json::from_str::(&attribute_string( + &span.attributes, + "gen_ai.input.messages" + )) + .expect("input messages JSON"), + json!([{ + "role": "user", + "parts": [{ "type": "text", "content": "say hi" }] + }]) + ); + assert_eq!( + attribute_string(&span.attributes, "output.value"), + "hi there" + ); + assert_eq!( + serde_json::from_str::(&attribute_string( + &span.attributes, + "gen_ai.output.messages" + )) + .expect("output messages JSON"), + json!([{ + "role": "assistant", + "parts": [{ "type": "text", "content": "hi there" }] + }]) + ); assert_eq!( attribute_int(&span.attributes, "gen_ai.usage.input_tokens"), Some(2) @@ -1398,6 +1471,8 @@ trust_level = "trusted" model: "deep", model_provider: "amp", turn_id: "turn-1", + input: None, + output: None, start_unix_nano: 100, end_unix_nano: 200, }, @@ -1407,10 +1482,7 @@ trust_level = "trusted" let span = &request.resource_spans[0].scope_spans[0].spans[0]; assert_eq!(span.name, "amp.session_task.turn"); - assert_eq!( - attribute_string(&span.attributes, "gen_ai.system"), - "amp" - ); + assert_eq!(attribute_string(&span.attributes, "gen_ai.system"), "amp"); assert_eq!( attribute_double(&span.attributes, "gen_ai.usage.input_cost"), Some(0.198141) diff --git a/crates/harness-server/src/server.rs b/crates/harness-server/src/server.rs index 299385e8d..d615625fd 100644 --- a/crates/harness-server/src/server.rs +++ b/crates/harness-server/src/server.rs @@ -26,7 +26,7 @@ use crate::codex::CodexHarnessServer; use crate::otel::{self, HarnessUsageSpan, TraceContext}; use crate::traits::{ AppServerNormalizer, AppServerRuntime, HarnessChild, HarnessKind, HarnessServer, - NormalizedEvent, NormalizedTokenUsage, ThreadState, + NormalizedContent, NormalizedEvent, NormalizedTokenUsage, ThreadState, }; use crate::turn::{BridgeConfig, CodexTurnNormalizer}; use crate::util::{absolute_path, default_codex_home, write_value}; @@ -1034,6 +1034,8 @@ fn run_harness_turn( let usage_span_model = state.model.clone(); let usage_span_model_provider = state.model_provider.clone(); let usage_span_turn_id = normalizer.turn_id().to_string(); + let usage_span_input = usage_span_input_value(input); + let mut usage_span_output = UsageSpanOutput::default(); ensure_harness_process(harness, state)?; let process = state .process @@ -1074,6 +1076,7 @@ fn run_harness_turn( if let Some(usage) = normalized.token_usage() { latest_usage = Some(usage.clone()); } + append_usage_span_output(&normalized, &mut usage_span_output); if let Some(session_id) = normalized.session_id() { last_session_id = Some(session_id.to_string()); state.harness_session_id = Some(session_id.to_string()); @@ -1092,6 +1095,8 @@ fn run_harness_turn( &usage_span_model, &usage_span_model_provider, &usage_span_turn_id, + usage_span_input.as_deref(), + usage_span_output.value().as_deref(), usage_span_start, latest_usage.as_ref(), ); @@ -1123,6 +1128,8 @@ fn export_harness_usage_if_available( model: &str, model_provider: &str, turn_id: &str, + input: Option<&str>, + output: Option<&str>, start_unix_nano: u64, usage: Option<&NormalizedTokenUsage>, ) { @@ -1134,6 +1141,8 @@ fn export_harness_usage_if_available( model, model_provider, turn_id, + input, + output, start_unix_nano, end_unix_nano: otel::unix_time_nanos(), }; @@ -1142,6 +1151,96 @@ fn export_harness_usage_if_available( } } +fn usage_span_input_value(input: &[UserInput]) -> Option { + let mut parts = Vec::new(); + for item in input { + match item { + UserInput::Text { text, .. } => parts.push(text.clone()), + UserInput::Image { url, .. } => parts.push(format!("[image: {url}]")), + UserInput::LocalImage { path, .. } => { + parts.push(format!("[local image: {}]", path.display())); + } + UserInput::Skill { name, path } => { + parts.push(format!("[skill: {name} at {}]", path.display())); + } + UserInput::Mention { name, path } => { + parts.push(format!("[mention: {name} at {path}]")); + } + } + } + let joined = parts.join("\n"); + non_empty(Some(&joined)).map(str::to_owned) +} + +#[derive(Debug, Default)] +struct UsageSpanOutput { + item_order: Vec, + text_by_item_id: HashMap, + fallback: Option, +} + +impl UsageSpanOutput { + fn append_delta(&mut self, item_id: &str, delta: &str) { + if delta.is_empty() { + return; + } + self.remember_item(item_id); + self.text_by_item_id + .entry(item_id.to_string()) + .or_default() + .push_str(delta); + } + + fn set_item_text(&mut self, item_id: &str, text: &str) { + if text.is_empty() { + return; + } + self.remember_item(item_id); + self.text_by_item_id + .insert(item_id.to_string(), text.to_string()); + } + + fn set_fallback_if_empty(&mut self, value: &str) { + if self.value().is_none() { + self.fallback = clean_string(Some(value)); + } + } + + fn value(&self) -> Option { + let mut text = String::new(); + for item_id in &self.item_order { + if let Some(item_text) = self.text_by_item_id.get(item_id) { + text.push_str(item_text); + } + } + clean_string(Some(&text)).or_else(|| self.fallback.clone()) + } + + fn remember_item(&mut self, item_id: &str) { + if self.text_by_item_id.contains_key(item_id) { + return; + } + self.item_order.push(item_id.to_string()); + self.text_by_item_id + .insert(item_id.to_string(), String::new()); + } +} + +fn append_usage_span_output(event: &NormalizedEvent, output: &mut UsageSpanOutput) { + match event { + NormalizedEvent::AssistantMessage { content, .. } => { + for item in content { + if let NormalizedContent::AgentText { item_id, text } = item { + output.set_item_text(item_id, text); + } + } + } + NormalizedEvent::AgentTextDelta { item_id, delta } => output.append_delta(item_id, delta), + NormalizedEvent::Error { message } => output.set_fallback_if_empty(message), + _ => {} + } +} + fn ensure_harness_process(harness: &H, state: &mut ThreadState) -> Result<()> { if state.process.is_some() { return Ok(()); @@ -1349,6 +1448,38 @@ mod tests { assert_eq!(reasoning, None); } + #[test] + fn usage_span_output_replaces_delta_reconstruction_with_canonical_text() { + let mut output = UsageSpanOutput::default(); + append_usage_span_output( + &NormalizedEvent::AgentTextDelta { + item_id: "msg-1".to_string(), + delta: "hel".to_string(), + }, + &mut output, + ); + append_usage_span_output( + &NormalizedEvent::AgentTextDelta { + item_id: "msg-1".to_string(), + delta: "lo".to_string(), + }, + &mut output, + ); + append_usage_span_output( + &NormalizedEvent::AssistantMessage { + partial: false, + stop_reason: Some("end_turn".to_string()), + content: vec![NormalizedContent::AgentText { + item_id: "msg-1".to_string(), + text: "hello".to_string(), + }], + }, + &mut output, + ); + + assert_eq!(output.value().as_deref(), Some("hello")); + } + #[test] fn parses_attachment_chunk_without_starting_turn() { let _upload_dir = temp_upload_dir(); From b84ab40c85683e7e6eb8b7ee7526d5c4e27f95de Mon Sep 17 00:00:00 2001 From: Zygimantas <5236121+Zygimantass@users.noreply.github.com> Date: Tue, 23 Jun 2026 15:59:42 +0300 Subject: [PATCH 04/12] Fix Laminar thread trace grouping --- crates/harness-server/src/otel.rs | 38 ++++++++++++++++++++++++++----- 1 file changed, 32 insertions(+), 6 deletions(-) diff --git a/crates/harness-server/src/otel.rs b/crates/harness-server/src/otel.rs index bc011e918..3e6785bcc 100644 --- a/crates/harness-server/src/otel.rs +++ b/crates/harness-server/src/otel.rs @@ -35,10 +35,13 @@ pub(crate) struct TraceContext { impl TraceContext { pub(crate) fn effective_trace_id(&self) -> Option { - self.traceparent - .as_deref() - .and_then(trace_id_from_traceparent) - .or_else(|| self.trace_id.clone()) + self.trace_id + .clone() + .or_else(|| { + self.traceparent + .as_deref() + .and_then(trace_id_from_traceparent) + }) .or_else(|| clean_optional(env::var("CENTAUR_TRACE_ID").ok().as_deref())) } @@ -47,8 +50,13 @@ impl TraceContext { .as_deref() .and_then(|value| validate_traceparent(value).map(str::to_owned)) .or_else(|| { - self.effective_trace_id() - .and_then(|trace_id| traceparent_from_trace_id(&trace_id)) + self.trace_id + .as_deref() + .and_then(traceparent_from_trace_id) + .or_else(|| { + clean_optional(env::var("CENTAUR_TRACE_ID").ok().as_deref()) + .and_then(|trace_id| traceparent_from_trace_id(&trace_id)) + }) }) } } @@ -1272,6 +1280,24 @@ trust_level = "trusted" assert!(!config.contains("environment = \"old\"")); } + #[test] + fn explicit_trace_id_wins_for_otlp_grouping_but_traceparent_stays_current() { + let thread_trace_id = "01234567-89ab-cdef-0123-456789abcdef"; + let execution_traceparent = "00-fedcba9876543210fedcba9876543210-0123456789abcdef-01"; + let trace = TraceContext { + thread_key: Some("slack:T:C:1.0".to_string()), + trace_id: Some(thread_trace_id.to_string()), + traceparent: Some(execution_traceparent.to_string()), + metadata: BTreeMap::new(), + }; + + assert_eq!(trace.effective_trace_id().as_deref(), Some(thread_trace_id)); + assert_eq!( + trace.effective_traceparent().as_deref(), + Some(execution_traceparent) + ); + } + #[test] fn rewrite_otlp_trace_payload_prefixes_and_normalizes_codex_turn_span() { let request = ExportTraceServiceRequest { From f56e380395d455592de203664c883885b17abad9 Mon Sep 17 00:00:00 2001 From: Zygimantas <5236121+Zygimantass@users.noreply.github.com> Date: Tue, 23 Jun 2026 16:16:44 +0300 Subject: [PATCH 05/12] Group Claude and Amp usage spans by thread trace --- crates/harness-server/src/otel.rs | 83 ++++++++++++++++++++++++++++--- 1 file changed, 77 insertions(+), 6 deletions(-) diff --git a/crates/harness-server/src/otel.rs b/crates/harness-server/src/otel.rs index 3e6785bcc..fd713d4a8 100644 --- a/crates/harness-server/src/otel.rs +++ b/crates/harness-server/src/otel.rs @@ -583,12 +583,7 @@ fn harness_usage_trace_request( span_context: HarnessUsageSpan<'_>, usage: &NormalizedTokenUsage, ) -> Result { - let traceparent = trace.effective_traceparent().ok_or_else(|| { - HarnessServerError::Protocol("missing trace id for harness usage span".to_string()) - })?; - let (trace_id, parent_span_id) = trace_ids_from_traceparent(&traceparent).ok_or_else(|| { - HarnessServerError::Protocol("invalid traceparent for harness usage span".to_string()) - })?; + let (trace_id, parent_span_id) = harness_usage_span_trace_ids(trace)?; let mut attributes = Vec::new(); let harness_name = harness_name(span_context.harness); let system = gen_ai_system(span_context.harness, span_context.model_provider); @@ -717,6 +712,28 @@ fn harness_usage_trace_request( }) } +fn harness_usage_span_trace_ids(trace: &TraceContext) -> Result<(Vec, Vec)> { + let trace_id = trace.effective_trace_id().ok_or_else(|| { + HarnessServerError::Protocol("missing trace id for harness usage span".to_string()) + })?; + let trace_id = trace_id_to_bytes(&trace_id).ok_or_else(|| { + HarnessServerError::Protocol("invalid trace id for harness usage span".to_string()) + })?; + let parent_span_id = trace + .traceparent + .as_deref() + .and_then(trace_ids_from_traceparent) + .and_then(|(parent_trace_id, parent_span_id)| { + if parent_trace_id == trace_id { + Some(parent_span_id) + } else { + None + } + }) + .unwrap_or_default(); + Ok((trace_id, parent_span_id)) +} + fn set_harness_span_io_attributes( attributes: &mut Vec, input: Option<&str>, @@ -788,6 +805,10 @@ fn trace_ids_from_traceparent(traceparent: &str) -> Option<(Vec, Vec)> { Some((hex_bytes(parts[1])?, hex_bytes(parts[2])?)) } +fn trace_id_to_bytes(trace_id: &str) -> Option> { + Some(Uuid::parse_str(trace_id).ok()?.as_bytes().to_vec()) +} + fn hex_bytes(value: &str) -> Option> { if value.len() % 2 != 0 { return None; @@ -1471,6 +1492,56 @@ trust_level = "trusted" ); } + #[test] + fn harness_usage_trace_request_uses_explicit_thread_trace_id() { + let thread_trace_id = "01234567-89ab-cdef-0123-456789abcdef"; + let trace = TraceContext { + thread_key: Some("slack:C123:123.456".to_string()), + trace_id: Some(thread_trace_id.to_string()), + traceparent: Some( + "00-fedcba9876543210fedcba9876543210-1111111111111111-01".to_string(), + ), + metadata: BTreeMap::new(), + }; + let usage = NormalizedTokenUsage { + model: Some("claude-opus-4-8".to_string()), + input_tokens: Some(10), + output_tokens: Some(2), + cache_creation_input_tokens: None, + cache_read_input_tokens: None, + reasoning_output_tokens: None, + total_tokens: None, + }; + let request = harness_usage_trace_request( + &trace, + HarnessUsageSpan { + harness: HarnessKind::ClaudeCode, + model: "fallback-model", + model_provider: "anthropic", + turn_id: "turn-1", + input: None, + output: None, + start_unix_nano: 100, + end_unix_nano: 200, + }, + &usage, + ) + .expect("usage trace request"); + let span = &request.resource_spans[0].scope_spans[0].spans[0]; + + assert_eq!( + span.trace_id, + Uuid::parse_str(thread_trace_id) + .expect("thread trace uuid") + .as_bytes() + .to_vec() + ); + assert!( + span.parent_span_id.is_empty(), + "parent span cannot cross trace ids" + ); + } + #[test] fn harness_usage_trace_request_estimates_amp_deep_cost() { let trace = TraceContext { From 260b0e198d1495bc20b5cc593556dbe69e278ec9 Mon Sep 17 00:00:00 2001 From: Zygimantas <5236121+Zygimantass@users.noreply.github.com> Date: Tue, 23 Jun 2026 20:10:42 +0300 Subject: [PATCH 06/12] Preserve API trace parentage for harness usage spans --- crates/harness-server/src/otel.rs | 73 +++++++++++++++++++++++++------ 1 file changed, 59 insertions(+), 14 deletions(-) diff --git a/crates/harness-server/src/otel.rs b/crates/harness-server/src/otel.rs index fd713d4a8..8f3591e75 100644 --- a/crates/harness-server/src/otel.rs +++ b/crates/harness-server/src/otel.rs @@ -35,13 +35,10 @@ pub(crate) struct TraceContext { impl TraceContext { pub(crate) fn effective_trace_id(&self) -> Option { - self.trace_id - .clone() - .or_else(|| { - self.traceparent - .as_deref() - .and_then(trace_id_from_traceparent) - }) + self.traceparent + .as_deref() + .and_then(trace_id_from_traceparent) + .or_else(|| self.trace_id.clone()) .or_else(|| clean_optional(env::var("CENTAUR_TRACE_ID").ok().as_deref())) } @@ -1302,7 +1299,7 @@ trust_level = "trusted" } #[test] - fn explicit_trace_id_wins_for_otlp_grouping_but_traceparent_stays_current() { + fn traceparent_wins_for_otlp_parentage_when_available() { let thread_trace_id = "01234567-89ab-cdef-0123-456789abcdef"; let execution_traceparent = "00-fedcba9876543210fedcba9876543210-0123456789abcdef-01"; let trace = TraceContext { @@ -1312,7 +1309,10 @@ trust_level = "trusted" metadata: BTreeMap::new(), }; - assert_eq!(trace.effective_trace_id().as_deref(), Some(thread_trace_id)); + assert_eq!( + trace.effective_trace_id().as_deref(), + Some("fedcba98-7654-3210-fedc-ba9876543210") + ); assert_eq!( trace.effective_traceparent().as_deref(), Some(execution_traceparent) @@ -1493,7 +1493,7 @@ trust_level = "trusted" } #[test] - fn harness_usage_trace_request_uses_explicit_thread_trace_id() { + fn harness_usage_trace_request_uses_traceparent_for_api_parentage() { let thread_trace_id = "01234567-89ab-cdef-0123-456789abcdef"; let trace = TraceContext { thread_key: Some("slack:C123:123.456".to_string()), @@ -1529,6 +1529,54 @@ trust_level = "trusted" .expect("usage trace request"); let span = &request.resource_spans[0].scope_spans[0].spans[0]; + assert_eq!( + span.trace_id, + Uuid::parse_str("fedcba98-7654-3210-fedc-ba9876543210") + .expect("execution trace uuid") + .as_bytes() + .to_vec() + ); + assert_eq!( + span.parent_span_id, + vec![0x11, 0x11, 0x11, 0x11, 0x11, 0x11, 0x11, 0x11] + ); + } + + #[test] + fn harness_usage_trace_request_falls_back_to_explicit_thread_trace_id() { + let thread_trace_id = "01234567-89ab-cdef-0123-456789abcdef"; + let trace = TraceContext { + thread_key: Some("slack:C123:123.456".to_string()), + trace_id: Some(thread_trace_id.to_string()), + traceparent: None, + metadata: BTreeMap::new(), + }; + let usage = NormalizedTokenUsage { + model: Some("claude-opus-4-8".to_string()), + input_tokens: Some(10), + output_tokens: Some(2), + cache_creation_input_tokens: None, + cache_read_input_tokens: None, + reasoning_output_tokens: None, + total_tokens: None, + }; + let request = harness_usage_trace_request( + &trace, + HarnessUsageSpan { + harness: HarnessKind::ClaudeCode, + model: "fallback-model", + model_provider: "anthropic", + turn_id: "turn-1", + input: None, + output: None, + start_unix_nano: 100, + end_unix_nano: 200, + }, + &usage, + ) + .expect("usage trace request"); + let span = &request.resource_spans[0].scope_spans[0].spans[0]; + assert_eq!( span.trace_id, Uuid::parse_str(thread_trace_id) @@ -1536,10 +1584,7 @@ trust_level = "trusted" .as_bytes() .to_vec() ); - assert!( - span.parent_span_id.is_empty(), - "parent span cannot cross trace ids" - ); + assert!(span.parent_span_id.is_empty()); } #[test] From 6a7f8d8982bd2deac591257ade522135e2691016 Mon Sep 17 00:00:00 2001 From: Zygimantas <5236121+Zygimantass@users.noreply.github.com> Date: Tue, 23 Jun 2026 20:56:30 +0300 Subject: [PATCH 07/12] Use thread trace identity for session API spans --- .../crates/centaur-api-server/src/routes.rs | 46 +++++++++++++++++-- .../crates/centaur-session-runtime/src/lib.rs | 33 ++++++++++++- .../crates/centaur-telemetry/src/lib.rs | 46 ++++++++++++++++++- 3 files changed, 120 insertions(+), 5 deletions(-) diff --git a/services/api-rs/crates/centaur-api-server/src/routes.rs b/services/api-rs/crates/centaur-api-server/src/routes.rs index 90e8673b8..94a98822b 100644 --- a/services/api-rs/crates/centaur-api-server/src/routes.rs +++ b/services/api-rs/crates/centaur-api-server/src/routes.rs @@ -30,11 +30,12 @@ use base64::{Engine as _, engine::general_purpose}; use centaur_session_core::ThreadKey; use centaur_session_runtime::{ ExecuteSessionInput, HarnessConflictPolicy, PersonaSummary, SandboxRuntime, SessionRuntime, + thread_trace_id, thread_trace_parent_span_id, }; use centaur_session_sqlx::PgSessionStore; use centaur_telemetry::{ PrometheusHandle, http_status_class, prometheus_handle, record_http_request_finished, - record_http_request_started, + record_http_request_started, set_span_parent_trace, }; use centaur_workflows::{ CreateWorkflowRunRequest, WorkflowRuntime, WorkflowWebhookAuth, WorkflowWebhookSpec, @@ -234,14 +235,26 @@ pub fn build_router_with_app_state(state: AppState) -> Router { TraceLayer::new_for_http() .make_span_with(|request: &Request| { let route = matched_route(request); - tracing::info_span!( + let span = tracing::info_span!( "centaur.api_rs.http_request", "otel.kind" = "server", "otel.status_code" = tracing::field::Empty, "http.request.method" = request.method().as_str(), "http.route" = route.as_str(), "http.response.status_code" = tracing::field::Empty, - ) + "centaur.thread_key" = tracing::field::Empty, + thread_key = tracing::field::Empty, + ); + if let Some(thread_key) = session_thread_key_from_request(request) { + span.record("centaur.thread_key", thread_key.as_str()); + span.record("thread_key", thread_key.as_str()); + set_span_parent_trace( + &span, + &thread_trace_id(&thread_key), + &thread_trace_parent_span_id(&thread_key), + ); + } + span }) .on_request(()) .on_response(|response: &Response, latency: Duration, span: &Span| { @@ -319,6 +332,20 @@ fn matched_route(request: &Request) -> String { .unwrap_or_else(|| "__unmatched__".to_owned()) } +fn session_thread_key_from_request(request: &Request) -> Option { + session_thread_key_from_path(request.uri().path()) +} + +fn session_thread_key_from_path(path: &str) -> Option { + let rest = path.strip_prefix("/api/session/")?; + let raw_thread_key = rest.split('/').next()?; + if raw_thread_key.is_empty() { + return None; + } + let decoded = urlencoding::decode(raw_thread_key).ok()?; + ThreadKey::try_from(decoded.into_owned()).ok() +} + async fn create_or_get_session( State(state): State, Path(raw_thread_key): Path, @@ -1329,6 +1356,19 @@ fn header_value(headers: &HeaderMap, name: &str) -> Option { mod webhook_tests { use super::*; + #[test] + fn session_thread_key_from_path_decodes_session_routes() { + assert_eq!( + session_thread_key_from_path( + "/api/session/slack%3AT092R71U6QY%3AC0B1NNXKE4F%3A1782217699.671539/execute" + ) + .map(|thread_key| thread_key.to_string()), + Some("slack:T092R71U6QY:C0B1NNXKE4F:1782217699.671539".to_owned()) + ); + assert!(session_thread_key_from_path("/api/workflows/runs").is_none()); + assert!(session_thread_key_from_path("/api/session//execute").is_none()); + } + #[test] fn parses_form_payload_json() { let mut headers = HeaderMap::new(); diff --git a/services/api-rs/crates/centaur-session-runtime/src/lib.rs b/services/api-rs/crates/centaur-session-runtime/src/lib.rs index 4c324f40b..0f24bf447 100644 --- a/services/api-rs/crates/centaur-session-runtime/src/lib.rs +++ b/services/api-rs/crates/centaur-session-runtime/src/lib.rs @@ -23,6 +23,7 @@ use centaur_session_sqlx::{ use centaur_telemetry::{ record_sandbox_warm_pool_claim, record_session_execution_finished, record_session_execution_started, record_session_failure, record_session_first_token_latency, + set_span_parent_trace, }; use dashmap::DashMap; use futures_util::{SinkExt, Stream, StreamExt, stream}; @@ -757,6 +758,11 @@ impl SessionRuntime { input_line_count, idempotency_key_present, ); + set_span_parent_trace( + &span, + &thread_trace_id(thread_key), + &thread_trace_parent_span_id(thread_key), + ); let result = async { info!( component = COMPONENT_SESSION_RUNTIME, @@ -828,6 +834,11 @@ impl SessionRuntime { execution_id = %execution.execution_id, sandbox_id = tracing::field::Empty, ); + set_span_parent_trace( + &execution_trace_span, + &thread_trace_id(thread_key), + &thread_trace_parent_span_id(thread_key), + ); self.execution_spans .lock() .await @@ -3821,7 +3832,7 @@ impl SessionTraceContext { /// Deterministic per-thread trace id: one trace identity per thread without a /// `thread_traces` table (derive, don't store). -fn thread_trace_id(thread_key: &ThreadKey) -> String { +pub fn thread_trace_id(thread_key: &ThreadKey) -> String { uuid::Uuid::new_v5( &uuid::Uuid::NAMESPACE_URL, format!("centaur:thread:{}", thread_key.as_str()).as_bytes(), @@ -3829,6 +3840,16 @@ fn thread_trace_id(thread_key: &ThreadKey) -> String { .to_string() } +pub fn thread_trace_parent_span_id(thread_key: &ThreadKey) -> String { + let digest = Sha256::digest(format!("centaur:thread-parent:{}", thread_key.as_str())); + let mut bytes = [0_u8; 8]; + bytes.copy_from_slice(&digest[..8]); + if bytes.iter().all(|byte| *byte == 0) { + bytes[7] = 1; + } + bytes.iter().map(|byte| format!("{byte:02x}")).collect() +} + fn input_lines_with_session_context( thread_key: &ThreadKey, trace: &SessionTraceContext, @@ -5089,6 +5110,16 @@ mod tests { assert_ne!(thread_trace_id(&thread_key), thread_trace_id(&other)); // The wrapper parses this with uuid.UUID(...): must stay a canonical UUID. assert!(uuid::Uuid::parse_str(&thread_trace_id(&thread_key)).is_ok()); + assert_eq!( + thread_trace_parent_span_id(&thread_key), + thread_trace_parent_span_id(&thread_key) + ); + assert_ne!( + thread_trace_parent_span_id(&thread_key), + thread_trace_parent_span_id(&other) + ); + assert_eq!(thread_trace_parent_span_id(&thread_key).len(), 16); + assert_ne!(thread_trace_parent_span_id(&thread_key), "0000000000000000"); } fn session_with_sandbox(sandbox_id: &str) -> Session { diff --git a/services/api-rs/crates/centaur-telemetry/src/lib.rs b/services/api-rs/crates/centaur-telemetry/src/lib.rs index 7cc08391f..8c71d93d9 100644 --- a/services/api-rs/crates/centaur-telemetry/src/lib.rs +++ b/services/api-rs/crates/centaur-telemetry/src/lib.rs @@ -8,7 +8,13 @@ use std::{ pub use metrics_exporter_prometheus::PrometheusHandle; use metrics_exporter_prometheus::{Matcher, PrometheusBuilder}; -use opentelemetry::trace::{TraceContextExt as _, TracerProvider as _}; +use opentelemetry::{ + Context, + trace::{ + SpanContext, SpanId, TraceContextExt as _, TraceFlags, TraceId, TraceState, + TracerProvider as _, + }, +}; use opentelemetry_sdk::{Resource, trace::SdkTracerProvider}; use serde_json::Value; use thiserror::Error; @@ -416,6 +422,44 @@ pub fn traceparent_for_span(span: &tracing::Span) -> Option { )) } +/// Assign a remote parent trace to a not-yet-entered tracing span. +/// +/// `trace_id` may be a UUID string or 32-character W3C trace id. The parent +/// span id is synthetic and intentionally not exported; it just gives all +/// per-turn request spans the same thread trace identity. +pub fn set_span_parent_trace(span: &tracing::Span, trace_id: &str, parent_span_id: &str) -> bool { + let Some(parent_context) = remote_parent_context(trace_id, parent_span_id) else { + return false; + }; + span.set_parent(parent_context).is_ok() +} + +fn remote_parent_context(trace_id: &str, parent_span_id: &str) -> Option { + let trace_id = normalize_trace_id_hex(trace_id)?; + let trace_id = TraceId::from_hex(&trace_id).ok()?; + let parent_span_id = SpanId::from_hex(parent_span_id).ok()?; + if trace_id == TraceId::INVALID || parent_span_id == SpanId::INVALID { + return None; + } + let span_context = SpanContext::new( + trace_id, + parent_span_id, + TraceFlags::SAMPLED, + true, + TraceState::default(), + ); + Some(Context::new().with_remote_span_context(span_context)) +} + +fn normalize_trace_id_hex(trace_id: &str) -> Option { + let hex: String = trace_id.chars().filter(|ch| *ch != '-').collect(); + if hex.len() == 32 && hex.chars().all(|ch| ch.is_ascii_hexdigit()) { + Some(hex) + } else { + None + } +} + pub fn init_telemetry(config: TelemetryConfig) -> Result { let _metrics = prometheus_handle()?; let filter = EnvFilter::try_new(&config.rust_log).unwrap_or_else(|_| EnvFilter::new("info")); From 6f4d982b506fcf5db8782b8883be470e62c0d39e Mon Sep 17 00:00:00 2001 From: Zygimantas <5236121+Zygimantass@users.noreply.github.com> Date: Tue, 23 Jun 2026 21:38:44 +0300 Subject: [PATCH 08/12] Keep session API subspans under execution trace --- services/api-rs/crates/centaur-session-runtime/src/lib.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/services/api-rs/crates/centaur-session-runtime/src/lib.rs b/services/api-rs/crates/centaur-session-runtime/src/lib.rs index 0f24bf447..32bd47aba 100644 --- a/services/api-rs/crates/centaur-session-runtime/src/lib.rs +++ b/services/api-rs/crates/centaur-session-runtime/src/lib.rs @@ -868,6 +868,7 @@ impl SessionRuntime { session.iron_control_principal.as_deref(), &execution.execution_id, ) + .instrument(execution_trace_span.clone()) .await { Ok(sandbox_id) => sandbox_id, @@ -882,7 +883,11 @@ impl SessionRuntime { execution_trace_span.record("centaur.sandbox_id", sandbox_id.as_str()); execution_trace_span.record("sandbox_id", sandbox_id.as_str()); - let pipe = match self.ensure_session_pipe(thread_key, &sandbox_id).await { + let pipe = match self + .ensure_session_pipe(thread_key, &sandbox_id) + .instrument(execution_trace_span.clone()) + .await + { Ok(pipe) => pipe, Err(error) => { self.record_execution_failure(thread_key, &execution.execution_id, &error) @@ -900,6 +905,7 @@ impl SessionRuntime { &execution.execution_id, Some(&sandbox_id), ) + .instrument(execution_trace_span.clone()) .await { self.record_execution_failure(thread_key, &execution.execution_id, &error) From 9eb6f2ff3c9ddb52dbc90fa43dd8ba433437fce7 Mon Sep 17 00:00:00 2001 From: Zygimantas <5236121+Zygimantass@users.noreply.github.com> Date: Tue, 23 Jun 2026 21:56:53 +0300 Subject: [PATCH 09/12] Keep stdout pump spans in thread trace --- services/api-rs/crates/centaur-session-runtime/src/lib.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/services/api-rs/crates/centaur-session-runtime/src/lib.rs b/services/api-rs/crates/centaur-session-runtime/src/lib.rs index 32bd47aba..b49c122e0 100644 --- a/services/api-rs/crates/centaur-session-runtime/src/lib.rs +++ b/services/api-rs/crates/centaur-session-runtime/src/lib.rs @@ -2254,6 +2254,11 @@ async fn run_stdout_pump( thread_key = %thread_key, sandbox_id, ); + set_span_parent_trace( + &span, + &thread_trace_id(&thread_key), + &thread_trace_parent_span_id(&thread_key), + ); async { let mut stdout = FramedRead::new(stdout, LinesCodec::new()); info!( From a07a14586dfb9a9c6aa474ef87f358727cf4c16f Mon Sep 17 00:00:00 2001 From: Zygimantas <5236121+Zygimantass@users.noreply.github.com> Date: Tue, 23 Jun 2026 22:55:04 +0300 Subject: [PATCH 10/12] Fix thread trace roots for harness spans --- crates/harness-server/src/otel.rs | 92 ++++++--- .../crates/centaur-session-runtime/src/lib.rs | 31 ++- .../crates/centaur-telemetry/src/lib.rs | 179 ++++++++++++++++-- 3 files changed, 256 insertions(+), 46 deletions(-) diff --git a/crates/harness-server/src/otel.rs b/crates/harness-server/src/otel.rs index 8f3591e75..a2fed5ccd 100644 --- a/crates/harness-server/src/otel.rs +++ b/crates/harness-server/src/otel.rs @@ -35,26 +35,23 @@ pub(crate) struct TraceContext { impl TraceContext { pub(crate) fn effective_trace_id(&self) -> Option { - self.traceparent - .as_deref() - .and_then(trace_id_from_traceparent) - .or_else(|| self.trace_id.clone()) + self.trace_id + .clone() + .or_else(|| self.traceparent.as_deref().and_then(trace_id_from_traceparent)) .or_else(|| clean_optional(env::var("CENTAUR_TRACE_ID").ok().as_deref())) } pub(crate) fn effective_traceparent(&self) -> Option { - self.traceparent + let trace_id = self.effective_trace_id()?; + if let Some(traceparent) = self + .traceparent .as_deref() - .and_then(|value| validate_traceparent(value).map(str::to_owned)) - .or_else(|| { - self.trace_id - .as_deref() - .and_then(traceparent_from_trace_id) - .or_else(|| { - clean_optional(env::var("CENTAUR_TRACE_ID").ok().as_deref()) - .and_then(|trace_id| traceparent_from_trace_id(&trace_id)) - }) - }) + .and_then(validate_traceparent) + && trace_id_from_traceparent(traceparent).as_deref() == Some(trace_id.as_str()) + { + return Some(traceparent.to_owned()); + } + traceparent_from_trace_id(&trace_id) } } @@ -1299,7 +1296,7 @@ trust_level = "trusted" } #[test] - fn traceparent_wins_for_otlp_parentage_when_available() { + fn explicit_thread_trace_id_wins_over_foreign_traceparent() { let thread_trace_id = "01234567-89ab-cdef-0123-456789abcdef"; let execution_traceparent = "00-fedcba9876543210fedcba9876543210-0123456789abcdef-01"; let trace = TraceContext { @@ -1311,11 +1308,13 @@ trust_level = "trusted" assert_eq!( trace.effective_trace_id().as_deref(), - Some("fedcba98-7654-3210-fedc-ba9876543210") + Some(thread_trace_id) ); + let effective_traceparent = trace.effective_traceparent().expect("traceparent"); + assert_ne!(effective_traceparent, execution_traceparent); assert_eq!( - trace.effective_traceparent().as_deref(), - Some(execution_traceparent) + trace_id_from_traceparent(&effective_traceparent).as_deref(), + Some(thread_trace_id) ); } @@ -1493,13 +1492,13 @@ trust_level = "trusted" } #[test] - fn harness_usage_trace_request_uses_traceparent_for_api_parentage() { + fn harness_usage_trace_request_uses_matching_traceparent_for_api_parentage() { let thread_trace_id = "01234567-89ab-cdef-0123-456789abcdef"; let trace = TraceContext { thread_key: Some("slack:C123:123.456".to_string()), trace_id: Some(thread_trace_id.to_string()), traceparent: Some( - "00-fedcba9876543210fedcba9876543210-1111111111111111-01".to_string(), + "00-0123456789abcdef0123456789abcdef-1111111111111111-01".to_string(), ), metadata: BTreeMap::new(), }; @@ -1531,8 +1530,8 @@ trust_level = "trusted" assert_eq!( span.trace_id, - Uuid::parse_str("fedcba98-7654-3210-fedc-ba9876543210") - .expect("execution trace uuid") + Uuid::parse_str(thread_trace_id) + .expect("thread trace uuid") .as_bytes() .to_vec() ); @@ -1542,6 +1541,53 @@ trust_level = "trusted" ); } + #[test] + fn harness_usage_trace_request_ignores_foreign_traceparent_parentage() { + let thread_trace_id = "01234567-89ab-cdef-0123-456789abcdef"; + let trace = TraceContext { + thread_key: Some("slack:C123:123.456".to_string()), + trace_id: Some(thread_trace_id.to_string()), + traceparent: Some( + "00-fedcba9876543210fedcba9876543210-1111111111111111-01".to_string(), + ), + metadata: BTreeMap::new(), + }; + let usage = NormalizedTokenUsage { + model: Some("claude-opus-4-8".to_string()), + input_tokens: Some(10), + output_tokens: Some(2), + cache_creation_input_tokens: None, + cache_read_input_tokens: None, + reasoning_output_tokens: None, + total_tokens: None, + }; + let request = harness_usage_trace_request( + &trace, + HarnessUsageSpan { + harness: HarnessKind::ClaudeCode, + model: "fallback-model", + model_provider: "anthropic", + turn_id: "turn-1", + input: None, + output: None, + start_unix_nano: 100, + end_unix_nano: 200, + }, + &usage, + ) + .expect("usage trace request"); + let span = &request.resource_spans[0].scope_spans[0].spans[0]; + + assert_eq!( + span.trace_id, + Uuid::parse_str(thread_trace_id) + .expect("thread trace uuid") + .as_bytes() + .to_vec() + ); + assert!(span.parent_span_id.is_empty()); + } + #[test] fn harness_usage_trace_request_falls_back_to_explicit_thread_trace_id() { let thread_trace_id = "01234567-89ab-cdef-0123-456789abcdef"; diff --git a/services/api-rs/crates/centaur-session-runtime/src/lib.rs b/services/api-rs/crates/centaur-session-runtime/src/lib.rs index b49c122e0..418cf3ffd 100644 --- a/services/api-rs/crates/centaur-session-runtime/src/lib.rs +++ b/services/api-rs/crates/centaur-session-runtime/src/lib.rs @@ -21,9 +21,9 @@ use centaur_session_sqlx::{ PgSessionStore, SessionEventListener, SessionStoreError, default_metadata, }; use centaur_telemetry::{ - record_sandbox_warm_pool_claim, record_session_execution_finished, - record_session_execution_started, record_session_failure, record_session_first_token_latency, - set_span_parent_trace, + export_thread_trace_root_span, record_sandbox_warm_pool_claim, + record_session_execution_finished, record_session_execution_started, record_session_failure, + record_session_first_token_latency, set_span_parent_trace, }; use dashmap::DashMap; use futures_util::{SinkExt, Stream, StreamExt, stream}; @@ -427,7 +427,13 @@ impl SessionRuntime { harness_type = %harness_type, iron_control_enabled = self.iron_control.is_some(), ); + set_span_parent_trace( + &span, + &thread_trace_id(thread_key), + &thread_trace_parent_span_id(thread_key), + ); let result = async { + ensure_thread_trace_root_span(thread_key); info!( component = COMPONENT_SESSION_RUNTIME, event = "session_create_or_get_started", @@ -643,7 +649,13 @@ impl SessionRuntime { thread_key = %thread_key, message_count = messages.len(), ); + set_span_parent_trace( + &span, + &thread_trace_id(thread_key), + &thread_trace_parent_span_id(thread_key), + ); let result = async { + ensure_thread_trace_root_span(thread_key); if messages.is_empty() { return Err(SessionRuntimeError::BadRequest( "messages must not be empty".to_owned(), @@ -764,6 +776,7 @@ impl SessionRuntime { &thread_trace_parent_span_id(thread_key), ); let result = async { + ensure_thread_trace_root_span(thread_key); info!( component = COMPONENT_SESSION_RUNTIME, event = "session_execute_started", @@ -2260,6 +2273,7 @@ async fn run_stdout_pump( &thread_trace_parent_span_id(&thread_key), ); async { + ensure_thread_trace_root_span(&thread_key); let mut stdout = FramedRead::new(stdout, LinesCodec::new()); info!( component = COMPONENT_SESSION_RUNTIME, @@ -3851,6 +3865,17 @@ pub fn thread_trace_id(thread_key: &ThreadKey) -> String { .to_string() } +fn ensure_thread_trace_root_span(thread_key: &ThreadKey) { + let trace_id = thread_trace_id(thread_key); + let root_span_id = thread_trace_parent_span_id(thread_key); + let thread_key = thread_key.as_str().to_owned(); + if let Ok(handle) = tokio::runtime::Handle::try_current() { + handle.spawn(async move { + let _ = export_thread_trace_root_span(&trace_id, &root_span_id, &thread_key).await; + }); + } +} + pub fn thread_trace_parent_span_id(thread_key: &ThreadKey) -> String { let digest = Sha256::digest(format!("centaur:thread-parent:{}", thread_key.as_str())); let mut bytes = [0_u8; 8]; diff --git a/services/api-rs/crates/centaur-telemetry/src/lib.rs b/services/api-rs/crates/centaur-telemetry/src/lib.rs index 8c71d93d9..7c27d4b3f 100644 --- a/services/api-rs/crates/centaur-telemetry/src/lib.rs +++ b/services/api-rs/crates/centaur-telemetry/src/lib.rs @@ -1,21 +1,26 @@ //! Shared telemetry setup for the Rust Centaur control plane. use std::{ + borrow::Cow, + collections::HashSet, env, fmt as std_fmt, sync::{LazyLock, Mutex}, - time::Duration, + time::{Duration, SystemTime}, }; pub use metrics_exporter_prometheus::PrometheusHandle; use metrics_exporter_prometheus::{Matcher, PrometheusBuilder}; use opentelemetry::{ - Context, + Context, InstrumentationScope, KeyValue, trace::{ - SpanContext, SpanId, TraceContextExt as _, TraceFlags, TraceId, TraceState, - TracerProvider as _, + SpanContext, SpanId, SpanKind, Status, TraceContextExt as _, TraceFlags, TraceId, + TraceState, TracerProvider as _, }, }; -use opentelemetry_sdk::{Resource, trace::SdkTracerProvider}; +use opentelemetry_sdk::{ + Resource, + trace::{SdkTracerProvider, SpanData, SpanEvents, SpanExporter as _, SpanLinks}, +}; use serde_json::Value; use thiserror::Error; use tracing::{Event, Subscriber}; @@ -88,6 +93,8 @@ const COMPANY_CONTEXT_DOCUMENT_SIZE_BUCKETS: &[f64] = &[ static PROMETHEUS_HANDLE: LazyLock>> = LazyLock::new(|| Mutex::new(None)); +static EXPORTED_THREAD_ROOT_SPANS: LazyLock>> = + LazyLock::new(|| Mutex::new(HashSet::new())); #[derive(Clone, Debug, Eq, PartialEq)] pub struct TelemetryConfig { @@ -425,8 +432,8 @@ pub fn traceparent_for_span(span: &tracing::Span) -> Option { /// Assign a remote parent trace to a not-yet-entered tracing span. /// /// `trace_id` may be a UUID string or 32-character W3C trace id. The parent -/// span id is synthetic and intentionally not exported; it just gives all -/// per-turn request spans the same thread trace identity. +/// span id is a deterministic thread-root span id; callers should export that +/// root span so trace viewers have a parentless node to render. pub fn set_span_parent_trace(span: &tracing::Span, trace_id: &str, parent_span_id: &str) -> bool { let Some(parent_context) = remote_parent_context(trace_id, parent_span_id) else { return false; @@ -434,6 +441,113 @@ pub fn set_span_parent_trace(span: &tracing::Span, trace_id: &str, parent_span_i span.set_parent(parent_context).is_ok() } +pub async fn export_thread_trace_root_span( + trace_id: &str, + root_span_id: &str, + thread_key: &str, +) -> bool { + if TraceExporter::from_env() != TraceExporter::Otlp { + return false; + } + + let export_key = format!("{trace_id}:{root_span_id}"); + { + let mut exported = EXPORTED_THREAD_ROOT_SPANS + .lock() + .expect("thread root span export lock poisoned"); + if !exported.insert(export_key.clone()) { + return true; + } + } + + if let Err(error) = + export_thread_trace_root_span_inner(trace_id, root_span_id, thread_key).await + { + EXPORTED_THREAD_ROOT_SPANS + .lock() + .expect("thread root span export lock poisoned") + .remove(&export_key); + tracing::warn!( + %error, + trace_id, + root_span_id, + thread_key, + "failed to export thread trace root span" + ); + return false; + } + + true +} + +async fn export_thread_trace_root_span_inner( + trace_id: &str, + root_span_id: &str, + thread_key: &str, +) -> Result<(), String> { + let config = TelemetryConfig::from_env(); + let resource = otlp_resource(&config); + let span = thread_trace_root_span_data(trace_id, root_span_id, thread_key, SystemTime::now())?; + let mut exporter = opentelemetry_otlp::SpanExporter::builder() + .with_http() + .build() + .map_err(|error| error.to_string())?; + exporter.set_resource(&resource); + exporter + .export(vec![span]) + .await + .map_err(|error| error.to_string()) +} + +fn thread_trace_root_span_data( + trace_id: &str, + root_span_id: &str, + thread_key: &str, + start_time: SystemTime, +) -> Result { + let trace_hex = + normalize_trace_id_hex(trace_id).ok_or_else(|| "invalid thread trace id".to_owned())?; + let trace_id = TraceId::from_hex(&trace_hex) + .map_err(|error| format!("invalid thread trace id: {error}"))?; + let span_id = SpanId::from_hex(root_span_id) + .map_err(|error| format!("invalid thread root span id: {error}"))?; + if trace_id == TraceId::INVALID || span_id == SpanId::INVALID { + return Err("invalid zero thread trace id or root span id".to_owned()); + } + let end_time = start_time + .checked_add(Duration::from_nanos(1)) + .unwrap_or(start_time); + + Ok(SpanData { + span_context: SpanContext::new( + trace_id, + span_id, + TraceFlags::SAMPLED, + false, + TraceState::default(), + ), + parent_span_id: SpanId::INVALID, + parent_span_is_remote: false, + span_kind: SpanKind::Internal, + name: Cow::Borrowed("centaur.api_rs.thread"), + start_time, + end_time, + attributes: vec![ + KeyValue::new(FIELD_COMPONENT, "session_runtime"), + KeyValue::new(FIELD_EVENT, "thread_trace_root"), + KeyValue::new("centaur.thread_key", thread_key.to_owned()), + KeyValue::new(FIELD_THREAD_KEY, thread_key.to_owned()), + ], + dropped_attributes_count: 0, + events: SpanEvents::default(), + links: SpanLinks::default(), + status: Status::Unset, + instrumentation_scope: InstrumentationScope::builder("centaur.api-rs") + .with_version(env!("CARGO_PKG_VERSION")) + .build(), + }) +} + fn remote_parent_context(trace_id: &str, parent_span_id: &str) -> Option { let trace_id = normalize_trace_id_hex(trace_id)?; let trace_id = TraceId::from_hex(&trace_id).ok()?; @@ -623,28 +737,27 @@ fn workflow_metric_labels(labels: &[(String, String)]) -> Vec { fn build_otlp_tracer_provider( config: &TelemetryConfig, ) -> Result { - let resource = Resource::builder() - .with_service_name(config.service_name.clone()) - .with_attribute(opentelemetry::KeyValue::new( - OTEL_SERVICE_NAMESPACE, - SERVICE_NAMESPACE, - )) - .with_attribute(opentelemetry::KeyValue::new( - OTEL_DEPLOYMENT_ENVIRONMENT_NAME, - config.environment.clone(), - )) - .build(); - let exporter = opentelemetry_otlp::SpanExporter::builder() .with_http() .build()?; Ok(SdkTracerProvider::builder() - .with_resource(resource) + .with_resource(otlp_resource(config)) .with_batch_exporter(exporter) .build()) } +fn otlp_resource(config: &TelemetryConfig) -> Resource { + Resource::builder() + .with_service_name(config.service_name.clone()) + .with_attribute(KeyValue::new(OTEL_SERVICE_NAMESPACE, SERVICE_NAMESPACE)) + .with_attribute(KeyValue::new( + OTEL_DEPLOYMENT_ENVIRONMENT_NAME, + config.environment.clone(), + )) + .build() +} + #[derive(Debug, Clone)] struct TraceContextJsonFormatter { inner: fmt_format::Format, @@ -773,6 +886,32 @@ mod tests { ); } + #[test] + fn thread_trace_root_span_data_uses_parentless_thread_root() { + let span = thread_trace_root_span_data( + "01234567-89ab-cdef-0123-456789abcdef", + "1111111111111111", + "slack:T:C:1782217699.671539", + SystemTime::UNIX_EPOCH, + ) + .expect("thread root span"); + + assert_eq!(span.name.as_ref(), "centaur.api_rs.thread"); + assert_eq!( + span.span_context.trace_id().to_string(), + "0123456789abcdef0123456789abcdef" + ); + assert_eq!(span.span_context.span_id().to_string(), "1111111111111111"); + assert_eq!(span.parent_span_id, SpanId::INVALID); + assert_eq!(span.start_time, SystemTime::UNIX_EPOCH); + assert!(span.end_time > span.start_time); + assert!( + span.attributes + .iter() + .any(|attribute| attribute.key.as_str() == "centaur.thread_key") + ); + } + #[test] fn prometheus_metrics_render_route_template_labels() { prometheus_handle().unwrap(); From 49dfda8f7a31be368f82851042fef3151cb8759b Mon Sep 17 00:00:00 2001 From: Zygimantas <5236121+Zygimantass@users.noreply.github.com> Date: Tue, 23 Jun 2026 23:07:30 +0300 Subject: [PATCH 11/12] Export thread trace root via OTLP protobuf --- services/api-rs/Cargo.lock | 3 + services/api-rs/Cargo.toml | 2 + .../crates/centaur-telemetry/Cargo.toml | 3 + .../crates/centaur-telemetry/src/lib.rs | 382 ++++++++++++++---- 4 files changed, 322 insertions(+), 68 deletions(-) diff --git a/services/api-rs/Cargo.lock b/services/api-rs/Cargo.lock index 05ddb2ae3..cd9aa8888 100644 --- a/services/api-rs/Cargo.lock +++ b/services/api-rs/Cargo.lock @@ -1012,9 +1012,12 @@ dependencies = [ "metrics-exporter-prometheus", "opentelemetry", "opentelemetry-otlp", + "opentelemetry-proto", "opentelemetry_sdk", + "prost", "serde_json", "thiserror", + "tokio", "tracing", "tracing-opentelemetry", "tracing-subscriber", diff --git a/services/api-rs/Cargo.toml b/services/api-rs/Cargo.toml index e5a932f26..ff383d4a2 100644 --- a/services/api-rs/Cargo.toml +++ b/services/api-rs/Cargo.toml @@ -78,7 +78,9 @@ ratatui = "0.29" rustls = "0.23" opentelemetry = "0.32.0" opentelemetry-otlp = { version = "0.32.0", default-features = false, features = ["http-proto", "reqwest-blocking-client", "trace"] } +opentelemetry-proto = { version = "0.32.0", default-features = false, features = ["trace", "gen-tonic-messages"] } opentelemetry_sdk = { version = "0.32.1", features = ["trace"] } +prost = "0.14" metrics = "0.24.6" metrics-exporter-prometheus = { version = "0.18.3", default-features = false } sqlx = { version = "0.8.6", default-features = false, features = ["derive", "json", "macros", "migrate", "postgres", "runtime-tokio-rustls", "time"] } diff --git a/services/api-rs/crates/centaur-telemetry/Cargo.toml b/services/api-rs/crates/centaur-telemetry/Cargo.toml index 729580b65..44f20670d 100644 --- a/services/api-rs/crates/centaur-telemetry/Cargo.toml +++ b/services/api-rs/crates/centaur-telemetry/Cargo.toml @@ -10,9 +10,12 @@ metrics.workspace = true metrics-exporter-prometheus.workspace = true opentelemetry.workspace = true opentelemetry-otlp.workspace = true +opentelemetry-proto.workspace = true opentelemetry_sdk.workspace = true +prost.workspace = true serde_json.workspace = true thiserror.workspace = true +tokio.workspace = true tracing.workspace = true tracing-opentelemetry.workspace = true tracing-subscriber = { workspace = true, features = ["env-filter", "fmt", "json", "registry"] } diff --git a/services/api-rs/crates/centaur-telemetry/src/lib.rs b/services/api-rs/crates/centaur-telemetry/src/lib.rs index 7c27d4b3f..702d54f7d 100644 --- a/services/api-rs/crates/centaur-telemetry/src/lib.rs +++ b/services/api-rs/crates/centaur-telemetry/src/lib.rs @@ -1,26 +1,31 @@ //! Shared telemetry setup for the Rust Centaur control plane. use std::{ - borrow::Cow, collections::HashSet, env, fmt as std_fmt, + io::{Read, Write}, + net::TcpStream, sync::{LazyLock, Mutex}, - time::{Duration, SystemTime}, + time::{Duration, SystemTime, UNIX_EPOCH}, }; pub use metrics_exporter_prometheus::PrometheusHandle; use metrics_exporter_prometheus::{Matcher, PrometheusBuilder}; use opentelemetry::{ - Context, InstrumentationScope, KeyValue, + Context, KeyValue, trace::{ - SpanContext, SpanId, SpanKind, Status, TraceContextExt as _, TraceFlags, TraceId, - TraceState, TracerProvider as _, + SpanContext, SpanId, TraceContextExt as _, TraceFlags, TraceId, TraceState, + TracerProvider as _, }, }; -use opentelemetry_sdk::{ - Resource, - trace::{SdkTracerProvider, SpanData, SpanEvents, SpanExporter as _, SpanLinks}, +use opentelemetry_proto::tonic::{ + collector::trace::v1::ExportTraceServiceRequest, + common::v1::{AnyValue, InstrumentationScope, KeyValue as ProtoKeyValue, any_value}, + resource::v1::Resource as ProtoResource, + trace::v1::{ResourceSpans, ScopeSpans, Span as ProtoSpan, span}, }; +use opentelemetry_sdk::{Resource, trace::SdkTracerProvider}; +use prost::Message as _; use serde_json::Value; use thiserror::Error; use tracing::{Event, Subscriber}; @@ -485,69 +490,303 @@ async fn export_thread_trace_root_span_inner( root_span_id: &str, thread_key: &str, ) -> Result<(), String> { - let config = TelemetryConfig::from_env(); - let resource = otlp_resource(&config); - let span = thread_trace_root_span_data(trace_id, root_span_id, thread_key, SystemTime::now())?; - let mut exporter = opentelemetry_otlp::SpanExporter::builder() - .with_http() - .build() - .map_err(|error| error.to_string())?; - exporter.set_resource(&resource); - exporter - .export(vec![span]) - .await - .map_err(|error| error.to_string()) + let trace_id = trace_id.to_owned(); + let root_span_id = root_span_id.to_owned(); + let thread_key = thread_key.to_owned(); + tokio::task::spawn_blocking(move || { + export_thread_trace_root_span_blocking(&trace_id, &root_span_id, &thread_key) + }) + .await + .map_err(|error| format!("thread root span export task failed: {error}"))? +} + +fn export_thread_trace_root_span_blocking( + trace_id: &str, + root_span_id: &str, + thread_key: &str, +) -> Result<(), String> { + let endpoint = otlp_traces_endpoint() + .ok_or_else(|| "OTLP traces endpoint is not configured".to_owned())?; + let request = + thread_trace_root_export_request(trace_id, root_span_id, thread_key, SystemTime::now())?; + let mut headers = otlp_export_headers(); + headers.push(("x-trace-id".to_owned(), trace_id.to_owned())); + headers.push(("x-centaur-thread-key".to_owned(), thread_key.to_owned())); + post_otlp_trace_payload(&endpoint, &headers, &request.encode_to_vec()) } -fn thread_trace_root_span_data( +fn thread_trace_root_export_request( trace_id: &str, root_span_id: &str, thread_key: &str, start_time: SystemTime, -) -> Result { - let trace_hex = - normalize_trace_id_hex(trace_id).ok_or_else(|| "invalid thread trace id".to_owned())?; - let trace_id = TraceId::from_hex(&trace_hex) - .map_err(|error| format!("invalid thread trace id: {error}"))?; - let span_id = SpanId::from_hex(root_span_id) - .map_err(|error| format!("invalid thread root span id: {error}"))?; - if trace_id == TraceId::INVALID || span_id == SpanId::INVALID { - return Err("invalid zero thread trace id or root span id".to_owned()); - } +) -> Result { + let config = TelemetryConfig::from_env(); + let trace_id = trace_id_bytes(trace_id)?; + let span_id = span_id_bytes(root_span_id)?; let end_time = start_time .checked_add(Duration::from_nanos(1)) .unwrap_or(start_time); + let start_time_unix_nano = unix_time_nanos(start_time); + let end_time_unix_nano = unix_time_nanos(end_time).max(start_time_unix_nano + 1); + + Ok(ExportTraceServiceRequest { + resource_spans: vec![ResourceSpans { + resource: Some(ProtoResource { + attributes: vec![ + proto_kv_string("service.name", &config.service_name), + proto_kv_string(OTEL_SERVICE_NAMESPACE, SERVICE_NAMESPACE), + proto_kv_string(OTEL_DEPLOYMENT_ENVIRONMENT_NAME, &config.environment), + proto_kv_string("deployment.environment", &config.environment), + ], + ..Default::default() + }), + scope_spans: vec![ScopeSpans { + scope: Some(InstrumentationScope { + name: "centaur.api-rs".to_owned(), + version: env!("CARGO_PKG_VERSION").to_owned(), + ..Default::default() + }), + spans: vec![ProtoSpan { + trace_id, + span_id, + parent_span_id: Vec::new(), + name: "centaur.api_rs.thread".to_owned(), + kind: span::SpanKind::Internal as i32, + start_time_unix_nano, + end_time_unix_nano, + attributes: vec![ + proto_kv_string(FIELD_COMPONENT, "session_runtime"), + proto_kv_string(FIELD_EVENT, "thread_trace_root"), + proto_kv_string("centaur.thread_key", thread_key), + proto_kv_string(FIELD_THREAD_KEY, thread_key), + ], + flags: 1, + ..Default::default() + }], + ..Default::default() + }], + ..Default::default() + }], + }) +} - Ok(SpanData { - span_context: SpanContext::new( - trace_id, - span_id, - TraceFlags::SAMPLED, - false, - TraceState::default(), - ), - parent_span_id: SpanId::INVALID, - parent_span_is_remote: false, - span_kind: SpanKind::Internal, - name: Cow::Borrowed("centaur.api_rs.thread"), - start_time, - end_time, - attributes: vec![ - KeyValue::new(FIELD_COMPONENT, "session_runtime"), - KeyValue::new(FIELD_EVENT, "thread_trace_root"), - KeyValue::new("centaur.thread_key", thread_key.to_owned()), - KeyValue::new(FIELD_THREAD_KEY, thread_key.to_owned()), - ], - dropped_attributes_count: 0, - events: SpanEvents::default(), - links: SpanLinks::default(), - status: Status::Unset, - instrumentation_scope: InstrumentationScope::builder("centaur.api-rs") - .with_version(env!("CARGO_PKG_VERSION")) - .build(), +fn post_otlp_trace_payload( + endpoint: &str, + headers: &[(String, String)], + body: &[u8], +) -> Result<(), String> { + let target = OtlpHttpTarget::parse(endpoint)?; + let mut upstream = TcpStream::connect((target.host.as_str(), target.port)) + .map_err(|error| format!("failed to connect to OTLP endpoint: {error}"))?; + upstream + .set_read_timeout(Some(Duration::from_secs(10))) + .map_err(|error| format!("failed to set OTLP read timeout: {error}"))?; + upstream + .set_write_timeout(Some(Duration::from_secs(10))) + .map_err(|error| format!("failed to set OTLP write timeout: {error}"))?; + write!( + upstream, + "POST {} HTTP/1.1\r\nHost: {}\r\nContent-Type: application/x-protobuf\r\nContent-Length: {}\r\nConnection: close\r\n", + target.path, + target.host_header, + body.len() + ) + .map_err(|error| format!("failed to write OTLP request headers: {error}"))?; + for (name, value) in headers { + if matches!( + name.as_str(), + "authorization" | "x-trace-id" | "x-centaur-thread-key" + ) { + write!(upstream, "{name}: {value}\r\n") + .map_err(|error| format!("failed to write OTLP header {name}: {error}"))?; + } + } + upstream + .write_all(b"\r\n") + .and_then(|()| upstream.write_all(body)) + .and_then(|()| upstream.flush()) + .map_err(|error| format!("failed to write OTLP request body: {error}"))?; + + let mut response = Vec::new(); + upstream + .read_to_end(&mut response) + .map_err(|error| format!("failed to read OTLP response: {error}"))?; + let status = http_status_code(&response).unwrap_or(0); + if (200..300).contains(&status) { + Ok(()) + } else { + Err(format!( + "OTLP trace export failed with HTTP status {status}" + )) + } +} + +#[derive(Debug, Clone, Eq, PartialEq)] +struct OtlpHttpTarget { + host: String, + port: u16, + host_header: String, + path: String, +} + +impl OtlpHttpTarget { + fn parse(endpoint: &str) -> Result { + let endpoint = endpoint.trim(); + let rest = endpoint.strip_prefix("http://").ok_or_else(|| { + "only http OTLP endpoints are supported for root span export".to_owned() + })?; + let (host_port, path) = match rest.split_once('/') { + Some((host_port, path)) => (host_port, format!("/{path}")), + None => (rest, "/v1/traces".to_owned()), + }; + if host_port.is_empty() { + return Err("OTLP endpoint host is empty".to_owned()); + } + let (host, port) = match host_port.rsplit_once(':') { + Some((host, port)) => { + let port = port + .parse::() + .map_err(|error| format!("invalid OTLP endpoint port: {error}"))?; + (host.to_owned(), port) + } + None => (host_port.to_owned(), 80), + }; + if host.is_empty() { + return Err("OTLP endpoint host is empty".to_owned()); + } + Ok(Self { + host, + port, + host_header: host_port.to_owned(), + path, + }) + } +} + +fn http_status_code(response: &[u8]) -> Option { + let line = String::from_utf8_lossy(response).lines().next()?.to_owned(); + let mut parts = line.split_whitespace(); + let _version = parts.next()?; + parts.next()?.parse().ok() +} + +fn otlp_traces_endpoint() -> Option { + first_nonempty_env(&["OTEL_EXPORTER_OTLP_TRACES_ENDPOINT"]).or_else(|| { + first_nonempty_env(&["OTEL_EXPORTER_OTLP_ENDPOINT"]).map(|endpoint| { + if endpoint.ends_with("/v1/traces") { + endpoint + } else { + format!("{}/v1/traces", endpoint.trim_end_matches('/')) + } + }) }) } +fn otlp_export_headers() -> Vec<(String, String)> { + first_nonempty_env(&[ + "OTEL_EXPORTER_OTLP_TRACES_HEADERS", + "OTEL_EXPORTER_OTLP_HEADERS", + ]) + .map(|raw| parse_otlp_headers(&raw)) + .unwrap_or_default() +} + +fn parse_otlp_headers(raw: &str) -> Vec<(String, String)> { + raw.split(',') + .filter_map(|part| { + let (name, value) = part.split_once('=')?; + let name = name.trim().to_ascii_lowercase(); + if name.is_empty() { + return None; + } + Some((name, percent_decode(value.trim()))) + }) + .collect() +} + +fn percent_decode(value: &str) -> String { + let bytes = value.as_bytes(); + let mut out = Vec::with_capacity(bytes.len()); + let mut index = 0; + while index < bytes.len() { + if bytes[index] == b'%' + && index + 2 < bytes.len() + && let (Some(high), Some(low)) = + (hex_value(bytes[index + 1]), hex_value(bytes[index + 2])) + { + out.push(high << 4 | low); + index += 3; + continue; + } + out.push(bytes[index]); + index += 1; + } + String::from_utf8_lossy(&out).into_owned() +} + +fn hex_value(value: u8) -> Option { + match value { + b'0'..=b'9' => Some(value - b'0'), + b'a'..=b'f' => Some(value - b'a' + 10), + b'A'..=b'F' => Some(value - b'A' + 10), + _ => None, + } +} + +fn proto_kv_string(key: &str, value: &str) -> ProtoKeyValue { + ProtoKeyValue { + key: key.to_owned(), + value: Some(AnyValue { + value: Some(any_value::Value::StringValue(value.to_owned())), + }), + ..Default::default() + } +} + +fn trace_id_bytes(trace_id: &str) -> Result, String> { + let trace_hex = + normalize_trace_id_hex(trace_id).ok_or_else(|| "invalid thread trace id".to_owned())?; + let bytes = hex_to_bytes(&trace_hex)?; + if bytes.len() != 16 || bytes.iter().all(|byte| *byte == 0) { + return Err("invalid zero thread trace id".to_owned()); + } + Ok(bytes) +} + +fn span_id_bytes(span_id: &str) -> Result, String> { + let span_id = span_id.trim(); + if span_id.len() != 16 || !span_id.chars().all(|ch| ch.is_ascii_hexdigit()) { + return Err("invalid thread root span id".to_owned()); + } + let bytes = hex_to_bytes(span_id)?; + if bytes.len() != 8 || bytes.iter().all(|byte| *byte == 0) { + return Err("invalid zero thread root span id".to_owned()); + } + Ok(bytes) +} + +fn hex_to_bytes(hex: &str) -> Result, String> { + if hex.len() % 2 != 0 { + return Err("hex value must have even length".to_owned()); + } + let bytes = hex.as_bytes(); + let mut out = Vec::with_capacity(hex.len() / 2); + for index in (0..bytes.len()).step_by(2) { + let high = hex_value(bytes[index]).ok_or_else(|| "invalid hex digit".to_owned())?; + let low = hex_value(bytes[index + 1]).ok_or_else(|| "invalid hex digit".to_owned())?; + out.push(high << 4 | low); + } + Ok(out) +} + +fn unix_time_nanos(time: SystemTime) -> u64 { + time.duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_nanos() + .min(u128::from(u64::MAX)) as u64 +} + fn remote_parent_context(trace_id: &str, parent_span_id: &str) -> Option { let trace_id = normalize_trace_id_hex(trace_id)?; let trace_id = TraceId::from_hex(&trace_id).ok()?; @@ -887,28 +1126,35 @@ mod tests { } #[test] - fn thread_trace_root_span_data_uses_parentless_thread_root() { - let span = thread_trace_root_span_data( + fn thread_trace_root_export_request_uses_parentless_thread_root() { + let request = thread_trace_root_export_request( "01234567-89ab-cdef-0123-456789abcdef", "1111111111111111", "slack:T:C:1782217699.671539", SystemTime::UNIX_EPOCH, ) .expect("thread root span"); + let span = &request.resource_spans[0].scope_spans[0].spans[0]; - assert_eq!(span.name.as_ref(), "centaur.api_rs.thread"); assert_eq!( - span.span_context.trace_id().to_string(), - "0123456789abcdef0123456789abcdef" + span.trace_id, + vec![ + 0x01, 0x23, 0x45, 0x67, 0x89, 0xab, 0xcd, 0xef, 0x01, 0x23, 0x45, 0x67, 0x89, 0xab, + 0xcd, 0xef, + ] + ); + assert_eq!( + span.span_id, + vec![0x11, 0x11, 0x11, 0x11, 0x11, 0x11, 0x11, 0x11] ); - assert_eq!(span.span_context.span_id().to_string(), "1111111111111111"); - assert_eq!(span.parent_span_id, SpanId::INVALID); - assert_eq!(span.start_time, SystemTime::UNIX_EPOCH); - assert!(span.end_time > span.start_time); + assert!(span.parent_span_id.is_empty()); + assert_eq!(span.name, "centaur.api_rs.thread"); + assert_eq!(span.start_time_unix_nano, 0); + assert!(span.end_time_unix_nano > span.start_time_unix_nano); assert!( span.attributes .iter() - .any(|attribute| attribute.key.as_str() == "centaur.thread_key") + .any(|attribute| attribute.key == "centaur.thread_key") ); } From 384a8ed80ab9a021bbb7012fb6dce01836bb96c4 Mon Sep 17 00:00:00 2001 From: Zygimantas <5236121+Zygimantass@users.noreply.github.com> Date: Tue, 23 Jun 2026 23:14:18 +0300 Subject: [PATCH 12/12] Fix telemetry clippy lint --- services/api-rs/crates/centaur-telemetry/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/api-rs/crates/centaur-telemetry/src/lib.rs b/services/api-rs/crates/centaur-telemetry/src/lib.rs index 702d54f7d..1bd5fe75d 100644 --- a/services/api-rs/crates/centaur-telemetry/src/lib.rs +++ b/services/api-rs/crates/centaur-telemetry/src/lib.rs @@ -767,7 +767,7 @@ fn span_id_bytes(span_id: &str) -> Result, String> { } fn hex_to_bytes(hex: &str) -> Result, String> { - if hex.len() % 2 != 0 { + if !hex.len().is_multiple_of(2) { return Err("hex value must have even length".to_owned()); } let bytes = hex.as_bytes();