
Execute independent DAG steps concurrently (opt-in) #344

@dgenio

Summary

Implement opt-in concurrent execution of independent steps within a DAG level, fulfilling the concurrency already promised in the executor's documentation while preserving determinism guarantees.

Why this matters

DAG levels are computed precisely so that independent steps can run in parallel, but today every step runs sequentially, so fan-out flows (the main reason to use DAGFlow) gain no latency benefit over linear flows. For I/O-bound tools (MCP calls, file operations), level-parallel execution is likely the single largest latency lever available.

Current evidence

  • _compute_dag_levels() (executor.py ~lines 3697–3700) groups steps by level, but the execution loop (~lines 3835–4064) then runs each level's steps strictly sequentially (for step in level_steps).
  • The executor docstring (~line 871) states steps within a level "run sequentially" and that concurrent execution "is planned for v0.2".
  • The async DAG path (~lines 1655–1845) likewise awaits steps one at a time rather than via asyncio.gather.
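For readers less familiar with leveling, here is a minimal sketch of one common way to group steps: a step's level is one more than the deepest level among its dependencies, and steps within a level keep declaration order. This is a hypothetical illustration, not the project's _compute_dag_levels; the compute_dag_levels name and the input format (a dict mapping each step to the steps it depends on, in declaration order) are assumptions.

```python
from typing import Dict, FrozenSet, List, Mapping, Sequence


def compute_dag_levels(deps: Mapping[str, Sequence[str]]) -> List[List[str]]:
    """Hypothetical leveling: level = 1 + max level of a step's dependencies.

    `deps` maps each step to the steps it depends on; its insertion order is
    treated as declaration order, and each level preserves that order.
    """
    level_of: Dict[str, int] = {}

    def visit(name: str, stack: FrozenSet[str]) -> int:
        if name in level_of:
            return level_of[name]
        if name in stack:
            raise ValueError(f"cycle detected at step {name!r}")
        # Root steps (no dependencies) land on level 0.
        level = 1 + max((visit(p, stack | {name}) for p in deps[name]), default=-1)
        level_of[name] = level
        return level

    for step in deps:
        visit(step, frozenset())

    levels: List[List[str]] = [[] for _ in range(max(level_of.values(), default=-1) + 1)]
    for step in deps:  # declaration order within each level
        levels[level_of[step]].append(step)
    return levels
```

For a fan-out such as {"fetch": [], "parse_a": ["fetch"], "parse_b": ["fetch"], "report": ["parse_a", "parse_b"]}, this yields [["fetch"], ["parse_a", "parse_b"], ["report"]]. parse_a and parse_b share a level because neither depends on the other, which is exactly what makes them candidates for concurrent execution.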

Proposed implementation

  1. Add FlowExecutor(max_step_concurrency: int = 1); the default of 1 preserves current behavior exactly.
  2. Async lane first (cleanest win): dispatch a level's steps with asyncio.gather bounded by a semaphore; collect StepRecords in deterministic (declaration) order regardless of completion order.
  3. Sync lane: when max_step_concurrency > 1, run level steps on a ThreadPoolExecutor created once per run (not per level); tools already run under thread-based timeouts, so reuse that machinery where possible.
  4. Determinism preservation: merge level outputs in declaration order (the existing sibling-collision rejection already rules out order-dependent results for overlapping keys); as today, evaluate branches only after the full level completes.
  5. Safety contracts: respect ToolSafetyContract. Steps whose contracts indicate non-idempotent side effects can be pinned to sequential execution within the level; document this rule.
  6. Cancellation/deadline: check between levels as today; on cancellation mid-level, let in-flight tools finish (consistent with the documented "never inside a tool" cancellation contract).
  7. Benchmark: extend benchmarks/ with a fan-out flow demonstrating the latency improvement; wire into bench.yml.
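Steps 2, 5, and 6 of the async lane could look roughly like the sketch below, assuming tools are zero-argument coroutine functions that return a dict of outputs. run_level_async, run_levels_async, the pinned set, and the cancelled callback are hypothetical names, and StepRecord here is a stand-in rather than the project's class. Running pinned steps one at a time after the concurrent batch is only one possible reading of the pinning rule.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Dict, FrozenSet, List, Sequence

# Hypothetical stand-ins: the real executor's StepRecord and tool calling
# convention are not shown in this issue and will differ.
AsyncTool = Callable[[], Awaitable[Dict[str, Any]]]


@dataclass(frozen=True)
class StepRecord:
    step: str
    output: Dict[str, Any]


async def run_level_async(
    level_steps: Sequence[str],
    tools: Dict[str, AsyncTool],
    max_step_concurrency: int = 1,
    pinned: FrozenSet[str] = frozenset(),
) -> List[StepRecord]:
    """Run one DAG level and return its records in declaration order."""
    if max_step_concurrency < 1:
        raise ValueError("max_step_concurrency must be >= 1")
    if max_step_concurrency == 1:
        # Default path: await each step in turn, exactly as today.
        return [StepRecord(s, await tools[s]()) for s in level_steps]

    sem = asyncio.Semaphore(max_step_concurrency)

    async def run_one(step: str) -> StepRecord:
        async with sem:  # at most max_step_concurrency tools in flight
            return StepRecord(step, await tools[step]())

    # Unpinned steps run concurrently; gather() returns results in argument
    # order, not completion order.
    free = [s for s in level_steps if s not in pinned]
    records = dict(zip(free, await asyncio.gather(*(run_one(s) for s in free))))
    # Pinned steps (e.g. non-idempotent side effects) then run one at a time.
    for step in level_steps:
        if step in pinned:
            records[step] = await run_one(step)
    return [records[s] for s in level_steps]


async def run_levels_async(
    levels: Sequence[Sequence[str]],
    tools: Dict[str, AsyncTool],
    max_step_concurrency: int = 1,
    pinned: FrozenSet[str] = frozenset(),
    cancelled: Callable[[], bool] = lambda: False,
) -> List[StepRecord]:
    """Run levels in order; cancellation is checked only between levels."""
    log: List[StepRecord] = []
    for level in levels:
        if cancelled():
            break  # every tool of the previous level has already finished
        log.extend(await run_level_async(level, tools, max_step_concurrency, pinned))
    return log
```

Because asyncio.gather preserves argument order, declaration-ordered records fall out without any sorting; the semaphore only bounds how many tools are in flight at once.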

Acceptance criteria

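  • Default behavior (max_step_concurrency=1) is bit-identical to today (the full suite passes unchanged).
  • With max_step_concurrency > 1, a fan-out DAG of N slow I/O steps completes in roughly one step-duration rather than N (benchmark-verified), provided N does not exceed the concurrency limit; beyond that, expect about ceil(N / max_step_concurrency) step-durations.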
  • execution_log ordering, context merge results, and branch selection are identical regardless of concurrency setting (property test comparing concurrency 1 vs 4 outputs on randomized DAGs).
  • Sibling-collision and cancellation semantics are unchanged and tested under concurrency.

Test plan

  • New equivalence property test (results invariant across concurrency levels) in tests/property/.
  • Concurrency-specific tests using event-synchronized fake tools (no sleeps).
  • Benchmark job demonstrates the speedup without regressing compiled-execution metrics.
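One way to build event-synchronized fake tools without sleeps is a shared threading.Barrier: each fake tool blocks until all N have arrived, so the level can only complete if the executor really had N tools in flight at once. The timeout is a safety net against hangs, not a timing assertion. make_rendezvous_tools and run_level are hypothetical helpers, and run_level creates a throwaway pool only to keep the sketch self-contained.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, List

FakeTool = Callable[[], Dict[str, int]]


def make_rendezvous_tools(n: int, timeout: float = 5.0) -> List[FakeTool]:
    """Fake tools that can only finish if all n are running at the same time."""
    barrier = threading.Barrier(n)

    def make(i: int) -> FakeTool:
        def tool() -> Dict[str, int]:
            # Blocks until all n tools arrive. If the executor runs them one at
            # a time, this wait times out and raises BrokenBarrierError.
            barrier.wait(timeout=timeout)
            return {f"k{i}": i}
        return tool

    return [make(i) for i in range(n)]


def run_level(tools: List[FakeTool], max_workers: int) -> List[Dict[str, int]]:
    # A throwaway pool keeps the sketch self-contained; the proposal reuses
    # one pool per run.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(t) for t in tools]
        return [f.result() for f in futures]
```

With max_workers=4 and four rendezvous tools the level completes immediately; with max_workers=1 the first tool's wait times out and the level fails with threading.BrokenBarrierError, which is the signal that steps were not actually concurrent.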

Migration notes

Not expected to require migration; the feature is opt-in with a compatibility default.

Risks and tradeoffs

  • Thread-based parallelism introduces real shared-state exposure in user tools; document clearly that tools must be thread-safe to opt in.
  • This interacts with the executor-decomposition and concurrency-contract issues; sequence it after them to avoid building on code slated to move.
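To illustrate what "thread-safe" means for a tool that opts in, here is a hypothetical tool with shared mutable state (a call counter and a cache) whose read-modify-write sequence is guarded by a threading.Lock. CachingLookupTool and its methods are illustrative only; they are not part of the project's tool API.

```python
import threading
from typing import Dict


class CachingLookupTool:
    """Hypothetical tool with shared mutable state, safe for concurrent steps."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._cache: Dict[str, int] = {}
        self.calls = 0

    def __call__(self, key: str) -> Dict[str, int]:
        # The counter update and cache check-then-insert must be atomic when
        # sibling steps call this tool from different threads.
        with self._lock:
            self.calls += 1
            if key not in self._cache:
                self._cache[key] = len(key)  # stand-in for an expensive lookup
            return {key: self._cache[key]}
```

The lock here wraps a cheap stand-in computation; holding a lock across a slow I/O call would serialize the tool and erase the speedup, so real tools should keep their critical sections small.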

Suggested labels

performance, architecture, reliability

Metadata

Assignees: No one assigned
Projects: No projects
Milestone: No milestone
Relationships: None yet
Development: No branches or pull requests