Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 37 additions & 5 deletions crates/core/src/observability/openinference.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
//! - [`OpenInferenceSubscriber`] exposes a NeMo Relay [`EventSubscriberFn`] and
//! convenience `register` / `deregister` / `force_flush` / `shutdown` methods

use std::collections::HashMap;
use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime, UNIX_EPOCH};

Expand Down Expand Up @@ -45,6 +45,8 @@ use opentelemetry_sdk::trace::{SdkTracer, SdkTracerProvider, Span};
use serde::Serialize;
use uuid::Uuid;

const COMPLETED_SPAN_CONTEXT_LIMIT: usize = 4096;

#[cfg(target_arch = "wasm32")]
use async_trait::async_trait;
#[cfg(target_arch = "wasm32")]
Expand Down Expand Up @@ -500,6 +502,8 @@ struct ActiveSpan {

struct OpenInferenceEventProcessor {
active_spans: HashMap<Uuid, ActiveSpan>,
completed_span_contexts: HashMap<Uuid, SpanContext>,
completed_span_order: VecDeque<Uuid>,
provider: SdkTracerProvider,
tracer: SdkTracer,
}
Expand All @@ -509,6 +513,8 @@ impl OpenInferenceEventProcessor {
let tracer = provider.tracer(instrumentation_scope);
Self {
active_spans: HashMap::new(),
completed_span_contexts: HashMap::new(),
completed_span_order: VecDeque::new(),
provider,
tracer,
}
Expand All @@ -535,6 +541,7 @@ impl OpenInferenceEventProcessor {
}

fn process_start(&mut self, event: &Event) {
self.remove_completed_span_context(event.uuid());
let mut span = self
.tracer
.span_builder(span_name(event))
Expand All @@ -551,6 +558,7 @@ impl OpenInferenceEventProcessor {
let Some(mut active_span) = self.active_spans.remove(&event.uuid()) else {
return;
};
self.record_completed_span_context(event.uuid(), active_span.span_context.clone());
super::set_span_status_from_event_metadata(&mut active_span.span, event);
active_span.span.set_attributes(end_attributes(event));
active_span
Expand Down Expand Up @@ -587,10 +595,13 @@ impl OpenInferenceEventProcessor {
}

fn parent_context(&self, event: &Event) -> Context {
self.find_parent_span(event)
.map(|active_span| {
Context::new().with_remote_span_context(active_span.span_context.clone())
})
if let Some(active_span) = self.find_parent_span(event) {
return Context::new().with_remote_span_context(active_span.span_context.clone());
}
event
.parent_uuid()
.and_then(|uuid| self.completed_span_contexts.get(&uuid))
.map(|span_context| Context::new().with_remote_span_context(span_context.clone()))
.unwrap_or_default()
}

Expand All @@ -609,6 +620,27 @@ impl OpenInferenceEventProcessor {
self.parent_span_uuid(event)
.and_then(|uuid| self.active_spans.get_mut(&uuid))
}

fn remove_completed_span_context(&mut self, uuid: Uuid) {
self.completed_span_contexts.remove(&uuid);
self.completed_span_order
.retain(|completed_uuid| *completed_uuid != uuid);
}

fn record_completed_span_context(&mut self, uuid: Uuid, span_context: SpanContext) {
if self
.completed_span_contexts
.insert(uuid, span_context)
.is_none()
{
self.completed_span_order.push_back(uuid);
}
while self.completed_span_order.len() > COMPLETED_SPAN_CONTEXT_LIMIT {
if let Some(expired) = self.completed_span_order.pop_front() {
self.completed_span_contexts.remove(&expired);
}
}
}
}

fn span_kind(event: &Event) -> SpanKind {
Expand Down
42 changes: 37 additions & 5 deletions crates/core/src/observability/otel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
//! - [`OpenTelemetrySubscriber`] exposes a NeMo Relay [`EventSubscriberFn`] and
//! convenience `register` / `deregister` / `force_flush` / `shutdown` methods

use std::collections::HashMap;
use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime, UNIX_EPOCH};

Expand All @@ -39,6 +39,8 @@ use opentelemetry_sdk::trace::{SdkTracer, SdkTracerProvider, Span};
use serde::Serialize;
use uuid::Uuid;

const COMPLETED_SPAN_CONTEXT_LIMIT: usize = 4096;

#[cfg(target_arch = "wasm32")]
use async_trait::async_trait;
#[cfg(target_arch = "wasm32")]
Expand Down Expand Up @@ -494,6 +496,8 @@ struct ActiveSpan {

struct OtelEventProcessor {
active_spans: HashMap<Uuid, ActiveSpan>,
completed_span_contexts: HashMap<Uuid, SpanContext>,
completed_span_order: VecDeque<Uuid>,
provider: SdkTracerProvider,
tracer: SdkTracer,
}
Expand All @@ -503,6 +507,8 @@ impl OtelEventProcessor {
let tracer = provider.tracer(instrumentation_scope);
Self {
active_spans: HashMap::new(),
completed_span_contexts: HashMap::new(),
completed_span_order: VecDeque::new(),
provider,
tracer,
}
Expand All @@ -529,6 +535,7 @@ impl OtelEventProcessor {
}

fn process_start(&mut self, event: &Event) {
self.remove_completed_span_context(event.uuid());
let mut span = self
.tracer
.span_builder(span_name(event))
Expand All @@ -545,6 +552,7 @@ impl OtelEventProcessor {
let Some(mut active_span) = self.active_spans.remove(&event.uuid()) else {
return;
};
self.record_completed_span_context(event.uuid(), active_span.span_context.clone());

super::set_span_status_from_event_metadata(&mut active_span.span, event);
active_span.span.set_attributes(end_attributes(event));
Expand Down Expand Up @@ -578,10 +586,13 @@ impl OtelEventProcessor {
}

fn parent_context(&self, event: &Event) -> Context {
self.find_parent_span(event)
.map(|active_span| {
Context::new().with_remote_span_context(active_span.span_context.clone())
})
if let Some(active_span) = self.find_parent_span(event) {
return Context::new().with_remote_span_context(active_span.span_context.clone());
}
event
.parent_uuid()
.and_then(|uuid| self.completed_span_contexts.get(&uuid))
Comment thread
bbednarski9 marked this conversation as resolved.
.map(|span_context| Context::new().with_remote_span_context(span_context.clone()))
.unwrap_or_default()
}

Expand All @@ -600,6 +611,27 @@ impl OtelEventProcessor {
self.parent_span_uuid(event)
.and_then(|uuid| self.active_spans.get_mut(&uuid))
}

fn remove_completed_span_context(&mut self, uuid: Uuid) {
self.completed_span_contexts.remove(&uuid);
self.completed_span_order
.retain(|completed_uuid| *completed_uuid != uuid);
}

fn record_completed_span_context(&mut self, uuid: Uuid, span_context: SpanContext) {
if self
.completed_span_contexts
.insert(uuid, span_context)
.is_none()
{
self.completed_span_order.push_back(uuid);
}
while self.completed_span_order.len() > COMPLETED_SPAN_CONTEXT_LIMIT {
if let Some(expired) = self.completed_span_order.pop_front() {
self.completed_span_contexts.remove(&expired);
}
}
}
}

fn span_kind(event: &Event) -> SpanKind {
Expand Down
194 changes: 194 additions & 0 deletions crates/core/tests/unit/observability/openinference_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1751,6 +1751,200 @@ fn orphan_marks_become_zero_duration_spans() {
);
}

#[test]
fn late_parented_marks_reuse_completed_parent_trace_context() {
let (provider, exporter) = make_provider();
let mut processor =
OpenInferenceEventProcessor::new(provider.clone(), "test-scope".to_string());
let tool_uuid = Uuid::now_v7();

processor.process(&make_start_event(
tool_uuid,
None,
"terminal",
ScopeType::Tool,
None,
));
processor.process(&make_end_event(
tool_uuid,
None,
"terminal",
ScopeType::Tool,
Some(json!({"status": "done"})),
));
processor.process(&make_mark_event(
Some(tool_uuid),
"visor.tool_output_compressed",
Some(json!({"estimated_tokens_saved": 42})),
));
processor.force_flush().unwrap();

let spans = exporter.get_finished_spans().unwrap();
assert_eq!(spans.len(), 2);
let tool_span = spans
.iter()
.find(|span| span.name.as_ref() == "terminal")
.unwrap();
let mark_span = spans
.iter()
.find(|span| span.name.as_ref() == "mark:visor.tool_output_compressed")
.unwrap();

assert_eq!(
mark_span.span_context.trace_id(),
tool_span.span_context.trace_id()
);
assert_eq!(mark_span.parent_span_id, tool_span.span_context.span_id());
assert!(!mark_span.parent_span_is_remote);

let attributes = attr_map(&mark_span.attributes);
assert_eq!(
attributes.get("nemo_relay.mark.orphan"),
Some(&"true".to_string())
);
assert_eq!(
attributes.get("openinference.span.kind"),
Some(&"CHAIN".to_string())
);
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

#[test]
fn completed_span_context_cache_evicts_oldest_parent_contexts() {
let (provider, exporter) = make_provider();
let mut processor =
OpenInferenceEventProcessor::new(provider.clone(), "test-scope".to_string());
let span_count = COMPLETED_SPAN_CONTEXT_LIMIT + 2;
let mut completed_uuids = Vec::with_capacity(span_count);

for index in 0..span_count {
let uuid = Uuid::now_v7();
completed_uuids.push(uuid);
let name = format!("completed-{index}");
processor.process(&make_start_event(uuid, None, &name, ScopeType::Tool, None));
processor.process(&make_end_event(
uuid,
None,
&name,
ScopeType::Tool,
Some(json!({"status": "done"})),
));
}

let oldest_uuid = completed_uuids[0];
let recent_uuid = completed_uuids[span_count - 1];
assert!(!processor.completed_span_contexts.contains_key(&oldest_uuid));
assert!(processor.completed_span_contexts.contains_key(&recent_uuid));

processor.process(&make_mark_event(
Some(oldest_uuid),
"oldest-after-eviction",
Some(json!({"case": "oldest"})),
));
processor.process(&make_mark_event(
Some(recent_uuid),
"recent-after-eviction",
Some(json!({"case": "recent"})),
));
processor.force_flush().unwrap();

let spans = exporter.get_finished_spans().unwrap();
assert_eq!(spans.len(), span_count + 2);

let oldest_parent = spans
.iter()
.find(|span| span.name.as_ref() == "completed-0")
.unwrap();
let recent_parent_name = format!("completed-{}", span_count - 1);
let recent_parent = spans
.iter()
.find(|span| span.name.as_ref() == recent_parent_name.as_str())
.unwrap();
let oldest_mark = spans
.iter()
.find(|span| span.name.as_ref() == "mark:oldest-after-eviction")
.unwrap();
let recent_mark = spans
.iter()
.find(|span| span.name.as_ref() == "mark:recent-after-eviction")
.unwrap();

assert_ne!(
oldest_mark.parent_span_id,
oldest_parent.span_context.span_id()
);
assert_ne!(
oldest_mark.span_context.trace_id(),
oldest_parent.span_context.trace_id()
);
assert_eq!(
recent_mark.span_context.trace_id(),
recent_parent.span_context.trace_id()
);
assert_eq!(
recent_mark.parent_span_id,
recent_parent.span_context.span_id()
);
assert!(!recent_mark.parent_span_is_remote);
}

#[test]
fn process_start_removes_completed_span_order_entry() {
let (provider, _exporter) = make_provider();
let mut processor = OpenInferenceEventProcessor::new(provider, "test-scope".to_string());
let tool_uuid = Uuid::now_v7();

processor.process(&make_start_event(
tool_uuid,
None,
"terminal",
ScopeType::Tool,
None,
));
processor.process(&make_end_event(
tool_uuid,
None,
"terminal",
ScopeType::Tool,
Some(json!({"status": "done"})),
));
assert!(processor.completed_span_contexts.contains_key(&tool_uuid));
assert_eq!(
processor
.completed_span_order
.iter()
.filter(|uuid| **uuid == tool_uuid)
.count(),
1
);

processor.process(&make_start_event(
tool_uuid,
None,
"terminal",
ScopeType::Tool,
None,
));
assert!(!processor.completed_span_contexts.contains_key(&tool_uuid));
assert!(!processor.completed_span_order.contains(&tool_uuid));

processor.process(&make_end_event(
tool_uuid,
None,
"terminal",
ScopeType::Tool,
Some(json!({"status": "done"})),
));
assert!(processor.completed_span_contexts.contains_key(&tool_uuid));
assert_eq!(
processor
.completed_span_order
.iter()
.filter(|uuid| **uuid == tool_uuid)
.count(),
1
);
}

#[test]
fn semantic_scope_type_and_input_value_follow_event_variants() {
let llm_with_content = make_start_event(
Expand Down
Loading
Loading