From 1d1185a8ef735d39fc728ff96c2199442948a948 Mon Sep 17 00:00:00 2001 From: thedevbirb Date: Wed, 28 Jan 2026 11:26:17 +0100 Subject: [PATCH 1/4] feat(msg-sim): configurable runtime --- msg-sim/examples/bdp_throughput.rs | 8 +- msg-sim/examples/sim_multi_region.rs | 12 +- msg-sim/examples/tcp_tuning.rs | 9 +- msg-sim/src/namespace.rs | 38 ++--- msg-sim/src/network.rs | 212 +++++++++++++++++---------- 5 files changed, 173 insertions(+), 106 deletions(-) diff --git a/msg-sim/examples/bdp_throughput.rs b/msg-sim/examples/bdp_throughput.rs index 8b996d48..4e1abc11 100644 --- a/msg-sim/examples/bdp_throughput.rs +++ b/msg-sim/examples/bdp_throughput.rs @@ -24,7 +24,7 @@ async fn main() -> Result<(), Box> { use futures::StreamExt; use msg_sim::{ ip::Subnet, - network::{Link, Network, PeerIdExt}, + network::{HubOptions, Link, Network, PeerIdExt, PeerOptions}, tc::impairment::LinkImpairment, }; use msg_socket::{RepSocket, ReqSocket}; @@ -92,9 +92,9 @@ async fn main() -> Result<(), Box> { println!("Transfer: {} messages × {} KB = {} MB\n", NUM_MESSAGES, MSG_SIZE / 1024, total_mb); let subnet = Subnet::new(IpAddr::V4(Ipv4Addr::new(10, 100, 0, 0)), 16); - let mut network = Network::new(subnet).await?; - let sender = network.add_peer().await?; - let receiver = network.add_peer().await?; + let mut network = Network::new(subnet, HubOptions::default()).await?; + let sender = network.add_peer(PeerOptions::default()).await?; + let receiver = network.add_peer(PeerOptions::default()).await?; let impairment = LinkImpairment::default().with_latency_ms(LATENCY_MS).with_bandwidth_mbit_s(BANDWIDTH_MBIT); diff --git a/msg-sim/examples/sim_multi_region.rs b/msg-sim/examples/sim_multi_region.rs index 37e09bde..cc84321e 100644 --- a/msg-sim/examples/sim_multi_region.rs +++ b/msg-sim/examples/sim_multi_region.rs @@ -32,7 +32,7 @@ async fn main() -> Result<(), Box> { use msg_sim::{ ip::Subnet, - network::{Link, Network, PeerIdExt}, + network::{HubOptions, Link, Network, PeerIdExt, PeerOptions}, }; use tracing_subscriber::EnvFilter; @@ -161,13 +161,13 @@ async fn main() -> Result<(), Box> { // Create network let subnet = Subnet::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 0)), 24); - let mut network = Network::new(subnet).await?; + let mut network = Network::new(subnet, HubOptions::default()).await?; // Add peers - let eu = network.add_peer().await?; - let us1 = network.add_peer().await?; - let us2 = network.add_peer().await?; - let tokyo = network.add_peer().await?; + let eu = network.add_peer(PeerOptions::default()).await?; + let us1 = network.add_peer(PeerOptions::default()).await?; + let us2 = network.add_peer(PeerOptions::default()).await?; + let tokyo = network.add_peer(PeerOptions::default()).await?; println!("Peers:"); for id in [eu, us1, us2, tokyo] { diff --git a/msg-sim/examples/tcp_tuning.rs b/msg-sim/examples/tcp_tuning.rs index 97f9b641..a591a6b6 100644 --- a/msg-sim/examples/tcp_tuning.rs +++ b/msg-sim/examples/tcp_tuning.rs @@ -14,7 +14,10 @@ fn main() {} async fn main() -> Result<(), Box> { use std::net::{IpAddr, Ipv4Addr}; - use msg_sim::{ip::Subnet, network::Network}; + use msg_sim::{ + ip::Subnet, + network::{HubOptions, Network, PeerOptions}, + }; use tracing_subscriber::EnvFilter; const TCP_RMEM: &str = "/proc/sys/net/ipv4/tcp_rmem"; @@ -41,8 +44,8 @@ async fn main() -> Result<(), Box> { // Create network with one peer let subnet = Subnet::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 0)), 24); - let mut network = Network::new(subnet).await?; - let peer = network.add_peer().await?; + let mut network = Network::new(subnet, HubOptions::default()).await?; + let peer = network.add_peer(PeerOptions::default()).await?; // Tune TCP buffers in peer's namespace (min, default, max) let tuned_buffers = "4096 1048576 16777216"; // 4KB / 1MB / 16MB diff --git a/msg-sim/src/namespace.rs b/msg-sim/src/namespace.rs index 7bc4b33a..64d4dd49 100644 --- a/msg-sim/src/namespace.rs +++ b/msg-sim/src/namespace.rs @@ -8,6 +8,7 @@ use tokio::sync::oneshot; use crate::dynch::{DynCh, DynRequestSender}; use crate::namespace::helpers::current_netns; +use crate::network::RuntimeMakerFn; /// Base directory for named network namespaces. /// @@ -230,6 +231,7 @@ impl NetworkNamespaceInner { /// `setns(2)`, which is thread-local. pub fn spawn( self, + make_runtime: RuntimeMakerFn, make_ctx: impl FnOnce() -> Ctx + Send + 'static, ) -> (std::thread::JoinHandle>, DynRequestSender) { let (tx, mut rx) = DynCh::::channel(8); @@ -245,7 +247,7 @@ impl NetworkNamespaceInner { // Create mount namespace and remount /proc for namespace-specific sysctl access helpers::setup_mount_namespace()?; - let rt = tokio::runtime::Builder::new_current_thread().enable_all().build()?; + let rt = make_runtime(); tracing::debug!("started runtime"); drop(_span); @@ -286,6 +288,7 @@ pub struct NetworkNamespace { impl NetworkNamespace { pub async fn new( name: impl Into, + make_runtime: RuntimeMakerFn, make_ctx: impl FnOnce() -> Ctx + Send + 'static, ) -> Result> { let name = name.into(); @@ -296,7 +299,7 @@ impl NetworkNamespace { let file = tokio::fs::File::open(path).await?.into_std().await; let inner = NetworkNamespaceInner { name, file }; - let (_receiver_task, task_sender) = inner.try_clone()?.spawn(make_ctx); + let (_receiver_task, task_sender) = inner.try_clone()?.spawn(make_runtime, make_ctx); Ok(NetworkNamespace:: { inner, task_sender, _receiver_task }) } @@ -349,11 +352,18 @@ mod tests { const TCP_SLOW_START_AFTER_IDLE: &str = "/proc/sys/net/ipv4/tcp_slow_start_after_idle"; + fn default_runtime() -> tokio::runtime::Runtime { + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .expect("to create runtime") + } + #[tokio::test(flavor = "multi_thread")] async fn mount_namespace_isolates_proc() { // Create two namespaces - let ns1 = NetworkNamespace::new("test-ns-mount-1", || ()).await.unwrap(); - let ns2 = NetworkNamespace::new("test-ns-mount-2", || ()).await.unwrap(); + let ns1 = NetworkNamespace::new("test-ns-mount-1", Box::new(default_runtime), || ()).await.unwrap(); + let ns2 = NetworkNamespace::new("test-ns-mount-2", Box::new(default_runtime), || ()).await.unwrap(); // Verify /proc is mounted in ns1 by checking /proc/self/ns/net exists let proc_mounted_ns1: bool = ns1 @@ -385,8 +395,8 @@ mod tests { #[tokio::test(flavor = "multi_thread")] async fn sysctl_values_are_namespace_specific() { // Create two namespaces - let ns1 = NetworkNamespace::new("test-ns-sysctl-1", || ()).await.unwrap(); - let ns2 = NetworkNamespace::new("test-ns-sysctl-2", || ()).await.unwrap(); + let ns1 = NetworkNamespace::new("test-ns-sysctl-1", Box::new(default_runtime), || ()).await.unwrap(); + let ns2 = NetworkNamespace::new("test-ns-sysctl-2", Box::new(default_runtime), || ()).await.unwrap(); // Set different values in each namespace let write_result_ns1: std::io::Result<()> = ns1 @@ -446,24 +456,19 @@ mod tests { assert_eq!(value_ns1, "0", "ns1 should have tcp_slow_start_after_idle=0"); assert_eq!(value_ns2, "1", "ns2 should have tcp_slow_start_after_idle=1"); - assert_ne!( - value_ns1, value_ns2, - "sysctls should be isolated between namespaces" - ); + assert_ne!(value_ns1, value_ns2, "sysctls should be isolated between namespaces"); } #[tokio::test(flavor = "multi_thread")] async fn namespace_has_isolated_network_identity() { // Create a namespace - let ns = NetworkNamespace::new("test-ns-identity", || ()).await.unwrap(); + let ns = NetworkNamespace::new("test-ns-identity", Box::new(default_runtime), || ()).await.unwrap(); // Get the network namespace inode from inside the namespace let ns_inode_inside: u64 = ns .task_sender .submit(|_: &mut ()| -> DynFuture<'_, u64> { - Box::pin(async { - helpers::current_netns().map(|id| id.inode).unwrap_or(0) - }) + Box::pin(async { helpers::current_netns().map(|id| id.inode).unwrap_or(0) }) }) .await .unwrap() @@ -476,9 +481,6 @@ mod tests { assert_ne!(ns_inode_inside, 0, "should get valid inode inside namespace"); assert_ne!(host_inode, 0, "should get valid host inode"); - assert_ne!( - ns_inode_inside, host_inode, - "namespace inode should differ from host" - ); + assert_ne!(ns_inode_inside, host_inode, "namespace inode should differ from host"); } } diff --git a/msg-sim/src/network.rs b/msg-sim/src/network.rs index 21215b54..253472bd 100644 --- a/msg-sim/src/network.rs +++ b/msg-sim/src/network.rs @@ -52,7 +52,6 @@ //! classes. See the [`crate::tc`] module for details on the qdisc hierarchy. use std::{ - any::Any, collections::{HashMap, HashSet}, fmt::{Debug, Display}, io, @@ -137,8 +136,8 @@ pub struct Link(pub PeerId, pub PeerId); impl Link { /// Create a new directed link from source to destination. #[inline] - pub fn new(source: PeerId, destination: PeerId) -> Self { - Link(source, destination) + pub fn new(source: impl Into, destination: impl Into) -> Self { + Link(source.into(), destination.into()) } /// Get the source peer (traffic originates here). @@ -194,7 +193,7 @@ impl PeerTcState { } /// Map from peer ID to peer instance. -pub type PeerMap = HashMap>; +pub type PeerMap = HashMap>; /// Map from peer ID to traffic control state. type TcStateMap = HashMap; @@ -217,21 +216,77 @@ impl Peer { } } +pub(crate) type RuntimeMakerFn = Box tokio::runtime::Runtime + Send>; + +/// Common context provided to all namespaces. +/// +/// This context gives access to rtnetlink for network configuration. +#[derive(Debug)] +pub struct CommonContext { + /// Handle for sending rtnetlink messages within this namespace. + handle: rtnetlink::Handle, + /// Background task processing rtnetlink responses. + _connection_task: tokio::task::JoinHandle<()>, +} + +pub struct HubOptions { + make_runtime: RuntimeMakerFn, +} + +impl Default for HubOptions { + fn default() -> Self { + Self { + make_runtime: Box::new(|| { + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .expect("to create runtime") + }), + } + } +} + /// Context provided to tasks running within a peer's namespace. /// /// This context gives access to rtnetlink for network configuration /// and metadata about the peer's position in the network. #[derive(Debug)] -pub struct Context { +pub struct PeerContext { /// Handle for sending rtnetlink messages within this namespace. - handle: rtnetlink::Handle, + pub handle: rtnetlink::Handle, /// Background task processing rtnetlink responses. _connection_task: tokio::task::JoinHandle<()>, - /// The subnet this network uses. - subnet: Subnet, + pub subnet: Subnet, /// This peer's ID. - peer_id: usize, + pub peer_id: PeerId, +} + +/// Options for configuring a peer. +pub struct PeerOptions { + make_runtime: RuntimeMakerFn, +} + +impl Default for PeerOptions { + fn default() -> Self { + Self { + make_runtime: Box::new(|| { + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .expect("to create runtime") + }), + } + } +} + +impl PeerOptions { + /// Create new peer options with a custom runtime factory. + pub fn with_runtime( + make_runtime: impl FnOnce() -> tokio::runtime::Runtime + Send + 'static, + ) -> Self { + Self { make_runtime: Box::new(make_runtime) } + } } // ------------------------------------------------------------------------------------- @@ -289,7 +344,7 @@ pub type Result = std::result::Result; /// # Example /// /// ```no_run -/// use msg_sim::network::{Network, Link}; +/// use msg_sim::network::{Network, Link, HubOptions, PeerOptions}; /// use msg_sim::tc::impairment::LinkImpairment; /// use msg_sim::ip::Subnet; /// use std::net::Ipv4Addr; @@ -298,12 +353,12 @@ pub type Result = std::result::Result; /// async fn main() { /// // Create a network with a /16 subnet /// let subnet = Subnet::new(Ipv4Addr::new(10, 0, 0, 0).into(), 16); -/// let mut network = Network::new(subnet).await.unwrap(); +/// let mut network = Network::new(subnet, HubOptions::default()).await.unwrap(); /// /// // Add some peers -/// let peer_1 = network.add_peer().await.unwrap(); -/// let peer_2 = network.add_peer().await.unwrap(); -/// let peer_3 = network.add_peer().await.unwrap(); +/// let peer_1 = network.add_peer(PeerOptions::default()).await.unwrap(); +/// let peer_2 = network.add_peer(PeerOptions::default()).await.unwrap(); +/// let peer_3 = network.add_peer(PeerOptions::default()).await.unwrap(); /// /// // Configure different impairments for different paths /// network.apply_impairment( @@ -344,7 +399,7 @@ pub struct Network { subnet: Subnet, /// The hub namespace containing the bridge device. - network_hub_namespace: NetworkNamespace, + network_hub_namespace: NetworkNamespace, /// Rtnetlink handle bound to the host namespace. /// @@ -366,7 +421,7 @@ impl Network { /// 2. A bridge device (`msg-sim-br0`) in the hub namespace /// /// Peers can then be added with [`add_peer`](Self::add_peer). - pub async fn new(subnet: Subnet) -> Result { + pub async fn new(subnet: Subnet, options: HubOptions) -> Result { // Create rtnetlink connection in the host namespace. // This is used for creating veth pairs and moving devices. let (connection, handle, _) = rtnetlink::new_connection()?; @@ -379,11 +434,13 @@ impl Network { .map(|(connection, handle, _)| (handle, tokio::task::spawn(connection))) .unwrap(); - Context { handle, subnet, peer_id: 0, _connection_task } + CommonContext { handle, _connection_task } }; // Create the hub namespace that will host the bridge. - let namespace_hub = NetworkNamespace::new(Self::hub_namespace_name(), make_ctx).await?; + let namespace_hub = + NetworkNamespace::new(Self::hub_namespace_name(), options.make_runtime, make_ctx) + .await?; let fd = namespace_hub.fd(); let network = Self { @@ -418,7 +475,7 @@ impl Network { /// 1. A new network namespace for the peer /// 2. A veth pair connecting the peer to the hub bridge /// 3. IP address assignment based on the subnet and peer ID - pub async fn add_peer(&mut self) -> Result { + pub async fn add_peer(&mut self, options: PeerOptions) -> Result { let peer_id = PEER_ID_NEXT.load(Ordering::Relaxed); let namespace_name = peer_id.namespace_name(); let veth_name = Arc::new(peer_id.veth_name()); @@ -435,10 +492,11 @@ impl Network { .map(|(connection, handle, _)| (handle, tokio::task::spawn(connection))) .expect("to create rtnetlink socket"); - Context { handle, peer_id, subnet, _connection_task } + PeerContext { handle, _connection_task, subnet, peer_id } }; - let network_namespace = NetworkNamespace::new(namespace_name.clone(), make_ctx).await?; + let network_namespace = + NetworkNamespace::new(namespace_name.clone(), options.make_runtime, make_ctx).await?; // Step 1: Create the veth pair in the host namespace. // One end (veth_name) will go to the peer, the other (veth_br_name) to the bridge. @@ -476,7 +534,7 @@ impl Network { network_namespace .task_sender - .submit(|ctx| { + .submit(|ctx: &mut PeerContext| { Box::pin(async move { let address = ctx.peer_id.veth_address(ctx.subnet); let mask = ctx.subnet.netmask; @@ -560,16 +618,16 @@ impl Network { /// /// ```no_run /// use msg_sim::ip::Subnet; - /// use msg_sim::network::Network; + /// use msg_sim::network::{Network, HubOptions, PeerOptions}; /// use std::net::Ipv4Addr; /// use tokio::net::TcpListener; /// /// #[tokio::main] /// async fn main() { /// let subnet = Subnet::new(Ipv4Addr::new(12, 0, 0, 0).into(), 16); - /// let mut network = Network::new(subnet).await.unwrap(); + /// let mut network = Network::new(subnet, HubOptions::default()).await.unwrap(); /// - /// let peer_id = network.add_peer().await.unwrap(); + /// let peer_id = network.add_peer(PeerOptions::default()).await.unwrap(); /// network /// .run_in_namespace(peer_id, |_ctx| { /// Box::pin(async move { @@ -589,8 +647,8 @@ impl Network { fut: F, ) -> Result>> where - T: Any + Send + 'static, - F: for<'a> FnOnce(&'a mut Context) -> DynFuture<'a, T> + Send + 'static, + T: Send + 'static, + F: for<'a> FnOnce(&'a mut PeerContext) -> DynFuture<'a, T> + Send + 'static, { let Some(peer) = self.peers.get(&peer_id) else { return Err(Error::PeerNotFound(peer_id)); @@ -630,7 +688,7 @@ impl Network { /// ```no_run /// use msg_sim::{ /// ip::Subnet, - /// network::{Link, Network}, + /// network::{Link, Network, HubOptions, PeerOptions}, /// tc::impairment::LinkImpairment /// }; /// @@ -639,10 +697,10 @@ impl Network { /// #[tokio::main] /// async fn main() { /// let subnet = Subnet::new(Ipv4Addr::new(12, 0, 0, 0).into(), 16); - /// let mut network = Network::new(subnet).await.unwrap(); + /// let mut network = Network::new(subnet, HubOptions::default()).await.unwrap(); /// - /// let peer_1 = network.add_peer().await.unwrap(); - /// let peer_2 = network.add_peer().await.unwrap(); + /// let peer_1 = network.add_peer(PeerOptions::default()).await.unwrap(); + /// let peer_2 = network.add_peer(PeerOptions::default()).await.unwrap(); /// /// // Simulate a slow, lossy link from peer 1 to peer 2 /// network.apply_impairment( @@ -681,7 +739,7 @@ impl Network { src_peer .namespace .task_sender - .submit(move |ctx| { + .submit(move |ctx: &mut PeerContext| { let span = tracing::debug_span!( "apply_impairment", link = %link, @@ -765,7 +823,7 @@ mod msg_sim_network { use crate::{ ip::Subnet, - network::{Link, Network, PeerIdExt}, + network::{HubOptions, Link, Network, PeerIdExt, PeerOptions}, tc::impairment::LinkImpairment, }; @@ -774,7 +832,7 @@ mod msg_sim_network { async fn create_network_works() { let _ = tracing_subscriber::fmt::try_init(); let subnet = Subnet::new(Ipv4Addr::new(11, 0, 0, 0).into(), 16); - let _network = Network::new(subnet).await.unwrap(); + let _network = Network::new(subnet, HubOptions::default()).await.unwrap(); let path = format!("/run/netns/{}", Network::hub_namespace_name()); let exists = std::fs::exists(path.clone()).unwrap(); @@ -787,10 +845,10 @@ mod msg_sim_network { async fn add_peer_works() { let _ = tracing_subscriber::fmt::try_init(); let subnet = Subnet::new(Ipv4Addr::new(12, 0, 0, 0).into(), 16); - let mut network = Network::new(subnet).await.unwrap(); + let mut network = Network::new(subnet, HubOptions::default()).await.unwrap(); - let _peer_id = network.add_peer().await.unwrap(); - let _peer_id = network.add_peer().await.unwrap(); + let _peer_id = network.add_peer(PeerOptions::default()).await.unwrap(); + let _peer_id = network.add_peer(PeerOptions::default()).await.unwrap(); } /// Test basic request/reply communication between two peers. @@ -799,10 +857,10 @@ mod msg_sim_network { let _ = tracing_subscriber::fmt::try_init(); let subnet = Subnet::new(Ipv4Addr::new(13, 0, 0, 0).into(), 16); - let mut network = Network::new(subnet).await.unwrap(); + let mut network = Network::new(subnet, HubOptions::default()).await.unwrap(); - let peer_1 = network.add_peer().await.unwrap(); - let peer_2 = network.add_peer().await.unwrap(); + let peer_1 = network.add_peer(PeerOptions::default()).await.unwrap(); + let peer_2 = network.add_peer(PeerOptions::default()).await.unwrap(); let address_2 = peer_2.veth_address(subnet); let port_2 = 12345; @@ -845,11 +903,11 @@ mod msg_sim_network { async fn apply_impairment_works() { let _ = tracing_subscriber::fmt::try_init(); let subnet = Subnet::new(Ipv4Addr::new(12, 0, 0, 0).into(), 16); - let mut network = Network::new(subnet).await.unwrap(); + let mut network = Network::new(subnet, HubOptions::default()).await.unwrap(); - let peer_1 = network.add_peer().await.unwrap(); - let peer_2 = network.add_peer().await.unwrap(); - let _peer_3 = network.add_peer().await.unwrap(); + let peer_1 = network.add_peer(PeerOptions::default()).await.unwrap(); + let peer_2 = network.add_peer(PeerOptions::default()).await.unwrap(); + let _peer_3 = network.add_peer(PeerOptions::default()).await.unwrap(); let impairment = LinkImpairment { loss: 50.0, @@ -867,10 +925,10 @@ mod msg_sim_network { let _ = tracing_subscriber::fmt::try_init(); let subnet = Subnet::new(Ipv4Addr::new(14, 0, 0, 0).into(), 16); - let mut network = Network::new(subnet).await.unwrap(); + let mut network = Network::new(subnet, HubOptions::default()).await.unwrap(); - let peer_1 = network.add_peer().await.unwrap(); - let peer_2 = network.add_peer().await.unwrap(); + let peer_1 = network.add_peer(PeerOptions::default()).await.unwrap(); + let peer_2 = network.add_peer(PeerOptions::default()).await.unwrap(); // Apply 1 second latency from peer 1 to peer 2 let sec_in_us = 1_000_000; @@ -927,11 +985,11 @@ mod msg_sim_network { let _ = tracing_subscriber::fmt::try_init(); let subnet = Subnet::new(Ipv4Addr::new(15, 0, 0, 0).into(), 16); - let mut network = Network::new(subnet).await.unwrap(); + let mut network = Network::new(subnet, HubOptions::default()).await.unwrap(); - let peer_1 = network.add_peer().await.unwrap(); - let peer_2 = network.add_peer().await.unwrap(); - let peer_3 = network.add_peer().await.unwrap(); + let peer_1 = network.add_peer(PeerOptions::default()).await.unwrap(); + let peer_2 = network.add_peer(PeerOptions::default()).await.unwrap(); + let peer_3 = network.add_peer(PeerOptions::default()).await.unwrap(); // Peer 1 → Peer 2: 100ms latency let fast_latency_us = 100_000; @@ -996,13 +1054,17 @@ mod msg_sim_network { req_socket_2.connect_sync(SocketAddr::new(address_2, port)); req_socket_3.connect_sync(SocketAddr::new(address_3, port)); - // Measure RTT to peer 2 (should be ~200ms round trip for 100ms one-way) + // Wait for both TCP connections to be established before starting + // measurements. Peer 3 has 500ms one-way latency, so TCP handshake takes ~1s. + tokio::time::sleep(std::time::Duration::from_millis(1500)).await; + + // Measure RTT to peer 2 (should be ~100ms for one-way latency) let start = Instant::now(); let resp = req_socket_2.request("ping".into()).await.unwrap(); let rtt_2 = start.elapsed(); assert_eq!(resp.as_ref(), b"peer2"); - // Measure RTT to peer 3 (should be ~1000ms round trip for 500ms one-way) + // Measure RTT to peer 3 (should be ~500ms for one-way latency) let start = Instant::now(); let resp = req_socket_3.request("ping".into()).await.unwrap(); let rtt_3 = start.elapsed(); @@ -1031,10 +1093,10 @@ mod msg_sim_network { let _ = tracing_subscriber::fmt::try_init(); let subnet = Subnet::new(Ipv4Addr::new(16, 0, 0, 0).into(), 16); - let mut network = Network::new(subnet).await.unwrap(); + let mut network = Network::new(subnet, HubOptions::default()).await.unwrap(); - let peer_1 = network.add_peer().await.unwrap(); - let peer_2 = network.add_peer().await.unwrap(); + let peer_1 = network.add_peer(PeerOptions::default()).await.unwrap(); + let peer_2 = network.add_peer(PeerOptions::default()).await.unwrap(); // Apply bandwidth limit: 1 Mbit/s from peer 1 to peer 2 let bandwidth_mbit = 1.0; @@ -1105,10 +1167,10 @@ mod msg_sim_network { let _ = tracing_subscriber::fmt::try_init(); let subnet = Subnet::new(Ipv4Addr::new(17, 0, 0, 0).into(), 16); - let mut network = Network::new(subnet).await.unwrap(); + let mut network = Network::new(subnet, HubOptions::default()).await.unwrap(); - let peer_1 = network.add_peer().await.unwrap(); - let peer_2 = network.add_peer().await.unwrap(); + let peer_1 = network.add_peer(PeerOptions::default()).await.unwrap(); + let peer_2 = network.add_peer(PeerOptions::default()).await.unwrap(); // Apply both bandwidth limit and latency let latency_us = 100_000; // 100ms @@ -1176,10 +1238,10 @@ mod msg_sim_network { let _ = tracing_subscriber::fmt::try_init(); let subnet = Subnet::new(Ipv4Addr::new(18, 0, 0, 0).into(), 16); - let mut network = Network::new(subnet).await.unwrap(); + let mut network = Network::new(subnet, HubOptions::default()).await.unwrap(); - let peer_1 = network.add_peer().await.unwrap(); - let peer_2 = network.add_peer().await.unwrap(); + let peer_1 = network.add_peer(PeerOptions::default()).await.unwrap(); + let peer_2 = network.add_peer(PeerOptions::default()).await.unwrap(); // First application: 100ms latency let impairment_1 = LinkImpairment { latency: 100_000, ..Default::default() }; @@ -1207,10 +1269,10 @@ mod msg_sim_network { let _ = tracing_subscriber::fmt::try_init(); let subnet = Subnet::new(Ipv4Addr::new(19, 0, 0, 0).into(), 16); - let mut network = Network::new(subnet).await.unwrap(); + let mut network = Network::new(subnet, HubOptions::default()).await.unwrap(); - let peer_1 = network.add_peer().await.unwrap(); - let peer_2 = network.add_peer().await.unwrap(); + let peer_1 = network.add_peer(PeerOptions::default()).await.unwrap(); + let peer_2 = network.add_peer(PeerOptions::default()).await.unwrap(); // Different impairments in each direction let impairment_1_to_2 = LinkImpairment { latency: 50_000, ..Default::default() }; @@ -1232,10 +1294,10 @@ mod msg_sim_network { let _ = tracing_subscriber::fmt::try_init(); let subnet = Subnet::new(Ipv4Addr::new(20, 0, 0, 0).into(), 16); - let mut network = Network::new(subnet).await.unwrap(); + let mut network = Network::new(subnet, HubOptions::default()).await.unwrap(); - let peer_1 = network.add_peer().await.unwrap(); - let peer_2 = network.add_peer().await.unwrap(); + let peer_1 = network.add_peer(PeerOptions::default()).await.unwrap(); + let peer_2 = network.add_peer(PeerOptions::default()).await.unwrap(); // Apply bandwidth limit with custom burst network @@ -1266,11 +1328,11 @@ mod msg_sim_network { let _ = tracing_subscriber::fmt::try_init(); let subnet = Subnet::new(Ipv4Addr::new(24, 0, 0, 0).into(), 16); - let mut network = Network::new(subnet).await.unwrap(); + let mut network = Network::new(subnet, HubOptions::default()).await.unwrap(); - let peer_1 = network.add_peer().await.unwrap(); - let peer_2 = network.add_peer().await.unwrap(); - let peer_3 = network.add_peer().await.unwrap(); + let peer_1 = network.add_peer(PeerOptions::default()).await.unwrap(); + let peer_2 = network.add_peer(PeerOptions::default()).await.unwrap(); + let peer_3 = network.add_peer(PeerOptions::default()).await.unwrap(); // First netem WITH duplicate - works let with_dup = LinkImpairment { latency: 20_000, duplicate: 0.02, ..Default::default() }; @@ -1297,10 +1359,10 @@ mod msg_sim_network { let _ = tracing_subscriber::fmt::try_init(); let subnet = Subnet::new(Ipv4Addr::new(21, 0, 0, 0).into(), 16); - let mut network = Network::new(subnet).await.unwrap(); + let mut network = Network::new(subnet, HubOptions::default()).await.unwrap(); - let peer_1 = network.add_peer().await.unwrap(); - let peer_2 = network.add_peer().await.unwrap(); + let peer_1 = network.add_peer(PeerOptions::default()).await.unwrap(); + let peer_2 = network.add_peer(PeerOptions::default()).await.unwrap(); // Apply 100% packet duplication from peer 1 to peer 2 // This should cause every packet to be sent twice From e74d0147b4f1a396881ee2b350d529900ae27255 Mon Sep 17 00:00:00 2001 From: thedevbirb Date: Thu, 29 Jan 2026 13:36:40 +0100 Subject: [PATCH 2/4] chore(msg-sim): renaming --- msg-sim/src/namespace.rs | 35 +++++++++++++++++++++-------------- msg-sim/src/network.rs | 19 ++++++++++--------- 2 files changed, 31 insertions(+), 23 deletions(-) diff --git a/msg-sim/src/namespace.rs b/msg-sim/src/namespace.rs index 64d4dd49..4d584ed5 100644 --- a/msg-sim/src/namespace.rs +++ b/msg-sim/src/namespace.rs @@ -8,7 +8,7 @@ use tokio::sync::oneshot; use crate::dynch::{DynCh, DynRequestSender}; use crate::namespace::helpers::current_netns; -use crate::network::RuntimeMakerFn; +use crate::network::RuntimeFactory; /// Base directory for named network namespaces. /// @@ -231,7 +231,7 @@ impl NetworkNamespaceInner { /// `setns(2)`, which is thread-local. pub fn spawn( self, - make_runtime: RuntimeMakerFn, + runtime_factory: RuntimeFactory, make_ctx: impl FnOnce() -> Ctx + Send + 'static, ) -> (std::thread::JoinHandle>, DynRequestSender) { let (tx, mut rx) = DynCh::::channel(8); @@ -247,7 +247,7 @@ impl NetworkNamespaceInner { // Create mount namespace and remount /proc for namespace-specific sysctl access helpers::setup_mount_namespace()?; - let rt = make_runtime(); + let rt = runtime_factory(); tracing::debug!("started runtime"); drop(_span); @@ -288,7 +288,7 @@ pub struct NetworkNamespace { impl NetworkNamespace { pub async fn new( name: impl Into, - make_runtime: RuntimeMakerFn, + runtime_factory: RuntimeFactory, make_ctx: impl FnOnce() -> Ctx + Send + 'static, ) -> Result> { let name = name.into(); @@ -299,7 +299,7 @@ impl NetworkNamespace { let file = tokio::fs::File::open(path).await?.into_std().await; let inner = NetworkNamespaceInner { name, file }; - let (_receiver_task, task_sender) = inner.try_clone()?.spawn(make_runtime, make_ctx); + let (_receiver_task, task_sender) = inner.try_clone()?.spawn(runtime_factory, make_ctx); Ok(NetworkNamespace:: { inner, task_sender, _receiver_task }) } @@ -353,17 +353,18 @@ mod tests { const TCP_SLOW_START_AFTER_IDLE: &str = "/proc/sys/net/ipv4/tcp_slow_start_after_idle"; fn default_runtime() -> tokio::runtime::Runtime { - tokio::runtime::Builder::new_multi_thread() - .enable_all() - .build() - .expect("to create runtime") + tokio::runtime::Builder::new_multi_thread().enable_all().build().expect("to create runtime") } #[tokio::test(flavor = "multi_thread")] async fn mount_namespace_isolates_proc() { // Create two namespaces - let ns1 = NetworkNamespace::new("test-ns-mount-1", Box::new(default_runtime), || ()).await.unwrap(); - let ns2 = NetworkNamespace::new("test-ns-mount-2", Box::new(default_runtime), || ()).await.unwrap(); + let ns1 = NetworkNamespace::new("test-ns-mount-1", Box::new(default_runtime), || ()) + .await + .unwrap(); + let ns2 = NetworkNamespace::new("test-ns-mount-2", Box::new(default_runtime), || ()) + .await + .unwrap(); // Verify /proc is mounted in ns1 by checking /proc/self/ns/net exists let proc_mounted_ns1: bool = ns1 @@ -395,8 +396,12 @@ mod tests { #[tokio::test(flavor = "multi_thread")] async fn sysctl_values_are_namespace_specific() { // Create two namespaces - let ns1 = NetworkNamespace::new("test-ns-sysctl-1", Box::new(default_runtime), || ()).await.unwrap(); - let ns2 = NetworkNamespace::new("test-ns-sysctl-2", Box::new(default_runtime), || ()).await.unwrap(); + let ns1 = NetworkNamespace::new("test-ns-sysctl-1", Box::new(default_runtime), || ()) + .await + .unwrap(); + let ns2 = NetworkNamespace::new("test-ns-sysctl-2", Box::new(default_runtime), || ()) + .await + .unwrap(); // Set different values in each namespace let write_result_ns1: std::io::Result<()> = ns1 @@ -462,7 +467,9 @@ mod tests { #[tokio::test(flavor = "multi_thread")] async fn namespace_has_isolated_network_identity() { // Create a namespace - let ns = NetworkNamespace::new("test-ns-identity", Box::new(default_runtime), || ()).await.unwrap(); + let ns = NetworkNamespace::new("test-ns-identity", Box::new(default_runtime), || ()) + .await + .unwrap(); // Get the network namespace inode from inside the namespace let ns_inode_inside: u64 = ns diff --git a/msg-sim/src/network.rs b/msg-sim/src/network.rs index 253472bd..1191e243 100644 --- a/msg-sim/src/network.rs +++ b/msg-sim/src/network.rs @@ -216,7 +216,7 @@ impl Peer { } } -pub(crate) type RuntimeMakerFn = Box tokio::runtime::Runtime + Send>; +pub(crate) type RuntimeFactory = Box tokio::runtime::Runtime + Send>; /// Common context provided to all namespaces. /// @@ -230,13 +230,13 @@ pub struct CommonContext { } pub struct HubOptions { - make_runtime: RuntimeMakerFn, + runtime_factory: RuntimeFactory, } impl Default for HubOptions { fn default() -> Self { Self { - make_runtime: Box::new(|| { + runtime_factory: Box::new(|| { tokio::runtime::Builder::new_multi_thread() .enable_all() .build() @@ -264,13 +264,13 @@ pub struct PeerContext { /// Options for configuring a peer. pub struct PeerOptions { - make_runtime: RuntimeMakerFn, + runtime_factory: RuntimeFactory, } impl Default for PeerOptions { fn default() -> Self { Self { - make_runtime: Box::new(|| { + runtime_factory: Box::new(|| { tokio::runtime::Builder::new_multi_thread() .enable_all() .build() @@ -283,9 +283,9 @@ impl Default for PeerOptions { impl PeerOptions { /// Create new peer options with a custom runtime factory. pub fn with_runtime( - make_runtime: impl FnOnce() -> tokio::runtime::Runtime + Send + 'static, + runtime_factory: impl FnOnce() -> tokio::runtime::Runtime + Send + 'static, ) -> Self { - Self { make_runtime: Box::new(make_runtime) } + Self { runtime_factory: Box::new(runtime_factory) } } } @@ -439,7 +439,7 @@ impl Network { // Create the hub namespace that will host the bridge. let namespace_hub = - NetworkNamespace::new(Self::hub_namespace_name(), options.make_runtime, make_ctx) + NetworkNamespace::new(Self::hub_namespace_name(), options.runtime_factory, make_ctx) .await?; let fd = namespace_hub.fd(); @@ -496,7 +496,8 @@ impl Network { }; let network_namespace = - NetworkNamespace::new(namespace_name.clone(), options.make_runtime, make_ctx).await?; + NetworkNamespace::new(namespace_name.clone(), options.runtime_factory, make_ctx) + .await?; // Step 1: Create the veth pair in the host namespace. // One end (veth_name) will go to the peer, the other (veth_br_name) to the bridge. From d3d29c7e9d111183f46e015c6f564e3202b24687 Mon Sep 17 00:00:00 2001 From: thedevbirb Date: Thu, 29 Jan 2026 17:50:02 +0100 Subject: [PATCH 3/4] chore(msg-sim): remove hub-options --- msg-sim/examples/bdp_throughput.rs | 4 +- msg-sim/examples/sim_multi_region.rs | 4 +- msg-sim/examples/tcp_tuning.rs | 4 +- msg-sim/src/network.rs | 61 ++++++++++++---------------- 4 files changed, 31 insertions(+), 42 deletions(-) diff --git a/msg-sim/examples/bdp_throughput.rs b/msg-sim/examples/bdp_throughput.rs index 4e1abc11..cf2e621a 100644 --- a/msg-sim/examples/bdp_throughput.rs +++ b/msg-sim/examples/bdp_throughput.rs @@ -24,7 +24,7 @@ async fn main() -> Result<(), Box> { use futures::StreamExt; use msg_sim::{ ip::Subnet, - network::{HubOptions, Link, Network, PeerIdExt, PeerOptions}, + network::{Link, Network, PeerIdExt, PeerOptions}, tc::impairment::LinkImpairment, }; use msg_socket::{RepSocket, ReqSocket}; @@ -92,7 +92,7 @@ async fn main() -> Result<(), Box> { println!("Transfer: {} messages × {} KB = {} MB\n", NUM_MESSAGES, MSG_SIZE / 1024, total_mb); let subnet = Subnet::new(IpAddr::V4(Ipv4Addr::new(10, 100, 0, 0)), 16); - let mut network = Network::new(subnet, HubOptions::default()).await?; + let mut network = Network::new(subnet).await?; let sender = network.add_peer(PeerOptions::default()).await?; let receiver = network.add_peer(PeerOptions::default()).await?; diff --git a/msg-sim/examples/sim_multi_region.rs b/msg-sim/examples/sim_multi_region.rs index cc84321e..e0ae97ce 100644 --- a/msg-sim/examples/sim_multi_region.rs +++ b/msg-sim/examples/sim_multi_region.rs @@ -32,7 +32,7 @@ async fn main() -> Result<(), Box> { use msg_sim::{ ip::Subnet, - network::{HubOptions, Link, Network, PeerIdExt, PeerOptions}, + network::{Link, Network, PeerIdExt, PeerOptions}, }; use tracing_subscriber::EnvFilter; @@ -161,7 +161,7 @@ async fn main() -> Result<(), Box> { // Create network let subnet = Subnet::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 0)), 24); - let mut network = Network::new(subnet, HubOptions::default()).await?; + let mut network = Network::new(subnet).await?; // Add peers let eu = network.add_peer(PeerOptions::default()).await?; diff --git a/msg-sim/examples/tcp_tuning.rs b/msg-sim/examples/tcp_tuning.rs index a591a6b6..dfed2a9f 100644 --- a/msg-sim/examples/tcp_tuning.rs +++ b/msg-sim/examples/tcp_tuning.rs @@ -16,7 +16,7 @@ async fn main() -> Result<(), Box> { use msg_sim::{ ip::Subnet, - network::{HubOptions, Network, PeerOptions}, + network::{Network, PeerOptions}, }; use tracing_subscriber::EnvFilter; @@ -44,7 +44,7 @@ async fn main() -> Result<(), Box> { // Create network with one peer let subnet = Subnet::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 0)), 24); - let mut network = Network::new(subnet, HubOptions::default()).await?; + let mut network = Network::new(subnet).await?; let peer = network.add_peer(PeerOptions::default()).await?; // Tune TCP buffers in peer's namespace (min, default, max) diff --git a/msg-sim/src/network.rs b/msg-sim/src/network.rs index 1191e243..4ebc9840 100644 --- a/msg-sim/src/network.rs +++ b/msg-sim/src/network.rs @@ -218,6 +218,12 @@ impl Peer { pub(crate) type RuntimeFactory = Box tokio::runtime::Runtime + Send>; +pub fn default_runtime_factory() -> RuntimeFactory { + Box::new(|| { + tokio::runtime::Builder::new_multi_thread().enable_all().build().expect("to create runtime") + }) +} + /// Common context provided to all namespaces. /// /// This context gives access to rtnetlink for network configuration. @@ -229,23 +235,6 @@ pub struct CommonContext { _connection_task: tokio::task::JoinHandle<()>, } -pub struct HubOptions { - runtime_factory: RuntimeFactory, -} - -impl Default for HubOptions { - fn default() -> Self { - Self { - runtime_factory: Box::new(|| { - tokio::runtime::Builder::new_multi_thread() - .enable_all() - .build() - .expect("to create runtime") - }), - } - } -} - /// Context provided to tasks running within a peer's namespace. /// /// This context gives access to rtnetlink for network configuration @@ -353,7 +342,7 @@ pub type Result = std::result::Result; /// async fn main() { /// // Create a network with a /16 subnet /// let subnet = Subnet::new(Ipv4Addr::new(10, 0, 0, 0).into(), 16); -/// let mut network = Network::new(subnet, HubOptions::default()).await.unwrap(); +/// let mut network = Network::new(subnet).await.unwrap(); /// /// // Add some peers /// let peer_1 = network.add_peer(PeerOptions::default()).await.unwrap(); @@ -421,7 +410,7 @@ impl Network { /// 2. A bridge device (`msg-sim-br0`) in the hub namespace /// /// Peers can then be added with [`add_peer`](Self::add_peer). - pub async fn new(subnet: Subnet, options: HubOptions) -> Result { + pub async fn new(subnet: Subnet) -> Result { // Create rtnetlink connection in the host namespace. // This is used for creating veth pairs and moving devices. let (connection, handle, _) = rtnetlink::new_connection()?; @@ -439,7 +428,7 @@ impl Network { // Create the hub namespace that will host the bridge. let namespace_hub = - NetworkNamespace::new(Self::hub_namespace_name(), options.runtime_factory, make_ctx) + NetworkNamespace::new(Self::hub_namespace_name(), default_runtime_factory(), make_ctx) .await?; let fd = namespace_hub.fd(); @@ -626,7 +615,7 @@ impl Network { /// #[tokio::main] /// async fn main() { /// let subnet = Subnet::new(Ipv4Addr::new(12, 0, 0, 0).into(), 16); - /// let mut network = Network::new(subnet, HubOptions::default()).await.unwrap(); + /// let mut network = Network::new(subnet).await.unwrap(); /// /// let peer_id = network.add_peer(PeerOptions::default()).await.unwrap(); /// network @@ -698,7 +687,7 @@ impl Network { /// #[tokio::main] /// async fn main() { /// let subnet = Subnet::new(Ipv4Addr::new(12, 0, 0, 0).into(), 16); - /// let mut network = Network::new(subnet, HubOptions::default()).await.unwrap(); + /// let mut network = Network::new(subnet).await.unwrap(); /// /// let peer_1 = network.add_peer(PeerOptions::default()).await.unwrap(); /// let peer_2 = network.add_peer(PeerOptions::default()).await.unwrap(); @@ -824,7 +813,7 @@ mod msg_sim_network { use crate::{ ip::Subnet, - network::{HubOptions, Link, Network, PeerIdExt, PeerOptions}, + network::{Link, Network, PeerIdExt, PeerOptions}, tc::impairment::LinkImpairment, }; @@ -833,7 +822,7 @@ mod msg_sim_network { async fn create_network_works() { let _ = tracing_subscriber::fmt::try_init(); let subnet = Subnet::new(Ipv4Addr::new(11, 0, 0, 0).into(), 16); - let _network = Network::new(subnet, HubOptions::default()).await.unwrap(); + let _network = Network::new(subnet).await.unwrap(); let path = format!("/run/netns/{}", Network::hub_namespace_name()); let exists = std::fs::exists(path.clone()).unwrap(); @@ -846,7 +835,7 @@ mod msg_sim_network { async fn add_peer_works() { let _ = tracing_subscriber::fmt::try_init(); let subnet = Subnet::new(Ipv4Addr::new(12, 0, 0, 0).into(), 16); - let mut network = Network::new(subnet, HubOptions::default()).await.unwrap(); + let mut network = Network::new(subnet).await.unwrap(); let _peer_id = network.add_peer(PeerOptions::default()).await.unwrap(); let _peer_id = network.add_peer(PeerOptions::default()).await.unwrap(); @@ -858,7 +847,7 @@ mod msg_sim_network { let _ = tracing_subscriber::fmt::try_init(); let subnet = Subnet::new(Ipv4Addr::new(13, 0, 0, 0).into(), 16); - let mut network = Network::new(subnet, HubOptions::default()).await.unwrap(); + let mut network = Network::new(subnet).await.unwrap(); let peer_1 = network.add_peer(PeerOptions::default()).await.unwrap(); let peer_2 = network.add_peer(PeerOptions::default()).await.unwrap(); @@ -904,7 +893,7 @@ mod msg_sim_network { async fn apply_impairment_works() { let _ = tracing_subscriber::fmt::try_init(); let subnet = Subnet::new(Ipv4Addr::new(12, 0, 0, 0).into(), 16); - let mut network = Network::new(subnet, HubOptions::default()).await.unwrap(); + let mut network = Network::new(subnet).await.unwrap(); let peer_1 = network.add_peer(PeerOptions::default()).await.unwrap(); let peer_2 = network.add_peer(PeerOptions::default()).await.unwrap(); @@ -926,7 +915,7 @@ mod msg_sim_network { let _ = tracing_subscriber::fmt::try_init(); let subnet = Subnet::new(Ipv4Addr::new(14, 0, 0, 0).into(), 16); - let mut network = Network::new(subnet, HubOptions::default()).await.unwrap(); + let mut network = Network::new(subnet).await.unwrap(); let peer_1 = network.add_peer(PeerOptions::default()).await.unwrap(); let peer_2 = network.add_peer(PeerOptions::default()).await.unwrap(); @@ -986,7 +975,7 @@ mod msg_sim_network { let _ = tracing_subscriber::fmt::try_init(); let subnet = Subnet::new(Ipv4Addr::new(15, 0, 0, 0).into(), 16); - let mut network = Network::new(subnet, HubOptions::default()).await.unwrap(); + let mut network = Network::new(subnet).await.unwrap(); let peer_1 = network.add_peer(PeerOptions::default()).await.unwrap(); let peer_2 = network.add_peer(PeerOptions::default()).await.unwrap(); @@ -1094,7 +1083,7 @@ mod msg_sim_network { let _ = tracing_subscriber::fmt::try_init(); let subnet = Subnet::new(Ipv4Addr::new(16, 0, 0, 0).into(), 16); - let mut network = Network::new(subnet, HubOptions::default()).await.unwrap(); + let mut network = Network::new(subnet).await.unwrap(); let peer_1 = network.add_peer(PeerOptions::default()).await.unwrap(); let peer_2 = network.add_peer(PeerOptions::default()).await.unwrap(); @@ -1168,7 +1157,7 @@ mod msg_sim_network { let _ = tracing_subscriber::fmt::try_init(); let subnet = Subnet::new(Ipv4Addr::new(17, 0, 0, 0).into(), 16); - let mut network = Network::new(subnet, HubOptions::default()).await.unwrap(); + let mut network = Network::new(subnet).await.unwrap(); let peer_1 = network.add_peer(PeerOptions::default()).await.unwrap(); let peer_2 = network.add_peer(PeerOptions::default()).await.unwrap(); @@ -1239,7 +1228,7 @@ mod msg_sim_network { let _ = tracing_subscriber::fmt::try_init(); let subnet = Subnet::new(Ipv4Addr::new(18, 0, 0, 0).into(), 16); - let mut network = Network::new(subnet, HubOptions::default()).await.unwrap(); + let mut network = Network::new(subnet).await.unwrap(); let peer_1 = network.add_peer(PeerOptions::default()).await.unwrap(); let peer_2 = network.add_peer(PeerOptions::default()).await.unwrap(); @@ -1270,7 +1259,7 @@ mod msg_sim_network { let _ = tracing_subscriber::fmt::try_init(); let subnet = Subnet::new(Ipv4Addr::new(19, 0, 0, 0).into(), 16); - let mut network = Network::new(subnet, HubOptions::default()).await.unwrap(); + let mut network = Network::new(subnet).await.unwrap(); let peer_1 = network.add_peer(PeerOptions::default()).await.unwrap(); let peer_2 = network.add_peer(PeerOptions::default()).await.unwrap(); @@ -1295,7 +1284,7 @@ mod msg_sim_network { let _ = tracing_subscriber::fmt::try_init(); let subnet = Subnet::new(Ipv4Addr::new(20, 0, 0, 0).into(), 16); - let mut network = Network::new(subnet, HubOptions::default()).await.unwrap(); + let mut network = Network::new(subnet).await.unwrap(); let peer_1 = network.add_peer(PeerOptions::default()).await.unwrap(); let peer_2 = network.add_peer(PeerOptions::default()).await.unwrap(); @@ -1329,7 +1318,7 @@ mod msg_sim_network { let _ = tracing_subscriber::fmt::try_init(); let subnet = Subnet::new(Ipv4Addr::new(24, 0, 0, 0).into(), 16); - let mut network = Network::new(subnet, HubOptions::default()).await.unwrap(); + let mut network = Network::new(subnet).await.unwrap(); let peer_1 = network.add_peer(PeerOptions::default()).await.unwrap(); let peer_2 = network.add_peer(PeerOptions::default()).await.unwrap(); @@ -1360,7 +1349,7 @@ mod msg_sim_network { let _ = tracing_subscriber::fmt::try_init(); let subnet = Subnet::new(Ipv4Addr::new(21, 0, 0, 0).into(), 16); - let mut network = Network::new(subnet, HubOptions::default()).await.unwrap(); + let mut network = Network::new(subnet).await.unwrap(); let peer_1 = network.add_peer(PeerOptions::default()).await.unwrap(); let peer_2 = network.add_peer(PeerOptions::default()).await.unwrap(); From f60efc48a1a9cc20d6502f6d41cd3bcff5615cc0 Mon Sep 17 00:00:00 2001 From: thedevbirb Date: Fri, 30 Jan 2026 14:22:01 +0100 Subject: [PATCH 4/4] chore(msg_sim): add_peer_with_options --- msg-sim/examples/bdp_throughput.rs | 6 +-- msg-sim/examples/sim_multi_region.rs | 10 ++-- msg-sim/examples/tcp_tuning.rs | 7 +-- msg-sim/src/network.rs | 81 +++++++++++++++------------- 4 files changed, 53 insertions(+), 51 deletions(-) diff --git a/msg-sim/examples/bdp_throughput.rs b/msg-sim/examples/bdp_throughput.rs index cf2e621a..8b996d48 100644 --- a/msg-sim/examples/bdp_throughput.rs +++ b/msg-sim/examples/bdp_throughput.rs @@ -24,7 +24,7 @@ async fn main() -> Result<(), Box> { use futures::StreamExt; use msg_sim::{ ip::Subnet, - network::{Link, Network, PeerIdExt, PeerOptions}, + network::{Link, Network, PeerIdExt}, tc::impairment::LinkImpairment, }; use msg_socket::{RepSocket, ReqSocket}; @@ -93,8 +93,8 @@ async fn main() -> Result<(), Box> { let subnet = Subnet::new(IpAddr::V4(Ipv4Addr::new(10, 100, 0, 0)), 16); let mut network = Network::new(subnet).await?; - let sender = network.add_peer(PeerOptions::default()).await?; - let receiver = network.add_peer(PeerOptions::default()).await?; + let sender = network.add_peer().await?; + let receiver = network.add_peer().await?; let impairment = LinkImpairment::default().with_latency_ms(LATENCY_MS).with_bandwidth_mbit_s(BANDWIDTH_MBIT); diff --git a/msg-sim/examples/sim_multi_region.rs b/msg-sim/examples/sim_multi_region.rs index e0ae97ce..37e09bde 100644 --- a/msg-sim/examples/sim_multi_region.rs +++ b/msg-sim/examples/sim_multi_region.rs @@ -32,7 +32,7 @@ async fn main() -> Result<(), Box> { use msg_sim::{ ip::Subnet, - network::{Link, Network, PeerIdExt, PeerOptions}, + network::{Link, Network, PeerIdExt}, }; use tracing_subscriber::EnvFilter; @@ -164,10 +164,10 @@ async fn main() -> Result<(), Box> { let mut network = Network::new(subnet).await?; // Add peers - let eu = network.add_peer(PeerOptions::default()).await?; - let us1 = network.add_peer(PeerOptions::default()).await?; - let us2 = network.add_peer(PeerOptions::default()).await?; - let tokyo = network.add_peer(PeerOptions::default()).await?; + let eu = network.add_peer().await?; + let us1 = network.add_peer().await?; + let us2 = network.add_peer().await?; + let tokyo = network.add_peer().await?; println!("Peers:"); for id in [eu, us1, us2, tokyo] { diff --git a/msg-sim/examples/tcp_tuning.rs b/msg-sim/examples/tcp_tuning.rs index dfed2a9f..97f9b641 100644 --- a/msg-sim/examples/tcp_tuning.rs +++ b/msg-sim/examples/tcp_tuning.rs @@ -14,10 +14,7 @@ fn main() {} async fn main() -> Result<(), Box> { use std::net::{IpAddr, Ipv4Addr}; - use msg_sim::{ - ip::Subnet, - network::{Network, PeerOptions}, - }; + use msg_sim::{ip::Subnet, network::Network}; use tracing_subscriber::EnvFilter; const TCP_RMEM: &str = "/proc/sys/net/ipv4/tcp_rmem"; @@ -45,7 +42,7 @@ async fn main() -> Result<(), Box> { // Create network with one peer let subnet = Subnet::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 0)), 24); let mut network = Network::new(subnet).await?; - let peer = network.add_peer(PeerOptions::default()).await?; + let peer = network.add_peer().await?; // Tune TCP buffers in peer's namespace (min, default, max) let tuned_buffers = "4096 1048576 16777216"; // 4KB / 1MB / 16MB diff --git a/msg-sim/src/network.rs b/msg-sim/src/network.rs index 4ebc9840..8ad5c84f 100644 --- a/msg-sim/src/network.rs +++ b/msg-sim/src/network.rs @@ -333,7 +333,7 @@ pub type Result = std::result::Result; /// # Example /// /// ```no_run -/// use msg_sim::network::{Network, Link, HubOptions, PeerOptions}; +/// use msg_sim::network::{Network, Link, PeerOptions}; /// use msg_sim::tc::impairment::LinkImpairment; /// use msg_sim::ip::Subnet; /// use std::net::Ipv4Addr; @@ -345,9 +345,9 @@ pub type Result = std::result::Result; /// let mut network = Network::new(subnet).await.unwrap(); /// /// // Add some peers -/// let peer_1 = network.add_peer(PeerOptions::default()).await.unwrap(); -/// let peer_2 = network.add_peer(PeerOptions::default()).await.unwrap(); -/// let peer_3 = network.add_peer(PeerOptions::default()).await.unwrap(); +/// let peer_1 = network.add_peer().await.unwrap(); +/// let peer_2 = network.add_peer().await.unwrap(); +/// let peer_3 = network.add_peer().await.unwrap(); /// /// // Configure different impairments for different paths /// network.apply_impairment( @@ -464,7 +464,7 @@ impl Network { /// 1. A new network namespace for the peer /// 2. A veth pair connecting the peer to the hub bridge /// 3. IP address assignment based on the subnet and peer ID - pub async fn add_peer(&mut self, options: PeerOptions) -> Result { + pub async fn add_peer_with_options(&mut self, options: PeerOptions) -> Result { let peer_id = PEER_ID_NEXT.load(Ordering::Relaxed); let namespace_name = peer_id.namespace_name(); let veth_name = Arc::new(peer_id.veth_name()); @@ -589,6 +589,11 @@ impl Network { Ok(peer_id) } + /// See [`Self::add_peer_with_options`]. + pub async fn add_peer(&mut self) -> Result { + self.add_peer_with_options(PeerOptions::default()).await + } + /// Run a task in a peer's network namespace. /// /// The provided closure receives a mutable reference to the namespace's context, @@ -608,7 +613,7 @@ impl Network { /// /// ```no_run /// use msg_sim::ip::Subnet; - /// use msg_sim::network::{Network, HubOptions, PeerOptions}; + /// use msg_sim::network::{Network, PeerOptions}; /// use std::net::Ipv4Addr; /// use tokio::net::TcpListener; /// @@ -617,7 +622,7 @@ impl Network { /// let subnet = Subnet::new(Ipv4Addr::new(12, 0, 0, 0).into(), 16); /// let mut network = Network::new(subnet).await.unwrap(); /// - /// let peer_id = network.add_peer(PeerOptions::default()).await.unwrap(); + /// let peer_id = network.add_peer().await.unwrap(); /// network /// .run_in_namespace(peer_id, |_ctx| { /// Box::pin(async move { @@ -678,7 +683,7 @@ impl Network { /// ```no_run /// use msg_sim::{ /// ip::Subnet, - /// network::{Link, Network, HubOptions, PeerOptions}, + /// network::{Link, Network, PeerOptions}, /// tc::impairment::LinkImpairment /// }; /// @@ -689,8 +694,8 @@ impl Network { /// let subnet = Subnet::new(Ipv4Addr::new(12, 0, 0, 0).into(), 16); /// let mut network = Network::new(subnet).await.unwrap(); /// - /// let peer_1 = network.add_peer(PeerOptions::default()).await.unwrap(); - /// let peer_2 = network.add_peer(PeerOptions::default()).await.unwrap(); + /// let peer_1 = network.add_peer().await.unwrap(); + /// let peer_2 = network.add_peer().await.unwrap(); /// /// // Simulate a slow, lossy link from peer 1 to peer 2 /// network.apply_impairment( @@ -813,7 +818,7 @@ mod msg_sim_network { use crate::{ ip::Subnet, - network::{Link, Network, PeerIdExt, PeerOptions}, + network::{Link, Network, PeerIdExt}, tc::impairment::LinkImpairment, }; @@ -837,8 +842,8 @@ mod msg_sim_network { let subnet = Subnet::new(Ipv4Addr::new(12, 0, 0, 0).into(), 16); let mut network = Network::new(subnet).await.unwrap(); - let _peer_id = network.add_peer(PeerOptions::default()).await.unwrap(); - let _peer_id = network.add_peer(PeerOptions::default()).await.unwrap(); + let _peer_id = network.add_peer().await.unwrap(); + let _peer_id = network.add_peer().await.unwrap(); } /// Test basic request/reply communication between two peers. @@ -849,8 +854,8 @@ mod msg_sim_network { let subnet = Subnet::new(Ipv4Addr::new(13, 0, 0, 0).into(), 16); let mut network = Network::new(subnet).await.unwrap(); - let peer_1 = network.add_peer(PeerOptions::default()).await.unwrap(); - let peer_2 = network.add_peer(PeerOptions::default()).await.unwrap(); + let peer_1 = network.add_peer().await.unwrap(); + let peer_2 = network.add_peer().await.unwrap(); let address_2 = peer_2.veth_address(subnet); let port_2 = 12345; @@ -895,9 +900,9 @@ mod msg_sim_network { let subnet = Subnet::new(Ipv4Addr::new(12, 0, 0, 0).into(), 16); let mut network = Network::new(subnet).await.unwrap(); - let peer_1 = network.add_peer(PeerOptions::default()).await.unwrap(); - let peer_2 = network.add_peer(PeerOptions::default()).await.unwrap(); - let _peer_3 = network.add_peer(PeerOptions::default()).await.unwrap(); + let peer_1 = network.add_peer().await.unwrap(); + let peer_2 = network.add_peer().await.unwrap(); + let _peer_3 = network.add_peer().await.unwrap(); let impairment = LinkImpairment { loss: 50.0, @@ -917,8 +922,8 @@ mod msg_sim_network { let subnet = Subnet::new(Ipv4Addr::new(14, 0, 0, 0).into(), 16); let mut network = Network::new(subnet).await.unwrap(); - let peer_1 = network.add_peer(PeerOptions::default()).await.unwrap(); - let peer_2 = network.add_peer(PeerOptions::default()).await.unwrap(); + let peer_1 = network.add_peer().await.unwrap(); + let peer_2 = network.add_peer().await.unwrap(); // Apply 1 second latency from peer 1 to peer 2 let sec_in_us = 1_000_000; @@ -977,9 +982,9 @@ mod msg_sim_network { let subnet = Subnet::new(Ipv4Addr::new(15, 0, 0, 0).into(), 16); let mut network = Network::new(subnet).await.unwrap(); - let peer_1 = network.add_peer(PeerOptions::default()).await.unwrap(); - let peer_2 = network.add_peer(PeerOptions::default()).await.unwrap(); - let peer_3 = network.add_peer(PeerOptions::default()).await.unwrap(); + let peer_1 = network.add_peer().await.unwrap(); + let peer_2 = network.add_peer().await.unwrap(); + let peer_3 = network.add_peer().await.unwrap(); // Peer 1 → Peer 2: 100ms latency let fast_latency_us = 100_000; @@ -1085,8 +1090,8 @@ mod msg_sim_network { let subnet = Subnet::new(Ipv4Addr::new(16, 0, 0, 0).into(), 16); let mut network = Network::new(subnet).await.unwrap(); - let peer_1 = network.add_peer(PeerOptions::default()).await.unwrap(); - let peer_2 = network.add_peer(PeerOptions::default()).await.unwrap(); + let peer_1 = network.add_peer().await.unwrap(); + let peer_2 = network.add_peer().await.unwrap(); // Apply bandwidth limit: 1 Mbit/s from peer 1 to peer 2 let bandwidth_mbit = 1.0; @@ -1159,8 +1164,8 @@ mod msg_sim_network { let subnet = Subnet::new(Ipv4Addr::new(17, 0, 0, 0).into(), 16); let mut network = Network::new(subnet).await.unwrap(); - let peer_1 = network.add_peer(PeerOptions::default()).await.unwrap(); - let peer_2 = network.add_peer(PeerOptions::default()).await.unwrap(); + let peer_1 = network.add_peer().await.unwrap(); + let peer_2 = network.add_peer().await.unwrap(); // Apply both bandwidth limit and latency let latency_us = 100_000; // 100ms @@ -1230,8 +1235,8 @@ mod msg_sim_network { let subnet = Subnet::new(Ipv4Addr::new(18, 0, 0, 0).into(), 16); let mut network = Network::new(subnet).await.unwrap(); - let peer_1 = network.add_peer(PeerOptions::default()).await.unwrap(); - let peer_2 = network.add_peer(PeerOptions::default()).await.unwrap(); + let peer_1 = network.add_peer().await.unwrap(); + let peer_2 = network.add_peer().await.unwrap(); // First application: 100ms latency let impairment_1 = LinkImpairment { latency: 100_000, ..Default::default() }; @@ -1261,8 +1266,8 @@ mod msg_sim_network { let subnet = Subnet::new(Ipv4Addr::new(19, 0, 0, 0).into(), 16); let mut network = Network::new(subnet).await.unwrap(); - let peer_1 = network.add_peer(PeerOptions::default()).await.unwrap(); - let peer_2 = network.add_peer(PeerOptions::default()).await.unwrap(); + let peer_1 = network.add_peer().await.unwrap(); + let peer_2 = network.add_peer().await.unwrap(); // Different impairments in each direction let impairment_1_to_2 = LinkImpairment { latency: 50_000, ..Default::default() }; @@ -1286,8 +1291,8 @@ mod msg_sim_network { let subnet = Subnet::new(Ipv4Addr::new(20, 0, 0, 0).into(), 16); let mut network = Network::new(subnet).await.unwrap(); - let peer_1 = network.add_peer(PeerOptions::default()).await.unwrap(); - let peer_2 = network.add_peer(PeerOptions::default()).await.unwrap(); + let peer_1 = network.add_peer().await.unwrap(); + let peer_2 = network.add_peer().await.unwrap(); // Apply bandwidth limit with custom burst network @@ -1320,9 +1325,9 @@ mod msg_sim_network { let subnet = Subnet::new(Ipv4Addr::new(24, 0, 0, 0).into(), 16); let mut network = Network::new(subnet).await.unwrap(); - let peer_1 = network.add_peer(PeerOptions::default()).await.unwrap(); - let peer_2 = network.add_peer(PeerOptions::default()).await.unwrap(); - let peer_3 = network.add_peer(PeerOptions::default()).await.unwrap(); + let peer_1 = network.add_peer().await.unwrap(); + let peer_2 = network.add_peer().await.unwrap(); + let peer_3 = network.add_peer().await.unwrap(); // First netem WITH duplicate - works let with_dup = LinkImpairment { latency: 20_000, duplicate: 0.02, ..Default::default() }; @@ -1351,8 +1356,8 @@ mod msg_sim_network { let subnet = Subnet::new(Ipv4Addr::new(21, 0, 0, 0).into(), 16); let mut network = Network::new(subnet).await.unwrap(); - let peer_1 = network.add_peer(PeerOptions::default()).await.unwrap(); - let peer_2 = network.add_peer(PeerOptions::default()).await.unwrap(); + let peer_1 = network.add_peer().await.unwrap(); + let peer_2 = network.add_peer().await.unwrap(); // Apply 100% packet duplication from peer 1 to peer 2 // This should cause every packet to be sent twice