Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 0 additions & 4 deletions bin/strata/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,6 @@ fn get_config(args: Args) -> Result<Config, InitError> {
}

fn validate_config(config: Config) -> Result<Config, InitError> {
if !config.client.is_sequencer && config.client.sync_endpoint.is_none() {
return Err(InitError::MissingSyncEndpoint);
}

if config.client.is_sequencer
&& config
.client
Expand Down
136 changes: 136 additions & 0 deletions bin/strata/src/css.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
//! Checkpoint sync service wiring for the Strata binary.

use std::sync::Arc;

use anyhow::{Result, anyhow};
use strata_chain_worker_new::ChainWorkerHandle;
use strata_checkpoint_types::EpochSummary;
use strata_consensus_logic::checkpoint_sync::{
CheckpointSyncCtx, CssServiceHandle, start_css_service,
};
use strata_csm_types::CheckpointL1Ref;
use strata_csm_worker::CsmWorkerStatus;
use strata_db_types::DbResult;
use strata_identifiers::Epoch;
use strata_node_context::NodeContext;
use strata_params::RollupParams;
use strata_primitives::{EpochCommitment, L1Height, OLBlockCommitment};
use strata_service::ServiceMonitor;
use strata_status::{OLSyncStatus, OLSyncStatusUpdate, StatusChannel};
use strata_storage::NodeStorage;

/// Production [`CheckpointSyncCtx`] backed by node storage, the chain worker,
/// the CSM monitor and the status channel.
struct StrataCheckpointSyncContext {
/// Node storage for checkpoint and L1 lookups.
storage: Arc<NodeStorage>,
/// Chain worker handle used to apply, advance and finalize epochs.
chain_worker: Arc<ChainWorkerHandle>,
/// Monitor exposing the current CSM worker status.
csm_monitor: Arc<ServiceMonitor<CsmWorkerStatus>>,
/// Status channel for publishing OL sync status updates.
status_channel: Arc<StatusChannel>,
/// Rollup params, used for the L1 reorg-safe depth.
rollup_params: RollupParams,
}

impl StrataCheckpointSyncContext {
fn new(
storage: Arc<NodeStorage>,
chain_worker: Arc<ChainWorkerHandle>,
csm_monitor: Arc<ServiceMonitor<CsmWorkerStatus>>,
status_channel: Arc<StatusChannel>,
rollup_params: RollupParams,
) -> Self {
Self {
storage,
chain_worker,
csm_monitor,
status_channel,
rollup_params,
}
}
}

impl CheckpointSyncCtx for StrataCheckpointSyncContext {
fn rollup_params(&self) -> &RollupParams {
&self.rollup_params
}

async fn fetch_l1_tip_height(&self) -> anyhow::Result<L1Height> {
let tip = self
.storage
.l1()
.get_canonical_chain_tip_async()
.await?
.ok_or_else(|| anyhow!("no L1 canonical chain tip in db"))?;
Ok(tip.0)
}

async fn fetch_csm_status(&self) -> anyhow::Result<CsmWorkerStatus> {
Ok(self.csm_monitor.get_current())
}

async fn get_canonical_epoch_commitment(&self, ep: Epoch) -> DbResult<Option<EpochCommitment>> {
self.storage
.ol_checkpoint()
.get_canonical_epoch_commitment_at_async(ep)
.await
}

async fn get_checkpoint_l1_ref(
&self,
epoch: EpochCommitment,
) -> DbResult<Option<CheckpointL1Ref>> {
self.storage
.ol_checkpoint()
.get_checkpoint_l1_ref_async(epoch)
.await
}

async fn get_epoch_summary(&self, epoch: EpochCommitment) -> DbResult<Option<EpochSummary>> {
self.storage
.ol_checkpoint()
.get_epoch_summary_async(epoch)
.await
}

async fn apply_checkpoint(&self, epoch: EpochCommitment) -> anyhow::Result<()> {
Ok(self.chain_worker.apply_checkpoint(epoch).await?)
}

async fn update_safe_tip(&self, tip: OLBlockCommitment) -> anyhow::Result<()> {
Ok(self.chain_worker.update_safe_tip(tip).await?)
}

async fn finalize_epoch(&self, epoch: EpochCommitment) -> anyhow::Result<()> {
Ok(self.chain_worker.finalize_epoch(epoch).await?)
}

fn publish_ol_sync_status(&self, status: OLSyncStatus) {
self.status_channel
.update_ol_sync_status(OLSyncStatusUpdate::new(status));
}
}

/// Starts the checkpoint sync service.
pub(crate) fn start(
nodectx: &NodeContext,
chain_worker_handle: Arc<ChainWorkerHandle>,
csm_monitor: Arc<ServiceMonitor<CsmWorkerStatus>>,
) -> Result<CssServiceHandle> {
let checkpoint_state_rx = nodectx.status_channel().subscribe_checkpoint_state();
let css_ctx = Arc::new(StrataCheckpointSyncContext::new(
nodectx.storage().clone(),
chain_worker_handle,
csm_monitor,
nodectx.status_channel().clone(),
nodectx.params().rollup().clone(),
));

nodectx.task_manager().handle().block_on(start_css_service(
css_ctx,
checkpoint_state_rx,
nodectx.executor().clone(),
))
}
3 changes: 0 additions & 3 deletions bin/strata/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,6 @@ pub(crate) enum InitError {
#[error("failed to create node storage: {0}")]
StorageCreation(String),

#[error("missing sync endpoint (required for non-sequencer nodes)")]
MissingSyncEndpoint,

#[error("missing sequencer config file: {0}")]
MissingSequencerConfig(path::PathBuf),

Expand Down
1 change: 1 addition & 0 deletions bin/strata/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use crate::{
mod args;
mod config;
mod context;
mod css;
mod errors;
mod fcm;
mod genesis;
Expand Down
37 changes: 28 additions & 9 deletions bin/strata/src/run_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use strata_asm_worker::AsmWorkerHandle;
use strata_btcio::{broadcaster::L1BroadcastHandle, writer::EnvelopeHandle};
use strata_chain_worker_new::ChainWorkerHandle;
use strata_config::Config;
use strata_consensus_logic::FcmServiceHandle;
use strata_consensus_logic::{FcmServiceHandle, checkpoint_sync::CssServiceHandle};
use strata_csm_worker::CsmWorkerStatus;
use strata_node_context::{CommonContext, NodeContext};
#[cfg(feature = "sequencer")]
Expand Down Expand Up @@ -75,9 +75,16 @@ impl RunContext {
}

/// Returns the fork choice manager handle.
///
/// Only called on the sequencer RPC path, where the node always runs FCM.
#[cfg(feature = "sequencer")]
pub(crate) fn fcm_handle(&self) -> &Arc<FcmServiceHandle> {
&self.service_handles.fcm_handle
match &self.service_handles.sync_handle {
SyncServiceHandle::Fcm(handle) => handle,
SyncServiceHandle::Css(_) => {
panic!("fcm_handle requested on a non-sequencer (checkpoint sync) node")
}
}
}

/// Returns the executor.
Expand Down Expand Up @@ -153,6 +160,18 @@ impl SequencerServiceHandles {
}
}

/// Handle for whichever OL sync service the node runs.
///
/// A node runs exactly one: the fork-choice manager when it is a sequencer,
/// the checkpoint sync service otherwise.
pub(crate) enum SyncServiceHandle {
/// Fork-choice manager handle (sequencer nodes).
Fcm(Arc<FcmServiceHandle>),
/// Checkpoint sync service handle (non-sequencer nodes).
#[expect(dead_code, reason = "held to keep the service alive; not yet read")]
Css(Arc<CssServiceHandle>),
}

/// Handles for all services.
#[expect(unused, reason = "will be used later")]
pub(crate) struct ServiceHandles {
Expand All @@ -171,8 +190,8 @@ pub(crate) struct ServiceHandles {
/// Handle for the checkpoint worker.
checkpoint_handle: Arc<OLCheckpointWorkerHandle>,

/// Handle for the FCM service.
fcm_handle: Arc<FcmServiceHandle>,
/// Handle for the OL sync service (FCM or checkpoint sync).
sync_handle: SyncServiceHandle,

/// Handles for sequencer-specific services ([`None`] when not running as sequencer).
#[cfg(feature = "sequencer")]
Expand All @@ -187,15 +206,15 @@ impl ServiceHandles {
mempool_handle: Arc<MempoolHandle>,
chain_worker_handle: Arc<ChainWorkerHandle>,
checkpoint_handle: Arc<OLCheckpointWorkerHandle>,
fcm_handle: Arc<FcmServiceHandle>,
sync_handle: SyncServiceHandle,
) -> ServiceHandlesBuilder {
ServiceHandlesBuilder {
asm_handle,
csm_monitor,
mempool_handle,
chain_worker_handle,
checkpoint_handle,
fcm_handle,
sync_handle,
#[cfg(feature = "sequencer")]
sequencer_handles: None,
}
Expand All @@ -219,8 +238,8 @@ pub(crate) struct ServiceHandlesBuilder {
/// Handle for the checkpoint worker.
checkpoint_handle: Arc<OLCheckpointWorkerHandle>,

/// Handle for the FCM service.
fcm_handle: Arc<FcmServiceHandle>,
/// Handle for the OL sync service (FCM or checkpoint sync).
sync_handle: SyncServiceHandle,

/// Handles for sequencer-specific services ([`None`] when not running as sequencer).
#[cfg(feature = "sequencer")]
Expand All @@ -246,7 +265,7 @@ impl ServiceHandlesBuilder {
mempool_handle: self.mempool_handle,
chain_worker_handle: self.chain_worker_handle,
checkpoint_handle: self.checkpoint_handle,
fcm_handle: self.fcm_handle,
sync_handle: self.sync_handle,
#[cfg(feature = "sequencer")]
sequencer_handles: self.sequencer_handles,
}
Expand Down
32 changes: 26 additions & 6 deletions bin/strata/src/services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,22 @@ use std::sync::Arc;

use anyhow::Result;
use strata_btcio::reader::query::{ReaderValidation, bitcoin_data_reader_task};
use strata_chain_worker_new::start_chain_worker_service_from_ctx;
use strata_chain_worker_new::{ChainWorkerHandle, start_chain_worker_service_from_ctx};
use strata_consensus_logic::{
AsmBlockSubmitter,
sync_manager::{spawn_asm_worker_with_ctx, spawn_csm_listener_with_ctx},
};
use strata_csm_worker::CsmWorkerStatus;
use strata_node_context::NodeContext;
use strata_ol_checkpoint::OLCheckpointBuilder;
use strata_ol_mempool::{MempoolBuilder, MempoolHandle, OLMempoolConfig};
use strata_service::ServiceMonitor;

use crate::{
context::ensure_genesis,
fcm,
css, fcm,
helpers::rollup_to_btcio_params,
run_context::{RunContext, ServiceHandles},
run_context::{RunContext, ServiceHandles, SyncServiceHandle},
};

#[cfg(feature = "sequencer")]
Expand Down Expand Up @@ -296,16 +298,16 @@ pub(crate) fn start_strata_services(
let sequencer_handles =
sequencer_services::start_if_enabled(&nodectx, mempool_handle.clone(), envelope_pubkey)?;

let fcm_handle = fcm::start(&nodectx, chain_worker_handle.clone(), csm_monitor.clone())?;
let fcm_handle = Arc::new(fcm_handle);
let sync_handle =
start_sync_services(&nodectx, chain_worker_handle.clone(), csm_monitor.clone())?;

let service_handles_builder = ServiceHandles::builder(
asm_handle,
csm_monitor,
mempool_handle,
chain_worker_handle,
checkpoint_handle,
fcm_handle,
sync_handle,
);
let service_handles =
sequencer_services::attach_service_handles(service_handles_builder, sequencer_handles)
Expand Down Expand Up @@ -341,6 +343,24 @@ fn start_btcio_reader(nodectx: &NodeContext, asm_handle: Arc<strata_asm_worker::
);
}

/// Starts the OL sync service for the node's role.
///
/// Sequencer nodes run the fork-choice manager; non-sequencer nodes run the
/// checkpoint sync service. A node runs exactly one.
fn start_sync_services(
nodectx: &NodeContext,
chain_worker_handle: Arc<ChainWorkerHandle>,
csm_monitor: Arc<ServiceMonitor<CsmWorkerStatus>>,
) -> Result<SyncServiceHandle> {
if nodectx.config().client.is_sequencer {
let fcm_handle = fcm::start(nodectx, chain_worker_handle, csm_monitor)?;
Ok(SyncServiceHandle::Fcm(Arc::new(fcm_handle)))
} else {
let css_handle = css::start(nodectx, chain_worker_handle, csm_monitor)?;
Ok(SyncServiceHandle::Css(Arc::new(css_handle)))
}
}

/// Starts the mempool service.
fn start_mempool(nodectx: &NodeContext) -> Result<MempoolHandle> {
let config = OLMempoolConfig::default();
Expand Down
6 changes: 6 additions & 0 deletions crates/chain-worker-new/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,15 @@ workspace = true

[dependencies]
strata-acct-types.workspace = true
strata-asm-common.workspace = true
strata-asm-proto-checkpoint-types.workspace = true
strata-checkpoint-types.workspace = true
strata-db-types.workspace = true
strata-identifiers.workspace = true
strata-ledger-types.workspace = true
strata-node-context.workspace = true
strata-ol-chain-types-new.workspace = true
strata-ol-da.workspace = true
strata-ol-state-support-types.workspace = true
strata-ol-state-types.workspace = true
strata-ol-stf.workspace = true
Expand All @@ -32,8 +35,11 @@ tokio.workspace = true
tracing.workspace = true

[dev-dependencies]
strata-codec.workspace = true
strata-db-store-sled.workspace = true
strata-ol-state-types = { workspace = true, features = ["test-utils"] }
strata-ol-stf = { workspace = true, features = ["test-utils"] }
strata-predicate.workspace = true
sled.workspace = true
threadpool.workspace = true
typed-sled.workspace = true
Loading
Loading