From c6869647532ce598f2fb7aafb154923124dd7f1f Mon Sep 17 00:00:00 2001 From: Rach Pradhan <54503978+justrach@users.noreply.github.com> Date: Thu, 4 Jun 2026 18:36:58 +0800 Subject: [PATCH 1/2] fix(telemetry): give each run a unique run_id so concurrent sub-agents don't collide MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The uploader buffers in-flight runs in a HashMap keyed by (conversation_id, agent_id). Child recorders deliberately reuse the *parent's* conversation_id so /trace can group a fleet under one conversation — which means two concurrent sub-agents of the SAME agent (e.g. three parallel `forge` workers under one `task` fan-out) hash to the identical key. Their events interleave into one RunBuf and the envelope that uploads is a corrupted merge of both trajectories: wrong step counts, wrong token totals, wrong reward attribution. That poisons exactly the per-variant fitness signal the agent_version work exists to produce. Fix: stamp a process-unique `run_id` on every live trajectory event (a monotonic counter folded into the recorder, which is already one-per-run) and key the uploader by (conversation_id, agent_id, run_id). Parent grouping for /trace is untouched (it reads conversation_id); concurrent siblings now buffer and upload independently. `run_id` is Option — None for events reconstructed from the DB (the upload path only ever sees live broadcast events, which always set it), so no migration and no DB schema change. Stacked on #166 (agent_version); together they make the trajectory feed both variant-attributed and collision-free for DGM-style evolution. Co-Authored-By: Claude Opus 4.8 (1M context) --- crates/forge_app/src/trajectory_recorder.rs | 22 ++++++++++++++++--- crates/forge_app/src/trajectory_upload.rs | 6 ++--- crates/forge_domain/src/trajectory.rs | 6 +++++ .../src/trajectory/trajectory_record.rs | 1 + .../src/trajectory/trajectory_repo.rs | 2 ++ 5 files changed, 31 insertions(+), 6 deletions(-) diff --git a/crates/forge_app/src/trajectory_recorder.rs b/crates/forge_app/src/trajectory_recorder.rs index 8dae7ad4..b283f5f7 100644 --- a/crates/forge_app/src/trajectory_recorder.rs +++ b/crates/forge_app/src/trajectory_recorder.rs @@ -1,5 +1,10 @@ use std::sync::Arc; -use std::sync::atomic::{AtomicI32, Ordering}; +use std::sync::atomic::{AtomicI32, AtomicU64, Ordering}; + +/// Process-global monotonic counter giving every recorder (= every run) a +/// unique id, so concurrent sub-agents of the same (conversation, agent) do +/// not collide in the uploader. +static RUN_UID: AtomicU64 = AtomicU64::new(0); use std::time::{SystemTime, UNIX_EPOCH}; use forge_domain::{ @@ -82,6 +87,7 @@ pub struct TrajectoryRecorder { conversation_id: String, agent_id: String, parent_agent_id: Option, + run_id: String, seq: AtomicI32, /// Optional broadcast sender wired in by [`TrajectoryHub`]. When /// present, every persisted event is forwarded to live subscribers @@ -99,11 +105,20 @@ impl TrajectoryRecorder { parent_agent_id: Option, initial_seq: i32, ) -> Self { + let conversation_id = conversation_id.into(); + let agent_id = agent_id.into(); + let run_id = format!( + "{}-{}-r{}", + conversation_id, + agent_id, + RUN_UID.fetch_add(1, Ordering::Relaxed) + ); Self { repo, - conversation_id: conversation_id.into(), - agent_id: agent_id.into(), + conversation_id, + agent_id, parent_agent_id, + run_id, seq: AtomicI32::new(initial_seq), broadcast: None, } @@ -254,6 +269,7 @@ impl TrajectoryRecorder { conversation_id: self.conversation_id.clone(), agent_id: self.agent_id.clone(), parent_agent_id: self.parent_agent_id.clone(), + run_id: Some(self.run_id.clone()), seq, ts_ms: now_ms(), payload, diff --git a/crates/forge_app/src/trajectory_upload.rs b/crates/forge_app/src/trajectory_upload.rs index ab999704..497cc473 100644 --- a/crates/forge_app/src/trajectory_upload.rs +++ b/crates/forge_app/src/trajectory_upload.rs @@ -224,7 +224,7 @@ impl TrajectoryUploader { let ingest_key = std::env::var("GRAFF_INGEST_KEY").ok().filter(|s| !s.is_empty()); tokio::spawn(async move { - let mut runs: HashMap<(String, String), RunBuf> = HashMap::new(); + let mut runs: HashMap<(String, String, Option), RunBuf> = HashMap::new(); loop { match rx.recv().await { Ok(ev) => handle(&mut runs, ev, &client, &endpoint, ingest_key.as_deref()), @@ -253,13 +253,13 @@ impl TrajectoryUploader { } fn handle( - runs: &mut HashMap<(String, String), RunBuf>, + runs: &mut HashMap<(String, String, Option), RunBuf>, ev: TrajectoryEvent, client: &reqwest::Client, endpoint: &str, ingest_key: Option<&str>, ) { - let key = (ev.conversation_id.clone(), ev.agent_id.clone()); + let key = (ev.conversation_id.clone(), ev.agent_id.clone(), ev.run_id.clone()); let seq = ev.seq; let ts_ms = ev.ts_ms; diff --git a/crates/forge_domain/src/trajectory.rs b/crates/forge_domain/src/trajectory.rs index 3fbdb6d2..adf839cc 100644 --- a/crates/forge_domain/src/trajectory.rs +++ b/crates/forge_domain/src/trajectory.rs @@ -18,6 +18,12 @@ pub struct TrajectoryEvent { pub agent_id: String, pub parent_agent_id: Option, pub seq: i32, + /// Stable id for THIS run (one per spawn / top-level chat), distinct from + /// `seq` (per-event) and from `(conversation_id, agent_id)` (shared by + /// concurrent sibling sub-agents). Keeps each parallel sub-agent's + /// trajectory separate in the uploader. `None` for events rebuilt from the + /// DB (the upload path only sees live broadcast events, which always set it). + pub run_id: Option, pub ts_ms: i64, pub payload: TrajectoryPayload, } diff --git a/crates/forge_repo/src/trajectory/trajectory_record.rs b/crates/forge_repo/src/trajectory/trajectory_record.rs index 074bb297..0afea90b 100644 --- a/crates/forge_repo/src/trajectory/trajectory_record.rs +++ b/crates/forge_repo/src/trajectory/trajectory_record.rs @@ -74,6 +74,7 @@ impl TryFrom for TrajectoryEvent { conversation_id: record.conversation_id, agent_id: record.agent_id, parent_agent_id: record.parent_agent_id, + run_id: None, seq: record.seq, ts_ms: record.ts_ms, payload, diff --git a/crates/forge_repo/src/trajectory/trajectory_repo.rs b/crates/forge_repo/src/trajectory/trajectory_repo.rs index 82c2eb27..d10d4e6d 100644 --- a/crates/forge_repo/src/trajectory/trajectory_repo.rs +++ b/crates/forge_repo/src/trajectory/trajectory_repo.rs @@ -136,6 +136,7 @@ mod tests { conversation_id: conv_id.to_string(), agent_id: agent_id.to_string(), parent_agent_id: None, + run_id: None, seq, ts_ms: 1000 + seq as i64, payload: TrajectoryPayload::ToolCall { @@ -151,6 +152,7 @@ mod tests { conversation_id: conv_id.to_string(), agent_id: agent_id.to_string(), parent_agent_id: None, + run_id: None, seq, ts_ms: 1000 + seq as i64, payload: TrajectoryPayload::ToolResult { From 6a06ce76e636ab1712e949653dd1aba85ab7c256 Mon Sep 17 00:00:00 2001 From: Rach Pradhan <54503978+justrach@users.noreply.github.com> Date: Thu, 4 Jun 2026 18:59:33 +0800 Subject: [PATCH 2/2] test(telemetry): cover the sub-agent run_id collision fix end-to-end Three deterministic tests pinning the producer -> consumer chain: - concurrent_siblings_mint_and_emit_distinct_run_ids (recorder): two recorders sharing the same (conversation, agent) -- what a parallel `task` fan-out produces -- mint distinct run_ids shaped {conv}-{agent}-r{n}, and those ids ride on every event they emit. - concurrent_siblings_with_distinct_run_ids_do_not_collide (uploader): interleaved sibling events land in two independent buffers, each holding only its own steps. - without_run_id_siblings_collide_into_one_buffer (uploader): the identical interleaving with run_id=None collapses into one buffer -- proving run_id is the load-bearing part of the key, i.e. the exact corruption the fix removes. Also fixes the now-stale module doc (buffer key is (conversation_id, agent_id, run_id)). Co-Authored-By: Claude Opus 4.8 (1M context) --- crates/forge_app/src/trajectory_recorder.rs | 30 +++++++ crates/forge_app/src/trajectory_upload.rs | 86 ++++++++++++++++++++- 2 files changed, 114 insertions(+), 2 deletions(-) diff --git a/crates/forge_app/src/trajectory_recorder.rs b/crates/forge_app/src/trajectory_recorder.rs index b283f5f7..1f7468d0 100644 --- a/crates/forge_app/src/trajectory_recorder.rs +++ b/crates/forge_app/src/trajectory_recorder.rs @@ -455,6 +455,36 @@ mod tests { } } + /// Two sub-agents sharing the same (conversation, agent) -- exactly what a + /// parallel `task` fan-out produces -- must mint distinct run_ids, and + /// those ids must ride on every event they emit. This is the producer + /// half of the sub-agent collision fix; the uploader half (keying buffers + /// by run_id) is covered in `trajectory_upload`'s tests. + #[tokio::test] + async fn concurrent_siblings_mint_and_emit_distinct_run_ids() { + let repo = Arc::new(FakeRepo::default()); + let a = TrajectoryRecorder::new(repo.clone(), "conv", "forge", Some("p".to_string()), 0); + let b = TrajectoryRecorder::new(repo.clone(), "conv", "forge", Some("p".to_string()), 0); + + // Distinct at the source, shaped {conv}-{agent}-r{n}. + assert_ne!(a.run_id, b.run_id, "siblings must mint distinct run_ids"); + assert!(a.run_id.starts_with("conv-forge-r"), "got {}", a.run_id); + + // ...and the id rides on every event each emits (build_event stamps it). + a.record_error("boom-a", None).await; + b.record_error("boom-b", None).await; + let ids: std::collections::HashSet> = repo + .list("conv", "forge") + .await + .unwrap() + .iter() + .map(|e| e.run_id.clone()) + .collect(); + assert_eq!(ids.len(), 2, "the two runs' events carry two distinct run_ids"); + assert!(ids.contains(&Some(a.run_id.clone()))); + assert!(ids.contains(&Some(b.run_id.clone()))); + } + #[tokio::test] async fn records_tool_call_then_result_with_increasing_seq() { let repo = Arc::new(FakeRepo::default()); diff --git a/crates/forge_app/src/trajectory_upload.rs b/crates/forge_app/src/trajectory_upload.rs index 497cc473..f7bcf6c1 100644 --- a/crates/forge_app/src/trajectory_upload.rs +++ b/crates/forge_app/src/trajectory_upload.rs @@ -1,7 +1,7 @@ //! Client-side trajectory uploader for `data.codegraff.com`. //! //! A pure subscriber on the [`TrajectoryHub`] broadcast: it buffers events per -//! `(conversation_id, agent_id)` run and, on `AgentRunEnd`, builds a +//! `(conversation_id, agent_id, run_id)` run and, on `AgentRunEnd`, builds a //! `graff.trajectory.v1` envelope, redacts it, and POSTs it best-effort. It //! touches nothing in the orchestrator. //! @@ -371,7 +371,7 @@ fn handle( #[cfg(test)] mod tests { use super::rlvr_label; - use super::Run; + use super::{handle, Run}; #[test] fn run_envelope_carries_agent_version() { @@ -408,6 +408,88 @@ mod tests { ); } + // --- sub-agent collision regression (the run_id fix) --- + + use forge_domain::{ToolName, TrajectoryEvent, TrajectoryPayload}; + use std::collections::HashMap; + + fn agent_run(run_id: &str, seq: i32) -> TrajectoryEvent { + TrajectoryEvent { + conversation_id: "conv".into(), + agent_id: "forge".into(), + parent_agent_id: Some("parent".into()), + run_id: Some(run_id.into()), + seq, + ts_ms: 1000 + seq as i64, + payload: TrajectoryPayload::AgentRun { + agent_id: "forge".into(), + parent_agent_id: Some("parent".into()), + requested_model: None, + resolved_model: "gpt-5.5".into(), + agent_version: Some("v1".into()), + }, + } + } + + fn read_call(run_id: Option<&str>, seq: i32, call: &str) -> TrajectoryEvent { + TrajectoryEvent { + conversation_id: "conv".into(), + agent_id: "forge".into(), + parent_agent_id: Some("parent".into()), + run_id: run_id.map(str::to_string), + seq, + ts_ms: 1000 + seq as i64, + payload: TrajectoryPayload::ToolCall { + tool_name: ToolName::new("read"), + call_id: call.into(), + arguments: serde_json::json!({}), + }, + } + } + + /// Two sub-agents of the SAME (conversation, agent) -- e.g. two parallel + /// `forge` workers under one `task` fan-out. They deliberately share + /// conv+agent so `/trace` groups them, and are told apart only by + /// `run_id`. Their events arrive interleaved on the broadcast; the + /// uploader must keep each in its own buffer, never merging them into one + /// corrupted envelope. + #[tokio::test] + async fn concurrent_siblings_with_distinct_run_ids_do_not_collide() { + let client = reqwest::Client::new(); + let mut runs = HashMap::new(); + // A starts, B starts, A reads, B reads, A reads again. + handle(&mut runs, agent_run("rA", 0), &client, "http://localhost", None); + handle(&mut runs, agent_run("rB", 0), &client, "http://localhost", None); + handle(&mut runs, read_call(Some("rA"), 1, "a1"), &client, "http://localhost", None); + handle(&mut runs, read_call(Some("rB"), 1, "b1"), &client, "http://localhost", None); + handle(&mut runs, read_call(Some("rA"), 2, "a2"), &client, "http://localhost", None); + + assert_eq!(runs.len(), 2, "siblings must not share a buffer"); + let a = runs + .get(&("conv".to_string(), "forge".to_string(), Some("rA".to_string()))) + .expect("run A buffered separately"); + let b = runs + .get(&("conv".to_string(), "forge".to_string(), Some("rB".to_string()))) + .expect("run B buffered separately"); + assert_eq!(a.steps.len(), 2, "run A keeps exactly its own two reads"); + assert_eq!(b.steps.len(), 1, "run B keeps exactly its own one read"); + } + + /// Guard that `run_id` is the load-bearing part of the key: the identical + /// interleaving with `run_id = None` for both (the pre-fix world, and what + /// DB-reconstructed events carry) collapses into a single shared buffer. + /// This is the exact corruption the fix removes. + #[tokio::test] + async fn without_run_id_siblings_collide_into_one_buffer() { + let client = reqwest::Client::new(); + let mut runs = HashMap::new(); + handle(&mut runs, read_call(None, 1, "a1"), &client, "http://localhost", None); + handle(&mut runs, read_call(None, 1, "b1"), &client, "http://localhost", None); + handle(&mut runs, read_call(None, 2, "a2"), &client, "http://localhost", None); + assert_eq!(runs.len(), 1, "without run_id the siblings share a key (old bug)"); + assert_eq!(runs.values().next().unwrap().steps.len(), 3); + } + #[test] fn accepted_completed_run_gets_full_reward() {