Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions app/integrations/config_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -889,3 +889,13 @@ class PrefectIntegrationConfig(StrictConfigModel):
_normalize_strs = field_validator("api_key", "account_id", "workspace_id", mode="before")(
normalize_str()
)

class TemporalIntegrationConfig(StrictConfigModel):
Comment thread
greptile-apps[bot] marked this conversation as resolved.
host: str = "localhost"
port: int = 7233
namespace: str = "default"
api_key: str | None = None
tls: bool = False
_normalize_strs = field_validator("host", "namespace", "api_key", mode="before")(
Comment thread
greptile-apps[bot] marked this conversation as resolved.
normalize_str()
)
1 change: 1 addition & 0 deletions app/integrations/effective_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ class EffectiveIntegrations(StrictConfigModel):
incident_io: EffectiveIntegrationEntry | None = None
notion: EffectiveIntegrationEntry | None = None
prefect: EffectiveIntegrationEntry | None = None
temporal: EffectiveIntegrationEntry | None = None
posthog: EffectiveIntegrationEntry | None = None
kafka: EffectiveIntegrationEntry | None = None
clickhouse: EffectiveIntegrationEntry | None = None
Expand Down
1 change: 1 addition & 0 deletions app/integrations/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ class IntegrationSpec:
IntegrationSpec(service="alicloud", direct_effective=True),
IntegrationSpec(service="notion"),
IntegrationSpec(service="prefect"),
IntegrationSpec(service="temporal"),
IntegrationSpec(service="posthog"),
IntegrationSpec(service="trello"),
IntegrationSpec(service="rds", setup_order=11),
Expand Down
87 changes: 87 additions & 0 deletions app/integrations/temporal.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
"""Temporal workflow platform integration config and verification.

This integration uses Temporal's HTTP API gateway (available on the same
port as gRPC, default 7233). For self-hosted Temporal, no extra configuration
is needed. For Temporal Cloud, set TEMPORAL_TLS=true and TEMPORAL_API_KEY.
"""
from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
from app.integrations.config_models import TemporalIntegrationConfig
from pydantic import BaseModel, Field

"""Temporal workflow platform integration config and verification.

This integration uses Temporal's HTTP API gateway (available on the same
port as gRPC, default 7233). For self-hosted Temporal, no extra configuration
is needed. For Temporal Cloud, set TEMPORAL_TLS=true and TEMPORAL_API_KEY.
"""

class TemporalConfig(BaseModel):
"""Configuration for connecting to a Temporal server."""

host: str = Field(
default="localhost",
description="Temporal server hostname or IP address.",
)
port: int = Field(
default=7233,
description="Temporal server HTTP API port (default 7233). "
"For self-hosted Temporal this is the same port as gRPC. "
"Temporal Cloud uses port 7233 for both gRPC and the HTTP API gateway.",
)
namespace: str = Field(
default="default",
description="Temporal namespace to query.",
)
api_key: str | None = Field(
default=None,
description="API key for Temporal Cloud authentication (optional for self-hosted).",
)
tls: bool = Field(
default=False,
description="Whether to use TLS for the connection (required for Temporal Cloud).",
)

@property
def base_url(self) -> str:
scheme = "https" if self.tls else "http"
return f"{scheme}://{self.host}:{self.port}"

@property
def connection_verified(self) -> bool:
"""Lightweight check — host and port are set."""
return bool(self.host and self.port)


def load_temporal_config_from_env() -> TemporalConfig:
"""Load Temporal config from environment variables.

Expected env vars:
TEMPORAL_HOST — server host (default: localhost)
TEMPORAL_PORT — HTTP API port (default: 7233)
TEMPORAL_NAMESPACE — namespace (default: default)
TEMPORAL_API_KEY — API key for Temporal Cloud (optional)
TEMPORAL_TLS — "true" to enable TLS (default: false)
"""
import os

return TemporalConfig(
host=os.environ.get("TEMPORAL_HOST", "localhost"),
port=int(os.environ.get("TEMPORAL_PORT", "7233")),
namespace=os.environ.get("TEMPORAL_NAMESPACE", "default"),
api_key=os.environ.get("TEMPORAL_API_KEY"),
tls=os.environ.get("TEMPORAL_TLS", "false").lower() == "true",
)

def load_temporal_config_from_integration(integration_config: TemporalIntegrationConfig) -> TemporalConfig:
"""Wire TemporalIntegrationConfig (from registry) into TemporalConfig (used by client)."""
return TemporalConfig(
host=integration_config.host,
port=integration_config.port,
namespace=integration_config.namespace,
api_key=integration_config.api_key or None,
tls=integration_config.tls,
)
1 change: 1 addition & 0 deletions app/services/temporal/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Package marker
118 changes: 118 additions & 0 deletions app/services/temporal/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
"""Temporal HTTP API client.

Uses Temporal's HTTP API (port 7233 REST gateway, or Temporal Cloud's API).
Falls back to the Temporal HTTP-API spec:
POST /api/v1/namespaces/{namespace}/workflows → list workflows
GET /api/v1/namespaces/{namespace}/workflows/{id}/runs/{runId}/history
GET /api/v1/namespaces/{namespace}/task-queues/{queue}
GET /api/v1/namespaces/{namespace} → namespace metrics
"""

from __future__ import annotations

import logging
from typing import Any
from urllib.parse import quote

import httpx

from app.integrations.temporal import TemporalConfig

logger = logging.getLogger(__name__)

DEFAULT_TIMEOUT = 15.0
MAX_PAGE_SIZE = 50


class TemporalClientError(Exception):
"""Raised when a Temporal API call fails."""


class TemporalClient:
"""Thin HTTP client for the Temporal REST/HTTP-API gateway."""

def __init__(self, config: TemporalConfig) -> None:
self.config = config
self._headers: dict[str, str] = {"Content-Type": "application/json"}
if config.api_key:
self._headers["Authorization"] = f"Bearer {config.api_key}"

def _get(self, path: str, params: dict[str, Any] | None = None) -> dict[str, Any]:
url = f"{self.config.base_url}{path}"
try:
with httpx.Client(timeout=DEFAULT_TIMEOUT) as client:
response = client.get(url, headers=self._headers, params=params or {})
Comment thread
greptile-apps[bot] marked this conversation as resolved.
response.raise_for_status()
return response.json() # type: ignore[no-any-return]
except httpx.HTTPStatusError as exc:
raise TemporalClientError(
f"Temporal API {exc.response.status_code} for {url}: {exc.response.text}"
) from exc
except httpx.RequestError as exc:
raise TemporalClientError(
f"Failed to connect to Temporal at {url}: {exc}"
) from exc

def _post(self, path: str, body: dict[str, Any] | None = None) -> dict[str, Any]:
url = f"{self.config.base_url}{path}"
try:
with httpx.Client(timeout=DEFAULT_TIMEOUT) as client:
response = client.post(url, headers=self._headers, json=body or {})
Comment thread
greptile-apps[bot] marked this conversation as resolved.
response.raise_for_status()
return response.json() # type: ignore[no-any-return]
except httpx.HTTPStatusError as exc:
raise TemporalClientError(
f"Temporal API {exc.response.status_code} for {url}: {exc.response.text}"
) from exc
except httpx.RequestError as exc:
raise TemporalClientError(
f"Failed to connect to Temporal at {url}: {exc}"
) from exc

def list_workflows(
self,
query: str = "",
page_size: int = MAX_PAGE_SIZE,
) -> list[dict[str, Any]]:
"""List recent workflow executions.

Args:
query: Temporal visibility query e.g. ``ExecutionStatus='Failed'``.
page_size: Max results (capped at 50).
"""
ns = quote(self.config.namespace, safe="")
body: dict[str, Any] = {"pageSize": min(page_size, MAX_PAGE_SIZE)}
if query:
body["query"] = query
data = self._post(f"/api/v1/namespaces/{ns}/workflows", body)
return list(data.get("executions", []))

def get_workflow_history(
self,
workflow_id: str,
run_id: str,
max_event_count: int = 100,
) -> list[dict[str, Any]]:
"""Fetch event history for a specific workflow run."""
ns = quote(self.config.namespace, safe="")
wid = quote(workflow_id, safe="")
rid = quote(run_id, safe="")
path = f"/api/v1/namespaces/{ns}/workflows/{wid}/runs/{rid}/history"
data = self._get(path, params={"maximumPageSize": max_event_count})
return list(data.get("history", {}).get("events", []))

def list_task_queues(self, task_queue: str) -> dict[str, Any]:
"""Fetch pollers and status for a task queue."""
ns = quote(self.config.namespace, safe="")
tq = quote(task_queue, safe="")
return self._get(f"/api/v1/namespaces/{ns}/task-queues/{tq}")

def get_namespace_metrics(self) -> dict[str, Any]:
"""Fetch namespace-level summary (open workflows, error counts)."""
ns = quote(self.config.namespace, safe="")
return self._get(f"/api/v1/namespaces/{ns}")

def get_workflow_count(self) -> dict[str, Any]:
"""Fetch open workflow count for the namespace."""
ns = quote(self.config.namespace, safe="")
return self._get(f"/api/v1/namespaces/{ns}/workflows/count")
17 changes: 17 additions & 0 deletions app/tools/TemporalTool/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from __future__ import annotations

from app.tools.TemporalTool.tool import (
TemporalListWorkflowsTool,
TemporalNamespaceMetricsTool,
TemporalTaskQueueTool,
TemporalWorkflowHistoryTool,
get_temporal_tools,
)

__all__ = [
"TemporalListWorkflowsTool",
"TemporalWorkflowHistoryTool",
"TemporalTaskQueueTool",
"TemporalNamespaceMetricsTool",
"get_temporal_tools",
]
Loading