[rollout, tool] feat: add experimental agent framework and gateway runtime #6299
zackcxb wants to merge 10 commits into
Conversation
…ntime -- make the new public agent surface own session orchestration and backend routing This reconstructs the framework and gateway surface directly on upstream/main, without carrying the old experimental-agent migration history. Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
… boundary, replay output API, and tutorial path reviewable These tests exercise the framework/gateway contract on CPU and keep the minimal tutorial aligned with the unified generate_sequences entry. Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…llouts -- FSDP no-padding workers require readback tensors to keep jagged semantics The upstream TransferQueue readback can densify equal-shaped sequence fields; the gateway sync smoke still needs an opt-in bridge to normalize token fields before compute_log_prob. Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Code Review
This pull request introduces the verl.agent package, establishing a framework and an OpenAI-compatible gateway for agent-based reinforcement learning. Key additions include the OpenAICompatibleAgentFramework for session orchestration and trajectory scoring, and the GatewayActor for managing agent interactions and trajectory collection. The PR also includes TransferQueue utilities to ensure nested tensor consistency and a comprehensive suite of CPU-only unit tests. Review feedback identified a critical issue in the gateway where response_logprobs could become misaligned with response_ids if the backend returns inconsistent logprob data, potentially causing validation failures during trajectory assembly.
    if output.log_probs is not None:
        active_trajectory.response_logprobs.extend(list(output.log_probs))
The response_logprobs list in TrajectoryBuffer can become misaligned with response_ids if the backend (rollout server) does not consistently return logprobs for every turn, or if logprobs tracking starts after the first turn.
Specifically:
- If Turn 1 returns no logprobs but Turn 2 does, response_logprobs will only contain Turn 2's logprobs, while response_ids contains tokens from both turns.
- If Turn 1 had logprobs but Turn 2 returns None, response_logprobs will not be extended for the new tokens, causing a length mismatch.
This mismatch will cause a ValueError in validate_trajectory during assembly. The gateway should ensure length alignment by padding with zeros when logprobs are partially missing or when tracking begins mid-session.
    if output.log_probs is not None:
        # Ensure alignment if we just started receiving logprobs for this trajectory
        if not active_trajectory.response_logprobs and active_trajectory.response_ids:
            active_trajectory.response_logprobs = [0.0] * len(active_trajectory.response_ids)
        active_trajectory.response_logprobs.extend(list(output.log_probs))
    elif active_trajectory.response_logprobs:
        # Pad with zeros if we were tracking logprobs but this turn has none
        active_trajectory.response_logprobs.extend([0.0] * len(response_ids))
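The alignment rule suggested in this comment can be exercised in isolation. The following is a minimal sketch, not the gateway's actual code: `TrajectoryBuffer` here is a simplified stand-in with only the two fields under discussion, and `append_turn` is a hypothetical helper name. It assumes the new token ids are appended after the logprob bookkeeping, so the back-fill only covers earlier turns.

```python
from dataclasses import dataclass, field
from typing import Optional, Sequence


@dataclass
class TrajectoryBuffer:
    """Simplified stand-in: only the two fields discussed in the review."""
    response_ids: list[int] = field(default_factory=list)
    response_logprobs: list[float] = field(default_factory=list)


def append_turn(buf: TrajectoryBuffer, token_ids: Sequence[int],
                log_probs: Optional[Sequence[float]]) -> None:
    """Append one turn while keeping logprobs aligned with token ids."""
    if log_probs is not None:
        if len(log_probs) != len(token_ids):
            raise ValueError("log_probs and token_ids differ in length")
        if not buf.response_logprobs and buf.response_ids:
            # Logprobs start mid-session: back-fill earlier tokens with zeros.
            buf.response_logprobs = [0.0] * len(buf.response_ids)
        buf.response_logprobs.extend(log_probs)
    elif buf.response_logprobs:
        # Logprobs were tracked before but are missing for this turn.
        buf.response_logprobs.extend([0.0] * len(token_ids))
    buf.response_ids.extend(token_ids)
```

If no turn ever returns logprobs, `response_logprobs` stays empty, which keeps the "no logprobs at all" case distinguishable from the zero-padded one.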
Per-sample agent dispatch and compatibility with existing agent loops
First, thanks for this PR. The gateway + framework design is clean, and the TQ output alignment means downstream trainers can adopt it with minimal changes. A question about agent compatibility:
The issue
In the current […]
    # Current behavior: each sample picks its agent loop
    batch.non_tensor_batch["agent_name"] = np.array([
        "single_turn_agent",  # sample 0: math
        "tool_agent",         # sample 1: coding
        ...
    ])
In […]
What's already in place
The framework already has […]
    def _runner_kwargs_for_sample(self, sample_fields):
        kwargs = {}
        if "tools_kwargs" in sample_fields:
            kwargs["tools_kwargs"] = sample_fields["tools_kwargs"]
        if "agent_name" in sample_fields:
            kwargs["agent_name"] = sample_fields["agent_name"]
        return kwargs
With that, a thin dispatcher wrapper ( […]
Question
Are there plans to adapt the existing […]
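A thin dispatcher of the kind mentioned above might look like the following. This is a sketch under assumptions: the registry, the runner signature `(prompt, **kwargs)`, and the names `AgentDispatcher` and `register` are hypothetical and not part of the PR. Only the `agent_name` and `tools_kwargs` keys come from the discussion.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional

Runner = Callable[..., Awaitable[Any]]


class AgentDispatcher:
    """Hypothetical wrapper: route each sample to a runner by agent_name."""

    def __init__(self, default: str) -> None:
        self._runners: dict[str, Runner] = {}
        self._default = default

    def register(self, name: str, runner: Runner) -> None:
        self._runners[name] = runner

    async def __call__(self, prompt: str,
                       agent_name: Optional[str] = None, **kwargs: Any) -> Any:
        name = agent_name or self._default
        try:
            runner = self._runners[name]
        except KeyError:
            raise ValueError(f"unknown agent_name: {name!r}") from None
        # Remaining per-sample kwargs (e.g. tools_kwargs) pass through untouched.
        return await runner(prompt, **kwargs)
```

The dispatcher itself would be passed wherever a single agent runner is expected, so per-sample routing stays outside the framework.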
    session.request_tools = tools
    self._touch_session(session)

    return JSONResponse(
It would be helpful to add streaming-mode chat completion support, for compatibility with OpenClaw.
Thanks for pointing out the agent_name problem; I'll resolve it soon.
We know streaming output support is needed, but it won't be added in this initial PR.
Do you have existing use cases that require seamless, fully online RL with no impact on user experience?
Thanks for the quick response!
On agent dispatch — looking forward to the update, happy to help test when ready.
On stream mode — completely understandable this sits outside the initial PR. OpenClaw doesn't currently support disabling streaming via config; doing so would require non-trivial changes to the upstream pi-agent dependency. We've cherry-picked this PR onto our branch and are doing light adaptation + experiments on our side. Stream support can be a follow-up and we'd be glad to collaborate on it then.
Sounds great! I have resolved the agent_name issue and also completed a series of other code refactors. Hopefully these adjustments won’t disrupt your established workflow.
We recognize the unique requirements of online RL scenarios and intend to add streaming mode support in future iterations. You are more than welcome to submit related contributions once this initial PR is finalized and merged.
Additionally, I’d like to mention that we plan to gradually migrate this PR to the uni-agent repository instead of keeping it within VERL: verl-project/uni-agent#25
Since the Gateway + Framework works as an independent standalone module, this migration should require little extra adaptation work.
Remove the unused padded assembler surface and trajectory identity placeholders, then make OpenAICompatibleAgentFramework satisfy the sync trainer rollout-manager contract directly. Add the transitional build_agent_framework entry factory so recipe adapters can shrink without adding a RolloutManager wrapper or hydra namespace fallback. Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…traction
Switch framework reward path to direct RewardLoopWorker dispatch with score-last + broadcast strategy, matching legacy AgentLoopWorkerTQ. Delete _build_reward_dispatcher / RewardFn / SessionRewardContext; extension is via subclass override of _score_trajectories.
- entry.py: self-load HFModelConfig; promote adapter from recipe to core
- framework.py: inline _score_trajectories with reward_loop_worker_handles
- types.py: add complete_session to SessionRuntime Protocol
- helpers.py: delete (dead code after Phase A cleanup)
- __init__.py: remove stale re-exports
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

…arse tolerance
- runtime.py: spread GatewayActors across CPU nodes via NodeAffinitySchedulingStrategy round-robin (mirrors AgentLoopWorker)
- runtime.py + manager.py: expose complete_session for explicit session lifecycle signaling
- gateway.py: add _FINISH_REASON_MAP normalizing vLLM stop reasons to OpenAI spec values; document vLLM information loss
- gateway.py: loosen _validate_tools to accept non-dict tool schemas
- gateway.py: pass parsed_tools to extract_tool_calls with tolerant pydantic parse (try/except fallback to None)
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

…ool parse
- test_generate_sequences: migrate from reward_fn lambda to monkeypatch pattern; add score-last broadcast + no-handles tests
- test_gateway_actor: parametrize finish_reason normalization; add tool parse tolerance tests
- test_session_runtime: add round-robin placement assertion
- support.py: delete (inlined into individual test files)
- minimal_e2e.py: align with entry.py adapter promotion
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
…sion concurrency cap Always write rollout_log_probs and rm_scores (zero-filled when absent) so the sync trainer's select_fields never hits a missing field across a mixed batch where some sessions lack logprobs/reward. Without this, bypass-mode _compute_old_log_prob KeyErrors on rollout_log_probs. Add opt-in max_concurrent_sessions (0 = unlimited) backed by a lazy-initialized asyncio.Semaphore that rebinds if the running loop changes, since Ray actors may run sessions on a different loop than __init__. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
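The lazily initialized, loop-aware semaphore described in this commit message can be illustrated as follows. This is a sketch under assumptions, not the PR's implementation: `SessionLimiter` and `run` are hypothetical names. Only the `max_concurrent_sessions` option and its "0 = unlimited" meaning come from the commit text.

```python
import asyncio


class SessionLimiter:
    """Hypothetical helper: cap concurrent sessions with a lazy semaphore."""

    def __init__(self, max_concurrent_sessions: int = 0) -> None:
        self._limit = max_concurrent_sessions  # 0 means unlimited
        self._sem = None
        self._loop = None

    def _semaphore(self):
        if self._limit <= 0:
            return None
        loop = asyncio.get_running_loop()
        # Create the semaphore on first use, and recreate it if the running
        # loop differs from the one it was created under.
        if self._sem is None or self._loop is not loop:
            self._sem = asyncio.Semaphore(self._limit)
            self._loop = loop
        return self._sem

    async def run(self, coro_fn, *args):
        sem = self._semaphore()
        if sem is None:
            return await coro_fn(*args)
        async with sem:
            return await coro_fn(*args)
```

Deferring construction to the first call means nothing is bound to whichever loop happened to be active during `__init__`.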
…ferences Add docs/advance/agent_framework.rst covering architecture, components, agent runner authoring guide, and configuration reference. Link from docs/start/agentic_rl.rst and docs/index.rst toctree. Add examples/grpo_trainer/run_deepeyes_gateway_grpo.sh as a parameterized training script following the existing run_qwen2_5_vl_7b_fsdp.sh pattern. Update tutorial README to reflect current architecture (reward dispatch, zero-fill fields, session concurrency). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
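The Agent Gateway data model is currently a 1:1 mapping: one session = one linear conversation = one trajectory. All state is stored as singleton fields on GatewaySessionState:
    @dataclass
    class GatewaySessionState:
        message_history: list[dict] = field(...)            # single conversation
        active_trajectory: TrajectoryBuffer | None = None   # single token buffer
        trajectories: list[Trajectory] = field(...)         # sequential history
In a sub-agent scenario, the parent agent invokes multiple tools that each run an independent conversation through the same session's /v1/chat/completions endpoint. Because all sub-agents share the same singleton state, the gateway cannot maintain a correct, isolated trajectory for each sub-conversation.
Limitation 1: Shared message_history causes cross-contamination of context
After sub-agent A updates session.message_history, sub-agent B's next request is prefix-matched against the history that A modified. Because B's conversation is logically unrelated to A's, _is_request_context_prefix misjudges it: B's request is treated as a context divergence and triggers a full re-encode. The KV cache optimization is lost, and the resulting trajectories contain interleaved tokens from unrelated sub-conversations.
Limitation 2: Single active_trajectory interleaves unrelated token sequences
Because there is only one TrajectoryBuffer, response tokens from sub-agent A and sub-agent B are appended to the same buffer in arrival order. The result is one trajectory that jumps between unrelated conversations, producing semantically incoherent training data.
Proposed approach: internal multi-trajectory prefix detection
No API contract change is needed. The existing /v1/chat/completions endpoint stays the same. Internally, the gateway maintains a set of active sub-contexts, each with its own message_history, active_trajectory, and related metadata.
When a request arrives:
- Run prefix detection against all active sub-contexts (not just the most recent one).
- Select the sub-context with the longest prefix match against the incoming messages.
- Append the assistant response to that sub-context's trajectory.
- On session finalization, collect the trajectories from all sub-contexts.
Minimal change set
| # | Change | Location |
|---|---|---|
| 1 | Add SubContext holding message_history, active_trajectory, request_tools, image_data, video_data | types.py |
| 2 | Replace singleton fields in GatewaySessionState with sub_contexts: list[SubContext] + `current_sub: SubContext \| None` for backward compatibility | |
| 3 | In _handle_chat_completions, prefix-match incoming messages against all SubContexts; pick the best match (longest prefix) | gateway.py |
| 4 | On finalization, materialize and collect trajectories from all sub-contexts | gateway.py |
The generation_lock and request serialization remain as-is; sequential state mutation is fine. The key insight is that the gateway can match each incoming request to the correct conversation branch internally, without any API-level change.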
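The longest-prefix selection step in this proposal can be sketched as below. This is an illustration under assumptions rather than the proposed implementation: `SubContext` is reduced to its `message_history` field, `match_sub_context` is a hypothetical helper name, and messages are compared by plain dict equality.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class SubContext:
    """Hypothetical per-branch state; only message_history is modelled here."""
    message_history: list[dict] = field(default_factory=list)


def match_sub_context(sub_contexts: list[SubContext],
                      messages: list[dict]) -> Optional[SubContext]:
    """Return the sub-context whose full history is the longest prefix of messages."""
    best: Optional[SubContext] = None
    for ctx in sub_contexts:
        n = len(ctx.message_history)
        if n == 0 or n > len(messages):
            continue
        if messages[:n] == ctx.message_history and (
            best is None or n > len(best.message_history)
        ):
            best = ctx
    return best
```

A `None` result would mean no existing branch matches, so the caller opens a new sub-context. Two sub-agents with identical histories tie under this rule, which is one reason a more robust discriminator may be worth discussing.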
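This is a reasonable request. We have indeed been considering support for sub-agent scenarios recently, and allowing multiple active trajectories does make sense. However, I have a question about sub-context matching: is "select the sub-context with the longest prefix match against the incoming messages" a sound criterion? Is there a more robust way to distinguish OpenAI requests issued by a sub-agent from ordinary requests, or does this depend on the agent's behavior? A concrete agent example would be very helpful.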
Hi, thank you so much for your suggestion 😊 Your design is similar to mine, but I think the limitation you described is not entirely accurate. I also want to add something about the real benefit of using a prefix trie.
In response to the above limitation, it is correct that […]
However, the claim that "the resulting trajectories contain interleaved tokens from unrelated sub-conversations" is not accurate for the current prefix-mismatch path. In that path, the gateway does not append B's generated tokens to A's active trajectory. Instead, it materializes A's active trajectory, starts a fresh […]
The gateway checks:
    elif _is_request_context_prefix(session=session, messages=messages, tools=tools):
        ...
    else:
        materialized_trajectory = self._build_materialized_trajectory(
            session=session,
            active=session.active_trajectory,
        )
        ...
            request_chat_template_kwargs=request_chat_template_kwargs,
        )
        active_trajectory = TrajectoryBuffer(prompt_ids=prompt_ids)
Because B's messages are not a prefix continuation of A's […]
That branch creates a fresh token buffer for B:
    active_trajectory = TrajectoryBuffer(prompt_ids=prompt_ids)
This is the key point: B does not reuse A's […]
So the real limitation is not token-level interleaving inside one trajectory. The real limitation is that a session only keeps one current […]
The Real Problem and Gain of Prefix Trie Storage
The real problem with the current limitation is […]
I will explain why these three problems exist and why a Prefix Trie Storage approach could fix them.
1. Support branch reattachment
The current implementation only compares each new request against the current active trajectory, which is also the most recently committed branch. It does not compare against older materialized trajectories or a branch tree. Therefore, if an agent backtracks to an older branch and continues from there, the gateway cannot recognize that continuation.
Consider this branching pattern: […]
The agent sends the same parent messages multiple times and gets multiple candidate assistant responses: […]
Then it selects […]
A branch-aware trajectory builder should produce 3 trajectories: […]
The current gateway instead only checks the continuation request against the most recent active history. If the most recent active branch is […]
So the session ends up with 4 trajectory segments: […]
Therefore, the real limitation is: […]
Best-of-n, rejection sampling, and similar agents often sample multiple sibling candidates and later continue from one selected older candidate. Under the current implementation, we may collect trajectories that are duplicated but incomplete, which reduces trajectory effectiveness and training throughput. In addition, according to https://arxiv.org/pdf/2605.24220, the existence of such trajectories may lead to reward hacking, although this depends on the reward function design and is outside the scope of this proposal for now.
2. Less Retokenization and Storage
A trajectory trie would also help reduce unnecessary retokenization.
In the current code, if the request does not prefix-match the current active history, the gateway takes the full-encode path:
    else:
        materialized_trajectory = self._build_materialized_trajectory(
            session=session,
            active=session.active_trajectory,
        )
        image_data, video_data = await self._extract_multi_modal_data(messages)
        prompt_ids = self._encode_full(
            messages,
            tools=tools,
            image_data=image_data,
            video_data=video_data,
            request_chat_template_kwargs=request_chat_template_kwargs,
        )
        active_trajectory = TrajectoryBuffer(prompt_ids=prompt_ids)
So on prefix mismatch, the current implementation re-renders and re-tokenizes the entire request […]
A prefix trie could improve this by: […]
3. Allow Concurrency for LLM Generation
The current implementation uses both […]
    async with session.generation_lock:
        async with session.request_lock:
            # read session.message_history / session.active_trajectory
            # decide prefix-match vs split
            # build generation_context_ids
        output = await self._backend.generate(...)
        async with session.request_lock:
            # write the result back to the single active trajectory
            session.active_trajectory = active_trajectory
            session.message_history = messages + [assistant_msg]
Because […]
This is mainly because the current state model has only one mutable […]
With a prefix trie / branch store, we do not need a single global active trajectory. Each request can match a trie node, generate using a request-local buffer, and then attach its assistant message back to that node: […]
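To make the branch-store idea concrete, here is a minimal message-level trie. It is a sketch under assumptions and not code from the PR or from this proposal: `MessageTrie`, `TrieNode`, `match`, `attach`, and `leaves` are hypothetical names, messages are keyed by their canonical JSON form, and token buffers, tools, and multimodal data are left out.

```python
import json
from dataclasses import dataclass, field
from typing import Optional


def _key(message: dict) -> str:
    # Canonical string form of one chat message, used as the child key.
    return json.dumps(message, sort_keys=True)


@dataclass(eq=False)
class TrieNode:
    message: Optional[dict] = None
    parent: Optional["TrieNode"] = None
    children: dict[str, "TrieNode"] = field(default_factory=dict)


class MessageTrie:
    def __init__(self) -> None:
        self.root = TrieNode()

    def match(self, messages: list[dict]) -> tuple[TrieNode, int]:
        """Walk down while messages agree with a stored branch."""
        node, depth = self.root, 0
        for msg in messages:
            child = node.children.get(_key(msg))
            if child is None:
                break
            node, depth = child, depth + 1
        return node, depth

    def attach(self, messages: list[dict], assistant_msg: dict) -> TrieNode:
        """Insert the unseen request suffix, then the assistant reply."""
        node, depth = self.match(messages)
        for msg in list(messages[depth:]) + [assistant_msg]:
            child = node.children.get(_key(msg))
            if child is None:
                child = TrieNode(message=msg, parent=node)
                node.children[_key(msg)] = child
            node = child
        return node

    def leaves(self) -> list[TrieNode]:
        """Each leaf corresponds to one root-to-leaf conversation branch."""
        out, stack = [], [self.root]
        while stack:
            n = stack.pop()
            if n.children:
                stack.extend(n.children.values())
            elif n is not self.root:
                out.append(n)
        return out
```

With three sibling candidates under one parent and a later continuation from the first candidate, `match` finds the older branch at depth 2 and the trie ends with three leaves, one per complete branch.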
What does this PR do?
This PR adds an experimental agent framework and gateway runtime for multi-turn agent-style rollout in VERL, following RFC #5790.
Specifically, it adds:
- verl.experimental.agent_framework for a new abstraction for agent systems, with an example implementation that is compatible with TransferQueue,
- verl.experimental.agent_gateway for OpenAI-compatible session serving, sticky session routing, and tool-parser wiring, with a GatewayServingRuntime that delegates backend routing to LLMServerClient,
Note: the PR currently also includes an opt-in TransferQueue nested-readback bridge in verl.utils.transferqueue_utils (gated by VERL_FORCE_TQ_NESTED_READBACK=1). This is a temporary workaround for a known issue #6261. The commit is not part of the PR and will be removed once the bug is fixed.
Related: RFC #5790. Supersedes draft PR #5931 (will be closed).
WIP:
- CLIAgentFramework base + reference external-agent recipe (Deepeyes and SWE-agent)
- RewardLoopWorker / NaiveRewardManager integration (dict rewards, sandbox fusion, reward router)
- GatewayActor default placement strategy (e.g. at least one per node) once multi-node validation is in
- Remove the VERL_FORCE_TQ_NESTED_READBACK bridge when the upstream TransferQueue fix lands
- […] docs/ once the CLI framework direction is committed
Checklist Before Starting
- https://github.com/verl-project/verl/pulls?q=is%3Apr+agent+framework+gateway
- [{modules}] {type}: {description}
  [rollout, tool] feat: add agent framework and gateway runtime
Test
Result: 60 passed, 4 warnings (framework, gateway, runtime).
The tutorial doubles as a runnable smoke test: […]
It runs one full generate_sequences() through a CPU-only fake rollout server and prints a summary JSON.
Real-rollout evidence from a downstream branch that carries a Deepeyes_with_gateway recipe (not part of this PR): a 24-step GRPO run on multi-turn multimodal data (Qwen3.5-4B, 7× RTX 3090 train + 1× local judge) produced a real learning curve: critic/rewards/mean moved from ~0.31 at step 1 to ~1.45 by step 24, with non-zero advantages throughout and actor/grad_norm stable in the 3–20 range.
API and Usage Example
Public APIs added:
- verl.agent.framework: AgentFramework, OpenAICompatibleAgentFramework, TrajectoryAssembler, apply_multi_modal_postprocess
- verl.agent.gateway: GatewayServingRuntime, GatewayManager, GatewayActor
Minimum viable wiring (see examples/tutorial/agent_framework_get_started/minimal_e2e.py for the full runnable example): […]
generate_sequences() writes finalized trajectories directly to TransferQueue with key "{uid}_{session_id}_{index}", matching AgentLoopWorkerTQ._agent_loop_postprocess()'s field / tag layout, and returns a stats dict with success / failure counts and short failure reasons.
Design & Code Changes
High-level changes:
- AgentFramework base class + OpenAICompatibleAgentFramework concrete implementation own session orchestration (create_session → agent_runner → finalize_session), trajectory assembly, multimodal post-processing, reward scoring, and TransferQueue writes. Per-session failures are isolated via asyncio.gather(..., return_exceptions=True) so one bad session does not cancel the rest of the batch.
- GatewayActor provides OpenAI Chat Completions over sticky sessions with prefix-consistency checks, tool-parser decoding, and multimodal media accumulation. GatewayManager routes new sessions by least-active count. GatewayServingRuntime owns gateway actor lifecycle and delegates backend routing to LLMServerClient (no duplicate routing surface).
- […] multi_modal_inputs and (4, seq_len) position ids inside the framework, so VLM sessions do not need per-recipe glue.
- […] logging; the trainer's metric dict is intentionally not touched in this PR.
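The per-session failure isolation described above relies on `asyncio.gather(..., return_exceptions=True)`. The following sketch shows the pattern in isolation. `run_sessions` and `run_one` are hypothetical names, and the 80-character truncation of failure reasons is an arbitrary choice for illustration, not something taken from the PR.

```python
import asyncio


async def run_sessions(session_ids, run_one):
    """Run all sessions concurrently; one failure does not cancel the others."""
    results = await asyncio.gather(
        *(run_one(sid) for sid in session_ids), return_exceptions=True
    )
    ok, failed = {}, {}
    for sid, res in zip(session_ids, results):
        if isinstance(res, BaseException):
            # Keep a short reason string instead of the full traceback.
            failed[sid] = f"{type(res).__name__}: {res}"[:80]
        else:
            ok[sid] = res
    return ok, failed
```

Because `gather` preserves input order, zipping the results with `session_ids` is enough to attribute each outcome to its session.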
Checklist Before Submitting
- pre-commit install && pre-commit run --all-files --show-diff-on-failure --color=always
- […] deferred to a follow-up doc PR; inline docstrings and the tutorial README ship with this PR.
- […] to cover all the code. Focused framework / gateway CPU tests are included and routed to cpu_unit_tests.yml via the *_on_cpu.py naming convention.
- […] the ci-request channel in the verl Slack workspace. (If not accessible, please try the Feishu group (飞书群).)
- […] recipe submodule, please also update the reference to the submodule commit via git submodule update --remote or cd recipe && git pull origin main. Not applicable: this PR does not include the recipe submodule.