feat(integrations): add Temporal workflow platform integration#2664
feat(integrations): add Temporal workflow platform integration#2664Adityuh1 wants to merge 2 commits into
Conversation
- Integration config under app/integrations/temporal.py - HTTP service client under app/services/temporal/client.py - Four tools under app/tools/TemporalTool/ with typed input schemas - 25 unit/mock tests under tests/integrations/test_temporal.py - Docs page at docs/integrations/temporal.mdx registered in docs/docs.json - Registered in config_models, effective_models, and registry
Greptile code reviewThis repo uses Greptile for automated review. Before merge, aim for Confidence Score: 5/5 with zero unresolved review threads — see CONTRIBUTING.md. Run a review — add a PR comment with: Give it ~5-10 minutes (sometimes longer) for results, then fix feedback and re-trigger until you reach Confidence Score: 5/5. Optional: automate with the greploop skill. |
|
@greptile review |
Greptile SummaryThis PR adds a Temporal workflow platform integration to OpenSRE, enabling investigation of failed workflows, activity timeouts, worker health, and namespace metrics through four new LangChain tools backed by a thin
Confidence Score: 3/5Not safe to merge without fixes — multiple real defects in the new client and tool layer that would cause silent failures or incorrect behaviour in production. The client constructs URL paths by interpolating user-supplied workflow IDs and run IDs without encoding, so any slash in a workflow ID silently 404s. SSL certificate verification is disabled whenever TLS is set to false, which is the default, leaving any HTTPS-only deployment open to cert bypass. The
Important Files Changed
Sequence DiagramsequenceDiagram
participant Agent
participant Tool as TemporalTool (_run)
participant _make_client
participant EnvVars as Environment Variables
participant Client as TemporalClient
participant Temporal as Temporal HTTP API
Agent->>Tool: invoke(query, page_size, ...)
Tool->>_make_client: _make_client(self.config)
_make_client->>EnvVars: load_temporal_config_from_env() [if no config]
EnvVars-->>_make_client: TemporalConfig
_make_client-->>Tool: TemporalClient
Tool->>Client: list_workflows / get_workflow_history / ...
Client->>Temporal: "POST/GET /api/v1/namespaces/{ns}/..."
Temporal-->>Client: JSON response
Client-->>Tool: list[dict] / dict
Tool-->>Agent: JSON string
|
| ns = self.config.namespace | ||
| path = ( | ||
| f"/api/v1/namespaces/{ns}/workflows/{workflow_id}" | ||
| f"/runs/{run_id}/history" | ||
| ) | ||
| data = self._get(path, params={"maximumPageSize": max_event_count}) |
There was a problem hiding this comment.
URL path injection via unencoded
workflow_id and run_id
workflow_id and run_id are interpolated directly into the URL path without URL-encoding. Temporal workflow IDs are user-defined strings and can legally contain / (e.g., order-service/txn-42). A slash would split the path into extra segments, causing the server to return 404 with no obvious error message. The same applies to task_queue in list_task_queues and namespace in every path. Use urllib.parse.quote(workflow_id, safe="") (and similarly for run_id) before inserting into the f-string.
| def _get(self, path: str, params: dict[str, Any] | None = None) -> dict[str, Any]: | ||
| url = f"{self.config.base_url}{path}" | ||
| try: | ||
| with httpx.Client(timeout=DEFAULT_TIMEOUT, verify=self.config.tls) as client: | ||
| response = client.get(url, headers=self._headers, params=params or {}) |
There was a problem hiding this comment.
verify=self.config.tls disables SSL verification for non-TLS connections
httpx.Client(verify=False) disables certificate verification for HTTPS connections. Here it is False whenever tls=False. For plain-HTTP deployments this has no effect, but any self-hosted Temporal behind a TLS reverse-proxy (where the operator forgot to set tls=True) would silently skip cert verification and accept invalid/expired certificates. The two concepts — does this endpoint speak TLS? (which controls the URL scheme) and should we verify the certificate? — are semantically independent and should not be coupled.
| def _get(self, path: str, params: dict[str, Any] | None = None) -> dict[str, Any]: | |
| url = f"{self.config.base_url}{path}" | |
| try: | |
| with httpx.Client(timeout=DEFAULT_TIMEOUT, verify=self.config.tls) as client: | |
| response = client.get(url, headers=self._headers, params=params or {}) | |
| def _get(self, path: str, params: dict[str, Any] | None = None) -> dict[str, Any]: | |
| url = f"{self.config.base_url}{path}" | |
| try: | |
| with httpx.Client(timeout=DEFAULT_TIMEOUT, verify=True) as client: | |
| response = client.get(url, headers=self._headers, params=params or {}) |
| def _post(self, path: str, body: dict[str, Any] | None = None) -> dict[str, Any]: | ||
| url = f"{self.config.base_url}{path}" | ||
| try: | ||
| with httpx.Client(timeout=DEFAULT_TIMEOUT, verify=self.config.tls) as client: | ||
| response = client.post(url, headers=self._headers, json=body or {}) |
There was a problem hiding this comment.
Same
verify=self.config.tls coupling in _post. Should be verify=True so that any HTTPS connection (regardless of how tls is set) always validates the server certificate.
| def _post(self, path: str, body: dict[str, Any] | None = None) -> dict[str, Any]: | |
| url = f"{self.config.base_url}{path}" | |
| try: | |
| with httpx.Client(timeout=DEFAULT_TIMEOUT, verify=self.config.tls) as client: | |
| response = client.post(url, headers=self._headers, json=body or {}) | |
| def _post(self, path: str, body: dict[str, Any] | None = None) -> dict[str, Any]: | |
| url = f"{self.config.base_url}{path}" | |
| try: | |
| with httpx.Client(timeout=DEFAULT_TIMEOUT, verify=True) as client: | |
| response = client.post(url, headers=self._headers, json=body or {}) |
| ) | ||
|
|
||
| class TemporalIntegrationConfig(StrictConfigModel): | ||
| host: str = "localhost" | ||
| port: int = 7233 | ||
| namespace: str = "default" | ||
| api_key: str = "" | ||
| tls: bool = False | ||
| _normalize_strs = field_validator("host", "namespace", "api_key", mode="before")( |
There was a problem hiding this comment.
TemporalIntegrationConfig.api_key type differs from TemporalConfig.api_key
TemporalIntegrationConfig (here) declares api_key: str = "", but TemporalConfig in app/integrations/temporal.py declares api_key: str | None = None. The two models represent the same concept — the API key for Temporal Cloud — but with different defaults and types. The tools and client are wired to TemporalConfig, so configuration persisted through the integration framework (which uses TemporalIntegrationConfig) is never consumed by the client. Any operator who configures Temporal via the integration framework expecting the key to be picked up will get silent auth failures.
Greptile SummaryThis PR adds a Temporal workflow platform integration, including a config model, an HTTP service client, four LangChain tools (list workflows, workflow history, task queue, namespace metrics), 25 unit tests, and docs. It follows the same layered structure as the existing Prefect integration.
Confidence Score: 3/5Not safe to merge as-is — workflow IDs with slashes will silently hit wrong API endpoints, and stored integration config is never applied. The HTTP client builds URL paths by string-interpolating user-supplied IDs without encoding them. Temporal workflow IDs that contain app/services/temporal/client.py (URL encoding on all path segments) and app/tools/TemporalTool/tool.py (wiring registry config into the tool factory) Important Files Changed
|
| with httpx.Client(timeout=DEFAULT_TIMEOUT, verify=self.config.tls) as client: | ||
| response = client.get(url, headers=self._headers, params=params or {}) |
There was a problem hiding this comment.
verify=self.config.tls passes the TLS enable flag as httpx's certificate-verification flag. When tls=False (plain HTTP), this silently sets verify=False — a no-op on HTTP but bad practice because it would also suppress SSL warnings on any accidental HTTPS call. The verify parameter controls certificate validation; the URL scheme (http/https) already controls whether TLS is used.
| with httpx.Client(timeout=DEFAULT_TIMEOUT, verify=self.config.tls) as client: | |
| response = client.get(url, headers=self._headers, params=params or {}) | |
| with httpx.Client(timeout=DEFAULT_TIMEOUT) as client: | |
| response = client.get(url, headers=self._headers, params=params or {}) |
| with httpx.Client(timeout=DEFAULT_TIMEOUT, verify=self.config.tls) as client: | ||
| response = client.post(url, headers=self._headers, json=body or {}) |
There was a problem hiding this comment.
Same
verify=self.config.tls issue on the POST path — should not conflate the TLS enable flag with certificate verification.
| with httpx.Client(timeout=DEFAULT_TIMEOUT, verify=self.config.tls) as client: | |
| response = client.post(url, headers=self._headers, json=body or {}) | |
| with httpx.Client(timeout=DEFAULT_TIMEOUT) as client: | |
| response = client.post(url, headers=self._headers, json=body or {}) |
| _normalize_strs = field_validator("api_key", "account_id", "workspace_id", mode="before")( | ||
| normalize_str() | ||
| ) | ||
|
|
||
| class TemporalIntegrationConfig(StrictConfigModel): |
There was a problem hiding this comment.
A blank line is missing before the class definition. All other
StrictConfigModel subclasses in this file follow a two-blank-line separation per PEP 8.
| _normalize_strs = field_validator("api_key", "account_id", "workspace_id", mode="before")( | |
| normalize_str() | |
| ) | |
| class TemporalIntegrationConfig(StrictConfigModel): | |
| _normalize_strs = field_validator("api_key", "account_id", "workspace_id", mode="before")( | |
| normalize_str() | |
| ) | |
| class TemporalIntegrationConfig(StrictConfigModel): |
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
|
Thanks for your effort In "app/integrations/temporal.py" and "app/integrations/config_models.py", there are currently two separate Temporal config models ("TemporalConfig" and "TemporalIntegrationConfig") and they don't seem to be wired together, so integration-level config may be ignored at runtime. In "app/services/temporal/client.py", "workflow_id", "run_id", "task_queue", and similar path parameters should probably be URL-encoded before being added to API paths. In "app/services/temporal/client.py", "verify=self.config.tls" mixes TLS enablement with certificate verification. Those are usually separate concerns. In "app/integrations/temporal.py", the config mentions the default gRPC port (7233), but the implementation uses an HTTP client, so the expected deployment path is a bit unclear. In "app/tools/TemporalTool/tool.py", failure reasons appear to be read from "memo", but actual Temporal failure details are usually available through workflow history events. Could you take another look at these? |
|
got it , will be back with the fixes asap! |
- URL-encode workflow_id, run_id, task_queue, namespace in all API paths - Remove verify=self.config.tls, decouple TLS from cert verification - Align api_key type between TemporalConfig and TemporalIntegrationConfig - Wire TemporalIntegrationConfig to TemporalConfig via load_temporal_config_from_integration - Replace memo-based failure reason with hint to use temporal_workflow_history - Clarified HTTP API usage in docstring and port field description
|
Thanks for the detailed review @cerencamkiran ! 1] URL encoding — workflow_id, run_id, task_queue, and namespace are now URL-encoded using urllib.parse.quote before being inserted into API paths. Lint, typecheck and all 25 tests passing. Attached the demo for the same OpenSre-temporal.integration.fixes.1.mp4 |
Fixes #2572
Describe the changes you have made in this PR -
Adds full Temporal workflow platform integration enabling OpenSRE to investigate
failed workflows, activity timeouts, worker failures, and namespace health issues.
Four tools added:
Demo/Screenshot for feature changes and bug fixes -
openSre-Temporal-demo.mp4
Code Understanding and AI Usage
Did you use AI assistance (ChatGPT, Claude, Copilot, etc.) to write any part of this code?
If you used AI assistance:
Explain your implementation approach:
Describe in your own words:
What problem does your code solve?
Teams using Temporal for workflow orchestration had no way to investigate failures through OpenSRE. When a workflow times out or workers stop polling, engineers had to manually dig through the Temporal UI to find the root cause. This integration brings that data directly into OpenSRE's investigation pipeline so failures can be surfaced automatically during an RCA.
What alternative approaches did you consider?
Temporal has both a gRPC API and an HTTP API. I initially considered using the official Temporal Python SDK which uses gRPC, but that would add a heavy dependency. The HTTP API achieves the same result with just httpx which is already a common dependency, so I went with that instead.
Why did you choose this specific implementation?
I looked at how the Prefect integration was structured and followed the same pattern, a config model, a service client, and tool classes on top. This keeps things consistent with the rest of the codebase and makes it easier for maintainers to review. Keeping the client thin and putting error handling in the tools means each layer has a single responsibility.
What are the key functions/components and what do they do?There arre 4 main key components :
1] TemporalConfig :---: holds connection settings and loads them from environment variables
2] TemporalClient :---: thin HTTP wrapper with four methods matching the four investigation use cases
3] Four LangChain tools :---: each wraps one client method, validates inputs with Pydantic, and returns a clean JSON string the agent can reason over
4] get_temporal_tools() :---: factory function that wires everything together and returns all four tools ready to use
Checklist before requesting a review
Note: Please check Allow edits from maintainers if you would like us to assist in the PR.