From ffe92a2f81e4d6b03a0b1de2345cde64e48a99e9 Mon Sep 17 00:00:00 2001 From: Bryan Bednarski Date: Tue, 23 Jun 2026 15:41:21 -0700 Subject: [PATCH 1/2] Add configurable ATOF endpoint field policy Adds an endpoint-local field_name_policy option for ATOF streaming exporters. The default preserves canonical ATOF JSON, while replace_dots recursively rewrites dotted JSON object keys to underscores for HTTP receivers that reject dotted field names. This also fixes ATOF HTTP streaming robustness by installing the rustls crypto provider before endpoint workers create HTTP clients, preserving the endpoint config in the NDJSON worker after spawning the upload task, and logging bounded non-2xx response bodies so rejected events are diagnosable. Exposes the new option through plugin config and Rust, Python, Node, and Go bindings, with tests for validation, serialization, recursive key rewriting, and collision handling. Signed-off-by: Bryan Bednarski --- crates/core/src/observability/atof.rs | 159 ++++++++++++++++-- .../src/observability/plugin_component.rs | 33 +++- .../tests/unit/observability/atof_tests.rs | 42 ++++- .../observability/plugin_component_tests.rs | 15 +- crates/node/observability.d.ts | 1 + crates/node/src/api/mod.rs | 14 ++ .../node/tests/observability_plugin_tests.mjs | 2 + crates/python/src/py_types/observability.rs | 16 +- go/nemo_relay/atof_test.go | 12 +- go/nemo_relay/nemo_relay.go | 19 ++- go/nemo_relay/observability_plugin.go | 9 +- go/nemo_relay/observability_plugin_test.go | 16 +- python/nemo_relay/_native.pyi | 2 + python/nemo_relay/observability.py | 2 + python/nemo_relay/observability.pyi | 1 + python/tests/test_observability_plugin.py | 2 + python/tests/test_types.py | 2 + 17 files changed, 311 insertions(+), 36 deletions(-) diff --git a/crates/core/src/observability/atof.rs b/crates/core/src/observability/atof.rs index 221c10ce..85416276 100644 --- a/crates/core/src/observability/atof.rs +++ b/crates/core/src/observability/atof.rs @@ -115,6 +115,36 @@ pub enum AtofEndpointTransport { Ndjson, } +/// Field name transformation policy used before sending events to an endpoint. +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum AtofEndpointFieldNamePolicy { + /// Preserve canonical ATOF field names exactly. + #[default] + Preserve, + /// Replace dots in JSON object keys with underscores, recursively. + ReplaceDots, +} + +impl AtofEndpointFieldNamePolicy { + /// Parse a string policy used by configuration and bindings. + pub fn parse(value: &str) -> Option { + match value { + "preserve" => Some(Self::Preserve), + "replace_dots" => Some(Self::ReplaceDots), + _ => None, + } + } + + /// Return the stable string representation used by configuration and bindings. + pub fn as_str(self) -> &'static str { + match self { + Self::Preserve => "preserve", + Self::ReplaceDots => "replace_dots", + } + } +} + impl AtofEndpointTransport { /// Parse a string transport used by configuration and bindings. pub fn parse(value: &str) -> Option { @@ -150,6 +180,9 @@ pub struct AtofEndpointConfig { /// Per-endpoint timeout in milliseconds. #[serde(default = "default_endpoint_timeout_millis")] pub timeout_millis: u64, + /// Field name transformation policy applied before sending events. + #[serde(default)] + pub field_name_policy: AtofEndpointFieldNamePolicy, } impl AtofEndpointConfig { @@ -160,6 +193,7 @@ impl AtofEndpointConfig { transport, headers: HashMap::new(), timeout_millis: default_endpoint_timeout_millis(), + field_name_policy: AtofEndpointFieldNamePolicy::Preserve, } } @@ -174,6 +208,15 @@ impl AtofEndpointConfig { self.timeout_millis = timeout_millis; self } + + /// Override the endpoint field name policy. + pub fn with_field_name_policy( + mut self, + field_name_policy: AtofEndpointFieldNamePolicy, + ) -> Self { + self.field_name_policy = field_name_policy; + self + } } /// Configuration for [`AtofExporter`]. @@ -564,6 +607,7 @@ fn run_endpoint_worker( config: AtofEndpointConfig, rx: tokio::sync::mpsc::UnboundedReceiver, ) { + install_rustls_crypto_provider(); let runtime = match tokio::runtime::Builder::new_current_thread() .enable_all() .build() @@ -583,6 +627,11 @@ fn run_endpoint_worker( }); } +#[cfg(all(feature = "atof-streaming", not(target_arch = "wasm32")))] +fn install_rustls_crypto_provider() { + let _ = rustls::crypto::ring::default_provider().install_default(); +} + #[cfg(all(feature = "atof-streaming", not(target_arch = "wasm32")))] async fn run_http_post_endpoint( index: usize, @@ -611,7 +660,7 @@ async fn run_http_post_endpoint( while let Some(message) = rx.recv().await { match message { EndpointMessage::Event(raw_json) => { - let body = format!("{raw_json}\n"); + let body = format!("{}\n", endpoint_event_json(&config, raw_json)); let result = client .post(&config.url) .header(reqwest::header::CONTENT_TYPE, "application/x-ndjson") @@ -620,10 +669,7 @@ async fn run_http_post_endpoint( .await; match result { Ok(response) if response.status().is_success() => {} - Ok(response) => eprintln!( - "nemo_relay: ATOF endpoint[{index}] HTTP status {}", - response.status() - ), + Ok(response) => log_http_error(index, "HTTP", response).await, Err(error) => { eprintln!("nemo_relay: ATOF endpoint[{index}] send failed: {error}") } @@ -657,7 +703,7 @@ async fn run_websocket_endpoint( while let Some(message) = rx.recv().await { match message { EndpointMessage::Event(raw_json) => { - pending.push_back(raw_json); + pending.push_back(endpoint_event_json(&config, raw_json)); let _ = drain_websocket_pending(index, &config, &mut socket, &mut pending).await; } EndpointMessage::Flush(done) => { @@ -788,9 +834,10 @@ async fn run_ndjson_endpoint( }; let (body_tx, body) = ndjson_body_channel(); + let url = config.url.clone(); let request = tokio::spawn(async move { client - .post(config.url) + .post(url) .header(reqwest::header::CONTENT_TYPE, "application/x-ndjson") .body(body) .send() @@ -800,7 +847,9 @@ async fn run_ndjson_endpoint( while let Some(message) = rx.recv().await { match message { - EndpointMessage::Event(raw_json) => send_ndjson_event(index, &body_tx, raw_json), + EndpointMessage::Event(raw_json) => { + send_ndjson_event(index, &body_tx, endpoint_event_json(&config, raw_json)) + } EndpointMessage::Flush(done) => send_ndjson_flush(index, &body_tx, done), EndpointMessage::Close(done) => { drop(body_tx); @@ -879,10 +928,7 @@ async fn finish_ndjson_upload( ) { match tokio::time::timeout(close_timeout, request).await { Ok(Ok(Ok(response))) if response.status().is_success() => {} - Ok(Ok(Ok(response))) => eprintln!( - "nemo_relay: ATOF endpoint[{index}] NDJSON HTTP status {}", - response.status() - ), + Ok(Ok(Ok(response))) => log_http_error(index, "NDJSON HTTP", response).await, Ok(Ok(Err(error))) => { eprintln!("nemo_relay: ATOF endpoint[{index}] NDJSON upload failed: {error}") } @@ -910,6 +956,95 @@ async fn drain_closed(mut rx: tokio::sync::mpsc::UnboundedReceiver String { + match config.field_name_policy { + AtofEndpointFieldNamePolicy::Preserve => raw_json, + AtofEndpointFieldNamePolicy::ReplaceDots => replace_dotted_field_names(&raw_json), + } +} + +#[cfg(all(feature = "atof-streaming", not(target_arch = "wasm32")))] +fn replace_dotted_field_names(raw_json: &str) -> String { + let Ok(mut value) = serde_json::from_str::(raw_json) else { + return raw_json.to_string(); + }; + replace_dotted_value_keys(&mut value); + serde_json::to_string(&value).unwrap_or_else(|_| raw_json.to_string()) +} + +#[cfg(all(feature = "atof-streaming", not(target_arch = "wasm32")))] +fn replace_dotted_value_keys(value: &mut Json) { + match value { + Json::Object(object) => replace_dotted_object_keys(object), + Json::Array(items) => { + for item in items { + replace_dotted_value_keys(item); + } + } + _ => {} + } +} + +#[cfg(all(feature = "atof-streaming", not(target_arch = "wasm32")))] +fn replace_dotted_object_keys(object: &mut serde_json::Map) { + let mut old = std::mem::take(object) + .into_iter() + .map(|(key, mut value)| { + replace_dotted_value_keys(&mut value); + (key, value) + }) + .collect::>(); + old.sort_by_key(|(key, _)| !key.contains('.')); + + for (key, value) in old { + let sanitized_key = key.replace('.', "_"); + let final_key = collision_free_key(object, sanitized_key); + object.insert(final_key, value); + } +} + +#[cfg(all(feature = "atof-streaming", not(target_arch = "wasm32")))] +fn collision_free_key(object: &serde_json::Map, key: String) -> String { + if !object.contains_key(&key) { + return key; + } + for suffix in 2.. { + let candidate = format!("{key}_{suffix}"); + if !object.contains_key(&candidate) { + return candidate; + } + } + unreachable!("unbounded suffix search must find a key") +} + +#[cfg(all(feature = "atof-streaming", not(target_arch = "wasm32")))] +async fn log_http_error(index: usize, label: &str, response: reqwest::Response) { + let status = response.status(); + match response.text().await { + Ok(body) if !body.trim().is_empty() => eprintln!( + "nemo_relay: ATOF endpoint[{index}] {label} status {status}: {}", + truncate_log_body(&body) + ), + Ok(_) => eprintln!("nemo_relay: ATOF endpoint[{index}] {label} status {status}"), + Err(error) => eprintln!( + "nemo_relay: ATOF endpoint[{index}] {label} status {status}; failed to read response body: {error}" + ), + } +} + +#[cfg(all(feature = "atof-streaming", not(target_arch = "wasm32")))] +fn truncate_log_body(body: &str) -> String { + const LIMIT: usize = 1024; + let trimmed = body.trim(); + if trimmed.chars().count() <= LIMIT { + return trimmed.to_string(); + } + let mut truncated = trimmed.chars().take(LIMIT).collect::(); + truncated.push_str("... "); + truncated +} + // --------------------------------------------------------------------------- // Tests // --------------------------------------------------------------------------- diff --git a/crates/core/src/observability/plugin_component.rs b/crates/core/src/observability/plugin_component.rs index c4d4fd85..a19016ae 100644 --- a/crates/core/src/observability/plugin_component.rs +++ b/crates/core/src/observability/plugin_component.rs @@ -37,8 +37,9 @@ use crate::api::subscriber::{ use crate::error::FlowError; use crate::observability::atif::{AtifAgentInfo, AtifExporter}; use crate::observability::atof::{ - AtofEndpointConfig as CoreAtofEndpointConfig, AtofEndpointTransport, AtofExporter, - AtofExporterConfig as CoreAtofExporterConfig, AtofExporterMode, + AtofEndpointConfig as CoreAtofEndpointConfig, AtofEndpointFieldNamePolicy, + AtofEndpointTransport, AtofExporter, AtofExporterConfig as CoreAtofExporterConfig, + AtofExporterMode, }; #[cfg(feature = "openinference")] use crate::observability::openinference::{ @@ -199,6 +200,9 @@ pub struct AtofEndpointSectionConfig { /// Per-endpoint timeout in milliseconds. #[serde(default = "default_timeout_millis")] pub timeout_millis: u64, + /// Field name policy applied before sending events: `preserve` or `replace_dots`. + #[serde(default = "default_atof_endpoint_field_name_policy")] + pub field_name_policy: String, } /// Per-trajectory ATIF exporter config. @@ -660,8 +664,15 @@ fn build_atof_endpoint_config( "ATOF endpoints[{index}].transport must be 'http_post', 'websocket', or 'ndjson'" )) })?; + let field_name_policy = AtofEndpointFieldNamePolicy::parse(&endpoint.field_name_policy) + .ok_or_else(|| { + PluginError::InvalidConfig(format!( + "ATOF endpoints[{index}].field_name_policy must be 'preserve' or 'replace_dots'" + )) + })?; let mut config = CoreAtofEndpointConfig::new(endpoint.url, transport) - .with_timeout_millis(endpoint.timeout_millis); + .with_timeout_millis(endpoint.timeout_millis) + .with_field_name_policy(field_name_policy); for (key, value) in endpoint.headers { config = config.with_header(key, value); } @@ -1848,6 +1859,18 @@ fn validate_atof_endpoint_values( format!("ATOF endpoints[{index}].timeout_millis must be greater than 0"), ); } + if AtofEndpointFieldNamePolicy::parse(&endpoint.field_name_policy).is_none() { + push_policy_diag( + diagnostics, + policy.unsupported_value, + "observability.unsupported_value", + Some("atof".to_string()), + Some(format!("endpoints[{index}].field_name_policy")), + format!( + "ATOF endpoints[{index}].field_name_policy must be 'preserve' or 'replace_dots'" + ), + ); + } } #[cfg(all(feature = "atof-streaming", not(target_arch = "wasm32")))] @@ -2176,6 +2199,10 @@ fn default_atof_endpoint_transport() -> String { "http_post".to_string() } +fn default_atof_endpoint_field_name_policy() -> String { + "preserve".to_string() +} + fn default_agent_name() -> String { "NeMo Relay".to_string() } diff --git a/crates/core/tests/unit/observability/atof_tests.rs b/crates/core/tests/unit/observability/atof_tests.rs index 9ec583dc..cfdd7800 100644 --- a/crates/core/tests/unit/observability/atof_tests.rs +++ b/crates/core/tests/unit/observability/atof_tests.rs @@ -370,7 +370,8 @@ fn endpoint_and_exporter_config_builders_preserve_values() { let endpoint = AtofEndpointConfig::new("http://127.0.0.1:9/events", AtofEndpointTransport::HttpPost) .with_header("x-test", "enabled") - .with_timeout_millis(42); + .with_timeout_millis(42) + .with_field_name_policy(AtofEndpointFieldNamePolicy::ReplaceDots); let config = AtofExporterConfig::new() .with_output_directory(&dir) .with_mode(AtofExporterMode::Overwrite) @@ -382,10 +383,43 @@ fn endpoint_and_exporter_config_builders_preserve_values() { Some("enabled") ); assert_eq!(endpoint.timeout_millis, 42); + assert_eq!( + endpoint.field_name_policy, + AtofEndpointFieldNamePolicy::ReplaceDots + ); + assert_eq!(AtofEndpointFieldNamePolicy::Preserve.as_str(), "preserve"); + assert_eq!( + AtofEndpointFieldNamePolicy::parse("replace_dots"), + Some(AtofEndpointFieldNamePolicy::ReplaceDots) + ); assert_eq!(config.path(), dir.join("custom.jsonl")); assert_eq!(config.endpoints, vec![endpoint]); } +#[test] +#[cfg(all(feature = "atof-streaming", not(target_arch = "wasm32")))] +fn endpoint_field_name_policy_replaces_dots_recursively() { + let config = + AtofEndpointConfig::new("http://127.0.0.1:9/events", AtofEndpointTransport::HttpPost) + .with_field_name_policy(AtofEndpointFieldNamePolicy::ReplaceDots); + let transformed = endpoint_event_json( + &config, + json!({ + "kind": "scope", + "metadata": { + "otel_status_code": "existing", + "otel.status_code": "OK", + "nested": [{"a.b": true}] + } + }) + .to_string(), + ); + let value: Json = serde_json::from_str(&transformed).unwrap(); + assert_eq!(value["metadata"]["otel_status_code"], json!("OK")); + assert_eq!(value["metadata"]["otel_status_code_2"], json!("existing")); + assert_eq!(value["metadata"]["nested"][0]["a_b"], json!(true)); +} + #[test] fn append_mode_preserves_existing_lines() { let dir = temp_dir("atof-append"); @@ -1301,6 +1335,7 @@ fn endpoint_validation_rejects_empty_timeout_and_invalid_headers() { transport: AtofEndpointTransport::HttpPost, headers: headers.clone(), timeout_millis: 1, + field_name_policy: AtofEndpointFieldNamePolicy::Preserve, }) .unwrap(); assert_eq!(build_header_map(&headers).unwrap().len(), 1); @@ -1310,6 +1345,7 @@ fn endpoint_validation_rejects_empty_timeout_and_invalid_headers() { transport: AtofEndpointTransport::HttpPost, headers: std::collections::HashMap::new(), timeout_millis: 1, + field_name_policy: AtofEndpointFieldNamePolicy::Preserve, }; assert!( validate_endpoint_config(&empty_url) @@ -1323,6 +1359,7 @@ fn endpoint_validation_rejects_empty_timeout_and_invalid_headers() { transport: AtofEndpointTransport::HttpPost, headers: std::collections::HashMap::new(), timeout_millis: 0, + field_name_policy: AtofEndpointFieldNamePolicy::Preserve, }; assert!( validate_endpoint_config(&zero_timeout) @@ -1344,6 +1381,7 @@ fn endpoint_validation_rejects_empty_timeout_and_invalid_headers() { transport: AtofEndpointTransport::Ndjson, headers: bad_header_value, timeout_millis: 1, + field_name_policy: AtofEndpointFieldNamePolicy::Preserve, }) .unwrap_err() .contains("disabled") @@ -1443,6 +1481,7 @@ fn http_endpoint_worker_disables_invalid_headers_and_drains_control_messages() { transport: AtofEndpointTransport::HttpPost, headers, timeout_millis: 1, + field_name_policy: AtofEndpointFieldNamePolicy::Preserve, }, rx, ) @@ -1475,6 +1514,7 @@ fn websocket_helpers_cover_invalid_headers_and_timeout_reconnect_path() { transport: AtofEndpointTransport::Websocket, headers, timeout_millis: 1, + field_name_policy: AtofEndpointFieldNamePolicy::Preserve, }; tokio::runtime::Runtime::new().unwrap().block_on(async { assert!(connect_websocket(&config).await.is_err()); diff --git a/crates/core/tests/unit/observability/plugin_component_tests.rs b/crates/core/tests/unit/observability/plugin_component_tests.rs index 8ea2f9c8..3c20dd1b 100644 --- a/crates/core/tests/unit/observability/plugin_component_tests.rs +++ b/crates/core/tests/unit/observability/plugin_component_tests.rs @@ -477,7 +477,8 @@ fn atof_endpoint_validation_rejects_bad_values() { {"url": "", "transport": "http_post"}, {"url": "http://localhost/events", "transport": "bogus"}, {"url": "http://localhost/events", "transport": "ndjson", "timeout_millis": 0}, - {"url": "not a url", "transport": "http_post"} + {"url": "not a url", "transport": "http_post"}, + {"url": "http://localhost/events", "transport": "http_post", "field_name_policy": "bogus"} ] } }))); @@ -508,6 +509,12 @@ fn atof_endpoint_validation_rejects_bad_values() { .iter() .any(|diag| { diag.field.as_deref() == Some("endpoints[3].url") }) ); + assert!( + report + .diagnostics + .iter() + .any(|diag| { diag.field.as_deref() == Some("endpoints[4].field_name_policy") }) + ); } #[test] @@ -521,6 +528,7 @@ fn build_atof_endpoint_config_maps_headers_timeout_and_rejects_transport() { transport: "websocket".into(), headers: headers.clone(), timeout_millis: 123, + field_name_policy: "replace_dots".into(), }, ) .unwrap(); @@ -532,6 +540,10 @@ fn build_atof_endpoint_config_maps_headers_timeout_and_rejects_transport() { ); assert_eq!(config.headers, headers); assert_eq!(config.timeout_millis, 123); + assert_eq!( + config.field_name_policy, + crate::observability::atof::AtofEndpointFieldNamePolicy::ReplaceDots + ); let error = build_atof_endpoint_config( 3, @@ -540,6 +552,7 @@ fn build_atof_endpoint_config_maps_headers_timeout_and_rejects_transport() { transport: "smtp".into(), headers: std::collections::HashMap::new(), timeout_millis: 3_000, + field_name_policy: "preserve".into(), }, ) .unwrap_err(); diff --git a/crates/node/observability.d.ts b/crates/node/observability.d.ts index f31341d3..f12ede6e 100644 --- a/crates/node/observability.d.ts +++ b/crates/node/observability.d.ts @@ -19,6 +19,7 @@ export interface AtofEndpointConfig { transport?: 'http_post' | 'websocket' | 'ndjson' | string; headers?: Record; timeout_millis?: number; + field_name_policy?: 'preserve' | 'replace_dots' | string; } export interface S3StorageConfig { diff --git a/crates/node/src/api/mod.rs b/crates/node/src/api/mod.rs index 602db870..7f8ccdd1 100644 --- a/crates/node/src/api/mod.rs +++ b/crates/node/src/api/mod.rs @@ -204,6 +204,18 @@ fn build_atof_config( if let Some(timeout_millis) = endpoint.timeout_millis { endpoint_config = endpoint_config.with_timeout_millis(timeout_millis.into()); } + if let Some(field_name_policy) = endpoint.field_name_policy { + let Some(field_name_policy) = + nemo_relay::observability::atof::AtofEndpointFieldNamePolicy::parse( + &field_name_policy, + ) + else { + return Err(napi::Error::from_reason( + "endpoint field_name_policy must be 'preserve' or 'replace_dots'", + )); + }; + endpoint_config = endpoint_config.with_field_name_policy(field_name_policy); + } for (key, value) in parse_string_map(endpoint.headers, "endpoint.headers")? { endpoint_config = endpoint_config.with_header(key, value); } @@ -3072,6 +3084,8 @@ pub struct AtofEndpointConfig { pub headers: Option, /// Per-endpoint timeout in milliseconds. pub timeout_millis: Option, + /// Field name policy applied before sending events. + pub field_name_policy: Option, } /// Filesystem-backed Agent Trajectory Observability Format (ATOF) JSONL event exporter. diff --git a/crates/node/tests/observability_plugin_tests.mjs b/crates/node/tests/observability_plugin_tests.mjs index e999831a..e0741d07 100644 --- a/crates/node/tests/observability_plugin_tests.mjs +++ b/crates/node/tests/observability_plugin_tests.mjs @@ -64,6 +64,7 @@ describe('observability plugin helpers', () => { transport: 'http_post', headers: { 'X-Test': 'yes' }, timeout_millis: 1000, + field_name_policy: 'replace_dots', }, ], }); @@ -74,6 +75,7 @@ describe('observability plugin helpers', () => { transport: 'http_post', headers: { 'X-Test': 'yes' }, timeout_millis: 1000, + field_name_policy: 'replace_dots', }, ]); }); diff --git a/crates/python/src/py_types/observability.rs b/crates/python/src/py_types/observability.rs index 5bdf6208..1f7e33b0 100644 --- a/crates/python/src/py_types/observability.rs +++ b/crates/python/src/py_types/observability.rs @@ -177,6 +177,8 @@ pub struct PyAtofEndpointConfig { pub(crate) headers: HashMap, #[pyo3(get, set)] pub(crate) timeout_millis: u64, + #[pyo3(get, set)] + pub(crate) field_name_policy: String, } impl PyAtofEndpointConfig { @@ -191,6 +193,16 @@ impl PyAtofEndpointConfig { let mut config = nemo_relay::observability::atof::AtofEndpointConfig::new(self.url.clone(), transport) .with_timeout_millis(self.timeout_millis); + let Some(field_name_policy) = + nemo_relay::observability::atof::AtofEndpointFieldNamePolicy::parse( + &self.field_name_policy, + ) + else { + return Err(pyo3::exceptions::PyValueError::new_err( + "endpoint field_name_policy must be 'preserve' or 'replace_dots'", + )); + }; + config = config.with_field_name_policy(field_name_policy); for (key, value) in &self.headers { config = config.with_header(key.clone(), value.clone()); } @@ -201,12 +213,13 @@ impl PyAtofEndpointConfig { #[pymethods] impl PyAtofEndpointConfig { #[new] - #[pyo3(signature = (url, *, transport="http_post".to_string(), headers=None, timeout_millis=3000))] + #[pyo3(signature = (url, *, transport="http_post".to_string(), headers=None, timeout_millis=3000, field_name_policy="preserve".to_string()))] pub(crate) fn new( url: String, transport: String, headers: Option<&Bound<'_, PyAny>>, timeout_millis: u64, + field_name_policy: String, ) -> PyResult { let headers = match headers { Some(headers) if !headers.is_none() => py_string_map(headers, "headers")?, @@ -217,6 +230,7 @@ impl PyAtofEndpointConfig { transport, headers, timeout_millis, + field_name_policy, }) } diff --git a/go/nemo_relay/atof_test.go b/go/nemo_relay/atof_test.go index 96a3bc1f..2eaac784 100644 --- a/go/nemo_relay/atof_test.go +++ b/go/nemo_relay/atof_test.go @@ -30,12 +30,14 @@ func TestNewAtofExporterConfigDefaults(t *testing.T) { t.Fatalf("expected no streaming endpoints by default, got %#v", config.Endpoints) } config.Endpoints = []AtofEndpointConfig{{ - URL: "http://localhost:8080/events", - Transport: AtofEndpointTransportHTTPPost, - Headers: map[string]string{"X-Test": "yes"}, - TimeoutMillis: 1000, + URL: "http://localhost:8080/events", + Transport: AtofEndpointTransportHTTPPost, + Headers: map[string]string{"X-Test": "yes"}, + TimeoutMillis: 1000, + FieldNamePolicy: AtofEndpointFieldNamePolicyReplaceDots, }} - if config.Endpoints[0].Transport != AtofEndpointTransportHTTPPost { + if config.Endpoints[0].Transport != AtofEndpointTransportHTTPPost || + config.Endpoints[0].FieldNamePolicy != AtofEndpointFieldNamePolicyReplaceDots { t.Fatalf("unexpected endpoint config: %#v", config.Endpoints[0]) } } diff --git a/go/nemo_relay/nemo_relay.go b/go/nemo_relay/nemo_relay.go index 869535ae..9799a9a7 100644 --- a/go/nemo_relay/nemo_relay.go +++ b/go/nemo_relay/nemo_relay.go @@ -1658,12 +1658,23 @@ const ( AtofEndpointTransportNDJSON AtofEndpointTransport = "ndjson" ) +// AtofEndpointFieldNamePolicy controls endpoint-local field name transformations. +type AtofEndpointFieldNamePolicy string + +const ( + // AtofEndpointFieldNamePolicyPreserve sends canonical ATOF field names unchanged. + AtofEndpointFieldNamePolicyPreserve AtofEndpointFieldNamePolicy = "preserve" + // AtofEndpointFieldNamePolicyReplaceDots replaces dots in JSON object keys with underscores. + AtofEndpointFieldNamePolicyReplaceDots AtofEndpointFieldNamePolicy = "replace_dots" +) + // AtofEndpointConfig configures one streaming destination for raw ATOF events. type AtofEndpointConfig struct { - URL string `json:"url"` - Transport AtofEndpointTransport `json:"transport,omitempty"` - Headers map[string]string `json:"headers,omitempty"` - TimeoutMillis uint64 `json:"timeout_millis,omitempty"` + URL string `json:"url"` + Transport AtofEndpointTransport `json:"transport,omitempty"` + Headers map[string]string `json:"headers,omitempty"` + TimeoutMillis uint64 `json:"timeout_millis,omitempty"` + FieldNamePolicy AtofEndpointFieldNamePolicy `json:"field_name_policy,omitempty"` } // NewAtofExporterConfig returns a config initialized with native defaults. diff --git a/go/nemo_relay/observability_plugin.go b/go/nemo_relay/observability_plugin.go index a40a8ac5..3660dfe4 100644 --- a/go/nemo_relay/observability_plugin.go +++ b/go/nemo_relay/observability_plugin.go @@ -29,10 +29,11 @@ type ObservabilityAtofConfig struct { // ObservabilityAtofEndpoint configures one streaming destination for raw ATOF events. type ObservabilityAtofEndpoint struct { - URL string `json:"url"` - Transport string `json:"transport,omitempty"` - Headers map[string]string `json:"headers,omitempty"` - TimeoutMillis uint64 `json:"timeout_millis,omitempty"` + URL string `json:"url"` + Transport string `json:"transport,omitempty"` + Headers map[string]string `json:"headers,omitempty"` + TimeoutMillis uint64 `json:"timeout_millis,omitempty"` + FieldNamePolicy string `json:"field_name_policy,omitempty"` } // ObservabilityAtifConfig configures per-top-level-agent ATIF file export. diff --git a/go/nemo_relay/observability_plugin_test.go b/go/nemo_relay/observability_plugin_test.go index e525fbd7..c40a6bbe 100644 --- a/go/nemo_relay/observability_plugin_test.go +++ b/go/nemo_relay/observability_plugin_test.go @@ -36,10 +36,11 @@ func TestObservabilityConfigHelpers(t *testing.T) { t.Fatalf("unexpected ATOF defaults: %#v", atof) } atof.Endpoints = []ObservabilityAtofEndpoint{{ - URL: "http://localhost:8080/events", - Transport: "http_post", - Headers: map[string]string{"X-Test": "yes"}, - TimeoutMillis: 1000, + URL: "http://localhost:8080/events", + Transport: "http_post", + Headers: map[string]string{"X-Test": "yes"}, + TimeoutMillis: 1000, + FieldNamePolicy: "replace_dots", }} atif := NewObservabilityAtifConfig() if atif.Enabled || atif.AgentName != "NeMo Relay" || atif.ModelName != "unknown" || atif.FilenameTemplate != "nemo-relay-atif-{session_id}.json" { @@ -77,9 +78,14 @@ func TestObservabilityConfigHelpers(t *testing.T) { t.Fatalf("expected serialized ATOF config object, got %#v", wrapped.Config) } atofConfig := wrapped.Config["atof"].(map[string]any) - if _, ok := atofConfig["endpoints"].([]any); !ok { + endpoints, ok := atofConfig["endpoints"].([]any) + if !ok { t.Fatalf("expected serialized ATOF endpoints, got %#v", atofConfig) } + firstEndpoint, ok := endpoints[0].(map[string]any) + if !ok || firstEndpoint["field_name_policy"] != "replace_dots" { + t.Fatalf("expected serialized ATOF endpoint field name policy, got %#v", endpoints) + } assertWrappedAtifStorageConfig(t, wrapped.Config["atif"].(map[string]any)) } diff --git a/python/nemo_relay/_native.pyi b/python/nemo_relay/_native.pyi index c30145a8..8aed064a 100644 --- a/python/nemo_relay/_native.pyi +++ b/python/nemo_relay/_native.pyi @@ -768,6 +768,7 @@ class AtofEndpointConfig: transport: str headers: dict[str, str] timeout_millis: int + field_name_policy: str def __init__( self, @@ -776,6 +777,7 @@ class AtofEndpointConfig: transport: str = "http_post", headers: dict[str, str] | None = None, timeout_millis: int = 3000, + field_name_policy: str = "preserve", ) -> None: """Create an ATOF streaming endpoint config. diff --git a/python/nemo_relay/observability.py b/python/nemo_relay/observability.py index a067f4cc..38d90e9c 100644 --- a/python/nemo_relay/observability.py +++ b/python/nemo_relay/observability.py @@ -60,6 +60,7 @@ class AtofEndpointConfig: transport: Literal["http_post", "websocket", "ndjson"] = "http_post" headers: dict[str, str] = field(default_factory=dict) timeout_millis: int = 3000 + field_name_policy: Literal["preserve", "replace_dots"] = "preserve" def to_dict(self) -> JsonObject: """Serialize this ATOF endpoint config to the canonical JSON object shape.""" @@ -69,6 +70,7 @@ def to_dict(self) -> JsonObject: "transport": self.transport, "headers": self.headers, "timeout_millis": self.timeout_millis, + "field_name_policy": self.field_name_policy, } ) diff --git a/python/nemo_relay/observability.pyi b/python/nemo_relay/observability.pyi index fa76d502..11e5b4d6 100644 --- a/python/nemo_relay/observability.pyi +++ b/python/nemo_relay/observability.pyi @@ -23,6 +23,7 @@ class AtofEndpointConfig: transport: Literal["http_post", "websocket", "ndjson"] = ... headers: dict[str, str] = field(default_factory=dict) timeout_millis: int = ... + field_name_policy: Literal["preserve", "replace_dots"] = ... def to_dict(self) -> JsonObject: ... @dataclass(slots=True) diff --git a/python/tests/test_observability_plugin.py b/python/tests/test_observability_plugin.py index ace96b1a..7dd34841 100644 --- a/python/tests/test_observability_plugin.py +++ b/python/tests/test_observability_plugin.py @@ -103,12 +103,14 @@ def test_atof_endpoint_config_serializes_streaming_fields(self): transport="http_post", headers={"X-Test": "yes"}, timeout_millis=1000, + field_name_policy="replace_dots", ) assert endpoint.to_dict() == { "url": "http://localhost:8080/events", "transport": "http_post", "headers": {"X-Test": "yes"}, "timeout_millis": 1000, + "field_name_policy": "replace_dots", } assert AtofConfig(endpoints=[endpoint]).to_dict()["endpoints"] == [endpoint.to_dict()] diff --git a/python/tests/test_types.py b/python/tests/test_types.py index 14f5541d..21965ed7 100644 --- a/python/tests/test_types.py +++ b/python/tests/test_types.py @@ -469,6 +469,7 @@ def test_config_defaults_mutation_and_repr(self, tmp_path): transport="http_post", headers={"X-Test": "yes"}, timeout_millis=1000, + field_name_policy="replace_dots", ) config.endpoints = [endpoint] @@ -479,6 +480,7 @@ def test_config_defaults_mutation_and_repr(self, tmp_path): assert config.endpoints[0].transport == "http_post" assert config.endpoints[0].headers == {"X-Test": "yes"} assert config.endpoints[0].timeout_millis == 1000 + assert config.endpoints[0].field_name_policy == "replace_dots" def test_exporter_lifecycle_writes_raw_jsonl_events(self, tmp_path): config = AtofExporterConfig() From 41d2980ab9d3d55a15f0473c050858d5cd931626 Mon Sep 17 00:00:00 2001 From: Bryan Bednarski Date: Tue, 23 Jun 2026 16:26:12 -0700 Subject: [PATCH 2/2] test: cover ATOF endpoint field policy branches Signed-off-by: Bryan Bednarski --- .../tests/unit/observability/atof_tests.rs | 37 +++++++++++++++++++ .../observability/plugin_component_tests.rs | 20 ++++++++++ crates/node/tests/atof_tests.mjs | 8 ++++ go/nemo_relay/atof_test.go | 7 ++++ go/nemo_relay/observability_plugin_test.go | 7 ++++ python/tests/test_types.py | 13 +++++++ 6 files changed, 92 insertions(+) diff --git a/crates/core/tests/unit/observability/atof_tests.rs b/crates/core/tests/unit/observability/atof_tests.rs index cfdd7800..4d271264 100644 --- a/crates/core/tests/unit/observability/atof_tests.rs +++ b/crates/core/tests/unit/observability/atof_tests.rs @@ -420,6 +420,43 @@ fn endpoint_field_name_policy_replaces_dots_recursively() { assert_eq!(value["metadata"]["nested"][0]["a_b"], json!(true)); } +#[test] +#[cfg(all(feature = "atof-streaming", not(target_arch = "wasm32")))] +fn endpoint_field_name_policy_preserves_raw_json_and_falls_back_for_invalid_json() { + let preserve = + AtofEndpointConfig::new("http://127.0.0.1:9/events", AtofEndpointTransport::HttpPost); + let raw = "{\"metadata\":{\"otel.status_code\":\"OK\"}}"; + assert_eq!(endpoint_event_json(&preserve, raw.into()), raw); + + let replace = + AtofEndpointConfig::new("http://127.0.0.1:9/events", AtofEndpointTransport::HttpPost) + .with_field_name_policy(AtofEndpointFieldNamePolicy::ReplaceDots); + assert_eq!(endpoint_event_json(&replace, "not-json".into()), "not-json"); +} + +#[test] +#[cfg(all(feature = "atof-streaming", not(target_arch = "wasm32")))] +fn endpoint_http_helper_edges_are_safe() { + install_rustls_crypto_provider(); + assert_eq!( + AtofEndpointFieldNamePolicy::parse("unknown"), + None, + "unknown field name policies should be rejected" + ); + assert_eq!(truncate_log_body(" short body "), "short body"); + + let long_body = "é".repeat(1_025); + let truncated = truncate_log_body(&long_body); + assert!(truncated.ends_with("... ")); + assert_eq!( + truncated + .trim_end_matches("... ") + .chars() + .count(), + 1_024 + ); +} + #[test] fn append_mode_preserves_existing_lines() { let dir = temp_dir("atof-append"); diff --git a/crates/core/tests/unit/observability/plugin_component_tests.rs b/crates/core/tests/unit/observability/plugin_component_tests.rs index 3c20dd1b..ea230224 100644 --- a/crates/core/tests/unit/observability/plugin_component_tests.rs +++ b/crates/core/tests/unit/observability/plugin_component_tests.rs @@ -177,6 +177,13 @@ fn default_config_and_component_conversion_cover_public_shape() { assert!(atof.output_directory.is_none()); assert!(atof.filename.is_none()); + let parsed_atof: AtofSectionConfig = serde_json::from_value(json!({ + "endpoints": [{"url": "http://localhost/events"}] + })) + .unwrap(); + assert_eq!(parsed_atof.endpoints[0].transport, "http_post"); + assert_eq!(parsed_atof.endpoints[0].field_name_policy, "preserve"); + let atif = AtifSectionConfig::default(); assert!(!atif.enabled); assert_eq!(atif.agent_name, "NeMo Relay"); @@ -557,6 +564,19 @@ fn build_atof_endpoint_config_maps_headers_timeout_and_rejects_transport() { ) .unwrap_err(); assert!(error.to_string().contains("endpoints[3].transport")); + + let error = build_atof_endpoint_config( + 4, + AtofEndpointSectionConfig { + url: "http://127.0.0.1:47632/events".into(), + transport: "http_post".into(), + headers: std::collections::HashMap::new(), + timeout_millis: 3_000, + field_name_policy: "bogus".into(), + }, + ) + .unwrap_err(); + assert!(error.to_string().contains("endpoints[4].field_name_policy")); } #[test] diff --git a/crates/node/tests/atof_tests.mjs b/crates/node/tests/atof_tests.mjs index bd396603..00bda759 100644 --- a/crates/node/tests/atof_tests.mjs +++ b/crates/node/tests/atof_tests.mjs @@ -38,6 +38,14 @@ describe('AtofExporter', () => { }), /endpoint transport/i, ); + assert.throws( + () => + new AtofExporter({ + outputDirectory: tempDir('node-atof-invalid-field-policy'), + endpoints: [{ url: 'http://localhost:8080/events', fieldNamePolicy: 'bogus' }], + }), + /field_name_policy/i, + ); }); it('writes raw ATOF JSONL events and supports lifecycle methods', () => { diff --git a/go/nemo_relay/atof_test.go b/go/nemo_relay/atof_test.go index 2eaac784..2ec7a3da 100644 --- a/go/nemo_relay/atof_test.go +++ b/go/nemo_relay/atof_test.go @@ -40,6 +40,13 @@ func TestNewAtofExporterConfigDefaults(t *testing.T) { config.Endpoints[0].FieldNamePolicy != AtofEndpointFieldNamePolicyReplaceDots { t.Fatalf("unexpected endpoint config: %#v", config.Endpoints[0]) } + serialized, err := json.Marshal(config) + if err != nil { + t.Fatalf("marshal config failed: %v", err) + } + if !strings.Contains(string(serialized), `"field_name_policy":"replace_dots"`) { + t.Fatalf("expected field_name_policy in serialized config, got %s", serialized) + } } func TestAtofExporterLifecycleWritesRawJSONL(t *testing.T) { diff --git a/go/nemo_relay/observability_plugin_test.go b/go/nemo_relay/observability_plugin_test.go index c40a6bbe..0ad8e306 100644 --- a/go/nemo_relay/observability_plugin_test.go +++ b/go/nemo_relay/observability_plugin_test.go @@ -86,6 +86,13 @@ func TestObservabilityConfigHelpers(t *testing.T) { if !ok || firstEndpoint["field_name_policy"] != "replace_dots" { t.Fatalf("expected serialized ATOF endpoint field name policy, got %#v", endpoints) } + serialized, err := json.Marshal(wrapped) + if err != nil { + t.Fatalf("marshal observability component failed: %v", err) + } + if !strings.Contains(string(serialized), `"field_name_policy":"replace_dots"`) { + t.Fatalf("expected field_name_policy in serialized component, got %s", serialized) + } assertWrappedAtifStorageConfig(t, wrapped.Config["atif"].(map[string]any)) } diff --git a/python/tests/test_types.py b/python/tests/test_types.py index 21965ed7..7cf13d3c 100644 --- a/python/tests/test_types.py +++ b/python/tests/test_types.py @@ -482,6 +482,19 @@ def test_config_defaults_mutation_and_repr(self, tmp_path): assert config.endpoints[0].timeout_millis == 1000 assert config.endpoints[0].field_name_policy == "replace_dots" + def test_endpoint_field_name_policy_is_validated(self, tmp_path): + config = AtofExporterConfig() + config.output_directory = str(tmp_path) + config.endpoints = [ + AtofEndpointConfig( + "http://localhost:8080/events", + field_name_policy="bogus", # type: ignore[arg-type] + ) + ] + + with pytest.raises(ValueError, match="field_name_policy"): + AtofExporter(config) + def test_exporter_lifecycle_writes_raw_jsonl_events(self, tmp_path): config = AtofExporterConfig() config.output_directory = str(tmp_path)