From 2188e3d38ceba11765bd5ba4092b3362427720d0 Mon Sep 17 00:00:00 2001 From: Bryan Bednarski Date: Tue, 23 Jun 2026 18:44:03 -0700 Subject: [PATCH 1/2] fix: keep late mark spans in parent traces Cache completed span contexts in the OpenTelemetry and OpenInference event processors so mark events that arrive after their parent scope has ended still inherit the original trace context. Keep true orphan marks on the existing fallback path, bound the completed-context cache, and add regression coverage for late parented marks after tool scope completion. Signed-off-by: Bryan Bednarski --- .../core/src/observability/openinference.rs | 36 ++++++++++-- crates/core/src/observability/otel.rs | 36 ++++++++++-- .../unit/observability/openinference_tests.rs | 57 +++++++++++++++++++ .../tests/unit/observability/otel_tests.rs | 52 +++++++++++++++++ 4 files changed, 171 insertions(+), 10 deletions(-) diff --git a/crates/core/src/observability/openinference.rs b/crates/core/src/observability/openinference.rs index 54be164e..a724a020 100644 --- a/crates/core/src/observability/openinference.rs +++ b/crates/core/src/observability/openinference.rs @@ -16,7 +16,7 @@ //! - [`OpenInferenceSubscriber`] exposes a NeMo Relay [`EventSubscriberFn`] and //! convenience `register` / `deregister` / `force_flush` / `shutdown` methods -use std::collections::HashMap; +use std::collections::{HashMap, VecDeque}; use std::sync::{Arc, Mutex}; use std::time::{Duration, SystemTime, UNIX_EPOCH}; @@ -45,6 +45,8 @@ use opentelemetry_sdk::trace::{SdkTracer, SdkTracerProvider, Span}; use serde::Serialize; use uuid::Uuid; +const COMPLETED_SPAN_CONTEXT_LIMIT: usize = 4096; + #[cfg(target_arch = "wasm32")] use async_trait::async_trait; #[cfg(target_arch = "wasm32")] @@ -500,6 +502,8 @@ struct ActiveSpan { struct OpenInferenceEventProcessor { active_spans: HashMap, + completed_span_contexts: HashMap, + completed_span_order: VecDeque, provider: SdkTracerProvider, tracer: SdkTracer, } @@ -509,6 +513,8 @@ impl OpenInferenceEventProcessor { let tracer = provider.tracer(instrumentation_scope); Self { active_spans: HashMap::new(), + completed_span_contexts: HashMap::new(), + completed_span_order: VecDeque::new(), provider, tracer, } @@ -535,6 +541,7 @@ impl OpenInferenceEventProcessor { } fn process_start(&mut self, event: &Event) { + self.completed_span_contexts.remove(&event.uuid()); let mut span = self .tracer .span_builder(span_name(event)) @@ -551,6 +558,7 @@ impl OpenInferenceEventProcessor { let Some(mut active_span) = self.active_spans.remove(&event.uuid()) else { return; }; + self.record_completed_span_context(event.uuid(), active_span.span_context.clone()); super::set_span_status_from_event_metadata(&mut active_span.span, event); active_span.span.set_attributes(end_attributes(event)); active_span @@ -587,10 +595,13 @@ impl OpenInferenceEventProcessor { } fn parent_context(&self, event: &Event) -> Context { - self.find_parent_span(event) - .map(|active_span| { - Context::new().with_remote_span_context(active_span.span_context.clone()) - }) + if let Some(active_span) = self.find_parent_span(event) { + return Context::new().with_remote_span_context(active_span.span_context.clone()); + } + event + .parent_uuid() + .and_then(|uuid| self.completed_span_contexts.get(&uuid)) + .map(|span_context| Context::new().with_remote_span_context(span_context.clone())) .unwrap_or_default() } @@ -609,6 +620,21 @@ impl OpenInferenceEventProcessor { self.parent_span_uuid(event) .and_then(|uuid| self.active_spans.get_mut(&uuid)) } + + fn record_completed_span_context(&mut self, uuid: Uuid, span_context: SpanContext) { + if self + .completed_span_contexts + .insert(uuid, span_context) + .is_none() + { + self.completed_span_order.push_back(uuid); + } + while self.completed_span_order.len() > COMPLETED_SPAN_CONTEXT_LIMIT { + if let Some(expired) = self.completed_span_order.pop_front() { + self.completed_span_contexts.remove(&expired); + } + } + } } fn span_kind(event: &Event) -> SpanKind { diff --git a/crates/core/src/observability/otel.rs b/crates/core/src/observability/otel.rs index 74fefe59..b3f4b892 100644 --- a/crates/core/src/observability/otel.rs +++ b/crates/core/src/observability/otel.rs @@ -16,7 +16,7 @@ //! - [`OpenTelemetrySubscriber`] exposes a NeMo Relay [`EventSubscriberFn`] and //! convenience `register` / `deregister` / `force_flush` / `shutdown` methods -use std::collections::HashMap; +use std::collections::{HashMap, VecDeque}; use std::sync::{Arc, Mutex}; use std::time::{Duration, SystemTime, UNIX_EPOCH}; @@ -39,6 +39,8 @@ use opentelemetry_sdk::trace::{SdkTracer, SdkTracerProvider, Span}; use serde::Serialize; use uuid::Uuid; +const COMPLETED_SPAN_CONTEXT_LIMIT: usize = 4096; + #[cfg(target_arch = "wasm32")] use async_trait::async_trait; #[cfg(target_arch = "wasm32")] @@ -494,6 +496,8 @@ struct ActiveSpan { struct OtelEventProcessor { active_spans: HashMap, + completed_span_contexts: HashMap, + completed_span_order: VecDeque, provider: SdkTracerProvider, tracer: SdkTracer, } @@ -503,6 +507,8 @@ impl OtelEventProcessor { let tracer = provider.tracer(instrumentation_scope); Self { active_spans: HashMap::new(), + completed_span_contexts: HashMap::new(), + completed_span_order: VecDeque::new(), provider, tracer, } @@ -529,6 +535,7 @@ impl OtelEventProcessor { } fn process_start(&mut self, event: &Event) { + self.completed_span_contexts.remove(&event.uuid()); let mut span = self .tracer .span_builder(span_name(event)) @@ -545,6 +552,7 @@ impl OtelEventProcessor { let Some(mut active_span) = self.active_spans.remove(&event.uuid()) else { return; }; + self.record_completed_span_context(event.uuid(), active_span.span_context.clone()); super::set_span_status_from_event_metadata(&mut active_span.span, event); active_span.span.set_attributes(end_attributes(event)); @@ -578,10 +586,13 @@ impl OtelEventProcessor { } fn parent_context(&self, event: &Event) -> Context { - self.find_parent_span(event) - .map(|active_span| { - Context::new().with_remote_span_context(active_span.span_context.clone()) - }) + if let Some(active_span) = self.find_parent_span(event) { + return Context::new().with_remote_span_context(active_span.span_context.clone()); + } + event + .parent_uuid() + .and_then(|uuid| self.completed_span_contexts.get(&uuid)) + .map(|span_context| Context::new().with_remote_span_context(span_context.clone())) .unwrap_or_default() } @@ -600,6 +611,21 @@ impl OtelEventProcessor { self.parent_span_uuid(event) .and_then(|uuid| self.active_spans.get_mut(&uuid)) } + + fn record_completed_span_context(&mut self, uuid: Uuid, span_context: SpanContext) { + if self + .completed_span_contexts + .insert(uuid, span_context) + .is_none() + { + self.completed_span_order.push_back(uuid); + } + while self.completed_span_order.len() > COMPLETED_SPAN_CONTEXT_LIMIT { + if let Some(expired) = self.completed_span_order.pop_front() { + self.completed_span_contexts.remove(&expired); + } + } + } } fn span_kind(event: &Event) -> SpanKind { diff --git a/crates/core/tests/unit/observability/openinference_tests.rs b/crates/core/tests/unit/observability/openinference_tests.rs index 22d0698f..38ca6dd8 100644 --- a/crates/core/tests/unit/observability/openinference_tests.rs +++ b/crates/core/tests/unit/observability/openinference_tests.rs @@ -1751,6 +1751,63 @@ fn orphan_marks_become_zero_duration_spans() { ); } +#[test] +fn late_parented_marks_reuse_completed_parent_trace_context() { + let (provider, exporter) = make_provider(); + let mut processor = + OpenInferenceEventProcessor::new(provider.clone(), "test-scope".to_string()); + let tool_uuid = Uuid::now_v7(); + + processor.process(&make_start_event( + tool_uuid, + None, + "terminal", + ScopeType::Tool, + None, + )); + processor.process(&make_end_event( + tool_uuid, + None, + "terminal", + ScopeType::Tool, + Some(json!({"status": "done"})), + )); + processor.process(&make_mark_event( + Some(tool_uuid), + "visor.tool_output_compressed", + Some(json!({"estimated_tokens_saved": 42})), + )); + processor.force_flush().unwrap(); + + let spans = exporter.get_finished_spans().unwrap(); + assert_eq!(spans.len(), 2); + let tool_span = spans + .iter() + .find(|span| span.name.as_ref() == "terminal") + .unwrap(); + let mark_span = spans + .iter() + .find(|span| span.name.as_ref() == "mark:visor.tool_output_compressed") + .unwrap(); + + assert_eq!( + mark_span.span_context.trace_id(), + tool_span.span_context.trace_id() + ); + assert_eq!(mark_span.parent_span_id, tool_span.span_context.span_id()); + assert!(!mark_span.parent_span_is_remote); + + let attributes = attr_map(&mark_span.attributes); + assert_eq!( + attributes.get("nemo_relay.mark.orphan"), + Some(&"true".to_string()) + ); + assert_eq!( + attributes.get("openinference.span.kind"), + Some(&"CHAIN".to_string()) + ); +} + #[test] fn semantic_scope_type_and_input_value_follow_event_variants() { let llm_with_content = make_start_event( diff --git a/crates/core/tests/unit/observability/otel_tests.rs b/crates/core/tests/unit/observability/otel_tests.rs index 41a5e757..6585d758 100644 --- a/crates/core/tests/unit/observability/otel_tests.rs +++ b/crates/core/tests/unit/observability/otel_tests.rs @@ -752,6 +752,58 @@ fn orphan_marks_become_zero_duration_spans() { ); } +#[test] +fn late_parented_marks_reuse_completed_parent_trace_context() { + let (provider, exporter) = make_provider(); + let mut processor = OtelEventProcessor::new(provider.clone(), "test-scope".to_string()); + let tool_uuid = Uuid::now_v7(); + + processor.process(&make_start_event( + tool_uuid, + None, + "terminal", + ScopeType::Tool, + None, + )); + processor.process(&make_end_event( + tool_uuid, + None, + "terminal", + ScopeType::Tool, + Some(json!({"status": "done"})), + )); + processor.process(&make_mark_event( + Some(tool_uuid), + "visor.tool_output_compressed", + Some(json!({"estimated_tokens_saved": 42})), + )); + processor.force_flush().unwrap(); + + let spans = exporter.get_finished_spans().unwrap(); + assert_eq!(spans.len(), 2); + let tool_span = spans + .iter() + .find(|span| span.name.as_ref() == "terminal") + .unwrap(); + let mark_span = spans + .iter() + .find(|span| span.name.as_ref() == "mark:visor.tool_output_compressed") + .unwrap(); + + assert_eq!( + mark_span.span_context.trace_id(), + tool_span.span_context.trace_id() + ); + assert_eq!(mark_span.parent_span_id, tool_span.span_context.span_id()); + assert!(!mark_span.parent_span_is_remote); + + let attributes = attr_map(&mark_span.attributes); + assert_eq!( + attributes.get("nemo_relay.mark.orphan"), + Some(&"true".to_string()) + ); +} + #[test] fn semantic_scope_type_and_span_kind_follow_event_variants() { let scope_event = make_start_event( From 080c5203880b593ce95b69d52beb6cd34ef44cf2 Mon Sep 17 00:00:00 2001 From: Bryan Bednarski Date: Tue, 23 Jun 2026 21:25:48 -0700 Subject: [PATCH 2/2] fix: keep completed span context cache synchronized Remove completed span UUIDs from both the context map and FIFO order queue when a span restarts, preventing stale queue entries from later evicting fresh parent contexts. Add regression coverage for restart synchronization and OpenInference completed-context cache eviction behavior. Signed-off-by: Bryan Bednarski --- .../core/src/observability/openinference.rs | 8 +- crates/core/src/observability/otel.rs | 8 +- .../unit/observability/openinference_tests.rs | 137 ++++++++++++++++++ .../tests/unit/observability/otel_tests.rs | 58 ++++++++ 4 files changed, 209 insertions(+), 2 deletions(-) diff --git a/crates/core/src/observability/openinference.rs b/crates/core/src/observability/openinference.rs index a724a020..0af88e72 100644 --- a/crates/core/src/observability/openinference.rs +++ b/crates/core/src/observability/openinference.rs @@ -541,7 +541,7 @@ impl OpenInferenceEventProcessor { } fn process_start(&mut self, event: &Event) { - self.completed_span_contexts.remove(&event.uuid()); + self.remove_completed_span_context(event.uuid()); let mut span = self .tracer .span_builder(span_name(event)) @@ -621,6 +621,12 @@ impl OpenInferenceEventProcessor { .and_then(|uuid| self.active_spans.get_mut(&uuid)) } + fn remove_completed_span_context(&mut self, uuid: Uuid) { + self.completed_span_contexts.remove(&uuid); + self.completed_span_order + .retain(|completed_uuid| *completed_uuid != uuid); + } + fn record_completed_span_context(&mut self, uuid: Uuid, span_context: SpanContext) { if self .completed_span_contexts diff --git a/crates/core/src/observability/otel.rs b/crates/core/src/observability/otel.rs index b3f4b892..37628ec4 100644 --- a/crates/core/src/observability/otel.rs +++ b/crates/core/src/observability/otel.rs @@ -535,7 +535,7 @@ impl OtelEventProcessor { } fn process_start(&mut self, event: &Event) { - self.completed_span_contexts.remove(&event.uuid()); + self.remove_completed_span_context(event.uuid()); let mut span = self .tracer .span_builder(span_name(event)) @@ -612,6 +612,12 @@ impl OtelEventProcessor { .and_then(|uuid| self.active_spans.get_mut(&uuid)) } + fn remove_completed_span_context(&mut self, uuid: Uuid) { + self.completed_span_contexts.remove(&uuid); + self.completed_span_order + .retain(|completed_uuid| *completed_uuid != uuid); + } + fn record_completed_span_context(&mut self, uuid: Uuid, span_context: SpanContext) { if self .completed_span_contexts diff --git a/crates/core/tests/unit/observability/openinference_tests.rs b/crates/core/tests/unit/observability/openinference_tests.rs index 38ca6dd8..b2cac182 100644 --- a/crates/core/tests/unit/observability/openinference_tests.rs +++ b/crates/core/tests/unit/observability/openinference_tests.rs @@ -1808,6 +1808,143 @@ fn late_parented_marks_reuse_completed_parent_trace_context() { ); } +#[test] +fn completed_span_context_cache_evicts_oldest_parent_contexts() { + let (provider, exporter) = make_provider(); + let mut processor = + OpenInferenceEventProcessor::new(provider.clone(), "test-scope".to_string()); + let span_count = COMPLETED_SPAN_CONTEXT_LIMIT + 2; + let mut completed_uuids = Vec::with_capacity(span_count); + + for index in 0..span_count { + let uuid = Uuid::now_v7(); + completed_uuids.push(uuid); + let name = format!("completed-{index}"); + processor.process(&make_start_event(uuid, None, &name, ScopeType::Tool, None)); + processor.process(&make_end_event( + uuid, + None, + &name, + ScopeType::Tool, + Some(json!({"status": "done"})), + )); + } + + let oldest_uuid = completed_uuids[0]; + let recent_uuid = completed_uuids[span_count - 1]; + assert!(!processor.completed_span_contexts.contains_key(&oldest_uuid)); + assert!(processor.completed_span_contexts.contains_key(&recent_uuid)); + + processor.process(&make_mark_event( + Some(oldest_uuid), + "oldest-after-eviction", + Some(json!({"case": "oldest"})), + )); + processor.process(&make_mark_event( + Some(recent_uuid), + "recent-after-eviction", + Some(json!({"case": "recent"})), + )); + processor.force_flush().unwrap(); + + let spans = exporter.get_finished_spans().unwrap(); + assert_eq!(spans.len(), span_count + 2); + + let oldest_parent = spans + .iter() + .find(|span| span.name.as_ref() == "completed-0") + .unwrap(); + let recent_parent_name = format!("completed-{}", span_count - 1); + let recent_parent = spans + .iter() + .find(|span| span.name.as_ref() == recent_parent_name.as_str()) + .unwrap(); + let oldest_mark = spans + .iter() + .find(|span| span.name.as_ref() == "mark:oldest-after-eviction") + .unwrap(); + let recent_mark = spans + .iter() + .find(|span| span.name.as_ref() == "mark:recent-after-eviction") + .unwrap(); + + assert_ne!( + oldest_mark.parent_span_id, + oldest_parent.span_context.span_id() + ); + assert_ne!( + oldest_mark.span_context.trace_id(), + oldest_parent.span_context.trace_id() + ); + assert_eq!( + recent_mark.span_context.trace_id(), + recent_parent.span_context.trace_id() + ); + assert_eq!( + recent_mark.parent_span_id, + recent_parent.span_context.span_id() + ); + assert!(!recent_mark.parent_span_is_remote); +} + +#[test] +fn process_start_removes_completed_span_order_entry() { + let (provider, _exporter) = make_provider(); + let mut processor = OpenInferenceEventProcessor::new(provider, "test-scope".to_string()); + let tool_uuid = Uuid::now_v7(); + + processor.process(&make_start_event( + tool_uuid, + None, + "terminal", + ScopeType::Tool, + None, + )); + processor.process(&make_end_event( + tool_uuid, + None, + "terminal", + ScopeType::Tool, + Some(json!({"status": "done"})), + )); + assert!(processor.completed_span_contexts.contains_key(&tool_uuid)); + assert_eq!( + processor + .completed_span_order + .iter() + .filter(|uuid| **uuid == tool_uuid) + .count(), + 1 + ); + + processor.process(&make_start_event( + tool_uuid, + None, + "terminal", + ScopeType::Tool, + None, + )); + assert!(!processor.completed_span_contexts.contains_key(&tool_uuid)); + assert!(!processor.completed_span_order.contains(&tool_uuid)); + + processor.process(&make_end_event( + tool_uuid, + None, + "terminal", + ScopeType::Tool, + Some(json!({"status": "done"})), + )); + assert!(processor.completed_span_contexts.contains_key(&tool_uuid)); + assert_eq!( + processor + .completed_span_order + .iter() + .filter(|uuid| **uuid == tool_uuid) + .count(), + 1 + ); +} + #[test] fn semantic_scope_type_and_input_value_follow_event_variants() { let llm_with_content = make_start_event( diff --git a/crates/core/tests/unit/observability/otel_tests.rs b/crates/core/tests/unit/observability/otel_tests.rs index 6585d758..ee5017ae 100644 --- a/crates/core/tests/unit/observability/otel_tests.rs +++ b/crates/core/tests/unit/observability/otel_tests.rs @@ -804,6 +804,64 @@ fn late_parented_marks_reuse_completed_parent_trace_context() { ); } +#[test] +fn process_start_removes_completed_span_order_entry() { + let (provider, _exporter) = make_provider(); + let mut processor = OtelEventProcessor::new(provider, "test-scope".to_string()); + let tool_uuid = Uuid::now_v7(); + + processor.process(&make_start_event( + tool_uuid, + None, + "terminal", + ScopeType::Tool, + None, + )); + processor.process(&make_end_event( + tool_uuid, + None, + "terminal", + ScopeType::Tool, + Some(json!({"status": "done"})), + )); + assert!(processor.completed_span_contexts.contains_key(&tool_uuid)); + assert_eq!( + processor + .completed_span_order + .iter() + .filter(|uuid| **uuid == tool_uuid) + .count(), + 1 + ); + + processor.process(&make_start_event( + tool_uuid, + None, + "terminal", + ScopeType::Tool, + None, + )); + assert!(!processor.completed_span_contexts.contains_key(&tool_uuid)); + assert!(!processor.completed_span_order.contains(&tool_uuid)); + + processor.process(&make_end_event( + tool_uuid, + None, + "terminal", + ScopeType::Tool, + Some(json!({"status": "done"})), + )); + assert!(processor.completed_span_contexts.contains_key(&tool_uuid)); + assert_eq!( + processor + .completed_span_order + .iter() + .filter(|uuid| **uuid == tool_uuid) + .count(), + 1 + ); +} + #[test] fn semantic_scope_type_and_span_kind_follow_event_variants() { let scope_event = make_start_event(