From b28ec93ade691ed7f71ca89a3ef5c9e3df1b54e6 Mon Sep 17 00:00:00 2001 From: Your Name Date: Mon, 13 Apr 2026 20:18:31 +1000 Subject: [PATCH 1/2] feat: add Immutable Inference Sandboxing (IIS) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Introduces a sandbox: spec key that gives the runner hard mechanical constraints over what each task is permitted to do — enforced before any tool call reaches the I/O layer. Changes: - spec/schema/oas-schema-1.4.json: new sandbox: key (root + per-task) with tools.allow/deny, http.allow_domains, file.allow_paths via $defs - oas_cli/runner.py: - _resolve_sandbox(): merges root + task-level sandbox (deepcopy, task wins) - _check_sandbox(): three structured error codes pre-dispatch: SANDBOX_TOOL_VIOLATION, SANDBOX_DOMAIN_VIOLATION, SANDBOX_PATH_VIOLATION - _invoke_with_tools(): new sandbox= kwarg; check fires before dispatch_tool_call - OARunError passthrough: sandbox violations no longer swallowed by except Exception - Chain-wide immutability: deepcopy at run_task_from_spec entry, every _resolve_chain dep call, and top of _run_single_task - examples/sandboxed-agent/: demo spec with root sandbox, per-task override, sample data file, and README explaining the OAS vs BCE boundary - tests/test_iis.py: 27 tests across _resolve_sandbox, _check_sandbox (all 3 codes + edge cases), integration (violation before dispatch), and chain-wide immutability invariants - docs/REFERENCE.md: full IIS section — boundary table, error codes, immutability guarantee, exclusions list, BCE rename roadmap note Made-with: Cursor --- docs/REFERENCE.md | 85 ++++++ examples/sandboxed-agent/README.md | 70 +++++ examples/sandboxed-agent/data/report.txt | 3 + examples/sandboxed-agent/spec.yaml | 118 +++++++ oas_cli/runner.py | 111 ++++++- spec/schema/oas-schema-1.4.json | 58 ++++ tests/test_history_threading.py | 4 +- tests/test_iis.py | 372 +++++++++++++++++++++++ 8 files changed, 816 insertions(+), 5 deletions(-) create mode 100644 examples/sandboxed-agent/README.md create mode 100644 examples/sandboxed-agent/data/report.txt create mode 100644 examples/sandboxed-agent/spec.yaml create mode 100644 tests/test_iis.py diff --git a/docs/REFERENCE.md b/docs/REFERENCE.md index 46f6525..2e04f88 100644 --- a/docs/REFERENCE.md +++ b/docs/REFERENCE.md @@ -415,6 +415,91 @@ A contract violation on a dependency stops the chain immediately and raises `CON --- +## Immutable Inference Sandboxing (IIS) + +OAS gives every spec a `sandbox:` block that defines **what a task is mechanically permitted to do**. The runner enforces these constraints _before_ any tool call reaches the I/O layer — no network connection is opened, no file handle is created, no exception needs to be caught. + +### The boundary + +| Layer | Concern | Mechanism | +|-------|---------|-----------| +| **OAS `sandbox:`** | Hard execution constraints | Runner blocks before dispatch | +| **BCE `behavioural_contract:`** | Policy / quality contract | Validated after the run | + +OAS controls **what a task can do**. BCE controls **what a task should do**. They are complementary and never overlap. + +### Root-level sandbox (applies to all tasks) + +```yaml +sandbox: + tools: + allow: [file.read, http.get] # allowlist — anything else is SANDBOX_TOOL_VIOLATION + # deny: [file.write] # denylist alternative (use one or the other) + http: + allow_domains: + - api.example.com # exact match or any subdomain + file: + allow_paths: + - ./data/ # resolved to absolute paths at check time +``` + +### Per-task sandbox (overrides root) + +A `sandbox:` key inside a task definition completely overrides the root sandbox for that task. Use this to _tighten_ constraints on sensitive tasks without changing the global default: + +```yaml +sandbox: + tools: + allow: [file.read, http.get] + +tasks: + check_status: + sandbox: + tools: + allow: [http.get] # file.read now also blocked for this task + http: + allow_domains: [status.openai.com] +``` + +### Error codes + +All sandbox violations raise `OARunError` immediately with one of three structured codes: + +| Code | Trigger | +|------|---------| +| `SANDBOX_TOOL_VIOLATION` | Tool name not in `allow` list, or in `deny` list | +| `SANDBOX_DOMAIN_VIOLATION` | HTTP host not in `allow_domains` (for `http.get` / `http.post`) | +| `SANDBOX_PATH_VIOLATION` | File path outside `allow_paths` (for `file.read` / `file.write`) | + +Path traversal (`../../`) is caught automatically — paths are resolved to absolute before comparison. + +### Input immutability + +Every task receives a **deep copy** of its input. Chain outputs merged into downstream inputs never mutate the caller's original dict. This is enforced at three levels: + +1. Entry to `run_task_from_spec` (public API boundary) +2. Each dependency call in `_resolve_chain` +3. Start of `_run_single_task` (guards delegated specs too) + +### What IIS deliberately excludes (belongs in BCE) + +| Concern | Why it's BCE | +|---------|-------------| +| Prompt injection detection | Requires interpretation — subjective, evolving | +| PII scanning | Team-specific policy | +| Compliance tagging | Audit concern, not execution constraint | +| Session / memory management | Out of scope for a stateless runner | + +### Future BCE rename note + +The BCE library currently uses `allowed_tools` in `behavioural_contract` as an audit field. A future BCE release will rename this to `expected_tools` to make it unambiguous that this is a _post-run audit assertion_, not an enforcement rule. The OAS `sandbox.tools.allow` list is the enforcement mechanism; BCE's audit field is observational only. + +### Example + +See [`examples/sandboxed-agent/`](../examples/sandboxed-agent/) for a working demo that exercises tool allowlists, domain restrictions, file path restrictions, and per-task sandbox overrides. + +--- + ## Engines (quick) | Engine | Env var | Notes | diff --git a/examples/sandboxed-agent/README.md b/examples/sandboxed-agent/README.md new file mode 100644 index 0000000..d378cdb --- /dev/null +++ b/examples/sandboxed-agent/README.md @@ -0,0 +1,70 @@ +# Sandboxed Agent — IIS Demo + +Demonstrates **Immutable Inference Sandboxing (IIS)**: the runner enforces hard +execution constraints _before_ any tool call reaches the I/O layer. + +## What IIS is + +| Layer | Concern | +|-------|---------| +| **OAS `sandbox:`** | Hard mechanical block — binary allow/deny enforced by the runner | +| **BCE `behavioural_contract:`** | Soft policy audit — validated after the run | + +OAS owns **what a task can do**. BCE owns **what a task should do**. They never overlap. + +## Sandbox keys + +```yaml +sandbox: + tools: + allow: [file.read, http.get] # allowlist (mutually exclusive with deny) + # deny: [file.write] # denylist alternative + http: + allow_domains: [api.example.com] # subdomains are also allowed + file: + allow_paths: [./data/] # resolved to absolute paths at check time +``` + +A **root-level** `sandbox:` applies to every task. A **task-level** `sandbox:` +completely overrides the root for that task, so you can tighten constraints +per-task without relaxing the global defaults. + +## Error codes + +| Code | Trigger | +|------|---------| +| `SANDBOX_TOOL_VIOLATION` | Tool name blocked by `allow` / `deny` | +| `SANDBOX_DOMAIN_VIOLATION` | HTTP host not in `allow_domains` | +| `SANDBOX_PATH_VIOLATION` | File path outside `allow_paths` | + +## Running the demo + +```bash +# Summarise the sample report (file.read allowed inside data/) +oa run spec.yaml --task summarise --input '{"filename": "report.txt"}' + +# Check OpenAI status (http.get allowed to status.openai.com) +oa run spec.yaml --task check_status --input '{}' +``` + +## Seeing a violation + +Try reading a file outside the allow_paths: + +```bash +oa run spec.yaml --task summarise --input '{"filename": "../../README.md"}' +# → OARunError: SANDBOX_PATH_VIOLATION +``` + +Or call a blocked domain (by temporarily editing the prompt): + +```bash +# The runner blocks the call before it reaches the network. +``` + +## What IIS deliberately does NOT do + +- **Prompt injection detection** — subjective, evolving, belongs in BCE +- **PII scanning** — policy-level concern, belongs in BCE +- **Memory / session management** — out of scope for a stateless runner +- **Parallel execution or retry logic** — not OAS concerns diff --git a/examples/sandboxed-agent/data/report.txt b/examples/sandboxed-agent/data/report.txt new file mode 100644 index 0000000..5fc3643 --- /dev/null +++ b/examples/sandboxed-agent/data/report.txt @@ -0,0 +1,3 @@ +Q3 2026 Revenue: $4.2M (+18% YoY) +Key drivers: enterprise subscriptions up 32%, SMB churn reduced to 4.1%. +Outlook: Q4 pipeline strong at $6.1M weighted, 3 enterprise renewals pending signature. diff --git a/examples/sandboxed-agent/spec.yaml b/examples/sandboxed-agent/spec.yaml new file mode 100644 index 0000000..78d3818 --- /dev/null +++ b/examples/sandboxed-agent/spec.yaml @@ -0,0 +1,118 @@ +open_agent_spec: "1.4.0" + +agent: + name: sandboxed-doc-reader + description: > + A read-only document assistant that can only access approved local files + and call approved APIs. Demonstrates OAS Immutable Inference Sandboxing (IIS): + the runner blocks any tool call that violates these constraints before it + reaches the I/O layer — no network call, no file open, no exception to catch. + role: analyst + +intelligence: + type: llm + engine: openai + model: gpt-4o-mini + +# ── Root-level sandbox ──────────────────────────────────────────────────────── +# Applies to every task in this spec unless a task declares its own sandbox block. +sandbox: + tools: + # Only these two tools are permitted. Any other tool (file.write, http.post, + # env.read, …) triggers SANDBOX_TOOL_VIOLATION before the model call. + allow: + - file.read + - http.get + http: + # http.get may only contact the official OpenAI status page. + # Any other domain triggers SANDBOX_DOMAIN_VIOLATION. + allow_domains: + - status.openai.com + file: + # file.read may only access the data/ subdirectory next to this spec. + # Any path outside that prefix triggers SANDBOX_PATH_VIOLATION. + allow_paths: + - ./data/ + +tools: + reader: + type: native + native: file.read + description: Read a local document file and return its text content. + + api_status: + type: native + native: http.get + description: Check the status of an external API endpoint. + +tasks: + summarise: + description: > + Read a document from the approved data/ directory and produce a concise summary. + The sandbox ensures the task cannot write files, call unapproved APIs, or + read files outside data/ — even if the model attempts to do so. + tools: + - reader + input: + type: object + properties: + filename: + type: string + description: "Filename inside the data/ directory to summarise (e.g. 'report.txt')." + required: + - filename + output: + type: object + properties: + summary: + type: string + description: A concise summary of the document. + word_count: + type: integer + description: Approximate word count of the original document. + required: + - summary + - word_count + prompts: + system: > + You are a document summariser. You will be given a file to read. + After reading it, return JSON with two fields: "summary" (a concise + paragraph) and "word_count" (integer). Return valid JSON only. + user: "Summarise the file: data/{filename}" + + check_status: + description: > + Fetch the OpenAI status page and report whether the API is operational. + This task has a stricter per-task sandbox that also disables file.read, + demonstrating how task-level sandbox overrides the root-level one. + # Per-task sandbox — completely replaces the root sandbox for this task. + sandbox: + tools: + allow: + - api_status + http: + allow_domains: + - status.openai.com + tools: + - api_status + input: + type: object + properties: {} + output: + type: object + properties: + status: + type: string + description: "Current API status (e.g. 'operational', 'degraded', 'outage')." + message: + type: string + description: Human-readable status summary. + required: + - status + - message + prompts: + system: > + You are a system health checker. Fetch the OpenAI status page and return + JSON with "status" (one of: operational, degraded, outage) and "message" + (a one-sentence plain-English summary). Return valid JSON only. + user: "Check https://status.openai.com/api/v2/status.json and report the status." diff --git a/oas_cli/runner.py b/oas_cli/runner.py index 4371bc7..2856234 100644 --- a/oas_cli/runner.py +++ b/oas_cli/runner.py @@ -7,6 +7,7 @@ from __future__ import annotations +import copy import json import logging import re @@ -14,6 +15,7 @@ import urllib.request from pathlib import Path from typing import Any +from urllib.parse import urlparse as _urlparse import yaml @@ -379,7 +381,7 @@ def _check_cycles(name: str) -> None: dep_result = _run_single_task( spec_data, dep_name, - dict(merged), + copy.deepcopy(dict(merged)), override_system=None, override_user=None, spec_path=spec_path, @@ -440,6 +442,87 @@ def _resolve_contract( return _merge_contracts(global_contract, task_contract) +def _resolve_sandbox(spec_data: dict[str, Any], task_name: str) -> dict[str, Any]: + """Effective sandbox for a task: root-level merged with task-level (task wins). + + A task-level sandbox key completely replaces the root key of the same name, + which matches the "task-level is an override, not an extension" mental model. + """ + root_sandbox: dict[str, Any] = spec_data.get("sandbox") or {} + tasks = spec_data.get("tasks") or {} + task_sandbox: dict[str, Any] = (tasks.get(task_name) or {}).get("sandbox") or {} + if not task_sandbox: + return copy.deepcopy(root_sandbox) + merged = copy.deepcopy(root_sandbox) + merged.update(copy.deepcopy(task_sandbox)) + return merged + + +def _check_sandbox( + tool_name: str, + arguments: dict[str, Any], + sandbox: dict[str, Any], + task_name: str, +) -> None: + """Raise OARunError if a tool call violates sandbox constraints. + + Three distinct codes for precise observability: + SANDBOX_TOOL_VIOLATION — tool name blocked by allow/deny list + SANDBOX_DOMAIN_VIOLATION — HTTP domain not in allow_domains + SANDBOX_PATH_VIOLATION — file path outside allow_paths + """ + tools_cfg = sandbox.get("tools") or {} + allow = tools_cfg.get("allow") + deny = tools_cfg.get("deny") + + if allow is not None and tool_name not in allow: + raise OARunError( + f"Sandbox violation: tool '{tool_name}' is not in the allow list for task " + f"'{task_name}'. Allowed: {allow}", + code="SANDBOX_TOOL_VIOLATION", + stage="sandbox", + task=task_name, + ) + if deny is not None and tool_name in deny: + raise OARunError( + f"Sandbox violation: tool '{tool_name}' is in the deny list for task '{task_name}'.", + code="SANDBOX_TOOL_VIOLATION", + stage="sandbox", + task=task_name, + ) + + if tool_name in ("http.get", "http.post"): + allow_domains = (sandbox.get("http") or {}).get("allow_domains") + if allow_domains is not None: + url = arguments.get("url", "") + host = _urlparse(url).netloc.split(":")[0] + if not any(host == d or host.endswith(f".{d}") for d in allow_domains): + raise OARunError( + f"Sandbox violation: domain '{host}' is not in allow_domains for task " + f"'{task_name}'. Allowed: {allow_domains}", + code="SANDBOX_DOMAIN_VIOLATION", + stage="sandbox", + task=task_name, + ) + + if tool_name in ("file.read", "file.write"): + allow_paths = (sandbox.get("file") or {}).get("allow_paths") + if allow_paths is not None: + file_path = arguments.get("path", "") + resolved = str(Path(file_path).resolve()) + if not any( + resolved.startswith(str(Path(p).resolve())) + for p in allow_paths + ): + raise OARunError( + f"Sandbox violation: path '{file_path}' is outside allow_paths for task " + f"'{task_name}'. Allowed: {allow_paths}", + code="SANDBOX_PATH_VIOLATION", + stage="sandbox", + task=task_name, + ) + + _MAX_TOOL_ITERATIONS = 10 @@ -450,6 +533,8 @@ def _invoke_with_tools( intelligence_config: dict[str, Any], task_name: str, history: list[dict] | None = None, + *, + sandbox: dict[str, Any] | None = None, ) -> str: """Multi-turn tool-call loop. @@ -463,6 +548,9 @@ def _invoke_with_tools( ``history`` is injected before the current user turn, mirroring the behaviour of the no-tools path so conversation context is never lost. + + ``sandbox`` is checked before every tool dispatch — a violation raises + OARunError with a structured SANDBOX_* code rather than calling the tool. """ provider = get_provider(intelligence_config) @@ -525,6 +613,9 @@ def _invoke_with_tools( # Execute every tool call and append results. for tc in result.tool_calls: + # Pre-dispatch sandbox check — hard block before any I/O. + if sandbox: + _check_sandbox(tc.name, tc.arguments, sandbox, task_name) try: tool_result = dispatch_tool_call(tc.name, tc.arguments, tools) except ToolError as exc: @@ -561,7 +652,14 @@ def _run_single_task( If the task declares ``spec:`` + ``task:`` it is a *delegated* task — the runner loads the referenced spec and executes that task transparently, returning the result as if it had been defined inline. + + ``input_data`` is treated as an immutable snapshot: we deep-copy it + immediately so no downstream mutation (tool calls, merges, delegation) + can leak back into the caller's dict. """ + # Immutable input snapshot — every task boundary gets its own copy. + input_data = copy.deepcopy(input_data) + tasks = spec_data.get("tasks") or {} task_def = tasks.get(task_name) or {} @@ -679,15 +777,20 @@ def _run_single_task( # history is a reserved input convention — never stored by OAS, just forwarded. history: list[dict] | None = input_data.get("history") or None + sandbox = _resolve_sandbox(spec_data, task_name) try: tools = resolve_task_tools(spec_data, task_name) if tools: raw_output = _invoke_with_tools( - system, user, tools, intelligence_config, task_name, history + system, user, tools, intelligence_config, task_name, history, + sandbox=sandbox or None, ) else: raw_output = invoke_intelligence(system, user, intelligence_config, history) + except OARunError: + # Structured errors (e.g. SANDBOX_* violations) pass through unchanged. + raise except (ProviderError, ToolError) as exc: raise OARunError( str(exc), @@ -804,7 +907,9 @@ def run_task_from_spec( spec_path is used to resolve relative ``spec:`` delegation references. """ - base_input: dict[str, Any] = dict(input_data or {}) + # Deep copy at the public entry point so the caller's dict is never mutated, + # even when a chain merges upstream outputs into downstream inputs. + base_input: dict[str, Any] = copy.deepcopy(dict(input_data or {})) chosen_task, _ = _choose_task(spec_data, task_name) # Seed the visited set with the calling spec so direct self-delegation is caught. diff --git a/spec/schema/oas-schema-1.4.json b/spec/schema/oas-schema-1.4.json index 5a646da..c37bdc0 100644 --- a/spec/schema/oas-schema-1.4.json +++ b/spec/schema/oas-schema-1.4.json @@ -376,6 +376,10 @@ "task": { "type": "string", "description": "Name of the task to run in the referenced spec (used with 'spec:'). Defaults to the same name as this task when omitted." + }, + "sandbox": { + "$ref": "#/$defs/sandboxDef", + "description": "Per-task sandbox overrides. Takes priority over the root-level sandbox block." } }, "required": [ @@ -459,6 +463,9 @@ }, "description": "DACP logging configuration" }, + "sandbox": { + "$ref": "#/$defs/sandboxDef" + }, "behavioural_contract": { "type": "object" }, @@ -518,5 +525,56 @@ }, "description": "Tool declarations available to tasks. Tasks reference tools by name in their tools: list." } + }, + "$defs": { + "sandboxDef": { + "type": "object", + "description": "Execution constraints: what this spec/task is mechanically permitted to do. Enforced by the runner before each tool dispatch.", + "properties": { + "tools": { + "type": "object", + "description": "Tool allow/deny list. Specify exactly one of allow or deny.", + "properties": { + "allow": { + "type": "array", + "items": {"type": "string"}, + "description": "Exhaustive list of tool names this task may call. Any other tool triggers SANDBOX_TOOL_VIOLATION." + }, + "deny": { + "type": "array", + "items": {"type": "string"}, + "description": "List of tool names this task must NOT call. Complement of allow — use one or the other." + } + }, + "not": {"required": ["allow", "deny"]}, + "additionalProperties": false + }, + "http": { + "type": "object", + "description": "HTTP call constraints.", + "properties": { + "allow_domains": { + "type": "array", + "items": {"type": "string"}, + "description": "Allowed hostnames (and subdomains) for http.get / http.post. Requests to other hosts trigger SANDBOX_DOMAIN_VIOLATION." + } + }, + "additionalProperties": false + }, + "file": { + "type": "object", + "description": "File I/O constraints.", + "properties": { + "allow_paths": { + "type": "array", + "items": {"type": "string"}, + "description": "Allowed directory prefixes for file.read / file.write. Paths outside these prefixes trigger SANDBOX_PATH_VIOLATION." + } + }, + "additionalProperties": false + } + }, + "additionalProperties": false + } } } diff --git a/tests/test_history_threading.py b/tests/test_history_threading.py index 58ccb4f..42c3891 100644 --- a/tests/test_history_threading.py +++ b/tests/test_history_threading.py @@ -387,7 +387,7 @@ def test_history_forwarded_to_invoke_with_tools(self, tool_spec: Path, monkeypat received: dict = {} def fake_invoke_with_tools( - system, user, tools, config, task_name, history=None + system, user, tools, config, task_name, history=None, *, sandbox=None ): received["history"] = history return '{"reply": "ok"}' @@ -416,7 +416,7 @@ def test_no_history_tools_path_passes_none(self, tool_spec: Path, monkeypatch): received: dict = {} def fake_invoke_with_tools( - system, user, tools, config, task_name, history=None + system, user, tools, config, task_name, history=None, *, sandbox=None ): received["history"] = history return '{"reply": "ok"}' diff --git a/tests/test_iis.py b/tests/test_iis.py new file mode 100644 index 0000000..7ae00b4 --- /dev/null +++ b/tests/test_iis.py @@ -0,0 +1,372 @@ +"""Tests for Immutable Inference Sandboxing (IIS). + +Covers: + _resolve_sandbox — root / task-level merge semantics + _check_sandbox — SANDBOX_TOOL_VIOLATION, SANDBOX_DOMAIN_VIOLATION, + SANDBOX_PATH_VIOLATION + _invoke_with_tools integration — violation raises before dispatch + chain-wide immutability — upstream mutations never leak downstream +""" + +from __future__ import annotations + +import copy +import json +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +from oas_cli.runner import ( + OARunError, + _check_sandbox, + _resolve_sandbox, + run_task_from_spec, +) + + +# ── Helpers ─────────────────────────────────────────────────────────────────── + +def _minimal_spec(*, tasks: dict | None = None, sandbox: dict | None = None) -> dict: + spec: dict = { + "open_agent_spec": "1.4.0", + "agent": {"name": "test-agent", "description": "test", "role": "analyst"}, + "intelligence": {"type": "llm", "engine": "openai", "model": "gpt-4o-mini"}, + "tasks": tasks or { + "run": { + "description": "test task", + "output": { + "type": "object", + "properties": {"result": {"type": "string"}}, + }, + "prompts": {"system": "You are helpful.", "user": "Do it."}, + } + }, + } + if sandbox is not None: + spec["sandbox"] = sandbox + return spec + + +# ── _resolve_sandbox ────────────────────────────────────────────────────────── + +class TestResolveSandbox: + def test_returns_empty_dict_when_no_sandbox(self): + spec = _minimal_spec() + assert _resolve_sandbox(spec, "run") == {} + + def test_returns_root_sandbox_when_no_task_override(self): + root = {"tools": {"allow": ["file.read"]}} + spec = _minimal_spec(sandbox=root) + assert _resolve_sandbox(spec, "run") == root + + def test_task_sandbox_overrides_root(self): + root = {"tools": {"allow": ["file.read", "http.get"]}} + task_sandbox = {"tools": {"allow": ["http.get"]}} + spec = _minimal_spec(sandbox=root) + spec["tasks"]["run"]["sandbox"] = task_sandbox + result = _resolve_sandbox(spec, "run") + assert result == {"tools": {"allow": ["http.get"]}} + + def test_task_sandbox_merges_missing_root_keys(self): + root = {"http": {"allow_domains": ["api.example.com"]}} + task_sandbox = {"tools": {"allow": ["http.get"]}} + spec = _minimal_spec(sandbox=root) + spec["tasks"]["run"]["sandbox"] = task_sandbox + result = _resolve_sandbox(spec, "run") + # task-level wins on overlap; root keys not overridden are inherited + assert result["tools"] == {"allow": ["http.get"]} + assert result["http"] == {"allow_domains": ["api.example.com"]} + + def test_returns_copy_not_reference(self): + root = {"tools": {"allow": ["file.read"]}} + spec = _minimal_spec(sandbox=root) + result = _resolve_sandbox(spec, "run") + result["tools"]["allow"].append("http.get") + # Mutating the result must not affect the spec + assert spec["sandbox"]["tools"]["allow"] == ["file.read"] + + +# ── _check_sandbox — tool enforcement ──────────────────────────────────────── + +class TestCheckSandboxToolViolation: + def test_allow_list_permits_listed_tool(self): + sandbox = {"tools": {"allow": ["file.read", "http.get"]}} + _check_sandbox("file.read", {}, sandbox, "task") # no exception + + def test_allow_list_blocks_unlisted_tool(self): + sandbox = {"tools": {"allow": ["file.read"]}} + with pytest.raises(OARunError) as exc_info: + _check_sandbox("http.get", {}, sandbox, "my_task") + err = exc_info.value + assert err.code == "SANDBOX_TOOL_VIOLATION" + assert err.task == "my_task" + assert "http.get" in str(err) + assert "allow" in str(err).lower() + + def test_deny_list_blocks_listed_tool(self): + sandbox = {"tools": {"deny": ["file.write", "env.read"]}} + with pytest.raises(OARunError) as exc_info: + _check_sandbox("file.write", {}, sandbox, "task") + err = exc_info.value + assert err.code == "SANDBOX_TOOL_VIOLATION" + assert "deny" in str(err).lower() or "file.write" in str(err) + + def test_deny_list_permits_unlisted_tool(self): + sandbox = {"tools": {"deny": ["file.write"]}} + _check_sandbox("file.read", {}, sandbox, "task") # no exception + + def test_empty_sandbox_permits_any_tool(self): + _check_sandbox("file.write", {}, {}, "task") # no exception + + def test_tool_not_in_allow_reports_full_allow_list(self): + allowed = ["file.read", "env.read"] + sandbox = {"tools": {"allow": allowed}} + with pytest.raises(OARunError) as exc_info: + _check_sandbox("http.post", {}, sandbox, "task") + assert "file.read" in str(exc_info.value) or str(allowed) in str(exc_info.value) + + +# ── _check_sandbox — domain enforcement ────────────────────────────────────── + +class TestCheckSandboxDomainViolation: + def test_allows_exact_domain_match(self): + sandbox = {"http": {"allow_domains": ["api.example.com"]}} + _check_sandbox("http.get", {"url": "https://api.example.com/v1"}, sandbox, "task") + + def test_allows_subdomain(self): + sandbox = {"http": {"allow_domains": ["example.com"]}} + _check_sandbox("http.get", {"url": "https://api.example.com/v1"}, sandbox, "task") + + def test_blocks_unlisted_domain(self): + sandbox = {"http": {"allow_domains": ["api.example.com"]}} + with pytest.raises(OARunError) as exc_info: + _check_sandbox("http.post", {"url": "https://evil.io/exfil"}, sandbox, "task") + err = exc_info.value + assert err.code == "SANDBOX_DOMAIN_VIOLATION" + assert "evil.io" in str(err) + + def test_blocks_domain_with_port(self): + sandbox = {"http": {"allow_domains": ["api.example.com"]}} + with pytest.raises(OARunError) as exc_info: + _check_sandbox("http.get", {"url": "https://attacker.com:443/steal"}, sandbox, "task") + assert exc_info.value.code == "SANDBOX_DOMAIN_VIOLATION" + + def test_domain_check_skipped_for_non_http_tool(self): + sandbox = {"http": {"allow_domains": ["api.example.com"]}} + # file.read with a "url"-like argument should not trigger domain check + _check_sandbox("file.read", {"url": "https://evil.io"}, sandbox, "task") + + def test_no_allow_domains_permits_any_url(self): + sandbox = {"http": {}} + _check_sandbox("http.get", {"url": "https://any-domain.io"}, sandbox, "task") + + +# ── _check_sandbox — path enforcement ──────────────────────────────────────── + +class TestCheckSandboxPathViolation: + def test_allows_path_inside_allow_paths(self, tmp_path): + data_dir = tmp_path / "data" + data_dir.mkdir() + sandbox = {"file": {"allow_paths": [str(data_dir)]}} + _check_sandbox("file.read", {"path": str(data_dir / "report.txt")}, sandbox, "task") + + def test_blocks_path_outside_allow_paths(self, tmp_path): + data_dir = tmp_path / "data" + data_dir.mkdir() + sandbox = {"file": {"allow_paths": [str(data_dir)]}} + with pytest.raises(OARunError) as exc_info: + _check_sandbox("file.read", {"path": str(tmp_path / "secret.txt")}, sandbox, "task") + err = exc_info.value + assert err.code == "SANDBOX_PATH_VIOLATION" + assert "secret.txt" in str(err) or "allow_paths" in str(err).lower() + + def test_blocks_path_traversal(self, tmp_path): + data_dir = tmp_path / "data" + data_dir.mkdir() + sandbox = {"file": {"allow_paths": [str(data_dir)]}} + traversal = str(data_dir / ".." / "secret.txt") + with pytest.raises(OARunError) as exc_info: + _check_sandbox("file.read", {"path": traversal}, sandbox, "task") + assert exc_info.value.code == "SANDBOX_PATH_VIOLATION" + + def test_path_check_skipped_for_non_file_tool(self, tmp_path): + sandbox = {"file": {"allow_paths": [str(tmp_path / "data")]}} + # http.get with a "path"-like argument must not trigger path check + _check_sandbox("http.get", {"path": "/etc/passwd"}, sandbox, "task") + + def test_no_allow_paths_permits_any_path(self): + sandbox = {"file": {}} + _check_sandbox("file.write", {"path": "/etc/shadow"}, sandbox, "task") + + def test_write_tool_also_checked(self, tmp_path): + data_dir = tmp_path / "data" + data_dir.mkdir() + sandbox = {"file": {"allow_paths": [str(data_dir)]}} + with pytest.raises(OARunError) as exc_info: + _check_sandbox("file.write", {"path": str(tmp_path / "exfil.txt")}, sandbox, "task") + assert exc_info.value.code == "SANDBOX_PATH_VIOLATION" + + +# ── Integration: sandbox fires before dispatch in _invoke_with_tools ────────── + +class TestSandboxIntegration: + """Use run_task_from_spec with a mocked provider + tool to verify the + sandbox check fires *before* dispatch_tool_call is reached.""" + + def _make_spec_with_tool_and_sandbox(self, sandbox: dict) -> dict: + spec = { + "open_agent_spec": "1.4.0", + "agent": {"name": "test", "description": "test", "role": "analyst"}, + "intelligence": {"type": "llm", "engine": "openai", "model": "gpt-4o-mini"}, + "sandbox": sandbox, + "tools": { + "reader": { + "type": "native", + "native": "file.read", + "description": "Read a file", + } + }, + "tasks": { + "read": { + "description": "Read task", + "tools": ["reader"], + "output": { + "type": "object", + "properties": {"content": {"type": "string"}}, + }, + "prompts": { + "system": "You are helpful.", + "user": "Read data/report.txt", + }, + } + }, + } + return spec + + def test_tool_violation_raised_before_dispatch(self): + """SANDBOX_TOOL_VIOLATION is raised; dispatch_tool_call is never called.""" + from oas_cli.tool_providers.base import InvokeResult, ToolCall + + # Provider returns a tool call for 'file.read' but sandbox blocks it. + fake_tc = ToolCall(id="tc1", name="file.read", arguments={"path": "data/report.txt"}) + fake_result = InvokeResult(is_final=False, text="", tool_calls=[fake_tc]) + + spec = self._make_spec_with_tool_and_sandbox( + {"tools": {"allow": []}} # empty allow list — nothing permitted + ) + + mock_provider = MagicMock() + mock_provider.supports_tools.return_value = True + mock_provider.invoke_with_tools.return_value = fake_result + + with ( + patch("oas_cli.runner.get_provider", return_value=mock_provider), + patch("oas_cli.runner.dispatch_tool_call") as mock_dispatch, + ): + with pytest.raises(OARunError) as exc_info: + run_task_from_spec(spec, task_name="read", input_data={}) + + assert exc_info.value.code == "SANDBOX_TOOL_VIOLATION" + mock_dispatch.assert_not_called() + + def test_permitted_tool_is_dispatched(self): + """When the tool is in the allow list, dispatch proceeds normally.""" + from oas_cli.tool_providers.base import InvokeResult, ToolCall + + fake_tc = ToolCall(id="tc1", name="file.read", arguments={"path": "data/report.txt"}) + intermediate = InvokeResult(is_final=False, text="", tool_calls=[fake_tc]) + final = InvokeResult(is_final=True, text='{"content": "hello"}', tool_calls=[]) + + spec = self._make_spec_with_tool_and_sandbox( + {"tools": {"allow": ["file.read"]}} + ) + + mock_provider = MagicMock() + mock_provider.supports_tools.return_value = True + mock_provider.invoke_with_tools.side_effect = [intermediate, final] + + with ( + patch("oas_cli.runner.get_provider", return_value=mock_provider), + patch("oas_cli.runner.dispatch_tool_call", return_value="file contents") as mock_dispatch, + ): + result = run_task_from_spec(spec, task_name="read", input_data={}) + + mock_dispatch.assert_called_once() + assert result["output"]["content"] == "hello" + + +# ── Chain-wide immutability ─────────────────────────────────────────────────── + +class TestInputImmutability: + """Verify that no task in the chain mutates its caller's input dict.""" + + def _chain_spec(self) -> dict: + """Two-task chain: upstream returns extra_field, downstream checks it doesn't + contaminate base_input back in run_task_from_spec.""" + return { + "open_agent_spec": "1.4.0", + "agent": {"name": "chain", "description": "chain test", "role": "analyst"}, + "intelligence": {"type": "llm", "engine": "openai", "model": "gpt-4o-mini"}, + "tasks": { + "upstream": { + "description": "upstream task", + "output": { + "type": "object", + "properties": {"extra_field": {"type": "string"}}, + }, + "prompts": {"system": "sys", "user": "user"}, + }, + "downstream": { + "description": "downstream task", + "depends_on": ["upstream"], + "output": { + "type": "object", + "properties": {"result": {"type": "string"}}, + }, + "prompts": {"system": "sys", "user": "user"}, + }, + }, + } + + def test_original_input_dict_unchanged_after_chain(self): + """The caller's input_data dict must be identical before and after the run.""" + original_input = {"base_key": "base_value"} + snapshot = copy.deepcopy(original_input) + + spec = self._chain_spec() + + def fake_invoke(system, user, config, history=None): + return '{"extra_field": "injected", "result": "done"}' + + with patch("oas_cli.runner.invoke_intelligence", side_effect=fake_invoke): + run_task_from_spec(spec, task_name="downstream", input_data=original_input) + + assert original_input == snapshot, ( + f"Input dict was mutated: before={snapshot}, after={original_input}" + ) + + def test_upstream_output_does_not_contaminate_second_run(self): + """Running the same spec twice with the same input produces the same output. + + If chain outputs leaked back into the shared input dict, the second run + would see extra_field in base_input and produce a different result. + """ + original_input = {"query": "hello"} + spec = self._chain_spec() + + def fake_invoke(system, user, config, history=None): + return '{"extra_field": "value", "result": "ok"}' + + with patch("oas_cli.runner.invoke_intelligence", side_effect=fake_invoke): + result1 = run_task_from_spec( + spec, task_name="downstream", input_data=copy.deepcopy(original_input) + ) + result2 = run_task_from_spec( + spec, task_name="downstream", input_data=copy.deepcopy(original_input) + ) + + # Both runs should see identical upstream chain outputs. + assert result1["chain"]["upstream"]["input"] == result2["chain"]["upstream"]["input"], ( + "Upstream task received different inputs across runs — input leaked between runs" + ) From 2ce8499030b8e058c648a04a0a1f38fed64e4601 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Mon, 13 Apr 2026 10:26:32 +0000 Subject: [PATCH 2/2] Fix Ruff linting issues by organizing and removing unused imports. --- oas_cli/runner.py | 10 +++++--- tests/test_iis.py | 58 +++++++++++++++++++++++++++++++++++------------ 2 files changed, 50 insertions(+), 18 deletions(-) diff --git a/oas_cli/runner.py b/oas_cli/runner.py index 2856234..3803524 100644 --- a/oas_cli/runner.py +++ b/oas_cli/runner.py @@ -511,8 +511,7 @@ def _check_sandbox( file_path = arguments.get("path", "") resolved = str(Path(file_path).resolve()) if not any( - resolved.startswith(str(Path(p).resolve())) - for p in allow_paths + resolved.startswith(str(Path(p).resolve())) for p in allow_paths ): raise OARunError( f"Sandbox violation: path '{file_path}' is outside allow_paths for task " @@ -783,7 +782,12 @@ def _run_single_task( tools = resolve_task_tools(spec_data, task_name) if tools: raw_output = _invoke_with_tools( - system, user, tools, intelligence_config, task_name, history, + system, + user, + tools, + intelligence_config, + task_name, + history, sandbox=sandbox or None, ) else: diff --git a/tests/test_iis.py b/tests/test_iis.py index 7ae00b4..3553571 100644 --- a/tests/test_iis.py +++ b/tests/test_iis.py @@ -11,8 +11,6 @@ from __future__ import annotations import copy -import json -from pathlib import Path from unittest.mock import MagicMock, patch import pytest @@ -24,15 +22,16 @@ run_task_from_spec, ) - # ── Helpers ─────────────────────────────────────────────────────────────────── + def _minimal_spec(*, tasks: dict | None = None, sandbox: dict | None = None) -> dict: spec: dict = { "open_agent_spec": "1.4.0", "agent": {"name": "test-agent", "description": "test", "role": "analyst"}, "intelligence": {"type": "llm", "engine": "openai", "model": "gpt-4o-mini"}, - "tasks": tasks or { + "tasks": tasks + or { "run": { "description": "test task", "output": { @@ -50,6 +49,7 @@ def _minimal_spec(*, tasks: dict | None = None, sandbox: dict | None = None) -> # ── _resolve_sandbox ────────────────────────────────────────────────────────── + class TestResolveSandbox: def test_returns_empty_dict_when_no_sandbox(self): spec = _minimal_spec() @@ -89,6 +89,7 @@ def test_returns_copy_not_reference(self): # ── _check_sandbox — tool enforcement ──────────────────────────────────────── + class TestCheckSandboxToolViolation: def test_allow_list_permits_listed_tool(self): sandbox = {"tools": {"allow": ["file.read", "http.get"]}} @@ -129,19 +130,26 @@ def test_tool_not_in_allow_reports_full_allow_list(self): # ── _check_sandbox — domain enforcement ────────────────────────────────────── + class TestCheckSandboxDomainViolation: def test_allows_exact_domain_match(self): sandbox = {"http": {"allow_domains": ["api.example.com"]}} - _check_sandbox("http.get", {"url": "https://api.example.com/v1"}, sandbox, "task") + _check_sandbox( + "http.get", {"url": "https://api.example.com/v1"}, sandbox, "task" + ) def test_allows_subdomain(self): sandbox = {"http": {"allow_domains": ["example.com"]}} - _check_sandbox("http.get", {"url": "https://api.example.com/v1"}, sandbox, "task") + _check_sandbox( + "http.get", {"url": "https://api.example.com/v1"}, sandbox, "task" + ) def test_blocks_unlisted_domain(self): sandbox = {"http": {"allow_domains": ["api.example.com"]}} with pytest.raises(OARunError) as exc_info: - _check_sandbox("http.post", {"url": "https://evil.io/exfil"}, sandbox, "task") + _check_sandbox( + "http.post", {"url": "https://evil.io/exfil"}, sandbox, "task" + ) err = exc_info.value assert err.code == "SANDBOX_DOMAIN_VIOLATION" assert "evil.io" in str(err) @@ -149,7 +157,9 @@ def test_blocks_unlisted_domain(self): def test_blocks_domain_with_port(self): sandbox = {"http": {"allow_domains": ["api.example.com"]}} with pytest.raises(OARunError) as exc_info: - _check_sandbox("http.get", {"url": "https://attacker.com:443/steal"}, sandbox, "task") + _check_sandbox( + "http.get", {"url": "https://attacker.com:443/steal"}, sandbox, "task" + ) assert exc_info.value.code == "SANDBOX_DOMAIN_VIOLATION" def test_domain_check_skipped_for_non_http_tool(self): @@ -164,19 +174,24 @@ def test_no_allow_domains_permits_any_url(self): # ── _check_sandbox — path enforcement ──────────────────────────────────────── + class TestCheckSandboxPathViolation: def test_allows_path_inside_allow_paths(self, tmp_path): data_dir = tmp_path / "data" data_dir.mkdir() sandbox = {"file": {"allow_paths": [str(data_dir)]}} - _check_sandbox("file.read", {"path": str(data_dir / "report.txt")}, sandbox, "task") + _check_sandbox( + "file.read", {"path": str(data_dir / "report.txt")}, sandbox, "task" + ) def test_blocks_path_outside_allow_paths(self, tmp_path): data_dir = tmp_path / "data" data_dir.mkdir() sandbox = {"file": {"allow_paths": [str(data_dir)]}} with pytest.raises(OARunError) as exc_info: - _check_sandbox("file.read", {"path": str(tmp_path / "secret.txt")}, sandbox, "task") + _check_sandbox( + "file.read", {"path": str(tmp_path / "secret.txt")}, sandbox, "task" + ) err = exc_info.value assert err.code == "SANDBOX_PATH_VIOLATION" assert "secret.txt" in str(err) or "allow_paths" in str(err).lower() @@ -204,12 +219,15 @@ def test_write_tool_also_checked(self, tmp_path): data_dir.mkdir() sandbox = {"file": {"allow_paths": [str(data_dir)]}} with pytest.raises(OARunError) as exc_info: - _check_sandbox("file.write", {"path": str(tmp_path / "exfil.txt")}, sandbox, "task") + _check_sandbox( + "file.write", {"path": str(tmp_path / "exfil.txt")}, sandbox, "task" + ) assert exc_info.value.code == "SANDBOX_PATH_VIOLATION" # ── Integration: sandbox fires before dispatch in _invoke_with_tools ────────── + class TestSandboxIntegration: """Use run_task_from_spec with a mocked provider + tool to verify the sandbox check fires *before* dispatch_tool_call is reached.""" @@ -249,7 +267,9 @@ def test_tool_violation_raised_before_dispatch(self): from oas_cli.tool_providers.base import InvokeResult, ToolCall # Provider returns a tool call for 'file.read' but sandbox blocks it. - fake_tc = ToolCall(id="tc1", name="file.read", arguments={"path": "data/report.txt"}) + fake_tc = ToolCall( + id="tc1", name="file.read", arguments={"path": "data/report.txt"} + ) fake_result = InvokeResult(is_final=False, text="", tool_calls=[fake_tc]) spec = self._make_spec_with_tool_and_sandbox( @@ -274,7 +294,9 @@ def test_permitted_tool_is_dispatched(self): """When the tool is in the allow list, dispatch proceeds normally.""" from oas_cli.tool_providers.base import InvokeResult, ToolCall - fake_tc = ToolCall(id="tc1", name="file.read", arguments={"path": "data/report.txt"}) + fake_tc = ToolCall( + id="tc1", name="file.read", arguments={"path": "data/report.txt"} + ) intermediate = InvokeResult(is_final=False, text="", tool_calls=[fake_tc]) final = InvokeResult(is_final=True, text='{"content": "hello"}', tool_calls=[]) @@ -288,7 +310,9 @@ def test_permitted_tool_is_dispatched(self): with ( patch("oas_cli.runner.get_provider", return_value=mock_provider), - patch("oas_cli.runner.dispatch_tool_call", return_value="file contents") as mock_dispatch, + patch( + "oas_cli.runner.dispatch_tool_call", return_value="file contents" + ) as mock_dispatch, ): result = run_task_from_spec(spec, task_name="read", input_data={}) @@ -298,6 +322,7 @@ def test_permitted_tool_is_dispatched(self): # ── Chain-wide immutability ─────────────────────────────────────────────────── + class TestInputImmutability: """Verify that no task in the chain mutates its caller's input dict.""" @@ -367,6 +392,9 @@ def fake_invoke(system, user, config, history=None): ) # Both runs should see identical upstream chain outputs. - assert result1["chain"]["upstream"]["input"] == result2["chain"]["upstream"]["input"], ( + assert ( + result1["chain"]["upstream"]["input"] + == result2["chain"]["upstream"]["input"] + ), ( "Upstream task received different inputs across runs — input leaked between runs" )