Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 49 additions & 3 deletions crates/forge_app/src/trajectory_recorder.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
use std::sync::Arc;
use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::atomic::{AtomicI32, AtomicU64, Ordering};

/// Process-global monotonic counter giving every recorder (= every run) a
/// unique id, so concurrent sub-agents of the same (conversation, agent) do
/// not collide in the uploader.
static RUN_UID: AtomicU64 = AtomicU64::new(0);
use std::time::{SystemTime, UNIX_EPOCH};

use forge_domain::{
Expand Down Expand Up @@ -82,6 +87,7 @@ pub struct TrajectoryRecorder {
conversation_id: String,
agent_id: String,
parent_agent_id: Option<String>,
run_id: String,
seq: AtomicI32,
/// Optional broadcast sender wired in by [`TrajectoryHub`]. When
/// present, every persisted event is forwarded to live subscribers
Expand All @@ -99,11 +105,20 @@ impl TrajectoryRecorder {
parent_agent_id: Option<String>,
initial_seq: i32,
) -> Self {
let conversation_id = conversation_id.into();
let agent_id = agent_id.into();
let run_id = format!(
"{}-{}-r{}",
conversation_id,
agent_id,
RUN_UID.fetch_add(1, Ordering::Relaxed)
);
Self {
repo,
conversation_id: conversation_id.into(),
agent_id: agent_id.into(),
conversation_id,
agent_id,
parent_agent_id,
run_id,
seq: AtomicI32::new(initial_seq),
broadcast: None,
}
Expand Down Expand Up @@ -254,6 +269,7 @@ impl TrajectoryRecorder {
conversation_id: self.conversation_id.clone(),
agent_id: self.agent_id.clone(),
parent_agent_id: self.parent_agent_id.clone(),
run_id: Some(self.run_id.clone()),
seq,
ts_ms: now_ms(),
payload,
Expand Down Expand Up @@ -439,6 +455,36 @@ mod tests {
}
}

/// Two sub-agents sharing the same (conversation, agent) -- exactly what a
/// parallel `task` fan-out produces -- must mint distinct run_ids, and
/// those ids must ride on every event they emit. This is the producer
/// half of the sub-agent collision fix; the uploader half (keying buffers
/// by run_id) is covered in `trajectory_upload`'s tests.
#[tokio::test]
async fn concurrent_siblings_mint_and_emit_distinct_run_ids() {
let repo = Arc::new(FakeRepo::default());
let a = TrajectoryRecorder::new(repo.clone(), "conv", "forge", Some("p".to_string()), 0);
let b = TrajectoryRecorder::new(repo.clone(), "conv", "forge", Some("p".to_string()), 0);

// Distinct at the source, shaped {conv}-{agent}-r{n}.
assert_ne!(a.run_id, b.run_id, "siblings must mint distinct run_ids");
assert!(a.run_id.starts_with("conv-forge-r"), "got {}", a.run_id);

// ...and the id rides on every event each emits (build_event stamps it).
a.record_error("boom-a", None).await;
b.record_error("boom-b", None).await;
let ids: std::collections::HashSet<Option<String>> = repo
.list("conv", "forge")
.await
.unwrap()
.iter()
.map(|e| e.run_id.clone())
.collect();
assert_eq!(ids.len(), 2, "the two runs' events carry two distinct run_ids");
assert!(ids.contains(&Some(a.run_id.clone())));
assert!(ids.contains(&Some(b.run_id.clone())));
}

#[tokio::test]
async fn records_tool_call_then_result_with_increasing_seq() {
let repo = Arc::new(FakeRepo::default());
Expand Down
92 changes: 87 additions & 5 deletions crates/forge_app/src/trajectory_upload.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Client-side trajectory uploader for `data.codegraff.com`.
//!
//! A pure subscriber on the [`TrajectoryHub`] broadcast: it buffers events per
//! `(conversation_id, agent_id)` run and, on `AgentRunEnd`, builds a
//! `(conversation_id, agent_id, run_id)` run and, on `AgentRunEnd`, builds a
//! `graff.trajectory.v1` envelope, redacts it, and POSTs it best-effort. It
//! touches nothing in the orchestrator.
//!
Expand Down Expand Up @@ -224,7 +224,7 @@ impl TrajectoryUploader {
let ingest_key = std::env::var("GRAFF_INGEST_KEY").ok().filter(|s| !s.is_empty());

tokio::spawn(async move {
let mut runs: HashMap<(String, String), RunBuf> = HashMap::new();
let mut runs: HashMap<(String, String, Option<String>), RunBuf> = HashMap::new();
loop {
match rx.recv().await {
Ok(ev) => handle(&mut runs, ev, &client, &endpoint, ingest_key.as_deref()),
Expand Down Expand Up @@ -253,13 +253,13 @@ impl TrajectoryUploader {
}

fn handle(
runs: &mut HashMap<(String, String), RunBuf>,
runs: &mut HashMap<(String, String, Option<String>), RunBuf>,
ev: TrajectoryEvent,
client: &reqwest::Client,
endpoint: &str,
ingest_key: Option<&str>,
) {
let key = (ev.conversation_id.clone(), ev.agent_id.clone());
let key = (ev.conversation_id.clone(), ev.agent_id.clone(), ev.run_id.clone());
let seq = ev.seq;
let ts_ms = ev.ts_ms;

Expand Down Expand Up @@ -371,7 +371,7 @@ fn handle(
#[cfg(test)]
mod tests {
use super::rlvr_label;
use super::Run;
use super::{handle, Run};

#[test]
fn run_envelope_carries_agent_version() {
Expand Down Expand Up @@ -408,6 +408,88 @@ mod tests {
);
}

// --- sub-agent collision regression (the run_id fix) ---

use forge_domain::{ToolName, TrajectoryEvent, TrajectoryPayload};
use std::collections::HashMap;

fn agent_run(run_id: &str, seq: i32) -> TrajectoryEvent {
TrajectoryEvent {
conversation_id: "conv".into(),
agent_id: "forge".into(),
parent_agent_id: Some("parent".into()),
run_id: Some(run_id.into()),
seq,
ts_ms: 1000 + seq as i64,
payload: TrajectoryPayload::AgentRun {
agent_id: "forge".into(),
parent_agent_id: Some("parent".into()),
requested_model: None,
resolved_model: "gpt-5.5".into(),
agent_version: Some("v1".into()),
},
}
}

fn read_call(run_id: Option<&str>, seq: i32, call: &str) -> TrajectoryEvent {
TrajectoryEvent {
conversation_id: "conv".into(),
agent_id: "forge".into(),
parent_agent_id: Some("parent".into()),
run_id: run_id.map(str::to_string),
seq,
ts_ms: 1000 + seq as i64,
payload: TrajectoryPayload::ToolCall {
tool_name: ToolName::new("read"),
call_id: call.into(),
arguments: serde_json::json!({}),
},
}
}

/// Two sub-agents of the SAME (conversation, agent) -- e.g. two parallel
/// `forge` workers under one `task` fan-out. They deliberately share
/// conv+agent so `/trace` groups them, and are told apart only by
/// `run_id`. Their events arrive interleaved on the broadcast; the
/// uploader must keep each in its own buffer, never merging them into one
/// corrupted envelope.
#[tokio::test]
async fn concurrent_siblings_with_distinct_run_ids_do_not_collide() {
let client = reqwest::Client::new();
let mut runs = HashMap::new();
// A starts, B starts, A reads, B reads, A reads again.
handle(&mut runs, agent_run("rA", 0), &client, "http://localhost", None);
handle(&mut runs, agent_run("rB", 0), &client, "http://localhost", None);
handle(&mut runs, read_call(Some("rA"), 1, "a1"), &client, "http://localhost", None);
handle(&mut runs, read_call(Some("rB"), 1, "b1"), &client, "http://localhost", None);
handle(&mut runs, read_call(Some("rA"), 2, "a2"), &client, "http://localhost", None);

assert_eq!(runs.len(), 2, "siblings must not share a buffer");
let a = runs
.get(&("conv".to_string(), "forge".to_string(), Some("rA".to_string())))
.expect("run A buffered separately");
let b = runs
.get(&("conv".to_string(), "forge".to_string(), Some("rB".to_string())))
.expect("run B buffered separately");
assert_eq!(a.steps.len(), 2, "run A keeps exactly its own two reads");
assert_eq!(b.steps.len(), 1, "run B keeps exactly its own one read");
}

/// Guard that `run_id` is the load-bearing part of the key: the identical
/// interleaving with `run_id = None` for both (the pre-fix world, and what
/// DB-reconstructed events carry) collapses into a single shared buffer.
/// This is the exact corruption the fix removes.
#[tokio::test]
async fn without_run_id_siblings_collide_into_one_buffer() {
let client = reqwest::Client::new();
let mut runs = HashMap::new();
handle(&mut runs, read_call(None, 1, "a1"), &client, "http://localhost", None);
handle(&mut runs, read_call(None, 1, "b1"), &client, "http://localhost", None);
handle(&mut runs, read_call(None, 2, "a2"), &client, "http://localhost", None);
assert_eq!(runs.len(), 1, "without run_id the siblings share a key (old bug)");
assert_eq!(runs.values().next().unwrap().steps.len(), 3);
}


#[test]
fn accepted_completed_run_gets_full_reward() {
Expand Down
6 changes: 6 additions & 0 deletions crates/forge_domain/src/trajectory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@ pub struct TrajectoryEvent {
pub agent_id: String,
pub parent_agent_id: Option<String>,
pub seq: i32,
/// Stable id for THIS run (one per spawn / top-level chat), distinct from
/// `seq` (per-event) and from `(conversation_id, agent_id)` (shared by
/// concurrent sibling sub-agents). Keeps each parallel sub-agent's
/// trajectory separate in the uploader. `None` for events rebuilt from the
/// DB (the upload path only sees live broadcast events, which always set it).
pub run_id: Option<String>,
pub ts_ms: i64,
pub payload: TrajectoryPayload,
}
Expand Down
1 change: 1 addition & 0 deletions crates/forge_repo/src/trajectory/trajectory_record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ impl TryFrom<TrajectoryEventRecord> for TrajectoryEvent {
conversation_id: record.conversation_id,
agent_id: record.agent_id,
parent_agent_id: record.parent_agent_id,
run_id: None,
seq: record.seq,
ts_ms: record.ts_ms,
payload,
Expand Down
2 changes: 2 additions & 0 deletions crates/forge_repo/src/trajectory/trajectory_repo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ mod tests {
conversation_id: conv_id.to_string(),
agent_id: agent_id.to_string(),
parent_agent_id: None,
run_id: None,
seq,
ts_ms: 1000 + seq as i64,
payload: TrajectoryPayload::ToolCall {
Expand All @@ -151,6 +152,7 @@ mod tests {
conversation_id: conv_id.to_string(),
agent_id: agent_id.to_string(),
parent_agent_id: None,
run_id: None,
seq,
ts_ms: 1000 + seq as i64,
payload: TrajectoryPayload::ToolResult {
Expand Down
Loading