Consolidate sync and async execution paths around a shared core #331

@dgenio

Summary

Reduce duplication between execute_flow and execute_flow_async by extracting the shared, transport-agnostic execution logic (validation, context accumulation, result construction, loop structure) into a common core that both lanes drive.

Why this matters

Today, bug fixes and feature changes to flow execution must be applied twice. Divergence between the two lanes is already observable (see the companion issue on async feature parity), and every new executor feature increases the cost of keeping them aligned.

Current evidence

  • Flow setup, input validation, step-loop structure, output validation, and result building are duplicated between the sync path (executor.py ~lines 1210–1410) and the async path (~lines 1546–1653), totaling roughly 300 lines of parallel control flow.
  • DAG-level iteration and output merging are implemented separately for sync (~lines 3702–4100) and async (~lines 1655–1845).
  • The async DAG path does not implement branch selection at all, while the sync path does — a direct consequence of the duplication.

Proposed implementation

  1. Identify the pure (no-I/O) phases shared by both lanes: flow lookup and status gating, input-schema validation, input-mapping resolution, context merge rules, output/context-schema validation, StepRecord/ExecutionResult construction, cost reporting, middleware payload assembly.
  2. Extract these into shared helper functions or a small internal ExecutionPlan/ExecutionState object consumed by both lanes.
  3. Keep only the genuinely different parts in each lane: how a tool is invoked (Tool.run vs Tool.run_async) and how steps are scheduled.
  4. Where the sync/async split is irreducible, consider the "sans-io" pattern: a generator-based core that yields "invoke this tool" commands, with thin sync and async drivers.
  5. Add a parity test module that runs the same flows through both lanes and asserts identical ExecutionResult content (modulo timestamps/durations).
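
To make step 4 concrete, here is a minimal sketch of a generator-based core with thin sync and async drivers. Everything in it is illustrative: `InvokeTool`, `execute_core`, `drive_sync`, `drive_async`, and `CountKeysTool` are hypothetical names, and the `ExecutionResult` shown is a simplified stand-in, not the real class in executor.py. Only the `run` / `run_async` split is taken from the issue text.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass, field
from typing import Any, Generator


@dataclass
class InvokeTool:
    """Command yielded by the core: run this step's tool with these inputs."""
    step_name: str
    inputs: dict[str, Any]


@dataclass
class ExecutionResult:
    """Simplified stand-in for the real result type (assumption)."""
    success: bool
    context: dict[str, Any]
    step_names: list[str] = field(default_factory=list)


class CountKeysTool:
    """Hypothetical tool exposing both a sync and an async entry point."""

    def __init__(self, name: str) -> None:
        self.name = name

    def run(self, inputs: dict[str, Any]) -> dict[str, Any]:
        return {self.name: len(inputs)}

    async def run_async(self, inputs: dict[str, Any]) -> dict[str, Any]:
        return self.run(inputs)


def execute_core(
    steps: list[str], initial: dict[str, Any]
) -> Generator[InvokeTool, dict[str, Any], ExecutionResult]:
    """Pure, no-I/O core: owns the loop, context merge, and result building."""
    context = dict(initial)
    completed: list[str] = []
    for name in steps:
        # Hand control to the driver; it sends back the tool output.
        output = yield InvokeTool(name, dict(context))
        context.update(output)
        completed.append(name)
    return ExecutionResult(True, context, completed)


def drive_sync(core, tools: dict[str, CountKeysTool]) -> ExecutionResult:
    """Sync driver: the only lane-specific part is the call to Tool.run."""
    try:
        command = next(core)
        while True:
            command = core.send(tools[command.step_name].run(command.inputs))
    except StopIteration as stop:
        return stop.value


async def drive_async(core, tools: dict[str, CountKeysTool]) -> ExecutionResult:
    """Async driver: identical shape, but awaits Tool.run_async."""
    try:
        command = next(core)
        while True:
            output = await tools[command.step_name].run_async(command.inputs)
            command = core.send(output)
    except StopIteration as stop:
        return stop.value
```

In this shape the merge rules and result construction live only in `execute_core`; a fix there reaches both lanes automatically. Scheduling concerns such as concurrent DAG branches would still need to be expressed as additional command types, which this sketch does not attempt.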

Acceptance criteria

  • Shared validation/merge/result logic exists in exactly one place.
  • A parity test suite asserts sync and async lanes produce equivalent results for linear flows, DAG flows, retries, on-error policies, and cancellation.
  • Net line count of the execution layer decreases.
  • All existing tests pass unchanged.

Test plan

  • New parity tests (parameterized over sync/async).
  • Existing tests/test_flow_execution.py and tests/test_executor_async.py pass.
  • Full validation: ruff check, ruff format --check, mypy, pytest.
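
A rough sketch of what the parity check could look like. The two lanes here (`run_sync_lane`, `run_async_lane`) and `FakeResult` are hypothetical stand-ins so the example runs on its own; the real tests would call execute_flow and execute_flow_async instead. The volatile field names `started_at` and `duration_s` are also assumptions about what "timestamps/durations" means in ExecutionResult.

```python
import asyncio
import time
from dataclasses import asdict, dataclass
from typing import Any

# Assumed names for fields that legitimately differ between runs.
VOLATILE_FIELDS = {"started_at", "duration_s"}


@dataclass
class FakeResult:
    """Stand-in for ExecutionResult (assumption, not the real class)."""
    flow_name: str
    success: bool
    outputs: dict[str, Any]
    started_at: float
    duration_s: float


def run_sync_lane(flow_name: str, inputs: dict[str, int]) -> FakeResult:
    """Hypothetical sync lane; stands in for execute_flow."""
    start = time.time()
    outputs = {"total": sum(inputs.values())}
    return FakeResult(flow_name, True, outputs, start, time.time() - start)


async def run_async_lane(flow_name: str, inputs: dict[str, int]) -> FakeResult:
    """Hypothetical async lane; stands in for execute_flow_async."""
    start = time.time()
    await asyncio.sleep(0)  # yield to the event loop once
    outputs = {"total": sum(inputs.values())}
    return FakeResult(flow_name, True, outputs, start, time.time() - start)


def comparable(result: FakeResult) -> dict[str, Any]:
    """Drop volatile fields so results can be compared for equality."""
    return {k: v for k, v in asdict(result).items() if k not in VOLATILE_FIELDS}


def assert_parity(flow_name: str, inputs: dict[str, int]) -> None:
    """Run one flow through both lanes and require equivalent content."""
    sync_result = run_sync_lane(flow_name, inputs)
    async_result = asyncio.run(run_async_lane(flow_name, inputs))
    assert comparable(sync_result) == comparable(async_result)


# One entry per scenario; the real suite would cover linear flows, DAG flows,
# retries, on-error policies, and cancellation.
CASES = [("linear", {"a": 1, "b": 2}), ("dag", {"x": 10})]
```

Under pytest, `CASES` could feed `pytest.mark.parametrize` so each scenario reports as its own test, with `assert_parity` as the test body.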

Migration notes

Not expected to require migration; public APIs are unchanged.

Risks and tradeoffs

  • Sans-io style cores add indirection; if that proves too complex, plain shared helpers still remove most duplication.
  • Best sequenced after (or together with) the FlowExecutor decomposition issue to avoid refactoring the same code twice.
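
For comparison with the sans-io option, the plain-helpers fallback could look roughly like this. All names (`validate_inputs`, `merge_context`, `run_flow`, `run_flow_async`) are hypothetical, and steps are modeled as bare callables rather than the project's real step and tool types.

```python
import asyncio
from typing import Any, Awaitable, Callable


def validate_inputs(required: set[str], inputs: dict[str, Any]) -> None:
    """Shared pure helper: both lanes call this before running any step."""
    missing = sorted(required - set(inputs))
    if missing:
        raise ValueError(f"missing required inputs: {missing}")


def merge_context(
    context: dict[str, Any], step_name: str, output: Any
) -> dict[str, Any]:
    """Shared pure helper: the merge rule lives in exactly one place."""
    merged = dict(context)
    merged[step_name] = output
    return merged


def run_flow(
    steps: dict[str, Callable[[dict[str, Any]], Any]],
    required: set[str],
    inputs: dict[str, Any],
) -> dict[str, Any]:
    """Sync lane: the loop is duplicated, the logic inside it is not."""
    validate_inputs(required, inputs)
    context = dict(inputs)
    for name, step in steps.items():
        context = merge_context(context, name, step(context))
    return context


async def run_flow_async(
    steps: dict[str, Callable[[dict[str, Any]], Awaitable[Any]]],
    required: set[str],
    inputs: dict[str, Any],
) -> dict[str, Any]:
    """Async lane: same helpers, differing only in the awaited call."""
    validate_inputs(required, inputs)
    context = dict(inputs)
    for name, step in steps.items():
        context = merge_context(context, name, await step(context))
    return context
```

The tradeoff is visible in the two loop bodies: they still mirror each other, so structural changes to the loop must be made twice, but validation and merge semantics cannot drift apart.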

Suggested labels

architecture, refactor, reliability
