Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions blocksync/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ type Reactor struct {
// Sequencer signature verification (set after upgrade via SetVerifier/SetSigStore)
verifier sequencer.SequencerVerifier
sigStore *sequencer.SignatureStore

// metrics records sequencer blocksync-V2 catch-up stats. Set after upgrade
// via SetMetrics; defaults to a no-op implementation.
metrics *sequencer.Metrics
}

// NewReactor returns new reactor instance.
Expand Down Expand Up @@ -138,6 +142,7 @@ func NewReactor(
blockSync: blockSync,
requestsCh: requestsCh,
errorsCh: errorsCh,
metrics: sequencer.NopMetrics(),
}
bcR.BaseReactor = *p2p.NewBaseReactor("Reactor", bcR)
return bcR
Expand All @@ -164,6 +169,14 @@ func (bcR *Reactor) SetSigStore(s *sequencer.SignatureStore) {
bcR.sigStore = s
}

// SetMetrics installs the sequencer metrics used for blocksync-V2
// instrumentation (called after upgrade). A nil argument is ignored, so the
// metrics value the reactor already holds (NewReactor seeds it with
// sequencer.NopMetrics()) stays in place.
func (bcR *Reactor) SetMetrics(m *sequencer.Metrics) {
	// Guard: never overwrite the current metrics with nil.
	if m == nil {
		return
	}
	bcR.metrics = m
}

// Pool returns the block pool for broadcast reactor to check peer heights.
func (bcR *Reactor) Pool() *BlockPool {
return bcR.pool
Expand Down Expand Up @@ -431,6 +444,7 @@ func (bcR *Reactor) syncBlockV2(block types.SyncableBlock, blocksSynced *uint64,

bcR.pool.PopRequest()
*blocksSynced++
bcR.metrics.IncSyncV2Blocks()

if *blocksSynced%100 == 0 {
*lastRate = 0.9*(*lastRate) + 0.1*(100/time.Since(*lastHundred).Seconds())
Expand Down
22 changes: 16 additions & 6 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,20 +118,25 @@ func DefaultNewNode(config *cfg.Config, logger log.Logger) (*Node, error) {
)
}

// MetricsProvider returns a consensus and p2p Metrics.
type MetricsProvider func(chainID string) (*cs.Metrics, *p2p.Metrics, *sm.Metrics, *proxy.Metrics)
// MetricsProvider returns consensus, p2p, state, proxy and sequencer Metrics.
type MetricsProvider func(chainID string) (*cs.Metrics, *p2p.Metrics, *sm.Metrics, *proxy.Metrics, *sequencer.Metrics)

// DefaultMetricsProvider returns Metrics built using Prometheus client library
// if Prometheus is enabled. Otherwise, it returns no-op Metrics.
//
// The sequencer (post-upgrade) metrics use the "morphnode" namespace to align
// with the morph-node executor/syncer metrics (morphnode_sequencer_*), rather
// than the "tendermint" namespace used by the PBFT-era consensus/p2p metrics.
func DefaultMetricsProvider(config *cfg.InstrumentationConfig) MetricsProvider {
return func(chainID string) (*cs.Metrics, *p2p.Metrics, *sm.Metrics, *proxy.Metrics) {
return func(chainID string) (*cs.Metrics, *p2p.Metrics, *sm.Metrics, *proxy.Metrics, *sequencer.Metrics) {
if config.Prometheus {
return cs.PrometheusMetrics(config.Namespace, "chain_id", chainID),
p2p.PrometheusMetrics(config.Namespace, "chain_id", chainID),
sm.PrometheusMetrics(config.Namespace, "chain_id", chainID),
proxy.PrometheusMetrics(config.Namespace, "chain_id", chainID)
proxy.PrometheusMetrics(config.Namespace, "chain_id", chainID),
sequencer.PrometheusMetrics("morphnode", "chain_id", chainID)
}
return cs.NopMetrics(), p2p.NopMetrics(), sm.NopMetrics(), proxy.NopMetrics()
return cs.NopMetrics(), p2p.NopMetrics(), sm.NopMetrics(), proxy.NopMetrics(), sequencer.NopMetrics()
}
}

Expand Down Expand Up @@ -520,6 +525,7 @@ func createSequencerComponents(
signer sequencer.Signer,
sigStore *sequencer.SignatureStore,
ha sequencer.SequencerHA,
metrics *sequencer.Metrics,
) (*sequencer.StateV2, *sequencer.BlockBroadcastReactor, error) {
// Create StateV2
stateV2, err := sequencer.NewStateV2(
Expand All @@ -534,6 +540,7 @@ func createSequencerComponents(
if err != nil {
return nil, nil, fmt.Errorf("failed to create StateV2: %w", err)
}
stateV2.SetMetrics(metrics)

// Create BlockBroadcastReactor (not started yet).
// Routines are started later via StartSequencerRoutines() — see OnStart()
Expand All @@ -546,6 +553,7 @@ func createSequencerComponents(
l1Tracker,
sigStore,
)
broadcastReactor.SetMetrics(metrics)
broadcastReactor.SetLogger(logger.With("module", "sequencer"))

return stateV2, broadcastReactor, nil
Expand Down Expand Up @@ -829,7 +837,7 @@ func NewNode(
return nil, err
}

csMetrics, p2pMetrics, smMetrics, abciMetrics := metricsProvider(genDoc.ChainID)
csMetrics, p2pMetrics, smMetrics, abciMetrics, seqMetrics := metricsProvider(genDoc.ChainID)

// Create the proxyApp and establish connections to the ABCI app (consensus, query).
proxyApp, err := createAndStartProxyAppConns(clientCreator, logger, abciMetrics)
Expand Down Expand Up @@ -1044,6 +1052,7 @@ func NewNode(
sequencerSigner,
sigStore,
ha, // HA service injected from NewNode caller; nil disables HA mode
seqMetrics,
); err != nil {
return nil, err
}
Expand All @@ -1057,6 +1066,7 @@ func NewNode(
bcR.SetStateV2(node.stateV2)
bcR.SetVerifier(sequencerVerifier)
bcR.SetSigStore(sigStore)
bcR.SetMetrics(seqMetrics)

// Register BlockBroadcastReactor with Switch
sw.AddReactor("SEQUENCER", node.blockBroadcastReactor)
Expand Down
39 changes: 31 additions & 8 deletions sequencer/broadcast_reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ type BlockBroadcastReactor struct {
verifier SequencerVerifier
l1Tracker L1Tracker // required: gates peer-block sync on L1 freshness
sigStore *SignatureStore
metrics *Metrics

// syncRequests tracks pending sync channel requests, keyed by height.
// Used to reject unsolicited responses before decode/verification.
Expand Down Expand Up @@ -151,11 +152,20 @@ func NewBlockBroadcastReactor(
verifier: verifier,
l1Tracker: l1Tracker,
sigStore: sigStore,
metrics: NopMetrics(),
}
r.BaseReactor = *p2p.NewBaseReactor("BlockBroadcast", r)
return r
}

// SetMetrics attaches the sequencer metrics to the reactor. It is meant to be
// called once after construction. Passing nil is a no-op: the reactor keeps
// the metrics it already has (the constructor seeds it with NopMetrics()).
func (r *BlockBroadcastReactor) SetMetrics(m *Metrics) {
	// Guard: never overwrite the current metrics with nil.
	if m == nil {
		return
	}
	r.metrics = m
}

func (r *BlockBroadcastReactor) SetLogger(l log.Logger) {
r.BaseService.Logger = l
r.logger = l.With("module", "broadcastReactor")
Expand Down Expand Up @@ -222,6 +232,7 @@ func (r *BlockBroadcastReactor) StartSequencerRoutines() error {
// Open the Receive() gate: from this point on, incoming P2P messages
// will be processed instead of being silently dropped.
r.routinesStarted.Store(true)
r.metrics.SetBcastRoutinesStarted(true)

// Fullnode (no signer): only applyRoutine (P2P sync)
// Nodes with signer (ActiveSeq / HA-Leader / HA-Follower): only broadcastRoutine
Expand All @@ -245,6 +256,7 @@ func (r *BlockBroadcastReactor) OnStop() {

r.startMu.Lock()
r.routinesStarted.Store(false)
r.metrics.SetBcastRoutinesStarted(false)
r.startMu.Unlock()

r.bannedPeersMu.Lock()
Expand Down Expand Up @@ -318,7 +330,7 @@ func (r *BlockBroadcastReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte
msg, err := decodeMsg(msgBytes)
if err != nil {
r.logger.Error("Error decoding message", "src", src, "chId", chID, "err", err)
r.banPeer(src, "invalid message encoding")
r.banPeer(src, "decode_error")
return
}

Expand All @@ -343,7 +355,7 @@ func (r *BlockBroadcastReactor) handleBroadcastMsg(msg interface{}, src p2p.Peer
blockV2, err := types.BlockV2FromProto(msg.Block)
if err != nil {
r.logger.Error("Invalid BlockV2 from broadcast channel", "peer", src.ID(), "err", err)
r.banPeer(src, "invalid BlockV2 in broadcast")
r.banPeer(src, "invalid_block")
return
}
localHeight := r.stateV2.LatestHeight()
Expand All @@ -359,6 +371,7 @@ func (r *BlockBroadcastReactor) handleBroadcastMsg(msg interface{}, src p2p.Peer
return
}
if r.markSeen(blockV2.Hash) {
r.metrics.IncBcastBlocksDeduped()
r.logger.Debug("onBlockV2 broadcast dedup", "number", blockV2.Number, "hash", blockV2.Hash.Hex())
return
}
Expand Down Expand Up @@ -395,7 +408,7 @@ func (r *BlockBroadcastReactor) handleSyncMsg(msg interface{}, src p2p.Peer) {
blockV2, err := types.BlockV2FromProto(msg.Block)
if err != nil {
r.logger.Error("Invalid BlockV2 from sync channel", "peer", src.ID(), "err", err)
r.banPeer(src, "invalid BlockV2 in sync response")
r.banPeer(src, "invalid_block")
return
}
r.onBlockV2(blockV2, src, false) // from sync channel
Expand Down Expand Up @@ -438,6 +451,7 @@ func (r *BlockBroadcastReactor) broadcastRoutine() {
continue
}
r.broadcast(block)
r.metrics.SetBroadcastChannelDepth(len(source))
}
}
}
Expand Down Expand Up @@ -491,7 +505,7 @@ func (r *BlockBroadcastReactor) onBlockV2(block *BlockV2, src p2p.Peer, fromBroa
if errors.Is(err, ErrInvalidSignature) {
r.logger.Error("Block signature invalid, banning peer",
"number", block.Number, "hash", block.Hash.Hex(), "err", err)
r.banPeer(src, "block signature verification failed")
r.banPeer(src, "invalid_sig")
} else {
// Verifier unavailable — local problem, not peer's fault.
r.logger.Error("Verifier unavailable, dropping block without ban",
Expand All @@ -506,7 +520,7 @@ func (r *BlockBroadcastReactor) onBlockV2(block *BlockV2, src p2p.Peer, fromBroa
localHeight := r.stateV2.LatestHeight()

if r.isNextBlock(block) {
if err := r.applyBlock(block); err != nil {
if err := r.applyBlock(block, "p2p"); err != nil {
r.logger.Error("Apply failed", "number", block.Number, "hash", block.Hash.Hex(), "err", err)
r.pendingCache.Add(block, uint64(localHeight))
return
Expand All @@ -522,6 +536,7 @@ func (r *BlockBroadcastReactor) onBlockV2(block *BlockV2, src p2p.Peer, fromBroa
// tryApplyFromCache: apply blocks from unlink cache (called by applyRoutine)
// Blocks in cache don't need signature verification (already verified or from sync)
func (r *BlockBroadcastReactor) tryApplyFromCache() {
r.metrics.SetBcastPendingCacheSize(r.pendingCache.Size())
currentBlock := r.stateV2.LatestBlock()
if currentBlock == nil {
return
Expand All @@ -539,7 +554,7 @@ func (r *BlockBroadcastReactor) tryApplyFromCache() {
break
}
r.logger.Debug("Trying to apply from cache", "number", block.Number, "hash", block.Hash.Hex())
if err := r.applyBlock(block); err != nil {
if err := r.applyBlock(block, "cache"); err != nil {
r.logger.Error("Apply from cache failed", "number", block.Number, "err", err)
break
}
Expand All @@ -563,6 +578,7 @@ func (r *BlockBroadcastReactor) checkSyncGap() {
localHeight := r.stateV2.LatestHeight()
maxPeerHeight := r.getPool().MaxPeerHeight()
gap := maxPeerHeight - localHeight
r.metrics.SetBcastSyncGap(gap)
r.logger.Debug("Checking sync goroutines", "gap", gap, "localHeight", localHeight, "maxPeerHeight", maxPeerHeight)
if gap <= smallGapThreshold &&
!r.firstSync.CompareAndSwap(true, false) {
Expand Down Expand Up @@ -647,7 +663,9 @@ func (r *BlockBroadcastReactor) isNextBlock(block *BlockV2) bool {

// applyBlock verifies signature, applies the block atomically, and persists the signature.
// Thread-safe: uses mutex to ensure sequential block application.
func (r *BlockBroadcastReactor) applyBlock(block *BlockV2) error {
// source labels where the block came from: "p2p" (direct from a peer) or
// "cache" (applied later from the pending out-of-order cache).
func (r *BlockBroadcastReactor) applyBlock(block *BlockV2, source string) error {
r.applyMtx.Lock()
defer r.applyMtx.Unlock()

Expand All @@ -664,6 +682,7 @@ func (r *BlockBroadcastReactor) applyBlock(block *BlockV2) error {

// Add to recent blocks
r.recentBlocks.Add(block)
r.metrics.IncBcastBlocksApplied(source)

r.logger.Info("Applied block", "number", block.Number)
return nil
Expand Down Expand Up @@ -777,7 +796,7 @@ func (r *BlockBroadcastReactor) removeTimeoutPeers() {
if peer == nil {
continue
}
r.banPeer(peer, "sync request timed out")
r.banPeer(peer, "sync_timeout")
}
}

Expand Down Expand Up @@ -821,6 +840,10 @@ func (r *BlockBroadcastReactor) removeSyncRequestsByPeer(peerID p2p.ID) {
// reconnect them, and on reconnect AddPeer's isBanned check will let them in.
func (r *BlockBroadcastReactor) banPeer(peer p2p.Peer, reason string) {
peerID := peer.ID()
// reason is a compact, low-cardinality token (e.g. invalid_sig,
// invalid_block, decode_error, sync_timeout); detailed context is logged
// at each call site just before banning.
r.metrics.IncBcastPeersBanned(reason)

if peer.IsPersistent() {
r.logger.Error("[WHITELIST_ALARM] whitelisted peer misbehaved, will reconnect",
Expand Down
1 change: 1 addition & 0 deletions sequencer/broadcast_reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ func newReactorForTest() *BlockBroadcastReactor {
syncPeerCounts: make(map[p2p.ID]int),
bannedPeers: make(map[p2p.ID]time.Time),
blockReqLimiter: NewPeerRateLimiter(blockRequestRateLimit, blockRequestBurst),
metrics: NopMetrics(),
}
}

Expand Down
Loading
Loading