diff --git a/crates/forge_app/src/trajectory_recorder.rs b/crates/forge_app/src/trajectory_recorder.rs index 8dae7ad4..1f7468d0 100644 --- a/crates/forge_app/src/trajectory_recorder.rs +++ b/crates/forge_app/src/trajectory_recorder.rs @@ -1,5 +1,10 @@ use std::sync::Arc; -use std::sync::atomic::{AtomicI32, Ordering}; +use std::sync::atomic::{AtomicI32, AtomicU64, Ordering}; + +/// Process-global monotonic counter giving every recorder (= every run) a +/// unique id, so concurrent sub-agents of the same (conversation, agent) do +/// not collide in the uploader. +static RUN_UID: AtomicU64 = AtomicU64::new(0); use std::time::{SystemTime, UNIX_EPOCH}; use forge_domain::{ @@ -82,6 +87,7 @@ pub struct TrajectoryRecorder { conversation_id: String, agent_id: String, parent_agent_id: Option, + run_id: String, seq: AtomicI32, /// Optional broadcast sender wired in by [`TrajectoryHub`]. When /// present, every persisted event is forwarded to live subscribers @@ -99,11 +105,20 @@ impl TrajectoryRecorder { parent_agent_id: Option, initial_seq: i32, ) -> Self { + let conversation_id = conversation_id.into(); + let agent_id = agent_id.into(); + let run_id = format!( + "{}-{}-r{}", + conversation_id, + agent_id, + RUN_UID.fetch_add(1, Ordering::Relaxed) + ); Self { repo, - conversation_id: conversation_id.into(), - agent_id: agent_id.into(), + conversation_id, + agent_id, parent_agent_id, + run_id, seq: AtomicI32::new(initial_seq), broadcast: None, } @@ -254,6 +269,7 @@ impl TrajectoryRecorder { conversation_id: self.conversation_id.clone(), agent_id: self.agent_id.clone(), parent_agent_id: self.parent_agent_id.clone(), + run_id: Some(self.run_id.clone()), seq, ts_ms: now_ms(), payload, @@ -439,6 +455,36 @@ mod tests { } } + /// Two sub-agents sharing the same (conversation, agent) -- exactly what a + /// parallel `task` fan-out produces -- must mint distinct run_ids, and + /// those ids must ride on every event they emit. This is the producer + /// half of the sub-agent collision fix; the uploader half (keying buffers + /// by run_id) is covered in `trajectory_upload`'s tests. + #[tokio::test] + async fn concurrent_siblings_mint_and_emit_distinct_run_ids() { + let repo = Arc::new(FakeRepo::default()); + let a = TrajectoryRecorder::new(repo.clone(), "conv", "forge", Some("p".to_string()), 0); + let b = TrajectoryRecorder::new(repo.clone(), "conv", "forge", Some("p".to_string()), 0); + + // Distinct at the source, shaped {conv}-{agent}-r{n}. + assert_ne!(a.run_id, b.run_id, "siblings must mint distinct run_ids"); + assert!(a.run_id.starts_with("conv-forge-r"), "got {}", a.run_id); + + // ...and the id rides on every event each emits (build_event stamps it). + a.record_error("boom-a", None).await; + b.record_error("boom-b", None).await; + let ids: std::collections::HashSet> = repo + .list("conv", "forge") + .await + .unwrap() + .iter() + .map(|e| e.run_id.clone()) + .collect(); + assert_eq!(ids.len(), 2, "the two runs' events carry two distinct run_ids"); + assert!(ids.contains(&Some(a.run_id.clone()))); + assert!(ids.contains(&Some(b.run_id.clone()))); + } + #[tokio::test] async fn records_tool_call_then_result_with_increasing_seq() { let repo = Arc::new(FakeRepo::default()); diff --git a/crates/forge_app/src/trajectory_upload.rs b/crates/forge_app/src/trajectory_upload.rs index ab999704..f7bcf6c1 100644 --- a/crates/forge_app/src/trajectory_upload.rs +++ b/crates/forge_app/src/trajectory_upload.rs @@ -1,7 +1,7 @@ //! Client-side trajectory uploader for `data.codegraff.com`. //! //! A pure subscriber on the [`TrajectoryHub`] broadcast: it buffers events per -//! `(conversation_id, agent_id)` run and, on `AgentRunEnd`, builds a +//! `(conversation_id, agent_id, run_id)` run and, on `AgentRunEnd`, builds a //! `graff.trajectory.v1` envelope, redacts it, and POSTs it best-effort. It //! touches nothing in the orchestrator. //! @@ -224,7 +224,7 @@ impl TrajectoryUploader { let ingest_key = std::env::var("GRAFF_INGEST_KEY").ok().filter(|s| !s.is_empty()); tokio::spawn(async move { - let mut runs: HashMap<(String, String), RunBuf> = HashMap::new(); + let mut runs: HashMap<(String, String, Option), RunBuf> = HashMap::new(); loop { match rx.recv().await { Ok(ev) => handle(&mut runs, ev, &client, &endpoint, ingest_key.as_deref()), @@ -253,13 +253,13 @@ impl TrajectoryUploader { } fn handle( - runs: &mut HashMap<(String, String), RunBuf>, + runs: &mut HashMap<(String, String, Option), RunBuf>, ev: TrajectoryEvent, client: &reqwest::Client, endpoint: &str, ingest_key: Option<&str>, ) { - let key = (ev.conversation_id.clone(), ev.agent_id.clone()); + let key = (ev.conversation_id.clone(), ev.agent_id.clone(), ev.run_id.clone()); let seq = ev.seq; let ts_ms = ev.ts_ms; @@ -371,7 +371,7 @@ fn handle( #[cfg(test)] mod tests { use super::rlvr_label; - use super::Run; + use super::{handle, Run}; #[test] fn run_envelope_carries_agent_version() { @@ -408,6 +408,88 @@ mod tests { ); } + // --- sub-agent collision regression (the run_id fix) --- + + use forge_domain::{ToolName, TrajectoryEvent, TrajectoryPayload}; + use std::collections::HashMap; + + fn agent_run(run_id: &str, seq: i32) -> TrajectoryEvent { + TrajectoryEvent { + conversation_id: "conv".into(), + agent_id: "forge".into(), + parent_agent_id: Some("parent".into()), + run_id: Some(run_id.into()), + seq, + ts_ms: 1000 + seq as i64, + payload: TrajectoryPayload::AgentRun { + agent_id: "forge".into(), + parent_agent_id: Some("parent".into()), + requested_model: None, + resolved_model: "gpt-5.5".into(), + agent_version: Some("v1".into()), + }, + } + } + + fn read_call(run_id: Option<&str>, seq: i32, call: &str) -> TrajectoryEvent { + TrajectoryEvent { + conversation_id: "conv".into(), + agent_id: "forge".into(), + parent_agent_id: Some("parent".into()), + run_id: run_id.map(str::to_string), + seq, + ts_ms: 1000 + seq as i64, + payload: TrajectoryPayload::ToolCall { + tool_name: ToolName::new("read"), + call_id: call.into(), + arguments: serde_json::json!({}), + }, + } + } + + /// Two sub-agents of the SAME (conversation, agent) -- e.g. two parallel + /// `forge` workers under one `task` fan-out. They deliberately share + /// conv+agent so `/trace` groups them, and are told apart only by + /// `run_id`. Their events arrive interleaved on the broadcast; the + /// uploader must keep each in its own buffer, never merging them into one + /// corrupted envelope. + #[tokio::test] + async fn concurrent_siblings_with_distinct_run_ids_do_not_collide() { + let client = reqwest::Client::new(); + let mut runs = HashMap::new(); + // A starts, B starts, A reads, B reads, A reads again. + handle(&mut runs, agent_run("rA", 0), &client, "http://localhost", None); + handle(&mut runs, agent_run("rB", 0), &client, "http://localhost", None); + handle(&mut runs, read_call(Some("rA"), 1, "a1"), &client, "http://localhost", None); + handle(&mut runs, read_call(Some("rB"), 1, "b1"), &client, "http://localhost", None); + handle(&mut runs, read_call(Some("rA"), 2, "a2"), &client, "http://localhost", None); + + assert_eq!(runs.len(), 2, "siblings must not share a buffer"); + let a = runs + .get(&("conv".to_string(), "forge".to_string(), Some("rA".to_string()))) + .expect("run A buffered separately"); + let b = runs + .get(&("conv".to_string(), "forge".to_string(), Some("rB".to_string()))) + .expect("run B buffered separately"); + assert_eq!(a.steps.len(), 2, "run A keeps exactly its own two reads"); + assert_eq!(b.steps.len(), 1, "run B keeps exactly its own one read"); + } + + /// Guard that `run_id` is the load-bearing part of the key: the identical + /// interleaving with `run_id = None` for both (the pre-fix world, and what + /// DB-reconstructed events carry) collapses into a single shared buffer. + /// This is the exact corruption the fix removes. + #[tokio::test] + async fn without_run_id_siblings_collide_into_one_buffer() { + let client = reqwest::Client::new(); + let mut runs = HashMap::new(); + handle(&mut runs, read_call(None, 1, "a1"), &client, "http://localhost", None); + handle(&mut runs, read_call(None, 1, "b1"), &client, "http://localhost", None); + handle(&mut runs, read_call(None, 2, "a2"), &client, "http://localhost", None); + assert_eq!(runs.len(), 1, "without run_id the siblings share a key (old bug)"); + assert_eq!(runs.values().next().unwrap().steps.len(), 3); + } + #[test] fn accepted_completed_run_gets_full_reward() { diff --git a/crates/forge_domain/src/trajectory.rs b/crates/forge_domain/src/trajectory.rs index 3fbdb6d2..adf839cc 100644 --- a/crates/forge_domain/src/trajectory.rs +++ b/crates/forge_domain/src/trajectory.rs @@ -18,6 +18,12 @@ pub struct TrajectoryEvent { pub agent_id: String, pub parent_agent_id: Option, pub seq: i32, + /// Stable id for THIS run (one per spawn / top-level chat), distinct from + /// `seq` (per-event) and from `(conversation_id, agent_id)` (shared by + /// concurrent sibling sub-agents). Keeps each parallel sub-agent's + /// trajectory separate in the uploader. `None` for events rebuilt from the + /// DB (the upload path only sees live broadcast events, which always set it). + pub run_id: Option, pub ts_ms: i64, pub payload: TrajectoryPayload, } diff --git a/crates/forge_repo/src/trajectory/trajectory_record.rs b/crates/forge_repo/src/trajectory/trajectory_record.rs index 074bb297..0afea90b 100644 --- a/crates/forge_repo/src/trajectory/trajectory_record.rs +++ b/crates/forge_repo/src/trajectory/trajectory_record.rs @@ -74,6 +74,7 @@ impl TryFrom for TrajectoryEvent { conversation_id: record.conversation_id, agent_id: record.agent_id, parent_agent_id: record.parent_agent_id, + run_id: None, seq: record.seq, ts_ms: record.ts_ms, payload, diff --git a/crates/forge_repo/src/trajectory/trajectory_repo.rs b/crates/forge_repo/src/trajectory/trajectory_repo.rs index 82c2eb27..d10d4e6d 100644 --- a/crates/forge_repo/src/trajectory/trajectory_repo.rs +++ b/crates/forge_repo/src/trajectory/trajectory_repo.rs @@ -136,6 +136,7 @@ mod tests { conversation_id: conv_id.to_string(), agent_id: agent_id.to_string(), parent_agent_id: None, + run_id: None, seq, ts_ms: 1000 + seq as i64, payload: TrajectoryPayload::ToolCall { @@ -151,6 +152,7 @@ mod tests { conversation_id: conv_id.to_string(), agent_id: agent_id.to_string(), parent_agent_id: None, + run_id: None, seq, ts_ms: 1000 + seq as i64, payload: TrajectoryPayload::ToolResult {