From 571072d36af616aa1532d34d864f2d07b25db6b3 Mon Sep 17 00:00:00 2001 From: bit-aloo Date: Mon, 23 Mar 2026 12:52:49 +0530 Subject: [PATCH 1/4] add shared and sharedMap to stratum apps --- integration-tests/Cargo.lock | 1 + miner-apps/Cargo.lock | 1 + pool-apps/Cargo.lock | 1 + stratum-apps/Cargo.toml | 1 + stratum-apps/src/shared.rs | 184 +++++++++++++++++++++++++++++++++++ 5 files changed, 188 insertions(+) create mode 100644 stratum-apps/src/shared.rs diff --git a/integration-tests/Cargo.lock b/integration-tests/Cargo.lock index 067572280..c831bba88 100644 --- a/integration-tests/Cargo.lock +++ b/integration-tests/Cargo.lock @@ -3085,6 +3085,7 @@ dependencies = [ "axum", "bs58", "config", + "dashmap", "dirs", "futures", "miniscript", diff --git a/miner-apps/Cargo.lock b/miner-apps/Cargo.lock index ffee01e62..afe22073c 100644 --- a/miner-apps/Cargo.lock +++ b/miner-apps/Cargo.lock @@ -2438,6 +2438,7 @@ dependencies = [ "axum", "bs58", "config", + "dashmap", "dirs", "futures", "miniscript", diff --git a/pool-apps/Cargo.lock b/pool-apps/Cargo.lock index 8427c10b3..1bb32106f 100644 --- a/pool-apps/Cargo.lock +++ b/pool-apps/Cargo.lock @@ -2445,6 +2445,7 @@ dependencies = [ "axum", "bs58", "config", + "dashmap", "dirs", "futures", "miniscript", diff --git a/stratum-apps/Cargo.toml b/stratum-apps/Cargo.toml index 9ba788d7b..353df06d0 100644 --- a/stratum-apps/Cargo.toml +++ b/stratum-apps/Cargo.toml @@ -55,6 +55,7 @@ utoipa-swagger-ui = { version = "9.0.2", features = ["axum"], optional = true } # Common external dependencies that roles always need ext-config = { version = "0.14.0", features = ["toml"], package = "config" } shellexpand = "3.1.1" +dashmap = "6.1.0" [features] default = ["network", "config", "std"] diff --git a/stratum-apps/src/shared.rs b/stratum-apps/src/shared.rs new file mode 100644 index 000000000..ba6a32bc7 --- /dev/null +++ b/stratum-apps/src/shared.rs @@ -0,0 +1,184 @@ +use std::{ + hash::Hash, + sync::{Arc, RwLock}, +}; + +use dashmap::DashMap; +use std::sync::Mutex; + +#[derive(Debug)] +pub struct Shared(Arc>); + +impl Clone for Shared { + fn clone(&self) -> Self { + Shared(Arc::clone(&self.0)) + } +} + +impl Shared { + pub fn new(v: T) -> Self { + Shared(Arc::new(Mutex::new(v))) + } + + pub fn with(&self, f: F) -> R + where + F: FnOnce(&mut T) -> R, + { + let mut lock = self.0.lock().unwrap(); + f(&mut *lock) + } + + pub fn get(&self) -> T + where + T: Clone, + { + self.with(|v| v.clone()) + } + + pub fn set(&self, value: T) { + self.with(|v| *v = value); + } +} + +#[derive(Debug)] +pub struct SharedRw(Arc>); + +impl Clone for SharedRw { + fn clone(&self) -> Self { + SharedRw(Arc::clone(&self.0)) + } +} + +impl SharedRw { + pub fn new(v: T) -> Self { + SharedRw(Arc::new(RwLock::new(v))) + } + + pub fn read(&self, f: F) -> R + where + F: FnOnce(&T) -> R, + { + let guard = self.0.read().unwrap(); + f(&*guard) + } + + pub fn write(&self, f: F) -> R + where + F: FnOnce(&mut T) -> R, + { + let mut guard = self.0.write().unwrap(); + f(&mut *guard) + } + + pub fn get(&self) -> T + where + T: Clone, + { + self.read(|v| v.clone()) + } + + pub fn set(&self, value: T) { + self.write(|v| *v = value); + } +} + +pub struct SharedMap(Arc>); + +impl Clone for SharedMap { + fn clone(&self) -> Self { + SharedMap(Arc::clone(&self.0)) + } +} + +impl SharedMap { + pub fn new() -> Self { + SharedMap(Arc::new(DashMap::new())) + } + + pub fn with(&self, key: &K, f: F) -> Option + where + F: FnOnce(&V) -> R, + { + let guard = self.0.get(key)?; + let result = f(guard.value()); + drop(guard); + Some(result) + } + + pub fn with_mut(&self, key: &K, f: F) -> Option + where + F: FnOnce(&mut V) -> R, + { + let mut guard = self.0.get_mut(key)?; + let result = f(guard.value_mut()); + Some(result) + } + + pub fn for_each(&self, mut f: F) + where + F: FnMut(K, &V) -> Ret, + { + for entry in self.0.iter() { + f(entry.key().clone(), entry.value()); + } + } + + pub fn for_each_mut(&self, mut f: F) + where + F: FnMut(K, &mut V) -> Ret, + { + for mut entry in self.0.iter_mut() { + f(entry.key().clone(), entry.value_mut()); + } + } + + pub fn try_for_each_mut(&self, mut f: F) -> Result<(), E> + where + F: FnMut(K, &mut V) -> Result<(), E>, + { + for mut entry in self.0.iter_mut() { + f(entry.key().clone(), entry.value_mut())?; + } + Ok(()) + } + + pub fn insert(&self, key: K, value: V) -> Option { + self.0.insert(key, value) + } + + pub fn remove(&self, key: &K) -> Option<(K, V)> { + self.0.remove(key) + } + + pub fn contains_key(&self, key: &K) -> bool { + self.0.contains_key(key) + } + + pub fn retain(&self, f: F) + where + F: FnMut(&K, &mut V) -> bool, + { + self.0.retain(f); + } + + pub fn keys(&self) -> Vec + where + K: Clone, + { + self.0.iter().map(|e| e.key().clone()).collect() + } + + pub fn len(&self) -> usize { + self.0.len() + } + + pub fn is_empty(&self) -> bool { + self.0.is_empty() + } +} + +impl Default for SharedMap { + fn default() -> Self { + Self::new() + } +} From cdf2a9528b5585a571e20c1a23ee7f77d58578f8 Mon Sep 17 00:00:00 2001 From: bit-aloo Date: Tue, 24 Mar 2026 11:50:23 +0530 Subject: [PATCH 2/4] add doc and unit test to explain the usage better --- stratum-apps/Cargo.lock | 15 ++++++++ stratum-apps/src/lib.rs | 3 ++ stratum-apps/src/shared.rs | 74 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 92 insertions(+) diff --git a/stratum-apps/Cargo.lock b/stratum-apps/Cargo.lock index d50fe3b5a..7b9ae297d 100644 --- a/stratum-apps/Cargo.lock +++ b/stratum-apps/Cargo.lock @@ -588,6 +588,20 @@ dependencies = [ "cipher", ] +[[package]] +name = "dashmap" +version = "6.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "derive_arbitrary" version = "1.4.2" @@ -2053,6 +2067,7 @@ dependencies = [ "base64 0.21.7", "bs58", "config", + "dashmap", "dirs", "futures", "hex", diff --git a/stratum-apps/src/lib.rs b/stratum-apps/src/lib.rs index d1875d268..0c3419eda 100644 --- a/stratum-apps/src/lib.rs +++ b/stratum-apps/src/lib.rs @@ -79,5 +79,8 @@ pub mod coinbase_output_constraints; /// Fallback coordinator pub mod fallback_coordinator; +/// Share synchronous primitives +pub mod shared; + /// Shared async channel cleanup helpers. pub mod channel_utils; diff --git a/stratum-apps/src/shared.rs b/stratum-apps/src/shared.rs index ba6a32bc7..6aefb1bad 100644 --- a/stratum-apps/src/shared.rs +++ b/stratum-apps/src/shared.rs @@ -6,6 +6,7 @@ use std::{ use dashmap::DashMap; use std::sync::Mutex; +/// Thread-safe shared mutable value using `Mutex` for exclusive access. #[derive(Debug)] pub struct Shared(Arc>); @@ -16,10 +17,12 @@ impl Clone for Shared { } impl Shared { + /// Create a new shared value. pub fn new(v: T) -> Self { Shared(Arc::new(Mutex::new(v))) } + /// Execute a closure with mutable access to the inner value. pub fn with(&self, f: F) -> R where F: FnOnce(&mut T) -> R, @@ -28,6 +31,7 @@ impl Shared { f(&mut *lock) } + /// Get a cloned snapshot of the value. pub fn get(&self) -> T where T: Clone, @@ -35,11 +39,13 @@ impl Shared { self.with(|v| v.clone()) } + /// Replace the inner value. pub fn set(&self, value: T) { self.with(|v| *v = value); } } +/// Thread-safe shared value using `RwLock` for concurrent reads and exclusive writes. #[derive(Debug)] pub struct SharedRw(Arc>); @@ -50,10 +56,12 @@ impl Clone for SharedRw { } impl SharedRw { + /// Create a new shared value. pub fn new(v: T) -> Self { SharedRw(Arc::new(RwLock::new(v))) } + /// Execute a closure with read-only access. pub fn read(&self, f: F) -> R where F: FnOnce(&T) -> R, @@ -62,6 +70,7 @@ impl SharedRw { f(&*guard) } + /// Execute a closure with mutable access. pub fn write(&self, f: F) -> R where F: FnOnce(&mut T) -> R, @@ -70,6 +79,7 @@ impl SharedRw { f(&mut *guard) } + /// Get a cloned snapshot of the value. pub fn get(&self) -> T where T: Clone, @@ -77,11 +87,13 @@ impl SharedRw { self.read(|v| v.clone()) } + /// Replace the inner value. pub fn set(&self, value: T) { self.write(|v| *v = value); } } +/// Concurrent map wrapper over `DashMap` providing ergonomic scoped access. pub struct SharedMap(Arc>); impl Clone for SharedMap { @@ -91,10 +103,12 @@ impl Clone for SharedMap { } impl SharedMap { + /// Create a new concurrent map. pub fn new() -> Self { SharedMap(Arc::new(DashMap::new())) } + /// Read a value for a key using a closure. pub fn with(&self, key: &K, f: F) -> Option where F: FnOnce(&V) -> R, @@ -105,6 +119,7 @@ impl SharedMap { Some(result) } + /// Mutate a value for a key using a closure. pub fn with_mut(&self, key: &K, f: F) -> Option where F: FnOnce(&mut V) -> R, @@ -114,6 +129,7 @@ impl SharedMap { Some(result) } + /// Iterate over all entries immutably. pub fn for_each(&self, mut f: F) where F: FnMut(K, &V) -> Ret, @@ -123,6 +139,7 @@ impl SharedMap { } } + /// Iterate over all entries mutably. pub fn for_each_mut(&self, mut f: F) where F: FnMut(K, &mut V) -> Ret, @@ -132,6 +149,7 @@ impl SharedMap { } } + /// Fallible mutable iteration over all entries. pub fn try_for_each_mut(&self, mut f: F) -> Result<(), E> where F: FnMut(K, &mut V) -> Result<(), E>, @@ -142,18 +160,22 @@ impl SharedMap { Ok(()) } + /// Insert a key-value pair. pub fn insert(&self, key: K, value: V) -> Option { self.0.insert(key, value) } + /// Remove a key. pub fn remove(&self, key: &K) -> Option<(K, V)> { self.0.remove(key) } + /// Check if a key exists. pub fn contains_key(&self, key: &K) -> bool { self.0.contains_key(key) } + /// Retain entries matching predicate. pub fn retain(&self, f: F) where F: FnMut(&K, &mut V) -> bool, @@ -161,6 +183,7 @@ impl SharedMap { self.0.retain(f); } + /// Collect all keys. pub fn keys(&self) -> Vec where K: Clone, @@ -168,10 +191,12 @@ impl SharedMap { self.0.iter().map(|e| e.key().clone()).collect() } + /// Number of entries. pub fn len(&self) -> usize { self.0.len() } + /// Check if empty. pub fn is_empty(&self) -> bool { self.0.is_empty() } @@ -182,3 +207,52 @@ impl Default for SharedMap { Self::new() } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn shared_basic_usage() { + let v = Shared::new(10); + + v.with(|x| *x += 5); + assert_eq!(v.get(), 15); + + v.set(42); + assert_eq!(v.get(), 42); + } + + #[test] + fn shared_rw_usage() { + let v = SharedRw::new(100); + + let a = v.read(|x| *x); + let b = v.read(|x| *x); + assert_eq!(a, b); + + v.write(|x| *x += 1); + assert_eq!(v.get(), 101); + } + + #[test] + fn shared_map_usage() { + let map = SharedMap::new(); + + map.insert("a", 1); + map.insert("b", 2); + + let val = map.with(&"a", |v| *v).unwrap(); + assert_eq!(val, 1); + + map.with_mut(&"a", |v| *v += 10); + assert_eq!(map.with(&"a", |v| *v).unwrap(), 11); + + let mut sum = 0; + map.for_each(|_, v| sum += v); + assert_eq!(sum, 13); + + map.remove(&"a"); + assert!(!map.contains_key(&"a")); + } +} From 1a2a65105537db000ab545ffce96130833f1cc1b Mon Sep 17 00:00:00 2001 From: bit-aloo Date: Wed, 13 May 2026 20:25:24 +0530 Subject: [PATCH 3/4] Add Results in Rwlock and Mutex shared API's --- stratum-apps/src/shared.rs | 39 +++++++++++++++++++++----------------- 1 file changed, 22 insertions(+), 17 deletions(-) diff --git a/stratum-apps/src/shared.rs b/stratum-apps/src/shared.rs index 6aefb1bad..894b8d01f 100644 --- a/stratum-apps/src/shared.rs +++ b/stratum-apps/src/shared.rs @@ -1,10 +1,13 @@ use std::{ hash::Hash, - sync::{Arc, RwLock}, + sync::{Arc, Mutex, MutexGuard, PoisonError, RwLock, RwLockReadGuard, RwLockWriteGuard}, }; use dashmap::DashMap; -use std::sync::Mutex; + +type SharedResult<'a, T, R> = Result>>; +type SharedReadResult<'a, T, R> = Result>>; +type SharedWriteResult<'a, T, R> = Result>>; /// Thread-safe shared mutable value using `Mutex` for exclusive access. #[derive(Debug)] @@ -23,16 +26,16 @@ impl Shared { } /// Execute a closure with mutable access to the inner value. - pub fn with(&self, f: F) -> R + pub fn with(&self, f: F) -> SharedResult<'_, T, R> where F: FnOnce(&mut T) -> R, { - let mut lock = self.0.lock().unwrap(); - f(&mut *lock) + let mut lock = self.0.lock()?; + Ok(f(&mut *lock)) } /// Get a cloned snapshot of the value. - pub fn get(&self) -> T + pub fn get(&self) -> SharedResult<'_, T, T> where T: Clone, { @@ -40,8 +43,9 @@ impl Shared { } /// Replace the inner value. - pub fn set(&self, value: T) { - self.with(|v| *v = value); + pub fn set(&self, value: T) -> SharedResult<'_, T, ()> { + self.with(|v| *v = value)?; + Ok(()) } } @@ -62,25 +66,25 @@ impl SharedRw { } /// Execute a closure with read-only access. - pub fn read(&self, f: F) -> R + pub fn read(&self, f: F) -> SharedReadResult<'_, T, R> where F: FnOnce(&T) -> R, { - let guard = self.0.read().unwrap(); - f(&*guard) + let guard = self.0.read()?; + Ok(f(&*guard)) } /// Execute a closure with mutable access. - pub fn write(&self, f: F) -> R + pub fn write(&self, f: F) -> SharedWriteResult<'_, T, R> where F: FnOnce(&mut T) -> R, { - let mut guard = self.0.write().unwrap(); - f(&mut *guard) + let mut guard = self.0.write()?; + Ok(f(&mut *guard)) } /// Get a cloned snapshot of the value. - pub fn get(&self) -> T + pub fn get(&self) -> SharedReadResult<'_, T, T> where T: Clone, { @@ -88,8 +92,9 @@ impl SharedRw { } /// Replace the inner value. - pub fn set(&self, value: T) { - self.write(|v| *v = value); + pub fn set(&self, value: T) -> SharedWriteResult<'_, T, ()> { + self.write(|v| *v = value)?; + Ok(()) } } From 5a1c8a735ce8c0ba1c27236f33283e0fe37f86f7 Mon Sep 17 00:00:00 2001 From: bit-aloo Date: Wed, 13 May 2026 20:26:31 +0530 Subject: [PATCH 4/4] provide safer dashmap method, so no one can abuse it --- stratum-apps/src/shared.rs | 93 +++++++++++++++++--------------------- 1 file changed, 42 insertions(+), 51 deletions(-) diff --git a/stratum-apps/src/shared.rs b/stratum-apps/src/shared.rs index 894b8d01f..525752c19 100644 --- a/stratum-apps/src/shared.rs +++ b/stratum-apps/src/shared.rs @@ -113,56 +113,41 @@ impl SharedMap { SharedMap(Arc::new(DashMap::new())) } - /// Read a value for a key using a closure. + /// Read a value for a key while holding the entry lock. pub fn with(&self, key: &K, f: F) -> Option where F: FnOnce(&V) -> R, { let guard = self.0.get(key)?; - let result = f(guard.value()); - drop(guard); - Some(result) + Some(f(guard.value())) } - /// Mutate a value for a key using a closure. + /// Mutate a value for a key while holding the entry lock. pub fn with_mut(&self, key: &K, f: F) -> Option where F: FnOnce(&mut V) -> R, { let mut guard = self.0.get_mut(key)?; - let result = f(guard.value_mut()); - Some(result) + Some(f(guard.value_mut())) } - /// Iterate over all entries immutably. - pub fn for_each(&self, mut f: F) + /// Get a cloned snapshot of a value for a key. + pub fn get_cloned(&self, key: &K) -> Option where - F: FnMut(K, &V) -> Ret, + V: Clone, { - for entry in self.0.iter() { - f(entry.key().clone(), entry.value()); - } + self.0.get(key).map(|guard| guard.value().clone()) } - /// Iterate over all entries mutably. - pub fn for_each_mut(&self, mut f: F) + /// Collect a cloned snapshot of all entries. + pub fn snapshot(&self) -> Vec<(K, V)> where - F: FnMut(K, &mut V) -> Ret, + V: Clone, { - for mut entry in self.0.iter_mut() { - f(entry.key().clone(), entry.value_mut()); - } - } - - /// Fallible mutable iteration over all entries. - pub fn try_for_each_mut(&self, mut f: F) -> Result<(), E> - where - F: FnMut(K, &mut V) -> Result<(), E>, - { - for mut entry in self.0.iter_mut() { - f(entry.key().clone(), entry.value_mut())?; - } - Ok(()) + self.0 + .iter() + .map(|entry| (entry.key().clone(), entry.value().clone())) + .collect() } /// Insert a key-value pair. @@ -180,14 +165,6 @@ impl SharedMap { self.0.contains_key(key) } - /// Retain entries matching predicate. - pub fn retain(&self, f: F) - where - F: FnMut(&K, &mut V) -> bool, - { - self.0.retain(f); - } - /// Collect all keys. pub fn keys(&self) -> Vec where @@ -213,6 +190,15 @@ impl Default for SharedMap { } } +impl IntoIterator for &SharedMap { + type Item = (K, V); + type IntoIter = std::vec::IntoIter<(K, V)>; + + fn into_iter(self) -> Self::IntoIter { + self.snapshot().into_iter() + } +} + #[cfg(test)] mod tests { use super::*; @@ -221,23 +207,23 @@ mod tests { fn shared_basic_usage() { let v = Shared::new(10); - v.with(|x| *x += 5); - assert_eq!(v.get(), 15); + v.with(|x| *x += 5).unwrap(); + assert_eq!(v.get().unwrap(), 15); - v.set(42); - assert_eq!(v.get(), 42); + v.set(42).unwrap(); + assert_eq!(v.get().unwrap(), 42); } #[test] fn shared_rw_usage() { let v = SharedRw::new(100); - let a = v.read(|x| *x); - let b = v.read(|x| *x); + let a = v.read(|x| *x).unwrap(); + let b = v.read(|x| *x).unwrap(); assert_eq!(a, b); - v.write(|x| *x += 1); - assert_eq!(v.get(), 101); + v.write(|x| *x += 1).unwrap(); + assert_eq!(v.get().unwrap(), 101); } #[test] @@ -247,16 +233,21 @@ mod tests { map.insert("a", 1); map.insert("b", 2); - let val = map.with(&"a", |v| *v).unwrap(); + let read_val = map.with(&"a", |v| *v).unwrap(); + assert_eq!(read_val, 1); + + let val = map.get_cloned(&"a").unwrap(); assert_eq!(val, 1); - map.with_mut(&"a", |v| *v += 10); - assert_eq!(map.with(&"a", |v| *v).unwrap(), 11); + map.with_mut(&"a", |v| *v += 10).unwrap(); + assert_eq!(map.get_cloned(&"a").unwrap(), 11); - let mut sum = 0; - map.for_each(|_, v| sum += v); + let sum: i32 = (&map).into_iter().map(|(_, v)| v).sum(); assert_eq!(sum, 13); + let snapshot = map.snapshot(); + assert_eq!(snapshot.len(), 2); + map.remove(&"a"); assert!(!map.contains_key(&"a")); }