Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,17 @@ chainweaver/
├── compiler_llm.py Offline build-time LLM flow compiler: LLMProposal + llm_propose_flows() + write_proposals() (#28); banned from executor.py
├── optimizer.py Offline build-time tool-description optimizer: OptimizationStrategy + ToolDescriptionProposal + optimize_tool_descriptions()/optimize_new_tool_description() (#100); banned from executor.py
├── _offline_llm.py Private shared internals for the offline LLM proposers: LLMFn type + parse_llm_yaml() + render_tool_catalogue() (#28, #100)
├── contracts.py ToolSafetyContract + SideEffectLevel/StabilityLevel/DeterminismLevel enums + merge_safety() + evaluate_predicate() — determinism + operational safety vocabulary (#19, #125, #293, #9, #8)
├── contracts.py ToolSafetyContract + SideEffectLevel/StabilityLevel/DeterminismLevel enums + merge_safety() + side_effect_exceeds() (#356) + evaluate_predicate() — determinism + operational safety vocabulary (#19, #125, #293, #9, #8)
├── approvals.py ApprovalCallback Protocol + ApprovalContext/ApprovalDecision/ApprovalRecord + coerce_approval_callback — execution-time ToolSafetyContract enforcement seam (#356); mirrors decisions.py
├── decorators.py @tool decorator for zero-boilerplate tool definition
├── tools.py Tool class: named callable with Pydantic I/O schemas + schema_hash + safety contract (#19); Tool.from_flow() wraps a Flow as a Tool (#24) with derived safety (#125)
├── tools.py Tool class: named callable with Pydantic I/O schemas + schema_hash + safety contract (#19) + metadata provenance (#358/#359/#371) + dry_run_fn/run_dry (#357); Tool.from_flow() wraps a Flow as a Tool (#24) with derived safety (#125)
├── flow.py FlowStep + Flow + DAGFlow + FlowStatus + FlowLifecycle + FlowGovernance + DriftInfo + ConditionalEdge (#9) + determinism_level property (#8) + ContextCollisionPolicy / on_context_collision (#337)
├── registry.py FlowRegistry: multi-version catalogue with status filtering (store-backed) + copy-on-write update_flow_state (#335)
├── storage.py RegistryStore protocol + InMemoryStore + FileStore (#16)
├── analyzer.py ChainAnalyzer: offline schema-compatibility analysis (#77)
├── attest.py attest_flow() + AttestationReport: observed-determinism evidence (#154)
├── decisions.py DecisionCallback Protocol + DecisionContext + coerce_decision_callback (#102)
├── executor.py FlowExecutor: sequential/DAG runner + drift detection + stream_flow + opt-in async DAG-level concurrency (max_step_concurrency, #344) (main entry point)
├── executor.py FlowExecutor: sequential/DAG runner + drift detection + stream_flow + opt-in async DAG-level concurrency (max_step_concurrency, #344) + opt-in execution-time safety enforcement (approval_callback/strict_safety/max_side_effect_level, #356) + dry-run mode (execute_flow(dry_run=...), #357) (main entry point)
├── _execution/ Internal, no-I/O execution collaborators shared by both lanes (#330, #331); banned from importing LLM/network/random — see invariants
│ ├── __init__.py Re-exports merge_step_outputs
│ └── context.py merge_step_outputs: single context-merge honouring on_context_collision (#337)
Expand All @@ -68,7 +69,7 @@ chainweaver/
├── mcp/ MCP integration (issues #70, #72, #150); requires chainweaver[mcp]
│ ├── __init__.py Public surface: MCPToolAdapter, FlowServer, jsonschema_to_pydantic
│ ├── _schema.py JSON Schema ↔ Pydantic bridge
│ ├── adapter.py MCPToolAdapter: wrap MCP server tools as ChainWeaver Tools (#70, #150)
│ ├── adapter.py MCPToolAdapter: wrap MCP server tools as ChainWeaver Tools (#70, #150) + untrusted-metadata trust controls — annotation_trust→ToolSafetyContract (#371), MetadataPolicy name/description sanitisation (#359), schema-hash pinning + on_drift (#358), build_pin_file/load_pins
│ └── server.py FlowServer: safely expose governed flows as MCP tools via FastMCP (#72, #259, #294)
├── contrib/ Curated deterministic stdlib tools (#145); pip install 'chainweaver[contrib]'
│ ├── __init__.py Re-exports the public tool set
Expand Down Expand Up @@ -320,6 +321,7 @@ integration.
| `started_at` | `datetime` | UTC timestamp when execution began. |
| `ended_at` | `datetime` | UTC timestamp when execution finished. |
| `total_duration_ms` | `float` | Wall-clock duration in ms (via `time.perf_counter`). |
| `dry_run` | `bool` | `True` when produced by `execute_flow(dry_run=True)` (#357); a rehearsal trace, never a real run. |

### `StepRecord` (Pydantic `BaseModel`)

Expand All @@ -335,6 +337,7 @@ integration.
| `started_at` | `datetime` | UTC timestamp when the step began. |
| `ended_at` | `datetime` | UTC timestamp when the step finished. |
| `duration_ms` | `float` | Wall-clock duration in ms (via `time.perf_counter`). |
| `approval` | `ApprovalRecord \| None` | The decision for a step gated by an execution-time approval callback (#356); `None` when no approval was required. |

> **Serialization:** `ExecutionResult` and `StepRecord` are Pydantic models;
> `result.model_dump_json()` and `ExecutionResult.model_validate_json(...)`
Expand Down
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -800,6 +800,8 @@ All errors are typed and traceable:
| `SchemaValidationError` | Input or output fails Pydantic validation |
| `InputMappingError` | A mapping key is not present in the context |
| `FlowExecutionError` | The tool callable raises an unexpected exception |
| `ApprovalDeniedError` | An execution-time approval callback denied a step, raised, or returned an invalid value — or `strict_safety=True` and a required-approval step has no callback |
| `SafetyCeilingError` | A step's `ToolSafetyContract.side_effects` exceeds the executor's configured `max_side_effect_level` |
| `ToolDefinitionError` | The `@tool` decorator cannot build a tool from a function |
| `DAGDefinitionError` | A `DAGFlow` has a cycle, duplicate `step_id`, or unknown dependency |
| `FlowCompositionError` | A composed flow has a sub-flow cycle, exceeds `max_composition_depth`, or references an unregistered sub-flow |
Expand All @@ -812,6 +814,8 @@ All errors are typed and traceable:
| `FixtureStaleError` | A `record_then_replay` replay invocation cannot be matched to a recording (missing/stale fixture) |
| `FuzzConfigError` | A property-based fuzzing run is misconfigured (no properties, `runs < 1`, a flow with no `input_schema` and no base input, or an unsupported input-field type) |
| `CostProfileError` | A cost estimate is requested for a `(provider, model)` pair absent from the maintained `PROVIDER_PRICES` table |
| `MCPMetadataError` | A server-provided MCP tool name fails the adapter's `MetadataPolicy` (and `on_invalid_name="error"`) |
| `MCPSchemaDriftError` | A pinned MCP tool's raw schema changed under `MCPToolAdapter(on_drift="error")` |

All exceptions inherit from `ChainWeaverError`.

Expand Down
26 changes: 26 additions & 0 deletions chainweaver/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,15 @@

from chainweaver import cli
from chainweaver.analyzer import ChainAnalyzer, Suggestion, ToolChain, suggest_optimizations
from chainweaver.approvals import (
ApprovalCallable,
ApprovalCallback,
ApprovalContext,
ApprovalDecision,
ApprovalRecord,
BaseApprovalCallback,
coerce_approval_callback,
)
from chainweaver.attest import AttestationInputError, AttestationReport, attest_flow
from chainweaver.builder import FlowBuilder, FlowBuilderError
from chainweaver.cache import FileStepCache, InMemoryStepCache, StepCache, StepCacheKey
Expand All @@ -66,6 +75,7 @@
ToolSafetyContract,
evaluate_predicate,
merge_safety,
side_effect_exceeds,
)
from chainweaver.cost import (
PROVIDER_PRICES,
Expand All @@ -85,6 +95,7 @@
from chainweaver.events import FlowEvent
from chainweaver.exceptions import (
AgentTraceImportError,
ApprovalDeniedError,
AsyncLaneUnsupportedError,
ChainWeaverError,
CheckpointDriftError,
Expand All @@ -106,11 +117,14 @@
InvalidFlowVersionError,
KernelInvocationError,
MCPError,
MCPMetadataError,
MCPSchemaConversionError,
MCPSchemaDriftError,
MCPToolInvocationError,
OfflineLLMError,
PluginDiscoveryError,
PredicateSyntaxError,
SafetyCeilingError,
SchemaValidationError,
ToolDefinitionError,
ToolNotFoundError,
Expand Down Expand Up @@ -238,11 +252,18 @@
"PROVIDER_PRICES",
"AgentTraceEvent",
"AgentTraceImportError",
"ApprovalCallable",
"ApprovalCallback",
"ApprovalContext",
"ApprovalDecision",
"ApprovalDeniedError",
"ApprovalRecord",
"AsyncLaneUnsupportedError",
"AttestationInputError",
"AttestationReport",
"BacktestMismatch",
"BacktestReport",
"BaseApprovalCallback",
"BaseDecisionCallback",
"BaseMiddleware",
"CancellationToken",
Expand Down Expand Up @@ -321,7 +342,9 @@
"LessonEvidenceStep",
"LessonReview",
"MCPError",
"MCPMetadataError",
"MCPSchemaConversionError",
"MCPSchemaDriftError",
"MCPToolInvocationError",
"ObservedStep",
"ObservedTrace",
Expand All @@ -337,6 +360,7 @@
"ReplayMode",
"ReplayResult",
"RetryPolicy",
"SafetyCeilingError",
"SafetyLevel",
"SchemaValidationError",
"ServiceConfig",
Expand Down Expand Up @@ -369,6 +393,7 @@
"check_flow_compatibility",
"classify_safety",
"cli",
"coerce_approval_callback",
"coerce_decision_callback",
"compile_flow",
"discover_flows",
Expand Down Expand Up @@ -397,6 +422,7 @@
"result_to_mermaid",
"schema_fingerprint",
"score_candidate",
"side_effect_exceeds",
"suggest_optimizations",
"tool",
"trace_to_lesson_candidate",
Expand Down
188 changes: 188 additions & 0 deletions chainweaver/approvals.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
"""Execution-time approval seam for ToolSafetyContract enforcement (issue #356).

ChainWeaver already ships a rich, composable safety vocabulary
(:mod:`chainweaver.contracts`): side-effect levels, approval flags, dry-run
support, and ``merge_safety()``. In v1 the contract was purely *advisory* —
:class:`~chainweaver.executor.FlowExecutor` never acted on it. An
:class:`ApprovalCallback` is the opt-in seam that makes the contract
*actionable*: when a step's effective contract has ``requires_approval=True``
and a callback is registered, the executor asks the callback to approve the
step **before** the tool function runs.

The seam deliberately mirrors :class:`~chainweaver.decisions.DecisionCallback`
(issue #102): the executor only ever *calls* a user-supplied callback, so the
three hard executor invariants (no LLM, no network I/O, no randomness in
:mod:`chainweaver.executor`) are preserved — the callback is where a host can
inject a human prompt, a policy service, or an RPC, none of which the executor
performs itself.

Two equivalent callback shapes are accepted, exactly like the decision seam::

# Class-based
class CliApprover:
def approve(self, ctx: ApprovalContext) -> ApprovalDecision:
return ApprovalDecision.APPROVE

# Plain callable
def approve_all(ctx: ApprovalContext) -> ApprovalDecision:
return ApprovalDecision.APPROVE

FlowExecutor(registry, approval_callback=approve_all)

Failure semantics: a ``DENY`` decision, a callback that raises, or a callback
that returns a non-:class:`ApprovalDecision` aborts the step with
:class:`~chainweaver.exceptions.ApprovalDeniedError` and a failed
``StepRecord`` — the same abort-the-step path tool failures take.
"""

from __future__ import annotations

from collections.abc import Callable
from enum import Enum
from typing import Any, Protocol, runtime_checkable

from pydantic import BaseModel, ConfigDict

from chainweaver.contracts import ToolSafetyContract


class ApprovalDecision(str, Enum):
"""Explicit outcome of an :class:`ApprovalCallback` — no boolean ambiguity.

Attributes:
APPROVE: Allow the step's tool to run.
DENY: Refuse the step; the executor aborts it with
:class:`~chainweaver.exceptions.ApprovalDeniedError`.
"""

APPROVE = "approve"
DENY = "deny"


class ApprovalContext(BaseModel):
"""Snapshot of execution state passed to an :class:`ApprovalCallback`.

Attributes:
trace_id: UUID4 hex string for the running execution.
flow_name: Name of the flow being executed.
step_index: Zero-based position of the step inside the flow.
step_id: ``DAGFlowStep.step_id`` when running a ``DAGFlow``; ``None``
for linear ``Flow`` execution.
tool_name: Name of the tool the step is about to run.
inputs: The step's resolved (already redacted, when a redaction policy
is configured) inputs. Read-only — mutating has no effect.
safety: The effective :class:`ToolSafetyContract` that triggered the
approval gate.
"""

model_config = ConfigDict(frozen=True, arbitrary_types_allowed=True)

trace_id: str
flow_name: str
step_index: int
step_id: str | None
tool_name: str
inputs: dict[str, Any]
safety: ToolSafetyContract


class ApprovalRecord(BaseModel):
"""Audit record of an approval decision, attached to ``StepRecord.approval``.

Persisted on the trace so a completed :class:`~chainweaver.executor.ExecutionResult`
is a full record of which side-effecting steps were gated and how they were
resolved.

Attributes:
decision: The :class:`ApprovalDecision` the callback returned (or
``DENY`` when the callback raised / no callback was registered under
``strict_safety``).
reason: Optional human-readable explanation carried alongside the
decision.
"""

model_config = ConfigDict(frozen=True)

decision: ApprovalDecision
reason: str | None = None


@runtime_checkable
class ApprovalCallback(Protocol):
"""Structural protocol for execution-time approval callbacks (issue #356)."""

def approve(self, ctx: ApprovalContext) -> ApprovalDecision:
"""Return :attr:`ApprovalDecision.APPROVE` or :attr:`ApprovalDecision.DENY`.

Args:
ctx: Snapshot of the flow execution state at the approval point.

Returns:
An :class:`ApprovalDecision`. Returning anything else aborts the
step with :class:`~chainweaver.exceptions.ApprovalDeniedError`.

Raises:
Exception: Any exception is caught by the executor, converted to an
:class:`~chainweaver.exceptions.ApprovalDeniedError`, and aborts
the step like any other tool failure.
"""
...


class BaseApprovalCallback:
"""Convenience base class for class-based :class:`ApprovalCallback`.

Subclass and override :meth:`approve`. Stateful approvers (batching
prompts, caching policy decisions) typically inherit from this; pure
stateless approvers can use a plain function and skip the class entirely.
"""

def approve(self, ctx: ApprovalContext) -> ApprovalDecision: # pragma: no cover — abstract
raise NotImplementedError("BaseApprovalCallback subclasses must override 'approve'.")


# Type alias for accepted callback shapes; bare callables are wrapped so the
# executor's call site stays uniform (``cb.approve(ctx)``).
ApprovalCallable = Callable[[ApprovalContext], ApprovalDecision]


class _CallableApprovalCallback:
"""Adapter that wraps a bare callable into an :class:`ApprovalCallback`."""

__slots__ = ("_fn",)

def __init__(self, fn: ApprovalCallable) -> None:
self._fn = fn

def approve(self, ctx: ApprovalContext) -> ApprovalDecision:
return self._fn(ctx)


def coerce_approval_callback(
cb: ApprovalCallback | ApprovalCallable | None,
) -> ApprovalCallback | None:
"""Normalize *cb* into an :class:`ApprovalCallback`, or ``None``.

Accepts either an object implementing ``approve(ctx)`` or a bare callable
with the equivalent signature. Bare callables are wrapped so the executor
can call ``cb.approve(ctx)`` uniformly.

Args:
cb: An :class:`ApprovalCallback`, a bare callable, or ``None``.

Returns:
An :class:`ApprovalCallback` instance, or ``None`` if *cb* was ``None``.

Raises:
TypeError: If *cb* is neither an :class:`ApprovalCallback` nor callable.
"""
if cb is None:
return None
if isinstance(cb, ApprovalCallback):
return cb
if callable(cb):
return _CallableApprovalCallback(cb)
raise TypeError(
f"approval_callback must implement ApprovalCallback or be callable; "
f"got {type(cb).__name__}."
)
24 changes: 24 additions & 0 deletions chainweaver/compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,30 @@ def schema_fingerprint(model: type[BaseModel]) -> str:
return hashlib.sha256(canonical.encode()).hexdigest()[:16]


def schema_dict_fingerprint(raw_schema: dict[str, object]) -> str:
"""Compute a deterministic fingerprint of a *raw* JSON Schema dict (issue #358).

Counterpart to :func:`schema_fingerprint`, which fingerprints a Pydantic
model. This variant fingerprints a JSON Schema *mapping* directly — used by
:class:`~chainweaver.mcp.adapter.MCPToolAdapter` to pin the schemas a remote
MCP server advertises *before* they are projected to Pydantic, so a server
silently changing a tool's ``inputSchema`` / ``outputSchema`` between sessions
is detectable.

The canonicalisation (sorted keys, compact separators) makes the fingerprint
insensitive to JSON key ordering, so a server reordering schema keys without
changing their meaning does not register as drift.

Args:
raw_schema: A JSON-Schema mapping (e.g. an MCP tool's ``inputSchema``).

Returns:
A 16-character hex digest string, matching :func:`schema_fingerprint`.
"""
canonical = json.dumps(raw_schema, sort_keys=True, separators=(",", ":"))
return hashlib.sha256(canonical.encode()).hexdigest()[:16]


@dataclass
class CompatibilityIssue:
"""A single compatibility problem detected between a flow and its tools.
Expand Down
Loading
Loading