diff --git a/crates/core/src/observability/openinference.rs b/crates/core/src/observability/openinference.rs index 54be164e..0af88e72 100644 --- a/crates/core/src/observability/openinference.rs +++ b/crates/core/src/observability/openinference.rs @@ -16,7 +16,7 @@ //! - [`OpenInferenceSubscriber`] exposes a NeMo Relay [`EventSubscriberFn`] and //! convenience `register` / `deregister` / `force_flush` / `shutdown` methods -use std::collections::HashMap; +use std::collections::{HashMap, VecDeque}; use std::sync::{Arc, Mutex}; use std::time::{Duration, SystemTime, UNIX_EPOCH}; @@ -45,6 +45,8 @@ use opentelemetry_sdk::trace::{SdkTracer, SdkTracerProvider, Span}; use serde::Serialize; use uuid::Uuid; +const COMPLETED_SPAN_CONTEXT_LIMIT: usize = 4096; + #[cfg(target_arch = "wasm32")] use async_trait::async_trait; #[cfg(target_arch = "wasm32")] @@ -500,6 +502,8 @@ struct ActiveSpan { struct OpenInferenceEventProcessor { active_spans: HashMap, + completed_span_contexts: HashMap, + completed_span_order: VecDeque, provider: SdkTracerProvider, tracer: SdkTracer, } @@ -509,6 +513,8 @@ impl OpenInferenceEventProcessor { let tracer = provider.tracer(instrumentation_scope); Self { active_spans: HashMap::new(), + completed_span_contexts: HashMap::new(), + completed_span_order: VecDeque::new(), provider, tracer, } @@ -535,6 +541,7 @@ impl OpenInferenceEventProcessor { } fn process_start(&mut self, event: &Event) { + self.remove_completed_span_context(event.uuid()); let mut span = self .tracer .span_builder(span_name(event)) @@ -551,6 +558,7 @@ impl OpenInferenceEventProcessor { let Some(mut active_span) = self.active_spans.remove(&event.uuid()) else { return; }; + self.record_completed_span_context(event.uuid(), active_span.span_context.clone()); super::set_span_status_from_event_metadata(&mut active_span.span, event); active_span.span.set_attributes(end_attributes(event)); active_span @@ -587,10 +595,13 @@ impl OpenInferenceEventProcessor { } fn parent_context(&self, event: &Event) -> Context { - self.find_parent_span(event) - .map(|active_span| { - Context::new().with_remote_span_context(active_span.span_context.clone()) - }) + if let Some(active_span) = self.find_parent_span(event) { + return Context::new().with_remote_span_context(active_span.span_context.clone()); + } + event + .parent_uuid() + .and_then(|uuid| self.completed_span_contexts.get(&uuid)) + .map(|span_context| Context::new().with_remote_span_context(span_context.clone())) .unwrap_or_default() } @@ -609,6 +620,27 @@ impl OpenInferenceEventProcessor { self.parent_span_uuid(event) .and_then(|uuid| self.active_spans.get_mut(&uuid)) } + + fn remove_completed_span_context(&mut self, uuid: Uuid) { + self.completed_span_contexts.remove(&uuid); + self.completed_span_order + .retain(|completed_uuid| *completed_uuid != uuid); + } + + fn record_completed_span_context(&mut self, uuid: Uuid, span_context: SpanContext) { + if self + .completed_span_contexts + .insert(uuid, span_context) + .is_none() + { + self.completed_span_order.push_back(uuid); + } + while self.completed_span_order.len() > COMPLETED_SPAN_CONTEXT_LIMIT { + if let Some(expired) = self.completed_span_order.pop_front() { + self.completed_span_contexts.remove(&expired); + } + } + } } fn span_kind(event: &Event) -> SpanKind { diff --git a/crates/core/src/observability/otel.rs b/crates/core/src/observability/otel.rs index 74fefe59..37628ec4 100644 --- a/crates/core/src/observability/otel.rs +++ b/crates/core/src/observability/otel.rs @@ -16,7 +16,7 @@ //! - [`OpenTelemetrySubscriber`] exposes a NeMo Relay [`EventSubscriberFn`] and //! convenience `register` / `deregister` / `force_flush` / `shutdown` methods -use std::collections::HashMap; +use std::collections::{HashMap, VecDeque}; use std::sync::{Arc, Mutex}; use std::time::{Duration, SystemTime, UNIX_EPOCH}; @@ -39,6 +39,8 @@ use opentelemetry_sdk::trace::{SdkTracer, SdkTracerProvider, Span}; use serde::Serialize; use uuid::Uuid; +const COMPLETED_SPAN_CONTEXT_LIMIT: usize = 4096; + #[cfg(target_arch = "wasm32")] use async_trait::async_trait; #[cfg(target_arch = "wasm32")] @@ -494,6 +496,8 @@ struct ActiveSpan { struct OtelEventProcessor { active_spans: HashMap, + completed_span_contexts: HashMap, + completed_span_order: VecDeque, provider: SdkTracerProvider, tracer: SdkTracer, } @@ -503,6 +507,8 @@ impl OtelEventProcessor { let tracer = provider.tracer(instrumentation_scope); Self { active_spans: HashMap::new(), + completed_span_contexts: HashMap::new(), + completed_span_order: VecDeque::new(), provider, tracer, } @@ -529,6 +535,7 @@ impl OtelEventProcessor { } fn process_start(&mut self, event: &Event) { + self.remove_completed_span_context(event.uuid()); let mut span = self .tracer .span_builder(span_name(event)) @@ -545,6 +552,7 @@ impl OtelEventProcessor { let Some(mut active_span) = self.active_spans.remove(&event.uuid()) else { return; }; + self.record_completed_span_context(event.uuid(), active_span.span_context.clone()); super::set_span_status_from_event_metadata(&mut active_span.span, event); active_span.span.set_attributes(end_attributes(event)); @@ -578,10 +586,13 @@ impl OtelEventProcessor { } fn parent_context(&self, event: &Event) -> Context { - self.find_parent_span(event) - .map(|active_span| { - Context::new().with_remote_span_context(active_span.span_context.clone()) - }) + if let Some(active_span) = self.find_parent_span(event) { + return Context::new().with_remote_span_context(active_span.span_context.clone()); + } + event + .parent_uuid() + .and_then(|uuid| self.completed_span_contexts.get(&uuid)) + .map(|span_context| Context::new().with_remote_span_context(span_context.clone())) .unwrap_or_default() } @@ -600,6 +611,27 @@ impl OtelEventProcessor { self.parent_span_uuid(event) .and_then(|uuid| self.active_spans.get_mut(&uuid)) } + + fn remove_completed_span_context(&mut self, uuid: Uuid) { + self.completed_span_contexts.remove(&uuid); + self.completed_span_order + .retain(|completed_uuid| *completed_uuid != uuid); + } + + fn record_completed_span_context(&mut self, uuid: Uuid, span_context: SpanContext) { + if self + .completed_span_contexts + .insert(uuid, span_context) + .is_none() + { + self.completed_span_order.push_back(uuid); + } + while self.completed_span_order.len() > COMPLETED_SPAN_CONTEXT_LIMIT { + if let Some(expired) = self.completed_span_order.pop_front() { + self.completed_span_contexts.remove(&expired); + } + } + } } fn span_kind(event: &Event) -> SpanKind { diff --git a/crates/core/tests/unit/observability/openinference_tests.rs b/crates/core/tests/unit/observability/openinference_tests.rs index 22d0698f..b2cac182 100644 --- a/crates/core/tests/unit/observability/openinference_tests.rs +++ b/crates/core/tests/unit/observability/openinference_tests.rs @@ -1751,6 +1751,200 @@ fn orphan_marks_become_zero_duration_spans() { ); } +#[test] +fn late_parented_marks_reuse_completed_parent_trace_context() { + let (provider, exporter) = make_provider(); + let mut processor = + OpenInferenceEventProcessor::new(provider.clone(), "test-scope".to_string()); + let tool_uuid = Uuid::now_v7(); + + processor.process(&make_start_event( + tool_uuid, + None, + "terminal", + ScopeType::Tool, + None, + )); + processor.process(&make_end_event( + tool_uuid, + None, + "terminal", + ScopeType::Tool, + Some(json!({"status": "done"})), + )); + processor.process(&make_mark_event( + Some(tool_uuid), + "visor.tool_output_compressed", + Some(json!({"estimated_tokens_saved": 42})), + )); + processor.force_flush().unwrap(); + + let spans = exporter.get_finished_spans().unwrap(); + assert_eq!(spans.len(), 2); + let tool_span = spans + .iter() + .find(|span| span.name.as_ref() == "terminal") + .unwrap(); + let mark_span = spans + .iter() + .find(|span| span.name.as_ref() == "mark:visor.tool_output_compressed") + .unwrap(); + + assert_eq!( + mark_span.span_context.trace_id(), + tool_span.span_context.trace_id() + ); + assert_eq!(mark_span.parent_span_id, tool_span.span_context.span_id()); + assert!(!mark_span.parent_span_is_remote); + + let attributes = attr_map(&mark_span.attributes); + assert_eq!( + attributes.get("nemo_relay.mark.orphan"), + Some(&"true".to_string()) + ); + assert_eq!( + attributes.get("openinference.span.kind"), + Some(&"CHAIN".to_string()) + ); +} + +#[test] +fn completed_span_context_cache_evicts_oldest_parent_contexts() { + let (provider, exporter) = make_provider(); + let mut processor = + OpenInferenceEventProcessor::new(provider.clone(), "test-scope".to_string()); + let span_count = COMPLETED_SPAN_CONTEXT_LIMIT + 2; + let mut completed_uuids = Vec::with_capacity(span_count); + + for index in 0..span_count { + let uuid = Uuid::now_v7(); + completed_uuids.push(uuid); + let name = format!("completed-{index}"); + processor.process(&make_start_event(uuid, None, &name, ScopeType::Tool, None)); + processor.process(&make_end_event( + uuid, + None, + &name, + ScopeType::Tool, + Some(json!({"status": "done"})), + )); + } + + let oldest_uuid = completed_uuids[0]; + let recent_uuid = completed_uuids[span_count - 1]; + assert!(!processor.completed_span_contexts.contains_key(&oldest_uuid)); + assert!(processor.completed_span_contexts.contains_key(&recent_uuid)); + + processor.process(&make_mark_event( + Some(oldest_uuid), + "oldest-after-eviction", + Some(json!({"case": "oldest"})), + )); + processor.process(&make_mark_event( + Some(recent_uuid), + "recent-after-eviction", + Some(json!({"case": "recent"})), + )); + processor.force_flush().unwrap(); + + let spans = exporter.get_finished_spans().unwrap(); + assert_eq!(spans.len(), span_count + 2); + + let oldest_parent = spans + .iter() + .find(|span| span.name.as_ref() == "completed-0") + .unwrap(); + let recent_parent_name = format!("completed-{}", span_count - 1); + let recent_parent = spans + .iter() + .find(|span| span.name.as_ref() == recent_parent_name.as_str()) + .unwrap(); + let oldest_mark = spans + .iter() + .find(|span| span.name.as_ref() == "mark:oldest-after-eviction") + .unwrap(); + let recent_mark = spans + .iter() + .find(|span| span.name.as_ref() == "mark:recent-after-eviction") + .unwrap(); + + assert_ne!( + oldest_mark.parent_span_id, + oldest_parent.span_context.span_id() + ); + assert_ne!( + oldest_mark.span_context.trace_id(), + oldest_parent.span_context.trace_id() + ); + assert_eq!( + recent_mark.span_context.trace_id(), + recent_parent.span_context.trace_id() + ); + assert_eq!( + recent_mark.parent_span_id, + recent_parent.span_context.span_id() + ); + assert!(!recent_mark.parent_span_is_remote); +} + +#[test] +fn process_start_removes_completed_span_order_entry() { + let (provider, _exporter) = make_provider(); + let mut processor = OpenInferenceEventProcessor::new(provider, "test-scope".to_string()); + let tool_uuid = Uuid::now_v7(); + + processor.process(&make_start_event( + tool_uuid, + None, + "terminal", + ScopeType::Tool, + None, + )); + processor.process(&make_end_event( + tool_uuid, + None, + "terminal", + ScopeType::Tool, + Some(json!({"status": "done"})), + )); + assert!(processor.completed_span_contexts.contains_key(&tool_uuid)); + assert_eq!( + processor + .completed_span_order + .iter() + .filter(|uuid| **uuid == tool_uuid) + .count(), + 1 + ); + + processor.process(&make_start_event( + tool_uuid, + None, + "terminal", + ScopeType::Tool, + None, + )); + assert!(!processor.completed_span_contexts.contains_key(&tool_uuid)); + assert!(!processor.completed_span_order.contains(&tool_uuid)); + + processor.process(&make_end_event( + tool_uuid, + None, + "terminal", + ScopeType::Tool, + Some(json!({"status": "done"})), + )); + assert!(processor.completed_span_contexts.contains_key(&tool_uuid)); + assert_eq!( + processor + .completed_span_order + .iter() + .filter(|uuid| **uuid == tool_uuid) + .count(), + 1 + ); +} + #[test] fn semantic_scope_type_and_input_value_follow_event_variants() { let llm_with_content = make_start_event( diff --git a/crates/core/tests/unit/observability/otel_tests.rs b/crates/core/tests/unit/observability/otel_tests.rs index 41a5e757..ee5017ae 100644 --- a/crates/core/tests/unit/observability/otel_tests.rs +++ b/crates/core/tests/unit/observability/otel_tests.rs @@ -752,6 +752,116 @@ fn orphan_marks_become_zero_duration_spans() { ); } +#[test] +fn late_parented_marks_reuse_completed_parent_trace_context() { + let (provider, exporter) = make_provider(); + let mut processor = OtelEventProcessor::new(provider.clone(), "test-scope".to_string()); + let tool_uuid = Uuid::now_v7(); + + processor.process(&make_start_event( + tool_uuid, + None, + "terminal", + ScopeType::Tool, + None, + )); + processor.process(&make_end_event( + tool_uuid, + None, + "terminal", + ScopeType::Tool, + Some(json!({"status": "done"})), + )); + processor.process(&make_mark_event( + Some(tool_uuid), + "visor.tool_output_compressed", + Some(json!({"estimated_tokens_saved": 42})), + )); + processor.force_flush().unwrap(); + + let spans = exporter.get_finished_spans().unwrap(); + assert_eq!(spans.len(), 2); + let tool_span = spans + .iter() + .find(|span| span.name.as_ref() == "terminal") + .unwrap(); + let mark_span = spans + .iter() + .find(|span| span.name.as_ref() == "mark:visor.tool_output_compressed") + .unwrap(); + + assert_eq!( + mark_span.span_context.trace_id(), + tool_span.span_context.trace_id() + ); + assert_eq!(mark_span.parent_span_id, tool_span.span_context.span_id()); + assert!(!mark_span.parent_span_is_remote); + + let attributes = attr_map(&mark_span.attributes); + assert_eq!( + attributes.get("nemo_relay.mark.orphan"), + Some(&"true".to_string()) + ); +} + +#[test] +fn process_start_removes_completed_span_order_entry() { + let (provider, _exporter) = make_provider(); + let mut processor = OtelEventProcessor::new(provider, "test-scope".to_string()); + let tool_uuid = Uuid::now_v7(); + + processor.process(&make_start_event( + tool_uuid, + None, + "terminal", + ScopeType::Tool, + None, + )); + processor.process(&make_end_event( + tool_uuid, + None, + "terminal", + ScopeType::Tool, + Some(json!({"status": "done"})), + )); + assert!(processor.completed_span_contexts.contains_key(&tool_uuid)); + assert_eq!( + processor + .completed_span_order + .iter() + .filter(|uuid| **uuid == tool_uuid) + .count(), + 1 + ); + + processor.process(&make_start_event( + tool_uuid, + None, + "terminal", + ScopeType::Tool, + None, + )); + assert!(!processor.completed_span_contexts.contains_key(&tool_uuid)); + assert!(!processor.completed_span_order.contains(&tool_uuid)); + + processor.process(&make_end_event( + tool_uuid, + None, + "terminal", + ScopeType::Tool, + Some(json!({"status": "done"})), + )); + assert!(processor.completed_span_contexts.contains_key(&tool_uuid)); + assert_eq!( + processor + .completed_span_order + .iter() + .filter(|uuid| **uuid == tool_uuid) + .count(), + 1 + ); +} + #[test] fn semantic_scope_type_and_span_kind_follow_event_variants() { let scope_event = make_start_event(