Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/release-notes/change-log.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Server: per-block execution timeouts (`--substreams-block-execution-timeout`) are no longer silently swallowed when a WASM host-function panic (e.g. wasmtime) coincides with the deadline. Previously, `recoverExecutionPanic` would return `nil` instead of `CodeDeadlineExceeded`, causing the offending block to be skipped and the stream to complete successfully.
- CI: Docker image login, build and push are now skipped for fork PRs; image is still built (without push) to validate the Dockerfile.

### Added

- added more metrics to identify time spent squashing

## v1.18.5

### Server
Expand Down
25 changes: 18 additions & 7 deletions orchestrator/stage/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,17 @@ import (
)

type mergeMetrics struct {
start time.Time
loadStart time.Time
loadEnd time.Time
mergeStart time.Time
mergeEnd time.Time
saveStart time.Time
saveEnd time.Time
start time.Time
getStoreStart time.Time
getStoreEnd time.Time
loadStart time.Time
loadEnd time.Time
mergeStart time.Time
mergeEnd time.Time
saveStart time.Time
saveEnd time.Time
writeStart time.Time
writeEnd time.Time

blockRange *block.Range
stage int
Expand All @@ -27,6 +31,9 @@ func (m mergeMetrics) logFields() []zap.Field {
f := []zap.Field{
zap.String("total_time", time.Since(m.start).String()),
}
if !m.getStoreStart.IsZero() {
f = append(f, zap.String("get_store_time", m.getStoreEnd.Sub(m.getStoreStart).String()))
}
if !m.loadStart.IsZero() {
f = append(f, zap.String("load_time", m.loadEnd.Sub(m.loadStart).String()))
}
Expand All @@ -39,6 +46,10 @@ func (m mergeMetrics) logFields() []zap.Field {
f = append(f, zap.String("save_time", m.saveEnd.Sub(m.saveStart).String()))
}

if !m.writeEnd.IsZero() {
f = append(f, zap.String("write_time", m.writeEnd.Sub(m.writeStart).String()))
}

if m.blockRange != nil {
f = append(f, zap.Uint64("start_block", m.blockRange.StartBlock), zap.Uint64("end_block", m.blockRange.ExclusiveEndBlock))
}
Expand Down
4 changes: 4 additions & 0 deletions orchestrator/stage/squash.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,12 @@ func (s *Stages) singleSquash(stage *Stage, modState *StoreModuleState, mergeUni

// Retrieve store to merge, from cache or load from storage. Allows skipping of segments
// for handling partials interspearsed with full KVs.
meter.getStoreStart = time.Now()
fullKV, err := modState.getStore(s.ctx, rng.StartBlock) // loads+caches or uses cached store
if err != nil {
return fmt.Errorf("getting store: %w", err)
}
meter.getStoreEnd = time.Now()

// Load
meter.loadStart = time.Now()
Expand Down Expand Up @@ -157,7 +159,9 @@ func (s *Stages) singleSquash(stage *Stage, modState *StoreModuleState, mergeUni
return fmt.Errorf("save full store: %w", err)
}
meter.saveEnd = time.Now()
meter.writeStart = time.Now()
err = writer.Write(ctx)
meter.writeEnd = time.Now()
if err == nil {
go partialKV.DeleteStore(context.Background(), partialFile)
}
Expand Down
Loading