Harden the FlowExecutor execution core (#330, #331, #332, #335, #336, #337, #344, #354)#384
Conversation
…336, #337, #344, #354) Foundational extraction plus six behavioural fixes across the executor core, delivered as one cluster. - #330/#331: add internal `chainweaver/_execution` package for the no-I/O collaborators shared by both lanes; first extract is `merge_step_outputs`, the single context-merge used by linear and DAG, sync and async. - #337: add `on_context_collision` ("overwrite"/"warn" (default)/"error") to Flow and DAGFlow; route all primary merges through the shared helper; emit a `context_collision` compile warning; new `ContextKeyCollisionError`. - #344: opt-in `FlowExecutor(max_step_concurrency=N)` runs independent async DAG-level steps concurrently under a semaphore, preserving declaration-order determinism; add a fan-out benchmark. - #336: document and enforce a real concurrency contract — `stream_flow` uses per-thread run-scoped middleware (no shared-list mutation); lock InMemoryStepCache / InMemoryCheckpointer. - #332: `execute_flow_async` rejects unsupported constructs up front with the new typed `AsyncLaneUnsupportedError`, listing all offending constructs. - #335: `FlowRegistry.update_flow_state` copy-on-write; `accept_drift` / `set_flow_status` no longer mutate registry-shared Flow objects in place. - #354: AST import-contract test enforces the no-LLM/no-network/no-randomness executor invariants over executor.py and _execution/. Regenerates the public-API snapshot and flow schema; updates AGENTS.md, architecture.md, invariants.md, data-integrity.md, and CHANGELOG. All four validation commands pass; coverage 92%. https://claude.ai/code/session_01Ja8HkypgZVJka5LDcqmZzS
There was a problem hiding this comment.
⚠️ Performance Alert ⚠️
Possible performance regression was detected for benchmark 'ChainWeaver microbenchmarks'.
Benchmark result of this commit is worse than the previous benchmark result exceeding threshold 2.
| Benchmark suite | Current: 4a40f5f | Previous: 9d07a03 | Ratio |
|---|---|---|---|
compiled_overhead_ms_n5_llm200_tool0 |
0.33101299993631983 ms |
0.15802500007566778 ms |
2.09 |
This comment was automatically generated by workflow using github-action-benchmark.
CC: @dgenio
There was a problem hiding this comment.
Pull request overview
This PR hardens the FlowExecutor execution core by extracting shared, deterministic execution helpers into chainweaver/_execution/, tightening executor/runtime invariants, and adding multiple executor reliability features (context-collision policy, async-lane fail-fast, opt-in async DAG concurrency, concurrency contract, and copy-on-write flow state transitions).
Changes:
- Introduces
chainweaver._execution.merge_step_outputsas the single shared context-merge implementation and addson_context_collisionpolicy +ContextKeyCollisionError. - Adds opt-in async DAG level concurrency (
FlowExecutor(max_step_concurrency=...)) and strengthens executor concurrency behavior (run-scoped stream middleware, locked in-memory cache/checkpointer). - Enforces executor determinism invariants via an AST-based import-contract test and updates docs/snapshots/schemas accordingly.
Reviewed changes
Copilot reviewed 26 out of 26 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/test_flow_state_transitions.py | New tests asserting copy-on-write flow state transitions via the registry. |
| tests/test_executor_import_contract.py | New AST-based import-contract tests enforcing executor determinism invariants. |
| tests/test_executor_concurrency.py | New concurrency stress tests for executor streaming and in-memory backends. |
| tests/test_executor_async.py | Updates async-lane unsupported-feature tests to expect AsyncLaneUnsupportedError and expands coverage. |
| tests/test_dag_concurrency.py | New tests verifying async DAG level concurrency determinism + real overlap. |
| tests/test_context_collision.py | New tests covering context collision policies and compile-time collision warnings. |
| tests/test_composition.py | Updates async composition rejection test to the new typed async-lane error. |
| tests/fixtures/public_api.json | Regenerated public API snapshot for new exceptions and executor signature change. |
| schemas/flow.schema.json | Adds on_context_collision field to the serialized flow schema. |
| docs/data-integrity.md | Documents context merge behavior and the new collision policy semantics. |
| docs/agent-context/invariants.md | Documents the new automated import-contract enforcement. |
| docs/agent-context/architecture.md | Updates architecture map for _execution/, concurrency contract, and import-contract enforcement. |
| CHANGELOG.md | Changelog entry describing the execution-core hardening cluster. |
| chainweaver/registry.py | Adds update_flow_state for copy-on-write flow state transitions; updates set_flow_status. |
| chainweaver/flow.py | Adds ContextCollisionPolicy and on_context_collision to Flow/DAGFlow. |
| chainweaver/executor.py | Routes merges through merge_step_outputs, adds async DAG concurrency, updates drift acceptance to copy-on-write, and introduces run-scoped middleware for streaming. |
| chainweaver/exceptions.py | Adds ContextKeyCollisionError and AsyncLaneUnsupportedError. |
| chainweaver/compiler.py | Adds static context_collision compilation warnings (suppressed under overwrite policy). |
| chainweaver/checkpoint.py | Adds internal locking to InMemoryCheckpointer for concurrency safety. |
| chainweaver/cache.py | Adds internal locking + defensive copies to InMemoryStepCache for concurrency safety. |
| chainweaver/_execution/context.py | New deterministic shared context-merge helper with collision policy enforcement. |
| chainweaver/_execution/init.py | New private package init re-exporting merge_step_outputs. |
| chainweaver/init.py | Exposes new public exceptions in __all__. |
| benchmarks/README.md | Documents new DAG concurrency benchmark invocation. |
| benchmarks/bench_dag_concurrency.py | New benchmark demonstrating async DAG level concurrency speedup. |
| AGENTS.md | Updates repo map and documents collision policy, concurrency contract, async support matrix, and copy-on-write transitions. |
The benchmark alert on PR #384 flagged compiled_overhead_ms ~2x. Cause: the per-thread run-scoped middleware slot was read with `getattr(self._local, "middleware", None)`, which raised+caught an AttributeError on every hook fire (~12 per flow) when no run-scoped middleware was set — the common case. Subclass threading.local so `middleware` defaults to an empty list per thread, making `_fire_hook` a plain attribute read. Micro-benchmark of a 5-step flow returns to ~0.17 ms/call (in line with the pre-change baseline). Behaviour unchanged; full suite green. https://claude.ai/code/session_01Ja8HkypgZVJka5LDcqmZzS
…carve-out) Three Copilot review fixes: - executor.py (#337): async DAG level-to-level merge reported `flat_index` (already advanced past the level) and a generic "DAG level" name. Use the level index + "DAG level {idx}", matching the sync path, so a ContextKeyCollisionError / WARNING names the right level. Add an async-DAG collision regression test. - test_executor_import_contract.py (#354): resolve relative imports against the file's own package (e.g. `chainweaver/_execution/foo.py` doing `from .bar import x` now records `chainweaver._execution.bar`, not `chainweaver.bar`), so the transitive-closure walk can't skip reachable in-repo modules. - test_executor_import_contract.py (#354): the `uuid` carve-out allowlist was a no-op (uuid was never banned). Enforce it positively — assert reviewed carve-outs stay OFF the banned list — and drop the dead subtraction; update invariants.md wording. https://claude.ai/code/session_01Ja8HkypgZVJka5LDcqmZzS
Addresses PR #384 review: the concurrency-contract docstring claimed run-scoped state is per-thread, but `_active_flow_version`, `_in_replay`, and `_resume_snapshot` were still instance attributes read during execution, so concurrent runs of different flow versions could stamp the wrong `ExecutionResult.flow_version` or bypass the step cache. Move all three markers onto the per-thread `_RunScopedState` (alongside the stream collector), fully realizing #336's "move per-run state off the instance" goal. Single-thread semantics are preserved exactly — sub-flow recursion and replay/resume save/restore their markers on the same thread, which a threading.local slot handles identically. Adds a concurrent-different-versions regression test asserting each run stamps its own flow_version. https://claude.ai/code/session_01Ja8HkypgZVJka5LDcqmZzS
merge_step_outputs ran a Python-level list-comprehension scan ([key for key in outputs if key in context]) on every linear/DAG step merge, even in the common case where a step adds only new keys under the default "warn" policy. Detect collisions first with a C-level key-view intersection (outputs.keys() & context.keys()), which is empty-fast, and only walk outputs to recover deterministic order for the error/log messages when a collision actually exists. Behaviour is unchanged (deterministic collision order, DEBUG-under- overwrite logging preserved); ~12% faster per no-collision merge, trimming the orchestration overhead flagged by the Bench CI alert on compiled_overhead_ms_n5. All 1547 tests pass.
Summary
Implements the executor-core cluster as a single coherent PR: the foundational extraction of #330/#331 plus six well-bounded behavioural fixes (#332, #335, #336, #337, #344, #354). All changes live in (or directly govern)
chainweaver/executor.pyand its new internal_execution/package, withflow.py,registry.py,compiler.py,cache.py, andcheckpoint.pytouched where the contract requires.Per the agreed scope, #330/#331 are delivered as a foundation (a single shared, no-I/O context-merge used by both lanes, in a new
_executionpackage) rather than a full file-split — the issue author themselves scoped the complete decomposition as incremental, one collaborator per PR. The remaining mechanical extraction stays tracked on #330.Changes
chainweaver/_executionpackage for the transport-agnostic, no-I/O collaborators shared by both lanes. First extraction ismerge_step_outputs, now the single context-merge used by linear and DAG, sync and async.Flow/DAGFlowgainon_context_collision("overwrite"/"warn"(default) /"error"). All primary merges route through the shared helper;compile_flowemits acontext_collisionwarning for statically detectable overwrites; new typedContextKeyCollisionError. DAG sibling collisions remain an unconditional error.FlowExecutor(max_step_concurrency=N)dispatches independent async DAG-level steps under anasyncio.Semaphore; records come back in declaration order, so results are deterministic regardless of the setting. Default1is bit-identical to today. Addsbenchmarks/bench_dag_concurrency.py(≈6x on a 6-leaf fan-out).stream_flowregisters its event collector as per-thread run-scoped middleware (no more shared-list mutation);InMemoryStepCache/InMemoryCheckpointerare internally locked; the contract is documented in theFlowExecutordocstring, AGENTS.md, and architecture.md.execute_flow_asyncraises the new typedAsyncLaneUnsupportedError(before any step runs), listing every unsupported construct (branches,decision_candidates, composed sub-flows). Adds a published sync/async support matrix.FlowRegistry.update_flow_stateperformsmodel_copy+ persist;accept_drift/set_flow_statusno longer mutate registry-sharedFlowobjects in place.executor.pyand_execution/import no LLM/network/randomness module (direct + transitive in-repo reach), documented in invariants.md.Testing
ruff check chainweaver/ tests/ examples/) — All checks passed!ruff format --check chainweaver/ tests/ examples/) — 202 files already formattedpython -m mypy chainweaver/ tests/) — Success: no issues in 165 source filespython -m pytest tests/) — 1545 passed, 1 skipped, 92.44% coveragetest_executor_import_contract.py,test_flow_state_transitions.py,test_executor_concurrency.py,test_context_collision.py,test_dag_concurrency.py, plus expanded async-lane coverage.Regenerated the public-API snapshot (
tests/fixtures/public_api.json) and flow schema (schemas/flow.schema.json); the snapshot regen also corrects a pre-existing staleness from the merged loadersource=change.Related Issues
Closes #330, #331, #332, #335, #336, #337, #344, #354.
Checklist
AGENTS.mdanddocs/agent-context/)Risks / caveats
WARNING(wasDEBUG); behaviour is otherwise unchanged. The"overwrite"policy restores silent last-write-wins.AsyncLaneUnsupportedErrorinstead ofFlowExecutionError. Acceptable under the pre-1.0 versioning policy; existing tests updated.execute_flowlane still runs DAG levels sequentially — its branch/skip state machine is high-risk to parallelise and is left as the tracked remainder of Execute independent DAG steps concurrently (opt-in) #344. MCP/I-O-bound tools run on the async lane, which is the intended beneficiary.https://claude.ai/code/session_01Ja8HkypgZVJka5LDcqmZzS
Generated by Claude Code