Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions crates/consensus/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,13 @@ pub trait ValueBuilder: Send + Sync + 'static {
///
/// The `ConsensusValue` if found, or `None` if the value is not in the cache.
async fn get_value_by_id(&self, value_id: &ConsensusValueId) -> Option<ConsensusValue>;

/// Store a received proposal value.
///
/// Called when this node receives a proposal from another validator.
/// The value is stored so it can be retrieved later when consensus decides.
/// This enables non-proposer nodes to process the decision correctly.
async fn store_received_value(&self, value_id: ConsensusValueId, value: ConsensusValue);
}

/// Handler for processing decided values.
Expand Down Expand Up @@ -401,6 +408,15 @@ impl Actor for CipherBftHost {
"Assembled complete proposal from single part"
);

// Store the received value so it can be found when consensus decides.
// Without this, non-proposer nodes can't process decisions because
// get_value_by_id() would return None.
let value_id =
informalsystems_malachitebft_core_types::Value::id(&value);
self.value_builder
.store_received_value(value_id, value.clone())
.await;

// Build ProposedValue for the received proposal
// Use the actual valid_round from the proposal (Round::Nil for fresh
// proposals, or the POL round for re-proposals)
Expand Down Expand Up @@ -990,6 +1006,15 @@ impl ValueBuilder for ChannelValueBuilder {
cuts.get(value_id)
.map(|cut| ConsensusValue::from(cut.clone()))
}

async fn store_received_value(&self, value_id: ConsensusValueId, value: ConsensusValue) {
let cut = value.0;
debug!(
"ChannelValueBuilder: Storing received value for height {}, value_id={:?}",
cut.height, value_id
);
self.cuts_by_value_id.write().await.insert(value_id, cut);
}
}

/// Type alias for the decided cuts storage (cut + commit certificate by height).
Expand Down
24 changes: 18 additions & 6 deletions crates/consensus/src/proposal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ pub struct CutProposal {
impl BorshSerialize for CutProposal {
fn serialize<W: Write>(&self, writer: &mut W) -> std::io::Result<()> {
self.height.serialize(writer)?;
(self.round.as_i64() as u32).serialize(writer)?;
// Serialize round as i64 to handle all valid round values
(self.round.as_i64()).serialize(writer)?;
self.value.serialize(writer)?;
(self.pol_round.as_i64() as u32).serialize(writer)?;
// Serialize pol_round as i64 to handle Round::Nil (-1) properly
(self.pol_round.as_i64()).serialize(writer)?;
self.proposer.serialize(writer)?;
Ok(())
}
Expand All @@ -34,11 +36,21 @@ impl BorshSerialize for CutProposal {
impl BorshDeserialize for CutProposal {
fn deserialize_reader<R: Read>(reader: &mut R) -> std::io::Result<Self> {
let height = ConsensusHeight::deserialize_reader(reader)?;
let round_val: u32 = BorshDeserialize::deserialize_reader(reader)?;
let round = Round::new(round_val);
// Deserialize round: stored as i64
let round_val: i64 = BorshDeserialize::deserialize_reader(reader)?;
let round = if round_val < 0 {
Round::Nil
} else {
Round::new(round_val as u32)
};
let value = ConsensusValue::deserialize_reader(reader)?;
let pol_round_val: u32 = BorshDeserialize::deserialize_reader(reader)?;
let pol_round = Round::new(pol_round_val);
// Deserialize pol_round: stored as i64 to handle Round::Nil (-1)
let pol_round_val: i64 = BorshDeserialize::deserialize_reader(reader)?;
let pol_round = if pol_round_val < 0 {
Round::Nil
} else {
Round::new(pol_round_val as u32)
};
let proposer = ConsensusAddress::deserialize_reader(reader)?;
Ok(Self {
height,
Expand Down
67 changes: 67 additions & 0 deletions crates/data-chain/src/primary/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,16 @@ pub enum PrimaryEvent {
},
}

/// Commands sent to the Primary from external sources
#[derive(Debug)]
pub enum PrimaryCommand {
/// Notify that consensus has decided on a height
ConsensusDecided {
/// The height that was decided
height: u64,
},
}

/// Network interface for Primary-to-Primary communication
#[async_trait::async_trait]
pub trait PrimaryNetwork: Send + Sync {
Expand All @@ -74,6 +84,8 @@ pub struct PrimaryHandle {
worker_sender: mpsc::Sender<WorkerToPrimary>,
/// Sender for network messages (from peer Primaries)
network_sender: mpsc::Sender<(ValidatorId, DclMessage)>,
/// Sender for commands to the Primary
command_sender: mpsc::Sender<PrimaryCommand>,
/// Receiver for Primary events
event_receiver: mpsc::Receiver<PrimaryEvent>,
}
Expand Down Expand Up @@ -124,6 +136,18 @@ impl PrimaryHandle {
pub fn is_finished(&self) -> bool {
self.handle.is_finished()
}

/// Notify the Primary that consensus has decided on a height
///
/// This triggers the Primary to advance its state and continue producing cuts.
pub async fn notify_decision(
&self,
height: u64,
) -> Result<(), mpsc::error::SendError<PrimaryCommand>> {
self.command_sender
.send(PrimaryCommand::ConsensusDecided { height })
.await
}
}

/// Primary process - handles Car creation, attestation collection, and Cut formation
Expand All @@ -147,6 +171,8 @@ pub struct Primary {
to_workers: Vec<mpsc::Sender<PrimaryToWorker>>,
/// Channel to receive messages from peer Primaries
from_network: mpsc::Receiver<(ValidatorId, DclMessage)>,
/// Channel to receive commands (e.g., consensus decisions)
from_commands: mpsc::Receiver<PrimaryCommand>,
/// Network interface
network: Box<dyn PrimaryNetwork>,
/// Optional persistent storage for Cars and attestations
Expand Down Expand Up @@ -180,6 +206,7 @@ impl Primary {
) -> (PrimaryHandle, Vec<mpsc::Receiver<PrimaryToWorker>>) {
let (from_workers_tx, from_workers_rx) = mpsc::channel(1024);
let (from_network_tx, from_network_rx) = mpsc::channel(1024);
let (command_tx, command_rx) = mpsc::channel(64);
let (event_tx, event_rx) = mpsc::channel(256);

// Create worker channels
Expand All @@ -199,6 +226,7 @@ impl Primary {
from_workers_rx,
to_workers,
from_network_rx,
command_rx,
network,
event_tx,
storage,
Expand All @@ -210,19 +238,22 @@ impl Primary {
handle,
worker_sender: from_workers_tx,
network_sender: from_network_tx,
command_sender: command_tx,
event_receiver: event_rx,
};

(primary_handle, worker_receivers)
}

/// Create a new Primary
#[allow(clippy::too_many_arguments)]
pub fn new(
config: PrimaryConfig,
validator_pubkeys: HashMap<ValidatorId, BlsPublicKey>,
from_workers: mpsc::Receiver<WorkerToPrimary>,
to_workers: Vec<mpsc::Sender<PrimaryToWorker>>,
from_network: mpsc::Receiver<(ValidatorId, DclMessage)>,
from_commands: mpsc::Receiver<PrimaryCommand>,
network: Box<dyn PrimaryNetwork>,
event_sender: mpsc::Sender<PrimaryEvent>,
) -> Self {
Expand All @@ -232,6 +263,7 @@ impl Primary {
from_workers,
to_workers,
from_network,
from_commands,
network,
event_sender,
None,
Expand All @@ -246,6 +278,7 @@ impl Primary {
from_workers: mpsc::Receiver<WorkerToPrimary>,
to_workers: Vec<mpsc::Sender<PrimaryToWorker>>,
from_network: mpsc::Receiver<(ValidatorId, DclMessage)>,
from_commands: mpsc::Receiver<PrimaryCommand>,
network: Box<dyn PrimaryNetwork>,
event_sender: mpsc::Sender<PrimaryEvent>,
storage: Option<Arc<dyn CarStore>>,
Expand Down Expand Up @@ -292,6 +325,7 @@ impl Primary {
from_workers,
to_workers,
from_network,
from_commands,
network,
storage,
event_sender,
Expand Down Expand Up @@ -338,6 +372,11 @@ impl Primary {
self.handle_network_message(peer, msg).await;
}

// Handle commands (e.g., consensus decisions)
Some(cmd) = self.from_commands.recv() => {
self.handle_command(cmd).await;
}

// Periodic Car creation
_ = car_interval.tick() => {
self.try_create_car().await;
Expand Down Expand Up @@ -407,6 +446,28 @@ impl Primary {
}
}

/// Handle commands from external sources (e.g., node)
async fn handle_command(&mut self, cmd: PrimaryCommand) {
match cmd {
PrimaryCommand::ConsensusDecided { height } => {
info!(
height,
validator = %self.config.validator_id,
"Received consensus decision notification"
);

// Advance state to allow producing cuts for the next height
self.state.finalize_height(height);

debug!(
new_height = self.state.current_height,
last_finalized = self.state.last_finalized_height,
"Primary state advanced after consensus decision"
);
}
}
}

/// Handle message from peer Primary
async fn handle_network_message(&mut self, peer: ValidatorId, msg: DclMessage) {
match msg {
Expand Down Expand Up @@ -963,6 +1024,7 @@ mod tests {

let (_from_workers_tx, from_workers_rx) = mpsc::channel(100);
let (_from_network_tx, from_network_rx) = mpsc::channel(100);
let (_command_tx, command_rx) = mpsc::channel(64);
let (event_tx, mut event_rx) = mpsc::channel(100);

let network = MockNetwork::new();
Expand All @@ -974,6 +1036,7 @@ mod tests {
from_workers_rx,
vec![],
from_network_rx,
command_rx,
Box::new(network),
event_tx,
);
Expand Down Expand Up @@ -1037,6 +1100,7 @@ mod tests {

let (_from_workers_tx, from_workers_rx) = mpsc::channel(100);
let (_from_network_tx, from_network_rx) = mpsc::channel(100);
let (_command_tx, command_rx) = mpsc::channel(64);
let (event_tx, mut event_rx) = mpsc::channel(100);

let network = MockNetwork::new();
Expand All @@ -1048,6 +1112,7 @@ mod tests {
from_workers_rx,
vec![],
from_network_rx,
command_rx,
Box::new(network),
event_tx,
);
Expand Down Expand Up @@ -1093,6 +1158,7 @@ mod tests {

let (_from_workers_tx, from_workers_rx) = mpsc::channel(100);
let (_from_network_tx, from_network_rx) = mpsc::channel(100);
let (_command_tx, command_rx) = mpsc::channel(64);
let (event_tx, mut event_rx) = mpsc::channel(100);
let (to_worker_tx, mut to_worker_rx) = mpsc::channel::<PrimaryToWorker>(100);

Expand All @@ -1105,6 +1171,7 @@ mod tests {
from_workers_rx,
vec![to_worker_tx],
from_network_rx,
command_rx,
Box::new(network),
event_tx,
None,
Expand Down
6 changes: 6 additions & 0 deletions crates/node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -776,6 +776,12 @@ impl Node {
cut.cars.len()
);

// Notify Primary that consensus has decided on this height
// This allows Primary to advance its state and produce cuts for the next height
if let Err(e) = primary_handle.notify_decision(height.0).await {
warn!("Failed to notify Primary of consensus decision: {:?}", e);
}

// Execute Cut if execution layer is enabled
if let Some(ref bridge) = execution_bridge {
match bridge.execute_cut(cut).await {
Expand Down