From dcb28614aa57c32e1e48763f6a1c73cad6569817 Mon Sep 17 00:00:00 2001 From: syntrust Date: Wed, 11 Mar 2026 10:24:55 +0800 Subject: [PATCH 1/2] wait block --- ethstorage/miner/worker.go | 57 ++++++++++++++++++++++++++++++++++---- 1 file changed, 51 insertions(+), 6 deletions(-) diff --git a/ethstorage/miner/worker.go b/ethstorage/miner/worker.go index f3ae5fc0..a20ee764 100644 --- a/ethstorage/miner/worker.go +++ b/ethstorage/miner/worker.go @@ -115,6 +115,9 @@ type worker struct { db ethdb.Database storageMgr *es.StorageManager + latestL1Head uint64 + headUpdateCh chan struct{} + chainHeadCh chan eth.L1BlockRef startCh chan uint64 exitCh chan struct{} @@ -155,6 +158,7 @@ func newWorker( dataReader: dr, prover: prover, chainHeadCh: chainHeadCh, + headUpdateCh: make(chan struct{}, 1), shardTaskMap: make(map[uint64]task), exitCh: make(chan struct{}), startCh: make(chan uint64, 1), @@ -183,6 +187,27 @@ func newWorker( return worker } +func (w *worker) updateLatestL1Head(incoming uint64) { + for { + current := atomic.LoadUint64(&w.latestL1Head) + if incoming <= current { + w.lg.Info("L1 head is already updated", "current", current, "incoming", incoming) + return + } + if atomic.CompareAndSwapUint64(&w.latestL1Head, current, incoming) { + w.lg.Info("Latest L1 head updated", "current", current, "incoming", incoming) + select { + case w.headUpdateCh <- struct{}{}: + w.lg.Info("headUpdateCh notified", "current", current, "incoming", incoming) + default: + w.lg.Info("headUpdateCh notification skipped to avoid blocking", "current", current, "incoming", incoming) + } + return + } + w.lg.Info("Concurrent L1 head update detected, retrying", "current", current, "incoming", incoming) + } +} + func (w *worker) start() { w.lg.Info("Worker is being started...") atomic.StoreInt32(&w.running, 1) @@ -275,6 +300,7 @@ func (w *worker) newWorkLoop() { w.shardTaskMap[shardIdx] = task case block := <-w.chainHeadCh: + w.updateLatestL1Head(block.Number) if !w.isRunning() { break } @@ -410,6 +436,28 @@ func (w *worker) notifyResultLoop() { } } +// waitUntilBlockAdvanced waits until L1 head is strictly newer than the mined block. +func (w *worker) waitUntilBlockAdvanced(mined uint64) bool { + loggedWaiting := false + for { + latest := atomic.LoadUint64(&w.latestL1Head) + if latest > mined { + w.lg.Info("L1 head advanced since mined block", "mined", mined, "latest", latest) + return true + } + if !loggedWaiting { + w.lg.Info("Hold on submitting mining result until L1 head advances", "mined", mined, "latest", latest) + loggedWaiting = true + } + select { + case <-w.headUpdateCh: + w.lg.Info("L1 head update received", "mined", mined, "latest", atomic.LoadUint64(&w.latestL1Head)) + case <-w.exitCh: + return false + } + } +} + // resultLoop is a standalone goroutine to submit mining result to L1 contract. func (w *worker) resultLoop() { defer w.wg.Done() @@ -426,12 +474,9 @@ func (w *worker) resultLoop() { continue } w.lg.Info("Mining result loop get result", "shard", result.startShardId, "block", result.blockNumber, "nonce", result.nonce) - - // Mining result comes within the same block time window - if tillNextSlot := int64(result.timestamp) + int64(w.config.Slot) - time.Now().Unix(); tillNextSlot > 0 { - // Wait until next block comes to avoid empty blockhash on gas estimation - w.lg.Info("Hold on submitting mining result till block+1", "block", result.blockNumber, "secondsToWait", tillNextSlot) - time.Sleep(time.Duration(tillNextSlot) * time.Second) + // Wait until next block comes to avoid empty blockhash on gas estimation + if !w.waitUntilBlockAdvanced(result.blockNumber.Uint64()) { + return } txHash, err := w.l1API.SubmitMinedResult( context.Background(), From 132791a8184695b7c9993012217a97e3a17068f0 Mon Sep 17 00:00:00 2001 From: syntrust Date: Wed, 11 Mar 2026 14:15:00 +0800 Subject: [PATCH 2/2] simplify --- ethstorage/miner/worker.go | 78 ++++++++++++++++---------------------- 1 file changed, 32 insertions(+), 46 deletions(-) diff --git a/ethstorage/miner/worker.go b/ethstorage/miner/worker.go index a20ee764..518b58b9 100644 --- a/ethstorage/miner/worker.go +++ b/ethstorage/miner/worker.go @@ -115,8 +115,7 @@ type worker struct { db ethdb.Database storageMgr *es.StorageManager - latestL1Head uint64 - headUpdateCh chan struct{} + headUpdateCh chan uint64 chainHeadCh chan eth.L1BlockRef startCh chan uint64 @@ -158,7 +157,7 @@ func newWorker( dataReader: dr, prover: prover, chainHeadCh: chainHeadCh, - headUpdateCh: make(chan struct{}, 1), + headUpdateCh: make(chan uint64, 1), shardTaskMap: make(map[uint64]task), exitCh: make(chan struct{}), startCh: make(chan uint64, 1), @@ -187,27 +186,6 @@ func newWorker( return worker } -func (w *worker) updateLatestL1Head(incoming uint64) { - for { - current := atomic.LoadUint64(&w.latestL1Head) - if incoming <= current { - w.lg.Info("L1 head is already updated", "current", current, "incoming", incoming) - return - } - if atomic.CompareAndSwapUint64(&w.latestL1Head, current, incoming) { - w.lg.Info("Latest L1 head updated", "current", current, "incoming", incoming) - select { - case w.headUpdateCh <- struct{}{}: - w.lg.Info("headUpdateCh notified", "current", current, "incoming", incoming) - default: - w.lg.Info("headUpdateCh notification skipped to avoid blocking", "current", current, "incoming", incoming) - } - return - } - w.lg.Info("Concurrent L1 head update detected, retrying", "current", current, "incoming", incoming) - } -} - func (w *worker) start() { w.lg.Info("Worker is being started...") atomic.StoreInt32(&w.running, 1) @@ -322,6 +300,20 @@ func (w *worker) newWorkLoop() { } } +func (w *worker) updateLatestL1Head(new uint64) { + for { + select { + case w.headUpdateCh <- new: + return + case old := <-w.headUpdateCh: + // Keep monotonic block height while replacing buffered value. + if old > new { + new = old + } + } + } +} + // assign tasks to threads with split nonce range func (w *worker) assignTasks(task task, block eth.L1BlockRef, reqDiff *big.Int) { seg := w.config.NonceLimit / w.config.ThreadsPerShard @@ -436,28 +428,6 @@ func (w *worker) notifyResultLoop() { } } -// waitUntilBlockAdvanced waits until L1 head is strictly newer than the mined block. -func (w *worker) waitUntilBlockAdvanced(mined uint64) bool { - loggedWaiting := false - for { - latest := atomic.LoadUint64(&w.latestL1Head) - if latest > mined { - w.lg.Info("L1 head advanced since mined block", "mined", mined, "latest", latest) - return true - } - if !loggedWaiting { - w.lg.Info("Hold on submitting mining result until L1 head advances", "mined", mined, "latest", latest) - loggedWaiting = true - } - select { - case <-w.headUpdateCh: - w.lg.Info("L1 head update received", "mined", mined, "latest", atomic.LoadUint64(&w.latestL1Head)) - case <-w.exitCh: - return false - } - } -} - // resultLoop is a standalone goroutine to submit mining result to L1 contract. func (w *worker) resultLoop() { defer w.wg.Done() @@ -550,6 +520,22 @@ func (w *worker) resultLoop() { } } +// waitUntilBlockAdvanced waits until L1 head is strictly newer than the mined block. +func (w *worker) waitUntilBlockAdvanced(mined uint64) bool { + for { + select { + case latest := <-w.headUpdateCh: + if latest > mined { + w.lg.Info("L1 head advanced since mined block", "mined", mined, "latest", latest) + return true + } + w.lg.Info("L1 head not advanced since mined block, keep waiting", "mined", mined, "latest", latest) + case <-w.exitCh: + return false + } + } +} + func (w *worker) reportMiningResult(rs *result, txHash common.Hash, err error) { msg := fmt.Sprintf( "A storage proof was generated by es-node for shard %d at block %v.\r\n\r\n",