Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 48 additions & 3 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,16 @@ chainweaver/
├── contracts.py ToolSafetyContract + SideEffectLevel/StabilityLevel/DeterminismLevel enums + merge_safety() + evaluate_predicate() — determinism + operational safety vocabulary (#19, #125, #293, #9, #8)
├── decorators.py @tool decorator for zero-boilerplate tool definition
├── tools.py Tool class: named callable with Pydantic I/O schemas + schema_hash + safety contract (#19); Tool.from_flow() wraps a Flow as a Tool (#24) with derived safety (#125)
├── flow.py FlowStep + Flow + DAGFlow + FlowStatus + FlowLifecycle + FlowGovernance + DriftInfo + ConditionalEdge (#9) + determinism_level property (#8)
├── registry.py FlowRegistry: multi-version catalogue with status filtering (store-backed)
├── flow.py FlowStep + Flow + DAGFlow + FlowStatus + FlowLifecycle + FlowGovernance + DriftInfo + ConditionalEdge (#9) + determinism_level property (#8) + ContextCollisionPolicy / on_context_collision (#337)
├── registry.py FlowRegistry: multi-version catalogue with status filtering (store-backed) + copy-on-write update_flow_state (#335)
├── storage.py RegistryStore protocol + InMemoryStore + FileStore (#16)
├── analyzer.py ChainAnalyzer: offline schema-compatibility analysis (#77)
├── attest.py attest_flow() + AttestationReport: observed-determinism evidence (#154)
├── decisions.py DecisionCallback Protocol + DecisionContext + coerce_decision_callback (#102)
├── executor.py FlowExecutor: sequential/DAG runner + drift detection + stream_flow (main entry point)
├── executor.py FlowExecutor: sequential/DAG runner + drift detection + stream_flow + opt-in async DAG-level concurrency (max_step_concurrency, #344) (main entry point)
├── _execution/ Internal, no-I/O execution collaborators shared by both lanes (#330, #331); banned from importing LLM/network/random — see invariants
│ ├── __init__.py Re-exports merge_step_outputs
│ └── context.py merge_step_outputs: single context-merge honouring on_context_collision (#337)
├── middleware.py FlowExecutorMiddleware Protocol + lifecycle context models + BaseMiddleware (#131)
├── events.py FlowEvent streamable lifecycle payload yielded by FlowExecutor.stream_flow (#134)
├── cache.py StepCache Protocol + InMemoryStepCache + FileStepCache + StepCacheKey (#127)
Expand Down Expand Up @@ -207,6 +210,48 @@ see the `DAGFlowStep` subsection below).
| `capability_id` | `str \| None` | `None` | Optional Weaver Stack capability identifier (#90); when set, the flow is routable as a `SelectableItem` via `flow_to_selectable_item`. See [docs/agent-context/flow-as-capability.md](docs/agent-context/flow-as-capability.md). |
| `governance` | `FlowGovernance` | active defaults | Review lifecycle, owner, replacement-tool list, savings estimates, and review notes (#259, #268). Separate from `FlowStatus`. |
| `safety` | `ToolSafetyContract \| None` | `None` | Explicit flow-level side-effect, retry, dry-run, idempotency, and approval metadata (#293). `None` means unknown, not safe. |
| `on_context_collision` | `Literal["overwrite", "warn", "error"]` | `"warn"` | Policy when a step output overwrites an existing context key (#337). `"overwrite"` = silent last-write-wins; `"warn"` = log at WARNING then overwrite; `"error"` = abort with `ContextKeyCollisionError`. Applied by the single shared merge helper (`chainweaver._execution.merge_step_outputs`) on both linear and DAG, sync and async. DAG *sibling* collisions within one level remain an unconditional error regardless. |

### Context-collision semantics (#337)

The accumulated context is the data plane of every flow. A step output that
overwrites an existing key (including an `initial_input` key) is governed by
`Flow.on_context_collision` and enforced in exactly one place —
`chainweaver._execution.merge_step_outputs` — for both flow kinds and both
lanes. `compile_flow` additionally emits a `context_collision` warning for
statically detectable overwrites (suppressed under `"overwrite"`). See
[docs/data-integrity.md](docs/data-integrity.md#context-key-collisions-337).

### Concurrency contract (#336)

A single `FlowExecutor` instance supports **concurrent** `execute_flow` /
`execute_flow_async` / `stream_flow` calls: run-scoped state (e.g. the stream
event collector) lives per-thread on a `threading.local` slot, and the bundled
`InMemoryStepCache` / `InMemoryCheckpointer` are internally locked. The one
rule: **mutating operations (`register_tool`, `add_middleware`,
`accept_drift`) must not run concurrently with executions** — do them at setup.

### Async lane support matrix (#332)

`execute_flow_async` raises `AsyncLaneUnsupportedError` (before any step runs)
for features it does not yet honour, rather than diverging silently:

| Feature | `execute_flow` (sync) | `execute_flow_async` |
|---------|:---------------------:|:--------------------:|
| Linear flows | ✅ | ✅ |
| DAG flows (no branching) | ✅ | ✅ |
| Opt-in DAG-level concurrency (#344) | sequential | ✅ (`max_step_concurrency`) |
| Conditional branches / `default_next` (#9) | ✅ | ❌ rejected |
| `decision_candidates` (#102) | ✅ | ❌ rejected |
| Composed sub-flow (`flow_name`, #75) | ✅ | ❌ rejected |
| Step cache / checkpoint resume | ✅ | bypassed |

### State transitions (#335)

`accept_drift` and `set_flow_status` never mutate a registry-held `Flow` in
place — they go through `FlowRegistry.update_flow_state`, which performs a
`model_copy(update=...)`, persists the new object, and returns it. Callers
needing the new state must re-fetch via `get_flow`.

**Read-only properties (not fields):** `input_schema`, `output_schema`, and
`context_schema` resolve their `*_schema_ref` counterparts to a
Expand Down
28 changes: 28 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,34 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- **FlowExecutor execution-core hardening** (#330, #331, #332, #335, #336,
#337, #344): a cluster of architecture/reliability improvements to the
executor core.
- A new internal `chainweaver._execution` package holds the transport-agnostic,
no-I/O building blocks shared by both lanes (#330, #331), starting with
`merge_step_outputs` — a single context-merge implementation used by linear
and DAG, sync and async.
- `Flow` / `DAGFlow` gain `on_context_collision` (`"overwrite"` / `"warn"`
(default) / `"error"`) governing what happens when a step output overwrites
an existing context key; `compile_flow` emits a `context_collision` warning
for statically detectable overwrites; new `ContextKeyCollisionError` (#337).
- Opt-in concurrent execution of independent DAG-level steps in the async lane
via `FlowExecutor(max_step_concurrency=N)` (default `1` = sequential,
bit-identical); results stay deterministic regardless of the setting (#344).
- `FlowExecutor` now documents and supports a real concurrency contract:
`stream_flow` registers its event collector as per-thread run-scoped
middleware (no more shared-list mutation), and `InMemoryStepCache` /
`InMemoryCheckpointer` are internally locked (#336).
- `execute_flow_async` rejects unsupported constructs (branching,
`decision_candidates`, composed sub-flows) up front with the new typed
`AsyncLaneUnsupportedError`, listing every offending construct (#332).
- `FlowRegistry.update_flow_state` performs copy-on-write state transitions;
`accept_drift` / `set_flow_status` no longer mutate registry-shared `Flow`
objects in place (#335).
- The executor determinism invariants (no LLM / no network / no randomness)
are now mechanically enforced by an AST import-contract test over
`executor.py` and `_execution/` (#354).

- **Coding-agent macro-flow compilation loop** (#254, #256, #257, #260,
#261, #262, #263, #266, #267, #312, #313, #314): a new `chainweaver.traces`
module makes the *observe → mine → score → draft → backtest* loop
Expand Down
3 changes: 3 additions & 0 deletions benchmarks/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ python benchmarks/bench_naive_vs_compiled.py --output results/bench.json
python benchmarks/bench_naive_vs_compiled.py \
--output results/bench.json \
--full-output results/bench-full.json

# Sequential vs concurrent DAG-level execution (#344)
python benchmarks/bench_dag_concurrency.py --leaves 6 --io-ms 50
```

`time.sleep` is used to simulate the LLM round-trip — no real LLM is
Expand Down
136 changes: 136 additions & 0 deletions benchmarks/bench_dag_concurrency.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
"""Sequential vs concurrent DAG-level execution benchmark (issue #344).

Demonstrates the latency win of opt-in concurrent execution of independent DAG
steps in the async lane. Builds a fan-out flow — one root feeding ``N``
independent I/O-bound leaves — and runs it through ``execute_flow_async`` at
``max_step_concurrency=1`` (sequential, the default) and ``max_step_concurrency=N``
(fully parallel level). Each leaf simulates I/O with ``asyncio.sleep`` — no real
network traffic.

Run from the repository root::

python benchmarks/bench_dag_concurrency.py
python benchmarks/bench_dag_concurrency.py --leaves 8 --io-ms 50 --repeats 5

With ``N`` leaves each taking ``io_ms`` of I/O, the sequential lane takes
~``N * io_ms`` while the concurrent lane takes ~``io_ms`` — a ~``N``x speedup
on the level. Results are deterministic regardless of concurrency (verified by
the test suite); this script only measures wall-clock latency.
"""

from __future__ import annotations

import argparse
import asyncio
import time
from statistics import median
from typing import Any

from pydantic import BaseModel, create_model

from chainweaver.executor import FlowExecutor
from chainweaver.flow import DAGFlow, DAGFlowStep
from chainweaver.registry import FlowRegistry
from chainweaver.tools import Tool


class _SeedIn(BaseModel):
n: int


class _SeedOut(BaseModel):
seed: int


class _LeafIn(BaseModel):
seed: int


def _build_executor(
num_leaves: int, io_seconds: float, concurrency: int
) -> tuple[FlowExecutor, str]:
async def _root(inp: _SeedIn) -> dict[str, Any]:
return {"seed": inp.n}

registry = FlowRegistry()
steps: list[DAGFlowStep] = [
DAGFlowStep(step_id="root", tool_name="root", input_mapping={"n": "n"})
]
for i in range(num_leaves):
steps.append(
DAGFlowStep(
step_id=f"leaf{i}",
tool_name=f"leaf{i}",
input_mapping={"seed": "seed"},
depends_on=["root"],
)
)
registry.register_flow(
DAGFlow(name="fan_out", version="1.0.0", description="fan-out", steps=steps)
)
executor = FlowExecutor(registry=registry, max_step_concurrency=concurrency)
executor.register_tool(
Tool(name="root", description="", input_schema=_SeedIn, output_schema=_SeedOut, fn=_root)
)
for i in range(num_leaves):
fields: dict[str, Any] = {f"r{i}": (int, ...)}
out_model = create_model(f"Leaf{i}Out", **fields)

async def _leaf(inp: _LeafIn, _i: int = i) -> dict[str, Any]:
await asyncio.sleep(io_seconds)
return {f"r{_i}": inp.seed + _i}

executor.register_tool(
Tool(
name=f"leaf{i}",
description="",
input_schema=_LeafIn,
output_schema=out_model,
fn=_leaf,
)
)
return executor, "fan_out"


async def _time_run(executor: FlowExecutor, flow_name: str, repeats: int) -> float:
durations: list[float] = []
for _ in range(repeats):
t0 = time.perf_counter()
result = await executor.execute_flow_async(flow_name, {"n": 0})
durations.append(time.perf_counter() - t0)
assert result.success
return median(durations)


async def _main(num_leaves: int, io_ms: float, repeats: int) -> None:
io_seconds = io_ms / 1000.0
seq_executor, name = _build_executor(num_leaves, io_seconds, concurrency=1)
con_executor, _ = _build_executor(num_leaves, io_seconds, concurrency=num_leaves)

seq = await _time_run(seq_executor, name, repeats)
con = await _time_run(con_executor, name, repeats)

print(f"fan-out DAG: {num_leaves} leaves x {io_ms:.0f}ms I/O, median of {repeats} runs")
print(f" sequential (max_step_concurrency=1): {seq * 1000:8.1f} ms")
print(f" concurrent (max_step_concurrency={num_leaves}):{con * 1000:8.1f} ms")
if con > 0:
print(f" speedup: {seq / con:8.1f}x")


def main() -> None:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
"--leaves", type=int, default=6, help="Independent leaf steps in the level."
)
parser.add_argument(
"--io-ms", type=float, default=50.0, help="Simulated per-leaf I/O latency (ms)."
)
parser.add_argument(
"--repeats", type=int, default=5, help="Timed runs per case (median reported)."
)
args = parser.parse_args()
asyncio.run(_main(args.leaves, args.io_ms, args.repeats))


if __name__ == "__main__":
main()
4 changes: 4 additions & 0 deletions chainweaver/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,12 @@
from chainweaver.events import FlowEvent
from chainweaver.exceptions import (
AgentTraceImportError,
AsyncLaneUnsupportedError,
ChainWeaverError,
CheckpointDriftError,
CheckpointerNotConfiguredError,
CheckpointNotFoundError,
ContextKeyCollisionError,
ContribError,
CostProfileError,
DAGDefinitionError,
Expand Down Expand Up @@ -236,6 +238,7 @@
"PROVIDER_PRICES",
"AgentTraceEvent",
"AgentTraceImportError",
"AsyncLaneUnsupportedError",
"AttestationInputError",
"AttestationReport",
"BacktestMismatch",
Expand All @@ -257,6 +260,7 @@
"CompilationResult",
"CompilationWarning",
"ConditionalEdge",
"ContextKeyCollisionError",
"ContribError",
"CostProfile",
"CostProfileError",
Expand Down
19 changes: 19 additions & 0 deletions chainweaver/_execution/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
"""Internal execution collaborators for :class:`chainweaver.executor.FlowExecutor`.

This private package holds the transport-agnostic, no-I/O building blocks that
both the synchronous and asynchronous execution lanes share (issues #330, #331).
Extracting shared logic here keeps it defined in exactly one place — so a fix or
a policy change applies to both lanes at once — and gives the determinism
import-contract check (issue #354) a focused surface to guard.

Everything here is bound by the three hard executor invariants: **no LLM calls,
no network I/O, no randomness.** Nothing in this package may import a banned
module (see ``docs/agent-context/invariants.md`` and
``tests/test_executor_import_contract.py``).
"""

from __future__ import annotations

from chainweaver._execution.context import merge_step_outputs

__all__ = ["merge_step_outputs"]
79 changes: 79 additions & 0 deletions chainweaver/_execution/context.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
"""Shared execution-context merge logic (issues #337, #331).

The accumulated execution context is the data plane of every flow. Both the
linear and DAG lanes — sync and async — merge each step's outputs into this
running context, and both previously did so with subtly different rules:

- linear flows logged collisions at ``DEBUG`` and silently overwrote;
- DAG flows rejected same-level sibling collisions but silently overwrote
level-to-level.

:func:`merge_step_outputs` is the single implementation all four paths now call,
so the collision policy is defined and enforced in exactly one place. The
flow-level ``on_context_collision`` setting selects the behaviour:

- ``"overwrite"`` — historical last-write-wins, logged at ``DEBUG``;
- ``"warn"`` (default) — log at ``WARNING`` before overwriting;
- ``"error"`` — abort with :class:`ContextKeyCollisionError` naming the step
and the colliding keys.

DAG *sibling* collisions within a single level are handled separately by the
DAG runner and remain an unconditional error regardless of this policy — they
are genuinely ambiguous (no defined ordering between siblings).

This module is on the deterministic execution path: no LLM, no network, no
randomness.
"""

from __future__ import annotations

import logging
from collections.abc import Mapping
from typing import Any

from chainweaver.exceptions import ContextKeyCollisionError
from chainweaver.flow import ContextCollisionPolicy


def merge_step_outputs(
context: dict[str, Any],
outputs: Mapping[str, Any],
*,
policy: ContextCollisionPolicy,
flow_name: str,
step_index: int,
step_name: str,
logger: logging.Logger,
) -> None:
"""Merge *outputs* into *context* in place, applying the collision *policy*.

Args:
context: The running execution context, mutated in place.
outputs: The step's validated outputs to merge in.
policy: The flow's ``on_context_collision`` setting.
flow_name: Name of the executing flow (for diagnostics / errors).
step_index: Zero-based index of the step that produced *outputs*.
step_name: Display name of the step (for diagnostics / errors).
logger: Logger used for ``DEBUG`` / ``WARNING`` collision messages.

Raises:
ContextKeyCollisionError: When *policy* is ``"error"`` and one or more
output keys already exist in *context*.
"""
# Fast path: a C-level key-view intersection skips the per-step Python scan
# (and its list allocation) when a step only adds new keys — the common case
# on the execution hot path. Only when an actual collision exists do we walk
# ``outputs`` to recover deterministic order for the error / log messages.
if outputs.keys() & context.keys():
collisions = [key for key in outputs if key in context]
if policy == "error":
raise ContextKeyCollisionError(flow_name, step_index, step_name, collisions)
log = logger.warning if policy == "warn" else logger.debug
for key in collisions:
log(
"Step %d (%s): context key '%s' overwritten",
step_index,
step_name,
key,
)
context.update(outputs)
Loading
Loading