diff --git a/.github/workflows/processor-release.yml b/.github/workflows/processor-release.yml index 54108396..04f847f5 100644 --- a/.github/workflows/processor-release.yml +++ b/.github/workflows/processor-release.yml @@ -12,7 +12,6 @@ jobs: fail-fast: false matrix: include: - - target: x86_64-unknown-linux-gnu runner: ubuntu-22.04 os: ubuntu @@ -29,6 +28,10 @@ jobs: runner: ubuntu-22.04 os: ubuntu python-version: 3.13 + - target: x86_64-unknown-linux-gnu + runner: ubuntu-22.04 + os: ubuntu + python-version: 3.14 - target: aarch64-unknown-linux-gnu runner: ubuntu-22.04-arm os: ubuntu @@ -45,6 +48,10 @@ jobs: runner: ubuntu-22.04-arm os: ubuntu python-version: 3.13 + - target: aarch64-unknown-linux-gnu + runner: ubuntu-22.04-arm + os: ubuntu + python-version: 3.14 - target: aarch64-apple-darwin runner: macos-14 os: macos @@ -61,6 +68,10 @@ jobs: runner: macos-14 os: macos python-version: 3.13 + - target: aarch64-apple-darwin + runner: macos-14 + os: macos + python-version: 3.14 env: TARGET: ${{ matrix.target }} steps: @@ -74,26 +85,10 @@ jobs: - name: install protoc macos if: matrix.os == 'macos' run: brew install protobuf - - name: update python 3.10 - if: matrix.python-version == '3.10' - uses: actions/setup-python@v5 - with: - python-version: "3.10" - - name: update python 3.11 - if: matrix.python-version == '3.11' - uses: actions/setup-python@v5 - with: - python-version: 3.11 - - name: update python 3.12 - if: matrix.python-version == '3.12' - uses: actions/setup-python@v5 - with: - python-version: 3.12 - - name: update python 3.13 - if: matrix.python-version == '3.13' + - name: update python ${{ matrix.python-version }} uses: actions/setup-python@v5 with: - python-version: 3.13 + python-version: ${{ matrix.python-version }} - name: Set build env run: echo "BUILD_SHORT_SHA=$(echo -n $GITHUB_SHA | cut -c 1-7)" >> $GITHUB_ENV - uses: actions-rust-lang/setup-rust-toolchain@v1 @@ -112,7 +107,7 @@ jobs: command: build args: --release --features pyo3 --locked --target ${{ matrix.target }} - name: upload build - if: matrix.python-version == '3.13' + if: matrix.python-version == '3.14' uses: actions/upload-artifact@v4 with: name: rotel-processor-build-${{ matrix.target }} diff --git a/Cargo.lock b/Cargo.lock index 8aeb3d7f..5935a94a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3147,9 +3147,9 @@ dependencies = [ [[package]] name = "numpy" -version = "0.24.0" +version = "0.27.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a7cfbf3f0feededcaa4d289fe3079b03659e85c5b5a177f4ba6fb01ab4fb3e39" +checksum = "7aac2e6a6e4468ffa092ad43c39b81c79196c2bb773b8db4085f695efe3bba17" dependencies = [ "libc", "ndarray", @@ -3734,11 +3734,10 @@ dependencies = [ [[package]] name = "pyo3" -version = "0.24.2" +version = "0.27.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5203598f366b11a02b13aa20cab591229ff0a89fd121a308a5df751d5fc9219" +checksum = "ab53c047fcd1a1d2a8820fe84f05d6be69e9526be40cb03b73f86b6b03e6d87d" dependencies = [ - "cfg-if", "indoc", "libc", "memoffset", @@ -3752,19 +3751,18 @@ dependencies = [ [[package]] name = "pyo3-build-config" -version = "0.24.2" +version = "0.27.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "99636d423fa2ca130fa5acde3059308006d46f98caac629418e53f7ebb1e9999" +checksum = "b455933107de8642b4487ed26d912c2d899dec6114884214a0b3bb3be9261ea6" dependencies = [ - "once_cell", "target-lexicon", ] [[package]] name = "pyo3-ffi" -version = "0.24.2" +version = "0.27.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "78f9cf92ba9c409279bc3305b5409d90db2d2c22392d443a87df3a1adad59e33" +checksum = "1c85c9cbfaddf651b1221594209aed57e9e5cff63c4d11d1feead529b872a089" dependencies = [ "libc", "pyo3-build-config", @@ -3772,9 +3770,9 @@ dependencies = [ [[package]] name = "pyo3-macros" -version = "0.24.2" +version = "0.27.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b999cb1a6ce21f9a6b147dcf1be9ffedf02e0043aec74dc390f3007047cecd9" +checksum = "0a5b10c9bf9888125d917fb4d2ca2d25c8df94c7ab5a52e13313a07e050a3b02" dependencies = [ "proc-macro2", "pyo3-macros-backend", @@ -3784,9 +3782,9 @@ dependencies = [ [[package]] name = "pyo3-macros-backend" -version = "0.24.2" +version = "0.27.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "822ece1c7e1012745607d5cf0bcb2874769f0f7cb34c4cde03b9358eb9ef911a" +checksum = "03b51720d314836e53327f5871d4c0cfb4fb37cc2c4a11cc71907a86342c40f9" dependencies = [ "heck", "proc-macro2", @@ -4088,7 +4086,7 @@ dependencies = [ [[package]] name = "rotel" -version = "0.1.6" +version = "0.1.7" dependencies = [ "arrow", "aws-config", diff --git a/Cargo.toml b/Cargo.toml index baad903b..e1a7091d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rotel" -version = "0.1.6" +version = "0.1.7" edition = "2024" homepage = "https://github.com/streamfold/rotel" readme = "README.md" @@ -64,7 +64,7 @@ libc = "0.2.170" pin-project = "1.1.10" futures-util = "0.3.31" rotel_python_processor_sdk = { path = "rotel_python_processor_sdk", optional = true } -pyo3 = { version = "0.24.1", optional = true } +pyo3 = { version = "0.27.2", optional = true } chrono = { version = "0.4.40", features = ["serde"] } serde = { version = "1.0.217", features = ["derive"] } thiserror = "2.0.12" diff --git a/README.md b/README.md index 9fec46aa..93a239ca 100644 --- a/README.md +++ b/README.md @@ -232,6 +232,7 @@ and traces. | --clickhouse-exporter-async-insert | true | true, false | | --clickhouse-exporter-request-timeout | 5s | | | --clickhouse-exporter-enable-json | | | +| --clickhouse-exporter-nested-kv-max-depth | | | | --clickhouse-exporter-json-underscore | | | | --clickhouse-exporter-user | | | | --clickhouse-exporter-password | | | @@ -263,6 +264,18 @@ a nested JSON object. You can replace periods in JSON keys with underscores by p `--clickhouse-exporter-json-underscore` which will keep the JSON keys flat. For example, the resource attribute `service.name` will be inserted as `service_name`. +When exporting OpenTelemetry attributes that contain nested `KeyValueList` structures (such as GenAI message +attributes like `gen_ai.input.messages`), use `--clickhouse-exporter-nested-kv-max-depth` to convert them to +proper JSON objects. Without this option, nested structures are serialized as JSON strings (the protobuf +representation) for backwards compatibility. Set to a value like `10` to enable nested conversion: + +```shell +rotel start --exporter clickhouse \ + --clickhouse-exporter-endpoint "http://localhost:8123" \ + --clickhouse-exporter-enable-json \ + --clickhouse-exporter-nested-kv-max-depth 10 +``` + _The ClickHouse exporter is built using code from the official Rust [clickhouse-rs](https://crates.io/crates/clickhouse) crate._ diff --git a/rotel_python_processor_sdk/Cargo.lock b/rotel_python_processor_sdk/Cargo.lock index 5b765a10..87ade168 100644 --- a/rotel_python_processor_sdk/Cargo.lock +++ b/rotel_python_processor_sdk/Cargo.lock @@ -555,9 +555,9 @@ dependencies = [ [[package]] name = "numpy" -version = "0.24.0" +version = "0.27.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a7cfbf3f0feededcaa4d289fe3079b03659e85c5b5a177f4ba6fb01ab4fb3e39" +checksum = "7aac2e6a6e4468ffa092ad43c39b81c79196c2bb773b8db4085f695efe3bba17" dependencies = [ "libc", "ndarray", @@ -725,11 +725,10 @@ dependencies = [ [[package]] name = "pyo3" -version = "0.24.1" +version = "0.27.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17da310086b068fbdcefbba30aeb3721d5bb9af8db4987d6735b2183ca567229" +checksum = "ab53c047fcd1a1d2a8820fe84f05d6be69e9526be40cb03b73f86b6b03e6d87d" dependencies = [ - "cfg-if", "indoc", "libc", "memoffset", @@ -743,19 +742,18 @@ dependencies = [ [[package]] name = "pyo3-build-config" -version = "0.24.1" +version = "0.27.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e27165889bd793000a098bb966adc4300c312497ea25cf7a690a9f0ac5aa5fc1" +checksum = "b455933107de8642b4487ed26d912c2d899dec6114884214a0b3bb3be9261ea6" dependencies = [ - "once_cell", "target-lexicon", ] [[package]] name = "pyo3-ffi" -version = "0.24.1" +version = "0.27.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05280526e1dbf6b420062f3ef228b78c0c54ba94e157f5cb724a609d0f2faabc" +checksum = "1c85c9cbfaddf651b1221594209aed57e9e5cff63c4d11d1feead529b872a089" dependencies = [ "libc", "pyo3-build-config", @@ -763,9 +761,9 @@ dependencies = [ [[package]] name = "pyo3-macros" -version = "0.24.1" +version = "0.27.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c3ce5686aa4d3f63359a5100c62a127c9f15e8398e5fdeb5deef1fed5cd5f44" +checksum = "0a5b10c9bf9888125d917fb4d2ca2d25c8df94c7ab5a52e13313a07e050a3b02" dependencies = [ "proc-macro2", "pyo3-macros-backend", @@ -775,9 +773,9 @@ dependencies = [ [[package]] name = "pyo3-macros-backend" -version = "0.24.1" +version = "0.27.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f4cf6faa0cbfb0ed08e89beb8103ae9724eb4750e3a78084ba4017cbe94f3855" +checksum = "03b51720d314836e53327f5871d4c0cfb4fb37cc2c4a11cc71907a86342c40f9" dependencies = [ "heck", "proc-macro2", diff --git a/rotel_python_processor_sdk/Cargo.toml b/rotel_python_processor_sdk/Cargo.toml index 856ba38d..09f94fe9 100644 --- a/rotel_python_processor_sdk/Cargo.toml +++ b/rotel_python_processor_sdk/Cargo.toml @@ -4,7 +4,7 @@ version = "0.1.0" edition = "2021" [dependencies] -pyo3 = { version = "0.24.1" } +pyo3 = { version = "0.27.2" } opentelemetry-proto = { version = "0.30.0" } tower = "0.5.2" utilities = { path = "../utilities" } diff --git a/rotel_python_processor_sdk/src/model/common.rs b/rotel_python_processor_sdk/src/model/common.rs index 1e632cc9..7d397653 100644 --- a/rotel_python_processor_sdk/src/model/common.rs +++ b/rotel_python_processor_sdk/src/model/common.rs @@ -1,7 +1,6 @@ use crate::py::common::*; use opentelemetry_proto::tonic::common::v1::KeyValue; -#[allow(deprecated)] -use pyo3::{IntoPy, PyObject, PyResult, Python}; +use pyo3::{IntoPyObjectExt, Py, PyAny, PyResult, Python}; use std::sync::{Arc, Mutex}; #[derive(Debug, Clone)] @@ -26,10 +25,9 @@ pub struct RArrayValue { pub values: Arc>>>>>, } -#[allow(deprecated)] impl RArrayValue { - pub(crate) fn convert_to_py(&self, py: Python) -> PyResult { - Ok(ArrayValue(self.values.clone()).into_py(py)) + pub(crate) fn convert_to_py(&self, py: Python) -> PyResult> { + Ok(ArrayValue(self.values.clone()).into_py_any(py)?) } } @@ -38,10 +36,9 @@ pub struct RKeyValueList { pub values: Arc>>, } -#[allow(deprecated)] impl RKeyValueList { - pub(crate) fn convert_to_py(&self, py: Python) -> PyResult { - Ok(KeyValueList(self.values.clone()).into_py(py)) + pub(crate) fn convert_to_py(&self, py: Python) -> PyResult> { + Ok(KeyValueList(self.values.clone()).into_py_any(py)?) } } diff --git a/rotel_python_processor_sdk/src/model/mod.rs b/rotel_python_processor_sdk/src/model/mod.rs index d6096da2..486156e6 100644 --- a/rotel_python_processor_sdk/src/model/mod.rs +++ b/rotel_python_processor_sdk/src/model/mod.rs @@ -22,10 +22,10 @@ static PROCESSOR_INIT: Once = Once::new(); pub fn register_processor(code: String, script: String, module: String) -> Result<(), BoxError> { PROCESSOR_INIT.call_once(|| { pyo3::append_to_inittab!(rotel_sdk); - pyo3::prepare_freethreaded_python(); + Python::initialize(); }); - let res = Python::with_gil(|py| -> PyResult<()> { + let res = Python::attach(|py| -> PyResult<()> { PyModule::from_code( py, CString::new(code)?.as_c_str(), @@ -48,7 +48,7 @@ impl PythonProcessable for opentelemetry_proto::tonic::trace::v1::ResourceSpans fn process(self, processor: &str, request_context: Option) -> Self { let inner = otel_transform::transform_resource_spans(self); // Build the PyObject - let res = Python::with_gil(|py| -> PyResult<()> { + let res = Python::attach(|py| -> PyResult<()> { let spans = ResourceSpans { resource: inner.resource.clone(), scope_spans: inner.scope_spans.clone(), @@ -101,7 +101,7 @@ impl PythonProcessable for opentelemetry_proto::tonic::metrics::v1::ResourceMetr fn process(self, processor: &str, request_context: Option) -> Self { let inner = otel_transform::transform_resource_metrics(self); // Build the PyObject - let res = Python::with_gil(|py| -> PyResult<()> { + let res = Python::attach(|py| -> PyResult<()> { let spans = ResourceMetrics { resource: inner.resource.clone(), scope_metrics: inner.scope_metrics.clone(), @@ -142,7 +142,7 @@ impl PythonProcessable for opentelemetry_proto::tonic::logs::v1::ResourceLogs { fn process(self, processor: &str, request_context: Option) -> Self { let inner = otel_transform::transform_resource_logs(self); // Build the PyObject - let res = Python::with_gil(|py| -> PyResult<()> { + let res = Python::attach(|py| -> PyResult<()> { let spans = ResourceLogs { resource: inner.resource.clone(), scope_logs: inner.scope_logs.clone(), diff --git a/rotel_python_processor_sdk/src/py/common.rs b/rotel_python_processor_sdk/src/py/common.rs index 4cb32340..00031aba 100644 --- a/rotel_python_processor_sdk/src/py/common.rs +++ b/rotel_python_processor_sdk/src/py/common.rs @@ -8,8 +8,9 @@ use crate::model::otel_transform::convert_attributes; use crate::py::{handle_poison_error, AttributesList}; use pyo3::exceptions::PyRuntimeError; use pyo3::types::{PyAnyMethods, PyBool, PyBytes, PyFloat, PyInt, PyString}; -#[allow(deprecated)] -use pyo3::{pyclass, pymethods, IntoPy, Py, PyErr, PyObject, PyRef, PyRefMut, PyResult, Python}; +use pyo3::{ + pyclass, pymethods, IntoPyObjectExt, Py, PyAny, PyErr, PyRef, PyRefMut, PyResult, Python, +}; use std::sync::{Arc, Mutex}; // Wrapper for AnyValue that can be exposed to Python @@ -22,10 +23,10 @@ pub struct AnyValue { impl AnyValue { #[new] #[pyo3(signature = (optional_value=None))] - fn new(py: Python, optional_value: Option) -> PyResult { + fn new(py: Python, optional_value: Option>) -> PyResult { if let Some(v) = optional_value { let bound_obj = v.bind(py); - if let Ok(s) = bound_obj.downcast::() { + if let Ok(s) = bound_obj.cast::() { let value: String = s.extract()?; // Extract the i64 value Ok(AnyValue { inner: Arc::new(Mutex::new(Some(RAnyValue { @@ -34,35 +35,35 @@ impl AnyValue { }) // VERY IMPORTANT! The ordering of checking downcast, with PyBool first and PyInt // second is critical. This ensures we don't downcast a python value of True to an i64 - } else if let Ok(b) = bound_obj.downcast_exact::() { + } else if let Ok(b) = bound_obj.cast_exact::() { let value: bool = b.extract()?; Ok(AnyValue { inner: Arc::new(Mutex::new(Some(RAnyValue { value: Arc::new(Mutex::new(Some(BoolValue(value)))), }))), }) - } else if let Ok(i) = bound_obj.downcast::() { + } else if let Ok(i) = bound_obj.cast::() { let value: i64 = i.extract()?; Ok(AnyValue { inner: Arc::new(Mutex::new(Some(RAnyValue { value: Arc::new(Mutex::new(Some(IntValue(value)))), }))), }) - } else if let Ok(f) = bound_obj.downcast::() { + } else if let Ok(f) = bound_obj.cast::() { let value: f64 = f.extract()?; // Extract the i64 value Ok(AnyValue { inner: Arc::new(Mutex::new(Some(RAnyValue { value: Arc::new(Mutex::new(Some(DoubleValue(value)))), }))), }) - } else if let Ok(b) = bound_obj.downcast::() { + } else if let Ok(b) = bound_obj.cast::() { let value: Vec = b.extract()?; Ok(AnyValue { inner: Arc::new(Mutex::new(Some(RAnyValue { value: Arc::new(Mutex::new(Some(BytesValue(value)))), }))), }) - } else if let Ok(v) = bound_obj.downcast::() { + } else if let Ok(v) = bound_obj.cast::() { let value: KeyValueList = v.extract()?; Ok(AnyValue { inner: Arc::new(Mutex::new(Some(RAnyValue { @@ -71,7 +72,7 @@ impl AnyValue { })))), }))), }) - } else if let Ok(v) = bound_obj.downcast::() { + } else if let Ok(v) = bound_obj.cast::() { let value: ArrayValue = v.extract()?; Ok(AnyValue { inner: Arc::new(Mutex::new(Some(RAnyValue { @@ -92,17 +93,16 @@ impl AnyValue { } } #[getter] - #[allow(deprecated)] - fn value<'py>(&self, py: Python<'py>) -> PyResult { + fn value<'py>(&self, py: Python<'py>) -> PyResult> { let v = self.inner.lock().map_err(handle_poison_error)?; let binding = v.clone().unwrap().value.clone(); let bind_lock = binding.lock(); let x = match bind_lock.unwrap().clone() { - Some(StringValue(s)) => Ok(s.into_py(py)), - Some(BoolValue(b)) => Ok(b.into_py(py)), - Some(IntValue(i)) => Ok(i.into_py(py)), - Some(DoubleValue(d)) => Ok(d.into_py(py)), - Some(BytesValue(b)) => Ok(b.into_py(py)), + Some(StringValue(s)) => Ok(s.into_py_any(py)?), + Some(BoolValue(b)) => Ok(b.into_py_any(py)?), + Some(IntValue(i)) => Ok(i.into_py_any(py)?), + Some(DoubleValue(d)) => Ok(d.into_py_any(py)?), + Some(BytesValue(b)) => Ok(b.into_py_any(py)?), Some(RVArrayValue(a)) => Ok(a.convert_to_py(py)?), Some(KvListValue(k)) => Ok(k.convert_to_py(py)?), None => Ok(py.None()), @@ -410,7 +410,7 @@ impl KeyValue { } // Helper methods for class #[staticmethod] - fn new_bool_value(key: &str, py: Python, value: PyObject) -> PyResult { + fn new_bool_value(key: &str, py: Python, value: Py) -> PyResult { let b = value.extract::(py)?; let key = Arc::new(Mutex::new(key.to_string())); let value = RAnyValue { @@ -423,7 +423,7 @@ impl KeyValue { } // Helper methods for class #[staticmethod] - fn new_int_value(key: &str, py: Python, value: PyObject) -> PyResult { + fn new_int_value(key: &str, py: Python, value: Py) -> PyResult { let i = value.extract::(py)?; let key = Arc::new(Mutex::new(key.to_string())); let value = RAnyValue { @@ -436,7 +436,7 @@ impl KeyValue { } // Helper methods for class #[staticmethod] - fn new_double_value(key: &str, py: Python, value: PyObject) -> PyResult { + fn new_double_value(key: &str, py: Python, value: Py) -> PyResult { let f = value.extract::(py)?; let key = Arc::new(Mutex::new(key.to_string())); let value = RAnyValue { @@ -449,7 +449,7 @@ impl KeyValue { } // Helper methods for class #[staticmethod] - fn new_bytes_value(key: &str, py: Python, value: PyObject) -> PyResult { + fn new_bytes_value(key: &str, py: Python, value: Py) -> PyResult { let f = value.extract::>(py)?; let key = Arc::new(Mutex::new(key.to_string())); let value = RAnyValue { @@ -489,12 +489,11 @@ impl KeyValue { }) } #[getter] - #[allow(deprecated)] - fn key(&self, py: Python) -> PyResult { + fn key(&self, py: Python) -> PyResult> { let v = self.inner.lock().map_err(handle_poison_error)?; let binding = v.key.clone(); let bind_lock = binding.lock(); - let x = Ok(bind_lock.unwrap().clone().into_py(py)); + let x = Ok(bind_lock.unwrap().clone().into_py_any(py)?); x } #[setter] diff --git a/rotel_python_processor_sdk/src/py/mod.rs b/rotel_python_processor_sdk/src/py/mod.rs index 37bbfe79..ff21c823 100644 --- a/rotel_python_processor_sdk/src/py/mod.rs +++ b/rotel_python_processor_sdk/src/py/mod.rs @@ -183,6 +183,7 @@ mod tests { use opentelemetry_proto::tonic::metrics::v1::metric::Data; use opentelemetry_proto::tonic::trace::v1; use pyo3::ffi::c_str; + use pyo3::IntoPyObjectExt; use std::collections::HashMap; use std::ffi::CString; use std::sync::{Arc, Mutex, Once}; @@ -208,7 +209,7 @@ mod tests { process_fn: Option, ) -> PyResult<()> { let sys = py.import("sys")?; - sys.setattr("stdout", LoggingStdout.into_py(py))?; + sys.setattr("stdout", LoggingStdout.into_py_any(py)?)?; let code = std::fs::read_to_string(format!("./python_tests/{}", script))?; let py_mod = PyModule::from_code( py, diff --git a/src/exporters/clickhouse/mod.rs b/src/exporters/clickhouse/mod.rs index e1da36a4..aece9ee6 100644 --- a/src/exporters/clickhouse/mod.rs +++ b/src/exporters/clickhouse/mod.rs @@ -70,6 +70,7 @@ pub struct ClickhouseExporterConfigBuilder { auth_password: Option, async_insert: bool, use_json: bool, + nested_kv_max_depth: Option, request_timeout: Duration, } @@ -112,6 +113,7 @@ impl ClickhouseExporterConfigBuilder { request_timeout: Duration::from_secs(5), compression: Default::default(), use_json: false, + nested_kv_max_depth: None, async_insert: false, } } @@ -126,6 +128,11 @@ impl ClickhouseExporterConfigBuilder { self } + pub fn with_nested_kv_max_depth(mut self, max_depth: Option) -> Self { + self.nested_kv_max_depth = max_depth; + self + } + pub fn with_async_insert(mut self, async_insert: bool) -> Self { self.async_insert = async_insert; self @@ -173,6 +180,7 @@ impl ClickhouseExporterConfigBuilder { request_mapper: mapper, retry_config: self.retry_config, request_timeout: self.request_timeout, + nested_kv_max_depth: self.nested_kv_max_depth, }) } } @@ -182,6 +190,7 @@ pub struct ClickhouseExporterBuilder { retry_config: RetryConfig, request_mapper: Arc, request_timeout: Duration, + nested_kv_max_depth: Option, } impl ClickhouseExporterBuilder { @@ -221,7 +230,8 @@ impl ClickhouseExporterBuilder { { let client = Client::build(tls::Config::default(), Protocol::Http, Default::default())?; - let transformer = Transformer::new(self.config.compression.clone(), self.config.use_json); + let transformer = Transformer::new(self.config.compression.clone(), self.config.use_json) + .with_nested_kv_max_depth(self.nested_kv_max_depth); let req_builder = RequestBuilder::new(transformer, self.request_mapper.clone())?; diff --git a/src/exporters/clickhouse/rowbinary/json.rs b/src/exporters/clickhouse/rowbinary/json.rs index a967052f..5ae9e89a 100644 --- a/src/exporters/clickhouse/rowbinary/json.rs +++ b/src/exporters/clickhouse/rowbinary/json.rs @@ -1,25 +1,43 @@ use serde::{Serialize, Serializer}; +use std::borrow::Cow; use crate::otlp::cvattr::ConvertedAttrValue; use opentelemetry_proto::tonic::common::v1::AnyValue; use opentelemetry_proto::tonic::common::v1::any_value::Value; use serde_json::json; +/// JSON type representation for ClickHouse rowbinary format. +/// Uses `Cow` for strings and object keys to efficiently handle both +/// borrowed and owned data with a single type. #[derive(Debug)] pub enum JsonType<'a> { Int(i64), - Str(&'a str), - StrOwned(String), + Str(Cow<'a, str>), Double(f64), Bool(bool), Array(Vec>), + Object(Vec<(Cow<'a, str>, JsonType<'a>)>), +} + +impl<'a> JsonType<'a> { + /// Create a borrowed string variant + #[inline] + pub fn str_borrowed(s: &'a str) -> Self { + JsonType::Str(Cow::Borrowed(s)) + } + + /// Create an owned string variant + #[inline] + pub fn str_owned(s: String) -> Self { + JsonType::Str(Cow::Owned(s)) + } } impl<'a> From<&'a ConvertedAttrValue> for JsonType<'a> { fn from(value: &'a ConvertedAttrValue) -> Self { match value { ConvertedAttrValue::Int(i) => JsonType::Int(*i), - ConvertedAttrValue::String(s) => JsonType::Str(s.as_str()), + ConvertedAttrValue::String(s) => JsonType::str_borrowed(s.as_str()), ConvertedAttrValue::Double(d) => JsonType::Double(*d), } } @@ -29,81 +47,153 @@ impl From for JsonType<'static> { fn from(value: ConvertedAttrValue) -> Self { match value { ConvertedAttrValue::Int(i) => JsonType::Int(i), - ConvertedAttrValue::String(s) => JsonType::StrOwned(s), + ConvertedAttrValue::String(s) => JsonType::str_owned(s), ConvertedAttrValue::Double(d) => JsonType::Double(d), } } } -impl<'a> From<&'a AnyValue> for JsonType<'a> { - fn from(value: &'a AnyValue) -> Self { - match &value.value { +/// Convert AnyValue to JsonType with configurable nesting behavior. +/// - `nested_kv_max_depth = None` or `Some(0)`: flat mode (backwards compatible, fastest) +/// - `nested_kv_max_depth = Some(n)` where n > 0: recursive conversion up to depth n +/// +/// Flat mode is the hot path - it handles nested complex types by serializing to JSON strings. +/// Nested mode recursively converts KvlistValue to Object and preserves ArrayValue structure. +#[inline] +pub fn anyvalue_to_jsontype<'a>( + value: &'a AnyValue, + nested_kv_max_depth: Option, +) -> JsonType<'a> { + anyvalue_to_jsontype_nested(value, 0, nested_kv_max_depth) +} + +/// Nested mode: recursively convert with depth tracking. +fn anyvalue_to_jsontype_nested<'a>( + value: &'a AnyValue, + depth: usize, + max_depth: Option, +) -> JsonType<'a> { + let effective_max = max_depth.unwrap_or(0); + if depth > effective_max { + return match &value.value { Some(Value::IntValue(i)) => JsonType::Int(*i), Some(Value::DoubleValue(d)) => JsonType::Double(*d), - Some(Value::StringValue(s)) => JsonType::Str(s.as_str()), + Some(Value::StringValue(s)) => JsonType::str_borrowed(s.as_str()), Some(Value::BoolValue(b)) => JsonType::Bool(*b), - Some(Value::ArrayValue(a)) => { - // We support arrays with all simple values, no recursive nesting - let values = a - .values - .iter() - .map(|v| match &v.value { - Some(Value::IntValue(i)) => JsonType::Int(*i), - Some(Value::DoubleValue(d)) => JsonType::Double(*d), - Some(Value::StringValue(s)) => JsonType::Str(s.as_str()), - Some(Value::BoolValue(b)) => JsonType::Bool(*b), - Some(Value::ArrayValue(a)) => JsonType::StrOwned(json!(a).to_string()), - Some(Value::KvlistValue(kv)) => JsonType::StrOwned(json!(kv).to_string()), - Some(Value::BytesValue(b)) => JsonType::StrOwned(hex::encode(b)), - None => JsonType::Str(""), + // Complex nested types become JSON strings + Some(Value::ArrayValue(arr)) => JsonType::str_owned(json!(arr).to_string()), + Some(Value::KvlistValue(kv)) => JsonType::str_owned(json!(kv).to_string()), + Some(Value::BytesValue(b)) => JsonType::str_owned(hex::encode(b)), + None => JsonType::str_borrowed(""), + }; + } + + match &value.value { + Some(Value::IntValue(i)) => JsonType::Int(*i), + Some(Value::DoubleValue(d)) => JsonType::Double(*d), + Some(Value::StringValue(s)) => JsonType::str_borrowed(s.as_str()), + Some(Value::BoolValue(b)) => JsonType::Bool(*b), + Some(Value::ArrayValue(a)) => { + let values = a + .values + .iter() + .map(|v| anyvalue_to_jsontype_nested(v, depth + 1, max_depth)) + .collect(); + JsonType::Array(values) + } + Some(Value::KvlistValue(kv)) => { + let entries = kv + .values + .iter() + .filter_map(|entry| { + entry.value.as_ref().map(|v| { + ( + Cow::Borrowed(entry.key.as_str()), + anyvalue_to_jsontype_nested(v, depth + 1, max_depth), + ) }) - .collect(); - JsonType::Array(values) - } - Some(Value::KvlistValue(_kv)) => { - // KvlistValue should be flattened before reaching this point - // This case should not occur in normal operation - JsonType::Str("") - } - Some(Value::BytesValue(b)) => JsonType::StrOwned(hex::encode(b)), - None => JsonType::Str(""), + }) + .collect(); + JsonType::Object(entries) } + Some(Value::BytesValue(b)) => JsonType::str_owned(hex::encode(b)), + None => JsonType::str_borrowed(""), } } -impl From for JsonType<'static> { - fn from(value: AnyValue) -> Self { - match value.value { +/// Convert owned AnyValue to JsonType with configurable nesting behavior. +#[inline] +pub fn anyvalue_to_jsontype_owned( + value: AnyValue, + nested_kv_max_depth: Option, +) -> JsonType<'static> { + anyvalue_to_jsontype_nested_owned(value, 0, nested_kv_max_depth) +} + +/// Nested mode (owned): recursively convert with depth tracking. +fn anyvalue_to_jsontype_nested_owned( + value: AnyValue, + depth: usize, + max_depth: Option, +) -> JsonType<'static> { + let effective_max = max_depth.unwrap_or(0); + if depth > effective_max { + return match value.value { Some(Value::IntValue(i)) => JsonType::Int(i), Some(Value::DoubleValue(d)) => JsonType::Double(d), - Some(Value::StringValue(s)) => JsonType::StrOwned(s), + Some(Value::StringValue(s)) => JsonType::str_owned(s), Some(Value::BoolValue(b)) => JsonType::Bool(b), - Some(Value::ArrayValue(a)) => { - // We support arrays with all simple values, no recursive nesting - let values = a - .values - .into_iter() - .map(|v| match v.value { - Some(Value::IntValue(i)) => JsonType::Int(i), - Some(Value::DoubleValue(d)) => JsonType::Double(d), - Some(Value::StringValue(s)) => JsonType::StrOwned(s), - Some(Value::BoolValue(b)) => JsonType::Bool(b), - Some(Value::ArrayValue(a)) => JsonType::StrOwned(json!(a).to_string()), - Some(Value::KvlistValue(kv)) => JsonType::StrOwned(json!(kv).to_string()), - Some(Value::BytesValue(b)) => JsonType::StrOwned(hex::encode(b)), - None => JsonType::Str(""), + Some(Value::ArrayValue(arr)) => JsonType::str_owned(json!(arr).to_string()), + Some(Value::KvlistValue(kv)) => JsonType::str_owned(json!(kv).to_string()), + Some(Value::BytesValue(b)) => JsonType::str_owned(hex::encode(&b)), + None => JsonType::str_owned(String::new()), + }; + } + + match value.value { + Some(Value::IntValue(i)) => JsonType::Int(i), + Some(Value::DoubleValue(d)) => JsonType::Double(d), + Some(Value::StringValue(s)) => JsonType::str_owned(s), + Some(Value::BoolValue(b)) => JsonType::Bool(b), + Some(Value::ArrayValue(a)) => { + let values = a + .values + .into_iter() + .map(|v| anyvalue_to_jsontype_nested_owned(v, depth + 1, max_depth)) + .collect(); + JsonType::Array(values) + } + Some(Value::KvlistValue(kv)) => { + let entries = kv + .values + .into_iter() + .filter_map(|entry| { + entry.value.map(|v| { + ( + Cow::Owned(entry.key), + anyvalue_to_jsontype_nested_owned(v, depth + 1, max_depth), + ) }) - .collect(); - JsonType::Array(values) - } - Some(Value::KvlistValue(_kv)) => { - // KvlistValue should be flattened before reaching this point - // This case should not occur in normal operation - JsonType::StrOwned(String::new()) - } - Some(Value::BytesValue(b)) => JsonType::StrOwned(hex::encode(&b)), - None => JsonType::StrOwned(String::new()), + }) + .collect(); + JsonType::Object(entries) } + Some(Value::BytesValue(b)) => JsonType::str_owned(hex::encode(&b)), + None => JsonType::str_owned(String::new()), + } +} + +impl<'a> From<&'a AnyValue> for JsonType<'a> { + /// Default conversion uses flat mode for backwards compatibility. + fn from(value: &'a AnyValue) -> Self { + anyvalue_to_jsontype(value, None) + } +} + +impl From for JsonType<'static> { + /// Default conversion uses flat mode for backwards compatibility. + fn from(value: AnyValue) -> Self { + anyvalue_to_jsontype_owned(value, None) } } @@ -125,14 +215,7 @@ impl<'a> Serialize for JsonType<'a> { JsonType::Str(s) => { let jsonstr = JsonStr { code: 0x15, - value: s, - }; - jsonstr.serialize(serializer) - } - JsonType::StrOwned(s) => { - let jsonstr = JsonStr { - code: 0x15, - value: &s, + value: s.as_ref(), }; jsonstr.serialize(serializer) } @@ -151,7 +234,7 @@ impl<'a> Serialize for JsonType<'a> { jsonbool.serialize(serializer) } JsonType::Array(a) => { - // We always use the Dyanmic type here because it is simpler to serialize. Clickhouse + // We always use the Dynamic type here because it is simpler to serialize. Clickhouse // will use Nullable(T) in responses if all types are the same, but there doesn't seem // to be a cost savings in wire size. let jsonarray = JsonArrayDynamic { @@ -162,6 +245,14 @@ impl<'a> Serialize for JsonType<'a> { }; jsonarray.serialize(serializer) } + JsonType::Object(entries) => { + // Object type code 0x1f for named tuples/objects + let jsonobject = JsonObject { + object_code: 0x1f, + entries, + }; + jsonobject.serialize(serializer) + } } } } @@ -198,6 +289,37 @@ struct JsonArrayDynamic<'a> { values: &'a Vec>, } +/// Serializer for Object type (named tuple in ClickHouse JSON format) +/// Format: object_code (0x1f) | num_entries | [key_len | key_bytes | value]... +struct JsonObject<'a> { + object_code: u8, + entries: &'a Vec<(Cow<'a, str>, JsonType<'a>)>, +} + +impl<'a> Serialize for JsonObject<'a> { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + use serde::ser::SerializeTuple; + // Object format: code | num_keys | [key_len | key_bytes | value_type | value]... + let num_entries = self.entries.len(); + let mut seq = serializer.serialize_tuple(2 + num_entries * 2)?; + seq.serialize_element(&self.object_code)?; + seq.serialize_element(&(num_entries as u8))?; + for (key, value) in self.entries.iter() { + // Serialize key as length-prefixed string (Cow dereferences to &str) + seq.serialize_element(&JsonStr { + code: 0x00, // No type code for object keys, just length + bytes + value: key.as_ref(), + })?; + // Serialize the value with its type + seq.serialize_element(value)?; + } + seq.end() + } +} + #[cfg(test)] mod tests { use bytes::BytesMut; @@ -244,12 +366,12 @@ mod tests { ); } - /// Test that verifies the byte patterns for JsonType::Str serialization. + /// Test that verifies the byte patterns for JsonType::Str serialization (borrowed). /// This test prevents regressions in the wire format by ensuring the serialized /// bytes match the expected pattern: [type_code(0x15), string_length, string_bytes] #[test] fn test_jsontype_str_serialization() { - let json_str = JsonType::Str("hello"); + let json_str = JsonType::str_borrowed("hello"); let serialized = serialize_to_bytes(json_str); // Expected bytes: type code 0x15 + string length (5 as u8) + "hello" bytes @@ -261,14 +383,14 @@ mod tests { assert_eq!( serialized, expected, - "JsonType::Str(\"hello\") serialization bytes mismatch" + "JsonType::str_borrowed(\"hello\") serialization bytes mismatch" ); } /// Test that verifies the byte patterns for JsonType::Str with empty string. #[test] fn test_jsontype_str_empty_serialization() { - let json_str = JsonType::Str(""); + let json_str = JsonType::str_borrowed(""); let serialized = serialize_to_bytes(json_str); // Expected bytes: type code 0x15 + string length (0 as u8) @@ -279,15 +401,15 @@ mod tests { assert_eq!( serialized, expected, - "JsonType::Str(\"\") serialization bytes mismatch" + "JsonType::str_borrowed(\"\") serialization bytes mismatch" ); } - /// Test that verifies the byte patterns for JsonType::StrOwned serialization. - /// This should produce the same byte pattern as JsonType::Str. + /// Test that verifies the byte patterns for JsonType::Str (owned) serialization. + /// This should produce the same byte pattern as borrowed Str. #[test] fn test_jsontype_str_owned_serialization() { - let json_str_owned = JsonType::StrOwned("world".to_string()); + let json_str_owned = JsonType::str_owned("world".to_string()); let serialized = serialize_to_bytes(json_str_owned); // Expected bytes: type code 0x15 + string length (5 as u8) + "world" bytes @@ -299,7 +421,7 @@ mod tests { assert_eq!( serialized, expected, - "JsonType::StrOwned(\"world\") serialization bytes mismatch" + "JsonType::str_owned(\"world\") serialization bytes mismatch" ); } @@ -370,7 +492,7 @@ mod tests { assert_eq!(serialized_array, expected); // Different types - let json_array = JsonType::Array(vec![JsonType::Int(3), JsonType::Str("hello")]); + let json_array = JsonType::Array(vec![JsonType::Int(3), JsonType::str_borrowed("hello")]); let serialized_array = serialize_to_bytes(json_array); let expected: Vec = vec![ @@ -413,11 +535,93 @@ mod tests { ); } + /// Test flat mode (backwards compatible) - KvlistValue and nested ArrayValue become JSON strings + #[test] + fn test_anyvalue_arrayvalue_flat_mode() { + use opentelemetry_proto::tonic::common::v1::{ArrayValue, KeyValue, KeyValueList}; + + let mut kv_list = KeyValueList::default(); + kv_list.values.push(KeyValue { + key: "nested_key".to_string(), + value: Some(AnyValue { + value: Some(Value::StringValue("nested_value".to_string())), + }), + }); + + let array_value = ArrayValue { + values: vec![ + AnyValue { + value: Some(Value::IntValue(42)), + }, + AnyValue { + value: Some(Value::StringValue("test_string".to_string())), + }, + AnyValue { + value: Some(Value::KvlistValue(kv_list)), + }, + AnyValue { + value: Some(Value::ArrayValue(ArrayValue { + values: vec![AnyValue { + value: Some(Value::IntValue(100)), + }], + })), + }, + ], + }; + + let any_value = AnyValue { + value: Some(Value::ArrayValue(array_value)), + }; + + // Default From uses flat mode (None) + let json_type: JsonType = (&any_value).into(); + + match json_type { + JsonType::Array(values) => { + assert_eq!(values.len(), 4); + + // Simple types are converted directly + match &values[0] { + JsonType::Int(i) => assert_eq!(*i, 42), + _ => panic!("Expected Int(42) at index 0"), + } + match &values[1] { + JsonType::Str(s) => assert_eq!(s.as_ref(), "test_string"), + _ => panic!("Expected Str at index 1"), + } + + // KvlistValue becomes a JSON string in flat mode + match &values[2] { + JsonType::Str(s) => { + assert!(s.as_ref().contains("nested_key")); + assert!(s.as_ref().contains("nested_value")); + } + _ => panic!( + "Expected Str (JSON string) at index 2 in flat mode, got {:?}", + &values[2] + ), + } + + // Nested ArrayValue becomes a JSON string in flat mode + match &values[3] { + JsonType::Str(s) => { + assert!(s.as_ref().contains("100")); + } + _ => panic!( + "Expected Str (JSON string) at index 3 in flat mode, got {:?}", + &values[3] + ), + } + } + _ => panic!("Expected JsonType::Array"), + } + } + + /// Test nested mode - KvlistValue becomes Object, nested ArrayValue stays Array #[test] - fn test_anyvalue_arrayvalue_conversion() { + fn test_anyvalue_arrayvalue_nested_mode() { use opentelemetry_proto::tonic::common::v1::{ArrayValue, KeyValue, KeyValueList}; - // Create an ArrayValue with mixed types including KvlistValue let mut kv_list = KeyValueList::default(); kv_list.values.push(KeyValue { key: "nested_key".to_string(), @@ -444,7 +648,7 @@ mod tests { value: Some(Value::KvlistValue(kv_list)), }, AnyValue { - value: Some(Value::BytesValue(vec![0x48, 0x65, 0x6c, 0x6c, 0x6f])), // "Hello" + value: Some(Value::BytesValue(vec![0x48, 0x65, 0x6c, 0x6c, 0x6f])), }, AnyValue { value: Some(Value::ArrayValue(ArrayValue { @@ -460,59 +664,64 @@ mod tests { value: Some(Value::ArrayValue(array_value)), }; - let json_type: JsonType = (&any_value).into(); + // Use nested mode with max depth 10 + let json_type = anyvalue_to_jsontype(&any_value, Some(10)); match json_type { JsonType::Array(values) => { assert_eq!(values.len(), 7); - // Check Int value match &values[0] { JsonType::Int(i) => assert_eq!(*i, 42), _ => panic!("Expected Int(42) at index 0"), } - - // Check String value match &values[1] { - JsonType::Str(s) => assert_eq!(*s, "test_string"), - _ => panic!("Expected Str(\"test_string\") at index 1"), + JsonType::Str(s) => assert_eq!(s.as_ref(), "test_string"), + _ => panic!("Expected Str at index 1"), } - - // Check Double value match &values[2] { JsonType::Double(d) => assert_eq!(*d, 3.14), - _ => panic!("Expected Double(3.14) at index 2"), + _ => panic!("Expected Double at index 2"), } - - // Check Bool value match &values[3] { JsonType::Bool(b) => assert!(b), - _ => panic!("Expected Bool(true) at index 3"), + _ => panic!("Expected Bool at index 3"), } - // Check KvlistValue (converted to JSON string) + // KvlistValue becomes Object in nested mode match &values[4] { - JsonType::StrOwned(s) => { - // The exact JSON string format may vary, but it should contain the key-value pair - assert!(s.contains("nested_key")); - assert!(s.contains("nested_value")); + JsonType::Object(entries) => { + assert_eq!(entries.len(), 1); + assert_eq!(entries[0].0.as_ref(), "nested_key"); + match &entries[0].1 { + JsonType::Str(s) => assert_eq!(s.as_ref(), "nested_value"), + _ => panic!("Expected nested_key to have Str value"), + } } - _ => panic!("Expected StrOwned with JSON string at index 4"), + _ => panic!( + "Expected Object at index 4 in nested mode, got {:?}", + &values[4] + ), } - // Check BytesValue (hex encoded) match &values[5] { - JsonType::StrOwned(s) => assert_eq!(s, "48656c6c6f"), // "Hello" in hex - _ => panic!("Expected StrOwned with hex string at index 5"), + JsonType::Str(s) => assert_eq!(s.as_ref(), "48656c6c6f"), + _ => panic!("Expected Str (hex) at index 5"), } - // Check nested ArrayValue (converted to JSON string) + // Nested ArrayValue stays as Array in nested mode match &values[6] { - JsonType::StrOwned(s) => { - // Should be a JSON representation of the nested array - assert!(s.contains("100")); + JsonType::Array(nested) => { + assert_eq!(nested.len(), 1); + match &nested[0] { + JsonType::Int(i) => assert_eq!(*i, 100), + _ => panic!("Expected Int(100) in nested array"), + } } - _ => panic!("Expected StrOwned with JSON array string at index 6"), + _ => panic!( + "Expected Array at index 6 in nested mode, got {:?}", + &values[6] + ), } } _ => panic!("Expected JsonType::Array"), @@ -532,11 +741,8 @@ mod tests { fn debug_jsontype_serialization_bytes() { let test_cases = vec![ ("Int(42)", JsonType::Int(42)), - ("Str(\"hello\")", JsonType::Str("hello")), - ( - "StrOwned(\"world\")", - JsonType::StrOwned("world".to_string()), - ), + ("Str(borrowed)", JsonType::str_borrowed("hello")), + ("Str(owned)", JsonType::str_owned("world".to_string())), ("Double(3.14)", JsonType::Double(std::f64::consts::PI)), ]; diff --git a/src/exporters/clickhouse/transformer.rs b/src/exporters/clickhouse/transformer.rs index 6e46bcfe..f85b9e05 100644 --- a/src/exporters/clickhouse/transformer.rs +++ b/src/exporters/clickhouse/transformer.rs @@ -10,6 +10,10 @@ use std::collections::HashMap; pub struct Transformer { pub(crate) compression: Compression, use_json: bool, + /// Maximum depth for nested KV list conversion. + /// - `None` or `Some(0)`: flat mode (backwards compatible, nested KV serialized as JSON strings) + /// - `Some(n)` where n > 0: recursive conversion up to depth n + nested_kv_max_depth: Option, } impl Transformer { @@ -17,8 +21,14 @@ impl Transformer { Self { compression, use_json, + nested_kv_max_depth: None, // Default: backwards compatible flat mode } } + + pub fn with_nested_kv_max_depth(mut self, max_depth: Option) -> Self { + self.nested_kv_max_depth = max_depth; + self + } } impl Transformer { @@ -80,6 +90,7 @@ impl Transformer { prefix: String, result: &mut HashMap, JsonType<'a>>, ) { + use crate::exporters::clickhouse::rowbinary::json::anyvalue_to_jsontype; use opentelemetry_proto::tonic::common::v1::any_value::Value; for kv in attrs { @@ -100,7 +111,10 @@ impl Transformer { } else { Cow::Owned(format!("{}.{}", prefix, kv.key)) }; - result.insert(key, any_value.into()); + result.insert( + key, + anyvalue_to_jsontype(any_value, self.nested_kv_max_depth), + ); } None => {} } @@ -127,6 +141,7 @@ impl Transformer { prefix: String, result: &mut HashMap>, ) { + use crate::exporters::clickhouse::rowbinary::json::anyvalue_to_jsontype_owned; use opentelemetry_proto::tonic::common::v1::any_value::Value; for kv in attrs { @@ -143,7 +158,10 @@ impl Transformer { self.flatten_keyvalues_owned(&kvlist.values, full_key, result); } Some(_) => { - result.insert(full_key, any_value.clone().into()); + result.insert( + full_key, + anyvalue_to_jsontype_owned(any_value.clone(), self.nested_kv_max_depth), + ); } None => {} } @@ -203,11 +221,11 @@ pub(crate) fn encode_id<'a>(id: &[u8], out: &'a mut [u8]) -> &'a str { #[cfg(test)] mod tests { use super::*; + use opentelemetry_proto::tonic::common::v1::AnyValue; + use opentelemetry_proto::tonic::common::v1::any_value::Value; #[test] fn test_transform_attrs_lifetime_correctness() { - use opentelemetry_proto::tonic::common::v1::AnyValue; - use opentelemetry_proto::tonic::common::v1::any_value::Value; let transformer = Transformer::new(Compression::None, true); @@ -324,8 +342,8 @@ mod tests { // Verify the values are correct and owned match &map["service.name"] { - JsonType::StrOwned(s) => assert_eq!(s, "test-service"), - _ => panic!("Expected StrOwned variant"), + JsonType::Str(s) => assert_eq!(s.as_ref(), "test-service"), + _ => panic!("Expected Str variant"), } match &map["http.status_code"] { JsonType::Int(i) => assert_eq!(*i, 200), @@ -724,4 +742,649 @@ mod tests { _ => panic!("Expected Map variant"), } } + + // ========================================================================= + // GenAI-style nested attribute tests + // ========================================================================= + // + // These tests exercise the nesting pattern used by OpenTelemetry GenAI + // semantic conventions, where attributes like gen_ai.input.messages use: + // ArrayValue [ KvlistValue { role, parts: ArrayValue [ KvlistValue { type, content } ] } ] + + /// Helper: build a GenAI message KvlistValue with role and text parts. + fn genai_message(role: &str, parts: Vec<(&str, &str)>) -> AnyValue { + use opentelemetry_proto::tonic::common::v1::KeyValueList; + use opentelemetry_proto::tonic::common::v1::any_value::Value; + + let part_values: Vec = parts + .into_iter() + .map(|(part_type, content)| { + AnyValue { + value: Some(Value::KvlistValue(KeyValueList { + values: vec![ + KeyValue { + key: "type".to_string(), + value: Some(AnyValue { + value: Some(Value::StringValue(part_type.to_string())), + }), + }, + KeyValue { + key: "content".to_string(), + value: Some(AnyValue { + value: Some(Value::StringValue(content.to_string())), + }), + }, + ], + })), + } + }) + .collect(); + + AnyValue { + value: Some(Value::KvlistValue(KeyValueList { + values: vec![ + KeyValue { + key: "role".to_string(), + value: Some(AnyValue { + value: Some(Value::StringValue(role.to_string())), + }), + }, + KeyValue { + key: "parts".to_string(), + value: Some(AnyValue { + value: Some(Value::ArrayValue( + opentelemetry_proto::tonic::common::v1::ArrayValue { + values: part_values, + }, + )), + }), + }, + ], + })), + } + } + + /// Helper: build a gen_ai.input.messages attribute as ArrayValue of message KvlistValues. + fn genai_messages_attr(key: &str, messages: Vec) -> KeyValue { + use opentelemetry_proto::tonic::common::v1::any_value::Value; + + KeyValue { + key: key.to_string(), + value: Some(AnyValue { + value: Some(Value::ArrayValue( + opentelemetry_proto::tonic::common::v1::ArrayValue { + values: messages, + }, + )), + }), + } + } + + #[test] + fn test_genai_input_messages_flat_mode() { + // In flat mode (default), the top-level ArrayValue stays as an Array, + // but nested KvlistValues inside the array become JSON strings. + let transformer = Transformer::new(Compression::None, true); + + let attrs = vec![ + genai_messages_attr( + "gen_ai.input.messages", + vec![ + genai_message("user", vec![("text", "What is the weather?")]), + genai_message("assistant", vec![("text", "It's sunny today.")]), + ], + ), + KeyValue { + key: "gen_ai.system".to_string(), + value: Some(AnyValue { + value: Some(Value::StringValue("openai".to_string())), + }), + }, + ]; + + let result = transformer.transform_attrs_kv(&attrs); + + match result { + MapOrJson::Json(map) => { + // gen_ai.system is a simple string + assert!(map.contains_key("gen_ai.system")); + + // gen_ai.input.messages should be present as an array + assert!( + map.contains_key("gen_ai.input.messages"), + "Expected gen_ai.input.messages key" + ); + + match map.get("gen_ai.input.messages").unwrap() { + JsonType::Array(values) => { + assert_eq!(values.len(), 2, "Expected 2 messages"); + + // In flat mode, KvlistValue elements become JSON strings + for val in values { + match val { + JsonType::Str(s) => { + // Should contain role and parts info serialized as JSON + let json_str = s.as_ref(); + assert!( + json_str.contains("role") || json_str.contains("type"), + "Expected JSON with message fields, got: {}", + json_str + ); + } + _ => panic!( + "Expected Str (JSON string) in flat mode for KvlistValue, got {:?}", + val + ), + } + } + } + other => panic!( + "Expected Array for gen_ai.input.messages, got {:?}", + other + ), + } + } + _ => panic!("Expected JSON variant"), + } + } + + #[test] + fn test_genai_input_messages_nested_mode() { + // In nested mode, ArrayValue stays Array and KvlistValues become Objects. + let transformer = Transformer::new(Compression::None, true) + .with_nested_kv_max_depth(Some(10)); + + let attrs = vec![genai_messages_attr( + "gen_ai.input.messages", + vec![ + genai_message("user", vec![("text", "What is the weather?")]), + genai_message("assistant", vec![("text", "It's sunny today.")]), + ], + )]; + + let result = transformer.transform_attrs_kv(&attrs); + + match result { + MapOrJson::Json(map) => { + assert!(map.contains_key("gen_ai.input.messages")); + + match map.get("gen_ai.input.messages").unwrap() { + JsonType::Array(messages) => { + assert_eq!(messages.len(), 2, "Expected 2 messages"); + + // First message: user + match &messages[0] { + JsonType::Object(entries) => { + assert_eq!(entries.len(), 2, "Expected role + parts"); + + // Check role + let role_entry = entries + .iter() + .find(|(k, _)| k.as_ref() == "role") + .expect("Expected 'role' key"); + match &role_entry.1 { + JsonType::Str(s) => assert_eq!(s.as_ref(), "user"), + _ => panic!("Expected role to be Str"), + } + + // Check parts is an array + let parts_entry = entries + .iter() + .find(|(k, _)| k.as_ref() == "parts") + .expect("Expected 'parts' key"); + match &parts_entry.1 { + JsonType::Array(parts) => { + assert_eq!(parts.len(), 1, "Expected 1 part"); + match &parts[0] { + JsonType::Object(part_entries) => { + let type_entry = part_entries + .iter() + .find(|(k, _)| k.as_ref() == "type") + .expect("Expected 'type' key"); + match &type_entry.1 { + JsonType::Str(s) => { + assert_eq!(s.as_ref(), "text") + } + _ => panic!("Expected type to be Str"), + } + + let content_entry = part_entries + .iter() + .find(|(k, _)| k.as_ref() == "content") + .expect("Expected 'content' key"); + match &content_entry.1 { + JsonType::Str(s) => assert_eq!( + s.as_ref(), + "What is the weather?" + ), + _ => panic!("Expected content to be Str"), + } + } + _ => panic!("Expected Object for part"), + } + } + _ => panic!("Expected Array for parts"), + } + } + _ => panic!("Expected Object for message, got {:?}", &messages[0]), + } + + // Second message: assistant + match &messages[1] { + JsonType::Object(entries) => { + let role_entry = entries + .iter() + .find(|(k, _)| k.as_ref() == "role") + .expect("Expected 'role' key"); + match &role_entry.1 { + JsonType::Str(s) => assert_eq!(s.as_ref(), "assistant"), + _ => panic!("Expected role to be Str"), + } + } + _ => panic!("Expected Object for message"), + } + } + other => panic!( + "Expected Array for gen_ai.input.messages, got {:?}", + other + ), + } + } + _ => panic!("Expected JSON variant"), + } + } + + #[test] + fn test_genai_input_messages_nested_mode_owned() { + // Same as nested mode test but using the owned path. + let transformer = Transformer::new(Compression::None, true) + .with_nested_kv_max_depth(Some(10)); + + let attrs = vec![genai_messages_attr( + "gen_ai.input.messages", + vec![genai_message("user", vec![("text", "Hello!")])], + )]; + + let result = transformer.transform_attrs_kv_owned(&attrs); + + match result { + MapOrJson::JsonOwned(map) => { + assert!(map.contains_key("gen_ai.input.messages")); + + match map.get("gen_ai.input.messages").unwrap() { + JsonType::Array(messages) => { + assert_eq!(messages.len(), 1); + + match &messages[0] { + JsonType::Object(entries) => { + let role_entry = entries + .iter() + .find(|(k, _)| k.as_ref() == "role") + .expect("Expected 'role' key"); + match &role_entry.1 { + JsonType::Str(s) => assert_eq!(s.as_ref(), "user"), + _ => panic!("Expected role to be Str"), + } + + let parts_entry = entries + .iter() + .find(|(k, _)| k.as_ref() == "parts") + .expect("Expected 'parts' key"); + match &parts_entry.1 { + JsonType::Array(parts) => { + assert_eq!(parts.len(), 1); + match &parts[0] { + JsonType::Object(part_entries) => { + let content_entry = part_entries + .iter() + .find(|(k, _)| k.as_ref() == "content") + .expect("Expected 'content' key"); + match &content_entry.1 { + JsonType::Str(s) => { + assert_eq!(s.as_ref(), "Hello!") + } + _ => panic!("Expected content to be Str"), + } + } + _ => panic!("Expected Object for part"), + } + } + _ => panic!("Expected Array for parts"), + } + } + _ => panic!("Expected Object for message"), + } + } + other => panic!("Expected Array, got {:?}", other), + } + } + _ => panic!("Expected JsonOwned variant"), + } + } + + #[test] + fn test_genai_tool_call_parts_nested_mode() { + // Test GenAI tool_call message part structure: + // KvlistValue { type: "tool_call", id: "call_123", name: "get_weather", arguments: "{}" } + use opentelemetry_proto::tonic::common::v1::KeyValueList; + use opentelemetry_proto::tonic::common::v1::any_value::Value; + + let transformer = Transformer::new(Compression::None, true) + .with_nested_kv_max_depth(Some(10)); + + let tool_call_part = AnyValue { + value: Some(Value::KvlistValue(KeyValueList { + values: vec![ + KeyValue { + key: "type".to_string(), + value: Some(AnyValue { + value: Some(Value::StringValue("tool_call".to_string())), + }), + }, + KeyValue { + key: "id".to_string(), + value: Some(AnyValue { + value: Some(Value::StringValue("call_abc123".to_string())), + }), + }, + KeyValue { + key: "name".to_string(), + value: Some(AnyValue { + value: Some(Value::StringValue("get_weather".to_string())), + }), + }, + KeyValue { + key: "arguments".to_string(), + value: Some(AnyValue { + value: Some(Value::StringValue( + r#"{"location":"San Francisco"}"#.to_string(), + )), + }), + }, + ], + })), + }; + + // Build: assistant message with tool_call part + let assistant_msg = AnyValue { + value: Some(Value::KvlistValue(KeyValueList { + values: vec![ + KeyValue { + key: "role".to_string(), + value: Some(AnyValue { + value: Some(Value::StringValue("assistant".to_string())), + }), + }, + KeyValue { + key: "parts".to_string(), + value: Some(AnyValue { + value: Some(Value::ArrayValue( + opentelemetry_proto::tonic::common::v1::ArrayValue { + values: vec![tool_call_part], + }, + )), + }), + }, + ], + })), + }; + + let attrs = vec![genai_messages_attr( + "gen_ai.output.messages", + vec![assistant_msg], + )]; + + let result = transformer.transform_attrs_kv(&attrs); + + match result { + MapOrJson::Json(map) => { + assert!(map.contains_key("gen_ai.output.messages")); + + match map.get("gen_ai.output.messages").unwrap() { + JsonType::Array(messages) => { + assert_eq!(messages.len(), 1); + + match &messages[0] { + JsonType::Object(entries) => { + // Check parts contains tool_call + let parts_entry = entries + .iter() + .find(|(k, _)| k.as_ref() == "parts") + .expect("Expected 'parts' key"); + match &parts_entry.1 { + JsonType::Array(parts) => { + assert_eq!(parts.len(), 1); + match &parts[0] { + JsonType::Object(part_entries) => { + // Verify all tool_call fields + let fields: Vec<&str> = part_entries + .iter() + .map(|(k, _)| k.as_ref()) + .collect(); + assert!(fields.contains(&"type")); + assert!(fields.contains(&"id")); + assert!(fields.contains(&"name")); + assert!(fields.contains(&"arguments")); + + let name = part_entries + .iter() + .find(|(k, _)| k.as_ref() == "name") + .unwrap(); + match &name.1 { + JsonType::Str(s) => { + assert_eq!(s.as_ref(), "get_weather") + } + _ => panic!("Expected Str for name"), + } + } + _ => panic!("Expected Object for tool_call part"), + } + } + _ => panic!("Expected Array for parts"), + } + } + _ => panic!("Expected Object for message"), + } + } + other => panic!("Expected Array, got {:?}", other), + } + } + _ => panic!("Expected JSON variant"), + } + } + + #[test] + fn test_genai_depth_limit_truncation() { + // Test that depth limit correctly truncates deeply nested structures. + // With max_depth=1, only 1 level of nesting is preserved. + let transformer = Transformer::new(Compression::None, true) + .with_nested_kv_max_depth(Some(1)); + + let attrs = vec![genai_messages_attr( + "gen_ai.input.messages", + vec![genai_message("user", vec![("text", "Hello")])], + )]; + + let result = transformer.transform_attrs_kv(&attrs); + + match result { + MapOrJson::Json(map) => { + assert!(map.contains_key("gen_ai.input.messages")); + + // At depth 0: ArrayValue → Array (each element at depth 1) + // At depth 1: KvlistValue → Object (each value at depth 2) + // At depth 2 > max_depth(1): values become JSON strings + match map.get("gen_ai.input.messages").unwrap() { + JsonType::Array(messages) => { + assert_eq!(messages.len(), 1); + + match &messages[0] { + JsonType::Object(entries) => { + // role at depth 2 → JSON string fallback + let role_entry = entries + .iter() + .find(|(k, _)| k.as_ref() == "role"); + assert!(role_entry.is_some(), "Expected 'role' key"); + + // parts at depth 2 → should be a JSON string (depth exceeded) + let parts_entry = entries + .iter() + .find(|(k, _)| k.as_ref() == "parts"); + assert!(parts_entry.is_some(), "Expected 'parts' key"); + + // The parts value at depth 2 should be a JSON string fallback + match &parts_entry.unwrap().1 { + JsonType::Str(s) => { + // Depth exceeded, so the ArrayValue is serialized as JSON + assert!( + s.as_ref().contains("text") + || s.as_ref().contains("Hello"), + "Expected JSON fallback containing message data, got: {}", + s.as_ref() + ); + } + _ => { + // At depth 2 with max_depth 1, it falls back to JSON string + // but role (StringValue) would still resolve as Str + } + } + } + JsonType::Str(s) => { + // If depth 1 is exceeded for the message itself + assert!( + s.as_ref().contains("role") || s.as_ref().contains("user"), + "Expected JSON string for truncated message" + ); + } + _ => panic!( + "Expected Object or Str for depth-limited message, got {:?}", + &messages[0] + ), + } + } + other => panic!("Expected Array, got {:?}", other), + } + } + _ => panic!("Expected JSON variant"), + } + } + + #[test] + fn test_genai_mixed_attributes() { + // Test a realistic span with both simple and GenAI nested attributes. + let transformer = Transformer::new(Compression::None, true) + .with_nested_kv_max_depth(Some(10)); + + let attrs = vec![ + KeyValue { + key: "gen_ai.system".to_string(), + value: Some(AnyValue { + value: Some(Value::StringValue("openai".to_string())), + }), + }, + KeyValue { + key: "gen_ai.request.model".to_string(), + value: Some(AnyValue { + value: Some(Value::StringValue("gpt-4".to_string())), + }), + }, + KeyValue { + key: "gen_ai.request.max_tokens".to_string(), + value: Some(AnyValue { + value: Some(Value::IntValue(4096)), + }), + }, + KeyValue { + key: "gen_ai.request.temperature".to_string(), + value: Some(AnyValue { + value: Some(Value::DoubleValue(0.7)), + }), + }, + genai_messages_attr( + "gen_ai.input.messages", + vec![genai_message("user", vec![("text", "Tell me a joke")])], + ), + genai_messages_attr( + "gen_ai.output.messages", + vec![genai_message( + "assistant", + vec![("text", "Why did the chicken cross the road?")], + )], + ), + KeyValue { + key: "gen_ai.usage.input_tokens".to_string(), + value: Some(AnyValue { + value: Some(Value::IntValue(10)), + }), + }, + KeyValue { + key: "gen_ai.usage.output_tokens".to_string(), + value: Some(AnyValue { + value: Some(Value::IntValue(15)), + }), + }, + ]; + + let result = transformer.transform_attrs_kv(&attrs); + + match result { + MapOrJson::Json(map) => { + // All 8 attributes should be present + assert_eq!(map.len(), 8, "Expected 8 attributes, got: {:?}", map.keys().collect::>()); + + // Simple string attributes + match map.get("gen_ai.system").unwrap() { + JsonType::Str(s) => assert_eq!(s.as_ref(), "openai"), + _ => panic!("Expected Str"), + } + match map.get("gen_ai.request.model").unwrap() { + JsonType::Str(s) => assert_eq!(s.as_ref(), "gpt-4"), + _ => panic!("Expected Str"), + } + + // Integer attributes + match map.get("gen_ai.request.max_tokens").unwrap() { + JsonType::Int(i) => assert_eq!(*i, 4096), + _ => panic!("Expected Int"), + } + match map.get("gen_ai.usage.input_tokens").unwrap() { + JsonType::Int(i) => assert_eq!(*i, 10), + _ => panic!("Expected Int"), + } + match map.get("gen_ai.usage.output_tokens").unwrap() { + JsonType::Int(i) => assert_eq!(*i, 15), + _ => panic!("Expected Int"), + } + + // Double attribute + match map.get("gen_ai.request.temperature").unwrap() { + JsonType::Double(d) => assert_eq!(*d, 0.7), + _ => panic!("Expected Double"), + } + + // Nested array attributes + match map.get("gen_ai.input.messages").unwrap() { + JsonType::Array(messages) => { + assert_eq!(messages.len(), 1); + match &messages[0] { + JsonType::Object(_) => {} // Nested Object is correct + _ => panic!("Expected Object for message"), + } + } + _ => panic!("Expected Array for input messages"), + } + match map.get("gen_ai.output.messages").unwrap() { + JsonType::Array(messages) => { + assert_eq!(messages.len(), 1); + match &messages[0] { + JsonType::Object(_) => {} // Nested Object is correct + _ => panic!("Expected Object for message"), + } + } + _ => panic!("Expected Array for output messages"), + } + } + _ => panic!("Expected JSON variant"), + } + } } diff --git a/src/init/clickhouse_exporter.rs b/src/init/clickhouse_exporter.rs index afdfcfe4..5bfc94ca 100644 --- a/src/init/clickhouse_exporter.rs +++ b/src/init/clickhouse_exporter.rs @@ -78,6 +78,16 @@ pub struct ClickhouseExporterArgs { )] pub enable_json: bool, + /// Clickhouse Exporter nested KV max depth for JSON columns. + /// When set to a value > 0, nested KeyValueList structures are converted to JSON objects + /// up to the specified depth. When unset or 0, nested KV is flattened (backwards compatible). + /// Recommended value: 10 for GenAI attributes. + #[arg( + long("clickhouse-exporter-nested-kv-max-depth"), + env = "ROTEL_CLICKHOUSE_EXPORTER_NESTED_KV_MAX_DEPTH" + )] + pub nested_kv_max_depth: Option, + /// Clickhouse Exporter request timeout #[arg( id("CLICKHOUSE_EXPORTER_REQUEST_TIMEOUT"), @@ -106,6 +116,7 @@ impl Default for ClickhouseExporterArgs { password: None, async_insert: "true".to_string(), enable_json: false, + nested_kv_max_depth: None, request_timeout: std::time::Duration::from_secs(5), retry: Default::default(), } diff --git a/src/init/config.rs b/src/init/config.rs index cae06b28..664fd49a 100644 --- a/src/init/config.rs +++ b/src/init/config.rs @@ -331,6 +331,7 @@ impl TryIntoConfig for ExporterArgs { .with_compression(ch.compression) .with_async_insert(async_insert) .with_json(ch.enable_json) + .with_nested_kv_max_depth(ch.nested_kv_max_depth) .with_request_timeout(ch.request_timeout); if let Some(user) = &ch.user { diff --git a/src/topology/payload.rs b/src/topology/payload.rs index a9b4a9e4..390e5042 100644 --- a/src/topology/payload.rs +++ b/src/topology/payload.rs @@ -1,6 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 use crate::bounded_channel::{BoundedSender, SendError}; +#[cfg(feature = "file_receiver")] use crate::receivers::file::offset_tracker::LineOffset; use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest; @@ -57,6 +58,7 @@ pub enum RequestContext { pub enum MessageMetadataInner { Kafka(KafkaMetadata), Forwarder(ForwarderMetadata), + #[cfg(feature = "file_receiver")] File(FileMetadata), } @@ -96,6 +98,7 @@ impl MessageMetadata { } /// Create new MessageMetadata with File variant, starting with ref_count = 1 + #[cfg(feature = "file_receiver")] pub fn file(metadata: FileMetadata) -> Self { Self { data: MessageMetadataInner::File(metadata), @@ -309,6 +312,7 @@ impl Ack for MessageMetadata { .await?; } } + #[cfg(feature = "file_receiver")] MessageMetadataInner::File(fm) => { if let Some(ack_chan) = &fm.ack_chan { ack_chan @@ -353,6 +357,7 @@ impl Ack for MessageMetadata { .await?; } } + #[cfg(feature = "file_receiver")] MessageMetadataInner::File(fm) => { if let Some(ack_chan) = &fm.ack_chan { ack_chan @@ -389,6 +394,7 @@ pub struct KafkaNack { } /// Metadata for file receiver messages +#[cfg(feature = "file_receiver")] #[derive(Clone)] pub struct FileMetadata { /// The unique file identity @@ -399,6 +405,7 @@ pub struct FileMetadata { pub ack_chan: Option>, } +#[cfg(feature = "file_receiver")] impl FileMetadata { /// Create new FileMetadata pub fn new( @@ -415,6 +422,7 @@ impl FileMetadata { } // Manual Debug for FileMetadata since BoundedSender doesn't implement Debug +#[cfg(feature = "file_receiver")] impl std::fmt::Debug for FileMetadata { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("FileMetadata") @@ -426,6 +434,7 @@ impl std::fmt::Debug for FileMetadata { } // Manual PartialEq for FileMetadata since BoundedSender can't be compared +#[cfg(feature = "file_receiver")] impl PartialEq for FileMetadata { fn eq(&self, other: &Self) -> bool { self.file_id == other.file_id && self.offsets == other.offsets @@ -434,18 +443,21 @@ impl PartialEq for FileMetadata { } /// Acknowledgement message from exporter back to file receiver +#[cfg(feature = "file_receiver")] pub enum FileAcknowledgement { Ack(FileAck), Nack(FileNack), } /// Successful acknowledgement of a file batch +#[cfg(feature = "file_receiver")] pub struct FileAck { pub file_id: crate::receivers::file::input::FileId, pub offsets: Vec, } /// Failed acknowledgement of a file batch +#[cfg(feature = "file_receiver")] pub struct FileNack { pub file_id: crate::receivers::file::input::FileId, pub offsets: Vec,