From 8fd912dc0dfcacc4e4ad104dfa87ee65028b602c Mon Sep 17 00:00:00 2001 From: Oleksii Date: Wed, 10 Jun 2026 01:10:45 -0300 Subject: [PATCH] =?UTF-8?q?feat(server):=20OTLP=20trace=20export=20?= =?UTF-8?q?=E2=80=94=20ORCH8=5FOTLP=5FENDPOINT=20with=20orch8.step=20spans?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds optional OpenTelemetry trace export to orch8-server, enabled only when ORCH8_OTLP_ENDPOINT (config: [telemetry] otlp_endpoint) is set: - New [telemetry] config section (otlp_endpoint, otlp_protocol) with env overrides ORCH8_OTLP_ENDPOINT / ORCH8_OTLP_PROTOCOL (grpc only) and startup validation. Unset endpoint = zero behavior change. - orch8-server gains a default-on `otlp` cargo feature wiring tracing-opentelemetry onto the existing subscriber (fmt + env-filter preserved), with a self-feedback filter so exporter errors can't loop back into the exporter. Tracer provider is flushed on graceful shutdown after the engine drains. - OTel 0.29 line pinned (opentelemetry/opentelemetry_sdk/-otlp 0.29, tracing-opentelemetry 0.30): newest releases whose grpc-tonic transport links against the workspace's tonic 0.12 (0.30+ needs 0.13). - Engine emits an `orch8.step` span around every step-handler invocation (flat + tree paths) with instance_id, block_id, handler, tenant_id, attempt — the existing gen_ai.client.inference event from llm_call rides inside it. Identity fields only; no params/outputs. - Tests: config parsing (set/unset/protocol validation), env override round-trip, subscriber smoke test against a non-listening endpoint (init + flush must be non-fatal), and a tracing-layer test asserting the orch8.step span wraps a mock handler with the expected fields. - Docs: .env.example + docs/CONFIGURATION.md telemetry sections. Co-Authored-By: Claude Fable 5 --- .env.example | 9 + Cargo.lock | 102 +++++++++ Cargo.toml | 9 + docs/CONFIGURATION.md | 18 ++ orch8-engine/src/handlers/step.rs | 132 +++++++++++- orch8-server/Cargo.toml | 17 ++ orch8-server/src/main.rs | 86 +++++++- orch8-server/src/telemetry.rs | 243 ++++++++++++++++++++++ orch8-types/Cargo.toml | 4 + orch8-types/src/config.rs | 96 +++++++++ orch8-types/tests/types_coverage_extra.rs | 2 + 11 files changed, 706 insertions(+), 12 deletions(-) create mode 100644 orch8-server/src/telemetry.rs diff --git a/.env.example b/.env.example index b8254bdc..47e987fe 100644 --- a/.env.example +++ b/.env.example @@ -46,6 +46,15 @@ ORCH8_ARTIFACT_RETENTION_SECS=0 # that uses `response_as: "artifact"` or `body_artifact` — otherwise those # steps fail with a permanent "artifact storage is not configured" error. +# --- Telemetry (OpenTelemetry trace export) ------------------------------ +# OTLP collector endpoint (gRPC), e.g. http://localhost:4317 — Langfuse, +# Datadog Agent, Grafana Alloy, otel-collector. Empty/unset = export disabled +# (zero overhead). Exports `orch8.step` spans (one per step execution; LLM +# steps carry the `gen_ai.client.inference` event). Standard OTEL_SERVICE_NAME +# and OTEL_RESOURCE_ATTRIBUTES env vars are honored. +ORCH8_OTLP_ENDPOINT= +ORCH8_OTLP_PROTOCOL=grpc # grpc (only supported value) + # --- Outbound webhooks --------------------------------------------------- ORCH8_WEBHOOK_URLS= # comma-separated delivery URLs # Optional shared secret. When set, each delivery is signed: diff --git a/Cargo.lock b/Cargo.lock index 69034d6e..492bd4dd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2765,6 +2765,84 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c87def4c32ab89d880effc9e097653c8da5d6ef28e6b539d313baaacfbafcbe" +[[package]] +name = "opentelemetry" +version = "0.29.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e87237e2775f74896f9ad219d26a2081751187eb7c9f5c58dde20a23b95d16c" +dependencies = [ + "futures-core", + "futures-sink", + "js-sys", + "pin-project-lite", + "thiserror 2.0.18", + "tracing", +] + +[[package]] +name = "opentelemetry-http" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "46d7ab32b827b5b495bd90fa95a6cb65ccc293555dcc3199ae2937d2d237c8ed" +dependencies = [ + "async-trait", + "bytes", + "http", + "opentelemetry", + "reqwest", + "tracing", +] + +[[package]] +name = "opentelemetry-otlp" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d899720fe06916ccba71c01d04ecd77312734e2de3467fd30d9d580c8ce85656" +dependencies = [ + "futures-core", + "http", + "opentelemetry", + "opentelemetry-http", + "opentelemetry-proto", + "opentelemetry_sdk", + "prost", + "reqwest", + "thiserror 2.0.18", + "tokio", + "tonic", + "tracing", +] + +[[package]] +name = "opentelemetry-proto" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c40da242381435e18570d5b9d50aca2a4f4f4d8e146231adb4e7768023309b3" +dependencies = [ + "opentelemetry", + "opentelemetry_sdk", + "prost", + "tonic", +] + +[[package]] +name = "opentelemetry_sdk" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "afdefb21d1d47394abc1ba6c57363ab141be19e27cc70d0e422b7f303e4d290b" +dependencies = [ + "futures-channel", + "futures-executor", + "futures-util", + "glob", + "opentelemetry", + "percent-encoding", + "rand 0.9.4", + "serde_json", + "thiserror 2.0.18", + "tracing", +] + [[package]] name = "orch8-api" version = "0.5.0" @@ -2942,6 +3020,9 @@ dependencies = [ "http", "metrics", "metrics-exporter-prometheus", + "opentelemetry", + "opentelemetry-otlp", + "opentelemetry_sdk", "orch8-api", "orch8-engine", "orch8-grpc", @@ -2957,6 +3038,7 @@ dependencies = [ "tower 0.5.3", "tower-http", "tracing", + "tracing-opentelemetry", "tracing-subscriber", "utoipa", "utoipa-swagger-ui", @@ -2998,6 +3080,7 @@ dependencies = [ "strsim", "subtle", "thiserror 2.0.18", + "toml 0.8.23", "utoipa", "uuid", ] @@ -3689,6 +3772,7 @@ checksum = "eddd3ca559203180a307f12d114c268abf583f59b03cb906fd0b3ff8646c1147" dependencies = [ "base64", "bytes", + "futures-channel", "futures-core", "futures-util", "h2", @@ -5060,6 +5144,24 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-opentelemetry" +version = "0.30.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd8e764bd6f5813fd8bebc3117875190c5b0415be8f7f8059bffb6ecd979c444" +dependencies = [ + "js-sys", + "once_cell", + "opentelemetry", + "opentelemetry_sdk", + "smallvec", + "tracing", + "tracing-core", + "tracing-log", + "tracing-subscriber", + "web-time", +] + [[package]] name = "tracing-serde" version = "0.2.0" diff --git a/Cargo.toml b/Cargo.toml index 80db737e..001da127 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,6 +46,15 @@ tracing-subscriber = { version = "0.3", features = ["json", "env-filter"] } metrics = "0.24" metrics-exporter-prometheus = "0.16" +# OpenTelemetry trace export (OTLP). Pinned to the 0.29 line: opentelemetry-otlp +# 0.30+ links against tonic 0.13 while the workspace is on tonic 0.12, so 0.29 +# is the newest release whose `grpc-tonic` transport unifies with our tonic. +# tracing-opentelemetry versions trail opentelemetry by one (0.30 ↔ otel 0.29). +opentelemetry = "0.29" +opentelemetry_sdk = "0.29" +opentelemetry-otlp = { version = "0.29", features = ["grpc-tonic"] } +tracing-opentelemetry = "0.30" + # API axum = "0.8" tonic = "0.12" diff --git a/docs/CONFIGURATION.md b/docs/CONFIGURATION.md index 129bc4b3..4834beb0 100644 --- a/docs/CONFIGURATION.md +++ b/docs/CONFIGURATION.md @@ -94,6 +94,17 @@ Controls the HTTP and gRPC servers, authentication, CORS, and rate limiting. --- +## [telemetry] + +OpenTelemetry trace export (OTLP). Disabled unless `otlp_endpoint` is set — when empty there is zero runtime overhead. When enabled, the server exports `orch8.step` spans (one per step-handler execution, with `instance_id`, `block_id`, `handler`, `tenant_id`, `attempt` fields); LLM steps carry the `gen_ai.client.inference` structured event inside them. Pipe to Langfuse, Datadog, Grafana Tempo, or any OTLP collector. The standard `OTEL_SERVICE_NAME` and `OTEL_RESOURCE_ATTRIBUTES` env vars are honored (`service.name` defaults to `orch8-server`). Export failures are non-fatal: a down collector logs warnings but never blocks step execution. + +| Field | Type | Default | Description | +|-------|------|---------|-------------| +| `otlp_endpoint` | string | `""` | OTLP collector endpoint, e.g. `"http://localhost:4317"`. Empty = export disabled | +| `otlp_protocol` | string | `"grpc"` | OTLP transport. Only `"grpc"` is supported | + +--- + ## Environment Variables All config fields can be set via `ORCH8_*` environment variables. Environment variables override values in `orch8.toml`. @@ -151,6 +162,13 @@ All config fields can be set via `ORCH8_*` environment variables. Environment va | `ORCH8_LOG_LEVEL` | `info` | Log level: `trace`, `debug`, `info`, `warn`, `error` | | `ORCH8_LOG_JSON` | `false` | Set to `true` or `1` for structured JSON logs; any other value (or unset) uses human-readable pretty logs | +### Telemetry + +| Variable | Default | Description | +|----------|---------|-------------| +| `ORCH8_OTLP_ENDPOINT` | — | OTLP collector endpoint (e.g. `http://localhost:4317`). Unset/empty = trace export disabled | +| `ORCH8_OTLP_PROTOCOL` | `grpc` | OTLP transport protocol. Only `grpc` is supported | + --- ## Example Configurations diff --git a/orch8-engine/src/handlers/step.rs b/orch8-engine/src/handlers/step.rs index e9c847e4..f99b2b2d 100644 --- a/orch8-engine/src/handlers/step.rs +++ b/orch8-engine/src/handlers/step.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use std::time::Duration; use chrono::Utc; -use tracing::{info, warn}; +use tracing::{info, warn, Instrument}; use uuid::Uuid; use orch8_storage::StorageBackend; @@ -130,6 +130,19 @@ pub async fn execute_step_dry( let timeout = exec.timeout; let cache_key = exec.cache_key; + // Span around the handler invocation, exported via OTLP when the server + // is configured with an endpoint. Structured handler events (e.g. the + // `gen_ai.client.inference` event from `llm_call`) ride inside it. + // Cardinality stays sane: identity fields only — no params, no outputs. + let step_span = tracing::info_span!( + "orch8.step", + instance_id = %instance_id, + block_id = %block_id, + handler = %exec.handler_name, + tenant_id = %exec.tenant_id, + attempt = attempt, + ); + let step_ctx = StepContext { instance_id, tenant_id: exec.tenant_id, @@ -141,8 +154,9 @@ pub async fn execute_step_dry( wait_for_input: exec.wait_for_input, }; + let handler_fut = handler(step_ctx).instrument(step_span); let result = if let Some(dur) = timeout { - match tokio::time::timeout(dur, handler(step_ctx)).await { + match tokio::time::timeout(dur, handler_fut).await { Ok(res) => res, Err(_) => { return Err(EngineError::StepTimeout { @@ -152,7 +166,7 @@ pub async fn execute_step_dry( } } } else { - handler(step_ctx).await + handler_fut.await }; match result { @@ -289,6 +303,17 @@ pub async fn execute_step( let timeout = exec.timeout; let cache_key = exec.cache_key; + // Same `orch8.step` span as `execute_step_dry` — the tree-evaluator path + // must export identically-shaped spans as the flat scheduler path. + let step_span = tracing::info_span!( + "orch8.step", + instance_id = %instance_id, + block_id = %block_id, + handler = %exec.handler_name, + tenant_id = %exec.tenant_id, + attempt = attempt, + ); + let step_ctx = StepContext { instance_id, tenant_id: exec.tenant_id, @@ -301,8 +326,9 @@ pub async fn execute_step( }; // Execute with optional timeout. + let handler_fut = handler(step_ctx).instrument(step_span); let result = if let Some(dur) = timeout { - match tokio::time::timeout(dur, handler(step_ctx)).await { + match tokio::time::timeout(dur, handler_fut).await { Ok(res) => res, Err(_) => { return Err(EngineError::StepTimeout { @@ -312,7 +338,7 @@ pub async fn execute_step( } } } else { - handler(step_ctx).await + handler_fut.await }; match result { @@ -490,4 +516,100 @@ mod tests { Duration::from_secs(10) ); } + + /// Asserts the `orch8.step` span (exported via OTLP when the server has an + /// endpoint configured) wraps handler invocation and carries the expected + /// identity fields. Span emission is asserted via a tracing test layer + /// rather than `opentelemetry_sdk`'s in-memory exporter — wiring the OpenTelemetry + /// bridge into orch8-engine's dev-deps just for this would drag the whole + /// opentelemetry stack into the engine's test build for no extra signal: + /// the tracing span IS the unit the OTLP layer exports. + #[tokio::test] + async fn execute_step_dry_emits_orch8_step_span_around_handler() { + use std::collections::HashMap; + use std::sync::Mutex; + + use tracing::instrument::WithSubscriber; + use tracing_subscriber::layer::SubscriberExt; + + #[derive(Clone, Default)] + struct SpanCapture { + spans: Arc>>>, + } + + struct FieldVisitor(HashMap); + impl tracing::field::Visit for FieldVisitor { + fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) { + self.0 + .insert(field.name().to_string(), format!("{value:?}")); + } + } + + impl tracing_subscriber::Layer for SpanCapture + where + S: tracing::Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>, + { + fn on_new_span( + &self, + attrs: &tracing::span::Attributes<'_>, + _id: &tracing::span::Id, + _ctx: tracing_subscriber::layer::Context<'_, S>, + ) { + if attrs.metadata().name() == "orch8.step" { + let mut visitor = FieldVisitor(HashMap::new()); + attrs.record(&mut visitor); + self.spans.lock().unwrap().push(visitor.0); + } + } + } + + let capture = SpanCapture::default(); + let subscriber = tracing_subscriber::registry().with(capture.clone()); + + let storage: Arc = Arc::new( + orch8_storage::sqlite::SqliteStorage::in_memory() + .await + .unwrap(), + ); + let mut handlers = HandlerRegistry::new(); + handlers.register("mock_step", |_ctx| async { + Ok(serde_json::json!({"ok": true})) + }); + + let instance_id = InstanceId::new(); + let exec = StepExecParams { + instance_id, + tenant_id: TenantId::unchecked("tenant-a"), + block_id: BlockId::new("step-1"), + handler_name: "mock_step".into(), + params: serde_json::json!({}), + context: orch8_types::context::ExecutionContext::default(), + attempt: 0, + timeout: None, + externalize_threshold: 0, + wait_for_input: None, + cache_key: None, + }; + + let output = execute_step_dry(&storage, &handlers, exec) + .with_subscriber(subscriber) + .await + .unwrap(); + assert_eq!(output.output["ok"], true); + + let spans = capture.spans.lock().unwrap(); + assert_eq!(spans.len(), 1, "expected exactly one orch8.step span"); + let fields = &spans[0]; + assert_eq!(fields.get("handler").map(String::as_str), Some("mock_step")); + assert_eq!( + fields.get("tenant_id").map(String::as_str), + Some("tenant-a") + ); + assert_eq!(fields.get("block_id").map(String::as_str), Some("step-1")); + assert_eq!(fields.get("attempt").map(String::as_str), Some("0")); + assert_eq!( + fields.get("instance_id").cloned(), + Some(instance_id.to_string()) + ); + } } diff --git a/orch8-server/Cargo.toml b/orch8-server/Cargo.toml index d34cca3c..ca8bacff 100644 --- a/orch8-server/Cargo.toml +++ b/orch8-server/Cargo.toml @@ -40,5 +40,22 @@ tonic.workspace = true http = "1" tikv-jemallocator.workspace = true +# OTLP trace export (feature `otlp`, on by default — opt out with +# `--no-default-features` for lean builds without the OpenTelemetry stack). +opentelemetry = { workspace = true, optional = true } +opentelemetry_sdk = { workspace = true, optional = true } +opentelemetry-otlp = { workspace = true, optional = true } +tracing-opentelemetry = { workspace = true, optional = true } + +[features] +default = ["otlp"] +# OpenTelemetry OTLP trace export (ORCH8_OTLP_ENDPOINT / [telemetry] otlp_endpoint). +otlp = [ + "dep:opentelemetry", + "dep:opentelemetry_sdk", + "dep:opentelemetry-otlp", + "dep:tracing-opentelemetry", +] + [lints] workspace = true diff --git a/orch8-server/src/main.rs b/orch8-server/src/main.rs index 7d255a17..a7bf5464 100644 --- a/orch8-server/src/main.rs +++ b/orch8-server/src/main.rs @@ -29,6 +29,8 @@ use orch8_storage::sqlite::SqliteStorage; use orch8_storage::StorageBackend; use orch8_types::config::EngineConfig; +mod telemetry; + #[derive(Parser)] #[command( name = "orch8", @@ -63,8 +65,9 @@ async fn main() -> anyhow::Result<()> { )); } - // Initialize logging. - init_logging(&config.logging); + // Initialize OTLP trace export (no-op unless ORCH8_OTLP_ENDPOINT / + // [telemetry] otlp_endpoint is set) and logging. + let otel = init_observability(&config)?; print_startup_banner(&config, cli.insecure); @@ -199,6 +202,11 @@ async fn main() -> anyhow::Result<()> { drain_shutdown(engine_handle, grpc_handle, cb_registry).await; + // Flush any spans still buffered in the OTLP batch exporter. After the + // engine has drained so the last step spans make it out. Best-effort: a + // dead collector logs a warning, never fails shutdown. + otel.shutdown().await; + tracing::info!("Shutdown complete"); Ok(()) } @@ -600,6 +608,12 @@ fn apply_env_overrides(config: &mut EngineConfig) { if let Ok(val) = std::env::var("ORCH8_LOG_JSON") { config.logging.json = val == "true" || val == "1"; } + if let Ok(val) = std::env::var("ORCH8_OTLP_ENDPOINT") { + config.telemetry.otlp_endpoint = val; + } + if let Ok(val) = std::env::var("ORCH8_OTLP_PROTOCOL") { + config.telemetry.otlp_protocol = val; + } if let Ok(val) = std::env::var("ORCH8_HTTP_ADDR") { config.api.http_addr = val; } @@ -694,7 +708,26 @@ fn apply_env_overrides(config: &mut EngineConfig) { } } -fn init_logging(config: &orch8_types::config::LoggingConfig) { +/// Initialize OTLP trace export, then the tracing subscriber (which needs the +/// OpenTelemetry layer at construction time). Returns the guard used to flush +/// buffered spans on shutdown — inert when no OTLP endpoint is configured. +fn init_observability(config: &EngineConfig) -> anyhow::Result { + let otel = telemetry::init(&config.telemetry)?; + init_logging(&config.logging, &otel); + if otel.is_enabled() { + tracing::info!( + endpoint = %config.telemetry.otlp_endpoint, + protocol = "grpc", + "OTLP trace export enabled" + ); + } + Ok(otel) +} + +fn init_logging(config: &orch8_types::config::LoggingConfig, otel: &telemetry::OtelGuard) { + use tracing_subscriber::layer::SubscriberExt; + use tracing_subscriber::util::SubscriberInitExt; + let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(&config.level)); @@ -704,18 +737,24 @@ fn init_logging(config: &orch8_types::config::LoggingConfig) { // output to stderr prevents the stdout pipe buffer from filling up // (~64KB on macOS/Linux) and blocking the scheduler's tick loop on // its next log write. - let subscriber = tracing_subscriber::fmt() - .with_env_filter(filter) + let fmt_layer = tracing_subscriber::fmt::layer() .with_writer(std::io::stderr) .with_target(true) .with_thread_ids(false) .with_file(false) .with_line_number(false); + // The OTel layer is `None` when no OTLP endpoint is configured (or the + // `otlp` feature is off) — `Option` composes as a no-op, keeping + // the zero-config subscriber identical to the pre-OTLP stack. + let registry = tracing_subscriber::registry() + .with(filter) + .with(otel.layer()); + if config.json { - subscriber.json().init(); + registry.with(fmt_layer.json()).init(); } else { - subscriber.init(); + registry.with(fmt_layer).init(); } } @@ -794,3 +833,36 @@ fn build_cors_layer(origins: &str) -> CorsLayer { layer.allow_origin(parsed) } } + +#[cfg(test)] +mod tests { + use super::*; + + /// Endpoint set/unset env parsing in ONE test: `std::env` is process-global + /// and tests run in parallel, so splitting these into separate tests would + /// race on the same variables. + #[test] + fn apply_env_overrides_telemetry_endpoint_set_and_unset() { + // Unset: defaults survive — export stays disabled. + std::env::remove_var("ORCH8_OTLP_ENDPOINT"); + std::env::remove_var("ORCH8_OTLP_PROTOCOL"); + let mut config = EngineConfig::default(); + apply_env_overrides(&mut config); + assert!(config.telemetry.otlp_endpoint.is_empty()); + assert_eq!(config.telemetry.otlp_protocol, "grpc"); + assert!(!config.telemetry.otlp_enabled()); + + // Set: env wins over the (default) config values. + std::env::set_var("ORCH8_OTLP_ENDPOINT", "http://collector:4317"); + std::env::set_var("ORCH8_OTLP_PROTOCOL", "grpc"); + let mut config = EngineConfig::default(); + apply_env_overrides(&mut config); + assert_eq!(config.telemetry.otlp_endpoint, "http://collector:4317"); + assert_eq!(config.telemetry.otlp_protocol, "grpc"); + assert!(config.telemetry.otlp_enabled()); + assert!(config.validate().is_ok()); + + std::env::remove_var("ORCH8_OTLP_ENDPOINT"); + std::env::remove_var("ORCH8_OTLP_PROTOCOL"); + } +} diff --git a/orch8-server/src/telemetry.rs b/orch8-server/src/telemetry.rs new file mode 100644 index 00000000..490b98f3 --- /dev/null +++ b/orch8-server/src/telemetry.rs @@ -0,0 +1,243 @@ +//! OpenTelemetry OTLP trace export. +//! +//! Enabled ONLY when `ORCH8_OTLP_ENDPOINT` (config: `[telemetry] otlp_endpoint`) +//! is set. When unset, [`init`] returns an inert guard and the tracing stack is +//! byte-identical to a build without this module — no OpenTelemetry layer, no runtime +//! cost. Export failures (collector down, wrong port…) are non-fatal background +//! noise: the batch exporter retries/logs internally and never blocks steps. +//! +//! Spans exported include `orch8.step` (one per step-handler invocation, see +//! `orch8_engine::handlers::step`) which carries the `gen_ai.client.inference` +//! events emitted by the `llm_call` handler — pipe them to Langfuse / Datadog / +//! Grafana Tempo via any OTLP collector. +//! +//! The whole module is feature-gated (`otlp`, on by default). With +//! `--no-default-features` it compiles down to a no-op guard. + +use orch8_types::config::TelemetryConfig; + +#[cfg(feature = "otlp")] +pub use enabled::{init, OtelGuard}; + +#[cfg(not(feature = "otlp"))] +pub use disabled::{init, OtelGuard}; + +#[cfg(feature = "otlp")] +mod enabled { + use anyhow::Context as _; + use opentelemetry::trace::TracerProvider as _; + use opentelemetry_otlp::WithExportConfig as _; + use opentelemetry_sdk::trace::SdkTracerProvider; + use tracing_subscriber::Layer as _; + + use super::TelemetryConfig; + + /// Tracer name recorded on exported spans (`otel.library.name`). + const TRACER_NAME: &str = "orch8-server"; + + /// Per-layer filter type for the OpenTelemetry layer: drops events/spans emitted by + /// the export pipeline itself (opentelemetry/tonic/h2/hyper) so a failing + /// exporter can't feed its own error logs back into the exporter forever. + type OtelLayer = tracing_subscriber::filter::Filtered< + tracing_opentelemetry::OpenTelemetryLayer, + tracing_subscriber::filter::FilterFn) -> bool>, + S, + >; + + fn not_export_pipeline(meta: &tracing::Metadata<'_>) -> bool { + let target = meta.target(); + !(target.starts_with("opentelemetry") + || target.starts_with("tonic") + || target.starts_with("h2") + || target.starts_with("hyper")) + } + + /// Holds the tracer provider so the batch exporter can be flushed on + /// shutdown. Inert (all methods no-ops) when no endpoint is configured. + pub struct OtelGuard { + provider: Option, + } + + impl OtelGuard { + /// Whether OTLP export is active. + pub fn is_enabled(&self) -> bool { + self.provider.is_some() + } + + /// Build the `tracing-opentelemetry` layer bridging tracing spans into + /// the OTLP pipeline. `None` when export is disabled — `Option` + /// composes onto the subscriber as a no-op. + pub fn layer(&self) -> Option> + where + S: tracing::Subscriber + for<'span> tracing_subscriber::registry::LookupSpan<'span>, + { + self.provider.as_ref().map(|provider| { + tracing_opentelemetry::layer() + .with_tracer(provider.tracer(TRACER_NAME)) + .with_filter(tracing_subscriber::filter::filter_fn( + not_export_pipeline as fn(&tracing::Metadata<'_>) -> bool, + )) + }) + } + + /// Flush and shut down the tracer provider. Blocking under the hood + /// (drains the batch queue), so it runs on the blocking pool. Export + /// errors are logged, never propagated — a dead collector must not + /// turn a clean shutdown into a failure. + pub async fn shutdown(self) { + let Some(provider) = self.provider else { + return; + }; + match tokio::task::spawn_blocking(move || provider.shutdown()).await { + Ok(Ok(())) => tracing::debug!("OTLP tracer provider shut down"), + Ok(Err(e)) => tracing::warn!(error = %e, "OTLP tracer provider shutdown error"), + Err(e) => tracing::warn!(error = %e, "OTLP shutdown task failed"), + } + } + } + + /// Build the OTLP pipeline from config. Returns an inert guard when no + /// endpoint is configured. Fails only on configuration-level errors (bad + /// endpoint syntax) — runtime export failures are background noise. + /// + /// Must be called from within a Tokio runtime (the tonic exporter grabs + /// the ambient runtime handle for its gRPC channel). + pub fn init(config: &TelemetryConfig) -> anyhow::Result { + if !config.otlp_enabled() { + return Ok(OtelGuard { provider: None }); + } + // `EngineConfig::validate()` already rejects non-grpc protocols at + // startup; double-check here so direct callers (tests) fail loudly too. + anyhow::ensure!( + matches!(config.otlp_protocol.as_str(), "grpc" | ""), + "telemetry.otlp_protocol: only \"grpc\" is supported (got {:?})", + config.otlp_protocol + ); + + let exporter = opentelemetry_otlp::SpanExporter::builder() + .with_tonic() + .with_endpoint(&config.otlp_endpoint) + .build() + .context("failed to build OTLP span exporter")?; + + // `Resource::builder()` includes the standard env detectors, so + // OTEL_RESOURCE_ATTRIBUTES is honored. service.name defaults to + // "orch8-server" but yields to an explicit OTEL_SERVICE_NAME. + let mut resource = opentelemetry_sdk::Resource::builder().with_attribute( + opentelemetry::KeyValue::new("service.version", env!("CARGO_PKG_VERSION")), + ); + if std::env::var("OTEL_SERVICE_NAME").is_err() { + resource = resource.with_service_name("orch8-server"); + } + + let provider = SdkTracerProvider::builder() + .with_batch_exporter(exporter) + .with_resource(resource.build()) + .build(); + + // NOTE: no tracing::info! here — init runs BEFORE the subscriber is + // installed (the subscriber needs this guard's layer). main logs the + // "OTLP trace export enabled" line after init_logging. + Ok(OtelGuard { + provider: Some(provider), + }) + } +} + +#[cfg(not(feature = "otlp"))] +mod disabled { + #![allow(clippy::unused_self, clippy::unused_async)] + + use super::TelemetryConfig; + + /// No-op guard for builds without the `otlp` feature. + pub struct OtelGuard; + + impl OtelGuard { + pub fn is_enabled(&self) -> bool { + false + } + + /// Identity layer placeholder — `None` composes as a no-op. + /// Non-generic (unlike the enabled variant): `Identity` implements + /// `Layer` for every subscriber, so no type parameter is needed + /// and call-site inference stays happy. + pub fn layer(&self) -> Option { + None + } + + pub async fn shutdown(self) {} + } + + /// Without the `otlp` feature an endpoint cannot be honored — warn instead + /// of silently dropping traces the operator asked for. + // Result-wrapped for signature parity with the `otlp`-enabled variant. + #[allow(clippy::unnecessary_wraps)] + pub fn init(config: &TelemetryConfig) -> anyhow::Result { + if config.otlp_enabled() { + eprintln!( + "WARNING: ORCH8_OTLP_ENDPOINT is set but this binary was built without the \ + `otlp` feature — trace export is disabled" + ); + } + Ok(OtelGuard) + } +} + +#[cfg(all(test, feature = "otlp"))] +mod tests { + use super::*; + + #[test] + fn init_without_endpoint_is_inert() { + type Registry = tracing_subscriber::Registry; + let guard = init(&TelemetryConfig::default()).expect("init must succeed"); + assert!(!guard.is_enabled()); + assert!(guard.layer::().is_none()); + } + + #[test] + fn init_rejects_non_grpc_protocol() { + let cfg = TelemetryConfig { + otlp_endpoint: "http://127.0.0.1:4317".into(), + otlp_protocol: "http".into(), + }; + // Outside a runtime this must fail on the protocol check, BEFORE any + // exporter construction. + let Err(err) = init(&cfg) else { + panic!("non-grpc protocol must be rejected"); + }; + assert!(err.to_string().contains("otlp_protocol")); + } + + /// Smoke test: subscriber initializes with the OTLP layer pointed at a + /// non-listening local address. Initialization must not panic or block; + /// export failures are non-fatal background noise swallowed by the batch + /// exporter. Shutdown (which flushes into the dead endpoint) must also + /// return without panicking. + #[tokio::test(flavor = "multi_thread")] + async fn init_with_unreachable_endpoint_does_not_panic_or_block() { + use tracing_subscriber::layer::SubscriberExt as _; + + let cfg = TelemetryConfig { + // Port 1 is essentially guaranteed closed → immediate refusal, + // exercising the export-failure path without slow timeouts. + otlp_endpoint: "http://127.0.0.1:1".into(), + otlp_protocol: "grpc".into(), + }; + let guard = init(&cfg).expect("init must succeed even if collector is down"); + assert!(guard.is_enabled()); + + // Compose a scoped subscriber with the OTel layer and emit a span + // through it — must not panic even though every export will fail. + let subscriber = tracing_subscriber::registry().with(guard.layer()); + tracing::subscriber::with_default(subscriber, || { + let span = tracing::info_span!("smoke", test = true); + let _entered = span.enter(); + tracing::info!("event inside exported span"); + }); + + // Flush against the dead endpoint: errors are logged, not returned. + guard.shutdown().await; + } +} diff --git a/orch8-types/Cargo.toml b/orch8-types/Cargo.toml index 2487f693..29e32d5f 100644 --- a/orch8-types/Cargo.toml +++ b/orch8-types/Cargo.toml @@ -18,5 +18,9 @@ sha2.workspace = true subtle.workspace = true strsim = "0.11" +[dev-dependencies] +# TOML round-trip tests for `EngineConfig` ([telemetry] section parsing). +toml.workspace = true + [lints] workspace = true diff --git a/orch8-types/src/config.rs b/orch8-types/src/config.rs index 73b29a18..1360ac27 100644 --- a/orch8-types/src/config.rs +++ b/orch8-types/src/config.rs @@ -112,6 +112,8 @@ pub struct EngineConfig { pub logging: LoggingConfig, #[serde(default)] pub artifacts: ArtifactConfig, + #[serde(default)] + pub telemetry: TelemetryConfig, } /// Selectable durable artifact backends. In-memory is intentionally absent — @@ -562,6 +564,42 @@ fn default_log_level() -> String { "info".to_string() } +/// OpenTelemetry trace export (OTLP). Disabled unless `otlp_endpoint` is set — +/// when it is empty (the default) the server behaves exactly as before: no +/// OpenTelemetry layer is installed and there is no runtime cost. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TelemetryConfig { + /// OTLP collector endpoint, e.g. `"http://localhost:4317"` (Langfuse, + /// Datadog Agent, Grafana Alloy, otel-collector…). Empty = export disabled. + /// Env override: `ORCH8_OTLP_ENDPOINT`. + #[serde(default)] + pub otlp_endpoint: String, + /// OTLP transport protocol. Only `"grpc"` is currently supported. + /// Env override: `ORCH8_OTLP_PROTOCOL`. + #[serde(default = "default_otlp_protocol")] + pub otlp_protocol: String, +} + +impl Default for TelemetryConfig { + fn default() -> Self { + Self { + otlp_endpoint: String::new(), + otlp_protocol: default_otlp_protocol(), + } + } +} + +impl TelemetryConfig { + /// Trace export is enabled iff an endpoint is configured. + pub fn otlp_enabled(&self) -> bool { + !self.otlp_endpoint.is_empty() + } +} + +fn default_otlp_protocol() -> String { + "grpc".to_string() +} + impl EngineConfig { /// Validate configuration values, returning all errors found. pub fn validate(&self) -> Result<(), Vec> { @@ -609,6 +647,17 @@ impl EngineConfig { )), } + // Telemetry — only validated when export is actually enabled, so an + // unset endpoint keeps zero-config startup byte-identical to before. + if self.telemetry.otlp_enabled() { + match self.telemetry.otlp_protocol.as_str() { + "grpc" | "" => {} + other => errors.push(format!( + "telemetry.otlp_protocol: unsupported protocol \"{other}\" (only \"grpc\" is supported)" + )), + } + } + if errors.is_empty() { Ok(()) } else { @@ -1084,6 +1133,53 @@ mod tests { assert!(cfg.json); } + #[test] + fn telemetry_config_defaults_to_disabled() { + let cfg = TelemetryConfig::default(); + assert!(cfg.otlp_endpoint.is_empty()); + assert_eq!(cfg.otlp_protocol, "grpc"); + assert!(!cfg.otlp_enabled()); + // EngineConfig embeds the same default — and a default config must + // validate cleanly (no telemetry errors when export is off). + let engine_cfg = EngineConfig::default(); + assert!(!engine_cfg.telemetry.otlp_enabled()); + assert!(engine_cfg.validate().is_ok()); + } + + #[test] + fn telemetry_config_parses_endpoint_from_toml_section() { + let toml_src = r#" + [telemetry] + otlp_endpoint = "http://localhost:4317" + "#; + let cfg: EngineConfig = toml::from_str(toml_src).unwrap(); + assert_eq!(cfg.telemetry.otlp_endpoint, "http://localhost:4317"); + assert_eq!(cfg.telemetry.otlp_protocol, "grpc"); // default kicks in + assert!(cfg.telemetry.otlp_enabled()); + assert!(cfg.validate().is_ok()); + } + + #[test] + fn telemetry_config_missing_section_means_disabled() { + let cfg: EngineConfig = toml::from_str("[logging]\nlevel = \"info\"\n").unwrap(); + assert!(!cfg.telemetry.otlp_enabled()); + } + + #[test] + fn telemetry_validation_rejects_unknown_protocol_only_when_enabled() { + let mut cfg = EngineConfig::default(); + // Unknown protocol with NO endpoint: export disabled, must not error. + cfg.telemetry.otlp_protocol = "carrier-pigeon".into(); + assert!(cfg.validate().is_ok()); + // Same protocol with an endpoint set: now it must be rejected. + cfg.telemetry.otlp_endpoint = "http://localhost:4317".into(); + let errors = cfg.validate().unwrap_err(); + assert!( + errors.iter().any(|e| e.contains("otlp_protocol")), + "expected otlp_protocol error, got: {errors:?}" + ); + } + #[test] fn config_deserialization_from_json_string_for_key_fields() { let json = r#"{ diff --git a/orch8-types/tests/types_coverage_extra.rs b/orch8-types/tests/types_coverage_extra.rs index f2970d17..6e75a1bb 100644 --- a/orch8-types/tests/types_coverage_extra.rs +++ b/orch8-types/tests/types_coverage_extra.rs @@ -361,6 +361,7 @@ fn cfg_36_config_serde_with_all_sections() { json: true, }, artifacts: orch8_types::config::ArtifactConfig::default(), + telemetry: orch8_types::config::TelemetryConfig::default(), }; let json = serde_json::to_string(&cfg).unwrap(); let back: EngineConfig = serde_json::from_str(&json).unwrap(); @@ -468,6 +469,7 @@ fn cfg_44_secret_string_not_leaked_in_serialized_config() { }, logging: LoggingConfig::default(), artifacts: orch8_types::config::ArtifactConfig::default(), + telemetry: orch8_types::config::TelemetryConfig::default(), }; let json = serde_json::to_string(&cfg).unwrap(); assert!(!json.contains("pass@host"));