From 0980556ee86d39024060377c6c03bc024b9b37c4 Mon Sep 17 00:00:00 2001 From: Chris Busillo Date: Sat, 30 May 2026 11:06:09 -0400 Subject: [PATCH] feat(core): warn on concurrent active checkout sessions --- code-rs/core/src/active_sessions.rs | 383 ++++++++++++++++++ code-rs/core/src/codex.rs | 9 + code-rs/core/src/codex/session.rs | 2 + code-rs/core/src/codex/streaming.rs | 35 +- code-rs/core/src/conversation_manager.rs | 21 +- code-rs/core/src/housekeeping.rs | 46 +-- code-rs/core/src/lib.rs | 2 + code-rs/core/src/process_liveness.rs | 63 +++ code-rs/core/tests/active_session_warnings.rs | 76 ++++ 9 files changed, 588 insertions(+), 49 deletions(-) create mode 100644 code-rs/core/src/active_sessions.rs create mode 100644 code-rs/core/src/process_liveness.rs create mode 100644 code-rs/core/tests/active_session_warnings.rs diff --git a/code-rs/core/src/active_sessions.rs b/code-rs/core/src/active_sessions.rs new file mode 100644 index 00000000000..902f5a9b0ba --- /dev/null +++ b/code-rs/core/src/active_sessions.rs @@ -0,0 +1,383 @@ +use crate::process_liveness::check_pid_alive; +use crate::protocol::SandboxPolicy; +use code_protocol::protocol::SessionSource; +use serde::{Deserialize, Serialize}; +use std::fs; +use std::io; +use std::path::{Path, PathBuf}; +use std::process::Command; +use std::time::{SystemTime, UNIX_EPOCH}; +use uuid::Uuid; + +const ACTIVE_SESSIONS_DIR: &str = "active-sessions"; +const SCHEMA_VERSION: u32 = 1; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "kebab-case")] +pub enum ActiveSessionMode { + WriteCapable, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ActiveSessionRecord { + pub schema_version: u32, + pub product: String, + pub session_id: Uuid, + pub pid: u32, + pub source: SessionSource, + pub mode: ActiveSessionMode, + pub started_at_unix: u64, + pub heartbeat_at_unix: u64, + pub cwd: PathBuf, + pub checkout_root: PathBuf, + pub git_common_dir: Option, + pub branch: Option, + pub head: Option, +} + +#[derive(Debug)] +pub struct ActiveSessionGuard { + path: PathBuf, +} + +#[derive(Debug)] +pub struct ActiveSessionRegistration { + pub guard: ActiveSessionGuard, + pub conflicts: Vec, +} + +impl Drop for ActiveSessionGuard { + fn drop(&mut self) { + if let Err(err) = fs::remove_file(&self.path) { + if err.kind() != io::ErrorKind::NotFound { + tracing::debug!( + "failed to remove active session record {}: {err}", + self.path.display() + ); + } + } + } +} + +pub fn register_if_write_capable( + code_home: &Path, + cwd: &Path, + sandbox_policy: &SandboxPolicy, + session_id: Uuid, + source: SessionSource, +) -> io::Result> { + if !is_write_capable(sandbox_policy) { + return Ok(None); + } + + let Some(checkout_root) = git_path(cwd, &["rev-parse", "--show-toplevel"]) else { + return Ok(None); + }; + + let now = unix_now(); + let record = ActiveSessionRecord { + schema_version: SCHEMA_VERSION, + product: "Every Code".to_string(), + session_id, + pid: std::process::id(), + source, + mode: ActiveSessionMode::WriteCapable, + started_at_unix: now, + heartbeat_at_unix: now, + cwd: canonicalize_lossy(cwd), + checkout_root: canonicalize_lossy(&checkout_root), + git_common_dir: git_path(cwd, &["rev-parse", "--git-common-dir"]) + .map(|path| absolutize_git_path(cwd, path)), + branch: git_output(cwd, &["branch", "--show-current"]), + head: git_output(cwd, &["rev-parse", "--verify", "HEAD"]), + }; + + let dir = active_sessions_dir(code_home)?; + prune_stale_records(&dir); + let path = record_path(&dir, record.pid, record.session_id); + let bytes = serde_json::to_vec_pretty(&record).map_err(io::Error::other)?; + fs::write(&path, bytes)?; + + let conflicts = live_records(&dir) + .into_iter() + .filter(|candidate| candidate.session_id != session_id) + .filter(|candidate| candidate.checkout_root == record.checkout_root) + .filter(|candidate| candidate.mode == ActiveSessionMode::WriteCapable) + .collect(); + + Ok(Some(ActiveSessionRegistration { + guard: ActiveSessionGuard { path }, + conflicts, + })) +} + +pub fn active_session_warning(conflicts: &[ActiveSessionRecord]) -> Option { + let first = conflicts.first()?; + let source = format_session_source(&first.source); + let detail = format!( + "pid {}, {}, started {}s ago", + first.pid, + source, + unix_now().saturating_sub(first.started_at_unix) + ); + let root = first.checkout_root.display(); + if conflicts.len() == 1 { + Some(format!( + "Another write-capable Every Code session is active in this checkout ({detail}) at {root}. Concurrent edits can conflict; consider closing it or using a separate worktree." + )) + } else { + Some(format!( + "{} other write-capable Every Code sessions are active in this checkout, including {detail}, at {root}. Concurrent edits can conflict; consider closing them or using separate worktrees.", + conflicts.len() + )) + } +} + +fn is_write_capable(sandbox_policy: &SandboxPolicy) -> bool { + matches!( + sandbox_policy, + SandboxPolicy::WorkspaceWrite { .. } | SandboxPolicy::DangerFullAccess + ) +} + +fn active_sessions_dir(code_home: &Path) -> io::Result { + let dir = code_home.join("state").join(ACTIVE_SESSIONS_DIR); + fs::create_dir_all(&dir)?; + Ok(dir) +} + +fn record_path(dir: &Path, pid: u32, session_id: Uuid) -> PathBuf { + dir.join(format!("pid-{pid}-{session_id}.json")) +} + +fn live_records(dir: &Path) -> Vec { + let mut records = Vec::new(); + let Ok(entries) = fs::read_dir(dir) else { + return records; + }; + for entry in entries.flatten() { + let path = entry.path(); + if path.extension().and_then(|ext| ext.to_str()) != Some("json") { + continue; + } + let Some(record) = read_record(&path) else { + let _ = fs::remove_file(&path); + continue; + }; + match check_pid_alive(record.pid as i32) { + Some(true) => records.push(record), + Some(false) => { + let _ = fs::remove_file(&path); + } + None => {} + } + } + records +} + +fn prune_stale_records(dir: &Path) { + let _ = live_records(dir); +} + +fn read_record(path: &Path) -> Option { + let bytes = fs::read(path).ok()?; + let record: ActiveSessionRecord = serde_json::from_slice(&bytes).ok()?; + (record.schema_version == SCHEMA_VERSION).then_some(record) +} + +fn git_path(cwd: &Path, args: &[&str]) -> Option { + git_output(cwd, args).map(PathBuf::from) +} + +fn git_output(cwd: &Path, args: &[&str]) -> Option { + let output = Command::new("git") + .args(args) + .current_dir(cwd) + .output() + .ok()?; + if !output.status.success() { + return None; + } + let value = String::from_utf8(output.stdout).ok()?; + let trimmed = value.trim(); + (!trimmed.is_empty()).then(|| trimmed.to_string()) +} + +fn absolutize_git_path(cwd: &Path, path: PathBuf) -> PathBuf { + if path.is_absolute() { + canonicalize_lossy(&path) + } else { + canonicalize_lossy(&cwd.join(path)) + } +} + +fn canonicalize_lossy(path: &Path) -> PathBuf { + path.canonicalize().unwrap_or_else(|_| path.to_path_buf()) +} + +fn unix_now() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|duration| duration.as_secs()) + .unwrap_or(0) +} + +fn format_session_source(source: &SessionSource) -> &'static str { + match source { + SessionSource::Cli => "cli", + SessionSource::Exec => "exec", + SessionSource::VSCode => "vscode", + SessionSource::Mcp => "mcp", + SessionSource::SubAgent(_) => "subagent", + SessionSource::Unknown => "unknown", + } +} + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::tempdir; + + #[test] + fn read_only_sessions_do_not_register() { + let home = tempdir().unwrap(); + let cwd = tempdir().unwrap(); + let result = register_if_write_capable( + home.path(), + cwd.path(), + &SandboxPolicy::ReadOnly, + Uuid::new_v4(), + SessionSource::Exec, + ) + .unwrap(); + + assert!(result.is_none()); + assert!(!home.path().join("state").join(ACTIVE_SESSIONS_DIR).exists()); + } + + #[test] + fn second_write_capable_session_warns_in_same_checkout() { + let home = tempdir().unwrap(); + let repo = tempdir().unwrap(); + init_git_repo(repo.path()); + + let first = register_if_write_capable( + home.path(), + repo.path(), + &SandboxPolicy::DangerFullAccess, + Uuid::new_v4(), + SessionSource::Cli, + ) + .unwrap() + .unwrap(); + assert!(first.conflicts.is_empty()); + + let second = register_if_write_capable( + home.path(), + repo.path(), + &SandboxPolicy::DangerFullAccess, + Uuid::new_v4(), + SessionSource::Exec, + ) + .unwrap() + .unwrap(); + + assert_eq!(second.conflicts.len(), 1); + assert_eq!(second.conflicts[0].source, SessionSource::Cli); + assert!(active_session_warning(&second.conflicts).unwrap().contains("write-capable")); + } + + #[test] + fn stale_session_file_is_removed() { + let home = tempdir().unwrap(); + let repo = tempdir().unwrap(); + init_git_repo(repo.path()); + let dir = active_sessions_dir(home.path()).unwrap(); + let stale = ActiveSessionRecord { + schema_version: SCHEMA_VERSION, + product: "Every Code".to_string(), + session_id: Uuid::new_v4(), + pid: i32::MAX as u32, + source: SessionSource::Cli, + mode: ActiveSessionMode::WriteCapable, + started_at_unix: 1, + heartbeat_at_unix: 1, + cwd: repo.path().to_path_buf(), + checkout_root: repo.path().canonicalize().unwrap(), + git_common_dir: None, + branch: None, + head: None, + }; + let path = record_path(&dir, stale.pid, stale.session_id); + fs::write(&path, serde_json::to_vec(&stale).unwrap()).unwrap(); + + let current = register_if_write_capable( + home.path(), + repo.path(), + &SandboxPolicy::DangerFullAccess, + Uuid::new_v4(), + SessionSource::Exec, + ) + .unwrap() + .unwrap(); + + assert!(current.conflicts.is_empty()); + assert!(!path.exists()); + } + + #[test] + fn different_worktrees_do_not_conflict() { + let home = tempdir().unwrap(); + let parent = tempdir().unwrap(); + let repo = parent.path().join("repo"); + let worktree = parent.path().join("repo-worktree"); + fs::create_dir(&repo).unwrap(); + init_git_repo(&repo); + run_git(&repo, &["checkout", "-b", "feature"]); + run_git(&repo, &["worktree", "add", worktree.to_str().unwrap()]); + + let first = register_if_write_capable( + home.path(), + &repo, + &SandboxPolicy::DangerFullAccess, + Uuid::new_v4(), + SessionSource::Cli, + ) + .unwrap() + .unwrap(); + assert!(first.conflicts.is_empty()); + + let second = register_if_write_capable( + home.path(), + &worktree, + &SandboxPolicy::DangerFullAccess, + Uuid::new_v4(), + SessionSource::Exec, + ) + .unwrap() + .unwrap(); + + assert!(second.conflicts.is_empty()); + } + + fn init_git_repo(path: &Path) { + run_git(path, &["init"]); + run_git(path, &["checkout", "-b", "main"]); + fs::write(path.join("README.md"), "test\n").unwrap(); + run_git(path, &["add", "."]); + run_git(path, &["-c", "user.name=Test", "-c", "user.email=test@example.com", "commit", "-m", "init"]); + } + + fn run_git(path: &Path, args: &[&str]) { + let output = Command::new("git") + .args(args) + .current_dir(path) + .output() + .unwrap(); + assert!( + output.status.success(), + "git {args:?} failed: {}", + String::from_utf8_lossy(&output.stderr) + ); + } +} diff --git a/code-rs/core/src/codex.rs b/code-rs/core/src/codex.rs index 332c0230eae..ac4358bca5b 100644 --- a/code-rs/core/src/codex.rs +++ b/code-rs/core/src/codex.rs @@ -1192,6 +1192,14 @@ impl Codex { pub async fn spawn_with_auth_manager( config: Config, auth_manager: Option>, + ) -> CodexResult { + Self::spawn_with_auth_manager_and_source(config, auth_manager, SessionSource::Cli).await + } + + pub async fn spawn_with_auth_manager_and_source( + config: Config, + auth_manager: Option>, + session_source: SessionSource, ) -> CodexResult { // experimental resume path (undocumented) let resume_path = config.experimental_resume.clone(); @@ -1235,6 +1243,7 @@ impl Codex { session_id, config, auth_manager, + session_source, rx_sub, tx_event, )); diff --git a/code-rs/core/src/codex/session.rs b/code-rs/core/src/codex/session.rs index ef9288930f6..ccef1b03c34 100644 --- a/code-rs/core/src/codex/session.rs +++ b/code-rs/core/src/codex/session.rs @@ -1,4 +1,5 @@ use super::*; +use crate::active_sessions::ActiveSessionGuard; use crate::protocol::TaskOriginKind; use serde_json::Value; use crate::util::extract_shell_script; @@ -516,6 +517,7 @@ pub(crate) struct Session { pub(super) env_ctx_v2: bool, pub(super) retention_config: crate::config_types::RetentionConfig, pub(super) model_descriptions: Option, + pub(super) _active_session_guard: Option, } pub(super) struct HookGuard<'a> { flag: &'a AtomicBool, diff --git a/code-rs/core/src/codex/streaming.rs b/code-rs/core/src/codex/streaming.rs index e7325a2d615..6a6885204aa 100644 --- a/code-rs/core/src/codex/streaming.rs +++ b/code-rs/core/src/codex/streaming.rs @@ -221,6 +221,7 @@ pub(super) async fn submission_loop( mut session_id: Uuid, config: Arc, auth_manager: Option>, + session_source: SessionSource, rx_sub: Receiver, tx_event: Sender, ) { @@ -535,7 +536,7 @@ pub(super) async fn submission_loop( crate::rollout::recorder::RolloutRecorderParams::new( code_protocol::mcp_protocol::ConversationId::from(session_id), effective_user_instructions.clone(), - SessionSource::Cli, + session_source.clone(), ), ) .await @@ -708,6 +709,27 @@ pub(super) async fn submission_loop( } } let default_shell = shell::default_user_shell().await; + let active_session_registration = + match crate::active_sessions::register_if_write_capable( + &config.code_home, + &config.cwd, + &config.sandbox_policy, + session_id, + session_source.clone(), + ) { + Ok(registration) => registration, + Err(err) => { + warn!("failed to register active session presence: {err}"); + None + } + }; + let active_session_warning = active_session_registration + .as_ref() + .and_then(|registration| { + crate::active_sessions::active_session_warning(®istration.conflicts) + }); + let active_session_guard = + active_session_registration.map(|registration| registration.guard); let mut tools_config = ToolsConfig::new( &config.model_family, approval_policy, @@ -836,6 +858,7 @@ pub(super) async fn submission_loop( env_ctx_v2: config.env_ctx_v2, retention_config: config.retention.clone(), model_descriptions, + _active_session_guard: active_session_guard, }); let weak_handle = Arc::downgrade(&new_session); if let Some(inner) = Arc::get_mut(&mut new_session) { @@ -916,6 +939,16 @@ pub(super) async fn submission_loop( } } + if let Some(message) = active_session_warning { + let warning_event = sess_arc.make_event( + &sub.id, + EventMsg::Warning(crate::protocol::WarningEvent { message }), + ); + if let Err(e) = tx_event.send(warning_event).await { + warn!("failed to send active session warning: {e}"); + } + } + // If we resumed from a rollout, replay the prior transcript into the UI. if replay_history_items.is_some() || restored_history_snapshot.is_some() diff --git a/code-rs/core/src/conversation_manager.rs b/code-rs/core/src/conversation_manager.rs index ab9b4ad7a70..b120e4fec2f 100644 --- a/code-rs/core/src/conversation_manager.rs +++ b/code-rs/core/src/conversation_manager.rs @@ -76,7 +76,12 @@ impl ConversationManager { codex, init_id: _, session_id, - } = Codex::spawn_with_auth_manager(config, Some(auth_manager.clone())).await?; + } = Codex::spawn_with_auth_manager_and_source( + config, + Some(auth_manager.clone()), + self.session_source.clone(), + ) + .await?; let conversation_id: code_protocol::ConversationId = session_id.into(); self.finalize_spawn(codex, conversation_id).await } @@ -137,7 +142,12 @@ impl ConversationManager { codex, init_id: _, session_id, - } = Codex::spawn_with_auth_manager(config, Some(auth_manager.clone())).await?; + } = Codex::spawn_with_auth_manager_and_source( + config, + Some(auth_manager.clone()), + self.session_source.clone(), + ) + .await?; let conversation_id: code_protocol::ConversationId = session_id.into(); self.finalize_spawn(codex, conversation_id).await } @@ -203,7 +213,12 @@ impl ConversationManager { codex, init_id: _, session_id, - } = Codex::spawn_with_auth_manager(config, Some(self.auth_manager.clone())).await?; + } = Codex::spawn_with_auth_manager_and_source( + config, + Some(self.auth_manager.clone()), + self.session_source.clone(), + ) + .await?; let conversation_id: code_protocol::ConversationId = session_id.into(); self.finalize_spawn(codex, conversation_id).await } diff --git a/code-rs/core/src/housekeeping.rs b/code-rs/core/src/housekeeping.rs index 696d496ae8b..cd76124c565 100644 --- a/code-rs/core/src/housekeeping.rs +++ b/code-rs/core/src/housekeeping.rs @@ -1,4 +1,5 @@ use crate::git_worktree; +use crate::process_liveness::check_pid_alive; use crate::rollout::SESSIONS_SUBDIR; use fs2::FileExt; use serde::{Deserialize, Serialize}; @@ -579,51 +580,6 @@ fn pid_file_is_active(file_name: &OsStr) -> Option { check_pid_alive(pid) } -#[cfg(target_os = "linux")] -fn check_pid_alive(pid: i32) -> Option { - use std::path::Path; - - Some(Path::new("/proc").join(pid.to_string()).exists()) -} - -#[cfg(any(target_os = "macos", target_os = "ios"))] -fn check_pid_alive(pid: i32) -> Option { - use libc::{kill, c_int}; - const SIGZERO: c_int = 0; - let result = unsafe { kill(pid, SIGZERO) }; - if result == 0 { - return Some(true); - } - let errno = std::io::Error::last_os_error().raw_os_error()?; - Some(errno != libc::ESRCH) -} - -#[cfg(target_os = "windows")] -fn check_pid_alive(pid: i32) -> Option { - use windows_sys::Win32::Foundation::{CloseHandle, HANDLE, STILL_ACTIVE}; - use windows_sys::Win32::System::Threading::{GetExitCodeProcess, OpenProcess, PROCESS_QUERY_LIMITED_INFORMATION}; - - unsafe { - let handle: HANDLE = OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION, 0, pid as u32); - if handle.is_null() { - return Some(false); - } - - let mut status: u32 = 0; - let ok = GetExitCodeProcess(handle, &mut status as *mut u32); - CloseHandle(handle); - if ok == 0 { - return None; - } - Some(status == STILL_ACTIVE as u32) - } -} - -#[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "ios", target_os = "windows")))] -fn check_pid_alive(_pid: i32) -> Option { - None -} - #[derive(Default)] struct SessionCleanupStats { removed_days: usize, diff --git a/code-rs/core/src/lib.rs b/code-rs/core/src/lib.rs index 29a1ebbb213..160302fb9e8 100644 --- a/code-rs/core/src/lib.rs +++ b/code-rs/core/src/lib.rs @@ -7,6 +7,7 @@ #![deny(clippy::print_stdout, clippy::print_stderr)] mod apply_patch; +mod active_sessions; mod fs_sanitize; pub mod auth; pub mod auth_accounts; @@ -65,6 +66,7 @@ mod mcp_tool_call; mod message_history; mod memories; mod model_provider_info; +mod process_liveness; pub mod remote_models; // Remote model discovery caches its own on-disk state within the module. mod cgroup; diff --git a/code-rs/core/src/process_liveness.rs b/code-rs/core/src/process_liveness.rs new file mode 100644 index 00000000000..8af1bf18977 --- /dev/null +++ b/code-rs/core/src/process_liveness.rs @@ -0,0 +1,63 @@ +#[cfg(target_os = "linux")] +pub(crate) fn check_pid_alive(pid: i32) -> Option { + use std::path::Path; + + Some(Path::new("/proc").join(pid.to_string()).exists()) +} + +#[cfg(any(target_os = "macos", target_os = "ios"))] +pub(crate) fn check_pid_alive(pid: i32) -> Option { + use libc::{kill, c_int}; + const SIGZERO: c_int = 0; + let result = unsafe { kill(pid, SIGZERO) }; + if result == 0 { + return Some(true); + } + let errno = std::io::Error::last_os_error().raw_os_error()?; + Some(errno != libc::ESRCH) +} + +#[cfg(target_os = "windows")] +pub(crate) fn check_pid_alive(pid: i32) -> Option { + use windows_sys::Win32::Foundation::{CloseHandle, HANDLE, STILL_ACTIVE}; + use windows_sys::Win32::System::Threading::{ + GetExitCodeProcess, OpenProcess, PROCESS_QUERY_LIMITED_INFORMATION, + }; + + unsafe { + let handle: HANDLE = OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION, 0, pid as u32); + if handle.is_null() { + return Some(false); + } + + let mut status: u32 = 0; + let ok = GetExitCodeProcess(handle, &mut status as *mut u32); + CloseHandle(handle); + if ok == 0 { + return None; + } + Some(status == STILL_ACTIVE as u32) + } +} + +#[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "ios", target_os = "windows")))] +pub(crate) fn check_pid_alive(_pid: i32) -> Option { + None +} + +#[cfg(test)] +mod tests { + use super::check_pid_alive; + + #[test] + fn current_process_is_alive() { + let pid = std::process::id() as i32; + assert_eq!(check_pid_alive(pid), Some(true)); + } + + #[test] + fn implausible_process_is_not_alive() { + let pid = i32::MAX; + assert_eq!(check_pid_alive(pid), Some(false)); + } +} diff --git a/code-rs/core/tests/active_session_warnings.rs b/code-rs/core/tests/active_session_warnings.rs new file mode 100644 index 00000000000..b390057ef89 --- /dev/null +++ b/code-rs/core/tests/active_session_warnings.rs @@ -0,0 +1,76 @@ +#![allow(clippy::unwrap_used)] + +mod common; + +use common::load_default_config_for_test; + +use code_core::protocol::{AskForApproval, EventMsg, SandboxPolicy}; +use code_core::{AuthManager, CodexAuth, ConversationManager}; +use code_protocol::protocol::SessionSource; +use std::path::Path; +use std::process::Command; +use tempfile::TempDir; +use tokio::time::{timeout, Duration}; + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn exec_session_warns_when_checkout_already_has_write_capable_session() { + let code_home = TempDir::new().unwrap(); + let repo = TempDir::new().unwrap(); + init_git_repo(repo.path()); + + let mut config = load_default_config_for_test(&code_home); + config.cwd = repo.path().to_path_buf(); + config.approval_policy = AskForApproval::Never; + config.sandbox_policy = SandboxPolicy::DangerFullAccess; + + let auth = AuthManager::from_auth_for_testing(CodexAuth::from_api_key("Test API Key")); + let cli_manager = ConversationManager::new(auth.clone(), SessionSource::Cli); + let exec_manager = ConversationManager::new(auth, SessionSource::Exec); + + let _existing = cli_manager + .new_conversation(config.clone()) + .await + .expect("create existing cli conversation"); + let exec_conversation = exec_manager + .new_conversation(config) + .await + .expect("create exec conversation") + .conversation; + + let event = timeout(Duration::from_secs(5), exec_conversation.next_event()) + .await + .expect("timed out waiting for active-session warning") + .expect("event stream ended"); + + match event.msg { + EventMsg::Warning(warning) => { + assert!(warning.message.contains("Another write-capable Every Code session")); + assert!(warning.message.contains("cli")); + assert!(warning.message.contains("separate worktree")); + } + other => panic!("expected warning event, got {other:?}"), + } +} + +fn init_git_repo(path: &Path) { + run_git(path, &["init"]); + run_git(path, &["checkout", "-b", "main"]); + run_git(path, &["config", "user.email", "code@example.com"]); + run_git(path, &["config", "user.name", "Every Code Tester"]); + std::fs::write(path.join("README.md"), "test\n").unwrap(); + run_git(path, &["add", "."]); + run_git(path, &["commit", "-m", "init"]); +} + +fn run_git(path: &Path, args: &[&str]) { + let output = Command::new("git") + .args(args) + .current_dir(path) + .output() + .unwrap(); + assert!( + output.status.success(), + "git {args:?} failed: {}", + String::from_utf8_lossy(&output.stderr) + ); +}