Skip to content
7 changes: 4 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

<br />

<div align="center"><b>——&nbsp;&nbsp;&nbsp;every agent deserves its own computer&nbsp;&nbsp;&nbsp;——</b></div>
<div align="center"><b>——&nbsp;&nbsp;&nbsp;the easiest way to give your agent their own computer&nbsp;&nbsp;&nbsp;——</b></div>

<br />
<br />
Expand Down Expand Up @@ -319,10 +319,11 @@ This project is licensed under the [Apache License 2.0](./LICENSE).

Special thanks to all our contributors, testers, and community members who help make microsandbox better every day! We'd like to thank the following projects and communities that made `microsandbox` possible: [libkrun](https://github.com/containers/libkrun) and [smoltcp](https://github.com/smoltcp-rs/smoltcp)

<br />
<br />
<br />

<div align='center'>
<a href="https://www.ycombinator.com/"><img src="https://img.shields.io/badge/BACKED%20BY-Y%20COMBINATOR-F26522?style=for-the-badge&logo=ycombinator&logoColor=white" alt="Backed by Y Combinator"></a>
</div>

<br />
<br />
34 changes: 18 additions & 16 deletions crates/microsandbox/lib/agent/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,6 @@ use super::error::{AgentClientError, AgentClientResult};
/// Default handshake timeout used by [`AgentClient::connect`].
const DEFAULT_HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(10);

/// Per-client correlation ID range step. Mirrors the relay-side constant in
/// `microsandbox-runtime`; both ends must agree on this value.
const ID_RANGE_STEP: u32 = u32::MAX / 16;

//--------------------------------------------------------------------------------------------------
// Types
//--------------------------------------------------------------------------------------------------
Expand All @@ -59,7 +55,7 @@ const ID_RANGE_STEP: u32 = u32::MAX / 16;
pub struct AgentClient {
/// Writer half of the Unix socket connection.
writer: Arc<Mutex<tokio::net::unix::OwnedWriteHalf>>,
/// Next correlation ID to allocate (starts at `id_offset + 1`).
/// Next correlation ID to allocate (starts at `id_min`).
next_id: AtomicU32,
/// Lower bound (inclusive) of the assigned ID range, used for wrap-around.
id_min: u32,
Expand Down Expand Up @@ -116,17 +112,24 @@ impl AgentClient {

let (mut reader, writer) = stream.into_split();

// Handshake: [id_offset: u32 BE][ready_frame_bytes...]
let mut offset_buf = [0u8; 4];
tokio::time::timeout_at(deadline, reader.read_exact(&mut offset_buf))
// Handshake: [id_min: u32 BE][id_max: u32 BE][ready_frame_bytes...]
let mut range_buf = [0u8; 8];
tokio::time::timeout_at(deadline, reader.read_exact(&mut range_buf))
.await
.map_err(|_| {
AgentClientError::Handshake(
"read id_offset: timed out before relay sent bytes".into(),
"read id range: timed out before relay sent bytes".into(),
)
})?
.map_err(|e| AgentClientError::Handshake(format!("read id_offset: {e}")))?;
let id_offset = u32::from_be_bytes(offset_buf);
.map_err(|e| AgentClientError::Handshake(format!("read id range: {e}")))?;
let id_min = u32::from_be_bytes(range_buf[0..4].try_into().unwrap());
let id_max = u32::from_be_bytes(range_buf[4..8].try_into().unwrap());

if id_min >= id_max {
return Err(AgentClientError::Handshake(format!(
"invalid relay id range: start={id_min}, end={id_max}"
)));
}

let ready_frame = tokio::time::timeout_at(deadline, codec::read_raw_frame(&mut reader))
.await
Expand All @@ -149,7 +152,8 @@ impl AgentClient {
.map_err(|e| AgentClientError::Handshake(format!("decode ready payload: {e}")))?;

tracing::info!(
id_offset,
id_min,
id_max,
ready_bytes = ready_frame.body.len(),
boot_time_ns = ready.boot_time_ns,
"agent client: connected to relay"
Expand All @@ -161,9 +165,6 @@ impl AgentClient {
let reader_handle = tokio::spawn(reader_loop(reader, Arc::clone(&pending)));
let writer = Arc::new(Mutex::new(writer));

let id_min = id_offset + 1;
let id_max = id_offset.saturating_add(ID_RANGE_STEP);

Ok(Self {
writer,
next_id: AtomicU32::new(id_min),
Expand Down Expand Up @@ -462,7 +463,8 @@ mod tests {

tokio::spawn(async move {
let (mut socket, _) = listener.accept().await.unwrap();
socket.write_all(&0u32.to_be_bytes()).await.unwrap();
socket.write_all(&1u32.to_be_bytes()).await.unwrap();
socket.write_all(&1024u32.to_be_bytes()).await.unwrap();
codec::write_message(&mut socket, &ready_msg).await.unwrap();
});

Expand Down
43 changes: 43 additions & 0 deletions crates/microsandbox/tests/correlation_ids.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
//! Integration tests for relay correlation ID handling.
//!
//! These tests require KVM (or libkrun on macOS). The `#[msb_test]`
//! attribute marks them `#[ignore]`, so plain `cargo test --workspace`
//! skips them. Run them via:
//!
//! cargo nextest run -p microsandbox --test correlation_ids --run-ignored=only

use std::time::Duration;

use microsandbox::Sandbox;
use test_utils::msb_test;

/// `core.shutdown` is a process-level control frame sent with correlation ID
/// 0, not a client-owned request/session ID. The relay must allow it even
/// though normal client frames are restricted to the assigned ID range.
#[msb_test]
async fn shutdown_control_id_zero_stops_sandbox() {
let name = "correlation-shutdown-id-zero";

let sandbox = Sandbox::builder(name)
.image("mirror.gcr.io/library/alpine")
.cpus(1)
.memory(256)
.replace()
.create()
.await
.expect("create sandbox");

let stop_result = tokio::time::timeout(Duration::from_secs(30), sandbox.stop_and_wait()).await;

if stop_result.is_err() {
if let Ok(mut h) = Sandbox::get(name).await {
let _ = h.kill().await;
let _ = h.remove().await;
}
}
Sandbox::remove(name).await.ok();

stop_result
.expect("stop_and_wait timed out; relay likely rejected core.shutdown id 0")
.expect("stop_and_wait failed");
}
66 changes: 61 additions & 5 deletions crates/runtime/lib/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ type SessionRegistry = std::sync::Mutex<HashMap<u32, SessionInfo>>;
//--------------------------------------------------------------------------------------------------

/// Maximum number of simultaneous clients.
const MAX_CLIENTS: u32 = 16;
const MAX_CLIENTS: u32 = 128;

/// Size of the correlation ID range allocated to each client.
const ID_RANGE_STEP: u32 = u32::MAX / MAX_CLIENTS;
Expand Down Expand Up @@ -321,15 +321,19 @@ impl AgentRelay {
};

let id_offset = slot * ID_RANGE_STEP;
let id_start = id_offset.saturating_add(1);
let id_end_exclusive = id_offset.saturating_add(ID_RANGE_STEP);
tracing::info!(
"agent relay: client connected slot={slot} id_offset={id_offset}"
"agent relay: client connected slot={slot} id_start={id_start} id_end_exclusive={id_end_exclusive}"
);

// Perform handshake: send [id_offset: u32 BE][ready_frame_bytes...].
// Perform handshake: send
// [id_start: u32 BE][id_end_exclusive: u32 BE][ready_frame_bytes...].
let (reader_half, mut writer_half) = stream.into_split();

let mut handshake = Vec::with_capacity(4 + ready_frame.len());
handshake.extend_from_slice(&id_offset.to_be_bytes());
let mut handshake = Vec::with_capacity(8 + ready_frame.len());
handshake.extend_from_slice(&id_start.to_be_bytes());
handshake.extend_from_slice(&id_end_exclusive.to_be_bytes());
handshake.extend_from_slice(&ready_frame);

if let Err(e) = writer_half.write_all(&handshake).await {
Expand Down Expand Up @@ -381,6 +385,8 @@ impl AgentRelay {
drain_tx_clone,
registry_clone,
next_id_clone,
id_start,
id_end_exclusive,
));
}
Err(e) => {
Expand Down Expand Up @@ -743,6 +749,8 @@ async fn client_reader_task(
drain_tx: mpsc::Sender<()>,
session_registry: Arc<SessionRegistry>,
next_session_id: Arc<AtomicU64>,
id_start: u32,
id_end_exclusive: u32,
) {
loop {
let frame = match read_raw_frame(&mut reader).await {
Expand All @@ -758,6 +766,16 @@ async fn client_reader_task(
let is_terminal = (frame.flags & FLAG_TERMINAL) != 0;
let is_shutdown = (frame.flags & FLAG_SHUTDOWN) != 0;

if !is_client_frame_allowed(frame.id, frame.flags, id_start, id_end_exclusive) {
tracing::warn!(
"agent relay: client slot={slot} sent out-of-range id={} range=[{}, {})",
frame.id,
id_start,
id_end_exclusive
);
break;
}

// Forward shutdown to agentd (via the agent_tx send below) so the
// guest can sync filesystems and power off cleanly. Also notify the
// caller so it can start the flush-grace fallback timer — if the
Expand Down Expand Up @@ -865,3 +883,41 @@ async fn client_reader_task(
used_slots.lock().await.remove(&slot);
tracing::debug!("agent relay: slot={slot} released");
}

/// Return whether a client-originated frame may be forwarded to agentd.
///
/// Most client frames must use a correlation ID from the relay-assigned
/// range so responses route back to the owning client. `core.shutdown` is a
/// process-level control frame, not a correlated request, and the SDK sends it
/// with ID 0.
fn is_client_frame_allowed(id: u32, flags: u8, id_start: u32, id_end_exclusive: u32) -> bool {
let is_shutdown_control = (flags & FLAG_SHUTDOWN) != 0 && id == 0;
is_shutdown_control || (id >= id_start && id < id_end_exclusive)
}

//--------------------------------------------------------------------------------------------------
// Tests
//--------------------------------------------------------------------------------------------------

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn client_frame_validation_allows_ids_in_assigned_range() {
assert!(is_client_frame_allowed(10, 0, 10, 20));
assert!(is_client_frame_allowed(19, FLAG_SESSION_START, 10, 20));
}

#[test]
fn client_frame_validation_rejects_non_shutdown_ids_outside_range() {
assert!(!is_client_frame_allowed(0, 0, 10, 20));
assert!(!is_client_frame_allowed(9, FLAG_SESSION_START, 10, 20));
assert!(!is_client_frame_allowed(20, FLAG_TERMINAL, 10, 20));
}

#[test]
fn client_frame_validation_allows_shutdown_control_id_zero() {
assert!(is_client_frame_allowed(0, FLAG_SHUTDOWN, 10, 20));
}
}
Loading