
Define and enforce an explicit concurrency contract for FlowExecutor #336

@dgenio

Summary

Define an explicit, tested concurrency contract for FlowExecutor — and make the few pieces of cross-call shared state either thread-safe or per-run.

Why this matters

The executor's thread-safety story currently lives only in docstrings. Several instance attributes are mutated over the course of a run (_middleware, _active_flow_version, _in_replay, _resume_snapshot), so two concurrent calls on the same executor can read or overwrite each other's state. As ChainWeaver is increasingly embedded in servers (MCP FlowServer, future hosts), "one executor, many concurrent requests" is the natural usage pattern, and it should either work or fail loudly.

Current evidence

  • executor.py (~line 2192): stream_flow appends a streaming middleware to self._middleware and removes it afterwards, so while two stream_flow calls overlap on one executor, each call's forwarding middleware also receives the other run's events. The docstring (~lines 502–515) warns about this, but nothing enforces it.
  • Instance-level mode flags: _active_flow_version (mutated at the top of every execution and restored during sub-flow recursion), _in_replay (set/restored around replay), _resume_snapshot (set during resume, cleared in finally).
  • InMemoryStepCache (cache.py ~lines 119–146) and InMemoryCheckpointer (checkpoint.py ~lines 138–154) are plain dicts documented as not thread-safe; registry.py _latest has no documented concurrency contract.
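The stream_flow cross-talk can be reproduced deterministically with a toy model of the current pattern. This is a minimal sketch under stated assumptions: `SharedMiddlewareExecutor` is a hypothetical stand-in, not ChainWeaver's API, and it only mimics "append a forwarder to the shared `self._middleware`, emit, then remove it". A `threading.Barrier` holds both runs until both forwarders are registered, so the leak does not depend on timing.

```python
import threading
from typing import Callable, Dict, List


class SharedMiddlewareExecutor:
    """Hypothetical stand-in that mimics the current stream_flow pattern."""

    def __init__(self) -> None:
        # Shared by every call on this instance: the source of the cross-talk.
        self._middleware: List[Callable[[str], None]] = []

    def stream_flow(self, run_id: str, barrier: threading.Barrier) -> List[str]:
        received: List[str] = []
        forward = received.append  # this run's event sink
        self._middleware.append(forward)  # mutates shared state
        try:
            barrier.wait()  # both runs have now registered a forwarder
            for middleware in list(self._middleware):
                middleware(f"{run_id}:step_completed")
            barrier.wait()  # both runs have emitted before either cleans up
        finally:
            self._middleware.remove(forward)
        return received


executor = SharedMiddlewareExecutor()
barrier = threading.Barrier(2, timeout=10)
results: Dict[str, List[str]] = {}


def run(run_id: str) -> None:
    results[run_id] = executor.stream_flow(run_id, barrier)


threads = [threading.Thread(target=run, args=(rid,)) for rid in ("A", "B")]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(sorted(results["A"]))  # ['A:step_completed', 'B:step_completed']
```

Run A's stream ends up containing run B's event, which is exactly the leakage the stress test in the acceptance criteria should rule out.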

Proposed implementation

  1. Decide and document the contract. Recommended: "a single FlowExecutor instance supports concurrent execute_flow/execute_flow_async calls; mutating operations (register_tool, accept_drift, middleware registration) must not run concurrently with executions."
  2. Move per-run state off the instance: bundle _active_flow_version, _in_replay, _resume_snapshot, and the streaming middleware into a per-run context object threaded through the call stack (this pairs naturally with the FlowExecutor decomposition issue).
  3. For stream_flow, compose the event-forwarding middleware into a per-run middleware list instead of mutating self._middleware.
  4. Add a threading.Lock around registry/tool mutation paths, or document them as setup-phase-only and add a debug-mode assertion that detects mutation during active runs.
  5. Document the thread-safety of InMemoryStepCache and InMemoryCheckpointer explicitly, and add simple locks (cheap and low-contention) so the in-memory reference implementations are safe by default.
  6. Document the contract in the class docstring, AGENTS.md, and docs/agent-context/architecture.md.
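Steps 2 to 4 can be sketched together on a toy executor. Everything here is hypothetical and illustrative (`RunContext`, `PerRunExecutor`, `add_middleware`, `_run_step`, the `extra_middleware` parameter); the real refactor would thread the context through ChainWeaver's own internals. The idea shown: per-run flags live in an immutable `RunContext`, stream_flow adds its forwarder only to that run's middleware tuple, and setup-phase mutation is rejected while any run is active (a debug-mode assertion could replace the `RuntimeError`).

```python
import threading
from dataclasses import dataclass
from typing import Callable, List, Optional, Sequence, Tuple

Middleware = Callable[[str], None]


@dataclass(frozen=True)
class RunContext:
    """Hypothetical per-run state replacing the instance-level mode flags."""

    flow_version: str
    in_replay: bool = False
    resume_snapshot: Optional[dict] = None
    middleware: Tuple[Middleware, ...] = ()


class PerRunExecutor:
    """Toy executor: instance state is setup-only, run state lives in RunContext."""

    def __init__(self) -> None:
        self._middleware: Tuple[Middleware, ...] = ()  # replaced, never mutated in place
        self._lock = threading.Lock()
        self._active_runs = 0

    def add_middleware(self, middleware: Middleware) -> None:
        with self._lock:
            # Guard for the contract: registration must not overlap a run.
            if self._active_runs:
                raise RuntimeError("middleware registered during an active run")
            self._middleware = self._middleware + (middleware,)

    def execute_flow(
        self,
        flow_version: str,
        steps: Sequence[str],
        extra_middleware: Sequence[Middleware] = (),
    ) -> str:
        with self._lock:
            self._active_runs += 1
            base = self._middleware  # snapshot taken once per run
        try:
            ctx = RunContext(flow_version, middleware=base + tuple(extra_middleware))
            for step in steps:
                self._run_step(ctx, step)
            return ctx.flow_version
        finally:
            with self._lock:
                self._active_runs -= 1

    def _run_step(self, ctx: RunContext, step: str) -> None:
        # Helpers receive ctx explicitly instead of reading self._* flags.
        for middleware in ctx.middleware:
            middleware(f"{ctx.flow_version}:{step}")

    def stream_flow(self, flow_version: str, steps: Sequence[str]) -> List[str]:
        events: List[str] = []
        # The forwarder lives only in this run's middleware tuple.
        self.execute_flow(flow_version, steps, extra_middleware=[events.append])
        return events


executor = PerRunExecutor()
print(executor.stream_flow("v2", ["fetch", "summarize"]))
# ['v2:fetch', 'v2:summarize']
```

Because `self._middleware` is replaced rather than mutated and each run takes one snapshot under the lock, a run never observes a half-registered middleware list, and a sub-flow could receive a derived `RunContext` instead of saving and restoring `_active_flow_version`.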

Acceptance criteria

  • A documented concurrency contract exists in the FlowExecutor docstring and docs site.
  • A stress test runs N threads × M concurrent execute_flow and stream_flow calls on one executor and asserts: no cross-run event leakage, correct per-run versions, no exceptions.
  • InMemoryStepCache and InMemoryCheckpointer each pass a concurrent get/set test.
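A lock-guarded in-memory store and a concurrent get/set check might look like the following minimal sketch. `LockedInMemoryStepCache` and `concurrent_get_set` are hypothetical names, and the real InMemoryStepCache/InMemoryCheckpointer interfaces may differ. One caveat: under CPython's GIL a plain dict would likely pass this particular check too; the lock mainly matters for compound read-modify-write operations (eviction, counters) and on free-threaded builds.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, Hashable, Optional


class LockedInMemoryStepCache:
    """Hypothetical dict-backed cache whose operations share one lock."""

    def __init__(self) -> None:
        self._data: Dict[Hashable, Any] = {}
        self._lock = threading.Lock()

    def get(self, key: Hashable) -> Optional[Any]:
        with self._lock:
            return self._data.get(key)

    def set(self, key: Hashable, value: Any) -> None:
        with self._lock:
            self._data[key] = value

    def __len__(self) -> int:
        with self._lock:
            return len(self._data)


def concurrent_get_set(
    cache: LockedInMemoryStepCache, workers: int = 8, keys: int = 200
) -> None:
    # Generous timeout: a stuck worker fails the test instead of hanging CI.
    barrier = threading.Barrier(workers, timeout=10)

    def worker(worker_id: int) -> None:
        barrier.wait()  # release all workers together to maximise overlap
        for i in range(keys):
            key = (worker_id, i)
            cache.set(key, i)
            assert cache.get(key) == i

    with ThreadPoolExecutor(max_workers=workers) as pool:
        # list() collects every result, re-raising any worker exception.
        list(pool.map(worker, range(workers)))


cache = LockedInMemoryStepCache()
concurrent_get_set(cache)
print(len(cache))  # 1600
```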

Test plan

  • New tests/test_executor_concurrency.py with thread-pool stress tests (kept deterministic via barriers, no sleeps).
  • The full validation commands pass, and the new tests are stable across the 15-job CI matrix.
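A barrier-based harness keeps the stress tests free of sleeps. The sketch below assumes nothing about ChainWeaver itself: `run_in_lockstep` is a hypothetical helper, and `stream_one` is a placeholder where the real test would call stream_flow on one shared executor. Each round releases N calls at once; repeating it M times gives the N × M shape from the acceptance criteria, and `Future.result()` re-raises worker exceptions, which covers the "no exceptions" check.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, TypeVar

T = TypeVar("T")


def run_in_lockstep(fn: Callable[[int], T], n_threads: int, timeout: float = 30.0) -> List[T]:
    """Call fn(i) for each i in range(n_threads), starting all calls together."""
    # The barrier replaces sleeps; the timeout turns a deadlock into a failure.
    barrier = threading.Barrier(n_threads, timeout=timeout)

    def task(i: int) -> T:
        barrier.wait()
        return fn(i)

    with ThreadPoolExecutor(max_workers=n_threads) as pool:
        futures = [pool.submit(task, i) for i in range(n_threads)]
        # result() re-raises any exception raised inside a worker.
        return [f.result(timeout=timeout) for f in futures]


def stream_one(i: int) -> List[str]:
    # Hypothetical placeholder for executor.stream_flow(f"v{i}", steps)
    # on a shared executor; swap in the real call in the actual test.
    return [f"v{i}:{step}" for step in ("fetch", "summarize")]


for _ in range(5):  # M rounds of N concurrent calls
    results = run_in_lockstep(stream_one, n_threads=16)
    leaked = [
        i for i, events in enumerate(results)
        if any(not e.startswith(f"v{i}:") for e in events)
    ]
    assert leaked == [], f"cross-run events in runs {leaked}"
```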

Migration notes

Not expected to require migration; this strengthens guarantees without changing public APIs.

Risks and tradeoffs

  • Concurrency tests must be written carefully to avoid flakiness (use events/barriers, generous timeouts).
  • Per-run context threading touches many internal signatures; sequence with the decomposition refactor to avoid double churn.

Suggested labels

reliability, architecture, testing
