diff --git a/crates/consensus/src/host.rs b/crates/consensus/src/host.rs index 04d76d0..678e9dc 100644 --- a/crates/consensus/src/host.rs +++ b/crates/consensus/src/host.rs @@ -104,6 +104,13 @@ pub trait ValueBuilder: Send + Sync + 'static { /// /// The `ConsensusValue` if found, or `None` if the value is not in the cache. async fn get_value_by_id(&self, value_id: &ConsensusValueId) -> Option; + + /// Store a received proposal value. + /// + /// Called when this node receives a proposal from another validator. + /// The value is stored so it can be retrieved later when consensus decides. + /// This enables non-proposer nodes to process the decision correctly. + async fn store_received_value(&self, value_id: ConsensusValueId, value: ConsensusValue); } /// Handler for processing decided values. @@ -401,6 +408,15 @@ impl Actor for CipherBftHost { "Assembled complete proposal from single part" ); + // Store the received value so it can be found when consensus decides. + // Without this, non-proposer nodes can't process decisions because + // get_value_by_id() would return None. + let value_id = + informalsystems_malachitebft_core_types::Value::id(&value); + self.value_builder + .store_received_value(value_id, value.clone()) + .await; + // Build ProposedValue for the received proposal // Use the actual valid_round from the proposal (Round::Nil for fresh // proposals, or the POL round for re-proposals) @@ -990,6 +1006,15 @@ impl ValueBuilder for ChannelValueBuilder { cuts.get(value_id) .map(|cut| ConsensusValue::from(cut.clone())) } + + async fn store_received_value(&self, value_id: ConsensusValueId, value: ConsensusValue) { + let cut = value.0; + debug!( + "ChannelValueBuilder: Storing received value for height {}, value_id={:?}", + cut.height, value_id + ); + self.cuts_by_value_id.write().await.insert(value_id, cut); + } } /// Type alias for the decided cuts storage (cut + commit certificate by height). diff --git a/crates/consensus/src/proposal.rs b/crates/consensus/src/proposal.rs index 555effa..b510391 100644 --- a/crates/consensus/src/proposal.rs +++ b/crates/consensus/src/proposal.rs @@ -23,9 +23,11 @@ pub struct CutProposal { impl BorshSerialize for CutProposal { fn serialize(&self, writer: &mut W) -> std::io::Result<()> { self.height.serialize(writer)?; - (self.round.as_i64() as u32).serialize(writer)?; + // Serialize round as i64 to handle all valid round values + (self.round.as_i64()).serialize(writer)?; self.value.serialize(writer)?; - (self.pol_round.as_i64() as u32).serialize(writer)?; + // Serialize pol_round as i64 to handle Round::Nil (-1) properly + (self.pol_round.as_i64()).serialize(writer)?; self.proposer.serialize(writer)?; Ok(()) } @@ -34,11 +36,21 @@ impl BorshSerialize for CutProposal { impl BorshDeserialize for CutProposal { fn deserialize_reader(reader: &mut R) -> std::io::Result { let height = ConsensusHeight::deserialize_reader(reader)?; - let round_val: u32 = BorshDeserialize::deserialize_reader(reader)?; - let round = Round::new(round_val); + // Deserialize round: stored as i64 + let round_val: i64 = BorshDeserialize::deserialize_reader(reader)?; + let round = if round_val < 0 { + Round::Nil + } else { + Round::new(round_val as u32) + }; let value = ConsensusValue::deserialize_reader(reader)?; - let pol_round_val: u32 = BorshDeserialize::deserialize_reader(reader)?; - let pol_round = Round::new(pol_round_val); + // Deserialize pol_round: stored as i64 to handle Round::Nil (-1) + let pol_round_val: i64 = BorshDeserialize::deserialize_reader(reader)?; + let pol_round = if pol_round_val < 0 { + Round::Nil + } else { + Round::new(pol_round_val as u32) + }; let proposer = ConsensusAddress::deserialize_reader(reader)?; Ok(Self { height, diff --git a/crates/data-chain/src/primary/runner.rs b/crates/data-chain/src/primary/runner.rs index 7bdc0ed..8cfa474 100644 --- a/crates/data-chain/src/primary/runner.rs +++ b/crates/data-chain/src/primary/runner.rs @@ -48,6 +48,16 @@ pub enum PrimaryEvent { }, } +/// Commands sent to the Primary from external sources +#[derive(Debug)] +pub enum PrimaryCommand { + /// Notify that consensus has decided on a height + ConsensusDecided { + /// The height that was decided + height: u64, + }, +} + /// Network interface for Primary-to-Primary communication #[async_trait::async_trait] pub trait PrimaryNetwork: Send + Sync { @@ -74,6 +84,8 @@ pub struct PrimaryHandle { worker_sender: mpsc::Sender, /// Sender for network messages (from peer Primaries) network_sender: mpsc::Sender<(ValidatorId, DclMessage)>, + /// Sender for commands to the Primary + command_sender: mpsc::Sender, /// Receiver for Primary events event_receiver: mpsc::Receiver, } @@ -124,6 +136,18 @@ impl PrimaryHandle { pub fn is_finished(&self) -> bool { self.handle.is_finished() } + + /// Notify the Primary that consensus has decided on a height + /// + /// This triggers the Primary to advance its state and continue producing cuts. + pub async fn notify_decision( + &self, + height: u64, + ) -> Result<(), mpsc::error::SendError> { + self.command_sender + .send(PrimaryCommand::ConsensusDecided { height }) + .await + } } /// Primary process - handles Car creation, attestation collection, and Cut formation @@ -147,6 +171,8 @@ pub struct Primary { to_workers: Vec>, /// Channel to receive messages from peer Primaries from_network: mpsc::Receiver<(ValidatorId, DclMessage)>, + /// Channel to receive commands (e.g., consensus decisions) + from_commands: mpsc::Receiver, /// Network interface network: Box, /// Optional persistent storage for Cars and attestations @@ -180,6 +206,7 @@ impl Primary { ) -> (PrimaryHandle, Vec>) { let (from_workers_tx, from_workers_rx) = mpsc::channel(1024); let (from_network_tx, from_network_rx) = mpsc::channel(1024); + let (command_tx, command_rx) = mpsc::channel(64); let (event_tx, event_rx) = mpsc::channel(256); // Create worker channels @@ -199,6 +226,7 @@ impl Primary { from_workers_rx, to_workers, from_network_rx, + command_rx, network, event_tx, storage, @@ -210,6 +238,7 @@ impl Primary { handle, worker_sender: from_workers_tx, network_sender: from_network_tx, + command_sender: command_tx, event_receiver: event_rx, }; @@ -217,12 +246,14 @@ impl Primary { } /// Create a new Primary + #[allow(clippy::too_many_arguments)] pub fn new( config: PrimaryConfig, validator_pubkeys: HashMap, from_workers: mpsc::Receiver, to_workers: Vec>, from_network: mpsc::Receiver<(ValidatorId, DclMessage)>, + from_commands: mpsc::Receiver, network: Box, event_sender: mpsc::Sender, ) -> Self { @@ -232,6 +263,7 @@ impl Primary { from_workers, to_workers, from_network, + from_commands, network, event_sender, None, @@ -246,6 +278,7 @@ impl Primary { from_workers: mpsc::Receiver, to_workers: Vec>, from_network: mpsc::Receiver<(ValidatorId, DclMessage)>, + from_commands: mpsc::Receiver, network: Box, event_sender: mpsc::Sender, storage: Option>, @@ -292,6 +325,7 @@ impl Primary { from_workers, to_workers, from_network, + from_commands, network, storage, event_sender, @@ -338,6 +372,11 @@ impl Primary { self.handle_network_message(peer, msg).await; } + // Handle commands (e.g., consensus decisions) + Some(cmd) = self.from_commands.recv() => { + self.handle_command(cmd).await; + } + // Periodic Car creation _ = car_interval.tick() => { self.try_create_car().await; @@ -407,6 +446,28 @@ impl Primary { } } + /// Handle commands from external sources (e.g., node) + async fn handle_command(&mut self, cmd: PrimaryCommand) { + match cmd { + PrimaryCommand::ConsensusDecided { height } => { + info!( + height, + validator = %self.config.validator_id, + "Received consensus decision notification" + ); + + // Advance state to allow producing cuts for the next height + self.state.finalize_height(height); + + debug!( + new_height = self.state.current_height, + last_finalized = self.state.last_finalized_height, + "Primary state advanced after consensus decision" + ); + } + } + } + /// Handle message from peer Primary async fn handle_network_message(&mut self, peer: ValidatorId, msg: DclMessage) { match msg { @@ -963,6 +1024,7 @@ mod tests { let (_from_workers_tx, from_workers_rx) = mpsc::channel(100); let (_from_network_tx, from_network_rx) = mpsc::channel(100); + let (_command_tx, command_rx) = mpsc::channel(64); let (event_tx, mut event_rx) = mpsc::channel(100); let network = MockNetwork::new(); @@ -974,6 +1036,7 @@ mod tests { from_workers_rx, vec![], from_network_rx, + command_rx, Box::new(network), event_tx, ); @@ -1037,6 +1100,7 @@ mod tests { let (_from_workers_tx, from_workers_rx) = mpsc::channel(100); let (_from_network_tx, from_network_rx) = mpsc::channel(100); + let (_command_tx, command_rx) = mpsc::channel(64); let (event_tx, mut event_rx) = mpsc::channel(100); let network = MockNetwork::new(); @@ -1048,6 +1112,7 @@ mod tests { from_workers_rx, vec![], from_network_rx, + command_rx, Box::new(network), event_tx, ); @@ -1093,6 +1158,7 @@ mod tests { let (_from_workers_tx, from_workers_rx) = mpsc::channel(100); let (_from_network_tx, from_network_rx) = mpsc::channel(100); + let (_command_tx, command_rx) = mpsc::channel(64); let (event_tx, mut event_rx) = mpsc::channel(100); let (to_worker_tx, mut to_worker_rx) = mpsc::channel::(100); @@ -1105,6 +1171,7 @@ mod tests { from_workers_rx, vec![to_worker_tx], from_network_rx, + command_rx, Box::new(network), event_tx, None, diff --git a/crates/node/src/node.rs b/crates/node/src/node.rs index 1877d39..2575f15 100644 --- a/crates/node/src/node.rs +++ b/crates/node/src/node.rs @@ -776,6 +776,12 @@ impl Node { cut.cars.len() ); + // Notify Primary that consensus has decided on this height + // This allows Primary to advance its state and produce cuts for the next height + if let Err(e) = primary_handle.notify_decision(height.0).await { + warn!("Failed to notify Primary of consensus decision: {:?}", e); + } + // Execute Cut if execution layer is enabled if let Some(ref bridge) = execution_bridge { match bridge.execute_cut(cut).await {