diff --git a/Cargo.lock b/Cargo.lock index bb43aa192..267164502 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5207,6 +5207,7 @@ dependencies = [ "signature_collector", "slot_clock", "ssv_types", + "subnet_service", "thiserror 2.0.17", "tokio", "tracing", @@ -5219,6 +5220,7 @@ version = "0.1.0" dependencies = [ "database", "ethereum_ssz", + "libp2p-gossipsub", "message_validator", "openssl", "processor", @@ -5250,6 +5252,7 @@ dependencies = [ "sha2 0.10.9", "slot_clock", "ssv_types", + "subnet_service", "task_executor", "thiserror 2.0.17", "tokio", @@ -7686,6 +7689,7 @@ dependencies = [ "eth2_network_config", "serde_yaml", "ssv_types", + "types", ] [[package]] diff --git a/anchor/client/src/lib.rs b/anchor/client/src/lib.rs index 8126a7003..a29965ef0 100644 --- a/anchor/client/src/lib.rs +++ b/anchor/client/src/lib.rs @@ -43,7 +43,7 @@ use sensitive_url::SensitiveUrl; use signature_collector::SignatureCollectorManager; use slashing_protection::SlashingDatabase; use slot_clock::{SlotClock, SystemTimeSlotClock}; -use subnet_service::{SUBNET_COUNT, SubnetId, start_subnet_service}; +use subnet_service::{ForkSchedule, SUBNET_COUNT_NZ, SubnetId, SubnetRouter, start_subnet_service}; use task_executor::TaskExecutor; use tokio::{ net::TcpListener, @@ -412,13 +412,26 @@ impl Client { )); duties_tracker.clone().start(executor.clone()); - let message_validator = Validator::new( + // Create ForkSchedule from config + let fork_schedule = match config.global_config.ssv_network.subnet_topology_fork_epoch { + Some(fork_epoch) => ForkSchedule::new(fork_epoch), + None => ForkSchedule::pre_fork(), + }; + + // Create centralized SubnetRouter for fork-aware subnet calculations + let router = Arc::new(SubnetRouter::new( database.watch(), + fork_schedule.clone(), + slot_clock.clone(), E::slots_per_epoch(), + SUBNET_COUNT_NZ, + )); + + let message_validator = Validator::new( spec.epochs_per_sync_committee_period.as_u64(), E::sync_committee_size(), duties_tracker.clone(), - slot_clock.clone(), + router.clone(), &executor, ); @@ -447,12 +460,15 @@ impl Client { private_key: key.clone(), operator_id: operator_id.clone(), validator: Some(message_validator.clone()), - subnet_count: SUBNET_COUNT, is_synced: is_synced.clone(), + router: router.clone(), }, )?) } else { - Arc::new(ImpostorMessageSender::new(network_tx.clone(), SUBNET_COUNT)) + Arc::new(ImpostorMessageSender::new( + network_tx.clone(), + SUBNET_COUNT_NZ, + )) }; // Create the signature collector @@ -478,9 +494,10 @@ impl Client { // Start the subnet service now that we have slot_clock let subnet_service = start_subnet_service::( database.watch(), - SUBNET_COUNT, + SUBNET_COUNT_NZ, config.network.subscribe_all_subnets, config.network.disable_gossipsub_topic_scoring, + fork_schedule.clone(), &executor, slot_clock.clone(), spec.clone(), @@ -497,6 +514,7 @@ impl Client { outcome_tx, message_validator, doppelganger_service.clone(), + router, ); // Start the p2p network diff --git a/anchor/common/ssv_network_config/Cargo.toml b/anchor/common/ssv_network_config/Cargo.toml index 858eb9159..431e98e73 100644 --- a/anchor/common/ssv_network_config/Cargo.toml +++ b/anchor/common/ssv_network_config/Cargo.toml @@ -10,3 +10,4 @@ enr = { workspace = true } eth2_network_config = { workspace = true } serde_yaml = { workspace = true } ssv_types = { workspace = true } +types = { workspace = true } diff --git a/anchor/common/ssv_network_config/src/lib.rs b/anchor/common/ssv_network_config/src/lib.rs index 6cf09dbb4..d852aa4f3 100644 --- a/anchor/common/ssv_network_config/src/lib.rs +++ b/anchor/common/ssv_network_config/src/lib.rs @@ -8,6 +8,7 @@ use alloy::primitives::Address; use enr::{CombinedKey, Enr}; use eth2_network_config::Eth2NetworkConfig; use ssv_types::domain_type::DomainType; +use types::Epoch; macro_rules! include_str_for_net { ($network:ident, $file:literal) => { @@ -38,6 +39,9 @@ pub struct SsvNetworkConfig { pub ssv_contract: Address, pub ssv_contract_block: u64, pub ssv_domain_type: DomainType, + /// The epoch at which the subnet topology fork activates. + /// `None` means pre-fork operation (no fork configured). + pub subnet_topology_fork_epoch: Option, } impl SsvNetworkConfig { @@ -65,6 +69,7 @@ impl SsvNetworkConfig { ssv_domain_type: domain_type .parse() .map_err(|e| format!("Unable to parse built-in domain type: {e}"))?, + subnet_topology_fork_epoch: None, })) } @@ -82,12 +87,19 @@ impl SsvNetworkConfig { }) .transpose()?; + let subnet_topology_fork_epoch_path = base_dir.join("ssv_subnet_topology_fork_epoch.txt"); + let subnet_topology_fork_epoch = subnet_topology_fork_epoch_path + .exists() + .then(|| read(&subnet_topology_fork_epoch_path)) + .transpose()?; + Ok(Self { ssv_boot_nodes, ssv_contract: read(&base_dir.join("ssv_contract_address.txt"))?, ssv_contract_block: read(&base_dir.join("ssv_contract_block.txt"))?, ssv_domain_type: read(&base_dir.join("ssv_domain_type.txt"))?, eth2_network: Self::load_eth2_network_config(base_dir)?, + subnet_topology_fork_epoch, }) } diff --git a/anchor/eth/src/event_processor.rs b/anchor/eth/src/event_processor.rs index a7838d7d3..7117df290 100644 --- a/anchor/eth/src/event_processor.rs +++ b/anchor/eth/src/event_processor.rs @@ -190,9 +190,10 @@ impl EventProcessor { let max_seen = self.db.state().get_max_operator_id_seen(); - // Only check for missing operators if we have a previous max (not a migrated database) + // Only check for missing operators if we have a previous max (not a migrated database). + // Use checked_sub to avoid underflow if operatorId is 0. if let Some(max_seen) = max_seen - && max_seen != operatorId - 1 + && operatorId.checked_sub(1) != Some(max_seen) { return Err(ExecutionError::InvalidEvent(format!( "Missing OperatorAdded events: database has only seen up to id {max_seen}, \ diff --git a/anchor/message_receiver/Cargo.toml b/anchor/message_receiver/Cargo.toml index baf62b209..27ff93b6c 100644 --- a/anchor/message_receiver/Cargo.toml +++ b/anchor/message_receiver/Cargo.toml @@ -16,6 +16,7 @@ qbft_manager = { workspace = true } signature_collector = { workspace = true } slot_clock = { workspace = true } ssv_types = { workspace = true } +subnet_service = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } diff --git a/anchor/message_receiver/src/manager.rs b/anchor/message_receiver/src/manager.rs index 0d9678fa5..ccf358662 100644 --- a/anchor/message_receiver/src/manager.rs +++ b/anchor/message_receiver/src/manager.rs @@ -11,6 +11,7 @@ use qbft_manager::QbftManager; use signature_collector::SignatureCollectorManager; use slot_clock::SlotClock; use ssv_types::msgid::DutyExecutor; +use subnet_service::SubnetRouter; use tokio::sync::{mpsc, mpsc::error::TrySendError, watch}; use tracing::{debug, debug_span, error, trace}; @@ -34,6 +35,8 @@ pub struct NetworkMessageReceiver { outcome_tx: mpsc::Sender, validator: Arc>, doppelganger_service: Option>, + /// Centralized router for fork-aware subnet calculations + router: Arc>, } impl NetworkMessageReceiver { @@ -47,6 +50,7 @@ impl NetworkMessageReceiver { outcome_tx: mpsc::Sender, validator: Arc>, doppelganger_service: Option>, + router: Arc>, ) -> Arc { Arc::new(Self { processor, @@ -57,6 +61,7 @@ impl NetworkMessageReceiver { outcome_tx, validator, doppelganger_service, + router, }) } } @@ -76,7 +81,7 @@ impl MessageReceiver let span = debug_span!("message_receiver", msg=%message_id); let _enter = span.enter(); - let result = receiver.validator.validate(&message.data); + let result = receiver.validator.validate(&message.data, &message.topic); let mut action = MessageAcceptance::from(&result); @@ -104,6 +109,7 @@ impl MessageReceiver let ValidatedMessage { signed_ssv_message, ssv_message, + matched_schema, } = match result { ValidationResult::Success(message) => message, ValidationResult::PreDecodeFailure(failure) => { @@ -120,6 +126,18 @@ impl MessageReceiver } }; + // Check if the matched schema should be processed at the current epoch. + // During pre-subscribe window (F-2, F-1), PostFork messages are accepted for + // gossipsub propagation but not processed for committee work. + if !receiver.router.should_process(matched_schema) { + trace!( + gossipsub_message_id = ?message_id, + ?matched_schema, + "Message accepted for propagation but not processed (pre-subscribe window)" + ); + return; + } + let msg_id = signed_ssv_message.ssv_message().msg_id().clone(); match msg_id.duty_executor() { diff --git a/anchor/message_sender/Cargo.toml b/anchor/message_sender/Cargo.toml index e41102145..751316b60 100644 --- a/anchor/message_sender/Cargo.toml +++ b/anchor/message_sender/Cargo.toml @@ -10,6 +10,7 @@ testing = [] [dependencies] database = { workspace = true } ethereum_ssz = { workspace = true } +gossipsub = { workspace = true } message_validator = { workspace = true } openssl = { workspace = true } processor = { workspace = true } diff --git a/anchor/message_sender/src/impostor.rs b/anchor/message_sender/src/impostor.rs index 53bed904b..ac7c89448 100644 --- a/anchor/message_sender/src/impostor.rs +++ b/anchor/message_sender/src/impostor.rs @@ -1,7 +1,9 @@ +use std::num::NonZeroU64; + use ssv_types::{CommitteeId, consensus::UnsignedSSVMessage, message::SignedSSVMessage}; use subnet_service::SubnetId; use tokio::sync::mpsc; -use tracing::debug; +use tracing::{debug, warn}; use crate::{Error, MessageCallback, MessageSender}; @@ -9,7 +11,7 @@ use crate::{Error, MessageCallback, MessageSender}; pub struct ImpostorMessageSender { // we only hold this so network does not get sad over the closed channel lol _network_tx: mpsc::Sender<(SubnetId, Vec)>, - subnet_count: usize, + subnet_count: NonZeroU64, } impl MessageSender for ImpostorMessageSender { @@ -19,20 +21,32 @@ impl MessageSender for ImpostorMessageSender { committee_id: CommitteeId, _additional_message_callback: Option>, ) -> Result<(), Error> { - let subnet = SubnetId::from_committee_alan(committee_id, self.subnet_count); - debug!(?msg, ?subnet, "Would send message"); + match SubnetId::from_committee_alan(committee_id, self.subnet_count) { + Ok(subnet) => debug!(?msg, ?subnet, "Would send message"), + Err(e) => warn!( + ?committee_id, + ?e, + "Failed to calculate subnet for impostor message" + ), + } Ok(()) } fn send(&self, msg: SignedSSVMessage, committee_id: CommitteeId) -> Result<(), Error> { - let subnet = SubnetId::from_committee_alan(committee_id, self.subnet_count); - debug!(?msg, ?subnet, "Would send message"); + match SubnetId::from_committee_alan(committee_id, self.subnet_count) { + Ok(subnet) => debug!(?msg, ?subnet, "Would send message"), + Err(e) => warn!( + ?committee_id, + ?e, + "Failed to calculate subnet for impostor message" + ), + } Ok(()) } } impl ImpostorMessageSender { - pub fn new(network_tx: mpsc::Sender<(SubnetId, Vec)>, subnet_count: usize) -> Self { + pub fn new(network_tx: mpsc::Sender<(SubnetId, Vec)>, subnet_count: NonZeroU64) -> Self { Self { _network_tx: network_tx, subnet_count, diff --git a/anchor/message_sender/src/network.rs b/anchor/message_sender/src/network.rs index 6177d6b66..5ee2406ed 100644 --- a/anchor/message_sender/src/network.rs +++ b/anchor/message_sender/src/network.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use database::OwnOperatorId; +use gossipsub::IdentTopic; use message_validator::{DutiesProvider, MessageAcceptance, Validator}; use openssl::{ hash::MessageDigest, @@ -13,7 +14,7 @@ use ssv_types::{ CommitteeId, RSA_SIGNATURE_SIZE, consensus::UnsignedSSVMessage, message::SignedSSVMessage, }; use ssz::Encode; -use subnet_service::SubnetId; +use subnet_service::{SubnetId, SubnetRouter}; use tokio::sync::{mpsc, mpsc::error::TrySendError, watch}; use tracing::{debug, error, trace, warn}; @@ -29,8 +30,9 @@ pub struct NetworkMessageSenderConfig { pub private_key: Rsa, pub operator_id: OwnOperatorId, pub validator: Option>>, - pub subnet_count: usize, pub is_synced: watch::Receiver, + /// Centralized router for fork-aware subnet calculations + pub router: Arc>, } pub struct NetworkMessageSender { @@ -39,8 +41,9 @@ pub struct NetworkMessageSender { private_key: PKey, operator_id: OwnOperatorId, validator: Option>>, - subnet_count: usize, is_synced: watch::Receiver, + /// Centralized router for fork-aware subnet calculations + router: Arc>, } impl MessageSender for Arc> { @@ -125,16 +128,32 @@ impl NetworkMessageSender { private_key, operator_id: config.operator_id, validator: config.validator, - subnet_count: config.subnet_count, is_synced: config.is_synced, + router: config.router, })) } fn do_send(&self, message: SignedSSVMessage, committee_id: CommitteeId) { let message_bytes = message.as_ssz_bytes(); + // Calculate subnet using centralized router + let subnet = match self.router.publish_subnet(committee_id) { + Ok(subnet) => subnet, + Err(e) => { + error!( + ?committee_id, + ?e, + "Failed to calculate subnet for publishing" + ); + return; + } + }; + + // Construct the topic for validation (format: "ssv.v2.") + let topic = IdentTopic::new(format!("ssv.v2.{}", *subnet)).hash(); + if let Some(validator) = self.validator.as_ref() - && let Err(err) = validator.validate(&message_bytes).as_result() + && let Err(err) = validator.validate(&message_bytes, &topic).as_result() { // `Reject` is more severe and can be punished by other peers. We should not have // created this message ever, while `Ignore` can be triggered simply because the message @@ -148,7 +167,6 @@ impl NetworkMessageSender { return; } - let subnet = SubnetId::from_committee_alan(committee_id, self.subnet_count); match self.network_tx.try_send((subnet, message_bytes)) { Ok(_) => trace!(?subnet, "Successfully sent message to network"), Err(TrySendError::Closed(_)) => warn!("Network queue closed (shutting down?)"), diff --git a/anchor/message_validator/Cargo.toml b/anchor/message_validator/Cargo.toml index 0d6b19eed..c766d1659 100644 --- a/anchor/message_validator/Cargo.toml +++ b/anchor/message_validator/Cargo.toml @@ -21,6 +21,7 @@ safe_arith = { workspace = true } sha2 = { workspace = true } slot_clock = { workspace = true } ssv_types = { workspace = true } +subnet_service = { workspace = true } task_executor = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } diff --git a/anchor/message_validator/src/lib.rs b/anchor/message_validator/src/lib.rs index 4341ab3b2..014af5208 100644 --- a/anchor/message_validator/src/lib.rs +++ b/anchor/message_validator/src/lib.rs @@ -10,9 +10,9 @@ use std::{ }; use dashmap::{DashMap, mapref::one::RefMut}; -use database::NetworkState; +use database::{NetworkState, UniqueIndex}; pub use duties_tracker::DutiesProvider; -pub use gossipsub::MessageAcceptance; +pub use gossipsub::{MessageAcceptance, TopicHash}; use openssl::{ hash::MessageDigest, pkey::{PKey, Public}, @@ -30,8 +30,9 @@ use ssv_types::{ partial_sig::PartialSignatureMessages, }; use ssz::{Decode, DecodeError, Encode}; +use subnet_service::{SubnetId, SubnetRouter, SubnetSchema}; use task_executor::TaskExecutor; -use tokio::{sync::watch::Receiver, time::sleep}; +use tokio::time::sleep; use tracing::trace; use types::{Epoch, Slot}; @@ -237,13 +238,22 @@ pub enum ValidatedSSVMessage { pub struct ValidatedMessage { pub signed_ssv_message: SignedSSVMessage, pub ssv_message: ValidatedSSVMessage, + /// Which subnet schema this message matched against during validation. + /// Used to determine if the message should be processed or just propagated + /// (during pre-subscribe window, PostFork messages are accepted but not processed). + pub matched_schema: SubnetSchema, } impl ValidatedMessage { - pub fn new(signed_ssv_message: SignedSSVMessage, ssv_message: ValidatedSSVMessage) -> Self { + pub fn new( + signed_ssv_message: SignedSSVMessage, + ssv_message: ValidatedSSVMessage, + matched_schema: SubnetSchema, + ) -> Self { Self { signed_ssv_message, ssv_message, + matched_schema, } } } @@ -267,45 +277,43 @@ struct ValidationContext<'a, S> { } pub struct Validator { - network_state_rx: Receiver, duty_state_map: DashMap, - slots_per_epoch: u64, epochs_per_sync_committee_period: u64, sync_committee_size: usize, duties_provider: Arc, - slot_clock: S, + /// Centralized router for fork-aware subnet calculations and network state access + router: Arc>, } impl Validator { pub fn new( - network_state_rx: Receiver, - slots_per_epoch: u64, epochs_per_sync_committee_period: u64, sync_committee_size: usize, duties_provider: Arc, - slot_clock: S, + router: Arc>, task_executor: &TaskExecutor, ) -> Arc { let validator = Arc::new(Self { - network_state_rx, duty_state_map: DashMap::new(), - slots_per_epoch, epochs_per_sync_committee_period, sync_committee_size, duties_provider, - slot_clock, + router: router.clone(), }); - task_executor.spawn(Arc::clone(&validator).cleaner(), VALIDATOR_CLEANER_NAME); + task_executor.spawn( + Arc::clone(&validator).cleaner(router), + VALIDATOR_CLEANER_NAME, + ); validator } - pub fn validate(&self, message_data: &[u8]) -> ValidationResult { + pub fn validate(&self, message_data: &[u8], topic: &TopicHash) -> ValidationResult { match SignedSSVMessage::from_ssz_bytes(message_data) { Ok(signed_ssv_message) => { trace!(msg = ?signed_ssv_message, "SignedSSVMessage deserialized"); - match self.validate_decoded_message(&signed_ssv_message) { + match self.validate_decoded_message(&signed_ssv_message, topic) { Ok(validated_message) => ValidationResult::Success(validated_message), Err(failure) => { ValidationResult::PostDecodeFailure(failure, signed_ssv_message) @@ -321,6 +329,7 @@ impl Validator { fn validate_decoded_message( &self, signed_ssv_message: &SignedSSVMessage, + topic: &TopicHash, ) -> Result { // Get the role from message ID let ssv_message = signed_ssv_message.ssv_message(); @@ -329,17 +338,18 @@ impl Validator { .role() .ok_or(ValidationFailure::InvalidRole)?; - // Get committee info based on role and duty executor - let network_state = self.network_state_rx.borrow(); - let committee_info = match role { + // Get committee info and committee_id based on role and duty executor + let network_state = self.router.network_state(); + let (committee_info, committee_id) = match role { Role::Committee => { let committee_id = match ssv_message.msg_id().duty_executor() { Some(DutyExecutor::Committee(id)) => id, _ => return Err(ValidationFailure::NonExistentCommitteeID), }; - network_state + let committee_info = network_state .get_committee_info_by_committee_id(&committee_id) - .ok_or(ValidationFailure::NonExistentCommitteeID)? + .ok_or(ValidationFailure::NonExistentCommitteeID)?; + (committee_info, committee_id) } _ => { let validator_pk = match ssv_message.msg_id().duty_executor() { @@ -347,27 +357,53 @@ impl Validator { _ => return Err(ValidationFailure::UnknownValidator), }; - network_state + let share = network_state + .shares() + .get_by(&validator_pk) + .ok_or(ValidationFailure::UnknownValidator)?; + + // Get committee_id from cluster + let cluster = network_state + .clusters() + .get_by(&share.cluster_id) + .ok_or(ValidationFailure::UnknownValidator)?; + let committee_id = cluster.committee_id(); + + let committee_info = network_state .get_committee_info_by_validator_pk(&validator_pk) - .ok_or(ValidationFailure::UnknownValidator)? + .ok_or(ValidationFailure::UnknownValidator)?; + (committee_info, committee_id) } }; + + // Parse received subnet from topic + let received_subnet = + SubnetId::from_topic_str(topic.as_str()).ok_or(ValidationFailure::IncorrectTopic)?; + + // Validate subnet using operators we already have + let operators: Vec<_> = committee_info.committee_members.iter().copied().collect(); + let matched_schema = self + .router + .validate_subnet(committee_id, &operators, received_subnet) + .map_err(|_| ValidationFailure::IncorrectTopic)?; + let operator_pub_keys = &get_operator_pub_keys(&network_state, &committee_info.committee_members); drop(network_state); - let mut duty_state = self.get_duty_state(ssv_message.msg_id(), self.slots_per_epoch); + let slots_per_epoch = self.router.slots_per_epoch(); + let mut duty_state = self.get_duty_state(ssv_message.msg_id(), slots_per_epoch); let validation_context = ValidationContext { signed_ssv_message, role, committee_info: &committee_info, received_at: SystemTime::now(), - slots_per_epoch: self.slots_per_epoch, + slots_per_epoch, epochs_per_sync_committee_period: self.epochs_per_sync_committee_period, sync_committee_size: self.sync_committee_size, - slot_clock: self.slot_clock.clone(), + slot_clock: self.router.slot_clock().clone(), operator_pub_keys, }; @@ -376,7 +412,9 @@ impl Validator { duty_state.value_mut(), self.duties_provider.clone(), ) - .map(|validated| ValidatedMessage::new(signed_ssv_message.clone(), validated)) + .map(|validated| { + ValidatedMessage::new(signed_ssv_message.clone(), validated, matched_schema) + }) } /// Gets the duty state for a message ID, creating a new one if it doesn't exist @@ -394,9 +432,9 @@ impl Validator { }) } - async fn cleaner(self: Arc) { - let slot_clock = self.slot_clock.clone(); - let slots_per_epoch = self.slots_per_epoch; + async fn cleaner(self: Arc, router: Arc>) { + let slot_clock = router.slot_clock().clone(); + let slots_per_epoch = router.slots_per_epoch(); // Use a weak reference to exit when the other `Arc` are dropped. let weak_self = Arc::downgrade(&self); diff --git a/anchor/network/src/network.rs b/anchor/network/src/network.rs index 8d9a1b860..ff9a0dffe 100644 --- a/anchor/network/src/network.rs +++ b/anchor/network/src/network.rs @@ -756,13 +756,9 @@ fn build_swarm( } fn subnet_to_topic(subnet: SubnetId) -> IdentTopic { - IdentTopic::new(format!("ssv.v2.{}", *subnet)) + IdentTopic::new(subnet.to_topic_string()) } fn topic_to_subnet(topic: &TopicHash) -> Option { - let s = topic.as_str(); - // Our topics use the form "ssv.v2.". - s.strip_prefix("ssv.v2.") - .and_then(|rest| rest.parse::().ok()) - .map(SubnetId::from) + SubnetId::from_topic_str(topic.as_str()) } diff --git a/anchor/subnet_service/src/lib.rs b/anchor/subnet_service/src/lib.rs index 68ef84b88..40ab1654d 100644 --- a/anchor/subnet_service/src/lib.rs +++ b/anchor/subnet_service/src/lib.rs @@ -6,10 +6,15 @@ //! - Message rate calculation for gossipsub topic scoring pub mod message_rate; +pub mod routing; mod scoring; mod service; mod subnet; +pub use routing::{ForkSchedule, SchemaSet, SubnetRouter, SubnetSchema, process_schemas}; pub use scoring::{calculate_message_rate_for_subnet, get_committee_info_for_subnet}; pub use service::start_subnet_service; -pub use subnet::{SUBNET_COUNT, SubnetBits, SubnetCalculationError, SubnetEvent, SubnetId}; +pub use subnet::{ + SUBNET_COUNT, SUBNET_COUNT_NZ, SubnetBits, SubnetCalculationError, SubnetEvent, SubnetId, + calculate_subnet_for_committee, +}; diff --git a/anchor/subnet_service/src/routing.rs b/anchor/subnet_service/src/routing.rs new file mode 100644 index 000000000..b4405f2e9 --- /dev/null +++ b/anchor/subnet_service/src/routing.rs @@ -0,0 +1,736 @@ +//! Centralized routing policy for subnet topology fork transition. +//! +//! This module provides a single source of truth for fork-related routing decisions, +//! answering four key questions: +//! 1. Which schema to publish with? → `publish_schema()` +//! 2. Which schemas to subscribe to? → `subscribe_schemas()` +//! 3. Which schemas to accept messages from (gossipsub)? → `accept_schemas()` +//! 4. Which schemas to process messages for? → `process_schemas()` +//! +//! ## Fork Timeline (SIP-43 Compliant) +//! +//! Let F = fork_epoch, P = 2 (PRIOR_WINDOW / pre-subscribe epochs): +//! +//! | Epoch | Subscribe | Publish | Accept (gossipsub) | Process | +//! |-------|----------------|---------|-------------------|----------------| +//! | F-3 | {Pre} | Pre | {Pre} | {Pre} | +//! | F-2 | {Pre, Post} | Pre | {Pre, Post} | {Pre} | +//! | F-1 | {Pre, Post} | Pre | {Pre, Post} | {Pre} | +//! | F | {Post} | Post | {Post} | {Post} | +//! | F+1 | {Post} | Post | {Post} | {Post} | +//! +//! Key behaviors per SIP-43: +//! - **PRIOR_WINDOW = 2**: Start dual-subscribing 2 epochs before fork +//! - **Accept during pre-subscribe**: Accept PostFork messages for gossipsub propagation +//! - **Don't process pre-fork**: Only process PreFork messages until fork epoch +//! - **Immediate cutoff at fork**: No grace period after fork + +use std::num::NonZeroU64; + +use database::NetworkState; +use slot_clock::SlotClock; +use ssv_types::{CommitteeId, OperatorId}; +use tokio::sync::watch; +use types::Epoch; + +use crate::{SubnetCalculationError, SubnetId, calculate_subnet_for_committee}; + +/// Subnet calculation algorithm schema. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum SubnetSchema { + /// Pre-fork schema: subnet derived from committee ID (alan algorithm) + PreFork, + /// Post-fork schema: subnet derived from operator IDs (MinHash topology algorithm) + PostFork, +} + +/// Fixed-size schema list with an explicit active length. +/// +/// This keeps routing decisions allocation-free while still allowing dual-schema +/// transitions during fork windows. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct SchemaSet { + len: usize, + schemas: [SubnetSchema; 2], +} + +impl SchemaSet { + /// Max supported schemas in a single decision window. + pub const MAX_LEN: usize = 2; + + /// Pre-fork default when no epoch is available (e.g., before genesis). + pub const fn pre_fork_only() -> Self { + Self { + len: 1, + schemas: [SubnetSchema::PreFork, SubnetSchema::PreFork], + } + } + + /// Post-fork-only schema set. + pub const fn post_fork_only() -> Self { + Self { + len: 1, + schemas: [SubnetSchema::PostFork, SubnetSchema::PostFork], + } + } + + /// Dual-schema set used during the pre-subscribe window. + pub const fn pre_and_post() -> Self { + Self { + len: 2, + schemas: [SubnetSchema::PreFork, SubnetSchema::PostFork], + } + } + + /// Number of active schemas. + pub fn len(&self) -> usize { + self.len + } + + /// True when no schemas are active. + pub fn is_empty(&self) -> bool { + self.len == 0 + } + + /// Active schema slice (length is `len()`). + pub fn as_slice(&self) -> &[SubnetSchema] { + &self.schemas[..self.len] + } + + /// Iterate active schemas. + pub fn iter(&self) -> std::slice::Iter<'_, SubnetSchema> { + self.as_slice().iter() + } + + /// True when the schema is part of this set. + pub fn contains(&self, schema: SubnetSchema) -> bool { + self.iter().any(|s| *s == schema) + } +} + +/// Fork schedule configuration. +/// +/// Defines when the fork happens and the transition window. +/// Uses hardcoded transition period: 2 epochs pre-subscribe (PRIOR_WINDOW per SIP-43). +#[derive(Debug, Clone)] +pub struct ForkSchedule { + /// The epoch at which the fork activates (publishing switches from PreFork to PostFork). + /// `None` means pre-fork forever (system operates in PreFork mode indefinitely). + fork_epoch: Option, +} + +impl ForkSchedule { + /// Number of epochs before fork_epoch to start dual-subscribing and accepting. + /// + /// At epoch (fork_epoch - PRE_SUBSCRIBE_EPOCHS), nodes start subscribing to + /// PostFork topics in addition to PreFork topics, and start accepting PostFork + /// messages for gossipsub propagation (but don't process them until fork_epoch). + /// + /// # Rationale for Value = 2 (SIP-43 PRIOR_WINDOW) + /// + /// Two epochs (~12.8 minutes) gives all nodes sufficient time to: + /// - Warm up mesh connections for new topics (GRAFT connections) + /// - Ensure message propagation works before the fork + /// - Handle gradual rollout of node upgrades + pub const PRE_SUBSCRIBE_EPOCHS: u64 = 2; + + /// Create a ForkSchedule for pre-fork operation (no fork configured). + /// + /// With this schedule, the system operates in PreFork mode indefinitely. + pub const fn pre_fork() -> Self { + Self { fork_epoch: None } + } + + /// Create a ForkSchedule with a specific fork epoch. + /// + /// The fork will activate at the specified epoch, with the transition window + /// starting PRE_SUBSCRIBE_EPOCHS before. + pub const fn new(fork_epoch: Epoch) -> Self { + Self { + fork_epoch: Some(fork_epoch), + } + } + + /// Get the fork epoch, if configured. + pub const fn fork_epoch(&self) -> Option { + self.fork_epoch + } + + /// Calculate the epoch when dual-subscription should start. + /// + /// Returns `fork_epoch - PRE_SUBSCRIBE_EPOCHS`, or `None` if no fork is configured. + fn subscribe_transition_epoch(&self) -> Option { + self.fork_epoch + .map(|fe| Epoch::new(fe.as_u64().saturating_sub(Self::PRE_SUBSCRIBE_EPOCHS))) + } +} + +/// Determine which schema to use for PUBLISHING messages at the given epoch. +/// +/// # Decision Logic +/// +/// - Before fork_epoch: PreFork +/// - At or after fork_epoch: PostFork +/// - If no fork is configured (fork_epoch = None): always PreFork +pub fn publish_schema(epoch: Epoch, schedule: &ForkSchedule) -> SubnetSchema { + match schedule.fork_epoch { + None => SubnetSchema::PreFork, + Some(fork_epoch) if epoch < fork_epoch => SubnetSchema::PreFork, + Some(_) => SubnetSchema::PostFork, + } +} + +/// Determine which schemas to SUBSCRIBE to at the given epoch. +/// +/// Returns a `SchemaSet` with up to 2 schemas to avoid heap allocation while +/// supporting dual-subscription during the transition period. +/// +/// # Decision Logic (SIP-43) +/// +/// - Before (fork_epoch - PRE_SUBSCRIBE_EPOCHS): {PreFork} +/// - From (fork_epoch - PRE_SUBSCRIBE_EPOCHS) to fork_epoch (exclusive): {PreFork, PostFork} +/// - At or after fork_epoch: {PostFork} +/// - If no fork is configured: always {PreFork} +pub fn subscribe_schemas(epoch: Epoch, schedule: &ForkSchedule) -> SchemaSet { + match schedule.fork_epoch { + None => SchemaSet::pre_fork_only(), + Some(fork_epoch) => { + let subscribe_start = schedule.subscribe_transition_epoch(); + + match subscribe_start { + Some(start) if epoch < start => { + // Before transition: only PreFork + SchemaSet::pre_fork_only() + } + Some(_) if epoch < fork_epoch => { + // Pre-subscribe window (F-2, F-1): dual-subscribe + SchemaSet::pre_and_post() + } + _ => { + // At or after fork: only PostFork + SchemaSet::post_fork_only() + } + } + } + } +} + +/// Determine which schemas to ACCEPT messages from at the given epoch (for gossipsub). +/// +/// This controls gossipsub message validation and propagation. Messages matching +/// accepted schemas are propagated through the network. +/// +/// Returns a `SchemaSet` with up to 2 schemas. +/// +/// # Decision Logic (SIP-43) +/// +/// - Before (fork_epoch - PRE_SUBSCRIBE_EPOCHS): {PreFork} +/// - From (fork_epoch - PRE_SUBSCRIBE_EPOCHS) to fork_epoch (exclusive): {PreFork, PostFork} +/// - At or after fork_epoch: {PostFork} +/// - If no fork is configured: always {PreFork} +/// +/// **Important**: During the pre-subscribe window (F-2, F-1), nodes accept PostFork +/// messages for gossipsub propagation to warm up the mesh, but don't process them +/// for committee work (see `process_schemas`). +pub fn accept_schemas(epoch: Epoch, schedule: &ForkSchedule) -> SchemaSet { + match schedule.fork_epoch { + None => SchemaSet::pre_fork_only(), + Some(fork_epoch) => { + let subscribe_start = schedule.subscribe_transition_epoch(); + + match subscribe_start { + Some(start) if epoch < start => { + // Before transition: only PreFork + SchemaSet::pre_fork_only() + } + Some(_) if epoch < fork_epoch => { + // Pre-subscribe window (F-2, F-1): accept both for gossipsub propagation + SchemaSet::pre_and_post() + } + _ => { + // At or after fork: only PostFork + SchemaSet::post_fork_only() + } + } + } + } +} + +/// Determine which schemas to PROCESS messages for at the given epoch (for committee work). +/// +/// This controls which messages are actually processed for committee duties. +/// Messages matching processed schemas are sent to the processor for consensus work. +/// +/// Returns a `SchemaSet` with the active schema for processing. +/// +/// # Decision Logic (SIP-43) +/// +/// - Before fork_epoch: {PreFork} +/// - At or after fork_epoch: {PostFork} +/// - If no fork is configured: always {PreFork} +/// +/// **Important**: During the pre-subscribe window (F-2, F-1), nodes accept PostFork +/// messages for gossipsub propagation but only process PreFork messages for committee work. +pub fn process_schemas(epoch: Epoch, schedule: &ForkSchedule) -> SchemaSet { + match schedule.fork_epoch { + None => SchemaSet::pre_fork_only(), + Some(fork_epoch) => { + if epoch < fork_epoch { + // Before fork: only process PreFork + SchemaSet::pre_fork_only() + } else { + // At or after fork: only process PostFork + SchemaSet::post_fork_only() + } + } + } +} + +/// Consistency helper for computing the current epoch from a slot clock. +/// +/// This ensures all components (subnet_service, message_sender, message_validator) +/// compute epoch the same way, preventing subtle inconsistencies during transition. +/// +/// # Clock Synchronization Requirements +/// +/// **Critical for fork transitions**: Nodes must maintain synchronized clocks using NTP or similar. +/// Nodes with severely out-of-sync clocks (seconds to minutes) may temporarily disagree on +/// current epoch at boundaries, causing their messages to be ignored by peers using different +/// schemas. +/// +/// # Returns +/// +/// - `Some(epoch)` if the slot clock returns a valid current slot +/// - `None` if the slot clock cannot determine the current slot (e.g., before genesis) +pub fn current_epoch(slot_clock: &impl SlotClock, slots_per_epoch: u64) -> Option { + slot_clock.now().map(|slot| slot.epoch(slots_per_epoch)) +} + +// ============================================================================ +// SubnetRouter - Centralized routing service +// ============================================================================ + +/// Centralized router for fork-aware subnet calculations. +/// +/// This struct encapsulates all the state needed to determine subnet routing +/// decisions and is shared (via `Arc`) across components that need routing info: +/// - `message_sender` - for publishing to the correct subnet +/// - `message_validator` - for validating incoming messages +/// - `message_receiver` - for deciding whether to process messages +pub struct SubnetRouter { + network_state_rx: watch::Receiver, + fork_schedule: ForkSchedule, + slot_clock: S, + slots_per_epoch: u64, + subnet_count: NonZeroU64, +} + +impl SubnetRouter { + /// Create a new SubnetRouter. + pub fn new( + network_state_rx: watch::Receiver, + fork_schedule: ForkSchedule, + slot_clock: S, + slots_per_epoch: u64, + subnet_count: NonZeroU64, + ) -> Self { + Self { + network_state_rx, + fork_schedule, + slot_clock, + slots_per_epoch, + subnet_count, + } + } + + /// Get the subnet to publish to for this committee. + /// + /// Uses `publish_schema()` to determine PreFork vs PostFork based on current epoch. + pub fn publish_subnet( + &self, + committee_id: CommitteeId, + ) -> Result { + let epoch = current_epoch(&self.slot_clock, self.slots_per_epoch); + let schema = epoch + .map(|e| publish_schema(e, &self.fork_schedule)) + .unwrap_or(SubnetSchema::PreFork); + + calculate_subnet_for_committee( + committee_id, + &self.network_state_rx.borrow(), + self.subnet_count, + schema, + ) + } + + /// Validate that a message arrived on the correct subnet for this committee. + /// + /// Checks against all accepted schemas at the current epoch. Returns the + /// matched schema if valid, or an error if no schema matches. + /// + /// During the pre-subscribe window (F-2, F-1), both PreFork and PostFork + /// subnets are accepted for gossipsub propagation. + pub fn validate_subnet( + &self, + committee_id: CommitteeId, + operators: &[OperatorId], + received_subnet: SubnetId, + ) -> Result { + let epoch = current_epoch(&self.slot_clock, self.slots_per_epoch); + let schema_set = if let Some(e) = epoch { + accept_schemas(e, &self.fork_schedule) + } else { + SchemaSet::pre_fork_only() + }; + + for schema in schema_set.iter() { + // Calculate expected subnet for this schema, skipping on error to try other schemas. + // This ensures a PostFork error (e.g., EmptyOperatorList) doesn't reject valid + // PreFork messages during the dual-subscribe window. + let expected_subnet = match schema { + SubnetSchema::PreFork => { + match SubnetId::from_committee_alan(committee_id, self.subnet_count) { + Ok(subnet) => subnet, + Err(_) => continue, + } + } + SubnetSchema::PostFork => { + match SubnetId::from_operators(operators, self.subnet_count) { + Ok(subnet) => subnet, + Err(_) => continue, + } + } + }; + + if expected_subnet == received_subnet { + return Ok(*schema); + } + } + + Err(SubnetCalculationError::IncorrectTopic) + } + + /// Check if a message with the given matched schema should be processed. + /// + /// During the pre-subscribe window (F-2, F-1), PostFork messages are accepted + /// for gossipsub propagation but should NOT be processed for committee work. + /// Only messages matching `process_schemas()` should be processed. + pub fn should_process(&self, matched_schema: SubnetSchema) -> bool { + let epoch = current_epoch(&self.slot_clock, self.slots_per_epoch); + let schema_set = if let Some(e) = epoch { + process_schemas(e, &self.fork_schedule) + } else { + // Before genesis: only process PreFork + SchemaSet::pre_fork_only() + }; + + schema_set.contains(matched_schema) + } + + /// Get subnets to subscribe to for a committee at the current epoch. + /// + /// During the pre-subscribe window, returns both PreFork and PostFork subnets. + pub fn subscribe_subnets( + &self, + committee_id: CommitteeId, + ) -> Result, SubnetCalculationError> { + let epoch = current_epoch(&self.slot_clock, self.slots_per_epoch); + let schema_set = if let Some(e) = epoch { + subscribe_schemas(e, &self.fork_schedule) + } else { + SchemaSet::pre_fork_only() + }; + + let network_state = self.network_state_rx.borrow(); + let mut subnets = Vec::with_capacity(schema_set.len()); + + for schema in schema_set.iter() { + if let Ok(subnet) = calculate_subnet_for_committee( + committee_id, + &network_state, + self.subnet_count, + *schema, + ) { + // Avoid duplicates (PreFork and PostFork might map to same subnet) + if !subnets.contains(&subnet) { + subnets.push(subnet); + } + } + } + + if subnets.is_empty() { + Err(SubnetCalculationError::EmptyOperatorList) + } else { + Ok(subnets) + } + } + + /// Get the fork schedule. + pub fn fork_schedule(&self) -> &ForkSchedule { + &self.fork_schedule + } + + /// Get the subnet count. + pub fn subnet_count(&self) -> NonZeroU64 { + self.subnet_count + } + + /// Get slots per epoch. + pub fn slots_per_epoch(&self) -> u64 { + self.slots_per_epoch + } + + /// Borrow the network state. + pub fn network_state(&self) -> watch::Ref<'_, NetworkState> { + self.network_state_rx.borrow() + } + + /// Get a reference to the slot clock. + pub fn slot_clock(&self) -> &S { + &self.slot_clock + } +} + +#[cfg(test)] +mod tests { + use super::*; + + const FORK_EPOCH: Epoch = Epoch::new(100); + + #[test] + fn test_pre_fork_forever() { + let schedule = ForkSchedule::pre_fork(); + + // Test various epochs + for epoch_val in [0, 50, 100, 200, 1000] { + let epoch = Epoch::new(epoch_val); + + assert_eq!(publish_schema(epoch, &schedule), SubnetSchema::PreFork); + + let schemas = subscribe_schemas(epoch, &schedule); + assert_eq!(schemas.len(), 1); + assert_eq!(schemas.as_slice()[0], SubnetSchema::PreFork); + + let schemas = accept_schemas(epoch, &schedule); + assert_eq!(schemas.len(), 1); + assert_eq!(schemas.as_slice()[0], SubnetSchema::PreFork); + + let schemas = process_schemas(epoch, &schedule); + assert_eq!(schemas.len(), 1); + assert_eq!(schemas.as_slice()[0], SubnetSchema::PreFork); + } + } + + #[test] + fn test_publish_schema_timeline() { + let schedule = ForkSchedule::new(FORK_EPOCH); + + // Before fork + assert_eq!( + publish_schema(Epoch::new(97), &schedule), + SubnetSchema::PreFork + ); + assert_eq!( + publish_schema(Epoch::new(98), &schedule), + SubnetSchema::PreFork + ); + assert_eq!( + publish_schema(Epoch::new(99), &schedule), + SubnetSchema::PreFork + ); + + // At fork and after + assert_eq!( + publish_schema(FORK_EPOCH, &schedule), + SubnetSchema::PostFork + ); + assert_eq!( + publish_schema(Epoch::new(101), &schedule), + SubnetSchema::PostFork + ); + assert_eq!( + publish_schema(Epoch::new(200), &schedule), + SubnetSchema::PostFork + ); + } + + #[test] + fn test_subscribe_schemas_timeline() { + let schedule = ForkSchedule::new(FORK_EPOCH); + + // Before pre-subscribe window (F-3) + let schemas = subscribe_schemas(Epoch::new(97), &schedule); + assert_eq!(schemas.len(), 1); + assert_eq!(schemas.as_slice()[0], SubnetSchema::PreFork); + + // Pre-subscribe window (F-2): dual-subscribe + let schemas = subscribe_schemas(Epoch::new(98), &schedule); + assert_eq!(schemas.len(), 2); + assert_eq!(schemas.as_slice()[0], SubnetSchema::PreFork); + assert_eq!(schemas.as_slice()[1], SubnetSchema::PostFork); + + // Pre-subscribe window (F-1): still dual-subscribe + let schemas = subscribe_schemas(Epoch::new(99), &schedule); + assert_eq!(schemas.len(), 2); + assert_eq!(schemas.as_slice()[0], SubnetSchema::PreFork); + assert_eq!(schemas.as_slice()[1], SubnetSchema::PostFork); + + // At fork (F): only PostFork + let schemas = subscribe_schemas(FORK_EPOCH, &schedule); + assert_eq!(schemas.len(), 1); + assert_eq!(schemas.as_slice()[0], SubnetSchema::PostFork); + + // After fork (F+1): only PostFork + let schemas = subscribe_schemas(Epoch::new(101), &schedule); + assert_eq!(schemas.len(), 1); + assert_eq!(schemas.as_slice()[0], SubnetSchema::PostFork); + } + + #[test] + fn test_accept_schemas_timeline() { + let schedule = ForkSchedule::new(FORK_EPOCH); + + // Before pre-subscribe window (F-3): only PreFork + let schemas = accept_schemas(Epoch::new(97), &schedule); + assert_eq!(schemas.len(), 1); + assert_eq!(schemas.as_slice()[0], SubnetSchema::PreFork); + + // Pre-subscribe window (F-2): accept both for propagation + let schemas = accept_schemas(Epoch::new(98), &schedule); + assert_eq!(schemas.len(), 2); + assert_eq!(schemas.as_slice()[0], SubnetSchema::PreFork); + assert_eq!(schemas.as_slice()[1], SubnetSchema::PostFork); + + // Pre-subscribe window (F-1): still accept both + let schemas = accept_schemas(Epoch::new(99), &schedule); + assert_eq!(schemas.len(), 2); + assert_eq!(schemas.as_slice()[0], SubnetSchema::PreFork); + assert_eq!(schemas.as_slice()[1], SubnetSchema::PostFork); + + // At fork (F): only PostFork + let schemas = accept_schemas(FORK_EPOCH, &schedule); + assert_eq!(schemas.len(), 1); + assert_eq!(schemas.as_slice()[0], SubnetSchema::PostFork); + + // After fork (F+1): only PostFork + let schemas = accept_schemas(Epoch::new(101), &schedule); + assert_eq!(schemas.len(), 1); + assert_eq!(schemas.as_slice()[0], SubnetSchema::PostFork); + } + + #[test] + fn test_process_schemas_timeline() { + let schedule = ForkSchedule::new(FORK_EPOCH); + + // Before fork (including pre-subscribe window): only process PreFork + let schemas = process_schemas(Epoch::new(97), &schedule); + assert_eq!(schemas.len(), 1); + assert_eq!(schemas.as_slice()[0], SubnetSchema::PreFork); + + let schemas = process_schemas(Epoch::new(98), &schedule); + assert_eq!(schemas.len(), 1); + assert_eq!(schemas.as_slice()[0], SubnetSchema::PreFork); + + let schemas = process_schemas(Epoch::new(99), &schedule); + assert_eq!(schemas.len(), 1); + assert_eq!(schemas.as_slice()[0], SubnetSchema::PreFork); + + // At fork (F): process PostFork + let schemas = process_schemas(FORK_EPOCH, &schedule); + assert_eq!(schemas.len(), 1); + assert_eq!(schemas.as_slice()[0], SubnetSchema::PostFork); + + // After fork (F+1): process PostFork + let schemas = process_schemas(Epoch::new(101), &schedule); + assert_eq!(schemas.len(), 1); + assert_eq!(schemas.as_slice()[0], SubnetSchema::PostFork); + } + + #[test] + fn test_node_starts_after_fork() { + let schedule = ForkSchedule::new(FORK_EPOCH); + + // Node starts at epoch 200 (long after fork) + let epoch = Epoch::new(200); + + assert_eq!(publish_schema(epoch, &schedule), SubnetSchema::PostFork); + + let schemas = subscribe_schemas(epoch, &schedule); + assert_eq!(schemas.len(), 1); + assert_eq!(schemas.as_slice()[0], SubnetSchema::PostFork); + + let schemas = accept_schemas(epoch, &schedule); + assert_eq!(schemas.len(), 1); + assert_eq!(schemas.as_slice()[0], SubnetSchema::PostFork); + + let schemas = process_schemas(epoch, &schedule); + assert_eq!(schemas.len(), 1); + assert_eq!(schemas.as_slice()[0], SubnetSchema::PostFork); + } + + #[test] + fn test_fork_schedule_accessors() { + let schedule_pre_fork = ForkSchedule::pre_fork(); + assert_eq!(schedule_pre_fork.fork_epoch(), None); + + let schedule_with_fork = ForkSchedule::new(FORK_EPOCH); + assert_eq!(schedule_with_fork.fork_epoch(), Some(FORK_EPOCH)); + + // Test subscribe_transition_epoch (F - 2 = 98) + assert_eq!( + schedule_with_fork.subscribe_transition_epoch(), + Some(Epoch::new(98)) + ); + } + + #[test] + fn test_pre_subscribe_accept_vs_process() { + let schedule = ForkSchedule::new(FORK_EPOCH); + + // At F-2 and F-1: accept both but only process PreFork + for epoch_val in [98, 99] { + let epoch = Epoch::new(epoch_val); + + let accept = accept_schemas(epoch, &schedule); + assert_eq!( + accept.len(), + 2, + "Should accept both schemas at F-{}", + 100 - epoch_val + ); + assert_eq!(accept.as_slice()[0], SubnetSchema::PreFork); + assert_eq!(accept.as_slice()[1], SubnetSchema::PostFork); + + let process = process_schemas(epoch, &schedule); + assert_eq!( + process.len(), + 1, + "Should only process PreFork at F-{}", + 100 - epoch_val + ); + assert_eq!(process.as_slice()[0], SubnetSchema::PreFork); + } + } + + #[test] + fn test_immediate_cutoff_at_fork() { + let schedule = ForkSchedule::new(FORK_EPOCH); + + // Just before fork (F-1): still accept/subscribe PreFork + let sub = subscribe_schemas(Epoch::new(99), &schedule); + assert_eq!(sub.len(), 2, "Should dual-subscribe at F-1"); + + // At fork (F): immediate cutoff to PostFork only + let sub = subscribe_schemas(FORK_EPOCH, &schedule); + assert_eq!(sub.len(), 1, "Should only subscribe PostFork at fork"); + assert_eq!(sub.as_slice()[0], SubnetSchema::PostFork); + + let accept = accept_schemas(FORK_EPOCH, &schedule); + assert_eq!(accept.len(), 1, "Should only accept PostFork at fork"); + assert_eq!(accept.as_slice()[0], SubnetSchema::PostFork); + + let process = process_schemas(FORK_EPOCH, &schedule); + assert_eq!(process.len(), 1, "Should only process PostFork at fork"); + assert_eq!(process.as_slice()[0], SubnetSchema::PostFork); + } +} diff --git a/anchor/subnet_service/src/scoring.rs b/anchor/subnet_service/src/scoring.rs index 3d06e21bb..5813a1d28 100644 --- a/anchor/subnet_service/src/scoring.rs +++ b/anchor/subnet_service/src/scoring.rs @@ -4,13 +4,18 @@ //! for gossipsub topic scoring. These rates help detect flooding and //! underperformance on subnet topics. -use std::ops::Deref; +use std::{num::NonZeroU64, ops::Deref}; -use database::NetworkState; +use database::{NetworkState, NonUniqueIndex}; +use slot_clock::SlotClock; use ssv_types::CommitteeInfo; +use tracing::warn; use types::{ChainSpec, EthSpec}; -use crate::{SUBNET_COUNT, SubnetId, message_rate}; +use crate::{ + ForkSchedule, SubnetCalculationError, SubnetId, calculate_subnet_for_committee, message_rate, + routing, +}; /// Calculate the expected message rate for a specific subnet. /// @@ -18,17 +23,34 @@ use crate::{SUBNET_COUNT, SubnetId, message_rate}; /// - Flooding (too many messages vs expected) /// - Underperformance (too few messages vs expected) /// +/// Fork-aware: includes clusters that map to subnet under any accepted schema at current epoch. +/// /// # Arguments /// /// * `subnet` - The subnet to calculate the rate for /// * `network_state` - Current network state containing cluster information /// * `chain_spec` - Chain specification for timing parameters +/// * `slot_clock` - Slot clock for determining current epoch +/// * `fork_schedule` - Fork schedule configuration +/// * `slots_per_epoch` - Number of slots per epoch +/// * `subnet_count` - Total number of subnets pub fn calculate_message_rate_for_subnet( subnet: &SubnetId, network_state: impl Deref, chain_spec: &ChainSpec, + slot_clock: &impl SlotClock, + fork_schedule: &ForkSchedule, + slots_per_epoch: u64, + subnet_count: NonZeroU64, ) -> f64 { - let committees_info = get_committee_info_for_subnet(subnet, network_state); + let committees_info = get_committee_info_for_subnet( + subnet, + network_state, + slot_clock, + fork_schedule, + slots_per_epoch, + subnet_count, + ); message_rate::calculate_message_rate_for_topic::(&committees_info, chain_spec) } @@ -37,23 +59,69 @@ pub fn calculate_message_rate_for_subnet( /// This function retrieves clusters that map to the given subnet and converts /// them to `CommitteeInfo` which includes both committee members and validator indices. /// +/// Fork-aware: checks if cluster maps to subnet under ANY accepted schema at current epoch. +/// /// # Arguments /// /// * `subnet` - The subnet to get committee info for /// * `network_state` - Current network state containing cluster information +/// * `slot_clock` - Slot clock for determining current epoch +/// * `fork_schedule` - Fork schedule configuration +/// * `slots_per_epoch` - Number of slots per epoch +/// * `subnet_count` - Total number of subnets pub fn get_committee_info_for_subnet( subnet: &SubnetId, network_state: impl Deref, + slot_clock: &impl SlotClock, + fork_schedule: &ForkSchedule, + slots_per_epoch: u64, + subnet_count: NonZeroU64, ) -> Vec { - use database::NonUniqueIndex; + // Determine which schemas to accept at current epoch + let current_epoch = routing::current_epoch(slot_clock, slots_per_epoch); + let schema_set = if let Some(epoch) = current_epoch { + routing::accept_schemas(epoch, fork_schedule) + } else { + // Before genesis or clock unavailable: use pre-fork only + routing::SchemaSet::pre_fork_only() + }; network_state .clusters() .values() .filter(|cluster| { - let cluster_subnet = - SubnetId::from_committee_alan(cluster.committee_id(), SUBNET_COUNT); - cluster_subnet == *subnet + let committee_id = cluster.committee_id(); + + // Check if cluster maps to target subnet under ANY accepted schema + for schema in schema_set.iter() { + match calculate_subnet_for_committee( + committee_id, + &network_state, + subnet_count, + *schema, + ) { + Ok(cluster_subnet) if cluster_subnet == *subnet => return true, + Ok(_) => continue, // Different subnet + Err(SubnetCalculationError::EmptyOperatorList) => { + warn!( + ?committee_id, + ?schema, + "Cannot calculate PostFork subnet: cluster has empty operator list" + ); + continue; + } + Err(e) => { + warn!( + ?committee_id, + ?schema, + ?e, + "Failed to calculate subnet for scoring" + ); + continue; + } + } + } + false }) .map(|cluster| { // Convert cluster to CommitteeInfo by getting validator indices diff --git a/anchor/subnet_service/src/service.rs b/anchor/subnet_service/src/service.rs index 677d73fc8..a49ac7247 100644 --- a/anchor/subnet_service/src/service.rs +++ b/anchor/subnet_service/src/service.rs @@ -3,7 +3,7 @@ //! This module provides the background service that manages subnet subscriptions //! based on the clusters owned by the operator. -use std::{collections::HashSet, sync::Arc, time::Duration}; +use std::{collections::HashSet, num::NonZeroU64, sync::Arc, time::Duration}; use database::{NetworkState, UniqueIndex}; use slot_clock::SlotClock; @@ -16,8 +16,8 @@ use tracing::{debug, error, warn}; use types::{ChainSpec, EthSpec}; use crate::{ - SubnetEvent, SubnetId, message_rate, - scoring::{calculate_message_rate_for_subnet, get_committee_info_for_subnet}, + ForkSchedule, SubnetCalculationError, SubnetEvent, SubnetId, calculate_subnet_for_committee, + routing, scoring::calculate_message_rate_for_subnet, }; /// Background service that manages subnet subscriptions. @@ -27,12 +27,15 @@ use crate::{ struct SubnetService { tx: mpsc::Sender, db: watch::Receiver, - subnet_count: usize, + subnet_count: NonZeroU64, subscribe_all_subnets: bool, disable_gossipsub_topic_scoring: bool, + fork_schedule: ForkSchedule, slot_clock: S, chain_spec: Arc, previous_subnets: HashSet, + /// Track last subscribe schemas to detect epoch boundary schema changes. + last_subscribe_schemas: Option, } impl SubnetService { @@ -41,14 +44,15 @@ impl SubnetService { fn new( tx: mpsc::Sender, db: watch::Receiver, - subnet_count: usize, + subnet_count: NonZeroU64, subscribe_all_subnets: bool, disable_gossipsub_topic_scoring: bool, + fork_schedule: ForkSchedule, slot_clock: S, chain_spec: Arc, ) -> Self { let previous_subnets = if subscribe_all_subnets { - (0..(subnet_count as u64)).map(SubnetId::new).collect() + (0..subnet_count.get()).map(SubnetId::new).collect() } else { HashSet::new() }; @@ -59,9 +63,11 @@ impl SubnetService { subnet_count, subscribe_all_subnets, disable_gossipsub_topic_scoring, + fork_schedule, slot_clock, chain_spec, previous_subnets, + last_subscribe_schemas: None, } } @@ -82,6 +88,8 @@ impl SubnetService { // Periodically update scoring rates to reflect clusters joining/leaving. self.run_scoring_loop::().await; } else { + // Initial subscription calculation + self.handle_subnet_changes::().await; self.run_monitoring_loop::().await; } } @@ -102,8 +110,8 @@ impl SubnetService { _ = self.db.changed() => { self.handle_subnet_changes::().await; } - _ = sleep(delay), if !self.disable_gossipsub_topic_scoring => { - self.send_scoring_rate_updates::().await; + _ = sleep(delay) => { + self.handle_epoch_boundary::().await; } } } @@ -113,7 +121,7 @@ impl SubnetService { async fn send_initial_joins(&mut self) -> Result<(), ()> { let initial_events: Vec<_> = { let current_state = self.db.borrow(); - (0..self.subnet_count as u64) + (0..self.subnet_count.get()) .map(|id| { let subnet = SubnetId::new(id); let rate = self.subnet_message_rate::(&subnet, ¤t_state); @@ -132,18 +140,73 @@ impl SubnetService { Ok(()) } + /// Handle epoch boundary: check for schema transitions and update scoring. + async fn handle_epoch_boundary(&mut self) { + // Check if subscribe schemas changed (fork transition) + let current_schemas = self.current_subscribe_schemas::(); + let schema_changed = self.last_subscribe_schemas != Some(current_schemas); + + if schema_changed { + let current_epoch = routing::current_epoch(&self.slot_clock, E::slots_per_epoch()); + debug!( + ?current_epoch, + "Subscribe schemas changed, recomputing subnet subscriptions" + ); + self.handle_subnet_changes::().await; + } + + // Update scoring rates if enabled + if !self.disable_gossipsub_topic_scoring { + self.send_scoring_rate_updates::().await; + } + } + + /// Resolve the active subscription schemas for the current epoch. + fn current_subscribe_schemas(&self) -> routing::SchemaSet { + routing::current_epoch(&self.slot_clock, E::slots_per_epoch()) + .map(|epoch| routing::subscribe_schemas(epoch, &self.fork_schedule)) + .unwrap_or_else(routing::SchemaSet::pre_fork_only) + } + /// Compare current and previous subnets, emitting join/leave events. + /// + /// Fork-aware: calculates subnets for all active schemas during transition. async fn handle_subnet_changes(&mut self) { let mut current_subnets = HashSet::new(); + // Get current subscribe schemas for fork-aware calculation + let current_schemas = self.current_subscribe_schemas::(); + // Get current subnets from database { let state = self.db.borrow(); for cluster_id in state.get_own_clusters() { if let Some(cluster) = state.clusters().get_by(cluster_id) { - let subnet_id = - SubnetId::from_committee_alan(cluster.committee_id(), self.subnet_count); - current_subnets.insert(subnet_id); + let committee_id = cluster.committee_id(); + + // During transition windows, we may compute subnets for multiple schemas + for schema in current_schemas.iter() { + match calculate_subnet_for_committee( + committee_id, + &state, + self.subnet_count, + *schema, + ) { + Ok(subnet_id) => { + current_subnets.insert(subnet_id); + } + Err(SubnetCalculationError::EmptyOperatorList) => { + warn!( + ?cluster_id, + ?schema, + "Cluster has empty operator list, skipping subnet calculation" + ); + } + Err(e) => { + error!(?cluster_id, ?schema, ?e, "Failed to calculate subnet"); + } + } + } } } } @@ -178,8 +241,9 @@ impl SubnetService { } } - // Update the previous_subnets for next iteration + // Update the previous_subnets and schemas for next iteration self.previous_subnets = current_subnets; + self.last_subscribe_schemas = Some(current_schemas); } /// Emit updated message-rate estimates for gossipsub topic scoring. @@ -199,7 +263,15 @@ impl SubnetService { for &subnet in &self.previous_subnets { let message_rate = { let state = self.db.borrow(); - calculate_message_rate_for_subnet::(&subnet, &*state, &self.chain_spec) + calculate_message_rate_for_subnet::( + &subnet, + &*state, + &self.chain_spec, + &self.slot_clock, + &self.fork_schedule, + E::slots_per_epoch(), + self.subnet_count, + ) }; if self @@ -224,10 +296,14 @@ impl SubnetService { return None; } - let committees_info = get_committee_info_for_subnet(subnet, network_state); - Some(message_rate::calculate_message_rate_for_topic::( - &committees_info, + Some(calculate_message_rate_for_subnet::( + subnet, + network_state, &self.chain_spec, + &self.slot_clock, + &self.fork_schedule, + E::slots_per_epoch(), + self.subnet_count, )) } } @@ -236,15 +312,16 @@ impl SubnetService { #[allow(clippy::too_many_arguments)] pub fn start_subnet_service( db: watch::Receiver, - subnet_count: usize, + subnet_count: NonZeroU64, subscribe_all_subnets: bool, disable_gossipsub_topic_scoring: bool, + fork_schedule: ForkSchedule, executor: &TaskExecutor, slot_clock: impl SlotClock + 'static, chain_spec: Arc, ) -> mpsc::Receiver { let (tx, rx) = mpsc::channel(if subscribe_all_subnets { - subnet_count + subnet_count.get() as usize } else { 1 }); @@ -255,6 +332,7 @@ pub fn start_subnet_service( subnet_count, subscribe_all_subnets, disable_gossipsub_topic_scoring, + fork_schedule, slot_clock, chain_spec, ); diff --git a/anchor/subnet_service/src/subnet.rs b/anchor/subnet_service/src/subnet.rs index 4b1b18df8..0a9b62dbb 100644 --- a/anchor/subnet_service/src/subnet.rs +++ b/anchor/subnet_service/src/subnet.rs @@ -6,13 +6,19 @@ use std::{num::NonZeroU64, ops::Deref}; use alloy::primitives::ruint::aliases::U256; +use database::{NetworkState, NonUniqueIndex}; use serde::{Deserialize, Serialize}; use sha2::{Digest, Sha256}; use ssv_types::{CommitteeId, OperatorId}; +use crate::routing; + /// Number of subnets in the SSV network. pub const SUBNET_COUNT: usize = 128; +/// Number of subnets as NonZeroU64 for safe arithmetic operations. +pub const SUBNET_COUNT_NZ: NonZeroU64 = NonZeroU64::new(SUBNET_COUNT as u64).unwrap(); + /// Bit array representing subnet membership. pub type SubnetBits = [u8; SUBNET_COUNT / 8]; @@ -21,10 +27,10 @@ pub type SubnetBits = [u8; SUBNET_COUNT / 8]; pub enum SubnetCalculationError { /// The operator list provided was empty. EmptyOperatorList, - /// The subnet count is invalid (zero). - InvalidSubnetCount, /// The calculated subnet ID doesn't fit in a u64. SubnetIdOutOfRange, + /// Message arrived on wrong topic/subnet for the committee. + IncorrectTopic, } /// Identifies a subnet in the SSV network. @@ -48,14 +54,18 @@ impl SubnetId { /// # Algorithm /// /// `committee_id % subnet_count` - pub fn from_committee_alan(committee_id: CommitteeId, subnet_count: usize) -> Self { - // Derive a numeric "committee ID" and convert to an index in [0..subnet_count]. + pub fn from_committee_alan( + committee_id: CommitteeId, + subnet_count: NonZeroU64, + ) -> Result { let id = U256::from_be_bytes(*committee_id); - SubnetId( - (id % U256::from(subnet_count)) - .try_into() - .expect("modulo must be < subnet_count"), - ) + let modulus = U256::from(subnet_count.get()); + + let subnet_id: u64 = (id % modulus) + .try_into() + .map_err(|_| SubnetCalculationError::SubnetIdOutOfRange)?; + + Ok(SubnetId(subnet_id)) } /// Calculate subnet using MinHash of operator IDs (post-fork algorithm). @@ -75,7 +85,6 @@ impl SubnetId { /// # Errors /// /// - `SubnetCalculationError::EmptyOperatorList` if `operator_ids` is empty - /// - `SubnetCalculationError::SubnetIdOutOfRange` if the modulo result cannot fit in `u64` pub fn from_operators( operator_ids: &[OperatorId], subnet_count: NonZeroU64, @@ -90,13 +99,32 @@ impl SubnetId { let id = U256::from_be_bytes(min_hash); let modulus = U256::from(subnet_count.get()); - // Safe: x % subnet_count is always < subnet_count, which is a u64. let subnet_id: u64 = (id % modulus) .try_into() .map_err(|_| SubnetCalculationError::SubnetIdOutOfRange)?; Ok(SubnetId(subnet_id)) } + + /// Topic prefix for SSV gossipsub topics. + pub const TOPIC_PREFIX: &'static str = "ssv.v2."; + + /// Format this subnet as a gossipsub topic string. + /// + /// Returns the topic in format "ssv.v2.". + pub fn to_topic_string(&self) -> String { + format!("{}{}", Self::TOPIC_PREFIX, self.0) + } + + /// Parse a subnet ID from a gossipsub topic string. + /// + /// Expects format "ssv.v2.". + pub fn from_topic_str(topic: &str) -> Option { + topic + .strip_prefix(Self::TOPIC_PREFIX) + .and_then(|rest| rest.parse::().ok()) + .map(SubnetId::from) + } } impl From for SubnetId { @@ -124,6 +152,34 @@ pub enum SubnetEvent { RateUpdate(SubnetId, f64), } +/// Calculate subnet for a committee using the specified schema. +/// +/// This is the centralized subnet calculation logic used by both subscription +/// (subnet service) and publishing (message_sender). +pub fn calculate_subnet_for_committee( + committee_id: CommitteeId, + network_state: &NetworkState, + subnet_count: NonZeroU64, + schema: routing::SubnetSchema, +) -> Result { + match schema { + routing::SubnetSchema::PreFork => SubnetId::from_committee_alan(committee_id, subnet_count), + routing::SubnetSchema::PostFork => { + // Look up any cluster by committee_id to get operator IDs. + // All clusters with the same committee_id have identical operators + // (committee_id is derived from the operator set). + let cluster = network_state + .clusters() + .get_all_by(&committee_id) + .next() + .ok_or(SubnetCalculationError::EmptyOperatorList)?; + + let operator_ids: Vec<_> = cluster.cluster_members.iter().copied().collect(); + SubnetId::from_operators(&operator_ids, subnet_count) + } + } +} + #[cfg(test)] mod tests { use super::*; @@ -223,10 +279,11 @@ mod tests { fn test_from_committee_alan_unchanged() { // Verify old algorithm still works correctly let committee_id = CommitteeId::from([0x01u8; 32]); - let subnet = SubnetId::from_committee_alan(committee_id, 128); + let subnet = + SubnetId::from_committee_alan(committee_id, SUBNET_COUNT_NZ).expect("valid subnet"); // committee_id % 128 should give predictable result - let expected = U256::from_be_bytes([0x01u8; 32]) % U256::from(128); + let expected = U256::from_be_bytes([0x01u8; 32]) % U256::from(SUBNET_COUNT); assert_eq!(*subnet, u64::try_from(expected).unwrap()); } @@ -244,8 +301,9 @@ mod tests { ]; for committee_id in committee_ids { - let subnet = SubnetId::from_committee_alan(committee_id, 128); - assert!((*subnet) < 128); + let subnet = + SubnetId::from_committee_alan(committee_id, SUBNET_COUNT_NZ).expect("valid subnet"); + assert!((*subnet) < SUBNET_COUNT as u64); } } @@ -260,10 +318,11 @@ mod tests { let subnet_new = SubnetId::from_operators(&operators, SUBNET_COUNT_NZ).expect("valid operators"); - assert!((*subnet_new) < 128); + assert!((*subnet_new) < SUBNET_COUNT as u64); let committee_id = CommitteeId::from([0xffu8; 32]); - let subnet_old = SubnetId::from_committee_alan(committee_id, 128); - assert!((*subnet_old) < 128); + let subnet_old = + SubnetId::from_committee_alan(committee_id, SUBNET_COUNT_NZ).expect("valid subnet"); + assert!((*subnet_old) < SUBNET_COUNT as u64); } } diff --git a/anchor/validator_store/src/lib.rs b/anchor/validator_store/src/lib.rs index 8e99f2ad7..f9ac65a49 100644 --- a/anchor/validator_store/src/lib.rs +++ b/anchor/validator_store/src/lib.rs @@ -116,6 +116,7 @@ pub struct AnchorValidatorStore { } impl AnchorValidatorStore { + // TODO: Refactor to use config struct to reduce parameter count (see #755) #[allow(clippy::too_many_arguments)] pub fn new( database: Arc, @@ -199,6 +200,7 @@ impl AnchorValidatorStore { ) } + // TODO: Refactor to reduce parameter count (see #755) #[allow(clippy::too_many_arguments)] async fn collect_signature( &self,