Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
307 changes: 301 additions & 6 deletions crates/core/src/observability/atif.rs
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,63 @@ fn extract_metrics(output: &Json) -> Option<AtifMetrics> {
})
}

fn merge_metrics(
primary: Option<AtifMetrics>,
supplemental: Option<&AtifMetrics>,
) -> Option<AtifMetrics> {
match (primary, supplemental) {
(None, None) => None,
(Some(metrics), None) => Some(metrics),
(None, Some(supplemental)) => Some(supplemental.clone()),
(Some(mut metrics), Some(supplemental)) => {
merge_metrics_fields(&mut metrics, supplemental);
Some(metrics)
}
}
}

fn merge_metrics_fields(target: &mut AtifMetrics, supplemental: &AtifMetrics) {
if target.prompt_tokens.is_none() {
target.prompt_tokens = supplemental.prompt_tokens;
}
if target.completion_tokens.is_none() {
target.completion_tokens = supplemental.completion_tokens;
}
if target.cached_tokens.is_none() {
target.cached_tokens = supplemental.cached_tokens;
}
if target.cost_usd.is_none() {
target.cost_usd = supplemental.cost_usd;
}
if target.prompt_token_ids.is_none() {
target.prompt_token_ids = supplemental.prompt_token_ids.clone();
}
if target.completion_token_ids.is_none() {
target.completion_token_ids = supplemental.completion_token_ids.clone();
}
if target.logprobs.is_none() {
target.logprobs = supplemental.logprobs.clone();
}
merge_metrics_extra(&mut target.extra, &supplemental.extra);
}

fn merge_metrics_extra(target: &mut Option<Json>, supplemental: &Option<Json>) {
let Some(supplemental) = supplemental else {
return;
};
match (target.as_mut(), supplemental) {
(Some(Json::Object(target_object)), Json::Object(supplemental_object)) => {
for (key, value) in supplemental_object {
target_object
.entry(key.clone())
.or_insert_with(|| value.clone());
}
}
(None, _) => *target = Some(supplemental.clone()),
_ => {}
}
}

fn token_usage_object(output: &Json) -> Option<&serde_json::Map<String, Json>> {
let output = output.as_object()?;
output
Expand Down Expand Up @@ -1071,25 +1128,28 @@ struct EventLookupMaps {
name_map: std::collections::HashMap<Uuid, String>,
start_ts_map: std::collections::HashMap<Uuid, DateTime<Utc>>,
tool_call_ids: std::collections::HashMap<Uuid, String>,
suppressed_llm_events: HashSet<Uuid>,
supplemental_llm_metrics: HashMap<Uuid, AtifMetrics>,
}

impl EventLookupMaps {
fn from_events(events: &[&Event]) -> Self {
Self::from_events_with_correlation_events(events, events)
Self::from_events_with_correlation_events(events, events, events)
}

fn from_events_for_agent(events: &[&Event], tree: &AgentScopeTree, agent_uuid: Uuid) -> Self {
let correlation_events = events
let tool_correlation_events = events
.iter()
.copied()
.filter(|event| tree.owner_agent(event) == Some(agent_uuid))
.collect::<Vec<_>>();
Self::from_events_with_correlation_events(events, &correlation_events)
Self::from_events_with_correlation_events(events, events, &tool_correlation_events)
}

fn from_events_with_correlation_events(
events: &[&Event],
correlation_events: &[&Event],
llm_dedupe_events: &[&Event],
tool_correlation_events: &[&Event],
) -> Self {
let mut name_map = std::collections::HashMap::new();
let mut start_ts_map = std::collections::HashMap::new();
Expand All @@ -1099,12 +1159,239 @@ impl EventLookupMaps {
start_ts_map.insert(event.uuid(), *event.timestamp());
}
}
let llm_dedupe = build_llm_dedupe(llm_dedupe_events);
Self {
name_map,
start_ts_map,
tool_call_ids: build_tool_call_correlations(correlation_events),
tool_call_ids: build_tool_call_correlations(tool_correlation_events),
suppressed_llm_events: llm_dedupe.suppressed_events,
supplemental_llm_metrics: llm_dedupe.supplemental_metrics,
}
}

fn should_suppress_llm_event(&self, event: &Event) -> bool {
event.category().map(|category| category.as_str()) == Some("llm")
&& self.suppressed_llm_events.contains(&event.uuid())
}
}

#[derive(Default)]
struct LlmDedupeLookups {
suppressed_events: HashSet<Uuid>,
supplemental_metrics: HashMap<Uuid, AtifMetrics>,
}

#[derive(Default)]
struct LlmSpanParts<'a> {
start: Option<&'a Event>,
end: Option<&'a Event>,
}

#[derive(Debug, Clone)]
struct LlmSpanCandidate {
uuid: Uuid,
parent_uuid: Option<Uuid>,
start_ts: DateTime<Utc>,
end_ts: DateTime<Utc>,
request_signature: String,
response_signature: String,
model_name: Option<String>,
fidelity_score: u8,
end_metrics: Option<AtifMetrics>,
hook_instrumentation: bool,
gateway_instrumentation: bool,
non_exact_provider_payload: bool,
}

fn build_llm_dedupe(events: &[&Event]) -> LlmDedupeLookups {
let candidates = collect_llm_span_candidates(events);
let mut lookups = LlmDedupeLookups::default();

for (left_idx, left) in candidates.iter().enumerate() {
for right in candidates.iter().skip(left_idx + 1) {
if same_physical_llm_request(left, right) {
suppress_lower_fidelity_llm_span(left, right, &mut lookups);
}
}
}

lookups
}

fn collect_llm_span_candidates(events: &[&Event]) -> Vec<LlmSpanCandidate> {
let mut spans: HashMap<Uuid, LlmSpanParts<'_>> = HashMap::new();
for event in events {
if event.category().map(|category| category.as_str()) != Some("llm") {
continue;
}
let parts = spans.entry(event.uuid()).or_default();
match event.scope_category() {
Some(crate::api::event::ScopeCategory::Start) => parts.start = Some(event),
Some(crate::api::event::ScopeCategory::End) => parts.end = Some(event),
None => {}
}
}

spans
.into_iter()
.filter_map(|(uuid, parts)| LlmSpanCandidate::from_events(uuid, parts.start?, parts.end?))
.collect()
}

impl LlmSpanCandidate {
fn from_events(uuid: Uuid, start: &Event, end: &Event) -> Option<Self> {
let request_signature = start.data().map(llm_request_signature)?;
let response_signature = end.data().map(llm_response_signature)?;
Some(Self {
uuid,
parent_uuid: start.parent_uuid().or_else(|| end.parent_uuid()),
start_ts: *start.timestamp(),
end_ts: *end.timestamp(),
request_signature,
response_signature,
model_name: start
.model_name()
.or_else(|| end.model_name())
.map(ToOwned::to_owned),
fidelity_score: llm_event_fidelity_score(start).max(llm_event_fidelity_score(end)),
end_metrics: end.data().and_then(extract_metrics),
hook_instrumentation: is_hook_instrumented_llm_event(start)
|| is_hook_instrumented_llm_event(end),
gateway_instrumentation: is_gateway_instrumented_llm_event(start)
|| is_gateway_instrumented_llm_event(end),
non_exact_provider_payload: has_non_exact_provider_payload(start)
|| has_non_exact_provider_payload(end),
})
}
}

fn llm_request_signature(input: &Json) -> String {
let content = unwrap_llm_request(input);
json_to_string(&extract_user_messages(&content))
}

fn llm_response_signature(output: &Json) -> String {
json_to_string(&extract_llm_response_message(output))
}

fn same_physical_llm_request(left: &LlmSpanCandidate, right: &LlmSpanCandidate) -> bool {
same_parent(left, right)
&& compatible_model_names(left, right)
&& llm_spans_overlap(left, right)
&& (same_llm_payload_signatures(left, right)
|| complementary_hook_and_gateway_spans(left, right))
}

fn same_llm_payload_signatures(left: &LlmSpanCandidate, right: &LlmSpanCandidate) -> bool {
left.request_signature == right.request_signature
&& left.response_signature == right.response_signature
}

fn complementary_hook_and_gateway_spans(left: &LlmSpanCandidate, right: &LlmSpanCandidate) -> bool {
(left.non_exact_provider_payload && left.hook_instrumentation && right.gateway_instrumentation)
|| (right.non_exact_provider_payload
&& right.hook_instrumentation
&& left.gateway_instrumentation)
}
Comment thread
willkill07 marked this conversation as resolved.

fn same_parent(left: &LlmSpanCandidate, right: &LlmSpanCandidate) -> bool {
left.parent_uuid.is_some() && left.parent_uuid == right.parent_uuid
}

fn compatible_model_names(left: &LlmSpanCandidate, right: &LlmSpanCandidate) -> bool {
match (&left.model_name, &right.model_name) {
(Some(left_model), Some(right_model)) => left_model == right_model,
_ => true,
}
}

fn llm_spans_overlap(left: &LlmSpanCandidate, right: &LlmSpanCandidate) -> bool {
left.start_ts <= right.end_ts && right.start_ts <= left.end_ts
}

fn suppress_lower_fidelity_llm_span(
left: &LlmSpanCandidate,
right: &LlmSpanCandidate,
lookups: &mut LlmDedupeLookups,
) {
match left.fidelity_score.cmp(&right.fidelity_score) {
std::cmp::Ordering::Greater => suppress_llm_span(right, left, lookups),
std::cmp::Ordering::Less => suppress_llm_span(left, right, lookups),
std::cmp::Ordering::Equal => {}
}
Comment thread
willkill07 marked this conversation as resolved.
}

fn suppress_llm_span(
suppressed: &LlmSpanCandidate,
canonical: &LlmSpanCandidate,
lookups: &mut LlmDedupeLookups,
) {
lookups.suppressed_events.insert(suppressed.uuid);
if let Some(metrics) = &suppressed.end_metrics {
let entry = lookups
.supplemental_metrics
.entry(canonical.uuid)
.or_default();
merge_metrics_fields(entry, metrics);
}
}

fn llm_event_fidelity_score(event: &Event) -> u8 {
let Some(metadata) = event.metadata().and_then(Json::as_object) else {
return 50;
};
if metadata
.get("projection")
.and_then(Json::as_bool)
.unwrap_or(false)
{
return 10;
}
if has_non_exact_provider_payload(event) {
return 30;
}
if metadata
.get("provider_payload_exact")
.and_then(Json::as_bool)
.unwrap_or(false)
{
return 100;
}
if metadata.contains_key("fidelity_source") || metadata.contains_key("api_call_id") {
return 95;
}
if metadata.contains_key("hook_event_name") {
return 90;
}
if is_gateway_instrumented_llm_event(event) {
return 50;
}
50
}

fn is_hook_instrumented_llm_event(event: &Event) -> bool {
event
.metadata()
.and_then(Json::as_object)
.is_some_and(|metadata| metadata.contains_key("hook_event_name"))
}

fn is_gateway_instrumented_llm_event(event: &Event) -> bool {
event
.metadata()
.and_then(Json::as_object)
.is_some_and(|metadata| {
metadata.contains_key("gateway_path") || metadata.contains_key("llm_correlation_source")
})
}

fn has_non_exact_provider_payload(event: &Event) -> bool {
event
.metadata()
.and_then(Json::as_object)
.and_then(|metadata| metadata.get("provider_payload_exact"))
.and_then(Json::as_bool)
== Some(false)
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
Expand Down Expand Up @@ -1416,6 +1703,9 @@ struct DeferredToolObservation {

impl StepConversionState {
fn handle_event(&mut self, event: &Event, lookups: &EventLookupMaps) {
if lookups.should_suppress_llm_event(event) {
return;
}
match (
event.kind(),
event.scope_category(),
Expand Down Expand Up @@ -1712,6 +2002,11 @@ impl StepConversionState {
"nemo_relay",
);

let metrics = merge_metrics(
extract_metrics(output),
lookups.supplemental_llm_metrics.get(&event.uuid()),
);

self.steps.push(AtifStep {
step_id: 0,
source: "agent".to_string(),
Expand All @@ -1722,7 +2017,7 @@ impl StepConversionState {
reasoning_content,
tool_calls,
observation: None,
metrics: extract_metrics(output),
metrics,
llm_call_count: Some(1),
is_copied_context: None,
extra: None,
Expand Down
Loading
Loading