Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions agent_assembly/adapters/openai_agents/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,15 @@ def get_supported_versions(self) -> list[str]:
return [">=1.0.0"]

def is_available(self) -> bool:
"""Check specifically for openai.agents module, not just openai base."""
"""Check for the OpenAI Agents framework's top-level ``agents`` package.

WHY ``agents`` and not ``openai.agents`` (AAASM-3528): the shipped
``openai-agents`` distribution exposes a top-level ``agents`` package;
``openai.agents`` does not exist. Detecting the wrong module made the
adapter report unavailable (or, when patched, behave as a silent no-op).
"""
try:
return importlib.util.find_spec("openai.agents") is not None
return importlib.util.find_spec("agents") is not None
except (ModuleNotFoundError, ValueError):
return False

Expand Down
192 changes: 129 additions & 63 deletions agent_assembly/adapters/openai_agents/patch.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,26 @@
"""OpenAI Agents patch module."""
"""OpenAI Agents patch module.

Installs SDK-layer governance hooks for the OpenAI Agents framework (the
top-level ``agents`` package shipped by ``openai-agents``).

WHY this targets ``agents`` and ``on_invoke_tool`` (AAASM-3528):
The previous revision patched ``openai.agents`` and ``FunctionTool.__call__``.
Neither exists in the shipped framework: the package is the top-level ``agents``
module, and a tool is executed by the runner calling the tool instance's
``on_invoke_tool(ctx, input_json)`` coroutine — there is no ``__call__``. Because
that class-level attribute does not exist, the old patch silently never applied,
so every OpenAI-Agents tool call ran ungoverned (fail-open). This module patches
the real interception point.

Interception strategy: ``FunctionTool.on_invoke_tool`` is a *per-instance*
callable field, not a class method, so it cannot be patched once on the class.
Instead we wrap the dataclass ``__init__`` so that every ``FunctionTool``
constructed after ``apply()`` has its ``on_invoke_tool`` coroutine wrapped with
the governance pre-execution check, deny/pending handling, and result recording.
``Handoff.on_invoke_handoff`` is wrapped the same way for spawn-context tracking
and topology edge emission. ``Runner.run`` is a real classmethod and is wrapped
directly.
"""

from __future__ import annotations

Expand All @@ -17,17 +39,20 @@
)
from agent_assembly.core.spawn import _SPAWN_CTX, SpawnContext, spawn_context_scope

_ORIGINAL_FUNCTION_TOOL_CALL = "_agent_assembly_original_openai_agents_function_tool_call"
_ORIGINAL_FUNCTION_TOOL_INIT = "_agent_assembly_original_openai_agents_function_tool_init"
_PATCHED_FLAG = "_agent_assembly_openai_agents_function_tool_patched"
_WRAPPED_INVOKE_FLAG = "_agent_assembly_openai_agents_on_invoke_wrapped"
_ORIGINAL_RUNNER_RUN = "_agent_assembly_original_openai_agents_runner_run"
_RUNNER_PATCHED_FLAG = "_agent_assembly_openai_agents_runner_patched"
_ORIGINAL_HANDOFF_CALL = "_agent_assembly_original_openai_agents_handoff_call"
_ORIGINAL_HANDOFF_INIT = "_agent_assembly_original_openai_agents_handoff_init"
_HANDOFF_PATCHED_FLAG = "_agent_assembly_openai_agents_handoff_patched"
_PROCESS_AGENT_ID: str | None = None
_EDGE_EMITTER: Any = None
_MAX_AUDIT_RESULT_CHARS = 2000
_MAX_DELEGATION_REASON_CHARS = 256
_OPENAI_AGENTS_MODULE = "openai.agents"
# The shipped framework is the top-level ``agents`` package (openai-agents),
# NOT ``openai.agents`` (which does not exist). See AAASM-3528.
_OPENAI_AGENTS_MODULE = "agents"


def set_edge_emitter(emitter: Any) -> None:
Expand All @@ -51,25 +76,25 @@ def apply(self) -> bool:
function_tool_cls = _load_openai_agents_function_tool_class()
if function_tool_cls is None:
return False
_apply_function_tool_call_patch(function_tool_cls, self.callback_handler)
_apply_function_tool_patch(function_tool_cls, self.callback_handler)
runner_cls = _load_openai_agents_runner_class()
if runner_cls is not None:
_apply_runner_run_patch(runner_cls, self.process_agent_id)
handoff_cls = _load_openai_agents_handoff_class()
if handoff_cls is not None:
_apply_handoff_call_patch(handoff_cls, self.process_agent_id)
_apply_handoff_patch(handoff_cls, self.process_agent_id)
return True

def revert(self) -> None:
handoff_cls = _load_openai_agents_handoff_class()
if handoff_cls is not None:
_revert_handoff_call_patch(handoff_cls)
_revert_handoff_patch(handoff_cls)
runner_cls = _load_openai_agents_runner_class()
if runner_cls is not None:
_revert_runner_run_patch(runner_cls)
function_tool_cls = _load_openai_agents_function_tool_class()
if function_tool_cls is not None:
_revert_function_tool_call_patch(function_tool_cls)
_revert_function_tool_patch(function_tool_cls)
set_process_agent_id(None)
return None

Expand Down Expand Up @@ -131,50 +156,71 @@ def _extract_handoff_delegation_reason(handoff_obj: Any) -> str:
return "handoff"


def _apply_handoff_call_patch(handoff_cls: type[Any], process_agent_id: str | None) -> None:
def _apply_handoff_patch(handoff_cls: type[Any], process_agent_id: str | None) -> None:
"""Wrap ``Handoff.__init__`` so each instance's ``on_invoke_handoff`` runs inside
a spawn-context scope and emits a ``delegates_to`` topology edge.

WHY ``__init__`` and not a class method: like ``FunctionTool``, a handoff is
invoked through its per-instance ``on_invoke_handoff`` coroutine; there is no
``Handoff.__call__`` to patch on the class.
"""
if getattr(handoff_cls, _HANDOFF_PATCHED_FLAG, False):
return None

if not callable(handoff_cls):
original_init = handoff_cls.__init__

@wraps(original_init)
def patched_init(self: Any, *args: Any, **kwargs: Any) -> None:
original_init(self, *args, **kwargs)
_wrap_on_invoke_handoff(self, process_agent_id)

setattr(handoff_cls, _ORIGINAL_HANDOFF_INIT, original_init)
handoff_cls.__init__ = patched_init
setattr(handoff_cls, _HANDOFF_PATCHED_FLAG, True)
return None


def _wrap_on_invoke_handoff(handoff_obj: Any, process_agent_id: str | None) -> None:
original_invoke = getattr(handoff_obj, "on_invoke_handoff", None)
if not callable(original_invoke):
return None
if getattr(original_invoke, _WRAPPED_INVOKE_FLAG, False):
return None
original_call = handoff_cls.__call__

@wraps(original_call)
async def patched_call(self: Any, *args: Any, **kwargs: Any) -> Any:
@wraps(original_invoke)
async def wrapped_invoke(ctx: Any, input_json: Any) -> Any:
spawn_ctx = SpawnContext(
parent_agent_id=process_agent_id or "",
depth=_current_spawn_depth(),
spawned_by_tool=None,
delegation_reason=_extract_handoff_delegation_reason(self),
delegation_reason=_extract_handoff_delegation_reason(handoff_obj),
)
with spawn_context_scope(spawn_ctx):
result = original_call(self, *args, **kwargs)
result = original_invoke(ctx, input_json)
if inspect.isawaitable(result):
result = await result

# Emit a DelegatesTo edge from the delegating agent to the handoff target.
if _EDGE_EMITTER is not None and process_agent_id:
target_id = getattr(self, "agent_name", None) or getattr(self, "name", None) or "unknown"
target_id = getattr(handoff_obj, "agent_name", None) or getattr(handoff_obj, "name", None) or "unknown"
emit = getattr(_EDGE_EMITTER, "emit", None)
if callable(emit):
reason = _extract_handoff_delegation_reason(self)
reason = _extract_handoff_delegation_reason(handoff_obj)
emit(process_agent_id, str(target_id), "delegates_to", {"reason": reason})

return result

setattr(handoff_cls, _ORIGINAL_HANDOFF_CALL, original_call)
handoff_cls.__call__ = patched_call
setattr(handoff_cls, _HANDOFF_PATCHED_FLAG, True)
setattr(wrapped_invoke, _WRAPPED_INVOKE_FLAG, True)
handoff_obj.on_invoke_handoff = wrapped_invoke
return None


def _revert_handoff_call_patch(handoff_cls: type[Any]) -> None:
def _revert_handoff_patch(handoff_cls: type[Any]) -> None:
if not getattr(handoff_cls, _HANDOFF_PATCHED_FLAG, False):
return None
original_call = getattr(handoff_cls, _ORIGINAL_HANDOFF_CALL, None)
if callable(original_call):
handoff_cls.__call__ = original_call
for attr in (_ORIGINAL_HANDOFF_CALL, _HANDOFF_PATCHED_FLAG):
original_init = getattr(handoff_cls, _ORIGINAL_HANDOFF_INIT, None)
if callable(original_init):
handoff_cls.__init__ = original_init
for attr in (_ORIGINAL_HANDOFF_INIT, _HANDOFF_PATCHED_FLAG):
if hasattr(handoff_cls, attr):
delattr(handoff_cls, attr)
return None
Expand Down Expand Up @@ -250,6 +296,11 @@ def _resolve_agent_id(ctx: Any) -> str | None:
candidate = getattr(ctx, "agent_id", None)
if isinstance(candidate, str) and candidate:
return candidate
agent = getattr(ctx, "agent", None)
if agent is not None:
name = getattr(agent, "name", None)
if isinstance(name, str) and name:
return name
return _get_process_agent_id()


Expand Down Expand Up @@ -324,31 +375,23 @@ async def _wait_for_async_tool_approval(
return result


def _build_tool_result_error(
def _build_tool_deny_error(
*,
tool_name: str,
reason: str | None,
is_pending_rejection: bool,
) -> object:
try:
module = importlib.import_module(_OPENAI_AGENTS_MODULE)
except ImportError:
module = None

tool_result_cls = getattr(module, "ToolResult", None) if module is not None else None
) -> str:
"""Build the error string returned to the model when a tool call is denied.

WHY a string: the framework's ``on_invoke_tool`` contract states that returning
a string error message sends the error back to the LLM (instead of raising,
which fails the whole run). Denying by returning a governance error string
blocks execution without aborting the agent.
"""
reason_text = reason or "No reason provided."
if is_pending_rejection:
error_message = f"Approval denied for tool '{tool_name}': {reason_text}"
else:
error_message = f"Action blocked by governance policy for tool '{tool_name}': {reason_text}"

if isinstance(tool_result_cls, type):
try:
return tool_result_cls(error=error_message)
except Exception:
pass

return {"error": error_message}
return f"Approval denied for tool '{tool_name}': {reason_text}"
return f"Action blocked by governance policy for tool '{tool_name}': {reason_text}"


def _truncate_result_for_audit(result: object) -> str:
Expand Down Expand Up @@ -396,20 +439,43 @@ def _is_governance_error(error: Exception) -> bool:
return True


def _apply_function_tool_call_patch(function_tool_cls: type[Any], callback_handler: Any) -> None:
def _apply_function_tool_patch(function_tool_cls: type[Any], callback_handler: Any) -> None:
"""Patch ``FunctionTool.__init__`` so every tool instance is governed.

Each ``FunctionTool`` carries its execution logic in the per-instance
``on_invoke_tool`` coroutine that the runner calls. We wrap that coroutine at
construction time with the governance pre-execution check; this is the actual
interception point in the shipped framework (AAASM-3528).
"""
if getattr(function_tool_cls, _PATCHED_FLAG, False):
return None

if not callable(function_tool_cls):
original_init = function_tool_cls.__init__

@wraps(original_init)
def patched_init(self: Any, *args: Any, **kwargs: Any) -> None:
original_init(self, *args, **kwargs)
_wrap_on_invoke_tool(self, callback_handler)

setattr(function_tool_cls, _ORIGINAL_FUNCTION_TOOL_INIT, original_init)
function_tool_cls.__init__ = patched_init
setattr(function_tool_cls, _PATCHED_FLAG, True)
return None


def _wrap_on_invoke_tool(tool_obj: Any, callback_handler: Any) -> None:
"""Wrap a single tool instance's ``on_invoke_tool`` coroutine with governance."""
original_invoke = getattr(tool_obj, "on_invoke_tool", None)
if not callable(original_invoke):
return None
if getattr(original_invoke, _WRAPPED_INVOKE_FLAG, False):
return None
original_call = function_tool_cls.__call__

@wraps(original_call)
async def patched_call(self: Any, ctx: Any, tool_input: Any, *args: Any, **kwargs: Any) -> Any:
tool_name = str(getattr(self, "name", self.__class__.__name__))
@wraps(original_invoke)
async def governed_invoke(ctx: Any, tool_input: Any) -> Any:
tool_name = str(getattr(tool_obj, "name", tool_obj.__class__.__name__))
agent_id = _resolve_agent_id(ctx)

decision: object = {"status": "allow"}
governance_failed = False
try:
decision = await _invoke_async_tool_check(
Expand All @@ -435,7 +501,7 @@ async def patched_call(self: Any, ctx: Any, tool_input: Any, *args: Any, **kwarg
status, reason = _normalize_decision(final_decision)

if status == "deny":
blocked_result = _build_tool_result_error(
blocked_result = _build_tool_deny_error(
tool_name=tool_name,
reason=reason,
is_pending_rejection=is_pending_flow,
Expand All @@ -454,7 +520,7 @@ async def patched_call(self: Any, ctx: Any, tool_input: Any, *args: Any, **kwarg
if not governance_failed:
raise

result = original_call(self, ctx, tool_input, *args, **kwargs)
result = original_invoke(ctx, tool_input)
if inspect.isawaitable(result):
result = await result

Expand All @@ -469,20 +535,20 @@ async def patched_call(self: Any, ctx: Any, tool_input: Any, *args: Any, **kwarg
)
return result

setattr(function_tool_cls, _ORIGINAL_FUNCTION_TOOL_CALL, original_call)
function_tool_cls.__call__ = patched_call
setattr(function_tool_cls, _PATCHED_FLAG, True)
setattr(governed_invoke, _WRAPPED_INVOKE_FLAG, True)
tool_obj.on_invoke_tool = governed_invoke
return None


def _revert_function_tool_call_patch(function_tool_cls: type[Any]) -> None:
def _revert_function_tool_patch(function_tool_cls: type[Any]) -> None:
if not getattr(function_tool_cls, _PATCHED_FLAG, False):
return None

original_call = getattr(function_tool_cls, _ORIGINAL_FUNCTION_TOOL_CALL, None)
if callable(original_call):
function_tool_cls.__call__ = original_call
original_init = getattr(function_tool_cls, _ORIGINAL_FUNCTION_TOOL_INIT, None)
if callable(original_init):
function_tool_cls.__init__ = original_init

if hasattr(function_tool_cls, _ORIGINAL_FUNCTION_TOOL_CALL):
delattr(function_tool_cls, _ORIGINAL_FUNCTION_TOOL_CALL)
if hasattr(function_tool_cls, _ORIGINAL_FUNCTION_TOOL_INIT):
delattr(function_tool_cls, _ORIGINAL_FUNCTION_TOOL_INIT)
if hasattr(function_tool_cls, _PATCHED_FLAG):
delattr(function_tool_cls, _PATCHED_FLAG)
7 changes: 7 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,13 @@ test = [
# installs `dev`), so the function-tool governance regression is actually
# exercised instead of skipped.
"pydantic-ai>=0.3.0",
# AAASM-3528: dev/test-only (NOT a runtime dependency). The shipped OpenAI
# Agents framework is the top-level `agents` package (NOT `openai.agents`),
# and a tool runs via its per-instance `on_invoke_tool` coroutine (NOT a
# `FunctionTool.__call__`). Installing it lets the `importorskip`-guarded
# integration test drive a real tool call, so a regression to the old
# fail-open no-op patch is actually caught instead of silently skipped.
"openai-agents>=0.1.0",
]
pre-commit-ci = [
"pre-commit>=3.5.0,<5",
Expand Down
Loading
Loading