diff --git a/README.md b/README.md index fdd15831..eaef72f6 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,7 @@
-
——   every agent deserves its own computer   ——
+
——   the easiest way to give your agent their own computer   ——


@@ -319,10 +319,11 @@ This project is licensed under the [Apache License 2.0](./LICENSE). Special thanks to all our contributors, testers, and community members who help make microsandbox better every day! We'd like to thank the following projects and communities that made `microsandbox` possible: [libkrun](https://github.com/containers/libkrun) and [smoltcp](https://github.com/smoltcp-rs/smoltcp) -
-

Backed by Y Combinator
+ +
+
diff --git a/crates/microsandbox/lib/agent/client.rs b/crates/microsandbox/lib/agent/client.rs index be510e05..a881f647 100644 --- a/crates/microsandbox/lib/agent/client.rs +++ b/crates/microsandbox/lib/agent/client.rs @@ -45,10 +45,6 @@ use super::error::{AgentClientError, AgentClientResult}; /// Default handshake timeout used by [`AgentClient::connect`]. const DEFAULT_HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(10); -/// Per-client correlation ID range step. Mirrors the relay-side constant in -/// `microsandbox-runtime`; both ends must agree on this value. -const ID_RANGE_STEP: u32 = u32::MAX / 16; - //-------------------------------------------------------------------------------------------------- // Types //-------------------------------------------------------------------------------------------------- @@ -59,7 +55,7 @@ const ID_RANGE_STEP: u32 = u32::MAX / 16; pub struct AgentClient { /// Writer half of the Unix socket connection. writer: Arc>, - /// Next correlation ID to allocate (starts at `id_offset + 1`). + /// Next correlation ID to allocate (starts at `id_min`). next_id: AtomicU32, /// Lower bound (inclusive) of the assigned ID range, used for wrap-around. id_min: u32, @@ -116,17 +112,24 @@ impl AgentClient { let (mut reader, writer) = stream.into_split(); - // Handshake: [id_offset: u32 BE][ready_frame_bytes...] - let mut offset_buf = [0u8; 4]; - tokio::time::timeout_at(deadline, reader.read_exact(&mut offset_buf)) + // Handshake: [id_min: u32 BE][id_max: u32 BE][ready_frame_bytes...] + let mut range_buf = [0u8; 8]; + tokio::time::timeout_at(deadline, reader.read_exact(&mut range_buf)) .await .map_err(|_| { AgentClientError::Handshake( - "read id_offset: timed out before relay sent bytes".into(), + "read id range: timed out before relay sent bytes".into(), ) })? - .map_err(|e| AgentClientError::Handshake(format!("read id_offset: {e}")))?; - let id_offset = u32::from_be_bytes(offset_buf); + .map_err(|e| AgentClientError::Handshake(format!("read id range: {e}")))?; + let id_min = u32::from_be_bytes(range_buf[0..4].try_into().unwrap()); + let id_max = u32::from_be_bytes(range_buf[4..8].try_into().unwrap()); + + if id_min >= id_max { + return Err(AgentClientError::Handshake(format!( + "invalid relay id range: start={id_min}, end={id_max}" + ))); + } let ready_frame = tokio::time::timeout_at(deadline, codec::read_raw_frame(&mut reader)) .await @@ -149,7 +152,8 @@ impl AgentClient { .map_err(|e| AgentClientError::Handshake(format!("decode ready payload: {e}")))?; tracing::info!( - id_offset, + id_min, + id_max, ready_bytes = ready_frame.body.len(), boot_time_ns = ready.boot_time_ns, "agent client: connected to relay" @@ -161,9 +165,6 @@ impl AgentClient { let reader_handle = tokio::spawn(reader_loop(reader, Arc::clone(&pending))); let writer = Arc::new(Mutex::new(writer)); - let id_min = id_offset + 1; - let id_max = id_offset.saturating_add(ID_RANGE_STEP); - Ok(Self { writer, next_id: AtomicU32::new(id_min), @@ -462,7 +463,8 @@ mod tests { tokio::spawn(async move { let (mut socket, _) = listener.accept().await.unwrap(); - socket.write_all(&0u32.to_be_bytes()).await.unwrap(); + socket.write_all(&1u32.to_be_bytes()).await.unwrap(); + socket.write_all(&1024u32.to_be_bytes()).await.unwrap(); codec::write_message(&mut socket, &ready_msg).await.unwrap(); }); diff --git a/crates/microsandbox/tests/correlation_ids.rs b/crates/microsandbox/tests/correlation_ids.rs new file mode 100644 index 00000000..0e3d374b --- /dev/null +++ b/crates/microsandbox/tests/correlation_ids.rs @@ -0,0 +1,43 @@ +//! Integration tests for relay correlation ID handling. +//! +//! These tests require KVM (or libkrun on macOS). The `#[msb_test]` +//! attribute marks them `#[ignore]`, so plain `cargo test --workspace` +//! skips them. Run them via: +//! +//! cargo nextest run -p microsandbox --test correlation_ids --run-ignored=only + +use std::time::Duration; + +use microsandbox::Sandbox; +use test_utils::msb_test; + +/// `core.shutdown` is a process-level control frame sent with correlation ID +/// 0, not a client-owned request/session ID. The relay must allow it even +/// though normal client frames are restricted to the assigned ID range. +#[msb_test] +async fn shutdown_control_id_zero_stops_sandbox() { + let name = "correlation-shutdown-id-zero"; + + let sandbox = Sandbox::builder(name) + .image("mirror.gcr.io/library/alpine") + .cpus(1) + .memory(256) + .replace() + .create() + .await + .expect("create sandbox"); + + let stop_result = tokio::time::timeout(Duration::from_secs(30), sandbox.stop_and_wait()).await; + + if stop_result.is_err() { + if let Ok(mut h) = Sandbox::get(name).await { + let _ = h.kill().await; + let _ = h.remove().await; + } + } + Sandbox::remove(name).await.ok(); + + stop_result + .expect("stop_and_wait timed out; relay likely rejected core.shutdown id 0") + .expect("stop_and_wait failed"); +} diff --git a/crates/runtime/lib/relay.rs b/crates/runtime/lib/relay.rs index b18770ff..867bb6b1 100644 --- a/crates/runtime/lib/relay.rs +++ b/crates/runtime/lib/relay.rs @@ -62,7 +62,7 @@ type SessionRegistry = std::sync::Mutex>; //-------------------------------------------------------------------------------------------------- /// Maximum number of simultaneous clients. -const MAX_CLIENTS: u32 = 16; +const MAX_CLIENTS: u32 = 128; /// Size of the correlation ID range allocated to each client. const ID_RANGE_STEP: u32 = u32::MAX / MAX_CLIENTS; @@ -321,15 +321,19 @@ impl AgentRelay { }; let id_offset = slot * ID_RANGE_STEP; + let id_start = id_offset.saturating_add(1); + let id_end_exclusive = id_offset.saturating_add(ID_RANGE_STEP); tracing::info!( - "agent relay: client connected slot={slot} id_offset={id_offset}" + "agent relay: client connected slot={slot} id_start={id_start} id_end_exclusive={id_end_exclusive}" ); - // Perform handshake: send [id_offset: u32 BE][ready_frame_bytes...]. + // Perform handshake: send + // [id_start: u32 BE][id_end_exclusive: u32 BE][ready_frame_bytes...]. let (reader_half, mut writer_half) = stream.into_split(); - let mut handshake = Vec::with_capacity(4 + ready_frame.len()); - handshake.extend_from_slice(&id_offset.to_be_bytes()); + let mut handshake = Vec::with_capacity(8 + ready_frame.len()); + handshake.extend_from_slice(&id_start.to_be_bytes()); + handshake.extend_from_slice(&id_end_exclusive.to_be_bytes()); handshake.extend_from_slice(&ready_frame); if let Err(e) = writer_half.write_all(&handshake).await { @@ -381,6 +385,8 @@ impl AgentRelay { drain_tx_clone, registry_clone, next_id_clone, + id_start, + id_end_exclusive, )); } Err(e) => { @@ -743,6 +749,8 @@ async fn client_reader_task( drain_tx: mpsc::Sender<()>, session_registry: Arc, next_session_id: Arc, + id_start: u32, + id_end_exclusive: u32, ) { loop { let frame = match read_raw_frame(&mut reader).await { @@ -758,6 +766,16 @@ async fn client_reader_task( let is_terminal = (frame.flags & FLAG_TERMINAL) != 0; let is_shutdown = (frame.flags & FLAG_SHUTDOWN) != 0; + if !is_client_frame_allowed(frame.id, frame.flags, id_start, id_end_exclusive) { + tracing::warn!( + "agent relay: client slot={slot} sent out-of-range id={} range=[{}, {})", + frame.id, + id_start, + id_end_exclusive + ); + break; + } + // Forward shutdown to agentd (via the agent_tx send below) so the // guest can sync filesystems and power off cleanly. Also notify the // caller so it can start the flush-grace fallback timer — if the @@ -865,3 +883,41 @@ async fn client_reader_task( used_slots.lock().await.remove(&slot); tracing::debug!("agent relay: slot={slot} released"); } + +/// Return whether a client-originated frame may be forwarded to agentd. +/// +/// Most client frames must use a correlation ID from the relay-assigned +/// range so responses route back to the owning client. `core.shutdown` is a +/// process-level control frame, not a correlated request, and the SDK sends it +/// with ID 0. +fn is_client_frame_allowed(id: u32, flags: u8, id_start: u32, id_end_exclusive: u32) -> bool { + let is_shutdown_control = (flags & FLAG_SHUTDOWN) != 0 && id == 0; + is_shutdown_control || (id >= id_start && id < id_end_exclusive) +} + +//-------------------------------------------------------------------------------------------------- +// Tests +//-------------------------------------------------------------------------------------------------- + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn client_frame_validation_allows_ids_in_assigned_range() { + assert!(is_client_frame_allowed(10, 0, 10, 20)); + assert!(is_client_frame_allowed(19, FLAG_SESSION_START, 10, 20)); + } + + #[test] + fn client_frame_validation_rejects_non_shutdown_ids_outside_range() { + assert!(!is_client_frame_allowed(0, 0, 10, 20)); + assert!(!is_client_frame_allowed(9, FLAG_SESSION_START, 10, 20)); + assert!(!is_client_frame_allowed(20, FLAG_TERMINAL, 10, 20)); + } + + #[test] + fn client_frame_validation_allows_shutdown_control_id_zero() { + assert!(is_client_frame_allowed(0, FLAG_SHUTDOWN, 10, 20)); + } +}