diff --git a/protocols/v2/channels-sv2/src/client/extended.rs b/protocols/v2/channels-sv2/src/client/extended.rs index d0b8a076de..0f78572cbd 100644 --- a/protocols/v2/channels-sv2/src/client/extended.rs +++ b/protocols/v2/channels-sv2/src/client/extended.rs @@ -13,6 +13,7 @@ use crate::{ share_accounting::{ShareAccounting, ShareValidationError, ShareValidationResult}, }, merkle_root::merkle_root_from_path, + persistence::PersistenceHandler, target::{bytes_to_hex, u256_to_block_hash}, MAX_EXTRANONCE_PREFIX_LEN, }; @@ -59,7 +60,7 @@ pub type ExtendedJob<'a> = (NewExtendedMiningJob<'a>, Vec); /// - Share accounting for the channel (as tracked by the client). /// - The channel's current chain tip. #[derive(Clone, Debug)] -pub struct ExtendedChannel<'a> { +pub struct ExtendedChannel<'a, P> { channel_id: u32, user_identity: String, extranonce_prefix: Vec, @@ -74,12 +75,16 @@ pub struct ExtendedChannel<'a> { past_jobs: HashMap>, // stale jobs are indexed with job_id (u32) stale_jobs: HashMap>, - share_accounting: ShareAccounting, + share_accounting: ShareAccounting

, chain_tip: Option, } -impl<'a> ExtendedChannel<'a> { +impl<'a, P> ExtendedChannel<'a, P> +where + P: PersistenceHandler, +{ /// Constructs a new [`ExtendedChannel`]. + #[allow(clippy::too_many_arguments)] pub fn new( channel_id: u32, user_identity: String, @@ -88,6 +93,7 @@ impl<'a> ExtendedChannel<'a> { nominal_hashrate: f32, version_rolling: bool, rollable_extranonce_size: u16, + persistence: P, ) -> Self { Self { channel_id, @@ -101,7 +107,7 @@ impl<'a> ExtendedChannel<'a> { active_job: None, past_jobs: HashMap::new(), stale_jobs: HashMap::new(), - share_accounting: ShareAccounting::new(), + share_accounting: ShareAccounting::new(persistence), chain_tip: None, } } @@ -202,7 +208,7 @@ impl<'a> ExtendedChannel<'a> { } /// Returns a reference to the [`ShareAccounting`] for this channel. - pub fn get_share_accounting(&self) -> &ShareAccounting { + pub fn get_share_accounting(&self) -> &ShareAccounting

{ &self.share_accounting } @@ -533,9 +539,12 @@ impl<'a> ExtendedChannel<'a> { // check if a block was found if network_target.is_met_by(hash) { self.share_accounting.update_share_accounting( - self.target.difficulty_float() as u64, + self.channel_id, + &self.user_identity, + self.target.difficulty_float(), share.sequence_number, hash.to_raw_hash(), + true, ); return Ok(ShareValidationResult::BlockFound); } @@ -547,13 +556,17 @@ impl<'a> ExtendedChannel<'a> { } self.share_accounting.update_share_accounting( - self.target.difficulty_float() as u64, + self.channel_id, + &self.user_identity, + self.target.difficulty_float(), share.sequence_number, hash.to_raw_hash(), + false, ); // update the best diff - self.share_accounting.update_best_diff(hash_as_diff); + self.share_accounting + .update_best_diff(self.channel_id, hash_as_diff); return Ok(ShareValidationResult::Valid); } @@ -564,9 +577,12 @@ impl<'a> ExtendedChannel<'a> { #[cfg(test)] mod tests { - use crate::client::{ - extended::ExtendedChannel, - share_accounting::{ShareValidationError, ShareValidationResult}, + use crate::{ + client::{ + extended::ExtendedChannel, + share_accounting::{ShareValidationError, ShareValidationResult}, + }, + persistence::{Persistence, PersistenceHandler, ShareAccountingEvent}, }; use binary_sv2::Sv2Option; use bitcoin::Target; @@ -575,6 +591,16 @@ mod tests { }; use std::convert::TryInto; + /// Unit-like type for persistence in tests + #[derive(Debug, Clone)] + struct TestPersistence; + + impl PersistenceHandler for TestPersistence { + fn persist_event(&self, _event: ShareAccountingEvent) { + // No-op for tests + } + } + #[test] fn test_future_job_activation_flow() { let channel_id = 1; @@ -589,7 +615,7 @@ mod tests { let version_rolling = true; let rollable_extranonce_size = 4u16; - let mut channel = ExtendedChannel::new( + let mut channel = ExtendedChannel::>::new( channel_id, user_identity, extranonce_prefix.clone(), @@ -597,6 +623,7 @@ mod tests { nominal_hashrate, version_rolling, rollable_extranonce_size, + Persistence::default(), ); let future_job = NewExtendedMiningJob { @@ -671,7 +698,7 @@ mod tests { let version_rolling = true; let rollable_extranonce_size = 4u16; - let mut channel = ExtendedChannel::new( + let mut channel = ExtendedChannel::>::new( channel_id, user_identity, extranonce_prefix.clone(), @@ -679,6 +706,7 @@ mod tests { nominal_hashrate, version_rolling, rollable_extranonce_size, + Persistence::default(), ); let ntime: u32 = 1746839905; @@ -744,7 +772,7 @@ mod tests { let version_rolling = true; let rollable_extranonce_size = 8u16; - let mut channel = ExtendedChannel::new( + let mut channel = ExtendedChannel::>::new( channel_id, user_identity, extranonce_prefix.clone(), @@ -752,6 +780,7 @@ mod tests { nominal_hashrate, version_rolling, rollable_extranonce_size, + Persistence::default(), ); let future_job = NewExtendedMiningJob { @@ -836,7 +865,7 @@ mod tests { let version_rolling = true; let rollable_extranonce_size = 8u16; - let mut channel = ExtendedChannel::new( + let mut channel = ExtendedChannel::>::new( channel_id, user_identity, extranonce_prefix.clone(), @@ -844,6 +873,7 @@ mod tests { nominal_hashrate, version_rolling, rollable_extranonce_size, + Persistence::default(), ); let future_job = NewExtendedMiningJob { @@ -931,7 +961,7 @@ mod tests { let version_rolling = true; let rollable_extranonce_size = 8u16; - let mut channel = ExtendedChannel::new( + let mut channel = ExtendedChannel::>::new( channel_id, user_identity, extranonce_prefix.clone(), @@ -939,6 +969,7 @@ mod tests { nominal_hashrate, version_rolling, rollable_extranonce_size, + Persistence::default(), ); let future_job = NewExtendedMiningJob { diff --git a/protocols/v2/channels-sv2/src/client/share_accounting.rs b/protocols/v2/channels-sv2/src/client/share_accounting.rs index ee51074c30..956d43c431 100644 --- a/protocols/v2/channels-sv2/src/client/share_accounting.rs +++ b/protocols/v2/channels-sv2/src/client/share_accounting.rs @@ -5,6 +5,7 @@ //! are intended for use in Mining Clients. use super::HashSet; +use crate::persistence::{PersistenceHandler, ShareAccountingEvent}; use bitcoin::hashes::sha256d::Hash; /// The outcome of share validation, as seen by a Mining Client. @@ -46,33 +47,34 @@ pub enum ShareValidationError { /// Keeps statistics and state for shares submitted through the channel: /// - last received share's sequence number /// - total accepted shares -/// - cumulative work from accepted shares +/// - cumulative work from accepted shares (sum of share difficulties as floating point) /// - hashes of seen shares (for duplicate detection) /// - highest difficulty seen in accepted shares #[derive(Clone, Debug)] -pub struct ShareAccounting { +pub struct ShareAccounting

{ last_share_sequence_number: u32, shares_accepted: u32, - share_work_sum: u64, + share_work_sum: f64, seen_shares: HashSet, best_diff: f64, + persistence: P, } -impl Default for ShareAccounting { - fn default() -> Self { - Self::new() - } -} - -impl ShareAccounting { +impl

ShareAccounting

+where + P: PersistenceHandler, +{ /// Creates a new [`ShareAccounting`] instance, initializing all statistics to zero. - pub fn new() -> Self { + /// + /// `persistence` handles background persistence of share accounting events. + pub fn new(persistence: P) -> Self { Self { last_share_sequence_number: 0, shares_accepted: 0, - share_work_sum: 0, + share_work_sum: 0.0, seen_shares: HashSet::new(), best_diff: 0.0, + persistence, } } @@ -83,14 +85,31 @@ impl ShareAccounting { /// - Records share hash to detect duplicates. pub fn update_share_accounting( &mut self, - share_work: u64, + channel_id: u32, + user_identity: &str, + share_work: f64, share_sequence_number: u32, share_hash: Hash, + block_found: bool, ) { self.last_share_sequence_number = share_sequence_number; self.shares_accepted += 1; self.share_work_sum += share_work; self.seen_shares.insert(share_hash); + + // Persist the share accepted event + let event = ShareAccountingEvent::ShareAccepted { + channel_id, + user_identity: user_identity.to_string(), + share_work, + share_sequence_number, + share_hash, + total_shares_accepted: self.shares_accepted, + total_share_work_sum: self.share_work_sum, + timestamp: std::time::SystemTime::now(), + block_found, + }; + self.persistence.persist_event(event); } /// Clears the set of seen share hashes. @@ -99,6 +118,9 @@ impl ShareAccounting { /// to prevent unbounded memory growth. pub fn flush_seen_shares(&mut self) { self.seen_shares.clear(); + + // Ensure any pending persistence events are flushed + self.persistence.flush(); } /// Returns the sequence number of the last share received. @@ -112,7 +134,7 @@ impl ShareAccounting { } /// Returns the cumulative work of all accepted shares. - pub fn get_share_work_sum(&self) -> u64 { + pub fn get_share_work_sum(&self) -> f64 { self.share_work_sum } @@ -127,9 +149,19 @@ impl ShareAccounting { } /// Updates the best difficulty if the new difficulty is higher than the current best. - pub fn update_best_diff(&mut self, diff: f64) { + pub fn update_best_diff(&mut self, channel_id: u32, diff: f64) { + let previous_best_diff = self.best_diff; if diff > self.best_diff { self.best_diff = diff; + + // Persist the best difficulty updated event + let event = ShareAccountingEvent::BestDifficultyUpdated { + channel_id, + new_best_diff: diff, + previous_best_diff, + timestamp: std::time::SystemTime::now(), + }; + self.persistence.persist_event(event); } } } diff --git a/protocols/v2/channels-sv2/src/client/standard.rs b/protocols/v2/channels-sv2/src/client/standard.rs index df56ec2d9d..7864cd14c4 100644 --- a/protocols/v2/channels-sv2/src/client/standard.rs +++ b/protocols/v2/channels-sv2/src/client/standard.rs @@ -13,6 +13,7 @@ use crate::{ share_accounting::{ShareAccounting, ShareValidationError, ShareValidationResult}, }, merkle_root::merkle_root_from_path, + persistence::PersistenceHandler, target::{bytes_to_hex, u256_to_block_hash}, MAX_EXTRANONCE_PREFIX_LEN, }; @@ -43,7 +44,7 @@ use tracing::debug; /// - share accounting state /// - chain tip state #[derive(Debug, Clone)] -pub struct StandardChannel<'a> { +pub struct StandardChannel<'a, P> { channel_id: u32, user_identity: String, extranonce_prefix: Vec, @@ -53,11 +54,14 @@ pub struct StandardChannel<'a> { active_job: Option>, past_jobs: HashMap>, stale_jobs: HashMap>, - share_accounting: ShareAccounting, + share_accounting: ShareAccounting

, chain_tip: Option, } -impl<'a> StandardChannel<'a> { +impl<'a, P> StandardChannel<'a, P> +where + P: PersistenceHandler, +{ /// Creates a new [`StandardChannel`] instance with provided channel parameters. pub fn new( channel_id: u32, @@ -65,6 +69,7 @@ impl<'a> StandardChannel<'a> { extranonce_prefix: Vec, target: Target, nominal_hashrate: f32, + persistence: P, ) -> Self { Self { channel_id, @@ -76,7 +81,7 @@ impl<'a> StandardChannel<'a> { active_job: None, past_jobs: HashMap::new(), stale_jobs: HashMap::new(), - share_accounting: ShareAccounting::new(), + share_accounting: ShareAccounting::new(persistence), chain_tip: None, } } @@ -161,7 +166,7 @@ impl<'a> StandardChannel<'a> { } /// Returns the share accounting state for this channel. - pub fn get_share_accounting(&self) -> &ShareAccounting { + pub fn get_share_accounting(&self) -> &ShareAccounting

{ &self.share_accounting } @@ -330,9 +335,12 @@ impl<'a> StandardChannel<'a> { // check if a block was found if network_target.is_met_by(hash) { self.share_accounting.update_share_accounting( - self.target.difficulty_float() as u64, + self.channel_id, + &self.user_identity, + self.target.difficulty_float(), share.sequence_number, hash.to_raw_hash(), + true, ); return Ok(ShareValidationResult::BlockFound); } @@ -344,13 +352,17 @@ impl<'a> StandardChannel<'a> { } self.share_accounting.update_share_accounting( - self.target.difficulty_float() as u64, + self.channel_id, + &self.user_identity, + self.target.difficulty_float(), share.sequence_number, hash.to_raw_hash(), + false, ); // update the best diff - self.share_accounting.update_best_diff(hash_as_diff); + self.share_accounting + .update_best_diff(self.channel_id, hash_as_diff); return Ok(ShareValidationResult::Valid); } @@ -361,10 +373,23 @@ impl<'a> StandardChannel<'a> { #[cfg(test)] mod tests { - use crate::client::{ - share_accounting::{ShareValidationError, ShareValidationResult}, - standard::StandardChannel, + use crate::{ + client::{ + share_accounting::{ShareValidationError, ShareValidationResult}, + standard::StandardChannel, + }, + persistence::{Persistence, PersistenceHandler, ShareAccountingEvent}, }; + + /// Unit-like type for persistence in tests + #[derive(Debug, Clone)] + struct TestPersistence; + + impl PersistenceHandler for TestPersistence { + fn persist_event(&self, _event: ShareAccountingEvent) { + // No-op for tests + } + } use binary_sv2::Sv2Option; use bitcoin::Target; use mining_sv2::{NewMiningJob, SetNewPrevHash as SetNewPrevHashMp, SubmitSharesStandard}; @@ -381,12 +406,13 @@ mod tests { let target = Target::from_le_bytes([0xff; 32]); let nominal_hashrate = 1.0; - let mut channel = StandardChannel::new( + let mut channel = StandardChannel::>::new( channel_id, user_identity, extranonce_prefix, target, nominal_hashrate, + Persistence::default(), ); let future_job = NewMiningJob { @@ -441,12 +467,13 @@ mod tests { let target = Target::from_le_bytes([0xff; 32]); let nominal_hashrate = 1.0; - let mut channel = StandardChannel::new( + let mut channel = StandardChannel::>::new( channel_id, user_identity, extranonce_prefix, target, nominal_hashrate, + Persistence::default(), ); let ntime: u32 = 1746839905; @@ -489,12 +516,13 @@ mod tests { let target = Target::from_le_bytes([0xff; 32]); let nominal_hashrate = 1.0; - let mut channel = StandardChannel::new( + let mut channel = StandardChannel::>::new( channel_id, user_identity, extranonce_prefix, target, nominal_hashrate, + Persistence::default(), ); let future_job = NewMiningJob { @@ -562,12 +590,13 @@ mod tests { ]); let nominal_hashrate = 1.0; - let mut channel = StandardChannel::new( + let mut channel = StandardChannel::>::new( channel_id, user_identity, extranonce_prefix, target, nominal_hashrate, + Persistence::default(), ); let future_job = NewMiningJob { @@ -638,12 +667,13 @@ mod tests { ]); let nominal_hashrate = 1.0; - let mut channel = StandardChannel::new( + let mut channel = StandardChannel::>::new( channel_id, user_identity, extranonce_prefix, target, nominal_hashrate, + Persistence::default(), ); let future_job = NewMiningJob { diff --git a/protocols/v2/channels-sv2/src/lib.rs b/protocols/v2/channels-sv2/src/lib.rs index dc8feef62e..94e6317939 100644 --- a/protocols/v2/channels-sv2/src/lib.rs +++ b/protocols/v2/channels-sv2/src/lib.rs @@ -28,6 +28,7 @@ pub mod bip141; pub mod chain_tip; pub mod client; pub mod merkle_root; +pub mod persistence; pub mod target; pub mod vardiff; pub use vardiff::{classic::VardiffState, Vardiff}; diff --git a/protocols/v2/channels-sv2/src/persistence.rs b/protocols/v2/channels-sv2/src/persistence.rs new file mode 100644 index 0000000000..1cce23b546 --- /dev/null +++ b/protocols/v2/channels-sv2/src/persistence.rs @@ -0,0 +1,140 @@ +//! Share Accounting Persistence Abstraction. +//! +//! This module provides a trait-based abstraction for persisting share accounting data +//! without blocking critical mining operations. The design uses async channels to decouple +//! the hot path share processing from persistence operations. +//! +//! ## Design Goals +//! +//! - **Zero Hot Path Impact**: No `Result` types or blocking operations in share accounting methods +//! - **Async Channel Based**: Events are sent via channels for background persistence processing +//! - **Flexible Implementation**: Trait allows different persistence backends (database, file, +//! etc.) +//! +//! ## Usage +//! +//! The persistence system works by sending events through async channels when share accounting +//! state changes occur. Implementations of the `Persistence` trait handle these events in +//! background tasks without affecting mining performance. + +use bitcoin::hashes::sha256d::Hash; + +/// Events that can be persisted from share accounting operations. +/// +/// These events capture all the critical state changes that occur during share processing, +/// allowing persistence implementations to maintain complete historical records. +#[derive(Debug, Clone)] +pub enum ShareAccountingEvent { + /// A share was accepted and accounting was updated. + ShareAccepted { + /// The channel identifier (server-assigned for server channels, client-assigned for client + /// channels) + channel_id: u32, + /// User identity associated with the channel + user_identity: String, + /// Work value of the accepted share (difficulty as floating point) + share_work: f64, + /// Sequence number of the share + share_sequence_number: u32, + /// Hash of the accepted share (for duplicate detection) + share_hash: Hash, + /// Total shares accepted after this update + total_shares_accepted: u32, + /// Total work sum after this update (sum of all share difficulties) + total_share_work_sum: f64, + /// Timestamp when the event occurred + timestamp: std::time::SystemTime, + /// Block found? + block_found: bool, + }, + /// Best difficulty was updated for the channel. + BestDifficultyUpdated { + /// The channel identifier + channel_id: u32, + /// The new best difficulty + new_best_diff: f64, + /// The previous best difficulty + previous_best_diff: f64, + /// Timestamp when the update occurred + timestamp: std::time::SystemTime, + }, +} + +/// Trait for handling persistence of share accounting events. +/// +/// Implementations of this trait handle the actual persistence operations, +/// ensuring that persistence operations are non-blocking and can handle failures internally. +pub trait PersistenceHandler { + /// Sends a share accounting event for persistence. + /// + /// This method MUST be non-blocking and infallible from the caller's perspective. + /// Any persistence failures should be handled internally (e.g., logged, retried, etc.). + fn persist_event(&self, event: ShareAccountingEvent); + + /// Optional method to flush any pending events. + /// + /// This is a hint that the caller would like any buffered events to be processed + /// immediately, but implementations are free to ignore this if not applicable. + fn flush(&self) {} + + /// Optional method called when the persistence handler is being dropped. + /// + /// Implementations can use this for cleanup operations, but should not block. + fn shutdown(&self) {} +} + +/// Main persistence abstraction that handles enabled/disabled states. +#[derive(Debug, Clone)] +pub enum Persistence { + /// Persistence is enabled with the given handler + Enabled(T), + /// Persistence is disabled (no-op) + Disabled, +} + +impl Persistence { + /// Creates persistence state from an Option. + /// Some(persistence) becomes Enabled, None becomes Disabled. + pub fn new(persistence: Option) -> Self { + match persistence { + Some(p) => Self::Enabled(p), + None => Self::Disabled, + } + } +} + +impl Default for Persistence { + /// Default persistence state is Disabled (no persistence). + fn default() -> Self { + Self::Disabled + } +} + +impl PersistenceHandler for Persistence { + fn persist_event(&self, event: ShareAccountingEvent) { + match self { + Self::Enabled(persistence) => persistence.persist_event(event), + Self::Disabled => { + // No-op - persistence is disabled + } + } + } + + fn flush(&self) { + match self { + Self::Enabled(persistence) => persistence.flush(), + Self::Disabled => { + // No-op - persistence is disabled + } + } + } + + fn shutdown(&self) { + match self { + Self::Enabled(persistence) => persistence.shutdown(), + Self::Disabled => { + // No-op - persistence is disabled + } + } + } +} diff --git a/protocols/v2/channels-sv2/src/server/extended.rs b/protocols/v2/channels-sv2/src/server/extended.rs index bd20a0d59f..5c5f0a283a 100644 --- a/protocols/v2/channels-sv2/src/server/extended.rs +++ b/protocols/v2/channels-sv2/src/server/extended.rs @@ -42,6 +42,7 @@ use crate::{ chain_tip::ChainTip, merkle_root::merkle_root_from_path, + persistence::PersistenceHandler, server::{ error::ExtendedChannelError, jobs::{extended::ExtendedJob, factory::JobFactory, job_store::JobStore, JobOrigin}, @@ -83,7 +84,7 @@ use tracing::debug; /// - the channel's job factory /// - the channel's chain tip #[derive(Debug)] -pub struct ExtendedChannel<'a, J> +pub struct ExtendedChannel<'a, J, P> where J: JobStore>, { @@ -96,15 +97,16 @@ where nominal_hashrate: f32, job_store: J, job_factory: JobFactory, - share_accounting: ShareAccounting, + share_accounting: ShareAccounting

, expected_share_per_minute: f32, chain_tip: Option, phantom: PhantomData<&'a ()>, } -impl<'a, J> ExtendedChannel<'a, J> +impl<'a, J, P> ExtendedChannel<'a, J, P> where J: JobStore>, + P: PersistenceHandler, { /// Constructor of `ExtendedChannel` for a Sv2 Pool Server. /// Not meant for usage on a Sv2 Job Declaration Client. @@ -129,6 +131,7 @@ where expected_share_per_minute: f32, job_store: J, pool_tag_string: String, + persistence: P, ) -> Result { Self::new( channel_id, @@ -143,6 +146,7 @@ where job_store, Some(pool_tag_string), None, + persistence, ) } @@ -170,6 +174,7 @@ where job_store: J, pool_tag_string: Option, miner_tag_string: String, + persistence: P, ) -> Result { Self::new( channel_id, @@ -184,6 +189,7 @@ where job_store, pool_tag_string, Some(miner_tag_string), + persistence, ) } @@ -202,8 +208,9 @@ where job_store: J, pool_tag: Option, miner_tag: Option, + persistence: P, ) -> Result { - let target = + let target: Target = match hash_rate_to_target(nominal_hashrate.into(), expected_share_per_minute.into()) { Ok(target) => target, Err(_) => { @@ -212,8 +219,6 @@ where }; if target > max_target { - println!("target: {:?}", target.to_be_bytes()); - println!("max_target: {:?}", max_target.to_be_bytes()); return Err(ExtendedChannelError::RequestedMaxTargetOutOfRange); } @@ -244,7 +249,7 @@ where nominal_hashrate, job_store, job_factory: JobFactory::new(version_rolling_allowed, pool_tag, miner_tag), - share_accounting: ShareAccounting::new(share_batch_size), + share_accounting: ShareAccounting::new(share_batch_size, persistence), expected_share_per_minute, chain_tip: None, phantom: PhantomData, @@ -418,7 +423,7 @@ where self.job_store.get_past_jobs() } /// Returns a reference to the share accounting state for this channel. - pub fn get_share_accounting(&self) -> &ShareAccounting { + pub fn get_share_accounting(&self) -> &ShareAccounting

{ &self.share_accounting } @@ -673,9 +678,12 @@ where // check if a block was found if network_target.is_met_by(hash) { self.share_accounting.update_share_accounting( - self.target.difficulty_float() as u64, + self.channel_id, + &self.user_identity, + self.target.difficulty_float(), share.sequence_number, hash.to_raw_hash(), + true, ); let mut coinbase = vec![]; @@ -704,13 +712,17 @@ where } self.share_accounting.update_share_accounting( - self.target.difficulty_float() as u64, + self.channel_id, + &self.user_identity, + self.target.difficulty_float(), share.sequence_number, hash.to_raw_hash(), + false, ); // update the best diff - self.share_accounting.update_best_diff(hash_as_diff); + self.share_accounting + .update_best_diff(self.channel_id, hash_as_diff); let last_sequence_number = self.share_accounting.get_last_share_sequence_number(); let new_submits_accepted_count = self.share_accounting.get_shares_accepted(); @@ -737,6 +749,7 @@ where mod tests { use crate::{ chain_tip::ChainTip, + persistence::{Persistence, PersistenceHandler, ShareAccountingEvent}, server::{ error::ExtendedChannelError, extended::ExtendedChannel, @@ -744,6 +757,16 @@ mod tests { share_accounting::{ShareValidationError, ShareValidationResult}, }, }; + + /// Unit-like type for persistence in tests + #[derive(Debug, Clone)] + struct TestPersistence; + + impl PersistenceHandler for TestPersistence { + fn persist_event(&self, _event: ShareAccountingEvent) { + // No-op for tests + } + } use binary_sv2::Sv2Option; use bitcoin::{transaction::TxOut, Amount, ScriptBuf, Target}; use mining_sv2::{NewExtendedMiningJob, SubmitSharesExtended}; @@ -772,7 +795,7 @@ mod tests { let share_batch_size = 100; let job_store = DefaultJobStore::new(); - let mut channel = ExtendedChannel::new( + let mut channel = ExtendedChannel::, Persistence>::new( channel_id, user_identity, extranonce_prefix, @@ -785,6 +808,7 @@ mod tests { job_store, None, None, + Persistence::default(), ) .unwrap(); @@ -923,7 +947,7 @@ mod tests { let share_batch_size = 100; let job_store = DefaultJobStore::new(); - let mut channel = ExtendedChannel::new( + let mut channel = ExtendedChannel::, Persistence>::new( channel_id, user_identity, extranonce_prefix, @@ -936,6 +960,7 @@ mod tests { job_store, None, None, + Persistence::default(), ) .unwrap(); @@ -1043,7 +1068,7 @@ mod tests { let share_batch_size = 100; let job_store = DefaultJobStore::new(); - let mut channel = ExtendedChannel::new( + let mut channel = ExtendedChannel::, Persistence>::new( channel_id, user_identity, extranonce_prefix, @@ -1056,6 +1081,7 @@ mod tests { job_store, None, None, + Persistence::default(), ) .unwrap(); @@ -1121,7 +1147,7 @@ mod tests { let share_batch_size = 100; let job_store = DefaultJobStore::new(); - let mut channel = ExtendedChannel::new( + let mut channel = ExtendedChannel::, Persistence>::new( channel_id, user_identity, extranonce_prefix, @@ -1134,6 +1160,7 @@ mod tests { job_store, None, None, + Persistence::default(), ) .unwrap(); @@ -1230,7 +1257,7 @@ mod tests { let share_batch_size = 100; let job_store = DefaultJobStore::new(); - let mut channel = ExtendedChannel::new( + let mut channel = ExtendedChannel::, Persistence>::new( channel_id, user_identity, extranonce_prefix, @@ -1243,6 +1270,7 @@ mod tests { job_store, None, None, + Persistence::default(), ) .unwrap(); @@ -1342,7 +1370,7 @@ mod tests { let share_batch_size = 100; let job_store = DefaultJobStore::new(); - let mut channel = ExtendedChannel::new( + let mut channel = ExtendedChannel::, Persistence>::new( channel_id, user_identity, extranonce_prefix, @@ -1355,6 +1383,7 @@ mod tests { job_store, None, None, + Persistence::default(), ) .unwrap(); @@ -1469,7 +1498,7 @@ mod tests { let max_target = Target::from_le_bytes([0xff; 32]); // Create a channel with initial hashrate - let mut channel = ExtendedChannel::new( + let mut channel = ExtendedChannel::, Persistence>::new( channel_id, user_identity, extranonce_prefix, @@ -1482,6 +1511,7 @@ mod tests { job_store, None, None, + Persistence::default(), ) .unwrap(); @@ -1556,7 +1586,7 @@ mod tests { let share_batch_size = 100; let job_store = DefaultJobStore::new(); - let mut channel = ExtendedChannel::new( + let mut channel = ExtendedChannel::, Persistence>::new( channel_id, user_identity, extranonce_prefix.clone(), @@ -1569,6 +1599,7 @@ mod tests { job_store, None, None, + Persistence::default(), ) .unwrap(); diff --git a/protocols/v2/channels-sv2/src/server/share_accounting.rs b/protocols/v2/channels-sv2/src/server/share_accounting.rs index 85db09fde4..bf5c3f8f5d 100644 --- a/protocols/v2/channels-sv2/src/server/share_accounting.rs +++ b/protocols/v2/channels-sv2/src/server/share_accounting.rs @@ -18,6 +18,7 @@ //! Intended for use within mining server implementations that process SV2 share submissions and //! issue `SubmitShares.Success` messages. Not intended for use by mining clients. +use crate::persistence::{PersistenceHandler, ShareAccountingEvent}; use bitcoin::hashes::sha256d::Hash; use std::collections::HashSet; @@ -43,8 +44,8 @@ pub enum ShareValidationResult { /// Contains: /// - `last_sequence_number`: The sequence number of the last accepted share in the batch. /// - `new_submits_accepted_count`: The number of new shares accepted in this batch. - /// - `new_shares_sum`: The total work contributed by shares in this batch. - ValidWithAcknowledgement(u32, u32, u64), + /// - `new_shares_sum`: The total work contributed by shares in this batch (as floating point). + ValidWithAcknowledgement(u32, u32, f64), /// The share solves a block. /// Contains: /// - `template_id`: The template ID associated with the job, or `None` for custom jobs. @@ -81,27 +82,33 @@ pub enum ShareValidationError { /// This struct manages per-channel share statistics, batch acknowledgment, duplicate detection, /// and difficulty tracking. Only meant for usage on Mining Servers. #[derive(Clone, Debug)] -pub struct ShareAccounting { +pub struct ShareAccounting

{ last_share_sequence_number: u32, shares_accepted: u32, - share_work_sum: u64, + share_work_sum: f64, share_batch_size: usize, seen_shares: HashSet, best_diff: f64, + persistence: P, } -impl ShareAccounting { +impl

ShareAccounting

+where + P: PersistenceHandler, +{ /// Constructs a new `ShareAccounting` instance for a channel. /// /// `share_batch_size` controls how many accepted shares trigger a batch acknowledgment. - pub fn new(share_batch_size: usize) -> Self { + /// `persistence` handles background persistence of share accounting events. + pub fn new(share_batch_size: usize, persistence: P) -> Self { Self { last_share_sequence_number: 0, shares_accepted: 0, - share_work_sum: 0, + share_work_sum: 0.0, share_batch_size, seen_shares: HashSet::new(), best_diff: 0.0, + persistence, } } @@ -112,14 +119,31 @@ impl ShareAccounting { /// - Records the share hash to detect duplicates. pub fn update_share_accounting( &mut self, - share_work: u64, + channel_id: u32, + user_identity: &str, + share_work: f64, share_sequence_number: u32, share_hash: Hash, + block_found: bool, ) { self.last_share_sequence_number = share_sequence_number; self.shares_accepted += 1; self.share_work_sum += share_work; self.seen_shares.insert(share_hash); + + // Persist the share accepted event + let event = ShareAccountingEvent::ShareAccepted { + channel_id, + user_identity: user_identity.to_string(), + share_work, + share_sequence_number, + share_hash, + total_shares_accepted: self.shares_accepted, + total_share_work_sum: self.share_work_sum, + timestamp: std::time::SystemTime::now(), + block_found, + }; + self.persistence.persist_event(event); } /// Clears the set of seen share hashes. @@ -128,6 +152,9 @@ impl ShareAccounting { /// and allow new shares for the new tip. pub fn flush_seen_shares(&mut self) { self.seen_shares.clear(); + + // Ensure any pending persistence events are flushed + self.persistence.flush(); } /// Returns the sequence number of the last accepted share. @@ -141,7 +168,7 @@ impl ShareAccounting { } /// Returns the sum of work contributed by all accepted shares. - pub fn get_share_work_sum(&self) -> u64 { + pub fn get_share_work_sum(&self) -> f64 { self.share_work_sum } @@ -166,9 +193,19 @@ impl ShareAccounting { } /// Updates the best difficulty if the new value is higher. - pub fn update_best_diff(&mut self, diff: f64) { + pub fn update_best_diff(&mut self, channel_id: u32, diff: f64) { + let previous_best_diff = self.best_diff; if diff > self.best_diff { self.best_diff = diff; + + // Persist the best difficulty updated event + let event = ShareAccountingEvent::BestDifficultyUpdated { + channel_id, + new_best_diff: diff, + previous_best_diff, + timestamp: std::time::SystemTime::now(), + }; + self.persistence.persist_event(event); } } } diff --git a/protocols/v2/channels-sv2/src/server/standard.rs b/protocols/v2/channels-sv2/src/server/standard.rs index 1ffd3a1a8d..8f12ae36da 100644 --- a/protocols/v2/channels-sv2/src/server/standard.rs +++ b/protocols/v2/channels-sv2/src/server/standard.rs @@ -35,6 +35,7 @@ //! - Job lifecycle and share accounting are managed on a per-channel basis. use crate::{ chain_tip::ChainTip, + persistence::PersistenceHandler, server::{ error::StandardChannelError, jobs::{ @@ -80,7 +81,7 @@ use tracing::debug; /// - the channel's job factory /// - the channel's chain tip #[derive(Debug)] -pub struct StandardChannel<'a, J> +pub struct StandardChannel<'a, J, P> where J: JobStore>, { @@ -90,7 +91,7 @@ where requested_max_target: Target, target: Target, nominal_hashrate: f32, - share_accounting: ShareAccounting, + share_accounting: ShareAccounting

, expected_share_per_minute: f32, job_store: J, job_factory: JobFactory, @@ -98,9 +99,10 @@ where phantom: PhantomData<&'a ()>, } -impl<'a, J> StandardChannel<'a, J> +impl<'a, J, P> StandardChannel<'a, J, P> where J: JobStore>, + P: PersistenceHandler, { /// Constructor of `StandardChannel` for a Sv2 Pool Server. /// Not meant for usage on a Sv2 Job Declaration Client. @@ -123,6 +125,7 @@ where expected_share_per_minute: f32, job_store: J, pool_tag_string: String, + persistence: P, ) -> Result { Self::new( channel_id, @@ -135,6 +138,7 @@ where job_store, Some(pool_tag_string), None, + persistence, ) } @@ -160,6 +164,7 @@ where job_store: J, pool_tag_string: Option, miner_tag_string: String, + persistence: P, ) -> Result { Self::new( channel_id, @@ -172,6 +177,7 @@ where job_store, pool_tag_string, Some(miner_tag_string), + persistence, ) } @@ -188,6 +194,7 @@ where job_store: J, pool_tag_string: Option, miner_tag_string: Option, + persistence: P, ) -> Result { let calculated_target = match hash_rate_to_target(nominal_hashrate.into(), expected_share_per_minute.into()) { @@ -226,7 +233,7 @@ where requested_max_target, target, nominal_hashrate, - share_accounting: ShareAccounting::new(share_batch_size), + share_accounting: ShareAccounting::new(share_batch_size, persistence), expected_share_per_minute, job_factory: JobFactory::new(true, pool_tag_string, miner_tag_string), chain_tip: None, @@ -388,7 +395,7 @@ where } /// Returns a reference to the share accounting state for this channel. - pub fn get_share_accounting(&self) -> &ShareAccounting { + pub fn get_share_accounting(&self) -> &ShareAccounting

{ &self.share_accounting } @@ -593,12 +600,6 @@ where // check if a block was found if network_target.is_met_by(hash) { - self.share_accounting.update_share_accounting( - self.target.difficulty_float() as u64, - share.sequence_number, - hash.to_raw_hash(), - ); - let op_pushbytes_pool_miner_tag = self .job_factory .op_pushbytes_pool_miner_tag() @@ -627,6 +628,15 @@ where .consensus_encode(&mut serialized_coinbase) .map_err(|_| ShareValidationError::InvalidCoinbase)?; + self.share_accounting.update_share_accounting( + self.channel_id, + &self.user_identity, + self.target.difficulty_float(), + share.sequence_number, + hash.to_raw_hash(), + true, + ); + return Ok(ShareValidationResult::BlockFound( Some(job.get_template().template_id), serialized_coinbase, @@ -640,13 +650,17 @@ where } self.share_accounting.update_share_accounting( - self.target.difficulty_float() as u64, + self.channel_id, + &self.user_identity, + self.target.difficulty_float(), share.sequence_number, hash.to_raw_hash(), + false, ); // update the best diff - self.share_accounting.update_best_diff(hash_as_diff); + self.share_accounting + .update_best_diff(self.channel_id, hash_as_diff); let last_sequence_number = self.share_accounting.get_last_share_sequence_number(); let new_submits_accepted_count = self.share_accounting.get_shares_accepted(); @@ -673,6 +687,7 @@ where mod tests { use crate::{ chain_tip::ChainTip, + persistence::{Persistence, PersistenceHandler, ShareAccountingEvent}, server::{ error::StandardChannelError, jobs::{job_store::DefaultJobStore, standard::StandardJob}, @@ -680,6 +695,16 @@ mod tests { standard::StandardChannel, }, }; + + /// Unit-like type for persistence in tests + #[derive(Debug, Clone)] + struct TestPersistence; + + impl PersistenceHandler for TestPersistence { + fn persist_event(&self, _event: ShareAccountingEvent) { + // No-op for tests + } + } use binary_sv2::Sv2Option; use bitcoin::{transaction::TxOut, Amount, ScriptBuf, Target}; use mining_sv2::{NewMiningJob, SubmitSharesStandard}; @@ -719,6 +744,7 @@ mod tests { job_store, None, None, + Persistence::Enabled(TestPersistence), ) .unwrap(); @@ -847,6 +873,7 @@ mod tests { job_store, None, None, + Persistence::Enabled(TestPersistence), ) .unwrap(); @@ -951,6 +978,7 @@ mod tests { job_store, None, None, + Persistence::Enabled(TestPersistence), ) .unwrap(); @@ -1057,6 +1085,7 @@ mod tests { job_store, None, None, + Persistence::Enabled(TestPersistence), ) .unwrap(); @@ -1166,6 +1195,7 @@ mod tests { job_store, None, None, + Persistence::Enabled(TestPersistence), ) .unwrap(); @@ -1269,6 +1299,7 @@ mod tests { job_store, None, None, + Persistence::Enabled(TestPersistence), ) .unwrap(); @@ -1356,6 +1387,7 @@ mod tests { job_store, None, None, + Persistence::Enabled(TestPersistence), ) .unwrap(); diff --git a/roles/jd-client/src/lib/channel_manager/downstream_message_handler.rs b/roles/jd-client/src/lib/channel_manager/downstream_message_handler.rs index 6b88979517..8e831279b2 100644 --- a/roles/jd-client/src/lib/channel_manager/downstream_message_handler.rs +++ b/roles/jd-client/src/lib/channel_manager/downstream_message_handler.rs @@ -6,6 +6,7 @@ use stratum_apps::stratum_core::{ channels_sv2::{ client, outputs::deserialize_outputs, + persistence::Persistence, server::{ error::{ExtendedChannelError, StandardChannelError}, extended::ExtendedChannel, @@ -348,6 +349,7 @@ impl HandleMiningMessagesFromClientAsync for ChannelManager { job_store, channel_manager_data.pool_tag_string.clone(), self.miner_tag_string.clone(), + Persistence::default(), ) { Ok(channel) => channel, Err(e) => { @@ -586,6 +588,7 @@ impl HandleMiningMessagesFromClientAsync for ChannelManager { job_store, channel_manager_data.pool_tag_string.clone(), self.miner_tag_string.clone(), + Persistence::default(), ) { Ok(c) => c, Err(e) => { @@ -983,7 +986,7 @@ impl HandleMiningMessagesFromClientAsync for ChannelManager { channel_id, last_sequence_number, new_submits_accepted_count, - new_shares_sum, + new_shares_sum: new_shares_sum as u64, }; is_downstream_share_valid = true; info!("SubmitSharesStandard on downstream channel: {} ✅", success); @@ -1012,7 +1015,7 @@ impl HandleMiningMessagesFromClientAsync for ChannelManager { channel_id, last_sequence_number: share_accounting.get_last_share_sequence_number(), new_submits_accepted_count: share_accounting.get_shares_accepted(), - new_shares_sum: share_accounting.get_share_work_sum(), + new_shares_sum: share_accounting.get_share_work_sum() as u64, }; messages.push(( downstream.downstream_id, @@ -1184,7 +1187,7 @@ impl HandleMiningMessagesFromClientAsync for ChannelManager { channel_id, last_sequence_number, new_submits_accepted_count, - new_shares_sum, + new_shares_sum: new_shares_sum as u64, }; info!("SubmitSharesExtended on downstream channel: {} ✅", success); is_downstream_share_valid = true; @@ -1211,7 +1214,7 @@ impl HandleMiningMessagesFromClientAsync for ChannelManager { channel_id, last_sequence_number: share_accounting.get_last_share_sequence_number(), new_submits_accepted_count: share_accounting.get_shares_accepted(), - new_shares_sum: share_accounting.get_share_work_sum(), + new_shares_sum: share_accounting.get_share_work_sum() as u64, }; is_downstream_share_valid = true; messages.push(( diff --git a/roles/jd-client/src/lib/channel_manager/mod.rs b/roles/jd-client/src/lib/channel_manager/mod.rs index d35c01f38c..3f7165c329 100644 --- a/roles/jd-client/src/lib/channel_manager/mod.rs +++ b/roles/jd-client/src/lib/channel_manager/mod.rs @@ -12,10 +12,12 @@ use stratum_apps::{ custom_mutex::Mutex, key_utils::{Secp256k1PublicKey, Secp256k1SecretKey}, network_helpers::noise_stream::NoiseTcpStream, + share_persistence::NoOpPersistence, stratum_core::{ bitcoin::Target, channels_sv2::{ client::extended::ExtendedChannel, + persistence::Persistence, server::{ jobs::{ extended::ExtendedJob, factory::JobFactory, job_store::DefaultJobStore, @@ -65,6 +67,9 @@ mod upstream_message_handler; pub const JDC_SEARCH_SPACE_BYTES: usize = 4; +/// Type alias for disabled persistence used in JD client +pub(crate) type DisabledPersistence = Persistence; + /// A `DeclaredJob` encapsulates all the relevant data associated with a single /// job declaration, including its template, optional messages, coinbase output, /// and transaction list. @@ -135,7 +140,7 @@ pub struct ChannelManagerData { // Maps channel ID → downstream ID. channel_id_to_downstream_id: HashMap, // The active upstream extended channel (client-side instance), if any. - upstream_channel: Option>, + upstream_channel: Option>, // Optional "pool tag" string, identifying the pool. pool_tag_string: Option, // List of pending downstream connection requests, @@ -922,6 +927,7 @@ impl ChannelManager { channel_state: &mut stratum_apps::stratum_core::channels_sv2::server::extended::ExtendedChannel< 'static, DefaultJobStore>, + DisabledPersistence, >, vardiff_state: &mut VardiffState, updates: &mut Vec, @@ -967,7 +973,11 @@ impl ChannelManager { fn run_vardiff_on_standard_channel( downstream_id: u32, channel_id: u32, - channel: &mut StandardChannel<'static, DefaultJobStore>>, + channel: &mut StandardChannel< + 'static, + DefaultJobStore>, + DisabledPersistence, + >, vardiff_state: &mut VardiffState, updates: &mut Vec, ) { diff --git a/roles/jd-client/src/lib/channel_manager/upstream_message_handler.rs b/roles/jd-client/src/lib/channel_manager/upstream_message_handler.rs index d1d95116ef..6741fefaaa 100644 --- a/roles/jd-client/src/lib/channel_manager/upstream_message_handler.rs +++ b/roles/jd-client/src/lib/channel_manager/upstream_message_handler.rs @@ -15,7 +15,7 @@ use tracing::{debug, error, info, warn}; use crate::{ channel_manager::{ - downstream_message_handler::RouteMessageTo, ChannelManager, DeclaredJob, + downstream_message_handler::RouteMessageTo, ChannelManager, DeclaredJob, Persistence, JDC_SEARCH_SPACE_BYTES, }, error::{ChannelSv2Error, JDCError}, @@ -144,6 +144,7 @@ impl HandleMiningMessagesFromServerAsync for ChannelManager { hashrate, true, msg.extranonce_size, + Persistence::default(), ); if let Some(ref mut prevhash) = data.last_new_prev_hash { diff --git a/roles/jd-client/src/lib/downstream/mod.rs b/roles/jd-client/src/lib/downstream/mod.rs index a9a37d8014..09d5ceac02 100644 --- a/roles/jd-client/src/lib/downstream/mod.rs +++ b/roles/jd-client/src/lib/downstream/mod.rs @@ -21,6 +21,7 @@ use tokio::sync::broadcast; use tracing::{debug, error, warn}; use crate::{ + channel_manager::DisabledPersistence, error::JDCError, status::{handle_error, Status, StatusSender}, task_manager::TaskManager, @@ -42,10 +43,14 @@ mod message_handler; pub struct DownstreamData { pub require_std_job: bool, pub group_channels: Option>>>, - pub extended_channels: - HashMap>>>, - pub standard_channels: - HashMap>>>, + pub extended_channels: HashMap< + u32, + ExtendedChannel<'static, DefaultJobStore>, DisabledPersistence>, + >, + pub standard_channels: HashMap< + u32, + StandardChannel<'static, DefaultJobStore>, DisabledPersistence>, + >, } /// Communication layer for a downstream connection. diff --git a/roles/pool/Cargo.toml b/roles/pool/Cargo.toml index dffd724e02..7d8a138d13 100644 --- a/roles/pool/Cargo.toml +++ b/roles/pool/Cargo.toml @@ -26,3 +26,4 @@ tokio = { version = "1.44.1", features = ["full"] } ext-config = { version = "0.14.0", features = ["toml"], package = "config" } tracing = { version = "0.1" } clap = { version = "4.5.39", features = ["derive"] } + diff --git a/roles/pool/config-examples/pool-config-hosted-tp-example.toml b/roles/pool/config-examples/pool-config-hosted-tp-example.toml index cb9747e9fb..1a2ac991aa 100644 --- a/roles/pool/config-examples/pool-config-hosted-tp-example.toml +++ b/roles/pool/config-examples/pool-config-hosted-tp-example.toml @@ -22,6 +22,12 @@ pool_signature = "Stratum V2 SRI Pool" # The CLI option --log-file (or -f) will override this setting if provided. # log_file = "./pool.log" +# Share persistence file path (optional) +# When enabled, all share accounting events (accepted shares, difficulty updates, etc.) +# will be logged to this file for auditing and analysis purposes. +# Comment out or remove this line to disable share persistence. +# share_persistence_file_path = "./shares.txt" + # Template Provider config # Local TP (this is pointing to localhost so you must run a TP locally for this configuration to work) #tp_address = "127.0.0.1:8442" diff --git a/roles/pool/config-examples/pool-config-local-tp-example.toml b/roles/pool/config-examples/pool-config-local-tp-example.toml index 000e3e0fd2..ae4548cb32 100644 --- a/roles/pool/config-examples/pool-config-local-tp-example.toml +++ b/roles/pool/config-examples/pool-config-local-tp-example.toml @@ -22,6 +22,11 @@ pool_signature = "Stratum V2 SRI Pool" # The CLI option --log-file (or -f) will override this setting if provided. # log_file = "./pool.log" +# Share persistence file path (optional) +# When enabled, all share accounting events (accepted shares, difficulty updates, etc.) +# will be logged to this file for auditing and analysis purposes. +# Comment out or remove this line to disable share persistence. +# share_persistence_file_path = "./shares.txt" # Template Provider config # Local TP (this is pointing to localhost so you must run a TP locally for this configuration to work) diff --git a/roles/pool/src/lib/channel_manager/mining_message_handler.rs b/roles/pool/src/lib/channel_manager/mining_message_handler.rs index 5f2dff2fc5..77b3bea644 100644 --- a/roles/pool/src/lib/channel_manager/mining_message_handler.rs +++ b/roles/pool/src/lib/channel_manager/mining_message_handler.rs @@ -137,7 +137,7 @@ impl HandleMiningMessagesFromClientAsync for ChannelManager { let channel_id = downstream_data.channel_id_factory.fetch_add(1, Ordering::SeqCst); let job_store = DefaultJobStore::new(); - let mut standard_channel = match StandardChannel::new_for_pool(channel_id as u32, user_identity.to_string(), extranonce_prefix.to_vec(), requested_max_target, nominal_hash_rate, self.share_batch_size, self.shares_per_minute, job_store, self.pool_tag_string.clone()) { + let mut standard_channel = match StandardChannel::new_for_pool(channel_id as u32, user_identity.to_string(), extranonce_prefix.to_vec(), requested_max_target, nominal_hash_rate, self.share_batch_size, self.shares_per_minute, job_store, self.pool_tag_string.clone(), self.persistence.clone()) { Ok(channel) => channel, Err(e) => match e { StandardChannelError::InvalidNominalHashrate => { @@ -303,6 +303,7 @@ impl HandleMiningMessagesFromClientAsync for ChannelManager { self.shares_per_minute, job_store, self.pool_tag_string.clone(), + self.persistence.clone(), ) { Ok(channel) => channel, Err(e) => match e { @@ -544,7 +545,7 @@ impl HandleMiningMessagesFromClientAsync for ChannelManager { channel_id, last_sequence_number, new_submits_accepted_count, - new_shares_sum, + new_shares_sum: new_shares_sum as u64, }; info!("SubmitSharesStandard: {} ✅", success); messages.push((downstream_id, Mining::SubmitSharesSuccess(success)).into()); @@ -569,7 +570,7 @@ impl HandleMiningMessagesFromClientAsync for ChannelManager { channel_id, last_sequence_number: share_accounting.get_last_share_sequence_number(), new_submits_accepted_count: share_accounting.get_shares_accepted(), - new_shares_sum: share_accounting.get_share_work_sum(), + new_shares_sum: share_accounting.get_share_work_sum() as u64, }; messages.push((downstream_id, Mining::SubmitSharesSuccess(success)).into()); } @@ -702,7 +703,7 @@ impl HandleMiningMessagesFromClientAsync for ChannelManager { channel_id, last_sequence_number, new_submits_accepted_count, - new_shares_sum, + new_shares_sum: new_shares_sum as u64, }; info!("SubmitSharesExtended: {} ✅", success); messages.push((downstream_id, Mining::SubmitSharesSuccess(success)).into()); @@ -727,7 +728,7 @@ impl HandleMiningMessagesFromClientAsync for ChannelManager { channel_id, last_sequence_number: share_accounting.get_last_share_sequence_number(), new_submits_accepted_count: share_accounting.get_shares_accepted(), - new_shares_sum: share_accounting.get_share_work_sum(), + new_shares_sum: share_accounting.get_share_work_sum() as u64, }; messages.push((downstream_id, Mining::SubmitSharesSuccess(success)).into()); } diff --git a/roles/pool/src/lib/channel_manager/mod.rs b/roles/pool/src/lib/channel_manager/mod.rs index c0096cd30f..af4e99d481 100644 --- a/roles/pool/src/lib/channel_manager/mod.rs +++ b/roles/pool/src/lib/channel_manager/mod.rs @@ -11,8 +11,10 @@ use stratum_apps::{ custom_mutex::Mutex, key_utils::{Secp256k1PublicKey, Secp256k1SecretKey}, network_helpers::noise_stream::NoiseTcpStream, + share_persistence::{ShareFileHandler, ShareFilePersistence}, stratum_core::{ channels_sv2::{ + persistence::Persistence, server::{ extended::ExtendedChannel, jobs::{extended::ExtendedJob, job_store::DefaultJobStore, standard::StandardJob}, @@ -44,6 +46,9 @@ use crate::{ mod mining_message_handler; mod template_distribution_message_handler; +/// Type alias for the pool's persistence implementation. +pub type PoolPersistence = Persistence; + const POOL_ALLOCATION_BYTES: usize = 4; const CLIENT_SEARCH_SPACE_BYTES: usize = 8; pub const FULL_EXTRANONCE_SIZE: usize = POOL_ALLOCATION_BYTES + CLIENT_SEARCH_SPACE_BYTES; @@ -90,6 +95,7 @@ pub struct ChannelManager { share_batch_size: usize, shares_per_minute: f32, coinbase_reward_script: CoinbaseRewardScript, + persistence: PoolPersistence, } impl ChannelManager { @@ -102,6 +108,7 @@ impl ChannelManager { downstream_sender: broadcast::Sender<(u32, Mining<'static>)>, downstream_receiver: Receiver<(u32, Mining<'static>)>, coinbase_outputs: Vec, + task_manager: Arc, ) -> PoolResult { let range_0 = 0..0; let range_1 = 0..POOL_ALLOCATION_BYTES; @@ -143,6 +150,32 @@ impl ChannelManager { downstream_receiver, }; + // Initialize persistence based on config + let persistence = if let Some(path) = config.share_persistence_file_path() { + info!("Initializing file-based share persistence: {}", path); + let mut share_file_handler = ShareFileHandler::new(path).await; + let sender = share_file_handler.get_sender(); + let receiver = share_file_handler.get_receiver(); + + // Spawn the share file handler task + task_manager.spawn(async move { + loop { + match receiver.recv().await { + Ok(event) => share_file_handler.write_event_to_file(event).await, + Err(_) => { + info!("Share persistence channel closed, stopping file handler"); + break; + } + } + } + }); + + Persistence::new(Some(ShareFilePersistence::new(sender))) + } else { + info!("Share persistence disabled"); + Persistence::new(None) + }; + let channel_manager = ChannelManager { channel_manager_data, channel_manager_channel, @@ -150,6 +183,7 @@ impl ChannelManager { shares_per_minute: config.shares_per_minute(), pool_tag_string: config.pool_signature().to_string(), coinbase_reward_script: config.coinbase_reward_script().clone(), + persistence, }; Ok(channel_manager) @@ -372,7 +406,11 @@ impl ChannelManager { fn run_vardiff_on_extended_channel( downstream_id: u32, channel_id: u32, - channel_state: &mut ExtendedChannel<'static, DefaultJobStore>>, + channel_state: &mut ExtendedChannel< + 'static, + DefaultJobStore>, + PoolPersistence, + >, vardiff_state: &mut VardiffState, updates: &mut Vec, ) { @@ -417,7 +455,11 @@ impl ChannelManager { fn run_vardiff_on_standard_channel( downstream_id: u32, channel_id: u32, - channel: &mut StandardChannel<'static, DefaultJobStore>>, + channel: &mut StandardChannel< + 'static, + DefaultJobStore>, + PoolPersistence, + >, vardiff_state: &mut VardiffState, updates: &mut Vec, ) { diff --git a/roles/pool/src/lib/config.rs b/roles/pool/src/lib/config.rs index 51e74317b4..15edcae708 100644 --- a/roles/pool/src/lib/config.rs +++ b/roles/pool/src/lib/config.rs @@ -34,6 +34,7 @@ pub struct PoolConfig { share_batch_size: usize, log_file: Option, server_id: u16, + share_persistence_file_path: Option, } impl PoolConfig { @@ -42,6 +43,7 @@ impl PoolConfig { /// # Panics /// /// Panics if `coinbase_reward_script` is empty. + #[allow(clippy::too_many_arguments)] pub fn new( pool_connection: ConnectionConfig, template_provider: TemplateProviderConfig, @@ -50,6 +52,7 @@ impl PoolConfig { shares_per_minute: f32, share_batch_size: usize, server_id: u16, + share_persistence_file_path: Option, ) -> Self { Self { listen_address: pool_connection.listen_address, @@ -64,6 +67,7 @@ impl PoolConfig { share_batch_size, log_file: None, server_id, + share_persistence_file_path, } } @@ -149,6 +153,11 @@ impl PoolConfig { script_pubkey: self.coinbase_reward_script.script_pubkey().to_owned(), } } + + /// Returns the share persistence file path. + pub fn share_persistence_file_path(&self) -> Option<&String> { + self.share_persistence_file_path.as_ref() + } } /// Configuration for connecting to a Template Provider. diff --git a/roles/pool/src/lib/downstream/mod.rs b/roles/pool/src/lib/downstream/mod.rs index 180c79fab0..0a03c886d7 100644 --- a/roles/pool/src/lib/downstream/mod.rs +++ b/roles/pool/src/lib/downstream/mod.rs @@ -27,6 +27,7 @@ use tokio::sync::broadcast; use tracing::{debug, error, warn}; use crate::{ + channel_manager::PoolPersistence, error::{PoolError, PoolResult}, status::{handle_error, Status, StatusSender}, task_manager::TaskManager, @@ -47,10 +48,14 @@ mod common_message_handler; /// - Active [`StandardChannel`]s keyed by channel ID. pub struct DownstreamData { pub group_channels: Option>>>, - pub extended_channels: - HashMap>>>, - pub standard_channels: - HashMap>>>, + pub extended_channels: HashMap< + u32, + ExtendedChannel<'static, DefaultJobStore>, PoolPersistence>, + >, + pub standard_channels: HashMap< + u32, + StandardChannel<'static, DefaultJobStore>, PoolPersistence>, + >, pub channel_id_factory: AtomicUsize, } diff --git a/roles/pool/src/lib/mod.rs b/roles/pool/src/lib/mod.rs index df2381d661..b8e1467ece 100644 --- a/roles/pool/src/lib/mod.rs +++ b/roles/pool/src/lib/mod.rs @@ -75,6 +75,7 @@ impl PoolSv2 { channel_manager_to_downstream_sender.clone(), downstream_to_channel_manager_receiver, encoded_outputs.clone(), + task_manager.clone(), ) .await?; diff --git a/roles/pool/src/lib/status.rs b/roles/pool/src/lib/status.rs index 0c1933c6a1..d3ecb8ad78 100644 --- a/roles/pool/src/lib/status.rs +++ b/roles/pool/src/lib/status.rs @@ -85,7 +85,7 @@ pub enum State { ChannelManagerShutdown(PoolError), } -/// Wrapper around a component’s state, sent as status updates across the system. +/// Wrapper around a component's state, sent as status updates across the system. #[derive(Debug)] pub struct Status { /// The current state being reported. diff --git a/roles/stratum-apps/src/lib.rs b/roles/stratum-apps/src/lib.rs index 37a88b8cdd..5a241c3a83 100644 --- a/roles/stratum-apps/src/lib.rs +++ b/roles/stratum-apps/src/lib.rs @@ -58,3 +58,11 @@ pub mod rpc; /// Provides Secp256k1 key management, serialization/deserialization, and signature services. /// Supports both standard and no_std environments. pub mod key_utils; + +/// Share persistence utilities +/// +/// Persistence implementations for share accounting events. +/// This module provides reusable handlers that can be used by any role (Pool, JD Client, etc.) +/// to persist share accounting data without impacting mining performance. +#[cfg(feature = "core")] +pub mod share_persistence; diff --git a/roles/stratum-apps/src/share_persistence/mod.rs b/roles/stratum-apps/src/share_persistence/mod.rs new file mode 100644 index 0000000000..168ad48a3f --- /dev/null +++ b/roles/stratum-apps/src/share_persistence/mod.rs @@ -0,0 +1,219 @@ +//! Share Persistence File Handler +//! +//! This module provides a generic file-based persistence implementation for share accounting +//! events. It is designed to be reusable across different Stratum V2 roles (Pool, JD Client, +//! Translator, etc.). +//! +//! ## Design +//! +//! - **Non-blocking**: Uses async channels to decouple persistence from critical mining operations +//! - **Generic**: No role-specific dependencies, uses standard `tracing` for logging +//! - **File-based**: Writes share accounting events to a text file for audit/analysis +//! +//! ## Usage +//! +//! ```rust,ignore +//! use stratum_apps::share_persistence::{ShareFileHandler, ShareFilePersistence}; +//! use stratum_apps::stratum_core::channels_sv2::persistence::Persistence; +//! +//! // Create the handler +//! let mut handler = ShareFileHandler::new("/path/to/shares.txt").await; +//! let sender = handler.get_sender(); +//! let receiver = handler.get_receiver(); +//! +//! // Spawn background task to write events +//! tokio::spawn(async move { +//! loop { +//! match receiver.recv().await { +//! Ok(event) => handler.write_event_to_file(event).await, +//! Err(_) => break, +//! } +//! } +//! }); +//! +//! // Create persistence handler for share accounting +//! let persistence = Persistence::new(Some(ShareFilePersistence::new(sender))); +//! ``` + +use stratum_core::channels_sv2::persistence::{PersistenceHandler, ShareAccountingEvent}; +use tokio::io::AsyncWriteExt; +use tracing::{debug, error, info}; + +/// File-based handler for writing share accounting events to disk. +/// +/// This handler maintains an async channel for receiving events and writes them +/// to a file in a human-readable format. It uses non-blocking I/O to ensure +/// persistence operations don't impact mining performance. +pub struct ShareFileHandler { + file: tokio::fs::File, + receiver: async_channel::Receiver, + sender: async_channel::Sender, +} + +impl ShareFileHandler { + /// Creates a new share file handler that writes to the specified path. + /// + /// # Arguments + /// + /// * `path` - File path where share events will be written + /// + /// # Panics + /// + /// Panics if the file cannot be created. + pub async fn new(path: &str) -> Self { + let file = tokio::fs::File::create(path).await.unwrap(); + let (sender, receiver) = async_channel::bounded(1024); + info!("Share accounting file created at {}", path); + Self { + file, + receiver, + sender, + } + } + + /// Returns a receiver for consuming share accounting events. + /// + /// The receiver should be used in a background task that calls + /// `write_event_to_file` for each received event. + pub fn get_receiver(&self) -> async_channel::Receiver { + self.receiver.clone() + } + + /// Returns a sender for producing share accounting events. + /// + /// This sender should be wrapped in a `ShareFilePersistence` and passed + /// to the channel's persistence configuration. + pub fn get_sender(&self) -> async_channel::Sender { + self.sender.clone() + } + + /// Writes a share accounting event to the file. + /// + /// Events are formatted as human-readable text lines. Errors are logged + /// using `tracing::error!` but do not propagate to avoid impacting mining. + pub async fn write_event_to_file(&mut self, event: ShareAccountingEvent) { + match event { + ShareAccountingEvent::ShareAccepted { + channel_id, + user_identity, + share_work, + share_sequence_number, + share_hash, + total_shares_accepted, + total_share_work_sum, + timestamp, + block_found, + } => { + let result = self.file.write_all( + format!( + "ShareAccepted: channel_id: {}, user_identity: {}, share_work: {}, share_sequence_number: {}, share_hash: {}, total_shares_accepted: {}, total_share_work_sum: {}, timestamp: {:?}, block_found: {}\n", + channel_id, + user_identity, + share_work, + share_sequence_number, + share_hash, + total_shares_accepted, + total_share_work_sum, + timestamp, + block_found + ) + .as_bytes(), + ).await; + + if let Err(e) = result { + error!( + target = "share_file_handler", + "Failed to write share event: {}", e + ); + } else { + debug!("Wrote share record to file."); + } + } + ShareAccountingEvent::BestDifficultyUpdated { + channel_id, + new_best_diff, + previous_best_diff, + timestamp, + } => { + let result = self.file.write_all( + format!( + "BestDifficultyUpdated: channel_id: {}, new_best_diff: {}, previous_best_diff: {}, timestamp: {:?}\n", + channel_id, + new_best_diff, + previous_best_diff, + timestamp + ) + .as_bytes(), + ).await; + + if let Err(e) = result { + error!( + target = "share_file_handler", + "Failed to write difficulty update: {}", e + ); + } else { + debug!("Wrote difficulty update record to file."); + } + } + } + } +} + +/// Channel-based persistence handler for share accounting. +/// +/// This implements the `PersistenceHandler` trait and sends events through +/// an async channel to be processed by a `ShareFileHandler` in a background task. +#[derive(Clone, Debug)] +pub struct ShareFilePersistence { + sender: async_channel::Sender, +} + +impl ShareFilePersistence { + /// Creates a new persistence handler with the given event sender. + /// + /// # Arguments + /// + /// * `sender` - Channel sender connected to a `ShareFileHandler` + pub fn new(sender: async_channel::Sender) -> Self { + info!("File persistence enabled for share accounting."); + Self { sender } + } +} + +impl PersistenceHandler for ShareFilePersistence { + /// Sends a share accounting event for persistence. + /// + /// This is non-blocking and will not return errors. Failed sends are logged + /// but do not propagate to avoid impacting the hot path. + fn persist_event(&self, event: ShareAccountingEvent) { + let _ = self + .sender + .try_send(event) + .map_err(|e| error!(target = "share_file_persistence", "{}", e)); + } +} + +/// No-op persistence handler for roles that don't need persistence. +/// +/// This is a unit-like type that implements `PersistenceHandler` but does nothing. +/// It's useful for roles that need to instantiate channels but don't want to persist +/// share accounting events. +/// +/// ## Usage +/// +/// ```rust,ignore +/// use stratum_apps::share_persistence::NoOpPersistence; +/// use stratum_apps::stratum_core::channels_sv2::persistence::Persistence; +/// +/// // Create disabled persistence +/// type DisabledPersistence = Persistence; +/// let persistence = Persistence::Disabled; // or Persistence::new(None) +/// ``` +#[derive(Debug, Clone, Copy)] +pub struct NoOpPersistence; + +impl PersistenceHandler for NoOpPersistence { + fn persist_event(&self, _event: ShareAccountingEvent) { + // No-op - this should never be called when using Persistence::Disabled + } +} diff --git a/roles/translator/src/lib/sv2/channel_manager/channel_manager.rs b/roles/translator/src/lib/sv2/channel_manager/channel_manager.rs index e242b32a4e..1982a7ee76 100644 --- a/roles/translator/src/lib/sv2/channel_manager/channel_manager.rs +++ b/roles/translator/src/lib/sv2/channel_manager/channel_manager.rs @@ -16,7 +16,7 @@ use std::sync::{Arc, RwLock}; use stratum_apps::{ custom_mutex::Mutex, stratum_core::{ - channels_sv2::client::extended::ExtendedChannel, + channels_sv2::{client::extended::ExtendedChannel, persistence::Persistence}, framing_sv2::framing::Frame, handlers_sv2::HandleMiningMessagesFromServerAsync, mining_sv2::OpenExtendedMiningChannelSuccess, @@ -319,6 +319,7 @@ impl ChannelManager { hashrate, true, new_extranonce_size as u16, + Persistence::default(), ); self.channel_manager_data.super_safe_lock(|c| { c.extended_channels.insert( diff --git a/roles/translator/src/lib/sv2/channel_manager/data.rs b/roles/translator/src/lib/sv2/channel_manager/data.rs index c79bae825b..e2b05f041c 100644 --- a/roles/translator/src/lib/sv2/channel_manager/data.rs +++ b/roles/translator/src/lib/sv2/channel_manager/data.rs @@ -4,11 +4,16 @@ use std::{ }; use stratum_apps::{ custom_mutex::Mutex, + share_persistence::NoOpPersistence, stratum_core::{ - channels_sv2::client::extended::ExtendedChannel, mining_sv2::ExtendedExtranonce, + channels_sv2::{client::extended::ExtendedChannel, persistence::Persistence}, + mining_sv2::ExtendedExtranonce, }, }; +/// Type alias for disabled persistence used in translator +pub type DisabledPersistence = Persistence; + /// Defines the operational mode for channel management. /// /// The channel manager can operate in two different modes that affect how @@ -38,9 +43,10 @@ pub struct ChannelManagerData { /// downstream_extranonce_len) pub pending_channels: HashMap, /// Map of active extended channels by channel ID - pub extended_channels: HashMap>>>, + pub extended_channels: HashMap>>>, /// The upstream extended channel used in aggregated mode - pub upstream_extended_channel: Option>>>, + pub upstream_extended_channel: + Option>>>, /// Extranonce prefix factory for allocating unique prefixes in aggregated mode pub extranonce_prefix_factory: Option>>, /// Current operational mode diff --git a/roles/translator/src/lib/sv2/channel_manager/message_handler.rs b/roles/translator/src/lib/sv2/channel_manager/message_handler.rs index bfa714e7cd..47377c71af 100644 --- a/roles/translator/src/lib/sv2/channel_manager/message_handler.rs +++ b/roles/translator/src/lib/sv2/channel_manager/message_handler.rs @@ -9,7 +9,7 @@ use stratum_apps::{ custom_mutex::Mutex, stratum_core::{ bitcoin::Target, - channels_sv2::client::extended::ExtendedChannel, + channels_sv2::{client::extended::ExtendedChannel, persistence::Persistence}, handlers_sv2::{HandleMiningMessagesFromServerAsync, SupportedChannelTypes}, mining_sv2::{ CloseChannel, ExtendedExtranonce, Extranonce, NewExtendedMiningJob, NewMiningJob, @@ -86,6 +86,7 @@ impl HandleMiningMessagesFromServerAsync for ChannelManager { nominal_hashrate, version_rolling, m.extranonce_size, + Persistence::default(), ); // If we are in aggregated mode, we need to create a new extranonce prefix and @@ -139,6 +140,7 @@ impl HandleMiningMessagesFromServerAsync for ChannelManager { nominal_hashrate, true, new_extranonce_size, + Persistence::default(), ); channel_manager_data.extended_channels.insert( m.channel_id, @@ -194,6 +196,7 @@ impl HandleMiningMessagesFromServerAsync for ChannelManager { nominal_hashrate, true, downstream_extranonce_len as u16, + Persistence::default(), ); channel_manager_data.extended_channels.insert( m.channel_id, diff --git a/test/integration-tests/lib/mod.rs b/test/integration-tests/lib/mod.rs index a93b836a47..89b4fe4d7b 100644 --- a/test/integration-tests/lib/mod.rs +++ b/test/integration-tests/lib/mod.rs @@ -106,6 +106,7 @@ pub async fn start_pool(template_provider_address: Option) -> (PoolS SHARES_PER_MINUTE, share_batch_size, 1, + None, ); let pool = PoolSv2::new(config); let pool_clone = pool.clone();