Skip to content

Commit a3f4377

Browse files
authored
feat: replay flashblocks to validator for latency benchmarking (#154)
* feat: replay flashblocks to validator for latency benchmarking Add flashblock replay server that emits collected flashblock payloads to validator clients via websocket during benchmark runs. * chore: addresses pr comments
1 parent d160b6d commit a3f4377

10 files changed

Lines changed: 451 additions & 29 deletions

File tree

configs/examples/rbuilder.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@ benchmarks:
1313
- type: node_type
1414
values:
1515
- rbuilder
16+
- type: validator_node_type
17+
values:
18+
- reth
1619
- type: num_blocks
1720
value: 10
1821
- type: gas_limit

runner/benchmark/benchmark.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,12 @@ func NewParamsFromValues(assignments map[string]interface{}) (*types.RunParams,
5454
} else {
5555
return nil, fmt.Errorf("invalid node type %s", v)
5656
}
57+
case "validator_node_type":
58+
if vStr, ok := v.(string); ok {
59+
params.ValidatorNodeType = vStr
60+
} else {
61+
return nil, fmt.Errorf("invalid validator node type %s", v)
62+
}
5763
case "gas_limit":
5864
if vInt, ok := v.(int); ok {
5965
params.GasLimit = uint64(vInt)

runner/clients/geth/client.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,3 +275,8 @@ func (g *GethClient) SetHead(ctx context.Context, blockNumber uint64) error {
275275
func (g *GethClient) FlashblocksClient() types.FlashblocksClient {
276276
return nil
277277
}
278+
279+
// SupportsFlashblocks returns false as geth does not support receiving flashblock payloads.
280+
func (g *GethClient) SupportsFlashblocks() bool {
281+
return false
282+
}

runner/clients/rbuilder/client.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,3 +119,8 @@ func (r *RbuilderClient) SetHead(ctx context.Context, blockNumber uint64) error
119119
func (r *RbuilderClient) FlashblocksClient() types.FlashblocksClient {
120120
return r.flashblocksClient
121121
}
122+
123+
// SupportsFlashblocks returns false as rbuilder doesn't support receiving flashblock payloads.
124+
func (r *RbuilderClient) SupportsFlashblocks() bool {
125+
return false
126+
}

runner/clients/reth/client.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,12 @@ func (r *RethClient) Run(ctx context.Context, cfg *types.RuntimeConfig) error {
105105
args = append(args, "--db.read-transaction-timeout", "0")
106106
args = append(args, cfg.Args...)
107107

108+
// Add flashblocks URL if provided
109+
if cfg.FlashblocksURL != nil && *cfg.FlashblocksURL != "" {
110+
r.logger.Info("Configuring reth with flashblocks websocket URL", "url", *cfg.FlashblocksURL)
111+
args = append(args, "--flashblocks-url", *cfg.FlashblocksURL)
112+
}
113+
108114
// delete datadir/txpool-transactions-backup.rlp if it exists
109115
txpoolBackupPath := fmt.Sprintf("%s/txpool-transactions-backup.rlp", r.options.DataDirPath)
110116
if _, err := os.Stat(txpoolBackupPath); err == nil {
@@ -283,3 +289,9 @@ func (r *RethClient) SetHead(ctx context.Context, blockNumber uint64) error {
283289
func (r *RethClient) FlashblocksClient() types.FlashblocksClient {
284290
return nil
285291
}
292+
293+
// SupportsFlashblocks returns true if the client supports receiving flashblock payloads.
294+
// Reth supports flashblocks when configured with the appropriate flags.
295+
func (r *RethClient) SupportsFlashblocks() bool {
296+
return true
297+
}

runner/clients/types/types.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,10 @@ import (
1010
)
1111

1212
type RuntimeConfig struct {
13-
Stdout io.WriteCloser
14-
Stderr io.WriteCloser
15-
Args []string
13+
Stdout io.WriteCloser
14+
Stderr io.WriteCloser
15+
Args []string
16+
FlashblocksURL *string // Optional URL for flashblocks websocket server (only used by clients that support it)
1617
}
1718

1819
// ExecutionClient is an abstraction over the different clients that can be used to run the chain like
@@ -28,4 +29,5 @@ type ExecutionClient interface {
2829
GetVersion(ctx context.Context) (string, error)
2930
SetHead(ctx context.Context, blockNumber uint64) error
3031
FlashblocksClient() FlashblocksClient // returns nil for clients that don't support flashblocks
32+
SupportsFlashblocks() bool // returns true if the client supports receiving flashblock payloads
3133
}
Lines changed: 307 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,307 @@
1+
package flashblocks
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"net/http"
8+
"sync"
9+
"time"
10+
11+
"github.com/base/base-bench/runner/clients/types"
12+
"github.com/ethereum/go-ethereum/log"
13+
"github.com/gorilla/websocket"
14+
)
15+
16+
// ReplayServer replays flashblock payloads to connected clients via websocket.
17+
type ReplayServer struct {
18+
log log.Logger
19+
port uint64
20+
flashblocks []types.FlashblocksPayloadV1
21+
blockTime time.Duration
22+
23+
server *http.Server
24+
upgrader websocket.Upgrader
25+
26+
mu sync.RWMutex
27+
connections []*websocket.Conn
28+
started bool
29+
stopChan chan struct{}
30+
stopOnce sync.Once
31+
}
32+
33+
func NewReplayServer(log log.Logger, port uint64, flashblocks []types.FlashblocksPayloadV1, blockTime time.Duration) *ReplayServer {
34+
return &ReplayServer{
35+
log: log,
36+
port: port,
37+
flashblocks: flashblocks,
38+
blockTime: blockTime,
39+
upgrader: websocket.Upgrader{
40+
CheckOrigin: func(r *http.Request) bool { return true },
41+
},
42+
connections: make([]*websocket.Conn, 0),
43+
stopChan: make(chan struct{}),
44+
}
45+
}
46+
47+
func (s *ReplayServer) URL() string {
48+
return fmt.Sprintf("ws://127.0.0.1:%d", s.port)
49+
}
50+
51+
func (s *ReplayServer) Start(ctx context.Context) error {
52+
s.mu.Lock()
53+
if s.started {
54+
s.mu.Unlock()
55+
return fmt.Errorf("server already started")
56+
}
57+
s.started = true
58+
s.mu.Unlock()
59+
60+
mux := http.NewServeMux()
61+
mux.HandleFunc("/", s.handleWebSocket)
62+
63+
s.server = &http.Server{
64+
Addr: fmt.Sprintf(":%d", s.port),
65+
Handler: mux,
66+
}
67+
68+
go func() {
69+
s.log.Info("Starting flashblock replay server", "port", s.port)
70+
if err := s.server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
71+
s.log.Error("Flashblock replay server error", "err", err)
72+
}
73+
}()
74+
75+
time.Sleep(100 * time.Millisecond)
76+
return nil
77+
}
78+
79+
func (s *ReplayServer) handleWebSocket(w http.ResponseWriter, r *http.Request) {
80+
conn, err := s.upgrader.Upgrade(w, r, nil)
81+
if err != nil {
82+
s.log.Error("Failed to upgrade websocket connection", "err", err)
83+
return
84+
}
85+
86+
s.mu.Lock()
87+
s.connections = append(s.connections, conn)
88+
s.mu.Unlock()
89+
90+
s.log.Info("New client connected to flashblock replay server")
91+
92+
for {
93+
select {
94+
case <-s.stopChan:
95+
return
96+
default:
97+
if _, _, err := conn.ReadMessage(); err != nil {
98+
s.removeConnection(conn)
99+
return
100+
}
101+
}
102+
}
103+
}
104+
105+
func (s *ReplayServer) removeConnection(conn *websocket.Conn) {
106+
s.mu.Lock()
107+
defer s.mu.Unlock()
108+
109+
for i, c := range s.connections {
110+
if c == conn {
111+
s.connections = append(s.connections[:i], s.connections[i+1:]...)
112+
_ = conn.Close()
113+
s.log.Debug("Client disconnected from flashblock replay server")
114+
return
115+
}
116+
}
117+
}
118+
119+
// ReplayFlashblocks replays flashblocks to connected clients at evenly spaced intervals.
120+
func (s *ReplayServer) ReplayFlashblocks(ctx context.Context) error {
121+
if len(s.flashblocks) == 0 {
122+
s.log.Info("No flashblocks to replay")
123+
return nil
124+
}
125+
126+
blockGroups := s.groupFlashblocksByBlock()
127+
128+
s.log.Info("Starting flashblock replay",
129+
"total_flashblocks", len(s.flashblocks),
130+
"num_blocks", len(blockGroups),
131+
)
132+
133+
for blockNum, flashblocks := range blockGroups {
134+
select {
135+
case <-ctx.Done():
136+
return ctx.Err()
137+
default:
138+
}
139+
140+
if len(flashblocks) == 0 {
141+
continue
142+
}
143+
144+
interval := s.blockTime / time.Duration(len(flashblocks)+1)
145+
146+
s.log.Debug("Replaying flashblocks for block",
147+
"block_number", blockNum,
148+
"num_flashblocks", len(flashblocks),
149+
"interval", interval,
150+
)
151+
152+
for i, flashblock := range flashblocks {
153+
select {
154+
case <-ctx.Done():
155+
return ctx.Err()
156+
default:
157+
}
158+
159+
time.Sleep(interval)
160+
161+
if err := s.broadcastFlashblock(flashblock); err != nil {
162+
s.log.Warn("Error broadcasting flashblock", "err", err, "index", i)
163+
}
164+
}
165+
166+
remainingTime := s.blockTime - interval*time.Duration(len(flashblocks))
167+
if remainingTime > 0 {
168+
time.Sleep(remainingTime)
169+
}
170+
}
171+
172+
s.log.Info("Flashblock replay complete")
173+
return nil
174+
}
175+
176+
// groupFlashblocksByBlock groups flashblocks by block number, sorted by index.
177+
func (s *ReplayServer) groupFlashblocksByBlock() map[uint64][]types.FlashblocksPayloadV1 {
178+
groups := make(map[uint64][]types.FlashblocksPayloadV1)
179+
180+
// Build PayloadID -> blockNum mapping from flashblocks with Base
181+
payloadIDToBlockNum := make(map[types.PayloadID]uint64)
182+
for _, fb := range s.flashblocks {
183+
if fb.Base != nil {
184+
payloadIDToBlockNum[fb.PayloadID] = uint64(fb.Base.BlockNumber)
185+
}
186+
}
187+
188+
for _, fb := range s.flashblocks {
189+
var blockNum uint64
190+
if fb.Base != nil {
191+
blockNum = uint64(fb.Base.BlockNumber)
192+
} else if bn, ok := payloadIDToBlockNum[fb.PayloadID]; ok {
193+
blockNum = bn
194+
}
195+
groups[blockNum] = append(groups[blockNum], fb)
196+
}
197+
198+
for blockNum := range groups {
199+
sortByIndex(groups[blockNum])
200+
}
201+
202+
return groups
203+
}
204+
205+
func sortByIndex(flashblocks []types.FlashblocksPayloadV1) {
206+
for i := 1; i < len(flashblocks); i++ {
207+
j := i
208+
for j > 0 && flashblocks[j-1].Index > flashblocks[j].Index {
209+
flashblocks[j-1], flashblocks[j] = flashblocks[j], flashblocks[j-1]
210+
j--
211+
}
212+
}
213+
}
214+
215+
func (s *ReplayServer) broadcastFlashblock(flashblock types.FlashblocksPayloadV1) error {
216+
data, err := json.Marshal(flashblock)
217+
if err != nil {
218+
return fmt.Errorf("failed to marshal flashblock: %w", err)
219+
}
220+
221+
s.mu.RLock()
222+
connections := make([]*websocket.Conn, len(s.connections))
223+
copy(connections, s.connections)
224+
s.mu.RUnlock()
225+
226+
var lastErr error
227+
for _, conn := range connections {
228+
if err := conn.WriteMessage(websocket.TextMessage, data); err != nil {
229+
s.log.Warn("Failed to send flashblock to client", "err", err)
230+
lastErr = err
231+
}
232+
}
233+
234+
s.log.Debug("Broadcasted flashblock",
235+
"payload_id", fmt.Sprintf("%x", flashblock.PayloadID),
236+
"index", flashblock.Index,
237+
"num_clients", len(connections),
238+
)
239+
240+
return lastErr
241+
}
242+
243+
// Stop stops the server. Safe to call multiple times.
244+
func (s *ReplayServer) Stop() error {
245+
var stopErr error
246+
247+
s.stopOnce.Do(func() {
248+
s.mu.Lock()
249+
if !s.started {
250+
s.mu.Unlock()
251+
return
252+
}
253+
s.started = false
254+
s.mu.Unlock()
255+
256+
s.log.Info("Stopping flashblock replay server")
257+
258+
close(s.stopChan)
259+
260+
s.mu.Lock()
261+
for _, conn := range s.connections {
262+
_ = conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
263+
_ = conn.Close()
264+
}
265+
s.connections = nil
266+
s.mu.Unlock()
267+
268+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
269+
defer cancel()
270+
271+
if s.server != nil {
272+
if err := s.server.Shutdown(ctx); err != nil {
273+
s.log.Warn("Error shutting down flashblock replay server", "err", err)
274+
stopErr = err
275+
return
276+
}
277+
}
278+
279+
s.log.Info("Flashblock replay server stopped")
280+
})
281+
282+
return stopErr
283+
}
284+
285+
func (s *ReplayServer) WaitForConnection(ctx context.Context, timeout time.Duration) error {
286+
deadline := time.Now().Add(timeout)
287+
288+
for time.Now().Before(deadline) {
289+
select {
290+
case <-ctx.Done():
291+
return ctx.Err()
292+
default:
293+
}
294+
295+
s.mu.RLock()
296+
numConnections := len(s.connections)
297+
s.mu.RUnlock()
298+
299+
if numConnections > 0 {
300+
return nil
301+
}
302+
303+
time.Sleep(100 * time.Millisecond)
304+
}
305+
306+
return fmt.Errorf("timeout waiting for client connection")
307+
}

0 commit comments

Comments
 (0)