diff --git a/dash-spv/src/error.rs b/dash-spv/src/error.rs index eff214f6f..59340fa99 100644 --- a/dash-spv/src/error.rs +++ b/dash-spv/src/error.rs @@ -3,6 +3,8 @@ use std::io; use thiserror::Error; +use crate::sync::ManagerIdentifier; + /// Main error type for the Dash SPV client. #[derive(Debug, Error)] pub enum SpvError { @@ -188,8 +190,8 @@ pub enum ValidationError { #[derive(Debug, Error)] pub enum SyncError { /// Indicates that a sync operation is already in progress - #[error("Sync already in progress")] - SyncInProgress, + #[error("{0} already started")] + SyncInProgress(ManagerIdentifier), /// Deprecated: Use specific error variants instead #[deprecated(note = "Use Network, Storage, Validation, or Timeout variants instead")] @@ -236,7 +238,7 @@ impl SyncError { /// Returns a static string representing the error category based on the variant pub fn category(&self) -> &'static str { match self { - SyncError::SyncInProgress | SyncError::InvalidState(_) => "state", + SyncError::SyncInProgress(_) | SyncError::InvalidState(_) => "state", SyncError::Timeout(_) => "timeout", SyncError::Validation(_) => "validation", SyncError::MissingDependency(_) => "dependency", @@ -334,7 +336,7 @@ mod tests { assert_eq!(SyncError::Storage("test".to_string()).category(), "storage"); // Test existing variant categories - assert_eq!(SyncError::SyncInProgress.category(), "state"); + assert_eq!(SyncError::SyncInProgress(ManagerIdentifier::BlockHeader).category(), "state"); assert_eq!(SyncError::InvalidState("test".to_string()).category(), "state"); assert_eq!(SyncError::MissingDependency("test".to_string()).category(), "dependency"); diff --git a/dash-spv/src/sync/block_headers/sync_manager.rs b/dash-spv/src/sync/block_headers/sync_manager.rs index 9af239ac7..0d9dd3737 100644 --- a/dash-spv/src/sync/block_headers/sync_manager.rs +++ b/dash-spv/src/sync/block_headers/sync_manager.rs @@ -1,6 +1,7 @@ use crate::error::SyncResult; use crate::network::{Message, MessageType, NetworkEvent, RequestSender}; use crate::storage::BlockHeaderStorage; +use crate::sync::sync_manager::ensure_not_started; use crate::sync::{ BlockHeadersManager, ManagerIdentifier, ProgressPercentage, SyncEvent, SyncManager, SyncManagerProgress, SyncState, @@ -55,10 +56,7 @@ impl SyncManager for BlockHeadersManager { } async fn start_sync(&mut self, requests: &RequestSender) -> SyncResult> { - if self.state() != SyncState::WaitingForConnections { - tracing::warn!("{} sync already started.", self.identifier()); - return Ok(vec![]); - } + ensure_not_started(self.state(), self.identifier())?; self.progress.set_state(SyncState::Syncing); let tip = self.tip().await?; diff --git a/dash-spv/src/sync/blocks/sync_manager.rs b/dash-spv/src/sync/blocks/sync_manager.rs index c0b93e7d8..6a9c9471b 100644 --- a/dash-spv/src/sync/blocks/sync_manager.rs +++ b/dash-spv/src/sync/blocks/sync_manager.rs @@ -1,6 +1,7 @@ use crate::error::SyncResult; use crate::network::{Message, MessageType, RequestSender}; use crate::storage::{BlockHeaderStorage, BlockStorage}; +use crate::sync::sync_manager::ensure_not_started; use crate::sync::{ BlocksManager, ManagerIdentifier, SyncEvent, SyncManager, SyncManagerProgress, SyncState, }; @@ -45,6 +46,7 @@ impl SyncM } async fn start_sync(&mut self, _requests: &RequestSender) -> SyncResult> { + ensure_not_started(self.state(), self.identifier())?; // Check if filters already completed (event received before start_sync) if self.filters_sync_complete && self.pipeline.is_complete() { self.progress.set_state(SyncState::Synced); diff --git a/dash-spv/src/sync/filters/sync_manager.rs b/dash-spv/src/sync/filters/sync_manager.rs index f523c5f0d..8fd9e1fbd 100644 --- a/dash-spv/src/sync/filters/sync_manager.rs +++ b/dash-spv/src/sync/filters/sync_manager.rs @@ -2,6 +2,7 @@ use crate::error::{SyncError, SyncResult}; use crate::network::{Message, MessageType, RequestSender}; use crate::storage::{BlockHeaderStorage, FilterHeaderStorage, FilterStorage}; use crate::sync::progress::ProgressPercentage; +use crate::sync::sync_manager::ensure_not_started; use crate::sync::{ FiltersManager, ManagerIdentifier, SyncEvent, SyncManager, SyncManagerProgress, SyncState, }; @@ -63,10 +64,7 @@ impl< } async fn start_sync(&mut self, requests: &RequestSender) -> SyncResult> { - if self.state() != SyncState::WaitingForConnections { - tracing::warn!("{} sync already started.", self.identifier()); - return Ok(vec![]); - } + ensure_not_started(self.state(), self.identifier())?; // Check if there are already stored filters we need to process // This handles restart where filters are persisted but wallet state isn't diff --git a/dash-spv/src/sync/sync_coordinator.rs b/dash-spv/src/sync/sync_coordinator.rs index c32fb3cfa..c7980af91 100644 --- a/dash-spv/src/sync/sync_coordinator.rs +++ b/dash-spv/src/sync/sync_coordinator.rs @@ -185,7 +185,7 @@ where N: NetworkManager, { if !self.tasks.is_empty() { - return Err(SyncError::SyncInProgress); + return Err(SyncError::InvalidState("SyncCoordinator already started".to_string())); } tracing::info!("Starting sync managers in separate tasks"); diff --git a/dash-spv/src/sync/sync_manager.rs b/dash-spv/src/sync/sync_manager.rs index 7954a1d4f..039c9a10f 100644 --- a/dash-spv/src/sync/sync_manager.rs +++ b/dash-spv/src/sync/sync_manager.rs @@ -7,6 +7,8 @@ use crate::sync::{ }; use async_trait::async_trait; +use crate::SyncError; + /// Contains a trait for event-driven sync managers. /// /// Each manager is responsible for a specific sync task (headers, filters, blocks, etc.) @@ -64,6 +66,18 @@ impl SyncManagerTaskContext { } } +/// Guard that verifies a manager has not already been started. +pub(super) fn ensure_not_started( + state: SyncState, + identifier: ManagerIdentifier, +) -> SyncResult<()> { + if state != SyncState::WaitingForConnections { + tracing::warn!("{} sync already started.", identifier); + return Err(SyncError::SyncInProgress(identifier)); + } + Ok(()) +} + #[async_trait] pub trait SyncManager: Send + Sync + std::fmt::Debug { /// Get the unique identifier for this manager. @@ -100,11 +114,7 @@ pub trait SyncManager: Send + Sync + std::fmt::Debug { /// For example, BlockHeadersManager sends its first getheaders request here. /// The default implementation is for reactive managers that just wait for events. async fn start_sync(&mut self, _requests: &RequestSender) -> SyncResult> { - if !matches!(self.state(), SyncState::WaitingForConnections | SyncState::WaitForEvents) { - tracing::warn!("{} sync already started.", self.identifier()); - return Ok(vec![]); - } - + ensure_not_started(self.state(), self.identifier())?; self.set_state(SyncState::WaitForEvents); Ok(vec![SyncEvent::SyncStart { identifier: self.identifier(), diff --git a/dash-spv/tests/error_types_test.rs b/dash-spv/tests/error_types_test.rs index 96e3c603e..49d415525 100644 --- a/dash-spv/tests/error_types_test.rs +++ b/dash-spv/tests/error_types_test.rs @@ -11,6 +11,7 @@ use dashcore_hashes::Hash; use std::io; use dash_spv::error::*; +use dash_spv::sync::ManagerIdentifier; #[test] fn test_network_error_from_io_error() { @@ -80,12 +81,12 @@ fn test_spv_error_from_validation_error() { #[test] fn test_spv_error_from_sync_error() { - let sync_err = SyncError::SyncInProgress; + let sync_err = SyncError::SyncInProgress(ManagerIdentifier::BlockHeader); let spv_err: SpvError = sync_err.into(); match spv_err { - SpvError::Sync(SyncError::SyncInProgress) => { - assert_eq!(spv_err.to_string(), "Sync error: Sync already in progress"); + SpvError::Sync(SyncError::SyncInProgress(_)) => { + assert_eq!(spv_err.to_string(), "Sync error: BlockHeader already started"); } _ => panic!("Expected SpvError::Sync variant"), } @@ -226,7 +227,11 @@ fn test_validation_error_variants() { #[test] fn test_sync_error_variants_and_categories() { let test_cases = vec![ - (SyncError::SyncInProgress, "state", "Sync already in progress"), + ( + SyncError::SyncInProgress(ManagerIdentifier::BlockHeader), + "state", + "BlockHeader already started", + ), ( SyncError::InvalidState("Unexpected phase transition".to_string()), "state", @@ -361,7 +366,7 @@ fn test_result_type_aliases() { } fn sync_operation() -> SyncResult<()> { - Err(SyncError::SyncInProgress) + Err(SyncError::SyncInProgress(ManagerIdentifier::BlockHeader)) } fn wallet_operation() -> WalletResult {