diff --git a/README.md b/README.md index 2047200..d557465 100644 --- a/README.md +++ b/README.md @@ -1,356 +1,7 @@ # Keon Python SDK -> *Part of the [Keon Governance Platform](https://github.com/Keon-Systems).* -> *Documentation: [keon-docs](https://github.com/Keon-Systems/keon-docs)* -> *Website: [keon.systems](https://keon.systems)* - ---- - -> **Powered by [OMEGA](https://github.com/omega-brands). Governed by [Keon](https://github.com/Keon-Systems).** - -**Team Claude deliverable** โ€” Thin, safe-by-default Python client for Keon Runtime. - -Governance-first SDK with strict invariants, automatic retries, and structured error handling. - -## ๐ŸŽฏ Features - -- **Strict validation**: CorrelationId canonical form enforced -- **Receipt requirement**: Execute REQUIRES DecisionReceiptId (hard fail if absent) -- **Automatic retries**: Transient failures retry with exponential backoff -- **Structured errors**: Typed exceptions with machine-readable codes -- **Type-safe**: Full Pydantic v2 contracts with validation -- **Async-first**: Built on httpx for modern async Python - -## ๐Ÿ“ฆ Installation - ```bash pip install keon-sdk +python -c "from keon_sdk import verify_caes; print(verify_caes('pack.zip', bundle_path='trust-bundle.json'))" ``` -### Development - -```bash -pip install keon-sdk[dev] -``` - -## ๐Ÿš€ Quick Start - -```python -from keon_sdk import KeonClient - -async with KeonClient( - base_url="https://api.keon.systems/runtime/v1", - api_key="your-api-key", -) as client: - # Request decision - receipt = await client.decide( - tenant_id="tenant-123", - actor_id="user-456", - action="execute_workflow", - resource_type="workflow", - resource_id="workflow-789", - context={"environment": "production"}, - ) - - # Execute if allowed - if receipt.decision == "allow": - result = await client.execute( - receipt=receipt, - action="execute_workflow", - parameters={ - "workflowId": "workflow-789", - "inputs": {"param1": "value1"}, - }, - ) - print(f"Execution ID: {result.execution_id}") - else: - print(f"Denied: {receipt.reason}") -``` - -## ๐Ÿ“š Core API - -### KeonClient - -Main entry point for SDK. - -```python -client = KeonClient( - base_url="https://api.keon.systems/runtime/v1", - api_key="your-api-key", # Optional: use api_key OR bearer_token - bearer_token="your-jwt", # Optional: JWT bearer token - retry_policy=RetryPolicy.default(), # Optional: custom retry policy - timeout=30.0, # Optional: request timeout in seconds -) -``` - -### decide() - -Request a policy decision before execution. - -```python -receipt = await client.decide( - tenant_id="tenant-123", - actor_id="user-456", - action="execute_workflow", - resource_type="workflow", - resource_id="workflow-789", - context={"environment": "production"}, # Optional - correlation_id="t:tenant-123|c:...", # Optional: auto-generated if not provided -) - -# receipt.decision: "allow" | "deny" -# receipt.receipt_id: "dr-..." -# receipt.reason: Human-readable explanation -``` - -**Returns:** `DecisionReceipt` - -**Raises:** -- `InvalidCorrelationIdError`: CorrelationId format invalid -- `ValidationError`: Request validation failed -- `NetworkError`: Connection/timeout issues -- `ServerError`: 5xx server errors - -### execute() - -Execute an action under governance. - -**REQUIRES** a `DecisionReceipt` from `decide()`. Hard fails without receipt. - -```python -result = await client.execute( - receipt=receipt, # REQUIRED: from decide() - action="execute_workflow", - parameters={ # Optional: action-specific parameters - "workflowId": "workflow-789", - "inputs": {"param1": "value1"}, - }, -) - -# result.execution_id: "exec-..." -# result.status: "completed" | "running" | "failed" | etc. -# result.result: Action-specific result data -``` - -**Returns:** `ExecutionResult` - -**Raises:** -- `MissingReceiptError`: Receipt not provided -- `InvalidReceiptError`: Receipt invalid or expired -- `ExecutionDeniedError`: Policy denied execution -- `NetworkError`: Connection/timeout issues -- `ServerError`: 5xx server errors - -### decide_and_execute() - -Convenience method: decide + execute in one call. - -```python -result = await client.decide_and_execute( - tenant_id="tenant-123", - actor_id="user-456", - action="execute_workflow", - resource_type="workflow", - resource_id="workflow-789", - parameters={"inputs": {"param1": "value1"}}, - context={"environment": "production"}, -) -``` - -Automatically handles decide โ†’ execute flow. Raises `ExecutionDeniedError` if policy denies. - -## ๐Ÿ”’ Strict Invariants - -### 1. CorrelationId is Mandatory - -Format: `t:|c:` - -```python -# Valid -"t:tenant-123|c:01932b3c-4d5e-7890-abcd-ef1234567890" - -# Invalid - will raise InvalidCorrelationIdError -"invalid-format" -"t:tenant-123:c:01932b3c-4d5e-7890-abcd-ef1234567890" # Wrong separator -"t:tenant-123|c:01932b3c-4d5e-4890-abcd-ef1234567890" # UUIDv4, not v7 -``` - -Auto-generated if not provided to `decide()`. - -### 2. Execute Requires Receipt - -```python -# โœ… Correct -receipt = await client.decide(...) -if receipt.decision == "allow": - result = await client.execute(receipt=receipt, action="...") - -# โŒ Wrong - raises MissingReceiptError -await client.execute(receipt=None, action="...") - -# โŒ Wrong - raises ExecutionDeniedError -denied_receipt = await client.decide(...) # decision = "deny" -await client.execute(receipt=denied_receipt, action="...") -``` - -### 3. TenantId and ActorId Required - -All operations require tenant and actor identification for audit and attribution. - -## ๐Ÿ”„ Retry Policy - -Safe-by-default retries for transient failures. - -### Default Policy - -```python -RetryPolicy.default() -# - Max attempts: 3 -# - Backoff: 1s, 2s, 4s (exponential) -# - Retries: NetworkError, ServerError (5xx), RateLimitError -# - No retry: ValidationError, ExecutionDeniedError, 4xx (except 429) -``` - -### Custom Policy - -```python -client = KeonClient( - base_url="...", - retry_policy=RetryPolicy( - max_attempts=5, - min_wait_seconds=0.5, - max_wait_seconds=30.0, - multiplier=2.0, - ), -) -``` - -### No Retry - -```python -client = KeonClient( - base_url="...", - retry_policy=RetryPolicy.no_retry(), -) -``` - -### Aggressive Retry - -```python -client = KeonClient( - base_url="...", - retry_policy=RetryPolicy.aggressive(), -) -``` - -## โŒ Error Handling - -All errors inherit from `KeonError`. - -```python -from keon_sdk import ( - KeonError, - ValidationError, - InvalidCorrelationIdError, - MissingReceiptError, - InvalidReceiptError, - ExecutionDeniedError, - NetworkError, - ServerError, - RateLimitError, - RetryExhaustedError, -) - -try: - result = await client.decide_and_execute(...) -except ExecutionDeniedError as e: - print(f"Policy denied: {e.message}") - print(f"Receipt ID: {e.details['receiptId']}") -except NetworkError as e: - print(f"Network issue: {e.message}") -except KeonError as e: - print(f"Keon error: {e.code} - {e.message}") -``` - -### Error Structure - -All errors have: -- `message`: Human-readable message -- `code`: Machine-readable error code -- `details`: Additional context dict - -## ๐Ÿงช Testing - -```bash -# Run tests -pytest - -# Run with coverage -pytest --cov=keon_sdk --cov-report=html - -# Run specific test -pytest tests/test_correlation_id.py -v - -# Type checking -mypy keon_sdk - -# Linting -ruff check keon_sdk -black --check keon_sdk -``` - -## ๐Ÿ—๏ธ Architecture - -``` -keon_sdk/ -โ”œโ”€โ”€ client.py # KeonClient - main SDK entry point -โ”œโ”€โ”€ gateway.py # RuntimeGateway protocol -โ”œโ”€โ”€ http_gateway.py # HTTP implementation with retries -โ”œโ”€โ”€ contracts.py # Pydantic models (from keon-contracts) -โ”œโ”€โ”€ errors.py # Typed exceptions -โ””โ”€โ”€ retry.py # Retry policy configuration -``` - -### Gateway Abstraction - -The SDK uses a `RuntimeGateway` protocol for flexibility: - -```python -from keon_sdk import KeonClient, RuntimeGateway - -# Custom gateway implementation -class MyCustomGateway(RuntimeGateway): - async def decide(self, request): - # Custom implementation - pass - - async def execute(self, request): - # Custom implementation - pass - -client = KeonClient(gateway=MyCustomGateway()) -``` - -## ๐Ÿ“‹ Requirements - -- Python >= 3.11 -- httpx >= 0.27.0 -- pydantic >= 2.0.0 -- tenacity >= 8.0.0 - -## ๐Ÿ”— Related - -- **Keon Contracts**: `keon-contracts` (OpenAPI source of truth) -- **Keon Runtime**: Execution platform -- **TypeScript SDK**: `@keon/sdk` - -## ๐Ÿ“„ License - -Apache License 2.0 - See LICENSE file - ---- - -**Version:** 1.0.0 -**Tag:** `keon-sdk-python-v1.0.0` -**Branch:** `team-claude/keon-sdk-python-v1` -**Team:** Claude ๐Ÿง  (Python SDK) diff --git a/pyproject.toml b/pyproject.toml index 61cf458..a227e95 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,29 +5,25 @@ build-backend = "hatchling.build" [project] name = "keon-sdk" version = "1.0.0" -description = "Keon Python SDK - Thin, safe-by-default client for Keon Runtime" +description = "Keon Python SDK - verifier-only wrapper for evidence pack verification" readme = "README.md" requires-python = ">=3.11" -license = "MIT" +license = "Apache-2.0" authors = [ { name = "Keon Systems", email = "sdk@keon.systems" }, ] -keywords = ["keon", "governance", "ai", "runtime", "sdk"] +keywords = ["keon", "verification", "evidence", "sdk"] classifiers = [ "Development Status :: 4 - Beta", "Intended Audience :: Developers", - "License :: OSI Approved :: MIT License", + "License :: OSI Approved :: Apache Software License", "Programming Language :: Python :: 3", "Programming Language :: Python :: 3.11", "Programming Language :: Python :: 3.12", "Typing :: Typed", ] -dependencies = [ - "httpx>=0.27.0", - "pydantic>=2.0.0", - "tenacity>=8.0.0", -] +dependencies = [] [project.optional-dependencies] dev = [ @@ -38,7 +34,6 @@ dev = [ "ruff>=0.2.0", "black>=24.0.0", "mypy>=1.8.0", - "respx>=0.21.0", # HTTP mocking for tests ] [project.urls] @@ -47,8 +42,14 @@ Documentation = "https://docs.keon.systems/sdk/python" Repository = "https://github.com/m0r6aN/keon" Issues = "https://github.com/m0r6aN/keon/issues" +[project.scripts] +keon-sdk-mcp = "keon_sdk.mcp_server:main" + [tool.hatch.build.targets.wheel] -packages = ["keon_sdk"] +packages = ["src/keon_sdk"] + +[tool.hatch.build.targets.wheel.sources] +src = "" [tool.ruff] target-version = "py311" @@ -86,6 +87,7 @@ warn_no_return = true [tool.pytest.ini_options] testpaths = ["tests"] +pythonpath = ["src"] python_files = "test_*.py" python_classes = "Test*" python_functions = "test_*" diff --git a/src/keon_sdk/__init__.py b/src/keon_sdk/__init__.py new file mode 100644 index 0000000..eb4e147 --- /dev/null +++ b/src/keon_sdk/__init__.py @@ -0,0 +1,17 @@ +from .evidence_pack import Artifact, EvidencePack, PolicyHashManifest +from .receipt import DecisionReceipt, TrustBundle, VerificationResult +from .types import L3VerificationResult, VerificationError +from .verify import verify_caes + +__all__ = [ + "Artifact", + "DecisionReceipt", + "EvidencePack", + "L3VerificationResult", + "PolicyHashManifest", + "TrustBundle", + "VerificationError", + "VerificationResult", + "verify_caes", +] + diff --git a/src/keon_sdk/_cli.py b/src/keon_sdk/_cli.py new file mode 100644 index 0000000..8d2e741 --- /dev/null +++ b/src/keon_sdk/_cli.py @@ -0,0 +1,183 @@ +from __future__ import annotations + +import json +import os +import shutil +import subprocess +from dataclasses import dataclass +from pathlib import Path +from tempfile import NamedTemporaryFile +from typing import Any + +from .evidence_pack import EvidencePack + + +class KeonCliError(RuntimeError): + pass + + +class KeonCliNotFoundError(KeonCliError): + pass + + +@dataclass(frozen=True) +class KeonCliResult: + returncode: int + stdout: str + stderr: str + payload: dict[str, Any] + offline_mode_used: bool = False + + +def resolve_keon_cli() -> list[str]: + env_path = os.environ.get("KEON_CLI_PATH") + if env_path: + return [env_path] + + on_path = shutil.which("keon") + if on_path: + return [on_path] + + repo_root = Path(__file__).resolve().parents[3] + candidates = ( + repo_root / "keon-systems" / "src" / "Keon.Cli" / "bin" / "Debug" / "net10.0" / "Keon.Cli.exe", + repo_root / "keon-systems" / "src" / "Keon.Cli" / "bin" / "Release" / "net10.0" / "Keon.Cli.exe", + repo_root / "keon-systems" / "src" / "Keon.Cli" / "bin" / "Debug" / "net10.0" / "Keon.Cli.dll", + repo_root / "keon-systems" / "src" / "Keon.Cli" / "bin" / "Release" / "net10.0" / "Keon.Cli.dll", + ) + for candidate in candidates: + if candidate.exists(): + if candidate.suffix == ".dll": + return ["dotnet", str(candidate)] + return [str(candidate)] + + raise KeonCliNotFoundError( + "Unable to locate the Keon CLI. Set KEON_CLI_PATH or install the keon executable." + ) + + +def run_keon_json(args: list[str], *, offline_mode_used: bool = False) -> KeonCliResult: + command = [*resolve_keon_cli(), *args] + completed = subprocess.run( + command, + capture_output=True, + text=True, + check=False, + ) + stdout = completed.stdout.strip() + if not stdout: + raise KeonCliError(completed.stderr.strip() or "Keon CLI produced no JSON output.") + try: + payload = json.loads(stdout) + except json.JSONDecodeError as exc: + raise KeonCliError(f"Keon CLI returned non-JSON output: {stdout}") from exc + if not isinstance(payload, dict): + raise KeonCliError("Keon CLI JSON output must be an object.") + return KeonCliResult( + returncode=completed.returncode, + stdout=completed.stdout, + stderr=completed.stderr, + payload=payload, + offline_mode_used=offline_mode_used, + ) + + +def verify_pack_json(pack_path: str, bundle_path: str | None = None) -> KeonCliResult: + args = ["verify-pack", "--path", pack_path] + temp_pubkey_path: str | None = None + offline_mode_requested = bundle_path is not None + if bundle_path: + args.extend(["--offline", "--trust-bundle", bundle_path]) + public_key_b64 = _resolve_public_key_from_bundle(pack_path, bundle_path) + if public_key_b64: + with NamedTemporaryFile("w", suffix=".pub", delete=False, encoding="utf-8") as handle: + handle.write(public_key_b64) + temp_pubkey_path = handle.name + args.extend(["--pubkey", temp_pubkey_path]) + try: + try: + return run_keon_json(args, offline_mode_used=offline_mode_requested) + except KeonCliError as exc: + if not offline_mode_requested or not _is_unknown_offline_flag(exc): + raise + fallback_args = [arg for arg in args if arg != "--offline"] + return run_keon_json(fallback_args, offline_mode_used=False) + finally: + if temp_pubkey_path: + Path(temp_pubkey_path).unlink(missing_ok=True) + + +def export_pack_json(cli_args: list[str]) -> KeonCliResult: + return run_keon_json(["export-pack", *cli_args]) + + +def _is_unknown_offline_flag(error: KeonCliError) -> bool: + message = str(error).lower() + return "unknown option" in message and "--offline" in message or "unrecognized option" in message and "--offline" in message + + +def _resolve_public_key_from_bundle(pack_path: str, bundle_path: str) -> str | None: + pack = EvidencePack.load(pack_path) + bundle = json.loads(Path(bundle_path).read_text(encoding="utf-8")) + bundle_payload = bundle.get("payload", bundle) if isinstance(bundle, dict) else {} + if not isinstance(bundle_payload, dict): + return None + tenants = bundle_payload.get("tenants", []) + if not isinstance(tenants, list): + return None + + signer_kid = _find_signer_kid(pack) + tenant_id = _find_tenant_id(pack) + matching_tenants = [ + tenant + for tenant in tenants + if isinstance(tenant, dict) + and (tenant_id is None or tenant.get("tenant_id", tenant.get("tenantId")) == tenant_id) + ] + for tenant in matching_tenants: + keys = tenant.get("keys", []) + if not isinstance(keys, list): + continue + for key in keys: + if not isinstance(key, dict): + continue + if signer_kid and key.get("kid") != signer_kid: + continue + public_key = key.get("public_key_b64", key.get("publicKeyB64")) + if isinstance(public_key, str) and public_key: + return public_key + for tenant in matching_tenants: + keys = tenant.get("keys", []) + if isinstance(keys, list): + for key in keys: + if isinstance(key, dict): + public_key = key.get("public_key_b64", key.get("publicKeyB64")) + if isinstance(public_key, str) and public_key: + return public_key + return None + + +def _find_signer_kid(pack: EvidencePack) -> str | None: + for artifact in pack.artifacts: + payload = artifact.payload + if artifact.type == "attestation" and isinstance(payload, dict): + key_id = payload.get("key_id") + if isinstance(key_id, str) and key_id: + return key_id + return None + + +def _find_tenant_id(pack: EvidencePack) -> str | None: + for artifact in pack.artifacts: + payload = artifact.payload + if artifact.type == "receipt" and isinstance(payload, dict): + tenant_id = payload.get("tenant_id") + if isinstance(tenant_id, str) and tenant_id: + return tenant_id + for artifact in pack.artifacts: + payload = artifact.payload + if artifact.type == "attestation" and isinstance(payload, dict): + tenant_id = payload.get("tenant_id") + if isinstance(tenant_id, str) and tenant_id: + return tenant_id + return None diff --git a/src/keon_sdk/evidence_pack.py b/src/keon_sdk/evidence_pack.py new file mode 100644 index 0000000..0f19c6c --- /dev/null +++ b/src/keon_sdk/evidence_pack.py @@ -0,0 +1,77 @@ +from __future__ import annotations + +import json +from dataclasses import dataclass +from pathlib import Path +from typing import Any +from zipfile import ZipFile + + +@dataclass(frozen=True) +class Artifact: + name: str + type: str + sha256: str | None + version: str | None + payload: dict[str, Any] | None + + +@dataclass(frozen=True) +class PolicyHashManifest: + artifact: Artifact + payload: dict[str, Any] + + +@dataclass(frozen=True) +class EvidencePack: + artifacts: list[Artifact] + policy_hash_manifest: PolicyHashManifest | None + + @classmethod + def load(cls, path: str) -> "EvidencePack": + pack_path = Path(path) + with ZipFile(pack_path) as archive: + manifest_name = "manifest.json" + if manifest_name not in archive.namelist() and "pack_manifest.json" in archive.namelist(): + manifest_name = "pack_manifest.json" + manifest = json.loads(archive.read(manifest_name)) + artifact_specs = manifest.get("artifacts", []) + artifacts: list[Artifact] = [] + policy_hash_manifest: PolicyHashManifest | None = None + for spec in artifact_specs: + name = str(spec["name"]) + payload = _read_json_entry(archive, name) + artifact = Artifact( + name=name, + type=str(spec.get("type", "")), + sha256=_as_str_or_none(spec.get("sha256")), + version=_as_str_or_none(spec.get("version")), + payload=payload, + ) + artifacts.append(artifact) + if _is_policy_hash_artifact(artifact): + policy_hash_manifest = PolicyHashManifest(artifact=artifact, payload=payload or {}) + return cls(artifacts=artifacts, policy_hash_manifest=policy_hash_manifest) + + +def _read_json_entry(archive: ZipFile, name: str) -> dict[str, Any] | None: + try: + raw = archive.read(name) + except KeyError: + return None + try: + parsed = json.loads(raw) + except json.JSONDecodeError: + return None + return parsed if isinstance(parsed, dict) else None + + +def _is_policy_hash_artifact(artifact: Artifact) -> bool: + slug = artifact.name.lower() + artifact_type = artifact.type.lower() + # TODO(WS-D): Replace this heuristic with the canonical artifact schema id once the extension lands. + return "policy" in slug and "hash" in slug or "policy_hash" in artifact_type + + +def _as_str_or_none(value: Any) -> str | None: + return value if isinstance(value, str) else None diff --git a/src/keon_sdk/mcp_server.py b/src/keon_sdk/mcp_server.py new file mode 100644 index 0000000..dc91142 --- /dev/null +++ b/src/keon_sdk/mcp_server.py @@ -0,0 +1,102 @@ +from __future__ import annotations + +import json +import sys +from dataclasses import asdict +from typing import Any + +from ._cli import export_pack_json +from .verify import verify_caes + + +TOOLS = [ + { + "name": "verify_pack", + "description": "Verify a Keon evidence pack and return the CLI JSON contract.", + "inputSchema": { + "type": "object", + "properties": { + "pack_path": {"type": "string"}, + "bundle_path": {"type": ["string", "null"]}, + }, + "required": ["pack_path"], + }, + }, + { + "name": "export_pack", + "description": "Pass through to keon export-pack using explicit CLI args.", + "inputSchema": { + "type": "object", + "properties": { + "cli_args": { + "type": "array", + "items": {"type": "string"}, + } + }, + "required": ["cli_args"], + }, + }, + { + "name": "check_l3_compliance", + "description": "Verify a pack and return the typed L3 invariant result.", + "inputSchema": { + "type": "object", + "properties": { + "pack_path": {"type": "string"}, + "bundle_path": {"type": ["string", "null"]}, + }, + "required": ["pack_path"], + }, + }, +] + + +def main() -> None: + for line in sys.stdin: + line = line.strip() + if not line: + continue + request = json.loads(line) + response = handle_request(request) + sys.stdout.write(json.dumps(response) + "\n") + sys.stdout.flush() + + +def handle_request(request: dict[str, Any]) -> dict[str, Any]: + method = request.get("method") + request_id = request.get("id") + if method == "initialize": + return _ok(request_id, {"protocolVersion": "2025-11-05", "serverInfo": {"name": "keon-sdk", "version": "1.0.0"}}) + if method == "tools/list": + return _ok(request_id, {"tools": TOOLS}) + if method == "tools/call": + params = request.get("params", {}) + name = params.get("name") + arguments = params.get("arguments", {}) + return _ok(request_id, {"content": [{"type": "json", "json": _call_tool(name, arguments)}]}) + return _error(request_id, -32601, "Method not found") + + +def _call_tool(name: Any, arguments: Any) -> dict[str, Any]: + if not isinstance(arguments, dict): + raise ValueError("Tool arguments must be an object.") + if name == "verify_pack": + result = verify_caes(arguments["pack_path"], bundle_path=arguments.get("bundle_path")) + return dict(result.raw_report) + if name == "export_pack": + cli_args = arguments["cli_args"] + if not isinstance(cli_args, list) or not all(isinstance(item, str) for item in cli_args): + raise ValueError("cli_args must be an array of strings.") + return export_pack_json(cli_args).payload + if name == "check_l3_compliance": + return asdict(verify_caes(arguments["pack_path"], bundle_path=arguments.get("bundle_path"))) + raise ValueError(f"Unknown tool: {name}") + + +def _ok(request_id: Any, result: dict[str, Any]) -> dict[str, Any]: + return {"jsonrpc": "2.0", "id": request_id, "result": result} + + +def _error(request_id: Any, code: int, message: str) -> dict[str, Any]: + return {"jsonrpc": "2.0", "id": request_id, "error": {"code": code, "message": message}} + diff --git a/src/keon_sdk/py.typed b/src/keon_sdk/py.typed new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/src/keon_sdk/py.typed @@ -0,0 +1 @@ + diff --git a/src/keon_sdk/receipt.py b/src/keon_sdk/receipt.py new file mode 100644 index 0000000..2657d87 --- /dev/null +++ b/src/keon_sdk/receipt.py @@ -0,0 +1,67 @@ +from __future__ import annotations + +import json +from dataclasses import dataclass +from pathlib import Path +from typing import Any + + +@dataclass(frozen=True) +class VerificationResult: + is_valid: bool + has_receipt_id: bool + tenant_authorized: bool + key_authorized: bool + + +@dataclass(frozen=True) +class TrustBundle: + payload: dict[str, Any] + + @classmethod + def load(cls, path: str) -> "TrustBundle": + return cls(payload=json.loads(Path(path).read_text(encoding="utf-8"))) + + +@dataclass(frozen=True) +class DecisionReceipt: + raw: dict[str, Any] + + def verify(self, bundle: TrustBundle) -> VerificationResult: + receipt_id = self.raw.get("receipt_id") or self.raw.get("receiptId") + tenant_id = self.raw.get("tenant_id") or self.raw.get("tenantId") + signer_kid = self.raw.get("signer_kid") or self.raw.get("key_id") or self.raw.get("kid") + has_receipt_id = isinstance(receipt_id, str) and bool(receipt_id) + authorized_tenant = _find_tenant(bundle.payload, tenant_id) + key_authorized = _tenant_has_key(authorized_tenant, signer_kid) + return VerificationResult( + is_valid=has_receipt_id and authorized_tenant is not None and key_authorized, + has_receipt_id=has_receipt_id, + tenant_authorized=authorized_tenant is not None, + key_authorized=key_authorized, + ) + + +def _find_tenant(bundle: dict[str, Any], tenant_id: Any) -> dict[str, Any] | None: + if not isinstance(tenant_id, str): + return None + tenants = bundle.get("tenants", []) + if not isinstance(tenants, list): + return None + for tenant in tenants: + if isinstance(tenant, dict) and tenant.get("tenant_id", tenant.get("tenantId")) == tenant_id: + return tenant + return None + + +def _tenant_has_key(tenant: dict[str, Any] | None, signer_kid: Any) -> bool: + if tenant is None or not isinstance(signer_kid, str): + return False + keys = tenant.get("keys", []) + if not isinstance(keys, list): + return False + for key in keys: + if isinstance(key, dict) and key.get("kid") == signer_kid: + return True + return False + diff --git a/src/keon_sdk/types.py b/src/keon_sdk/types.py new file mode 100644 index 0000000..93a980e --- /dev/null +++ b/src/keon_sdk/types.py @@ -0,0 +1,45 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any, Mapping + + +@dataclass(frozen=True) +class VerificationError: + code: str + + +@dataclass(frozen=True) +class L3VerificationResult: + schema_id: str | None + verdict: str | None + is_valid: bool + phase: int | None + pack_hash: str | None + tenant_id: str | None + signer_kids: tuple[str, ...] + pack_integrity: bool | None + signature_valid: bool | None + authorization_valid: bool | None + trust_bundle_provided: bool + outcome: str | None + errors: tuple[VerificationError, ...] + l3_01: bool + l3_02: bool + l3_03: bool + l3_04: bool + l3_05: bool + l3_06: bool + l3_07: bool + l3_08: bool + l3_09: bool + l3_10: bool + l3_11: bool + l3_12: bool + l3_13: bool + l3_14: bool + l3_15: bool + l3_16: bool + l3_17: bool + raw_report: Mapping[str, Any] + diff --git a/src/keon_sdk/verify.py b/src/keon_sdk/verify.py new file mode 100644 index 0000000..38df3b6 --- /dev/null +++ b/src/keon_sdk/verify.py @@ -0,0 +1,107 @@ +from __future__ import annotations + +from typing import Any + +from ._cli import verify_pack_json +from .evidence_pack import EvidencePack +from .types import L3VerificationResult, VerificationError + +_TRUST_BUNDLE_ERROR_PREFIX = "KEON_VERIFY_TRUST_BUNDLE_" +_AUTHORIZATION_ERRORS = { + "KEON_VERIFY_TENANT_NOT_IN_BUNDLE", + "KEON_VERIFY_SIGNER_NOT_AUTHORIZED", + "KEON_VERIFY_SIGNER_KEY_REVOKED", + "KEON_VERIFY_SIGNER_KEY_DISABLED", + "KEON_VERIFY_SIGNER_KEY_EXPIRED", + "KEON_VERIFY_TENANT_AMBIGUOUS", +} +_RECEIPT_CHAIN_ERRORS = { + "KEON_VERIFY_RECEIPT_CHAIN_INVALID", + "KEON_VERIFY_PACK_INVALID", +} +_VERSION_ERRORS = { + "KEON_VERIFY_VERSION_INVALID", + "KEON_VERIFY_VERSION_MISSING", +} + + +def verify_caes(pack_path: str, bundle_path: str | None = None) -> L3VerificationResult: + evidence_pack = EvidencePack.load(pack_path) + cli_result = verify_pack_json(pack_path, bundle_path=bundle_path) + report = cli_result.payload + errors = tuple( + VerificationError(code=str(item["code"])) + for item in report.get("errors", []) + if isinstance(item, dict) and "code" in item + ) + error_codes = {item.code for item in errors} + artifact_types = {artifact.type.lower() for artifact in evidence_pack.artifacts} + artifact_names = {artifact.name.lower() for artifact in evidence_pack.artifacts} + # TODO(WS-D): Replace string matching with schema-backed artifact classification once pack extension + # artifacts are frozen and emitted with stable identifiers. + has_delegation_artifact = any( + "delegation" in artifact.name.lower() or "delegation" in artifact.type.lower() + for artifact in evidence_pack.artifacts + ) + has_chaos_attestation = any("chaos" in artifact.name.lower() for artifact in evidence_pack.artifacts) + has_policy_hash_manifest = evidence_pack.policy_hash_manifest is not None + has_only_structured_errors = all(code.startswith("KEON_") for code in error_codes) + offline_verification_used = cli_result.offline_mode_used and bool(bundle_path) and bool( + report.get("trust_bundle_provided", False) + ) + # TODO(WS-F): L3-04 and L3-15 need distinct signals from chaos attestation contents. Until WS-F + # ships those fields, keep these as conservative provisional checks instead of claiming a full proof. + l3_chaos_attestation_present = has_chaos_attestation + l3_retention_chaos_attested = has_chaos_attestation and "retention" in " ".join(artifact_names) + + return L3VerificationResult( + schema_id=_as_str(report.get("schema_id")), + verdict=_as_str(report.get("verdict")), + is_valid=bool(report.get("is_valid", False)), + phase=_as_int(report.get("phase")), + pack_hash=_as_str(report.get("pack_hash")), + tenant_id=_as_str(report.get("tenant_id")), + signer_kids=tuple(_as_str_list(report.get("signer_kids"))), + pack_integrity=_as_bool(report.get("pack_integrity")), + signature_valid=_as_bool(report.get("signature_valid")), + authorization_valid=_as_bool(report.get("authorization_valid")), + trust_bundle_provided=bool(report.get("trust_bundle_provided", bundle_path is not None)), + outcome=_as_str(report.get("outcome")), + errors=errors, + l3_01=has_policy_hash_manifest and "KEON_POLICY_HASH_MISMATCH" not in error_codes, + l3_02="receipt" in artifact_types and not error_codes.intersection(_RECEIPT_CHAIN_ERRORS), + l3_03=bool(report.get("signature_valid", False)), + l3_04=l3_chaos_attestation_present, + l3_05=bool(report.get("pack_integrity", False)), + l3_06=offline_verification_used, + l3_07=has_delegation_artifact and "KEON_DELEGATION_BINDING_MISSING" not in error_codes, + l3_08=has_policy_hash_manifest and has_delegation_artifact and has_chaos_attestation, + l3_09=not error_codes.intersection(_VERSION_ERRORS), + l3_10="KEON_PROVENANCE_BROKEN" not in error_codes, + l3_11=bundle_path is not None and not any(code.startswith(_TRUST_BUNDLE_ERROR_PREFIX) for code in error_codes), + l3_12=has_policy_hash_manifest, + l3_13=has_delegation_artifact, + l3_14=has_chaos_attestation, + l3_15=l3_retention_chaos_attested, + l3_16=has_only_structured_errors, + l3_17=has_delegation_artifact and "KEON_DELEGATION_BINDING_MISSING" not in error_codes, + raw_report=report, + ) + + +def _as_bool(value: Any) -> bool | None: + return value if isinstance(value, bool) else None + + +def _as_int(value: Any) -> int | None: + return value if isinstance(value, int) else None + + +def _as_str(value: Any) -> str | None: + return value if isinstance(value, str) else None + + +def _as_str_list(value: Any) -> list[str]: + if not isinstance(value, list): + return [] + return [item for item in value if isinstance(item, str)] diff --git a/tests/test_canonicalize.py b/tests/test_canonicalize.py deleted file mode 100644 index 186e997..0000000 --- a/tests/test_canonicalize.py +++ /dev/null @@ -1,158 +0,0 @@ -"""Tests for RFC 8785 canonicalization.""" - -import pytest -from keon_sdk.canonicalize import ( - canonicalize, - canonicalize_to_string, - canonicalize_bytes, - validate_integrity, -) - - -class TestKeyOrdering: - """JCS-001: UTF-16 code unit ordering.""" - - def test_uppercase_before_lowercase(self): - """Uppercase ASCII (0x41-0x5A) sorts before lowercase (0x61-0x7A).""" - input_data = { - "z_key": 3, - "a_key": 1, - "A_key": 0, - "m_key": 2, - } - - result = canonicalize_to_string(input_data) - assert result == '{"A_key":0,"a_key":1,"m_key":2,"z_key":3}' - - -class TestNumberNormalization: - """JCS-004: Number formatting.""" - - def test_integer(self): - assert canonicalize_to_string(42) == "42" - - def test_integer_from_float(self): - assert canonicalize_to_string(100.0) == "100" - - def test_negative_zero(self): - assert canonicalize_to_string(-0.0) == "0" - - def test_decimal_precision(self): - assert canonicalize_to_string(3.14159) == "3.14159" - - def test_small_decimal(self): - assert canonicalize_to_string(0.001) == "0.001" - - def test_nan_raises(self): - import math - - with pytest.raises(ValueError, match="NaN"): - canonicalize_to_string(math.nan) - - def test_infinity_raises(self): - import math - - with pytest.raises(ValueError, match="Infinity"): - canonicalize_to_string(math.inf) - - -class TestStringEscaping: - """String escaping rules.""" - - def test_escape_quotes(self): - assert canonicalize_to_string('say "hi"') == '"say \\"hi\\""' - - def test_escape_backslash(self): - assert canonicalize_to_string("path\\to") == '"path\\\\to"' - - def test_escape_newline(self): - assert canonicalize_to_string("line1\nline2") == '"line1\\nline2"' - - def test_escape_tab(self): - assert canonicalize_to_string("col1\tcol2") == '"col1\\tcol2"' - - def test_no_escape_unicode(self): - """RFC 8785: Non-control Unicode should be literal, not escaped.""" - result = canonicalize_to_string("Hello") - assert result == '"Hello"' - # Should NOT contain \\u00F6 - assert "\\u" not in result or "\\u00" in result # Only control chars escaped - - -class TestNullHandling: - """JCS-005: Explicit null handling.""" - - def test_explicit_null(self): - input_data = { - "present_null": None, - "present_value": "exists", - } - - result = canonicalize_to_string(input_data) - assert result == '{"present_null":null,"present_value":"exists"}' - - -class TestArrayOrder: - """Array order preservation.""" - - def test_preserve_order(self): - assert canonicalize_to_string([3, 1, 2]) == "[3,1,2]" - - -class TestWhitespace: - """Whitespace elimination.""" - - def test_no_whitespace(self): - input_data = { - "a": 1, - "b": { - "c": 2, - }, - } - - result = canonicalize_to_string(input_data) - assert result == '{"a":1,"b":{"c":2}}' - - -class TestValidateIntegrity: - """Integrity validation.""" - - def test_valid_canonical(self): - canonical = b'{"A":1,"a":2}' - assert validate_integrity(canonical) is True - - def test_invalid_whitespace(self): - not_canonical = b'{ "A": 1, "a": 2 }' - assert validate_integrity(not_canonical) is False - - def test_wrong_order(self): - wrong_order = b'{"a":2,"A":1}' - assert validate_integrity(wrong_order) is False - - -class TestRoundTrip: - """Round-trip invariant.""" - - def test_no_change_on_recanonicalze(self): - input_data = {"z": 3, "a": 1, "m": 2} - - first = canonicalize(input_data) - second = canonicalize_bytes(first) - - assert first == second - - -class TestCrossPlatformDeterminism: - """Cross-platform determinism vectors.""" - - def test_canonical_ordering_vector(self): - """Test vector from evidence-pack-test-vectors-v1.json.""" - input_data = { - "A_key": 0, - "a_key": 1, - "m_key": 2, - "z_key": 3, - } - - result = canonicalize_to_string(input_data) - assert result == '{"A_key":0,"a_key":1,"m_key":2,"z_key":3}' diff --git a/tests/test_correlation_id.py b/tests/test_correlation_id.py deleted file mode 100644 index b2a5785..0000000 --- a/tests/test_correlation_id.py +++ /dev/null @@ -1,102 +0,0 @@ -""" -Test CorrelationId validation. - -Ensures canonical form is enforced: t:|c: -""" - -import pytest -from pydantic import ValidationError - -from keon_sdk.contracts import DecideRequest, ExecuteRequest - - -class TestCorrelationIdValidation: - """CorrelationId must match canonical form.""" - - def test_valid_correlation_id_decide(self) -> None: - """Valid CorrelationId is accepted in DecideRequest.""" - request = DecideRequest( - correlationId="t:tenant-123|c:01932b3c-4d5e-7890-abcd-ef1234567890", - tenantId="tenant-123", - actorId="user-456", - action="execute_workflow", - resourceType="workflow", - resourceId="workflow-789", - ) - assert request.correlation_id == "t:tenant-123|c:01932b3c-4d5e-7890-abcd-ef1234567890" - - def test_valid_correlation_id_execute(self) -> None: - """Valid CorrelationId is accepted in ExecuteRequest.""" - request = ExecuteRequest( - correlationId="t:tenant-abc|c:01932b3c-4d5e-7111-abcd-ef1234567890", - decisionReceiptId="dr-01932b3c-4d5e-7890-abcd-ef1234567890", - tenantId="tenant-abc", - actorId="user-123", - action="test_action", - ) - assert request.correlation_id == "t:tenant-abc|c:01932b3c-4d5e-7111-abcd-ef1234567890" - - def test_invalid_correlation_id_missing_prefix(self) -> None: - """CorrelationId without 't:' prefix is rejected.""" - with pytest.raises(ValidationError) as exc_info: - DecideRequest( - correlationId="tenant-123|c:01932b3c-4d5e-7890-abcd-ef1234567890", - tenantId="tenant-123", - actorId="user-456", - action="execute_workflow", - resourceType="workflow", - resourceId="workflow-789", - ) - assert "canonical form" in str(exc_info.value).lower() - - def test_invalid_correlation_id_wrong_separator(self) -> None: - """CorrelationId with wrong separator is rejected.""" - with pytest.raises(ValidationError) as exc_info: - DecideRequest( - correlationId="t:tenant-123:c:01932b3c-4d5e-7890-abcd-ef1234567890", - tenantId="tenant-123", - actorId="user-456", - action="execute_workflow", - resourceType="workflow", - resourceId="workflow-789", - ) - assert "canonical form" in str(exc_info.value).lower() - - def test_invalid_correlation_id_not_uuidv7(self) -> None: - """CorrelationId with non-UUIDv7 format is rejected.""" - with pytest.raises(ValidationError) as exc_info: - DecideRequest( - correlationId="t:tenant-123|c:not-a-uuid", - tenantId="tenant-123", - actorId="user-456", - action="execute_workflow", - resourceType="workflow", - resourceId="workflow-789", - ) - assert "canonical form" in str(exc_info.value).lower() - - def test_invalid_correlation_id_uuidv4_not_v7(self) -> None: - """CorrelationId with UUIDv4 (not v7) is rejected.""" - # UUIDv4 has version nibble '4', not '7' - with pytest.raises(ValidationError) as exc_info: - DecideRequest( - correlationId="t:tenant-123|c:01932b3c-4d5e-4890-abcd-ef1234567890", - tenantId="tenant-123", - actorId="user-456", - action="execute_workflow", - resourceType="workflow", - resourceId="workflow-789", - ) - assert "canonical form" in str(exc_info.value).lower() - - def test_correlation_id_with_underscores_and_hyphens(self) -> None: - """TenantId can contain alphanumeric, hyphens, underscores.""" - request = DecideRequest( - correlationId="t:tenant_123-abc|c:01932b3c-4d5e-7890-abcd-ef1234567890", - tenantId="tenant_123-abc", - actorId="user-456", - action="execute_workflow", - resourceType="workflow", - resourceId="workflow-789", - ) - assert request.correlation_id == "t:tenant_123-abc|c:01932b3c-4d5e-7890-abcd-ef1234567890" diff --git a/tests/test_evidence_pack.py b/tests/test_evidence_pack.py new file mode 100644 index 0000000..1df633a --- /dev/null +++ b/tests/test_evidence_pack.py @@ -0,0 +1,36 @@ +import json +from pathlib import Path +from zipfile import ZipFile + +from keon_sdk import EvidencePack + + +def test_load_evidence_pack_reads_artifacts_and_policy_hash_manifest(tmp_path: Path) -> None: + pack_path = tmp_path / "pack.zip" + manifest = { + "artifacts": [ + { + "name": "artifacts/policy-hash-manifest.json", + "type": "policy_hash_manifest", + "sha256": "abc", + "version": "v1", + }, + { + "name": "receipts/r1.json", + "type": "receipt", + "sha256": "def", + "version": "v1", + }, + ] + } + with ZipFile(pack_path, "w") as archive: + archive.writestr("manifest.json", json.dumps(manifest)) + archive.writestr("artifacts/policy-hash-manifest.json", json.dumps({"policy_hash": "abc"})) + archive.writestr("receipts/r1.json", json.dumps({"receipt_id": "r1"})) + + pack = EvidencePack.load(str(pack_path)) + + assert len(pack.artifacts) == 2 + assert pack.policy_hash_manifest is not None + assert pack.policy_hash_manifest.payload["policy_hash"] == "abc" + diff --git a/tests/test_execute_requires_receipt.py b/tests/test_execute_requires_receipt.py deleted file mode 100644 index d142c3c..0000000 --- a/tests/test_execute_requires_receipt.py +++ /dev/null @@ -1,104 +0,0 @@ -""" -Test execute receipt requirement. - -Execute MUST have DecisionReceiptId - hard fail if absent. -""" - -import pytest -from pydantic import ValidationError - -from keon_sdk import DecisionReceipt, KeonClient, MissingReceiptError -from keon_sdk.contracts import DecisionEnum, ExecuteRequest - - -class TestExecuteRequiresReceipt: - """Execute requires DecisionReceiptId (hard fail if absent).""" - - def test_execute_request_missing_receipt_id(self) -> None: - """ExecuteRequest without DecisionReceiptId fails validation.""" - with pytest.raises(ValidationError) as exc_info: - ExecuteRequest( - correlationId="t:tenant-123|c:01932b3c-4d5e-7890-abcd-ef1234567890", - # decisionReceiptId missing! - tenantId="tenant-123", - actorId="user-456", - action="test_action", - ) - # Should fail on required field - assert "decisionReceiptId" in str(exc_info.value).lower() or "required" in str( - exc_info.value - ).lower() - - def test_execute_request_empty_receipt_id(self) -> None: - """ExecuteRequest with empty DecisionReceiptId fails validation.""" - with pytest.raises(ValidationError) as exc_info: - ExecuteRequest( - correlationId="t:tenant-123|c:01932b3c-4d5e-7890-abcd-ef1234567890", - decisionReceiptId="", # Empty! - tenantId="tenant-123", - actorId="user-456", - action="test_action", - ) - assert "receipt" in str(exc_info.value).lower() - - def test_execute_request_invalid_receipt_format(self) -> None: - """ExecuteRequest with invalid receipt format fails validation.""" - with pytest.raises(ValidationError) as exc_info: - ExecuteRequest( - correlationId="t:tenant-123|c:01932b3c-4d5e-7890-abcd-ef1234567890", - decisionReceiptId="not-a-valid-receipt", - tenantId="tenant-123", - actorId="user-456", - action="test_action", - ) - assert "receipt" in str(exc_info.value).lower() - - def test_execute_request_valid_receipt_id(self) -> None: - """ExecuteRequest with valid DecisionReceiptId passes.""" - request = ExecuteRequest( - correlationId="t:tenant-123|c:01932b3c-4d5e-7890-abcd-ef1234567890", - decisionReceiptId="dr-01932b3c-4d5e-7890-abcd-ef1234567890", - tenantId="tenant-123", - actorId="user-456", - action="test_action", - ) - assert request.decision_receipt_id == "dr-01932b3c-4d5e-7890-abcd-ef1234567890" - - @pytest.mark.asyncio - async def test_client_execute_without_receipt_fails(self) -> None: - """KeonClient.execute() without receipt raises MissingReceiptError.""" - client = KeonClient(base_url="http://localhost:8080/runtime/v1") - - with pytest.raises(MissingReceiptError): - await client.execute( - receipt=None, # type: ignore - action="test_action", - ) - - @pytest.mark.asyncio - async def test_client_execute_with_denied_receipt_fails(self) -> None: - """KeonClient.execute() with denied receipt raises ExecutionDeniedError.""" - from datetime import datetime, timezone - - from keon_sdk import ExecutionDeniedError - - client = KeonClient(base_url="http://localhost:8080/runtime/v1") - - denied_receipt = DecisionReceipt( - receiptId="dr-01932b3c-4d5e-7890-abcd-ef1234567891", - decision=DecisionEnum.DENY, - correlationId="t:tenant-123|c:01932b3c-4d5e-7890-abcd-ef1234567890", - tenantId="tenant-123", - actorId="user-456", - decidedAt=datetime.now(timezone.utc), - reason="Policy denies this action", - ) - - with pytest.raises(ExecutionDeniedError) as exc_info: - await client.execute( - receipt=denied_receipt, - action="test_action", - ) - - assert "denied" in str(exc_info.value).lower() - assert denied_receipt.receipt_id in str(exc_info.value) diff --git a/tests/test_keon_sentinels.py b/tests/test_keon_sentinels.py deleted file mode 100644 index 3b99d03..0000000 --- a/tests/test_keon_sentinels.py +++ /dev/null @@ -1,237 +0,0 @@ -"""Keon SDK Architecture Sentinels. - -Policy authority guards for keon-sdk-python: - - KEON-SDK-SENTINEL-1a: DecideRequest schema strict โ€” all required fields enforced, - invalid correlationId rejected at construction time - KEON-SDK-SENTINEL-1b: DecisionReceipt schema strict โ€” receipt format + decision field - KEON-SDK-SENTINEL-1c: Timeout โ†’ NetworkError; gateway never silently allows on failure - -A failure here means the Keon policy contract has been weakened and CI must block the merge. -""" - -import hashlib -import json - -import pytest - -_VALID_CORRELATION_ID = "t:test-tenant|c:01932b3c-4d5e-7890-abcd-ef1234567890" -_VALID_RECEIPT_ID = "dr-01932b3c-4d5e-7890-abcd-ef1234567890" - - -@pytest.mark.sentinel -class TestKeonSdkSentinels: - """Architecture guard tests for keon-sdk-python. - - Schema-level and unit-level checks that enforce the policy authority contract. - These tests have zero external dependencies โ€” they run in any environment. - """ - - # โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ - # KEON-SDK-SENTINEL-1a: DecideRequest schema is strict - # โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ - - def test_keon_sdk_sentinel_1a_decide_request_requires_correlation_id(self) -> None: - """KEON-SDK-SENTINEL-1a: DecideRequest without correlationId must fail at construction.""" - from pydantic import ValidationError - from keon_sdk.contracts import DecideRequest - - with pytest.raises(ValidationError): - DecideRequest( - tenantId="t", actorId="a", - action="x", resourceType="r", resourceId="id", - ) - - def test_keon_sdk_sentinel_1a_decide_request_rejects_invalid_correlation_id(self) -> None: - """KEON-SDK-SENTINEL-1a: Malformed correlationId must be rejected at construction.""" - from pydantic import ValidationError - from keon_sdk.contracts import DecideRequest - - with pytest.raises(ValidationError): - DecideRequest( - correlationId="not-valid", - tenantId="t", actorId="a", - action="x", resourceType="r", resourceId="id", - ) - - def test_keon_sdk_sentinel_1a_decide_request_requires_tenant_and_actor(self) -> None: - """KEON-SDK-SENTINEL-1a: tenantId and actorId are mandatory โ€” both absent cases fail.""" - from pydantic import ValidationError - from keon_sdk.contracts import DecideRequest - - with pytest.raises(ValidationError): # missing tenantId - DecideRequest( - correlationId=_VALID_CORRELATION_ID, - actorId="a", action="x", resourceType="r", resourceId="id", - ) - - with pytest.raises(ValidationError): # missing actorId - DecideRequest( - correlationId=_VALID_CORRELATION_ID, - tenantId="t", action="x", resourceType="r", resourceId="id", - ) - - def test_keon_sdk_sentinel_1a_decide_request_requires_subject_hash(self) -> None: - """KEON-SDK-SENTINEL-1a: subjectHash is mandatory โ€” absent case fails.""" - from pydantic import ValidationError - from keon_sdk.contracts import DecideRequest - - with pytest.raises(ValidationError): # missing subjectHash - DecideRequest( - correlationId=_VALID_CORRELATION_ID, - tenantId="t", actorId="a", action="x", - resourceType="r", resourceId="id", - # subjectHash intentionally omitted - ) - - def test_keon_sdk_sentinel_1a_empty_subject_hash_rejected(self) -> None: - """KEON-SDK-SENTINEL-1a: subjectHash="" (empty string) must be rejected.""" - from pydantic import ValidationError - from keon_sdk.contracts import DecideRequest - - with pytest.raises(ValidationError): - DecideRequest( - correlationId=_VALID_CORRELATION_ID, - tenantId="t", actorId="a", action="x", - resourceType="r", resourceId="id", - subjectHash="", - ) - - # โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ - # KEON-SDK-SENTINEL-1b: DecisionReceipt schema is strict - # โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ - - def test_keon_sdk_sentinel_1b_decision_receipt_requires_receipt_id(self) -> None: - """KEON-SDK-SENTINEL-1b: DecisionReceipt without receiptId must fail.""" - from datetime import datetime, timezone - from pydantic import ValidationError - from keon_sdk.contracts import DecisionEnum, DecisionReceipt - - with pytest.raises(ValidationError): - DecisionReceipt( - decision=DecisionEnum.ALLOW, - correlationId=_VALID_CORRELATION_ID, - tenantId="t", actorId="a", - decidedAt=datetime.now(timezone.utc), - ) - - def test_keon_sdk_sentinel_1b_decision_receipt_rejects_invalid_receipt_id(self) -> None: - """KEON-SDK-SENTINEL-1b: receiptId not matching ^dr- must fail.""" - from datetime import datetime, timezone - from pydantic import ValidationError - from keon_sdk.contracts import DecisionEnum, DecisionReceipt - - with pytest.raises(ValidationError): - DecisionReceipt( - receiptId="not-a-valid-receipt", - decision=DecisionEnum.ALLOW, - correlationId=_VALID_CORRELATION_ID, - tenantId="t", actorId="a", - decidedAt=datetime.now(timezone.utc), - ) - - def test_keon_sdk_sentinel_1b_decision_receipt_requires_decision(self) -> None: - """KEON-SDK-SENTINEL-1b: DecisionReceipt without decision field must fail.""" - from datetime import datetime, timezone - from pydantic import ValidationError - from keon_sdk.contracts import DecisionReceipt - - with pytest.raises(ValidationError): - DecisionReceipt( - receiptId=_VALID_RECEIPT_ID, - correlationId=_VALID_CORRELATION_ID, - tenantId="t", actorId="a", - decidedAt=datetime.now(timezone.utc), - ) - - # โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ - # KEON-SDK-SENTINEL-1c: Timeout โ†’ NetworkError (fail-closed) - # โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ - - async def test_keon_sdk_sentinel_1c_timeout_is_fail_closed(self) -> None: - """KEON-SDK-SENTINEL-1c: Gateway timeout raises NetworkError โ€” never silently allows. - - Uses RetryPolicy.no_retry() to ensure a single attempt is made and the - NetworkError propagates without masking or silent fallback. - """ - from unittest.mock import AsyncMock - import httpx - from keon_sdk.contracts import DecideRequest - from keon_sdk.errors import NetworkError - from keon_sdk.http_gateway import HttpRuntimeGateway - from keon_sdk.retry import RetryPolicy - - mock_client = AsyncMock(spec=httpx.AsyncClient) - mock_client.post.side_effect = httpx.TimeoutException("Read timeout") - - gateway = HttpRuntimeGateway( - base_url="http://keon.example.com/runtime/v1", - http_client=mock_client, - retry_policy=RetryPolicy.no_retry(), - ) - request = DecideRequest( - correlationId=_VALID_CORRELATION_ID, - tenantId="test-tenant", actorId="user-456", - action="read_campaign", resourceType="campaign", - resourceId="campaign-123", - ) - - with pytest.raises(NetworkError) as exc_info: - await gateway.decide(request) - - assert exc_info.value.code == "NETWORK_ERROR", ( - f"KEON-SDK-SENTINEL-1c FAILED: timeout did not produce NetworkError, " - f"got code={exc_info.value.code!r}" - ) - - -# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ -# Drift tripwire hashes โ€” update only after doctrine review -# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ -_DECIDE_FIELDS_HASH = "5c7ae3e6d532d49a" -_RECEIPT_FIELDS_HASH = "d5f0c62ffa683fde" - - -def _compute_hash(data: object) -> str: - return hashlib.sha256(json.dumps(data, sort_keys=True).encode()).hexdigest()[:16] - - -def _required_model_fields(model_cls: type) -> list[str]: - """Return sorted names of required fields (no default) from a Pydantic v2 model.""" - return sorted(name for name, info in model_cls.model_fields.items() if info.is_required()) - - -@pytest.mark.sentinel -class TestKeonSdkDriftTripwire: - """Drift tripwires: detect unapproved changes to Keon contract schemas. - - If the required-field set of DecideRequest or DecisionReceipt changes without - updating the stored hash, this class trips and blocks merge. To approve: - 1. Make the intentional schema change. - 2. Re-derive the hash (see SENTINELS.md for command). - 3. Update _DECIDE_FIELDS_HASH / _RECEIPT_FIELDS_HASH above. - 4. Commit: sentinel(drift): approve โ€” - """ - - def test_decide_request_schema_hash_unchanged(self) -> None: - """DRIFT: DecideRequest required-field set must match stored hash.""" - from keon_sdk.contracts import DecideRequest - - actual = _compute_hash(_required_model_fields(DecideRequest)) - assert actual == _DECIDE_FIELDS_HASH, ( - f"DRIFT TRIPWIRE TRIPPED โ€” DecideRequest required fields changed without approval. " - f"New hash={actual!r}. Fields now: {_required_model_fields(DecideRequest)}. " - f"Update _DECIDE_FIELDS_HASH after doctrine review." - ) - - def test_decision_receipt_schema_hash_unchanged(self) -> None: - """DRIFT: DecisionReceipt required-field set must match stored hash.""" - from keon_sdk.contracts import DecisionReceipt - - actual = _compute_hash(_required_model_fields(DecisionReceipt)) - assert actual == _RECEIPT_FIELDS_HASH, ( - f"DRIFT TRIPWIRE TRIPPED โ€” DecisionReceipt required fields changed without approval. " - f"New hash={actual!r}. Fields now: {_required_model_fields(DecisionReceipt)}. " - f"Update _RECEIPT_FIELDS_HASH after doctrine review." - ) - diff --git a/tests/test_mcp_server.py b/tests/test_mcp_server.py new file mode 100644 index 0000000..1d38ae4 --- /dev/null +++ b/tests/test_mcp_server.py @@ -0,0 +1,49 @@ +from dataclasses import asdict + +import keon_sdk.mcp_server as mcp_server +from keon_sdk import L3VerificationResult, VerificationError + + +def test_check_l3_compliance_tool_returns_dataclass_payload(monkeypatch) -> None: + result = L3VerificationResult( + schema_id="keon.verify_pack.report.v1", + verdict="PASS", + is_valid=True, + phase=5, + pack_hash="sha256:test", + tenant_id="tenant-1", + signer_kids=("kid-1",), + pack_integrity=True, + signature_valid=True, + authorization_valid=True, + trust_bundle_provided=True, + outcome="valid", + errors=(VerificationError(code="KEON_NONE"),), + l3_01=True, + l3_02=True, + l3_03=True, + l3_04=True, + l3_05=True, + l3_06=True, + l3_07=True, + l3_08=True, + l3_09=True, + l3_10=True, + l3_11=True, + l3_12=True, + l3_13=True, + l3_14=True, + l3_15=True, + l3_16=True, + l3_17=True, + raw_report={"is_valid": True}, + ) + monkeypatch.setattr(mcp_server, "verify_caes", lambda pack_path, bundle_path=None: result) + + payload = mcp_server._call_tool( + "check_l3_compliance", + {"pack_path": "pack.zip", "bundle_path": "bundle.json"}, + ) + + assert payload == asdict(result) + diff --git a/tests/test_quickstart.py b/tests/test_quickstart.py new file mode 100644 index 0000000..d2e8046 --- /dev/null +++ b/tests/test_quickstart.py @@ -0,0 +1,6 @@ +from keon_sdk import verify_caes + + +def test_quickstart_import_exports_verify_caes() -> None: + assert callable(verify_caes) + diff --git a/tests/test_receipt.py b/tests/test_receipt.py new file mode 100644 index 0000000..48a6121 --- /dev/null +++ b/tests/test_receipt.py @@ -0,0 +1,27 @@ +from keon_sdk import DecisionReceipt, TrustBundle + + +def test_decision_receipt_verify_checks_bundle_membership() -> None: + receipt = DecisionReceipt( + { + "receipt_id": "dr-1", + "tenant_id": "tenant-1", + "signer_kid": "kid-1", + } + ) + bundle = TrustBundle( + { + "tenants": [ + { + "tenant_id": "tenant-1", + "keys": [{"kid": "kid-1"}], + } + ] + } + ) + + result = receipt.verify(bundle) + + assert result.is_valid is True + assert result.key_authorized is True + diff --git a/tests/test_verify.py b/tests/test_verify.py new file mode 100644 index 0000000..7c983ea --- /dev/null +++ b/tests/test_verify.py @@ -0,0 +1,123 @@ +from keon_sdk import verify_caes +from keon_sdk import _cli as cli_module +from keon_sdk import verify as verify_module + + +def test_verify_caes_maps_cli_report_to_l3_result(monkeypatch, tmp_path) -> None: + pack_path = tmp_path / "pack.zip" + pack_path.write_bytes( + b"PK\x05\x06" + b"\x00" * 18 + ) + + class StubEvidencePack: + artifacts = [] + policy_hash_manifest = None + + monkeypatch.setattr(verify_module, "EvidencePack", type("StubLoader", (), {"load": staticmethod(lambda _: StubEvidencePack())})) + monkeypatch.setattr( + verify_module, + "verify_pack_json", + lambda pack_path, bundle_path=None: type( + "Result", + (), + { + "payload": { + "schema_id": "keon.verify_pack.report.v1", + "verdict": "FAIL", + "is_valid": False, + "phase": 5, + "pack_hash": "sha256:test", + "tenant_id": "tenant-1", + "signer_kids": ["kid-1"], + "pack_integrity": True, + "signature_valid": True, + "authorization_valid": False, + "trust_bundle_provided": True, + "outcome": "invalid", + "errors": [{"code": "KEON_VERIFY_TENANT_NOT_IN_BUNDLE"}], + }, + "offline_mode_used": True, + }, + )(), + ) + + result = verify_caes(str(pack_path), bundle_path="bundle.json") + + assert result.phase == 5 + assert result.authorization_valid is False + assert result.l3_06 is True + assert result.l3_11 is True + assert result.errors[0].code == "KEON_VERIFY_TENANT_NOT_IN_BUNDLE" + + +def test_verify_pack_json_uses_bundle_key_as_pubkey(monkeypatch, tmp_path) -> None: + pack_path = tmp_path / "pack.zip" + bundle_path = tmp_path / "bundle.json" + pack_path.write_bytes(b"unused") + bundle_path.write_text("{}", encoding="utf-8") + captured: list[str] = [] + + class StubPack: + artifacts = [ + type("Artifact", (), {"type": "attestation", "payload": {"key_id": "kid-1"}})(), + type("Artifact", (), {"type": "receipt", "payload": {"tenant_id": "tenant-1"}})(), + ] + + monkeypatch.setattr(cli_module, "EvidencePack", type("Loader", (), {"load": staticmethod(lambda _: StubPack())})) + monkeypatch.setattr( + cli_module.Path, + "read_text", + lambda self, encoding="utf-8": '{"tenants":[{"tenant_id":"tenant-1","keys":[{"kid":"kid-1","public_key_b64":"QUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUFBQUE="}]}]}', + ) + monkeypatch.setattr( + cli_module, + "run_keon_json", + lambda args, offline_mode_used=False: captured.extend(args) or type( + "Result", + (), + { + "returncode": 1, + "stdout": "{}", + "stderr": "", + "payload": {}, + "offline_mode_used": offline_mode_used, + }, + )(), + ) + + cli_module.verify_pack_json(str(pack_path), bundle_path=str(bundle_path)) + + assert "--offline" in captured + assert "--pubkey" in captured + + +def test_verify_caes_l3_06_is_false_when_cli_falls_back_without_offline(monkeypatch, tmp_path) -> None: + pack_path = tmp_path / "pack.zip" + pack_path.write_bytes(b"PK\x05\x06" + b"\x00" * 18) + + class StubEvidencePack: + artifacts = [] + policy_hash_manifest = None + + monkeypatch.setattr(verify_module, "EvidencePack", type("StubLoader", (), {"load": staticmethod(lambda _: StubEvidencePack())})) + monkeypatch.setattr( + verify_module, + "verify_pack_json", + lambda pack_path, bundle_path=None: type( + "Result", + (), + { + "payload": { + "is_valid": False, + "phase": 5, + "trust_bundle_provided": True, + "errors": [], + }, + "offline_mode_used": False, + }, + )(), + ) + + result = verify_caes(str(pack_path), bundle_path="bundle.json") + + assert result.l3_06 is False