diff --git a/integration-tests/Cargo.lock b/integration-tests/Cargo.lock index 067572280..c831bba88 100644 --- a/integration-tests/Cargo.lock +++ b/integration-tests/Cargo.lock @@ -3085,6 +3085,7 @@ dependencies = [ "axum", "bs58", "config", + "dashmap", "dirs", "futures", "miniscript", diff --git a/miner-apps/Cargo.lock b/miner-apps/Cargo.lock index ffee01e62..afe22073c 100644 --- a/miner-apps/Cargo.lock +++ b/miner-apps/Cargo.lock @@ -2438,6 +2438,7 @@ dependencies = [ "axum", "bs58", "config", + "dashmap", "dirs", "futures", "miniscript", diff --git a/pool-apps/Cargo.lock b/pool-apps/Cargo.lock index 8427c10b3..1bb32106f 100644 --- a/pool-apps/Cargo.lock +++ b/pool-apps/Cargo.lock @@ -2445,6 +2445,7 @@ dependencies = [ "axum", "bs58", "config", + "dashmap", "dirs", "futures", "miniscript", diff --git a/stratum-apps/Cargo.lock b/stratum-apps/Cargo.lock index d50fe3b5a..7b9ae297d 100644 --- a/stratum-apps/Cargo.lock +++ b/stratum-apps/Cargo.lock @@ -588,6 +588,20 @@ dependencies = [ "cipher", ] +[[package]] +name = "dashmap" +version = "6.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "derive_arbitrary" version = "1.4.2" @@ -2053,6 +2067,7 @@ dependencies = [ "base64 0.21.7", "bs58", "config", + "dashmap", "dirs", "futures", "hex", diff --git a/stratum-apps/Cargo.toml b/stratum-apps/Cargo.toml index 9ba788d7b..353df06d0 100644 --- a/stratum-apps/Cargo.toml +++ b/stratum-apps/Cargo.toml @@ -55,6 +55,7 @@ utoipa-swagger-ui = { version = "9.0.2", features = ["axum"], optional = true } # Common external dependencies that roles always need ext-config = { version = "0.14.0", features = ["toml"], package = "config" } shellexpand = "3.1.1" +dashmap = "6.1.0" [features] default = ["network", "config", "std"] diff --git a/stratum-apps/src/lib.rs b/stratum-apps/src/lib.rs index d1875d268..0c3419eda 100644 --- a/stratum-apps/src/lib.rs +++ b/stratum-apps/src/lib.rs @@ -79,5 +79,8 @@ pub mod coinbase_output_constraints; /// Fallback coordinator pub mod fallback_coordinator; +/// Share synchronous primitives +pub mod shared; + /// Shared async channel cleanup helpers. pub mod channel_utils; diff --git a/stratum-apps/src/shared.rs b/stratum-apps/src/shared.rs new file mode 100644 index 000000000..525752c19 --- /dev/null +++ b/stratum-apps/src/shared.rs @@ -0,0 +1,254 @@ +use std::{ + hash::Hash, + sync::{Arc, Mutex, MutexGuard, PoisonError, RwLock, RwLockReadGuard, RwLockWriteGuard}, +}; + +use dashmap::DashMap; + +type SharedResult<'a, T, R> = Result>>; +type SharedReadResult<'a, T, R> = Result>>; +type SharedWriteResult<'a, T, R> = Result>>; + +/// Thread-safe shared mutable value using `Mutex` for exclusive access. +#[derive(Debug)] +pub struct Shared(Arc>); + +impl Clone for Shared { + fn clone(&self) -> Self { + Shared(Arc::clone(&self.0)) + } +} + +impl Shared { + /// Create a new shared value. + pub fn new(v: T) -> Self { + Shared(Arc::new(Mutex::new(v))) + } + + /// Execute a closure with mutable access to the inner value. + pub fn with(&self, f: F) -> SharedResult<'_, T, R> + where + F: FnOnce(&mut T) -> R, + { + let mut lock = self.0.lock()?; + Ok(f(&mut *lock)) + } + + /// Get a cloned snapshot of the value. + pub fn get(&self) -> SharedResult<'_, T, T> + where + T: Clone, + { + self.with(|v| v.clone()) + } + + /// Replace the inner value. + pub fn set(&self, value: T) -> SharedResult<'_, T, ()> { + self.with(|v| *v = value)?; + Ok(()) + } +} + +/// Thread-safe shared value using `RwLock` for concurrent reads and exclusive writes. +#[derive(Debug)] +pub struct SharedRw(Arc>); + +impl Clone for SharedRw { + fn clone(&self) -> Self { + SharedRw(Arc::clone(&self.0)) + } +} + +impl SharedRw { + /// Create a new shared value. + pub fn new(v: T) -> Self { + SharedRw(Arc::new(RwLock::new(v))) + } + + /// Execute a closure with read-only access. + pub fn read(&self, f: F) -> SharedReadResult<'_, T, R> + where + F: FnOnce(&T) -> R, + { + let guard = self.0.read()?; + Ok(f(&*guard)) + } + + /// Execute a closure with mutable access. + pub fn write(&self, f: F) -> SharedWriteResult<'_, T, R> + where + F: FnOnce(&mut T) -> R, + { + let mut guard = self.0.write()?; + Ok(f(&mut *guard)) + } + + /// Get a cloned snapshot of the value. + pub fn get(&self) -> SharedReadResult<'_, T, T> + where + T: Clone, + { + self.read(|v| v.clone()) + } + + /// Replace the inner value. + pub fn set(&self, value: T) -> SharedWriteResult<'_, T, ()> { + self.write(|v| *v = value)?; + Ok(()) + } +} + +/// Concurrent map wrapper over `DashMap` providing ergonomic scoped access. +pub struct SharedMap(Arc>); + +impl Clone for SharedMap { + fn clone(&self) -> Self { + SharedMap(Arc::clone(&self.0)) + } +} + +impl SharedMap { + /// Create a new concurrent map. + pub fn new() -> Self { + SharedMap(Arc::new(DashMap::new())) + } + + /// Read a value for a key while holding the entry lock. + pub fn with(&self, key: &K, f: F) -> Option + where + F: FnOnce(&V) -> R, + { + let guard = self.0.get(key)?; + Some(f(guard.value())) + } + + /// Mutate a value for a key while holding the entry lock. + pub fn with_mut(&self, key: &K, f: F) -> Option + where + F: FnOnce(&mut V) -> R, + { + let mut guard = self.0.get_mut(key)?; + Some(f(guard.value_mut())) + } + + /// Get a cloned snapshot of a value for a key. + pub fn get_cloned(&self, key: &K) -> Option + where + V: Clone, + { + self.0.get(key).map(|guard| guard.value().clone()) + } + + /// Collect a cloned snapshot of all entries. + pub fn snapshot(&self) -> Vec<(K, V)> + where + V: Clone, + { + self.0 + .iter() + .map(|entry| (entry.key().clone(), entry.value().clone())) + .collect() + } + + /// Insert a key-value pair. + pub fn insert(&self, key: K, value: V) -> Option { + self.0.insert(key, value) + } + + /// Remove a key. + pub fn remove(&self, key: &K) -> Option<(K, V)> { + self.0.remove(key) + } + + /// Check if a key exists. + pub fn contains_key(&self, key: &K) -> bool { + self.0.contains_key(key) + } + + /// Collect all keys. + pub fn keys(&self) -> Vec + where + K: Clone, + { + self.0.iter().map(|e| e.key().clone()).collect() + } + + /// Number of entries. + pub fn len(&self) -> usize { + self.0.len() + } + + /// Check if empty. + pub fn is_empty(&self) -> bool { + self.0.is_empty() + } +} + +impl Default for SharedMap { + fn default() -> Self { + Self::new() + } +} + +impl IntoIterator for &SharedMap { + type Item = (K, V); + type IntoIter = std::vec::IntoIter<(K, V)>; + + fn into_iter(self) -> Self::IntoIter { + self.snapshot().into_iter() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn shared_basic_usage() { + let v = Shared::new(10); + + v.with(|x| *x += 5).unwrap(); + assert_eq!(v.get().unwrap(), 15); + + v.set(42).unwrap(); + assert_eq!(v.get().unwrap(), 42); + } + + #[test] + fn shared_rw_usage() { + let v = SharedRw::new(100); + + let a = v.read(|x| *x).unwrap(); + let b = v.read(|x| *x).unwrap(); + assert_eq!(a, b); + + v.write(|x| *x += 1).unwrap(); + assert_eq!(v.get().unwrap(), 101); + } + + #[test] + fn shared_map_usage() { + let map = SharedMap::new(); + + map.insert("a", 1); + map.insert("b", 2); + + let read_val = map.with(&"a", |v| *v).unwrap(); + assert_eq!(read_val, 1); + + let val = map.get_cloned(&"a").unwrap(); + assert_eq!(val, 1); + + map.with_mut(&"a", |v| *v += 10).unwrap(); + assert_eq!(map.get_cloned(&"a").unwrap(), 11); + + let sum: i32 = (&map).into_iter().map(|(_, v)| v).sum(); + assert_eq!(sum, 13); + + let snapshot = map.snapshot(); + assert_eq!(snapshot.len(), 2); + + map.remove(&"a"); + assert!(!map.contains_key(&"a")); + } +}