From 5e05de330c9bbb179c52c8525e356d5a0645f116 Mon Sep 17 00:00:00 2001 From: Aniket Agarwal Date: Thu, 28 May 2026 22:29:10 +0530 Subject: [PATCH 1/7] feat(tempo): added grafana tempo integration --- app/agent/investigation.py | 1 + app/integrations/_catalog_impl.py | 31 +++ app/integrations/_verification_adapters.py | 7 + app/integrations/cli.py | 18 ++ app/integrations/effective_models.py | 1 + app/integrations/registry.py | 8 + app/integrations/tempo.py | 155 +++++++++++ app/integrations/verify.py | 2 + app/services/grafana/tempo.py | 45 +-- app/services/otlp_trace.py | 90 ++++++ app/services/tempo/__init__.py | 5 + app/services/tempo/client.py | 306 +++++++++++++++++++++ app/tools/TempoTool/__init__.py | 164 +++++++++++ app/tools/utils/availability.py | 12 + app/types/evidence.py | 1 + docs/docs.json | 1 + docs/tempo.mdx | 105 +++++++ tests/integrations/test_tempo.py | 124 +++++++++ tests/services/test_grafana_tempo.py | 6 +- tests/services/test_otlp_trace.py | 71 +++++ tests/services/test_tempo_client.py | 177 ++++++++++++ tests/synthetic/signoz/__init__.py | 10 + tests/synthetic/test_tempo_scenario.py | 123 +++++++++ tests/tools/test_tempo_tools.py | 91 ++++++ 24 files changed, 1511 insertions(+), 43 deletions(-) create mode 100644 app/integrations/tempo.py create mode 100644 app/services/otlp_trace.py create mode 100644 app/services/tempo/__init__.py create mode 100644 app/services/tempo/client.py create mode 100644 app/tools/TempoTool/__init__.py create mode 100644 docs/tempo.mdx create mode 100644 tests/integrations/test_tempo.py create mode 100644 tests/services/test_otlp_trace.py create mode 100644 tests/services/test_tempo_client.py create mode 100644 tests/synthetic/signoz/__init__.py create mode 100644 tests/synthetic/test_tempo_scenario.py create mode 100644 tests/tools/test_tempo_tools.py diff --git a/app/agent/investigation.py b/app/agent/investigation.py index 324842155..74bd0d844 100644 --- a/app/agent/investigation.py +++ b/app/agent/investigation.py @@ -53,6 +53,7 @@ "azure": ["azure", "azure_sql"], "splunk": ["splunk"], "signoz": ["signoz"], + "tempo": ["tempo"], } # Callback type: called with (event_kind, data_dict) during the agent loop. diff --git a/app/integrations/_catalog_impl.py b/app/integrations/_catalog_impl.py index 4228bbaf9..f4091c78d 100644 --- a/app/integrations/_catalog_impl.py +++ b/app/integrations/_catalog_impl.py @@ -60,6 +60,7 @@ from app.integrations.signoz import build_signoz_config, signoz_config_from_env from app.integrations.store import _STRUCTURAL_RECORD_FIELDS, load_integrations from app.integrations.supabase import build_supabase_config +from app.integrations.tempo import build_tempo_config, tempo_config_from_env from app.llm_credentials import resolve_env_credential from app.services.vercel import VercelConfig from app.utils.coercion import safe_int @@ -950,6 +951,24 @@ def _classify_service_instance( return signoz_config.model_dump(), "signoz" return None, None + if key == "tempo": + try: + tempo_config = build_tempo_config( + { + "url": credentials.get("url", ""), + "api_key": credentials.get("api_key", ""), + "username": credentials.get("username", ""), + "password": credentials.get("password", ""), + "org_id": credentials.get("org_id", ""), + "integration_id": record_id, + } + ) + except Exception: + return None, None + if tempo_config.is_configured: + return tempo_config.model_dump(), "tempo" + return None, None + # Fallback for unknown services: pass through credentials + record id. return {"credentials": credentials, "integration_id": record_id}, key @@ -1871,6 +1890,18 @@ def load_env_integrations() -> list[dict[str, Any]]: except Exception: logger.debug("Failed to load SigNoz config from env", exc_info=True) + try: + tempo_config = tempo_config_from_env() + if tempo_config is not None and tempo_config.is_configured: + integrations.append( + _active_env_record( + "tempo", + tempo_config.model_dump(exclude={"integration_id"}), + ) + ) + except Exception: + logger.debug("Failed to load Tempo config from env", exc_info=True) + return integrations diff --git a/app/integrations/_verification_adapters.py b/app/integrations/_verification_adapters.py index 4b45b56dd..f5e7c22f9 100644 --- a/app/integrations/_verification_adapters.py +++ b/app/integrations/_verification_adapters.py @@ -35,6 +35,7 @@ from app.integrations.sentry import build_sentry_config, validate_sentry_config from app.integrations.signoz import build_signoz_config, validate_signoz_config from app.integrations.supabase import build_supabase_config, validate_supabase_config +from app.integrations.tempo import build_tempo_config, validate_tempo_config from app.services.alertmanager import AlertmanagerClient, AlertmanagerConfig from app.services.argocd import ArgoCDClient, ArgoCDConfig from app.services.coralogix import CoralogixClient @@ -543,6 +544,11 @@ def _verify_opensearch(source: str, config: dict[str, Any]) -> dict[str, str]: build_config=build_signoz_config, validate_config=validate_signoz_config, ) +_verify_tempo = build_validation_verifier( + "tempo", + build_config=build_tempo_config, + validate_config=validate_tempo_config, +) def _build_kafka_config(raw: dict[str, Any]) -> Any: @@ -705,6 +711,7 @@ def _verify_supabase(service: str, config: dict[str, Any]) -> dict[str, str]: "_verify_rabbitmq", "_verify_sentry", "_verify_signoz", + "_verify_tempo", "_verify_slack", "_verify_slack_without_test", "_verify_snowflake", diff --git a/app/integrations/cli.py b/app/integrations/cli.py index 9ab6a89a9..66d247b71 100644 --- a/app/integrations/cli.py +++ b/app/integrations/cli.py @@ -829,6 +829,23 @@ def _setup_signoz() -> None: ) +def _setup_tempo() -> None: + url = _p("Tempo URL (e.g. http://localhost:3200 for local Docker)") + if not url: + _die("Tempo URL is required.") + + api_key = _p("Tempo bearer token (optional, leave blank if none)", secret=True) + org_id = _p("Tempo tenant / X-Scope-OrgID (optional, leave blank if single-tenant)") + + credentials: dict[str, str] = {"url": url} + if api_key: + credentials["api_key"] = api_key + if org_id: + credentials["org_id"] = org_id + + upsert_integration("tempo", {"credentials": credentials}) + + _HANDLERS: dict[str, Any] = { "alertmanager": _setup_alertmanager, "aws": _setup_aws, @@ -857,6 +874,7 @@ def _setup_signoz() -> None: "postgresql": _setup_postgresql, "mysql": _setup_mysql, "signoz": _setup_signoz, + "tempo": _setup_tempo, } diff --git a/app/integrations/effective_models.py b/app/integrations/effective_models.py index 40a0d0037..08f7fe23d 100644 --- a/app/integrations/effective_models.py +++ b/app/integrations/effective_models.py @@ -99,3 +99,4 @@ class EffectiveIntegrations(StrictConfigModel): victoria_logs: EffectiveIntegrationEntry | None = None alicloud: EffectiveIntegrationEntry | None = None signoz: EffectiveIntegrationEntry | None = None + tempo: EffectiveIntegrationEntry | None = None diff --git a/app/integrations/registry.py b/app/integrations/registry.py index 480698680..c13f8505e 100644 --- a/app/integrations/registry.py +++ b/app/integrations/registry.py @@ -42,6 +42,7 @@ _verify_splunk, _verify_supabase, _verify_telegram, + _verify_tempo, _verify_tracer, _verify_twilio, _verify_vercel, @@ -358,6 +359,13 @@ class IntegrationSpec: setup_order=23, verify_order=35, ), + IntegrationSpec( + service="tempo", + verifier=_verify_tempo, + direct_effective=True, + setup_order=24, + verify_order=36, + ), ) INTEGRATION_SPECS_BY_SERVICE = {spec.service: spec for spec in INTEGRATION_SPECS} diff --git a/app/integrations/tempo.py b/app/integrations/tempo.py new file mode 100644 index 000000000..2aee4d22d --- /dev/null +++ b/app/integrations/tempo.py @@ -0,0 +1,155 @@ +"""Grafana Tempo integration helpers. + +Provides configuration and connectivity validation for a standalone Grafana +Tempo backend via its HTTP API (``TEMPO_URL`` plus optional auth). Unlike the +Grafana Cloud integration, this talks to Tempo directly and does not require a +Grafana instance or datasource proxy. +""" + +from __future__ import annotations + +import base64 +import logging +import os +from dataclasses import dataclass +from typing import Any + +import httpx +from pydantic import Field + +from app.integrations._validation_helpers import report_validation_failure +from app.strict_config import StrictConfigModel + +logger = logging.getLogger(__name__) + +DEFAULT_TEMPO_TIMEOUT_SECONDS = 10.0 +DEFAULT_TEMPO_MAX_RESULTS = 20 + + +class TempoConfig(StrictConfigModel): + """Normalized Grafana Tempo connection settings.""" + + url: str = "" + api_key: str = "" + username: str = "" + password: str = "" + org_id: str = "" + timeout_seconds: float = Field(default=DEFAULT_TEMPO_TIMEOUT_SECONDS, gt=0) + max_results: int = Field(default=DEFAULT_TEMPO_MAX_RESULTS, gt=0, le=200) + integration_id: str = "" + + @property + def is_configured(self) -> bool: + # Tempo commonly runs without auth behind a gateway, so a URL alone is + # enough; auth headers are added only when credentials are present. + return bool(self.url) + + def auth_headers(self) -> dict[str, str]: + """Build request headers for the Tempo HTTP API.""" + headers = {"Accept": "application/json"} + if self.username and self.password: + token = base64.b64encode(f"{self.username}:{self.password}".encode()).decode() + headers["Authorization"] = f"Basic {token}" + elif self.api_key: + headers["Authorization"] = f"Bearer {self.api_key}" + if self.org_id: + headers["X-Scope-OrgID"] = self.org_id + return headers + + def base_url(self) -> str: + return self.url.rstrip("/") + + +@dataclass(frozen=True) +class TempoValidationResult: + """Result of validating a Tempo integration.""" + + ok: bool + detail: str + + +def build_tempo_config(raw: dict[str, Any] | None) -> TempoConfig: + """Build a normalized Tempo config object from env/store data.""" + return TempoConfig.model_validate(raw or {}) + + +def tempo_config_from_env() -> TempoConfig | None: + """Load a Tempo config from env vars.""" + url = os.getenv("TEMPO_URL", "").strip() + if not url: + return None + + return build_tempo_config( + { + "url": url, + "api_key": os.getenv("TEMPO_API_KEY", "").strip(), + "username": os.getenv("TEMPO_USERNAME", "").strip(), + "password": os.getenv("TEMPO_PASSWORD", "").strip(), + "org_id": os.getenv("TEMPO_ORG_ID", "").strip(), + } + ) + + +def validate_tempo_config(config: TempoConfig) -> TempoValidationResult: + """Validate Tempo HTTP API connectivity via the tag-search endpoint.""" + if not config.is_configured: + return TempoValidationResult( + ok=False, + detail="Tempo configuration is incomplete. Provide TEMPO_URL.", + ) + + try: + response = httpx.get( + f"{config.base_url()}/api/search/tags", + headers=config.auth_headers(), + params={"limit": 1}, + timeout=config.timeout_seconds, + ) + response.raise_for_status() + return TempoValidationResult( + ok=True, + detail="Connected to Grafana Tempo HTTP API (/api/search, /api/traces).", + ) + except httpx.HTTPStatusError as err: + snippet = err.response.text[:200].strip() + detail = ( + f"HTTP {err.response.status_code}: {snippet}" + if snippet + else f"HTTP {err.response.status_code}" + ) + return TempoValidationResult( + ok=False, + detail=f"Tempo API validation failed: {detail}", + ) + except Exception as err: + report_validation_failure( + err, + logger=logger, + integration="tempo", + method="validate_tempo_config", + ) + return TempoValidationResult( + ok=False, + detail=f"Tempo API validation failed: {err}", + ) + + +def tempo_is_available(sources: dict[str, dict]) -> bool: + """Check if Tempo integration params are present in available sources.""" + return bool(sources.get("tempo", {}).get("connection_verified")) + + +def tempo_extract_params(sources: dict[str, dict]) -> dict[str, Any]: + """Extract Tempo connection params from resolved integrations. + + Credentials are resolved from the integration store or environment, so the + LLM never needs to supply the URL or auth directly. + """ + tempo = sources.get("tempo", {}) + return { + "url": str(tempo.get("url", "")).strip(), + "api_key": str(tempo.get("api_key", "")).strip(), + "username": str(tempo.get("username", "")).strip(), + "password": str(tempo.get("password", "")).strip(), + "org_id": str(tempo.get("org_id", "")).strip(), + } diff --git a/app/integrations/verify.py b/app/integrations/verify.py index 4c048c212..5ead2a5dc 100644 --- a/app/integrations/verify.py +++ b/app/integrations/verify.py @@ -43,6 +43,7 @@ _verify_rabbitmq = _adapters._verify_rabbitmq _verify_sentry = _adapters._verify_sentry _verify_signoz = _adapters._verify_signoz +_verify_tempo = _adapters._verify_tempo _verify_slack = _adapters._verify_slack _verify_snowflake = _adapters._verify_snowflake _verify_splunk = _adapters._verify_splunk @@ -188,6 +189,7 @@ def verification_exit_code( "_verify_rabbitmq", "_verify_sentry", "_verify_signoz", + "_verify_tempo", "_verify_slack", "_verify_snowflake", "_verify_splunk", diff --git a/app/services/grafana/tempo.py b/app/services/grafana/tempo.py index d99442daa..7c3a49f90 100644 --- a/app/services/grafana/tempo.py +++ b/app/services/grafana/tempo.py @@ -7,6 +7,7 @@ import requests +from app.services.otlp_trace import extract_span_attributes, parse_otlp_trace from app.utils.errors import report_exception if TYPE_CHECKING: @@ -114,24 +115,7 @@ def _get_trace_details( # type: ignore[misc] ) if response.status_code == 200: - trace_data = response.json() - spans = [] - - if "batches" in trace_data: - for batch in trace_data["batches"]: - if "scopeSpans" in batch: - for scope in batch["scopeSpans"]: - if "spans" in scope: - for span in scope["spans"]: - attributes = self._extract_span_attributes(span) # type: ignore[attr-defined] - spans.append( - { - "name": span.get("name", "unknown"), - "attributes": attributes, - } - ) - - return {"spans": spans} + return {"spans": parse_otlp_trace(response.json())} except Exception as exc: report_exception( exc, @@ -153,26 +137,5 @@ def _extract_span_attributes( # type: ignore[misc] self: GrafanaClientBase, span: dict[str, Any], ) -> dict[str, Any]: - """Extract attributes from a span. - - Args: - span: Span data dictionary - - Returns: - Dictionary of attribute key-value pairs - """ - attributes: dict[str, Any] = {} - - if "attributes" in span: - for attr in span["attributes"]: - key = attr.get("key", "") - if not key: - continue - value = attr.get("value", {}) - - if "stringValue" in value: - attributes[key] = value["stringValue"] - elif "intValue" in value: - attributes[key] = value["intValue"] - - return attributes + """Extract attributes from a span (delegates to the shared OTLP parser).""" + return extract_span_attributes(span) diff --git a/app/services/otlp_trace.py b/app/services/otlp_trace.py new file mode 100644 index 000000000..c83504d1d --- /dev/null +++ b/app/services/otlp_trace.py @@ -0,0 +1,90 @@ +"""Shared parsing for OTLP/JSON trace payloads. + +Used by any client that fetches a single trace in OpenTelemetry JSON form +(``{"batches": [{"resource": ..., "scopeSpans": [{"spans": [...]}]}]}``): +the standalone Tempo client and the Grafana Cloud Tempo mixin both consume it. +""" + +from __future__ import annotations + +from typing import Any + + +def extract_span_attributes(span: dict[str, Any]) -> dict[str, Any]: + """Flatten an OTLP attribute list into a plain key -> value mapping. + + Handles the common OTLP/JSON value kinds (string, int, bool, double). + Attributes without a key or with an unsupported value kind are skipped. + """ + attributes: dict[str, Any] = {} + + for attr in span.get("attributes", []): + key = attr.get("key", "") + if not key: + continue + value = attr.get("value", {}) + + if "stringValue" in value: + attributes[key] = value["stringValue"] + elif "intValue" in value: + attributes[key] = value["intValue"] + elif "boolValue" in value: + attributes[key] = value["boolValue"] + elif "doubleValue" in value: + attributes[key] = value["doubleValue"] + + return attributes + + +def _duration_ms(start_unix_nano: Any, end_unix_nano: Any) -> float: + """Span duration in milliseconds from OTLP nanosecond timestamps.""" + try: + start = int(start_unix_nano) + end = int(end_unix_nano) + except (TypeError, ValueError): + return 0.0 + if end <= start: + return 0.0 + return round((end - start) / 1_000_000, 4) + + +def parse_otlp_trace(trace_data: dict[str, Any]) -> list[dict[str, Any]]: + """Parse an OTLP/JSON trace into a flat list of span dicts. + + The ``service_name`` is lifted from each batch's resource attributes so + callers can correlate spans to services without a nested lookup. + """ + spans: list[dict[str, Any]] = [] + + for batch in trace_data.get("batches", []): + if not isinstance(batch, dict): + continue + resource_attributes = extract_span_attributes(batch.get("resource", {})) + service_name = str(resource_attributes.get("service.name", "")) + + for scope in batch.get("scopeSpans", []): + if not isinstance(scope, dict): + continue + for span in scope.get("spans", []): + if not isinstance(span, dict): + continue + status = span.get("status") or {} + spans.append( + { + "name": span.get("name", "unknown"), + "span_id": span.get("spanId", ""), + "parent_span_id": span.get("parentSpanId", ""), + "trace_id": span.get("traceId", ""), + "kind": span.get("kind", ""), + "service_name": service_name, + "duration_ms": _duration_ms( + span.get("startTimeUnixNano"), + span.get("endTimeUnixNano"), + ), + "status_code": status.get("code", ""), + "status_message": status.get("message", ""), + "attributes": extract_span_attributes(span), + } + ) + + return spans diff --git a/app/services/tempo/__init__.py b/app/services/tempo/__init__.py new file mode 100644 index 000000000..3bc0a3f95 --- /dev/null +++ b/app/services/tempo/__init__.py @@ -0,0 +1,5 @@ +"""Grafana Tempo service client package.""" + +from app.services.tempo.client import TempoClient + +__all__ = ["TempoClient"] diff --git a/app/services/tempo/client.py b/app/services/tempo/client.py new file mode 100644 index 000000000..ea76b7fe0 --- /dev/null +++ b/app/services/tempo/client.py @@ -0,0 +1,306 @@ +"""Grafana Tempo query client. + +Read-only access to a standalone Tempo backend via its HTTP API: + +* ``GET /api/traces/{id}`` — fetch a full trace (OTLP/JSON) +* ``GET /api/search`` — search traces with TraceQL +* ``GET /api/v2/search/tag/{tag}/values`` — list services / span names +""" + +from __future__ import annotations + +import logging +from datetime import UTC, datetime, timedelta +from typing import Any + +import httpx + +from app.integrations.tempo import TempoConfig +from app.services.otlp_trace import parse_otlp_trace + +logger = logging.getLogger(__name__) + +DEFAULT_TIME_RANGE_MINUTES = 60 +_NOT_CONFIGURED_ERROR = "Tempo not configured. Set TEMPO_URL." + +# Scoped tag names used by Tempo's tag-values endpoint. +_SERVICE_NAME_TAG = "resource.service.name" +_SPAN_NAME_TAG = "name" + + +def _escape_traceql_value(value: str) -> str: + return value.replace("\\", "\\\\").replace('"', '\\"') + + +def _time_bounds_seconds(minutes: int) -> tuple[int, int]: + """Return (start, end) as unix seconds for the last *minutes*.""" + end = datetime.now(UTC) + start = end - timedelta(minutes=max(1, minutes)) + return int(start.timestamp()), int(end.timestamp()) + + +def _parse_tag_values(payload: dict[str, Any]) -> list[str]: + """Parse a Tempo tag-values response (v1 strings or v2 typed objects).""" + values: list[str] = [] + for item in payload.get("tagValues", []) or []: + if isinstance(item, dict): + value = item.get("value") + if value: + values.append(str(value)) + elif item: + values.append(str(item)) + return values + + +def _parse_search_traces(payload: dict[str, Any]) -> list[dict[str, Any]]: + """Parse the trace summaries returned by ``GET /api/search``.""" + results: list[dict[str, Any]] = [] + for trace in payload.get("traces", []) or []: + if not isinstance(trace, dict): + continue + span_set = trace.get("spanSet") or {} + matched = span_set.get("matched") + if matched is None: + matched = len(span_set.get("spans", []) or []) + try: + duration_ms = round(int(trace.get("durationMs", 0)), 4) + except (TypeError, ValueError): + duration_ms = 0 + results.append( + { + "trace_id": str(trace.get("traceID", "")), + "root_service_name": str(trace.get("rootServiceName", "")), + "root_trace_name": str(trace.get("rootTraceName", "")), + "start_time_unix_nano": str(trace.get("startTimeUnixNano", "")), + "duration_ms": duration_ms, + "matched_spans": matched, + } + ) + return results + + +class TempoClient: + """Read-only Grafana Tempo client over the HTTP API.""" + + def __init__(self, config: TempoConfig) -> None: + self.config = config + + def _configuration_error(self) -> str | None: + if self.config.is_configured: + return None + return _NOT_CONFIGURED_ERROR + + def _get( + self, path: str, params: dict[str, Any] | None = None + ) -> tuple[dict[str, Any] | None, str | None]: + try: + response = httpx.get( + f"{self.config.base_url()}{path}", + headers=self.config.auth_headers(), + params=params, + timeout=self.config.timeout_seconds, + ) + response.raise_for_status() + parsed = response.json() + return (parsed if isinstance(parsed, dict) else {}), None + except httpx.HTTPStatusError as err: + snippet = err.response.text[:200].strip() + if snippet: + return None, f"HTTP {err.response.status_code}: {snippet}" + return None, f"HTTP {err.response.status_code}" + except Exception as err: + return None, str(err) + + def _clamped_limit(self, limit: int) -> int: + return max(1, min(limit, self.config.max_results)) + + # ----------------------------------------------------------- get by id + + def get_trace_by_id(self, trace_id: str) -> dict[str, Any]: + """Fetch a full trace by ID and flatten it into spans.""" + config_error = self._configuration_error() + if config_error: + return { + "source": "tempo", + "action": "get_trace", + "available": False, + "error": config_error, + "spans": [], + } + if not trace_id: + return { + "source": "tempo", + "action": "get_trace", + "available": False, + "error": "trace_id is required for get_trace.", + "spans": [], + } + + payload, error = self._get(f"/api/traces/{trace_id}") + if error: + return { + "source": "tempo", + "action": "get_trace", + "available": False, + "trace_id": trace_id, + "error": error, + "spans": [], + } + + spans = parse_otlp_trace(payload or {}) + return { + "source": "tempo", + "action": "get_trace", + "available": True, + "trace_id": trace_id, + "total_spans": len(spans), + "spans": spans, + } + + # ------------------------------------------------------------- search + + def search_traces( + self, + service: str | None = None, + span_name: str | None = None, + min_duration_ms: float | None = None, + max_duration_ms: float | None = None, + tags: dict[str, str] | None = None, + time_range_minutes: int = DEFAULT_TIME_RANGE_MINUTES, + limit: int = 20, + ) -> dict[str, Any]: + """Search traces by service, span name, duration, and tags via TraceQL.""" + config_error = self._configuration_error() + if config_error: + return { + "source": "tempo", + "action": "search", + "available": False, + "error": config_error, + "traces": [], + } + + traceql = self._build_traceql( + service=service, + span_name=span_name, + min_duration_ms=min_duration_ms, + max_duration_ms=max_duration_ms, + tags=tags, + ) + start, end = _time_bounds_seconds(time_range_minutes) + params: dict[str, Any] = { + "q": traceql, + "limit": self._clamped_limit(limit), + "start": start, + "end": end, + } + + payload, error = self._get("/api/search", params=params) + if error: + return { + "source": "tempo", + "action": "search", + "available": False, + "query": traceql, + "error": error, + "traces": [], + } + + traces = _parse_search_traces(payload or {}) + return { + "source": "tempo", + "action": "search", + "available": True, + "query": traceql, + "total": len(traces), + "traces": traces, + } + + @staticmethod + def _build_traceql( + *, + service: str | None, + span_name: str | None, + min_duration_ms: float | None, + max_duration_ms: float | None, + tags: dict[str, str] | None, + ) -> str: + parts: list[str] = [] + if service: + parts.append(f'resource.service.name = "{_escape_traceql_value(service)}"') + if span_name: + parts.append(f'name = "{_escape_traceql_value(span_name)}"') + if min_duration_ms is not None and min_duration_ms > 0: + parts.append(f"duration > {min_duration_ms}ms") + if max_duration_ms is not None and max_duration_ms > 0: + parts.append(f"duration < {max_duration_ms}ms") + for key, value in (tags or {}).items(): + if not key: + continue + parts.append(f'span.{key} = "{_escape_traceql_value(str(value))}"') + if not parts: + return "{}" + return "{ " + " && ".join(parts) + " }" + + # ----------------------------------------------------- list tag values + + def list_services(self, time_range_minutes: int = DEFAULT_TIME_RANGE_MINUTES) -> dict[str, Any]: + """List service names registered in Tempo.""" + return self._list_tag_values( + tag=_SERVICE_NAME_TAG, + result_key="services", + time_range_minutes=time_range_minutes, + action="list_services", + ) + + def list_span_names( + self, time_range_minutes: int = DEFAULT_TIME_RANGE_MINUTES + ) -> dict[str, Any]: + """List span names registered in Tempo.""" + return self._list_tag_values( + tag=_SPAN_NAME_TAG, + result_key="span_names", + time_range_minutes=time_range_minutes, + action="list_span_names", + ) + + def _list_tag_values( + self, + *, + tag: str, + result_key: str, + time_range_minutes: int, + action: str, + ) -> dict[str, Any]: + config_error = self._configuration_error() + if config_error: + return { + "source": "tempo", + "action": action, + "available": False, + "error": config_error, + result_key: [], + } + + start, end = _time_bounds_seconds(time_range_minutes) + payload, error = self._get( + f"/api/v2/search/tag/{tag}/values", + params={"start": start, "end": end}, + ) + if error: + return { + "source": "tempo", + "action": action, + "available": False, + "error": error, + result_key: [], + } + + values = _parse_tag_values(payload or {}) + return { + "source": "tempo", + "action": action, + "available": True, + "total": len(values), + result_key: values, + } diff --git a/app/tools/TempoTool/__init__.py b/app/tools/TempoTool/__init__.py new file mode 100644 index 000000000..1c4adf8ec --- /dev/null +++ b/app/tools/TempoTool/__init__.py @@ -0,0 +1,164 @@ +"""Grafana Tempo trace query tool (single action-based entrypoint).""" + +from __future__ import annotations + +from typing import Any + +from app.integrations.tempo import TempoConfig, tempo_extract_params +from app.services.tempo.client import TempoClient +from app.tools.tool_decorator import tool +from app.tools.utils.availability import tempo_available_or_backend + +_VALID_ACTIONS = ("search", "get_trace", "list_services", "list_span_names") + + +def _tempo_is_available(sources: dict[str, dict]) -> bool: + if tempo_available_or_backend(sources): + return True + return bool(sources.get("tempo", {}).get("url")) + + +def _tempo_extract_params(sources: dict[str, dict]) -> dict[str, Any]: + tempo = sources.get("tempo", {}) + return { + **tempo_extract_params(sources), + "action": "search", + "service": tempo.get("service_name", ""), + "time_range_minutes": tempo.get("time_range_minutes", 60), + "limit": 20, + "tempo_backend": tempo.get("_backend"), + } + + +def _dispatch( + client: Any, + *, + action: str, + trace_id: str | None, + service: str | None, + span_name: str | None, + min_duration_ms: float | None, + max_duration_ms: float | None, + tags: dict[str, str] | None, + time_range_minutes: int, + limit: int, +) -> dict[str, Any]: + result: dict[str, Any] + if action == "get_trace": + result = client.get_trace_by_id(trace_id or "") + elif action == "list_services": + result = client.list_services(time_range_minutes=time_range_minutes) + elif action == "list_span_names": + result = client.list_span_names(time_range_minutes=time_range_minutes) + else: + result = client.search_traces( + service=service, + span_name=span_name, + min_duration_ms=min_duration_ms, + max_duration_ms=max_duration_ms, + tags=tags, + time_range_minutes=time_range_minutes, + limit=limit, + ) + return result + + +@tool( + name="query_tempo", + display_name="Grafana Tempo", + source="tempo", + tags=("traces", "observability"), + cost_tier="moderate", + description=( + "Query a standalone Grafana Tempo backend for distributed traces. " + "Use 'action' to pick: search traces, fetch a trace by ID, or list " + "registered services / span names." + ), + use_cases=[ + "Fetching a full trace by trace ID to inspect its spans", + "Searching traces by service, span name, duration, or tags", + "Listing services and span names registered in Tempo", + "Correlating slow or error spans with logs and metrics", + ], + requires=[], + input_schema={ + "type": "object", + "properties": { + "action": { + "type": "string", + "enum": list(_VALID_ACTIONS), + "default": "search", + "description": "Which Tempo query to run.", + }, + "trace_id": { + "type": "string", + "description": "Trace ID to fetch (required when action='get_trace').", + }, + "service": {"type": "string", "description": "Service name filter for search."}, + "span_name": {"type": "string", "description": "Span name filter for search."}, + "min_duration_ms": {"type": "number", "description": "Minimum span duration (ms)."}, + "max_duration_ms": {"type": "number", "description": "Maximum span duration (ms)."}, + "tags": { + "type": "object", + "description": "Span attribute filters (key -> value), applied as span..", + }, + "time_range_minutes": {"type": "integer", "default": 60}, + "limit": {"type": "integer", "default": 20}, + }, + "required": [], + }, + is_available=_tempo_is_available, + extract_params=_tempo_extract_params, +) +def query_tempo( + action: str = "search", + trace_id: str | None = None, + service: str | None = None, + span_name: str | None = None, + min_duration_ms: float | None = None, + max_duration_ms: float | None = None, + tags: dict[str, str] | None = None, + time_range_minutes: int = 60, + limit: int = 20, + tempo_backend: Any = None, + **_kwargs: Any, +) -> dict[str, Any]: + """Query Grafana Tempo for traces, a single trace, or registered tag values.""" + if action not in _VALID_ACTIONS: + action = "search" + + if tempo_backend is not None: + return _dispatch( + tempo_backend, + action=action, + trace_id=trace_id, + service=service, + span_name=span_name, + min_duration_ms=min_duration_ms, + max_duration_ms=max_duration_ms, + tags=tags, + time_range_minutes=time_range_minutes, + limit=limit, + ) + + config = TempoConfig.model_validate(_kwargs) + if not config.is_configured: + return { + "source": "tempo", + "action": action, + "available": False, + "error": "Tempo not configured. Provide TEMPO_URL.", + } + + return _dispatch( + TempoClient(config), + action=action, + trace_id=trace_id, + service=service, + span_name=span_name, + min_duration_ms=min_duration_ms, + max_duration_ms=max_duration_ms, + tags=tags, + time_range_minutes=time_range_minutes, + limit=limit, + ) diff --git a/app/tools/utils/availability.py b/app/tools/utils/availability.py index 29c865355..3d3e4ea7a 100644 --- a/app/tools/utils/availability.py +++ b/app/tools/utils/availability.py @@ -68,6 +68,18 @@ def signoz_available_or_backend(sources: dict[str, dict]) -> bool: return bool(signoz.get("connection_verified") and signoz.get("url") and signoz.get("api_key")) +def tempo_available_or_backend(sources: dict[str, dict]) -> bool: + """Available when a real Tempo URL is present OR a fixture backend is injected. + + Used by the Tempo tool wrapper whose ``extract_params`` can delegate to a + mock ``tempo_backend`` for synthetic tests. + """ + tempo = sources.get("tempo", {}) + if tempo.get("_backend"): + return True + return bool(tempo.get("connection_verified") and tempo.get("url")) + + def hermes_available_or_backend(sources: dict[str, dict]) -> bool: """Available when Hermes integration is connected or a fixture backend is injected.""" hermes = sources.get("hermes", {}) diff --git a/app/types/evidence.py b/app/types/evidence.py index a671651ef..83b929de4 100644 --- a/app/types/evidence.py +++ b/app/types/evidence.py @@ -44,6 +44,7 @@ "opensearch", "alertmanager", "signoz", + "tempo", "splunk", "supabase", "airflow", diff --git a/docs/docs.json b/docs/docs.json index 569e5f24c..79e83ebce 100644 --- a/docs/docs.json +++ b/docs/docs.json @@ -113,6 +113,7 @@ "opsgenie", "sentry", "signoz", + "tempo", "splunk" ] }, diff --git a/docs/tempo.mdx b/docs/tempo.mdx new file mode 100644 index 000000000..b09eba53f --- /dev/null +++ b/docs/tempo.mdx @@ -0,0 +1,105 @@ +# Grafana Tempo Integration + +Query distributed traces from a standalone [Grafana Tempo](https://grafana.com/oss/tempo/) +backend via its HTTP API. + +This integration talks to Tempo **directly** (its own URL and auth) — it does not +require a Grafana instance or datasource proxy. If you run the full Grafana stack, +the [Grafana](/grafana) integration already surfaces Tempo through the datasource +proxy; use this integration when you run Tempo on its own (alongside Loki/Mimir or +any OpenTelemetry pipeline). + +OpenSRE exposes a single tool, `query_tempo`, with an `action` parameter: + +| action | Tempo endpoint | What it does | +| --- | --- | --- | +| `search` (default) | `GET /api/search` | Search traces by service, span name, duration, and tags (TraceQL) | +| `get_trace` | `GET /api/traces/{id}` | Fetch a full trace by ID and flatten its spans | +| `list_services` | `GET /api/v2/search/tag/resource.service.name/values` | List services registered in Tempo | +| `list_span_names` | `GET /api/v2/search/tag/name/values` | List span names registered in Tempo | + +## Quick Start + +### 1. Configure environment variables + +Tempo commonly runs without auth behind a gateway, so a URL alone is enough. +Add a bearer token, basic-auth tenant credentials, or a multi-tenant org ID only +if your deployment requires them. + +```bash +export TEMPO_URL="http://localhost:3200" # required +export TEMPO_API_KEY="" # optional +export TEMPO_USERNAME="" # optional (basic auth) +export TEMPO_PASSWORD="" # optional (basic auth) +export TEMPO_ORG_ID="" # optional (X-Scope-OrgID) +``` + +Or configure it interactively: + +```bash +uv run opensre integrations setup tempo +``` + +### 2. Verify connectivity + +```bash +uv run opensre integrations verify tempo +``` + +This calls `GET /api/search/tags` and confirms the API is reachable (and that auth +passes, if configured). + +## Usage examples + +The agent selects the tool and action automatically during an investigation. The +underlying calls map to: + +```text +# Search recent error-ish traces for a service, slower than 500ms +query_tempo(action="search", service="checkout-service", min_duration_ms=500) + +# Filter by a span attribute (applied as span.) +query_tempo(action="search", tags={"http.status_code": "500"}) + +# Inspect one trace end to end +query_tempo(action="get_trace", trace_id="4f8c...e21") + +# Discover what is instrumented +query_tempo(action="list_services") +query_tempo(action="list_span_names") +``` + +`search` accepts `service`, `span_name`, `min_duration_ms`, `max_duration_ms`, +`tags`, `time_range_minutes` (default 60), and `limit` (default 20). String filters +are compiled into a TraceQL expression, e.g. +`{ resource.service.name = "checkout-service" && duration > 500ms }`. + +## Output shape + +`get_trace` returns flattened spans (shared OTLP parser): + +```json +{ + "source": "tempo", + "action": "get_trace", + "available": true, + "trace_id": "4f8c...e21", + "total_spans": 2, + "spans": [ + { + "name": "POST /checkout", + "span_id": "span-1", + "parent_span_id": "", + "service_name": "checkout-service", + "duration_ms": 2400.0, + "status_code": 2, + "status_message": "upstream timeout", + "attributes": {"http.status_code": "504"} + } + ] +} +``` + +`search` returns one summary row per trace (`trace_id`, `root_service_name`, +`root_trace_name`, `duration_ms`, `matched_spans`). `list_services` / +`list_span_names` return a flat list under `services` / `span_names`. diff --git a/tests/integrations/test_tempo.py b/tests/integrations/test_tempo.py new file mode 100644 index 000000000..c62a1ca9d --- /dev/null +++ b/tests/integrations/test_tempo.py @@ -0,0 +1,124 @@ +"""Unit tests for the Grafana Tempo integration module.""" + +from app.integrations.catalog import load_env_integrations +from app.integrations.tempo import ( + TempoConfig, + build_tempo_config, + tempo_config_from_env, + tempo_extract_params, + tempo_is_available, + validate_tempo_config, +) + + +class TestTempoConfig: + def test_defaults(self) -> None: + config = TempoConfig() + assert config.url == "" + assert config.api_key == "" + assert config.timeout_seconds == 10.0 + assert config.max_results == 20 + + def test_is_configured_with_url_only(self) -> None: + config = TempoConfig(url="http://localhost:3200") + assert config.is_configured is True + + def test_is_configured_without_url(self) -> None: + assert TempoConfig(api_key="token").is_configured is False + + def test_auth_headers_bearer(self) -> None: + headers = TempoConfig(url="http://x", api_key="token").auth_headers() + assert headers["Authorization"] == "Bearer token" + assert headers["Accept"] == "application/json" + + def test_auth_headers_basic_and_org(self) -> None: + headers = TempoConfig( + url="http://x", username="u", password="p", org_id="42" + ).auth_headers() + assert headers["Authorization"].startswith("Basic ") + assert headers["X-Scope-OrgID"] == "42" + + def test_auth_headers_none(self) -> None: + headers = TempoConfig(url="http://x").auth_headers() + assert "Authorization" not in headers + + +class TestBuildTempoConfig: + def test_from_dict(self) -> None: + config = build_tempo_config({"url": "http://tempo.example.com", "api_key": "secret"}) + assert config.url == "http://tempo.example.com" + assert config.api_key == "secret" + + def test_from_none(self) -> None: + assert build_tempo_config(None).is_configured is False + + +class TestTempoConfigFromEnv: + def test_returns_none_without_url(self, monkeypatch) -> None: + monkeypatch.delenv("TEMPO_URL", raising=False) + assert tempo_config_from_env() is None + + def test_returns_config_with_url(self, monkeypatch) -> None: + monkeypatch.setenv("TEMPO_URL", "http://localhost:3200") + monkeypatch.setenv("TEMPO_API_KEY", "token") + config = tempo_config_from_env() + assert config is not None + assert config.url == "http://localhost:3200" + assert config.api_key == "token" + assert config.is_configured is True + + +class TestTempoValidation: + def test_validate_requires_url(self) -> None: + result = validate_tempo_config(TempoConfig()) + assert result.ok is False + assert "TEMPO_URL" in result.detail + + def test_validate_hits_search_tags(self, monkeypatch) -> None: + class _FakeResponse: + def raise_for_status(self) -> None: + return None + + captured: dict[str, object] = {} + + def _fake_get(url: str, **kwargs: object) -> _FakeResponse: + captured["url"] = url + captured["headers"] = kwargs.get("headers") + return _FakeResponse() + + monkeypatch.setattr("app.integrations.tempo.httpx.get", _fake_get) + + result = validate_tempo_config(TempoConfig(url="http://localhost:3200")) + assert result.ok is True + assert str(captured["url"]).endswith("/api/search/tags") + + +class TestTempoExtractParams: + def test_extracts_params(self) -> None: + params = tempo_extract_params( + {"tempo": {"url": "http://tempo.example.com", "api_key": "key"}} + ) + assert params["url"] == "http://tempo.example.com" + assert params["api_key"] == "key" + + def test_uses_defaults_when_missing(self) -> None: + params = tempo_extract_params({}) + assert params["url"] == "" + assert params["api_key"] == "" + + +class TestTempoIsAvailable: + def test_available_when_connection_verified(self) -> None: + assert tempo_is_available({"tempo": {"connection_verified": True}}) is True + + def test_unavailable_without_connection_verified(self) -> None: + assert tempo_is_available({"tempo": {"url": "http://localhost:3200"}}) is False + + +class TestTempoEnvCatalogLoading: + def test_loads_from_env(self, monkeypatch) -> None: + monkeypatch.setenv("TEMPO_URL", "http://localhost:3200") + records = load_env_integrations() + tempo_records = [r for r in records if r.get("service") == "tempo"] + assert len(tempo_records) == 1 + assert tempo_records[0]["credentials"]["url"] == "http://localhost:3200" diff --git a/tests/services/test_grafana_tempo.py b/tests/services/test_grafana_tempo.py index a5ac591a8..f25d208f2 100644 --- a/tests/services/test_grafana_tempo.py +++ b/tests/services/test_grafana_tempo.py @@ -158,7 +158,8 @@ def test_extract_span_attributes_edge_cases(self): "attributes": [ {"key": "valid_string", "value": {"stringValue": "test"}}, {"key": "valid_int", "value": {"intValue": 42}}, - {"key": "unsupported_type", "value": {"boolValue": True}}, + {"key": "valid_bool", "value": {"boolValue": True}}, + {"key": "valid_double", "value": {"doubleValue": 1.5}}, {"key": "empty_value", "value": {}}, {"value": {"stringValue": "missing_key"}}, # Should be skipped! ] @@ -168,7 +169,8 @@ def test_extract_span_attributes_edge_cases(self): assert attributes.get("valid_string") == "test" assert attributes.get("valid_int") == 42 - assert "unsupported_type" not in attributes + assert attributes.get("valid_bool") is True + assert attributes.get("valid_double") == 1.5 assert "empty_value" not in attributes assert "" not in attributes diff --git a/tests/services/test_otlp_trace.py b/tests/services/test_otlp_trace.py new file mode 100644 index 000000000..837b85715 --- /dev/null +++ b/tests/services/test_otlp_trace.py @@ -0,0 +1,71 @@ +"""Unit tests for the shared OTLP/JSON trace parser.""" + +from __future__ import annotations + +from app.services.otlp_trace import extract_span_attributes, parse_otlp_trace + + +def test_extract_span_attributes_value_kinds() -> None: + span = { + "attributes": [ + {"key": "str", "value": {"stringValue": "v"}}, + {"key": "int", "value": {"intValue": "42"}}, + {"key": "bool", "value": {"boolValue": True}}, + {"key": "double", "value": {"doubleValue": 1.5}}, + {"key": "empty", "value": {}}, + {"value": {"stringValue": "no-key"}}, + ] + } + attrs = extract_span_attributes(span) + assert attrs == {"str": "v", "int": "42", "bool": True, "double": 1.5} + + +def test_parse_otlp_trace_flattens_spans_with_service_name() -> None: + trace = { + "batches": [ + { + "resource": { + "attributes": [{"key": "service.name", "value": {"stringValue": "checkout"}}] + }, + "scopeSpans": [ + { + "spans": [ + { + "name": "POST /checkout", + "spanId": "span-1", + "parentSpanId": "span-0", + "traceId": "trace-1", + "kind": 2, + "startTimeUnixNano": "1000000000", + "endTimeUnixNano": "1150000000", + "status": {"code": 2, "message": "boom"}, + "attributes": [ + { + "key": "http.status_code", + "value": {"intValue": "500"}, + } + ], + } + ] + } + ], + } + ] + } + spans = parse_otlp_trace(trace) + assert len(spans) == 1 + span = spans[0] + assert span["name"] == "POST /checkout" + assert span["service_name"] == "checkout" + assert span["span_id"] == "span-1" + assert span["parent_span_id"] == "span-0" + assert span["duration_ms"] == 150.0 + assert span["status_code"] == 2 + assert span["status_message"] == "boom" + assert span["attributes"]["http.status_code"] == "500" + + +def test_parse_otlp_trace_handles_empty_and_malformed() -> None: + assert parse_otlp_trace({}) == [] + assert parse_otlp_trace({"batches": ["not-a-dict"]}) == [] + assert parse_otlp_trace({"batches": [{"scopeSpans": [{"spans": []}]}]}) == [] diff --git a/tests/services/test_tempo_client.py b/tests/services/test_tempo_client.py new file mode 100644 index 000000000..5d5bd1f4d --- /dev/null +++ b/tests/services/test_tempo_client.py @@ -0,0 +1,177 @@ +"""Unit tests for the Grafana Tempo service client.""" + +from __future__ import annotations + +from typing import Any + +import httpx + +from app.integrations.tempo import TempoConfig +from app.services.tempo.client import TempoClient + + +class _FakeResponse: + def __init__(self, payload: dict[str, Any]) -> None: + self._payload = payload + + def raise_for_status(self) -> None: + return None + + def json(self) -> dict[str, Any]: + return self._payload + + +class _ErrorResponse: + def __init__(self, status_code: int, text: str) -> None: + self.status_code = status_code + self.text = text + self.request = httpx.Request("GET", "http://localhost") + + def raise_for_status(self) -> None: + raise httpx.HTTPStatusError( + f"error {self.status_code}", + request=self.request, + response=httpx.Response(self.status_code, request=self.request, text=self.text), + ) + + def json(self) -> dict[str, Any]: + return {} + + +def _client() -> TempoClient: + return TempoClient(TempoConfig(url="http://localhost:3200", api_key="token")) + + +def test_get_trace_requires_configuration() -> None: + result = TempoClient(TempoConfig()).get_trace_by_id("abc") + assert result["available"] is False + assert "TEMPO_URL" in result["error"] + + +def test_get_trace_requires_trace_id() -> None: + result = _client().get_trace_by_id("") + assert result["available"] is False + assert "trace_id is required" in result["error"] + + +def test_get_trace_by_id_parses_spans(monkeypatch) -> None: + captured: dict[str, Any] = {} + + def _fake_get(url: str, **kwargs: Any) -> _FakeResponse: + captured["url"] = url + captured["headers"] = kwargs.get("headers") + return _FakeResponse( + { + "batches": [ + { + "resource": { + "attributes": [{"key": "service.name", "value": {"stringValue": "api"}}] + }, + "scopeSpans": [ + {"spans": [{"name": "GET /x", "spanId": "s1", "attributes": []}]} + ], + } + ] + } + ) + + monkeypatch.setattr("app.services.tempo.client.httpx.get", _fake_get) + result = _client().get_trace_by_id("trace-1") + + assert result["available"] is True + assert result["trace_id"] == "trace-1" + assert result["total_spans"] == 1 + assert result["spans"][0]["service_name"] == "api" + assert captured["url"].endswith("/api/traces/trace-1") + assert captured["headers"]["Authorization"] == "Bearer token" + + +def test_search_traces_builds_traceql(monkeypatch) -> None: + captured: dict[str, Any] = {} + + def _fake_get(url: str, **kwargs: Any) -> _FakeResponse: + captured["url"] = url + captured["params"] = kwargs.get("params") + return _FakeResponse( + { + "traces": [ + { + "traceID": "t1", + "rootServiceName": "api", + "rootTraceName": "GET /x", + "durationMs": 120, + "spanSet": {"matched": 3}, + } + ] + } + ) + + monkeypatch.setattr("app.services.tempo.client.httpx.get", _fake_get) + result = _client().search_traces( + service="api", + span_name="GET /x", + min_duration_ms=100, + tags={"http.status_code": "500"}, + limit=5, + ) + + assert result["available"] is True + assert result["total"] == 1 + assert result["traces"][0]["trace_id"] == "t1" + assert result["traces"][0]["matched_spans"] == 3 + assert captured["url"].endswith("/api/search") + query = captured["params"]["q"] + assert 'resource.service.name = "api"' in query + assert 'name = "GET /x"' in query + assert "duration > 100ms" in query + assert 'span.http.status_code = "500"' in query + assert captured["params"]["limit"] == 5 + + +def test_search_traces_empty_query_when_no_filters(monkeypatch) -> None: + captured: dict[str, Any] = {} + + def _fake_get(_url: str, **kwargs: Any) -> _FakeResponse: + captured["params"] = kwargs.get("params") + return _FakeResponse({"traces": []}) + + monkeypatch.setattr("app.services.tempo.client.httpx.get", _fake_get) + result = _client().search_traces() + assert result["available"] is True + assert captured["params"]["q"] == "{}" + + +def test_list_services_parses_v2_tag_values(monkeypatch) -> None: + captured: dict[str, Any] = {} + + def _fake_get(url: str, **_kwargs: Any) -> _FakeResponse: + captured["url"] = url + return _FakeResponse( + {"tagValues": [{"type": "string", "value": "frontend"}, {"value": "cartservice"}]} + ) + + monkeypatch.setattr("app.services.tempo.client.httpx.get", _fake_get) + result = _client().list_services() + assert result["available"] is True + assert result["services"] == ["frontend", "cartservice"] + assert captured["url"].endswith("/api/v2/search/tag/resource.service.name/values") + + +def test_list_span_names_parses_v1_string_values(monkeypatch) -> None: + def _fake_get(_url: str, **_kwargs: Any) -> _FakeResponse: + return _FakeResponse({"tagValues": ["GET /a", "POST /b"]}) + + monkeypatch.setattr("app.services.tempo.client.httpx.get", _fake_get) + result = _client().list_span_names() + assert result["available"] is True + assert result["span_names"] == ["GET /a", "POST /b"] + + +def test_search_traces_surfaces_http_error(monkeypatch) -> None: + def _fake_get(_url: str, **_kwargs: Any) -> _ErrorResponse: + return _ErrorResponse(403, "forbidden") + + monkeypatch.setattr("app.services.tempo.client.httpx.get", _fake_get) + result = _client().search_traces(service="api") + assert result["available"] is False + assert "403" in result["error"] diff --git a/tests/synthetic/signoz/__init__.py b/tests/synthetic/signoz/__init__.py new file mode 100644 index 000000000..ac6227ab8 --- /dev/null +++ b/tests/synthetic/signoz/__init__.py @@ -0,0 +1,10 @@ +"""Synthetic suite for the SigNoz integration. + +Exercises the four SigNoz tools (logs, traces, services, metrics) end-to-end +against realistic mocked SigNoz v5 query_range responses. The fixtures mirror +the actual envelope shape returned by SigNoz Cloud (outer ``{status, data}`` +wrapper around the ``QueryRangeResponse`` struct from +``pkg/types/querybuildertypes/querybuildertypesv5/resp.go`` in the SigNoz +source). Catches client-to-tool wiring bugs that pure unit tests miss +because they mock at the client boundary. +""" diff --git a/tests/synthetic/test_tempo_scenario.py b/tests/synthetic/test_tempo_scenario.py new file mode 100644 index 000000000..758723d81 --- /dev/null +++ b/tests/synthetic/test_tempo_scenario.py @@ -0,0 +1,123 @@ +"""Synthetic RCA scenario using Grafana Tempo as the evidence source. + +Validates that a Tempo alert seeds the correct tools and that a mock backend +returns realistic fixture trace data through the single action-based tool. +""" + +from __future__ import annotations + +from typing import Any + +from app.agent.investigation import _ALERT_SOURCE_TO_TOOL_SOURCES +from app.tools.TempoTool import query_tempo + + +class _FixtureTempoBackend: + """Minimal fixture backend for synthetic Tempo scenarios.""" + + def search_traces(self, **kwargs: Any) -> dict[str, Any]: + service = kwargs.get("service") or "checkout-service" + return { + "source": "tempo", + "action": "search", + "available": True, + "query": '{ resource.service.name = "checkout-service" }', + "total": 2, + "traces": [ + { + "trace_id": "trace-001", + "root_service_name": service, + "root_trace_name": "POST /checkout", + "start_time_unix_nano": "1716120000000000000", + "duration_ms": 2400, + "matched_spans": 8, + }, + { + "trace_id": "trace-002", + "root_service_name": service, + "root_trace_name": "POST /checkout", + "start_time_unix_nano": "1716120030000000000", + "duration_ms": 2100, + "matched_spans": 7, + }, + ], + } + + def get_trace_by_id(self, trace_id: str) -> dict[str, Any]: + return { + "source": "tempo", + "action": "get_trace", + "available": True, + "trace_id": trace_id, + "total_spans": 2, + "spans": [ + { + "name": "POST /checkout", + "span_id": "span-1", + "parent_span_id": "", + "service_name": "checkout-service", + "duration_ms": 2400.0, + "status_code": 2, + "status_message": "upstream timeout", + "attributes": {"http.status_code": "504"}, + }, + { + "name": "GET /inventory", + "span_id": "span-2", + "parent_span_id": "span-1", + "service_name": "inventory-service", + "duration_ms": 2300.0, + "status_code": 2, + "status_message": "deadline exceeded", + "attributes": {"rpc.grpc.status_code": "4"}, + }, + ], + } + + def list_services(self, **_kwargs: Any) -> dict[str, Any]: + return { + "source": "tempo", + "action": "list_services", + "available": True, + "total": 2, + "services": ["checkout-service", "inventory-service"], + } + + def list_span_names(self, **_kwargs: Any) -> dict[str, Any]: + return { + "source": "tempo", + "action": "list_span_names", + "available": True, + "total": 2, + "span_names": ["POST /checkout", "GET /inventory"], + } + + +def test_tempo_alert_source_maps_to_tools() -> None: + """Tempo alert source seeds tempo tools before the ReAct loop.""" + assert "tempo" in _ALERT_SOURCE_TO_TOOL_SOURCES + assert _ALERT_SOURCE_TO_TOOL_SOURCES["tempo"] == ["tempo"] + + +def test_tempo_search_synthetic_scenario() -> None: + backend = _FixtureTempoBackend() + result = query_tempo(service="checkout-service", tempo_backend=backend) + assert result["available"] is True + assert result["total"] == 2 + assert result["traces"][0]["duration_ms"] == 2400 + + +def test_tempo_get_trace_synthetic_scenario() -> None: + backend = _FixtureTempoBackend() + result = query_tempo(action="get_trace", trace_id="trace-001", tempo_backend=backend) + assert result["available"] is True + assert result["total_spans"] == 2 + assert any(s["service_name"] == "inventory-service" for s in result["spans"]) + assert result["spans"][0]["attributes"]["http.status_code"] == "504" + + +def test_tempo_list_services_synthetic_scenario() -> None: + backend = _FixtureTempoBackend() + result = query_tempo(action="list_services", tempo_backend=backend) + assert result["available"] is True + assert "checkout-service" in result["services"] diff --git a/tests/tools/test_tempo_tools.py b/tests/tools/test_tempo_tools.py new file mode 100644 index 000000000..097d918ec --- /dev/null +++ b/tests/tools/test_tempo_tools.py @@ -0,0 +1,91 @@ +"""Tests for the Grafana Tempo tool.""" + +from __future__ import annotations + +from typing import Any + +from app.tools.TempoTool import _tempo_is_available, query_tempo + + +class _FakeTempoBackend: + """Fake Tempo backend for tool dispatch tests.""" + + def get_trace_by_id(self, trace_id: str) -> dict[str, Any]: + return { + "source": "tempo", + "action": "get_trace", + "available": True, + "trace_id": trace_id, + "total_spans": 1, + "spans": [{"name": "GET /x", "service_name": "api"}], + } + + def search_traces(self, **kwargs: Any) -> dict[str, Any]: + return { + "source": "tempo", + "action": "search", + "available": True, + "total": 1, + "traces": [{"trace_id": "t1", "root_service_name": kwargs.get("service") or "api"}], + } + + def list_services(self, **_kwargs: Any) -> dict[str, Any]: + return { + "source": "tempo", + "action": "list_services", + "available": True, + "total": 2, + "services": ["api", "worker"], + } + + def list_span_names(self, **_kwargs: Any) -> dict[str, Any]: + return { + "source": "tempo", + "action": "list_span_names", + "available": True, + "total": 1, + "span_names": ["GET /x"], + } + + +class TestTempoAvailability: + def test_available_with_url_only(self) -> None: + assert _tempo_is_available({"tempo": {"url": "http://localhost:3200"}}) is True + + def test_available_with_backend(self) -> None: + assert _tempo_is_available({"tempo": {"_backend": object()}}) is True + + def test_unavailable_when_empty(self) -> None: + assert _tempo_is_available({}) is False + + +class TestTempoToolDispatch: + def test_search_default_action(self) -> None: + result = query_tempo(service="api", tempo_backend=_FakeTempoBackend()) + assert result["action"] == "search" + assert result["traces"][0]["root_service_name"] == "api" + + def test_get_trace_action(self) -> None: + result = query_tempo( + action="get_trace", trace_id="trace-1", tempo_backend=_FakeTempoBackend() + ) + assert result["action"] == "get_trace" + assert result["trace_id"] == "trace-1" + assert result["spans"][0]["service_name"] == "api" + + def test_list_services_action(self) -> None: + result = query_tempo(action="list_services", tempo_backend=_FakeTempoBackend()) + assert result["services"] == ["api", "worker"] + + def test_list_span_names_action(self) -> None: + result = query_tempo(action="list_span_names", tempo_backend=_FakeTempoBackend()) + assert result["span_names"] == ["GET /x"] + + def test_invalid_action_falls_back_to_search(self) -> None: + result = query_tempo(action="bogus", tempo_backend=_FakeTempoBackend()) + assert result["action"] == "search" + + def test_not_configured_without_backend(self) -> None: + result = query_tempo(action="search") + assert result["available"] is False + assert "not configured" in result["error"].lower() From 41b39f88111143b6bb8762546f63e0395e356a5a Mon Sep 17 00:00:00 2001 From: Aniket Agarwal Date: Thu, 28 May 2026 23:57:17 +0530 Subject: [PATCH 2/7] feat(tempo): bug fix --- app/services/tempo/client.py | 2 +- tests/synthetic/signoz/__init__.py | 10 ---------- 2 files changed, 1 insertion(+), 11 deletions(-) delete mode 100644 tests/synthetic/signoz/__init__.py diff --git a/app/services/tempo/client.py b/app/services/tempo/client.py index ea76b7fe0..af7b4af0e 100644 --- a/app/services/tempo/client.py +++ b/app/services/tempo/client.py @@ -63,7 +63,7 @@ def _parse_search_traces(payload: dict[str, Any]) -> list[dict[str, Any]]: if matched is None: matched = len(span_set.get("spans", []) or []) try: - duration_ms = round(int(trace.get("durationMs", 0)), 4) + duration_ms = round(float(trace.get("durationMs", 0)), 4) except (TypeError, ValueError): duration_ms = 0 results.append( diff --git a/tests/synthetic/signoz/__init__.py b/tests/synthetic/signoz/__init__.py deleted file mode 100644 index ac6227ab8..000000000 --- a/tests/synthetic/signoz/__init__.py +++ /dev/null @@ -1,10 +0,0 @@ -"""Synthetic suite for the SigNoz integration. - -Exercises the four SigNoz tools (logs, traces, services, metrics) end-to-end -against realistic mocked SigNoz v5 query_range responses. The fixtures mirror -the actual envelope shape returned by SigNoz Cloud (outer ``{status, data}`` -wrapper around the ``QueryRangeResponse`` struct from -``pkg/types/querybuildertypes/querybuildertypesv5/resp.go`` in the SigNoz -source). Catches client-to-tool wiring bugs that pure unit tests miss -because they mock at the client boundary. -""" From e55ba90427f6b390f07a3bad2f2f6b0e8a04467c Mon Sep 17 00:00:00 2001 From: Aniket Agarwal Date: Fri, 29 May 2026 13:14:47 +0530 Subject: [PATCH 3/7] feat(tempo): pr review comments --- app/services/tempo/client.py | 6 +++++- app/tools/TempoTool/__init__.py | 4 +--- tests/synthetic/test_tempo_scenario.py | 4 ++++ 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/app/services/tempo/client.py b/app/services/tempo/client.py index af7b4af0e..2f406bf33 100644 --- a/app/services/tempo/client.py +++ b/app/services/tempo/client.py @@ -10,6 +10,7 @@ from __future__ import annotations import logging +import re from datetime import UTC, datetime, timedelta from typing import Any @@ -28,6 +29,9 @@ _SPAN_NAME_TAG = "name" +_VALID_TAG_KEY_RE = re.compile(r"^[A-Za-z0-9_.:-]+$") + + def _escape_traceql_value(value: str) -> str: return value.replace("\\", "\\\\").replace('"', '\\"') @@ -235,7 +239,7 @@ def _build_traceql( if max_duration_ms is not None and max_duration_ms > 0: parts.append(f"duration < {max_duration_ms}ms") for key, value in (tags or {}).items(): - if not key: + if not key or not _VALID_TAG_KEY_RE.match(key): continue parts.append(f'span.{key} = "{_escape_traceql_value(str(value))}"') if not parts: diff --git a/app/tools/TempoTool/__init__.py b/app/tools/TempoTool/__init__.py index 1c4adf8ec..14d9f48f9 100644 --- a/app/tools/TempoTool/__init__.py +++ b/app/tools/TempoTool/__init__.py @@ -13,9 +13,7 @@ def _tempo_is_available(sources: dict[str, dict]) -> bool: - if tempo_available_or_backend(sources): - return True - return bool(sources.get("tempo", {}).get("url")) + return tempo_available_or_backend(sources) def _tempo_extract_params(sources: dict[str, dict]) -> dict[str, Any]: diff --git a/tests/synthetic/test_tempo_scenario.py b/tests/synthetic/test_tempo_scenario.py index 758723d81..109c5055e 100644 --- a/tests/synthetic/test_tempo_scenario.py +++ b/tests/synthetic/test_tempo_scenario.py @@ -8,6 +8,10 @@ from typing import Any +import pytest + +pytestmark = pytest.mark.synthetic + from app.agent.investigation import _ALERT_SOURCE_TO_TOOL_SOURCES from app.tools.TempoTool import query_tempo From 83eda948243836ca976ea42870680bff02a68e91 Mon Sep 17 00:00:00 2001 From: Aniket Agarwal Date: Fri, 29 May 2026 20:34:22 +0530 Subject: [PATCH 4/7] feat(tempo): test fix --- app/tools/utils/availability.py | 2 +- tests/tools/test_telemetry.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/app/tools/utils/availability.py b/app/tools/utils/availability.py index 3d3e4ea7a..c27440d19 100644 --- a/app/tools/utils/availability.py +++ b/app/tools/utils/availability.py @@ -77,7 +77,7 @@ def tempo_available_or_backend(sources: dict[str, dict]) -> bool: tempo = sources.get("tempo", {}) if tempo.get("_backend"): return True - return bool(tempo.get("connection_verified") and tempo.get("url")) + return bool(tempo.get("url")) def hermes_available_or_backend(sources: dict[str, dict]) -> bool: diff --git a/tests/tools/test_telemetry.py b/tests/tools/test_telemetry.py index c1d055c1a..ab4be2b62 100644 --- a/tests/tools/test_telemetry.py +++ b/tests/tools/test_telemetry.py @@ -948,6 +948,7 @@ def _describe(_cluster: str, ng: str) -> dict[str, Any]: "query_signoz_metrics", "query_signoz_traces", "query_splunk_logs", + "query_tempo", "run_diagnostic_code", "search_bitbucket_code", "search_github_code", From 9e0cf759921675fbf21245429ccc86e819c3acda Mon Sep 17 00:00:00 2001 From: Aniket Agarwal Date: Sun, 31 May 2026 12:38:49 +0530 Subject: [PATCH 5/7] feat(tempo): updated tempo docs --- app/cli/wizard/flow.py | 81 ++++++++ app/cli/wizard/integration_health.py | 2 + .../client_validators.py | 20 ++ docs/tempo.mdx | 177 +++++++++++------- 4 files changed, 210 insertions(+), 70 deletions(-) diff --git a/app/cli/wizard/flow.py b/app/cli/wizard/flow.py index a40f9baf9..7646438eb 100644 --- a/app/cli/wizard/flow.py +++ b/app/cli/wizard/flow.py @@ -211,6 +211,12 @@ def validate_splunk_integration(**kwargs): return _validate(**kwargs) +def validate_tempo_integration(**kwargs): + from app.cli.wizard.integration_health import validate_tempo_integration as _validate + + return _validate(**kwargs) + + def get_sentry_auth_recommendations(): from app.integrations.sentry import get_sentry_auth_recommendations as _get @@ -1681,6 +1687,67 @@ def _configure_telegram() -> tuple[str, str]: _console.print(f"[{SECONDARY}]Try again or press Ctrl+C to cancel.[/]") +def _configure_signoz() -> tuple[str, str]: + _, credentials = _integration_defaults("signoz") + while True: + url = _prompt_value( + "SigNoz URL (e.g. http://localhost:8080 for local Docker)", + default=_string_value(credentials.get("url")), + ) + api_key = _prompt_value( + "SigNoz API key (Settings → Service Accounts → Keys)", + default=_string_value(credentials.get("api_key")), + secret=True, + ) + with _console.status("Validating SigNoz integration...", spinner="dots"): + result = validate_signoz_integration(url=url, api_key=api_key) + _render_integration_result("SigNoz", result) + if result.ok: + upsert_integration("signoz", {"credentials": {"url": url, "api_key": api_key}}) + env_path = sync_env_values({"SIGNOZ_URL": url}) + return "SigNoz", str(env_path) + _console.print(f"[{SECONDARY}]Try again or press Ctrl+C to cancel.[/]") + + +def _configure_tempo() -> tuple[str, str]: + _, credentials = _integration_defaults("tempo") + _console.print( + f"[{SECONDARY}]Tempo commonly runs without auth behind a gateway — a URL alone is enough.[/]" + ) + while True: + url = _prompt_value( + "Tempo URL (e.g. http://localhost:3200)", + default=_string_value(credentials.get("url")), + ) + api_key = _prompt_value( + "Tempo bearer token (optional, leave blank if none)", + default=_string_value(credentials.get("api_key")), + secret=True, + allow_empty=True, + ) + org_id = _prompt_value( + "Tempo tenant / X-Scope-OrgID (optional, leave blank if single-tenant)", + default=_string_value(credentials.get("org_id")), + allow_empty=True, + ) + with _console.status("Validating Tempo integration...", spinner="dots"): + result = validate_tempo_integration(url=url, api_key=api_key, org_id=org_id) + _render_integration_result("Tempo", result) + if result.ok: + creds: dict[str, str] = {"url": url} + if api_key: + creds["api_key"] = api_key + if org_id: + creds["org_id"] = org_id + upsert_integration("tempo", {"credentials": creds}) + env_values: dict[str, str] = {"TEMPO_URL": url} + if org_id: + env_values["TEMPO_ORG_ID"] = org_id + env_path = sync_env_values(env_values) + return "Tempo", str(env_path) + _console.print(f"[{SECONDARY}]Try again or press Ctrl+C to cancel.[/]") + + def _configure_splunk() -> tuple[str, str]: _, credentials = _integration_defaults("splunk") while True: @@ -1931,6 +1998,16 @@ def _configure_selected_integrations() -> tuple[list[str], str | None]: label="OpenSearch / Elasticsearch", hint="Query logs and indices from OpenSearch or Elasticsearch clusters", ), + Choice( + value="signoz", + label="SigNoz", + hint="Query logs, metrics, and traces from SigNoz", + ), + Choice( + value="tempo", + label="Grafana Tempo", + hint="Query distributed traces from a standalone Tempo backend", + ), Choice( value="skip", label="Skip for now", @@ -1969,6 +2046,8 @@ def _configure_selected_integrations() -> tuple[list[str], str | None]: "openclaw": _configure_openclaw, "opensearch": _configure_opensearch, "splunk": _configure_splunk, + "signoz": _configure_signoz, + "tempo": _configure_tempo, } _SERVICE_LABELS = { "grafana_local": "grafana local", @@ -1992,6 +2071,8 @@ def _configure_selected_integrations() -> tuple[list[str], str | None]: "notion": "notion", "openclaw": "openclaw", "opensearch": "opensearch", + "signoz": "signoz", + "tempo": "grafana tempo", } _step(f"Service · {_SERVICE_LABELS.get(selected_service, selected_service)}") diff --git a/app/cli/wizard/integration_health.py b/app/cli/wizard/integration_health.py index d47b75e45..84f2961cf 100644 --- a/app/cli/wizard/integration_health.py +++ b/app/cli/wizard/integration_health.py @@ -17,6 +17,7 @@ validate_opsgenie_integration, validate_sentry_integration, validate_splunk_integration, + validate_tempo_integration, validate_vercel_integration, ) from app.cli.wizard.integration_validators.http_probe_validators import ( @@ -55,5 +56,6 @@ "validate_slack_webhook", "validate_telegram_bot", "validate_splunk_integration", + "validate_tempo_integration", "validate_vercel_integration", ] diff --git a/app/cli/wizard/integration_validators/client_validators.py b/app/cli/wizard/integration_validators/client_validators.py index cab1715ad..fa4539c92 100644 --- a/app/cli/wizard/integration_validators/client_validators.py +++ b/app/cli/wizard/integration_validators/client_validators.py @@ -6,6 +6,7 @@ from app.integrations.betterstack import build_betterstack_config, validate_betterstack_config from app.integrations.gitlab import build_gitlab_config, validate_gitlab_config +from app.integrations.tempo import build_tempo_config, validate_tempo_config from app.integrations.models import ( AWSIntegrationConfig, CoralogixIntegrationConfig, @@ -480,6 +481,25 @@ def validate_splunk_integration( ) +def validate_tempo_integration( + *, + url: str, + api_key: str = "", + username: str = "", + password: str = "", + org_id: str = "", +) -> IntegrationHealthResult: + """Validate Tempo connectivity via the tag-search endpoint.""" + try: + config = build_tempo_config( + {"url": url, "api_key": api_key, "username": username, "password": password, "org_id": org_id} + ) + except Exception as err: + return IntegrationHealthResult(ok=False, detail=f"Tempo config invalid: {err}") + result = validate_tempo_config(config) + return IntegrationHealthResult(ok=result.ok, detail=result.detail) + + def validate_opensearch_integration( *, url: str, diff --git a/docs/tempo.mdx b/docs/tempo.mdx index b09eba53f..5c2280f46 100644 --- a/docs/tempo.mdx +++ b/docs/tempo.mdx @@ -1,105 +1,142 @@ -# Grafana Tempo Integration +--- +title: "Grafana Tempo" +description: "Connect Grafana Tempo so OpenSRE can query distributed traces during investigations" +--- -Query distributed traces from a standalone [Grafana Tempo](https://grafana.com/oss/tempo/) -backend via its HTTP API. +OpenSRE uses Grafana Tempo to investigate trace-related alerts — searching spans by service, fetching full traces by ID, listing instrumented services, and filtering by error status or latency. -This integration talks to Tempo **directly** (its own URL and auth) — it does not -require a Grafana instance or datasource proxy. If you run the full Grafana stack, -the [Grafana](/grafana) integration already surfaces Tempo through the datasource -proxy; use this integration when you run Tempo on its own (alongside Loki/Mimir or -any OpenTelemetry pipeline). +This integration talks to Tempo **directly** via its HTTP API. It does not require a Grafana instance or datasource proxy. If you run the full Grafana stack, the [Grafana](/grafana) integration already surfaces Tempo through the datasource proxy — use this integration when you run Tempo standalone. -OpenSRE exposes a single tool, `query_tempo`, with an `action` parameter: +## Prerequisites -| action | Tempo endpoint | What it does | -| --- | --- | --- | -| `search` (default) | `GET /api/search` | Search traces by service, span name, duration, and tags (TraceQL) | -| `get_trace` | `GET /api/traces/{id}` | Fetch a full trace by ID and flatten its spans | -| `list_services` | `GET /api/v2/search/tag/resource.service.name/values` | List services registered in Tempo | -| `list_span_names` | `GET /api/v2/search/tag/name/values` | List span names registered in Tempo | - -## Quick Start +- Grafana Tempo 2.0+ +- Network access from the OpenSRE environment to your Tempo instance +- Auth credentials only if your deployment requires them (many run without auth behind a gateway) -### 1. Configure environment variables +## Setup -Tempo commonly runs without auth behind a gateway, so a URL alone is enough. -Add a bearer token, basic-auth tenant credentials, or a multi-tenant org ID only -if your deployment requires them. +### Option 1: Interactive CLI ```bash -export TEMPO_URL="http://localhost:3200" # required -export TEMPO_API_KEY="" # optional -export TEMPO_USERNAME="" # optional (basic auth) -export TEMPO_PASSWORD="" # optional (basic auth) -export TEMPO_ORG_ID="" # optional (X-Scope-OrgID) +opensre integrations setup tempo ``` -Or configure it interactively: +You will be prompted for the Tempo URL and optional auth. Leave auth fields blank if your Tempo runs without authentication. + +### Option 2: Environment variables + +Add to your `.env`: ```bash -uv run opensre integrations setup tempo +TEMPO_URL=http://localhost:3200 +TEMPO_API_KEY= # optional +TEMPO_USERNAME= # optional (basic auth) +TEMPO_PASSWORD= # optional (basic auth) +TEMPO_ORG_ID= # optional (X-Scope-OrgID for multi-tenant) ``` -### 2. Verify connectivity +| Variable | Default | Description | +| --- | --- | --- | +| `TEMPO_URL` | — | **Required.** Tempo HTTP API base URL | +| `TEMPO_API_KEY` | _(empty)_ | Bearer token for auth | +| `TEMPO_USERNAME` | _(empty)_ | Username for basic auth | +| `TEMPO_PASSWORD` | _(empty)_ | Password for basic auth | +| `TEMPO_ORG_ID` | _(empty)_ | Tenant ID sent as `X-Scope-OrgID` | -```bash -uv run opensre integrations verify tempo +### Option 3: Persistent store + +Integrations are automatically persisted to `~/.opensre/integrations.json`: + +```json +{ + "version": 1, + "integrations": [ + { + "id": "tempo-prod", + "service": "tempo", + "status": "active", + "credentials": { + "url": "http://localhost:3200", + "api_key": "" + } + } + ] +} ``` -This calls `GET /api/search/tags` and confirms the API is reachable (and that auth -passes, if configured). +## Investigation tools -## Usage examples +OpenSRE exposes a single `query_tempo` tool with an `action` parameter: -The agent selects the tool and action automatically during an investigation. The -underlying calls map to: +### search + +Searches traces by service, span name, duration, and tags using TraceQL. Returns one summary row per trace. ```text -# Search recent error-ish traces for a service, slower than 500ms query_tempo(action="search", service="checkout-service", min_duration_ms=500) - -# Filter by a span attribute (applied as span.) query_tempo(action="search", tags={"http.status_code": "500"}) +``` + +| Parameter | Description | +| --- | --- | +| `service` | Filter by `resource.service.name` | +| `span_name` | Filter by span name | +| `min_duration_ms` | Minimum trace duration | +| `max_duration_ms` | Maximum trace duration | +| `tags` | Key/value span attributes | +| `time_range_minutes` | Lookback window (default 60) | +| `limit` | Max traces to return (default 20) | + +### get_trace -# Inspect one trace end to end +Fetches a full trace by ID and flattens its spans. + +```text query_tempo(action="get_trace", trace_id="4f8c...e21") +``` + +### list_services + +Lists all services registered in Tempo. -# Discover what is instrumented +```text query_tempo(action="list_services") +``` + +### list_span_names + +Lists all span names registered in Tempo. + +```text query_tempo(action="list_span_names") ``` -`search` accepts `service`, `span_name`, `min_duration_ms`, `max_duration_ms`, -`tags`, `time_range_minutes` (default 60), and `limit` (default 20). String filters -are compiled into a TraceQL expression, e.g. -`{ resource.service.name = "checkout-service" && duration > 500ms }`. +## Verify -## Output shape +```bash +opensre integrations verify tempo +``` -`get_trace` returns flattened spans (shared OTLP parser): +Expected output: -```json -{ - "source": "tempo", - "action": "get_trace", - "available": true, - "trace_id": "4f8c...e21", - "total_spans": 2, - "spans": [ - { - "name": "POST /checkout", - "span_id": "span-1", - "parent_span_id": "", - "service_name": "checkout-service", - "duration_ms": 2400.0, - "status_code": 2, - "status_message": "upstream timeout", - "attributes": {"http.status_code": "504"} - } - ] -} ``` +Service: tempo +Status: passed +Detail: Connected to Grafana Tempo HTTP API (/api/search, /api/traces). +``` + +## Troubleshooting + +| Symptom | Fix | +| --- | --- | +| **Connection refused** | Verify the URL and port. Default Tempo HTTP port is `3200`. | +| **HTTP 401** | Set `TEMPO_API_KEY` or `TEMPO_USERNAME`/`TEMPO_PASSWORD` if your deployment requires auth. | +| **HTTP 404 on verify** | Check your Tempo version — the `/api/search/tags` endpoint requires Tempo 1.4+. | +| **No traces returned** | Confirm your services are sending traces to Tempo and the time range covers the period of interest. | +| **Multi-tenant 400** | Set `TEMPO_ORG_ID` to the correct tenant ID. | + +## Security best practices -`search` returns one summary row per trace (`trace_id`, `root_service_name`, -`root_trace_name`, `duration_ms`, `matched_spans`). `list_services` / -`list_span_names` return a flat list under `services` / `span_names`. +- Use a **read-only** token or service account if your Tempo deployment supports auth. +- Store credentials in `.env`, never in code. +- Restrict network access to Tempo — OpenSRE only needs the HTTP API port (`3200` by default). From 6219af9f8cf3b10f1ac963059260a33231ba42c0 Mon Sep 17 00:00:00 2001 From: Aniket Agarwal Date: Sun, 31 May 2026 13:49:32 +0530 Subject: [PATCH 6/7] feat(tempo): updated tempo docs --- app/cli/wizard/flow.py | 29 ------------------- .../client_validators.py | 10 +++++-- 2 files changed, 8 insertions(+), 31 deletions(-) diff --git a/app/cli/wizard/flow.py b/app/cli/wizard/flow.py index 7646438eb..39b6465a5 100644 --- a/app/cli/wizard/flow.py +++ b/app/cli/wizard/flow.py @@ -1687,28 +1687,6 @@ def _configure_telegram() -> tuple[str, str]: _console.print(f"[{SECONDARY}]Try again or press Ctrl+C to cancel.[/]") -def _configure_signoz() -> tuple[str, str]: - _, credentials = _integration_defaults("signoz") - while True: - url = _prompt_value( - "SigNoz URL (e.g. http://localhost:8080 for local Docker)", - default=_string_value(credentials.get("url")), - ) - api_key = _prompt_value( - "SigNoz API key (Settings → Service Accounts → Keys)", - default=_string_value(credentials.get("api_key")), - secret=True, - ) - with _console.status("Validating SigNoz integration...", spinner="dots"): - result = validate_signoz_integration(url=url, api_key=api_key) - _render_integration_result("SigNoz", result) - if result.ok: - upsert_integration("signoz", {"credentials": {"url": url, "api_key": api_key}}) - env_path = sync_env_values({"SIGNOZ_URL": url}) - return "SigNoz", str(env_path) - _console.print(f"[{SECONDARY}]Try again or press Ctrl+C to cancel.[/]") - - def _configure_tempo() -> tuple[str, str]: _, credentials = _integration_defaults("tempo") _console.print( @@ -1998,11 +1976,6 @@ def _configure_selected_integrations() -> tuple[list[str], str | None]: label="OpenSearch / Elasticsearch", hint="Query logs and indices from OpenSearch or Elasticsearch clusters", ), - Choice( - value="signoz", - label="SigNoz", - hint="Query logs, metrics, and traces from SigNoz", - ), Choice( value="tempo", label="Grafana Tempo", @@ -2046,7 +2019,6 @@ def _configure_selected_integrations() -> tuple[list[str], str | None]: "openclaw": _configure_openclaw, "opensearch": _configure_opensearch, "splunk": _configure_splunk, - "signoz": _configure_signoz, "tempo": _configure_tempo, } _SERVICE_LABELS = { @@ -2071,7 +2043,6 @@ def _configure_selected_integrations() -> tuple[list[str], str | None]: "notion": "notion", "openclaw": "openclaw", "opensearch": "opensearch", - "signoz": "signoz", "tempo": "grafana tempo", } diff --git a/app/cli/wizard/integration_validators/client_validators.py b/app/cli/wizard/integration_validators/client_validators.py index fa4539c92..aceaf01e7 100644 --- a/app/cli/wizard/integration_validators/client_validators.py +++ b/app/cli/wizard/integration_validators/client_validators.py @@ -6,7 +6,6 @@ from app.integrations.betterstack import build_betterstack_config, validate_betterstack_config from app.integrations.gitlab import build_gitlab_config, validate_gitlab_config -from app.integrations.tempo import build_tempo_config, validate_tempo_config from app.integrations.models import ( AWSIntegrationConfig, CoralogixIntegrationConfig, @@ -16,6 +15,7 @@ IncidentIoIntegrationConfig, ) from app.integrations.sentry import build_sentry_config, validate_sentry_config +from app.integrations.tempo import build_tempo_config, validate_tempo_config from app.services.alertmanager import make_alertmanager_client from app.services.coralogix import CoralogixClient from app.services.datadog import DatadogClient, DatadogConfig @@ -492,7 +492,13 @@ def validate_tempo_integration( """Validate Tempo connectivity via the tag-search endpoint.""" try: config = build_tempo_config( - {"url": url, "api_key": api_key, "username": username, "password": password, "org_id": org_id} + { + "url": url, + "api_key": api_key, + "username": username, + "password": password, + "org_id": org_id, + } ) except Exception as err: return IntegrationHealthResult(ok=False, detail=f"Tempo config invalid: {err}") From 45153ce246a85dc73d6a8f2bb742bbe0122d0313 Mon Sep 17 00:00:00 2001 From: Aniket Agarwal Date: Sun, 31 May 2026 18:05:07 +0530 Subject: [PATCH 7/7] feat(tempo): correct test cases --- tests/cli/wizard/test_integration_health.py | 1 + tests/cli_smoke_test.py | 8 ++++---- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/tests/cli/wizard/test_integration_health.py b/tests/cli/wizard/test_integration_health.py index b2ebfa00f..6222a0ce3 100644 --- a/tests/cli/wizard/test_integration_health.py +++ b/tests/cli/wizard/test_integration_health.py @@ -52,6 +52,7 @@ def test_legacy_integration_health_import_surface_still_exports_validators() -> "validate_sentry_integration", "validate_slack_webhook", "validate_splunk_integration", + "validate_tempo_integration", "validate_vercel_integration", } diff --git a/tests/cli_smoke_test.py b/tests/cli_smoke_test.py index 77e102cae..b0deafe53 100644 --- a/tests/cli_smoke_test.py +++ b/tests/cli_smoke_test.py @@ -513,8 +513,8 @@ def test_tests_inventory_commands_smoke(cli_sandbox: CliSandbox) -> None: def test_onboard_interactive_smoke(cli_sandbox: CliSandbox) -> None: # One `j` per keypress (burst writes are not separate keys). The select list wraps; # from the first option, len(choices)-1 steps reach "Skip for now" without wrapping past it. - # 23 integrations + "Skip for now" = 24 choices. OpenSearch is at index 22; - # 23 j's lands on "Skip for now" at index 23. + # 24 integrations + "Skip for now" = 25 choices. Grafana Tempo is at index 23; + # 24 j's lands on "Skip for now" at index 24. result = _run_cli_pty( cli_sandbox, "onboard", @@ -526,7 +526,7 @@ def test_onboard_interactive_smoke(cli_sandbox: CliSandbox) -> None: PtyAction( expect="Choose an integration to configure", send=b"\r", - stagger_j=23, + stagger_j=24, ), ], timeout=30.0, @@ -620,7 +620,7 @@ def test_onboard_interactive_smoke_cli_provider_repick_when_unauthenticated( PtyAction( expect="Choose an integration to configure", send=b"\r", - stagger_j=23, + stagger_j=24, ), ], timeout=pty_timeout,