Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 16 additions & 11 deletions crates/adaptive/src/acg/request_surfaces/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub(crate) mod openai_responses;
use std::collections::HashSet;

use nemo_relay::api::llm::LlmRequest;
use nemo_relay::codec::resolve::{ProviderSurface, detect_request_surface};
use serde_json::Value;

use crate::acg::prompt_ir::PromptIR;
Expand All @@ -40,6 +41,14 @@ pub(crate) trait RequestSurfaceApplier: Send + Sync {
}

impl RequestSurface {
fn from_provider_surface(surface: ProviderSurface) -> Self {
match surface {
ProviderSurface::OpenAiChat => Self::OpenAIChat,
ProviderSurface::OpenAiResponses => Self::OpenAIResponses,
ProviderSurface::AnthropicMessages => Self::AnthropicMessages,
}
}

pub(crate) fn supports_provider(self, provider: &str) -> bool {
match provider {
"anthropic" => matches!(self, Self::AnthropicMessages),
Expand Down Expand Up @@ -71,17 +80,13 @@ impl RequestSurface {
pub(crate) fn resolve_request_surface_from_request(
request: &LlmRequest,
) -> crate::acg::Result<RequestSurface> {
if request.content.get("input").is_some() || request.content.get("instructions").is_some() {
Ok(RequestSurface::OpenAIResponses)
} else if request.content.get("system").is_some() {
Ok(RequestSurface::AnthropicMessages)
} else if request.content.get("messages").is_some() {
Ok(RequestSurface::OpenAIChat)
} else {
Err(crate::acg::AcgError::Internal(
"unable to resolve request surface from request shape".to_string(),
))
}
detect_request_surface(&request.content)
.map(RequestSurface::from_provider_surface)
.ok_or_else(|| {
crate::acg::AcgError::Internal(
"unable to resolve request surface from request shape".to_string(),
)
})
}

#[cfg_attr(not(test), allow(dead_code))]
Expand Down
25 changes: 24 additions & 1 deletion crates/core/src/api/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

//! Event types for Agent Trajectory Observability Format (ATOF) runtime events.

use std::borrow::Cow;
use std::collections::BTreeMap;
use std::sync::Arc;

Expand All @@ -23,10 +24,11 @@ use serde::{Deserialize, Serialize};
use typed_builder::TypedBuilder;
use uuid::Uuid;

use crate::api::llm::LlmAttributes;
use crate::api::llm::{LlmAttributes, LlmRequest};
use crate::api::scope::{HandleAttributes, ScopeAttributes, ScopeType};
use crate::api::tool::ToolAttributes;
use crate::codec::request::AnnotatedLlmRequest;
use crate::codec::resolve;
use crate::codec::response::AnnotatedLlmResponse;
use crate::json::Json;

Expand Down Expand Up @@ -609,6 +611,27 @@ impl Event {
.and_then(|profile| profile.annotated_response.as_ref())
}

/// Normalized LLM request: the codec annotation when present, otherwise a
/// best-effort decode of the start-event input payload.
#[must_use]
pub fn normalized_llm_request(&self) -> Option<Cow<'_, AnnotatedLlmRequest>> {
if let Some(annotated) = self.annotated_request() {
return Some(Cow::Borrowed(annotated.as_ref()));
}
let request: LlmRequest = serde_json::from_value(self.input()?.clone()).ok()?;
resolve::normalize_request(&request, None).map(Cow::Owned)
}

/// Normalized LLM response: the codec annotation when present, otherwise a
/// best-effort decode of the end-event output payload.
#[must_use]
pub fn normalized_llm_response(&self) -> Option<Cow<'_, AnnotatedLlmResponse>> {
if let Some(annotated) = self.annotated_response() {
return Some(Cow::Borrowed(annotated.as_ref()));
}
resolve::normalize_response(self.output()?, None).map(Cow::Owned)
}

/// Return true for scope-start events.
///
/// # Returns
Expand Down
1 change: 1 addition & 0 deletions crates/core/src/codec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub mod openai_chat;
pub mod openai_responses;
pub mod pricing;
pub mod request;
pub mod resolve;
pub mod response;
pub mod streaming;
pub mod traits;
16 changes: 15 additions & 1 deletion crates/core/src/codec/openai_responses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,11 @@ fn collect_output_item(
.unwrap_or("")
{
"message" => collect_message_text_parts(item, text_parts),
"output_text" => {
if let Some(text) = output_text_block(item) {
text_parts.push(text);
}
}
"function_call" => tool_calls.push(parse_function_call(item)),
_ => {}
}
Expand Down Expand Up @@ -244,6 +249,14 @@ fn message_from_text_parts(text_parts: Vec<String>) -> Option<MessageContent> {
}
}

fn top_level_output_text(response: &Json) -> Option<MessageContent> {
response
.get("output_text")
.and_then(|value| value.as_str())
.filter(|text| !text.is_empty())
.map(|text| MessageContent::Text(text.to_string()))
}

fn optional_vec<T>(items: Vec<T>) -> Option<Vec<T>> {
(!items.is_empty()).then_some(items)
}
Expand Down Expand Up @@ -456,7 +469,8 @@ impl LlmResponseCodec for OpenAIResponsesCodec {

let all_output_items = raw.output.clone();
let (text_parts, tool_calls) = collect_output_parts(raw.output.as_deref());
let message = message_from_text_parts(text_parts);
let message =
message_from_text_parts(text_parts).or_else(|| top_level_output_text(response));
let tool_calls = optional_vec(tool_calls);

// Map finish reason from status + incomplete_details.
Expand Down
120 changes: 120 additions & 0 deletions crates/core/src/codec/resolve.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

//! Provider-surface detection and best-effort normalization: the preferred path
//! for turning raw provider JSON into normalized types when no codec annotation
//! is present.

use crate::api::llm::LlmRequest;
use crate::json::Json;

use super::anthropic::AnthropicMessagesCodec;
use super::openai_chat::OpenAIChatCodec;
use super::openai_responses::OpenAIResponsesCodec;
use super::request::AnnotatedLlmRequest;
use super::response::AnnotatedLlmResponse;
use super::traits::{LlmCodec, LlmResponseCodec};

/// A built-in provider request/response surface.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProviderSurface {
/// OpenAI Chat Completions.
OpenAiChat,
/// OpenAI Responses.
OpenAiResponses,
/// Anthropic Messages.
AnthropicMessages,
}

impl ProviderSurface {
/// Map a canonical codec name (the `ApiSpecificResponse` serde tags) to a surface.
#[must_use]
pub fn from_codec_name(name: &str) -> Option<Self> {
match name {
"openai_chat" => Some(Self::OpenAiChat),
"openai_responses" => Some(Self::OpenAiResponses),
"anthropic_messages" => Some(Self::AnthropicMessages),
_ => None,
}
}

/// The canonical codec name for this surface.
#[must_use]
pub fn codec_name(self) -> &'static str {
match self {
Self::OpenAiChat => "openai_chat",
Self::OpenAiResponses => "openai_responses",
Self::AnthropicMessages => "anthropic_messages",
}
}
}

/// Detect the request surface from a raw request body, using the request codecs'
/// priority order; `None` when no key matches or `body` is not an object.
#[must_use]
pub fn detect_request_surface(body: &Json) -> Option<ProviderSurface> {
let obj = body.as_object()?;
if obj.contains_key("input") || obj.contains_key("instructions") {
Some(ProviderSurface::OpenAiResponses)
} else if obj.contains_key("system") {
Some(ProviderSurface::AnthropicMessages)
} else if obj.contains_key("messages") {
Some(ProviderSurface::OpenAiChat)
} else {
None
}
}

/// Detect the response surface from a raw provider response, classifying only
/// when exactly one built-in shape matches (the built-in codecs accept minimal
/// objects, so decode success alone is not a reliable classifier).
#[must_use]
pub fn detect_response_surface(raw: &Json) -> Option<ProviderSurface> {
let obj = raw.as_object()?;
let is_chat = obj.get("choices").is_some_and(Json::is_array);
let is_responses = obj.get("output").is_some_and(Json::is_array)
|| obj.get("output_text").is_some_and(Json::is_string);
let is_anthropic = obj.get("type").and_then(Json::as_str) == Some("message")
&& obj.get("content").is_some_and(Json::is_array);

match (is_chat, is_responses, is_anthropic) {
(true, false, false) => Some(ProviderSurface::OpenAiChat),
(false, true, false) => Some(ProviderSurface::OpenAiResponses),
(false, false, true) => Some(ProviderSurface::AnthropicMessages),
_ => None,
}
}

/// Best-effort decode of a raw request into [`AnnotatedLlmRequest`] (fail-open).
#[must_use]
pub fn normalize_request(
request: &LlmRequest,
hint: Option<ProviderSurface>,
) -> Option<AnnotatedLlmRequest> {
let surface = hint.or_else(|| detect_request_surface(&request.content))?;
match surface {
ProviderSurface::OpenAiChat => OpenAIChatCodec.decode(request),
ProviderSurface::OpenAiResponses => OpenAIResponsesCodec.decode(request),
ProviderSurface::AnthropicMessages => AnthropicMessagesCodec.decode(request),
}
.ok()
}

/// Best-effort decode of a raw response into [`AnnotatedLlmResponse`] (fail-open).
#[must_use]
pub fn normalize_response(
raw: &Json,
hint: Option<ProviderSurface>,
) -> Option<AnnotatedLlmResponse> {
let surface = hint.or_else(|| detect_response_surface(raw))?;
match surface {
ProviderSurface::OpenAiChat => OpenAIChatCodec.decode_response(raw),
ProviderSurface::OpenAiResponses => OpenAIResponsesCodec.decode_response(raw),
ProviderSurface::AnthropicMessages => AnthropicMessagesCodec.decode_response(raw),
}
.ok()
}

#[cfg(test)]
#[path = "../../tests/unit/codec/resolve_tests.rs"]
mod tests;
5 changes: 3 additions & 2 deletions crates/core/src/codec/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ use super::response::AnnotatedLlmResponse;
/// structured [`AnnotatedLlmRequest`].
///
/// Codecs are implemented by integration patches (LangChain, LangChain-NVIDIA,
/// LangGraph, etc.) since each SDK has its own request format. They are
/// registered by name in the global codec registry.
/// LangGraph, etc.) since each SDK has its own request format. A codec is
/// supplied per call by the caller; the built-in provider codecs can also be
/// selected from a raw payload via [`crate::codec::resolve`].
///
/// # Design
///
Expand Down
39 changes: 36 additions & 3 deletions crates/core/src/observability/atif.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ use uuid::Uuid;
use crate::api::event::Event;
use crate::api::runtime::EventSubscriberFn;
use crate::api::subscriber::flush_subscribers;
use crate::codec::response::{Usage, estimate_cost_for_provider};
use crate::codec::request::{AnnotatedLlmRequest, Message, MessageContent};
use crate::codec::response::{AnnotatedLlmResponse, Usage, estimate_cost_for_provider};
use crate::error::Result;
use crate::json::Json;

Expand Down Expand Up @@ -1054,6 +1055,32 @@ fn extract_tool_calls(output: &Json) -> Option<Vec<AtifToolCall>> {
if calls.is_empty() { None } else { Some(calls) }
}

// Annotation adapters: read the normalized message from an annotation, returning
// None for multimodal content so the caller falls back to the raw extractor.

fn atif_message_from_annotated_request(request: &AnnotatedLlmRequest) -> Option<Json> {
let content = request
.messages
.iter()
.rev()
.find_map(|message| match message {
Message::User { content, .. } => Some(content),
_ => None,
})?;
match content {
MessageContent::Text(text) => Some(Json::String(text.clone())),
MessageContent::Parts(_) => None,
}
}

fn atif_message_from_annotated_response(response: &AnnotatedLlmResponse) -> Option<Json> {
match &response.message {
Some(MessageContent::Text(text)) => Some(Json::String(text.clone())),
Some(MessageContent::Parts(_)) => None,
None => Some(empty_message()),
}
}

fn tool_call_array(output: &Json) -> Option<&Vec<Json>> {
output
.as_object()
Expand Down Expand Up @@ -2212,7 +2239,10 @@ impl StepConversionState {
self.steps.push(AtifStep {
step_id: 0,
source: "user".to_string(),
message: extract_user_messages(&content),
message: event
.annotated_request()
.and_then(|request| atif_message_from_annotated_request(request))
.unwrap_or_else(|| extract_user_messages(&content)),
timestamp: Some(event.timestamp().to_rfc3339()),
model_name: None,
reasoning_effort: None,
Expand Down Expand Up @@ -2253,7 +2283,10 @@ impl StepConversionState {
self.steps.push(AtifStep {
step_id: 0,
source: "agent".to_string(),
message: extract_llm_response_message(output),
message: event
.annotated_response()
.and_then(|response| atif_message_from_annotated_response(response))
.unwrap_or_else(|| extract_llm_response_message(output)),
timestamp: Some(event.timestamp().to_rfc3339()),
model_name: event.model_name().map(ToOwned::to_owned),
reasoning_effort,
Expand Down
Loading
Loading