From 4a40f5fd8b161ac236a1c32d19dce99721fa78f4 Mon Sep 17 00:00:00 2001 From: Claude Date: Sat, 13 Jun 2026 13:41:06 +0000 Subject: [PATCH 1/5] feat: harden the FlowExecutor execution core (#330, #331, #332, #335, #336, #337, #344, #354) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Foundational extraction plus six behavioural fixes across the executor core, delivered as one cluster. - #330/#331: add internal `chainweaver/_execution` package for the no-I/O collaborators shared by both lanes; first extract is `merge_step_outputs`, the single context-merge used by linear and DAG, sync and async. - #337: add `on_context_collision` ("overwrite"/"warn" (default)/"error") to Flow and DAGFlow; route all primary merges through the shared helper; emit a `context_collision` compile warning; new `ContextKeyCollisionError`. - #344: opt-in `FlowExecutor(max_step_concurrency=N)` runs independent async DAG-level steps concurrently under a semaphore, preserving declaration-order determinism; add a fan-out benchmark. - #336: document and enforce a real concurrency contract — `stream_flow` uses per-thread run-scoped middleware (no shared-list mutation); lock InMemoryStepCache / InMemoryCheckpointer. - #332: `execute_flow_async` rejects unsupported constructs up front with the new typed `AsyncLaneUnsupportedError`, listing all offending constructs. - #335: `FlowRegistry.update_flow_state` copy-on-write; `accept_drift` / `set_flow_status` no longer mutate registry-shared Flow objects in place. - #354: AST import-contract test enforces the no-LLM/no-network/no-randomness executor invariants over executor.py and _execution/. Regenerates the public-API snapshot and flow schema; updates AGENTS.md, architecture.md, invariants.md, data-integrity.md, and CHANGELOG. All four validation commands pass; coverage 92%. https://claude.ai/code/session_01Ja8HkypgZVJka5LDcqmZzS --- AGENTS.md | 51 +++- CHANGELOG.md | 28 +++ benchmarks/README.md | 3 + benchmarks/bench_dag_concurrency.py | 136 +++++++++++ chainweaver/__init__.py | 4 + chainweaver/_execution/__init__.py | 19 ++ chainweaver/_execution/context.py | 75 ++++++ chainweaver/cache.py | 38 +-- chainweaver/checkpoint.py | 30 ++- chainweaver/compiler.py | 30 ++- chainweaver/exceptions.py | 63 +++++ chainweaver/executor.py | 311 +++++++++++++++++-------- chainweaver/flow.py | 13 ++ chainweaver/registry.py | 72 +++++- docs/agent-context/architecture.md | 5 +- docs/agent-context/invariants.md | 29 +++ docs/data-integrity.md | 28 ++- schemas/flow.schema.json | 20 ++ tests/fixtures/public_api.json | 24 +- tests/test_composition.py | 4 +- tests/test_context_collision.py | 202 ++++++++++++++++ tests/test_dag_concurrency.py | 182 +++++++++++++++ tests/test_executor_async.py | 95 +++++++- tests/test_executor_concurrency.py | 211 +++++++++++++++++ tests/test_executor_import_contract.py | 207 ++++++++++++++++ tests/test_flow_state_transitions.py | 161 +++++++++++++ 26 files changed, 1893 insertions(+), 148 deletions(-) create mode 100644 benchmarks/bench_dag_concurrency.py create mode 100644 chainweaver/_execution/__init__.py create mode 100644 chainweaver/_execution/context.py create mode 100644 tests/test_context_collision.py create mode 100644 tests/test_dag_concurrency.py create mode 100644 tests/test_executor_concurrency.py create mode 100644 tests/test_executor_import_contract.py create mode 100644 tests/test_flow_state_transitions.py diff --git a/AGENTS.md b/AGENTS.md index 517446d..9b4c958 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -43,13 +43,16 @@ chainweaver/ ├── contracts.py ToolSafetyContract + SideEffectLevel/StabilityLevel/DeterminismLevel enums + merge_safety() + evaluate_predicate() — determinism + operational safety vocabulary (#19, #125, #293, #9, #8) ├── decorators.py @tool decorator for zero-boilerplate tool definition ├── tools.py Tool class: named callable with Pydantic I/O schemas + schema_hash + safety contract (#19); Tool.from_flow() wraps a Flow as a Tool (#24) with derived safety (#125) -├── flow.py FlowStep + Flow + DAGFlow + FlowStatus + FlowLifecycle + FlowGovernance + DriftInfo + ConditionalEdge (#9) + determinism_level property (#8) -├── registry.py FlowRegistry: multi-version catalogue with status filtering (store-backed) +├── flow.py FlowStep + Flow + DAGFlow + FlowStatus + FlowLifecycle + FlowGovernance + DriftInfo + ConditionalEdge (#9) + determinism_level property (#8) + ContextCollisionPolicy / on_context_collision (#337) +├── registry.py FlowRegistry: multi-version catalogue with status filtering (store-backed) + copy-on-write update_flow_state (#335) ├── storage.py RegistryStore protocol + InMemoryStore + FileStore (#16) ├── analyzer.py ChainAnalyzer: offline schema-compatibility analysis (#77) ├── attest.py attest_flow() + AttestationReport: observed-determinism evidence (#154) ├── decisions.py DecisionCallback Protocol + DecisionContext + coerce_decision_callback (#102) -├── executor.py FlowExecutor: sequential/DAG runner + drift detection + stream_flow (main entry point) +├── executor.py FlowExecutor: sequential/DAG runner + drift detection + stream_flow + opt-in async DAG-level concurrency (max_step_concurrency, #344) (main entry point) +├── _execution/ Internal, no-I/O execution collaborators shared by both lanes (#330, #331); banned from importing LLM/network/random — see invariants +│ ├── __init__.py Re-exports merge_step_outputs +│ └── context.py merge_step_outputs: single context-merge honouring on_context_collision (#337) ├── middleware.py FlowExecutorMiddleware Protocol + lifecycle context models + BaseMiddleware (#131) ├── events.py FlowEvent streamable lifecycle payload yielded by FlowExecutor.stream_flow (#134) ├── cache.py StepCache Protocol + InMemoryStepCache + FileStepCache + StepCacheKey (#127) @@ -207,6 +210,48 @@ see the `DAGFlowStep` subsection below). | `capability_id` | `str \| None` | `None` | Optional Weaver Stack capability identifier (#90); when set, the flow is routable as a `SelectableItem` via `flow_to_selectable_item`. See [docs/agent-context/flow-as-capability.md](docs/agent-context/flow-as-capability.md). | | `governance` | `FlowGovernance` | active defaults | Review lifecycle, owner, replacement-tool list, savings estimates, and review notes (#259, #268). Separate from `FlowStatus`. | | `safety` | `ToolSafetyContract \| None` | `None` | Explicit flow-level side-effect, retry, dry-run, idempotency, and approval metadata (#293). `None` means unknown, not safe. | +| `on_context_collision` | `Literal["overwrite", "warn", "error"]` | `"warn"` | Policy when a step output overwrites an existing context key (#337). `"overwrite"` = silent last-write-wins; `"warn"` = log at WARNING then overwrite; `"error"` = abort with `ContextKeyCollisionError`. Applied by the single shared merge helper (`chainweaver._execution.merge_step_outputs`) on both linear and DAG, sync and async. DAG *sibling* collisions within one level remain an unconditional error regardless. | + +### Context-collision semantics (#337) + +The accumulated context is the data plane of every flow. A step output that +overwrites an existing key (including an `initial_input` key) is governed by +`Flow.on_context_collision` and enforced in exactly one place — +`chainweaver._execution.merge_step_outputs` — for both flow kinds and both +lanes. `compile_flow` additionally emits a `context_collision` warning for +statically detectable overwrites (suppressed under `"overwrite"`). See +[docs/data-integrity.md](docs/data-integrity.md#context-key-collisions-337). + +### Concurrency contract (#336) + +A single `FlowExecutor` instance supports **concurrent** `execute_flow` / +`execute_flow_async` / `stream_flow` calls: run-scoped state (e.g. the stream +event collector) lives per-thread on a `threading.local` slot, and the bundled +`InMemoryStepCache` / `InMemoryCheckpointer` are internally locked. The one +rule: **mutating operations (`register_tool`, `add_middleware`, +`accept_drift`) must not run concurrently with executions** — do them at setup. + +### Async lane support matrix (#332) + +`execute_flow_async` raises `AsyncLaneUnsupportedError` (before any step runs) +for features it does not yet honour, rather than diverging silently: + +| Feature | `execute_flow` (sync) | `execute_flow_async` | +|---------|:---------------------:|:--------------------:| +| Linear flows | ✅ | ✅ | +| DAG flows (no branching) | ✅ | ✅ | +| Opt-in DAG-level concurrency (#344) | sequential | ✅ (`max_step_concurrency`) | +| Conditional branches / `default_next` (#9) | ✅ | ❌ rejected | +| `decision_candidates` (#102) | ✅ | ❌ rejected | +| Composed sub-flow (`flow_name`, #75) | ✅ | ❌ rejected | +| Step cache / checkpoint resume | ✅ | bypassed | + +### State transitions (#335) + +`accept_drift` and `set_flow_status` never mutate a registry-held `Flow` in +place — they go through `FlowRegistry.update_flow_state`, which performs a +`model_copy(update=...)`, persists the new object, and returns it. Callers +needing the new state must re-fetch via `get_flow`. **Read-only properties (not fields):** `input_schema`, `output_schema`, and `context_schema` resolve their `*_schema_ref` counterparts to a diff --git a/CHANGELOG.md b/CHANGELOG.md index 4e58e0b..ebdb6e7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,34 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- **FlowExecutor execution-core hardening** (#330, #331, #332, #335, #336, + #337, #344): a cluster of architecture/reliability improvements to the + executor core. + - A new internal `chainweaver._execution` package holds the transport-agnostic, + no-I/O building blocks shared by both lanes (#330, #331), starting with + `merge_step_outputs` — a single context-merge implementation used by linear + and DAG, sync and async. + - `Flow` / `DAGFlow` gain `on_context_collision` (`"overwrite"` / `"warn"` + (default) / `"error"`) governing what happens when a step output overwrites + an existing context key; `compile_flow` emits a `context_collision` warning + for statically detectable overwrites; new `ContextKeyCollisionError` (#337). + - Opt-in concurrent execution of independent DAG-level steps in the async lane + via `FlowExecutor(max_step_concurrency=N)` (default `1` = sequential, + bit-identical); results stay deterministic regardless of the setting (#344). + - `FlowExecutor` now documents and supports a real concurrency contract: + `stream_flow` registers its event collector as per-thread run-scoped + middleware (no more shared-list mutation), and `InMemoryStepCache` / + `InMemoryCheckpointer` are internally locked (#336). + - `execute_flow_async` rejects unsupported constructs (branching, + `decision_candidates`, composed sub-flows) up front with the new typed + `AsyncLaneUnsupportedError`, listing every offending construct (#332). + - `FlowRegistry.update_flow_state` performs copy-on-write state transitions; + `accept_drift` / `set_flow_status` no longer mutate registry-shared `Flow` + objects in place (#335). + - The executor determinism invariants (no LLM / no network / no randomness) + are now mechanically enforced by an AST import-contract test over + `executor.py` and `_execution/` (#354). + - **Coding-agent macro-flow compilation loop** (#254, #256, #257, #260, #261, #262, #263, #266, #267, #312, #313, #314): a new `chainweaver.traces` module makes the *observe → mine → score → draft → backtest* loop diff --git a/benchmarks/README.md b/benchmarks/README.md index 56b5754..347063c 100644 --- a/benchmarks/README.md +++ b/benchmarks/README.md @@ -21,6 +21,9 @@ python benchmarks/bench_naive_vs_compiled.py --output results/bench.json python benchmarks/bench_naive_vs_compiled.py \ --output results/bench.json \ --full-output results/bench-full.json + +# Sequential vs concurrent DAG-level execution (#344) +python benchmarks/bench_dag_concurrency.py --leaves 6 --io-ms 50 ``` `time.sleep` is used to simulate the LLM round-trip — no real LLM is diff --git a/benchmarks/bench_dag_concurrency.py b/benchmarks/bench_dag_concurrency.py new file mode 100644 index 0000000..698586d --- /dev/null +++ b/benchmarks/bench_dag_concurrency.py @@ -0,0 +1,136 @@ +"""Sequential vs concurrent DAG-level execution benchmark (issue #344). + +Demonstrates the latency win of opt-in concurrent execution of independent DAG +steps in the async lane. Builds a fan-out flow — one root feeding ``N`` +independent I/O-bound leaves — and runs it through ``execute_flow_async`` at +``max_step_concurrency=1`` (sequential, the default) and ``max_step_concurrency=N`` +(fully parallel level). Each leaf simulates I/O with ``asyncio.sleep`` — no real +network traffic. + +Run from the repository root:: + + python benchmarks/bench_dag_concurrency.py + python benchmarks/bench_dag_concurrency.py --leaves 8 --io-ms 50 --repeats 5 + +With ``N`` leaves each taking ``io_ms`` of I/O, the sequential lane takes +~``N * io_ms`` while the concurrent lane takes ~``io_ms`` — a ~``N``x speedup +on the level. Results are deterministic regardless of concurrency (verified by +the test suite); this script only measures wall-clock latency. +""" + +from __future__ import annotations + +import argparse +import asyncio +import time +from statistics import median +from typing import Any + +from pydantic import BaseModel, create_model + +from chainweaver.executor import FlowExecutor +from chainweaver.flow import DAGFlow, DAGFlowStep +from chainweaver.registry import FlowRegistry +from chainweaver.tools import Tool + + +class _SeedIn(BaseModel): + n: int + + +class _SeedOut(BaseModel): + seed: int + + +class _LeafIn(BaseModel): + seed: int + + +def _build_executor( + num_leaves: int, io_seconds: float, concurrency: int +) -> tuple[FlowExecutor, str]: + async def _root(inp: _SeedIn) -> dict[str, Any]: + return {"seed": inp.n} + + registry = FlowRegistry() + steps: list[DAGFlowStep] = [ + DAGFlowStep(step_id="root", tool_name="root", input_mapping={"n": "n"}) + ] + for i in range(num_leaves): + steps.append( + DAGFlowStep( + step_id=f"leaf{i}", + tool_name=f"leaf{i}", + input_mapping={"seed": "seed"}, + depends_on=["root"], + ) + ) + registry.register_flow( + DAGFlow(name="fan_out", version="1.0.0", description="fan-out", steps=steps) + ) + executor = FlowExecutor(registry=registry, max_step_concurrency=concurrency) + executor.register_tool( + Tool(name="root", description="", input_schema=_SeedIn, output_schema=_SeedOut, fn=_root) + ) + for i in range(num_leaves): + fields: dict[str, Any] = {f"r{i}": (int, ...)} + out_model = create_model(f"Leaf{i}Out", **fields) + + async def _leaf(inp: _LeafIn, _i: int = i) -> dict[str, Any]: + await asyncio.sleep(io_seconds) + return {f"r{_i}": inp.seed + _i} + + executor.register_tool( + Tool( + name=f"leaf{i}", + description="", + input_schema=_LeafIn, + output_schema=out_model, + fn=_leaf, + ) + ) + return executor, "fan_out" + + +async def _time_run(executor: FlowExecutor, flow_name: str, repeats: int) -> float: + durations: list[float] = [] + for _ in range(repeats): + t0 = time.perf_counter() + result = await executor.execute_flow_async(flow_name, {"n": 0}) + durations.append(time.perf_counter() - t0) + assert result.success + return median(durations) + + +async def _main(num_leaves: int, io_ms: float, repeats: int) -> None: + io_seconds = io_ms / 1000.0 + seq_executor, name = _build_executor(num_leaves, io_seconds, concurrency=1) + con_executor, _ = _build_executor(num_leaves, io_seconds, concurrency=num_leaves) + + seq = await _time_run(seq_executor, name, repeats) + con = await _time_run(con_executor, name, repeats) + + print(f"fan-out DAG: {num_leaves} leaves x {io_ms:.0f}ms I/O, median of {repeats} runs") + print(f" sequential (max_step_concurrency=1): {seq * 1000:8.1f} ms") + print(f" concurrent (max_step_concurrency={num_leaves}):{con * 1000:8.1f} ms") + if con > 0: + print(f" speedup: {seq / con:8.1f}x") + + +def main() -> None: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument( + "--leaves", type=int, default=6, help="Independent leaf steps in the level." + ) + parser.add_argument( + "--io-ms", type=float, default=50.0, help="Simulated per-leaf I/O latency (ms)." + ) + parser.add_argument( + "--repeats", type=int, default=5, help="Timed runs per case (median reported)." + ) + args = parser.parse_args() + asyncio.run(_main(args.leaves, args.io_ms, args.repeats)) + + +if __name__ == "__main__": + main() diff --git a/chainweaver/__init__.py b/chainweaver/__init__.py index 9f46935..6c9ac46 100644 --- a/chainweaver/__init__.py +++ b/chainweaver/__init__.py @@ -85,10 +85,12 @@ from chainweaver.events import FlowEvent from chainweaver.exceptions import ( AgentTraceImportError, + AsyncLaneUnsupportedError, ChainWeaverError, CheckpointDriftError, CheckpointerNotConfiguredError, CheckpointNotFoundError, + ContextKeyCollisionError, ContribError, CostProfileError, DAGDefinitionError, @@ -236,6 +238,7 @@ "PROVIDER_PRICES", "AgentTraceEvent", "AgentTraceImportError", + "AsyncLaneUnsupportedError", "AttestationInputError", "AttestationReport", "BacktestMismatch", @@ -257,6 +260,7 @@ "CompilationResult", "CompilationWarning", "ConditionalEdge", + "ContextKeyCollisionError", "ContribError", "CostProfile", "CostProfileError", diff --git a/chainweaver/_execution/__init__.py b/chainweaver/_execution/__init__.py new file mode 100644 index 0000000..a97dafd --- /dev/null +++ b/chainweaver/_execution/__init__.py @@ -0,0 +1,19 @@ +"""Internal execution collaborators for :class:`chainweaver.executor.FlowExecutor`. + +This private package holds the transport-agnostic, no-I/O building blocks that +both the synchronous and asynchronous execution lanes share (issues #330, #331). +Extracting shared logic here keeps it defined in exactly one place — so a fix or +a policy change applies to both lanes at once — and gives the determinism +import-contract check (issue #354) a focused surface to guard. + +Everything here is bound by the three hard executor invariants: **no LLM calls, +no network I/O, no randomness.** Nothing in this package may import a banned +module (see ``docs/agent-context/invariants.md`` and +``tests/test_executor_import_contract.py``). +""" + +from __future__ import annotations + +from chainweaver._execution.context import merge_step_outputs + +__all__ = ["merge_step_outputs"] diff --git a/chainweaver/_execution/context.py b/chainweaver/_execution/context.py new file mode 100644 index 0000000..c4d342b --- /dev/null +++ b/chainweaver/_execution/context.py @@ -0,0 +1,75 @@ +"""Shared execution-context merge logic (issues #337, #331). + +The accumulated execution context is the data plane of every flow. Both the +linear and DAG lanes — sync and async — merge each step's outputs into this +running context, and both previously did so with subtly different rules: + +- linear flows logged collisions at ``DEBUG`` and silently overwrote; +- DAG flows rejected same-level sibling collisions but silently overwrote + level-to-level. + +:func:`merge_step_outputs` is the single implementation all four paths now call, +so the collision policy is defined and enforced in exactly one place. The +flow-level ``on_context_collision`` setting selects the behaviour: + +- ``"overwrite"`` — historical last-write-wins, logged at ``DEBUG``; +- ``"warn"`` (default) — log at ``WARNING`` before overwriting; +- ``"error"`` — abort with :class:`ContextKeyCollisionError` naming the step + and the colliding keys. + +DAG *sibling* collisions within a single level are handled separately by the +DAG runner and remain an unconditional error regardless of this policy — they +are genuinely ambiguous (no defined ordering between siblings). + +This module is on the deterministic execution path: no LLM, no network, no +randomness. +""" + +from __future__ import annotations + +import logging +from collections.abc import Mapping +from typing import Any + +from chainweaver.exceptions import ContextKeyCollisionError +from chainweaver.flow import ContextCollisionPolicy + + +def merge_step_outputs( + context: dict[str, Any], + outputs: Mapping[str, Any], + *, + policy: ContextCollisionPolicy, + flow_name: str, + step_index: int, + step_name: str, + logger: logging.Logger, +) -> None: + """Merge *outputs* into *context* in place, applying the collision *policy*. + + Args: + context: The running execution context, mutated in place. + outputs: The step's validated outputs to merge in. + policy: The flow's ``on_context_collision`` setting. + flow_name: Name of the executing flow (for diagnostics / errors). + step_index: Zero-based index of the step that produced *outputs*. + step_name: Display name of the step (for diagnostics / errors). + logger: Logger used for ``DEBUG`` / ``WARNING`` collision messages. + + Raises: + ContextKeyCollisionError: When *policy* is ``"error"`` and one or more + output keys already exist in *context*. + """ + collisions = [key for key in outputs if key in context] + if collisions: + if policy == "error": + raise ContextKeyCollisionError(flow_name, step_index, step_name, collisions) + log = logger.warning if policy == "warn" else logger.debug + for key in collisions: + log( + "Step %d (%s): context key '%s' overwritten", + step_index, + step_name, + key, + ) + context.update(outputs) diff --git a/chainweaver/cache.py b/chainweaver/cache.py index bfcfa51..e9f7427 100644 --- a/chainweaver/cache.py +++ b/chainweaver/cache.py @@ -44,6 +44,7 @@ import os import re import tempfile +import threading from pathlib import Path from typing import Any, Protocol, runtime_checkable @@ -115,34 +116,39 @@ class InMemoryStepCache: Use this for batch executions, tests, and any workload where the cache only needs to live for the lifetime of a process. - **Concurrency**: this class wraps a plain ``dict`` and offers no - internal synchronization. It is safe to use from a single thread - of execution per :class:`FlowExecutor` instance (which is the - documented executor contract). Sharing a single - ``InMemoryStepCache`` across threads requires the caller to wrap - accesses in an external :class:`threading.Lock`, or to use - :class:`FileStepCache` (which delegates atomicity to the - filesystem) instead. + **Concurrency** (issue #336): every accessor is guarded by an + internal :class:`threading.Lock`, so a single ``InMemoryStepCache`` + is safe to share across the concurrent runs of one + :class:`FlowExecutor`. The lock is held only for the dict + operation itself (a get/set/clear), never across a tool + invocation, so contention is negligible. ``get`` returns a + defensive copy, so a reader can never observe a half-written entry. """ def __init__(self) -> None: self._store: dict[str, dict[str, Any]] = {} + self._lock = threading.Lock() def get(self, key: StepCacheKey) -> dict[str, Any] | None: - cached = self._store.get(key.digest) - if cached is None: - return None - # Return a defensive copy so callers can't mutate the cache. - return dict(cached) + with self._lock: + cached = self._store.get(key.digest) + if cached is None: + return None + # Return a defensive copy so callers can't mutate the cache. + return dict(cached) def set(self, key: StepCacheKey, output: dict[str, Any]) -> None: - self._store[key.digest] = dict(output) + snapshot = dict(output) + with self._lock: + self._store[key.digest] = snapshot def clear(self) -> None: - self._store.clear() + with self._lock: + self._store.clear() def __len__(self) -> int: - return len(self._store) + with self._lock: + return len(self._store) class FileStepCache: diff --git a/chainweaver/checkpoint.py b/chainweaver/checkpoint.py index 19b55d6..4c32b8c 100644 --- a/chainweaver/checkpoint.py +++ b/chainweaver/checkpoint.py @@ -37,6 +37,7 @@ import os import re import tempfile +import threading from datetime import datetime from pathlib import Path from typing import TYPE_CHECKING, Any, Protocol, runtime_checkable @@ -127,31 +128,38 @@ class InMemoryCheckpointer: Use this for unit tests and any scenario where the checkpoint only needs to live for the lifetime of a process. - **Concurrency**: this class wraps a plain ``dict`` and offers no - internal synchronization. It is safe to use from a single thread - of execution per :class:`FlowExecutor` (which is the documented - executor contract). For cross-process or cross-thread - crash-resume, use :class:`FileCheckpointer` (which delegates - atomicity to the filesystem) instead. + **Concurrency** (issue #336): every accessor is guarded by an + internal :class:`threading.Lock`, so a single ``InMemoryCheckpointer`` + is safe to share across the concurrent runs of one + :class:`FlowExecutor`. The lock is held only for the dict + operation itself, never across a tool invocation. For + cross-process crash-resume, use :class:`FileCheckpointer` (which + delegates atomicity to the filesystem) instead. """ def __init__(self) -> None: self._store: dict[str, ExecutionSnapshot] = {} + self._lock = threading.Lock() def save(self, snapshot: ExecutionSnapshot) -> None: - self._store[snapshot.trace_id] = snapshot + with self._lock: + self._store[snapshot.trace_id] = snapshot def load(self, trace_id: str) -> ExecutionSnapshot | None: - return self._store.get(trace_id) + with self._lock: + return self._store.get(trace_id) def delete(self, trace_id: str) -> None: - self._store.pop(trace_id, None) + with self._lock: + self._store.pop(trace_id, None) def list_trace_ids(self) -> list[str]: - return list(self._store.keys()) + with self._lock: + return list(self._store.keys()) def __len__(self) -> int: - return len(self._store) + with self._lock: + return len(self._store) class FileCheckpointer: diff --git a/chainweaver/compiler.py b/chainweaver/compiler.py index df3e09e..31fad84 100644 --- a/chainweaver/compiler.py +++ b/chainweaver/compiler.py @@ -281,8 +281,36 @@ def compile_flow(flow: Flow, tools: dict[str, Tool]) -> CompilationResult: ) ) - # Update context with this tool's output fields. + # Statically detectable context-key collision (issue #337): this step's + # output fields overwrite keys already in the accumulated context. + # Suppressed when the flow opts into overwrite-on-collision, which is + # the documented escape hatch for intentional refine-in-place pipelines. tool_output_fields = _get_model_fields(tool.output_schema) + if flow.on_context_collision != "overwrite": + for name in tool_output_fields: + if name in context_fields: + warnings.append( + CompilationWarning( + step_index=idx, + tool_name=step.tool_name, + field_name=name, + issue_type="context_collision", + detail=( + f"Step {idx} ('{step.tool_name}'): output key '{name}' " + f"overwrites an existing context key. With " + f"on_context_collision='{flow.on_context_collision}' this " + f"is " + + ( + "logged at runtime" + if flow.on_context_collision == "warn" + else "a runtime error" + ) + + "." + ), + ) + ) + + # Update context with this tool's output fields. for name, finfo in tool_output_fields.items(): context_fields[name] = _get_field_type(finfo) diff --git a/chainweaver/exceptions.py b/chainweaver/exceptions.py index f76e1e4..9f2da2e 100644 --- a/chainweaver/exceptions.py +++ b/chainweaver/exceptions.py @@ -202,6 +202,69 @@ def __init__( super().__init__(f"Flow '{flow_name}' cancelled before step {step_index} ({reason}).") +class ContextKeyCollisionError(ChainWeaverError): + """Raised when a step output collides with an existing context key under the + ``on_context_collision="error"`` policy (issue #337). + + The accumulated execution context is a flow's data plane. By default a step + that emits a key already present in the context (including the initial + input) overwrites it — silently before #337, at ``WARNING`` since. A flow + that sets ``on_context_collision="error"`` opts into hard failure instead: + rather than letting a reordering or an added step drop earlier data + unnoticed, the run aborts with this typed error naming the offending step + and the colliding keys. + + Attributes: + flow_name: Name of the flow whose context collided. + step_index: Zero-based index of the step that produced the collision. + step_name: Display name of the offending step. + keys: The output keys that collided with existing context keys. + """ + + def __init__(self, flow_name: str, step_index: int, step_name: str, keys: list[str]) -> None: + self.flow_name = flow_name + self.step_index = step_index + self.step_name = step_name + self.keys = list(keys) + joined = ", ".join(repr(key) for key in keys) + super().__init__( + f"Flow '{flow_name}' step {step_index} ('{step_name}') would overwrite " + f"existing context key(s) {joined}; on_context_collision='error' aborts " + f"rather than dropping earlier data." + ) + + +class AsyncLaneUnsupportedError(ChainWeaverError): + """Raised when :meth:`FlowExecutor.execute_flow_async` is given a flow that + uses execution features the async lane does not yet support (issue #332). + + The async lane (issue #80) does not implement conditional branching + (``branches`` / ``default_next``, #9), guided decision callbacks + (``decision_candidates``, #102), or composed sub-flow steps (``flow_name``, + #75). Rather than executing such a flow with those directives **silently + dropped** — which would yield a different result than the synchronous + :meth:`execute_flow` and undermine the determinism promise — the executor + fails fast, before the first step runs, listing every unsupported construct + it found. Route the flow through :meth:`execute_flow` until async parity + lands. + + Attributes: + flow_name: Name of the flow that could not run on the async lane. + unsupported: Human-readable descriptions of each unsupported construct + found, one per offending step/feature. + """ + + def __init__(self, flow_name: str, unsupported: list[str]) -> None: + self.flow_name = flow_name + self.unsupported = list(unsupported) + joined = "; ".join(unsupported) + super().__init__( + f"Flow '{flow_name}' uses features unsupported by execute_flow_async: " + f"{joined}. Run it via the synchronous execute_flow until the async " + f"lane reaches parity." + ) + + class FlowCompositionError(ChainWeaverError): """Raised when a composed flow's sub-flow references are invalid (issue #75). diff --git a/chainweaver/executor.py b/chainweaver/executor.py index 88034d2..9d2c30b 100644 --- a/chainweaver/executor.py +++ b/chainweaver/executor.py @@ -28,6 +28,7 @@ from pydantic import BaseModel, ConfigDict, Field, ValidationError from tenacity import RetryError, Retrying, retry_if_exception_type, stop_after_attempt, wait_fixed +from chainweaver._execution import merge_step_outputs from chainweaver.cache import StepCache, StepCacheKey, compute_input_value_hash from chainweaver.cancellation import CancellationToken from chainweaver.checkpoint import Checkpointer, ExecutionSnapshot @@ -41,6 +42,7 @@ ) from chainweaver.events import FlowEvent from chainweaver.exceptions import ( + AsyncLaneUnsupportedError, CheckpointDriftError, CheckpointerNotConfiguredError, CheckpointNotFoundError, @@ -499,20 +501,24 @@ class FlowExecutor: There are **no LLM calls** at any point in this process. - **Concurrency**: a single :class:`FlowExecutor` instance is **not** - safe for concurrent :meth:`execute_flow`, :meth:`stream_flow`, - :meth:`resume_flow`, or :meth:`replay_flow` calls. The - middleware list, the step cache, the checkpointer, the - ``_in_replay`` flag, the ``_resume_snapshot`` slot, and the - internal stream-collector all live on instance state. Two - concurrent ``stream_flow`` calls on the same executor would each - push a collector that the other run also dispatches to — yielding - cross-talk where each generator receives the other flow's - events. If you need to drive multiple flows in parallel, - instantiate one :class:`FlowExecutor` per worker (sharing the - underlying :class:`FlowRegistry`, :class:`StepCache`, and - :class:`Checkpointer` instances if appropriate — each backend's - docstring describes its own concurrency contract). + **Concurrency contract** (issue #336): a single :class:`FlowExecutor` + instance supports **concurrent** :meth:`execute_flow`, + :meth:`execute_flow_async`, and :meth:`stream_flow` calls. Run-scoped + state — the stream event collector and (where applicable) replay/resume + markers — is held per-thread on a :class:`threading.local` slot rather + than the shared instance, so concurrent runs never dispatch each other's + lifecycle events. The bundled :class:`~chainweaver.cache.InMemoryStepCache` + and :class:`~chainweaver.checkpoint.InMemoryCheckpointer` are internally + locked, so sharing them across one executor's concurrent runs is safe. + + The contract has one rule: **mutating operations must not run concurrently + with executions.** :meth:`register_tool`, :meth:`add_middleware`, + :meth:`remove_middleware`, and :meth:`accept_drift` mutate shared + configuration and are expected to happen during setup, before (or between) + runs — not while another thread is mid-execution. Read-only execution is + concurrency-safe; reconfiguration is not. + + See ``tests/test_executor_concurrency.py`` for the enforced stress tests. Args: registry: The :class:`~chainweaver.registry.FlowRegistry` that holds @@ -537,6 +543,15 @@ class FlowExecutor: composition graph is validated before execution; deeper chains (or cycles) raise :class:`~chainweaver.exceptions.FlowCompositionError`. + max_step_concurrency: Maximum number of independent steps within a + single DAG level that :meth:`execute_flow_async` dispatches + concurrently (issue #344). Defaults to ``1`` (strictly + sequential — bit-identical to the historical behaviour). Values + ``> 1`` bound concurrent dispatch via an + :class:`asyncio.Semaphore`; results are deterministic regardless of + the setting, but opted-in tools must be safe to run concurrently. + The synchronous :meth:`execute_flow` lane currently always runs + level steps sequentially. Example:: @@ -564,13 +579,31 @@ def __init__( decision_callback: DecisionCallback | DecisionCallable | None = None, discover_plugins: bool = False, max_composition_depth: int = 10, + max_step_concurrency: int = 1, ) -> None: + if max_step_concurrency < 1: + raise ValueError(f"max_step_concurrency must be >= 1, got {max_step_concurrency}.") self._registry = registry self._tools: dict[str, Tool] = {} + # Opt-in concurrent execution of independent steps within a DAG level + # (issue #344). ``1`` (the default) preserves the historical + # strictly-sequential behaviour exactly. Higher values bound the number + # of a level's steps that ``execute_flow_async`` dispatches at once via + # an :class:`asyncio.Semaphore`; ``StepRecord`` ordering, context-merge + # results, and sibling-collision detection stay identical regardless of + # the setting. Tools must be safe to run concurrently to opt in. + self._max_step_concurrency = max_step_concurrency self._cost_profile = cost_profile self._redaction_policy = redaction_policy self._trace_recorder = trace_recorder self._middleware: list[FlowExecutorMiddleware] = list(middleware) if middleware else [] + # Per-run, per-thread middleware (issue #336). ``stream_flow`` and any + # other run-scoped observer registers here instead of mutating the + # shared ``self._middleware`` list, so two concurrent runs on one + # executor never dispatch each other's events. Keyed by thread because + # each run executes within a single thread (the calling thread, or the + # stream worker thread). + self._local = threading.local() # Guided decision-point callback (issue #102). Wraps a bare # callable in an adapter so the executor can call # ``self._decision_callback.decide(ctx)`` uniformly regardless @@ -689,7 +722,9 @@ def _fire_hook( Hooks that raise are logged at ``WARNING`` and the iteration continues — middleware bugs never abort a flow. """ - for idx, mw in enumerate(self._middleware): + run_scoped = getattr(self._local, "middleware", None) + chain = self._middleware if not run_scoped else [*self._middleware, *run_scoped] + for idx, mw in enumerate(chain): handler = getattr(mw, hook, None) if handler is None: continue @@ -704,6 +739,21 @@ def _fire_hook( exc, ) + @contextlib.contextmanager + def _scoped_middleware(self, middleware: FlowExecutorMiddleware) -> Iterator[None]: + """Register *middleware* for the duration of the current thread's run. + + Run-scoped middleware lives on a :class:`threading.local` slot rather + than the shared ``self._middleware`` list, so concurrent runs on one + executor (issue #336) never see each other's run-scoped observers. + """ + existing: list[FlowExecutorMiddleware] = getattr(self._local, "middleware", []) + self._local.middleware = [*existing, middleware] + try: + yield + finally: + self._local.middleware = existing + def _fire_flow_start(self, ctx: FlowStartContext) -> None: self._fire_hook("on_flow_start", ctx) @@ -786,6 +836,8 @@ def with_replaced_tools(self, tools: Iterable[Tool]) -> FlowExecutor: checkpointer=self._checkpointer, delete_on_success=self._delete_on_success, decision_callback=self._decision_callback, + max_composition_depth=self._max_composition_depth, + max_step_concurrency=self._max_step_concurrency, ) for tool in tools: clone.register_tool(tool) @@ -833,8 +885,15 @@ def accept_drift(self, flow_name: str, *, version: str | None = None) -> None: for step in flow.steps: if step.display_name in self._tools: new_hashes[step.display_name] = self._tools[step.display_name].schema_hash - flow.tool_schema_hashes = new_hashes - flow.status = FlowStatus.ACTIVE + # Copy-on-write via the registry (issue #335): never mutate the shared + # registry-held Flow in place — that would silently alter the state + # observed by other holders of the same object (e.g. a FlowServer). + self._registry.update_flow_state( + flow_name, + version=version, + status=FlowStatus.ACTIVE, + tool_schema_hashes=new_hashes, + ) def _handle_schema_drift(self, old_tool: Tool, new_tool: Tool) -> None: """Mark affected flows as NEEDS_REVIEW when a tool's schema changes.""" @@ -1321,15 +1380,15 @@ def execute_flow( f"Step {idx} ({step.display_name}) succeeded but produced no outputs" ) - for key in record.outputs: - if key in context: - _logger.debug( - "Step %d (%s): context key '%s' overwritten", - idx, - step.display_name, - key, - ) - context.update(record.outputs) + merge_step_outputs( + context, + record.outputs, + policy=flow.on_context_collision, + flow_name=flow_name, + step_index=idx, + step_name=step.display_name, + logger=_logger, + ) # Crash-resume checkpoint (issue #128) — write after every # successful step so a fresh process can resume from here. @@ -1441,10 +1500,12 @@ async def execute_flow_async( async lane in a follow-up. The async lane does **not** yet honour conditional branching - (``branches`` / ``default_next``, #9) or decision callbacks - (``decision_candidates``, #102). Flows declaring those features - raise :class:`FlowExecutionError` up front rather than executing - with the directives silently dropped — use the synchronous + (``branches`` / ``default_next``, #9), decision callbacks + (``decision_candidates``, #102), or composed sub-flow steps + (``flow_name``, #75). Flows declaring those features raise + :class:`AsyncLaneUnsupportedError` up front — listing every + unsupported construct — rather than executing with the directives + silently dropped (issue #332); use the synchronous :meth:`execute_flow` for such flows until the async lane reaches parity. @@ -1466,6 +1527,9 @@ async def execute_flow_async( An :class:`ExecutionResult` with the full execution log. Raises: + AsyncLaneUnsupportedError: When the flow uses conditional + branching, decision callbacks, or composed sub-flow steps, + which the async lane does not yet support (issue #332). FlowCancelledError: When *deadline* has passed or *cancel_token* is cancelled at a step boundary. """ @@ -1489,49 +1553,38 @@ def _assert_async_lane_supported(flow: Any) -> None: """Reject flows using execution features the async lane can't honour. ``execute_flow_async`` is a v0.1 lane (issue #80). It does not - yet implement the conditional-branching (#9) or decision-callback - (#102) semantics the synchronous :meth:`execute_flow` supports. - The async DAG path also builds a plain tool proxy per step, so - those directives would be **silently dropped** — producing a - different result than the sync lane for the same flow. Fail fast - with a clear error (rather than diverge silently) so callers route - such flows through :meth:`execute_flow` until the async lane gains - parity. ``step_type='capability'`` is handled separately by the - DAG path, which already errors loudly on it. + yet implement the conditional-branching (#9), decision-callback + (#102), or composed sub-flow (#75) semantics the synchronous + :meth:`execute_flow` supports. The async DAG path builds a plain + tool proxy per step, so those directives would be **silently + dropped** — producing a different result than the sync lane for the + same flow. This collects *every* unsupported construct in the flow + and raises a single :class:`AsyncLaneUnsupportedError` **before any + step runs** (issue #332), so callers see the full set of reasons at + once and route such flows through :meth:`execute_flow` until the + async lane gains parity. """ + unsupported: list[str] = [] for idx, step in enumerate(flow.steps): if getattr(step, "flow_name", None) is not None: - raise FlowExecutionError( - step.flow_name, - idx, - "execute_flow_async does not support composed sub-flow " - "(flow_name) steps (issue #75) yet; run this flow via the " - "synchronous execute_flow.", + unsupported.append( + f"step {idx} ('{step.flow_name}'): composed sub-flow " + "(flow_name) steps (issue #75)" ) if getattr(step, "decision_candidates", None): - raise FlowExecutionError( - step.display_name, - idx, - "execute_flow_async does not support decision_candidates " - "(issue #102) yet; run this flow via the synchronous " - "execute_flow.", + unsupported.append( + f"step {idx} ('{step.display_name}'): decision_candidates (issue #102)" ) if getattr(step, "branches", None): - raise FlowExecutionError( - step.display_name, - idx, - "execute_flow_async does not support conditional branches " - "(issue #9) yet; run this flow via the synchronous " - "execute_flow.", + unsupported.append( + f"step {idx} ('{step.display_name}'): conditional branches (issue #9)" ) if getattr(step, "default_next", None) is not None: - raise FlowExecutionError( - step.display_name, - idx, - "execute_flow_async does not support default_next routing " - "(issue #9) yet; run this flow via the synchronous " - "execute_flow.", + unsupported.append( + f"step {idx} ('{step.display_name}'): default_next routing (issue #9)" ) + if unsupported: + raise AsyncLaneUnsupportedError(flow.name, unsupported) async def _execute_linear_flow_async( self, @@ -1618,7 +1671,15 @@ async def _execute_linear_flow_async( raise RuntimeError( f"Step {idx} ({step.display_name}) succeeded but produced no outputs" ) - context.update(record.outputs) + merge_step_outputs( + context, + record.outputs, + policy=flow.on_context_collision, + flow_name=flow_name, + step_index=idx, + step_name=step.display_name, + logger=_logger, + ) if flow.output_schema is not None: validation_record = self._validate_flow_schema( @@ -1724,6 +1785,11 @@ async def _execute_dag_flow_async( initial_input=initial_input, ) level_outputs: dict[str, Any] = {} + + # Assign each step its declaration-order flat index up front so the + # execution log is identical regardless of concurrency (#344), and + # reject capability steps (async lane runs tool steps only). + indexed_steps: list[tuple[int, Any]] = [] for step in level_steps: if step.step_type != "tool": err = FlowExecutionError( @@ -1757,16 +1823,21 @@ async def _execute_dag_flow_async( perf_start=flow_t0, initial_input=initial_input, ) + indexed_steps.append((flat_index, step)) + flat_index += 1 - proxy = FlowStep( - tool_name=step.display_name, - input_mapping=step.input_mapping, - ) - record = await self._execute_step_async( - flat_index, proxy, context, flow.name, trace_id - ) + # Run the level's steps — bounded-concurrent when opted in (#344), + # strictly sequential by default. Records come back in declaration + # order in either case; ``context`` is read-only during the level + # (outputs are merged only after it completes), so concurrent input + # resolution is safe. + records = await self._run_dag_level_async(indexed_steps, context, flow.name, trace_id) + + # Process results in declaration order: append to the log, abort on + # the first failure, and reject sibling key collisions — identical + # to sequential execution regardless of the concurrency setting. + for (_, step), record in zip(indexed_steps, records, strict=True): log.append(record) - flat_index += 1 if not record.success: return self._make_result( flow_name=flow.name, @@ -1810,7 +1881,17 @@ async def _execute_dag_flow_async( initial_input=initial_input, ) level_outputs[key] = value - context.update(level_outputs) + # Level-to-level merge honours the flow's collision policy (#337); + # within-level sibling collisions were already rejected above. + merge_step_outputs( + context, + level_outputs, + policy=flow.on_context_collision, + flow_name=flow.name, + step_index=flat_index, + step_name="DAG level", + logger=_logger, + ) if flow.output_schema is not None: validation_record = self._validate_flow_schema( @@ -1844,6 +1925,48 @@ async def _execute_dag_flow_async( initial_input=initial_input, ) + async def _run_dag_level_async( + self, + indexed_steps: list[tuple[int, Any]], + context: dict[str, Any], + flow_name: str, + trace_id: str, + ) -> list[StepRecord]: + """Execute one DAG level's steps, returning records in declaration order. + + With ``max_step_concurrency == 1`` (the default) steps run strictly + sequentially — bit-identical to the historical async DAG path. With a + higher bound the steps are dispatched concurrently under an + :class:`asyncio.Semaphore` (issue #344); :func:`asyncio.gather` + preserves the order of the awaitables, so the returned records are + always in *declaration* order regardless of completion order. + + ``context`` is only read here (input resolution) — level outputs are + merged into it by the caller after the level completes — so concurrent + execution introduces no shared-state writes on the executor's side. + Opted-in tools must themselves be safe to run concurrently. + """ + + async def _run_one(step_index: int, step: Any) -> StepRecord: + proxy = FlowStep( + tool_name=step.display_name, + input_mapping=step.input_mapping, + ) + return await self._execute_step_async(step_index, proxy, context, flow_name, trace_id) + + if self._max_step_concurrency <= 1 or len(indexed_steps) <= 1: + return [await _run_one(idx, step) for idx, step in indexed_steps] + + semaphore = asyncio.Semaphore(self._max_step_concurrency) + + async def _run_bounded(step_index: int, step: Any) -> StepRecord: + async with semaphore: + return await _run_one(step_index, step) + + return list( + await asyncio.gather(*(_run_bounded(idx, step) for idx, step in indexed_steps)) + ) + async def _execute_step_async( self, step_index: int, @@ -2184,16 +2307,16 @@ def stream_flow( collector = _StreamCollectorMiddleware(events) exc_holder: list[BaseException] = [] - # Per-call mutation of the middleware list is the simplest - # correct implementation — see the issue's "boring correct" - # note. FlowExecutor is not documented as thread-safe; users - # who need concurrent stream_flow calls should use distinct - # FlowExecutor instances. - self._middleware.append(collector) + # Register the event collector as *run-scoped* middleware on the worker + # thread (issue #336): it is visible only to this run's execution, so + # concurrent ``stream_flow`` / ``execute_flow`` calls on the same + # executor never receive each other's events. The shared + # ``self._middleware`` list is never mutated. def _worker() -> None: try: - self.execute_flow(flow_name, initial_input, force=force) + with self._scoped_middleware(collector): + self.execute_flow(flow_name, initial_input, force=force) except BaseException as exc: exc_holder.append(exc) finally: @@ -2226,10 +2349,9 @@ def _worker() -> None: flow_name, ) thread.join() - # Defensive cleanup — collector should always still be in - # the list, but remove_middleware swallows ValueError in case - # a user called add_middleware/remove between start and finish. - self.remove_middleware(collector) + # No cleanup needed: the collector lived on the worker thread's + # run-scoped middleware slot (issue #336) and was popped when the + # worker's ``_scoped_middleware`` context exited. # ------------------------------------------------------------------ # Internal helpers @@ -4117,15 +4239,18 @@ def _execute_dag_flow( skipped_ids.add(dep_id) log.extend(level_records) - # Merge all level outputs into context after the level completes. - for key in level_outputs: - if key in context: - _logger.debug( - "DAGFlow '%s': context key '%s' overwritten by level output", - flow.name, - key, - ) - context.update(level_outputs) + # Merge all level outputs into context after the level completes, + # honouring the flow's collision policy (#337). Within-level + # sibling collisions were already rejected above. + merge_step_outputs( + context, + level_outputs, + policy=flow.on_context_collision, + flow_name=flow.name, + step_index=absolute_level_idx, + step_name=f"DAG level {absolute_level_idx}", + logger=_logger, + ) # DAG snapshot at level boundary (issue #128). A resume # restarts from the next un-completed level — within a diff --git a/chainweaver/flow.py b/chainweaver/flow.py index fc1a93a..cc33c0a 100644 --- a/chainweaver/flow.py +++ b/chainweaver/flow.py @@ -57,6 +57,17 @@ class FlowLifecycle(str, Enum): ARCHIVED = "archived" +# Context key-collision policy (issue #337). Governs what happens when a step +# produces an output key that already exists in the accumulated execution +# context (including the initial input). ``"overwrite"`` keeps the historical +# silent last-write-wins behaviour; ``"warn"`` (the default) logs at WARNING +# before overwriting; ``"error"`` aborts the run with a typed +# ``ContextKeyCollisionError`` naming the step and colliding keys. DAG +# *sibling* collisions within one level remain an unconditional error +# regardless of this policy — they are genuinely ambiguous. +ContextCollisionPolicy = Literal["overwrite", "warn", "error"] + + _LIFECYCLE_TRANSITIONS: dict[FlowLifecycle, frozenset[FlowLifecycle]] = { FlowLifecycle.OBSERVED: frozenset({FlowLifecycle.SUGGESTED, FlowLifecycle.IGNORED}), FlowLifecycle.SUGGESTED: frozenset({FlowLifecycle.DRAFT, FlowLifecycle.IGNORED}), @@ -572,6 +583,7 @@ class Flow(BaseModel): capability_id: str | None = None governance: FlowGovernance = Field(default_factory=FlowGovernance) safety: ToolSafetyContract | None = None + on_context_collision: ContextCollisionPolicy = "warn" @staticmethod def schema_ref_from(cls: type[BaseModel]) -> str: @@ -935,6 +947,7 @@ class DAGFlow(BaseModel): capability_id: str | None = None governance: FlowGovernance = Field(default_factory=FlowGovernance) safety: ToolSafetyContract | None = None + on_context_collision: ContextCollisionPolicy = "warn" @staticmethod def schema_ref_from(cls: type[BaseModel]) -> str: diff --git a/chainweaver/registry.py b/chainweaver/registry.py index 5b61017..3e4f07f 100644 --- a/chainweaver/registry.py +++ b/chainweaver/registry.py @@ -27,6 +27,11 @@ AnyFlow = Flow | DAGFlow +# Sentinel distinguishing "argument not supplied" from an explicit ``None`` +# value for :meth:`FlowRegistry.update_flow_state` (``None`` is a meaningful +# value for ``tool_schema_hashes`` — it means "no drift snapshot"). +_UNSET: object = object() + def _parse_version(flow_name: str, version: str) -> Version: """Parse *version* as PEP 440, wrapping `InvalidVersion` in `InvalidFlowVersionError`.""" @@ -191,14 +196,66 @@ def get_active_flows(self) -> list[AnyFlow]: """Shortcut: list only ACTIVE flows.""" return self.list_flows(status=FlowStatus.ACTIVE) + def update_flow_state( + self, + flow_name: str, + *, + version: str | None = None, + status: FlowStatus | None = None, + tool_schema_hashes: dict[str, str] | None | object = _UNSET, + ) -> AnyFlow: + """Transition a flow's mutable state without mutating the stored object (issue #335). + + Flow state transitions (status changes, drift re-snapshots) go through + the registry, which owns persistence. Rather than writing fields on the + ``Flow`` instance — which is a *shared reference* for in-memory stores, + so a write would silently alter the state seen by every other holder + (e.g. a long-running :class:`chainweaver.mcp.FlowServer`) — this method + produces a fresh object via ``model_copy(update=...)``, persists it, and + returns it. Callers that need the new state must use the return value or + re-fetch via :meth:`get_flow`. + + Args: + flow_name: Name of the flow to update. + version: If provided, targets a specific version. Otherwise targets + the latest version. + status: New status. When ``None`` the status is left unchanged. + tool_schema_hashes: New drift snapshot. When omitted the hashes are + left unchanged; pass ``None`` to clear the snapshot explicitly. + + Returns: + The updated flow object (the freshly copied instance now in the + store), or the unchanged stored object when no fields were supplied. + + Raises: + FlowNotFoundError: When no flow with *flow_name* is registered. + """ + flow = self.get_flow(flow_name, version=version) + updates: dict[str, object] = {} + if status is not None: + updates["status"] = status + if tool_schema_hashes is not _UNSET: + updates["tool_schema_hashes"] = tool_schema_hashes + if not updates: + return flow + new_flow = flow.model_copy(update=updates) + # Persist the new object. For ``InMemoryStore`` this swaps the stored + # reference for the copy (the original instance held by other callers is + # left untouched); for ``FileStore`` it re-writes the JSON file. + self._store.save_flow(new_flow, overwrite=True) + self._touch_latest(new_flow.name, new_flow.version) + return new_flow + def set_flow_status( self, flow_name: str, status: FlowStatus, *, version: str | None = None ) -> None: - """Update a flow's status. + """Update a flow's status without mutating the registry-held object. - For in-memory stores the mutation is in-place; for stores that - snapshot on read (e.g. :class:`~chainweaver.storage.FileStore`) the - updated flow is re-saved. + Delegates to :meth:`update_flow_state`, which performs a + copy-on-write transition (issue #335): the stored object is replaced + with an updated copy and persisted, so a shared ``Flow`` reference held + elsewhere is never silently altered. Callers needing the new state must + re-fetch via :meth:`get_flow`. Args: flow_name: Name of the flow to update. @@ -209,12 +266,7 @@ def set_flow_status( Raises: FlowNotFoundError: When no flow with *flow_name* is registered. """ - flow = self.get_flow(flow_name, version=version) - flow.status = status - # Persist the mutation back to the store. For ``InMemoryStore`` the - # save is a no-op identity write (objects are mutated in place); for - # ``FileStore`` it re-writes the JSON file with the new status. - self._store.save_flow(flow, overwrite=True) + self.update_flow_state(flow_name, version=version, status=status) def list_flow_versions(self, name: str) -> list[str]: """Return all registered versions of a flow, sorted ascending. diff --git a/docs/agent-context/architecture.md b/docs/agent-context/architecture.md index 6bb026c..3617591 100644 --- a/docs/agent-context/architecture.md +++ b/docs/agent-context/architecture.md @@ -32,7 +32,8 @@ and tools, the same flow produces the same output every time. | `storage.py` | `RegistryStore` Protocol + `InMemoryStore` (default) + `FileStore` (one JSON file per flow) | Filenames are `{name}@{version}.flow.json`; concurrent multi-process access not coordinated | | `analyzer.py` | `ChainAnalyzer`: offline schema-compatibility analysis — compatibility matrix, chain enumeration, suggested flows (#77) | Pure static pass: no LLM, no network, no randomness; cycle-free DFS bounded by `max_depth` | | `decisions.py` | `DecisionCallback` Protocol + `DecisionContext` + `coerce_decision_callback` (#102) | Pure protocol module — no executor logic, no network, no randomness; the executor depends on it but it does not depend on the executor | -| `executor.py` | Run flows step-by-step (linear) or level-by-level (DAG), validate I/O, merge context, drift detection, invoke `DecisionCallback` at decision points, dispatch capability steps via the `_execute_capability_step` hook. `execute_flow_async` provides the async lane (#80). | **No LLM, no network I/O, no randomness.** No `agent-kernel` / `weaver-spec` / `contextweaver` imports — those live in `integrations/` and reach the executor only via the `DecisionCallback` seam and `KernelBackedExecutor` subclass hook | +| `executor.py` | Run flows step-by-step (linear) or level-by-level (DAG), validate I/O, merge context, drift detection, invoke `DecisionCallback` at decision points, dispatch capability steps via the `_execute_capability_step` hook. `execute_flow_async` provides the async lane (#80) with opt-in DAG-level concurrency (`max_step_concurrency`, #344). Supports concurrent calls per the concurrency contract (#336). | **No LLM, no network I/O, no randomness** — mechanically enforced by `tests/test_executor_import_contract.py` (#354). No `agent-kernel` / `weaver-spec` / `contextweaver` imports — those live in `integrations/` and reach the executor only via the `DecisionCallback` seam and `KernelBackedExecutor` subclass hook | +| `_execution/` | Internal, no-I/O execution collaborators shared by both lanes (#330, #331): `context.merge_step_outputs` is the single context-merge honouring `on_context_collision` (#337). | Same three invariants as `executor.py`; covered by the import-contract check (#354). Private package — nothing here is in top-level `__all__`. | | `integrations/weaver_spec.py` | Re-exports the `weaver-contracts` types (`SelectableItem`, `RoutingDecision`, `CapabilityToken`, …); `flow_to_selectable_item()` exporter; routing resolvers (`make_routing_decision`, `selected_capability_id`, `resolve_flow_from_routing_decision`); `WEAVER_SPEC_VERSION` (#91, #107, #233) | Consumes the published `weaver-contracts` package behind the `[weaver-stack]` extra — guarded import raises a clear `ImportError` without it | | `integrations/contextweaver.py` | `RoutingDecisionAdapter` (`DecisionCallback` impl) + `ContextweaverClient` Protocol + `StaticRoutingClient` (#106) | Translates `RoutingDecision` → tool name; no hard dep on a `contextweaver` SDK | | `integrations/agent_kernel.py` | `KernelBackedExecutor` (FlowExecutor subclass) + `KernelProtocol` + `InMemoryKernel` (#89) | Overrides only `_execute_capability_step`; no LLM, no randomness; kernel side-effects are the kernel's responsibility | @@ -69,6 +70,8 @@ and tools, the same flow produces the same output every time. | `DecisionCallback` Protocol in `chainweaver/decisions.py` (#102) | Single LLM-router / contextual-narrowing extension point. Executor invokes it for steps with `decision_candidates` set; failures wrap as `DecisionCallbackError` and abort the step (no silent fall-through to `tool_name`). | | Consume `weaver-contracts` behind an optional extra (#91, #107, #233) | The weaver-spec contract ships on PyPI as `weaver-contracts`. ChainWeaver consumes its I-03 / I-04 / I-07 dataclasses directly via the optional `[weaver-stack]` extra rather than carrying mirror types — `chainweaver.integrations.weaver_spec` (and the `contextweaver` / `agent_kernel` adapters that import it) guard the import and raise a clear `ImportError` when the extra is absent, so the base install stays dependency-light. | | `KernelBackedExecutor` as a subclass, not a flag (#89) | Subclass overrides only the `_execute_capability_step` hook. Keeps the executor's three invariants (no LLM, no network I/O, no randomness in `executor.py`) intact — kernel side-effects live in `integrations/agent_kernel.py`. | +| Concurrency contract: run-scoped state off the instance (#336) | One `FlowExecutor` supports concurrent runs. Run-scoped state (the `stream_flow` event collector) lives on a `threading.local` slot rather than the shared `self._middleware`, so concurrent runs never cross-talk. Mutating ops (`register_tool`, `add_middleware`, `accept_drift`) are setup-phase-only. | +| State transitions are copy-on-write (#335) | `accept_drift` / `set_flow_status` route through `FlowRegistry.update_flow_state` (`model_copy` + persist) instead of mutating a registry-shared `Flow` in place, so other holders of the object are never silently altered. | | Cycle detection at registration time | Fail fast — no silent deferral to execution. Belt-and-suspenders check also runs in the executor for flows created without registry. | | Branch targets must be direct dependents (#9) | Keeps conditional routing local — a `ConditionalEdge.target_step_id` (or `default_next`) must reference a step that already lists the branching step in its `depends_on`. This makes "skipped" propagation a one-hop computation in the executor and prevents branches from jumping across unrelated subgraphs. Enforced at registration time by `validate_dag_topology`. | | AST-based predicate evaluator (#9) | Predicate strings are parsed with `ast.parse(mode="eval")` and walked against an explicit node allow-list — `eval`/`exec` are **never** called. The grammar deliberately excludes attribute access, function calls, and binary arithmetic so predicates stay routing decisions, not computations (unary `+`/`-` is permitted so signed literals like `n == -1` parse). | diff --git a/docs/agent-context/invariants.md b/docs/agent-context/invariants.md index def8bc6..c16269e 100644 --- a/docs/agent-context/invariants.md +++ b/docs/agent-context/invariants.md @@ -52,6 +52,35 @@ They are non-negotiable. Network I/O and randomness are allowed in **tool functions** — the executor only manages the data flow between tools. +### Automated enforcement (since #354) + +The three hard invariants are mechanically enforced by +`tests/test_executor_import_contract.py`, which runs as part of the normal +`pytest` suite (no separate CI job). It performs a static, AST-based check: + +- **Direct imports** — `executor.py` and every module under + `chainweaver/_execution/` must not import any banned module. The banned set + is the network/LLM/randomness sources (`random`, `secrets`, `socket`, + `http`, `urllib`, `requests`, `httpx`, `aiohttp`, `openai`, `anthropic`) plus + the in-repo modules already marked "banned from executor.py" in the repo map + (`compiler_llm`, `optimizer`, `observer`, `traces`, `lessons`, `service`, + `_offline_llm`). +- **Transitive in-repo reach** — following `chainweaver.*` imports out of the + execution modules, none of the deterministic-execution closure may reach a + banned in-repo module, so a helper cannot smuggle an LLM proposer onto the + execution path indirectly. + +A PR that adds `import random` (or any banned import) to the execution modules +fails this test with a message pointing back at this document. + +**Allowlist:** `uuid` is the single reviewed exception, for the trace-id +carve-out above. A blanket "`random` absent from `sys.modules`" check is +deliberately not used because `flow.py` legitimately imports `random` for the +opt-in jitter carve-out; the contract is therefore scoped to the +*execution-module boundary* (`executor.py` + `_execution/`), which is exactly +where the invariants apply. Expanding the banned list is cheap; keep the +allowlist conservative and document every addition here. + --- ## Package-wide invariants diff --git a/docs/data-integrity.md b/docs/data-integrity.md index 48c0aef..d2b179a 100644 --- a/docs/data-integrity.md +++ b/docs/data-integrity.md @@ -41,13 +41,35 @@ schema. See [Guarantee 5](#guarantee-5-schema-validated-execution-context) and > steps. **Mechanism:** after `validate(output)` succeeds, the validated dict is merged into the -context via `context.update(output_dict)`. The merge is unconditional. Keys present -before the step remain in the context (unless explicitly overwritten by a same-named -output field). +context by the single shared merge helper `chainweaver._execution.merge_step_outputs`, +used by both the linear and DAG lanes (sync and async). Keys present before the step +remain in the context unless overwritten by a same-named output field, in which case the +flow's collision policy applies (see below). **Where it can break:** a downstream step's `input_mapping` chooses not to forward a field. That's by design — selectivity is the caller's prerogative, not a loss event. +### Context key collisions (#337) + +When a step output key already exists in the accumulated context (including an +`initial_input` key), the flow-level `on_context_collision` setting governs what happens. +The same policy applies uniformly to linear and DAG flows and to both execution lanes; +the rule is enforced in one place (`merge_step_outputs`). + +| `on_context_collision` | Behaviour | Use when | +|------------------------|-----------|----------| +| `"overwrite"` | Silent last-write-wins, logged at `DEBUG`. | Intentional refine-in-place pipelines that re-emit a key on purpose. | +| `"warn"` (**default**) | Log at `WARNING`, then overwrite. | You want visibility into accidental shadowing without failing runs. | +| `"error"` | Abort the run with `ContextKeyCollisionError` naming the step and colliding keys. | Strict pipelines where silently dropping earlier data is a bug. | + +DAG **sibling** collisions — two steps in the *same* level producing the same key — +remain an unconditional `FlowExecutionError` regardless of the policy, because there is +no defined ordering between siblings to resolve the conflict. + +`compile_flow` surfaces statically detectable collisions ahead of execution as a +`context_collision` compilation warning (suppressed when the policy is `"overwrite"`), +so authors can catch shadowing at compile time. + --- ## Guarantee 3 — Type safety at every boundary diff --git a/schemas/flow.schema.json b/schemas/flow.schema.json index 3c5d368..0ceefe4 100644 --- a/schemas/flow.schema.json +++ b/schemas/flow.schema.json @@ -74,6 +74,16 @@ "title": "Name", "type": "string" }, + "on_context_collision": { + "default": "warn", + "enum": [ + "overwrite", + "warn", + "error" + ], + "title": "On Context Collision", + "type": "string" + }, "output_schema_ref": { "anyOf": [ { @@ -366,6 +376,16 @@ "title": "Name", "type": "string" }, + "on_context_collision": { + "default": "warn", + "enum": [ + "overwrite", + "warn", + "error" + ], + "title": "On Context Collision", + "type": "string" + }, "output_schema_ref": { "anyOf": [ { diff --git a/tests/fixtures/public_api.json b/tests/fixtures/public_api.json index 21bfa18..24f3344 100644 --- a/tests/fixtures/public_api.json +++ b/tests/fixtures/public_api.json @@ -2,6 +2,7 @@ "__all__": [ "AgentTraceEvent", "AgentTraceImportError", + "AsyncLaneUnsupportedError", "AttestationInputError", "AttestationReport", "BUILTIN_PROPERTIES", @@ -24,6 +25,7 @@ "CompilationResult", "CompilationWarning", "ConditionalEdge", + "ContextKeyCollisionError", "ContribError", "CostProfile", "CostProfileError", @@ -198,6 +200,12 @@ "qualname": "AgentTraceImportError", "signature": "(detail: str, *, source: str | None = None, line: int | None = None) -> None" }, + "AsyncLaneUnsupportedError": { + "kind": "class", + "module": "chainweaver.exceptions", + "qualname": "AsyncLaneUnsupportedError", + "signature": "(flow_name: str, unsupported: list[str]) -> None" + }, "AttestationInputError": { "kind": "class", "module": "chainweaver.attest", @@ -373,6 +381,12 @@ "module": "chainweaver.flow", "qualname": "ConditionalEdge" }, + "ContextKeyCollisionError": { + "kind": "class", + "module": "chainweaver.exceptions", + "qualname": "ContextKeyCollisionError", + "signature": "(flow_name: str, step_index: int, step_name: str, keys: list[str]) -> None" + }, "ContribError": { "kind": "class", "module": "chainweaver.exceptions", @@ -427,6 +441,7 @@ "governance": "chainweaver.flow.FlowGovernance", "input_schema_ref": "str | NoneType", "name": "str", + "on_context_collision": "Literal['overwrite', 'warn', 'error']", "output_schema_ref": "str | NoneType", "safety": "chainweaver.contracts.ToolSafetyContract | NoneType", "status": "chainweaver.flow.FlowStatus", @@ -599,6 +614,7 @@ "governance": "chainweaver.flow.FlowGovernance", "input_schema_ref": "str | NoneType", "name": "str", + "on_context_collision": "Literal['overwrite', 'warn', 'error']", "output_schema_ref": "str | NoneType", "safety": "chainweaver.contracts.ToolSafetyContract | NoneType", "status": "chainweaver.flow.FlowStatus", @@ -679,7 +695,7 @@ "kind": "class", "module": "chainweaver.executor", "qualname": "FlowExecutor", - "signature": "(registry: FlowRegistry, *, cost_profile: CostProfile | None = None, redaction_policy: RedactionPolicy | None = None, trace_recorder: TraceRecorder | None = None, middleware: list[FlowExecutorMiddleware] | None = None, step_cache: StepCache | None = None, checkpointer: Checkpointer | None = None, delete_on_success: bool = True, decision_callback: DecisionCallback | DecisionCallable | None = None, discover_plugins: bool = False, max_composition_depth: int = 10) -> None" + "signature": "(registry: FlowRegistry, *, cost_profile: CostProfile | None = None, redaction_policy: RedactionPolicy | None = None, trace_recorder: TraceRecorder | None = None, middleware: list[FlowExecutorMiddleware] | None = None, step_cache: StepCache | None = None, checkpointer: Checkpointer | None = None, delete_on_success: bool = True, decision_callback: DecisionCallback | DecisionCallable | None = None, discover_plugins: bool = False, max_composition_depth: int = 10, max_step_concurrency: int = 1) -> None" }, "FlowExecutorMiddleware": { "kind": "class", @@ -1356,19 +1372,19 @@ "kind": "function", "module": "chainweaver.serialization", "qualname": "flow_from_dict", - "signature": "(data: dict[str, Any]) -> AnyFlow" + "signature": "(data: dict[str, Any], *, source: str | None = None) -> AnyFlow" }, "flow_from_json": { "kind": "function", "module": "chainweaver.serialization", "qualname": "flow_from_json", - "signature": "(data: str) -> AnyFlow" + "signature": "(data: str, *, source: str | None = None) -> AnyFlow" }, "flow_from_yaml": { "kind": "function", "module": "chainweaver.serialization", "qualname": "flow_from_yaml", - "signature": "(data: str) -> AnyFlow" + "signature": "(data: str, *, source: str | None = None) -> AnyFlow" }, "flow_schema_json": { "kind": "function", diff --git a/tests/test_composition.py b/tests/test_composition.py index 075a65a..7592679 100644 --- a/tests/test_composition.py +++ b/tests/test_composition.py @@ -21,9 +21,9 @@ ) from chainweaver.cost import CostProfile from chainweaver.exceptions import ( + AsyncLaneUnsupportedError, FlowCancelledError, FlowCompositionError, - FlowExecutionError, ) # Upper bound for the deterministic cancel-barrier waits (#244). Generous enough @@ -378,7 +378,7 @@ async def test_async_rejects_subflow_steps(self) -> None: steps=[FlowStep(flow_name="inc", input_mapping={"n": "n"})], ) ) - with pytest.raises(FlowExecutionError, match=r"flow_name.*steps"): + with pytest.raises(AsyncLaneUnsupportedError, match=r"sub-flow"): await executor.execute_flow_async("parent_async", {"n": 1}) diff --git a/tests/test_context_collision.py b/tests/test_context_collision.py new file mode 100644 index 0000000..79fa768 --- /dev/null +++ b/tests/test_context_collision.py @@ -0,0 +1,202 @@ +"""Context key-collision policy across linear and DAG flows (issue #337). + +A step output that overwrites an existing context key is governed by one shared +merge implementation (``chainweaver._execution.merge_step_outputs``) for both +flow kinds and both lanes. The flow-level ``on_context_collision`` setting +selects the behaviour: ``"overwrite"`` (silent last-write-wins), ``"warn"`` +(default; log at WARNING), or ``"error"`` (abort with a typed error). +""" + +from __future__ import annotations + +import logging +from typing import Any + +import pytest +from pydantic import BaseModel + +from chainweaver.compiler import compile_flow +from chainweaver.exceptions import ContextKeyCollisionError +from chainweaver.executor import FlowExecutor +from chainweaver.flow import DAGFlow, DAGFlowStep, Flow, FlowStep +from chainweaver.registry import FlowRegistry +from chainweaver.tools import Tool + +_EXECUTOR_LOGGER = "chainweaver.executor" + + +# --------------------------------------------------------------------------- +# Tools: step 2 re-emits the ``value`` key produced by step 1 -> collision. +# --------------------------------------------------------------------------- + + +class NumIn(BaseModel): + number: int + + +class ValOut(BaseModel): + value: int + + +def _to_value(inp: NumIn) -> dict[str, Any]: + return {"value": inp.number} + + +class ValIn(BaseModel): + value: int + + +def _bump(inp: ValIn) -> dict[str, Any]: + return {"value": inp.value + 1} + + +def _tools() -> list[Tool]: + return [ + Tool( + name="to_value", + description="number -> value", + input_schema=NumIn, + output_schema=ValOut, + fn=_to_value, + ), + Tool( + name="bump", + description="value -> value+1 (re-emits 'value')", + input_schema=ValIn, + output_schema=ValOut, + fn=_bump, + ), + ] + + +def _linear_flow(policy: str) -> Flow: + return Flow( + name="collide_linear", + version="1.0.0", + description="Two steps both producing 'value'.", + steps=[ + FlowStep(tool_name="to_value", input_mapping={"number": "number"}), + FlowStep(tool_name="bump", input_mapping={"value": "value"}), + ], + on_context_collision=policy, # type: ignore[arg-type] + ) + + +def _dag_flow(policy: str) -> DAGFlow: + return DAGFlow( + name="collide_dag", + version="1.0.0", + description="Level 2 re-emits the key produced at level 1.", + steps=[ + DAGFlowStep(step_id="a", tool_name="to_value", input_mapping={"number": "number"}), + DAGFlowStep( + step_id="b", + tool_name="bump", + input_mapping={"value": "value"}, + depends_on=["a"], + ), + ], + on_context_collision=policy, # type: ignore[arg-type] + ) + + +def _executor(flow: Flow | DAGFlow) -> FlowExecutor: + registry = FlowRegistry() + registry.register_flow(flow) + executor = FlowExecutor(registry=registry) + for tool in _tools(): + executor.register_tool(tool) + return executor + + +# --------------------------------------------------------------------------- +# Default + overwrite +# --------------------------------------------------------------------------- + + +def test_default_policy_is_warn() -> None: + assert Flow(name="f", version="1.0.0", description="", steps=[]).on_context_collision == "warn" + assert ( + DAGFlow(name="f", version="1.0.0", description="", steps=[]).on_context_collision == "warn" + ) + + +@pytest.mark.parametrize("make_flow", [_linear_flow, _dag_flow]) +def test_overwrite_is_silent_last_write_wins( + make_flow: Any, caplog: pytest.LogCaptureFixture +) -> None: + executor = _executor(make_flow("overwrite")) + with caplog.at_level(logging.WARNING, logger=_EXECUTOR_LOGGER): + result = executor.execute_flow(make_flow("overwrite").name, {"number": 5}) + assert result.success + assert result.final_output is not None and result.final_output["value"] == 6 + assert not [r for r in caplog.records if "overwritten" in r.message] + + +# --------------------------------------------------------------------------- +# warn +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize("make_flow", [_linear_flow, _dag_flow]) +def test_warn_logs_and_overwrites(make_flow: Any, caplog: pytest.LogCaptureFixture) -> None: + flow = make_flow("warn") + executor = _executor(flow) + with caplog.at_level(logging.WARNING, logger=_EXECUTOR_LOGGER): + result = executor.execute_flow(flow.name, {"number": 5}) + assert result.success + assert result.final_output is not None and result.final_output["value"] == 6 + warnings = [ + r for r in caplog.records if "overwritten" in r.message and r.levelno == logging.WARNING + ] + assert warnings, "expected a WARNING-level collision log" + + +# --------------------------------------------------------------------------- +# error +# --------------------------------------------------------------------------- + + +def test_error_policy_aborts_linear_with_typed_error() -> None: + flow = _linear_flow("error") + executor = _executor(flow) + with pytest.raises(ContextKeyCollisionError) as exc_info: + executor.execute_flow(flow.name, {"number": 5}) + assert exc_info.value.keys == ["value"] + assert exc_info.value.step_index == 1 + assert exc_info.value.flow_name == "collide_linear" + + +def test_error_policy_aborts_dag_with_typed_error() -> None: + flow = _dag_flow("error") + executor = _executor(flow) + with pytest.raises(ContextKeyCollisionError) as exc_info: + executor.execute_flow(flow.name, {"number": 5}) + assert exc_info.value.keys == ["value"] + + +async def test_error_policy_aborts_async_linear() -> None: + flow = _linear_flow("error") + executor = _executor(flow) + with pytest.raises(ContextKeyCollisionError): + await executor.execute_flow_async(flow.name, {"number": 5}) + + +# --------------------------------------------------------------------------- +# compile-time static warning +# --------------------------------------------------------------------------- + + +def test_compile_flow_warns_on_static_collision() -> None: + flow = _linear_flow("warn") + tools = {tool.name: tool for tool in _tools()} + result = compile_flow(flow, tools) + collisions = [w for w in result.warnings if w.issue_type == "context_collision"] + assert collisions and collisions[0].field_name == "value" + + +def test_compile_flow_suppresses_collision_warning_under_overwrite() -> None: + flow = _linear_flow("overwrite") + tools = {tool.name: tool for tool in _tools()} + result = compile_flow(flow, tools) + assert not [w for w in result.warnings if w.issue_type == "context_collision"] diff --git a/tests/test_dag_concurrency.py b/tests/test_dag_concurrency.py new file mode 100644 index 0000000..9a253e5 --- /dev/null +++ b/tests/test_dag_concurrency.py @@ -0,0 +1,182 @@ +"""Opt-in concurrent execution of independent DAG steps (issue #344). + +``FlowExecutor(max_step_concurrency=N)`` lets ``execute_flow_async`` dispatch up +to ``N`` independent steps of a DAG level at once. Determinism is preserved: +``StepRecord`` ordering, the merged context, and sibling-collision detection are +identical regardless of the concurrency setting. Tests use an event-gated tool +(no ``sleep``) to prove real overlap deterministically. +""" + +from __future__ import annotations + +import asyncio +from typing import Any + +import pytest +from pydantic import BaseModel, create_model + +from chainweaver.executor import FlowExecutor +from chainweaver.flow import DAGFlow, DAGFlowStep +from chainweaver.registry import FlowRegistry +from chainweaver.tools import Tool + + +class SeedIn(BaseModel): + n: int + + +class SeedOut(BaseModel): + seed: int + + +async def _root_fn(inp: SeedIn) -> dict[str, Any]: + return {"seed": inp.n} + + +class LeafIn(BaseModel): + seed: int + + +def _make_leaf_tool(i: int, gate: _Gate | None = None) -> Tool: + """A leaf tool emitting a distinct output key ``r{i}`` (no sibling clash).""" + fields: dict[str, Any] = {f"r{i}": (int, ...)} + out_model = create_model(f"Leaf{i}Out", **fields) + + async def fn(inp: LeafIn, _i: int = i) -> dict[str, Any]: + if gate is not None: + await gate.arrive() + return {f"r{_i}": inp.seed + _i} + + return Tool( + name=f"leaf{i}", + description=f"leaf {i}", + input_schema=LeafIn, + output_schema=out_model, + fn=fn, + ) + + +def _fan_out_flow(num_leaves: int) -> DAGFlow: + """root -> {leaf0, leaf1, ...} — the leaves form one independent level.""" + steps: list[DAGFlowStep] = [ + DAGFlowStep(step_id="root", tool_name="root", input_mapping={"n": "n"}) + ] + for i in range(num_leaves): + steps.append( + DAGFlowStep( + step_id=f"leaf{i}", + tool_name=f"leaf{i}", + input_mapping={"seed": "seed"}, + depends_on=["root"], + ) + ) + return DAGFlow( + name="fan_out", + version="1.0.0", + description="fan-out DAG", + steps=steps, + ) + + +def _root_tool() -> Tool: + return Tool( + name="root", + description="root", + input_schema=SeedIn, + output_schema=SeedOut, + fn=_root_fn, + ) + + +class _Gate: + """Releases all arrivals only once *n* coroutines have arrived (no sleeps).""" + + def __init__(self, n: int) -> None: + self._n = n + self.count = 0 + self.max_observed = 0 + self._event = asyncio.Event() + + async def arrive(self) -> None: + self.count += 1 + self.max_observed = max(self.max_observed, self.count) + if self.count >= self._n: + self._event.set() + await asyncio.wait_for(self._event.wait(), timeout=5.0) + self.count -= 1 + + +# --------------------------------------------------------------------------- +# Determinism: results invariant across concurrency settings +# --------------------------------------------------------------------------- + + +def _result_fingerprint(result: Any) -> tuple[Any, ...]: + return ( + result.success, + tuple(sorted((result.final_output or {}).items())), + tuple( + (r.tool_name, r.step_index, tuple(sorted((r.outputs or {}).items()))) + for r in result.execution_log + ), + ) + + +async def test_results_invariant_across_concurrency_levels() -> None: + num_leaves = 6 + fingerprints = [] + for concurrency in (1, 2, 4, 6): + registry = FlowRegistry() + registry.register_flow(_fan_out_flow(num_leaves)) + executor = FlowExecutor(registry=registry, max_step_concurrency=concurrency) + executor.register_tool(_root_tool()) + for i in range(num_leaves): + executor.register_tool(_make_leaf_tool(i)) + result = await executor.execute_flow_async("fan_out", {"n": 10}) + assert result.success + fingerprints.append(_result_fingerprint(result)) + + # Every concurrency setting produced an identical ExecutionResult. + assert all(fp == fingerprints[0] for fp in fingerprints) + # And the merged context is the expected fan-out. + expected = {"n": 10, "seed": 10, **{f"r{i}": 10 + i for i in range(num_leaves)}} + registry = FlowRegistry() + registry.register_flow(_fan_out_flow(num_leaves)) + executor = FlowExecutor(registry=registry, max_step_concurrency=4) + executor.register_tool(_root_tool()) + for i in range(num_leaves): + executor.register_tool(_make_leaf_tool(i)) + result = await executor.execute_flow_async("fan_out", {"n": 10}) + assert result.final_output == expected + + +# --------------------------------------------------------------------------- +# Real overlap +# --------------------------------------------------------------------------- + + +async def test_level_steps_run_concurrently_when_opted_in() -> None: + num_leaves = 4 + gate = _Gate(num_leaves) + registry = FlowRegistry() + registry.register_flow(_fan_out_flow(num_leaves)) + executor = FlowExecutor(registry=registry, max_step_concurrency=num_leaves) + executor.register_tool(_root_tool()) + for i in range(num_leaves): + executor.register_tool(_make_leaf_tool(i, gate=gate)) + + # If the leaves ran sequentially, the first would block on the gate forever + # (the others never arrive) and wait_for would time out -> failure. + result = await executor.execute_flow_async("fan_out", {"n": 0}) + assert result.success + assert gate.max_observed == num_leaves + + +# --------------------------------------------------------------------------- +# Validation +# --------------------------------------------------------------------------- + + +def test_invalid_concurrency_rejected() -> None: + with pytest.raises(ValueError, match="max_step_concurrency"): + FlowExecutor(registry=FlowRegistry(), max_step_concurrency=0) diff --git a/tests/test_executor_async.py b/tests/test_executor_async.py index 3ed5bdc..e1ec598 100644 --- a/tests/test_executor_async.py +++ b/tests/test_executor_async.py @@ -18,7 +18,7 @@ FlowStep, Tool, ) -from chainweaver.exceptions import FlowExecutionError +from chainweaver.exceptions import AsyncLaneUnsupportedError class _Inp(BaseModel): @@ -201,7 +201,13 @@ async def test_dag_with_async_tools(self) -> None: class TestExecuteFlowAsyncUnsupportedFeatures: """The async lane (v0.1) must fail fast — not silently diverge — on - execution features it does not yet honour (issues #9, #102).""" + execution features it does not yet honour (issues #9, #75, #102). + + Rejection raises a typed :class:`AsyncLaneUnsupportedError` before any step + runs and lists every unsupported construct found in the flow (issue #332). + ``AsyncLaneUnsupportedError`` is a :class:`ChainWeaverError`, so callers can + still catch the whole family with one ``except``. + """ async def test_decision_candidates_rejected(self) -> None: registry = FlowRegistry() @@ -219,7 +225,7 @@ async def test_decision_candidates_rejected(self) -> None: ) registry.register_flow(flow) executor = FlowExecutor(registry=registry) - with pytest.raises(FlowExecutionError, match="decision_candidates"): + with pytest.raises(AsyncLaneUnsupportedError, match="decision_candidates"): await executor.execute_flow_async("decide", {"n": 1}) async def test_conditional_branches_rejected(self) -> None: @@ -245,9 +251,90 @@ async def test_conditional_branches_rejected(self) -> None: ) registry.register_flow(dag) executor = FlowExecutor(registry=registry) - with pytest.raises(FlowExecutionError, match="conditional branches"): + with pytest.raises(AsyncLaneUnsupportedError, match="conditional branches"): await executor.execute_flow_async("branchy", {"n": 1}) + async def test_default_next_rejected(self) -> None: + registry = FlowRegistry() + dag = DAGFlow( + name="routed", + version="1.0.0", + description="", + steps=[ + DAGFlowStep( + step_id="a", + tool_name="async_increment", + input_mapping={"n": "n"}, + # default_next is only valid alongside branches (it is the + # no-branch-matched fallback), so both appear on this step. + branches=[ConditionalEdge(target_step_id="b", predicate="n > 0")], + default_next="b", + ), + DAGFlowStep( + step_id="b", + tool_name="async_double_value", + input_mapping={"value": "value"}, + depends_on=["a"], + ), + ], + ) + registry.register_flow(dag) + executor = FlowExecutor(registry=registry) + with pytest.raises(AsyncLaneUnsupportedError, match="default_next"): + await executor.execute_flow_async("routed", {"n": 1}) + + async def test_subflow_step_rejected(self) -> None: + registry = FlowRegistry() + leaf = Flow( + name="leaf", + version="1.0.0", + description="", + steps=[FlowStep(tool_name="async_increment", input_mapping={"n": "n"})], + ) + parent = Flow( + name="parent", + version="1.0.0", + description="", + steps=[FlowStep(flow_name="leaf", input_mapping={"n": "n"})], + ) + registry.register_flow(leaf) + registry.register_flow(parent) + executor = FlowExecutor(registry=registry) + with pytest.raises(AsyncLaneUnsupportedError, match="sub-flow"): + await executor.execute_flow_async("parent", {"n": 1}) + + async def test_error_lists_all_unsupported_constructs(self) -> None: + registry = FlowRegistry() + flow = Flow( + name="multi", + version="1.0.0", + description="", + steps=[ + FlowStep( + tool_name="async_increment", + input_mapping={"n": "n"}, + decision_candidates=["async_increment", "async_double_value"], + ), + FlowStep(flow_name="leaf", input_mapping={"n": "n"}), + ], + ) + leaf = Flow( + name="leaf", + version="1.0.0", + description="", + steps=[FlowStep(tool_name="async_increment", input_mapping={"n": "n"})], + ) + registry.register_flow(leaf) + registry.register_flow(flow) + executor = FlowExecutor(registry=registry) + with pytest.raises(AsyncLaneUnsupportedError) as exc_info: + await executor.execute_flow_async("multi", {"n": 1}) + # Both unsupported constructs are reported in one error, before any step. + assert len(exc_info.value.unsupported) == 2 + message = str(exc_info.value) + assert "decision_candidates" in message + assert "sub-flow" in message + class TestExecuteFlowAsyncFallback: async def test_fallback_marks_record(self) -> None: diff --git a/tests/test_executor_concurrency.py b/tests/test_executor_concurrency.py new file mode 100644 index 0000000..345bbc8 --- /dev/null +++ b/tests/test_executor_concurrency.py @@ -0,0 +1,211 @@ +"""Concurrency contract for FlowExecutor and the in-memory backends (issue #336). + +A single :class:`FlowExecutor` instance supports concurrent ``execute_flow`` and +``stream_flow`` calls: run-scoped state lives per-thread, so concurrent runs +never observe each other's lifecycle events or version markers. The bundled +in-memory cache and checkpointer are internally locked. + +Tests synchronize threads with :class:`threading.Barrier` to force maximal +interleaving — no ``sleep`` calls — so they are deterministic across the CI +matrix. +""" + +from __future__ import annotations + +import threading +from datetime import datetime, timezone +from typing import Any + +from pydantic import BaseModel + +from chainweaver.cache import InMemoryStepCache, StepCacheKey +from chainweaver.checkpoint import ExecutionSnapshot, InMemoryCheckpointer +from chainweaver.events import FlowEvent +from chainweaver.executor import FlowExecutor +from chainweaver.flow import Flow, FlowStep +from chainweaver.registry import FlowRegistry +from chainweaver.tools import Tool + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +class NumIn(BaseModel): + number: int + + +class NumOut(BaseModel): + value: int + + +def _double_fn(inp: NumIn) -> dict[str, Any]: + return {"value": inp.number * 2} + + +def _build_executor() -> FlowExecutor: + registry = FlowRegistry() + registry.register_flow( + Flow( + name="double_flow", + version="1.2.3", + description="Doubles a number.", + steps=[FlowStep(tool_name="double", input_mapping={"number": "number"})], + ) + ) + executor = FlowExecutor(registry=registry) + executor.register_tool( + Tool( + name="double", + description="Doubles a number.", + input_schema=NumIn, + output_schema=NumOut, + fn=_double_fn, + ) + ) + return executor + + +# --------------------------------------------------------------------------- +# Executor-level concurrency +# --------------------------------------------------------------------------- + + +def test_concurrent_stream_flow_has_no_event_crosstalk() -> None: + executor = _build_executor() + num_runs = 8 + barrier = threading.Barrier(num_runs) + collected: dict[int, list[FlowEvent]] = {} + errors: list[BaseException] = [] + + def run(i: int) -> None: + try: + barrier.wait() + collected[i] = list(executor.stream_flow("double_flow", {"number": i})) + except BaseException as exc: + errors.append(exc) + + threads = [threading.Thread(target=run, args=(i,)) for i in range(num_runs)] + for thread in threads: + thread.start() + for thread in threads: + thread.join() + + assert not errors, errors + assert len(collected) == num_runs + + seen_trace_ids: set[str] = set() + for i, events in collected.items(): + # Every event in a run shares one trace id — no foreign events leaked in. + trace_ids = {event.trace_id for event in events} + assert len(trace_ids) == 1, f"run {i} saw cross-talk: {trace_ids}" + seen_trace_ids |= trace_ids + + kinds = [event.kind for event in events] + assert kinds[0] == "flow_start" + assert kinds[-1] == "flow_end" + assert "step_start" in kinds and "step_end" in kinds + + start = next(event for event in events if event.kind == "flow_start") + assert start.initial_input == {"number": i} + assert start.flow_version == "1.2.3" + + # Distinct runs minted distinct trace ids. + assert len(seen_trace_ids) == num_runs + + +def test_concurrent_execute_flow_returns_correct_per_run_results() -> None: + executor = _build_executor() + num_runs = 16 + barrier = threading.Barrier(num_runs) + results: dict[int, Any] = {} + errors: list[BaseException] = [] + + def run(i: int) -> None: + try: + barrier.wait() + results[i] = executor.execute_flow("double_flow", {"number": i}) + except BaseException as exc: + errors.append(exc) + + threads = [threading.Thread(target=run, args=(i,)) for i in range(num_runs)] + for thread in threads: + thread.start() + for thread in threads: + thread.join() + + assert not errors, errors + assert len(results) == num_runs + trace_ids = {result.trace_id for result in results.values()} + assert len(trace_ids) == num_runs # one unique trace per run + for i, result in results.items(): + assert result.success + assert result.final_output == {"number": i, "value": i * 2} + assert result.flow_version == "1.2.3" + + +# --------------------------------------------------------------------------- +# In-memory backend concurrency +# --------------------------------------------------------------------------- + + +def test_in_memory_step_cache_concurrent_get_set() -> None: + cache = InMemoryStepCache() + num = 64 + keys = [ + StepCacheKey(tool_name=f"tool{i}", schema_hash="h", input_value_hash=str(i)) + for i in range(num) + ] + barrier = threading.Barrier(num) + errors: list[BaseException] = [] + + def worker(i: int) -> None: + try: + barrier.wait() + cache.set(keys[i], {"v": i}) + assert cache.get(keys[i]) == {"v": i} + except BaseException as exc: + errors.append(exc) + + threads = [threading.Thread(target=worker, args=(i,)) for i in range(num)] + for thread in threads: + thread.start() + for thread in threads: + thread.join() + + assert not errors, errors + assert len(cache) == num + + +def test_in_memory_checkpointer_concurrent_save_load() -> None: + checkpointer = InMemoryCheckpointer() + num = 64 + barrier = threading.Barrier(num) + errors: list[BaseException] = [] + + def worker(i: int) -> None: + try: + snapshot = ExecutionSnapshot( + trace_id=f"trace{i}", + flow_name="f", + flow_version="1.0.0", + initial_input={"number": i}, + started_at=datetime.now(timezone.utc), + context={"value": i}, + completed_steps=1, + ) + barrier.wait() + checkpointer.save(snapshot) + loaded = checkpointer.load(f"trace{i}") + assert loaded is not None and loaded.context == {"value": i} + except BaseException as exc: + errors.append(exc) + + threads = [threading.Thread(target=worker, args=(i,)) for i in range(num)] + for thread in threads: + thread.start() + for thread in threads: + thread.join() + + assert not errors, errors + assert len(checkpointer) == num diff --git a/tests/test_executor_import_contract.py b/tests/test_executor_import_contract.py new file mode 100644 index 0000000..a74353e --- /dev/null +++ b/tests/test_executor_import_contract.py @@ -0,0 +1,207 @@ +"""Import-contract enforcement for the executor's determinism invariants (issue #354). + +The three hard executor invariants — no LLM/AI client calls, no network I/O, +and no randomness in the execution path — are documented in AGENTS.md §4 and +``docs/agent-context/invariants.md``. Documentation-enforced invariants erode +as the contributor base (human and automated) grows. This module gives them a +mechanical, *static* CI check so a regression fails ``pytest`` with a message +pointing at the invariants doc. + +The check has two layers: + +1. **Direct imports** — the execution modules (``executor.py`` plus everything + under the ``chainweaver/_execution`` package) must not import any banned + module, except for an explicit, reviewed allowlist. +2. **Transitive in-repo reach** — following ``chainweaver.*`` imports from the + execution modules, none of the deterministic-execution closure may reach a + banned in-repo source of nondeterminism / LLM behavior (``compiler_llm``, + ``optimizer``, ``observer``, ``traces``, ``lessons``, ``service``, + ``_offline_llm``). + +A blanket "``random`` must be absent from ``sys.modules``" check is deliberately +*not* used: ``flow.py`` legitimately imports :mod:`random` for opt-in +``RetryPolicy`` backoff jitter (the jitter carve-out in invariants.md), and +``flow.py`` is a model-layer dependency of the executor. The invariant is +enforced at the *execution-module boundary* — ``executor.py`` and +``_execution/`` themselves never import :mod:`random` — which is exactly what +this contract verifies. +""" + +from __future__ import annotations + +import ast +from collections import deque +from pathlib import Path + +import chainweaver + +# --- Banned imports --------------------------------------------------------- +# Standard-library and third-party modules that would breach the executor's +# "no network I/O / no randomness / no LLM client" invariants if imported by +# the execution path. Matched against the *root* package of every import. +BANNED_EXTERNAL = frozenset( + { + "random", + "secrets", + "socket", + "http", + "urllib", + "requests", + "httpx", + "aiohttp", + "openai", + "anthropic", + } +) + +# In-repo modules that are explicitly "banned from executor.py" in the repo map +# (AGENTS.md): build-time LLM proposers, the live observer/trace recorders, and +# the long-running service layer. These are sources of LLM behavior or runtime +# I/O that must never be reachable from the deterministic execution path. +BANNED_INREPO = frozenset( + { + "chainweaver.compiler_llm", + "chainweaver.optimizer", + "chainweaver.observer", + "chainweaver.traces", + "chainweaver.lessons", + "chainweaver.service", + "chainweaver._offline_llm", + } +) + +# Reviewed, deliberate exceptions to ``BANNED_EXTERNAL`` for the execution +# modules. ``uuid`` mints opaque trace-correlation ids only (the trace-id +# carve-out in invariants.md); it never influences which tools run or any value +# passed between them. Keep this list conservative — every entry needs a +# documented rationale in invariants.md. +ALLOWED_EXTERNAL = frozenset({"uuid"}) + +_PKG_ROOT = Path(chainweaver.__file__).parent +_INVARIANTS_DOC = "docs/agent-context/invariants.md" + + +def _execution_module_paths() -> list[Path]: + """Return the source files that make up the deterministic execution path.""" + paths = [_PKG_ROOT / "executor.py"] + execution_pkg = _PKG_ROOT / "_execution" + if execution_pkg.is_dir(): + paths.extend(sorted(execution_pkg.rglob("*.py"))) + return paths + + +def _module_to_path(module: str) -> Path | None: + """Map a dotted ``chainweaver.*`` module name to its source file, if present. + + Resolves purely by path so it never imports (and therefore never runs) + package ``__init__`` side effects. + """ + parts = module.split(".") + if parts[0] != "chainweaver": + return None + rel = parts[1:] + if not rel: + return _PKG_ROOT / "__init__.py" + as_module = _PKG_ROOT.joinpath(*rel).with_suffix(".py") + if as_module.is_file(): + return as_module + as_package = _PKG_ROOT.joinpath(*rel) / "__init__.py" + if as_package.is_file(): + return as_package + return None + + +def _collect_imports(path: Path) -> tuple[set[str], set[str]]: + """Return ``(external_roots, inrepo_modules)`` imported by *path*. + + ``external_roots`` are the top-level names of every non-``chainweaver`` + import; ``inrepo_modules`` are the fully dotted ``chainweaver.*`` targets. + Relative imports (``from . import x``) are normalized against the file's + package so they count as in-repo. + """ + tree = ast.parse(path.read_text(encoding="utf-8"), filename=str(path)) + external: set[str] = set() + inrepo: set[str] = set() + for node in ast.walk(tree): + if isinstance(node, ast.Import): + for alias in node.names: + root = alias.name.split(".")[0] + if root == "chainweaver": + inrepo.add(alias.name) + else: + external.add(root) + elif isinstance(node, ast.ImportFrom): + if node.level and node.level > 0: + # Relative import inside the chainweaver package -> in-repo. + if node.module: + inrepo.add(f"chainweaver.{node.module}") + continue + if node.module is None: + continue + root = node.module.split(".")[0] + if root == "chainweaver": + inrepo.add(node.module) + else: + external.add(root) + return external, inrepo + + +def test_execution_modules_have_no_banned_direct_imports() -> None: + """``executor.py`` and ``_execution/*`` import nothing on the banned lists.""" + violations: list[str] = [] + for path in _execution_module_paths(): + external, inrepo = _collect_imports(path) + rel = path.relative_to(_PKG_ROOT.parent) + for name in sorted((external & BANNED_EXTERNAL) - ALLOWED_EXTERNAL): + violations.append(f"{rel}: banned external import '{name}'") + for name in sorted(inrepo & BANNED_INREPO): + violations.append(f"{rel}: banned in-repo import '{name}'") + assert not violations, ( + "Execution-path determinism invariants violated (see " + f"{_INVARIANTS_DOC}):\n " + "\n ".join(violations) + ) + + +def test_execution_closure_never_reaches_banned_inrepo_modules() -> None: + """No ``chainweaver.*`` module reachable from the execution path is banned. + + Walks the in-repo import graph starting from the execution modules so a + helper cannot smuggle an LLM proposer / observer / service module onto the + deterministic path indirectly. + """ + seen: set[str] = set() + queue: deque[str] = deque() + for path in _execution_module_paths(): + _, inrepo = _collect_imports(path) + queue.extend(inrepo) + + offending: list[str] = [] + while queue: + module = queue.popleft() + if module in seen: + continue + seen.add(module) + if module in BANNED_INREPO: + offending.append(module) + continue # Don't descend into a banned module. + module_path = _module_to_path(module) + if module_path is None: + continue + _, inrepo = _collect_imports(module_path) + queue.extend(inrepo - seen) + + assert not offending, ( + "Banned in-repo modules are reachable from the deterministic execution " + f"path (see {_INVARIANTS_DOC}):\n " + "\n ".join(sorted(offending)) + ) + + +def test_banned_lists_are_documented_and_consistent() -> None: + """Guard against an empty allowlist/banned-list regression.""" + assert BANNED_EXTERNAL, "BANNED_EXTERNAL must not be empty" + assert BANNED_INREPO, "BANNED_INREPO must not be empty" + # The allowlist must not silently re-permit a banned module. + assert not (ALLOWED_EXTERNAL & BANNED_EXTERNAL), ( + "ALLOWED_EXTERNAL overlaps BANNED_EXTERNAL — an allowlisted module is " + "also banned, which would defeat the contract." + ) diff --git a/tests/test_flow_state_transitions.py b/tests/test_flow_state_transitions.py new file mode 100644 index 0000000..2744708 --- /dev/null +++ b/tests/test_flow_state_transitions.py @@ -0,0 +1,161 @@ +"""Copy-on-write flow state transitions (issue #335). + +``accept_drift`` and ``set_flow_status`` must never mutate a ``Flow`` retrieved +from the registry in place — that object is a shared reference for in-memory +stores, so an in-place write would silently alter the state observed by every +other holder (e.g. a long-running ``FlowServer`` or a second executor sharing +the registry). Transitions go through ``FlowRegistry.update_flow_state``, which +replaces the stored object with an updated copy. +""" + +from __future__ import annotations + +from pathlib import Path +from typing import Any + +from pydantic import BaseModel + +from chainweaver.executor import FlowExecutor +from chainweaver.flow import Flow, FlowStatus, FlowStep +from chainweaver.registry import FlowRegistry +from chainweaver.storage import FileStore +from chainweaver.tools import Tool + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +class NumIn(BaseModel): + number: int + + +class NumOut(BaseModel): + value: int + + +def _double_fn(inp: NumIn) -> dict[str, Any]: + return {"value": inp.number * 2} + + +def _make_tool() -> Tool: + return Tool( + name="double", + description="Doubles a number.", + input_schema=NumIn, + output_schema=NumOut, + fn=_double_fn, + ) + + +def _make_flow( + *, + status: FlowStatus = FlowStatus.NEEDS_REVIEW, + tool_schema_hashes: dict[str, str] | None = None, +) -> Flow: + return Flow( + name="state_flow", + version="0.1.0", + description="A flow used for state-transition tests.", + steps=[FlowStep(tool_name="double", input_mapping={"number": "number"})], + status=status, + tool_schema_hashes=tool_schema_hashes, + ) + + +# --------------------------------------------------------------------------- +# Shared-reference safety +# --------------------------------------------------------------------------- + + +def test_accept_drift_does_not_mutate_shared_flow() -> None: + registry = FlowRegistry() + registry.register_flow( + _make_flow(status=FlowStatus.NEEDS_REVIEW, tool_schema_hashes={"double": "stale-hash"}) + ) + executor = FlowExecutor(registry=registry) + tool = _make_tool() + executor.register_tool(tool) + + original = registry.get_flow("state_flow") + + executor.accept_drift("state_flow") + + # The instance captured before the transition is untouched. + assert original.status is FlowStatus.NEEDS_REVIEW + assert original.tool_schema_hashes == {"double": "stale-hash"} + + # The registry now returns the updated state on a fresh object. + updated = registry.get_flow("state_flow") + assert updated is not original + assert updated.status is FlowStatus.ACTIVE + assert updated.tool_schema_hashes == {"double": tool.schema_hash} + + +def test_set_flow_status_does_not_mutate_shared_flow() -> None: + registry = FlowRegistry() + registry.register_flow(_make_flow(status=FlowStatus.ACTIVE)) + + original = registry.get_flow("state_flow") + registry.set_flow_status("state_flow", FlowStatus.DISABLED) + + assert original.status is FlowStatus.ACTIVE # unchanged + assert registry.get_flow("state_flow").status is FlowStatus.DISABLED + + +def test_two_executors_sharing_one_registry_observe_consistent_state() -> None: + registry = FlowRegistry() + registry.register_flow( + _make_flow(status=FlowStatus.NEEDS_REVIEW, tool_schema_hashes={"double": "stale-hash"}) + ) + executor_a = FlowExecutor(registry=registry) + executor_b = FlowExecutor(registry=registry) + executor_a.register_tool(_make_tool()) + + executor_a.accept_drift("state_flow") + + # The second executor, reading through the same registry, sees the + # intentional updated state — not a stale in-memory snapshot. + assert executor_b.registry.get_flow("state_flow").status is FlowStatus.ACTIVE + + +# --------------------------------------------------------------------------- +# Persistence +# --------------------------------------------------------------------------- + + +def test_update_flow_state_persists_to_filestore(tmp_path: Path) -> None: + store = FileStore(tmp_path) + registry = FlowRegistry(store=store) + registry.register_flow(_make_flow(status=FlowStatus.NEEDS_REVIEW)) + + registry.set_flow_status("state_flow", FlowStatus.ACTIVE) + + # A fresh registry reading the same directory reflects the transition. + reloaded = FlowRegistry(store=FileStore(tmp_path)) + assert reloaded.get_flow("state_flow").status is FlowStatus.ACTIVE + + +# --------------------------------------------------------------------------- +# update_flow_state semantics +# --------------------------------------------------------------------------- + + +def test_update_flow_state_no_args_returns_stored_object() -> None: + registry = FlowRegistry() + registry.register_flow(_make_flow(status=FlowStatus.ACTIVE)) + stored = registry.get_flow("state_flow") + + result = registry.update_flow_state("state_flow") + + assert result is stored # no copy made when nothing changes + + +def test_update_flow_state_can_clear_hashes_explicitly() -> None: + registry = FlowRegistry() + registry.register_flow(_make_flow(tool_schema_hashes={"double": "h"})) + + updated = registry.update_flow_state("state_flow", tool_schema_hashes=None) + + assert updated.tool_schema_hashes is None + assert registry.get_flow("state_flow").tool_schema_hashes is None From 5d75caba2e5fff0c96dd2f6e6d79e0e065027739 Mon Sep 17 00:00:00 2001 From: Claude Date: Sat, 13 Jun 2026 13:46:37 +0000 Subject: [PATCH 2/5] perf: keep run-scoped middleware off the AttributeError path (#336) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The benchmark alert on PR #384 flagged compiled_overhead_ms ~2x. Cause: the per-thread run-scoped middleware slot was read with `getattr(self._local, "middleware", None)`, which raised+caught an AttributeError on every hook fire (~12 per flow) when no run-scoped middleware was set — the common case. Subclass threading.local so `middleware` defaults to an empty list per thread, making `_fire_hook` a plain attribute read. Micro-benchmark of a 5-step flow returns to ~0.17 ms/call (in line with the pre-change baseline). Behaviour unchanged; full suite green. https://claude.ai/code/session_01Ja8HkypgZVJka5LDcqmZzS --- chainweaver/executor.py | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/chainweaver/executor.py b/chainweaver/executor.py index 9d2c30b..e64e510 100644 --- a/chainweaver/executor.py +++ b/chainweaver/executor.py @@ -113,6 +113,18 @@ class _StreamSentinel: _STREAM_SENTINEL: _StreamSentinel = _StreamSentinel() +class _RunScopedState(threading.local): + """Per-thread run-scoped executor state (issue #336). + + Subclassing :class:`threading.local` so ``middleware`` defaults to an empty + list on every thread keeps the hot ``_fire_hook`` path a plain attribute + read — no ``getattr`` default and no per-call ``AttributeError`` round-trip. + """ + + def __init__(self) -> None: + self.middleware: list[FlowExecutorMiddleware] = [] + + class _StreamCollectorMiddleware(BaseMiddleware): """Per-call middleware that pushes lifecycle events onto a queue. @@ -603,7 +615,7 @@ def __init__( # executor never dispatch each other's events. Keyed by thread because # each run executes within a single thread (the calling thread, or the # stream worker thread). - self._local = threading.local() + self._local = _RunScopedState() # Guided decision-point callback (issue #102). Wraps a bare # callable in an adapter so the executor can call # ``self._decision_callback.decide(ctx)`` uniformly regardless @@ -722,7 +734,7 @@ def _fire_hook( Hooks that raise are logged at ``WARNING`` and the iteration continues — middleware bugs never abort a flow. """ - run_scoped = getattr(self._local, "middleware", None) + run_scoped = self._local.middleware chain = self._middleware if not run_scoped else [*self._middleware, *run_scoped] for idx, mw in enumerate(chain): handler = getattr(mw, hook, None) @@ -747,7 +759,7 @@ def _scoped_middleware(self, middleware: FlowExecutorMiddleware) -> Iterator[Non than the shared ``self._middleware`` list, so concurrent runs on one executor (issue #336) never see each other's run-scoped observers. """ - existing: list[FlowExecutorMiddleware] = getattr(self._local, "middleware", []) + existing = self._local.middleware self._local.middleware = [*existing, middleware] try: yield From af4c14b1bdee792dfedea23f37cec20084e03c75 Mon Sep 17 00:00:00 2001 From: Claude Date: Sat, 13 Jun 2026 13:52:17 +0000 Subject: [PATCH 3/5] fix: address PR #384 review (collision diagnostics, import-contract, carve-out) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three Copilot review fixes: - executor.py (#337): async DAG level-to-level merge reported `flat_index` (already advanced past the level) and a generic "DAG level" name. Use the level index + "DAG level {idx}", matching the sync path, so a ContextKeyCollisionError / WARNING names the right level. Add an async-DAG collision regression test. - test_executor_import_contract.py (#354): resolve relative imports against the file's own package (e.g. `chainweaver/_execution/foo.py` doing `from .bar import x` now records `chainweaver._execution.bar`, not `chainweaver.bar`), so the transitive-closure walk can't skip reachable in-repo modules. - test_executor_import_contract.py (#354): the `uuid` carve-out allowlist was a no-op (uuid was never banned). Enforce it positively — assert reviewed carve-outs stay OFF the banned list — and drop the dead subtraction; update invariants.md wording. https://claude.ai/code/session_01Ja8HkypgZVJka5LDcqmZzS --- chainweaver/executor.py | 8 ++-- docs/agent-context/invariants.md | 17 ++++--- tests/test_context_collision.py | 12 +++++ tests/test_executor_import_contract.py | 62 +++++++++++++++++++------- 4 files changed, 72 insertions(+), 27 deletions(-) diff --git a/chainweaver/executor.py b/chainweaver/executor.py index e64e510..d3f6ff8 100644 --- a/chainweaver/executor.py +++ b/chainweaver/executor.py @@ -1783,7 +1783,7 @@ async def _execute_dag_flow_async( flat_index = 0 levels = self._compute_dag_levels(flow) - for level_steps in levels: + for level_idx, level_steps in enumerate(levels): # Cooperative cancellation between topological levels (issue #142). self._check_cancellation( flow_name=flow.name, @@ -1895,13 +1895,15 @@ async def _execute_dag_flow_async( level_outputs[key] = value # Level-to-level merge honours the flow's collision policy (#337); # within-level sibling collisions were already rejected above. + # Report the level index (not ``flat_index``, which has advanced past + # the level's steps) so collision diagnostics name the right level. merge_step_outputs( context, level_outputs, policy=flow.on_context_collision, flow_name=flow.name, - step_index=flat_index, - step_name="DAG level", + step_index=level_idx, + step_name=f"DAG level {level_idx}", logger=_logger, ) diff --git a/docs/agent-context/invariants.md b/docs/agent-context/invariants.md index c16269e..6a98068 100644 --- a/docs/agent-context/invariants.md +++ b/docs/agent-context/invariants.md @@ -73,13 +73,16 @@ The three hard invariants are mechanically enforced by A PR that adds `import random` (or any banned import) to the execution modules fails this test with a message pointing back at this document. -**Allowlist:** `uuid` is the single reviewed exception, for the trace-id -carve-out above. A blanket "`random` absent from `sys.modules`" check is -deliberately not used because `flow.py` legitimately imports `random` for the -opt-in jitter carve-out; the contract is therefore scoped to the -*execution-module boundary* (`executor.py` + `_execution/`), which is exactly -where the invariants apply. Expanding the banned list is cheap; keep the -allowlist conservative and document every addition here. +**Carve-outs:** `uuid` is the single reviewed exception, for the trace-id +carve-out above. It is kept deliberately *off* the banned list (rather than +banned-then-re-permitted); the contract test asserts reviewed carve-outs stay +unbanned, so banning one later trips the test and forces a conscious review. A +blanket "`random` absent from `sys.modules`" check is deliberately not used +because `flow.py` legitimately imports `random` for the opt-in jitter carve-out; +the contract is therefore scoped to the *execution-module boundary* +(`executor.py` + `_execution/`), which is exactly where the invariants apply. +Expanding the banned list is cheap; keep carve-outs conservative and document +every addition here. --- diff --git a/tests/test_context_collision.py b/tests/test_context_collision.py index 79fa768..2ee04ba 100644 --- a/tests/test_context_collision.py +++ b/tests/test_context_collision.py @@ -182,6 +182,18 @@ async def test_error_policy_aborts_async_linear() -> None: await executor.execute_flow_async(flow.name, {"number": 5}) +async def test_error_policy_async_dag_reports_correct_level() -> None: + # The level-to-level merge must name the colliding *level* (level 1 here), + # not the post-incremented flat step counter. + flow = _dag_flow("error") + executor = _executor(flow) + with pytest.raises(ContextKeyCollisionError) as exc_info: + await executor.execute_flow_async(flow.name, {"number": 5}) + assert exc_info.value.keys == ["value"] + assert exc_info.value.step_index == 1 + assert "DAG level" in exc_info.value.step_name + + # --------------------------------------------------------------------------- # compile-time static warning # --------------------------------------------------------------------------- diff --git a/tests/test_executor_import_contract.py b/tests/test_executor_import_contract.py index a74353e..d81a463 100644 --- a/tests/test_executor_import_contract.py +++ b/tests/test_executor_import_contract.py @@ -11,7 +11,9 @@ 1. **Direct imports** — the execution modules (``executor.py`` plus everything under the ``chainweaver/_execution`` package) must not import any banned - module, except for an explicit, reviewed allowlist. + module. Entropy/IO-adjacent names the executor legitimately needs (``uuid`` + for trace ids) are reviewed carve-outs kept deliberately *off* the banned + list rather than re-permitted after the fact. 2. **Transitive in-repo reach** — following ``chainweaver.*`` imports from the execution modules, none of the deterministic-execution closure may reach a banned in-repo source of nondeterminism / LLM behavior (``compiler_llm``, @@ -70,11 +72,13 @@ } ) -# Reviewed, deliberate exceptions to ``BANNED_EXTERNAL`` for the execution -# modules. ``uuid`` mints opaque trace-correlation ids only (the trace-id -# carve-out in invariants.md); it never influences which tools run or any value -# passed between them. Keep this list conservative — every entry needs a -# documented rationale in invariants.md. +# Reviewed external carve-outs: modules the execution path is deliberately +# allowed to use and which must therefore stay OFF ``BANNED_EXTERNAL``. ``uuid`` +# mints opaque trace-correlation ids only (the trace-id carve-out in +# invariants.md); it never influences which tools run or any value passed +# between them. ``test_banned_lists_are_documented_and_consistent`` asserts +# these names are not banned, so banning one later trips the test and forces a +# conscious review rather than silently breaking a legitimate import. ALLOWED_EXTERNAL = frozenset({"uuid"}) _PKG_ROOT = Path(chainweaver.__file__).parent @@ -111,15 +115,32 @@ def _module_to_path(module: str) -> Path | None: return None +def _package_parts(path: Path) -> list[str]: + """Return the dotted package the module at *path* lives in, as a parts list. + + ``chainweaver/_execution/context.py`` -> ``["chainweaver", "_execution"]``; + ``chainweaver/_execution/__init__.py`` -> ``["chainweaver", "_execution"]``; + ``chainweaver/executor.py`` -> ``["chainweaver"]``. + """ + parts = list(path.relative_to(_PKG_ROOT.parent).with_suffix("").parts) + if parts and parts[-1] == "__init__": + return parts[:-1] + return parts[:-1] + + def _collect_imports(path: Path) -> tuple[set[str], set[str]]: """Return ``(external_roots, inrepo_modules)`` imported by *path*. ``external_roots`` are the top-level names of every non-``chainweaver`` import; ``inrepo_modules`` are the fully dotted ``chainweaver.*`` targets. - Relative imports (``from . import x``) are normalized against the file's - package so they count as in-repo. + Relative imports (``from . import x`` / ``from .bar import y``) are resolved + against the file's own package — dropping ``node.level - 1`` trailing + components for each extra leading dot — so they record the correct dotted + module (e.g. ``chainweaver/_execution/foo.py`` doing ``from .bar import x`` + records ``chainweaver._execution.bar``, not ``chainweaver.bar``). """ tree = ast.parse(path.read_text(encoding="utf-8"), filename=str(path)) + pkg_parts = _package_parts(path) external: set[str] = set() inrepo: set[str] = set() for node in ast.walk(tree): @@ -132,9 +153,12 @@ def _collect_imports(path: Path) -> tuple[set[str], set[str]]: external.add(root) elif isinstance(node, ast.ImportFrom): if node.level and node.level > 0: - # Relative import inside the chainweaver package -> in-repo. - if node.module: - inrepo.add(f"chainweaver.{node.module}") + # Resolve relative to the file's package: each leading dot beyond + # the first strips one trailing package component. + base = pkg_parts[: len(pkg_parts) - (node.level - 1)] + target = [*base, *(node.module.split(".") if node.module else [])] + if target and target[0] == "chainweaver": + inrepo.add(".".join(target)) continue if node.module is None: continue @@ -152,7 +176,7 @@ def test_execution_modules_have_no_banned_direct_imports() -> None: for path in _execution_module_paths(): external, inrepo = _collect_imports(path) rel = path.relative_to(_PKG_ROOT.parent) - for name in sorted((external & BANNED_EXTERNAL) - ALLOWED_EXTERNAL): + for name in sorted(external & BANNED_EXTERNAL): violations.append(f"{rel}: banned external import '{name}'") for name in sorted(inrepo & BANNED_INREPO): violations.append(f"{rel}: banned in-repo import '{name}'") @@ -197,11 +221,15 @@ def test_execution_closure_never_reaches_banned_inrepo_modules() -> None: def test_banned_lists_are_documented_and_consistent() -> None: - """Guard against an empty allowlist/banned-list regression.""" + """Guard the banned/carve-out lists against regressions.""" assert BANNED_EXTERNAL, "BANNED_EXTERNAL must not be empty" assert BANNED_INREPO, "BANNED_INREPO must not be empty" - # The allowlist must not silently re-permit a banned module. - assert not (ALLOWED_EXTERNAL & BANNED_EXTERNAL), ( - "ALLOWED_EXTERNAL overlaps BANNED_EXTERNAL — an allowlisted module is " - "also banned, which would defeat the contract." + assert ALLOWED_EXTERNAL, "ALLOWED_EXTERNAL must document the reviewed carve-outs" + # Enforce the carve-out policy: a reviewed exception (e.g. ``uuid``) must stay + # OFF the banned list. If a future change bans one of these, this fails and + # forces a conscious review instead of silently breaking a legitimate import. + overlap = ALLOWED_EXTERNAL & BANNED_EXTERNAL + assert not overlap, ( + f"Reviewed carve-out(s) {sorted(overlap)} are also in BANNED_EXTERNAL; " + f"a carve-out must not be banned (see {_INVARIANTS_DOC})." ) From 819405fe334aeaffaef753debd500baa882630ae Mon Sep 17 00:00:00 2001 From: Claude Date: Sat, 13 Jun 2026 13:55:47 +0000 Subject: [PATCH 4/5] fix: move per-run executor markers to thread-local (#336) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses PR #384 review: the concurrency-contract docstring claimed run-scoped state is per-thread, but `_active_flow_version`, `_in_replay`, and `_resume_snapshot` were still instance attributes read during execution, so concurrent runs of different flow versions could stamp the wrong `ExecutionResult.flow_version` or bypass the step cache. Move all three markers onto the per-thread `_RunScopedState` (alongside the stream collector), fully realizing #336's "move per-run state off the instance" goal. Single-thread semantics are preserved exactly — sub-flow recursion and replay/resume save/restore their markers on the same thread, which a threading.local slot handles identically. Adds a concurrent-different-versions regression test asserting each run stamps its own flow_version. https://claude.ai/code/session_01Ja8HkypgZVJka5LDcqmZzS --- chainweaver/executor.py | 80 ++++++++++++++++-------------- tests/test_executor_concurrency.py | 50 +++++++++++++++++++ 2 files changed, 94 insertions(+), 36 deletions(-) diff --git a/chainweaver/executor.py b/chainweaver/executor.py index d3f6ff8..ad05c33 100644 --- a/chainweaver/executor.py +++ b/chainweaver/executor.py @@ -116,13 +116,28 @@ class _StreamSentinel: class _RunScopedState(threading.local): """Per-thread run-scoped executor state (issue #336). - Subclassing :class:`threading.local` so ``middleware`` defaults to an empty - list on every thread keeps the hot ``_fire_hook`` path a plain attribute - read — no ``getattr`` default and no per-call ``AttributeError`` round-trip. + Subclassing :class:`threading.local` so the slots default per thread keeps + the hot ``_fire_hook`` path a plain attribute read — no ``getattr`` default + and no per-call ``AttributeError`` round-trip — and, crucially, isolates the + per-run markers so concurrent ``execute_flow`` / ``execute_flow_async`` calls + on one executor never race: + + - ``middleware``: run-scoped lifecycle middleware (e.g. the ``stream_flow`` + event collector), composed on top of the shared ``self._middleware``. + - ``active_flow_version``: version of the flow currently executing on this + thread (#201); ``_make_result`` stamps ``ExecutionResult.flow_version`` + from it. Sub-flow recursion save/restores it on the same thread. + - ``in_replay``: ``True`` while inside ``replay_flow`` so the step cache is + bypassed (replay must always re-execute). + - ``resume_snapshot``: the snapshot a ``resume_flow`` call is resuming from, + consumed by the relevant ``_execute_*`` path. """ def __init__(self) -> None: self.middleware: list[FlowExecutorMiddleware] = [] + self.active_flow_version: str = "" + self.in_replay: bool = False + self.resume_snapshot: ExecutionSnapshot | None = None class _StreamCollectorMiddleware(BaseMiddleware): @@ -630,29 +645,22 @@ def __init__( # When set, eligible step outputs are read from / written to # this cache before the tool callable runs. self._step_cache = step_cache - # ``True`` while inside replay_flow / _replay_linear_from so - # the cache is bypassed — replay must always re-execute (per - # the existing replay semantics). - self._in_replay = False # Crash-resume checkpointer (issue #128). When set, an # ExecutionSnapshot is written after every successful linear # step or DAG level. On terminal completion the snapshot is # deleted iff ``delete_on_success`` is ``True``. self._checkpointer = checkpointer self._delete_on_success = delete_on_success - # Resumption state — populated by ``resume_flow`` before - # invoking the relevant ``_execute_*`` path so the loops know - # where to start and which records to prepend. - self._resume_snapshot: ExecutionSnapshot | None = None - # Version of the flow currently executing (issue #201). Set at the - # top of every result-producing path so ``_make_result`` can stamp - # ``ExecutionResult.flow_version`` without threading the value - # through ~30 call sites. Sub-flow composition (issue #75) recurses - # through ``execute_flow``; ``_execute_step`` save/restores this - # around the recursive call so the parent's value is not clobbered. - # Like ``_in_replay`` / ``_resume_snapshot`` this assumes the - # documented "one executor per concurrent run" contract. - self._active_flow_version: str = "" + # Per-run markers — the in-replay flag (cache bypass), the resume + # snapshot, and the executing flow version (#201) — live on the + # per-thread ``_RunScopedState`` (issue #336) alongside the stream + # collector, not the shared instance, so concurrent runs of different + # flows/versions never stamp each other's ``flow_version`` or bypass + # the cache. Sub-flow composition (#75) recurses through + # ``execute_flow`` on the *same* thread and save/restores + # ``_local.active_flow_version`` around the recursive call, so the + # parent's value is not clobbered; replay/resume set/restore their + # markers on the same thread too. # Flow composition (issue #75): the maximum nesting depth of # ``flow_name`` sub-flow references. Checked statically before # execution so runaway / cyclic recursion fails loudly. @@ -965,8 +973,8 @@ def replay_flow( # Bypass the step cache for the duration of replay — replay # must always re-execute tools (per the existing replay # semantics and issue #127's acceptance criteria). - previous_in_replay = self._in_replay - self._in_replay = True + previous_in_replay = self._local.in_replay + self._local.in_replay = True try: if resume_from_step <= 0: new_result = self.execute_flow(result.flow_name, dict(result.initial_input)) @@ -975,7 +983,7 @@ def replay_flow( raise ValueError("resume_from_step is not supported for DAGFlow yet.") new_result = self._replay_linear_from(flow, result, resume_from_step) finally: - self._in_replay = previous_in_replay + self._local.in_replay = previous_in_replay diffs: list[StepDiff] = [] if mode is ReplayMode.VERIFY: @@ -998,7 +1006,7 @@ def _replay_linear_from( """Re-run *flow* starting at index *resume_from_step* with a context seeded from the original execution log. """ - self._active_flow_version = flow.version + self._local.active_flow_version = flow.version if resume_from_step > len(flow.steps): raise ValueError( f"resume_from_step={resume_from_step} exceeds step count {len(flow.steps)}." @@ -1265,7 +1273,7 @@ def execute_flow( is cancelled at a step boundary. """ flow = self._registry.get_flow(flow_name, version=version) - self._active_flow_version = flow.version + self._local.active_flow_version = flow.version if not force and flow.status != FlowStatus.ACTIVE: raise FlowStatusError(flow_name, flow.status.value) @@ -1607,7 +1615,7 @@ async def _execute_linear_flow_async( cancel_token: CancellationToken | None = None, ) -> ExecutionResult: """Async-native counterpart to the linear branch of :meth:`execute_flow`.""" - self._active_flow_version = flow.version + self._local.active_flow_version = flow.version trace_id = _new_trace_id() flow_started_at = _now_utc() flow_t0 = time.perf_counter() @@ -1741,7 +1749,7 @@ async def _execute_dag_flow_async( merged with sibling-key-conflict detection between levels. Cancellation (issue #142) is checked between levels. """ - self._active_flow_version = flow.version + self._local.active_flow_version = flow.version trace_id = _new_trace_id() flow_started_at = _now_utc() flow_t0 = time.perf_counter() @@ -2573,7 +2581,7 @@ def _make_result( ) result = ExecutionResult( flow_name=flow_name, - flow_version=self._active_flow_version, + flow_version=self._local.active_flow_version, success=success, final_output=final_output, execution_log=execution_log, @@ -2746,7 +2754,7 @@ def _resume_linear_flow( snapshot: ExecutionSnapshot, ) -> ExecutionResult: """Continue a linear execution from *snapshot.completed_steps*.""" - self._active_flow_version = flow.version + self._local.active_flow_version = flow.version trace_id = snapshot.trace_id flow_name = snapshot.flow_name flow_started_at = snapshot.started_at @@ -2872,11 +2880,11 @@ def _resume_dag_flow( # via the try/finally in resume_flow's caller — but resume_flow # doesn't wrap in try/finally; we wrap here instead to keep # the contract local. - self._resume_snapshot = snapshot + self._local.resume_snapshot = snapshot try: return self._execute_dag_flow(flow, dict(snapshot.initial_input)) finally: - self._resume_snapshot = None + self._local.resume_snapshot = None def _record_observed_trace(self, result: ExecutionResult) -> None: """Mirror an :class:`ExecutionResult` into the configured TraceRecorder.""" @@ -3153,7 +3161,7 @@ def _execute_subflow_step( # not just at the parent boundary (issue #142). The deadline is an # absolute wall-clock instant, so passing it through unchanged keeps # one shared budget across the whole composed run. - saved_version = self._active_flow_version + saved_version = self._local.active_flow_version try: sub_result = self.execute_flow( sub_name, @@ -3162,7 +3170,7 @@ def _execute_subflow_step( cancel_token=cancel_token, ) finally: - self._active_flow_version = saved_version + self._local.active_flow_version = saved_version ended_at = _now_utc() duration_ms = (time.perf_counter() - perf_start) * 1000.0 @@ -3462,7 +3470,7 @@ def _finish(record: StepRecord) -> StepRecord: # the same key. If validation fails, fall through to the # normal execution path, which surfaces the same error. cache_key: StepCacheKey | None = None - if self._step_cache is not None and tool.cacheable and not self._in_replay: + if self._step_cache is not None and tool.cacheable and not self._local.in_replay: try: validated = tool.input_schema.model_validate(inputs) except ValidationError: @@ -3863,11 +3871,11 @@ def _execute_dag_flow( Returns: An :class:`ExecutionResult` with the full execution log. """ - self._active_flow_version = flow.version + self._local.active_flow_version = flow.version # Resume support (issue #128): when _resume_snapshot is set, # reuse its trace_id / started_at / context / log and skip the # already-completed DAG levels. - resume = self._resume_snapshot + resume = self._local.resume_snapshot if resume is not None: trace_id = resume.trace_id flow_started_at = resume.started_at diff --git a/tests/test_executor_concurrency.py b/tests/test_executor_concurrency.py index 345bbc8..45aa145 100644 --- a/tests/test_executor_concurrency.py +++ b/tests/test_executor_concurrency.py @@ -144,6 +144,56 @@ def run(i: int) -> None: assert result.flow_version == "1.2.3" +def test_concurrent_runs_of_different_versions_stamp_correct_version() -> None: + # Exercises the per-run-marker race (#336): `active_flow_version` is + # thread-local, so concurrent runs of different versions never stamp each + # other's flow_version. + registry = FlowRegistry() + for version in ("1.0.0", "2.0.0"): + registry.register_flow( + Flow( + name="versioned", + version=version, + description="Doubles a number.", + steps=[FlowStep(tool_name="double", input_mapping={"number": "number"})], + ) + ) + executor = FlowExecutor(registry=registry) + executor.register_tool( + Tool( + name="double", + description="Doubles a number.", + input_schema=NumIn, + output_schema=NumOut, + fn=_double_fn, + ) + ) + + num_runs = 16 + barrier = threading.Barrier(num_runs) + mismatches: list[tuple[str, str]] = [] + errors: list[BaseException] = [] + + def run(i: int) -> None: + version = "1.0.0" if i % 2 == 0 else "2.0.0" + try: + barrier.wait() + result = executor.execute_flow("versioned", {"number": i}, version=version) + if result.flow_version != version: + mismatches.append((version, result.flow_version)) + except BaseException as exc: + errors.append(exc) + + threads = [threading.Thread(target=run, args=(i,)) for i in range(num_runs)] + for thread in threads: + thread.start() + for thread in threads: + thread.join() + + assert not errors, errors + assert not mismatches, f"flow_version cross-talk: {mismatches}" + + # --------------------------------------------------------------------------- # In-memory backend concurrency # --------------------------------------------------------------------------- From 23c539ee92039589fe03436a5c243ea9ee7fe6cc Mon Sep 17 00:00:00 2001 From: Claude Date: Sat, 13 Jun 2026 16:16:02 +0000 Subject: [PATCH 5/5] perf: skip per-step collision scan when outputs add only new keys merge_step_outputs ran a Python-level list-comprehension scan ([key for key in outputs if key in context]) on every linear/DAG step merge, even in the common case where a step adds only new keys under the default "warn" policy. Detect collisions first with a C-level key-view intersection (outputs.keys() & context.keys()), which is empty-fast, and only walk outputs to recover deterministic order for the error/log messages when a collision actually exists. Behaviour is unchanged (deterministic collision order, DEBUG-under- overwrite logging preserved); ~12% faster per no-collision merge, trimming the orchestration overhead flagged by the Bench CI alert on compiled_overhead_ms_n5. All 1547 tests pass. --- chainweaver/_execution/context.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/chainweaver/_execution/context.py b/chainweaver/_execution/context.py index c4d342b..cb7afcf 100644 --- a/chainweaver/_execution/context.py +++ b/chainweaver/_execution/context.py @@ -60,8 +60,12 @@ def merge_step_outputs( ContextKeyCollisionError: When *policy* is ``"error"`` and one or more output keys already exist in *context*. """ - collisions = [key for key in outputs if key in context] - if collisions: + # Fast path: a C-level key-view intersection skips the per-step Python scan + # (and its list allocation) when a step only adds new keys — the common case + # on the execution hot path. Only when an actual collision exists do we walk + # ``outputs`` to recover deterministic order for the error / log messages. + if outputs.keys() & context.keys(): + collisions = [key for key in outputs if key in context] if policy == "error": raise ContextKeyCollisionError(flow_name, step_index, step_name, collisions) log = logger.warning if policy == "warn" else logger.debug