Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion AGENTS.md

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Blocker — documentation governance.

Adding chainweaver/step_index.py is a module-add. AGENTS.md requires add/remove/rename-module changes to update the repo-map tree and docs/agent-context/architecture.md in the same PR. Neither was updated (Codex flagged this too).

Please add a repo-map line, e.g.:

├── step_index.py      FLOW_INPUT_STEP_INDEX + flow_output_step_index(flow): named
                       sentinels for flow-level validation StepRecords (#339)

and a matching module-boundary entry in architecture.md. The StepRecord table update on L283 is correct — this is only about the repo map + architecture doc.


Generated by Claude Code

Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ integration.

| Field | Type | Meaning |
|-------|------|---------|
| `step_index` | `int` | Zero-based position (`-1` = flow-input validation, `len(steps)` = flow-output validation). |
| `step_index` | `int` | Zero-based position (`FLOW_INPUT_STEP_INDEX` = flow-input validation, `flow_output_step_index(flow)` = flow-output/context validation). |
| `tool_name` | `str` | Tool invoked (or flow name for validation records). |
| `inputs` | `dict` | Validated inputs passed to the tool. |
| `outputs` | `dict \| None` | Validated outputs, or `None` on failure. |
Expand Down
4 changes: 4 additions & 0 deletions chainweaver/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
FlowBuilder, FlowRegistry, FlowExecutor, RetryPolicy,
ExecutionPlan, ExecutionResult, ReplayMode, ReplayResult,
StepDiff, StepPlan, StepRecord,
FLOW_INPUT_STEP_INDEX, flow_output_step_index,
RedactionPolicy, TraceRecorder, ObservedStep, ObservedTrace,
CostProfile, CostReport, PriceSnap, PROVIDER_PRICES, lookup_price,
validate_dag_topology,
Expand Down Expand Up @@ -191,6 +192,7 @@
ServiceMetrics,
ServiceProposal,
)
from chainweaver.step_index import FLOW_INPUT_STEP_INDEX, flow_output_step_index
from chainweaver.storage import FileStore, InMemoryStore, RegistryStore
from chainweaver.testing.replay import FixtureStaleError
from chainweaver.tools import Tool
Expand Down Expand Up @@ -233,6 +235,7 @@

__all__ = [
"BUILTIN_PROPERTIES",
"FLOW_INPUT_STEP_INDEX",
"PROVIDER_PRICES",
"AgentTraceEvent",
"AgentTraceImportError",
Expand Down Expand Up @@ -374,6 +377,7 @@
"flow_from_dict",
"flow_from_json",
"flow_from_yaml",
"flow_output_step_index",
"flow_schema_json",
"flow_to_ascii",
"flow_to_dict",
Expand Down
5 changes: 3 additions & 2 deletions chainweaver/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from pydantic.fields import FieldInfo

from chainweaver.flow import Flow
from chainweaver.step_index import flow_output_step_index
from chainweaver.tools import Tool

# Types considered numeric for widening compatibility.
Expand Down Expand Up @@ -293,7 +294,7 @@ def compile_flow(flow: Flow, tools: dict[str, Tool]) -> CompilationResult:
if name not in context_fields:
errors.append(
CompilationError(
step_index=len(flow.steps),
step_index=flow_output_step_index(flow),
tool_name=flow.name,
field_name=name,
issue_type="output_schema_gap",
Expand All @@ -309,7 +310,7 @@ def compile_flow(flow: Flow, tools: dict[str, Tool]) -> CompilationResult:
if not _types_compatible(actual_type, expected_type):
errors.append(
CompilationError(
step_index=len(flow.steps),
step_index=flow_output_step_index(flow),
tool_name=flow.name,
field_name=name,
issue_type="output_type_mismatch",
Expand Down
47 changes: 25 additions & 22 deletions chainweaver/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
)
from chainweaver.observation import TraceRecorder
from chainweaver.registry import AnyFlow, FlowRegistry
from chainweaver.step_index import FLOW_INPUT_STEP_INDEX, flow_output_step_index
from chainweaver.tools import Tool

_logger = get_logger("chainweaver.executor")
Expand Down Expand Up @@ -336,8 +337,9 @@ class StepRecord(BaseModel):
step_index: Position of this record in the flow. For normal steps
this is the zero-based step index. Two sentinel values are
used for flow-level schema validation:
``-1`` — input validation (before any step runs),
``len(steps)`` — output validation (after all steps complete).
``FLOW_INPUT_STEP_INDEX`` — input validation (before any step
runs), and ``flow_output_step_index(flow)`` — output validation
(after all steps complete).
tool_name: Name of the tool that was invoked (or the flow name for
flow-level validation records).
inputs: The validated inputs that were passed to the tool.
Expand Down Expand Up @@ -423,10 +425,10 @@ class ExecutionResult(BaseModel):
one entry per executed tool step. When ``input_schema`` or
``output_schema`` is set on the flow and the corresponding
validation **fails**, a synthetic record is appended carrying
the validation error (``step_index == -1`` for input failures,
``step_index == len(steps)`` for output failures); successful
validations do not produce records, so the log is unchanged
on the happy path.
the validation error (``FLOW_INPUT_STEP_INDEX`` for input
failures, ``flow_output_step_index(flow)`` for output failures);
successful validations do not produce records, so the log is
unchanged on the happy path.
trace_id: UUID4 hex string assigned at the start of the execution.
Use this to correlate the result with logs or external systems.
started_at: UTC timestamp when the execution began.
Expand Down Expand Up @@ -1234,7 +1236,7 @@ def execute_flow(
flow_name=flow_name,
payload=initial_input,
schema=flow.input_schema,
step_index=-1,
step_index=FLOW_INPUT_STEP_INDEX,
context_label="flow_input",
)
if validation_record is not None:
Expand Down Expand Up @@ -1349,7 +1351,7 @@ def execute_flow(
flow_name=flow_name,
payload=context,
schema=flow.output_schema,
step_index=len(flow.steps),
step_index=flow_output_step_index(flow),
context_label="flow_output",
)
if validation_record is not None:
Expand All @@ -1376,7 +1378,7 @@ def execute_flow(
flow_name=flow_name,
payload=context,
schema=flow.context_schema,
step_index=len(flow.steps),
step_index=flow_output_step_index(flow),
context_label="flow_context",
)
if context_record is not None:
Expand Down Expand Up @@ -1570,7 +1572,7 @@ async def _execute_linear_flow_async(
flow_name=flow_name,
payload=initial_input,
schema=flow.input_schema,
step_index=-1,
step_index=FLOW_INPUT_STEP_INDEX,
context_label="flow_input",
)
if validation_record is not None:
Expand Down Expand Up @@ -1625,7 +1627,7 @@ async def _execute_linear_flow_async(
flow_name=flow_name,
payload=context,
schema=flow.output_schema,
step_index=len(flow.steps),
step_index=flow_output_step_index(flow),
context_label="flow_output",
)
if validation_record is not None:
Expand Down Expand Up @@ -1689,7 +1691,7 @@ async def _execute_dag_flow_async(
flow_name=flow.name,
payload=initial_input,
schema=flow.input_schema,
step_index=-1,
step_index=FLOW_INPUT_STEP_INDEX,
context_label="flow_input",
)
if validation_record is not None:
Expand Down Expand Up @@ -1817,7 +1819,7 @@ async def _execute_dag_flow_async(
flow_name=flow.name,
payload=context,
schema=flow.output_schema,
step_index=len(flow.steps),
step_index=flow_output_step_index(flow),
context_label="flow_output",
)
if validation_record is not None:
Expand Down Expand Up @@ -2402,10 +2404,11 @@ def _make_result(
Args:
tool_step_count: Number of *tool* step records in
``execution_log`` (excluding the synthetic flow-level
schema-validation records that may carry ``step_index ==
-1`` or ``step_index == len(steps)``). Used to compute
``cost_report.llm_calls_avoided`` so validation records
don't inflate the estimate. When ``None`` (the default),
schema-validation records that may carry
``FLOW_INPUT_STEP_INDEX`` or ``flow_output_step_index(flow)``).
Used to compute ``cost_report.llm_calls_avoided`` so
validation records don't inflate the estimate. When ``None``
(the default),
falls back to ``len(execution_log)`` for callers that do
not append validation records. Composed sub-flow steps
(issue #75) are expanded to their nested tool invocations
Expand Down Expand Up @@ -2676,7 +2679,7 @@ def _resume_linear_flow(
flow_name=flow_name,
payload=context,
schema=flow.output_schema,
step_index=len(flow.steps),
step_index=flow_output_step_index(flow),
context_label="flow_output",
)
if validation_record is not None:
Expand All @@ -2698,7 +2701,7 @@ def _resume_linear_flow(
flow_name=flow_name,
payload=context,
schema=flow.context_schema,
step_index=len(flow.steps),
step_index=flow_output_step_index(flow),
context_label="flow_context",
)
if context_record is not None:
Expand Down Expand Up @@ -3765,7 +3768,7 @@ def _execute_dag_flow(
flow_name=flow.name,
payload=initial_input,
schema=flow.input_schema,
step_index=-1,
step_index=FLOW_INPUT_STEP_INDEX,
context_label="flow_input",
)
if validation_record is not None:
Expand Down Expand Up @@ -4147,7 +4150,7 @@ def _execute_dag_flow(
flow_name=flow.name,
payload=context,
schema=flow.output_schema,
step_index=len(flow.steps),
step_index=flow_output_step_index(flow),
context_label="flow_output",
)
if validation_record is not None:
Expand All @@ -4174,7 +4177,7 @@ def _execute_dag_flow(
flow_name=flow.name,
payload=context,
schema=flow.context_schema,
step_index=len(flow.steps),
step_index=flow_output_step_index(flow),
context_label="flow_context",
)
if context_record is not None:
Expand Down
5 changes: 3 additions & 2 deletions chainweaver/export/callable.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

from chainweaver.exceptions import FlowExecutionError
from chainweaver.export._schema import derive_flow_input_schema
from chainweaver.step_index import FLOW_INPUT_STEP_INDEX, flow_output_step_index

if TYPE_CHECKING: # pragma: no cover — type-only references
from chainweaver.executor import FlowExecutor
Expand Down Expand Up @@ -78,7 +79,7 @@ def _call(raw_inputs: dict[str, Any]) -> dict[str, Any]:
failed = next((r for r in result.execution_log if not r.success), None)
if failed is None:
detail = "Flow execution failed without recording a failing step."
step_index = -1
step_index = FLOW_INPUT_STEP_INDEX
tool_name = flow_name
else:
detail = failed.error_message or failed.error_type or "Unknown error."
Expand All @@ -88,7 +89,7 @@ def _call(raw_inputs: dict[str, Any]) -> dict[str, Any]:
if result.final_output is None:
raise FlowExecutionError(
tool_name=flow_name,
step_index=len(flow.steps),
step_index=flow_output_step_index(flow),
detail="Flow reported success but produced no final output.",
)
return result.final_output
Expand Down
3 changes: 2 additions & 1 deletion chainweaver/mcp/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
from chainweaver.contracts import SideEffectLevel, ToolSafetyContract, merge_safety
from chainweaver.exceptions import FlowExecutionError
from chainweaver.flow import DAGFlow, Flow, FlowLifecycle
from chainweaver.step_index import FLOW_INPUT_STEP_INDEX

try: # Optional dependency.
from fastmcp import FastMCP
Expand Down Expand Up @@ -328,7 +329,7 @@ async def _dispatcher(**kwargs: Any) -> dict[str, Any]:
if last is not None and last.error_type is not None
else "flow execution failed without recorded step error"
)
raise FlowExecutionError(flow_name, -1, detail)
raise FlowExecutionError(flow_name, FLOW_INPUT_STEP_INDEX, detail)
if output_schema is not None and result.final_output is not None:
validated_out = output_schema.model_validate(result.final_output)
return validated_out.model_dump()
Expand Down
19 changes: 19 additions & 0 deletions chainweaver/step_index.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
"""Named step-index sentinels for flow-level validation records."""

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Document the new step-index module

Adding chainweaver/step_index.py is an add-module architecture change, but the AGENTS.md repo map still omits it and docs/agent-context/architecture.md has no module-boundary entry for the new public helper. AGENTS.md §10 explicitly requires add/remove/rename modules to update both the AGENTS.md repo map and architecture.md in the same PR, so leaving these stale breaks the repository’s documented source of truth for future agents.

Useful? React with 👍 / 👎.


from __future__ import annotations

from collections.abc import Sized
from typing import Protocol


class _FlowLike(Protocol):
steps: Sized
Comment on lines +9 to +10

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Blocker — fails mypy (12 errors in CI).

The _FlowLike Protocol declares steps: Sized as a mutable attribute, which mypy checks invariantly — so Flow.steps: list[FlowStep] (a subtype of Sized, not an exact match) does not satisfy the protocol. This breaks all six flow_output_step_index(flow) call sites, violating the "must pass mypy" invariant.

Fix — make the member read-only so it's covariant:

class _FlowLike(Protocol):
    @property
    def steps(self) -> Sized: ...

Verified: this clears all 12 errors (Success: no issues found). Suggest adding mypy to your local verification — the PR description ran ruff + pytest but not mypy, which is why this slipped through.


Generated by Claude Code



# Synthetic record emitted when flow input validation fails before step 0.
FLOW_INPUT_STEP_INDEX = -1


def flow_output_step_index(flow: _FlowLike) -> int:
"""Return the synthetic index used after the final flow step."""
return len(flow.steps)
11 changes: 5 additions & 6 deletions chainweaver/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
ToolTimeoutError,
)
from chainweaver.flow import DAGFlow, DAGFlowStep, Flow, FlowStep
from chainweaver.step_index import FLOW_INPUT_STEP_INDEX, flow_output_step_index

if TYPE_CHECKING:
from chainweaver.executor import FlowExecutor
Expand Down Expand Up @@ -507,7 +508,7 @@ def _flow_fn(validated_input: BaseModel) -> dict[str, Any]:
failed = next((r for r in result.execution_log if not r.success), None)
if failed is None:
detail = "Flow execution failed without recording a failing step."
step_index = -1
step_index = FLOW_INPUT_STEP_INDEX
else:
detail = failed.error_message or failed.error_type or "Unknown error."
step_index = failed.step_index
Expand All @@ -516,13 +517,11 @@ def _flow_fn(validated_input: BaseModel) -> dict[str, Any]:
# Defensive: a successful run should always have a final_output,
# but the executor's contract allows None on failure paths and
# this is the only place the closure can guarantee non-None.
# Use ``len(flow.steps)`` (the flow-output validation sentinel
# per AGENTS.md §5 StepRecord) — this anomaly is a flow-output
# contract violation, not a flow-input validation failure
# (which is what ``step_index=-1`` would denote).
# Use the flow-output validation sentinel per AGENTS.md
# StepRecord: this anomaly is a flow-output contract violation.
raise FlowExecutionError(
tool_name=tool_name,
step_index=len(flow.steps),
step_index=flow_output_step_index(flow),
detail="Flow reported success but produced no final output.",
)
return result.final_output
Expand Down
13 changes: 13 additions & 0 deletions tests/fixtures/public_api.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
"ExecutionPlan",
"ExecutionResult",
"ExecutionSnapshot",
"FLOW_INPUT_STEP_INDEX",
"FaultConfig",
"FileCheckpointer",
"FileStepCache",
Expand Down Expand Up @@ -142,6 +143,7 @@
"flow_from_dict",
"flow_from_json",
"flow_from_yaml",
"flow_output_step_index",
"flow_schema_json",
"flow_to_ascii",
"flow_to_dict",
Expand Down Expand Up @@ -559,6 +561,11 @@
"module": "chainweaver.checkpoint",
"qualname": "ExecutionSnapshot"
},
"FLOW_INPUT_STEP_INDEX": {
"kind": "int",
"module": null,
"qualname": null
},
"FaultConfig": {
"kind": "class",
"module": "chainweaver.fuzz",
Expand Down Expand Up @@ -1370,6 +1377,12 @@
"qualname": "flow_from_yaml",
"signature": "(data: str) -> AnyFlow"
},
"flow_output_step_index": {
"kind": "function",
"module": "chainweaver.step_index",
"qualname": "flow_output_step_index",
"signature": "(flow: _FlowLike) -> int"
},
"flow_schema_json": {
"kind": "function",
"module": "chainweaver.schemas",
Expand Down
14 changes: 11 additions & 3 deletions tests/test_data_integrity.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,15 @@

from pydantic import BaseModel

from chainweaver import Flow, FlowExecutor, FlowRegistry, FlowStep, Tool
from chainweaver import (
FLOW_INPUT_STEP_INDEX,
Flow,
FlowExecutor,
FlowRegistry,
FlowStep,
Tool,
flow_output_step_index,
)


class NumberInput(BaseModel):
Expand Down Expand Up @@ -186,7 +194,7 @@ def test_schema_validated_execution_context() -> None:
input_result = input_executor.execute_flow("input_validated", {"number": "bad"})

assert not input_result.success
assert input_result.execution_log[0].step_index == -1
assert input_result.execution_log[0].step_index == FLOW_INPUT_STEP_INDEX
assert input_result.execution_log[0].error_type == "SchemaValidationError"

output_validated = Flow(
Expand All @@ -203,5 +211,5 @@ def test_schema_validated_execution_context() -> None:
output_result = output_executor.execute_flow("output_validated", {"number": 7})

assert not output_result.success
assert output_result.execution_log[-1].step_index == len(output_validated.steps)
assert output_result.execution_log[-1].step_index == flow_output_step_index(output_validated)
assert output_result.execution_log[-1].error_type == "SchemaValidationError"
Loading
Loading