From 68c4d5ecc0b8f99df6e0f65b11074ba18b447918 Mon Sep 17 00:00:00 2001 From: Ben Reilly Date: Tue, 26 May 2026 23:25:23 -0400 Subject: [PATCH] voice context reset + boot-time sub-agent discovery MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two related additions, sharing the control socket and progress store already plumbed by #4. == voice context reset == Long realtime sessions accumulate enough conversation history that the voice model starts drifting and (occasionally) the server rejects further turns. Three new ways to wipe the conversation back to a safe baseline without disturbing audio or the websocket itself: - voice tool `reset_voice_context(reason?)` — the realtime model can self-reset when it senses overload or the user asks for a fresh start. Acked with a function_call_output so the model keeps speaking naturally afterward. - control socket `Request::Reset { reason }` exposed as `gamechat reset [--reason TEXT]` from a second terminal. - auto-reset once the tracked item count reaches `settings.auto_reset_after_items` (clamped to [20, 2000]; omit or set to 0 to disable). Disabled by default. Mechanics (see `src/voice_loop/reset.rs`): 1. `response.cancel` if a response is in flight, so the server stops emitting audio for a turn whose item id we're about to delete. 2. `conversation.item.delete` for every id we've observed via `conversation.item.created` (tracked in `ConversationItemTracker`, capped at 4000 entries with FIFO eviction). 3. Re-send the original `session.update` to re-baseline instructions, voice, tools, and turn detection. The local playback buffer and mic input are deliberately left alone, so audio already handed to cpal finishes playing while the next turn comes back clean. Any queued `response.create` events that pre-dated the reset are dropped — the conversation they would have continued no longer exists. 
== boot-time sub-agent discovery == When a new `gamechat --realtime` starts, it now scans `runtime_dir()` for sockets owned by other live gamechats and asks each peer for its active slug list (with a 750ms per-peer timeout). Discovered slugs are stamped into the local `ProgressStore` under `peer_<pid>_<slug>` with a `discovered:<pid>` provider tag so they are visible in `gamechat inspect` but unambiguously not locally owned. Standalone CLI: `gamechat discover` walks the runtime dir and prints every peer's slugs in one table — useful when triaging "who's running the background work I see in claude?" across multiple terminals. Discovery is enabled by default; flip `settings.discover_existing_subagents: false` to opt out. == other == - `BASE_INSTRUCTIONS` updated to tell the voice model when to call `reset_voice_context` and not to announce it. - duplicate `spawn_server` call in `run_realtime_voice` removed (was binding the control socket twice, second bind always failed). - 30 new tests covering the reset event sequence, item tracker bounding/dedup, auto-reset threshold clamping, discovery seeding, peer-query timeout, the new `reset_voice_context` tool definition, and the control-socket Reset handler (both happy path and voice-loop-channel-closed). Total: 71 tests, all passing. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- src/control/client.rs | 60 +++++++- src/control/discovery.rs | 282 +++++++++++++++++++++++++++++++++++ src/control/mod.rs | 6 +- src/control/protocol.rs | 12 ++ src/control/server.rs | 98 +++++++++++- src/main.rs | 38 ++++- src/voice_loop/mod.rs | 295 ++++++++++++++++++++++++++++++++++--- src/voice_loop/reset.rs | 245 ++++++++++++++++++++++++++++++ src/voice_loop/session.rs | 59 +++++++- src/voice_loop/settings.rs | 102 ++++++++++++- 10 files changed, 1154 insertions(+), 43 deletions(-) create mode 100644 src/control/discovery.rs create mode 100644 src/voice_loop/reset.rs diff --git a/src/control/client.rs b/src/control/client.rs index 30e076c..fe72ee3 100644 --- a/src/control/client.rs +++ b/src/control/client.rs @@ -1,6 +1,8 @@ -//! Client side of the control protocol — used by the `inspect`, `tail`, and -//! `open` subcommands. Connects to a Unix socket and prints results. +//! Client side of the control protocol — used by the `inspect`, `tail`, +//! `open`, `reset`, and `discover` subcommands. Connects to a Unix socket +//! and prints results. +use super::discovery::discover_existing_subagents; use super::protocol::{Request, Response}; use super::runtime_dir::{discover_sockets, socket_path_for_pid}; use super::{ControlSubcommand, ControlTarget}; @@ -15,11 +17,18 @@ pub(super) async fn run( subcommand: ControlSubcommand, target: ControlTarget, ) -> Result<(), String> { + // `discover` doesn't talk to a single instance — it surveys the whole + // runtime dir — so resolve the socket lazily for the other subcommands. 
+ if let ControlSubcommand::Discover = subcommand { + return run_discover().await; + } let socket = resolve_socket(&target)?; match subcommand { ControlSubcommand::Inspect => run_inspect(&socket).await, ControlSubcommand::Tail { slug } => run_tail(&socket, &slug).await, ControlSubcommand::Open { slug, launch } => run_open(&socket, &slug, launch).await, + ControlSubcommand::Reset { reason } => run_reset(&socket, reason).await, + ControlSubcommand::Discover => unreachable!(), } } @@ -154,6 +163,53 @@ async fn run_tail(socket: &Path, slug: &str) -> Result<(), String> { } } +async fn run_reset(socket: &Path, reason: Option) -> Result<(), String> { + let resp = send_request(socket, Request::Reset { reason: reason.clone() }).await?; + match resp { + Response::Reset { dispatched } => { + if dispatched { + println!( + "reset signal dispatched to {} (reason={})", + socket.display(), + reason.as_deref().unwrap_or("unspecified") + ); + } else { + return Err(format!( + "voice loop reset channel is closed; reset NOT applied (socket {})", + socket.display() + )); + } + Ok(()) + } + Response::Error { message } => Err(message), + other => Err(format!("unexpected response: {other:?}")), + } +} + +async fn run_discover() -> Result<(), String> { + let discovered = discover_existing_subagents().await; + if discovered.is_empty() { + println!("(no other live gamechat instances expose any sub-agents)"); + return Ok(()); + } + println!( + "{:<8} {:<24} {:<10} {:<9} {:>9} {}", + "PEER", "SLUG", "PROVIDER", "STATUS", "ELAPSED", "LAST" + ); + for entry in discovered { + println!( + "{:<8} {:<24} {:<10} {:<9} {:>8}s {}", + entry.pid, + truncate_for_table(&entry.summary.slug, 24), + truncate_for_table(&entry.summary.provider, 10), + status_label(entry.summary.status), + entry.summary.elapsed_seconds.round() as u64, + truncate_for_table(&entry.summary.last_message, 60), + ); + } + Ok(()) +} + async fn run_open(socket: &Path, slug: &str, launch: bool) -> Result<(), String> { let resp = 
send_request( socket, diff --git a/src/control/discovery.rs b/src/control/discovery.rs new file mode 100644 index 0000000..947a3bd --- /dev/null +++ b/src/control/discovery.rs @@ -0,0 +1,282 @@ +//! Boot-time sub-agent discovery. +//! +//! On startup the voice loop scans the runtime directory for sockets owned by +//! other live gamechat processes, sends each a `List` request, and records +//! whatever active slugs come back. The local progress store is then seeded +//! with a stub entry per discovered slug so this instance can: +//! +//! - log who else is doing background work in this user account +//! - surface those slugs in `gamechat inspect` alongside its own jobs +//! - tell the voice model "there's already a refactor_docs agent running in +//! pid 12345" when the user asks +//! +//! The seeded entries are clearly marked as `discovered:` providers so +//! they can't be confused with locally-owned jobs. They are read-only; we do +//! not try to drive remote work from this instance. + +use super::protocol::{Request, Response}; +use super::runtime_dir::{discover_sockets, socket_path_for_pid}; +use crate::orchestrator::progress::{ProgressStore, SlugSummary}; +use std::path::{Path, PathBuf}; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; +use tokio::net::UnixStream; +use tokio::time::{Duration, timeout}; + +/// How long we'll wait on any one peer before giving up. Keeps a hung peer +/// from blocking startup forever. +const PEER_QUERY_TIMEOUT: Duration = Duration::from_millis(750); + +/// A sub-agent learned about from another running gamechat instance. +#[derive(Debug, Clone)] +pub(crate) struct DiscoveredSubagent { + pub pid: u32, + // Held so callers can re-open the peer socket (e.g. to tail a slug). + // Currently only used for logging by the discovery printer; suppress + // the dead-code warning so we don't have to give the field a leading + // underscore that would break the public-looking field name. 
+ #[allow(dead_code)] + pub socket: PathBuf, + pub summary: SlugSummary, +} + +/// Scan the runtime dir and probe every other live gamechat socket. The +/// current process is excluded so we never recurse. All failures are logged +/// and dropped; the return value is a best-effort union of what came back. +pub(crate) async fn discover_existing_subagents() -> Vec { + let own_pid = std::process::id(); + let sockets = match discover_sockets() { + Ok(sockets) => sockets, + Err(err) => { + eprintln!("boot discovery skipped: failed to scan runtime dir: {err}"); + return Vec::new(); + } + }; + + let own_socket = socket_path_for_pid(own_pid).ok(); + let mut discovered: Vec = Vec::new(); + let mut peers_queried: usize = 0; + let mut peers_with_slugs: usize = 0; + + for socket in sockets { + if own_socket.as_ref().map(|p| p == &socket).unwrap_or(false) { + continue; + } + let pid = match pid_for_socket(&socket) { + Some(pid) => pid, + None => continue, + }; + if pid == own_pid { + continue; + } + peers_queried += 1; + match query_peer(&socket).await { + Ok(slugs) if slugs.is_empty() => { + eprintln!( + "boot discovery peer pid={pid} socket={} has no active sub-agents", + socket.display() + ); + } + Ok(slugs) => { + peers_with_slugs += 1; + eprintln!( + "boot discovery peer pid={pid} socket={} active_slugs={}", + socket.display(), + slugs.len() + ); + for summary in slugs { + discovered.push(DiscoveredSubagent { + pid, + socket: socket.clone(), + summary, + }); + } + } + Err(err) => { + eprintln!( + "boot discovery peer pid={pid} socket={} failed: {err}", + socket.display() + ); + } + } + } + eprintln!( + "boot discovery done peers_queried={} peers_with_slugs={} discovered_total={}", + peers_queried, + peers_with_slugs, + discovered.len() + ); + discovered +} + +/// Stamp discovered slugs into the local progress store so they show up +/// alongside locally-owned jobs. 
Each stub uses a `discovered:` +/// provider tag so it's obvious where it came from and so a future tool +/// call won't accidentally treat it as resumable. +pub(crate) fn seed_discovered_subagents(store: &ProgressStore, discovered: &[DiscoveredSubagent]) { + if discovered.is_empty() { + return; + } + for entry in discovered { + let provider_tag = format!("discovered:{}", entry.pid); + let local_slug = format!("peer_{}_{}", entry.pid, entry.summary.slug); + store.register_job(&local_slug, leak_provider_tag(provider_tag)); + let elapsed = entry.summary.elapsed_seconds.round() as u64; + let last = if entry.summary.last_message.trim().is_empty() { + "(no recent message)".to_string() + } else { + entry.summary.last_message.clone() + }; + store.push_progress( + &local_slug, + &format!( + "Discovered at boot from pid {} (peer slug {}, status {}, elapsed {}s). Last message: {}", + entry.pid, entry.summary.slug, entry.summary.status, elapsed, last + ), + ); + if let Some(session_id) = entry.summary.session_id.as_deref() { + store.set_session_id(&local_slug, session_id); + } + } +} + +/// `ProgressStore::register_job` wants `&'static str` for the provider name +/// so it can be cheaply re-stored. Discovered providers are only known at +/// runtime, so we intentionally leak the string. The set of distinct +/// `discovered:` tags is bounded by the number of peer gamechat +/// processes ever seen during this binary's lifetime — at most a handful. 
+fn leak_provider_tag(tag: String) -> &'static str { + Box::leak(tag.into_boxed_str()) +} + +fn pid_for_socket(path: &Path) -> Option { + path.file_stem() + .and_then(|s| s.to_str()) + .and_then(|s| s.parse::().ok()) +} + +async fn query_peer(socket: &Path) -> Result, String> { + let task = async { + let stream = UnixStream::connect(socket) + .await + .map_err(|e| format!("connect failed: {e}"))?; + let (read_half, mut write_half) = stream.into_split(); + let mut payload = serde_json::to_string(&Request::List) + .map_err(|e| format!("serialize list request failed: {e}"))?; + payload.push('\n'); + write_half + .write_all(payload.as_bytes()) + .await + .map_err(|e| format!("write list request failed: {e}"))?; + write_half + .shutdown() + .await + .map_err(|e| format!("half-close failed: {e}"))?; + + let mut reader = BufReader::new(read_half); + let mut line = String::new(); + reader + .read_line(&mut line) + .await + .map_err(|e| format!("read response failed: {e}"))?; + let response: Response = serde_json::from_str(line.trim()) + .map_err(|e| format!("parse response failed: {e}: {line}"))?; + match response { + Response::List { slugs } => Ok(slugs), + Response::Error { message } => Err(format!("peer reported error: {message}")), + other => Err(format!("unexpected response: {other:?}")), + } + }; + match timeout(PEER_QUERY_TIMEOUT, task).await { + Ok(result) => result, + Err(_) => Err(format!( + "peer query exceeded {}ms", + PEER_QUERY_TIMEOUT.as_millis() + )), + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::orchestrator::progress::JobStatus; + + fn fake_summary(slug: &str) -> SlugSummary { + SlugSummary { + slug: slug.to_string(), + provider: "claude".to_string(), + status: JobStatus::Running, + last_message: "doing the thing".to_string(), + elapsed_seconds: 12.0, + session_id: Some("sess-99".to_string()), + last_seq: Some(2), + } + } + + #[test] + fn seed_writes_each_discovered_slug_into_store() { + let store = ProgressStore::new(); + let entries = 
vec![ + DiscoveredSubagent { + pid: 42, + socket: PathBuf::from("/tmp/42.sock"), + summary: fake_summary("refactor_docs"), + }, + DiscoveredSubagent { + pid: 99, + socket: PathBuf::from("/tmp/99.sock"), + summary: fake_summary("scan_repo"), + }, + ]; + seed_discovered_subagents(&store, &entries); + let summaries = store.snapshot_all(); + assert_eq!(summaries.len(), 2); + // Sorted by slug. + assert_eq!(summaries[0].slug, "peer_42_refactor_docs"); + assert!(summaries[0].provider.starts_with("discovered:42")); + assert_eq!(summaries[0].session_id.as_deref(), Some("sess-99")); + assert_eq!(summaries[1].slug, "peer_99_scan_repo"); + } + + #[test] + fn seed_noop_for_empty_input() { + let store = ProgressStore::new(); + seed_discovered_subagents(&store, &[]); + assert!(store.snapshot_all().is_empty()); + } + + #[test] + fn seed_falls_back_to_placeholder_when_no_last_message() { + let store = ProgressStore::new(); + let mut summary = fake_summary("blank"); + summary.last_message = " ".to_string(); + let entries = vec![DiscoveredSubagent { + pid: 7, + socket: PathBuf::from("/tmp/7.sock"), + summary, + }]; + seed_discovered_subagents(&store, &entries); + let summaries = store.snapshot_all(); + assert_eq!(summaries.len(), 1); + assert!(summaries[0].last_message.contains("(no recent message)")); + } + + #[tokio::test] + async fn query_peer_timeouts_when_no_one_listens() { + // Build a path that does not have a listener bound; the connect + // attempt will fail fast (which is fine — we just need to assert + // we return an Err rather than hanging). 
+ let path = std::env::temp_dir().join(format!( + "gamechat-test-nopeer-{}-{}.sock", + std::process::id(), + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_nanos() + )); + let err = query_peer(&path).await.expect_err("should fail"); + assert!( + err.contains("connect failed") || err.contains("exceeded"), + "unexpected error: {err}" + ); + } +} diff --git a/src/control/mod.rs b/src/control/mod.rs index 4032849..4ff1bbb 100644 --- a/src/control/mod.rs +++ b/src/control/mod.rs @@ -18,13 +18,15 @@ //! resume command for the underlying agent UI. mod client; +mod discovery; mod protocol; mod runtime_dir; mod server; use std::path::PathBuf; -pub(crate) use server::spawn_server; +pub(crate) use discovery::{discover_existing_subagents, seed_discovered_subagents}; +pub(crate) use server::{ResetSignal, spawn_server}; /// Subcommand parsed from `gamechat`'s argv. #[derive(Debug)] @@ -32,6 +34,8 @@ pub(crate) enum ControlSubcommand { Inspect, Tail { slug: String }, Open { slug: String, launch: bool }, + Reset { reason: Option }, + Discover, } /// Optional connection target. `pid` picks a specific running gamechat; if diff --git a/src/control/protocol.rs b/src/control/protocol.rs index 2cca9eb..766c80a 100644 --- a/src/control/protocol.rs +++ b/src/control/protocol.rs @@ -23,6 +23,12 @@ pub(crate) enum Request { }, /// Look up the resume target for a slug — provider + session id. Resume { slug: String }, + /// Trigger a voice-context reset on the running voice loop. Optional + /// `reason` is recorded in the server log alongside the trigger source. + Reset { + #[serde(default)] + reason: Option, + }, } #[derive(Debug, Serialize, Deserialize)] @@ -47,6 +53,12 @@ pub(crate) enum Response { provider: String, session_id: Option, }, + /// Acknowledgement that the reset signal was accepted by the server. + /// `dispatched` is true when the voice loop is alive and received the + /// signal; false when the voice loop is absent (read-only socket). 
+ Reset { + dispatched: bool, + }, Error { message: String, }, diff --git a/src/control/server.rs b/src/control/server.rs index cdf5723..57c4777 100644 --- a/src/control/server.rs +++ b/src/control/server.rs @@ -12,6 +12,14 @@ use std::path::PathBuf; use std::sync::Arc; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::net::{UnixListener, UnixStream}; +use tokio::sync::mpsc; + +/// Opaque signal carried from the control server to the voice loop when a +/// reset is requested. The voice loop maps it onto its own `ResetTrigger`. +#[derive(Debug, Clone)] +pub(crate) struct ResetSignal { + pub reason: Option, +} /// Handle returned to the voice loop; dropping it removes the socket file. pub(crate) struct ServerHandle { @@ -29,6 +37,7 @@ impl Drop for ServerHandle { pub(crate) fn spawn_server( store: Arc, provider_name: &'static str, + reset_tx: mpsc::UnboundedSender, ) -> Result { let pid = std::process::id(); let socket_path = socket_path_for_pid(pid)?; @@ -46,18 +55,26 @@ pub(crate) fn spawn_server( ); let store_for_task = Arc::clone(&store); tokio::spawn(async move { - accept_loop(listener, store_for_task, provider_name).await; + accept_loop(listener, store_for_task, provider_name, reset_tx).await; }); Ok(ServerHandle { socket_path }) } -async fn accept_loop(listener: UnixListener, store: Arc, provider_name: &'static str) { +async fn accept_loop( + listener: UnixListener, + store: Arc, + provider_name: &'static str, + reset_tx: mpsc::UnboundedSender, +) { loop { match listener.accept().await { Ok((stream, _addr)) => { let store = Arc::clone(&store); + let reset_tx = reset_tx.clone(); tokio::spawn(async move { - if let Err(err) = handle_connection(stream, store, provider_name).await { + if let Err(err) = + handle_connection(stream, store, provider_name, reset_tx).await + { eprintln!("control connection error: {err}"); } }); @@ -74,6 +91,7 @@ async fn handle_connection( stream: UnixStream, store: Arc, provider_name: &'static str, + reset_tx: 
mpsc::UnboundedSender, ) -> Result<(), String> { let (read_half, mut write_half) = stream.into_split(); let mut reader = BufReader::new(read_half); @@ -86,7 +104,7 @@ async fn handle_connection( return Ok(()); } let response = match serde_json::from_str::(line.trim()) { - Ok(request) => handle_request(request, &store, provider_name), + Ok(request) => handle_request(request, &store, provider_name, &reset_tx), Err(err) => Response::Error { message: format!("invalid request: {err}"), }, @@ -109,6 +127,7 @@ fn handle_request( request: Request, store: &ProgressStore, provider_name: &'static str, + reset_tx: &mpsc::UnboundedSender, ) -> Response { match request { Request::Hello => Response::Hello { @@ -146,6 +165,19 @@ fn handle_request( }, } } + Request::Reset { reason } => { + let display_reason = reason.clone().unwrap_or_else(|| "unspecified".to_string()); + eprintln!( + "control socket reset requested reason={display_reason}" + ); + let dispatched = reset_tx.send(ResetSignal { reason }).is_ok(); + if !dispatched { + eprintln!( + "control socket reset dropped: voice loop reset channel closed" + ); + } + Response::Reset { dispatched } + } } } @@ -161,6 +193,11 @@ mod tests { use std::sync::Arc; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; + fn unused_reset_sender() -> mpsc::UnboundedSender { + let (tx, _rx) = mpsc::unbounded_channel(); + tx + } + #[tokio::test] async fn end_to_end_list_and_tail_over_socket() { let store = Arc::new(ProgressStore::new()); @@ -176,7 +213,10 @@ mod tests { let listener = UnixListener::bind(&socket_path).unwrap(); let store_for_task = Arc::clone(&store); - tokio::spawn(async move { accept_loop(listener, store_for_task, "claude").await }); + let reset_tx = unused_reset_sender(); + tokio::spawn(async move { + accept_loop(listener, store_for_task, "claude", reset_tx).await + }); // List → should see our slug + session_id. 
let resp = roundtrip(&socket_path, Request::List).await; @@ -278,7 +318,8 @@ mod tests { store.push_progress("a", "step"); store.set_session_id("a", "abc"); - let resp = handle_request(Request::List, &store, "claude"); + let reset_tx = unused_reset_sender(); + let resp = handle_request(Request::List, &store, "claude", &reset_tx); match resp { Response::List { slugs } => { assert_eq!(slugs.len(), 1); @@ -294,12 +335,14 @@ mod tests { let store = Arc::new(ProgressStore::new()); store.register_job("solo", "claude"); store.set_session_id("solo", "sess-xyz"); + let reset_tx = unused_reset_sender(); let resp = handle_request( Request::Resume { slug: "solo".into(), }, &store, "claude", + &reset_tx, ); match resp { Response::Resume { @@ -322,6 +365,7 @@ mod tests { store.push_progress("t", "one"); store.push_progress("t", "two"); + let reset_tx = unused_reset_sender(); let resp = handle_request( Request::Tail { slug: "t".into(), @@ -329,6 +373,7 @@ mod tests { }, &store, "claude", + &reset_tx, ); match resp { Response::Tail { @@ -348,6 +393,7 @@ mod tests { #[test] fn unknown_slug_in_tail_is_error() { let store = Arc::new(ProgressStore::new()); + let reset_tx = unused_reset_sender(); let resp = handle_request( Request::Tail { slug: "ghost".into(), @@ -355,10 +401,50 @@ mod tests { }, &store, "claude", + &reset_tx, ); match resp { Response::Error { message } => assert!(message.contains("ghost")), other => panic!("expected Error, got {other:?}"), } } + + #[test] + fn reset_handler_dispatches_to_voice_loop_channel() { + let store = Arc::new(ProgressStore::new()); + let (reset_tx, mut reset_rx) = mpsc::unbounded_channel::(); + let resp = handle_request( + Request::Reset { + reason: Some("context_overload".into()), + }, + &store, + "claude", + &reset_tx, + ); + match resp { + Response::Reset { dispatched } => assert!(dispatched), + other => panic!("expected Reset, got {other:?}"), + } + let signal = reset_rx + .try_recv() + .expect("reset signal should have been forwarded to 
voice loop"); + assert_eq!(signal.reason.as_deref(), Some("context_overload")); + } + + #[test] + fn reset_handler_reports_undispatched_when_channel_closed() { + let store = Arc::new(ProgressStore::new()); + let (reset_tx, reset_rx) = mpsc::unbounded_channel::(); + drop(reset_rx); + let resp = handle_request( + Request::Reset { reason: None }, + &store, + "claude", + &reset_tx, + ); + match resp { + Response::Reset { dispatched } => assert!(!dispatched), + other => panic!("expected Reset, got {other:?}"), + } + } } diff --git a/src/main.rs b/src/main.rs index a54b7af..ee4530f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -217,12 +217,16 @@ fn parse_control_subcommand() -> Result = Vec::new(); let mut target = ControlTarget::default(); let mut launch = false; + let mut reset_reason: Option = None; while let Some(arg) = iter.next() { match arg.as_str() { "--pid" => { @@ -236,6 +240,7 @@ fn parse_control_subcommand() -> Result launch = true, + "--reason" => reset_reason = Some(next_value(&mut iter, "--reason")?), "--help" | "-h" => return Err(control_usage()), other if other.starts_with("--") => { return Err(format!("unknown argument for {subcommand_name}: {other}")); @@ -267,6 +272,29 @@ fn parse_control_subcommand() -> Result { + if !positional.is_empty() { + return Err(format!( + "reset takes no positional arguments, got: {}. 
Use --reason if you want to record one.", + positional.join(" ") + )); + } + ControlSubcommand::Reset { reason: reset_reason } + } + "discover" => { + if !positional.is_empty() { + return Err(format!( + "discover takes no positional arguments, got: {}", + positional.join(" ") + )); + } + if target.pid.is_some() || target.socket.is_some() { + return Err( + "discover surveys every live gamechat in the runtime dir; --pid/--socket do not apply".to_string(), + ); + } + ControlSubcommand::Discover + } _ => unreachable!(), }; Ok(Some((subcommand, target))) @@ -277,10 +305,16 @@ fn control_usage() -> String { gamechat inspect [--pid N | --socket PATH] gamechat tail [--pid N | --socket PATH] gamechat open [--pid N | --socket PATH] [--launch] + gamechat reset [--pid N | --socket PATH] [--reason TEXT] + gamechat discover Inspect a running gamechat realtime session from another terminal. With no target flag the client connects to the only running gamechat instance and errors if more than one is present. + +`reset` clears the realtime conversation context on the running voice loop +without disturbing audio playback. `discover` walks the runtime dir and +reports the active sub-agent slugs of every other live gamechat instance. " .to_string() } @@ -310,6 +344,8 @@ fn usage() -> String { gamechat inspect List active sub-agents in a running session. gamechat tail Stream a sub-agent's progress buffer. gamechat open [--launch] Print (or launch on macOS) the resume command. + gamechat reset [--reason TEXT] Reset the realtime voice conversation context. + gamechat discover List sub-agents in every other live gamechat. options: --realtime Start the live voice loop (mic + speakers). 
diff --git a/src/voice_loop/mod.rs b/src/voice_loop/mod.rs index 5ec4d61..7f530a4 100644 --- a/src/voice_loop/mod.rs +++ b/src/voice_loop/mod.rs @@ -1,7 +1,9 @@ mod audio; +mod reset; mod session; pub(crate) mod settings; +use crate::control::ResetSignal; use crate::orchestrator::{ OpenAiSummarizer, OrchestratorBridge, OrchestratorJobManager, OrchestratorProvider, }; @@ -9,10 +11,11 @@ use audio::{ AudioChunk, PlaybackBuffer, enqueue_audio_delta, i16_to_le_bytes, playback_depth_ms, resample_i16, start_input_stream, start_output_stream, }; +use reset::{ConversationItemTracker, ResetTrigger, build_reset_events, item_id_from_event}; use settings::ResolvedVoiceSettings; use base64::Engine; use futures_util::{Sink, SinkExt, StreamExt}; -use serde_json::json; +use serde_json::{Value, json}; use std::collections::VecDeque; use std::fmt::Display; use std::sync::{Arc, Mutex}; @@ -38,24 +41,28 @@ pub(crate) async fn run_realtime_voice(config: RealtimeRunConfig) -> Result<(), let summarizer = Arc::new(OpenAiSummarizer::new(config.openai_api_key.clone(), None)?); let orchestrator_bridge = OrchestratorBridge::new(summarizer); - // Best-effort: keep the realtime loop running even if the control socket - // fails to bind (read-only headless environments, etc). - let _control_handle = match crate::control::spawn_server( - orchestrator_jobs.progress_store(), - provider_name, - ) { - Ok(handle) => Some(handle), - Err(err) => { - eprintln!("control socket disabled: {err}"); - None - } - }; + // Boot-time sub-agent discovery: probe every other live gamechat socket + // for its active slugs and seed the local progress store so this + // instance is aware of background work running elsewhere. Best-effort: + // any failures are logged and ignored. 
+ if config.voice_settings.discover_existing_subagents { + let discovered = crate::control::discover_existing_subagents().await; + crate::control::seed_discovered_subagents( + &orchestrator_jobs.progress_store(), + &discovered, + ); + } else { + eprintln!("boot discovery disabled by settings.discover_existing_subagents=false"); + } + + let (reset_tx, reset_rx) = mpsc::unbounded_channel::(); // Best-effort: keep the realtime loop running even if the control socket // fails to bind (read-only headless environments, etc). let _control_handle = match crate::control::spawn_server( orchestrator_jobs.progress_store(), provider_name, + reset_tx, ) { Ok(handle) => Some(handle), Err(err) => { @@ -78,6 +85,7 @@ pub(crate) async fn run_realtime_voice(config: RealtimeRunConfig) -> Result<(), output_rate, orchestrator_jobs, orchestrator_bridge, + reset_rx, }) .await } @@ -91,11 +99,15 @@ struct VoiceLoop<'a> { output_rate: u32, orchestrator_jobs: OrchestratorJobManager, orchestrator_bridge: OrchestratorBridge, + reset_rx: mpsc::UnboundedReceiver, } async fn run_voice_loop(mut state: VoiceLoop<'_>) -> Result<(), String> { let mut response_active = false; let mut deferred_response_creates = VecDeque::::new(); + let mut item_tracker = ConversationItemTracker::new(); + + let session_update = session::session_update_json_for(&state.model, &state.voice_settings); let url = format!("wss://api.openai.com/v1/realtime?model={}", state.model); let mut request = url @@ -115,11 +127,7 @@ async fn run_voice_loop(mut state: VoiceLoop<'_>) -> Result<(), String> { let (mut write, mut read) = ws.split(); write - .send(Message::Text( - session::session_update_json_for(&state.model, &state.voice_settings) - .to_string() - .into(), - )) + .send(Message::Text(session_update.to_string().into())) .await .map_err(|e| format!("failed to send session.update: {e}"))?; eprintln!("connected. Speak into your microphone. 
Press Ctrl-C to stop."); @@ -155,6 +163,21 @@ async fn run_voice_loop(mut state: VoiceLoop<'_>) -> Result<(), String> { .await?; } } + Some(signal) = state.reset_rx.recv() => { + eprintln!( + "voice loop received external reset signal reason={}", + signal.reason.as_deref().unwrap_or("unspecified") + ); + perform_reset( + &mut write, + &mut item_tracker, + &session_update, + &mut response_active, + &mut deferred_response_creates, + ResetTrigger::Control, + ) + .await?; + } msg = read.next() => { let Some(msg) = msg else { return Err("Realtime websocket closed".to_string()); @@ -165,16 +188,17 @@ async fn run_voice_loop(mut state: VoiceLoop<'_>) -> Result<(), String> { }; let value: serde_json::Value = serde_json::from_str(&text) .map_err(|e| format!("invalid Realtime event JSON: {e}: {text}"))?; - let events = handle_realtime_event( + let dispatch = handle_realtime_event( value, &mut state.orchestrator_bridge, &state.orchestrator_jobs, &mut response_active, Arc::clone(&state.playback), state.output_rate, + &mut item_tracker, ) .await?; - for event in events { + for event in dispatch.outbound { send_or_defer_realtime_event( &mut write, event, @@ -183,6 +207,35 @@ async fn run_voice_loop(mut state: VoiceLoop<'_>) -> Result<(), String> { ) .await?; } + if dispatch.reset_requested { + perform_reset( + &mut write, + &mut item_tracker, + &session_update, + &mut response_active, + &mut deferred_response_creates, + ResetTrigger::Voice, + ) + .await?; + } + if state.voice_settings.auto_reset_after_items > 0 + && item_tracker.len() >= state.voice_settings.auto_reset_after_items + { + eprintln!( + "voice context auto-reset triggered tracked_items={} threshold={}", + item_tracker.len(), + state.voice_settings.auto_reset_after_items + ); + perform_reset( + &mut write, + &mut item_tracker, + &session_update, + &mut response_active, + &mut deferred_response_creates, + ResetTrigger::Auto, + ) + .await?; + } flush_deferred_response_create( &mut write, &mut response_active, @@ -201,6 
+254,45 @@ async fn run_voice_loop(mut state: VoiceLoop<'_>) -> Result<(), String> { Ok(()) } +async fn perform_reset( + write: &mut S, + item_tracker: &mut ConversationItemTracker, + session_update: &Value, + response_active: &mut bool, + deferred_response_creates: &mut VecDeque, + trigger: ResetTrigger, +) -> Result<(), String> +where + S: Sink + Unpin, + S::Error: Display, +{ + let drained = item_tracker.drain(); + let plan = build_reset_events(&drained, session_update, *response_active); + eprintln!( + "voice context reset trigger={} cleared_items={} response_was_active={}", + trigger.as_str(), + plan.cleared_items, + *response_active + ); + // After a reset the deferred response queue is no longer meaningful — the + // realtime conversation it would have continued has been wiped. + if !deferred_response_creates.is_empty() { + eprintln!( + "voice context reset dropping deferred response.creates count={}", + deferred_response_creates.len() + ); + deferred_response_creates.clear(); + } + *response_active = false; + for event in plan.events { + write + .send(Message::Text(event.to_string().into())) + .await + .map_err(|e| format!("failed to send reset event: {e}"))?; + } + Ok(()) +} + async fn send_or_defer_realtime_event( write: &mut S, event: serde_json::Value, @@ -252,6 +344,14 @@ where .map_err(|e| format!("failed to send deferred response.create: {e}")) } +/// Outcome of processing one Realtime event from the server. The voice loop +/// uses this to decide both what to send back over the websocket and whether +/// a context-reset cycle should run before the next select tick. 
+struct EventDispatch { + outbound: Vec, + reset_requested: bool, +} + async fn handle_realtime_event( value: serde_json::Value, orchestrator_bridge: &mut OrchestratorBridge, @@ -259,7 +359,8 @@ async fn handle_realtime_event( response_active: &mut bool, playback: Arc>, output_rate: u32, -) -> Result, String> { + item_tracker: &mut ConversationItemTracker, +) -> Result { let event_type = value.get("type").and_then(|v| v.as_str()).unwrap_or(""); match event_type { "session.created" | "session.updated" => { @@ -286,7 +387,155 @@ async fn handle_realtime_event( } _ => {} } - orchestrator_bridge + + if let Some(id) = item_id_from_event(&value) { + item_tracker.record(id); + } + + let (reset_outbound, reset_requested) = handle_reset_voice_context_call(&value); + + let mut outbound = reset_outbound; + let mut orchestrator_events = orchestrator_bridge .handle_realtime_event(&value, orchestrator_jobs) - .await + .await?; + outbound.append(&mut orchestrator_events); + + Ok(EventDispatch { + outbound, + reset_requested, + }) +} + +/// Detect a `reset_voice_context` function-call event and return the +/// function-output ack that should be sent back to the model, plus a flag +/// that tells the voice loop to actually run the reset sequence. 
+fn handle_reset_voice_context_call(value: &Value) -> (Vec, bool) { + let call = match value.get("type").and_then(|v| v.as_str()) { + Some("response.function_call_arguments.done") => Some(value.clone()), + Some("response.output_item.done") => function_call_payload_from_output_item(value), + _ => None, + }; + let Some(call) = call else { + return (Vec::new(), false); + }; + if call.get("name").and_then(|v| v.as_str()) != Some("reset_voice_context") { + return (Vec::new(), false); + } + let call_id = match call.get("call_id").and_then(|v| v.as_str()) { + Some(id) if !id.is_empty() => id.to_string(), + _ => { + eprintln!("reset_voice_context call ignored: missing call_id"); + return (Vec::new(), false); + } + }; + let reason = call + .get("arguments") + .and_then(|v| v.as_str()) + .and_then(|raw| serde_json::from_str::(raw).ok()) + .and_then(|args| { + args.get("reason") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()) + }) + .unwrap_or_else(|| "unspecified".to_string()); + eprintln!( + "realtime event: reset_voice_context call_id={call_id} reason={reason}" + ); + let ack = json!({ + "type": "conversation.item.create", + "item": { + "type": "function_call_output", + "call_id": call_id, + "output": json!({ + "ok": true, + "reset": true, + "reason": reason, + "instruction": "Conversation context has been reset. Continue speaking naturally without referring to prior turns." 
+ }).to_string(), + } + }); + (vec![ack], true) +} + +fn function_call_payload_from_output_item(value: &Value) -> Option { + let item = value.get("item")?; + if item.get("type").and_then(|v| v.as_str()) != Some("function_call") { + return None; + } + Some(json!({ + "type": "response.function_call_arguments.done", + "name": item.get("name").cloned().unwrap_or_default(), + "call_id": item.get("call_id").cloned().unwrap_or_default(), + "arguments": item.get("arguments").cloned().unwrap_or_default(), + })) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn reset_voice_context_call_returns_ack_and_flag() { + let event = json!({ + "type": "response.function_call_arguments.done", + "name": "reset_voice_context", + "call_id": "call_reset_1", + "arguments": "{\"reason\":\"context_overload\"}" + }); + let (events, reset) = handle_reset_voice_context_call(&event); + assert!(reset); + assert_eq!(events.len(), 1); + assert_eq!(events[0]["item"]["call_id"], "call_reset_1"); + let output = events[0]["item"]["output"].as_str().unwrap(); + let payload: Value = serde_json::from_str(output).unwrap(); + assert_eq!(payload["reset"], true); + assert_eq!(payload["reason"], "context_overload"); + } + + #[test] + fn reset_voice_context_call_handles_missing_reason() { + let event = json!({ + "type": "response.function_call_arguments.done", + "name": "reset_voice_context", + "call_id": "call_reset_2", + "arguments": "{}" + }); + let (events, reset) = handle_reset_voice_context_call(&event); + assert!(reset); + let output: Value = + serde_json::from_str(events[0]["item"]["output"].as_str().unwrap()).unwrap(); + assert_eq!(output["reason"], "unspecified"); + } + + #[test] + fn non_reset_function_calls_are_ignored() { + let event = json!({ + "type": "response.function_call_arguments.done", + "name": "delegate_to_orchestrator", + "call_id": "call_delegate", + "arguments": "{}" + }); + let (events, reset) = handle_reset_voice_context_call(&event); + assert!(!reset); + 
assert!(events.is_empty()); + } + + #[test] + fn reset_voice_context_via_output_item_done_is_handled() { + let event = json!({ + "type": "response.output_item.done", + "item": { + "type": "function_call", + "name": "reset_voice_context", + "call_id": "call_reset_3", + "arguments": "{\"reason\":\"user_requested\"}" + } + }); + let (events, reset) = handle_reset_voice_context_call(&event); + assert!(reset); + assert_eq!(events.len(), 1); + let output: Value = + serde_json::from_str(events[0]["item"]["output"].as_str().unwrap()).unwrap(); + assert_eq!(output["reason"], "user_requested"); + } } diff --git a/src/voice_loop/reset.rs b/src/voice_loop/reset.rs new file mode 100644 index 0000000..e860332 --- /dev/null +++ b/src/voice_loop/reset.rs @@ -0,0 +1,245 @@ +//! Voice-context reset: tear down the realtime conversation history without +//! breaking audio flow. +//! +//! The Realtime API holds the entire conversation in server state. Long voice +//! sessions accrete enough context to noticeably degrade responses (and to +//! occasionally fail outright). A reset: +//! +//! 1. Cancels any in-flight response (`response.cancel`) so we don't strand a +//! half-emitted assistant turn whose item id we're about to delete. +//! 2. Deletes every tracked conversation item id with +//! `conversation.item.delete`. We only track ids the server has already +//! confirmed via `conversation.item.created`, so this is exact. +//! 3. Re-emits the original `session.update` to re-baseline instructions, +//! voice, tools, and turn detection. Cheap; the server treats it as a +//! no-op when nothing changed. +//! +//! Audio flow: the local playback buffer is left intact, so any audio already +//! handed to cpal finishes playing. The mic input stream is never touched. +//! The websocket itself stays open. + +use serde_json::{Value, json}; +use std::collections::VecDeque; + +/// Maximum number of conversation item ids we track. 
Bounded so a runaway +/// session can't grow the in-memory set without limit even if the user +/// never triggers a reset. The oldest id is dropped first; on reset we +/// best-effort delete whatever is still in the window. +pub(crate) const MAX_TRACKED_ITEMS: usize = 4000; + +/// Track conversation item ids in the order they were created so a reset +/// can issue `conversation.item.delete` for each. +#[derive(Debug, Default)] +pub(crate) struct ConversationItemTracker { + items: VecDeque, +} + +impl ConversationItemTracker { + pub(crate) fn new() -> Self { + Self::default() + } + + pub(crate) fn record(&mut self, id: &str) { + if id.is_empty() { + return; + } + if self.items.iter().any(|existing| existing == id) { + return; + } + if self.items.len() >= MAX_TRACKED_ITEMS { + self.items.pop_front(); + } + self.items.push_back(id.to_string()); + } + + pub(crate) fn len(&self) -> usize { + self.items.len() + } + + #[cfg(test)] + pub(crate) fn ids(&self) -> impl Iterator { + self.items.iter() + } + + /// Drain every tracked id and return it. Called as part of a reset so + /// subsequent voice turns start from an empty tracker. + pub(crate) fn drain(&mut self) -> Vec { + std::mem::take(&mut self.items).into_iter().collect() + } +} + +/// Why the reset is happening. Recorded in logs and surfaced to operators. +#[derive(Debug, Clone, Copy)] +pub(crate) enum ResetTrigger { + /// The voice model called the `reset_voice_context` tool. + Voice, + /// An external caller invoked `gamechat reset` through the control plane. + Control, + /// The tracked item count reached the configured auto-reset threshold. + Auto, +} + +impl ResetTrigger { + pub(crate) fn as_str(self) -> &'static str { + match self { + ResetTrigger::Voice => "voice_tool", + ResetTrigger::Control => "control_socket", + ResetTrigger::Auto => "auto_threshold", + } + } +} + +/// Result of [`build_reset_events`]: the events to send and the count of +/// items being deleted (for logging / control-plane responses). 
+#[derive(Debug)] +pub(crate) struct ResetPlan { + pub events: Vec, + pub cleared_items: usize, +} + +/// Build the Realtime event sequence that performs a context reset. +/// +/// * `tracked_ids` — ids previously returned in `conversation.item.created`. +/// * `session_update` — the baseline `session.update` payload to re-send. +/// * `response_active` — when true, prepend `response.cancel` so the +/// in-flight assistant turn doesn't keep streaming into ids we just +/// asked the server to delete. +pub(crate) fn build_reset_events( + tracked_ids: &[String], + session_update: &Value, + response_active: bool, +) -> ResetPlan { + let mut events = Vec::with_capacity(tracked_ids.len() + 2); + if response_active { + events.push(json!({ "type": "response.cancel" })); + } + for id in tracked_ids { + events.push(json!({ + "type": "conversation.item.delete", + "item_id": id, + })); + } + events.push(session_update.clone()); + ResetPlan { + events, + cleared_items: tracked_ids.len(), + } +} + +/// Inspect a parsed Realtime event for a conversation item id worth +/// tracking. Returns the id only when the server confirms creation. 
+pub(crate) fn item_id_from_event(value: &Value) -> Option<&str> { + let event_type = value.get("type").and_then(|v| v.as_str())?; + if event_type != "conversation.item.created" { + return None; + } + value.get("item")?.get("id")?.as_str() +} + +#[cfg(test)] +mod tests { + use super::*; + + fn mk_session_update() -> Value { + json!({"type": "session.update", "session": {"voice": "marin"}}) + } + + #[test] + fn tracker_records_unique_ids_in_order() { + let mut tracker = ConversationItemTracker::new(); + tracker.record("item_a"); + tracker.record("item_b"); + tracker.record("item_a"); // duplicate, ignored + assert_eq!(tracker.len(), 2); + let collected: Vec<&String> = tracker.ids().collect(); + assert_eq!(collected[0], "item_a"); + assert_eq!(collected[1], "item_b"); + } + + #[test] + fn tracker_ignores_empty_id() { + let mut tracker = ConversationItemTracker::new(); + tracker.record(""); + assert_eq!(tracker.len(), 0); + } + + #[test] + fn tracker_drops_oldest_past_capacity() { + let mut tracker = ConversationItemTracker::new(); + for i in 0..(MAX_TRACKED_ITEMS + 5) { + tracker.record(&format!("item_{i}")); + } + assert_eq!(tracker.len(), MAX_TRACKED_ITEMS); + let first = tracker.ids().next().unwrap(); + // The first 5 should have been dropped. 
+ assert_eq!(first, "item_5"); + } + + #[test] + fn drain_empties_the_tracker() { + let mut tracker = ConversationItemTracker::new(); + tracker.record("a"); + tracker.record("b"); + let drained = tracker.drain(); + assert_eq!(drained, vec!["a".to_string(), "b".to_string()]); + assert_eq!(tracker.len(), 0); + } + + #[test] + fn build_reset_events_emits_cancel_then_deletes_then_session_update() { + let ids = vec!["item_1".to_string(), "item_2".to_string()]; + let session = mk_session_update(); + let plan = build_reset_events(&ids, &session, true); + assert_eq!(plan.cleared_items, 2); + assert_eq!(plan.events.len(), 4); + assert_eq!(plan.events[0]["type"], "response.cancel"); + assert_eq!(plan.events[1]["type"], "conversation.item.delete"); + assert_eq!(plan.events[1]["item_id"], "item_1"); + assert_eq!(plan.events[2]["type"], "conversation.item.delete"); + assert_eq!(plan.events[2]["item_id"], "item_2"); + assert_eq!(plan.events[3]["type"], "session.update"); + } + + #[test] + fn build_reset_events_skips_cancel_when_no_response_active() { + let ids = vec!["only".to_string()]; + let session = mk_session_update(); + let plan = build_reset_events(&ids, &session, false); + assert_eq!(plan.events.len(), 2); + assert_eq!(plan.events[0]["type"], "conversation.item.delete"); + assert_eq!(plan.events[1]["type"], "session.update"); + } + + #[test] + fn build_reset_events_with_empty_tracker_only_resends_session() { + let plan = build_reset_events(&[], &mk_session_update(), false); + assert_eq!(plan.cleared_items, 0); + assert_eq!(plan.events.len(), 1); + assert_eq!(plan.events[0]["type"], "session.update"); + } + + #[test] + fn item_id_extracted_from_created_event() { + let event = json!({ + "type": "conversation.item.created", + "item": {"id": "item_42", "type": "message"} + }); + assert_eq!(item_id_from_event(&event), Some("item_42")); + } + + #[test] + fn item_id_ignores_other_event_types() { + let event = json!({ + "type": "response.created", + "item": {"id": "item_42"} + 
}); + assert!(item_id_from_event(&event).is_none()); + } + + #[test] + fn reset_trigger_string_repr_is_stable() { + assert_eq!(ResetTrigger::Voice.as_str(), "voice_tool"); + assert_eq!(ResetTrigger::Control.as_str(), "control_socket"); + assert_eq!(ResetTrigger::Auto.as_str(), "auto_threshold"); + } +} diff --git a/src/voice_loop/session.rs b/src/voice_loop/session.rs index b3484ec..c9e1441 100644 --- a/src/voice_loop/session.rs +++ b/src/voice_loop/session.rs @@ -87,6 +87,21 @@ pub(crate) fn session_update_json_for( }, "required": ["slug"] } + }, + { + "type": "function", + "name": "reset_voice_context", + "description": "Reset the realtime conversation context to a safe baseline. Use when the conversation has grown long enough that context overload is a risk, or when the user explicitly asks for a fresh start. The reset preserves your persona, tools, and any currently-playing audio; it only clears the prior conversation items the server is reasoning over. Do not announce the reset; keep speaking naturally afterward.", + "parameters": { + "type": "object", + "additionalProperties": false, + "properties": { + "reason": { + "type": "string", + "description": "Short rationale for the reset, e.g. \"context_overload\" or \"user_requested\". Used only for logging." 
+ } + } + } } ] } @@ -97,12 +112,18 @@ pub(crate) fn session_update_json_for( mod tests { use super::*; + fn settings_for_test(voice: &str, instructions: &str) -> ResolvedVoiceSettings { + ResolvedVoiceSettings { + voice: voice.to_string(), + instructions: instructions.to_string(), + auto_reset_after_items: 0, + discover_existing_subagents: true, + } + } + #[test] fn session_exposes_sub_agent_progress_tool() { - let settings = ResolvedVoiceSettings { - voice: "marin".to_string(), - instructions: "base".to_string(), - }; + let settings = settings_for_test("marin", "base"); let config = session_update_json_for("test-model", &settings); let tools = config["session"]["tools"] .as_array() @@ -127,12 +148,34 @@ mod tests { ); } + #[test] + fn session_exposes_reset_voice_context_tool() { + let settings = settings_for_test("marin", "base"); + let config = session_update_json_for("test-model", &settings); + let tools = config["session"]["tools"] + .as_array() + .expect("tools should be an array"); + let reset_tool = tools + .iter() + .find(|tool| tool["name"].as_str() == Some("reset_voice_context")) + .expect("reset_voice_context tool should be present"); + // Reason is optional; no required fields at all. 
+ let required = reset_tool["parameters"] + .get("required") + .and_then(|v| v.as_array()) + .map(|a| a.len()) + .unwrap_or(0); + assert_eq!(required, 0); + assert!( + reset_tool["parameters"]["properties"] + .get("reason") + .is_some() + ); + } + #[test] fn session_uses_resolved_voice_and_instructions() { - let settings = ResolvedVoiceSettings { - voice: "cedar".to_string(), - instructions: "custom instructions".to_string(), - }; + let settings = settings_for_test("cedar", "custom instructions"); let config = session_update_json_for("test-model", &settings); assert_eq!( config["session"]["audio"]["output"]["voice"].as_str(), diff --git a/src/voice_loop/settings.rs b/src/voice_loop/settings.rs index 8f68470..dd7b8b5 100644 --- a/src/voice_loop/settings.rs +++ b/src/voice_loop/settings.rs @@ -5,9 +5,15 @@ use std::path::{Path, PathBuf}; // Default voice if neither the chosen preset nor the user's settings override it. pub(crate) const DEFAULT_VOICE: &str = "marin"; +// Floor/ceiling for the auto-reset threshold. The floor avoids pathological +// configs that would reset on every response; the ceiling matches the rough +// point where Realtime context starts to drift on long sessions. +pub(crate) const MIN_AUTO_RESET_THRESHOLD: usize = 20; +pub(crate) const MAX_AUTO_RESET_THRESHOLD: usize = 2000; + // The orchestrator-aware base prompt. Personas are appended to this so every // preset still knows how to delegate work and check progress. -pub(super) const BASE_INSTRUCTIONS: &str = "You are a realtime voice frontend. Keep the spoken conversation moving. When the user asks for work that benefits from deeper reasoning, tools, files, research, or multi-step execution, call delegate_to_orchestrator. Always include a stable snake_case slug that names what the background agent will do, such as refactor_docs. Reuse the same slug to continue that background conversation; use a new slug for unrelated work. 
If the user asks how background work is going, call sub_agent_progress with that slug and read the returned summary aloud. When the user asks something specific (\"is it done?\", \"did it find the bug?\"), pass it through as the question argument so the summary answers it. Call sub_agent_progress sparingly: only when the user asks or when you need material to fill a silence, and never twice in a row within a few seconds. If the response has rate_limited=true, wait retry_after_seconds before calling again. Do not pretend the background work is done until the orchestrator returns an update or sub_agent_progress reports status=completed."; +pub(super) const BASE_INSTRUCTIONS: &str = "You are a realtime voice frontend. Keep the spoken conversation moving. When the user asks for work that benefits from deeper reasoning, tools, files, research, or multi-step execution, call delegate_to_orchestrator. Always include a stable snake_case slug that names what the background agent will do, such as refactor_docs. Reuse the same slug to continue that background conversation; use a new slug for unrelated work. If the user asks how background work is going, call sub_agent_progress with that slug and read the returned summary aloud. When the user asks something specific (\"is it done?\", \"did it find the bug?\"), pass it through as the question argument so the summary answers it. Call sub_agent_progress sparingly: only when the user asks or when you need material to fill a silence, and never twice in a row within a few seconds. If the response has rate_limited=true, wait retry_after_seconds before calling again. Do not pretend the background work is done until the orchestrator returns an update or sub_agent_progress reports status=completed. If the conversation has grown long and you sense the model is losing track of recent context, call reset_voice_context with a short reason (\"context_overload\" or \"user_requested\"). 
The reset is silent to the user — keep speaking immediately afterward as if nothing happened."; #[derive(Debug, Default, Deserialize)] pub(crate) struct Settings { @@ -23,6 +29,17 @@ pub(crate) struct Settings { /// User-defined presets. Override built-ins of the same name. #[serde(default)] pub presets: HashMap, + /// Auto-trigger a voice context reset once the number of tracked + /// conversation items reaches this value. Omit or set to 0 to disable. + /// Clamped to [`MIN_AUTO_RESET_THRESHOLD`, `MAX_AUTO_RESET_THRESHOLD`] + /// when non-zero. + #[serde(default)] + pub auto_reset_after_items: Option, + /// When true (default), the voice loop scans the runtime dir at boot for + /// other live gamechat instances and records their active sub-agent + /// slugs so the orchestrator and operators can see prior background work. + #[serde(default)] + pub discover_existing_subagents: Option, } #[derive(Debug, Clone, Deserialize)] @@ -33,10 +50,15 @@ pub(crate) struct Preset { pub persona: String, } -#[derive(Debug)] +#[derive(Debug, Clone)] pub(crate) struct ResolvedVoiceSettings { pub voice: String, pub instructions: String, + /// Item count that triggers an auto-reset. 0 means disabled. + pub auto_reset_after_items: usize, + /// Whether the voice loop should probe other gamechat instances for + /// their active sub-agent slugs at startup. 
+ pub discover_existing_subagents: bool, } pub(crate) fn builtin_presets() -> HashMap { @@ -173,9 +195,17 @@ impl Settings { format!("{BASE_INSTRUCTIONS}\n\n{}", persona.trim()) }; + let auto_reset_after_items = match self.auto_reset_after_items { + Some(0) | None => 0, + Some(n) => n.clamp(MIN_AUTO_RESET_THRESHOLD, MAX_AUTO_RESET_THRESHOLD), + }; + let discover_existing_subagents = self.discover_existing_subagents.unwrap_or(true); + Ok(ResolvedVoiceSettings { voice, instructions, + auto_reset_after_items, + discover_existing_subagents, }) } } @@ -225,6 +255,8 @@ mod tests { preset: Some("jarvis".to_string()), persona: None, presets: HashMap::new(), + auto_reset_after_items: None, + discover_existing_subagents: None, }; let resolved = settings.resolve(None, None).unwrap(); assert_eq!(resolved.voice, "echo"); @@ -245,6 +277,8 @@ mod tests { preset: Some("jarvis".to_string()), persona: None, presets, + auto_reset_after_items: None, + discover_existing_subagents: None, }; let resolved = settings.resolve(None, None).unwrap(); assert_eq!(resolved.voice, "alloy"); @@ -258,6 +292,8 @@ mod tests { preset: Some("jarvis".to_string()), persona: Some("Just be a robot.".to_string()), presets: HashMap::new(), + auto_reset_after_items: None, + discover_existing_subagents: None, }; let resolved = settings.resolve(None, None).unwrap(); assert_eq!(resolved.voice, "cedar"); @@ -265,6 +301,68 @@ mod tests { assert!(!resolved.instructions.contains("JARVIS")); } + #[test] + fn auto_reset_zero_disables() { + let settings = Settings { + voice: None, + preset: None, + persona: None, + presets: HashMap::new(), + auto_reset_after_items: Some(0), + discover_existing_subagents: None, + }; + let resolved = settings.resolve(None, None).unwrap(); + assert_eq!(resolved.auto_reset_after_items, 0); + } + + #[test] + fn auto_reset_clamped_below_minimum() { + let settings = Settings { + voice: None, + preset: None, + persona: None, + presets: HashMap::new(), + auto_reset_after_items: Some(3), + 
discover_existing_subagents: None, + }; + let resolved = settings.resolve(None, None).unwrap(); + assert_eq!(resolved.auto_reset_after_items, MIN_AUTO_RESET_THRESHOLD); + } + + #[test] + fn auto_reset_clamped_above_maximum() { + let settings = Settings { + voice: None, + preset: None, + persona: None, + presets: HashMap::new(), + auto_reset_after_items: Some(100_000), + discover_existing_subagents: None, + }; + let resolved = settings.resolve(None, None).unwrap(); + assert_eq!(resolved.auto_reset_after_items, MAX_AUTO_RESET_THRESHOLD); + } + + #[test] + fn discovery_defaults_to_enabled() { + let resolved = Settings::default().resolve(None, None).unwrap(); + assert!(resolved.discover_existing_subagents); + } + + #[test] + fn discovery_can_be_disabled() { + let settings = Settings { + voice: None, + preset: None, + persona: None, + presets: HashMap::new(), + auto_reset_after_items: None, + discover_existing_subagents: Some(false), + }; + let resolved = settings.resolve(None, None).unwrap(); + assert!(!resolved.discover_existing_subagents); + } + #[test] fn unknown_preset_errors_with_available_list() { let err = Settings::default()