diff --git a/blocksync/reactor.go b/blocksync/reactor.go index 6ae80f95e02..ba535dddde8 100644 --- a/blocksync/reactor.go +++ b/blocksync/reactor.go @@ -89,6 +89,10 @@ type Reactor struct { // Sequencer signature verification (set after upgrade via SetVerifier/SetSigStore) verifier sequencer.SequencerVerifier sigStore *sequencer.SignatureStore + + // metrics records sequencer blocksync-V2 catch-up stats. Set after upgrade + // via SetMetrics; defaults to a no-op implementation. + metrics *sequencer.Metrics } // NewReactor returns new reactor instance. @@ -138,6 +142,7 @@ func NewReactor( blockSync: blockSync, requestsCh: requestsCh, errorsCh: errorsCh, + metrics: sequencer.NopMetrics(), } bcR.BaseReactor = *p2p.NewBaseReactor("Reactor", bcR) return bcR @@ -164,6 +169,14 @@ func (bcR *Reactor) SetSigStore(s *sequencer.SignatureStore) { bcR.sigStore = s } +// SetMetrics sets the sequencer metrics for blocksync-V2 instrumentation +// (called after upgrade). When unset, metrics default to a no-op. +func (bcR *Reactor) SetMetrics(m *sequencer.Metrics) { + if m != nil { + bcR.metrics = m + } +} + // Pool returns the block pool for broadcast reactor to check peer heights. func (bcR *Reactor) Pool() *BlockPool { return bcR.pool @@ -431,6 +444,7 @@ func (bcR *Reactor) syncBlockV2(block types.SyncableBlock, blocksSynced *uint64, bcR.pool.PopRequest() *blocksSynced++ + bcR.metrics.IncSyncV2Blocks() if *blocksSynced%100 == 0 { *lastRate = 0.9*(*lastRate) + 0.1*(100/time.Since(*lastHundred).Seconds()) diff --git a/node/node.go b/node/node.go index 2a40c2d16b0..644e3160a90 100644 --- a/node/node.go +++ b/node/node.go @@ -118,20 +118,25 @@ func DefaultNewNode(config *cfg.Config, logger log.Logger) (*Node, error) { ) } -// MetricsProvider returns a consensus and p2p Metrics. -type MetricsProvider func(chainID string) (*cs.Metrics, *p2p.Metrics, *sm.Metrics, *proxy.Metrics) +// MetricsProvider returns consensus, p2p, state, proxy and sequencer Metrics. 
+type MetricsProvider func(chainID string) (*cs.Metrics, *p2p.Metrics, *sm.Metrics, *proxy.Metrics, *sequencer.Metrics) // DefaultMetricsProvider returns Metrics build using Prometheus client library // if Prometheus is enabled. Otherwise, it returns no-op Metrics. +// +// The sequencer (post-upgrade) metrics use the "morphnode" namespace to align +// with the morph-node executor/syncer metrics (morphnode_sequencer_*), rather +// than the "tendermint" namespace used by the PBFT-era consensus/p2p metrics. func DefaultMetricsProvider(config *cfg.InstrumentationConfig) MetricsProvider { - return func(chainID string) (*cs.Metrics, *p2p.Metrics, *sm.Metrics, *proxy.Metrics) { + return func(chainID string) (*cs.Metrics, *p2p.Metrics, *sm.Metrics, *proxy.Metrics, *sequencer.Metrics) { if config.Prometheus { return cs.PrometheusMetrics(config.Namespace, "chain_id", chainID), p2p.PrometheusMetrics(config.Namespace, "chain_id", chainID), sm.PrometheusMetrics(config.Namespace, "chain_id", chainID), - proxy.PrometheusMetrics(config.Namespace, "chain_id", chainID) + proxy.PrometheusMetrics(config.Namespace, "chain_id", chainID), + sequencer.PrometheusMetrics("morphnode", "chain_id", chainID) } - return cs.NopMetrics(), p2p.NopMetrics(), sm.NopMetrics(), proxy.NopMetrics() + return cs.NopMetrics(), p2p.NopMetrics(), sm.NopMetrics(), proxy.NopMetrics(), sequencer.NopMetrics() } } @@ -520,6 +525,7 @@ func createSequencerComponents( signer sequencer.Signer, sigStore *sequencer.SignatureStore, ha sequencer.SequencerHA, + metrics *sequencer.Metrics, ) (*sequencer.StateV2, *sequencer.BlockBroadcastReactor, error) { // Create StateV2 stateV2, err := sequencer.NewStateV2( @@ -534,6 +540,7 @@ func createSequencerComponents( if err != nil { return nil, nil, fmt.Errorf("failed to create StateV2: %w", err) } + stateV2.SetMetrics(metrics) // Create BlockBroadcastReactor (not started yet). 
// Routines are started later via StartSequencerRoutines() — see OnStart() @@ -546,6 +553,7 @@ func createSequencerComponents( l1Tracker, sigStore, ) + broadcastReactor.SetMetrics(metrics) broadcastReactor.SetLogger(logger.With("module", "sequencer")) return stateV2, broadcastReactor, nil @@ -829,7 +837,7 @@ func NewNode( return nil, err } - csMetrics, p2pMetrics, smMetrics, abciMetrics := metricsProvider(genDoc.ChainID) + csMetrics, p2pMetrics, smMetrics, abciMetrics, seqMetrics := metricsProvider(genDoc.ChainID) // Create the proxyApp and establish connections to the ABCI app (consensus, query). proxyApp, err := createAndStartProxyAppConns(clientCreator, logger, abciMetrics) @@ -1044,6 +1052,7 @@ func NewNode( sequencerSigner, sigStore, ha, // HA service injected from NewNode caller; nil disables HA mode + seqMetrics, ); err != nil { return nil, err } @@ -1057,6 +1066,7 @@ func NewNode( bcR.SetStateV2(node.stateV2) bcR.SetVerifier(sequencerVerifier) bcR.SetSigStore(sigStore) + bcR.SetMetrics(seqMetrics) // Register BlockBroadcastReactor with Switch sw.AddReactor("SEQUENCER", node.blockBroadcastReactor) diff --git a/sequencer/broadcast_reactor.go b/sequencer/broadcast_reactor.go index fb7509210d4..2908df0d52d 100644 --- a/sequencer/broadcast_reactor.go +++ b/sequencer/broadcast_reactor.go @@ -102,6 +102,7 @@ type BlockBroadcastReactor struct { verifier SequencerVerifier l1Tracker L1Tracker // required: gates peer-block sync on L1 freshness sigStore *SignatureStore + metrics *Metrics // syncRequests tracks pending sync channel requests, keyed by height. // Used to reject unsolicited responses before decode/verification. @@ -151,11 +152,20 @@ func NewBlockBroadcastReactor( verifier: verifier, l1Tracker: l1Tracker, sigStore: sigStore, + metrics: NopMetrics(), } r.BaseReactor = *p2p.NewBaseReactor("BlockBroadcast", r) return r } +// SetMetrics wires the sequencer metrics. Called once after construction. +// When unset, metrics default to a no-op implementation. 
+func (r *BlockBroadcastReactor) SetMetrics(m *Metrics) { + if m != nil { + r.metrics = m + } +} + func (r *BlockBroadcastReactor) SetLogger(l log.Logger) { r.BaseService.Logger = l r.logger = l.With("module", "broadcastReactor") @@ -222,6 +232,7 @@ func (r *BlockBroadcastReactor) StartSequencerRoutines() error { // Open the Receive() gate: from this point on, incoming P2P messages // will be processed instead of being silently dropped. r.routinesStarted.Store(true) + r.metrics.SetBcastRoutinesStarted(true) // Fullnode (no signer): only applyRoutine (P2P sync) // Nodes with signer (ActiveSeq / HA-Leader / HA-Follower): only broadcastRoutine @@ -245,6 +256,7 @@ func (r *BlockBroadcastReactor) OnStop() { r.startMu.Lock() r.routinesStarted.Store(false) + r.metrics.SetBcastRoutinesStarted(false) r.startMu.Unlock() r.bannedPeersMu.Lock() @@ -318,7 +330,7 @@ func (r *BlockBroadcastReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte msg, err := decodeMsg(msgBytes) if err != nil { r.logger.Error("Error decoding message", "src", src, "chId", chID, "err", err) - r.banPeer(src, "invalid message encoding") + r.banPeer(src, "decode_error") return } @@ -343,7 +355,7 @@ func (r *BlockBroadcastReactor) handleBroadcastMsg(msg interface{}, src p2p.Peer blockV2, err := types.BlockV2FromProto(msg.Block) if err != nil { r.logger.Error("Invalid BlockV2 from broadcast channel", "peer", src.ID(), "err", err) - r.banPeer(src, "invalid BlockV2 in broadcast") + r.banPeer(src, "invalid_block") return } localHeight := r.stateV2.LatestHeight() @@ -359,6 +371,7 @@ func (r *BlockBroadcastReactor) handleBroadcastMsg(msg interface{}, src p2p.Peer return } if r.markSeen(blockV2.Hash) { + r.metrics.IncBcastBlocksDeduped() r.logger.Debug("onBlockV2 broadcast dedup", "number", blockV2.Number, "hash", blockV2.Hash.Hex()) return } @@ -395,7 +408,7 @@ func (r *BlockBroadcastReactor) handleSyncMsg(msg interface{}, src p2p.Peer) { blockV2, err := types.BlockV2FromProto(msg.Block) if err != nil { 
r.logger.Error("Invalid BlockV2 from sync channel", "peer", src.ID(), "err", err) - r.banPeer(src, "invalid BlockV2 in sync response") + r.banPeer(src, "invalid_block") return } r.onBlockV2(blockV2, src, false) // from sync channel @@ -438,6 +451,7 @@ func (r *BlockBroadcastReactor) broadcastRoutine() { continue } r.broadcast(block) + r.metrics.SetBroadcastChannelDepth(len(source)) } } } @@ -491,7 +505,7 @@ func (r *BlockBroadcastReactor) onBlockV2(block *BlockV2, src p2p.Peer, fromBroa if errors.Is(err, ErrInvalidSignature) { r.logger.Error("Block signature invalid, banning peer", "number", block.Number, "hash", block.Hash.Hex(), "err", err) - r.banPeer(src, "block signature verification failed") + r.banPeer(src, "invalid_sig") } else { // Verifier unavailable — local problem, not peer's fault. r.logger.Error("Verifier unavailable, dropping block without ban", @@ -506,7 +520,7 @@ func (r *BlockBroadcastReactor) onBlockV2(block *BlockV2, src p2p.Peer, fromBroa localHeight := r.stateV2.LatestHeight() if r.isNextBlock(block) { - if err := r.applyBlock(block); err != nil { + if err := r.applyBlock(block, "p2p"); err != nil { r.logger.Error("Apply failed", "number", block.Number, "hash", block.Hash.Hex(), "err", err) r.pendingCache.Add(block, uint64(localHeight)) return @@ -522,6 +536,7 @@ func (r *BlockBroadcastReactor) onBlockV2(block *BlockV2, src p2p.Peer, fromBroa // tryApplyFromCache: apply blocks from unlink cache (called by applyRoutine) // Blocks in cache don't need signature verification (already verified or from sync) func (r *BlockBroadcastReactor) tryApplyFromCache() { + r.metrics.SetBcastPendingCacheSize(r.pendingCache.Size()) currentBlock := r.stateV2.LatestBlock() if currentBlock == nil { return @@ -539,7 +554,7 @@ func (r *BlockBroadcastReactor) tryApplyFromCache() { break } r.logger.Debug("Trying to apply from cache", "number", block.Number, "hash", block.Hash.Hex()) - if err := r.applyBlock(block); err != nil { + if err := r.applyBlock(block, 
"cache"); err != nil { r.logger.Error("Apply from cache failed", "number", block.Number, "err", err) break } @@ -563,6 +578,7 @@ func (r *BlockBroadcastReactor) checkSyncGap() { localHeight := r.stateV2.LatestHeight() maxPeerHeight := r.getPool().MaxPeerHeight() gap := maxPeerHeight - localHeight + r.metrics.SetBcastSyncGap(gap) r.logger.Debug("Checking sync goroutines", "gap", gap, "localHeight", localHeight, "maxPeerHeight", maxPeerHeight) if gap <= smallGapThreshold && !r.firstSync.CompareAndSwap(true, false) { @@ -647,7 +663,9 @@ func (r *BlockBroadcastReactor) isNextBlock(block *BlockV2) bool { // applyBlock verifies signature, applies the block atomically, and persists the signature. // Thread-safe: uses mutex to ensure sequential block application. -func (r *BlockBroadcastReactor) applyBlock(block *BlockV2) error { +// source labels where the block came from: "p2p" (direct from a peer) or +// "cache" (applied later from the pending out-of-order cache). +func (r *BlockBroadcastReactor) applyBlock(block *BlockV2, source string) error { r.applyMtx.Lock() defer r.applyMtx.Unlock() @@ -664,6 +682,7 @@ func (r *BlockBroadcastReactor) applyBlock(block *BlockV2) error { // Add to recent blocks r.recentBlocks.Add(block) + r.metrics.IncBcastBlocksApplied(source) r.logger.Info("Applied block", "number", block.Number) return nil @@ -777,7 +796,7 @@ func (r *BlockBroadcastReactor) removeTimeoutPeers() { if peer == nil { continue } - r.banPeer(peer, "sync request timed out") + r.banPeer(peer, "sync_timeout") } } @@ -821,6 +840,10 @@ func (r *BlockBroadcastReactor) removeSyncRequestsByPeer(peerID p2p.ID) { // reconnect them, and on reconnect AddPeer's isBanned check will let them in. func (r *BlockBroadcastReactor) banPeer(peer p2p.Peer, reason string) { peerID := peer.ID() + // reason is a compact, low-cardinality token (e.g. invalid_sig, + // invalid_block, decode_error, sync_timeout); detailed context is logged + // at each call site just before banning. 
+ r.metrics.IncBcastPeersBanned(reason) if peer.IsPersistent() { r.logger.Error("[WHITELIST_ALARM] whitelisted peer misbehaved, will reconnect", diff --git a/sequencer/broadcast_reactor_test.go b/sequencer/broadcast_reactor_test.go index 611ae74a03a..12b3afbc353 100644 --- a/sequencer/broadcast_reactor_test.go +++ b/sequencer/broadcast_reactor_test.go @@ -20,6 +20,7 @@ func newReactorForTest() *BlockBroadcastReactor { syncPeerCounts: make(map[p2p.ID]int), bannedPeers: make(map[p2p.ID]time.Time), blockReqLimiter: NewPeerRateLimiter(blockRequestRateLimit, blockRequestBurst), + metrics: NopMetrics(), } } diff --git a/sequencer/metrics.gen.go b/sequencer/metrics.gen.go new file mode 100644 index 00000000000..6425cbc5c8e --- /dev/null +++ b/sequencer/metrics.gen.go @@ -0,0 +1,163 @@ +// Code generated by metricsgen. DO NOT EDIT. + +package sequencer + +import ( + "github.com/go-kit/kit/metrics/discard" + prometheus "github.com/go-kit/kit/metrics/prometheus" + stdprometheus "github.com/prometheus/client_golang/prometheus" +) + +func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { + labels := []string{} + for i := 0; i < len(labelsAndValues); i += 2 { + labels = append(labels, labelsAndValues[i]) + } + return &Metrics{ + IsActiveSequencer: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "is_active_sequencer", + Help: "Whether this node is currently the active sequencer (1) or not (0).", + }, labels).With(labelsAndValues...), + BlocksProducedTotal: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "blocks_produced_total", + Help: "Total number of blocks produced by this node while acting as the active sequencer. 
Only advances on the leader/active node.", + }, labels).With(labelsAndValues...), + BlockIntervalSeconds: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "block_interval_seconds", + Help: "Whole seconds between consecutive applied blocks (block-timestamp delta).", + + Buckets: stdprometheus.ExponentialBuckets(1, 2, 6), + }, labels).With(labelsAndValues...), + BlockSizeBytes: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "block_size_bytes", + Help: "Size in bytes of applied V2 blocks.", + + Buckets: stdprometheus.ExponentialBuckets(1024, 2, 16), + }, labels).With(labelsAndValues...), + ApplyDurationMilliseconds: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "apply_duration_milliseconds", + Help: "Milliseconds per block-apply step, labeled by step (sig store / geth).", + + Buckets: stdprometheus.ExponentialBuckets(1, 2, 14), + }, append(labels, "step")).With(labelsAndValues...), + BlockTxs: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "block_txs", + Help: "Number of transactions per applied block.", + + Buckets: stdprometheus.ExponentialBuckets(1, 2, 14), + }, labels).With(labelsAndValues...), + BroadcastChannelDroppedTotal: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "broadcast_channel_dropped_total", + Help: "Total blocks dropped because the local broadcast channel was full.", + }, labels).With(labelsAndValues...), + AssembleDurationMilliseconds: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "assemble_duration_milliseconds", + Help: "Milliseconds to assemble a block from the L2 node.", + + Buckets: 
stdprometheus.ExponentialBuckets(1, 2, 14), + }, labels).With(labelsAndValues...), + SignDurationMilliseconds: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "sign_duration_milliseconds", + Help: "Milliseconds to sign a produced block.", + + Buckets: stdprometheus.ExponentialBuckets(1, 2, 14), + }, labels).With(labelsAndValues...), + CommitDurationMilliseconds: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "commit_duration_milliseconds", + Help: "Milliseconds to commit a produced block, labeled by mode (single-node).", + + Buckets: stdprometheus.ExponentialBuckets(1, 2, 14), + }, append(labels, "mode")).With(labelsAndValues...), + BroadcastChannelDepth: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "broadcast_channel_depth", + Help: "Current depth of the local broadcast channel.", + }, labels).With(labelsAndValues...), + BcastSyncGap: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "bcast_sync_gap", + Help: "Number of blocks this node is behind the broadcast tip.", + }, labels).With(labelsAndValues...), + BcastBlocksAppliedTotal: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "bcast_blocks_applied_total", + Help: "Total V2 blocks applied via the broadcast reactor, labeled by source (p2p / cache).", + }, append(labels, "source")).With(labelsAndValues...), + BcastRoutinesStarted: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "bcast_routines_started", + Help: "Whether the broadcast reactor routines are running (1) or stopped (0).", + }, labels).With(labelsAndValues...), + BcastPeersBannedTotal: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: 
namespace, + Subsystem: MetricsSubsystem, + Name: "bcast_peers_banned_total", + Help: "Total peers banned by the broadcast reactor, labeled by reason. An invalid-signature (forged) block bans its sender and is surfaced here.", + }, append(labels, "reason")).With(labelsAndValues...), + BcastPendingCacheSize: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "bcast_pending_cache_size", + Help: "Number of blocks held in the pending (out-of-order) cache.", + }, labels).With(labelsAndValues...), + BcastBlocksDedupedTotal: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "bcast_blocks_deduped_total", + Help: "Total inbound blocks dropped as already-seen duplicates.", + }, labels).With(labelsAndValues...), + SyncV2BlocksTotal: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "sync_v2_blocks_total", + Help: "Total V2 blocks applied via the blocksync catch-up path. The apply rate is derived on the query side with rate(...) 
— no float gauge is kept.", + }, labels).With(labelsAndValues...), + } +} + +func NopMetrics() *Metrics { + return &Metrics{ + IsActiveSequencer: discard.NewGauge(), + BlocksProducedTotal: discard.NewCounter(), + BlockIntervalSeconds: discard.NewHistogram(), + BlockSizeBytes: discard.NewHistogram(), + ApplyDurationMilliseconds: discard.NewHistogram(), + BlockTxs: discard.NewHistogram(), + BroadcastChannelDroppedTotal: discard.NewCounter(), + AssembleDurationMilliseconds: discard.NewHistogram(), + SignDurationMilliseconds: discard.NewHistogram(), + CommitDurationMilliseconds: discard.NewHistogram(), + BroadcastChannelDepth: discard.NewGauge(), + BcastSyncGap: discard.NewGauge(), + BcastBlocksAppliedTotal: discard.NewCounter(), + BcastRoutinesStarted: discard.NewGauge(), + BcastPeersBannedTotal: discard.NewCounter(), + BcastPendingCacheSize: discard.NewGauge(), + BcastBlocksDedupedTotal: discard.NewCounter(), + SyncV2BlocksTotal: discard.NewCounter(), + } +} diff --git a/sequencer/metrics.go b/sequencer/metrics.go new file mode 100644 index 00000000000..5edb9ac44e1 --- /dev/null +++ b/sequencer/metrics.go @@ -0,0 +1,171 @@ +package sequencer + +import ( + "time" + + "github.com/go-kit/kit/metrics" +) + +const ( + // MetricsSubsystem is the subsystem label shared by all metrics exposed by + // the sequencer package (block production, P2P broadcast and blocksync V2). + // With the "morphnode" namespace this yields names like + // morphnode_sequencer_*. + MetricsSubsystem = "sequencer" +) + +//go:generate go run ../scripts/metricsgen -struct=Metrics + +// Metrics contains the metrics exposed by the centralized sequencer stack that +// replaces PBFT consensus after the upgrade: block production (StateV2), the +// block broadcast reactor and the blocksync V2 catch-up path. A single struct +// is shared by all three so they land under one subsystem. +// +// All values are integer-valued. 
Short processing latencies use integer +// milliseconds (sub-second work would round to 0 in seconds); the block +// interval is large-scale and uses integer seconds. Use the typed +// helper methods below so call sites never deal with float64 directly. +type Metrics struct { + // ---- Block production (StateV2) ---- + + // Whether this node is currently the active sequencer (1) or not (0). + IsActiveSequencer metrics.Gauge + + // Total number of blocks produced by this node while acting as the active + // sequencer. Only advances on the leader/active node. + BlocksProducedTotal metrics.Counter + + // Whole seconds between consecutive applied blocks (block-timestamp delta). + BlockIntervalSeconds metrics.Histogram `metrics_buckettype:"exp" metrics_bucketsizes:"1, 2, 6"` + + // Size in bytes of applied V2 blocks. + BlockSizeBytes metrics.Histogram `metrics_buckettype:"exp" metrics_bucketsizes:"1024, 2, 16"` + + // Milliseconds per block-apply step, labeled by step (sig store / geth). + ApplyDurationMilliseconds metrics.Histogram `metrics_labels:"step" metrics_buckettype:"exp" metrics_bucketsizes:"1, 2, 14"` + + // Number of transactions per applied block. + BlockTxs metrics.Histogram `metrics_buckettype:"exp" metrics_bucketsizes:"1, 2, 14"` + + // Total blocks dropped because the local broadcast channel was full. + BroadcastChannelDroppedTotal metrics.Counter + + // Milliseconds to assemble a block from the L2 node. + AssembleDurationMilliseconds metrics.Histogram `metrics_buckettype:"exp" metrics_bucketsizes:"1, 2, 14"` + + // Milliseconds to sign a produced block. + SignDurationMilliseconds metrics.Histogram `metrics_buckettype:"exp" metrics_bucketsizes:"1, 2, 14"` + + // Milliseconds to commit a produced block, labeled by mode (single-node). + CommitDurationMilliseconds metrics.Histogram `metrics_labels:"mode" metrics_buckettype:"exp" metrics_bucketsizes:"1, 2, 14"` + + // Current depth of the local broadcast channel. 
+ BroadcastChannelDepth metrics.Gauge + + // ---- Block broadcast reactor ---- + + // Number of blocks this node is behind the broadcast tip. + BcastSyncGap metrics.Gauge + + // Total V2 blocks applied via the broadcast reactor, labeled by source + // (p2p / cache). + BcastBlocksAppliedTotal metrics.Counter `metrics_labels:"source"` + + // Whether the broadcast reactor routines are running (1) or stopped (0). + BcastRoutinesStarted metrics.Gauge + + // Total peers banned by the broadcast reactor, labeled by reason. An + // invalid-signature (forged) block bans its sender and is surfaced here. + BcastPeersBannedTotal metrics.Counter `metrics_labels:"reason"` + + // Number of blocks held in the pending (out-of-order) cache. + BcastPendingCacheSize metrics.Gauge + + // Total inbound blocks dropped as already-seen duplicates. + BcastBlocksDedupedTotal metrics.Counter + + // ---- Blocksync V2 catch-up ---- + + // Total V2 blocks applied via the blocksync catch-up path. The apply rate + // is derived on the query side with rate(...) — no float gauge is kept. + SyncV2BlocksTotal metrics.Counter +} + +// ---- Typed helpers (keep float64 conversions out of call sites) ---- + +// setBool sets a 0/1 gauge from a bool. +func setBool(g metrics.Gauge, b bool) { + if b { + g.Set(1) + } else { + g.Set(0) + } +} + +// SetActiveSequencer records whether this node is the active sequencer. +func (m *Metrics) SetActiveSequencer(active bool) { setBool(m.IsActiveSequencer, active) } + +// IncBlocksProduced counts one block produced by this node. +func (m *Metrics) IncBlocksProduced() { m.BlocksProducedTotal.Add(1) } + +// ObserveBlockIntervalSeconds records the whole-second gap between blocks. +func (m *Metrics) ObserveBlockIntervalSeconds(secs uint64) { + m.BlockIntervalSeconds.Observe(float64(secs)) +} + +// ObserveBlockSizeBytes records an applied block's wire size in bytes. 
+func (m *Metrics) ObserveBlockSizeBytes(n int) { m.BlockSizeBytes.Observe(float64(n)) } + +// ObserveApplyDuration records a block-apply step latency in milliseconds. +func (m *Metrics) ObserveApplyDuration(step string, d time.Duration) { + m.ApplyDurationMilliseconds.With("step", step).Observe(float64(d.Milliseconds())) +} + +// ObserveBlockTxs records the transaction count of an applied block. +func (m *Metrics) ObserveBlockTxs(n int) { m.BlockTxs.Observe(float64(n)) } + +// IncBroadcastChannelDropped counts one block dropped (broadcast channel full). +func (m *Metrics) IncBroadcastChannelDropped() { m.BroadcastChannelDroppedTotal.Add(1) } + +// ObserveAssembleDuration records block-assemble latency in milliseconds. +func (m *Metrics) ObserveAssembleDuration(d time.Duration) { + m.AssembleDurationMilliseconds.Observe(float64(d.Milliseconds())) +} + +// ObserveSignDuration records block-sign latency in milliseconds. +func (m *Metrics) ObserveSignDuration(d time.Duration) { + m.SignDurationMilliseconds.Observe(float64(d.Milliseconds())) +} + +// ObserveCommitDuration records commit latency in milliseconds, labeled by mode ("single" / "ha"). +func (m *Metrics) ObserveCommitDuration(mode string, d time.Duration) { + m.CommitDurationMilliseconds.With("mode", mode).Observe(float64(d.Milliseconds())) +} + +// SetBroadcastChannelDepth records the current broadcast channel depth. +func (m *Metrics) SetBroadcastChannelDepth(n int) { m.BroadcastChannelDepth.Set(float64(n)) } + +// SetBcastSyncGap records how many blocks this node is behind the tip. +func (m *Metrics) SetBcastSyncGap(gap int64) { m.BcastSyncGap.Set(float64(gap)) } + +// IncBcastBlocksApplied counts one block applied via the broadcast reactor. +func (m *Metrics) IncBcastBlocksApplied(source string) { + m.BcastBlocksAppliedTotal.With("source", source).Add(1) +} + +// SetBcastRoutinesStarted records whether the reactor routines are running. 
+func (m *Metrics) SetBcastRoutinesStarted(started bool) { setBool(m.BcastRoutinesStarted, started) } + +// IncBcastPeersBanned counts one peer ban with a low-cardinality reason token. +func (m *Metrics) IncBcastPeersBanned(reason string) { + m.BcastPeersBannedTotal.With("reason", reason).Add(1) +} + +// SetBcastPendingCacheSize records the pending (out-of-order) cache size. +func (m *Metrics) SetBcastPendingCacheSize(n int) { m.BcastPendingCacheSize.Set(float64(n)) } + +// IncBcastBlocksDeduped counts one inbound duplicate block dropped. +func (m *Metrics) IncBcastBlocksDeduped() { m.BcastBlocksDedupedTotal.Add(1) } + +// IncSyncV2Blocks counts one block applied via blocksync catch-up. +func (m *Metrics) IncSyncV2Blocks() { m.SyncV2BlocksTotal.Add(1) } diff --git a/sequencer/state_v2.go b/sequencer/state_v2.go index 4fb5ce139ae..7d62f1cd04d 100644 --- a/sequencer/state_v2.go +++ b/sequencer/state_v2.go @@ -8,6 +8,7 @@ import ( "github.com/tendermint/tendermint/l2node" "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/service" + "github.com/tendermint/tendermint/types" ) const ( @@ -42,6 +43,7 @@ type StateV2 struct { sigStore *SignatureStore ha SequencerHA // nil = single-node mode logger log.Logger + metrics *Metrics // Block production blockInterval time.Duration // empty-block fallback interval (default 3s) @@ -77,6 +79,7 @@ func NewStateV2( blockInterval: BlockInterval, fastBlockInterval: FastBlockInterval, logger: logger.With("module", "stateV2"), + metrics: NopMetrics(), broadcastCh: make(chan *BlockV2, 100), } @@ -85,6 +88,14 @@ func NewStateV2( return s, nil } +// SetMetrics wires the sequencer metrics. Called once after construction +// (before OnStart). When unset, metrics default to a no-op implementation. +func (s *StateV2) SetMetrics(m *Metrics) { + if m != nil { + s.metrics = m + } +} + // OnStart implements service.Service. // Initializes state from geth. Nodes with a signer start roleCheckRoutine. 
func (s *StateV2) OnStart() error { @@ -200,7 +211,11 @@ func resetTimer(t *time.Timer, d time.Duration) { // isActiveSequencer returns true if this node should produce the next block. // For HA mode: must be Raft leader AND L1-designated sequencer. // For single-node mode: must be L1-designated sequencer. -func (s *StateV2) isActiveSequencer() bool { +func (s *StateV2) isActiveSequencer() (active bool) { + defer func() { + s.metrics.SetActiveSequencer(active) + }() + // HA mode: must be Raft leader if s.ha != nil && !s.ha.IsLeader() { return false @@ -236,11 +251,13 @@ func (s *StateV2) assembleBlock() (*BlockV2, bool, error) { parentHash := s.latestBlock.Hash s.mtx.RUnlock() + tAssemble := time.Now() block, collectedL1Msgs, err := s.l2Node.RequestBlockDataV2(parentHash.Bytes()) if err != nil { s.logger.Error("Failed to assemble block", "error", err) return nil, false, err } + s.metrics.ObserveAssembleDuration(time.Since(tAssemble)) return block, collectedL1Msgs, nil } @@ -255,6 +272,7 @@ func (s *StateV2) commitBlock(block *BlockV2, collectedL1Msgs bool) { return } signDur := time.Since(tSign) + s.metrics.ObserveSignDuration(signDur) if s.ha != nil { // HA mode: replicate via Raft. 
FSM callback handles ApplyBlock + SaveSignature @@ -266,6 +284,7 @@ func (s *StateV2) commitBlock(block *BlockV2, collectedL1Msgs bool) { } commitDur := time.Since(tCommit) totalDur := time.Since(t0) + s.metrics.ObserveCommitDuration("ha", totalDur) s.logger.Debug("[PERF] commitBlock", "mode", "HA", @@ -285,6 +304,7 @@ func (s *StateV2) commitBlock(block *BlockV2, collectedL1Msgs bool) { } applyDur := time.Since(tApply) totalDur := time.Since(t0) + s.metrics.ObserveCommitDuration("single", totalDur) s.logger.Debug("[PERF] commitBlock", "mode", "single", @@ -304,9 +324,11 @@ func (s *StateV2) commitBlock(block *BlockV2, collectedL1Msgs bool) { "txCount", len(block.Transactions), "collectedL1Msgs", collectedL1Msgs) default: + s.metrics.IncBroadcastChannelDropped() s.logger.Error("Broadcast channel full, dropping block", "number", block.Number) } } + s.metrics.IncBlocksProduced() } // signBlock signs the block hash with the signer. @@ -343,6 +365,7 @@ func (s *StateV2) ApplyBlock(block *BlockV2) error { return err } sigDur := time.Since(tSig) + s.metrics.ObserveApplyDuration("sig", sigDur) tGeth := time.Now() applied, err := s.l2Node.ApplyBlockV2(block) @@ -350,8 +373,18 @@ func (s *StateV2) ApplyBlock(block *BlockV2) error { return err } gethDur := time.Since(tGeth) + s.metrics.ObserveApplyDuration("geth", gethDur) if applied { + // Block attributes are recorded on every node that applies a block + // (leader, HA follower, fullnode), so these are role-independent. + // Interval uses block timestamps (not wall-clock) to stay accurate + // even while catching up. 
+ if s.latestBlock != nil && block.Timestamp >= s.latestBlock.Timestamp { + s.metrics.ObserveBlockIntervalSeconds(block.Timestamp - s.latestBlock.Timestamp) + } + s.metrics.ObserveBlockSizeBytes(types.BlockV2ToProto(block).Size()) + s.metrics.ObserveBlockTxs(len(block.Transactions)) s.latestBlock = block } diff --git a/sequencer/state_v2_test.go b/sequencer/state_v2_test.go index f95dfab315a..d411d14e387 100644 --- a/sequencer/state_v2_test.go +++ b/sequencer/state_v2_test.go @@ -4,6 +4,7 @@ import ( "errors" "testing" + "github.com/go-kit/kit/metrics" "github.com/morph-l2/go-ethereum/common" "github.com/tendermint/tendermint/l2node" "github.com/tendermint/tendermint/libs/log" @@ -44,6 +45,14 @@ func (m *mockSequencerVerifier) IsSequencerAt(addr common.Address, l2Height uint return m.isSequencer, nil } +type recordingGauge struct { + value float64 +} + +func (g *recordingGauge) With(labelValues ...string) metrics.Gauge { return g } +func (g *recordingGauge) Set(value float64) { g.value = value } +func (g *recordingGauge) Add(delta float64) { g.value += delta } + // mockSequencerHA is a mock implementation of SequencerHA for testing. 
type mockSequencerHA struct { leader bool @@ -340,6 +349,55 @@ func TestStateV2_IsActiveSequencer_VerifierError(t *testing.T) { } } +func TestStateV2_IsActiveSequencer_ReportsInactiveAfterBecomingInactive(t *testing.T) { + logger := log.NewNopLogger() + verifier := &mockSequencerVerifier{isSequencer: true} + signer := &mockSignerImpl{address: common.HexToAddress("0x1")} + l1Tracker := &mockL1Tracker{halt: false} + ha := newMockSequencerHA(true) + gauge := &recordingGauge{} + + s, err := NewStateV2(newTestMockL2Node(), logger, verifier, l1Tracker, signer, nil, ha) + if err != nil { + t.Fatalf("NewStateV2: %v", err) + } + metrics := NopMetrics() + metrics.IsActiveSequencer = gauge + s.SetMetrics(metrics) + s.latestBlock = &BlockV2{Number: 10} + + if !s.isActiveSequencer() { + t.Fatal("expected active when HA leader, L1 healthy, and verifier says sequencer") + } + if gauge.value != 1 { + t.Fatalf("active metric = %v, want 1", gauge.value) + } + + l1Tracker.halt = true + if s.isActiveSequencer() { + t.Fatal("expected inactive when L1 tracker halts production") + } + if gauge.value != 0 { + t.Fatalf("active metric after L1 halt = %v, want 0", gauge.value) + } + + l1Tracker.halt = false + if !s.isActiveSequencer() { + t.Fatal("expected active again after L1 recovers") + } + if gauge.value != 1 { + t.Fatalf("active metric after L1 recovery = %v, want 1", gauge.value) + } + + ha.leader = false + if s.isActiveSequencer() { + t.Fatal("expected inactive when HA node is no longer leader") + } + if gauge.value != 0 { + t.Fatalf("active metric after losing HA leadership = %v, want 0", gauge.value) + } +} + // ============================================================================ // New tests: ApplyBlock // ============================================================================