From 2c08920ea7c7ae6c820ba69b0441df4d987cb9a6 Mon Sep 17 00:00:00 2001 From: Mike Heffner Date: Tue, 27 Jan 2026 10:07:55 -0500 Subject: [PATCH 01/14] Bump to latest pyo3 Signed-off-by: Mike Heffner --- Cargo.lock | 26 +++++++++---------- Cargo.toml | 2 +- rotel_python_processor_sdk/Cargo.lock | 26 +++++++++---------- rotel_python_processor_sdk/Cargo.toml | 2 +- .../src/model/common.rs | 9 +++---- rotel_python_processor_sdk/src/py/common.rs | 19 +++++++------- rotel_python_processor_sdk/src/py/mod.rs | 3 ++- 7 files changed, 40 insertions(+), 47 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3bfddf6b..915aa48a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3147,9 +3147,9 @@ dependencies = [ [[package]] name = "numpy" -version = "0.24.0" +version = "0.27.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a7cfbf3f0feededcaa4d289fe3079b03659e85c5b5a177f4ba6fb01ab4fb3e39" +checksum = "7aac2e6a6e4468ffa092ad43c39b81c79196c2bb773b8db4085f695efe3bba17" dependencies = [ "libc", "ndarray", @@ -3734,11 +3734,10 @@ dependencies = [ [[package]] name = "pyo3" -version = "0.24.2" +version = "0.27.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5203598f366b11a02b13aa20cab591229ff0a89fd121a308a5df751d5fc9219" +checksum = "ab53c047fcd1a1d2a8820fe84f05d6be69e9526be40cb03b73f86b6b03e6d87d" dependencies = [ - "cfg-if", "indoc", "libc", "memoffset", @@ -3752,19 +3751,18 @@ dependencies = [ [[package]] name = "pyo3-build-config" -version = "0.24.2" +version = "0.27.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "99636d423fa2ca130fa5acde3059308006d46f98caac629418e53f7ebb1e9999" +checksum = "b455933107de8642b4487ed26d912c2d899dec6114884214a0b3bb3be9261ea6" dependencies = [ - "once_cell", "target-lexicon", ] [[package]] name = "pyo3-ffi" -version = "0.24.2" +version = "0.27.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "78f9cf92ba9c409279bc3305b5409d90db2d2c22392d443a87df3a1adad59e33" +checksum = "1c85c9cbfaddf651b1221594209aed57e9e5cff63c4d11d1feead529b872a089" dependencies = [ "libc", "pyo3-build-config", @@ -3772,9 +3770,9 @@ dependencies = [ [[package]] name = "pyo3-macros" -version = "0.24.2" +version = "0.27.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b999cb1a6ce21f9a6b147dcf1be9ffedf02e0043aec74dc390f3007047cecd9" +checksum = "0a5b10c9bf9888125d917fb4d2ca2d25c8df94c7ab5a52e13313a07e050a3b02" dependencies = [ "proc-macro2", "pyo3-macros-backend", @@ -3784,9 +3782,9 @@ dependencies = [ [[package]] name = "pyo3-macros-backend" -version = "0.24.2" +version = "0.27.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "822ece1c7e1012745607d5cf0bcb2874769f0f7cb34c4cde03b9358eb9ef911a" +checksum = "03b51720d314836e53327f5871d4c0cfb4fb37cc2c4a11cc71907a86342c40f9" dependencies = [ "heck", "proc-macro2", diff --git a/Cargo.toml b/Cargo.toml index 8d08ea6c..c62eb3e6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -64,7 +64,7 @@ libc = "0.2.170" pin-project = "1.1.10" futures-util = "0.3.31" rotel_python_processor_sdk = { path = "rotel_python_processor_sdk", optional = true } -pyo3 = { version = "0.24.1", optional = true } +pyo3 = { version = "0.27.2", optional = true } chrono = { version = "0.4.40", features = ["serde"] } serde = { version = "1.0.217", features = ["derive"] } thiserror = "2.0.12" diff --git a/rotel_python_processor_sdk/Cargo.lock b/rotel_python_processor_sdk/Cargo.lock index 5b765a10..87ade168 100644 --- a/rotel_python_processor_sdk/Cargo.lock +++ b/rotel_python_processor_sdk/Cargo.lock @@ -555,9 +555,9 @@ dependencies = [ [[package]] name = "numpy" -version = "0.24.0" +version = "0.27.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a7cfbf3f0feededcaa4d289fe3079b03659e85c5b5a177f4ba6fb01ab4fb3e39" +checksum = "7aac2e6a6e4468ffa092ad43c39b81c79196c2bb773b8db4085f695efe3bba17" dependencies = [ "libc", "ndarray", @@ -725,11 +725,10 @@ dependencies = [ [[package]] name = "pyo3" -version = "0.24.1" +version = "0.27.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17da310086b068fbdcefbba30aeb3721d5bb9af8db4987d6735b2183ca567229" +checksum = "ab53c047fcd1a1d2a8820fe84f05d6be69e9526be40cb03b73f86b6b03e6d87d" dependencies = [ - "cfg-if", "indoc", "libc", "memoffset", @@ -743,19 +742,18 @@ dependencies = [ [[package]] name = "pyo3-build-config" -version = "0.24.1" +version = "0.27.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e27165889bd793000a098bb966adc4300c312497ea25cf7a690a9f0ac5aa5fc1" +checksum = "b455933107de8642b4487ed26d912c2d899dec6114884214a0b3bb3be9261ea6" dependencies = [ - "once_cell", "target-lexicon", ] [[package]] name = "pyo3-ffi" -version = "0.24.1" +version = "0.27.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05280526e1dbf6b420062f3ef228b78c0c54ba94e157f5cb724a609d0f2faabc" +checksum = "1c85c9cbfaddf651b1221594209aed57e9e5cff63c4d11d1feead529b872a089" dependencies = [ "libc", "pyo3-build-config", @@ -763,9 +761,9 @@ dependencies = [ [[package]] name = "pyo3-macros" -version = "0.24.1" +version = "0.27.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c3ce5686aa4d3f63359a5100c62a127c9f15e8398e5fdeb5deef1fed5cd5f44" +checksum = "0a5b10c9bf9888125d917fb4d2ca2d25c8df94c7ab5a52e13313a07e050a3b02" dependencies = [ "proc-macro2", "pyo3-macros-backend", @@ -775,9 +773,9 @@ dependencies = [ [[package]] name = "pyo3-macros-backend" -version = "0.24.1" +version = "0.27.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f4cf6faa0cbfb0ed08e89beb8103ae9724eb4750e3a78084ba4017cbe94f3855" +checksum = "03b51720d314836e53327f5871d4c0cfb4fb37cc2c4a11cc71907a86342c40f9" dependencies = [ "heck", "proc-macro2", diff --git a/rotel_python_processor_sdk/Cargo.toml b/rotel_python_processor_sdk/Cargo.toml index 856ba38d..09f94fe9 100644 --- a/rotel_python_processor_sdk/Cargo.toml +++ b/rotel_python_processor_sdk/Cargo.toml @@ -4,7 +4,7 @@ version = "0.1.0" edition = "2021" [dependencies] -pyo3 = { version = "0.24.1" } +pyo3 = { version = "0.27.2" } opentelemetry-proto = { version = "0.30.0" } tower = "0.5.2" utilities = { path = "../utilities" } diff --git a/rotel_python_processor_sdk/src/model/common.rs b/rotel_python_processor_sdk/src/model/common.rs index 1e632cc9..bd75b16b 100644 --- a/rotel_python_processor_sdk/src/model/common.rs +++ b/rotel_python_processor_sdk/src/model/common.rs @@ -1,7 +1,6 @@ use crate::py::common::*; use opentelemetry_proto::tonic::common::v1::KeyValue; -#[allow(deprecated)] -use pyo3::{IntoPy, PyObject, PyResult, Python}; +use pyo3::{IntoPyObjectExt, PyObject, PyResult, Python}; use std::sync::{Arc, Mutex}; #[derive(Debug, Clone)] @@ -26,10 +25,9 @@ pub struct RArrayValue { pub values: Arc>>>>>, } -#[allow(deprecated)] impl RArrayValue { pub(crate) fn convert_to_py(&self, py: Python) -> PyResult { - Ok(ArrayValue(self.values.clone()).into_py(py)) + Ok(ArrayValue(self.values.clone()).into_py_any(py)?) } } @@ -38,10 +36,9 @@ pub struct RKeyValueList { pub values: Arc>>, } -#[allow(deprecated)] impl RKeyValueList { pub(crate) fn convert_to_py(&self, py: Python) -> PyResult { - Ok(KeyValueList(self.values.clone()).into_py(py)) + Ok(KeyValueList(self.values.clone()).into_py_any(py)?) } } diff --git a/rotel_python_processor_sdk/src/py/common.rs b/rotel_python_processor_sdk/src/py/common.rs index 4cb32340..287e9145 100644 --- a/rotel_python_processor_sdk/src/py/common.rs +++ b/rotel_python_processor_sdk/src/py/common.rs @@ -8,8 +8,9 @@ use crate::model::otel_transform::convert_attributes; use crate::py::{handle_poison_error, AttributesList}; use pyo3::exceptions::PyRuntimeError; use pyo3::types::{PyAnyMethods, PyBool, PyBytes, PyFloat, PyInt, PyString}; -#[allow(deprecated)] -use pyo3::{pyclass, pymethods, IntoPy, Py, PyErr, PyObject, PyRef, PyRefMut, PyResult, Python}; +use pyo3::{ + pyclass, pymethods, IntoPyObjectExt, Py, PyErr, PyObject, PyRef, PyRefMut, PyResult, Python, +}; use std::sync::{Arc, Mutex}; // Wrapper for AnyValue that can be exposed to Python @@ -92,17 +93,16 @@ impl AnyValue { } } #[getter] - #[allow(deprecated)] fn value<'py>(&self, py: Python<'py>) -> PyResult { let v = self.inner.lock().map_err(handle_poison_error)?; let binding = v.clone().unwrap().value.clone(); let bind_lock = binding.lock(); let x = match bind_lock.unwrap().clone() { - Some(StringValue(s)) => Ok(s.into_py(py)), - Some(BoolValue(b)) => Ok(b.into_py(py)), - Some(IntValue(i)) => Ok(i.into_py(py)), - Some(DoubleValue(d)) => Ok(d.into_py(py)), - Some(BytesValue(b)) => Ok(b.into_py(py)), + Some(StringValue(s)) => Ok(s.into_py_any(py)?), + Some(BoolValue(b)) => Ok(b.into_py_any(py)?), + Some(IntValue(i)) => Ok(i.into_py_any(py)?), + Some(DoubleValue(d)) => Ok(d.into_py_any(py)?), + Some(BytesValue(b)) => Ok(b.into_py_any(py)?), Some(RVArrayValue(a)) => Ok(a.convert_to_py(py)?), Some(KvListValue(k)) => Ok(k.convert_to_py(py)?), None => Ok(py.None()), @@ -489,12 +489,11 @@ impl KeyValue { }) } #[getter] - #[allow(deprecated)] fn key(&self, py: Python) -> PyResult { let v = self.inner.lock().map_err(handle_poison_error)?; let binding = v.key.clone(); let bind_lock = binding.lock(); - let x = Ok(bind_lock.unwrap().clone().into_py(py)); + let x = Ok(bind_lock.unwrap().clone().into_py_any(py)?); x } #[setter] diff --git a/rotel_python_processor_sdk/src/py/mod.rs b/rotel_python_processor_sdk/src/py/mod.rs index 37bbfe79..ff21c823 100644 --- a/rotel_python_processor_sdk/src/py/mod.rs +++ b/rotel_python_processor_sdk/src/py/mod.rs @@ -183,6 +183,7 @@ mod tests { use opentelemetry_proto::tonic::metrics::v1::metric::Data; use opentelemetry_proto::tonic::trace::v1; use pyo3::ffi::c_str; + use pyo3::IntoPyObjectExt; use std::collections::HashMap; use std::ffi::CString; use std::sync::{Arc, Mutex, Once}; @@ -208,7 +209,7 @@ mod tests { process_fn: Option, ) -> PyResult<()> { let sys = py.import("sys")?; - sys.setattr("stdout", LoggingStdout.into_py(py))?; + sys.setattr("stdout", LoggingStdout.into_py_any(py)?)?; let code = std::fs::read_to_string(format!("./python_tests/{}", script))?; let py_mod = PyModule::from_code( py, From 20592aa737580b200eb49cb8fed3794f1efdb346 Mon Sep 17 00:00:00 2001 From: Mike Heffner Date: Tue, 27 Jan 2026 10:22:40 -0500 Subject: [PATCH 02/14] Fix deprecation warnings Signed-off-by: Mike Heffner --- .../src/model/common.rs | 6 ++-- rotel_python_processor_sdk/src/model/mod.rs | 10 +++---- rotel_python_processor_sdk/src/py/common.rs | 30 +++++++++---------- 3 files changed, 23 insertions(+), 23 deletions(-) diff --git a/rotel_python_processor_sdk/src/model/common.rs b/rotel_python_processor_sdk/src/model/common.rs index bd75b16b..7d397653 100644 --- a/rotel_python_processor_sdk/src/model/common.rs +++ b/rotel_python_processor_sdk/src/model/common.rs @@ -1,6 +1,6 @@ use crate::py::common::*; use opentelemetry_proto::tonic::common::v1::KeyValue; -use pyo3::{IntoPyObjectExt, PyObject, PyResult, Python}; +use pyo3::{IntoPyObjectExt, Py, PyAny, PyResult, Python}; use std::sync::{Arc, Mutex}; #[derive(Debug, Clone)] @@ -26,7 +26,7 @@ pub struct RArrayValue { } impl RArrayValue { - pub(crate) fn convert_to_py(&self, py: Python) -> PyResult { + pub(crate) fn convert_to_py(&self, py: Python) -> PyResult> { Ok(ArrayValue(self.values.clone()).into_py_any(py)?) } } @@ -37,7 +37,7 @@ pub struct RKeyValueList { } impl RKeyValueList { - pub(crate) fn convert_to_py(&self, py: Python) -> PyResult { + pub(crate) fn convert_to_py(&self, py: Python) -> PyResult> { Ok(KeyValueList(self.values.clone()).into_py_any(py)?) } } diff --git a/rotel_python_processor_sdk/src/model/mod.rs b/rotel_python_processor_sdk/src/model/mod.rs index d6096da2..486156e6 100644 --- a/rotel_python_processor_sdk/src/model/mod.rs +++ b/rotel_python_processor_sdk/src/model/mod.rs @@ -22,10 +22,10 @@ static PROCESSOR_INIT: Once = Once::new(); pub fn register_processor(code: String, script: String, module: String) -> Result<(), BoxError> { PROCESSOR_INIT.call_once(|| { pyo3::append_to_inittab!(rotel_sdk); - pyo3::prepare_freethreaded_python(); + Python::initialize(); }); - let res = Python::with_gil(|py| -> PyResult<()> { + let res = Python::attach(|py| -> PyResult<()> { PyModule::from_code( py, CString::new(code)?.as_c_str(), @@ -48,7 +48,7 @@ impl PythonProcessable for opentelemetry_proto::tonic::trace::v1::ResourceSpans fn process(self, processor: &str, request_context: Option) -> Self { let inner = otel_transform::transform_resource_spans(self); // Build the PyObject - let res = Python::with_gil(|py| -> PyResult<()> { + let res = Python::attach(|py| -> PyResult<()> { let spans = ResourceSpans { resource: inner.resource.clone(), scope_spans: inner.scope_spans.clone(), @@ -101,7 +101,7 @@ impl PythonProcessable for opentelemetry_proto::tonic::metrics::v1::ResourceMetr fn process(self, processor: &str, request_context: Option) -> Self { let inner = otel_transform::transform_resource_metrics(self); // Build the PyObject - let res = Python::with_gil(|py| -> PyResult<()> { + let res = Python::attach(|py| -> PyResult<()> { let spans = ResourceMetrics { resource: inner.resource.clone(), scope_metrics: inner.scope_metrics.clone(), @@ -142,7 +142,7 @@ impl PythonProcessable for opentelemetry_proto::tonic::logs::v1::ResourceLogs { fn process(self, processor: &str, request_context: Option) -> Self { let inner = otel_transform::transform_resource_logs(self); // Build the PyObject - let res = Python::with_gil(|py| -> PyResult<()> { + let res = Python::attach(|py| -> PyResult<()> { let spans = ResourceLogs { resource: inner.resource.clone(), scope_logs: inner.scope_logs.clone(), diff --git a/rotel_python_processor_sdk/src/py/common.rs b/rotel_python_processor_sdk/src/py/common.rs index 287e9145..00031aba 100644 --- a/rotel_python_processor_sdk/src/py/common.rs +++ b/rotel_python_processor_sdk/src/py/common.rs @@ -9,7 +9,7 @@ use crate::py::{handle_poison_error, AttributesList}; use pyo3::exceptions::PyRuntimeError; use pyo3::types::{PyAnyMethods, PyBool, PyBytes, PyFloat, PyInt, PyString}; use pyo3::{ - pyclass, pymethods, IntoPyObjectExt, Py, PyErr, PyObject, PyRef, PyRefMut, PyResult, Python, + pyclass, pymethods, IntoPyObjectExt, Py, PyAny, PyErr, PyRef, PyRefMut, PyResult, Python, }; use std::sync::{Arc, Mutex}; @@ -23,10 +23,10 @@ pub struct AnyValue { impl AnyValue { #[new] #[pyo3(signature = (optional_value=None))] - fn new(py: Python, optional_value: Option) -> PyResult { + fn new(py: Python, optional_value: Option>) -> PyResult { if let Some(v) = optional_value { let bound_obj = v.bind(py); - if let Ok(s) = bound_obj.downcast::() { + if let Ok(s) = bound_obj.cast::() { let value: String = s.extract()?; // Extract the i64 value Ok(AnyValue { inner: Arc::new(Mutex::new(Some(RAnyValue { @@ -35,35 +35,35 @@ impl AnyValue { }) // VERY IMPORTANT! The ordering of checking downcast, with PyBool first and PyInt // second is critical. This ensures we don't downcast a python value of True to an i64 - } else if let Ok(b) = bound_obj.downcast_exact::() { + } else if let Ok(b) = bound_obj.cast_exact::() { let value: bool = b.extract()?; Ok(AnyValue { inner: Arc::new(Mutex::new(Some(RAnyValue { value: Arc::new(Mutex::new(Some(BoolValue(value)))), }))), }) - } else if let Ok(i) = bound_obj.downcast::() { + } else if let Ok(i) = bound_obj.cast::() { let value: i64 = i.extract()?; Ok(AnyValue { inner: Arc::new(Mutex::new(Some(RAnyValue { value: Arc::new(Mutex::new(Some(IntValue(value)))), }))), }) - } else if let Ok(f) = bound_obj.downcast::() { + } else if let Ok(f) = bound_obj.cast::() { let value: f64 = f.extract()?; // Extract the i64 value Ok(AnyValue { inner: Arc::new(Mutex::new(Some(RAnyValue { value: Arc::new(Mutex::new(Some(DoubleValue(value)))), }))), }) - } else if let Ok(b) = bound_obj.downcast::() { + } else if let Ok(b) = bound_obj.cast::() { let value: Vec = b.extract()?; Ok(AnyValue { inner: Arc::new(Mutex::new(Some(RAnyValue { value: Arc::new(Mutex::new(Some(BytesValue(value)))), }))), }) - } else if let Ok(v) = bound_obj.downcast::() { + } else if let Ok(v) = bound_obj.cast::() { let value: KeyValueList = v.extract()?; Ok(AnyValue { inner: Arc::new(Mutex::new(Some(RAnyValue { @@ -72,7 +72,7 @@ impl AnyValue { })))), }))), }) - } else if let Ok(v) = bound_obj.downcast::() { + } else if let Ok(v) = bound_obj.cast::() { let value: ArrayValue = v.extract()?; Ok(AnyValue { inner: Arc::new(Mutex::new(Some(RAnyValue { @@ -93,7 +93,7 @@ impl AnyValue { } } #[getter] - fn value<'py>(&self, py: Python<'py>) -> PyResult { + fn value<'py>(&self, py: Python<'py>) -> PyResult> { let v = self.inner.lock().map_err(handle_poison_error)?; let binding = v.clone().unwrap().value.clone(); let bind_lock = binding.lock(); @@ -410,7 +410,7 @@ impl KeyValue { } // Helper methods for class #[staticmethod] - fn new_bool_value(key: &str, py: Python, value: PyObject) -> PyResult { + fn new_bool_value(key: &str, py: Python, value: Py) -> PyResult { let b = value.extract::(py)?; let key = Arc::new(Mutex::new(key.to_string())); let value = RAnyValue { @@ -423,7 +423,7 @@ impl KeyValue { } // Helper methods for class #[staticmethod] - fn new_int_value(key: &str, py: Python, value: PyObject) -> PyResult { + fn new_int_value(key: &str, py: Python, value: Py) -> PyResult { let i = value.extract::(py)?; let key = Arc::new(Mutex::new(key.to_string())); let value = RAnyValue { @@ -436,7 +436,7 @@ impl KeyValue { } // Helper methods for class #[staticmethod] - fn new_double_value(key: &str, py: Python, value: PyObject) -> PyResult { + fn new_double_value(key: &str, py: Python, value: Py) -> PyResult { let f = value.extract::(py)?; let key = Arc::new(Mutex::new(key.to_string())); let value = RAnyValue { @@ -449,7 +449,7 @@ impl KeyValue { } // Helper methods for class #[staticmethod] - fn new_bytes_value(key: &str, py: Python, value: PyObject) -> PyResult { + fn new_bytes_value(key: &str, py: Python, value: Py) -> PyResult { let f = value.extract::>(py)?; let key = Arc::new(Mutex::new(key.to_string())); let value = RAnyValue { @@ -489,7 +489,7 @@ impl KeyValue { }) } #[getter] - fn key(&self, py: Python) -> PyResult { + fn key(&self, py: Python) -> PyResult> { let v = self.inner.lock().map_err(handle_poison_error)?; let binding = v.key.clone(); let bind_lock = binding.lock(); From 7d849db2f4ddf2473e126054c5725836c537661c Mon Sep 17 00:00:00 2001 From: Mike Heffner Date: Tue, 27 Jan 2026 10:52:27 -0500 Subject: [PATCH 03/14] Add 3.14 to py releases --- .github/workflows/processor-release.yml | 35 +++++++++++-------------- Dockerfile.python-processor | 8 +++--- 2 files changed, 19 insertions(+), 24 deletions(-) diff --git a/.github/workflows/processor-release.yml b/.github/workflows/processor-release.yml index 54108396..04f847f5 100644 --- a/.github/workflows/processor-release.yml +++ b/.github/workflows/processor-release.yml @@ -12,7 +12,6 @@ jobs: fail-fast: false matrix: include: - - target: x86_64-unknown-linux-gnu runner: ubuntu-22.04 os: ubuntu @@ -29,6 +28,10 @@ jobs: runner: ubuntu-22.04 os: ubuntu python-version: 3.13 + - target: x86_64-unknown-linux-gnu + runner: ubuntu-22.04 + os: ubuntu + python-version: 3.14 - target: aarch64-unknown-linux-gnu runner: ubuntu-22.04-arm os: ubuntu @@ -45,6 +48,10 @@ jobs: runner: ubuntu-22.04-arm os: ubuntu python-version: 3.13 + - target: aarch64-unknown-linux-gnu + runner: ubuntu-22.04-arm + os: ubuntu + python-version: 3.14 - target: aarch64-apple-darwin runner: macos-14 os: macos @@ -61,6 +68,10 @@ jobs: runner: macos-14 os: macos python-version: 3.13 + - target: aarch64-apple-darwin + runner: macos-14 + os: macos + python-version: 3.14 env: TARGET: ${{ matrix.target }} steps: @@ -74,26 +85,10 @@ jobs: - name: install protoc macos if: matrix.os == 'macos' run: brew install protobuf - - name: update python 3.10 - if: matrix.python-version == '3.10' - uses: actions/setup-python@v5 - with: - python-version: "3.10" - - name: update python 3.11 - if: matrix.python-version == '3.11' - uses: actions/setup-python@v5 - with: - python-version: 3.11 - - name: update python 3.12 - if: matrix.python-version == '3.12' - uses: actions/setup-python@v5 - with: - python-version: 3.12 - - name: update python 3.13 - if: matrix.python-version == '3.13' + - name: update python ${{ matrix.python-version }} uses: actions/setup-python@v5 with: - python-version: 3.13 + python-version: ${{ matrix.python-version }} - name: Set build env run: echo "BUILD_SHORT_SHA=$(echo -n $GITHUB_SHA | cut -c 1-7)" >> $GITHUB_ENV - uses: actions-rust-lang/setup-rust-toolchain@v1 @@ -112,7 +107,7 @@ jobs: command: build args: --release --features pyo3 --locked --target ${{ matrix.target }} - name: upload build - if: matrix.python-version == '3.13' + if: matrix.python-version == '3.14' uses: actions/upload-artifact@v4 with: name: rotel-processor-build-${{ matrix.target }} diff --git a/Dockerfile.python-processor b/Dockerfile.python-processor index fc18125d..ced631f0 100644 --- a/Dockerfile.python-processor +++ b/Dockerfile.python-processor @@ -15,13 +15,13 @@ ENV PYTHONUNBUFFERED 1 RUN add-apt-repository ppa:deadsnakes/ppa && \ apt update -# Install Python 3.13 and pip -RUN apt install -y python3.13 python3.13-venv python3.13-dev && \ +# Install Python 3.14 and pip +RUN apt install -y python3.14 python3.14-venv python3.14-dev && \ apt clean && \ rm -rf /var/lib/apt/lists/* -# Set Python 3.13 as the default python3 -RUN update-alternatives --install /usr/bin/python3 python3 /usr/bin/python3.13 1 +# Set Python 3.14 as the default python3 +RUN update-alternatives --install /usr/bin/python3 python3 /usr/bin/python3.14 1 COPY target/${TARGETARCH}/rotel /rotel RUN chmod 0755 /rotel From 465b80a3abe8c2ccea8f875d3d5af176cf9d79f5 Mon Sep 17 00:00:00 2001 From: Julian Bright Date: Wed, 28 Jan 2026 09:48:41 -0800 Subject: [PATCH 04/14] Update to support nested key value export --- src/exporters/clickhouse/rowbinary/json.rs | 478 ++++++++++++++------- src/exporters/clickhouse/transformer.rs | 20 +- 2 files changed, 349 insertions(+), 149 deletions(-) diff --git a/src/exporters/clickhouse/rowbinary/json.rs b/src/exporters/clickhouse/rowbinary/json.rs index a967052f..94189b8f 100644 --- a/src/exporters/clickhouse/rowbinary/json.rs +++ b/src/exporters/clickhouse/rowbinary/json.rs @@ -1,25 +1,46 @@ use serde::{Serialize, Serializer}; +use std::borrow::Cow; use crate::otlp::cvattr::ConvertedAttrValue; use opentelemetry_proto::tonic::common::v1::AnyValue; use opentelemetry_proto::tonic::common::v1::any_value::Value; use serde_json::json; +/// Default maximum recursion depth for nested structures to prevent stack overflow +pub const DEFAULT_NESTED_KV_MAX_DEPTH: usize = 10; + +/// JSON type representation for ClickHouse rowbinary format. +/// Uses `Cow` for strings and object keys to efficiently handle both +/// borrowed and owned data with a single type. #[derive(Debug)] pub enum JsonType<'a> { Int(i64), - Str(&'a str), - StrOwned(String), + Str(Cow<'a, str>), Double(f64), Bool(bool), Array(Vec>), + Object(Vec<(Cow<'a, str>, JsonType<'a>)>), +} + +impl<'a> JsonType<'a> { + /// Create a borrowed string variant + #[inline] + pub fn str_borrowed(s: &'a str) -> Self { + JsonType::Str(Cow::Borrowed(s)) + } + + /// Create an owned string variant + #[inline] + pub fn str_owned(s: String) -> Self { + JsonType::Str(Cow::Owned(s)) + } } impl<'a> From<&'a ConvertedAttrValue> for JsonType<'a> { fn from(value: &'a ConvertedAttrValue) -> Self { match value { ConvertedAttrValue::Int(i) => JsonType::Int(*i), - ConvertedAttrValue::String(s) => JsonType::Str(s.as_str()), + ConvertedAttrValue::String(s) => JsonType::str_borrowed(s.as_str()), ConvertedAttrValue::Double(d) => JsonType::Double(*d), } } @@ -29,81 +50,174 @@ impl From for JsonType<'static> { fn from(value: ConvertedAttrValue) -> Self { match value { ConvertedAttrValue::Int(i) => JsonType::Int(i), - ConvertedAttrValue::String(s) => JsonType::StrOwned(s), + ConvertedAttrValue::String(s) => JsonType::str_owned(s), ConvertedAttrValue::Double(d) => JsonType::Double(d), } } } +/// Convert AnyValue to JsonType with configurable nesting behavior. +/// - `nested_kv_max_depth = None` or `Some(0)`: flat mode (backwards compatible, fastest) +/// - `nested_kv_max_depth = Some(n)` where n > 0: recursive conversion up to depth n +/// +/// Flat mode is the hot path - it handles nested complex types by serializing to JSON strings. +/// Nested mode recursively converts KvlistValue to Object and preserves ArrayValue structure. +#[inline] +pub fn anyvalue_to_jsontype<'a>(value: &'a AnyValue, nested_kv_max_depth: Option) -> JsonType<'a> { + match nested_kv_max_depth { + // Fast path: flat mode (backwards compatible) + None | Some(0) => anyvalue_to_jsontype_flat(value), + // Nested mode: recursive conversion + Some(max_depth) => anyvalue_to_jsontype_nested(value, 0, max_depth), + } +} + +/// Fast path: convert without recursing into nested structures. +/// Arrays containing KvlistValue or nested ArrayValue are serialized as JSON strings. +#[inline] +fn anyvalue_to_jsontype_flat<'a>(value: &'a AnyValue) -> JsonType<'a> { + match &value.value { + Some(Value::IntValue(i)) => JsonType::Int(*i), + Some(Value::DoubleValue(d)) => JsonType::Double(*d), + Some(Value::StringValue(s)) => JsonType::str_borrowed(s.as_str()), + Some(Value::BoolValue(b)) => JsonType::Bool(*b), + Some(Value::ArrayValue(a)) => { + let values = a.values.iter() + .map(|v| match &v.value { + Some(Value::IntValue(i)) => JsonType::Int(*i), + Some(Value::DoubleValue(d)) => JsonType::Double(*d), + Some(Value::StringValue(s)) => JsonType::str_borrowed(s.as_str()), + Some(Value::BoolValue(b)) => JsonType::Bool(*b), + // Complex nested types become JSON strings + Some(Value::ArrayValue(arr)) => JsonType::str_owned(json!(arr).to_string()), + Some(Value::KvlistValue(kv)) => JsonType::str_owned(json!(kv).to_string()), + Some(Value::BytesValue(b)) => JsonType::str_owned(hex::encode(b)), + None => JsonType::str_borrowed(""), + }) + .collect(); + JsonType::Array(values) + } + Some(Value::KvlistValue(_)) => JsonType::str_borrowed(""), + Some(Value::BytesValue(b)) => JsonType::str_owned(hex::encode(b)), + None => JsonType::str_borrowed(""), + } +} + +/// Nested mode: recursively convert with depth tracking. +fn anyvalue_to_jsontype_nested<'a>(value: &'a AnyValue, depth: usize, max_depth: usize) -> JsonType<'a> { + if depth > max_depth { + return JsonType::str_owned(json!(value).to_string()); + } + + match &value.value { + Some(Value::IntValue(i)) => JsonType::Int(*i), + Some(Value::DoubleValue(d)) => JsonType::Double(*d), + Some(Value::StringValue(s)) => JsonType::str_borrowed(s.as_str()), + Some(Value::BoolValue(b)) => JsonType::Bool(*b), + Some(Value::ArrayValue(a)) => { + let values = a.values.iter() + .map(|v| anyvalue_to_jsontype_nested(v, depth + 1, max_depth)) + .collect(); + JsonType::Array(values) + } + Some(Value::KvlistValue(kv)) => { + let entries = kv.values.iter() + .filter_map(|entry| { + entry.value.as_ref().map(|v| ( + Cow::Borrowed(entry.key.as_str()), + anyvalue_to_jsontype_nested(v, depth + 1, max_depth), + )) + }) + .collect(); + JsonType::Object(entries) + } + Some(Value::BytesValue(b)) => JsonType::str_owned(hex::encode(b)), + None => JsonType::str_borrowed(""), + } +} + +/// Convert owned AnyValue to JsonType with configurable nesting behavior. +#[inline] +pub fn anyvalue_to_jsontype_owned(value: AnyValue, nested_kv_max_depth: Option) -> JsonType<'static> { + match nested_kv_max_depth { + None | Some(0) => anyvalue_to_jsontype_flat_owned(value), + Some(max_depth) => anyvalue_to_jsontype_nested_owned(value, 0, max_depth), + } +} + +/// Fast path (owned): convert without recursing into nested structures. +#[inline] +fn anyvalue_to_jsontype_flat_owned(value: AnyValue) -> JsonType<'static> { + match value.value { + Some(Value::IntValue(i)) => JsonType::Int(i), + Some(Value::DoubleValue(d)) => JsonType::Double(d), + Some(Value::StringValue(s)) => JsonType::str_owned(s), + Some(Value::BoolValue(b)) => JsonType::Bool(b), + Some(Value::ArrayValue(a)) => { + let values = a.values.into_iter() + .map(|v| match v.value { + Some(Value::IntValue(i)) => JsonType::Int(i), + Some(Value::DoubleValue(d)) => JsonType::Double(d), + Some(Value::StringValue(s)) => JsonType::str_owned(s), + Some(Value::BoolValue(b)) => JsonType::Bool(b), + Some(Value::ArrayValue(arr)) => JsonType::str_owned(json!(arr).to_string()), + Some(Value::KvlistValue(kv)) => JsonType::str_owned(json!(kv).to_string()), + Some(Value::BytesValue(b)) => JsonType::str_owned(hex::encode(&b)), + None => JsonType::str_owned(String::new()), + }) + .collect(); + JsonType::Array(values) + } + Some(Value::KvlistValue(_)) => JsonType::str_owned(String::new()), + Some(Value::BytesValue(b)) => JsonType::str_owned(hex::encode(&b)), + None => JsonType::str_owned(String::new()), + } +} + +/// Nested mode (owned): recursively convert with depth tracking. +fn anyvalue_to_jsontype_nested_owned(value: AnyValue, depth: usize, max_depth: usize) -> JsonType<'static> { + if depth > max_depth { + return JsonType::str_owned(json!(value).to_string()); + } + + match value.value { + Some(Value::IntValue(i)) => JsonType::Int(i), + Some(Value::DoubleValue(d)) => JsonType::Double(d), + Some(Value::StringValue(s)) => JsonType::str_owned(s), + Some(Value::BoolValue(b)) => JsonType::Bool(b), + Some(Value::ArrayValue(a)) => { + let values = a.values.into_iter() + .map(|v| anyvalue_to_jsontype_nested_owned(v, depth + 1, max_depth)) + .collect(); + JsonType::Array(values) + } + Some(Value::KvlistValue(kv)) => { + let entries = kv.values.into_iter() + .filter_map(|entry| { + entry.value.map(|v| ( + Cow::Owned(entry.key), + anyvalue_to_jsontype_nested_owned(v, depth + 1, max_depth), + )) + }) + .collect(); + JsonType::Object(entries) + } + Some(Value::BytesValue(b)) => JsonType::str_owned(hex::encode(&b)), + None => JsonType::str_owned(String::new()), + } +} + impl<'a> From<&'a AnyValue> for JsonType<'a> { + /// Default conversion uses flat mode for backwards compatibility. fn from(value: &'a AnyValue) -> Self { - match &value.value { - Some(Value::IntValue(i)) => JsonType::Int(*i), - Some(Value::DoubleValue(d)) => JsonType::Double(*d), - Some(Value::StringValue(s)) => JsonType::Str(s.as_str()), - Some(Value::BoolValue(b)) => JsonType::Bool(*b), - Some(Value::ArrayValue(a)) => { - // We support arrays with all simple values, no recursive nesting - let values = a - .values - .iter() - .map(|v| match &v.value { - Some(Value::IntValue(i)) => JsonType::Int(*i), - Some(Value::DoubleValue(d)) => JsonType::Double(*d), - Some(Value::StringValue(s)) => JsonType::Str(s.as_str()), - Some(Value::BoolValue(b)) => JsonType::Bool(*b), - Some(Value::ArrayValue(a)) => JsonType::StrOwned(json!(a).to_string()), - Some(Value::KvlistValue(kv)) => JsonType::StrOwned(json!(kv).to_string()), - Some(Value::BytesValue(b)) => JsonType::StrOwned(hex::encode(b)), - None => JsonType::Str(""), - }) - .collect(); - JsonType::Array(values) - } - Some(Value::KvlistValue(_kv)) => { - // KvlistValue should be flattened before reaching this point - // This case should not occur in normal operation - JsonType::Str("") - } - Some(Value::BytesValue(b)) => JsonType::StrOwned(hex::encode(b)), - None => JsonType::Str(""), - } + anyvalue_to_jsontype(value, None) } } impl From for JsonType<'static> { + /// Default conversion uses flat mode for backwards compatibility. fn from(value: AnyValue) -> Self { - match value.value { - Some(Value::IntValue(i)) => JsonType::Int(i), - Some(Value::DoubleValue(d)) => JsonType::Double(d), - Some(Value::StringValue(s)) => JsonType::StrOwned(s), - Some(Value::BoolValue(b)) => JsonType::Bool(b), - Some(Value::ArrayValue(a)) => { - // We support arrays with all simple values, no recursive nesting - let values = a - .values - .into_iter() - .map(|v| match v.value { - Some(Value::IntValue(i)) => JsonType::Int(i), - Some(Value::DoubleValue(d)) => JsonType::Double(d), - Some(Value::StringValue(s)) => JsonType::StrOwned(s), - Some(Value::BoolValue(b)) => JsonType::Bool(b), - Some(Value::ArrayValue(a)) => JsonType::StrOwned(json!(a).to_string()), - Some(Value::KvlistValue(kv)) => JsonType::StrOwned(json!(kv).to_string()), - Some(Value::BytesValue(b)) => JsonType::StrOwned(hex::encode(b)), - None => JsonType::Str(""), - }) - .collect(); - JsonType::Array(values) - } - Some(Value::KvlistValue(_kv)) => { - // KvlistValue should be flattened before reaching this point - // This case should not occur in normal operation - JsonType::StrOwned(String::new()) - } - Some(Value::BytesValue(b)) => JsonType::StrOwned(hex::encode(&b)), - None => JsonType::StrOwned(String::new()), - } + anyvalue_to_jsontype_owned(value, None) } } @@ -125,14 +239,7 @@ impl<'a> Serialize for JsonType<'a> { JsonType::Str(s) => { let jsonstr = JsonStr { code: 0x15, - value: s, - }; - jsonstr.serialize(serializer) - } - JsonType::StrOwned(s) => { - let jsonstr = JsonStr { - code: 0x15, - value: &s, + value: s.as_ref(), }; jsonstr.serialize(serializer) } @@ -151,7 +258,7 @@ impl<'a> Serialize for JsonType<'a> { jsonbool.serialize(serializer) } JsonType::Array(a) => { - // We always use the Dyanmic type here because it is simpler to serialize. Clickhouse + // We always use the Dynamic type here because it is simpler to serialize. Clickhouse // will use Nullable(T) in responses if all types are the same, but there doesn't seem // to be a cost savings in wire size. let jsonarray = JsonArrayDynamic { @@ -162,6 +269,14 @@ impl<'a> Serialize for JsonType<'a> { }; jsonarray.serialize(serializer) } + JsonType::Object(entries) => { + // Object type code 0x1f for named tuples/objects + let jsonobject = JsonObject { + object_code: 0x1f, + entries, + }; + jsonobject.serialize(serializer) + } } } } @@ -198,6 +313,37 @@ struct JsonArrayDynamic<'a> { values: &'a Vec>, } +/// Serializer for Object type (named tuple in ClickHouse JSON format) +/// Format: object_code (0x1f) | num_entries | [key_len | key_bytes | value]... +struct JsonObject<'a> { + object_code: u8, + entries: &'a Vec<(Cow<'a, str>, JsonType<'a>)>, +} + +impl<'a> Serialize for JsonObject<'a> { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + use serde::ser::SerializeTuple; + // Object format: code | num_keys | [key_len | key_bytes | value_type | value]... + let num_entries = self.entries.len(); + let mut seq = serializer.serialize_tuple(2 + num_entries * 2)?; + seq.serialize_element(&self.object_code)?; + seq.serialize_element(&(num_entries as u8))?; + for (key, value) in self.entries.iter() { + // Serialize key as length-prefixed string (Cow dereferences to &str) + seq.serialize_element(&JsonStr { + code: 0x00, // No type code for object keys, just length + bytes + value: key.as_ref(), + })?; + // Serialize the value with its type + seq.serialize_element(value)?; + } + seq.end() + } +} + #[cfg(test)] mod tests { use bytes::BytesMut; @@ -244,12 +390,12 @@ mod tests { ); } - /// Test that verifies the byte patterns for JsonType::Str serialization. + /// Test that verifies the byte patterns for JsonType::Str serialization (borrowed). /// This test prevents regressions in the wire format by ensuring the serialized /// bytes match the expected pattern: [type_code(0x15), string_length, string_bytes] #[test] fn test_jsontype_str_serialization() { - let json_str = JsonType::Str("hello"); + let json_str = JsonType::str_borrowed("hello"); let serialized = serialize_to_bytes(json_str); // Expected bytes: type code 0x15 + string length (5 as u8) + "hello" bytes @@ -261,14 +407,14 @@ mod tests { assert_eq!( serialized, expected, - "JsonType::Str(\"hello\") serialization bytes mismatch" + "JsonType::str_borrowed(\"hello\") serialization bytes mismatch" ); } /// Test that verifies the byte patterns for JsonType::Str with empty string. #[test] fn test_jsontype_str_empty_serialization() { - let json_str = JsonType::Str(""); + let json_str = JsonType::str_borrowed(""); let serialized = serialize_to_bytes(json_str); // Expected bytes: type code 0x15 + string length (0 as u8) @@ -279,15 +425,15 @@ mod tests { assert_eq!( serialized, expected, - "JsonType::Str(\"\") serialization bytes mismatch" + "JsonType::str_borrowed(\"\") serialization bytes mismatch" ); } - /// Test that verifies the byte patterns for JsonType::StrOwned serialization. - /// This should produce the same byte pattern as JsonType::Str. + /// Test that verifies the byte patterns for JsonType::Str (owned) serialization. + /// This should produce the same byte pattern as borrowed Str. #[test] fn test_jsontype_str_owned_serialization() { - let json_str_owned = JsonType::StrOwned("world".to_string()); + let json_str_owned = JsonType::str_owned("world".to_string()); let serialized = serialize_to_bytes(json_str_owned); // Expected bytes: type code 0x15 + string length (5 as u8) + "world" bytes @@ -299,7 +445,7 @@ mod tests { assert_eq!( serialized, expected, - "JsonType::StrOwned(\"world\") serialization bytes mismatch" + "JsonType::str_owned(\"world\") serialization bytes mismatch" ); } @@ -370,7 +516,7 @@ mod tests { assert_eq!(serialized_array, expected); // Different types - let json_array = JsonType::Array(vec![JsonType::Int(3), JsonType::Str("hello")]); + let json_array = JsonType::Array(vec![JsonType::Int(3), JsonType::str_borrowed("hello")]); let serialized_array = serialize_to_bytes(json_array); let expected: Vec = vec![ @@ -413,11 +559,11 @@ mod tests { ); } + /// Test flat mode (backwards compatible) - KvlistValue and nested ArrayValue become JSON strings #[test] - fn test_anyvalue_arrayvalue_conversion() { + fn test_anyvalue_arrayvalue_flat_mode() { use opentelemetry_proto::tonic::common::v1::{ArrayValue, KeyValue, KeyValueList}; - // Create an ArrayValue with mixed types including KvlistValue let mut kv_list = KeyValueList::default(); kv_list.values.push(KeyValue { key: "nested_key".to_string(), @@ -428,91 +574,136 @@ mod tests { let array_value = ArrayValue { values: vec![ - AnyValue { - value: Some(Value::IntValue(42)), - }, - AnyValue { - value: Some(Value::StringValue("test_string".to_string())), - }, - AnyValue { - value: Some(Value::DoubleValue(3.14)), - }, - AnyValue { - value: Some(Value::BoolValue(true)), - }, - AnyValue { - value: Some(Value::KvlistValue(kv_list)), - }, - AnyValue { - value: Some(Value::BytesValue(vec![0x48, 0x65, 0x6c, 0x6c, 0x6f])), // "Hello" - }, - AnyValue { - value: Some(Value::ArrayValue(ArrayValue { - values: vec![AnyValue { - value: Some(Value::IntValue(100)), - }], - })), - }, + AnyValue { value: Some(Value::IntValue(42)) }, + AnyValue { value: Some(Value::StringValue("test_string".to_string())) }, + AnyValue { value: Some(Value::KvlistValue(kv_list)) }, + AnyValue { value: Some(Value::ArrayValue(ArrayValue { + values: vec![AnyValue { value: Some(Value::IntValue(100)) }], + })) }, ], }; - let any_value = AnyValue { - value: Some(Value::ArrayValue(array_value)), - }; + let any_value = AnyValue { value: Some(Value::ArrayValue(array_value)) }; + // Default From uses flat mode (None) let json_type: JsonType = (&any_value).into(); match json_type { JsonType::Array(values) => { - assert_eq!(values.len(), 7); + assert_eq!(values.len(), 4); - // Check Int value + // Simple types are converted directly match &values[0] { JsonType::Int(i) => assert_eq!(*i, 42), _ => panic!("Expected Int(42) at index 0"), } - - // Check String value match &values[1] { - JsonType::Str(s) => assert_eq!(*s, "test_string"), - _ => panic!("Expected Str(\"test_string\") at index 1"), + JsonType::Str(s) => assert_eq!(s.as_ref(), "test_string"), + _ => panic!("Expected Str at index 1"), } - // Check Double value + // KvlistValue becomes a JSON string in flat mode match &values[2] { - JsonType::Double(d) => assert_eq!(*d, 3.14), - _ => panic!("Expected Double(3.14) at index 2"), + JsonType::Str(s) => { + assert!(s.as_ref().contains("nested_key")); + assert!(s.as_ref().contains("nested_value")); + } + _ => panic!("Expected Str (JSON string) at index 2 in flat mode, got {:?}", &values[2]), } - // Check Bool value + // Nested ArrayValue becomes a JSON string in flat mode + match &values[3] { + JsonType::Str(s) => { + assert!(s.as_ref().contains("100")); + } + _ => panic!("Expected Str (JSON string) at index 3 in flat mode, got {:?}", &values[3]), + } + } + _ => panic!("Expected JsonType::Array"), + } + } + + /// Test nested mode - KvlistValue becomes Object, nested ArrayValue stays Array + #[test] + fn test_anyvalue_arrayvalue_nested_mode() { + use opentelemetry_proto::tonic::common::v1::{ArrayValue, KeyValue, KeyValueList}; + + let mut kv_list = KeyValueList::default(); + kv_list.values.push(KeyValue { + key: "nested_key".to_string(), + value: Some(AnyValue { + value: Some(Value::StringValue("nested_value".to_string())), + }), + }); + + let array_value = ArrayValue { + values: vec![ + AnyValue { value: Some(Value::IntValue(42)) }, + AnyValue { value: Some(Value::StringValue("test_string".to_string())) }, + AnyValue { value: Some(Value::DoubleValue(3.14)) }, + AnyValue { value: Some(Value::BoolValue(true)) }, + AnyValue { value: Some(Value::KvlistValue(kv_list)) }, + AnyValue { value: Some(Value::BytesValue(vec![0x48, 0x65, 0x6c, 0x6c, 0x6f])) }, + AnyValue { value: Some(Value::ArrayValue(ArrayValue { + values: vec![AnyValue { value: Some(Value::IntValue(100)) }], + })) }, + ], + }; + + let any_value = AnyValue { value: Some(Value::ArrayValue(array_value)) }; + + // Use nested mode with max depth 10 + let json_type = anyvalue_to_jsontype(&any_value, Some(10)); + + match json_type { + JsonType::Array(values) => { + assert_eq!(values.len(), 7); + + match &values[0] { + JsonType::Int(i) => assert_eq!(*i, 42), + _ => panic!("Expected Int(42) at index 0"), + } + match &values[1] { + JsonType::Str(s) => assert_eq!(s.as_ref(), "test_string"), + _ => panic!("Expected Str at index 1"), + } + match &values[2] { + JsonType::Double(d) => assert_eq!(*d, 3.14), + _ => panic!("Expected Double at index 2"), + } match &values[3] { JsonType::Bool(b) => assert!(b), - _ => panic!("Expected Bool(true) at index 3"), + _ => panic!("Expected Bool at index 3"), } - // Check KvlistValue (converted to JSON string) + // KvlistValue becomes Object in nested mode match &values[4] { - JsonType::StrOwned(s) => { - // The exact JSON string format may vary, but it should contain the key-value pair - assert!(s.contains("nested_key")); - assert!(s.contains("nested_value")); + JsonType::Object(entries) => { + assert_eq!(entries.len(), 1); + assert_eq!(entries[0].0.as_ref(), "nested_key"); + match &entries[0].1 { + JsonType::Str(s) => assert_eq!(s.as_ref(), "nested_value"), + _ => panic!("Expected nested_key to have Str value"), + } } - _ => panic!("Expected StrOwned with JSON string at index 4"), + _ => panic!("Expected Object at index 4 in nested mode, got {:?}", &values[4]), } - // Check BytesValue (hex encoded) match &values[5] { - JsonType::StrOwned(s) => assert_eq!(s, "48656c6c6f"), // "Hello" in hex - _ => panic!("Expected StrOwned with hex string at index 5"), + JsonType::Str(s) => assert_eq!(s.as_ref(), "48656c6c6f"), + _ => panic!("Expected Str (hex) at index 5"), } - // Check nested ArrayValue (converted to JSON string) + // Nested ArrayValue stays as Array in nested mode match &values[6] { - JsonType::StrOwned(s) => { - // Should be a JSON representation of the nested array - assert!(s.contains("100")); + JsonType::Array(nested) => { + assert_eq!(nested.len(), 1); + match &nested[0] { + JsonType::Int(i) => assert_eq!(*i, 100), + _ => panic!("Expected Int(100) in nested array"), + } } - _ => panic!("Expected StrOwned with JSON array string at index 6"), + _ => panic!("Expected Array at index 6 in nested mode, got {:?}", &values[6]), } } _ => panic!("Expected JsonType::Array"), @@ -532,11 +723,8 @@ mod tests { fn debug_jsontype_serialization_bytes() { let test_cases = vec![ ("Int(42)", JsonType::Int(42)), - ("Str(\"hello\")", JsonType::Str("hello")), - ( - "StrOwned(\"world\")", - JsonType::StrOwned("world".to_string()), - ), + ("Str(borrowed)", JsonType::str_borrowed("hello")), + ("Str(owned)", JsonType::str_owned("world".to_string())), ("Double(3.14)", JsonType::Double(std::f64::consts::PI)), ]; diff --git a/src/exporters/clickhouse/transformer.rs b/src/exporters/clickhouse/transformer.rs index 6e46bcfe..ed840663 100644 --- a/src/exporters/clickhouse/transformer.rs +++ b/src/exporters/clickhouse/transformer.rs @@ -10,6 +10,10 @@ use std::collections::HashMap; pub struct Transformer { pub(crate) compression: Compression, use_json: bool, + /// Maximum depth for nested KV list conversion. + /// - `None` or `Some(0)`: flat mode (backwards compatible, nested KV serialized as JSON strings) + /// - `Some(n)` where n > 0: recursive conversion up to depth n + nested_kv_max_depth: Option, } impl Transformer { @@ -17,8 +21,14 @@ impl Transformer { Self { compression, use_json, + nested_kv_max_depth: None, // Default: backwards compatible flat mode } } + + pub fn with_nested_kv_max_depth(mut self, max_depth: Option) -> Self { + self.nested_kv_max_depth = max_depth; + self + } } impl Transformer { @@ -80,6 +90,7 @@ impl Transformer { prefix: String, result: &mut HashMap, JsonType<'a>>, ) { + use crate::exporters::clickhouse::rowbinary::json::anyvalue_to_jsontype; use opentelemetry_proto::tonic::common::v1::any_value::Value; for kv in attrs { @@ -100,7 +111,7 @@ impl Transformer { } else { Cow::Owned(format!("{}.{}", prefix, kv.key)) }; - result.insert(key, any_value.into()); + result.insert(key, anyvalue_to_jsontype(any_value, self.nested_kv_max_depth)); } None => {} } @@ -127,6 +138,7 @@ impl Transformer { prefix: String, result: &mut HashMap>, ) { + use crate::exporters::clickhouse::rowbinary::json::anyvalue_to_jsontype_owned; use opentelemetry_proto::tonic::common::v1::any_value::Value; for kv in attrs { @@ -143,7 +155,7 @@ impl Transformer { self.flatten_keyvalues_owned(&kvlist.values, full_key, result); } Some(_) => { - result.insert(full_key, any_value.clone().into()); + result.insert(full_key, anyvalue_to_jsontype_owned(any_value.clone(), self.nested_kv_max_depth)); } None => {} } @@ -324,8 +336,8 @@ mod tests { // Verify the values are correct and owned match &map["service.name"] { - JsonType::StrOwned(s) => assert_eq!(s, "test-service"), - _ => panic!("Expected StrOwned variant"), + JsonType::Str(s) => assert_eq!(s.as_ref(), "test-service"), + _ => panic!("Expected Str variant"), } match &map["http.status_code"] { JsonType::Int(i) => assert_eq!(*i, 200), From 05b864aac5a454f838135416d35d1e787cbf42ae Mon Sep 17 00:00:00 2001 From: Julian Bright Date: Wed, 28 Jan 2026 10:09:23 -0800 Subject: [PATCH 05/14] Fix cargo fmt --- src/exporters/clickhouse/rowbinary/json.rs | 150 +++++++++++++++------ src/exporters/clickhouse/transformer.rs | 10 +- 2 files changed, 119 insertions(+), 41 deletions(-) diff --git a/src/exporters/clickhouse/rowbinary/json.rs b/src/exporters/clickhouse/rowbinary/json.rs index 94189b8f..1320feef 100644 --- a/src/exporters/clickhouse/rowbinary/json.rs +++ b/src/exporters/clickhouse/rowbinary/json.rs @@ -63,7 +63,10 @@ impl From for JsonType<'static> { /// Flat mode is the hot path - it handles nested complex types by serializing to JSON strings. /// Nested mode recursively converts KvlistValue to Object and preserves ArrayValue structure. #[inline] -pub fn anyvalue_to_jsontype<'a>(value: &'a AnyValue, nested_kv_max_depth: Option) -> JsonType<'a> { +pub fn anyvalue_to_jsontype<'a>( + value: &'a AnyValue, + nested_kv_max_depth: Option, +) -> JsonType<'a> { match nested_kv_max_depth { // Fast path: flat mode (backwards compatible) None | Some(0) => anyvalue_to_jsontype_flat(value), @@ -82,7 +85,9 @@ fn anyvalue_to_jsontype_flat<'a>(value: &'a AnyValue) -> JsonType<'a> { Some(Value::StringValue(s)) => JsonType::str_borrowed(s.as_str()), Some(Value::BoolValue(b)) => JsonType::Bool(*b), Some(Value::ArrayValue(a)) => { - let values = a.values.iter() + let values = a + .values + .iter() .map(|v| match &v.value { Some(Value::IntValue(i)) => JsonType::Int(*i), Some(Value::DoubleValue(d)) => JsonType::Double(*d), @@ -104,7 +109,11 @@ fn anyvalue_to_jsontype_flat<'a>(value: &'a AnyValue) -> JsonType<'a> { } /// Nested mode: recursively convert with depth tracking. -fn anyvalue_to_jsontype_nested<'a>(value: &'a AnyValue, depth: usize, max_depth: usize) -> JsonType<'a> { +fn anyvalue_to_jsontype_nested<'a>( + value: &'a AnyValue, + depth: usize, + max_depth: usize, +) -> JsonType<'a> { if depth > max_depth { return JsonType::str_owned(json!(value).to_string()); } @@ -115,18 +124,24 @@ fn anyvalue_to_jsontype_nested<'a>(value: &'a AnyValue, depth: usize, max_depth: Some(Value::StringValue(s)) => JsonType::str_borrowed(s.as_str()), Some(Value::BoolValue(b)) => JsonType::Bool(*b), Some(Value::ArrayValue(a)) => { - let values = a.values.iter() + let values = a + .values + .iter() .map(|v| anyvalue_to_jsontype_nested(v, depth + 1, max_depth)) .collect(); JsonType::Array(values) } Some(Value::KvlistValue(kv)) => { - let entries = kv.values.iter() + let entries = kv + .values + .iter() .filter_map(|entry| { - entry.value.as_ref().map(|v| ( - Cow::Borrowed(entry.key.as_str()), - anyvalue_to_jsontype_nested(v, depth + 1, max_depth), - )) + entry.value.as_ref().map(|v| { + ( + Cow::Borrowed(entry.key.as_str()), + anyvalue_to_jsontype_nested(v, depth + 1, max_depth), + ) + }) }) .collect(); JsonType::Object(entries) @@ -138,7 +153,10 @@ fn anyvalue_to_jsontype_nested<'a>(value: &'a AnyValue, depth: usize, max_depth: /// Convert owned AnyValue to JsonType with configurable nesting behavior. #[inline] -pub fn anyvalue_to_jsontype_owned(value: AnyValue, nested_kv_max_depth: Option) -> JsonType<'static> { +pub fn anyvalue_to_jsontype_owned( + value: AnyValue, + nested_kv_max_depth: Option, +) -> JsonType<'static> { match nested_kv_max_depth { None | Some(0) => anyvalue_to_jsontype_flat_owned(value), Some(max_depth) => anyvalue_to_jsontype_nested_owned(value, 0, max_depth), @@ -154,7 +172,9 @@ fn anyvalue_to_jsontype_flat_owned(value: AnyValue) -> JsonType<'static> { Some(Value::StringValue(s)) => JsonType::str_owned(s), Some(Value::BoolValue(b)) => JsonType::Bool(b), Some(Value::ArrayValue(a)) => { - let values = a.values.into_iter() + let values = a + .values + .into_iter() .map(|v| match v.value { Some(Value::IntValue(i)) => JsonType::Int(i), Some(Value::DoubleValue(d)) => JsonType::Double(d), @@ -175,7 +195,11 @@ fn anyvalue_to_jsontype_flat_owned(value: AnyValue) -> JsonType<'static> { } /// Nested mode (owned): recursively convert with depth tracking. -fn anyvalue_to_jsontype_nested_owned(value: AnyValue, depth: usize, max_depth: usize) -> JsonType<'static> { +fn anyvalue_to_jsontype_nested_owned( + value: AnyValue, + depth: usize, + max_depth: usize, +) -> JsonType<'static> { if depth > max_depth { return JsonType::str_owned(json!(value).to_string()); } @@ -186,18 +210,24 @@ fn anyvalue_to_jsontype_nested_owned(value: AnyValue, depth: usize, max_depth: u Some(Value::StringValue(s)) => JsonType::str_owned(s), Some(Value::BoolValue(b)) => JsonType::Bool(b), Some(Value::ArrayValue(a)) => { - let values = a.values.into_iter() + let values = a + .values + .into_iter() .map(|v| anyvalue_to_jsontype_nested_owned(v, depth + 1, max_depth)) .collect(); JsonType::Array(values) } Some(Value::KvlistValue(kv)) => { - let entries = kv.values.into_iter() + let entries = kv + .values + .into_iter() .filter_map(|entry| { - entry.value.map(|v| ( - Cow::Owned(entry.key), - anyvalue_to_jsontype_nested_owned(v, depth + 1, max_depth), - )) + entry.value.map(|v| { + ( + Cow::Owned(entry.key), + anyvalue_to_jsontype_nested_owned(v, depth + 1, max_depth), + ) + }) }) .collect(); JsonType::Object(entries) @@ -574,16 +604,28 @@ mod tests { let array_value = ArrayValue { values: vec![ - AnyValue { value: Some(Value::IntValue(42)) }, - AnyValue { value: Some(Value::StringValue("test_string".to_string())) }, - AnyValue { value: Some(Value::KvlistValue(kv_list)) }, - AnyValue { value: Some(Value::ArrayValue(ArrayValue { - values: vec![AnyValue { value: Some(Value::IntValue(100)) }], - })) }, + AnyValue { + value: Some(Value::IntValue(42)), + }, + AnyValue { + value: Some(Value::StringValue("test_string".to_string())), + }, + AnyValue { + value: Some(Value::KvlistValue(kv_list)), + }, + AnyValue { + value: Some(Value::ArrayValue(ArrayValue { + values: vec![AnyValue { + value: Some(Value::IntValue(100)), + }], + })), + }, ], }; - let any_value = AnyValue { value: Some(Value::ArrayValue(array_value)) }; + let any_value = AnyValue { + value: Some(Value::ArrayValue(array_value)), + }; // Default From uses flat mode (None) let json_type: JsonType = (&any_value).into(); @@ -608,7 +650,10 @@ mod tests { assert!(s.as_ref().contains("nested_key")); assert!(s.as_ref().contains("nested_value")); } - _ => panic!("Expected Str (JSON string) at index 2 in flat mode, got {:?}", &values[2]), + _ => panic!( + "Expected Str (JSON string) at index 2 in flat mode, got {:?}", + &values[2] + ), } // Nested ArrayValue becomes a JSON string in flat mode @@ -616,7 +661,10 @@ mod tests { JsonType::Str(s) => { assert!(s.as_ref().contains("100")); } - _ => panic!("Expected Str (JSON string) at index 3 in flat mode, got {:?}", &values[3]), + _ => panic!( + "Expected Str (JSON string) at index 3 in flat mode, got {:?}", + &values[3] + ), } } _ => panic!("Expected JsonType::Array"), @@ -638,19 +686,37 @@ mod tests { let array_value = ArrayValue { values: vec![ - AnyValue { value: Some(Value::IntValue(42)) }, - AnyValue { value: Some(Value::StringValue("test_string".to_string())) }, - AnyValue { value: Some(Value::DoubleValue(3.14)) }, - AnyValue { value: Some(Value::BoolValue(true)) }, - AnyValue { value: Some(Value::KvlistValue(kv_list)) }, - AnyValue { value: Some(Value::BytesValue(vec![0x48, 0x65, 0x6c, 0x6c, 0x6f])) }, - AnyValue { value: Some(Value::ArrayValue(ArrayValue { - values: vec![AnyValue { value: Some(Value::IntValue(100)) }], - })) }, + AnyValue { + value: Some(Value::IntValue(42)), + }, + AnyValue { + value: Some(Value::StringValue("test_string".to_string())), + }, + AnyValue { + value: Some(Value::DoubleValue(3.14)), + }, + AnyValue { + value: Some(Value::BoolValue(true)), + }, + AnyValue { + value: Some(Value::KvlistValue(kv_list)), + }, + AnyValue { + value: Some(Value::BytesValue(vec![0x48, 0x65, 0x6c, 0x6c, 0x6f])), + }, + AnyValue { + value: Some(Value::ArrayValue(ArrayValue { + values: vec![AnyValue { + value: Some(Value::IntValue(100)), + }], + })), + }, ], }; - let any_value = AnyValue { value: Some(Value::ArrayValue(array_value)) }; + let any_value = AnyValue { + value: Some(Value::ArrayValue(array_value)), + }; // Use nested mode with max depth 10 let json_type = anyvalue_to_jsontype(&any_value, Some(10)); @@ -686,7 +752,10 @@ mod tests { _ => panic!("Expected nested_key to have Str value"), } } - _ => panic!("Expected Object at index 4 in nested mode, got {:?}", &values[4]), + _ => panic!( + "Expected Object at index 4 in nested mode, got {:?}", + &values[4] + ), } match &values[5] { @@ -703,7 +772,10 @@ mod tests { _ => panic!("Expected Int(100) in nested array"), } } - _ => panic!("Expected Array at index 6 in nested mode, got {:?}", &values[6]), + _ => panic!( + "Expected Array at index 6 in nested mode, got {:?}", + &values[6] + ), } } _ => panic!("Expected JsonType::Array"), diff --git a/src/exporters/clickhouse/transformer.rs b/src/exporters/clickhouse/transformer.rs index ed840663..8b7df69e 100644 --- a/src/exporters/clickhouse/transformer.rs +++ b/src/exporters/clickhouse/transformer.rs @@ -111,7 +111,10 @@ impl Transformer { } else { Cow::Owned(format!("{}.{}", prefix, kv.key)) }; - result.insert(key, anyvalue_to_jsontype(any_value, self.nested_kv_max_depth)); + result.insert( + key, + anyvalue_to_jsontype(any_value, self.nested_kv_max_depth), + ); } None => {} } @@ -155,7 +158,10 @@ impl Transformer { self.flatten_keyvalues_owned(&kvlist.values, full_key, result); } Some(_) => { - result.insert(full_key, anyvalue_to_jsontype_owned(any_value.clone(), self.nested_kv_max_depth)); + result.insert( + full_key, + anyvalue_to_jsontype_owned(any_value.clone(), self.nested_kv_max_depth), + ); } None => {} } From 0b40ae4ee1e860f341dbb129439b6b7c1daaa45f Mon Sep 17 00:00:00 2001 From: Julian Bright Date: Wed, 28 Jan 2026 10:22:39 -0800 Subject: [PATCH 06/14] Add CLI flags --- src/exporters/clickhouse/mod.rs | 12 +++++++++++- src/init/clickhouse_exporter.rs | 11 +++++++++++ src/init/config.rs | 1 + 3 files changed, 23 insertions(+), 1 deletion(-) diff --git a/src/exporters/clickhouse/mod.rs b/src/exporters/clickhouse/mod.rs index e1da36a4..aece9ee6 100644 --- a/src/exporters/clickhouse/mod.rs +++ b/src/exporters/clickhouse/mod.rs @@ -70,6 +70,7 @@ pub struct ClickhouseExporterConfigBuilder { auth_password: Option, async_insert: bool, use_json: bool, + nested_kv_max_depth: Option, request_timeout: Duration, } @@ -112,6 +113,7 @@ impl ClickhouseExporterConfigBuilder { request_timeout: Duration::from_secs(5), compression: Default::default(), use_json: false, + nested_kv_max_depth: None, async_insert: false, } } @@ -126,6 +128,11 @@ impl ClickhouseExporterConfigBuilder { self } + pub fn with_nested_kv_max_depth(mut self, max_depth: Option) -> Self { + self.nested_kv_max_depth = max_depth; + self + } + pub fn with_async_insert(mut self, async_insert: bool) -> Self { self.async_insert = async_insert; self @@ -173,6 +180,7 @@ impl ClickhouseExporterConfigBuilder { request_mapper: mapper, retry_config: self.retry_config, request_timeout: self.request_timeout, + nested_kv_max_depth: self.nested_kv_max_depth, }) } } @@ -182,6 +190,7 @@ pub struct ClickhouseExporterBuilder { retry_config: RetryConfig, request_mapper: Arc, request_timeout: Duration, + nested_kv_max_depth: Option, } impl ClickhouseExporterBuilder { @@ -221,7 +230,8 @@ impl ClickhouseExporterBuilder { { let client = Client::build(tls::Config::default(), Protocol::Http, Default::default())?; - let transformer = Transformer::new(self.config.compression.clone(), self.config.use_json); + let transformer = Transformer::new(self.config.compression.clone(), self.config.use_json) + .with_nested_kv_max_depth(self.nested_kv_max_depth); let req_builder = RequestBuilder::new(transformer, self.request_mapper.clone())?; diff --git a/src/init/clickhouse_exporter.rs b/src/init/clickhouse_exporter.rs index afdfcfe4..5bfc94ca 100644 --- a/src/init/clickhouse_exporter.rs +++ b/src/init/clickhouse_exporter.rs @@ -78,6 +78,16 @@ pub struct ClickhouseExporterArgs { )] pub enable_json: bool, + /// Clickhouse Exporter nested KV max depth for JSON columns. + /// When set to a value > 0, nested KeyValueList structures are converted to JSON objects + /// up to the specified depth. When unset or 0, nested KV is flattened (backwards compatible). + /// Recommended value: 10 for GenAI attributes. + #[arg( + long("clickhouse-exporter-nested-kv-max-depth"), + env = "ROTEL_CLICKHOUSE_EXPORTER_NESTED_KV_MAX_DEPTH" + )] + pub nested_kv_max_depth: Option, + /// Clickhouse Exporter request timeout #[arg( id("CLICKHOUSE_EXPORTER_REQUEST_TIMEOUT"), @@ -106,6 +116,7 @@ impl Default for ClickhouseExporterArgs { password: None, async_insert: "true".to_string(), enable_json: false, + nested_kv_max_depth: None, request_timeout: std::time::Duration::from_secs(5), retry: Default::default(), } diff --git a/src/init/config.rs b/src/init/config.rs index cae06b28..664fd49a 100644 --- a/src/init/config.rs +++ b/src/init/config.rs @@ -331,6 +331,7 @@ impl TryIntoConfig for ExporterArgs { .with_compression(ch.compression) .with_async_insert(async_insert) .with_json(ch.enable_json) + .with_nested_kv_max_depth(ch.nested_kv_max_depth) .with_request_timeout(ch.request_timeout); if let Some(user) = &ch.user { From eb84565b5ade3a24c3dda56932464e08a0ea6e15 Mon Sep 17 00:00:00 2001 From: Julian Bright Date: Wed, 28 Jan 2026 10:49:39 -0800 Subject: [PATCH 07/14] Update README to include instructions on setting nested-kv flag --- Dockerfile.build | 35 ++++++++++++++++++++++ README.md | 28 +++++++++++++++++ src/exporters/clickhouse/rowbinary/json.rs | 3 -- 3 files changed, 63 insertions(+), 3 deletions(-) create mode 100644 Dockerfile.build diff --git a/Dockerfile.build b/Dockerfile.build new file mode 100644 index 00000000..9e042e4a --- /dev/null +++ b/Dockerfile.build @@ -0,0 +1,35 @@ +# Multi-stage build for rotel +# Uses rust:slim for faster downloads +FROM rust:1.83-slim-bookworm AS builder + +WORKDIR /app + +# Install build dependencies +RUN apt-get update && apt-get install -y \ + pkg-config \ + libssl-dev \ + cmake \ + build-essential \ + clang \ + libclang-dev \ + protobuf-compiler \ + && rm -rf /var/lib/apt/lists/* + +# Copy source +COPY . . + +# Build release binary +RUN cargo build --release + +# Runtime stage - minimal image +FROM debian:bookworm-slim + +RUN apt-get update && apt-get install -y ca-certificates && rm -rf /var/lib/apt/lists/* + +COPY --from=builder /app/target/release/rotel /rotel +RUN chmod 0755 /rotel + +EXPOSE 4317 +EXPOSE 4318 + +ENTRYPOINT ["/rotel", "start", "--otlp-grpc-endpoint", "0.0.0.0:4317", "--otlp-http-endpoint", "0.0.0.0:4318"] diff --git a/README.md b/README.md index 9fec46aa..9b4908c3 100644 --- a/README.md +++ b/README.md @@ -232,6 +232,7 @@ and traces. | --clickhouse-exporter-async-insert | true | true, false | | --clickhouse-exporter-request-timeout | 5s | | | --clickhouse-exporter-enable-json | | | +| --clickhouse-exporter-nested-kv-max-depth | | | | --clickhouse-exporter-json-underscore | | | | --clickhouse-exporter-user | | | | --clickhouse-exporter-password | | | @@ -263,6 +264,18 @@ a nested JSON object. You can replace periods in JSON keys with underscores by p `--clickhouse-exporter-json-underscore` which will keep the JSON keys flat. For example, the resource attribute `service.name` will be inserted as `service_name`. +When exporting OpenTelemetry attributes that contain nested `KeyValueList` structures (such as GenAI message +attributes like `gen_ai.input.messages`), use `--clickhouse-exporter-nested-kv-max-depth` to convert them to +proper JSON objects. Without this option, nested structures are serialized as JSON strings (the protobuf +representation) for backwards compatibility. Set to a value like `10` to enable nested conversion: + +```shell +rotel start --exporter clickhouse \ + --clickhouse-exporter-endpoint "http://localhost:8123" \ + --clickhouse-exporter-enable-json \ + --clickhouse-exporter-nested-kv-max-depth 10 +``` + _The ClickHouse exporter is built using code from the official Rust [clickhouse-rs](https://crates.io/crates/clickhouse) crate._ @@ -1220,6 +1233,21 @@ tags: When running an image, map the OTLP receiver ports to their local values with the flag `-p 4317-4318:4317-4318`. +### Building locally + +To build a Docker image locally for development or testing, use the multi-stage `Dockerfile.build` which compiles +the Rust code inside Docker (no cross-compilation toolchain required): + +```shell +# Build for your current platform (e.g., ARM64 on Apple Silicon) +docker build -f Dockerfile.build -t rotel:latest . + +# Or explicitly specify the platform +docker build --platform linux/arm64 -f Dockerfile.build -t rotel:latest . +``` + +This is useful for testing local changes before creating a release. + Rotel releases with built-in Python Processor support and Python 3.13 are also available on [Dockerhub](https://hub.docker.com/repository/docker/streamfold/rotel-python-processors/general) with the following tags: diff --git a/src/exporters/clickhouse/rowbinary/json.rs b/src/exporters/clickhouse/rowbinary/json.rs index 1320feef..3857b74a 100644 --- a/src/exporters/clickhouse/rowbinary/json.rs +++ b/src/exporters/clickhouse/rowbinary/json.rs @@ -6,9 +6,6 @@ use opentelemetry_proto::tonic::common::v1::AnyValue; use opentelemetry_proto::tonic::common::v1::any_value::Value; use serde_json::json; -/// Default maximum recursion depth for nested structures to prevent stack overflow -pub const DEFAULT_NESTED_KV_MAX_DEPTH: usize = 10; - /// JSON type representation for ClickHouse rowbinary format. /// Uses `Cow` for strings and object keys to efficiently handle both /// borrowed and owned data with a single type. From 485a24fc488474a8dc71477874267f84e1bc44ba Mon Sep 17 00:00:00 2001 From: Julian Bright Date: Wed, 28 Jan 2026 10:59:45 -0800 Subject: [PATCH 08/14] Update readme to refer to Dockerfile.build-python --- .dockerignore | 53 +++++++++++++++++++++ Dockerfile.build => Dockerfile.build-python | 18 ++++--- README.md | 11 +++-- 3 files changed, 71 insertions(+), 11 deletions(-) create mode 100644 .dockerignore rename Dockerfile.build => Dockerfile.build-python (56%) diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 00000000..d919d60b --- /dev/null +++ b/.dockerignore @@ -0,0 +1,53 @@ +# Rust build artifacts (exclude most, but keep arch-specific binaries for Docker) +target/ +!target/arm64/ +!target/amd64/ +utilities/target/ +rotel_python_processor_sdk/target/ + +# Python build artifacts and caches +rotel_python_processor_sdk/dist/ +rotel_python_processor_sdk/.env/ +rotel_python_processor_sdk/rotel_sdk/*.so +rotel_python_processor_sdk/processors/__pycache__/ +**/__pycache__/ +*.pyc +*.pyo +*.egg-info/ + +# IDE and editor +.idea/ +.vscode/ +*.swp +*.swo + +# Git +.git/ +.gitignore +.dockerignore + +# Documentation (not needed for build) +*.md +LICENSE + +# Test files (not needed for runtime) +test/ +tests/ +rotel_python_processor_sdk/python_tests/ + +# Profiling artifacts +flamegraph.svg +perf.data +perf.data.old + +# Temporary and generated files +tmp/ +trace.pb +CLAUDE.md + +# Docker files (avoid recursion/confusion) +docker-compose*.yml +Dockerfile.* + +# Examples (not needed for build) +examples/ \ No newline at end of file diff --git a/Dockerfile.build b/Dockerfile.build-python similarity index 56% rename from Dockerfile.build rename to Dockerfile.build-python index 9e042e4a..57d069ff 100644 --- a/Dockerfile.build +++ b/Dockerfile.build-python @@ -1,10 +1,9 @@ -# Multi-stage build for rotel -# Uses rust:slim for faster downloads +# Multi-stage build for rotel with Python processor support FROM rust:1.83-slim-bookworm AS builder WORKDIR /app -# Install build dependencies +# Install build dependencies including Python RUN apt-get update && apt-get install -y \ pkg-config \ libssl-dev \ @@ -13,18 +12,23 @@ RUN apt-get update && apt-get install -y \ clang \ libclang-dev \ protobuf-compiler \ + python3-dev \ && rm -rf /var/lib/apt/lists/* # Copy source COPY . . -# Build release binary -RUN cargo build --release +# Build release binary with Python processor support +RUN cargo build --release --features pyo3 -# Runtime stage - minimal image +# Runtime stage - minimal image with Python FROM debian:bookworm-slim -RUN apt-get update && apt-get install -y ca-certificates && rm -rf /var/lib/apt/lists/* +RUN apt-get update && apt-get install -y \ + ca-certificates \ + python3 \ + libpython3.11 \ + && rm -rf /var/lib/apt/lists/* COPY --from=builder /app/target/release/rotel /rotel RUN chmod 0755 /rotel diff --git a/README.md b/README.md index 9b4908c3..28698b51 100644 --- a/README.md +++ b/README.md @@ -1235,17 +1235,20 @@ When running an image, map the OTLP receiver ports to their local values with th ### Building locally -To build a Docker image locally for development or testing, use the multi-stage `Dockerfile.build` which compiles -the Rust code inside Docker (no cross-compilation toolchain required): +To build a Docker image locally for development or testing, use the multi-stage `Dockerfile.build-python` which +compiles the Rust code inside Docker with Python processor support (no cross-compilation toolchain required): ```shell # Build for your current platform (e.g., ARM64 on Apple Silicon) -docker build -f Dockerfile.build -t rotel:latest . +docker build -f Dockerfile.build-python -t rotel:latest . # Or explicitly specify the platform -docker build --platform linux/arm64 -f Dockerfile.build -t rotel:latest . +docker build --platform linux/arm64 -f Dockerfile.build-python -t rotel:latest . ``` +This builds rotel with the `pyo3` feature enabled for Python processor support. The build includes all necessary +dependencies (Rust toolchain, clang, protobuf, Python) and produces a minimal runtime image. + This is useful for testing local changes before creating a release. Rotel releases with built-in Python Processor support and Python 3.13 are also available From f3f70eb00c585a50b9b9f614efd1859660b1ac7c Mon Sep 17 00:00:00 2001 From: Julian Bright Date: Wed, 28 Jan 2026 11:50:26 -0800 Subject: [PATCH 09/14] Update to use python 3.13 for build --- Dockerfile.build-python | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/Dockerfile.build-python b/Dockerfile.build-python index 57d069ff..1806dffd 100644 --- a/Dockerfile.build-python +++ b/Dockerfile.build-python @@ -1,9 +1,9 @@ -# Multi-stage build for rotel with Python processor support +# Multi-stage build for rotel with Python 3.13 processor support FROM rust:1.83-slim-bookworm AS builder WORKDIR /app -# Install build dependencies including Python +# Install build dependencies including Python 3 for pyo3 build RUN apt-get update && apt-get install -y \ pkg-config \ libssl-dev \ @@ -21,14 +21,16 @@ COPY . . # Build release binary with Python processor support RUN cargo build --release --features pyo3 -# Runtime stage - minimal image with Python -FROM debian:bookworm-slim +# Runtime stage - Ubuntu 24.10 for Python 3.13 +FROM ubuntu:24.10 -RUN apt-get update && apt-get install -y \ +RUN apt-get update && apt-get install -y --no-install-recommends \ ca-certificates \ - python3 \ - libpython3.11 \ - && rm -rf /var/lib/apt/lists/* + python3.13 \ + python3.13-venv \ + libpython3.13 \ + && rm -rf /var/lib/apt/lists/* \ + && ln -sf /usr/bin/python3.13 /usr/bin/python3 COPY --from=builder /app/target/release/rotel /rotel RUN chmod 0755 /rotel From 9d93eba8495dab340e000c2b298328369365e9cc Mon Sep 17 00:00:00 2001 From: Mike Heffner Date: Wed, 28 Jan 2026 17:15:06 -0500 Subject: [PATCH 10/14] Fix build without file receiver --- src/topology/payload.rs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/topology/payload.rs b/src/topology/payload.rs index a9b4a9e4..390e5042 100644 --- a/src/topology/payload.rs +++ b/src/topology/payload.rs @@ -1,6 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 use crate::bounded_channel::{BoundedSender, SendError}; +#[cfg(feature = "file_receiver")] use crate::receivers::file::offset_tracker::LineOffset; use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest; @@ -57,6 +58,7 @@ pub enum RequestContext { pub enum MessageMetadataInner { Kafka(KafkaMetadata), Forwarder(ForwarderMetadata), + #[cfg(feature = "file_receiver")] File(FileMetadata), } @@ -96,6 +98,7 @@ impl MessageMetadata { } /// Create new MessageMetadata with File variant, starting with ref_count = 1 + #[cfg(feature = "file_receiver")] pub fn file(metadata: FileMetadata) -> Self { Self { data: MessageMetadataInner::File(metadata), @@ -309,6 +312,7 @@ impl Ack for MessageMetadata { .await?; } } + #[cfg(feature = "file_receiver")] MessageMetadataInner::File(fm) => { if let Some(ack_chan) = &fm.ack_chan { ack_chan @@ -353,6 +357,7 @@ impl Ack for MessageMetadata { .await?; } } + #[cfg(feature = "file_receiver")] MessageMetadataInner::File(fm) => { if let Some(ack_chan) = &fm.ack_chan { ack_chan @@ -389,6 +394,7 @@ pub struct KafkaNack { } /// Metadata for file receiver messages +#[cfg(feature = "file_receiver")] #[derive(Clone)] pub struct FileMetadata { /// The unique file identity @@ -399,6 +405,7 @@ pub struct FileMetadata { pub ack_chan: Option>, } +#[cfg(feature = "file_receiver")] impl FileMetadata { /// Create new FileMetadata pub fn new( @@ -415,6 +422,7 @@ impl FileMetadata { } // Manual Debug for FileMetadata since BoundedSender doesn't implement Debug +#[cfg(feature = "file_receiver")] impl std::fmt::Debug for FileMetadata { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("FileMetadata") @@ -426,6 +434,7 @@ impl std::fmt::Debug for FileMetadata { } // Manual PartialEq for FileMetadata since BoundedSender can't be compared +#[cfg(feature = "file_receiver")] impl PartialEq for FileMetadata { fn eq(&self, other: &Self) -> bool { self.file_id == other.file_id && self.offsets == other.offsets @@ -434,18 +443,21 @@ impl PartialEq for FileMetadata { } /// Acknowledgement message from exporter back to file receiver +#[cfg(feature = "file_receiver")] pub enum FileAcknowledgement { Ack(FileAck), Nack(FileNack), } /// Successful acknowledgement of a file batch +#[cfg(feature = "file_receiver")] pub struct FileAck { pub file_id: crate::receivers::file::input::FileId, pub offsets: Vec, } /// Failed acknowledgement of a file batch +#[cfg(feature = "file_receiver")] pub struct FileNack { pub file_id: crate::receivers::file::input::FileId, pub offsets: Vec, From d0e4e65d637a36d5f001503f72f0d3fdf973e834 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Thu, 29 Jan 2026 14:43:43 +0000 Subject: [PATCH 11/14] Bump version to 0.1.7 --- Cargo.lock | 2 +- Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4c069a48..5935a94a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4086,7 +4086,7 @@ dependencies = [ [[package]] name = "rotel" -version = "0.1.6" +version = "0.1.7" dependencies = [ "arrow", "aws-config", diff --git a/Cargo.toml b/Cargo.toml index 5a7efc9b..e1a7091d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rotel" -version = "0.1.6" +version = "0.1.7" edition = "2024" homepage = "https://github.com/streamfold/rotel" readme = "README.md" From c6a199e31d60b30cf76383d8442161d47673096b Mon Sep 17 00:00:00 2001 From: Mike Heffner Date: Fri, 30 Jan 2026 15:59:09 -0500 Subject: [PATCH 12/14] Yank docker infra --- .dockerignore | 53 ----------------------------------------- Dockerfile.build-python | 41 ------------------------------- README.md | 18 -------------- 3 files changed, 112 deletions(-) delete mode 100644 .dockerignore delete mode 100644 Dockerfile.build-python diff --git a/.dockerignore b/.dockerignore deleted file mode 100644 index d919d60b..00000000 --- a/.dockerignore +++ /dev/null @@ -1,53 +0,0 @@ -# Rust build artifacts (exclude most, but keep arch-specific binaries for Docker) -target/ -!target/arm64/ -!target/amd64/ -utilities/target/ -rotel_python_processor_sdk/target/ - -# Python build artifacts and caches -rotel_python_processor_sdk/dist/ -rotel_python_processor_sdk/.env/ -rotel_python_processor_sdk/rotel_sdk/*.so -rotel_python_processor_sdk/processors/__pycache__/ -**/__pycache__/ -*.pyc -*.pyo -*.egg-info/ - -# IDE and editor -.idea/ -.vscode/ -*.swp -*.swo - -# Git -.git/ -.gitignore -.dockerignore - -# Documentation (not needed for build) -*.md -LICENSE - -# Test files (not needed for runtime) -test/ -tests/ -rotel_python_processor_sdk/python_tests/ - -# Profiling artifacts -flamegraph.svg -perf.data -perf.data.old - -# Temporary and generated files -tmp/ -trace.pb -CLAUDE.md - -# Docker files (avoid recursion/confusion) -docker-compose*.yml -Dockerfile.* - -# Examples (not needed for build) -examples/ \ No newline at end of file diff --git a/Dockerfile.build-python b/Dockerfile.build-python deleted file mode 100644 index 1806dffd..00000000 --- a/Dockerfile.build-python +++ /dev/null @@ -1,41 +0,0 @@ -# Multi-stage build for rotel with Python 3.13 processor support -FROM rust:1.83-slim-bookworm AS builder - -WORKDIR /app - -# Install build dependencies including Python 3 for pyo3 build -RUN apt-get update && apt-get install -y \ - pkg-config \ - libssl-dev \ - cmake \ - build-essential \ - clang \ - libclang-dev \ - protobuf-compiler \ - python3-dev \ - && rm -rf /var/lib/apt/lists/* - -# Copy source -COPY . . - -# Build release binary with Python processor support -RUN cargo build --release --features pyo3 - -# Runtime stage - Ubuntu 24.10 for Python 3.13 -FROM ubuntu:24.10 - -RUN apt-get update && apt-get install -y --no-install-recommends \ - ca-certificates \ - python3.13 \ - python3.13-venv \ - libpython3.13 \ - && rm -rf /var/lib/apt/lists/* \ - && ln -sf /usr/bin/python3.13 /usr/bin/python3 - -COPY --from=builder /app/target/release/rotel /rotel -RUN chmod 0755 /rotel - -EXPOSE 4317 -EXPOSE 4318 - -ENTRYPOINT ["/rotel", "start", "--otlp-grpc-endpoint", "0.0.0.0:4317", "--otlp-http-endpoint", "0.0.0.0:4318"] diff --git a/README.md b/README.md index 28698b51..93a239ca 100644 --- a/README.md +++ b/README.md @@ -1233,24 +1233,6 @@ tags: When running an image, map the OTLP receiver ports to their local values with the flag `-p 4317-4318:4317-4318`. -### Building locally - -To build a Docker image locally for development or testing, use the multi-stage `Dockerfile.build-python` which -compiles the Rust code inside Docker with Python processor support (no cross-compilation toolchain required): - -```shell -# Build for your current platform (e.g., ARM64 on Apple Silicon) -docker build -f Dockerfile.build-python -t rotel:latest . - -# Or explicitly specify the platform -docker build --platform linux/arm64 -f Dockerfile.build-python -t rotel:latest . -``` - -This builds rotel with the `pyo3` feature enabled for Python processor support. The build includes all necessary -dependencies (Rust toolchain, clang, protobuf, Python) and produces a minimal runtime image. - -This is useful for testing local changes before creating a release. - Rotel releases with built-in Python Processor support and Python 3.13 are also available on [Dockerhub](https://hub.docker.com/repository/docker/streamfold/rotel-python-processors/general) with the following tags: From b29491ffed2ba02dbc92c8e810182b09d9705e8a Mon Sep 17 00:00:00 2001 From: Mike Heffner Date: Fri, 30 Jan 2026 16:56:46 -0500 Subject: [PATCH 13/14] Combine impls --- src/exporters/clickhouse/rowbinary/json.rs | 107 ++++++--------------- 1 file changed, 27 insertions(+), 80 deletions(-) diff --git a/src/exporters/clickhouse/rowbinary/json.rs b/src/exporters/clickhouse/rowbinary/json.rs index 3857b74a..7fc6097a 100644 --- a/src/exporters/clickhouse/rowbinary/json.rs +++ b/src/exporters/clickhouse/rowbinary/json.rs @@ -64,55 +64,27 @@ pub fn anyvalue_to_jsontype<'a>( value: &'a AnyValue, nested_kv_max_depth: Option, ) -> JsonType<'a> { - match nested_kv_max_depth { - // Fast path: flat mode (backwards compatible) - None | Some(0) => anyvalue_to_jsontype_flat(value), - // Nested mode: recursive conversion - Some(max_depth) => anyvalue_to_jsontype_nested(value, 0, max_depth), - } -} - -/// Fast path: convert without recursing into nested structures. -/// Arrays containing KvlistValue or nested ArrayValue are serialized as JSON strings. -#[inline] -fn anyvalue_to_jsontype_flat<'a>(value: &'a AnyValue) -> JsonType<'a> { - match &value.value { - Some(Value::IntValue(i)) => JsonType::Int(*i), - Some(Value::DoubleValue(d)) => JsonType::Double(*d), - Some(Value::StringValue(s)) => JsonType::str_borrowed(s.as_str()), - Some(Value::BoolValue(b)) => JsonType::Bool(*b), - Some(Value::ArrayValue(a)) => { - let values = a - .values - .iter() - .map(|v| match &v.value { - Some(Value::IntValue(i)) => JsonType::Int(*i), - Some(Value::DoubleValue(d)) => JsonType::Double(*d), - Some(Value::StringValue(s)) => JsonType::str_borrowed(s.as_str()), - Some(Value::BoolValue(b)) => JsonType::Bool(*b), - // Complex nested types become JSON strings - Some(Value::ArrayValue(arr)) => JsonType::str_owned(json!(arr).to_string()), - Some(Value::KvlistValue(kv)) => JsonType::str_owned(json!(kv).to_string()), - Some(Value::BytesValue(b)) => JsonType::str_owned(hex::encode(b)), - None => JsonType::str_borrowed(""), - }) - .collect(); - JsonType::Array(values) - } - Some(Value::KvlistValue(_)) => JsonType::str_borrowed(""), - Some(Value::BytesValue(b)) => JsonType::str_owned(hex::encode(b)), - None => JsonType::str_borrowed(""), - } + anyvalue_to_jsontype_nested(value, 0, nested_kv_max_depth) } /// Nested mode: recursively convert with depth tracking. fn anyvalue_to_jsontype_nested<'a>( value: &'a AnyValue, depth: usize, - max_depth: usize, + max_depth: Option, ) -> JsonType<'a> { - if depth > max_depth { - return JsonType::str_owned(json!(value).to_string()); + if max_depth.is_none() || depth > max_depth.unwrap() { + return match &value.value { + Some(Value::IntValue(i)) => JsonType::Int(*i), + Some(Value::DoubleValue(d)) => JsonType::Double(*d), + Some(Value::StringValue(s)) => JsonType::str_borrowed(s.as_str()), + Some(Value::BoolValue(b)) => JsonType::Bool(*b), + // Complex nested types become JSON strings + Some(Value::ArrayValue(arr)) => JsonType::str_owned(json!(arr).to_string()), + Some(Value::KvlistValue(kv)) => JsonType::str_owned(json!(kv).to_string()), + Some(Value::BytesValue(b)) => JsonType::str_owned(hex::encode(b)), + None => JsonType::str_borrowed(""), + }; } match &value.value { @@ -154,51 +126,26 @@ pub fn anyvalue_to_jsontype_owned( value: AnyValue, nested_kv_max_depth: Option, ) -> JsonType<'static> { - match nested_kv_max_depth { - None | Some(0) => anyvalue_to_jsontype_flat_owned(value), - Some(max_depth) => anyvalue_to_jsontype_nested_owned(value, 0, max_depth), - } -} - -/// Fast path (owned): convert without recursing into nested structures. -#[inline] -fn anyvalue_to_jsontype_flat_owned(value: AnyValue) -> JsonType<'static> { - match value.value { - Some(Value::IntValue(i)) => JsonType::Int(i), - Some(Value::DoubleValue(d)) => JsonType::Double(d), - Some(Value::StringValue(s)) => JsonType::str_owned(s), - Some(Value::BoolValue(b)) => JsonType::Bool(b), - Some(Value::ArrayValue(a)) => { - let values = a - .values - .into_iter() - .map(|v| match v.value { - Some(Value::IntValue(i)) => JsonType::Int(i), - Some(Value::DoubleValue(d)) => JsonType::Double(d), - Some(Value::StringValue(s)) => JsonType::str_owned(s), - Some(Value::BoolValue(b)) => JsonType::Bool(b), - Some(Value::ArrayValue(arr)) => JsonType::str_owned(json!(arr).to_string()), - Some(Value::KvlistValue(kv)) => JsonType::str_owned(json!(kv).to_string()), - Some(Value::BytesValue(b)) => JsonType::str_owned(hex::encode(&b)), - None => JsonType::str_owned(String::new()), - }) - .collect(); - JsonType::Array(values) - } - Some(Value::KvlistValue(_)) => JsonType::str_owned(String::new()), - Some(Value::BytesValue(b)) => JsonType::str_owned(hex::encode(&b)), - None => JsonType::str_owned(String::new()), - } + anyvalue_to_jsontype_nested_owned(value, 0, nested_kv_max_depth) } /// Nested mode (owned): recursively convert with depth tracking. fn anyvalue_to_jsontype_nested_owned( value: AnyValue, depth: usize, - max_depth: usize, + max_depth: Option, ) -> JsonType<'static> { - if depth > max_depth { - return JsonType::str_owned(json!(value).to_string()); + if max_depth.is_none() || depth > max_depth.unwrap() { + return match value.value { + Some(Value::IntValue(i)) => JsonType::Int(i), + Some(Value::DoubleValue(d)) => JsonType::Double(d), + Some(Value::StringValue(s)) => JsonType::str_owned(s), + Some(Value::BoolValue(b)) => JsonType::Bool(b), + Some(Value::ArrayValue(arr)) => JsonType::str_owned(json!(arr).to_string()), + Some(Value::KvlistValue(kv)) => JsonType::str_owned(json!(kv).to_string()), + Some(Value::BytesValue(b)) => JsonType::str_owned(hex::encode(&b)), + None => JsonType::str_owned(String::new()), + }; } match value.value { From f3ad983c2dcb8543ac4feeb1a5a1b90ca3b3939e Mon Sep 17 00:00:00 2001 From: Julian Bright Date: Fri, 13 Feb 2026 10:33:55 -0800 Subject: [PATCH 14/14] Add tests for ntested values --- Dockerfile.python-processor | 8 +- src/exporters/clickhouse/rowbinary/json.rs | 6 +- src/exporters/clickhouse/transformer.rs | 649 ++++++++++++++++++++- 3 files changed, 655 insertions(+), 8 deletions(-) diff --git a/Dockerfile.python-processor b/Dockerfile.python-processor index ced631f0..fc18125d 100644 --- a/Dockerfile.python-processor +++ b/Dockerfile.python-processor @@ -15,13 +15,13 @@ ENV PYTHONUNBUFFERED 1 RUN add-apt-repository ppa:deadsnakes/ppa && \ apt update -# Install Python 3.14 and pip -RUN apt install -y python3.14 python3.14-venv python3.14-dev && \ +# Install Python 3.13 and pip +RUN apt install -y python3.13 python3.13-venv python3.13-dev && \ apt clean && \ rm -rf /var/lib/apt/lists/* -# Set Python 3.14 as the default python3 -RUN update-alternatives --install /usr/bin/python3 python3 /usr/bin/python3.14 1 +# Set Python 3.13 as the default python3 +RUN update-alternatives --install /usr/bin/python3 python3 /usr/bin/python3.13 1 COPY target/${TARGETARCH}/rotel /rotel RUN chmod 0755 /rotel diff --git a/src/exporters/clickhouse/rowbinary/json.rs b/src/exporters/clickhouse/rowbinary/json.rs index 7fc6097a..5ae9e89a 100644 --- a/src/exporters/clickhouse/rowbinary/json.rs +++ b/src/exporters/clickhouse/rowbinary/json.rs @@ -73,7 +73,8 @@ fn anyvalue_to_jsontype_nested<'a>( depth: usize, max_depth: Option, ) -> JsonType<'a> { - if max_depth.is_none() || depth > max_depth.unwrap() { + let effective_max = max_depth.unwrap_or(0); + if depth > effective_max { return match &value.value { Some(Value::IntValue(i)) => JsonType::Int(*i), Some(Value::DoubleValue(d)) => JsonType::Double(*d), @@ -135,7 +136,8 @@ fn anyvalue_to_jsontype_nested_owned( depth: usize, max_depth: Option, ) -> JsonType<'static> { - if max_depth.is_none() || depth > max_depth.unwrap() { + let effective_max = max_depth.unwrap_or(0); + if depth > effective_max { return match value.value { Some(Value::IntValue(i)) => JsonType::Int(i), Some(Value::DoubleValue(d)) => JsonType::Double(d), diff --git a/src/exporters/clickhouse/transformer.rs b/src/exporters/clickhouse/transformer.rs index 8b7df69e..f85b9e05 100644 --- a/src/exporters/clickhouse/transformer.rs +++ b/src/exporters/clickhouse/transformer.rs @@ -221,11 +221,11 @@ pub(crate) fn encode_id<'a>(id: &[u8], out: &'a mut [u8]) -> &'a str { #[cfg(test)] mod tests { use super::*; + use opentelemetry_proto::tonic::common::v1::AnyValue; + use opentelemetry_proto::tonic::common::v1::any_value::Value; #[test] fn test_transform_attrs_lifetime_correctness() { - use opentelemetry_proto::tonic::common::v1::AnyValue; - use opentelemetry_proto::tonic::common::v1::any_value::Value; let transformer = Transformer::new(Compression::None, true); @@ -742,4 +742,649 @@ mod tests { _ => panic!("Expected Map variant"), } } + + // ========================================================================= + // GenAI-style nested attribute tests + // ========================================================================= + // + // These tests exercise the nesting pattern used by OpenTelemetry GenAI + // semantic conventions, where attributes like gen_ai.input.messages use: + // ArrayValue [ KvlistValue { role, parts: ArrayValue [ KvlistValue { type, content } ] } ] + + /// Helper: build a GenAI message KvlistValue with role and text parts. + fn genai_message(role: &str, parts: Vec<(&str, &str)>) -> AnyValue { + use opentelemetry_proto::tonic::common::v1::KeyValueList; + use opentelemetry_proto::tonic::common::v1::any_value::Value; + + let part_values: Vec = parts + .into_iter() + .map(|(part_type, content)| { + AnyValue { + value: Some(Value::KvlistValue(KeyValueList { + values: vec![ + KeyValue { + key: "type".to_string(), + value: Some(AnyValue { + value: Some(Value::StringValue(part_type.to_string())), + }), + }, + KeyValue { + key: "content".to_string(), + value: Some(AnyValue { + value: Some(Value::StringValue(content.to_string())), + }), + }, + ], + })), + } + }) + .collect(); + + AnyValue { + value: Some(Value::KvlistValue(KeyValueList { + values: vec![ + KeyValue { + key: "role".to_string(), + value: Some(AnyValue { + value: Some(Value::StringValue(role.to_string())), + }), + }, + KeyValue { + key: "parts".to_string(), + value: Some(AnyValue { + value: Some(Value::ArrayValue( + opentelemetry_proto::tonic::common::v1::ArrayValue { + values: part_values, + }, + )), + }), + }, + ], + })), + } + } + + /// Helper: build a gen_ai.input.messages attribute as ArrayValue of message KvlistValues. + fn genai_messages_attr(key: &str, messages: Vec) -> KeyValue { + use opentelemetry_proto::tonic::common::v1::any_value::Value; + + KeyValue { + key: key.to_string(), + value: Some(AnyValue { + value: Some(Value::ArrayValue( + opentelemetry_proto::tonic::common::v1::ArrayValue { + values: messages, + }, + )), + }), + } + } + + #[test] + fn test_genai_input_messages_flat_mode() { + // In flat mode (default), the top-level ArrayValue stays as an Array, + // but nested KvlistValues inside the array become JSON strings. + let transformer = Transformer::new(Compression::None, true); + + let attrs = vec![ + genai_messages_attr( + "gen_ai.input.messages", + vec![ + genai_message("user", vec![("text", "What is the weather?")]), + genai_message("assistant", vec![("text", "It's sunny today.")]), + ], + ), + KeyValue { + key: "gen_ai.system".to_string(), + value: Some(AnyValue { + value: Some(Value::StringValue("openai".to_string())), + }), + }, + ]; + + let result = transformer.transform_attrs_kv(&attrs); + + match result { + MapOrJson::Json(map) => { + // gen_ai.system is a simple string + assert!(map.contains_key("gen_ai.system")); + + // gen_ai.input.messages should be present as an array + assert!( + map.contains_key("gen_ai.input.messages"), + "Expected gen_ai.input.messages key" + ); + + match map.get("gen_ai.input.messages").unwrap() { + JsonType::Array(values) => { + assert_eq!(values.len(), 2, "Expected 2 messages"); + + // In flat mode, KvlistValue elements become JSON strings + for val in values { + match val { + JsonType::Str(s) => { + // Should contain role and parts info serialized as JSON + let json_str = s.as_ref(); + assert!( + json_str.contains("role") || json_str.contains("type"), + "Expected JSON with message fields, got: {}", + json_str + ); + } + _ => panic!( + "Expected Str (JSON string) in flat mode for KvlistValue, got {:?}", + val + ), + } + } + } + other => panic!( + "Expected Array for gen_ai.input.messages, got {:?}", + other + ), + } + } + _ => panic!("Expected JSON variant"), + } + } + + #[test] + fn test_genai_input_messages_nested_mode() { + // In nested mode, ArrayValue stays Array and KvlistValues become Objects. + let transformer = Transformer::new(Compression::None, true) + .with_nested_kv_max_depth(Some(10)); + + let attrs = vec![genai_messages_attr( + "gen_ai.input.messages", + vec![ + genai_message("user", vec![("text", "What is the weather?")]), + genai_message("assistant", vec![("text", "It's sunny today.")]), + ], + )]; + + let result = transformer.transform_attrs_kv(&attrs); + + match result { + MapOrJson::Json(map) => { + assert!(map.contains_key("gen_ai.input.messages")); + + match map.get("gen_ai.input.messages").unwrap() { + JsonType::Array(messages) => { + assert_eq!(messages.len(), 2, "Expected 2 messages"); + + // First message: user + match &messages[0] { + JsonType::Object(entries) => { + assert_eq!(entries.len(), 2, "Expected role + parts"); + + // Check role + let role_entry = entries + .iter() + .find(|(k, _)| k.as_ref() == "role") + .expect("Expected 'role' key"); + match &role_entry.1 { + JsonType::Str(s) => assert_eq!(s.as_ref(), "user"), + _ => panic!("Expected role to be Str"), + } + + // Check parts is an array + let parts_entry = entries + .iter() + .find(|(k, _)| k.as_ref() == "parts") + .expect("Expected 'parts' key"); + match &parts_entry.1 { + JsonType::Array(parts) => { + assert_eq!(parts.len(), 1, "Expected 1 part"); + match &parts[0] { + JsonType::Object(part_entries) => { + let type_entry = part_entries + .iter() + .find(|(k, _)| k.as_ref() == "type") + .expect("Expected 'type' key"); + match &type_entry.1 { + JsonType::Str(s) => { + assert_eq!(s.as_ref(), "text") + } + _ => panic!("Expected type to be Str"), + } + + let content_entry = part_entries + .iter() + .find(|(k, _)| k.as_ref() == "content") + .expect("Expected 'content' key"); + match &content_entry.1 { + JsonType::Str(s) => assert_eq!( + s.as_ref(), + "What is the weather?" + ), + _ => panic!("Expected content to be Str"), + } + } + _ => panic!("Expected Object for part"), + } + } + _ => panic!("Expected Array for parts"), + } + } + _ => panic!("Expected Object for message, got {:?}", &messages[0]), + } + + // Second message: assistant + match &messages[1] { + JsonType::Object(entries) => { + let role_entry = entries + .iter() + .find(|(k, _)| k.as_ref() == "role") + .expect("Expected 'role' key"); + match &role_entry.1 { + JsonType::Str(s) => assert_eq!(s.as_ref(), "assistant"), + _ => panic!("Expected role to be Str"), + } + } + _ => panic!("Expected Object for message"), + } + } + other => panic!( + "Expected Array for gen_ai.input.messages, got {:?}", + other + ), + } + } + _ => panic!("Expected JSON variant"), + } + } + + #[test] + fn test_genai_input_messages_nested_mode_owned() { + // Same as nested mode test but using the owned path. + let transformer = Transformer::new(Compression::None, true) + .with_nested_kv_max_depth(Some(10)); + + let attrs = vec![genai_messages_attr( + "gen_ai.input.messages", + vec![genai_message("user", vec![("text", "Hello!")])], + )]; + + let result = transformer.transform_attrs_kv_owned(&attrs); + + match result { + MapOrJson::JsonOwned(map) => { + assert!(map.contains_key("gen_ai.input.messages")); + + match map.get("gen_ai.input.messages").unwrap() { + JsonType::Array(messages) => { + assert_eq!(messages.len(), 1); + + match &messages[0] { + JsonType::Object(entries) => { + let role_entry = entries + .iter() + .find(|(k, _)| k.as_ref() == "role") + .expect("Expected 'role' key"); + match &role_entry.1 { + JsonType::Str(s) => assert_eq!(s.as_ref(), "user"), + _ => panic!("Expected role to be Str"), + } + + let parts_entry = entries + .iter() + .find(|(k, _)| k.as_ref() == "parts") + .expect("Expected 'parts' key"); + match &parts_entry.1 { + JsonType::Array(parts) => { + assert_eq!(parts.len(), 1); + match &parts[0] { + JsonType::Object(part_entries) => { + let content_entry = part_entries + .iter() + .find(|(k, _)| k.as_ref() == "content") + .expect("Expected 'content' key"); + match &content_entry.1 { + JsonType::Str(s) => { + assert_eq!(s.as_ref(), "Hello!") + } + _ => panic!("Expected content to be Str"), + } + } + _ => panic!("Expected Object for part"), + } + } + _ => panic!("Expected Array for parts"), + } + } + _ => panic!("Expected Object for message"), + } + } + other => panic!("Expected Array, got {:?}", other), + } + } + _ => panic!("Expected JsonOwned variant"), + } + } + + #[test] + fn test_genai_tool_call_parts_nested_mode() { + // Test GenAI tool_call message part structure: + // KvlistValue { type: "tool_call", id: "call_123", name: "get_weather", arguments: "{}" } + use opentelemetry_proto::tonic::common::v1::KeyValueList; + use opentelemetry_proto::tonic::common::v1::any_value::Value; + + let transformer = Transformer::new(Compression::None, true) + .with_nested_kv_max_depth(Some(10)); + + let tool_call_part = AnyValue { + value: Some(Value::KvlistValue(KeyValueList { + values: vec![ + KeyValue { + key: "type".to_string(), + value: Some(AnyValue { + value: Some(Value::StringValue("tool_call".to_string())), + }), + }, + KeyValue { + key: "id".to_string(), + value: Some(AnyValue { + value: Some(Value::StringValue("call_abc123".to_string())), + }), + }, + KeyValue { + key: "name".to_string(), + value: Some(AnyValue { + value: Some(Value::StringValue("get_weather".to_string())), + }), + }, + KeyValue { + key: "arguments".to_string(), + value: Some(AnyValue { + value: Some(Value::StringValue( + r#"{"location":"San Francisco"}"#.to_string(), + )), + }), + }, + ], + })), + }; + + // Build: assistant message with tool_call part + let assistant_msg = AnyValue { + value: Some(Value::KvlistValue(KeyValueList { + values: vec![ + KeyValue { + key: "role".to_string(), + value: Some(AnyValue { + value: Some(Value::StringValue("assistant".to_string())), + }), + }, + KeyValue { + key: "parts".to_string(), + value: Some(AnyValue { + value: Some(Value::ArrayValue( + opentelemetry_proto::tonic::common::v1::ArrayValue { + values: vec![tool_call_part], + }, + )), + }), + }, + ], + })), + }; + + let attrs = vec![genai_messages_attr( + "gen_ai.output.messages", + vec![assistant_msg], + )]; + + let result = transformer.transform_attrs_kv(&attrs); + + match result { + MapOrJson::Json(map) => { + assert!(map.contains_key("gen_ai.output.messages")); + + match map.get("gen_ai.output.messages").unwrap() { + JsonType::Array(messages) => { + assert_eq!(messages.len(), 1); + + match &messages[0] { + JsonType::Object(entries) => { + // Check parts contains tool_call + let parts_entry = entries + .iter() + .find(|(k, _)| k.as_ref() == "parts") + .expect("Expected 'parts' key"); + match &parts_entry.1 { + JsonType::Array(parts) => { + assert_eq!(parts.len(), 1); + match &parts[0] { + JsonType::Object(part_entries) => { + // Verify all tool_call fields + let fields: Vec<&str> = part_entries + .iter() + .map(|(k, _)| k.as_ref()) + .collect(); + assert!(fields.contains(&"type")); + assert!(fields.contains(&"id")); + assert!(fields.contains(&"name")); + assert!(fields.contains(&"arguments")); + + let name = part_entries + .iter() + .find(|(k, _)| k.as_ref() == "name") + .unwrap(); + match &name.1 { + JsonType::Str(s) => { + assert_eq!(s.as_ref(), "get_weather") + } + _ => panic!("Expected Str for name"), + } + } + _ => panic!("Expected Object for tool_call part"), + } + } + _ => panic!("Expected Array for parts"), + } + } + _ => panic!("Expected Object for message"), + } + } + other => panic!("Expected Array, got {:?}", other), + } + } + _ => panic!("Expected JSON variant"), + } + } + + #[test] + fn test_genai_depth_limit_truncation() { + // Test that depth limit correctly truncates deeply nested structures. + // With max_depth=1, only 1 level of nesting is preserved. + let transformer = Transformer::new(Compression::None, true) + .with_nested_kv_max_depth(Some(1)); + + let attrs = vec![genai_messages_attr( + "gen_ai.input.messages", + vec![genai_message("user", vec![("text", "Hello")])], + )]; + + let result = transformer.transform_attrs_kv(&attrs); + + match result { + MapOrJson::Json(map) => { + assert!(map.contains_key("gen_ai.input.messages")); + + // At depth 0: ArrayValue → Array (each element at depth 1) + // At depth 1: KvlistValue → Object (each value at depth 2) + // At depth 2 > max_depth(1): values become JSON strings + match map.get("gen_ai.input.messages").unwrap() { + JsonType::Array(messages) => { + assert_eq!(messages.len(), 1); + + match &messages[0] { + JsonType::Object(entries) => { + // role at depth 2 → JSON string fallback + let role_entry = entries + .iter() + .find(|(k, _)| k.as_ref() == "role"); + assert!(role_entry.is_some(), "Expected 'role' key"); + + // parts at depth 2 → should be a JSON string (depth exceeded) + let parts_entry = entries + .iter() + .find(|(k, _)| k.as_ref() == "parts"); + assert!(parts_entry.is_some(), "Expected 'parts' key"); + + // The parts value at depth 2 should be a JSON string fallback + match &parts_entry.unwrap().1 { + JsonType::Str(s) => { + // Depth exceeded, so the ArrayValue is serialized as JSON + assert!( + s.as_ref().contains("text") + || s.as_ref().contains("Hello"), + "Expected JSON fallback containing message data, got: {}", + s.as_ref() + ); + } + _ => { + // At depth 2 with max_depth 1, it falls back to JSON string + // but role (StringValue) would still resolve as Str + } + } + } + JsonType::Str(s) => { + // If depth 1 is exceeded for the message itself + assert!( + s.as_ref().contains("role") || s.as_ref().contains("user"), + "Expected JSON string for truncated message" + ); + } + _ => panic!( + "Expected Object or Str for depth-limited message, got {:?}", + &messages[0] + ), + } + } + other => panic!("Expected Array, got {:?}", other), + } + } + _ => panic!("Expected JSON variant"), + } + } + + #[test] + fn test_genai_mixed_attributes() { + // Test a realistic span with both simple and GenAI nested attributes. + let transformer = Transformer::new(Compression::None, true) + .with_nested_kv_max_depth(Some(10)); + + let attrs = vec![ + KeyValue { + key: "gen_ai.system".to_string(), + value: Some(AnyValue { + value: Some(Value::StringValue("openai".to_string())), + }), + }, + KeyValue { + key: "gen_ai.request.model".to_string(), + value: Some(AnyValue { + value: Some(Value::StringValue("gpt-4".to_string())), + }), + }, + KeyValue { + key: "gen_ai.request.max_tokens".to_string(), + value: Some(AnyValue { + value: Some(Value::IntValue(4096)), + }), + }, + KeyValue { + key: "gen_ai.request.temperature".to_string(), + value: Some(AnyValue { + value: Some(Value::DoubleValue(0.7)), + }), + }, + genai_messages_attr( + "gen_ai.input.messages", + vec![genai_message("user", vec![("text", "Tell me a joke")])], + ), + genai_messages_attr( + "gen_ai.output.messages", + vec![genai_message( + "assistant", + vec![("text", "Why did the chicken cross the road?")], + )], + ), + KeyValue { + key: "gen_ai.usage.input_tokens".to_string(), + value: Some(AnyValue { + value: Some(Value::IntValue(10)), + }), + }, + KeyValue { + key: "gen_ai.usage.output_tokens".to_string(), + value: Some(AnyValue { + value: Some(Value::IntValue(15)), + }), + }, + ]; + + let result = transformer.transform_attrs_kv(&attrs); + + match result { + MapOrJson::Json(map) => { + // All 8 attributes should be present + assert_eq!(map.len(), 8, "Expected 8 attributes, got: {:?}", map.keys().collect::>()); + + // Simple string attributes + match map.get("gen_ai.system").unwrap() { + JsonType::Str(s) => assert_eq!(s.as_ref(), "openai"), + _ => panic!("Expected Str"), + } + match map.get("gen_ai.request.model").unwrap() { + JsonType::Str(s) => assert_eq!(s.as_ref(), "gpt-4"), + _ => panic!("Expected Str"), + } + + // Integer attributes + match map.get("gen_ai.request.max_tokens").unwrap() { + JsonType::Int(i) => assert_eq!(*i, 4096), + _ => panic!("Expected Int"), + } + match map.get("gen_ai.usage.input_tokens").unwrap() { + JsonType::Int(i) => assert_eq!(*i, 10), + _ => panic!("Expected Int"), + } + match map.get("gen_ai.usage.output_tokens").unwrap() { + JsonType::Int(i) => assert_eq!(*i, 15), + _ => panic!("Expected Int"), + } + + // Double attribute + match map.get("gen_ai.request.temperature").unwrap() { + JsonType::Double(d) => assert_eq!(*d, 0.7), + _ => panic!("Expected Double"), + } + + // Nested array attributes + match map.get("gen_ai.input.messages").unwrap() { + JsonType::Array(messages) => { + assert_eq!(messages.len(), 1); + match &messages[0] { + JsonType::Object(_) => {} // Nested Object is correct + _ => panic!("Expected Object for message"), + } + } + _ => panic!("Expected Array for input messages"), + } + match map.get("gen_ai.output.messages").unwrap() { + JsonType::Array(messages) => { + assert_eq!(messages.len(), 1); + match &messages[0] { + JsonType::Object(_) => {} // Nested Object is correct + _ => panic!("Expected Object for message"), + } + } + _ => panic!("Expected Array for output messages"), + } + } + _ => panic!("Expected JSON variant"), + } + } }