Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions dash-spv/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
use std::io;
use thiserror::Error;

use crate::sync::ManagerIdentifier;

/// Main error type for the Dash SPV client.
#[derive(Debug, Error)]
pub enum SpvError {
Expand Down Expand Up @@ -188,8 +190,8 @@ pub enum ValidationError {
#[derive(Debug, Error)]
pub enum SyncError {
/// Indicates that a sync operation is already in progress
#[error("Sync already in progress")]
SyncInProgress,
#[error("{0} already started")]
SyncInProgress(ManagerIdentifier),

/// Deprecated: Use specific error variants instead
#[deprecated(note = "Use Network, Storage, Validation, or Timeout variants instead")]
Expand Down Expand Up @@ -236,7 +238,7 @@ impl SyncError {
/// Returns a static string representing the error category based on the variant
pub fn category(&self) -> &'static str {
match self {
SyncError::SyncInProgress | SyncError::InvalidState(_) => "state",
SyncError::SyncInProgress(_) | SyncError::InvalidState(_) => "state",
SyncError::Timeout(_) => "timeout",
SyncError::Validation(_) => "validation",
SyncError::MissingDependency(_) => "dependency",
Expand Down Expand Up @@ -334,7 +336,7 @@ mod tests {
assert_eq!(SyncError::Storage("test".to_string()).category(), "storage");

// Test existing variant categories
assert_eq!(SyncError::SyncInProgress.category(), "state");
assert_eq!(SyncError::SyncInProgress(ManagerIdentifier::BlockHeader).category(), "state");
assert_eq!(SyncError::InvalidState("test".to_string()).category(), "state");
assert_eq!(SyncError::MissingDependency("test".to_string()).category(), "dependency");

Expand Down
6 changes: 2 additions & 4 deletions dash-spv/src/sync/block_headers/sync_manager.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::error::SyncResult;
use crate::network::{Message, MessageType, NetworkEvent, RequestSender};
use crate::storage::BlockHeaderStorage;
use crate::sync::sync_manager::ensure_not_started;
use crate::sync::{
BlockHeadersManager, ManagerIdentifier, ProgressPercentage, SyncEvent, SyncManager,
SyncManagerProgress, SyncState,
Expand Down Expand Up @@ -55,10 +56,7 @@ impl<H: BlockHeaderStorage> SyncManager for BlockHeadersManager<H> {
}

async fn start_sync(&mut self, requests: &RequestSender) -> SyncResult<Vec<SyncEvent>> {
if self.state() != SyncState::WaitingForConnections {
tracing::warn!("{} sync already started.", self.identifier());
return Ok(vec![]);
}
ensure_not_started(self.state(), self.identifier())?;
self.progress.set_state(SyncState::Syncing);

let tip = self.tip().await?;
Expand Down
2 changes: 2 additions & 0 deletions dash-spv/src/sync/blocks/sync_manager.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::error::SyncResult;
use crate::network::{Message, MessageType, RequestSender};
use crate::storage::{BlockHeaderStorage, BlockStorage};
use crate::sync::sync_manager::ensure_not_started;
use crate::sync::{
BlocksManager, ManagerIdentifier, SyncEvent, SyncManager, SyncManagerProgress, SyncState,
};
Expand Down Expand Up @@ -45,6 +46,7 @@ impl<H: BlockHeaderStorage, B: BlockStorage, W: WalletInterface + 'static> SyncM
}

async fn start_sync(&mut self, _requests: &RequestSender) -> SyncResult<Vec<SyncEvent>> {
ensure_not_started(self.state(), self.identifier())?;
// Check if filters already completed (event received before start_sync)
if self.filters_sync_complete && self.pipeline.is_complete() {
self.progress.set_state(SyncState::Synced);
Expand Down
6 changes: 2 additions & 4 deletions dash-spv/src/sync/filters/sync_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::error::{SyncError, SyncResult};
use crate::network::{Message, MessageType, RequestSender};
use crate::storage::{BlockHeaderStorage, FilterHeaderStorage, FilterStorage};
use crate::sync::progress::ProgressPercentage;
use crate::sync::sync_manager::ensure_not_started;
use crate::sync::{
FiltersManager, ManagerIdentifier, SyncEvent, SyncManager, SyncManagerProgress, SyncState,
};
Expand Down Expand Up @@ -63,10 +64,7 @@ impl<
}

async fn start_sync(&mut self, requests: &RequestSender) -> SyncResult<Vec<SyncEvent>> {
if self.state() != SyncState::WaitingForConnections {
tracing::warn!("{} sync already started.", self.identifier());
return Ok(vec![]);
}
ensure_not_started(self.state(), self.identifier())?;

// Check if there are already stored filters we need to process
// This handles restart where filters are persisted but wallet state isn't
Expand Down
2 changes: 1 addition & 1 deletion dash-spv/src/sync/sync_coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ where
N: NetworkManager,
{
if !self.tasks.is_empty() {
return Err(SyncError::SyncInProgress);
return Err(SyncError::InvalidState("SyncCoordinator already started".to_string()));
}

tracing::info!("Starting sync managers in separate tasks");
Expand Down
20 changes: 15 additions & 5 deletions dash-spv/src/sync/sync_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use crate::sync::{
};
use async_trait::async_trait;

use crate::SyncError;

/// Contains a trait for event-driven sync managers.
///
/// Each manager is responsible for a specific sync task (headers, filters, blocks, etc.)
Expand Down Expand Up @@ -64,6 +66,18 @@ impl SyncManagerTaskContext {
}
}

/// Guard that verifies a manager has not already been started.
pub(super) fn ensure_not_started(
state: SyncState,
identifier: ManagerIdentifier,
) -> SyncResult<()> {
if state != SyncState::WaitingForConnections {
tracing::warn!("{} sync already started.", identifier);
return Err(SyncError::SyncInProgress(identifier));
}
Ok(())
}

#[async_trait]
pub trait SyncManager: Send + Sync + std::fmt::Debug {
/// Get the unique identifier for this manager.
Expand Down Expand Up @@ -100,11 +114,7 @@ pub trait SyncManager: Send + Sync + std::fmt::Debug {
/// For example, BlockHeadersManager sends its first getheaders request here.
/// The default implementation is for reactive managers that just wait for events.
async fn start_sync(&mut self, _requests: &RequestSender) -> SyncResult<Vec<SyncEvent>> {
if !matches!(self.state(), SyncState::WaitingForConnections | SyncState::WaitForEvents) {
tracing::warn!("{} sync already started.", self.identifier());
return Ok(vec![]);
}

ensure_not_started(self.state(), self.identifier())?;
self.set_state(SyncState::WaitForEvents);
Ok(vec![SyncEvent::SyncStart {
identifier: self.identifier(),
Expand Down
15 changes: 10 additions & 5 deletions dash-spv/tests/error_types_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use dashcore_hashes::Hash;
use std::io;

use dash_spv::error::*;
use dash_spv::sync::ManagerIdentifier;

#[test]
fn test_network_error_from_io_error() {
Expand Down Expand Up @@ -80,12 +81,12 @@ fn test_spv_error_from_validation_error() {

#[test]
fn test_spv_error_from_sync_error() {
let sync_err = SyncError::SyncInProgress;
let sync_err = SyncError::SyncInProgress(ManagerIdentifier::BlockHeader);
let spv_err: SpvError = sync_err.into();

match spv_err {
SpvError::Sync(SyncError::SyncInProgress) => {
assert_eq!(spv_err.to_string(), "Sync error: Sync already in progress");
SpvError::Sync(SyncError::SyncInProgress(_)) => {
assert_eq!(spv_err.to_string(), "Sync error: BlockHeader already started");
}
_ => panic!("Expected SpvError::Sync variant"),
}
Expand Down Expand Up @@ -226,7 +227,11 @@ fn test_validation_error_variants() {
#[test]
fn test_sync_error_variants_and_categories() {
let test_cases = vec![
(SyncError::SyncInProgress, "state", "Sync already in progress"),
(
SyncError::SyncInProgress(ManagerIdentifier::BlockHeader),
"state",
"BlockHeader already started",
),
(
SyncError::InvalidState("Unexpected phase transition".to_string()),
"state",
Expand Down Expand Up @@ -361,7 +366,7 @@ fn test_result_type_aliases() {
}

fn sync_operation() -> SyncResult<()> {
Err(SyncError::SyncInProgress)
Err(SyncError::SyncInProgress(ManagerIdentifier::BlockHeader))
}

fn wallet_operation() -> WalletResult<u64> {
Expand Down
Loading