Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/wayflowcore/source/core/api/events.rst
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ Events
.. autoclass:: wayflowcore.events.event.ConversationExecutionFinishedEvent
:exclude-members: to_tracing_info

.. _statesnapshotevent:
.. autoclass:: wayflowcore.events.event.StateSnapshotEvent
:exclude-members: to_tracing_info

.. _toolexecutionstartevent:
.. autoclass:: wayflowcore.events.event.ToolExecutionStartEvent
:exclude-members: to_tracing_info
Expand Down
22 changes: 22 additions & 0 deletions docs/wayflowcore/source/core/api/serialization.rst
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,28 @@ Deserialization
.. autofunction:: wayflowcore.serialization.serializer.autodeserialize


Conversation State Snapshots
----------------------------

.. _dumpconversationstate:
.. autofunction:: wayflowcore.serialization.dump_conversation_state

.. _serializeconversationstate:
.. autofunction:: wayflowcore.serialization.serialize_conversation_state

.. _deserializeconversationstate:
.. autofunction:: wayflowcore.serialization.deserialize_conversation_state

.. _loadconversationstate:
.. autofunction:: wayflowcore.serialization.load_conversation_state

.. _deserializeconversation:
.. autofunction:: wayflowcore.serialization.deserialize_conversation

.. _dumpvariablestate:
.. autofunction:: wayflowcore.serialization.dump_variable_state


Plugins
-------

Expand Down
10 changes: 10 additions & 0 deletions docs/wayflowcore/source/core/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ WayFlow |current_version|
New features
^^^^^^^^^^^^

* **State snapshot tracing events:**

Added configurable conversation state snapshots for tracing, with emission at conversation, node, or tool boundaries and bridging into Agent Spec state snapshot events.
Added resumable conversation state serialization so persisted conversations can be restored and continued.
Snapshot emission is covered on both synchronous and asynchronous execution paths, with snapshot ownership currently scoped to the active conversation.

* **OAuth support for MCP Clients:**

MCP Clients now support OAuth-based authorization.
Expand Down Expand Up @@ -62,6 +68,10 @@ Possibly Breaking Changes
Bug fixes
^^^^^^^^^

* **State snapshot test coverage:**

Reduced duplicated flow/agent, sync/async, nested-conversation, internal-turn, serialization, and Agent Spec tracing wrappers in the state snapshot tests, and moved the reusable state snapshot fixtures plus explicit tracing/runtime helpers into shared test helper modules/plugins.

WayFlow 26.1.1
--------------

Expand Down
61 changes: 61 additions & 0 deletions docs/wayflowcore/source/core/howtoguides/howto_tracing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,67 @@ Here's an example of how to use it in your code.
:start-after: .. start-##_Enable_Agent_Spec_Tracing
:end-before: .. end-##_Enable_Agent_Spec_Tracing

State snapshot events
---------------------

WayFlow can also emit ``StateSnapshotEvent`` payloads at conversation, step, and tool
boundaries by passing a ``StateSnapshotPolicy`` to ``conversation.execute()`` or
``conversation.execute_async()``. When ``AgentSpecEventListener`` is registered and
the installed ``pyagentspec`` version exposes ``StateSnapshotEmitted``, these runtime
snapshots are bridged into Agent Spec ``StateSnapshotEmitted`` events on the
owning conversation/component span. The WayFlow-only ``variable_state`` payload
is not bridged, because Agent Spec does not define WayFlow variable semantics.
When ``include_variable_state=True``, variable values must already be
JSON-serializable.
``StateSnapshotEvent.conversation_id`` is the logical/public conversation id,
while ``state_snapshot["conversation"]["id"]`` identifies the concrete runtime
conversation instance that emitted the snapshot. The lightweight
``state_snapshot["conversation"]`` / ``state_snapshot["execution"]`` sections are
intended for inspection and tracing. Only the root conversation-turn
checkpoints emitted for the conversation passed directly to ``execute()`` /
``execute_async()`` include ``state_snapshot["conversation_state"]``, which is
the authoritative serialized WayFlow conversation blob used for resumability.
Internal tool/node snapshots intentionally omit that serialized blob to stay
lightweight. To restore from a resumable checkpoint, use
``wayflowcore.serialization.deserialize_conversation(...)`` or
``deserialize_conversation_state(...)`` together with
``load_conversation_state(...)``.
Snapshot intervals are cumulative. ``TOOL_TURNS`` includes the
``CONVERSATION_TURNS`` checkpoints, ``NODE_TURNS`` includes the
``CONVERSATION_TURNS`` checkpoints, and ``ALL_INTERNAL_TURNS`` includes all
conversation, tool, and node boundaries. Only the conversation passed directly
to ``execute()`` / ``execute_async()`` emits the authoritative turn-level
resumability checkpoints for that run. Nested child conversations may still
emit internal tracing snapshots, but those child-runtime snapshots are tracing
checkpoints unless and until a stronger contract is introduced.
``CONVERSATION_TURNS`` emits an opening snapshot at execution start and a
closing turn snapshot at the end of the turn. That closing payload is emitted
before the live conversation object commits the new status, but its serialized
payload is synthesized so that after ``execute()`` returns it matches the
committed state seen by the caller. Snapshots are emitted only when the
corresponding boundary event occurs. If a turn is interrupted mid-turn, WayFlow
does not synthesize a turn-end snapshot; the latest already-emitted opening or
internal snapshot is the recovery point.
For flows, ``NODE_TURNS`` uses flow-iteration start/end events, which align with
per-step execution. For agents, the same policy emits snapshots around each
decision-loop iteration. Tool start/end snapshots are emitted only for
``TOOL_TURNS`` and ``ALL_INTERNAL_TURNS``.

.. code-block:: python

from wayflowcore.executors.statesnapshotpolicy import (
StateSnapshotInterval,
StateSnapshotPolicy,
)

status = conversation.execute(
state_snapshot_policy=StateSnapshotPolicy(
state_snapshot_interval=StateSnapshotInterval.CONVERSATION_TURNS,
include_variable_state=True,
extra_state_builder=lambda conv: {"ui": {"active_tab": "plan"}},
)
)


Agent Spec Exporting/Loading
============================
Expand Down
131 changes: 125 additions & 6 deletions wayflowcore/src/wayflowcore/agentspec/tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
# (LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0) or Universal Permissive License
# (UPL) 1.0 (LICENSE-UPL or https://oss.oracle.com/licenses/upl), at your option.
import json
from dataclasses import dataclass
from typing import Dict, Optional, Union, cast

from pyagentspec import Component as AgentSpecComponent
Expand All @@ -25,6 +26,7 @@
from pyagentspec.tracing.events import LlmGenerationResponse as AgentSpecLlmGenerationResponse
from pyagentspec.tracing.events import NodeExecutionEnd as AgentSpecNodeExecutionEnd
from pyagentspec.tracing.events import NodeExecutionStart as AgentSpecNodeExecutionStart
from pyagentspec.tracing.events import StateSnapshotEmitted as AgentSpecStateSnapshotEmitted
from pyagentspec.tracing.events import ToolExecutionRequest as AgentSpecToolExecutionRequest
from pyagentspec.tracing.events import ToolExecutionResponse as AgentSpecToolExecutionResponse
from pyagentspec.tracing.events.llmgeneration import ToolCall as AgentSpecToolCall
Expand All @@ -39,16 +41,19 @@
from wayflowcore._utils.formatting import stringify
from wayflowcore.agentspec import AgentSpecExporter
from wayflowcore.component import Component
from wayflowcore.conversation import Conversation, _get_active_conversations
from wayflowcore.events.event import (
AgentExecutionFinishedEvent,
AgentExecutionStartedEvent,
ConversationMessageStreamChunkEvent,
EndSpanEvent,
Event,
ExceptionRaisedEvent,
FlowExecutionFinishedEvent,
FlowExecutionStartedEvent,
LlmGenerationRequestEvent,
LlmGenerationResponseEvent,
StateSnapshotEvent,
StepInvocationResultEvent,
StepInvocationStartEvent,
ToolExecutionResultEvent,
Expand All @@ -59,6 +64,12 @@
from wayflowcore.tracing.span import LlmGenerationSpan, get_active_span_stack, get_current_span


@dataclass(frozen=True)
class _ConversationSpanOwner:
runtime_conversation_id: str
span: AgentSpecSpan


class AgentSpecEventListener(EventListener):
"""Event listener that emits traces according to the Open Agent Spec Tracing standard"""

Expand All @@ -70,6 +81,12 @@ def __init__(self) -> None:
self.agentspec_exporter: AgentSpecExporter = AgentSpecExporter()
# We keep a registry of conversions, so that we do not repeat the conversion for the same object twice
self.agentspec_components_registry: Dict[str, AgentSpecComponent] = {}
# State snapshots belong to the span that owns their logical
# conversation_id, not necessarily to the runtime span that was active
# when the snapshot event was emitted. Nested flow sub-conversations can
# intentionally reuse the same logical conversation_id, so we also track
# the live runtime conversation id that currently owns that stream.
self._conversation_span_owners: Dict[str, _ConversationSpanOwner] = {}
# Track last assistant message id and a robust mapping tool_request_id -> assistant message id.
# Some providers may emit tool events before final assistant message id is known; we allow
# temporarily missing ids and backfill on LLM response.
Expand All @@ -83,16 +100,94 @@ def _convert_to_agentspec(self, component: Component) -> AgentSpecComponent:
)
return self.agentspec_components_registry[component.id]

def _get_active_wayflow_conversation(self) -> Conversation | None:
active_conversations = _get_active_conversations(return_copy=False)
if not active_conversations:
return None
return active_conversations[-1]

def _register_current_conversation_span(self, agentspec_span: AgentSpecSpan) -> None:
active_conversation = self._get_active_wayflow_conversation()
if active_conversation is None:
return

current_owner = self._conversation_span_owners.get(active_conversation.conversation_id)
if (
current_owner is not None
and current_owner.runtime_conversation_id != active_conversation.id
and current_owner.span.end_time is None
):
return

self._conversation_span_owners[active_conversation.conversation_id] = (
_ConversationSpanOwner(
runtime_conversation_id=active_conversation.id,
span=agentspec_span,
)
)

def _get_snapshot_owner_span(
self,
event: StateSnapshotEvent,
current_agentspec_span: AgentSpecSpan | None,
) -> AgentSpecSpan | None:
# Keep snapshot ownership resolution centralized here. Today we only
# support direct span ownership plus shared-conversation routing for
# nested flows. Future multi-agent tracing can extend this method to
# route snapshots to a dedicated Swarm/ManagerWorkers wrapper span
# without changing the StateSnapshotEvent handling below.
owner = self._conversation_span_owners.get(event.conversation_id)
if owner is not None:
snapshot_conversation = (
event.state_snapshot.get("conversation") if event.state_snapshot else {}
)
snapshot_runtime_conversation_id = (
snapshot_conversation.get("id") if isinstance(snapshot_conversation, dict) else None
)
if snapshot_runtime_conversation_id != owner.runtime_conversation_id:
return None
return owner.span

return current_agentspec_span

def _get_current_conversation_owner(self) -> _ConversationSpanOwner | None:
active_conversation = self._get_active_wayflow_conversation()
if active_conversation is None:
return None
return self._conversation_span_owners.get(active_conversation.conversation_id)

def _span_has_state_snapshot(self, agentspec_span: AgentSpecSpan) -> bool:
return any(
isinstance(span_event, AgentSpecStateSnapshotEmitted)
for span_event in agentspec_span.events
)

def _span_has_execution_end_event(self, agentspec_span: AgentSpecSpan) -> bool:
return any(
isinstance(span_event, (AgentSpecFlowExecutionEnd, AgentSpecAgentExecutionEnd))
for span_event in agentspec_span.events
)

def _end_conversation_span_if_ready(self, agentspec_span: AgentSpecSpan) -> None:
owner = self._get_current_conversation_owner()
if (
owner is not None
and owner.span is agentspec_span
and self._span_has_state_snapshot(agentspec_span)
):
return
agentspec_span.end()

def __call__(self, event: Event) -> None:
# We intercept the wayflow events, and based on the type of event:
# - if it corresponds to a span start event, we create the corresponding agent spec span, and we start it
# - we map the wayflow event to the corresponding agent spec one, and we emit that
# - if it corresponds to a span end event, we retrieve the corresponding agent spec span, and we close it
current_span = get_current_span()
if not current_span:
if current_span is None:
return
current_agentspec_span = self.agentspec_spans_registry.get(current_span.span_id, None)
current_span_name = current_span.name or ""
current_agentspec_span = self.agentspec_spans_registry.get(current_span.span_id, None)
event_name = event.name or ""
match event:
case LlmGenerationRequestEvent():
Expand Down Expand Up @@ -312,6 +407,20 @@ def __call__(self, event: Event) -> None:
)
)
current_agentspec_span.end()
case StateSnapshotEvent():
owner_span = self._get_snapshot_owner_span(event, current_agentspec_span)
if not owner_span:
return
snapshot_event = AgentSpecStateSnapshotEmitted(
id=event.event_id,
name=event_name,
conversation_id=event.conversation_id,
state_snapshot=event.state_snapshot,
extra_state=event.extra_state,
)
owner_span.add_event(snapshot_event)
if owner_span.end_time is None and self._span_has_execution_end_event(owner_span):
owner_span.end()
case FlowExecutionStartedEvent():
# Flow execution starts. Create the new agent spec span, start it, add the event
agentspec_flow = cast(
Expand All @@ -332,8 +441,11 @@ def __call__(self, event: Event) -> None:
inputs={},
)
)
self._register_current_conversation_span(current_agentspec_span)
case FlowExecutionFinishedEvent():
# Flow execution ends. Add the event to the agent spec span and close the span
# Flow execution ends. Add the event to the agent spec span. If this span owns
# the logical conversation checkpoint stream, delay closing until the final
# StateSnapshotEvent is bridged so span processors still see the final snapshot.
if not current_agentspec_span:
return
agentspec_flow = cast(
Expand All @@ -354,7 +466,7 @@ def __call__(self, event: Event) -> None:
branch_selected=branch_selected,
)
)
current_agentspec_span.end()
self._end_conversation_span_if_ready(current_agentspec_span)
case AgentExecutionStartedEvent():
# Agent execution starts. Create the new agent spec span, start it, add the event
agentspec_agent = cast(
Expand All @@ -375,8 +487,11 @@ def __call__(self, event: Event) -> None:
inputs={},
)
)
self._register_current_conversation_span(current_agentspec_span)
case AgentExecutionFinishedEvent():
# Agent execution ends. Add the event to the agent spec span and close the span
# Agent execution ends. Add the event to the agent spec span. If this span owns
# the logical conversation checkpoint stream, delay closing until the final
# StateSnapshotEvent is bridged so span processors still see the final snapshot.
if not current_agentspec_span:
return
agentspec_agent = cast(
Expand All @@ -395,7 +510,7 @@ def __call__(self, event: Event) -> None:
outputs=outputs,
)
)
current_agentspec_span.end()
self._end_conversation_span_if_ready(current_agentspec_span)
case ExceptionRaisedEvent():
if not current_agentspec_span:
return
Expand All @@ -408,3 +523,7 @@ def __call__(self, event: Event) -> None:
exception_stacktrace=str(event.exception.__traceback__),
)
)
case EndSpanEvent():
if not current_agentspec_span or current_agentspec_span.end_time is not None:
return
current_agentspec_span.end()
Loading