Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
b5f75f4
feat: add centralized routing policy module for fork transition
diegomrsantos Dec 16, 2025
b3b0153
feat: add subnet_topology_fork_epoch configuration
diegomrsantos Dec 16, 2025
7590118
feat: wire ForkSchedule through client initialization
diegomrsantos Dec 16, 2025
f578abf
feat: implement dual-subscription support for fork transition
diegomrsantos Dec 16, 2025
bde1dcb
feat(subnet_service): centralize subnet calculation and add fork-awar…
diegomrsantos Dec 16, 2025
dbe7b3f
fix(subnet): make gossipsub topic scoring fork-aware
diegomrsantos Dec 16, 2025
248bc14
feat: add fork-aware topic validation for incoming/outgoing messages
diegomrsantos Dec 16, 2025
08baea2
chore: remove analysis artifacts and apply formatting fixes
diegomrsantos Dec 16, 2025
285212e
fix: mark current_epoch doctest as ignore
diegomrsantos Dec 16, 2025
34dd0b7
docs: address review feedback with enhanced documentation and logging
diegomrsantos Dec 16, 2025
a18d133
chore: add TODO comments for too_many_arguments suppressions
diegomrsantos Dec 16, 2025
bc3a954
refactor(routing): align fork transition with SIP-43
diegomrsantos Jan 2, 2026
638ac24
feat(message_validator): track matched schema for fork-aware processing
diegomrsantos Jan 2, 2026
452d8d9
feat(message_receiver): filter messages based on process_schemas
diegomrsantos Jan 2, 2026
d7d4aa1
refactor: centralize subnet routing logic into SubnetRouter
diegomrsantos Jan 3, 2026
bc1a408
fix: trigger subnet subscription recomputation at fork boundaries
diegomrsantos Jan 3, 2026
d0a3447
docs: address review feedback with enhanced documentation and logging
diegomrsantos Jan 3, 2026
cc7af2c
Refactor subnet routing schema sets
diegomrsantos Jan 3, 2026
7c28a24
chore: remove trailing empty lines from doc comments
diegomrsantos Jan 3, 2026
5f5ae8b
Merge upstream/unstable into feat/subnet-topology-fork-transition
diegomrsantos Jan 5, 2026
ea4faa1
perf: simplify PostFork subnet calculation
diegomrsantos Jan 5, 2026
a291191
refactor: use NonZeroU64 for subnet_count and improve error handling
diegomrsantos Jan 5, 2026
e41da40
chore: apply cargo fmt formatting fixes
diegomrsantos Jan 5, 2026
cb06d7e
fix: handle per-schema errors in validate_subnet without early return
diegomrsantos Jan 5, 2026
21ae7a5
fix: use checked_sub to prevent underflow in operator ID validation
diegomrsantos Jan 5, 2026
93d1769
refactor: centralize topic parsing in SubnetId
diegomrsantos Jan 5, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 24 additions & 6 deletions anchor/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use sensitive_url::SensitiveUrl;
use signature_collector::SignatureCollectorManager;
use slashing_protection::SlashingDatabase;
use slot_clock::{SlotClock, SystemTimeSlotClock};
use subnet_service::{SUBNET_COUNT, SubnetId, start_subnet_service};
use subnet_service::{ForkSchedule, SUBNET_COUNT_NZ, SubnetId, SubnetRouter, start_subnet_service};
use task_executor::TaskExecutor;
use tokio::{
net::TcpListener,
Expand Down Expand Up @@ -412,13 +412,26 @@ impl Client {
));
duties_tracker.clone().start(executor.clone());

let message_validator = Validator::new(
// Create ForkSchedule from config
let fork_schedule = match config.global_config.ssv_network.subnet_topology_fork_epoch {
Some(fork_epoch) => ForkSchedule::new(fork_epoch),
None => ForkSchedule::pre_fork(),
};

// Create centralized SubnetRouter for fork-aware subnet calculations
let router = Arc::new(SubnetRouter::new(
database.watch(),
fork_schedule.clone(),
slot_clock.clone(),
E::slots_per_epoch(),
SUBNET_COUNT_NZ,
));

let message_validator = Validator::new(
spec.epochs_per_sync_committee_period.as_u64(),
E::sync_committee_size(),
duties_tracker.clone(),
slot_clock.clone(),
router.clone(),
&executor,
);

Expand Down Expand Up @@ -447,12 +460,15 @@ impl Client {
private_key: key.clone(),
operator_id: operator_id.clone(),
validator: Some(message_validator.clone()),
subnet_count: SUBNET_COUNT,
is_synced: is_synced.clone(),
router: router.clone(),
},
)?)
} else {
Arc::new(ImpostorMessageSender::new(network_tx.clone(), SUBNET_COUNT))
Arc::new(ImpostorMessageSender::new(
network_tx.clone(),
SUBNET_COUNT_NZ,
))
};

// Create the signature collector
Expand All @@ -478,9 +494,10 @@ impl Client {
// Start the subnet service now that we have slot_clock
let subnet_service = start_subnet_service::<E>(
database.watch(),
SUBNET_COUNT,
SUBNET_COUNT_NZ,
config.network.subscribe_all_subnets,
config.network.disable_gossipsub_topic_scoring,
fork_schedule.clone(),
&executor,
slot_clock.clone(),
spec.clone(),
Expand All @@ -497,6 +514,7 @@ impl Client {
outcome_tx,
message_validator,
doppelganger_service.clone(),
router,
);

// Start the p2p network
Expand Down
1 change: 1 addition & 0 deletions anchor/common/ssv_network_config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ enr = { workspace = true }
eth2_network_config = { workspace = true }
serde_yaml = { workspace = true }
ssv_types = { workspace = true }
types = { workspace = true }
12 changes: 12 additions & 0 deletions anchor/common/ssv_network_config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use alloy::primitives::Address;
use enr::{CombinedKey, Enr};
use eth2_network_config::Eth2NetworkConfig;
use ssv_types::domain_type::DomainType;
use types::Epoch;

macro_rules! include_str_for_net {
($network:ident, $file:literal) => {
Expand Down Expand Up @@ -38,6 +39,9 @@ pub struct SsvNetworkConfig {
pub ssv_contract: Address,
pub ssv_contract_block: u64,
pub ssv_domain_type: DomainType,
/// The epoch at which the subnet topology fork activates.
/// `None` means pre-fork operation (no fork configured).
pub subnet_topology_fork_epoch: Option<Epoch>,
Comment on lines +42 to +44
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be boole_fork_epoch.

}

impl SsvNetworkConfig {
Expand Down Expand Up @@ -65,6 +69,7 @@ impl SsvNetworkConfig {
ssv_domain_type: domain_type
.parse()
.map_err(|e| format!("Unable to parse built-in domain type: {e}"))?,
subnet_topology_fork_epoch: None,
}))
}

Expand All @@ -82,12 +87,19 @@ impl SsvNetworkConfig {
})
.transpose()?;

let subnet_topology_fork_epoch_path = base_dir.join("ssv_subnet_topology_fork_epoch.txt");
let subnet_topology_fork_epoch = subnet_topology_fork_epoch_path
.exists()
.then(|| read(&subnet_topology_fork_epoch_path))
.transpose()?;

Ok(Self {
ssv_boot_nodes,
ssv_contract: read(&base_dir.join("ssv_contract_address.txt"))?,
ssv_contract_block: read(&base_dir.join("ssv_contract_block.txt"))?,
ssv_domain_type: read(&base_dir.join("ssv_domain_type.txt"))?,
eth2_network: Self::load_eth2_network_config(base_dir)?,
subnet_topology_fork_epoch,
})
}

Expand Down
5 changes: 3 additions & 2 deletions anchor/eth/src/event_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,9 +190,10 @@ impl EventProcessor {

let max_seen = self.db.state().get_max_operator_id_seen();

// Only check for missing operators if we have a previous max (not a migrated database)
// Only check for missing operators if we have a previous max (not a migrated database).
// Use checked_sub to avoid underflow if operatorId is 0.
if let Some(max_seen) = max_seen
&& max_seen != operatorId - 1
&& operatorId.checked_sub(1) != Some(max_seen)
Comment on lines +193 to +196
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This appears to be a unrelated change

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is a fix for a potential underflow. Would you like to create a new PR for that?

{
return Err(ExecutionError::InvalidEvent(format!(
"Missing OperatorAdded events: database has only seen up to id {max_seen}, \
Expand Down
1 change: 1 addition & 0 deletions anchor/message_receiver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ qbft_manager = { workspace = true }
signature_collector = { workspace = true }
slot_clock = { workspace = true }
ssv_types = { workspace = true }
subnet_service = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
Expand Down
20 changes: 19 additions & 1 deletion anchor/message_receiver/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use qbft_manager::QbftManager;
use signature_collector::SignatureCollectorManager;
use slot_clock::SlotClock;
use ssv_types::msgid::DutyExecutor;
use subnet_service::SubnetRouter;
use tokio::sync::{mpsc, mpsc::error::TrySendError, watch};
use tracing::{debug, debug_span, error, trace};

Expand All @@ -34,6 +35,8 @@ pub struct NetworkMessageReceiver<S: SlotClock, D: DutiesProvider> {
outcome_tx: mpsc::Sender<Outcome>,
validator: Arc<Validator<S, D>>,
doppelganger_service: Option<Arc<OperatorDoppelgangerService>>,
/// Centralized router for fork-aware subnet calculations
router: Arc<SubnetRouter<S>>,
}

impl<S: SlotClock + 'static, D: DutiesProvider> NetworkMessageReceiver<S, D> {
Expand All @@ -47,6 +50,7 @@ impl<S: SlotClock + 'static, D: DutiesProvider> NetworkMessageReceiver<S, D> {
outcome_tx: mpsc::Sender<Outcome>,
validator: Arc<Validator<S, D>>,
doppelganger_service: Option<Arc<OperatorDoppelgangerService>>,
router: Arc<SubnetRouter<S>>,
) -> Arc<Self> {
Arc::new(Self {
processor,
Expand All @@ -57,6 +61,7 @@ impl<S: SlotClock + 'static, D: DutiesProvider> NetworkMessageReceiver<S, D> {
outcome_tx,
validator,
doppelganger_service,
router,
})
}
}
Expand All @@ -76,7 +81,7 @@ impl<S: SlotClock + 'static, D: DutiesProvider> MessageReceiver
let span = debug_span!("message_receiver", msg=%message_id);
let _enter = span.enter();

let result = receiver.validator.validate(&message.data);
let result = receiver.validator.validate(&message.data, &message.topic);

let mut action = MessageAcceptance::from(&result);

Expand Down Expand Up @@ -104,6 +109,7 @@ impl<S: SlotClock + 'static, D: DutiesProvider> MessageReceiver
let ValidatedMessage {
signed_ssv_message,
ssv_message,
matched_schema,
} = match result {
ValidationResult::Success(message) => message,
ValidationResult::PreDecodeFailure(failure) => {
Expand All @@ -120,6 +126,18 @@ impl<S: SlotClock + 'static, D: DutiesProvider> MessageReceiver
}
};

// Check if the matched schema should be processed at the current epoch.
// During pre-subscribe window (F-2, F-1), PostFork messages are accepted for
// gossipsub propagation but not processed for committee work.
if !receiver.router.should_process(matched_schema) {
trace!(
gossipsub_message_id = ?message_id,
?matched_schema,
"Message accepted for propagation but not processed (pre-subscribe window)"
);
return;
}
Comment on lines +129 to +139
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This idea does not work.

As validation is stateful, validating but discarding a message received on a post fork topic may cause the message to fail validation if it is then received on a pre fork topic.


let msg_id = signed_ssv_message.ssv_message().msg_id().clone();

match msg_id.duty_executor() {
Expand Down
1 change: 1 addition & 0 deletions anchor/message_sender/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ testing = []
[dependencies]
database = { workspace = true }
ethereum_ssz = { workspace = true }
gossipsub = { workspace = true }
message_validator = { workspace = true }
openssl = { workspace = true }
processor = { workspace = true }
Expand Down
28 changes: 21 additions & 7 deletions anchor/message_sender/src/impostor.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
use std::num::NonZeroU64;

use ssv_types::{CommitteeId, consensus::UnsignedSSVMessage, message::SignedSSVMessage};
use subnet_service::SubnetId;
use tokio::sync::mpsc;
use tracing::debug;
use tracing::{debug, warn};

use crate::{Error, MessageCallback, MessageSender};

#[derive(Clone)]
pub struct ImpostorMessageSender {
// we only hold this so network does not get sad over the closed channel lol
_network_tx: mpsc::Sender<(SubnetId, Vec<u8>)>,
subnet_count: usize,
subnet_count: NonZeroU64,
}

impl MessageSender for ImpostorMessageSender {
Expand All @@ -19,20 +21,32 @@ impl MessageSender for ImpostorMessageSender {
committee_id: CommitteeId,
_additional_message_callback: Option<Box<MessageCallback>>,
) -> Result<(), Error> {
let subnet = SubnetId::from_committee_alan(committee_id, self.subnet_count);
debug!(?msg, ?subnet, "Would send message");
match SubnetId::from_committee_alan(committee_id, self.subnet_count) {
Ok(subnet) => debug!(?msg, ?subnet, "Would send message"),
Err(e) => warn!(
?committee_id,
?e,
"Failed to calculate subnet for impostor message"
),
}
Ok(())
}

fn send(&self, msg: SignedSSVMessage, committee_id: CommitteeId) -> Result<(), Error> {
let subnet = SubnetId::from_committee_alan(committee_id, self.subnet_count);
debug!(?msg, ?subnet, "Would send message");
match SubnetId::from_committee_alan(committee_id, self.subnet_count) {
Ok(subnet) => debug!(?msg, ?subnet, "Would send message"),
Err(e) => warn!(
?committee_id,
?e,
"Failed to calculate subnet for impostor message"
),
}
Ok(())
}
}

impl ImpostorMessageSender {
pub fn new(network_tx: mpsc::Sender<(SubnetId, Vec<u8>)>, subnet_count: usize) -> Self {
pub fn new(network_tx: mpsc::Sender<(SubnetId, Vec<u8>)>, subnet_count: NonZeroU64) -> Self {
Self {
_network_tx: network_tx,
subnet_count,
Expand Down
30 changes: 24 additions & 6 deletions anchor/message_sender/src/network.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::sync::Arc;

use database::OwnOperatorId;
use gossipsub::IdentTopic;
use message_validator::{DutiesProvider, MessageAcceptance, Validator};
use openssl::{
hash::MessageDigest,
Expand All @@ -13,7 +14,7 @@ use ssv_types::{
CommitteeId, RSA_SIGNATURE_SIZE, consensus::UnsignedSSVMessage, message::SignedSSVMessage,
};
use ssz::Encode;
use subnet_service::SubnetId;
use subnet_service::{SubnetId, SubnetRouter};
use tokio::sync::{mpsc, mpsc::error::TrySendError, watch};
use tracing::{debug, error, trace, warn};

Expand All @@ -29,8 +30,9 @@ pub struct NetworkMessageSenderConfig<S: SlotClock, D: DutiesProvider> {
pub private_key: Rsa<Private>,
pub operator_id: OwnOperatorId,
pub validator: Option<Arc<Validator<S, D>>>,
pub subnet_count: usize,
pub is_synced: watch::Receiver<bool>,
/// Centralized router for fork-aware subnet calculations
pub router: Arc<SubnetRouter<S>>,
}

pub struct NetworkMessageSender<S: SlotClock, D: DutiesProvider> {
Expand All @@ -39,8 +41,9 @@ pub struct NetworkMessageSender<S: SlotClock, D: DutiesProvider> {
private_key: PKey<Private>,
operator_id: OwnOperatorId,
validator: Option<Arc<Validator<S, D>>>,
subnet_count: usize,
is_synced: watch::Receiver<bool>,
/// Centralized router for fork-aware subnet calculations
router: Arc<SubnetRouter<S>>,
}

impl<S: SlotClock + 'static, D: DutiesProvider> MessageSender for Arc<NetworkMessageSender<S, D>> {
Expand Down Expand Up @@ -125,16 +128,32 @@ impl<S: SlotClock + 'static, D: DutiesProvider> NetworkMessageSender<S, D> {
private_key,
operator_id: config.operator_id,
validator: config.validator,
subnet_count: config.subnet_count,
is_synced: config.is_synced,
router: config.router,
}))
}

fn do_send(&self, message: SignedSSVMessage, committee_id: CommitteeId) {
let message_bytes = message.as_ssz_bytes();

// Calculate subnet using centralized router
let subnet = match self.router.publish_subnet(committee_id) {
Ok(subnet) => subnet,
Err(e) => {
error!(
?committee_id,
?e,
"Failed to calculate subnet for publishing"
);
return;
}
};

// Construct the topic for validation (format: "ssv.v2.<subnet_id>")
let topic = IdentTopic::new(format!("ssv.v2.{}", *subnet)).hash();

if let Some(validator) = self.validator.as_ref()
&& let Err(err) = validator.validate(&message_bytes).as_result()
&& let Err(err) = validator.validate(&message_bytes, &topic).as_result()
{
// `Reject` is more severe and can be punished by other peers. We should not have
// created this message ever, while `Ignore` can be triggered simply because the message
Expand All @@ -148,7 +167,6 @@ impl<S: SlotClock + 'static, D: DutiesProvider> NetworkMessageSender<S, D> {
return;
}

let subnet = SubnetId::from_committee_alan(committee_id, self.subnet_count);
match self.network_tx.try_send((subnet, message_bytes)) {
Ok(_) => trace!(?subnet, "Successfully sent message to network"),
Err(TrySendError::Closed(_)) => warn!("Network queue closed (shutting down?)"),
Expand Down
Loading
Loading