Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions integration-tests/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions miner-apps/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pool-apps/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions stratum-apps/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions stratum-apps/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ utoipa-swagger-ui = { version = "9.0.2", features = ["axum"], optional = true }
# Common external dependencies that roles always need
ext-config = { version = "0.14.0", features = ["toml"], package = "config" }
shellexpand = "3.1.1"
dashmap = "6.1.0"

[features]
default = ["network", "config", "std"]
Expand Down
3 changes: 3 additions & 0 deletions stratum-apps/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,5 +79,8 @@ pub mod coinbase_output_constraints;
/// Fallback coordinator
pub mod fallback_coordinator;

/// Share synchronous primitives
pub mod shared;

/// Shared async channel cleanup helpers.
pub mod channel_utils;
254 changes: 254 additions & 0 deletions stratum-apps/src/shared.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,254 @@
use std::{
hash::Hash,
sync::{Arc, Mutex, MutexGuard, PoisonError, RwLock, RwLockReadGuard, RwLockWriteGuard},
};

use dashmap::DashMap;

type SharedResult<'a, T, R> = Result<R, PoisonError<MutexGuard<'a, T>>>;
type SharedReadResult<'a, T, R> = Result<R, PoisonError<RwLockReadGuard<'a, T>>>;
type SharedWriteResult<'a, T, R> = Result<R, PoisonError<RwLockWriteGuard<'a, T>>>;

/// Thread-safe shared mutable value using `Mutex` for exclusive access.
#[derive(Debug)]
pub struct Shared<T>(Arc<Mutex<T>>);

impl<T> Clone for Shared<T> {
fn clone(&self) -> Self {
Shared(Arc::clone(&self.0))
}
}

impl<T> Shared<T> {
/// Create a new shared value.
pub fn new(v: T) -> Self {
Shared(Arc::new(Mutex::new(v)))
}

/// Execute a closure with mutable access to the inner value.
pub fn with<F, R>(&self, f: F) -> SharedResult<'_, T, R>
where
F: FnOnce(&mut T) -> R,
{
let mut lock = self.0.lock()?;
Ok(f(&mut *lock))
}

/// Get a cloned snapshot of the value.
pub fn get(&self) -> SharedResult<'_, T, T>
where
T: Clone,
{
self.with(|v| v.clone())
}

/// Replace the inner value.
pub fn set(&self, value: T) -> SharedResult<'_, T, ()> {
self.with(|v| *v = value)?;
Ok(())
}
}

/// Thread-safe shared value using `RwLock` for concurrent reads and exclusive writes.
#[derive(Debug)]
pub struct SharedRw<T>(Arc<RwLock<T>>);

impl<T> Clone for SharedRw<T> {
fn clone(&self) -> Self {
SharedRw(Arc::clone(&self.0))
}
}

impl<T> SharedRw<T> {
/// Create a new shared value.
pub fn new(v: T) -> Self {
SharedRw(Arc::new(RwLock::new(v)))
}

/// Execute a closure with read-only access.
pub fn read<F, R>(&self, f: F) -> SharedReadResult<'_, T, R>
where
F: FnOnce(&T) -> R,
{
let guard = self.0.read()?;
Ok(f(&*guard))
}

/// Execute a closure with mutable access.
pub fn write<F, R>(&self, f: F) -> SharedWriteResult<'_, T, R>
where
F: FnOnce(&mut T) -> R,
{
let mut guard = self.0.write()?;
Ok(f(&mut *guard))
}

/// Get a cloned snapshot of the value.
pub fn get(&self) -> SharedReadResult<'_, T, T>
where
T: Clone,
{
self.read(|v| v.clone())
}

/// Replace the inner value.
pub fn set(&self, value: T) -> SharedWriteResult<'_, T, ()> {
self.write(|v| *v = value)?;
Ok(())
}
}

/// Concurrent map wrapper over `DashMap` providing ergonomic scoped access.
pub struct SharedMap<K: Eq + Clone + Hash, V>(Arc<DashMap<K, V>>);

impl<K: Eq + Clone + Hash, V> Clone for SharedMap<K, V> {
fn clone(&self) -> Self {
SharedMap(Arc::clone(&self.0))
}
}

impl<K: Eq + Hash + Clone, V> SharedMap<K, V> {
/// Create a new concurrent map.
pub fn new() -> Self {
SharedMap(Arc::new(DashMap::new()))
}

/// Read a value for a key while holding the entry lock.
pub fn with<F, R>(&self, key: &K, f: F) -> Option<R>
where
F: FnOnce(&V) -> R,
{
let guard = self.0.get(key)?;
Some(f(guard.value()))
}

/// Mutate a value for a key while holding the entry lock.
pub fn with_mut<F, R>(&self, key: &K, f: F) -> Option<R>
where
F: FnOnce(&mut V) -> R,
{
let mut guard = self.0.get_mut(key)?;
Some(f(guard.value_mut()))
}

/// Get a cloned snapshot of a value for a key.
pub fn get_cloned(&self, key: &K) -> Option<V>
where
V: Clone,
{
self.0.get(key).map(|guard| guard.value().clone())
}

/// Collect a cloned snapshot of all entries.
pub fn snapshot(&self) -> Vec<(K, V)>
where
V: Clone,
{
self.0
.iter()
.map(|entry| (entry.key().clone(), entry.value().clone()))
.collect()
}

/// Insert a key-value pair.
pub fn insert(&self, key: K, value: V) -> Option<V> {
self.0.insert(key, value)
}

/// Remove a key.
pub fn remove(&self, key: &K) -> Option<(K, V)> {
self.0.remove(key)
}

/// Check if a key exists.
pub fn contains_key(&self, key: &K) -> bool {
self.0.contains_key(key)
}

/// Collect all keys.
pub fn keys(&self) -> Vec<K>
where
K: Clone,
{
self.0.iter().map(|e| e.key().clone()).collect()
}

/// Number of entries.
pub fn len(&self) -> usize {
self.0.len()
}

/// Check if empty.
pub fn is_empty(&self) -> bool {
self.0.is_empty()
}
}

impl<K: Eq + Hash + Clone, V> Default for SharedMap<K, V> {
fn default() -> Self {
Self::new()
}
}

impl<K: Eq + Hash + Clone, V: Clone> IntoIterator for &SharedMap<K, V> {
type Item = (K, V);
type IntoIter = std::vec::IntoIter<(K, V)>;

fn into_iter(self) -> Self::IntoIter {
self.snapshot().into_iter()
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn shared_basic_usage() {
let v = Shared::new(10);

v.with(|x| *x += 5).unwrap();
assert_eq!(v.get().unwrap(), 15);

v.set(42).unwrap();
assert_eq!(v.get().unwrap(), 42);
}

#[test]
fn shared_rw_usage() {
let v = SharedRw::new(100);

let a = v.read(|x| *x).unwrap();
let b = v.read(|x| *x).unwrap();
assert_eq!(a, b);

v.write(|x| *x += 1).unwrap();
assert_eq!(v.get().unwrap(), 101);
}

#[test]
fn shared_map_usage() {
let map = SharedMap::new();

map.insert("a", 1);
map.insert("b", 2);

let read_val = map.with(&"a", |v| *v).unwrap();
assert_eq!(read_val, 1);

let val = map.get_cloned(&"a").unwrap();
assert_eq!(val, 1);

map.with_mut(&"a", |v| *v += 10).unwrap();
assert_eq!(map.get_cloned(&"a").unwrap(), 11);

let sum: i32 = (&map).into_iter().map(|(_, v)| v).sum();
assert_eq!(sum, 13);

let snapshot = map.snapshot();
assert_eq!(snapshot.len(), 2);

map.remove(&"a");
assert!(!map.contains_key(&"a"));
}
}
Loading