Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,15 @@ ORCH8_ARTIFACT_RETENTION_SECS=0
# that uses `response_as: "artifact"` or `body_artifact` — otherwise those
# steps fail with a permanent "artifact storage is not configured" error.

# --- Telemetry (OpenTelemetry trace export) ------------------------------
# OTLP collector endpoint (gRPC), e.g. http://localhost:4317 — Langfuse,
# Datadog Agent, Grafana Alloy, otel-collector. Empty/unset = export disabled
# (zero overhead). Exports `orch8.step` spans (one per step execution; LLM
# steps carry the `gen_ai.client.inference` event). Standard OTEL_SERVICE_NAME
# and OTEL_RESOURCE_ATTRIBUTES env vars are honored.
ORCH8_OTLP_ENDPOINT=
ORCH8_OTLP_PROTOCOL=grpc # grpc (only supported value)

# --- Outbound webhooks ---------------------------------------------------
ORCH8_WEBHOOK_URLS= # comma-separated delivery URLs
# Optional shared secret. When set, each delivery is signed:
Expand Down
102 changes: 102 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,15 @@ tracing-subscriber = { version = "0.3", features = ["json", "env-filter"] }
metrics = "0.24"
metrics-exporter-prometheus = "0.16"

# OpenTelemetry trace export (OTLP). Pinned to the 0.29 line: opentelemetry-otlp
# 0.30+ links against tonic 0.13 while the workspace is on tonic 0.12, so 0.29
# is the newest release whose `grpc-tonic` transport unifies with our tonic.
# tracing-opentelemetry version numbers run one minor ahead of opentelemetry
# (tracing-opentelemetry 0.30 ↔ opentelemetry 0.29).
opentelemetry = "0.29"
opentelemetry_sdk = "0.29"
opentelemetry-otlp = { version = "0.29", features = ["grpc-tonic"] }
tracing-opentelemetry = "0.30"

# API
axum = "0.8"
tonic = "0.12"
Expand Down
18 changes: 18 additions & 0 deletions docs/CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,17 @@ Controls the HTTP and gRPC servers, authentication, CORS, and rate limiting.

---

## [telemetry]

OpenTelemetry trace export (OTLP). Disabled unless `otlp_endpoint` is set — when empty there is zero runtime overhead. When enabled, the server exports `orch8.step` spans (one per step-handler execution, with `instance_id`, `block_id`, `handler`, `tenant_id`, `attempt` fields); LLM steps carry the `gen_ai.client.inference` structured event inside them. Pipe to Langfuse, Datadog, Grafana Tempo, or any OTLP collector. The standard `OTEL_SERVICE_NAME` and `OTEL_RESOURCE_ATTRIBUTES` env vars are honored (`service.name` defaults to `orch8-server`). Export failures are non-fatal: a down collector logs warnings but never blocks step execution.

| Field | Type | Default | Description |
|-------|------|---------|-------------|
| `otlp_endpoint` | string | `""` | OTLP collector endpoint, e.g. `"http://localhost:4317"`. Empty = export disabled |
| `otlp_protocol` | string | `"grpc"` | OTLP transport. Only `"grpc"` is supported |

---

## Environment Variables

All config fields can be set via `ORCH8_*` environment variables. Environment variables override values in `orch8.toml`.
Expand Down Expand Up @@ -151,6 +162,13 @@ All config fields can be set via `ORCH8_*` environment variables. Environment va
| `ORCH8_LOG_LEVEL` | `info` | Log level: `trace`, `debug`, `info`, `warn`, `error` |
| `ORCH8_LOG_JSON` | `false` | Set to `true` or `1` for structured JSON logs; any other value (or unset) uses human-readable pretty logs |

### Telemetry

| Variable | Default | Description |
|----------|---------|-------------|
| `ORCH8_OTLP_ENDPOINT` | — | OTLP collector endpoint (e.g. `http://localhost:4317`). Unset/empty = trace export disabled |
| `ORCH8_OTLP_PROTOCOL` | `grpc` | OTLP transport protocol. Only `grpc` is supported |

---

## Example Configurations
Expand Down
132 changes: 127 additions & 5 deletions orch8-engine/src/handlers/step.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::sync::Arc;
use std::time::Duration;

use chrono::Utc;
use tracing::{info, warn};
use tracing::{info, warn, Instrument};
use uuid::Uuid;

use orch8_storage::StorageBackend;
Expand Down Expand Up @@ -130,6 +130,19 @@ pub async fn execute_step_dry(
let timeout = exec.timeout;
let cache_key = exec.cache_key;

// Span around the handler invocation, exported via OTLP when the server
// is configured with an endpoint. Structured handler events (e.g. the
// `gen_ai.client.inference` event from `llm_call`) ride inside it.
// Cardinality stays sane: identity fields only — no params, no outputs.
let step_span = tracing::info_span!(
"orch8.step",
instance_id = %instance_id,
block_id = %block_id,
handler = %exec.handler_name,
tenant_id = %exec.tenant_id,
attempt = attempt,
);

let step_ctx = StepContext {
instance_id,
tenant_id: exec.tenant_id,
Expand All @@ -141,8 +154,9 @@ pub async fn execute_step_dry(
wait_for_input: exec.wait_for_input,
};

let handler_fut = handler(step_ctx).instrument(step_span);
let result = if let Some(dur) = timeout {
match tokio::time::timeout(dur, handler(step_ctx)).await {
match tokio::time::timeout(dur, handler_fut).await {
Ok(res) => res,
Err(_) => {
return Err(EngineError::StepTimeout {
Expand All @@ -152,7 +166,7 @@ pub async fn execute_step_dry(
}
}
} else {
handler(step_ctx).await
handler_fut.await
};

match result {
Expand Down Expand Up @@ -289,6 +303,17 @@ pub async fn execute_step(
let timeout = exec.timeout;
let cache_key = exec.cache_key;

// Same `orch8.step` span as `execute_step_dry` — the tree-evaluator path
// must export spans shaped identically to those of the flat scheduler path.
let step_span = tracing::info_span!(
"orch8.step",
instance_id = %instance_id,
block_id = %block_id,
handler = %exec.handler_name,
tenant_id = %exec.tenant_id,
attempt = attempt,
);

let step_ctx = StepContext {
instance_id,
tenant_id: exec.tenant_id,
Expand All @@ -301,8 +326,9 @@ pub async fn execute_step(
};

// Execute with optional timeout.
let handler_fut = handler(step_ctx).instrument(step_span);
let result = if let Some(dur) = timeout {
match tokio::time::timeout(dur, handler(step_ctx)).await {
match tokio::time::timeout(dur, handler_fut).await {
Ok(res) => res,
Err(_) => {
return Err(EngineError::StepTimeout {
Expand All @@ -312,7 +338,7 @@ pub async fn execute_step(
}
}
} else {
handler(step_ctx).await
handler_fut.await
};

match result {
Expand Down Expand Up @@ -490,4 +516,100 @@ mod tests {
Duration::from_secs(10)
);
}

/// Checks that running a step through `execute_step_dry` creates exactly one
/// `orch8.step` tracing span (the span exported via OTLP when the server has
/// an endpoint configured) and that it is created with the identity fields
/// `handler`, `tenant_id`, `block_id`, `attempt` and `instance_id`.
///
/// The span is observed with a small in-test `tracing_subscriber` layer
/// instead of `opentelemetry_sdk`'s in-memory exporter: pulling the
/// OpenTelemetry bridge into orch8-engine's dev-deps only for this check
/// would drag the whole opentelemetry stack into the engine's test build for
/// no extra signal, because the tracing span IS the unit the OTLP layer
/// exports.
#[tokio::test]
async fn execute_step_dry_emits_orch8_step_span_around_handler() {
    use std::collections::HashMap;
    use std::sync::Mutex;

    use tracing::instrument::WithSubscriber;
    use tracing_subscriber::layer::SubscriberExt;

    // Field name -> rendered value for one captured span.
    type SpanFields = HashMap<String, String>;

    // Visitor that renders every field of a span through its `Debug` output.
    #[derive(Default)]
    struct FieldCollector {
        values: SpanFields,
    }

    impl tracing::field::Visit for FieldCollector {
        fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
            let rendered = format!("{value:?}");
            self.values.insert(field.name().to_owned(), rendered);
        }
    }

    // Layer that stores the creation-time fields of every `orch8.step` span.
    // The storage is shared through an `Arc`, so a clone kept by the test
    // sees what the clone installed in the subscriber recorded.
    #[derive(Clone, Default)]
    struct StepSpanRecorder {
        recorded: Arc<Mutex<Vec<SpanFields>>>,
    }

    impl<S> tracing_subscriber::Layer<S> for StepSpanRecorder
    where
        S: tracing::Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
    {
        fn on_new_span(
            &self,
            attrs: &tracing::span::Attributes<'_>,
            _id: &tracing::span::Id,
            _ctx: tracing_subscriber::layer::Context<'_, S>,
        ) {
            // Spans with any other name are ignored.
            if attrs.metadata().name() != "orch8.step" {
                return;
            }
            let mut collector = FieldCollector::default();
            attrs.record(&mut collector);
            self.recorded.lock().unwrap().push(collector.values);
        }
    }

    let recorder = StepSpanRecorder::default();
    let subscriber = tracing_subscriber::registry().with(recorder.clone());

    let sqlite = orch8_storage::sqlite::SqliteStorage::in_memory()
        .await
        .unwrap();
    let storage: Arc<dyn orch8_storage::StorageBackend> = Arc::new(sqlite);

    let mut handlers = HandlerRegistry::new();
    handlers.register("mock_step", |_ctx| async {
        Ok(serde_json::json!({"ok": true}))
    });

    let instance_id = InstanceId::new();
    let exec = StepExecParams {
        instance_id,
        tenant_id: TenantId::unchecked("tenant-a"),
        block_id: BlockId::new("step-1"),
        handler_name: "mock_step".into(),
        params: serde_json::json!({}),
        context: orch8_types::context::ExecutionContext::default(),
        attempt: 0,
        timeout: None,
        externalize_threshold: 0,
        wait_for_input: None,
        cache_key: None,
    };

    // The capturing subscriber is attached to this future only, so nothing is
    // installed globally.
    let traced_run = execute_step_dry(&storage, &handlers, exec).with_subscriber(subscriber);
    let step_result = traced_run.await.unwrap();
    assert_eq!(step_result.output["ok"], true);

    let recorded = recorder.recorded.lock().unwrap();
    assert_eq!(recorded.len(), 1, "expected exactly one orch8.step span");

    let fields = &recorded[0];
    let lookup = |name: &str| fields.get(name).map(String::as_str);
    let expected_instance_id = instance_id.to_string();

    assert_eq!(lookup("handler"), Some("mock_step"));
    assert_eq!(lookup("tenant_id"), Some("tenant-a"));
    assert_eq!(lookup("block_id"), Some("step-1"));
    assert_eq!(lookup("attempt"), Some("0"));
    assert_eq!(lookup("instance_id"), Some(expected_instance_id.as_str()));
}
}
Loading
Loading