diff --git a/app/integrations/config_models.py b/app/integrations/config_models.py index eb8fe3fdc..46d437df8 100644 --- a/app/integrations/config_models.py +++ b/app/integrations/config_models.py @@ -889,3 +889,13 @@ class PrefectIntegrationConfig(StrictConfigModel): _normalize_strs = field_validator("api_key", "account_id", "workspace_id", mode="before")( normalize_str() ) + +class TemporalIntegrationConfig(StrictConfigModel): + host: str = "localhost" + port: int = 7233 + namespace: str = "default" + api_key: str | None = None + tls: bool = False + _normalize_strs = field_validator("host", "namespace", "api_key", mode="before")( + normalize_str() + ) diff --git a/app/integrations/effective_models.py b/app/integrations/effective_models.py index 40a0d0037..13a623075 100644 --- a/app/integrations/effective_models.py +++ b/app/integrations/effective_models.py @@ -74,6 +74,7 @@ class EffectiveIntegrations(StrictConfigModel): incident_io: EffectiveIntegrationEntry | None = None notion: EffectiveIntegrationEntry | None = None prefect: EffectiveIntegrationEntry | None = None + temporal: EffectiveIntegrationEntry | None = None posthog: EffectiveIntegrationEntry | None = None kafka: EffectiveIntegrationEntry | None = None clickhouse: EffectiveIntegrationEntry | None = None diff --git a/app/integrations/registry.py b/app/integrations/registry.py index 480698680..9713cd735 100644 --- a/app/integrations/registry.py +++ b/app/integrations/registry.py @@ -343,6 +343,7 @@ class IntegrationSpec: IntegrationSpec(service="alicloud", direct_effective=True), IntegrationSpec(service="notion"), IntegrationSpec(service="prefect"), + IntegrationSpec(service="temporal"), IntegrationSpec(service="posthog"), IntegrationSpec(service="trello"), IntegrationSpec(service="rds", setup_order=11), diff --git a/app/integrations/temporal.py b/app/integrations/temporal.py new file mode 100644 index 000000000..6caa7a69b --- /dev/null +++ b/app/integrations/temporal.py @@ -0,0 +1,87 @@ +"""Temporal workflow platform integration config and verification. + +This integration uses Temporal's HTTP API gateway (available on the same +port as gRPC, default 7233). For self-hosted Temporal, no extra configuration +is needed. For Temporal Cloud, set TEMPORAL_TLS=true and TEMPORAL_API_KEY. +""" +from __future__ import annotations + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from app.integrations.config_models import TemporalIntegrationConfig +from pydantic import BaseModel, Field + +"""Temporal workflow platform integration config and verification. + +This integration uses Temporal's HTTP API gateway (available on the same +port as gRPC, default 7233). For self-hosted Temporal, no extra configuration +is needed. For Temporal Cloud, set TEMPORAL_TLS=true and TEMPORAL_API_KEY. +""" + +class TemporalConfig(BaseModel): + """Configuration for connecting to a Temporal server.""" + + host: str = Field( + default="localhost", + description="Temporal server hostname or IP address.", + ) + port: int = Field( + default=7233, + description="Temporal server HTTP API port (default 7233). " + "For self-hosted Temporal this is the same port as gRPC. " + "Temporal Cloud uses port 7233 for both gRPC and the HTTP API gateway.", + ) + namespace: str = Field( + default="default", + description="Temporal namespace to query.", + ) + api_key: str | None = Field( + default=None, + description="API key for Temporal Cloud authentication (optional for self-hosted).", + ) + tls: bool = Field( + default=False, + description="Whether to use TLS for the connection (required for Temporal Cloud).", + ) + + @property + def base_url(self) -> str: + scheme = "https" if self.tls else "http" + return f"{scheme}://{self.host}:{self.port}" + + @property + def connection_verified(self) -> bool: + """Lightweight check — host and port are set.""" + return bool(self.host and self.port) + + +def load_temporal_config_from_env() -> TemporalConfig: + """Load Temporal config from environment variables. + + Expected env vars: + TEMPORAL_HOST — server host (default: localhost) + TEMPORAL_PORT — HTTP API port (default: 7233) + TEMPORAL_NAMESPACE — namespace (default: default) + TEMPORAL_API_KEY — API key for Temporal Cloud (optional) + TEMPORAL_TLS — "true" to enable TLS (default: false) + """ + import os + + return TemporalConfig( + host=os.environ.get("TEMPORAL_HOST", "localhost"), + port=int(os.environ.get("TEMPORAL_PORT", "7233")), + namespace=os.environ.get("TEMPORAL_NAMESPACE", "default"), + api_key=os.environ.get("TEMPORAL_API_KEY"), + tls=os.environ.get("TEMPORAL_TLS", "false").lower() == "true", + ) + +def load_temporal_config_from_integration(integration_config: TemporalIntegrationConfig) -> TemporalConfig: + """Wire TemporalIntegrationConfig (from registry) into TemporalConfig (used by client).""" + return TemporalConfig( + host=integration_config.host, + port=integration_config.port, + namespace=integration_config.namespace, + api_key=integration_config.api_key or None, + tls=integration_config.tls, + ) diff --git a/app/services/temporal/__init__.py b/app/services/temporal/__init__.py new file mode 100644 index 000000000..0e632e10c --- /dev/null +++ b/app/services/temporal/__init__.py @@ -0,0 +1 @@ +# Package marker diff --git a/app/services/temporal/client.py b/app/services/temporal/client.py new file mode 100644 index 000000000..d77b08844 --- /dev/null +++ b/app/services/temporal/client.py @@ -0,0 +1,118 @@ +"""Temporal HTTP API client. + +Uses Temporal's HTTP API (port 7233 REST gateway, or Temporal Cloud's API). +Falls back to the Temporal HTTP-API spec: + POST /api/v1/namespaces/{namespace}/workflows → list workflows + GET /api/v1/namespaces/{namespace}/workflows/{id}/runs/{runId}/history + GET /api/v1/namespaces/{namespace}/task-queues/{queue} + GET /api/v1/namespaces/{namespace} → namespace metrics +""" + +from __future__ import annotations + +import logging +from typing import Any +from urllib.parse import quote + +import httpx + +from app.integrations.temporal import TemporalConfig + +logger = logging.getLogger(__name__) + +DEFAULT_TIMEOUT = 15.0 +MAX_PAGE_SIZE = 50 + + +class TemporalClientError(Exception): + """Raised when a Temporal API call fails.""" + + +class TemporalClient: + """Thin HTTP client for the Temporal REST/HTTP-API gateway.""" + + def __init__(self, config: TemporalConfig) -> None: + self.config = config + self._headers: dict[str, str] = {"Content-Type": "application/json"} + if config.api_key: + self._headers["Authorization"] = f"Bearer {config.api_key}" + + def _get(self, path: str, params: dict[str, Any] | None = None) -> dict[str, Any]: + url = f"{self.config.base_url}{path}" + try: + with httpx.Client(timeout=DEFAULT_TIMEOUT) as client: + response = client.get(url, headers=self._headers, params=params or {}) + response.raise_for_status() + return response.json() # type: ignore[no-any-return] + except httpx.HTTPStatusError as exc: + raise TemporalClientError( + f"Temporal API {exc.response.status_code} for {url}: {exc.response.text}" + ) from exc + except httpx.RequestError as exc: + raise TemporalClientError( + f"Failed to connect to Temporal at {url}: {exc}" + ) from exc + + def _post(self, path: str, body: dict[str, Any] | None = None) -> dict[str, Any]: + url = f"{self.config.base_url}{path}" + try: + with httpx.Client(timeout=DEFAULT_TIMEOUT) as client: + response = client.post(url, headers=self._headers, json=body or {}) + response.raise_for_status() + return response.json() # type: ignore[no-any-return] + except httpx.HTTPStatusError as exc: + raise TemporalClientError( + f"Temporal API {exc.response.status_code} for {url}: {exc.response.text}" + ) from exc + except httpx.RequestError as exc: + raise TemporalClientError( + f"Failed to connect to Temporal at {url}: {exc}" + ) from exc + + def list_workflows( + self, + query: str = "", + page_size: int = MAX_PAGE_SIZE, + ) -> list[dict[str, Any]]: + """List recent workflow executions. + + Args: + query: Temporal visibility query e.g. ``ExecutionStatus='Failed'``. + page_size: Max results (capped at 50). + """ + ns = quote(self.config.namespace, safe="") + body: dict[str, Any] = {"pageSize": min(page_size, MAX_PAGE_SIZE)} + if query: + body["query"] = query + data = self._post(f"/api/v1/namespaces/{ns}/workflows", body) + return list(data.get("executions", [])) + + def get_workflow_history( + self, + workflow_id: str, + run_id: str, + max_event_count: int = 100, + ) -> list[dict[str, Any]]: + """Fetch event history for a specific workflow run.""" + ns = quote(self.config.namespace, safe="") + wid = quote(workflow_id, safe="") + rid = quote(run_id, safe="") + path = f"/api/v1/namespaces/{ns}/workflows/{wid}/runs/{rid}/history" + data = self._get(path, params={"maximumPageSize": max_event_count}) + return list(data.get("history", {}).get("events", [])) + + def list_task_queues(self, task_queue: str) -> dict[str, Any]: + """Fetch pollers and status for a task queue.""" + ns = quote(self.config.namespace, safe="") + tq = quote(task_queue, safe="") + return self._get(f"/api/v1/namespaces/{ns}/task-queues/{tq}") + + def get_namespace_metrics(self) -> dict[str, Any]: + """Fetch namespace-level summary (open workflows, error counts).""" + ns = quote(self.config.namespace, safe="") + return self._get(f"/api/v1/namespaces/{ns}") + + def get_workflow_count(self) -> dict[str, Any]: + """Fetch open workflow count for the namespace.""" + ns = quote(self.config.namespace, safe="") + return self._get(f"/api/v1/namespaces/{ns}/workflows/count") diff --git a/app/tools/TemporalTool/__init__.py b/app/tools/TemporalTool/__init__.py new file mode 100644 index 000000000..dc383d0b1 --- /dev/null +++ b/app/tools/TemporalTool/__init__.py @@ -0,0 +1,17 @@ +from __future__ import annotations + +from app.tools.TemporalTool.tool import ( + TemporalListWorkflowsTool, + TemporalNamespaceMetricsTool, + TemporalTaskQueueTool, + TemporalWorkflowHistoryTool, + get_temporal_tools, +) + +__all__ = [ + "TemporalListWorkflowsTool", + "TemporalWorkflowHistoryTool", + "TemporalTaskQueueTool", + "TemporalNamespaceMetricsTool", + "get_temporal_tools", +] diff --git a/app/tools/TemporalTool/tool.py b/app/tools/TemporalTool/tool.py new file mode 100644 index 000000000..56defef9c --- /dev/null +++ b/app/tools/TemporalTool/tool.py @@ -0,0 +1,349 @@ +"""Temporal LangChain tool definitions. + +Four tools surface the four Temporal API capabilities: + 1. TemporalListWorkflowsTool — list recent executions with status/failure reason + 2. TemporalWorkflowHistoryTool — fetch event history for a single run + 3. TemporalTaskQueueTool — list task queues and worker pollers + 4. TemporalNamespaceMetricsTool — namespace-level metrics (open workflows, errors) +""" + +from __future__ import annotations + +import json +import logging + +from langchain_core.tools import BaseTool +from pydantic import BaseModel, Field + +from app.integrations.temporal import ( + TemporalConfig, + load_temporal_config_from_env, + load_temporal_config_from_integration, +) +from app.services.temporal.client import TemporalClient, TemporalClientError + +logger = logging.getLogger(__name__) + + +# Input schemas + +class ListWorkflowsInput(BaseModel): + """Input schema for listing Temporal workflow executions.""" + + query: str = Field( + default="", + description=( + "Temporal visibility query string. Examples: " + "``ExecutionStatus='Failed'``, " + "``WorkflowType='OrderWorkflow' AND ExecutionStatus='Running'``. " + "Leave empty to list all recent executions." + ), + ) + page_size: int = Field( + default=20, + ge=1, + le=50, + description="Maximum number of workflow executions to return (1–50).", + ) + + +class WorkflowHistoryInput(BaseModel): + """Input schema for fetching a Temporal workflow's event history.""" + + workflow_id: str = Field( + description="The workflow ID, e.g. ``order-workflow-abc123``." + ) + run_id: str = Field( + description="The run ID of the specific execution to inspect." + ) + max_event_count: int = Field( + default=50, + ge=1, + le=200, + description="Maximum number of history events to return (1–200).", + ) + + +class TaskQueueInput(BaseModel): + """Input schema for fetching Temporal task queue info.""" + + task_queue: str = Field( + description="Name of the task queue, e.g. ``payment-task-queue``." + ) + + +class NamespaceMetricsInput(BaseModel): + """Input schema for fetching Temporal namespace metrics (no parameters needed).""" + + pass + + +#Implementation of Tools + +def _make_client(config: TemporalConfig | None = None) -> TemporalClient: + return TemporalClient(config or load_temporal_config_from_env()) + +class TemporalListWorkflowsTool(BaseTool): + """List recent Temporal workflow executions with status and failure reason.""" + + name: str = "temporal_list_workflows" + description: str = ( + "List recent Temporal workflow executions. " + "Use this to identify failed, timed-out, or stuck workflows in a namespace. " + "Supports Temporal visibility queries to filter by status, type, or time range." + ) + args_schema: type[BaseModel] = ListWorkflowsInput + config: TemporalConfig | None = None + + def _run(self, query: str = "", page_size: int = 20) -> str: + client = _make_client(self.config) + try: + executions = client.list_workflows(query=query, page_size=page_size) + except TemporalClientError as exc: + return f"Error fetching workflows: {exc}" + + if not executions: + return "No workflow executions found matching the query." + + results = [] + for ex in executions: + execution = ex.get("execution", {}) + status = ex.get("status", "UNKNOWN") + wf_type = ex.get("type", {}).get("name", "unknown") + start_time = ex.get("startTime", "") + close_time = ex.get("closeTime", "") + + entry: dict = { + "workflow_id": execution.get("workflowId", ""), + "run_id": execution.get("runId", ""), + "type": wf_type, + "status": status, + "start_time": start_time, + } + if close_time: + entry["close_time"] = close_time + + # Surface failure reason if present + if status in ("FAILED", "TIMED_OUT", "TERMINATED"): + entry["hint"] = "Use temporal_workflow_history to fetch the failure reason and stack trace." + + results.append(entry) + + return json.dumps(results, indent=2) + + async def _arun(self, query: str = "", page_size: int = 20) -> str: + return self._run(query=query, page_size=page_size) + + +class TemporalWorkflowHistoryTool(BaseTool): + """Fetch the event history for a specific Temporal workflow run.""" + + name: str = "temporal_workflow_history" + description: str = ( + "Fetch the full event history for a specific Temporal workflow execution. " + "Use this to diagnose failures: the history shows every activity attempt, " + "retry, timeout, signal, and the final failure cause with stack traces." + ) + args_schema: type[BaseModel] = WorkflowHistoryInput + config: TemporalConfig | None = None + + def _run( + self, + workflow_id: str, + run_id: str, + max_event_count: int = 50, + ) -> str: + client = _make_client(self.config) + try: + events = client.get_workflow_history( + workflow_id=workflow_id, + run_id=run_id, + max_event_count=max_event_count, + ) + except TemporalClientError as exc: + return f"Error fetching workflow history: {exc}" + + if not events: + return f"No history events found for workflow {workflow_id}/{run_id}." + + # Summarise key failure events for the agent + summary = [] + for event in events: + event_type = event.get("eventType", "") + event_id = event.get("eventId", "") + event_time = event.get("eventTime", "") + attrs_key = _attrs_key(event_type) + attrs = event.get(attrs_key, {}) + + entry: dict = { + "event_id": event_id, + "event_time": event_time, + "event_type": event_type, + } + + # Pull out failure details when present + failure = attrs.get("failure") + if failure: + entry["failure"] = { + "message": failure.get("message", ""), + "cause": failure.get("cause", {}).get("message", ""), + "stack_trace": failure.get("stackTrace", "")[:500], + } + + # Pull out activity info when present + if "activityType" in attrs: + entry["activity_type"] = attrs["activityType"].get("name", "") + if "attempt" in attrs: + entry["attempt"] = attrs["attempt"] + + summary.append(entry) + + return json.dumps(summary, indent=2) + + async def _arun(self, workflow_id: str, run_id: str, max_event_count: int = 50) -> str: + return self._run(workflow_id=workflow_id, run_id=run_id, max_event_count=max_event_count) + + +class TemporalTaskQueueTool(BaseTool): + """List task queue pollers and worker health for a Temporal task queue.""" + + name: str = "temporal_task_queue" + description: str = ( + "Fetch Temporal task queue info: which workers are polling, their identity, " + "last access time, and whether the queue is drained. " + "Use this when investigating worker crashes, missing workers, or stuck workflows." + ) + args_schema: type[BaseModel] = TaskQueueInput + config: TemporalConfig | None = None + + def _run(self, task_queue: str) -> str: + client = _make_client(self.config) + try: + data = client.list_task_queues(task_queue=task_queue) + except TemporalClientError as exc: + return f"Error fetching task queue '{task_queue}': {exc}" + + pollers = data.get("pollers", []) + status = data.get("taskQueueStatus", {}) + + result: dict = { + "task_queue": task_queue, + "poller_count": len(pollers), + "pollers": [ + { + "identity": p.get("identity", ""), + "last_access_time": p.get("lastAccessTime", ""), + "rate_per_second": p.get("ratePerSecond", 0), + } + for p in pollers + ], + "backlog_count": status.get("backlogCountHint", 0), + "read_level": status.get("readLevel", 0), + } + + if not pollers: + result["warning"] = ( + "No workers are currently polling this task queue. " + "Workflows may be stuck waiting for workers." + ) + + return json.dumps(result, indent=2) + + async def _arun(self, task_queue: str) -> str: + return self._run(task_queue=task_queue) + + +class TemporalNamespaceMetricsTool(BaseTool): + """Fetch namespace-level Temporal metrics: open workflows, activity errors.""" + + name: str = "temporal_namespace_metrics" + description: str = ( + "Fetch Temporal namespace-level summary metrics: open workflow count, " + "namespace config, retention period, and cluster info. " + "Use this as a first step to get an overview of Temporal health." + ) + args_schema: type[BaseModel] = NamespaceMetricsInput + config: TemporalConfig | None = None + + def _run(self) -> str: + client = _make_client(self.config) + try: + data = client.get_namespace_metrics() + except TemporalClientError as exc: + return f"Error fetching namespace metrics: {exc}" + + try: + count_data = client.get_workflow_count() + open_workflow_count = count_data.get("count", 0) + except TemporalClientError: + open_workflow_count = None + + ns_info = data.get("namespaceInfo", {}) + config = data.get("config", {}) + replication = data.get("replicationConfig", {}) + + result = { + "namespace": ns_info.get("name", ""), + "state": ns_info.get("state", ""), + "description": ns_info.get("description", ""), + "retention_days": config.get("workflowExecutionRetentionTtl", ""), + "active_cluster": replication.get("activeClusterName", ""), + "clusters": replication.get("clusters", []), + "data": ns_info.get("data", {}), + "open_workflow_count": open_workflow_count, + } + + return json.dumps(result, indent=2) + + async def _arun(self) -> str: + return self._run() + + +#Factory + +def get_temporal_tools( + config: TemporalConfig | None = None, + integration_config: object | None = None, +) -> list[BaseTool]: + """Return all Temporal tools wired to the given config. + + Priority: + 1. Explicit TemporalConfig passed directly + 2. TemporalIntegrationConfig from the registry + 3. Environment variables fallback + """ + if config is not None: + resolved = config + elif integration_config is not None: + from app.integrations.config_models import TemporalIntegrationConfig + if isinstance(integration_config, TemporalIntegrationConfig): + resolved = load_temporal_config_from_integration(integration_config) + else: + resolved = load_temporal_config_from_env() + else: + resolved = load_temporal_config_from_env() + + return [ + TemporalListWorkflowsTool(config=resolved), + TemporalWorkflowHistoryTool(config=resolved), + TemporalTaskQueueTool(config=resolved), + TemporalNamespaceMetricsTool(config=resolved), + ] +# Helpers + + +def _attrs_key(event_type: str) -> str: + """Map event type string to its attributes dict key in the Temporal API response.""" + _map = { + "EVENT_TYPE_WORKFLOW_EXECUTION_STARTED": "workflowExecutionStartedEventAttributes", + "EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED": "workflowExecutionCompletedEventAttributes", + "EVENT_TYPE_WORKFLOW_EXECUTION_FAILED": "workflowExecutionFailedEventAttributes", + "EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT": "workflowExecutionTimedOutEventAttributes", + "EVENT_TYPE_ACTIVITY_TASK_SCHEDULED": "activityTaskScheduledEventAttributes", + "EVENT_TYPE_ACTIVITY_TASK_STARTED": "activityTaskStartedEventAttributes", + "EVENT_TYPE_ACTIVITY_TASK_COMPLETED": "activityTaskCompletedEventAttributes", + "EVENT_TYPE_ACTIVITY_TASK_FAILED": "activityTaskFailedEventAttributes", + "EVENT_TYPE_ACTIVITY_TASK_TIMED_OUT": "activityTaskTimedOutEventAttributes", + "EVENT_TYPE_ACTIVITY_TASK_CANCEL_REQUESTED": "activityTaskCancelRequestedEventAttributes", + } + return _map.get(event_type, "") diff --git a/docs/docs.json b/docs/docs.json index 6eee89bcb..3d6f3fb1d 100644 --- a/docs/docs.json +++ b/docs/docs.json @@ -149,7 +149,8 @@ "postgresql", "prefect", "rabbitmq", - "rds" + "rds", + "temporal" ] } ] diff --git a/docs/integrations/temporal.mdx b/docs/integrations/temporal.mdx new file mode 100644 index 000000000..cafe15e6f --- /dev/null +++ b/docs/integrations/temporal.mdx @@ -0,0 +1,108 @@ +# Temporal + +OpenSRE integrates with [Temporal](https://temporal.io) to investigate failed workflows, +diagnose activity timeouts, identify missing workers, and surface root causes during +production incidents. + +## Overview + +Temporal is a durable workflow platform used for long-running, fault-tolerant business +processes. When workflows fail — due to activity timeouts, worker crashes, or retried +exhaustion — the evidence is spread across workflow history events, task queue state, +and namespace metrics. + +OpenSRE's Temporal integration pulls all of this into a single RCA investigation. + +## Setup + +### Environment variables + +| Variable | Default | Description | +|---|---|---| +| `TEMPORAL_HOST` | `localhost` | Temporal server hostname | +| `TEMPORAL_PORT` | `7233` | gRPC/HTTP-API port | +| `TEMPORAL_NAMESPACE` | `default` | Namespace to query | +| `TEMPORAL_API_KEY` | *(none)* | API key (required for Temporal Cloud) | +| `TEMPORAL_TLS` | `false` | Set to `true` for Temporal Cloud or TLS-enabled self-hosted | + +### Self-hosted Temporal + +```bash +export TEMPORAL_HOST=temporal.internal.example.com +export TEMPORAL_PORT=7233 +export TEMPORAL_NAMESPACE=production +``` + +### Temporal Cloud + +```bash +export TEMPORAL_HOST=.tmprl.cloud +export TEMPORAL_PORT=7233 +export TEMPORAL_NAMESPACE=. +export TEMPORAL_API_KEY= +export TEMPORAL_TLS=true +``` + +## Tools + +OpenSRE exposes four tools for Temporal investigation: + +### `temporal_list_workflows` + +Lists recent workflow executions with their status and failure reason. + +**Useful for:** Finding all failed or stuck workflows in a namespace at a glance. + +``` +ExecutionStatus='Failed' +WorkflowType='OrderWorkflow' AND ExecutionStatus='Running' +StartTime > '2024-01-01T00:00:00Z' +``` + +### `temporal_workflow_history` + +Fetches the full event history for a specific workflow run, including every activity +attempt, retry, timeout, and failure cause with stack traces. + +**Useful for:** Diagnosing *why* a specific workflow failed — seeing exactly which +activity timed out, how many retries were attempted, and the final failure message. + +### `temporal_task_queue` + +Fetches poller info and worker health for a task queue. + +**Useful for:** Investigating workflow stuck in `Running` state — if no workers are +polling a task queue, workflows wait indefinitely. + +### `temporal_namespace_metrics` + +Fetches namespace-level summary: registration state, retention period, and cluster info. + +**Useful for:** Getting a high-level health overview of a Temporal namespace at the +start of an investigation. + +## Example investigation + +``` +opensre investigate "Temporal order workflows are stuck and not completing" +``` + +OpenSRE will: + +1. Call `temporal_namespace_metrics` to confirm the namespace is healthy +2. Call `temporal_list_workflows` with `ExecutionStatus='Running'` to find stuck workflows +3. Call `temporal_task_queue` to check if workers are polling +4. Call `temporal_workflow_history` on a representative stuck workflow to find the root cause +5. Synthesise a root cause report: e.g. *"Workers stopped polling payment-task-queue at 14:32 UTC. 47 OrderWorkflow executions are stuck waiting for the ProcessPayment activity. Last worker heartbeat was 6 minutes ago — likely a worker pod OOMKill."* + +## Supported Temporal versions + +This integration uses the [Temporal HTTP API](https://docs.temporal.io/api-reference/overview) +which is available in Temporal Server v1.20+ and all Temporal Cloud namespaces. + +## Roadmap + +- Batch workflow termination tool +- Search attribute filtering +- Schedule (cron workflow) inspection +- Worker deployment version tracking \ No newline at end of file diff --git a/tests/integrations/test_temporal.py b/tests/integrations/test_temporal.py new file mode 100644 index 000000000..f8a3952a2 --- /dev/null +++ b/tests/integrations/test_temporal.py @@ -0,0 +1,410 @@ +"""Unit tests for the Temporal integration. + +All tests use mocks — no live Temporal server required. +Run with: pytest tests/integrations/test_temporal.py -v +""" + +from __future__ import annotations + +import json +from unittest.mock import MagicMock, patch + +import pytest + +from app.integrations.temporal import TemporalConfig, load_temporal_config_from_env +from app.services.temporal.client import TemporalClient, TemporalClientError +from app.tools.TemporalTool.tool import ( + TemporalListWorkflowsTool, + TemporalNamespaceMetricsTool, + TemporalTaskQueueTool, + TemporalWorkflowHistoryTool, + get_temporal_tools, +) + +# Fixtures + +@pytest.fixture() +def config() -> TemporalConfig: + return TemporalConfig(host="localhost", port=7233, namespace="default") + + +@pytest.fixture() +def client(config: TemporalConfig) -> TemporalClient: + return TemporalClient(config) + + +# TemporalConfig + +class TestTemporalConfig: + def test_default_values(self) -> None: + cfg = TemporalConfig() + assert cfg.host == "localhost" + assert cfg.port == 7233 + assert cfg.namespace == "default" + assert cfg.api_key is None + assert cfg.tls is False + + def test_base_url_http(self) -> None: + cfg = TemporalConfig(host="temporal.example.com", port=7233, tls=False) + assert cfg.base_url == "http://temporal.example.com:7233" + + def test_base_url_https(self) -> None: + cfg = TemporalConfig(host="temporal.example.com", port=7233, tls=True) + assert cfg.base_url == "https://temporal.example.com:7233" + + def test_connection_verified(self, config: TemporalConfig) -> None: + assert config.connection_verified is True + + def test_connection_verified_empty_host(self) -> None: + cfg = TemporalConfig(host="", port=7233) + assert cfg.connection_verified is False + + def test_api_key_optional(self) -> None: + cfg = TemporalConfig(api_key="my-secret-key") + assert cfg.api_key == "my-secret-key" + + def test_load_from_env(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("TEMPORAL_HOST", "temporal-prod.example.com") + monkeypatch.setenv("TEMPORAL_PORT", "7234") + monkeypatch.setenv("TEMPORAL_NAMESPACE", "production") + monkeypatch.setenv("TEMPORAL_API_KEY", "abc123") + monkeypatch.setenv("TEMPORAL_TLS", "true") + + cfg = load_temporal_config_from_env() + assert cfg.host == "temporal-prod.example.com" + assert cfg.port == 7234 + assert cfg.namespace == "production" + assert cfg.api_key == "abc123" + assert cfg.tls is True + + +# TemporalClient + +class TestTemporalClient: + def test_auth_header_with_api_key(self) -> None: + cfg = TemporalConfig(api_key="tok-123") + c = TemporalClient(cfg) + assert c._headers["Authorization"] == "Bearer tok-123" + + def test_no_auth_header_without_api_key(self, config: TemporalConfig) -> None: + c = TemporalClient(config) + assert "Authorization" not in c._headers + + @patch("app.services.temporal.client.httpx.Client") + def test_list_workflows_success( + self, mock_httpx: MagicMock, client: TemporalClient + ) -> None: + mock_response = MagicMock() + mock_response.json.return_value = { + "executions": [ + { + "execution": {"workflowId": "wf-1", "runId": "run-1"}, + "type": {"name": "OrderWorkflow"}, + "status": "FAILED", + "startTime": "2024-01-15T10:00:00Z", + } + ] + } + mock_response.raise_for_status = MagicMock() + mock_httpx.return_value.__enter__.return_value.post.return_value = mock_response + + executions = client.list_workflows(query="ExecutionStatus='Failed'") + assert len(executions) == 1 + assert executions[0]["execution"]["workflowId"] == "wf-1" + assert executions[0]["status"] == "FAILED" + + @patch("app.services.temporal.client.httpx.Client") + def test_list_workflows_empty( + self, mock_httpx: MagicMock, client: TemporalClient + ) -> None: + mock_response = MagicMock() + mock_response.json.return_value = {"executions": []} + mock_response.raise_for_status = MagicMock() + mock_httpx.return_value.__enter__.return_value.post.return_value = mock_response + + executions = client.list_workflows() + assert executions == [] + + @patch("app.services.temporal.client.httpx.Client") + def test_get_workflow_history_success( + self, mock_httpx: MagicMock, client: TemporalClient + ) -> None: + mock_response = MagicMock() + mock_response.json.return_value = { + "history": { + "events": [ + { + "eventId": "1", + "eventTime": "2024-01-15T10:00:00Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_STARTED", + }, + { + "eventId": "2", + "eventTime": "2024-01-15T10:01:00Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_FAILED", + "workflowExecutionFailedEventAttributes": { + "failure": { + "message": "activity timeout", + "stackTrace": "at OrderActivity.java:42", + } + }, + }, + ] + } + } + mock_response.raise_for_status = MagicMock() + mock_httpx.return_value.__enter__.return_value.get.return_value = mock_response + + events = client.get_workflow_history("wf-1", "run-1") + assert len(events) == 2 + assert events[1]["eventType"] == "EVENT_TYPE_WORKFLOW_EXECUTION_FAILED" + + @patch("app.services.temporal.client.httpx.Client") + def test_list_task_queues_success( + self, mock_httpx: MagicMock, client: TemporalClient + ) -> None: + mock_response = MagicMock() + mock_response.json.return_value = { + "pollers": [ + { + "identity": "worker-1@host", + "lastAccessTime": "2024-01-15T10:00:00Z", + "ratePerSecond": 100.0, + } + ], + "taskQueueStatus": {"backlogCountHint": 0, "readLevel": 42}, + } + mock_response.raise_for_status = MagicMock() + mock_httpx.return_value.__enter__.return_value.get.return_value = mock_response + + data = client.list_task_queues("payment-task-queue") + assert len(data["pollers"]) == 1 + assert data["pollers"][0]["identity"] == "worker-1@host" + + @patch("app.services.temporal.client.httpx.Client") + def test_get_namespace_metrics_success( + self, mock_httpx: MagicMock, client: TemporalClient + ) -> None: + mock_response = MagicMock() + mock_response.json.return_value = { + "namespaceInfo": { + "name": "default", + "state": "Registered", + "description": "Default namespace", + }, + "config": {"workflowExecutionRetentionTtl": "72h"}, + "replicationConfig": {"activeClusterName": "us-east"}, + } + mock_response.raise_for_status = MagicMock() + mock_httpx.return_value.__enter__.return_value.get.return_value = mock_response + + data = client.get_namespace_metrics() + assert data["namespaceInfo"]["name"] == "default" + + @patch("app.services.temporal.client.httpx.Client") + def test_client_error_on_http_failure( + self, mock_httpx: MagicMock, client: TemporalClient + ) -> None: + import httpx + + mock_response = MagicMock() + mock_response.status_code = 503 + mock_response.text = "Service Unavailable" + mock_httpx.return_value.__enter__.return_value.post.side_effect = ( + httpx.HTTPStatusError("503", request=MagicMock(), response=mock_response) + ) + + with pytest.raises(TemporalClientError, match="503"): + client.list_workflows() + + +# Tools + +class TestTemporalListWorkflowsTool: + @patch("app.tools.TemporalTool.tool.TemporalClient") + def test_returns_executions(self, mock_client_cls: MagicMock) -> None: + mock_client = MagicMock() + mock_client.list_workflows.return_value = [ + { + "execution": {"workflowId": "wf-1", "runId": "run-1"}, + "type": {"name": "OrderWorkflow"}, + "status": "FAILED", + "startTime": "2024-01-15T10:00:00Z", + } + ] + mock_client_cls.return_value = mock_client + + tool = TemporalListWorkflowsTool() + result = tool._run(query="ExecutionStatus='Failed'", page_size=10) + parsed = json.loads(result) + assert len(parsed) == 1 + assert parsed[0]["workflow_id"] == "wf-1" + assert parsed[0]["status"] == "FAILED" + + @patch("app.tools.TemporalTool.tool.TemporalClient") + def test_empty_returns_message(self, mock_client_cls: MagicMock) -> None: + mock_client = MagicMock() + mock_client.list_workflows.return_value = [] + mock_client_cls.return_value = mock_client + + tool = TemporalListWorkflowsTool() + result = tool._run() + assert "No workflow" in result + + @patch("app.tools.TemporalTool.tool.TemporalClient") + def test_error_handled_gracefully(self, mock_client_cls: MagicMock) -> None: + mock_client = MagicMock() + mock_client.list_workflows.side_effect = TemporalClientError("connection refused") + mock_client_cls.return_value = mock_client + + tool = TemporalListWorkflowsTool() + result = tool._run() + assert "Error" in result + assert "connection refused" in result + + +class TestTemporalWorkflowHistoryTool: + @patch("app.tools.TemporalTool.tool.TemporalClient") + def test_returns_events_with_failure(self, mock_client_cls: MagicMock) -> None: + mock_client = MagicMock() + mock_client.get_workflow_history.return_value = [ + { + "eventId": "1", + "eventTime": "2024-01-15T10:00:00Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_FAILED", + "workflowExecutionFailedEventAttributes": { + "failure": { + "message": "activity timed out", + "cause": {"message": "deadline exceeded"}, + "stackTrace": "at Workflow.java:99", + } + }, + } + ] + mock_client_cls.return_value = mock_client + + tool = TemporalWorkflowHistoryTool() + result = tool._run(workflow_id="wf-1", run_id="run-1") + parsed = json.loads(result) + assert parsed[0]["failure"]["message"] == "activity timed out" + assert parsed[0]["failure"]["cause"] == "deadline exceeded" + + @patch("app.tools.TemporalTool.tool.TemporalClient") + def test_no_history_returns_message(self, mock_client_cls: MagicMock) -> None: + mock_client = MagicMock() + mock_client.get_workflow_history.return_value = [] + mock_client_cls.return_value = mock_client + + tool = TemporalWorkflowHistoryTool() + result = tool._run(workflow_id="wf-1", run_id="run-1") + assert "No history" in result + + +class TestTemporalTaskQueueTool: + @patch("app.tools.TemporalTool.tool.TemporalClient") + def test_returns_pollers(self, mock_client_cls: MagicMock) -> None: + mock_client = MagicMock() + mock_client.list_task_queues.return_value = { + "pollers": [ + { + "identity": "worker-1@host", + "lastAccessTime": "2024-01-15T10:00:00Z", + "ratePerSecond": 100.0, + } + ], + "taskQueueStatus": {"backlogCountHint": 5, "readLevel": 10}, + } + mock_client_cls.return_value = mock_client + + tool = TemporalTaskQueueTool() + result = tool._run(task_queue="payment-task-queue") + parsed = json.loads(result) + assert parsed["poller_count"] == 1 + assert parsed["pollers"][0]["identity"] == "worker-1@host" + assert parsed["backlog_count"] == 5 + + @patch("app.tools.TemporalTool.tool.TemporalClient") + def test_warns_when_no_pollers(self, mock_client_cls: MagicMock) -> None: + mock_client = MagicMock() + mock_client.list_task_queues.return_value = { + "pollers": [], + "taskQueueStatus": {}, + } + mock_client_cls.return_value = mock_client + + tool = TemporalTaskQueueTool() + result = tool._run(task_queue="orphan-queue") + parsed = json.loads(result) + assert "warning" in parsed + assert "No workers" in parsed["warning"] + + +class TestTemporalNamespaceMetricsTool: + @patch("app.tools.TemporalTool.tool.TemporalClient") + def test_returns_namespace_info(self, mock_client_cls: MagicMock) -> None: + mock_client = MagicMock() + mock_client.get_namespace_metrics.return_value = { + "namespaceInfo": { + "name": "production", + "state": "Registered", + "description": "Production namespace", + }, + "config": {"workflowExecutionRetentionTtl": "72h"}, + "replicationConfig": { + "activeClusterName": "us-east", + "clusters": [{"clusterName": "us-east"}], + }, + } + mock_client.get_workflow_count.return_value = {"count": 42} + mock_client_cls.return_value = mock_client + + tool = TemporalNamespaceMetricsTool() + result = tool._run() + parsed = json.loads(result) + assert parsed["namespace"] == "production" + assert parsed["active_cluster"] == "us-east" + assert parsed["retention_days"] == "72h" + assert parsed["open_workflow_count"] == 42 + + @patch("app.tools.TemporalTool.tool.TemporalClient") + def test_namespace_info_returned_even_if_count_fails(self, mock_client_cls: MagicMock) -> None: + mock_client = MagicMock() + mock_client.get_namespace_metrics.return_value = { + "namespaceInfo": { + "name": "production", + "state": "Registered", + "description": "Production namespace", + }, + "config": {"workflowExecutionRetentionTtl": "72h"}, + "replicationConfig": { + "activeClusterName": "us-east", + "clusters": [{"clusterName": "us-east"}], + }, + } + mock_client.get_workflow_count.side_effect = TemporalClientError("endpoint not found") + mock_client_cls.return_value = mock_client + + tool = TemporalNamespaceMetricsTool() + result = tool._run() + parsed = json.loads(result) + assert parsed["namespace"] == "production" + assert parsed["active_cluster"] == "us-east" + assert parsed["retention_days"] == "72h" + assert parsed["open_workflow_count"] is None + +class TestGetTemporalTools: + def test_returns_four_tools(self) -> None: + tools = get_temporal_tools() + assert len(tools) == 4 + names = {t.name for t in tools} + assert names == { + "temporal_list_workflows", + "temporal_workflow_history", + "temporal_task_queue", + "temporal_namespace_metrics", + } + + def test_accepts_custom_config(self) -> None: + cfg = TemporalConfig(host="prod-temporal", namespace="prod") + tools = get_temporal_tools(config=cfg) + assert all(t.config == cfg for t in tools)