From a2328ec12b96ee48782e7c582e30d45283a89849 Mon Sep 17 00:00:00 2001 From: KshitijaChoudhari Date: Mon, 19 Jan 2026 11:32:08 +0530 Subject: [PATCH 1/7] feat(run-tasks-integration): add run tasks integration callback support - Add RunTasksIntegration resource with callback method - Add RunTaskRequest model for webhook payload parsing - Add TaskResultCallbackOptions, TaskResultOutcome, TaskResultStatus models - Add example Flask server for run tasks webhooks - Add 15 unit tests for run tasks integration - Update client to include run_tasks_integration property - Export RunTaskRequest model --- examples/run_tasks_integration.py | 208 ++++++++++++ src/pytfe/client.py | 2 + src/pytfe/models/__init__.py | 4 + src/pytfe/models/run_task_request.py | 118 +++++++ src/pytfe/resources/run_tasks_integration.py | 187 +++++++++++ tests/units/test_run_tasks_integration.py | 314 +++++++++++++++++++ 6 files changed, 833 insertions(+) create mode 100644 examples/run_tasks_integration.py create mode 100644 src/pytfe/models/run_task_request.py create mode 100644 src/pytfe/resources/run_tasks_integration.py create mode 100644 tests/units/test_run_tasks_integration.py diff --git a/examples/run_tasks_integration.py b/examples/run_tasks_integration.py new file mode 100644 index 0000000..25e08c7 --- /dev/null +++ b/examples/run_tasks_integration.py @@ -0,0 +1,208 @@ +""" +Terraform Cloud/Enterprise Run Tasks Integration Example + +This example demonstrates how to use the python-tfe SDK to build a run task server +that receives task requests from TFC/TFE and sends results back via the callback API. + +IMPORTANT: This example uses Flask as a simple HTTP server for demonstration purposes. +You can use any web framework (FastAPI, Django, etc.) or even the built-in http.server. +The key components are: +1. Receiving POST requests with run task payloads +2. Using TFEClient.run_tasks_integration.callback() to send results back + +Prerequisites: + - Install Flask (for this example only): pip install flask + - Expose your server publicly using ngrok, cloudflare tunnel, or similar + - Create a run task in TFC/TFE pointing to your public URL endpoint + - Attach the run task to a workspace + +Usage: + python examples/run_tasks_integration.py + +Then expose with ngrok: + ngrok http 5000 + +API Documentation: + https://developer.hashicorp.com/terraform/enterprise/api-docs/run-tasks/run-tasks-integration +""" + +from __future__ import annotations + +import os + +try: + from flask import Flask, request, jsonify +except ImportError: + print("Error: Flask is required for this example") + print("Install it with: pip install flask") + exit(1) + +from pytfe import TFEClient, TFEConfig +from pytfe.models import RunTaskRequest, RunTaskRequestCapabilities +from pytfe.resources.run_tasks_integration import ( + RunTasksIntegration, + TaskResultCallbackOptions, + TaskResultOutcome, + TaskResultStatus, + TaskResultTag, +) + +app = Flask(__name__) + +# Initialize TFE client for callback functionality +# Note: The callback uses the access_token from the run task request, +# NOT your regular TFE API token +config = TFEConfig() +client = TFEClient(config) + + +@app.route('/run-task', methods=['POST']) +def handle_run_task(): + """Handle incoming run task request from TFC/TFE.""" + try: + # Parse the incoming request + run_task_request = RunTaskRequest(**request.json) + + print(f"Received run task request:") + print(f" Organization: {run_task_request.organization_name}") + print(f" Workspace: {run_task_request.workspace_name}") + print(f" Run ID: {run_task_request.run_id}") + print(f" Stage: {run_task_request.stage}") + print(f" Enforcement Level: {run_task_request.task_result_enforcement_level}") + + # Extract the callback information + callback_url = run_task_request.task_result_callback_url + access_token = run_task_request.access_token + + # YOUR CUSTOM LOGIC HERE + # This is where you would perform your actual run task checks + # For example: + # - Download and analyze the plan JSON + # - Check for policy violations + # - Validate resource configurations + # - Run security scans + # - Check cost estimates + + # Example: Simple check based on workspace name + if "prod" in run_task_request.workspace_name.lower(): + # Production workspace - run strict checks + result = perform_strict_checks(run_task_request) + else: + # Non-production - run basic checks + result = perform_basic_checks(run_task_request) + + # Send the callback to TFC/TFE + callback_options = TaskResultCallbackOptions( + status=result["status"], + message=result["message"], + url=result.get("url"), + outcomes=result.get("outcomes", []), + ) + + client.run_tasks_integration.callback( + callback_url=callback_url, + access_token=access_token, + options=callback_options, + ) + + print(f"Successfully sent callback with status: {result['status']}") + + # Return 200 OK to TFC/TFE + return jsonify({"status": "accepted"}), 200 + + except Exception as e: + print(f"Error processing run task: {e}") + + # Even if processing fails, try to send a failure callback + try: + if 'callback_url' in locals() and 'access_token' in locals(): + error_options = TaskResultCallbackOptions( + status=TaskResultStatus.FAILED, + message=f"Run task processing error: {str(e)}", + ) + client.run_tasks_integration.callback( + callback_url=callback_url, + access_token=access_token, + options=error_options, + ) + except Exception as callback_error: + print(f"Failed to send error callback: {callback_error}") + + return jsonify({"error": str(e)}), 500 + + +def perform_strict_checks(run_task_request: RunTaskRequest) -> dict: + """Perform strict checks for production workspaces. + + This is a placeholder for your actual check logic. + """ + # Example: Always pass for demo purposes + # In real implementation, you would: + # - Download the configuration or plan + # - Analyze it for compliance/security + # - Generate detailed outcomes + + outcomes = [ + TaskResultOutcome( + outcome_id="SECURITY-001", + description="Security check passed", + body="All security requirements met for production deployment.", + tags={ + "Category": [TaskResultTag(label="Security")], + "Severity": [TaskResultTag(label="Info", level="info")], + }, + ), + TaskResultOutcome( + outcome_id="COMPLIANCE-001", + description="Compliance check passed", + body="Configuration meets all compliance requirements.", + tags={ + "Category": [TaskResultTag(label="Compliance")], + "Severity": [TaskResultTag(label="Info", level="info")], + }, + ), + ] + + return { + "status": TaskResultStatus.PASSED, + "message": "All production checks passed", + "url": "https://your-dashboard.example.com/results/123", + "outcomes": outcomes, + } + + +def perform_basic_checks(run_task_request: RunTaskRequest) -> dict: + """Perform basic checks for non-production workspaces. + + This is a placeholder for your actual check logic. + """ + # Example: Simple validation + outcomes = [ + TaskResultOutcome( + outcome_id="BASIC-001", + description="Basic validation passed", + body="Configuration syntax is valid.", + tags={ + "Category": [TaskResultTag(label="Validation")], + }, + ), + ] + + return { + "status": TaskResultStatus.PASSED, + "message": "Basic checks completed successfully", + "outcomes": outcomes, + } + + +@app.route('/health', methods=['GET']) +def health_check(): + """Health check endpoint.""" + return jsonify({"status": "healthy"}), 200 + + +if __name__ == '__main__': + print("Starting Run Task server on http://localhost:5000") + print("Make sure to expose this with ngrok or similar for TFC/TFE to reach it") + print("Example: ngrok http 5000") + app.run(host='0.0.0.0', port=5000, debug=True) diff --git a/src/pytfe/client.py b/src/pytfe/client.py index 3894886..ed1bd8f 100644 --- a/src/pytfe/client.py +++ b/src/pytfe/client.py @@ -25,6 +25,7 @@ from .resources.run import Runs from .resources.run_event import RunEvents from .resources.run_task import RunTasks +from .resources.run_tasks_integration import RunTasksIntegration from .resources.run_trigger import RunTriggers from .resources.ssh_keys import SSHKeys from .resources.state_version_outputs import StateVersionOutputs @@ -76,6 +77,7 @@ def __init__(self, config: TFEConfig | None = None): self.state_versions = StateVersions(self._transport) self.state_version_outputs = StateVersionOutputs(self._transport) self.run_tasks = RunTasks(self._transport) + self.run_tasks_integration = RunTasksIntegration(self._transport) self.run_triggers = RunTriggers(self._transport) self.runs = Runs(self._transport) self.query_runs = QueryRuns(self._transport) diff --git a/src/pytfe/models/__init__.py b/src/pytfe/models/__init__.py index f3eb33b..a03fe8e 100644 --- a/src/pytfe/models/__init__.py +++ b/src/pytfe/models/__init__.py @@ -256,6 +256,10 @@ Stage, TaskEnforcementLevel, ) +from .run_task_request import ( + RunTaskRequest, + RunTaskRequestCapabilities, +) from .run_trigger import ( RunTrigger, RunTriggerCreateOptions, diff --git a/src/pytfe/models/run_task_request.py b/src/pytfe/models/run_task_request.py new file mode 100644 index 0000000..73e070b --- /dev/null +++ b/src/pytfe/models/run_task_request.py @@ -0,0 +1,118 @@ +"""Run Task Request models for python-tfe. + +This module contains the RunTaskRequest model which represents the payload +that TFC/TFE sends to external run task servers. +""" + +from __future__ import annotations + +from datetime import datetime + +from pydantic import BaseModel, ConfigDict, Field + + +class RunTaskRequestCapabilities(BaseModel): + """Capabilities that the caller supports.""" + + outcomes: bool = Field( + default=False, + description="Whether the run task server supports outcomes" + ) + + +class RunTaskRequest(BaseModel): + """Represents the payload that TFC/TFE sends to a run task's URL. + + This is the incoming request that your external run task server receives + from Terraform Cloud/Enterprise when a run task is triggered. + + API Documentation: + https://developer.hashicorp.com/terraform/enterprise/api-docs/run-tasks/run-tasks-integration#common-properties + """ + + access_token: str = Field( + description="Token to use for authentication when sending callback" + ) + capabilities: RunTaskRequestCapabilities | None = Field( + default=None, + description="Capabilities that the caller supports" + ) + configuration_version_download_url: str | None = Field( + default=None, + description="URL to download the configuration version" + ) + configuration_version_id: str | None = Field( + default=None, + description="ID of the configuration version" + ) + is_speculative: bool = Field( + description="Whether this is a speculative run" + ) + organization_name: str = Field( + description="Name of the organization" + ) + payload_version: int = Field( + description="Version of the payload format" + ) + plan_json_api_url: str | None = Field( + default=None, + description="URL to access the plan JSON via API (post_plan, pre_apply, post_apply stages)" + ) + run_app_url: str = Field( + description="URL to view the run in TFC/TFE UI" + ) + run_created_at: datetime = Field( + description="Timestamp when the run was created" + ) + run_created_by: str = Field( + description="Username of the user who created the run" + ) + run_id: str = Field( + description="ID of the run" + ) + run_message: str = Field( + description="Message associated with the run" + ) + stage: str = Field( + description="Stage when the run task is executed (pre_plan, post_plan, pre_apply, post_apply)" + ) + task_result_callback_url: str = Field( + description="URL to send the task result callback to" + ) + task_result_enforcement_level: str = Field( + description="Enforcement level for the task result (advisory, mandatory)" + ) + task_result_id: str = Field( + description="ID of the task result" + ) + vcs_branch: str | None = Field( + default=None, + description="VCS branch name" + ) + vcs_commit_url: str | None = Field( + default=None, + description="URL to the VCS commit" + ) + vcs_pull_request_url: str | None = Field( + default=None, + description="URL to the VCS pull request" + ) + vcs_repo_url: str | None = Field( + default=None, + description="URL to the VCS repository" + ) + workspace_app_url: str = Field( + description="URL to view the workspace in TFC/TFE UI" + ) + workspace_id: str = Field( + description="ID of the workspace" + ) + workspace_name: str = Field( + description="Name of the workspace" + ) + workspace_working_directory: str | None = Field( + default=None, + description="Working directory for the workspace" + ) + + model_config = ConfigDict(populate_by_name=True) diff --git a/src/pytfe/resources/run_tasks_integration.py b/src/pytfe/resources/run_tasks_integration.py new file mode 100644 index 0000000..8fc60ab --- /dev/null +++ b/src/pytfe/resources/run_tasks_integration.py @@ -0,0 +1,187 @@ +"""Run Tasks Integration resource for python-tfe. + +This module provides the callback functionality for external run task servers +to send results back to Terraform Cloud/Enterprise. +""" + +from __future__ import annotations + +from typing import Any + +from ..errors import TFEError +from ._base import _Service + + +class TaskResultStatus: + """Task result status enum.""" + + PASSED = "passed" + FAILED = "failed" + RUNNING = "running" + + +class TaskResultTag: + """Tag to enrich outcomes display in TFC/TFE.""" + + def __init__(self, label: str, level: str | None = None): + self.label = label + self.level = level + + def to_dict(self) -> dict[str, Any]: + """Convert to dictionary.""" + result = {"label": self.label} + if self.level: + result["level"] = self.level + return result + + +class TaskResultOutcome: + """Detailed run task outcome for improved visibility in TFC/TFE UI. + + API Documentation: + https://developer.hashicorp.com/terraform/enterprise/api-docs/run-tasks/run-tasks-integration#outcomes-payload-body + """ + + def __init__( + self, + outcome_id: str | None = None, + description: str | None = None, + body: str | None = None, + url: str | None = None, + tags: dict[str, list[TaskResultTag]] | None = None, + ): + self.outcome_id = outcome_id + self.description = description + self.body = body + self.url = url + self.tags = tags or {} + + def to_dict(self) -> dict[str, Any]: + """Convert to dictionary for JSON serialization.""" + result: dict[str, Any] = {"type": "task-result-outcomes", "attributes": {}} + + if self.outcome_id: + result["attributes"]["outcome-id"] = self.outcome_id + if self.description: + result["attributes"]["description"] = self.description + if self.body: + result["attributes"]["body"] = self.body + if self.url: + result["attributes"]["url"] = self.url + if self.tags: + result["attributes"]["tags"] = { + key: [tag.to_dict() for tag in tags] + for key, tags in self.tags.items() + } + + return result + + +class TaskResultCallbackOptions: + """Options for sending task result callback to TFC/TFE. + + API Documentation: + https://developer.hashicorp.com/terraform/enterprise/api-docs/run-tasks/run-tasks-integration#request-body-1 + """ + + def __init__( + self, + status: str, + message: str | None = None, + url: str | None = None, + outcomes: list[TaskResultOutcome] | None = None, + ): + """Initialize callback options. + + Args: + status: Task result status (passed, failed, running) + message: Optional message about the task result + url: Optional URL to view detailed results + outcomes: Optional list of detailed outcomes + """ + self.status = status + self.message = message + self.url = url + self.outcomes = outcomes or [] + + def validate(self) -> None: + """Validate the callback options.""" + valid_statuses = [TaskResultStatus.PASSED, TaskResultStatus.FAILED, TaskResultStatus.RUNNING] + if self.status not in valid_statuses: + raise TFEError( + f"Invalid task result status: {self.status}. " + f"Must be one of: {', '.join(valid_statuses)}" + ) + + def to_dict(self) -> dict[str, Any]: + """Convert to dictionary for JSON:API format.""" + data: dict[str, Any] = { + "type": "task-results", + "attributes": { + "status": self.status, + }, + } + + if self.message: + data["attributes"]["message"] = self.message + if self.url: + data["attributes"]["url"] = self.url + + if self.outcomes: + data["relationships"] = { + "outcomes": { + "data": [outcome.to_dict() for outcome in self.outcomes] + } + } + + return {"data": data} + + +class RunTasksIntegration(_Service): + """Run Tasks Integration API for sending callbacks to TFC/TFE. + + This service is used by external run task servers to send task results + back to Terraform Cloud/Enterprise. + + API Documentation: + https://developer.hashicorp.com/terraform/enterprise/api-docs/run-tasks/run-tasks-integration + """ + + def callback( + self, + callback_url: str, + access_token: str, + options: TaskResultCallbackOptions, + ) -> None: + """Send task result callback to TFC/TFE. + + Args: + callback_url: The callback URL from the run task request + access_token: The access token from the run task request + options: Task result callback options + + Raises: + TFEError: If callback_url or access_token is invalid + TFEError: If options validation fails + """ + if not callback_url or not callback_url.strip(): + raise TFEError("callback_url cannot be empty") + + if not access_token or not access_token.strip(): + raise TFEError("access_token cannot be empty") + + options.validate() + + # Create custom headers with the access token from the request + headers = { + "Authorization": f"Bearer {access_token}", + "Content-Type": "application/vnd.api+json", + } + + # Send PATCH request to callback URL + self.t.request( + "PATCH", + callback_url, + json_body=options.to_dict(), + headers=headers, + ) diff --git a/tests/units/test_run_tasks_integration.py b/tests/units/test_run_tasks_integration.py new file mode 100644 index 0000000..8e48cf3 --- /dev/null +++ b/tests/units/test_run_tasks_integration.py @@ -0,0 +1,314 @@ +"""Unit tests for Run Tasks Integration.""" + +from __future__ import annotations + +import json +from datetime import datetime, timezone +from unittest.mock import MagicMock, patch + +import pytest + +from pytfe.client import TFEClient +from pytfe.errors import TFEError +from pytfe.models.run_task_request import RunTaskRequest, RunTaskRequestCapabilities +from pytfe.resources.run_tasks_integration import ( + RunTasksIntegration, + TaskResultCallbackOptions, + TaskResultOutcome, + TaskResultStatus, + TaskResultTag, +) + + +class TestRunTaskRequest: + """Tests for RunTaskRequest model.""" + + def test_run_task_request_minimal(self): + """Test parsing minimal run task request.""" + data = { + "access_token": "test-token-123", + "is_speculative": False, + "organization_name": "my-org", + "payload_version": 1, + "run_app_url": "https://app.terraform.io/app/my-org/my-workspace/runs/run-123", + "run_created_at": "2025-12-22T10:00:00Z", + "run_created_by": "user@example.com", + "run_id": "run-123", + "run_message": "Test run", + "stage": "post_plan", + "task_result_callback_url": "https://app.terraform.io/api/v2/task-results/tr-123/callback", + "task_result_enforcement_level": "mandatory", + "task_result_id": "tr-123", + "workspace_app_url": "https://app.terraform.io/app/my-org/my-workspace", + "workspace_id": "ws-123", + "workspace_name": "my-workspace", + } + + request = RunTaskRequest(**data) + + assert request.access_token == "test-token-123" + assert request.organization_name == "my-org" + assert request.run_id == "run-123" + assert request.stage == "post_plan" + assert request.task_result_callback_url == "https://app.terraform.io/api/v2/task-results/tr-123/callback" + + def test_run_task_request_complete(self): + """Test parsing complete run task request with all fields.""" + data = { + "access_token": "test-token-456", + "capabilities": {"outcomes": True}, + "configuration_version_download_url": "https://app.terraform.io/api/v2/configuration-versions/cv-123/download", + "configuration_version_id": "cv-123", + "is_speculative": True, + "organization_name": "test-org", + "payload_version": 1, + "plan_json_api_url": "https://app.terraform.io/api/v2/plans/plan-123/json-output", + "run_app_url": "https://app.terraform.io/app/test-org/test-workspace/runs/run-456", + "run_created_at": "2025-12-22T11:30:00Z", + "run_created_by": "admin@example.com", + "run_id": "run-456", + "run_message": "Test with VCS", + "stage": "pre_plan", + "task_result_callback_url": "https://app.terraform.io/api/v2/task-results/tr-456/callback", + "task_result_enforcement_level": "advisory", + "task_result_id": "tr-456", + "vcs_branch": "main", + "vcs_commit_url": "https://github.com/org/repo/commit/abc123", + "vcs_pull_request_url": "https://github.com/org/repo/pull/42", + "vcs_repo_url": "https://github.com/org/repo", + "workspace_app_url": "https://app.terraform.io/app/test-org/test-workspace", + "workspace_id": "ws-456", + "workspace_name": "test-workspace", + "workspace_working_directory": "terraform/", + } + + request = RunTaskRequest(**data) + + assert request.access_token == "test-token-456" + assert request.capabilities is not None + assert request.capabilities.outcomes is True + assert request.configuration_version_id == "cv-123" + assert request.vcs_branch == "main" + assert request.vcs_commit_url == "https://github.com/org/repo/commit/abc123" + assert request.workspace_working_directory == "terraform/" + + +class TestTaskResultTag: + """Tests for TaskResultTag.""" + + def test_tag_with_level(self): + """Test tag with level.""" + tag = TaskResultTag(label="High", level="error") + data = tag.to_dict() + + assert data["label"] == "High" + assert data["level"] == "error" + + def test_tag_without_level(self): + """Test tag without level.""" + tag = TaskResultTag(label="Passed") + data = tag.to_dict() + + assert data["label"] == "Passed" + assert "level" not in data + + +class TestTaskResultOutcome: + """Tests for TaskResultOutcome.""" + + def test_outcome_complete(self): + """Test complete outcome with all fields.""" + tags = { + "Status": [TaskResultTag(label="Failed", level="error")], + "Severity": [TaskResultTag(label="High", level="error")], + } + + outcome = TaskResultOutcome( + outcome_id="ISSUE-123", + description="Security issue found", + body="# Details\n\nSecurity vulnerability detected.", + url="https://example.com/issues/123", + tags=tags, + ) + + data = outcome.to_dict() + + assert data["type"] == "task-result-outcomes" + assert data["attributes"]["outcome-id"] == "ISSUE-123" + assert data["attributes"]["description"] == "Security issue found" + assert data["attributes"]["body"] == "# Details\n\nSecurity vulnerability detected." + assert data["attributes"]["url"] == "https://example.com/issues/123" + assert "Status" in data["attributes"]["tags"] + + def test_outcome_minimal(self): + """Test minimal outcome.""" + outcome = TaskResultOutcome() + data = outcome.to_dict() + + assert data["type"] == "task-result-outcomes" + assert "attributes" in data + + +class TestTaskResultCallbackOptions: + """Tests for TaskResultCallbackOptions.""" + + def test_callback_options_passed(self): + """Test callback options with passed status.""" + options = TaskResultCallbackOptions( + status=TaskResultStatus.PASSED, + message="All checks passed", + url="https://example.com/results/123", + ) + + options.validate() + data = options.to_dict() + + assert data["data"]["type"] == "task-results" + assert data["data"]["attributes"]["status"] == "passed" + assert data["data"]["attributes"]["message"] == "All checks passed" + assert data["data"]["attributes"]["url"] == "https://example.com/results/123" + + def test_callback_options_with_outcomes(self): + """Test callback options with outcomes.""" + outcome = TaskResultOutcome( + outcome_id="ISSUE-1", + description="Test issue", + ) + + options = TaskResultCallbackOptions( + status=TaskResultStatus.FAILED, + message="1 issue found", + outcomes=[outcome], + ) + + data = options.to_dict() + + assert "relationships" in data["data"] + assert "outcomes" in data["data"]["relationships"] + assert len(data["data"]["relationships"]["outcomes"]["data"]) == 1 + + def test_validate_invalid_status(self): + """Test validation fails with invalid status.""" + options = TaskResultCallbackOptions(status="invalid") + + with pytest.raises(TFEError) as exc_info: + options.validate() + + assert "Invalid task result status" in str(exc_info.value) + + def test_validate_valid_statuses(self): + """Test validation passes with all valid statuses.""" + for status in [TaskResultStatus.PASSED, TaskResultStatus.FAILED, TaskResultStatus.RUNNING]: + options = TaskResultCallbackOptions(status=status) + options.validate() # Should not raise + + +class TestRunTasksIntegration: + """Tests for RunTasksIntegration service.""" + + def test_callback_success(self): + """Test successful callback.""" + mock_transport = MagicMock() + integration = RunTasksIntegration(mock_transport) + + options = TaskResultCallbackOptions( + status=TaskResultStatus.PASSED, + message="All tests passed", + ) + + integration.callback( + callback_url="https://app.terraform.io/api/v2/task-results/tr-123/callback", + access_token="test-token-123", + options=options, + ) + + # Verify request was made + mock_transport.request.assert_called_once() + call_args = mock_transport.request.call_args + + assert call_args[0][0] == "PATCH" + assert call_args[0][1] == "https://app.terraform.io/api/v2/task-results/tr-123/callback" + assert "Authorization" in call_args[1]["headers"] + assert call_args[1]["headers"]["Authorization"] == "Bearer test-token-123" + + def test_callback_empty_url(self): + """Test callback fails with empty URL.""" + mock_transport = MagicMock() + integration = RunTasksIntegration(mock_transport) + + options = TaskResultCallbackOptions(status=TaskResultStatus.PASSED) + + with pytest.raises(TFEError) as exc_info: + integration.callback( + callback_url="", + access_token="test-token", + options=options, + ) + + assert "callback_url cannot be empty" in str(exc_info.value) + + def test_callback_empty_token(self): + """Test callback fails with empty token.""" + mock_transport = MagicMock() + integration = RunTasksIntegration(mock_transport) + + options = TaskResultCallbackOptions(status=TaskResultStatus.PASSED) + + with pytest.raises(TFEError) as exc_info: + integration.callback( + callback_url="https://example.com/callback", + access_token="", + options=options, + ) + + assert "access_token cannot be empty" in str(exc_info.value) + + def test_callback_invalid_status(self): + """Test callback fails with invalid status.""" + mock_transport = MagicMock() + integration = RunTasksIntegration(mock_transport) + + options = TaskResultCallbackOptions(status="invalid-status") + + with pytest.raises(TFEError) as exc_info: + integration.callback( + callback_url="https://example.com/callback", + access_token="test-token", + options=options, + ) + + assert "Invalid task result status" in str(exc_info.value) + + def test_callback_with_outcomes(self): + """Test callback with detailed outcomes.""" + mock_transport = MagicMock() + integration = RunTasksIntegration(mock_transport) + + outcome = TaskResultOutcome( + outcome_id="CHECK-1", + description="Policy violation", + body="## Issue\n\nPolicy check failed.", + url="https://example.com/check-1", + tags={ + "Severity": [TaskResultTag(label="High", level="error")], + }, + ) + + options = TaskResultCallbackOptions( + status=TaskResultStatus.FAILED, + message="Policy check failed", + url="https://example.com/results", + outcomes=[outcome], + ) + + integration.callback( + callback_url="https://app.terraform.io/api/v2/task-results/tr-123/callback", + access_token="test-token-123", + options=options, + ) + + call_args = mock_transport.request.call_args + body = call_args[1]["json_body"] + + assert "relationships" in body["data"] + assert "outcomes" in body["data"]["relationships"] From aa9c040e68fecc687590976e525710b1be0a9f28 Mon Sep 17 00:00:00 2001 From: KshitijaChoudhari Date: Mon, 19 Jan 2026 11:36:39 +0530 Subject: [PATCH 2/7] Remove Flask example file Flask dependency removed completely from the project --- examples/run_tasks_integration.py | 208 ------------------------------ 1 file changed, 208 deletions(-) delete mode 100644 examples/run_tasks_integration.py diff --git a/examples/run_tasks_integration.py b/examples/run_tasks_integration.py deleted file mode 100644 index 25e08c7..0000000 --- a/examples/run_tasks_integration.py +++ /dev/null @@ -1,208 +0,0 @@ -""" -Terraform Cloud/Enterprise Run Tasks Integration Example - -This example demonstrates how to use the python-tfe SDK to build a run task server -that receives task requests from TFC/TFE and sends results back via the callback API. - -IMPORTANT: This example uses Flask as a simple HTTP server for demonstration purposes. -You can use any web framework (FastAPI, Django, etc.) or even the built-in http.server. -The key components are: -1. Receiving POST requests with run task payloads -2. Using TFEClient.run_tasks_integration.callback() to send results back - -Prerequisites: - - Install Flask (for this example only): pip install flask - - Expose your server publicly using ngrok, cloudflare tunnel, or similar - - Create a run task in TFC/TFE pointing to your public URL endpoint - - Attach the run task to a workspace - -Usage: - python examples/run_tasks_integration.py - -Then expose with ngrok: - ngrok http 5000 - -API Documentation: - https://developer.hashicorp.com/terraform/enterprise/api-docs/run-tasks/run-tasks-integration -""" - -from __future__ import annotations - -import os - -try: - from flask import Flask, request, jsonify -except ImportError: - print("Error: Flask is required for this example") - print("Install it with: pip install flask") - exit(1) - -from pytfe import TFEClient, TFEConfig -from pytfe.models import RunTaskRequest, RunTaskRequestCapabilities -from pytfe.resources.run_tasks_integration import ( - RunTasksIntegration, - TaskResultCallbackOptions, - TaskResultOutcome, - TaskResultStatus, - TaskResultTag, -) - -app = Flask(__name__) - -# Initialize TFE client for callback functionality -# Note: The callback uses the access_token from the run task request, -# NOT your regular TFE API token -config = TFEConfig() -client = TFEClient(config) - - -@app.route('/run-task', methods=['POST']) -def handle_run_task(): - """Handle incoming run task request from TFC/TFE.""" - try: - # Parse the incoming request - run_task_request = RunTaskRequest(**request.json) - - print(f"Received run task request:") - print(f" Organization: {run_task_request.organization_name}") - print(f" Workspace: {run_task_request.workspace_name}") - print(f" Run ID: {run_task_request.run_id}") - print(f" Stage: {run_task_request.stage}") - print(f" Enforcement Level: {run_task_request.task_result_enforcement_level}") - - # Extract the callback information - callback_url = run_task_request.task_result_callback_url - access_token = run_task_request.access_token - - # YOUR CUSTOM LOGIC HERE - # This is where you would perform your actual run task checks - # For example: - # - Download and analyze the plan JSON - # - Check for policy violations - # - Validate resource configurations - # - Run security scans - # - Check cost estimates - - # Example: Simple check based on workspace name - if "prod" in run_task_request.workspace_name.lower(): - # Production workspace - run strict checks - result = perform_strict_checks(run_task_request) - else: - # Non-production - run basic checks - result = perform_basic_checks(run_task_request) - - # Send the callback to TFC/TFE - callback_options = TaskResultCallbackOptions( - status=result["status"], - message=result["message"], - url=result.get("url"), - outcomes=result.get("outcomes", []), - ) - - client.run_tasks_integration.callback( - callback_url=callback_url, - access_token=access_token, - options=callback_options, - ) - - print(f"Successfully sent callback with status: {result['status']}") - - # Return 200 OK to TFC/TFE - return jsonify({"status": "accepted"}), 200 - - except Exception as e: - print(f"Error processing run task: {e}") - - # Even if processing fails, try to send a failure callback - try: - if 'callback_url' in locals() and 'access_token' in locals(): - error_options = TaskResultCallbackOptions( - status=TaskResultStatus.FAILED, - message=f"Run task processing error: {str(e)}", - ) - client.run_tasks_integration.callback( - callback_url=callback_url, - access_token=access_token, - options=error_options, - ) - except Exception as callback_error: - print(f"Failed to send error callback: {callback_error}") - - return jsonify({"error": str(e)}), 500 - - -def perform_strict_checks(run_task_request: RunTaskRequest) -> dict: - """Perform strict checks for production workspaces. - - This is a placeholder for your actual check logic. - """ - # Example: Always pass for demo purposes - # In real implementation, you would: - # - Download the configuration or plan - # - Analyze it for compliance/security - # - Generate detailed outcomes - - outcomes = [ - TaskResultOutcome( - outcome_id="SECURITY-001", - description="Security check passed", - body="All security requirements met for production deployment.", - tags={ - "Category": [TaskResultTag(label="Security")], - "Severity": [TaskResultTag(label="Info", level="info")], - }, - ), - TaskResultOutcome( - outcome_id="COMPLIANCE-001", - description="Compliance check passed", - body="Configuration meets all compliance requirements.", - tags={ - "Category": [TaskResultTag(label="Compliance")], - "Severity": [TaskResultTag(label="Info", level="info")], - }, - ), - ] - - return { - "status": TaskResultStatus.PASSED, - "message": "All production checks passed", - "url": "https://your-dashboard.example.com/results/123", - "outcomes": outcomes, - } - - -def perform_basic_checks(run_task_request: RunTaskRequest) -> dict: - """Perform basic checks for non-production workspaces. - - This is a placeholder for your actual check logic. - """ - # Example: Simple validation - outcomes = [ - TaskResultOutcome( - outcome_id="BASIC-001", - description="Basic validation passed", - body="Configuration syntax is valid.", - tags={ - "Category": [TaskResultTag(label="Validation")], - }, - ), - ] - - return { - "status": TaskResultStatus.PASSED, - "message": "Basic checks completed successfully", - "outcomes": outcomes, - } - - -@app.route('/health', methods=['GET']) -def health_check(): - """Health check endpoint.""" - return jsonify({"status": "healthy"}), 200 - - -if __name__ == '__main__': - print("Starting Run Task server on http://localhost:5000") - print("Make sure to expose this with ngrok or similar for TFC/TFE to reach it") - print("Example: ngrok http 5000") - app.run(host='0.0.0.0', port=5000, debug=True) From b043a85866a7356f80324482401218a2799c1a0e Mon Sep 17 00:00:00 2001 From: KshitijaChoudhari Date: Wed, 21 Jan 2026 11:23:38 +0530 Subject: [PATCH 3/7] Add task_result and task_stages models matching go-tfe - Add task_result.py with TaskResult, TaskResultStatus, TaskEnforcementLevel models - Add task_stages.py with TaskStage, Stage, TaskStageStatus, Actions, Permissions models - Update run_task.py to import Stage and TaskEnforcementLevel from new modules (remove duplicates) - Update run_tasks_integration.py to use TaskResultStatus enum from task_result - Update run_task_request.py to add model_config for proper serialization - Export all new models in __init__.py - All 22 unit tests passing - Matches go-tfe implementation structure --- src/pytfe/models/__init__.py | 19 ++- src/pytfe/models/run_task.py | 14 +-- src/pytfe/models/run_task_request.py | 2 + src/pytfe/models/task_result.py | 74 ++++++++++++ src/pytfe/models/task_stages.py | 119 +++++++++++++++++++ src/pytfe/resources/run_tasks_integration.py | 46 +++++-- 6 files changed, 247 insertions(+), 27 deletions(-) create mode 100644 src/pytfe/models/task_result.py create mode 100644 src/pytfe/models/task_stages.py diff --git a/src/pytfe/models/__init__.py b/src/pytfe/models/__init__.py index a03fe8e..ee3ff16 100644 --- a/src/pytfe/models/__init__.py +++ b/src/pytfe/models/__init__.py @@ -253,13 +253,28 @@ RunTaskListOptions, RunTaskReadOptions, RunTaskUpdateOptions, - Stage, - TaskEnforcementLevel, ) from .run_task_request import ( RunTaskRequest, RunTaskRequestCapabilities, ) +from .task_result import ( + TaskEnforcementLevel, + TaskResult, + TaskResultStatus, + TaskResultStatusTimestamps, +) +from .task_stages import ( + Actions, + Permissions, + Stage, + TaskStage, + TaskStageListOptions, + TaskStageOverrideOptions, + TaskStageReadOptions, + TaskStageStatus, + TaskStageStatusTimestamps, +) from .run_trigger import ( RunTrigger, RunTriggerCreateOptions, diff --git a/src/pytfe/models/run_task.py b/src/pytfe/models/run_task.py index 8741162..eac0fcc 100644 --- a/src/pytfe/models/run_task.py +++ b/src/pytfe/models/run_task.py @@ -7,6 +7,8 @@ from ..models.common import Pagination from .agent import AgentPool from .organization import Organization +from .task_result import TaskEnforcementLevel +from .task_stages import Stage from .workspace_run_task import WorkspaceRunTask @@ -37,18 +39,6 @@ class GlobalRunTaskOptions(BaseModel): enforcement_level: TaskEnforcementLevel | None = None -class Stage(str, Enum): - PRE_PLAN = "pre-plan" - POST_PLAN = "post-plan" - PRE_APPLY = "pre-apply" - POST_APPLY = "post-apply" - - -class TaskEnforcementLevel(str, Enum): - ADVISORY = "advisory" - MANDATORY = "mandatory" - - class RunTaskIncludeOptions(str, Enum): RUN_TASK_WORKSPACE_TASKS = "workspace_tasks" RUN_TASK_WORKSPACE = "workspace_tasks.workspace" diff --git a/src/pytfe/models/run_task_request.py b/src/pytfe/models/run_task_request.py index 73e070b..1306d70 100644 --- a/src/pytfe/models/run_task_request.py +++ b/src/pytfe/models/run_task_request.py @@ -18,6 +18,8 @@ class RunTaskRequestCapabilities(BaseModel): default=False, description="Whether the run task server supports outcomes" ) + + model_config = ConfigDict(populate_by_name=True) class RunTaskRequest(BaseModel): diff --git a/src/pytfe/models/task_result.py b/src/pytfe/models/task_result.py new file mode 100644 index 0000000..c001fca --- /dev/null +++ b/src/pytfe/models/task_result.py @@ -0,0 +1,74 @@ +"""Task Result models for python-tfe. + +This module contains models related to task results in Terraform Cloud/Enterprise. +""" + +from __future__ import annotations + +from datetime import datetime +from enum import Enum +from typing import TYPE_CHECKING + +from pydantic import BaseModel, ConfigDict, Field + +if TYPE_CHECKING: + from .task_stages import TaskStage + + +class TaskResultStatus(str, Enum): + """Task result status enum.""" + + PASSED = "passed" + FAILED = "failed" + PENDING = "pending" + RUNNING = "running" + UNREACHABLE = "unreachable" + ERRORED = "errored" + + +class TaskEnforcementLevel(str, Enum): + """Task enforcement level enum.""" + + ADVISORY = "advisory" + MANDATORY = "mandatory" + + +class TaskResultStatusTimestamps(BaseModel): + """Timestamps recorded for a task result.""" + + errored_at: datetime | None = Field(default=None, alias="errored-at") + running_at: datetime | None = Field(default=None, alias="running-at") + canceled_at: datetime | None = Field(default=None, alias="canceled-at") + failed_at: datetime | None = Field(default=None, alias="failed-at") + passed_at: datetime | None = Field(default=None, alias="passed-at") + + model_config = ConfigDict(populate_by_name=True) + + +class TaskResult(BaseModel): + """Represents a HCP Terraform or Terraform Enterprise run task result. + + API Documentation: + https://developer.hashicorp.com/terraform/cloud-docs/api-docs/task-results + """ + + id: str + status: TaskResultStatus + message: str + status_timestamps: TaskResultStatusTimestamps = Field(alias="status-timestamps") + url: str + created_at: datetime = Field(alias="created-at") + updated_at: datetime = Field(alias="updated-at") + task_id: str = Field(alias="task-id") + task_name: str = Field(alias="task-name") + task_url: str = Field(alias="task-url") + workspace_task_id: str = Field(alias="workspace-task-id") + workspace_task_enforcement_level: TaskEnforcementLevel = Field( + alias="workspace-task-enforcement-level" + ) + agent_pool_id: str | None = Field(default=None, alias="agent-pool-id") + + # Relationships + task_stage: TaskStage | None = Field(default=None, alias="task-stage") + + model_config = ConfigDict(populate_by_name=True) diff --git a/src/pytfe/models/task_stages.py b/src/pytfe/models/task_stages.py new file mode 100644 index 0000000..6d0d0a2 --- /dev/null +++ b/src/pytfe/models/task_stages.py @@ -0,0 +1,119 @@ +"""Task Stage models for python-tfe. + +This module contains models related to task stages in Terraform Cloud/Enterprise. +""" + +from __future__ import annotations + +from datetime import datetime +from enum import Enum +from typing import TYPE_CHECKING + +from pydantic import BaseModel, ConfigDict, Field + +if TYPE_CHECKING: + from .policy_evaluation import PolicyEvaluation + from .run import Run + from .task_result import TaskResult + + +class Stage(str, Enum): + """Enum representing possible run stages for run tasks.""" + + PRE_PLAN = "pre-plan" + POST_PLAN = "post-plan" + PRE_APPLY = "pre-apply" + POST_APPLY = "post-apply" + + +class TaskStageStatus(str, Enum): + """Enum representing all possible statuses for a task stage.""" + + PENDING = "pending" + RUNNING = "running" + PASSED = "passed" + FAILED = "failed" + AWAITING_OVERRIDE = "awaiting_override" + CANCELED = "canceled" + ERRORED = "errored" + UNREACHABLE = "unreachable" + + +class Permissions(BaseModel): + """Permission types for overriding a task stage.""" + + can_override_policy: bool | None = Field(default=None, alias="can-override-policy") + can_override_tasks: bool | None = Field(default=None, alias="can-override-tasks") + can_override: bool | None = Field(default=None, alias="can-override") + + model_config = ConfigDict(populate_by_name=True) + + +class Actions(BaseModel): + """Task stage actions.""" + + is_overridable: bool | None = Field(default=None, alias="is-overridable") + + model_config = ConfigDict(populate_by_name=True) + + +class TaskStageStatusTimestamps(BaseModel): + """Timestamps recorded for a task stage.""" + + errored_at: datetime | None = Field(default=None, alias="errored-at") + running_at: datetime | None = Field(default=None, alias="running-at") + canceled_at: datetime | None = Field(default=None, alias="canceled-at") + failed_at: datetime | None = Field(default=None, alias="failed-at") + passed_at: datetime | None = Field(default=None, alias="passed-at") + + model_config = ConfigDict(populate_by_name=True) + + +class TaskStage(BaseModel): + """Represents a HCP Terraform or Terraform Enterprise run's task stage. + + Task stages are where run tasks can occur during a run lifecycle. + + API Documentation: + https://developer.hashicorp.com/terraform/cloud-docs/api-docs/task-stages + """ + + id: str + stage: Stage + status: TaskStageStatus + status_timestamps: TaskStageStatusTimestamps = Field(alias="status-timestamps") + created_at: datetime = Field(alias="created-at") + updated_at: datetime = Field(alias="updated-at") + permissions: Permissions | None = None + actions: Actions | None = None + + # Relationships + run: Run | None = None + task_results: list[TaskResult] = Field(default_factory=list, alias="task-results") + policy_evaluations: list[PolicyEvaluation] = Field( + default_factory=list, + alias="policy-evaluations" + ) + + model_config = ConfigDict(populate_by_name=True) + + +class TaskStageOverrideOptions(BaseModel): + """Options for overriding a task stage.""" + + comment: str | None = None + + +class TaskStageReadOptions(BaseModel): + """Options for reading a task stage.""" + + include: list[str] | None = None + + +class TaskStageListOptions(BaseModel): + """Options for listing task stages.""" + + page_number: int | None = Field(default=None, alias="page[number]") + page_size: int | None = Field(default=None, alias="page[size]") + + model_config = ConfigDict(populate_by_name=True) diff --git a/src/pytfe/resources/run_tasks_integration.py b/src/pytfe/resources/run_tasks_integration.py index 8fc60ab..826aa7c 100644 --- a/src/pytfe/resources/run_tasks_integration.py +++ b/src/pytfe/resources/run_tasks_integration.py @@ -9,26 +9,29 @@ from typing import Any from ..errors import TFEError +from ..models.task_result import TaskResultStatus from ._base import _Service -class TaskResultStatus: - """Task result status enum.""" - - PASSED = "passed" - FAILED = "failed" - RUNNING = "running" - - class TaskResultTag: - """Tag to enrich outcomes display in TFC/TFE.""" + """Tag to enrich outcomes display in TFC/TFE. + + API Documentation: + https://developer.hashicorp.com/terraform/enterprise/api-docs/run-tasks/run-tasks-integration#severity-and-status-tags + """ def __init__(self, label: str, level: str | None = None): + """Initialize a task result tag. + + Args: + label: The label for the tag + level: Optional severity level (error, warning, info) + """ self.label = label self.level = level def to_dict(self) -> dict[str, Any]: - """Convert to dictionary.""" + """Convert to dictionary for JSON serialization.""" result = {"label": self.label} if self.level: result["level"] = self.level @@ -50,6 +53,15 @@ def __init__( url: str | None = None, tags: dict[str, list[TaskResultTag]] | None = None, ): + """Initialize a task result outcome. + + Args: + outcome_id: Unique identifier for the outcome + description: Brief description of the outcome + body: Detailed body content (supports markdown) + url: URL to view more details + tags: Dictionary of tag categories to lists of tags + """ self.outcome_id = outcome_id self.description = description self.body = body @@ -57,7 +69,7 @@ def __init__( self.tags = tags or {} def to_dict(self) -> dict[str, Any]: - """Convert to dictionary for JSON serialization.""" + """Convert to dictionary for JSON:API serialization.""" result: dict[str, Any] = {"type": "task-result-outcomes", "attributes": {}} if self.outcome_id: @@ -105,8 +117,16 @@ def __init__( self.outcomes = outcomes or [] def validate(self) -> None: - """Validate the callback options.""" - valid_statuses = [TaskResultStatus.PASSED, TaskResultStatus.FAILED, TaskResultStatus.RUNNING] + """Validate the callback options. + + Only passed, failed, and running statuses are allowed for callbacks. + pending and errored are not valid callback statuses per TFC/TFE API. + """ + valid_statuses = [ + TaskResultStatus.PASSED.value, + TaskResultStatus.FAILED.value, + TaskResultStatus.RUNNING.value + ] if self.status not in valid_statuses: raise TFEError( f"Invalid task result status: {self.status}. " From 07b921459fc3a70970378dfca6d148265817259d Mon Sep 17 00:00:00 2001 From: KshitijaChoudhari Date: Wed, 21 Jan 2026 12:25:30 +0530 Subject: [PATCH 4/7] fix: add missing exports to __all__ and fix linting issues - Add all new run tasks integration models to __all__ exports - Fix trailing whitespace issues across multiple files - Run ruff format to ensure consistent code style - All 22 unit tests passing - All linting checks pass --- src/pytfe/models/__init__.py | 47 ++++++---- src/pytfe/models/run_task_request.py | 81 +++++----------- src/pytfe/models/task_result.py | 16 ++-- src/pytfe/models/task_stages.py | 37 ++++---- src/pytfe/resources/run_tasks_integration.py | 65 +++++++------ tests/units/test_run_tasks_integration.py | 98 +++++++++++--------- 6 files changed, 165 insertions(+), 179 deletions(-) diff --git a/src/pytfe/models/__init__.py b/src/pytfe/models/__init__.py index ee3ff16..fe39612 100644 --- a/src/pytfe/models/__init__.py +++ b/src/pytfe/models/__init__.py @@ -258,23 +258,6 @@ RunTaskRequest, RunTaskRequestCapabilities, ) -from .task_result import ( - TaskEnforcementLevel, - TaskResult, - TaskResultStatus, - TaskResultStatusTimestamps, -) -from .task_stages import ( - Actions, - Permissions, - Stage, - TaskStage, - TaskStageListOptions, - TaskStageOverrideOptions, - TaskStageReadOptions, - TaskStageStatus, - TaskStageStatusTimestamps, -) from .run_trigger import ( RunTrigger, RunTriggerCreateOptions, @@ -293,6 +276,23 @@ SSHKeyListOptions, SSHKeyUpdateOptions, ) +from .task_result import ( + TaskEnforcementLevel, + TaskResult, + TaskResultStatus, + TaskResultStatusTimestamps, +) +from .task_stages import ( + Actions, + Permissions, + Stage, + TaskStage, + TaskStageListOptions, + TaskStageOverrideOptions, + TaskStageReadOptions, + TaskStageStatus, + TaskStageStatusTimestamps, +) # Variables from .variable import ( @@ -556,6 +556,19 @@ "RunTaskCreateOptions", "RunTaskUpdateOptions", "RunTaskReadOptions", + "RunTaskRequest", + "RunTaskRequestCapabilities", + "TaskResult", + "TaskResultStatus", + "TaskResultStatusTimestamps", + "Actions", + "Permissions", + "TaskStage", + "TaskStageListOptions", + "TaskStageOverrideOptions", + "TaskStageReadOptions", + "TaskStageStatus", + "TaskStageStatusTimestamps", # Run triggers "RunTrigger", "RunTriggerCreateOptions", diff --git a/src/pytfe/models/run_task_request.py b/src/pytfe/models/run_task_request.py index 1306d70..e2c4179 100644 --- a/src/pytfe/models/run_task_request.py +++ b/src/pytfe/models/run_task_request.py @@ -15,19 +15,18 @@ class RunTaskRequestCapabilities(BaseModel): """Capabilities that the caller supports.""" outcomes: bool = Field( - default=False, - description="Whether the run task server supports outcomes" + default=False, description="Whether the run task server supports outcomes" ) - + model_config = ConfigDict(populate_by_name=True) class RunTaskRequest(BaseModel): """Represents the payload that TFC/TFE sends to a run task's URL. - + This is the incoming request that your external run task server receives from Terraform Cloud/Enterprise when a run task is triggered. - + API Documentation: https://developer.hashicorp.com/terraform/enterprise/api-docs/run-tasks/run-tasks-integration#common-properties """ @@ -36,45 +35,26 @@ class RunTaskRequest(BaseModel): description="Token to use for authentication when sending callback" ) capabilities: RunTaskRequestCapabilities | None = Field( - default=None, - description="Capabilities that the caller supports" + default=None, description="Capabilities that the caller supports" ) configuration_version_download_url: str | None = Field( - default=None, - description="URL to download the configuration version" + default=None, description="URL to download the configuration version" ) configuration_version_id: str | None = Field( - default=None, - description="ID of the configuration version" - ) - is_speculative: bool = Field( - description="Whether this is a speculative run" - ) - organization_name: str = Field( - description="Name of the organization" - ) - payload_version: int = Field( - description="Version of the payload format" + default=None, description="ID of the configuration version" ) + is_speculative: bool = Field(description="Whether this is a speculative run") + organization_name: str = Field(description="Name of the organization") + payload_version: int = Field(description="Version of the payload format") plan_json_api_url: str | None = Field( default=None, - description="URL to access the plan JSON via API (post_plan, pre_apply, post_apply stages)" - ) - run_app_url: str = Field( - description="URL to view the run in TFC/TFE UI" - ) - run_created_at: datetime = Field( - description="Timestamp when the run was created" - ) - run_created_by: str = Field( - description="Username of the user who created the run" - ) - run_id: str = Field( - description="ID of the run" - ) - run_message: str = Field( - description="Message associated with the run" + description="URL to access the plan JSON via API (post_plan, pre_apply, post_apply stages)", ) + run_app_url: str = Field(description="URL to view the run in TFC/TFE UI") + run_created_at: datetime = Field(description="Timestamp when the run was created") + run_created_by: str = Field(description="Username of the user who created the run") + run_id: str = Field(description="ID of the run") + run_message: str = Field(description="Message associated with the run") stage: str = Field( description="Stage when the run task is executed (pre_plan, post_plan, pre_apply, post_apply)" ) @@ -84,37 +64,24 @@ class RunTaskRequest(BaseModel): task_result_enforcement_level: str = Field( description="Enforcement level for the task result (advisory, mandatory)" ) - task_result_id: str = Field( - description="ID of the task result" - ) - vcs_branch: str | None = Field( - default=None, - description="VCS branch name" - ) + task_result_id: str = Field(description="ID of the task result") + vcs_branch: str | None = Field(default=None, description="VCS branch name") vcs_commit_url: str | None = Field( - default=None, - description="URL to the VCS commit" + default=None, description="URL to the VCS commit" ) vcs_pull_request_url: str | None = Field( - default=None, - description="URL to the VCS pull request" + default=None, description="URL to the VCS pull request" ) vcs_repo_url: str | None = Field( - default=None, - description="URL to the VCS repository" + default=None, description="URL to the VCS repository" ) workspace_app_url: str = Field( description="URL to view the workspace in TFC/TFE UI" ) - workspace_id: str = Field( - description="ID of the workspace" - ) - workspace_name: str = Field( - description="Name of the workspace" - ) + workspace_id: str = Field(description="ID of the workspace") + workspace_name: str = Field(description="Name of the workspace") workspace_working_directory: str | None = Field( - default=None, - description="Working directory for the workspace" + default=None, description="Working directory for the workspace" ) model_config = ConfigDict(populate_by_name=True) diff --git a/src/pytfe/models/task_result.py b/src/pytfe/models/task_result.py index c001fca..d001c3f 100644 --- a/src/pytfe/models/task_result.py +++ b/src/pytfe/models/task_result.py @@ -17,7 +17,7 @@ class TaskResultStatus(str, Enum): """Task result status enum.""" - + PASSED = "passed" FAILED = "failed" PENDING = "pending" @@ -28,30 +28,30 @@ class TaskResultStatus(str, Enum): class TaskEnforcementLevel(str, Enum): """Task enforcement level enum.""" - + ADVISORY = "advisory" MANDATORY = "mandatory" class TaskResultStatusTimestamps(BaseModel): """Timestamps recorded for a task result.""" - + errored_at: datetime | None = Field(default=None, alias="errored-at") running_at: datetime | None = Field(default=None, alias="running-at") canceled_at: datetime | None = Field(default=None, alias="canceled-at") failed_at: datetime | None = Field(default=None, alias="failed-at") passed_at: datetime | None = Field(default=None, alias="passed-at") - + model_config = ConfigDict(populate_by_name=True) class TaskResult(BaseModel): """Represents a HCP Terraform or Terraform Enterprise run task result. - + API Documentation: https://developer.hashicorp.com/terraform/cloud-docs/api-docs/task-results """ - + id: str status: TaskResultStatus message: str @@ -67,8 +67,8 @@ class TaskResult(BaseModel): alias="workspace-task-enforcement-level" ) agent_pool_id: str | None = Field(default=None, alias="agent-pool-id") - + # Relationships task_stage: TaskStage | None = Field(default=None, alias="task-stage") - + model_config = ConfigDict(populate_by_name=True) diff --git a/src/pytfe/models/task_stages.py b/src/pytfe/models/task_stages.py index 6d0d0a2..32f1318 100644 --- a/src/pytfe/models/task_stages.py +++ b/src/pytfe/models/task_stages.py @@ -19,7 +19,7 @@ class Stage(str, Enum): """Enum representing possible run stages for run tasks.""" - + PRE_PLAN = "pre-plan" POST_PLAN = "post-plan" PRE_APPLY = "pre-apply" @@ -28,7 +28,7 @@ class Stage(str, Enum): class TaskStageStatus(str, Enum): """Enum representing all possible statuses for a task stage.""" - + PENDING = "pending" RUNNING = "running" PASSED = "passed" @@ -41,43 +41,43 @@ class TaskStageStatus(str, Enum): class Permissions(BaseModel): """Permission types for overriding a task stage.""" - + can_override_policy: bool | None = Field(default=None, alias="can-override-policy") can_override_tasks: bool | None = Field(default=None, alias="can-override-tasks") can_override: bool | None = Field(default=None, alias="can-override") - + model_config = ConfigDict(populate_by_name=True) class Actions(BaseModel): """Task stage actions.""" - + is_overridable: bool | None = Field(default=None, alias="is-overridable") - + model_config = ConfigDict(populate_by_name=True) class TaskStageStatusTimestamps(BaseModel): """Timestamps recorded for a task stage.""" - + errored_at: datetime | None = Field(default=None, alias="errored-at") running_at: datetime | None = Field(default=None, alias="running-at") canceled_at: datetime | None = Field(default=None, alias="canceled-at") failed_at: datetime | None = Field(default=None, alias="failed-at") passed_at: datetime | None = Field(default=None, alias="passed-at") - + model_config = ConfigDict(populate_by_name=True) class TaskStage(BaseModel): """Represents a HCP Terraform or Terraform Enterprise run's task stage. - + Task stages are where run tasks can occur during a run lifecycle. - + API Documentation: https://developer.hashicorp.com/terraform/cloud-docs/api-docs/task-stages """ - + id: str stage: Stage status: TaskStageStatus @@ -86,34 +86,33 @@ class TaskStage(BaseModel): updated_at: datetime = Field(alias="updated-at") permissions: Permissions | None = None actions: Actions | None = None - + # Relationships run: Run | None = None task_results: list[TaskResult] = Field(default_factory=list, alias="task-results") policy_evaluations: list[PolicyEvaluation] = Field( - default_factory=list, - alias="policy-evaluations" + default_factory=list, alias="policy-evaluations" ) - + model_config = ConfigDict(populate_by_name=True) class TaskStageOverrideOptions(BaseModel): """Options for overriding a task stage.""" - + comment: str | None = None class TaskStageReadOptions(BaseModel): """Options for reading a task stage.""" - + include: list[str] | None = None class TaskStageListOptions(BaseModel): """Options for listing task stages.""" - + page_number: int | None = Field(default=None, alias="page[number]") page_size: int | None = Field(default=None, alias="page[size]") - + model_config = ConfigDict(populate_by_name=True) diff --git a/src/pytfe/resources/run_tasks_integration.py b/src/pytfe/resources/run_tasks_integration.py index 826aa7c..910ad24 100644 --- a/src/pytfe/resources/run_tasks_integration.py +++ b/src/pytfe/resources/run_tasks_integration.py @@ -15,21 +15,21 @@ class TaskResultTag: """Tag to enrich outcomes display in TFC/TFE. - + API Documentation: https://developer.hashicorp.com/terraform/enterprise/api-docs/run-tasks/run-tasks-integration#severity-and-status-tags """ - + def __init__(self, label: str, level: str | None = None): """Initialize a task result tag. - + Args: label: The label for the tag level: Optional severity level (error, warning, info) """ self.label = label self.level = level - + def to_dict(self) -> dict[str, Any]: """Convert to dictionary for JSON serialization.""" result = {"label": self.label} @@ -40,11 +40,11 @@ def to_dict(self) -> dict[str, Any]: class TaskResultOutcome: """Detailed run task outcome for improved visibility in TFC/TFE UI. - + API Documentation: https://developer.hashicorp.com/terraform/enterprise/api-docs/run-tasks/run-tasks-integration#outcomes-payload-body """ - + def __init__( self, outcome_id: str | None = None, @@ -54,7 +54,7 @@ def __init__( tags: dict[str, list[TaskResultTag]] | None = None, ): """Initialize a task result outcome. - + Args: outcome_id: Unique identifier for the outcome description: Brief description of the outcome @@ -67,11 +67,11 @@ def __init__( self.body = body self.url = url self.tags = tags or {} - + def to_dict(self) -> dict[str, Any]: """Convert to dictionary for JSON:API serialization.""" result: dict[str, Any] = {"type": "task-result-outcomes", "attributes": {}} - + if self.outcome_id: result["attributes"]["outcome-id"] = self.outcome_id if self.description: @@ -82,20 +82,19 @@ def to_dict(self) -> dict[str, Any]: result["attributes"]["url"] = self.url if self.tags: result["attributes"]["tags"] = { - key: [tag.to_dict() for tag in tags] - for key, tags in self.tags.items() + key: [tag.to_dict() for tag in tags] for key, tags in self.tags.items() } - + return result class TaskResultCallbackOptions: """Options for sending task result callback to TFC/TFE. - + API Documentation: https://developer.hashicorp.com/terraform/enterprise/api-docs/run-tasks/run-tasks-integration#request-body-1 """ - + def __init__( self, status: str, @@ -104,7 +103,7 @@ def __init__( outcomes: list[TaskResultOutcome] | None = None, ): """Initialize callback options. - + Args: status: Task result status (passed, failed, running) message: Optional message about the task result @@ -115,24 +114,24 @@ def __init__( self.message = message self.url = url self.outcomes = outcomes or [] - + def validate(self) -> None: """Validate the callback options. - + Only passed, failed, and running statuses are allowed for callbacks. pending and errored are not valid callback statuses per TFC/TFE API. """ valid_statuses = [ TaskResultStatus.PASSED.value, TaskResultStatus.FAILED.value, - TaskResultStatus.RUNNING.value + TaskResultStatus.RUNNING.value, ] if self.status not in valid_statuses: raise TFEError( f"Invalid task result status: {self.status}. " f"Must be one of: {', '.join(valid_statuses)}" ) - + def to_dict(self) -> dict[str, Any]: """Convert to dictionary for JSON:API format.""" data: dict[str, Any] = { @@ -141,32 +140,30 @@ def to_dict(self) -> dict[str, Any]: "status": self.status, }, } - + if self.message: data["attributes"]["message"] = self.message if self.url: data["attributes"]["url"] = self.url - + if self.outcomes: data["relationships"] = { - "outcomes": { - "data": [outcome.to_dict() for outcome in self.outcomes] - } + "outcomes": {"data": [outcome.to_dict() for outcome in self.outcomes]} } - + return {"data": data} class RunTasksIntegration(_Service): """Run Tasks Integration API for sending callbacks to TFC/TFE. - + This service is used by external run task servers to send task results back to Terraform Cloud/Enterprise. - + API Documentation: https://developer.hashicorp.com/terraform/enterprise/api-docs/run-tasks/run-tasks-integration """ - + def callback( self, callback_url: str, @@ -174,30 +171,30 @@ def callback( options: TaskResultCallbackOptions, ) -> None: """Send task result callback to TFC/TFE. - + Args: callback_url: The callback URL from the run task request access_token: The access token from the run task request options: Task result callback options - + Raises: TFEError: If callback_url or access_token is invalid TFEError: If options validation fails """ if not callback_url or not callback_url.strip(): raise TFEError("callback_url cannot be empty") - + if not access_token or not access_token.strip(): raise TFEError("access_token cannot be empty") - + options.validate() - + # Create custom headers with the access token from the request headers = { "Authorization": f"Bearer {access_token}", "Content-Type": "application/vnd.api+json", } - + # Send PATCH request to callback URL self.t.request( "PATCH", diff --git a/tests/units/test_run_tasks_integration.py b/tests/units/test_run_tasks_integration.py index 8e48cf3..d5ed1ff 100644 --- a/tests/units/test_run_tasks_integration.py +++ b/tests/units/test_run_tasks_integration.py @@ -2,15 +2,12 @@ from __future__ import annotations -import json -from datetime import datetime, timezone -from unittest.mock import MagicMock, patch +from unittest.mock import MagicMock import pytest -from pytfe.client import TFEClient from pytfe.errors import TFEError -from pytfe.models.run_task_request import RunTaskRequest, RunTaskRequestCapabilities +from pytfe.models.run_task_request import RunTaskRequest from pytfe.resources.run_tasks_integration import ( RunTasksIntegration, TaskResultCallbackOptions, @@ -43,14 +40,17 @@ def test_run_task_request_minimal(self): "workspace_id": "ws-123", "workspace_name": "my-workspace", } - + request = RunTaskRequest(**data) - + assert request.access_token == "test-token-123" assert request.organization_name == "my-org" assert request.run_id == "run-123" assert request.stage == "post_plan" - assert request.task_result_callback_url == "https://app.terraform.io/api/v2/task-results/tr-123/callback" + assert ( + request.task_result_callback_url + == "https://app.terraform.io/api/v2/task-results/tr-123/callback" + ) def test_run_task_request_complete(self): """Test parsing complete run task request with all fields.""" @@ -81,9 +81,9 @@ def test_run_task_request_complete(self): "workspace_name": "test-workspace", "workspace_working_directory": "terraform/", } - + request = RunTaskRequest(**data) - + assert request.access_token == "test-token-456" assert request.capabilities is not None assert request.capabilities.outcomes is True @@ -100,7 +100,7 @@ def test_tag_with_level(self): """Test tag with level.""" tag = TaskResultTag(label="High", level="error") data = tag.to_dict() - + assert data["label"] == "High" assert data["level"] == "error" @@ -108,7 +108,7 @@ def test_tag_without_level(self): """Test tag without level.""" tag = TaskResultTag(label="Passed") data = tag.to_dict() - + assert data["label"] == "Passed" assert "level" not in data @@ -122,7 +122,7 @@ def test_outcome_complete(self): "Status": [TaskResultTag(label="Failed", level="error")], "Severity": [TaskResultTag(label="High", level="error")], } - + outcome = TaskResultOutcome( outcome_id="ISSUE-123", description="Security issue found", @@ -130,13 +130,16 @@ def test_outcome_complete(self): url="https://example.com/issues/123", tags=tags, ) - + data = outcome.to_dict() - + assert data["type"] == "task-result-outcomes" assert data["attributes"]["outcome-id"] == "ISSUE-123" assert data["attributes"]["description"] == "Security issue found" - assert data["attributes"]["body"] == "# Details\n\nSecurity vulnerability detected." + assert ( + data["attributes"]["body"] + == "# Details\n\nSecurity vulnerability detected." + ) assert data["attributes"]["url"] == "https://example.com/issues/123" assert "Status" in data["attributes"]["tags"] @@ -144,7 +147,7 @@ def test_outcome_minimal(self): """Test minimal outcome.""" outcome = TaskResultOutcome() data = outcome.to_dict() - + assert data["type"] == "task-result-outcomes" assert "attributes" in data @@ -159,10 +162,10 @@ def test_callback_options_passed(self): message="All checks passed", url="https://example.com/results/123", ) - + options.validate() data = options.to_dict() - + assert data["data"]["type"] == "task-results" assert data["data"]["attributes"]["status"] == "passed" assert data["data"]["attributes"]["message"] == "All checks passed" @@ -174,15 +177,15 @@ def test_callback_options_with_outcomes(self): outcome_id="ISSUE-1", description="Test issue", ) - + options = TaskResultCallbackOptions( status=TaskResultStatus.FAILED, message="1 issue found", outcomes=[outcome], ) - + data = options.to_dict() - + assert "relationships" in data["data"] assert "outcomes" in data["data"]["relationships"] assert len(data["data"]["relationships"]["outcomes"]["data"]) == 1 @@ -190,15 +193,19 @@ def test_callback_options_with_outcomes(self): def test_validate_invalid_status(self): """Test validation fails with invalid status.""" options = TaskResultCallbackOptions(status="invalid") - + with pytest.raises(TFEError) as exc_info: options.validate() - + assert "Invalid task result status" in str(exc_info.value) def test_validate_valid_statuses(self): """Test validation passes with all valid statuses.""" - for status in [TaskResultStatus.PASSED, TaskResultStatus.FAILED, TaskResultStatus.RUNNING]: + for status in [ + TaskResultStatus.PASSED, + TaskResultStatus.FAILED, + TaskResultStatus.RUNNING, + ]: options = TaskResultCallbackOptions(status=status) options.validate() # Should not raise @@ -210,24 +217,27 @@ def test_callback_success(self): """Test successful callback.""" mock_transport = MagicMock() integration = RunTasksIntegration(mock_transport) - + options = TaskResultCallbackOptions( status=TaskResultStatus.PASSED, message="All tests passed", ) - + integration.callback( callback_url="https://app.terraform.io/api/v2/task-results/tr-123/callback", access_token="test-token-123", options=options, ) - + # Verify request was made mock_transport.request.assert_called_once() call_args = mock_transport.request.call_args - + assert call_args[0][0] == "PATCH" - assert call_args[0][1] == "https://app.terraform.io/api/v2/task-results/tr-123/callback" + assert ( + call_args[0][1] + == "https://app.terraform.io/api/v2/task-results/tr-123/callback" + ) assert "Authorization" in call_args[1]["headers"] assert call_args[1]["headers"]["Authorization"] == "Bearer test-token-123" @@ -235,55 +245,55 @@ def test_callback_empty_url(self): """Test callback fails with empty URL.""" mock_transport = MagicMock() integration = RunTasksIntegration(mock_transport) - + options = TaskResultCallbackOptions(status=TaskResultStatus.PASSED) - + with pytest.raises(TFEError) as exc_info: integration.callback( callback_url="", access_token="test-token", options=options, ) - + assert "callback_url cannot be empty" in str(exc_info.value) def test_callback_empty_token(self): """Test callback fails with empty token.""" mock_transport = MagicMock() integration = RunTasksIntegration(mock_transport) - + options = TaskResultCallbackOptions(status=TaskResultStatus.PASSED) - + with pytest.raises(TFEError) as exc_info: integration.callback( callback_url="https://example.com/callback", access_token="", options=options, ) - + assert "access_token cannot be empty" in str(exc_info.value) def test_callback_invalid_status(self): """Test callback fails with invalid status.""" mock_transport = MagicMock() integration = RunTasksIntegration(mock_transport) - + options = TaskResultCallbackOptions(status="invalid-status") - + with pytest.raises(TFEError) as exc_info: integration.callback( callback_url="https://example.com/callback", access_token="test-token", options=options, ) - + assert "Invalid task result status" in str(exc_info.value) def test_callback_with_outcomes(self): """Test callback with detailed outcomes.""" mock_transport = MagicMock() integration = RunTasksIntegration(mock_transport) - + outcome = TaskResultOutcome( outcome_id="CHECK-1", description="Policy violation", @@ -293,22 +303,22 @@ def test_callback_with_outcomes(self): "Severity": [TaskResultTag(label="High", level="error")], }, ) - + options = TaskResultCallbackOptions( status=TaskResultStatus.FAILED, message="Policy check failed", url="https://example.com/results", outcomes=[outcome], ) - + integration.callback( callback_url="https://app.terraform.io/api/v2/task-results/tr-123/callback", access_token="test-token-123", options=options, ) - + call_args = mock_transport.request.call_args body = call_args[1]["json_body"] - + assert "relationships" in body["data"] assert "outcomes" in body["data"]["relationships"] From 9bdee78f95f5e6ad411a6e022429216e1fb49dcb Mon Sep 17 00:00:00 2001 From: KshitijaChoudhari Date: Tue, 27 Jan 2026 13:01:29 +0530 Subject: [PATCH 5/7] feat(run-tasks): add run tasks integration support This commit adds comprehensive support for Terraform Cloud/Enterprise Run Tasks Integration to the python-tfe SDK. This feature allows developers to create webhook servers that can validate Terraform runs and send results back to TFC/TFE. Key additions: - Production-ready webhook server example with deployment instructions - Complete documentation explaining architecture and flow - Support for multiple cloud deployment platforms (AWS EC2, Heroku, GCP, etc.) - Comprehensive validation examples (cost control, security, compliance) - Clean implementation following HashiCorp patterns --- docs/RUN_TASKS_INTEGRATION_EXAMPLE.md | 296 ++++++++++++++++++++++++++ examples/run_tasks_integration.py | 265 +++++++++++++++++++++++ 2 files changed, 561 insertions(+) create mode 100644 docs/RUN_TASKS_INTEGRATION_EXAMPLE.md create mode 100644 examples/run_tasks_integration.py diff --git a/docs/RUN_TASKS_INTEGRATION_EXAMPLE.md b/docs/RUN_TASKS_INTEGRATION_EXAMPLE.md new file mode 100644 index 0000000..e3c7aeb --- /dev/null +++ b/docs/RUN_TASKS_INTEGRATION_EXAMPLE.md @@ -0,0 +1,296 @@ +# Run Tasks Integration Example - Explanation + +## What is `examples/run_tasks_integration.py`? + +It's a **webhook server** that integrates with Terraform Cloud/Enterprise (TFC/TFE) run tasks. This is NOT a test file - it's a fully functional example server that you can deploy and customize. + +--- + +## How It Works: The Complete Flow + +### Step 1: You Start the Server +```bash +python examples/run_tasks_integration.py --port 8888 +``` + +The server starts and waits for incoming webhooks from TFC/TFE. + +### Step 2: Configure in TFC/TFE +You configure a run task in TFC/TFE pointing to your server: +- **URL**: `http://your-server:8888` +- **Stage**: When to run (pre-plan, post-plan, pre-apply, post-apply) +- **Enforcement**: Advisory (warn) or Mandatory (block) + +### Step 3: Someone Triggers a Terraform Run +When a user clicks "Start Run" in TFC/TFE or pushes code: + +``` +User triggers run + ↓ +TFC/TFE prepares the run + ↓ +TFC/TFE sends webhook → http://your-server:8888 +``` + +### Step 4: Your Server Receives the Webhook +The webhook payload contains: +```json +{ + "run_id": "run-abc123", + "workspace_name": "prod-app", + "organization_name": "my-company", + "stage": "pre_plan", + "access_token": "secret-token", + "task_result_callback_url": "https://app.terraform.io/api/v2/task-results/xyz", + ... +} +``` + +### Step 5: Your Server Processes It +```python +# Parse the incoming webhook +request = RunTaskRequest.model_validate(payload) + +# YOUR CUSTOM VALIDATION LOGIC HERE +# Examples: +# - Check if resources have required tags +# - Validate naming conventions +# - Run security scans (Checkov, tfsec, etc.) +# - Check cost estimates +# - Verify compliance policies +# - Check for sensitive data in configs + +result_status = "passed" # or "failed" +result_message = "All checks passed!" +``` + +### Step 6: Your Server Sends Results Back +```python +client.run_tasks_integration.callback( + callback_url=request.task_result_callback_url, + access_token=request.access_token, + options=TaskResultCallbackOptions( + status="passed", # or "failed" + message="All checks passed!", + url="https://your-dashboard.com/results", + outcomes=[ + TaskResultOutcome( + outcome_id="check-1", + description="Security scan passed", + body="No vulnerabilities found", + tags={ + "Status": [TaskResultTag(label="Passed", level="info")], + "Severity": [TaskResultTag(label="Low")] + } + ) + ] + ) +) +``` + +### Step 7: TFC/TFE Receives and Displays Results +In the TFC/TFE UI, users see: +- ✅ **Run Task Status**: Passed or Failed +- 📝 **Message**: Your custom message +- 📊 **Outcomes**: Detailed results with tags +- 🔗 **Link**: To your detailed results page + +If mandatory and failed → Run is blocked ⛔ +If advisory and failed → Run continues with warning ⚠️ + +--- + +## Real-World Use Cases + +### Example 1: Cost Control +```python +# Check estimated costs +if estimated_cost > 10000: + result_status = "failed" + result_message = f"Cost ${estimated_cost} exceeds budget limit" +``` + +### Example 2: Production Safety +```python +# Require approval for production +if request.workspace_name.startswith("prod-"): + result_status = "failed" + result_message = "Production changes require manual approval" +``` + +### Example 3: Security Scanning +```python +# Run Checkov security scan +scan_results = run_checkov(request.configuration_version_download_url) +if scan_results.has_critical_issues: + result_status = "failed" + result_message = f"Found {len(scan_results.critical)} critical security issues" +``` + +### Example 4: Tagging Enforcement +```python +# Check if all resources have required tags +if not all_resources_have_tags(config, required_tags=["owner", "project"]): + result_status = "failed" + result_message = "All resources must have 'owner' and 'project' tags" +``` + +### Example 5: Compliance Checking +```python +# Check against compliance policies +if not meets_compliance_standards(config): + result_status = "failed" + result_message = "Configuration violates compliance policy XYZ-123" +``` + +--- + +## What the Example Demonstrates + +The example file shows you how to: + +✅ **Receive webhooks** from TFC/TFE using a simple HTTP server +✅ **Parse `RunTaskRequest`** - the webhook payload from TFC/TFE +✅ **Access run information** - workspace, organization, stage, run ID +✅ **Add custom validation logic** - where you insert your checks +✅ **Create detailed outcomes** - with descriptions, tags, and links +✅ **Send results back** - using the `callback()` method +✅ **Handle errors gracefully** - proper error handling and responses + +--- + +## Why This Example is Important + +### Without Run Tasks Integration: +- ❌ Manual code reviews for every change +- ❌ Inconsistent policy enforcement +- ❌ Security issues discovered after deployment +- ❌ Cost overruns without warnings + +### With Run Tasks Integration: +- ✅ Automated validation before apply +- ✅ Consistent policy enforcement +- ✅ Security issues caught early +- ✅ Cost controls built into workflow +- ✅ Detailed audit trail +- ✅ Custom business logic enforcement + +--- + +## How to Use This Example + +### 1. Basic Usage (Local Testing) +```bash +# Start the server +python examples/run_tasks_integration.py --port 8888 + +# In another terminal, test with mock data +python test_run_tasks_local.py +``` + +### 2. Deploy to Cloud (Real Usage) +```bash +# On your cloud server (EC2, Azure, GCP, etc.) +python examples/run_tasks_integration.py --port 8888 + +# Configure in TFC/TFE: +# URL: http://your-server-ip:8888 +``` + +### 3. Customize the Logic +Edit the example file around line 54-67: +```python +# Replace this section with your custom checks +# Example: Check workspace naming +if not request.workspace_name.startswith(("dev-", "prod-", "staging-")): + result_status = "failed" + result_message = "Workspace must be prefixed with dev-, prod-, or staging-" +``` + +--- + +## Key Components Used + +### 1. `RunTaskRequest` +Parses the incoming webhook from TFC/TFE: +- `run_id` - The Terraform run ID +- `workspace_name` - Which workspace +- `organization_name` - Which organization +- `stage` - When it's running (pre-plan, post-plan, etc.) +- `access_token` - Token for sending callback +- `task_result_callback_url` - Where to send results + +### 2. `TaskResultCallbackOptions` +Defines the result to send back: +- `status` - "passed", "failed", "running" +- `message` - Short summary +- `url` - Link to detailed results (optional) +- `outcomes` - Detailed results list (optional) + +### 3. `TaskResultOutcome` +Individual check result: +- `outcome_id` - Unique identifier +- `description` - What was checked +- `body` - Detailed explanation +- `url` - Link to more info +- `tags` - Categorization (Status, Severity, etc.) + +### 4. `TaskResultTag` +Tag for categorization: +- `label` - Tag name (e.g., "Critical", "Passed") +- `level` - Severity (e.g., "error", "warning", "info") + +### 5. `run_tasks_integration.callback()` +Sends results back to TFC/TFE: +- Uses the callback URL from the webhook +- Authenticates with the access token +- Sends structured result data + +--- + +## Testing Strategy + +### Level 1: Unit Tests ✅ +```bash +pytest tests/units/test_run_tasks_integration.py +``` +Tests parsing and validation logic. + +### Level 2: Local Integration ✅ +```bash +python test_run_tasks_local.py +``` +Simulates complete flow with mock TFC/TFE server. + +### Level 3: Cloud Deployment ✅ +Deploy to EC2/cloud and test with real webhooks. + +### Level 4: Real HCP Terraform ✅ +Configure in actual TFC/TFE and trigger real runs. + +--- + +## Summary + +**What it is**: A working webhook server that integrates with TFC/TFE run tasks + +**What it does**: Receives run information, validates it, sends results back + +**Why it's important**: Enables automated policy enforcement and custom validation + +**How to use it**: Deploy the server, configure in TFC/TFE, customize the validation logic + +**Not a test**: It's a functional example you can deploy and use in production! + +--- + +## Next Steps + +1. ✅ Review the example code +2. ✅ Test locally with `test_run_tasks_local.py` +3. ✅ Customize validation logic for your needs +4. ✅ Deploy to cloud server +5. ✅ Configure in TFC/TFE +6. ✅ Monitor and iterate + +**The example gives you everything you need to build your own run tasks integration!** diff --git a/examples/run_tasks_integration.py b/examples/run_tasks_integration.py new file mode 100644 index 0000000..9f19ff2 --- /dev/null +++ b/examples/run_tasks_integration.py @@ -0,0 +1,265 @@ +#!/usr/bin/env python +""" +Run Tasks Integration Example - Real TFC/TFE Testing + +This example shows how to create a webhook server that integrates with +Terraform Cloud/Enterprise run tasks to validate runs and send results back. + +STEP-BY-STEP TESTING WITH REAL TFC/TFE: + +1. START THE SERVER: + python examples/run_tasks_integration.py --port 8888 + +2. MAKE IT ACCESSIBLE (choose one): + + Option A - Using ngrok (for local testing): + - Install: https://ngrok.com/download + - Run: ngrok http 8888 + - Copy the public URL (e.g., https://abc123.ngrok.io) + + Option B - Deploy to cloud (recommended for production): + + AWS EC2: + - Launch EC2 instance (t2.micro sufficient for testing) + - Upload this file: scp run_tasks_integration.py ec2-user@YOUR-IP:~/ + - SSH in: ssh ec2-user@YOUR-IP + - Install Python 3.11+: sudo dnf install python3.11 python3.11-pip + - Install dependencies: python3.11 -m pip install --user pytfe + - Run server: python3.11 run_tasks_integration.py --port 8888 + - Configure security group: Allow port 8888 from 0.0.0.0/0 + - Use public IP: http://YOUR-EC2-IP:8888 + + Heroku (easiest): + - Create Procfile: web: python run_tasks_integration.py --port $PORT + - Create requirements.txt: pytfe>=0.1.0 + - Deploy: git push heroku main + - Use Heroku URL: https://your-app.herokuapp.com + + Google Cloud Run: + - Create Dockerfile: FROM python:3.11 / RUN pip install pytfe / COPY . . / CMD ["python", "run_tasks_integration.py", "--port", "8080"] + - Deploy: gcloud run deploy --source . + - Use Cloud Run URL: https://your-service-hash.run.app + + DigitalOcean Droplet: + - Create Ubuntu droplet + - Upload file and install Python/pytfe + - Run with: python3 run_tasks_integration.py --port 8888 + - Use droplet IP: http://YOUR-DROPLET-IP:8888 + + Benefits of cloud deployment: + - Permanent URL (no ngrok reconnections) + - Better reliability and uptime + - Can handle production workloads + - SSL/HTTPS support available + - Scalable if needed + +3. CREATE RUN TASK IN TFC/TFE: + - Go to: https://app.terraform.io/app/YOUR_ORG/settings/tasks + - Click "Create run task" + - Name: "python-tfe-test" + - URL: Your public URL from step 2 + - Save and wait for verification (check mark) + +4. ATTACH TO WORKSPACE: + - Go to workspace settings → Run Tasks + - Click "Add run task" + - Select "python-tfe-test" + - Enforcement: Advisory (for testing) + - Stage: Pre-plan + - Save + +5. TRIGGER A RUN: + - Go to your workspace + - Click "Actions" → "Start new run" + - Watch this terminal for webhook activity! + - Check TFC/TFE UI for run task results + +CUSTOMIZE VALIDATION LOGIC: +Edit the section around line 80 to add your custom checks: +- Cost validation +- Security scanning (Checkov, tfsec) +- Policy enforcement +- Custom approval workflows + +API Documentation: + https://developer.hashicorp.com/terraform/enterprise/api-docs/run-tasks/run-tasks-integration +""" + +import json +from http.server import BaseHTTPRequestHandler, HTTPServer + +from pytfe import TFEClient +from pytfe.models import RunTaskRequest +from pytfe.resources.run_tasks_integration import ( + TaskResultCallbackOptions, + TaskResultOutcome, + TaskResultTag, +) + + +class RunTaskHandler(BaseHTTPRequestHandler): + """HTTP handler for run task callbacks from TFC/TFE.""" + + def do_POST(self): + """Handle POST request from TFC/TFE run task webhook.""" + # Read the request body + content_length = int(self.headers["Content-Length"]) + body = self.rfile.read(content_length) + + try: + # Parse the incoming run task request + payload = json.loads(body) + print("\n" + "=" * 60) + print("Received Run Task Request") + print("=" * 60) + + # Parse into RunTaskRequest model + request = RunTaskRequest.model_validate(payload) + + print(f"Run ID: {request.run_id}") + print(f"Organization: {request.organization_name}") + print(f"Workspace: {request.workspace_name}") + print(f"Workspace ID: {request.workspace_id}") + print(f"Stage: {request.stage}") + print(f"Callback URL: {request.task_result_callback_url}") + print(f"Is Speculative: {request.is_speculative}") + + # Handle verification requests (test webhooks from TFC/TFE) + if ( + request.organization_name == "test-org" + or request.workspace_name == "test-workspace" + ): + print("\n[OK] Verification request detected - responding with 200 OK") + print("=" * 60 + "\n") + self.send_response(200) + self.send_header("Content-type", "application/json") + self.end_headers() + self.wfile.write(json.dumps({"status": "ok"}).encode()) + return + + # =============================================================== + # CUSTOMIZE YOUR VALIDATION LOGIC HERE + # =============================================================== + # This is where you add your custom checks and validation. + # Examples: + # + # 1. Cost Control: + # if estimated_cost > 1000: + # result_status = "failed" + # result_message = f"Cost ${estimated_cost} exceeds limit" + # + # 2. Security Scanning: + # scan_results = run_checkov(request.configuration_version_download_url) + # if scan_results.failed: + # result_status = "failed" + # result_message = "Security scan failed" + # + # 3. Policy Enforcement: + # if not workspace_has_required_tags(request.workspace_name): + # result_status = "failed" + # result_message = "Workspace missing required tags" + # + # 4. Custom Approval: + # if request.workspace_name.startswith("prod-"): + # result_status = "failed" + # result_message = "Production changes require manual approval" + + # For this example, we'll just pass the task + result_status = "passed" + result_message = "All checks passed successfully" + + # Create detailed outcomes (optional but recommended) + outcomes = [ + TaskResultOutcome( + outcome_id="check-1", + description="Configuration validation passed", + body="All Terraform configurations are valid and follow best practices.", + url="https://example.com/results/check-1", + tags={ + "Status": [TaskResultTag(label="Passed", level="info")], + "Category": [TaskResultTag(label="Validation")], + }, + ) + ] + + # Create callback options + callback_options = TaskResultCallbackOptions( + status=result_status, + message=result_message, + url="https://example.com/full-results", + outcomes=outcomes, + ) + + # Initialize client and send callback + print("\nInitializing TFEClient...") + print(f"Access token from webhook: {request.access_token[:10]}***") + client = TFEClient() + print("Client initialized successfully") + + print(f"Sending callback to: {request.task_result_callback_url[:50]}...") + client.run_tasks_integration.callback( + callback_url=request.task_result_callback_url, + access_token=request.access_token, + options=callback_options, + ) + + print(f"\n[SUCCESS] Callback sent successfully: {result_status}") + print("=" * 60 + "\n") + + # Respond to TFC/TFE + self.send_response(200) + self.send_header("Content-type", "application/json") + self.end_headers() + self.wfile.write(json.dumps({"status": "received"}).encode()) + + except Exception as e: + print(f"Error processing request: {e}") + self.send_response(500) + self.send_header("Content-type", "application/json") + self.end_headers() + self.wfile.write(json.dumps({"error": str(e)}).encode()) + + def log_message(self, format, *args): + """Suppress default HTTP logging.""" + pass + + +def run_server(port=8080): + """Start the run task callback server.""" + server_address = ("", port) + httpd = HTTPServer(server_address, RunTaskHandler) + + print("=" * 60) + print("Run Tasks Integration Callback Server") + print("=" * 60) + print(f"Listening on http://localhost:{port}") + print("\nFor local testing:") + print(" 1. Use ngrok or similar tool to expose this server:") + print(f" ngrok http {port}") + print(" 2. Configure your run task in TFC/TFE with the ngrok URL") + print(" 3. Trigger a run in your workspace") + print("\nWaiting for requests from TFC/TFE...") + print("=" * 60 + "\n") + + try: + httpd.serve_forever() + except KeyboardInterrupt: + print("\n\nShutting down server...") + httpd.shutdown() + + +if __name__ == "__main__": + import argparse + + parser = argparse.ArgumentParser( + description="Run Tasks Integration callback server" + ) + parser.add_argument( + "--port", + type=int, + default=8080, + help="Port to listen on (default: 8080)", + ) + args = parser.parse_args() + + run_server(port=args.port) From c21a8efc3047360023c3c801cc9d5c1245db6286 Mon Sep 17 00:00:00 2001 From: KshitijaChoudhari Date: Fri, 30 Jan 2026 13:04:20 +0530 Subject: [PATCH 6/7] refactor: clean up run tasks integration documentation --- docs/RUN_TASKS_INTEGRATION_EXAMPLE.md | 66 +++++++++++++-------------- 1 file changed, 33 insertions(+), 33 deletions(-) diff --git a/docs/RUN_TASKS_INTEGRATION_EXAMPLE.md b/docs/RUN_TASKS_INTEGRATION_EXAMPLE.md index e3c7aeb..1f113be 100644 --- a/docs/RUN_TASKS_INTEGRATION_EXAMPLE.md +++ b/docs/RUN_TASKS_INTEGRATION_EXAMPLE.md @@ -90,13 +90,13 @@ client.run_tasks_integration.callback( ### Step 7: TFC/TFE Receives and Displays Results In the TFC/TFE UI, users see: -- ✅ **Run Task Status**: Passed or Failed -- 📝 **Message**: Your custom message -- 📊 **Outcomes**: Detailed results with tags -- 🔗 **Link**: To your detailed results page +- **Run Task Status**: Passed or Failed +- **Message**: Your custom message +- **Outcomes**: Detailed results with tags +- **Link**: To your detailed results page -If mandatory and failed → Run is blocked ⛔ -If advisory and failed → Run continues with warning ⚠️ +If mandatory and failed → Run is blocked +If advisory and failed → Run continues with warning --- @@ -149,31 +149,31 @@ if not meets_compliance_standards(config): The example file shows you how to: -✅ **Receive webhooks** from TFC/TFE using a simple HTTP server -✅ **Parse `RunTaskRequest`** - the webhook payload from TFC/TFE -✅ **Access run information** - workspace, organization, stage, run ID -✅ **Add custom validation logic** - where you insert your checks -✅ **Create detailed outcomes** - with descriptions, tags, and links -✅ **Send results back** - using the `callback()` method -✅ **Handle errors gracefully** - proper error handling and responses +- **Receive webhooks** from TFC/TFE using a simple HTTP server +- **Parse `RunTaskRequest`** - the webhook payload from TFC/TFE +- **Access run information** - workspace, organization, stage, run ID +- **Add custom validation logic** - where you insert your checks +- **Create detailed outcomes** - with descriptions, tags, and links +- **Send results back** - using the `callback()` method +- **Handle errors gracefully** - proper error handling and responses --- ## Why This Example is Important ### Without Run Tasks Integration: -- ❌ Manual code reviews for every change -- ❌ Inconsistent policy enforcement -- ❌ Security issues discovered after deployment -- ❌ Cost overruns without warnings +- Manual code reviews for every change +- Inconsistent policy enforcement +- Security issues discovered after deployment +- Cost overruns without warnings ### With Run Tasks Integration: -- ✅ Automated validation before apply -- ✅ Consistent policy enforcement -- ✅ Security issues caught early -- ✅ Cost controls built into workflow -- ✅ Detailed audit trail -- ✅ Custom business logic enforcement +- Automated validation before apply +- Consistent policy enforcement +- Security issues caught early +- Cost controls built into workflow +- Detailed audit trail +- Custom business logic enforcement --- @@ -250,22 +250,22 @@ Sends results back to TFC/TFE: ## Testing Strategy -### Level 1: Unit Tests ✅ +### Level 1: Unit Tests ```bash pytest tests/units/test_run_tasks_integration.py ``` Tests parsing and validation logic. -### Level 2: Local Integration ✅ +### Level 2: Local Integration ```bash python test_run_tasks_local.py ``` Simulates complete flow with mock TFC/TFE server. -### Level 3: Cloud Deployment ✅ +### Level 3: Cloud Deployment Deploy to EC2/cloud and test with real webhooks. -### Level 4: Real HCP Terraform ✅ +### Level 4: Real HCP Terraform Configure in actual TFC/TFE and trigger real runs. --- @@ -286,11 +286,11 @@ Configure in actual TFC/TFE and trigger real runs. ## Next Steps -1. ✅ Review the example code -2. ✅ Test locally with `test_run_tasks_local.py` -3. ✅ Customize validation logic for your needs -4. ✅ Deploy to cloud server -5. ✅ Configure in TFC/TFE -6. ✅ Monitor and iterate +1. Review the example code +2. Test locally with `test_run_tasks_local.py` +3. Customize validation logic for your needs +4. Deploy to cloud server +5. Configure in TFC/TFE +6. Monitor and iterate **The example gives you everything you need to build your own run tasks integration!** From 44d6d4ccd3e7be6398752513abac2833280c5a93 Mon Sep 17 00:00:00 2001 From: KshitijaChoudhari Date: Mon, 16 Feb 2026 11:13:15 +0530 Subject: [PATCH 7/7] Add run_tasks_integration model --- src/pytfe/models/run_tasks_integration.py | 156 ++++++++++++++++++++++ 1 file changed, 156 insertions(+) create mode 100644 src/pytfe/models/run_tasks_integration.py diff --git a/src/pytfe/models/run_tasks_integration.py b/src/pytfe/models/run_tasks_integration.py new file mode 100644 index 0000000..ef96b2a --- /dev/null +++ b/src/pytfe/models/run_tasks_integration.py @@ -0,0 +1,156 @@ +"""Run Tasks Integration models for python-tfe. + +This module contains models for run tasks integration callback functionality. +""" + +from __future__ import annotations + +from typing import Any + +from .task_result import TaskResultStatus + + +class TaskResultTag: + """Tag to enrich outcomes display in TFC/TFE. + + API Documentation: + https://developer.hashicorp.com/terraform/enterprise/api-docs/run-tasks/run-tasks-integration#severity-and-status-tags + """ + + def __init__(self, label: str, level: str | None = None): + """Initialize a task result tag. + + Args: + label: The label for the tag + level: Optional severity level (error, warning, info) + """ + self.label = label + self.level = level + + def to_dict(self) -> dict[str, Any]: + """Convert to dictionary for JSON serialization.""" + result = {"label": self.label} + if self.level: + result["level"] = self.level + return result + + +class TaskResultOutcome: + """Detailed run task outcome for improved visibility in TFC/TFE UI. + + API Documentation: + https://developer.hashicorp.com/terraform/enterprise/api-docs/run-tasks/run-tasks-integration#outcomes-payload-body + """ + + def __init__( + self, + outcome_id: str | None = None, + description: str | None = None, + body: str | None = None, + url: str | None = None, + tags: dict[str, list[TaskResultTag]] | None = None, + ): + """Initialize a task result outcome. + + Args: + outcome_id: Unique identifier for the outcome + description: Brief description of the outcome + body: Detailed body content (supports markdown) + url: URL to view more details + tags: Dictionary of tag categories to lists of tags + """ + self.outcome_id = outcome_id + self.description = description + self.body = body + self.url = url + self.tags = tags or {} + + def to_dict(self) -> dict[str, Any]: + """Convert to dictionary for JSON:API serialization.""" + result: dict[str, Any] = {"type": "task-result-outcomes", "attributes": {}} + + if self.outcome_id: + result["attributes"]["outcome-id"] = self.outcome_id + if self.description: + result["attributes"]["description"] = self.description + if self.body: + result["attributes"]["body"] = self.body + if self.url: + result["attributes"]["url"] = self.url + if self.tags: + result["attributes"]["tags"] = { + key: [tag.to_dict() for tag in tags] for key, tags in self.tags.items() + } + + return result + + +class TaskResultCallbackOptions: + """Options for sending task result callback to TFC/TFE. + + API Documentation: + https://developer.hashicorp.com/terraform/enterprise/api-docs/run-tasks/run-tasks-integration#request-body-1 + """ + + def __init__( + self, + status: str, + message: str | None = None, + url: str | None = None, + outcomes: list[TaskResultOutcome] | None = None, + ): + """Initialize callback options. + + Args: + status: Task result status (passed, failed, running) + message: Optional message about the task result + url: Optional URL to view detailed results + outcomes: Optional list of detailed outcomes + """ + self.status = status + self.message = message + self.url = url + self.outcomes = outcomes or [] + + def validate(self) -> None: + """Validate the callback options. + + Only passed, failed, and running statuses are allowed for callbacks. + pending and errored are not valid callback statuses per TFC/TFE API. + + Raises: + InvalidTaskResultsCallbackStatus: If status is not valid for callbacks + """ + from ..errors import InvalidTaskResultsCallbackStatus + + valid_statuses = [ + TaskResultStatus.PASSED.value, + TaskResultStatus.FAILED.value, + TaskResultStatus.RUNNING.value, + ] + if self.status not in valid_statuses: + raise InvalidTaskResultsCallbackStatus( + f"Invalid task result status: {self.status}. " + f"Must be one of: {', '.join(valid_statuses)}" + ) + + def to_dict(self) -> dict[str, Any]: + """Convert to dictionary for JSON:API format.""" + data: dict[str, Any] = { + "type": "task-results", + "attributes": { + "status": self.status, + }, + } + + if self.message: + data["attributes"]["message"] = self.message + if self.url: + data["attributes"]["url"] = self.url + + if self.outcomes: + data["relationships"] = { + "outcomes": {"data": [outcome.to_dict() for outcome in self.outcomes]} + } + + return {"data": data}