diff --git a/examples/agent-frameworks/crewai/README.md b/examples/agent-frameworks/crewai/README.md new file mode 100644 index 0000000..545fd00 --- /dev/null +++ b/examples/agent-frameworks/crewai/README.md @@ -0,0 +1,118 @@ +# CrewAI APort Task Verification + +APort verification decorator for CrewAI task functions — every task +execution is gated by an APort policy check before the task logic runs. + +## What + +`@aport_verify` wraps CrewAI task functions so that before a task executes, +the agent's APort passport is verified against a configurable policy pack. +If the agent doesn't satisfy the policy, the task is denied with a clear error. + +## Install + +```bash +pip install crewai # or just include src/ in your PYTHONPATH +``` + +No extra dependencies — the decorator uses only the standard library +(`urllib.request`) for APort API calls. + +## Quick Start + +```python +from src import aport_verify + +@aport_verify("code.repository.merge.v1") +def merge_code(repo: str, branch: str) -> str: + return f"Merged {branch} into {repo}" +``` + +## Key Features + +- **Works with sync and async task functions** +- **Configurable agent ID resolution**: static string → kwarg → env var +- **Fail-open / fail-closed modes** via `strict=True/False` +- **Context injection** via `context_builder` callback +- **Dependency-free** APort client (no external SDK required) + +## API Reference + +### `@aport_verify(policy, *, agent_id=None, client=None, strict=True, context_builder=None)` + +| Argument | Type | Description | +|---|---|---| +| `policy` | `str` | APort policy pack name (required) | +| `agent_id` | `str` | Static agent ID (or resolved from kwarg/env) | +| `client` | `APortHTTPClient` | HTTP client instance (uses default if None) | +| `strict` | `bool` | If `True`, raises on denial. If `False`, just logs and proceeds. | +| `context_builder` | `callable` | Receives same args as the function; return value is sent as verification context | + +### Agent ID Resolution (in priority order) + +1. Static `agent_id=` kwarg to the decorator +2. `agent_id` kwarg passed to the function at call time +3. `APORT_AGENT_ID` environment variable +4. Raises `APortVerificationError` if none found + +## Usage Examples + +### Basic usage + +```python +from src import aport_verify + +@aport_verify("finance.payment.refund.v1") +def process_refund(amount: float, agent_id: str = None) -> str: + return f"Refunded ${amount}" +``` + +### With static agent ID + +```python +@aport_verify("data.export.v1", agent_id="agent_export_001") +def export_data(format: str) -> str: + return f"Exported data as {format}" +``` + +### With context injection + +```python +@aport_verify( + "finance.payment.refund.v1", + context_builder=lambda amount, **kw: {"amount": amount} +) +def refund_item(amount: float, item: str) -> str: + return f"Refunded {item} for ${amount}" +``` + +### Non-strict mode (denial doesn't block execution) + +```python +@aport_verify("monitoring.alert.send", strict=False) +def send_alert(message: str) -> str: + return f"Alert: {message}" +``` + +### Async task + +```python +from src import aport_verify + +@aport_verify("code.repository.merge.v1") +async def async_merge(repo: str, branch: str) -> str: + return f"Merged {branch}" +``` + +## Development + +```bash +# Run tests +PYTHONPATH=src python3 -m unittest discover -s tests + +# Syntax check +python3 -m compileall src tests examples + +# Run example +PYTHONPATH=src python3 examples/basic_usage.py +``` \ No newline at end of file diff --git a/examples/agent-frameworks/crewai/examples/basic_usage.py b/examples/agent-frameworks/crewai/examples/basic_usage.py new file mode 100644 index 0000000..958444a --- /dev/null +++ b/examples/agent-frameworks/crewai/examples/basic_usage.py @@ -0,0 +1,101 @@ +"""Basic usage examples for APort CrewAI verification decorator.""" + +import sys +import os + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "src")) + +from aport_verify import ( + aport_verify, + APortVerificationError, + APortHTTPClient, + VerificationResult, +) + + +class MockClient: + def __init__(self, allow: bool = True): + self.allow = allow + + def verify(self, policy: str, agent_id: str, context=None): + return VerificationResult( + verified=self.allow, + allow=self.allow, + reasons=["mock verified"] if self.allow else ["mock denied"], + ) + + +# Use mock client for demo +mock_client = MockClient(allow=True) + + +@aport_verify("code.repository.merge.v1", client=mock_client) +def merge_code(repo: str, branch: str) -> str: + return f"Merged {branch} into {repo}" + + +@aport_verify("finance.payment.refund.v1", agent_id="agent_finance_001", client=mock_client) +def process_refund(amount: float, agent_id: str = None) -> str: + return f"Refunded ${amount:.2f}" + + +@aport_verify( + "data.export.v1", + client=mock_client, + context_builder=lambda format, **kw: {"format": format}, +) +def export_data(format: str) -> str: + return f"Exported data as {format}" + + +@aport_verify("monitoring.alert.send", strict=False, client=mock_client) +def send_alert(message: str) -> str: + return f"Alert sent: {message}" + + +def main(): + print("=" * 60) + print("CrewAI + APort Task Verification — Examples") + print("=" * 60) + print() + + print("1. Basic merge task:") + result = merge_code("main", "feature/login") + print(f" ✅ {result}") + print() + + print("2. Refund task with static agent ID:") + result = process_refund(49.99) + print(f" ✅ {result}") + print() + + print("3. Export task with context injection:") + result = export_data("csv") + print(f" ✅ {result}") + print() + + print("4. Non-strict alert task (denial doesn't block):") + result = send_alert("high cpu usage") + print(f" ✅ {result}") + print() + + print("5. Denied task (strict mode):") + deny_client = MockClient(allow=False) + deny_decorator = aport_verify("secure.action.v1", client=deny_client, strict=True) + + @deny_decorator + def secure_action(agent_id=None): + return "This should not run" + + try: + secure_action(agent_id="test_agent") + print(" ❌ Should have raised!") + except APortVerificationError as e: + print(f" ✅ Blocked as expected: {e}") + print() + + print("All examples completed.") + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/examples/agent-frameworks/crewai/src/__init__.py b/examples/agent-frameworks/crewai/src/__init__.py new file mode 100644 index 0000000..5b3859e --- /dev/null +++ b/examples/agent-frameworks/crewai/src/__init__.py @@ -0,0 +1,15 @@ +"""APort CrewAI integration — verification decorator for CrewAI tasks.""" + +from src.aport_verify import ( + aport_verify, + APortVerificationError, + APortHTTPClient, + VerificationResult, +) + +__all__ = [ + "aport_verify", + "APortVerificationError", + "APortHTTPClient", + "VerificationResult", +] \ No newline at end of file diff --git a/examples/agent-frameworks/crewai/src/aport_verify.py b/examples/agent-frameworks/crewai/src/aport_verify.py new file mode 100644 index 0000000..cf28530 --- /dev/null +++ b/examples/agent-frameworks/crewai/src/aport_verify.py @@ -0,0 +1,210 @@ +"""APort CrewAI verification decorator and client.""" + +from __future__ import annotations + +import os +from dataclasses import dataclass +from typing import Any, Callable, Dict, Optional + + +@dataclass +class VerificationResult: + verified: bool + allow: bool + reasons: list[str] = None + passport: dict = None + + def __post_init__(self): + if self.reasons is None: + self.reasons = [] + if self.passport is None: + self.passport = {} + + +class APortVerificationError(Exception): + """Raised when APort verification fails.""" + + def __init__(self, message: str, result: Optional[VerificationResult] = None): + super().__init__(message) + self.result = result + + +class APortHTTPClient: + """Simple dependency-free APort HTTP client for verification.""" + + def __init__( + self, + api_key: Optional[str] = None, + base_url: Optional[str] = None, + timeout: int = 10, + ): + self.api_key = api_key or os.environ.get("APORT_API_KEY", "") + self.base_url = base_url or os.environ.get("APORT_BASE_URL", "https://api.aport.io") + self.timeout = timeout + + def verify( + self, + policy: str, + agent_id: str, + context: Optional[Dict[str, Any]] = None, + ) -> VerificationResult: + """Verify an agent against a policy pack.""" + import json + import urllib.request + import urllib.error + + if not self.api_key: + return VerificationResult( + verified=True, + allow=True, + reasons=["No API key configured — pass-through mode"], + ) + + payload = {"policy_pack": policy, "agent_id": agent_id} + if context: + payload["context"] = context + + req = urllib.request.Request( + f"{self.base_url}/verify", + data=json.dumps(payload).encode("utf-8"), + headers={ + "Authorization": f"Bearer {self.api_key}", + "Content-Type": "application/json", + }, + method="POST", + ) + + try: + with urllib.request.urlopen(req, timeout=self.timeout) as resp: + data = json.loads(resp.read()) + return VerificationResult( + verified=data.get("verified", False), + allow=data.get("allow", False), + reasons=data.get("reasons", []), + passport=data.get("passport", {}), + ) + except urllib.error.HTTPError as e: + body = e.read().decode("utf-8") if e.fp else "" + return VerificationResult( + verified=False, + allow=False, + reasons=[f"HTTP {e.code}: {body}"], + ) + except Exception as e: + return VerificationResult( + verified=False, + allow=False, + reasons=[f"Error: {str(e)}"], + ) + + +DEFAULT_CLIENT: Optional[APortHTTPClient] = None + + +def get_default_client() -> APortHTTPClient: + global DEFAULT_CLIENT + if DEFAULT_CLIENT is None: + DEFAULT_CLIENT = APortHTTPClient() + return DEFAULT_CLIENT + + +defaport_verify = Callable[..., Any] + + +def aport_verify( + policy: str, + agent_id: Optional[str] = None, + client: Optional[APortHTTPClient] = None, + strict: bool = True, + context_builder: Optional[Callable[..., Dict[str, Any]]] = None, +) -> Callable[[Callable[..., Any]], Callable[..., Any]]: + """Decorator that verifies an agent's APort passport before executing a task. + + Args: + policy: The APort policy pack name to verify against. + agent_id: Static agent ID string. If not provided, resolved from: + - kwarg ``agent_id`` + - environment variable ``APORT_AGENT_ID`` + - raises APortVerificationError if none found + client: APortHTTPClient instance. Uses default global client if None. + strict: If True, raises APortVerificationError on denial. + If False, allows execution to proceed after denial. + context_builder: Optional callable that receives the same args/kwargs + as the decorated function and returns a dict of + context fields included in the verification request. + + Usage: + + @aport_verify("code.repository.merge.v1") + def merge_code(repo, branch): + ... + + @aport_verify("data.export.v1", agent_id="agent_123") + def export_data(format): + ... + + @aport_verify("finance.payment.refund.v1", context_builder=lambda amount, **kw: {"amount": amount}) + def process_refund(amount: float): + ... + """ + + def decorator(fn: Callable[..., Any]) -> Callable[..., Any]: + def _resolve_agent_id(*args, **kwargs) -> str: + if agent_id: + return agent_id + if "agent_id" in kwargs: + return kwargs["agent_id"] + if args: + return str(args[0]) + env_id = os.environ.get("APORT_AGENT_ID") + if env_id: + return env_id + raise APortVerificationError( + "No agent_id found — set it as arg, kwarg, or env APORT_AGENT_ID" + ) + + def _build_context(*args, **kwargs) -> Dict[str, Any]: + if context_builder: + return context_builder(*args, **kwargs) + return {} + + def _sync_wrapper(*args, **kwargs): + client_ = client or get_default_client() + resolved_agent_id = _resolve_agent_id(*args, **kwargs) + ctx = _build_context(*args, **kwargs) + + result = client_.verify(policy, resolved_agent_id, ctx) + + if not result.allow: + error = APortVerificationError( + f"APort verification denied for agent {resolved_agent_id} " + f"against policy {policy}: {result.reasons}", + result, + ) + if strict: + raise error + return fn(*args, **kwargs) + + async def _async_wrapper(*args, **kwargs): + client_ = client or get_default_client() + resolved_agent_id = _resolve_agent_id(*args, **kwargs) + ctx = _build_context(*args, **kwargs) + + result = client_.verify(policy, resolved_agent_id, ctx) + + if not result.allow: + error = APortVerificationError( + f"APort verification denied for agent {resolved_agent_id} " + f"against policy {policy}: {result.reasons}", + result, + ) + if strict: + raise error + return await fn(*args, **kwargs) + + import asyncio + if asyncio.iscoroutinefunction(fn): + return _async_wrapper + return _sync_wrapper + + return decorator \ No newline at end of file diff --git a/examples/agent-frameworks/crewai/tests/test_aport_verify.py b/examples/agent-frameworks/crewai/tests/test_aport_verify.py new file mode 100644 index 0000000..68f11ef --- /dev/null +++ b/examples/agent-frameworks/crewai/tests/test_aport_verify.py @@ -0,0 +1,217 @@ +"""Tests for APort CrewAI verification decorator.""" + +import os +import sys +import unittest +from unittest.mock import patch, MagicMock +from contextlib import contextmanager + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "src")) + +from aport_verify import ( + aport_verify, + APortVerificationError, + APortHTTPClient, + VerificationResult, +) + + +class MockAPortClient: + """Mock client for testing.""" + + def __init__(self, result: VerificationResult): + self._result = result + + def verify(self, policy, agent_id, context=None): + return self._result + + +@contextmanager +def _env_helper(**vars): + original = {} + for k in vars: + original[k] = os.environ.get(k) + for k, v in vars.items(): + if v is None: + os.environ.pop(k, None) + else: + os.environ[k] = v + try: + yield + finally: + for k, v in original.items(): + if v is None: + os.environ.pop(k, None) + else: + os.environ[k] = v + + +class TestVerificationResult(unittest.TestCase): + def test_result_defaults(self): + r = VerificationResult(verified=True, allow=True) + self.assertEqual(r.reasons, []) + self.assertEqual(r.passport, {}) + + +class TestSyncVerification(unittest.TestCase): + def setUp(self): + self.client = MockAPortClient( + VerificationResult(verified=True, allow=True, reasons=["ok"]) + ) + + def test_verify_passes_and_runs(self): + @aport_verify("test.policy.v1", client=self.client) + def task(x): + return f"ran {x}" + + with _env_helper(APORT_AGENT_ID="sync_test_agent"): + result = task("arg") + self.assertEqual(result, "ran arg") + + def test_verify_denies_strict_raises(self): + deny_client = MockAPortClient( + VerificationResult(verified=False, allow=False, reasons=["denied"]) + ) + + @aport_verify("test.policy.v1", client=deny_client, strict=True) + def task(): + return "should not run" + + with _env_helper(APORT_AGENT_ID="deny_agent"): + with self.assertRaises(APortVerificationError) as ctx: + task() + self.assertIn("denied", str(ctx.exception)) + + def test_verify_denies_nonstrict_runs(self): + deny_client = MockAPortClient( + VerificationResult(verified=False, allow=False, reasons=["denied"]) + ) + + @aport_verify("test.policy.v1", client=deny_client, strict=False) + def task(x): + return f"ran {x}" + + with _env_helper(APORT_AGENT_ID="nonstrict_agent"): + result = task("arg") + self.assertEqual(result, "ran arg") + + +class TestAgentIDResolution(unittest.TestCase): + def test_static_agent_id(self): + client = MockAPortClient( + VerificationResult(verified=True, allow=True) + ) + + @aport_verify("test.policy.v1", agent_id="static_agent", client=client) + def task(): + return "ok" + + task() + + def test_kwarg_agent_id(self): + client = MockAPortClient( + VerificationResult(verified=True, allow=True) + ) + + @aport_verify("test.policy.v1", client=client) + def task(agent_id=None): + return agent_id + + result = task(agent_id="kwarg_agent") + self.assertEqual(result, "kwarg_agent") + + def test_env_var_agent_id(self): + client = MockAPortClient( + VerificationResult(verified=True, allow=True) + ) + + with _env_helper(APORT_AGENT_ID="env_agent"): + @aport_verify("test.policy.v1", client=client) + def task(): + return "ok" + + task() + + def test_no_agent_id_raises(self): + client = MockAPortClient( + VerificationResult(verified=True, allow=True) + ) + + with _env_helper(): + @aport_verify("test.policy.v1", client=client) + def task(): + return "ok" + + with self.assertRaises(APortVerificationError) as ctx: + task() + self.assertIn("No agent_id found", str(ctx.exception)) + + +class TestContextBuilder(unittest.TestCase): + def test_context_builder_called(self): + captured = {} + client = MockAPortClient( + VerificationResult(verified=True, allow=True) + ) + + def capture_context(*args, **kwargs): + captured.update(kwargs) + if args: + captured["args"] = args + return kwargs + + with _env_helper(APORT_AGENT_ID="ctx_test_agent"): + @aport_verify("test.policy.v1", client=client, context_builder=capture_context) + def task(amount, currency="USD"): + return f"refund {amount}" + + task(50.0, currency="EUR") + + # context builder receives both args and kwargs + self.assertEqual(len(captured.get("args", [])), 1) + self.assertEqual(captured["args"][0], 50.0) + self.assertEqual(captured.get("currency"), "EUR") + + +class TestAsyncVerification(unittest.TestCase): + def test_async_verify_passes(self): + client = MockAPortClient( + VerificationResult(verified=True, allow=True) + ) + + with _env_helper(APORT_AGENT_ID="async_test_agent"): + @aport_verify("test.policy.v1", client=client) + async def task(x): + return f"async {x}" + + import asyncio + result = asyncio.run(task("value")) + self.assertEqual(result, "async value") + + def test_async_verify_denies_strict_raises(self): + deny_client = MockAPortClient( + VerificationResult(verified=False, allow=False, reasons=["async denied"]) + ) + + with _env_helper(APORT_AGENT_ID="deny_test_agent"): + @aport_verify("test.policy.v1", client=deny_client, strict=True) + async def task(): + return "should not run" + + import asyncio + with self.assertRaises(APortVerificationError) as ctx: + asyncio.run(task()) + self.assertIn("async denied", str(ctx.exception)) + + +class TestAPortHTTPClient(unittest.TestCase): + def test_no_api_key_pass_through(self): + with _env_helper(APORT_API_KEY=None, APORT_BASE_URL=None): + client = APortHTTPClient() + result = client.verify("policy", "agent") + self.assertTrue(result.allow) + self.assertTrue(result.verified) + + +if __name__ == "__main__": + unittest.main() \ No newline at end of file