diff --git a/app/agent/investigation.py b/app/agent/investigation.py index 324842155..74bd0d844 100644 --- a/app/agent/investigation.py +++ b/app/agent/investigation.py @@ -53,6 +53,7 @@ "azure": ["azure", "azure_sql"], "splunk": ["splunk"], "signoz": ["signoz"], + "tempo": ["tempo"], } # Callback type: called with (event_kind, data_dict) during the agent loop. diff --git a/app/cli/wizard/flow.py b/app/cli/wizard/flow.py index a40f9baf9..39b6465a5 100644 --- a/app/cli/wizard/flow.py +++ b/app/cli/wizard/flow.py @@ -211,6 +211,12 @@ def validate_splunk_integration(**kwargs): return _validate(**kwargs) +def validate_tempo_integration(**kwargs): + from app.cli.wizard.integration_health import validate_tempo_integration as _validate + + return _validate(**kwargs) + + def get_sentry_auth_recommendations(): from app.integrations.sentry import get_sentry_auth_recommendations as _get @@ -1681,6 +1687,45 @@ def _configure_telegram() -> tuple[str, str]: _console.print(f"[{SECONDARY}]Try again or press Ctrl+C to cancel.[/]") +def _configure_tempo() -> tuple[str, str]: + _, credentials = _integration_defaults("tempo") + _console.print( + f"[{SECONDARY}]Tempo commonly runs without auth behind a gateway — a URL alone is enough.[/]" + ) + while True: + url = _prompt_value( + "Tempo URL (e.g. 
http://localhost:3200)", + default=_string_value(credentials.get("url")), + ) + api_key = _prompt_value( + "Tempo bearer token (optional, leave blank if none)", + default=_string_value(credentials.get("api_key")), + secret=True, + allow_empty=True, + ) + org_id = _prompt_value( + "Tempo tenant / X-Scope-OrgID (optional, leave blank if single-tenant)", + default=_string_value(credentials.get("org_id")), + allow_empty=True, + ) + with _console.status("Validating Tempo integration...", spinner="dots"): + result = validate_tempo_integration(url=url, api_key=api_key, org_id=org_id) + _render_integration_result("Tempo", result) + if result.ok: + creds: dict[str, str] = {"url": url} + if api_key: + creds["api_key"] = api_key + if org_id: + creds["org_id"] = org_id + upsert_integration("tempo", {"credentials": creds}) + env_values: dict[str, str] = {"TEMPO_URL": url} + if org_id: + env_values["TEMPO_ORG_ID"] = org_id + env_path = sync_env_values(env_values) + return "Tempo", str(env_path) + _console.print(f"[{SECONDARY}]Try again or press Ctrl+C to cancel.[/]") + + def _configure_splunk() -> tuple[str, str]: _, credentials = _integration_defaults("splunk") while True: @@ -1931,6 +1976,11 @@ def _configure_selected_integrations() -> tuple[list[str], str | None]: label="OpenSearch / Elasticsearch", hint="Query logs and indices from OpenSearch or Elasticsearch clusters", ), + Choice( + value="tempo", + label="Grafana Tempo", + hint="Query distributed traces from a standalone Tempo backend", + ), Choice( value="skip", label="Skip for now", @@ -1969,6 +2019,7 @@ def _configure_selected_integrations() -> tuple[list[str], str | None]: "openclaw": _configure_openclaw, "opensearch": _configure_opensearch, "splunk": _configure_splunk, + "tempo": _configure_tempo, } _SERVICE_LABELS = { "grafana_local": "grafana local", @@ -1992,6 +2043,7 @@ def _configure_selected_integrations() -> tuple[list[str], str | None]: "notion": "notion", "openclaw": "openclaw", "opensearch": "opensearch", + 
"tempo": "grafana tempo", } _step(f"Service · {_SERVICE_LABELS.get(selected_service, selected_service)}") diff --git a/app/cli/wizard/integration_health.py b/app/cli/wizard/integration_health.py index d47b75e45..84f2961cf 100644 --- a/app/cli/wizard/integration_health.py +++ b/app/cli/wizard/integration_health.py @@ -17,6 +17,7 @@ validate_opsgenie_integration, validate_sentry_integration, validate_splunk_integration, + validate_tempo_integration, validate_vercel_integration, ) from app.cli.wizard.integration_validators.http_probe_validators import ( @@ -55,5 +56,6 @@ "validate_slack_webhook", "validate_telegram_bot", "validate_splunk_integration", + "validate_tempo_integration", "validate_vercel_integration", ] diff --git a/app/cli/wizard/integration_validators/client_validators.py b/app/cli/wizard/integration_validators/client_validators.py index cab1715ad..aceaf01e7 100644 --- a/app/cli/wizard/integration_validators/client_validators.py +++ b/app/cli/wizard/integration_validators/client_validators.py @@ -15,6 +15,7 @@ IncidentIoIntegrationConfig, ) from app.integrations.sentry import build_sentry_config, validate_sentry_config +from app.integrations.tempo import build_tempo_config, validate_tempo_config from app.services.alertmanager import make_alertmanager_client from app.services.coralogix import CoralogixClient from app.services.datadog import DatadogClient, DatadogConfig @@ -480,6 +481,31 @@ def validate_splunk_integration( ) +def validate_tempo_integration( + *, + url: str, + api_key: str = "", + username: str = "", + password: str = "", + org_id: str = "", +) -> IntegrationHealthResult: + """Validate Tempo connectivity via the tag-search endpoint.""" + try: + config = build_tempo_config( + { + "url": url, + "api_key": api_key, + "username": username, + "password": password, + "org_id": org_id, + } + ) + except Exception as err: + return IntegrationHealthResult(ok=False, detail=f"Tempo config invalid: {err}") + result = validate_tempo_config(config) + 
return IntegrationHealthResult(ok=result.ok, detail=result.detail) + + def validate_opensearch_integration( *, url: str, diff --git a/app/integrations/_catalog_impl.py b/app/integrations/_catalog_impl.py index 4228bbaf9..f4091c78d 100644 --- a/app/integrations/_catalog_impl.py +++ b/app/integrations/_catalog_impl.py @@ -60,6 +60,7 @@ from app.integrations.signoz import build_signoz_config, signoz_config_from_env from app.integrations.store import _STRUCTURAL_RECORD_FIELDS, load_integrations from app.integrations.supabase import build_supabase_config +from app.integrations.tempo import build_tempo_config, tempo_config_from_env from app.llm_credentials import resolve_env_credential from app.services.vercel import VercelConfig from app.utils.coercion import safe_int @@ -950,6 +951,24 @@ def _classify_service_instance( return signoz_config.model_dump(), "signoz" return None, None + if key == "tempo": + try: + tempo_config = build_tempo_config( + { + "url": credentials.get("url", ""), + "api_key": credentials.get("api_key", ""), + "username": credentials.get("username", ""), + "password": credentials.get("password", ""), + "org_id": credentials.get("org_id", ""), + "integration_id": record_id, + } + ) + except Exception: + return None, None + if tempo_config.is_configured: + return tempo_config.model_dump(), "tempo" + return None, None + # Fallback for unknown services: pass through credentials + record id. 
return {"credentials": credentials, "integration_id": record_id}, key @@ -1871,6 +1890,18 @@ def load_env_integrations() -> list[dict[str, Any]]: except Exception: logger.debug("Failed to load SigNoz config from env", exc_info=True) + try: + tempo_config = tempo_config_from_env() + if tempo_config is not None and tempo_config.is_configured: + integrations.append( + _active_env_record( + "tempo", + tempo_config.model_dump(exclude={"integration_id"}), + ) + ) + except Exception: + logger.debug("Failed to load Tempo config from env", exc_info=True) + return integrations diff --git a/app/integrations/_verification_adapters.py b/app/integrations/_verification_adapters.py index 4b45b56dd..f5e7c22f9 100644 --- a/app/integrations/_verification_adapters.py +++ b/app/integrations/_verification_adapters.py @@ -35,6 +35,7 @@ from app.integrations.sentry import build_sentry_config, validate_sentry_config from app.integrations.signoz import build_signoz_config, validate_signoz_config from app.integrations.supabase import build_supabase_config, validate_supabase_config +from app.integrations.tempo import build_tempo_config, validate_tempo_config from app.services.alertmanager import AlertmanagerClient, AlertmanagerConfig from app.services.argocd import ArgoCDClient, ArgoCDConfig from app.services.coralogix import CoralogixClient @@ -543,6 +544,11 @@ def _verify_opensearch(source: str, config: dict[str, Any]) -> dict[str, str]: build_config=build_signoz_config, validate_config=validate_signoz_config, ) +_verify_tempo = build_validation_verifier( + "tempo", + build_config=build_tempo_config, + validate_config=validate_tempo_config, +) def _build_kafka_config(raw: dict[str, Any]) -> Any: @@ -705,6 +711,7 @@ def _verify_supabase(service: str, config: dict[str, Any]) -> dict[str, str]: "_verify_rabbitmq", "_verify_sentry", "_verify_signoz", + "_verify_tempo", "_verify_slack", "_verify_slack_without_test", "_verify_snowflake", diff --git a/app/integrations/cli.py 
b/app/integrations/cli.py index 9ab6a89a9..66d247b71 100644 --- a/app/integrations/cli.py +++ b/app/integrations/cli.py @@ -829,6 +829,23 @@ def _setup_signoz() -> None: ) +def _setup_tempo() -> None: + url = _p("Tempo URL (e.g. http://localhost:3200 for local Docker)") + if not url: + _die("Tempo URL is required.") + + api_key = _p("Tempo bearer token (optional, leave blank if none)", secret=True) + org_id = _p("Tempo tenant / X-Scope-OrgID (optional, leave blank if single-tenant)") + + credentials: dict[str, str] = {"url": url} + if api_key: + credentials["api_key"] = api_key + if org_id: + credentials["org_id"] = org_id + + upsert_integration("tempo", {"credentials": credentials}) + + _HANDLERS: dict[str, Any] = { "alertmanager": _setup_alertmanager, "aws": _setup_aws, @@ -857,6 +874,7 @@ def _setup_signoz() -> None: "postgresql": _setup_postgresql, "mysql": _setup_mysql, "signoz": _setup_signoz, + "tempo": _setup_tempo, } diff --git a/app/integrations/effective_models.py b/app/integrations/effective_models.py index 40a0d0037..08f7fe23d 100644 --- a/app/integrations/effective_models.py +++ b/app/integrations/effective_models.py @@ -99,3 +99,4 @@ class EffectiveIntegrations(StrictConfigModel): victoria_logs: EffectiveIntegrationEntry | None = None alicloud: EffectiveIntegrationEntry | None = None signoz: EffectiveIntegrationEntry | None = None + tempo: EffectiveIntegrationEntry | None = None diff --git a/app/integrations/registry.py b/app/integrations/registry.py index 480698680..c13f8505e 100644 --- a/app/integrations/registry.py +++ b/app/integrations/registry.py @@ -42,6 +42,7 @@ _verify_splunk, _verify_supabase, _verify_telegram, + _verify_tempo, _verify_tracer, _verify_twilio, _verify_vercel, @@ -358,6 +359,13 @@ class IntegrationSpec: setup_order=23, verify_order=35, ), + IntegrationSpec( + service="tempo", + verifier=_verify_tempo, + direct_effective=True, + setup_order=24, + verify_order=36, + ), ) INTEGRATION_SPECS_BY_SERVICE = {spec.service: spec for 
spec in INTEGRATION_SPECS} diff --git a/app/integrations/tempo.py b/app/integrations/tempo.py new file mode 100644 index 000000000..2aee4d22d --- /dev/null +++ b/app/integrations/tempo.py @@ -0,0 +1,155 @@ +"""Grafana Tempo integration helpers. + +Provides configuration and connectivity validation for a standalone Grafana +Tempo backend via its HTTP API (``TEMPO_URL`` plus optional auth). Unlike the +Grafana Cloud integration, this talks to Tempo directly and does not require a +Grafana instance or datasource proxy. +""" + +from __future__ import annotations + +import base64 +import logging +import os +from dataclasses import dataclass +from typing import Any + +import httpx +from pydantic import Field + +from app.integrations._validation_helpers import report_validation_failure +from app.strict_config import StrictConfigModel + +logger = logging.getLogger(__name__) + +DEFAULT_TEMPO_TIMEOUT_SECONDS = 10.0 +DEFAULT_TEMPO_MAX_RESULTS = 20 + + +class TempoConfig(StrictConfigModel): + """Normalized Grafana Tempo connection settings.""" + + url: str = "" + api_key: str = "" + username: str = "" + password: str = "" + org_id: str = "" + timeout_seconds: float = Field(default=DEFAULT_TEMPO_TIMEOUT_SECONDS, gt=0) + max_results: int = Field(default=DEFAULT_TEMPO_MAX_RESULTS, gt=0, le=200) + integration_id: str = "" + + @property + def is_configured(self) -> bool: + # Tempo commonly runs without auth behind a gateway, so a URL alone is + # enough; auth headers are added only when credentials are present. 
+ return bool(self.url) + + def auth_headers(self) -> dict[str, str]: + """Build request headers for the Tempo HTTP API.""" + headers = {"Accept": "application/json"} + if self.username and self.password: + token = base64.b64encode(f"{self.username}:{self.password}".encode()).decode() + headers["Authorization"] = f"Basic {token}" + elif self.api_key: + headers["Authorization"] = f"Bearer {self.api_key}" + if self.org_id: + headers["X-Scope-OrgID"] = self.org_id + return headers + + def base_url(self) -> str: + return self.url.rstrip("/") + + +@dataclass(frozen=True) +class TempoValidationResult: + """Result of validating a Tempo integration.""" + + ok: bool + detail: str + + +def build_tempo_config(raw: dict[str, Any] | None) -> TempoConfig: + """Build a normalized Tempo config object from env/store data.""" + return TempoConfig.model_validate(raw or {}) + + +def tempo_config_from_env() -> TempoConfig | None: + """Load a Tempo config from env vars.""" + url = os.getenv("TEMPO_URL", "").strip() + if not url: + return None + + return build_tempo_config( + { + "url": url, + "api_key": os.getenv("TEMPO_API_KEY", "").strip(), + "username": os.getenv("TEMPO_USERNAME", "").strip(), + "password": os.getenv("TEMPO_PASSWORD", "").strip(), + "org_id": os.getenv("TEMPO_ORG_ID", "").strip(), + } + ) + + +def validate_tempo_config(config: TempoConfig) -> TempoValidationResult: + """Validate Tempo HTTP API connectivity via the tag-search endpoint.""" + if not config.is_configured: + return TempoValidationResult( + ok=False, + detail="Tempo configuration is incomplete. 
Provide TEMPO_URL.", + ) + + try: + response = httpx.get( + f"{config.base_url()}/api/search/tags", + headers=config.auth_headers(), + params={"limit": 1}, + timeout=config.timeout_seconds, + ) + response.raise_for_status() + return TempoValidationResult( + ok=True, + detail="Connected to Grafana Tempo HTTP API (/api/search, /api/traces).", + ) + except httpx.HTTPStatusError as err: + snippet = err.response.text[:200].strip() + detail = ( + f"HTTP {err.response.status_code}: {snippet}" + if snippet + else f"HTTP {err.response.status_code}" + ) + return TempoValidationResult( + ok=False, + detail=f"Tempo API validation failed: {detail}", + ) + except Exception as err: + report_validation_failure( + err, + logger=logger, + integration="tempo", + method="validate_tempo_config", + ) + return TempoValidationResult( + ok=False, + detail=f"Tempo API validation failed: {err}", + ) + + +def tempo_is_available(sources: dict[str, dict]) -> bool: + """Check if Tempo integration params are present in available sources.""" + return bool(sources.get("tempo", {}).get("connection_verified")) + + +def tempo_extract_params(sources: dict[str, dict]) -> dict[str, Any]: + """Extract Tempo connection params from resolved integrations. + + Credentials are resolved from the integration store or environment, so the + LLM never needs to supply the URL or auth directly. 
+ """ + tempo = sources.get("tempo", {}) + return { + "url": str(tempo.get("url", "")).strip(), + "api_key": str(tempo.get("api_key", "")).strip(), + "username": str(tempo.get("username", "")).strip(), + "password": str(tempo.get("password", "")).strip(), + "org_id": str(tempo.get("org_id", "")).strip(), + } diff --git a/app/integrations/verify.py b/app/integrations/verify.py index 4c048c212..5ead2a5dc 100644 --- a/app/integrations/verify.py +++ b/app/integrations/verify.py @@ -43,6 +43,7 @@ _verify_rabbitmq = _adapters._verify_rabbitmq _verify_sentry = _adapters._verify_sentry _verify_signoz = _adapters._verify_signoz +_verify_tempo = _adapters._verify_tempo _verify_slack = _adapters._verify_slack _verify_snowflake = _adapters._verify_snowflake _verify_splunk = _adapters._verify_splunk @@ -188,6 +189,7 @@ def verification_exit_code( "_verify_rabbitmq", "_verify_sentry", "_verify_signoz", + "_verify_tempo", "_verify_slack", "_verify_snowflake", "_verify_splunk", diff --git a/app/services/grafana/tempo.py b/app/services/grafana/tempo.py index d99442daa..7c3a49f90 100644 --- a/app/services/grafana/tempo.py +++ b/app/services/grafana/tempo.py @@ -7,6 +7,7 @@ import requests +from app.services.otlp_trace import extract_span_attributes, parse_otlp_trace from app.utils.errors import report_exception if TYPE_CHECKING: @@ -114,24 +115,7 @@ def _get_trace_details( # type: ignore[misc] ) if response.status_code == 200: - trace_data = response.json() - spans = [] - - if "batches" in trace_data: - for batch in trace_data["batches"]: - if "scopeSpans" in batch: - for scope in batch["scopeSpans"]: - if "spans" in scope: - for span in scope["spans"]: - attributes = self._extract_span_attributes(span) # type: ignore[attr-defined] - spans.append( - { - "name": span.get("name", "unknown"), - "attributes": attributes, - } - ) - - return {"spans": spans} + return {"spans": parse_otlp_trace(response.json())} except Exception as exc: report_exception( exc, @@ -153,26 +137,5 @@ def 
_extract_span_attributes( # type: ignore[misc] self: GrafanaClientBase, span: dict[str, Any], ) -> dict[str, Any]: - """Extract attributes from a span. - - Args: - span: Span data dictionary - - Returns: - Dictionary of attribute key-value pairs - """ - attributes: dict[str, Any] = {} - - if "attributes" in span: - for attr in span["attributes"]: - key = attr.get("key", "") - if not key: - continue - value = attr.get("value", {}) - - if "stringValue" in value: - attributes[key] = value["stringValue"] - elif "intValue" in value: - attributes[key] = value["intValue"] - - return attributes + """Extract attributes from a span (delegates to the shared OTLP parser).""" + return extract_span_attributes(span) diff --git a/app/services/otlp_trace.py b/app/services/otlp_trace.py new file mode 100644 index 000000000..c83504d1d --- /dev/null +++ b/app/services/otlp_trace.py @@ -0,0 +1,90 @@ +"""Shared parsing for OTLP/JSON trace payloads. + +Used by any client that fetches a single trace in OpenTelemetry JSON form +(``{"batches": [{"resource": ..., "scopeSpans": [{"spans": [...]}]}]}``): +the standalone Tempo client and the Grafana Cloud Tempo mixin both consume it. +""" + +from __future__ import annotations + +from typing import Any + + +def extract_span_attributes(span: dict[str, Any]) -> dict[str, Any]: + """Flatten an OTLP attribute list into a plain key -> value mapping. + + Handles the common OTLP/JSON value kinds (string, int, bool, double). + Attributes without a key or with an unsupported value kind are skipped. 
+ """ + attributes: dict[str, Any] = {} + + for attr in span.get("attributes", []): + key = attr.get("key", "") + if not key: + continue + value = attr.get("value", {}) + + if "stringValue" in value: + attributes[key] = value["stringValue"] + elif "intValue" in value: + attributes[key] = value["intValue"] + elif "boolValue" in value: + attributes[key] = value["boolValue"] + elif "doubleValue" in value: + attributes[key] = value["doubleValue"] + + return attributes + + +def _duration_ms(start_unix_nano: Any, end_unix_nano: Any) -> float: + """Span duration in milliseconds from OTLP nanosecond timestamps.""" + try: + start = int(start_unix_nano) + end = int(end_unix_nano) + except (TypeError, ValueError): + return 0.0 + if end <= start: + return 0.0 + return round((end - start) / 1_000_000, 4) + + +def parse_otlp_trace(trace_data: dict[str, Any]) -> list[dict[str, Any]]: + """Parse an OTLP/JSON trace into a flat list of span dicts. + + The ``service_name`` is lifted from each batch's resource attributes so + callers can correlate spans to services without a nested lookup. 
+ """ + spans: list[dict[str, Any]] = [] + + for batch in trace_data.get("batches", []): + if not isinstance(batch, dict): + continue + resource_attributes = extract_span_attributes(batch.get("resource", {})) + service_name = str(resource_attributes.get("service.name", "")) + + for scope in batch.get("scopeSpans", []): + if not isinstance(scope, dict): + continue + for span in scope.get("spans", []): + if not isinstance(span, dict): + continue + status = span.get("status") or {} + spans.append( + { + "name": span.get("name", "unknown"), + "span_id": span.get("spanId", ""), + "parent_span_id": span.get("parentSpanId", ""), + "trace_id": span.get("traceId", ""), + "kind": span.get("kind", ""), + "service_name": service_name, + "duration_ms": _duration_ms( + span.get("startTimeUnixNano"), + span.get("endTimeUnixNano"), + ), + "status_code": status.get("code", ""), + "status_message": status.get("message", ""), + "attributes": extract_span_attributes(span), + } + ) + + return spans diff --git a/app/services/tempo/__init__.py b/app/services/tempo/__init__.py new file mode 100644 index 000000000..3bc0a3f95 --- /dev/null +++ b/app/services/tempo/__init__.py @@ -0,0 +1,5 @@ +"""Grafana Tempo service client package.""" + +from app.services.tempo.client import TempoClient + +__all__ = ["TempoClient"] diff --git a/app/services/tempo/client.py b/app/services/tempo/client.py new file mode 100644 index 000000000..2f406bf33 --- /dev/null +++ b/app/services/tempo/client.py @@ -0,0 +1,310 @@ +"""Grafana Tempo query client. 
+ +Read-only access to a standalone Tempo backend via its HTTP API: + +* ``GET /api/traces/{id}`` — fetch a full trace (OTLP/JSON) +* ``GET /api/search`` — search traces with TraceQL +* ``GET /api/v2/search/tag/{tag}/values`` — list services / span names +""" + +from __future__ import annotations + +import logging +import re +from datetime import UTC, datetime, timedelta +from typing import Any + +import httpx + +from app.integrations.tempo import TempoConfig +from app.services.otlp_trace import parse_otlp_trace + +logger = logging.getLogger(__name__) + +DEFAULT_TIME_RANGE_MINUTES = 60 +_NOT_CONFIGURED_ERROR = "Tempo not configured. Set TEMPO_URL." + +# Scoped tag names used by Tempo's tag-values endpoint. +_SERVICE_NAME_TAG = "resource.service.name" +_SPAN_NAME_TAG = "name" + + +_VALID_TAG_KEY_RE = re.compile(r"^[A-Za-z0-9_.:-]+$") + + +def _escape_traceql_value(value: str) -> str: + return value.replace("\\", "\\\\").replace('"', '\\"') + + +def _time_bounds_seconds(minutes: int) -> tuple[int, int]: + """Return (start, end) as unix seconds for the last *minutes*.""" + end = datetime.now(UTC) + start = end - timedelta(minutes=max(1, minutes)) + return int(start.timestamp()), int(end.timestamp()) + + +def _parse_tag_values(payload: dict[str, Any]) -> list[str]: + """Parse a Tempo tag-values response (v1 strings or v2 typed objects).""" + values: list[str] = [] + for item in payload.get("tagValues", []) or []: + if isinstance(item, dict): + value = item.get("value") + if value: + values.append(str(value)) + elif item: + values.append(str(item)) + return values + + +def _parse_search_traces(payload: dict[str, Any]) -> list[dict[str, Any]]: + """Parse the trace summaries returned by ``GET /api/search``.""" + results: list[dict[str, Any]] = [] + for trace in payload.get("traces", []) or []: + if not isinstance(trace, dict): + continue + span_set = trace.get("spanSet") or {} + matched = span_set.get("matched") + if matched is None: + matched = len(span_set.get("spans", 
[]) or []) + try: + duration_ms = round(float(trace.get("durationMs", 0)), 4) + except (TypeError, ValueError): + duration_ms = 0 + results.append( + { + "trace_id": str(trace.get("traceID", "")), + "root_service_name": str(trace.get("rootServiceName", "")), + "root_trace_name": str(trace.get("rootTraceName", "")), + "start_time_unix_nano": str(trace.get("startTimeUnixNano", "")), + "duration_ms": duration_ms, + "matched_spans": matched, + } + ) + return results + + +class TempoClient: + """Read-only Grafana Tempo client over the HTTP API.""" + + def __init__(self, config: TempoConfig) -> None: + self.config = config + + def _configuration_error(self) -> str | None: + if self.config.is_configured: + return None + return _NOT_CONFIGURED_ERROR + + def _get( + self, path: str, params: dict[str, Any] | None = None + ) -> tuple[dict[str, Any] | None, str | None]: + try: + response = httpx.get( + f"{self.config.base_url()}{path}", + headers=self.config.auth_headers(), + params=params, + timeout=self.config.timeout_seconds, + ) + response.raise_for_status() + parsed = response.json() + return (parsed if isinstance(parsed, dict) else {}), None + except httpx.HTTPStatusError as err: + snippet = err.response.text[:200].strip() + if snippet: + return None, f"HTTP {err.response.status_code}: {snippet}" + return None, f"HTTP {err.response.status_code}" + except Exception as err: + return None, str(err) + + def _clamped_limit(self, limit: int) -> int: + return max(1, min(limit, self.config.max_results)) + + # ----------------------------------------------------------- get by id + + def get_trace_by_id(self, trace_id: str) -> dict[str, Any]: + """Fetch a full trace by ID and flatten it into spans.""" + config_error = self._configuration_error() + if config_error: + return { + "source": "tempo", + "action": "get_trace", + "available": False, + "error": config_error, + "spans": [], + } + if not trace_id: + return { + "source": "tempo", + "action": "get_trace", + "available": False, 
+ "error": "trace_id is required for get_trace.", + "spans": [], + } + + payload, error = self._get(f"/api/traces/{trace_id}") + if error: + return { + "source": "tempo", + "action": "get_trace", + "available": False, + "trace_id": trace_id, + "error": error, + "spans": [], + } + + spans = parse_otlp_trace(payload or {}) + return { + "source": "tempo", + "action": "get_trace", + "available": True, + "trace_id": trace_id, + "total_spans": len(spans), + "spans": spans, + } + + # ------------------------------------------------------------- search + + def search_traces( + self, + service: str | None = None, + span_name: str | None = None, + min_duration_ms: float | None = None, + max_duration_ms: float | None = None, + tags: dict[str, str] | None = None, + time_range_minutes: int = DEFAULT_TIME_RANGE_MINUTES, + limit: int = 20, + ) -> dict[str, Any]: + """Search traces by service, span name, duration, and tags via TraceQL.""" + config_error = self._configuration_error() + if config_error: + return { + "source": "tempo", + "action": "search", + "available": False, + "error": config_error, + "traces": [], + } + + traceql = self._build_traceql( + service=service, + span_name=span_name, + min_duration_ms=min_duration_ms, + max_duration_ms=max_duration_ms, + tags=tags, + ) + start, end = _time_bounds_seconds(time_range_minutes) + params: dict[str, Any] = { + "q": traceql, + "limit": self._clamped_limit(limit), + "start": start, + "end": end, + } + + payload, error = self._get("/api/search", params=params) + if error: + return { + "source": "tempo", + "action": "search", + "available": False, + "query": traceql, + "error": error, + "traces": [], + } + + traces = _parse_search_traces(payload or {}) + return { + "source": "tempo", + "action": "search", + "available": True, + "query": traceql, + "total": len(traces), + "traces": traces, + } + + @staticmethod + def _build_traceql( + *, + service: str | None, + span_name: str | None, + min_duration_ms: float | None, + 
max_duration_ms: float | None, + tags: dict[str, str] | None, + ) -> str: + parts: list[str] = [] + if service: + parts.append(f'resource.service.name = "{_escape_traceql_value(service)}"') + if span_name: + parts.append(f'name = "{_escape_traceql_value(span_name)}"') + if min_duration_ms is not None and min_duration_ms > 0: + parts.append(f"duration > {min_duration_ms}ms") + if max_duration_ms is not None and max_duration_ms > 0: + parts.append(f"duration < {max_duration_ms}ms") + for key, value in (tags or {}).items(): + if not key or not _VALID_TAG_KEY_RE.match(key): + continue + parts.append(f'span.{key} = "{_escape_traceql_value(str(value))}"') + if not parts: + return "{}" + return "{ " + " && ".join(parts) + " }" + + # ----------------------------------------------------- list tag values + + def list_services(self, time_range_minutes: int = DEFAULT_TIME_RANGE_MINUTES) -> dict[str, Any]: + """List service names registered in Tempo.""" + return self._list_tag_values( + tag=_SERVICE_NAME_TAG, + result_key="services", + time_range_minutes=time_range_minutes, + action="list_services", + ) + + def list_span_names( + self, time_range_minutes: int = DEFAULT_TIME_RANGE_MINUTES + ) -> dict[str, Any]: + """List span names registered in Tempo.""" + return self._list_tag_values( + tag=_SPAN_NAME_TAG, + result_key="span_names", + time_range_minutes=time_range_minutes, + action="list_span_names", + ) + + def _list_tag_values( + self, + *, + tag: str, + result_key: str, + time_range_minutes: int, + action: str, + ) -> dict[str, Any]: + config_error = self._configuration_error() + if config_error: + return { + "source": "tempo", + "action": action, + "available": False, + "error": config_error, + result_key: [], + } + + start, end = _time_bounds_seconds(time_range_minutes) + payload, error = self._get( + f"/api/v2/search/tag/{tag}/values", + params={"start": start, "end": end}, + ) + if error: + return { + "source": "tempo", + "action": action, + "available": False, + 
"error": error, + result_key: [], + } + + values = _parse_tag_values(payload or {}) + return { + "source": "tempo", + "action": action, + "available": True, + "total": len(values), + result_key: values, + } diff --git a/app/tools/TempoTool/__init__.py b/app/tools/TempoTool/__init__.py new file mode 100644 index 000000000..14d9f48f9 --- /dev/null +++ b/app/tools/TempoTool/__init__.py @@ -0,0 +1,162 @@ +"""Grafana Tempo trace query tool (single action-based entrypoint).""" + +from __future__ import annotations + +from typing import Any + +from app.integrations.tempo import TempoConfig, tempo_extract_params +from app.services.tempo.client import TempoClient +from app.tools.tool_decorator import tool +from app.tools.utils.availability import tempo_available_or_backend + +_VALID_ACTIONS = ("search", "get_trace", "list_services", "list_span_names") + + +def _tempo_is_available(sources: dict[str, dict]) -> bool: + return tempo_available_or_backend(sources) + + +def _tempo_extract_params(sources: dict[str, dict]) -> dict[str, Any]: + tempo = sources.get("tempo", {}) + return { + **tempo_extract_params(sources), + "action": "search", + "service": tempo.get("service_name", ""), + "time_range_minutes": tempo.get("time_range_minutes", 60), + "limit": 20, + "tempo_backend": tempo.get("_backend"), + } + + +def _dispatch( + client: Any, + *, + action: str, + trace_id: str | None, + service: str | None, + span_name: str | None, + min_duration_ms: float | None, + max_duration_ms: float | None, + tags: dict[str, str] | None, + time_range_minutes: int, + limit: int, +) -> dict[str, Any]: + result: dict[str, Any] + if action == "get_trace": + result = client.get_trace_by_id(trace_id or "") + elif action == "list_services": + result = client.list_services(time_range_minutes=time_range_minutes) + elif action == "list_span_names": + result = client.list_span_names(time_range_minutes=time_range_minutes) + else: + result = client.search_traces( + service=service, + span_name=span_name, + 
min_duration_ms=min_duration_ms, + max_duration_ms=max_duration_ms, + tags=tags, + time_range_minutes=time_range_minutes, + limit=limit, + ) + return result + + +@tool( + name="query_tempo", + display_name="Grafana Tempo", + source="tempo", + tags=("traces", "observability"), + cost_tier="moderate", + description=( + "Query a standalone Grafana Tempo backend for distributed traces. " + "Use 'action' to pick: search traces, fetch a trace by ID, or list " + "registered services / span names." + ), + use_cases=[ + "Fetching a full trace by trace ID to inspect its spans", + "Searching traces by service, span name, duration, or tags", + "Listing services and span names registered in Tempo", + "Correlating slow or error spans with logs and metrics", + ], + requires=[], + input_schema={ + "type": "object", + "properties": { + "action": { + "type": "string", + "enum": list(_VALID_ACTIONS), + "default": "search", + "description": "Which Tempo query to run.", + }, + "trace_id": { + "type": "string", + "description": "Trace ID to fetch (required when action='get_trace').", + }, + "service": {"type": "string", "description": "Service name filter for search."}, + "span_name": {"type": "string", "description": "Span name filter for search."}, + "min_duration_ms": {"type": "number", "description": "Minimum span duration (ms)."}, + "max_duration_ms": {"type": "number", "description": "Maximum span duration (ms)."}, + "tags": { + "type": "object", + "description": "Span attribute filters (key -> value), applied as span..", + }, + "time_range_minutes": {"type": "integer", "default": 60}, + "limit": {"type": "integer", "default": 20}, + }, + "required": [], + }, + is_available=_tempo_is_available, + extract_params=_tempo_extract_params, +) +def query_tempo( + action: str = "search", + trace_id: str | None = None, + service: str | None = None, + span_name: str | None = None, + min_duration_ms: float | None = None, + max_duration_ms: float | None = None, + tags: dict[str, str] | None = 
None, + time_range_minutes: int = 60, + limit: int = 20, + tempo_backend: Any = None, + **_kwargs: Any, +) -> dict[str, Any]: + """Query Grafana Tempo for traces, a single trace, or registered tag values.""" + if action not in _VALID_ACTIONS: + action = "search" + + if tempo_backend is not None: + return _dispatch( + tempo_backend, + action=action, + trace_id=trace_id, + service=service, + span_name=span_name, + min_duration_ms=min_duration_ms, + max_duration_ms=max_duration_ms, + tags=tags, + time_range_minutes=time_range_minutes, + limit=limit, + ) + + config = TempoConfig.model_validate(_kwargs) + if not config.is_configured: + return { + "source": "tempo", + "action": action, + "available": False, + "error": "Tempo not configured. Provide TEMPO_URL.", + } + + return _dispatch( + TempoClient(config), + action=action, + trace_id=trace_id, + service=service, + span_name=span_name, + min_duration_ms=min_duration_ms, + max_duration_ms=max_duration_ms, + tags=tags, + time_range_minutes=time_range_minutes, + limit=limit, + ) diff --git a/app/tools/utils/availability.py b/app/tools/utils/availability.py index 29c865355..c27440d19 100644 --- a/app/tools/utils/availability.py +++ b/app/tools/utils/availability.py @@ -68,6 +68,18 @@ def signoz_available_or_backend(sources: dict[str, dict]) -> bool: return bool(signoz.get("connection_verified") and signoz.get("url") and signoz.get("api_key")) +def tempo_available_or_backend(sources: dict[str, dict]) -> bool: + """Available when a real Tempo URL is present OR a fixture backend is injected. + + Used by the Tempo tool wrapper whose ``extract_params`` can delegate to a + mock ``tempo_backend`` for synthetic tests. 
+ """ + tempo = sources.get("tempo", {}) + if tempo.get("_backend"): + return True + return bool(tempo.get("url")) + + def hermes_available_or_backend(sources: dict[str, dict]) -> bool: """Available when Hermes integration is connected or a fixture backend is injected.""" hermes = sources.get("hermes", {}) diff --git a/app/types/evidence.py b/app/types/evidence.py index a671651ef..83b929de4 100644 --- a/app/types/evidence.py +++ b/app/types/evidence.py @@ -44,6 +44,7 @@ "opensearch", "alertmanager", "signoz", + "tempo", "splunk", "supabase", "airflow", diff --git a/docs/docs.json b/docs/docs.json index edb37ae90..6cf463659 100644 --- a/docs/docs.json +++ b/docs/docs.json @@ -114,6 +114,7 @@ "opsgenie", "sentry", "signoz", + "tempo", "splunk" ] }, diff --git a/docs/tempo.mdx b/docs/tempo.mdx new file mode 100644 index 000000000..5c2280f46 --- /dev/null +++ b/docs/tempo.mdx @@ -0,0 +1,142 @@ +--- +title: "Grafana Tempo" +description: "Connect Grafana Tempo so OpenSRE can query distributed traces during investigations" +--- + +OpenSRE uses Grafana Tempo to investigate trace-related alerts — searching spans by service, fetching full traces by ID, listing instrumented services, and filtering by error status or latency. + +This integration talks to Tempo **directly** via its HTTP API. It does not require a Grafana instance or datasource proxy. If you run the full Grafana stack, the [Grafana](/grafana) integration already surfaces Tempo through the datasource proxy — use this integration when you run Tempo standalone. + +## Prerequisites + +- Grafana Tempo 2.0+ +- Network access from the OpenSRE environment to your Tempo instance +- Auth credentials only if your deployment requires them (many run without auth behind a gateway) + +## Setup + +### Option 1: Interactive CLI + +```bash +opensre integrations setup tempo +``` + +You will be prompted for the Tempo URL and optional auth. Leave auth fields blank if your Tempo runs without authentication. 
+ +### Option 2: Environment variables + +Add to your `.env`: + +```bash +TEMPO_URL=http://localhost:3200 +TEMPO_API_KEY= # optional +TEMPO_USERNAME= # optional (basic auth) +TEMPO_PASSWORD= # optional (basic auth) +TEMPO_ORG_ID= # optional (X-Scope-OrgID for multi-tenant) +``` + +| Variable | Default | Description | +| --- | --- | --- | +| `TEMPO_URL` | — | **Required.** Tempo HTTP API base URL | +| `TEMPO_API_KEY` | _(empty)_ | Bearer token for auth | +| `TEMPO_USERNAME` | _(empty)_ | Username for basic auth | +| `TEMPO_PASSWORD` | _(empty)_ | Password for basic auth | +| `TEMPO_ORG_ID` | _(empty)_ | Tenant ID sent as `X-Scope-OrgID` | + +### Option 3: Persistent store + +Integrations are automatically persisted to `~/.opensre/integrations.json`: + +```json +{ + "version": 1, + "integrations": [ + { + "id": "tempo-prod", + "service": "tempo", + "status": "active", + "credentials": { + "url": "http://localhost:3200", + "api_key": "" + } + } + ] +} +``` + +## Investigation tools + +OpenSRE exposes a single `query_tempo` tool with an `action` parameter: + +### search + +Searches traces by service, span name, duration, and tags using TraceQL. Returns one summary row per trace. + +```text +query_tempo(action="search", service="checkout-service", min_duration_ms=500) +query_tempo(action="search", tags={"http.status_code": "500"}) +``` + +| Parameter | Description | +| --- | --- | +| `service` | Filter by `resource.service.name` | +| `span_name` | Filter by span name | +| `min_duration_ms` | Minimum span duration (ms) | +| `max_duration_ms` | Maximum span duration (ms) | +| `tags` | Key/value span attributes | +| `time_range_minutes` | Lookback window (default 60) | +| `limit` | Max traces to return (default 20) | + +### get_trace + +Fetches a full trace by ID and flattens its spans. + +```text +query_tempo(action="get_trace", trace_id="4f8c...e21") +``` + +### list_services + +Lists all services registered in Tempo.
+ +```text +query_tempo(action="list_services") +``` + +### list_span_names + +Lists all span names registered in Tempo. + +```text +query_tempo(action="list_span_names") +``` + +## Verify + +```bash +opensre integrations verify tempo +``` + +Expected output: + +```text +Service: tempo +Status: passed +Detail: Connected to Grafana Tempo HTTP API (/api/search, /api/traces). +``` + +## Troubleshooting + +| Symptom | Fix | +| --- | --- | +| **Connection refused** | Verify the URL and port. Default Tempo HTTP port is `3200`. | +| **HTTP 401** | Set `TEMPO_API_KEY` or `TEMPO_USERNAME`/`TEMPO_PASSWORD` if your deployment requires auth. | +| **HTTP 404 on verify** | Check your Tempo version — the `/api/search/tags` endpoint used by verify requires Tempo 1.4+, and this integration as a whole requires Tempo 2.0+ (see Prerequisites). | +| **No traces returned** | Confirm your services are sending traces to Tempo and the time range covers the period of interest. | +| **Multi-tenant 400** | Set `TEMPO_ORG_ID` to the correct tenant ID. | + +## Security best practices + +- Use a **read-only** token or service account if your Tempo deployment supports auth. +- Store credentials in `.env`, never in code. +- Restrict network access to Tempo — OpenSRE only needs the HTTP API port (`3200` by default).
diff --git a/tests/cli/wizard/test_integration_health.py b/tests/cli/wizard/test_integration_health.py index b2ebfa00f..6222a0ce3 100644 --- a/tests/cli/wizard/test_integration_health.py +++ b/tests/cli/wizard/test_integration_health.py @@ -52,6 +52,7 @@ def test_legacy_integration_health_import_surface_still_exports_validators() -> "validate_sentry_integration", "validate_slack_webhook", "validate_splunk_integration", + "validate_tempo_integration", "validate_vercel_integration", } diff --git a/tests/cli_smoke_test.py b/tests/cli_smoke_test.py index 77e102cae..b0deafe53 100644 --- a/tests/cli_smoke_test.py +++ b/tests/cli_smoke_test.py @@ -513,8 +513,8 @@ def test_tests_inventory_commands_smoke(cli_sandbox: CliSandbox) -> None: def test_onboard_interactive_smoke(cli_sandbox: CliSandbox) -> None: # One `j` per keypress (burst writes are not separate keys). The select list wraps; # from the first option, len(choices)-1 steps reach "Skip for now" without wrapping past it. - # 23 integrations + "Skip for now" = 24 choices. OpenSearch is at index 22; - # 23 j's lands on "Skip for now" at index 23. + # 24 integrations + "Skip for now" = 25 choices. Grafana Tempo is at index 23; + # 24 j's lands on "Skip for now" at index 24. 
result = _run_cli_pty( cli_sandbox, "onboard", @@ -526,7 +526,7 @@ def test_onboard_interactive_smoke(cli_sandbox: CliSandbox) -> None: PtyAction( expect="Choose an integration to configure", send=b"\r", - stagger_j=23, + stagger_j=24, ), ], timeout=30.0, @@ -620,7 +620,7 @@ def test_onboard_interactive_smoke_cli_provider_repick_when_unauthenticated( PtyAction( expect="Choose an integration to configure", send=b"\r", - stagger_j=23, + stagger_j=24, ), ], timeout=pty_timeout, diff --git a/tests/integrations/test_tempo.py b/tests/integrations/test_tempo.py new file mode 100644 index 000000000..c62a1ca9d --- /dev/null +++ b/tests/integrations/test_tempo.py @@ -0,0 +1,124 @@ +"""Unit tests for the Grafana Tempo integration module.""" + +from app.integrations.catalog import load_env_integrations +from app.integrations.tempo import ( + TempoConfig, + build_tempo_config, + tempo_config_from_env, + tempo_extract_params, + tempo_is_available, + validate_tempo_config, +) + + +class TestTempoConfig: + def test_defaults(self) -> None: + config = TempoConfig() + assert config.url == "" + assert config.api_key == "" + assert config.timeout_seconds == 10.0 + assert config.max_results == 20 + + def test_is_configured_with_url_only(self) -> None: + config = TempoConfig(url="http://localhost:3200") + assert config.is_configured is True + + def test_is_configured_without_url(self) -> None: + assert TempoConfig(api_key="token").is_configured is False + + def test_auth_headers_bearer(self) -> None: + headers = TempoConfig(url="http://x", api_key="token").auth_headers() + assert headers["Authorization"] == "Bearer token" + assert headers["Accept"] == "application/json" + + def test_auth_headers_basic_and_org(self) -> None: + headers = TempoConfig( + url="http://x", username="u", password="p", org_id="42" + ).auth_headers() + assert headers["Authorization"].startswith("Basic ") + assert headers["X-Scope-OrgID"] == "42" + + def test_auth_headers_none(self) -> None: + headers = 
TempoConfig(url="http://x").auth_headers() + assert "Authorization" not in headers + + +class TestBuildTempoConfig: + def test_from_dict(self) -> None: + config = build_tempo_config({"url": "http://tempo.example.com", "api_key": "secret"}) + assert config.url == "http://tempo.example.com" + assert config.api_key == "secret" + + def test_from_none(self) -> None: + assert build_tempo_config(None).is_configured is False + + +class TestTempoConfigFromEnv: + def test_returns_none_without_url(self, monkeypatch) -> None: + monkeypatch.delenv("TEMPO_URL", raising=False) + assert tempo_config_from_env() is None + + def test_returns_config_with_url(self, monkeypatch) -> None: + monkeypatch.setenv("TEMPO_URL", "http://localhost:3200") + monkeypatch.setenv("TEMPO_API_KEY", "token") + config = tempo_config_from_env() + assert config is not None + assert config.url == "http://localhost:3200" + assert config.api_key == "token" + assert config.is_configured is True + + +class TestTempoValidation: + def test_validate_requires_url(self) -> None: + result = validate_tempo_config(TempoConfig()) + assert result.ok is False + assert "TEMPO_URL" in result.detail + + def test_validate_hits_search_tags(self, monkeypatch) -> None: + class _FakeResponse: + def raise_for_status(self) -> None: + return None + + captured: dict[str, object] = {} + + def _fake_get(url: str, **kwargs: object) -> _FakeResponse: + captured["url"] = url + captured["headers"] = kwargs.get("headers") + return _FakeResponse() + + monkeypatch.setattr("app.integrations.tempo.httpx.get", _fake_get) + + result = validate_tempo_config(TempoConfig(url="http://localhost:3200")) + assert result.ok is True + assert str(captured["url"]).endswith("/api/search/tags") + + +class TestTempoExtractParams: + def test_extracts_params(self) -> None: + params = tempo_extract_params( + {"tempo": {"url": "http://tempo.example.com", "api_key": "key"}} + ) + assert params["url"] == "http://tempo.example.com" + assert params["api_key"] == "key" 
+ + def test_uses_defaults_when_missing(self) -> None: + params = tempo_extract_params({}) + assert params["url"] == "" + assert params["api_key"] == "" + + +class TestTempoIsAvailable: + def test_available_when_connection_verified(self) -> None: + assert tempo_is_available({"tempo": {"connection_verified": True}}) is True + + def test_unavailable_without_connection_verified(self) -> None: + assert tempo_is_available({"tempo": {"url": "http://localhost:3200"}}) is False + + +class TestTempoEnvCatalogLoading: + def test_loads_from_env(self, monkeypatch) -> None: + monkeypatch.setenv("TEMPO_URL", "http://localhost:3200") + records = load_env_integrations() + tempo_records = [r for r in records if r.get("service") == "tempo"] + assert len(tempo_records) == 1 + assert tempo_records[0]["credentials"]["url"] == "http://localhost:3200" diff --git a/tests/services/test_grafana_tempo.py b/tests/services/test_grafana_tempo.py index a5ac591a8..f25d208f2 100644 --- a/tests/services/test_grafana_tempo.py +++ b/tests/services/test_grafana_tempo.py @@ -158,7 +158,8 @@ def test_extract_span_attributes_edge_cases(self): "attributes": [ {"key": "valid_string", "value": {"stringValue": "test"}}, {"key": "valid_int", "value": {"intValue": 42}}, - {"key": "unsupported_type", "value": {"boolValue": True}}, + {"key": "valid_bool", "value": {"boolValue": True}}, + {"key": "valid_double", "value": {"doubleValue": 1.5}}, {"key": "empty_value", "value": {}}, {"value": {"stringValue": "missing_key"}}, # Should be skipped! 
] @@ -168,7 +169,8 @@ def test_extract_span_attributes_edge_cases(self): assert attributes.get("valid_string") == "test" assert attributes.get("valid_int") == 42 - assert "unsupported_type" not in attributes + assert attributes.get("valid_bool") is True + assert attributes.get("valid_double") == 1.5 assert "empty_value" not in attributes assert "" not in attributes diff --git a/tests/services/test_otlp_trace.py b/tests/services/test_otlp_trace.py new file mode 100644 index 000000000..837b85715 --- /dev/null +++ b/tests/services/test_otlp_trace.py @@ -0,0 +1,71 @@ +"""Unit tests for the shared OTLP/JSON trace parser.""" + +from __future__ import annotations + +from app.services.otlp_trace import extract_span_attributes, parse_otlp_trace + + +def test_extract_span_attributes_value_kinds() -> None: + span = { + "attributes": [ + {"key": "str", "value": {"stringValue": "v"}}, + {"key": "int", "value": {"intValue": "42"}}, + {"key": "bool", "value": {"boolValue": True}}, + {"key": "double", "value": {"doubleValue": 1.5}}, + {"key": "empty", "value": {}}, + {"value": {"stringValue": "no-key"}}, + ] + } + attrs = extract_span_attributes(span) + assert attrs == {"str": "v", "int": "42", "bool": True, "double": 1.5} + + +def test_parse_otlp_trace_flattens_spans_with_service_name() -> None: + trace = { + "batches": [ + { + "resource": { + "attributes": [{"key": "service.name", "value": {"stringValue": "checkout"}}] + }, + "scopeSpans": [ + { + "spans": [ + { + "name": "POST /checkout", + "spanId": "span-1", + "parentSpanId": "span-0", + "traceId": "trace-1", + "kind": 2, + "startTimeUnixNano": "1000000000", + "endTimeUnixNano": "1150000000", + "status": {"code": 2, "message": "boom"}, + "attributes": [ + { + "key": "http.status_code", + "value": {"intValue": "500"}, + } + ], + } + ] + } + ], + } + ] + } + spans = parse_otlp_trace(trace) + assert len(spans) == 1 + span = spans[0] + assert span["name"] == "POST /checkout" + assert span["service_name"] == "checkout" + assert 
span["span_id"] == "span-1" + assert span["parent_span_id"] == "span-0" + assert span["duration_ms"] == 150.0 + assert span["status_code"] == 2 + assert span["status_message"] == "boom" + assert span["attributes"]["http.status_code"] == "500" + + +def test_parse_otlp_trace_handles_empty_and_malformed() -> None: + assert parse_otlp_trace({}) == [] + assert parse_otlp_trace({"batches": ["not-a-dict"]}) == [] + assert parse_otlp_trace({"batches": [{"scopeSpans": [{"spans": []}]}]}) == [] diff --git a/tests/services/test_tempo_client.py b/tests/services/test_tempo_client.py new file mode 100644 index 000000000..5d5bd1f4d --- /dev/null +++ b/tests/services/test_tempo_client.py @@ -0,0 +1,177 @@ +"""Unit tests for the Grafana Tempo service client.""" + +from __future__ import annotations + +from typing import Any + +import httpx + +from app.integrations.tempo import TempoConfig +from app.services.tempo.client import TempoClient + + +class _FakeResponse: + def __init__(self, payload: dict[str, Any]) -> None: + self._payload = payload + + def raise_for_status(self) -> None: + return None + + def json(self) -> dict[str, Any]: + return self._payload + + +class _ErrorResponse: + def __init__(self, status_code: int, text: str) -> None: + self.status_code = status_code + self.text = text + self.request = httpx.Request("GET", "http://localhost") + + def raise_for_status(self) -> None: + raise httpx.HTTPStatusError( + f"error {self.status_code}", + request=self.request, + response=httpx.Response(self.status_code, request=self.request, text=self.text), + ) + + def json(self) -> dict[str, Any]: + return {} + + +def _client() -> TempoClient: + return TempoClient(TempoConfig(url="http://localhost:3200", api_key="token")) + + +def test_get_trace_requires_configuration() -> None: + result = TempoClient(TempoConfig()).get_trace_by_id("abc") + assert result["available"] is False + assert "TEMPO_URL" in result["error"] + + +def test_get_trace_requires_trace_id() -> None: + result = 
_client().get_trace_by_id("") + assert result["available"] is False + assert "trace_id is required" in result["error"] + + +def test_get_trace_by_id_parses_spans(monkeypatch) -> None: + captured: dict[str, Any] = {} + + def _fake_get(url: str, **kwargs: Any) -> _FakeResponse: + captured["url"] = url + captured["headers"] = kwargs.get("headers") + return _FakeResponse( + { + "batches": [ + { + "resource": { + "attributes": [{"key": "service.name", "value": {"stringValue": "api"}}] + }, + "scopeSpans": [ + {"spans": [{"name": "GET /x", "spanId": "s1", "attributes": []}]} + ], + } + ] + } + ) + + monkeypatch.setattr("app.services.tempo.client.httpx.get", _fake_get) + result = _client().get_trace_by_id("trace-1") + + assert result["available"] is True + assert result["trace_id"] == "trace-1" + assert result["total_spans"] == 1 + assert result["spans"][0]["service_name"] == "api" + assert captured["url"].endswith("/api/traces/trace-1") + assert captured["headers"]["Authorization"] == "Bearer token" + + +def test_search_traces_builds_traceql(monkeypatch) -> None: + captured: dict[str, Any] = {} + + def _fake_get(url: str, **kwargs: Any) -> _FakeResponse: + captured["url"] = url + captured["params"] = kwargs.get("params") + return _FakeResponse( + { + "traces": [ + { + "traceID": "t1", + "rootServiceName": "api", + "rootTraceName": "GET /x", + "durationMs": 120, + "spanSet": {"matched": 3}, + } + ] + } + ) + + monkeypatch.setattr("app.services.tempo.client.httpx.get", _fake_get) + result = _client().search_traces( + service="api", + span_name="GET /x", + min_duration_ms=100, + tags={"http.status_code": "500"}, + limit=5, + ) + + assert result["available"] is True + assert result["total"] == 1 + assert result["traces"][0]["trace_id"] == "t1" + assert result["traces"][0]["matched_spans"] == 3 + assert captured["url"].endswith("/api/search") + query = captured["params"]["q"] + assert 'resource.service.name = "api"' in query + assert 'name = "GET /x"' in query + assert 
"duration > 100ms" in query + assert 'span.http.status_code = "500"' in query + assert captured["params"]["limit"] == 5 + + +def test_search_traces_empty_query_when_no_filters(monkeypatch) -> None: + captured: dict[str, Any] = {} + + def _fake_get(_url: str, **kwargs: Any) -> _FakeResponse: + captured["params"] = kwargs.get("params") + return _FakeResponse({"traces": []}) + + monkeypatch.setattr("app.services.tempo.client.httpx.get", _fake_get) + result = _client().search_traces() + assert result["available"] is True + assert captured["params"]["q"] == "{}" + + +def test_list_services_parses_v2_tag_values(monkeypatch) -> None: + captured: dict[str, Any] = {} + + def _fake_get(url: str, **_kwargs: Any) -> _FakeResponse: + captured["url"] = url + return _FakeResponse( + {"tagValues": [{"type": "string", "value": "frontend"}, {"value": "cartservice"}]} + ) + + monkeypatch.setattr("app.services.tempo.client.httpx.get", _fake_get) + result = _client().list_services() + assert result["available"] is True + assert result["services"] == ["frontend", "cartservice"] + assert captured["url"].endswith("/api/v2/search/tag/resource.service.name/values") + + +def test_list_span_names_parses_v1_string_values(monkeypatch) -> None: + def _fake_get(_url: str, **_kwargs: Any) -> _FakeResponse: + return _FakeResponse({"tagValues": ["GET /a", "POST /b"]}) + + monkeypatch.setattr("app.services.tempo.client.httpx.get", _fake_get) + result = _client().list_span_names() + assert result["available"] is True + assert result["span_names"] == ["GET /a", "POST /b"] + + +def test_search_traces_surfaces_http_error(monkeypatch) -> None: + def _fake_get(_url: str, **_kwargs: Any) -> _ErrorResponse: + return _ErrorResponse(403, "forbidden") + + monkeypatch.setattr("app.services.tempo.client.httpx.get", _fake_get) + result = _client().search_traces(service="api") + assert result["available"] is False + assert "403" in result["error"] diff --git a/tests/synthetic/test_tempo_scenario.py 
b/tests/synthetic/test_tempo_scenario.py new file mode 100644 index 000000000..109c5055e --- /dev/null +++ b/tests/synthetic/test_tempo_scenario.py @@ -0,0 +1,127 @@ +"""Synthetic RCA scenario using Grafana Tempo as the evidence source. + +Validates that a Tempo alert seeds the correct tools and that a mock backend +returns realistic fixture trace data through the single action-based tool. +""" + +from __future__ import annotations + +from typing import Any + +import pytest + +pytestmark = pytest.mark.synthetic + +from app.agent.investigation import _ALERT_SOURCE_TO_TOOL_SOURCES +from app.tools.TempoTool import query_tempo + + +class _FixtureTempoBackend: + """Minimal fixture backend for synthetic Tempo scenarios.""" + + def search_traces(self, **kwargs: Any) -> dict[str, Any]: + service = kwargs.get("service") or "checkout-service" + return { + "source": "tempo", + "action": "search", + "available": True, + "query": '{ resource.service.name = "checkout-service" }', + "total": 2, + "traces": [ + { + "trace_id": "trace-001", + "root_service_name": service, + "root_trace_name": "POST /checkout", + "start_time_unix_nano": "1716120000000000000", + "duration_ms": 2400, + "matched_spans": 8, + }, + { + "trace_id": "trace-002", + "root_service_name": service, + "root_trace_name": "POST /checkout", + "start_time_unix_nano": "1716120030000000000", + "duration_ms": 2100, + "matched_spans": 7, + }, + ], + } + + def get_trace_by_id(self, trace_id: str) -> dict[str, Any]: + return { + "source": "tempo", + "action": "get_trace", + "available": True, + "trace_id": trace_id, + "total_spans": 2, + "spans": [ + { + "name": "POST /checkout", + "span_id": "span-1", + "parent_span_id": "", + "service_name": "checkout-service", + "duration_ms": 2400.0, + "status_code": 2, + "status_message": "upstream timeout", + "attributes": {"http.status_code": "504"}, + }, + { + "name": "GET /inventory", + "span_id": "span-2", + "parent_span_id": "span-1", + "service_name": "inventory-service", + 
"duration_ms": 2300.0, + "status_code": 2, + "status_message": "deadline exceeded", + "attributes": {"rpc.grpc.status_code": "4"}, + }, + ], + } + + def list_services(self, **_kwargs: Any) -> dict[str, Any]: + return { + "source": "tempo", + "action": "list_services", + "available": True, + "total": 2, + "services": ["checkout-service", "inventory-service"], + } + + def list_span_names(self, **_kwargs: Any) -> dict[str, Any]: + return { + "source": "tempo", + "action": "list_span_names", + "available": True, + "total": 2, + "span_names": ["POST /checkout", "GET /inventory"], + } + + +def test_tempo_alert_source_maps_to_tools() -> None: + """Tempo alert source seeds tempo tools before the ReAct loop.""" + assert "tempo" in _ALERT_SOURCE_TO_TOOL_SOURCES + assert _ALERT_SOURCE_TO_TOOL_SOURCES["tempo"] == ["tempo"] + + +def test_tempo_search_synthetic_scenario() -> None: + backend = _FixtureTempoBackend() + result = query_tempo(service="checkout-service", tempo_backend=backend) + assert result["available"] is True + assert result["total"] == 2 + assert result["traces"][0]["duration_ms"] == 2400 + + +def test_tempo_get_trace_synthetic_scenario() -> None: + backend = _FixtureTempoBackend() + result = query_tempo(action="get_trace", trace_id="trace-001", tempo_backend=backend) + assert result["available"] is True + assert result["total_spans"] == 2 + assert any(s["service_name"] == "inventory-service" for s in result["spans"]) + assert result["spans"][0]["attributes"]["http.status_code"] == "504" + + +def test_tempo_list_services_synthetic_scenario() -> None: + backend = _FixtureTempoBackend() + result = query_tempo(action="list_services", tempo_backend=backend) + assert result["available"] is True + assert "checkout-service" in result["services"] diff --git a/tests/tools/test_telemetry.py b/tests/tools/test_telemetry.py index c1d055c1a..ab4be2b62 100644 --- a/tests/tools/test_telemetry.py +++ b/tests/tools/test_telemetry.py @@ -948,6 +948,7 @@ def _describe(_cluster: 
str, ng: str) -> dict[str, Any]: "query_signoz_metrics", "query_signoz_traces", "query_splunk_logs", + "query_tempo", "run_diagnostic_code", "search_bitbucket_code", "search_github_code", diff --git a/tests/tools/test_tempo_tools.py b/tests/tools/test_tempo_tools.py new file mode 100644 index 000000000..097d918ec --- /dev/null +++ b/tests/tools/test_tempo_tools.py @@ -0,0 +1,91 @@ +"""Tests for the Grafana Tempo tool.""" + +from __future__ import annotations + +from typing import Any + +from app.tools.TempoTool import _tempo_is_available, query_tempo + + +class _FakeTempoBackend: + """Fake Tempo backend for tool dispatch tests.""" + + def get_trace_by_id(self, trace_id: str) -> dict[str, Any]: + return { + "source": "tempo", + "action": "get_trace", + "available": True, + "trace_id": trace_id, + "total_spans": 1, + "spans": [{"name": "GET /x", "service_name": "api"}], + } + + def search_traces(self, **kwargs: Any) -> dict[str, Any]: + return { + "source": "tempo", + "action": "search", + "available": True, + "total": 1, + "traces": [{"trace_id": "t1", "root_service_name": kwargs.get("service") or "api"}], + } + + def list_services(self, **_kwargs: Any) -> dict[str, Any]: + return { + "source": "tempo", + "action": "list_services", + "available": True, + "total": 2, + "services": ["api", "worker"], + } + + def list_span_names(self, **_kwargs: Any) -> dict[str, Any]: + return { + "source": "tempo", + "action": "list_span_names", + "available": True, + "total": 1, + "span_names": ["GET /x"], + } + + +class TestTempoAvailability: + def test_available_with_url_only(self) -> None: + assert _tempo_is_available({"tempo": {"url": "http://localhost:3200"}}) is True + + def test_available_with_backend(self) -> None: + assert _tempo_is_available({"tempo": {"_backend": object()}}) is True + + def test_unavailable_when_empty(self) -> None: + assert _tempo_is_available({}) is False + + +class TestTempoToolDispatch: + def test_search_default_action(self) -> None: + result = 
query_tempo(service="api", tempo_backend=_FakeTempoBackend()) + assert result["action"] == "search" + assert result["traces"][0]["root_service_name"] == "api" + + def test_get_trace_action(self) -> None: + result = query_tempo( + action="get_trace", trace_id="trace-1", tempo_backend=_FakeTempoBackend() + ) + assert result["action"] == "get_trace" + assert result["trace_id"] == "trace-1" + assert result["spans"][0]["service_name"] == "api" + + def test_list_services_action(self) -> None: + result = query_tempo(action="list_services", tempo_backend=_FakeTempoBackend()) + assert result["services"] == ["api", "worker"] + + def test_list_span_names_action(self) -> None: + result = query_tempo(action="list_span_names", tempo_backend=_FakeTempoBackend()) + assert result["span_names"] == ["GET /x"] + + def test_invalid_action_falls_back_to_search(self) -> None: + result = query_tempo(action="bogus", tempo_backend=_FakeTempoBackend()) + assert result["action"] == "search" + + def test_not_configured_without_backend(self) -> None: + result = query_tempo(action="search") + assert result["available"] is False + assert "not configured" in result["error"].lower()