diff --git a/crates/core/src/observability/atof.rs b/crates/core/src/observability/atof.rs index 221c10ce..85416276 100644 --- a/crates/core/src/observability/atof.rs +++ b/crates/core/src/observability/atof.rs @@ -115,6 +115,36 @@ pub enum AtofEndpointTransport { Ndjson, } +/// Field name transformation policy used before sending events to an endpoint. +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum AtofEndpointFieldNamePolicy { + /// Preserve canonical ATOF field names exactly. + #[default] + Preserve, + /// Replace dots in JSON object keys with underscores, recursively. + ReplaceDots, +} + +impl AtofEndpointFieldNamePolicy { + /// Parse a string policy used by configuration and bindings. + pub fn parse(value: &str) -> Option { + match value { + "preserve" => Some(Self::Preserve), + "replace_dots" => Some(Self::ReplaceDots), + _ => None, + } + } + + /// Return the stable string representation used by configuration and bindings. + pub fn as_str(self) -> &'static str { + match self { + Self::Preserve => "preserve", + Self::ReplaceDots => "replace_dots", + } + } +} + impl AtofEndpointTransport { /// Parse a string transport used by configuration and bindings. pub fn parse(value: &str) -> Option { @@ -150,6 +180,9 @@ pub struct AtofEndpointConfig { /// Per-endpoint timeout in milliseconds. #[serde(default = "default_endpoint_timeout_millis")] pub timeout_millis: u64, + /// Field name transformation policy applied before sending events. + #[serde(default)] + pub field_name_policy: AtofEndpointFieldNamePolicy, } impl AtofEndpointConfig { @@ -160,6 +193,7 @@ impl AtofEndpointConfig { transport, headers: HashMap::new(), timeout_millis: default_endpoint_timeout_millis(), + field_name_policy: AtofEndpointFieldNamePolicy::Preserve, } } @@ -174,6 +208,15 @@ impl AtofEndpointConfig { self.timeout_millis = timeout_millis; self } + + /// Override the endpoint field name policy. + pub fn with_field_name_policy( + mut self, + field_name_policy: AtofEndpointFieldNamePolicy, + ) -> Self { + self.field_name_policy = field_name_policy; + self + } } /// Configuration for [`AtofExporter`]. @@ -564,6 +607,7 @@ fn run_endpoint_worker( config: AtofEndpointConfig, rx: tokio::sync::mpsc::UnboundedReceiver, ) { + install_rustls_crypto_provider(); let runtime = match tokio::runtime::Builder::new_current_thread() .enable_all() .build() @@ -583,6 +627,11 @@ fn run_endpoint_worker( }); } +#[cfg(all(feature = "atof-streaming", not(target_arch = "wasm32")))] +fn install_rustls_crypto_provider() { + let _ = rustls::crypto::ring::default_provider().install_default(); +} + #[cfg(all(feature = "atof-streaming", not(target_arch = "wasm32")))] async fn run_http_post_endpoint( index: usize, @@ -611,7 +660,7 @@ async fn run_http_post_endpoint( while let Some(message) = rx.recv().await { match message { EndpointMessage::Event(raw_json) => { - let body = format!("{raw_json}\n"); + let body = format!("{}\n", endpoint_event_json(&config, raw_json)); let result = client .post(&config.url) .header(reqwest::header::CONTENT_TYPE, "application/x-ndjson") @@ -620,10 +669,7 @@ async fn run_http_post_endpoint( .await; match result { Ok(response) if response.status().is_success() => {} - Ok(response) => eprintln!( - "nemo_relay: ATOF endpoint[{index}] HTTP status {}", - response.status() - ), + Ok(response) => log_http_error(index, "HTTP", response).await, Err(error) => { eprintln!("nemo_relay: ATOF endpoint[{index}] send failed: {error}") } @@ -657,7 +703,7 @@ async fn run_websocket_endpoint( while let Some(message) = rx.recv().await { match message { EndpointMessage::Event(raw_json) => { - pending.push_back(raw_json); + pending.push_back(endpoint_event_json(&config, raw_json)); let _ = drain_websocket_pending(index, &config, &mut socket, &mut pending).await; } EndpointMessage::Flush(done) => { @@ -788,9 +834,10 @@ async fn run_ndjson_endpoint( }; let (body_tx, body) = ndjson_body_channel(); + let url = config.url.clone(); let request = tokio::spawn(async move { client - .post(config.url) + .post(url) .header(reqwest::header::CONTENT_TYPE, "application/x-ndjson") .body(body) .send() @@ -800,7 +847,9 @@ async fn run_ndjson_endpoint( while let Some(message) = rx.recv().await { match message { - EndpointMessage::Event(raw_json) => send_ndjson_event(index, &body_tx, raw_json), + EndpointMessage::Event(raw_json) => { + send_ndjson_event(index, &body_tx, endpoint_event_json(&config, raw_json)) + } EndpointMessage::Flush(done) => send_ndjson_flush(index, &body_tx, done), EndpointMessage::Close(done) => { drop(body_tx); @@ -879,10 +928,7 @@ async fn finish_ndjson_upload( ) { match tokio::time::timeout(close_timeout, request).await { Ok(Ok(Ok(response))) if response.status().is_success() => {} - Ok(Ok(Ok(response))) => eprintln!( - "nemo_relay: ATOF endpoint[{index}] NDJSON HTTP status {}", - response.status() - ), + Ok(Ok(Ok(response))) => log_http_error(index, "NDJSON HTTP", response).await, Ok(Ok(Err(error))) => { eprintln!("nemo_relay: ATOF endpoint[{index}] NDJSON upload failed: {error}") } @@ -910,6 +956,95 @@ async fn drain_closed(mut rx: tokio::sync::mpsc::UnboundedReceiver String { + match config.field_name_policy { + AtofEndpointFieldNamePolicy::Preserve => raw_json, + AtofEndpointFieldNamePolicy::ReplaceDots => replace_dotted_field_names(&raw_json), + } +} + +#[cfg(all(feature = "atof-streaming", not(target_arch = "wasm32")))] +fn replace_dotted_field_names(raw_json: &str) -> String { + let Ok(mut value) = serde_json::from_str::(raw_json) else { + return raw_json.to_string(); + }; + replace_dotted_value_keys(&mut value); + serde_json::to_string(&value).unwrap_or_else(|_| raw_json.to_string()) +} + +#[cfg(all(feature = "atof-streaming", not(target_arch = "wasm32")))] +fn replace_dotted_value_keys(value: &mut Json) { + match value { + Json::Object(object) => replace_dotted_object_keys(object), + Json::Array(items) => { + for item in items { + replace_dotted_value_keys(item); + } + } + _ => {} + } +} + +#[cfg(all(feature = "atof-streaming", not(target_arch = "wasm32")))] +fn replace_dotted_object_keys(object: &mut serde_json::Map) { + let mut old = std::mem::take(object) + .into_iter() + .map(|(key, mut value)| { + replace_dotted_value_keys(&mut value); + (key, value) + }) + .collect::>(); + old.sort_by_key(|(key, _)| !key.contains('.')); + + for (key, value) in old { + let sanitized_key = key.replace('.', "_"); + let final_key = collision_free_key(object, sanitized_key); + object.insert(final_key, value); + } +} + +#[cfg(all(feature = "atof-streaming", not(target_arch = "wasm32")))] +fn collision_free_key(object: &serde_json::Map, key: String) -> String { + if !object.contains_key(&key) { + return key; + } + for suffix in 2.. { + let candidate = format!("{key}_{suffix}"); + if !object.contains_key(&candidate) { + return candidate; + } + } + unreachable!("unbounded suffix search must find a key") +} + +#[cfg(all(feature = "atof-streaming", not(target_arch = "wasm32")))] +async fn log_http_error(index: usize, label: &str, response: reqwest::Response) { + let status = response.status(); + match response.text().await { + Ok(body) if !body.trim().is_empty() => eprintln!( + "nemo_relay: ATOF endpoint[{index}] {label} status {status}: {}", + truncate_log_body(&body) + ), + Ok(_) => eprintln!("nemo_relay: ATOF endpoint[{index}] {label} status {status}"), + Err(error) => eprintln!( + "nemo_relay: ATOF endpoint[{index}] {label} status {status}; failed to read response body: {error}" + ), + } +} + +#[cfg(all(feature = "atof-streaming", not(target_arch = "wasm32")))] +fn truncate_log_body(body: &str) -> String { + const LIMIT: usize = 1024; + let trimmed = body.trim(); + if trimmed.chars().count() <= LIMIT { + return trimmed.to_string(); + } + let mut truncated = trimmed.chars().take(LIMIT).collect::(); + truncated.push_str("... "); + truncated +} + // --------------------------------------------------------------------------- // Tests // --------------------------------------------------------------------------- diff --git a/crates/core/src/observability/plugin_component.rs b/crates/core/src/observability/plugin_component.rs index c4d4fd85..a19016ae 100644 --- a/crates/core/src/observability/plugin_component.rs +++ b/crates/core/src/observability/plugin_component.rs @@ -37,8 +37,9 @@ use crate::api::subscriber::{ use crate::error::FlowError; use crate::observability::atif::{AtifAgentInfo, AtifExporter}; use crate::observability::atof::{ - AtofEndpointConfig as CoreAtofEndpointConfig, AtofEndpointTransport, AtofExporter, - AtofExporterConfig as CoreAtofExporterConfig, AtofExporterMode, + AtofEndpointConfig as CoreAtofEndpointConfig, AtofEndpointFieldNamePolicy, + AtofEndpointTransport, AtofExporter, AtofExporterConfig as CoreAtofExporterConfig, + AtofExporterMode, }; #[cfg(feature = "openinference")] use crate::observability::openinference::{ @@ -199,6 +200,9 @@ pub struct AtofEndpointSectionConfig { /// Per-endpoint timeout in milliseconds. #[serde(default = "default_timeout_millis")] pub timeout_millis: u64, + /// Field name policy applied before sending events: `preserve` or `replace_dots`. + #[serde(default = "default_atof_endpoint_field_name_policy")] + pub field_name_policy: String, } /// Per-trajectory ATIF exporter config. @@ -660,8 +664,15 @@ fn build_atof_endpoint_config( "ATOF endpoints[{index}].transport must be 'http_post', 'websocket', or 'ndjson'" )) })?; + let field_name_policy = AtofEndpointFieldNamePolicy::parse(&endpoint.field_name_policy) + .ok_or_else(|| { + PluginError::InvalidConfig(format!( + "ATOF endpoints[{index}].field_name_policy must be 'preserve' or 'replace_dots'" + )) + })?; let mut config = CoreAtofEndpointConfig::new(endpoint.url, transport) - .with_timeout_millis(endpoint.timeout_millis); + .with_timeout_millis(endpoint.timeout_millis) + .with_field_name_policy(field_name_policy); for (key, value) in endpoint.headers { config = config.with_header(key, value); } @@ -1848,6 +1859,18 @@ fn validate_atof_endpoint_values( format!("ATOF endpoints[{index}].timeout_millis must be greater than 0"), ); } + if AtofEndpointFieldNamePolicy::parse(&endpoint.field_name_policy).is_none() { + push_policy_diag( + diagnostics, + policy.unsupported_value, + "observability.unsupported_value", + Some("atof".to_string()), + Some(format!("endpoints[{index}].field_name_policy")), + format!( + "ATOF endpoints[{index}].field_name_policy must be 'preserve' or 'replace_dots'" + ), + ); + } } #[cfg(all(feature = "atof-streaming", not(target_arch = "wasm32")))] @@ -2176,6 +2199,10 @@ fn default_atof_endpoint_transport() -> String { "http_post".to_string() } +fn default_atof_endpoint_field_name_policy() -> String { + "preserve".to_string() +} + fn default_agent_name() -> String { "NeMo Relay".to_string() } diff --git a/crates/core/tests/unit/observability/atof_tests.rs b/crates/core/tests/unit/observability/atof_tests.rs index 9ec583dc..4d271264 100644 --- a/crates/core/tests/unit/observability/atof_tests.rs +++ b/crates/core/tests/unit/observability/atof_tests.rs @@ -370,7 +370,8 @@ fn endpoint_and_exporter_config_builders_preserve_values() { let endpoint = AtofEndpointConfig::new("http://127.0.0.1:9/events", AtofEndpointTransport::HttpPost) .with_header("x-test", "enabled") - .with_timeout_millis(42); + .with_timeout_millis(42) + .with_field_name_policy(AtofEndpointFieldNamePolicy::ReplaceDots); let config = AtofExporterConfig::new() .with_output_directory(&dir) .with_mode(AtofExporterMode::Overwrite) @@ -382,10 +383,80 @@ fn endpoint_and_exporter_config_builders_preserve_values() { Some("enabled") ); assert_eq!(endpoint.timeout_millis, 42); + assert_eq!( + endpoint.field_name_policy, + AtofEndpointFieldNamePolicy::ReplaceDots + ); + assert_eq!(AtofEndpointFieldNamePolicy::Preserve.as_str(), "preserve"); + assert_eq!( + AtofEndpointFieldNamePolicy::parse("replace_dots"), + Some(AtofEndpointFieldNamePolicy::ReplaceDots) + ); assert_eq!(config.path(), dir.join("custom.jsonl")); assert_eq!(config.endpoints, vec![endpoint]); } +#[test] +#[cfg(all(feature = "atof-streaming", not(target_arch = "wasm32")))] +fn endpoint_field_name_policy_replaces_dots_recursively() { + let config = + AtofEndpointConfig::new("http://127.0.0.1:9/events", AtofEndpointTransport::HttpPost) + .with_field_name_policy(AtofEndpointFieldNamePolicy::ReplaceDots); + let transformed = endpoint_event_json( + &config, + json!({ + "kind": "scope", + "metadata": { + "otel_status_code": "existing", + "otel.status_code": "OK", + "nested": [{"a.b": true}] + } + }) + .to_string(), + ); + let value: Json = serde_json::from_str(&transformed).unwrap(); + assert_eq!(value["metadata"]["otel_status_code"], json!("OK")); + assert_eq!(value["metadata"]["otel_status_code_2"], json!("existing")); + assert_eq!(value["metadata"]["nested"][0]["a_b"], json!(true)); +} + +#[test] +#[cfg(all(feature = "atof-streaming", not(target_arch = "wasm32")))] +fn endpoint_field_name_policy_preserves_raw_json_and_falls_back_for_invalid_json() { + let preserve = + AtofEndpointConfig::new("http://127.0.0.1:9/events", AtofEndpointTransport::HttpPost); + let raw = "{\"metadata\":{\"otel.status_code\":\"OK\"}}"; + assert_eq!(endpoint_event_json(&preserve, raw.into()), raw); + + let replace = + AtofEndpointConfig::new("http://127.0.0.1:9/events", AtofEndpointTransport::HttpPost) + .with_field_name_policy(AtofEndpointFieldNamePolicy::ReplaceDots); + assert_eq!(endpoint_event_json(&replace, "not-json".into()), "not-json"); +} + +#[test] +#[cfg(all(feature = "atof-streaming", not(target_arch = "wasm32")))] +fn endpoint_http_helper_edges_are_safe() { + install_rustls_crypto_provider(); + assert_eq!( + AtofEndpointFieldNamePolicy::parse("unknown"), + None, + "unknown field name policies should be rejected" + ); + assert_eq!(truncate_log_body(" short body "), "short body"); + + let long_body = "é".repeat(1_025); + let truncated = truncate_log_body(&long_body); + assert!(truncated.ends_with("... ")); + assert_eq!( + truncated + .trim_end_matches("... ") + .chars() + .count(), + 1_024 + ); +} + #[test] fn append_mode_preserves_existing_lines() { let dir = temp_dir("atof-append"); @@ -1301,6 +1372,7 @@ fn endpoint_validation_rejects_empty_timeout_and_invalid_headers() { transport: AtofEndpointTransport::HttpPost, headers: headers.clone(), timeout_millis: 1, + field_name_policy: AtofEndpointFieldNamePolicy::Preserve, }) .unwrap(); assert_eq!(build_header_map(&headers).unwrap().len(), 1); @@ -1310,6 +1382,7 @@ fn endpoint_validation_rejects_empty_timeout_and_invalid_headers() { transport: AtofEndpointTransport::HttpPost, headers: std::collections::HashMap::new(), timeout_millis: 1, + field_name_policy: AtofEndpointFieldNamePolicy::Preserve, }; assert!( validate_endpoint_config(&empty_url) @@ -1323,6 +1396,7 @@ fn endpoint_validation_rejects_empty_timeout_and_invalid_headers() { transport: AtofEndpointTransport::HttpPost, headers: std::collections::HashMap::new(), timeout_millis: 0, + field_name_policy: AtofEndpointFieldNamePolicy::Preserve, }; assert!( validate_endpoint_config(&zero_timeout) @@ -1344,6 +1418,7 @@ fn endpoint_validation_rejects_empty_timeout_and_invalid_headers() { transport: AtofEndpointTransport::Ndjson, headers: bad_header_value, timeout_millis: 1, + field_name_policy: AtofEndpointFieldNamePolicy::Preserve, }) .unwrap_err() .contains("disabled") @@ -1443,6 +1518,7 @@ fn http_endpoint_worker_disables_invalid_headers_and_drains_control_messages() { transport: AtofEndpointTransport::HttpPost, headers, timeout_millis: 1, + field_name_policy: AtofEndpointFieldNamePolicy::Preserve, }, rx, ) @@ -1475,6 +1551,7 @@ fn websocket_helpers_cover_invalid_headers_and_timeout_reconnect_path() { transport: AtofEndpointTransport::Websocket, headers, timeout_millis: 1, + field_name_policy: AtofEndpointFieldNamePolicy::Preserve, }; tokio::runtime::Runtime::new().unwrap().block_on(async { assert!(connect_websocket(&config).await.is_err()); diff --git a/crates/core/tests/unit/observability/plugin_component_tests.rs b/crates/core/tests/unit/observability/plugin_component_tests.rs index 8ea2f9c8..ea230224 100644 --- a/crates/core/tests/unit/observability/plugin_component_tests.rs +++ b/crates/core/tests/unit/observability/plugin_component_tests.rs @@ -177,6 +177,13 @@ fn default_config_and_component_conversion_cover_public_shape() { assert!(atof.output_directory.is_none()); assert!(atof.filename.is_none()); + let parsed_atof: AtofSectionConfig = serde_json::from_value(json!({ + "endpoints": [{"url": "http://localhost/events"}] + })) + .unwrap(); + assert_eq!(parsed_atof.endpoints[0].transport, "http_post"); + assert_eq!(parsed_atof.endpoints[0].field_name_policy, "preserve"); + let atif = AtifSectionConfig::default(); assert!(!atif.enabled); assert_eq!(atif.agent_name, "NeMo Relay"); @@ -477,7 +484,8 @@ fn atof_endpoint_validation_rejects_bad_values() { {"url": "", "transport": "http_post"}, {"url": "http://localhost/events", "transport": "bogus"}, {"url": "http://localhost/events", "transport": "ndjson", "timeout_millis": 0}, - {"url": "not a url", "transport": "http_post"} + {"url": "not a url", "transport": "http_post"}, + {"url": "http://localhost/events", "transport": "http_post", "field_name_policy": "bogus"} ] } }))); @@ -508,6 +516,12 @@ fn atof_endpoint_validation_rejects_bad_values() { .iter() .any(|diag| { diag.field.as_deref() == Some("endpoints[3].url") }) ); + assert!( + report + .diagnostics + .iter() + .any(|diag| { diag.field.as_deref() == Some("endpoints[4].field_name_policy") }) + ); } #[test] @@ -521,6 +535,7 @@ fn build_atof_endpoint_config_maps_headers_timeout_and_rejects_transport() { transport: "websocket".into(), headers: headers.clone(), timeout_millis: 123, + field_name_policy: "replace_dots".into(), }, ) .unwrap(); @@ -532,6 +547,10 @@ fn build_atof_endpoint_config_maps_headers_timeout_and_rejects_transport() { ); assert_eq!(config.headers, headers); assert_eq!(config.timeout_millis, 123); + assert_eq!( + config.field_name_policy, + crate::observability::atof::AtofEndpointFieldNamePolicy::ReplaceDots + ); let error = build_atof_endpoint_config( 3, @@ -540,10 +559,24 @@ fn build_atof_endpoint_config_maps_headers_timeout_and_rejects_transport() { transport: "smtp".into(), headers: std::collections::HashMap::new(), timeout_millis: 3_000, + field_name_policy: "preserve".into(), }, ) .unwrap_err(); assert!(error.to_string().contains("endpoints[3].transport")); + + let error = build_atof_endpoint_config( + 4, + AtofEndpointSectionConfig { + url: "http://127.0.0.1:47632/events".into(), + transport: "http_post".into(), + headers: std::collections::HashMap::new(), + timeout_millis: 3_000, + field_name_policy: "bogus".into(), + }, + ) + .unwrap_err(); + assert!(error.to_string().contains("endpoints[4].field_name_policy")); } #[test] diff --git a/crates/node/observability.d.ts b/crates/node/observability.d.ts index f31341d3..f12ede6e 100644 --- a/crates/node/observability.d.ts +++ b/crates/node/observability.d.ts @@ -19,6 +19,7 @@ export interface AtofEndpointConfig { transport?: 'http_post' | 'websocket' | 'ndjson' | string; headers?: Record; timeout_millis?: number; + field_name_policy?: 'preserve' | 'replace_dots' | string; } export interface S3StorageConfig { diff --git a/crates/node/src/api/mod.rs b/crates/node/src/api/mod.rs index 602db870..7f8ccdd1 100644 --- a/crates/node/src/api/mod.rs +++ b/crates/node/src/api/mod.rs @@ -204,6 +204,18 @@ fn build_atof_config( if let Some(timeout_millis) = endpoint.timeout_millis { endpoint_config = endpoint_config.with_timeout_millis(timeout_millis.into()); } + if let Some(field_name_policy) = endpoint.field_name_policy { + let Some(field_name_policy) = + nemo_relay::observability::atof::AtofEndpointFieldNamePolicy::parse( + &field_name_policy, + ) + else { + return Err(napi::Error::from_reason( + "endpoint field_name_policy must be 'preserve' or 'replace_dots'", + )); + }; + endpoint_config = endpoint_config.with_field_name_policy(field_name_policy); + } for (key, value) in parse_string_map(endpoint.headers, "endpoint.headers")? { endpoint_config = endpoint_config.with_header(key, value); } @@ -3072,6 +3084,8 @@ pub struct AtofEndpointConfig { pub headers: Option, /// Per-endpoint timeout in milliseconds. pub timeout_millis: Option, + /// Field name policy applied before sending events. + pub field_name_policy: Option, } /// Filesystem-backed Agent Trajectory Observability Format (ATOF) JSONL event exporter. diff --git a/crates/node/tests/atof_tests.mjs b/crates/node/tests/atof_tests.mjs index bd396603..00bda759 100644 --- a/crates/node/tests/atof_tests.mjs +++ b/crates/node/tests/atof_tests.mjs @@ -38,6 +38,14 @@ describe('AtofExporter', () => { }), /endpoint transport/i, ); + assert.throws( + () => + new AtofExporter({ + outputDirectory: tempDir('node-atof-invalid-field-policy'), + endpoints: [{ url: 'http://localhost:8080/events', fieldNamePolicy: 'bogus' }], + }), + /field_name_policy/i, + ); }); it('writes raw ATOF JSONL events and supports lifecycle methods', () => { diff --git a/crates/node/tests/observability_plugin_tests.mjs b/crates/node/tests/observability_plugin_tests.mjs index e999831a..e0741d07 100644 --- a/crates/node/tests/observability_plugin_tests.mjs +++ b/crates/node/tests/observability_plugin_tests.mjs @@ -64,6 +64,7 @@ describe('observability plugin helpers', () => { transport: 'http_post', headers: { 'X-Test': 'yes' }, timeout_millis: 1000, + field_name_policy: 'replace_dots', }, ], }); @@ -74,6 +75,7 @@ describe('observability plugin helpers', () => { transport: 'http_post', headers: { 'X-Test': 'yes' }, timeout_millis: 1000, + field_name_policy: 'replace_dots', }, ]); }); diff --git a/crates/python/src/py_types/observability.rs b/crates/python/src/py_types/observability.rs index 5bdf6208..1f7e33b0 100644 --- a/crates/python/src/py_types/observability.rs +++ b/crates/python/src/py_types/observability.rs @@ -177,6 +177,8 @@ pub struct PyAtofEndpointConfig { pub(crate) headers: HashMap, #[pyo3(get, set)] pub(crate) timeout_millis: u64, + #[pyo3(get, set)] + pub(crate) field_name_policy: String, } impl PyAtofEndpointConfig { @@ -191,6 +193,16 @@ impl PyAtofEndpointConfig { let mut config = nemo_relay::observability::atof::AtofEndpointConfig::new(self.url.clone(), transport) .with_timeout_millis(self.timeout_millis); + let Some(field_name_policy) = + nemo_relay::observability::atof::AtofEndpointFieldNamePolicy::parse( + &self.field_name_policy, + ) + else { + return Err(pyo3::exceptions::PyValueError::new_err( + "endpoint field_name_policy must be 'preserve' or 'replace_dots'", + )); + }; + config = config.with_field_name_policy(field_name_policy); for (key, value) in &self.headers { config = config.with_header(key.clone(), value.clone()); } @@ -201,12 +213,13 @@ impl PyAtofEndpointConfig { #[pymethods] impl PyAtofEndpointConfig { #[new] - #[pyo3(signature = (url, *, transport="http_post".to_string(), headers=None, timeout_millis=3000))] + #[pyo3(signature = (url, *, transport="http_post".to_string(), headers=None, timeout_millis=3000, field_name_policy="preserve".to_string()))] pub(crate) fn new( url: String, transport: String, headers: Option<&Bound<'_, PyAny>>, timeout_millis: u64, + field_name_policy: String, ) -> PyResult { let headers = match headers { Some(headers) if !headers.is_none() => py_string_map(headers, "headers")?, @@ -217,6 +230,7 @@ impl PyAtofEndpointConfig { transport, headers, timeout_millis, + field_name_policy, }) } diff --git a/go/nemo_relay/atof_test.go b/go/nemo_relay/atof_test.go index 96a3bc1f..2ec7a3da 100644 --- a/go/nemo_relay/atof_test.go +++ b/go/nemo_relay/atof_test.go @@ -30,14 +30,23 @@ func TestNewAtofExporterConfigDefaults(t *testing.T) { t.Fatalf("expected no streaming endpoints by default, got %#v", config.Endpoints) } config.Endpoints = []AtofEndpointConfig{{ - URL: "http://localhost:8080/events", - Transport: AtofEndpointTransportHTTPPost, - Headers: map[string]string{"X-Test": "yes"}, - TimeoutMillis: 1000, + URL: "http://localhost:8080/events", + Transport: AtofEndpointTransportHTTPPost, + Headers: map[string]string{"X-Test": "yes"}, + TimeoutMillis: 1000, + FieldNamePolicy: AtofEndpointFieldNamePolicyReplaceDots, }} - if config.Endpoints[0].Transport != AtofEndpointTransportHTTPPost { + if config.Endpoints[0].Transport != AtofEndpointTransportHTTPPost || + config.Endpoints[0].FieldNamePolicy != AtofEndpointFieldNamePolicyReplaceDots { t.Fatalf("unexpected endpoint config: %#v", config.Endpoints[0]) } + serialized, err := json.Marshal(config) + if err != nil { + t.Fatalf("marshal config failed: %v", err) + } + if !strings.Contains(string(serialized), `"field_name_policy":"replace_dots"`) { + t.Fatalf("expected field_name_policy in serialized config, got %s", serialized) + } } func TestAtofExporterLifecycleWritesRawJSONL(t *testing.T) { diff --git a/go/nemo_relay/nemo_relay.go b/go/nemo_relay/nemo_relay.go index 869535ae..9799a9a7 100644 --- a/go/nemo_relay/nemo_relay.go +++ b/go/nemo_relay/nemo_relay.go @@ -1658,12 +1658,23 @@ const ( AtofEndpointTransportNDJSON AtofEndpointTransport = "ndjson" ) +// AtofEndpointFieldNamePolicy controls endpoint-local field name transformations. +type AtofEndpointFieldNamePolicy string + +const ( + // AtofEndpointFieldNamePolicyPreserve sends canonical ATOF field names unchanged. + AtofEndpointFieldNamePolicyPreserve AtofEndpointFieldNamePolicy = "preserve" + // AtofEndpointFieldNamePolicyReplaceDots replaces dots in JSON object keys with underscores. + AtofEndpointFieldNamePolicyReplaceDots AtofEndpointFieldNamePolicy = "replace_dots" +) + // AtofEndpointConfig configures one streaming destination for raw ATOF events. type AtofEndpointConfig struct { - URL string `json:"url"` - Transport AtofEndpointTransport `json:"transport,omitempty"` - Headers map[string]string `json:"headers,omitempty"` - TimeoutMillis uint64 `json:"timeout_millis,omitempty"` + URL string `json:"url"` + Transport AtofEndpointTransport `json:"transport,omitempty"` + Headers map[string]string `json:"headers,omitempty"` + TimeoutMillis uint64 `json:"timeout_millis,omitempty"` + FieldNamePolicy AtofEndpointFieldNamePolicy `json:"field_name_policy,omitempty"` } // NewAtofExporterConfig returns a config initialized with native defaults. diff --git a/go/nemo_relay/observability_plugin.go b/go/nemo_relay/observability_plugin.go index a40a8ac5..3660dfe4 100644 --- a/go/nemo_relay/observability_plugin.go +++ b/go/nemo_relay/observability_plugin.go @@ -29,10 +29,11 @@ type ObservabilityAtofConfig struct { // ObservabilityAtofEndpoint configures one streaming destination for raw ATOF events. type ObservabilityAtofEndpoint struct { - URL string `json:"url"` - Transport string `json:"transport,omitempty"` - Headers map[string]string `json:"headers,omitempty"` - TimeoutMillis uint64 `json:"timeout_millis,omitempty"` + URL string `json:"url"` + Transport string `json:"transport,omitempty"` + Headers map[string]string `json:"headers,omitempty"` + TimeoutMillis uint64 `json:"timeout_millis,omitempty"` + FieldNamePolicy string `json:"field_name_policy,omitempty"` } // ObservabilityAtifConfig configures per-top-level-agent ATIF file export. diff --git a/go/nemo_relay/observability_plugin_test.go b/go/nemo_relay/observability_plugin_test.go index e525fbd7..0ad8e306 100644 --- a/go/nemo_relay/observability_plugin_test.go +++ b/go/nemo_relay/observability_plugin_test.go @@ -36,10 +36,11 @@ func TestObservabilityConfigHelpers(t *testing.T) { t.Fatalf("unexpected ATOF defaults: %#v", atof) } atof.Endpoints = []ObservabilityAtofEndpoint{{ - URL: "http://localhost:8080/events", - Transport: "http_post", - Headers: map[string]string{"X-Test": "yes"}, - TimeoutMillis: 1000, + URL: "http://localhost:8080/events", + Transport: "http_post", + Headers: map[string]string{"X-Test": "yes"}, + TimeoutMillis: 1000, + FieldNamePolicy: "replace_dots", }} atif := NewObservabilityAtifConfig() if atif.Enabled || atif.AgentName != "NeMo Relay" || atif.ModelName != "unknown" || atif.FilenameTemplate != "nemo-relay-atif-{session_id}.json" { @@ -77,9 +78,21 @@ func TestObservabilityConfigHelpers(t *testing.T) { t.Fatalf("expected serialized ATOF config object, got %#v", wrapped.Config) } atofConfig := wrapped.Config["atof"].(map[string]any) - if _, ok := atofConfig["endpoints"].([]any); !ok { + endpoints, ok := atofConfig["endpoints"].([]any) + if !ok { t.Fatalf("expected serialized ATOF endpoints, got %#v", atofConfig) } + firstEndpoint, ok := endpoints[0].(map[string]any) + if !ok || firstEndpoint["field_name_policy"] != "replace_dots" { + t.Fatalf("expected serialized ATOF endpoint field name policy, got %#v", endpoints) + } + serialized, err := json.Marshal(wrapped) + if err != nil { + t.Fatalf("marshal observability component failed: %v", err) + } + if !strings.Contains(string(serialized), `"field_name_policy":"replace_dots"`) { + t.Fatalf("expected field_name_policy in serialized component, got %s", serialized) + } assertWrappedAtifStorageConfig(t, wrapped.Config["atif"].(map[string]any)) } diff --git a/python/nemo_relay/_native.pyi b/python/nemo_relay/_native.pyi index c30145a8..8aed064a 100644 --- a/python/nemo_relay/_native.pyi +++ b/python/nemo_relay/_native.pyi @@ -768,6 +768,7 @@ class AtofEndpointConfig: transport: str headers: dict[str, str] timeout_millis: int + field_name_policy: str def __init__( self, @@ -776,6 +777,7 @@ class AtofEndpointConfig: transport: str = "http_post", headers: dict[str, str] | None = None, timeout_millis: int = 3000, + field_name_policy: str = "preserve", ) -> None: """Create an ATOF streaming endpoint config. diff --git a/python/nemo_relay/observability.py b/python/nemo_relay/observability.py index a067f4cc..38d90e9c 100644 --- a/python/nemo_relay/observability.py +++ b/python/nemo_relay/observability.py @@ -60,6 +60,7 @@ class AtofEndpointConfig: transport: Literal["http_post", "websocket", "ndjson"] = "http_post" headers: dict[str, str] = field(default_factory=dict) timeout_millis: int = 3000 + field_name_policy: Literal["preserve", "replace_dots"] = "preserve" def to_dict(self) -> JsonObject: """Serialize this ATOF endpoint config to the canonical JSON object shape.""" @@ -69,6 +70,7 @@ def to_dict(self) -> JsonObject: "transport": self.transport, "headers": self.headers, "timeout_millis": self.timeout_millis, + "field_name_policy": self.field_name_policy, } ) diff --git a/python/nemo_relay/observability.pyi b/python/nemo_relay/observability.pyi index fa76d502..11e5b4d6 100644 --- a/python/nemo_relay/observability.pyi +++ b/python/nemo_relay/observability.pyi @@ -23,6 +23,7 @@ class AtofEndpointConfig: transport: Literal["http_post", "websocket", "ndjson"] = ... headers: dict[str, str] = field(default_factory=dict) timeout_millis: int = ... + field_name_policy: Literal["preserve", "replace_dots"] = ... def to_dict(self) -> JsonObject: ... @dataclass(slots=True) diff --git a/python/tests/test_observability_plugin.py b/python/tests/test_observability_plugin.py index ace96b1a..7dd34841 100644 --- a/python/tests/test_observability_plugin.py +++ b/python/tests/test_observability_plugin.py @@ -103,12 +103,14 @@ def test_atof_endpoint_config_serializes_streaming_fields(self): transport="http_post", headers={"X-Test": "yes"}, timeout_millis=1000, + field_name_policy="replace_dots", ) assert endpoint.to_dict() == { "url": "http://localhost:8080/events", "transport": "http_post", "headers": {"X-Test": "yes"}, "timeout_millis": 1000, + "field_name_policy": "replace_dots", } assert AtofConfig(endpoints=[endpoint]).to_dict()["endpoints"] == [endpoint.to_dict()] diff --git a/python/tests/test_types.py b/python/tests/test_types.py index 14f5541d..7cf13d3c 100644 --- a/python/tests/test_types.py +++ b/python/tests/test_types.py @@ -469,6 +469,7 @@ def test_config_defaults_mutation_and_repr(self, tmp_path): transport="http_post", headers={"X-Test": "yes"}, timeout_millis=1000, + field_name_policy="replace_dots", ) config.endpoints = [endpoint] @@ -479,6 +480,20 @@ def test_config_defaults_mutation_and_repr(self, tmp_path): assert config.endpoints[0].transport == "http_post" assert config.endpoints[0].headers == {"X-Test": "yes"} assert config.endpoints[0].timeout_millis == 1000 + assert config.endpoints[0].field_name_policy == "replace_dots" + + def test_endpoint_field_name_policy_is_validated(self, tmp_path): + config = AtofExporterConfig() + config.output_directory = str(tmp_path) + config.endpoints = [ + AtofEndpointConfig( + "http://localhost:8080/events", + field_name_policy="bogus", # type: ignore[arg-type] + ) + ] + + with pytest.raises(ValueError, match="field_name_policy"): + AtofExporter(config) def test_exporter_lifecycle_writes_raw_jsonl_events(self, tmp_path): config = AtofExporterConfig()