
feat(node): move interval-2 aggregation off the tick loop into a worker goroutine (#311) #312

Merged: mananuf merged 1 commit into perf/instrument-aggregation-phases from feat/aggregation-worker-goroutine on May 25, 2026
@mananuf (Collaborator) commented May 25, 2026

Summary

Moves committee aggregation off the consensus tick loop into a single worker goroutine. The tick thread snapshots inputs synchronously (cheap, on the order of milliseconds) and hands them to the worker via a buffered channel; the worker runs the FFI call and publishes the results off-tick. Mirrors ethlambda's tokio::spawn_blocking pattern and zeam's worker-thread model. Closes #311.

This is the architecture change Partha pointed at when he said zeam's measurement was "worker thread execution time" — gean now has the same shape.

Why

Before this PR, the tick comment was honest about the trade-off (node/tick.go:48-51):

"Blocks the tick loop for 1-4s but keeps source consistency — headState doesn't change during proving. Async aggregation breaks source alignment because head drifts during the background prover run. Acceptable until prover is <800ms."

We're now under 800ms at p50 (407ms FFI) but p99 still hits 1.4s. While aggregation runs, the tick can't advance to interval 3 (update_safe_target) or interval 4 (accept_new_attestations), and crucially the next slot's interval 0/1 (block production + attestation duties) gets delayed if the previous slot's interval-2 overruns.

ethlambda runs aggregation on tokio::spawn_blocking with a 750ms deadline (ethlambda/crates/blockchain/src/aggregation.rs:395-462). Spec read confirmed off-tick dispatch is permissible — forks/lstar/spec.py's aggregate(store) → (store, aggregates) is a pure transformation; the spec doesn't model when execution happens, only what state must eventually exist before:

  • Soft deadline (interval 3, ~800ms after start): update_safe_target reads latest_new_aggregated_payloads
  • Hard deadline (interval 4, ~1600ms after start): accept_new_attestations promotes latest_new → latest_known

gean's current p99 of 1.4s fits inside the hard deadline, and the p50 (407ms FFI) also fits inside the soft deadline.

What changed

node/node.go (+ field, + worker spawn)

 type Engine struct {
     ...
+    AggregationDispatchCh chan AggregationDispatch  // buffered 1
 }

 func (e *Engine) Run(ctx context.Context) {
     ...
     go e.runFetchBatcher(ctx)
+    go e.runAggregationWorker(ctx)
 }

node/tick.go (interval-2 rewrite)

-if currentInterval == 2 && isAgg {
-    aggs := AggregateCommitteeSignatures(e.Store)   // BLOCKING 400-1400ms
-    for _, agg := range aggs {
-        if e.P2P != nil {
-            e.P2P.PublishAggregatedAttestation(context.Background(), agg)
-        }
-    }
-}
+if currentInterval == 2 && isAgg {
+    snap := snapshotAggregationInputs(e.Store)   // cheap, ms
+    if snap != nil {
+        select {
+        case e.AggregationDispatchCh <- AggregationDispatch{snapshot: snap, slot: currentSlot}:
+        default:
+            IncAggregationDispatchDropped()
+        }
+    }
+}

context import removed from tick.go (no longer used after the publish moved to the worker).

node/store_aggregate.go (+ type, + worker function)

type AggregationDispatch struct {
    snapshot *AggregationSnapshot
    slot     uint64
}

func (e *Engine) runAggregationWorker(ctx context.Context) {
    for {
        select {
        case <-ctx.Done():
            return
        case dispatch := <-e.AggregationDispatchCh:
            workerStart := time.Now()
            aggs, mut := aggregateFromSnapshot(dispatch.snapshot, e.Store.PubKeyCache)
            applyAggregationMutations(e.Store, mut)
            for _, agg := range aggs {
                if e.P2P != nil {
                    e.P2P.PublishAggregatedAttestation(context.Background(), agg)
                }
            }
            ObserveAggregationWorkerTotalTime(time.Since(workerStart).Seconds())
            logger.Info(...)
        }
    }
}

node/metrics.go (2 new metrics)

  • lean_aggregation_worker_total_time_seconds (STATE_TRANSITION_BUCKETS shape) — end-to-end worker pass time
  • lean_aggregation_dispatch_dropped_total counter — interval-2 dispatches the worker couldn't pick up

Channel-capacity design

AggregationDispatchCh is buffered to 1 deliberately. A backlog of more than one slot means the worker is too slow for the protocol cadence; queueing more dispatches would only delay the wrong work. Dropping is the right answer:

  • Spec permits it (aggregation is best-effort per slot — slot N's drop just means the next slot's accept_new_attestations doesn't get slot N's aggregate; payloads from prior slots are still in latest_known_aggregated_payloads)
  • Drops surface via the counter so operators can size the worker if it happens
  • With current p99 = 1.4s and slot = 4s, drops should be near-zero
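The drop-instead-of-queue behaviour follows directly from Go's channel semantics. As a minimal sketch (with `dispatch`, `tryDispatch`, and the `dropped` counter as hypothetical stand-ins for AggregationDispatch, the interval-2 select, and IncAggregationDispatchDropped):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// dispatch is a hypothetical stand-in for AggregationDispatch.
type dispatch struct{ slot uint64 }

// dropped counts dispatches discarded because the buffer was full.
var dropped atomic.Uint64

// tryDispatch attempts a non-blocking send and reports whether the
// dispatch was queued. It never blocks the caller (the tick loop).
func tryDispatch(ch chan<- dispatch, d dispatch) bool {
	select {
	case ch <- d:
		return true
	default:
		dropped.Add(1)
		return false
	}
}

func main() {
	ch := make(chan dispatch, 1) // capacity 1, as in the PR

	fmt.Println(tryDispatch(ch, dispatch{slot: 10})) // true: buffer was empty
	fmt.Println(tryDispatch(ch, dispatch{slot: 11})) // false: buffer full, no receiver
	<-ch                                             // the worker picks up slot 10
	fmt.Println(tryDispatch(ch, dispatch{slot: 12})) // true: buffer has room again
	fmt.Println("dropped:", dropped.Load())          // dropped: 1
}
```

Note that a receive empties the buffer as soon as the worker picks a dispatch up, so with capacity 1 a single dispatch can wait behind a busy worker; a send is dropped only when one is already waiting.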

Race analysis

applyAggregationMutations writes to:

  • KnownPayloads.PushBatch — already mutex-protected against concurrent gossip writes
  • AttestationSignatures.Delete — same

Both are already designed for concurrent access from gossip handlers. The worker's writes serialise correctly without new locking.

The snapshot read on the tick thread is consistent because s.AttestationSignatures.Snapshot() takes its own mutex and s.NewPayloads.data / s.KnownPayloads.data reads happen on the tick goroutine which is the sole writer for those particular maps (per engine convention).

Behavior change

  • Tick loop no longer blocks on aggregation — interval 3 / 4 / next slot's interval 0 proceed without waiting
  • Aggregation result latency unchanged — still ~400ms p50 / ~1400ms p99 from snapshot to broadcast. The win is that the tick loop is responsive during that window.
  • Drop case (rare): if the worker is still busy and one dispatch is already queued when the next interval-2 fires, the new dispatch increments the counter and is discarded. No crash, no inconsistent state, and no work that needed to land before a deadline is silently lost; payloads from previous slots remain available in latest_known_aggregated_payloads.

Expected impact

  • p50 tick-loop time spent in aggregation: ~682ms → ~5-10ms (snapshot cost only)
  • p99 tick-loop time spent in aggregation: ~1420ms → ~5-10ms
  • Next-slot block production: starts on time even if previous slot's aggregation overran
  • Gossip handlers stay responsive throughout the aggregation window

Spec compliance

Zero impact. Same eventual store state, same broadcast set, same FFI calls in the same order. Spec permits off-tick execution provided soft/hard deadlines are met; gean's p99 (1.4s) already meets the hard deadline and its p50 meets the soft one.

Test plan

  • go build ./... clean
  • go test -count=1 ./node/... green
  • go test -count=1 ./... full suite green
  • Post-merge: confirm lean_aggregation_dispatch_dropped_total stays at 0 under typical devnet load; confirm tick-loop responsiveness improvement (proxy: lean_state_transition_time_seconds p99 should not be inflated by interval-2 sitting on it)
  • Hive multi-client soak: verify next-slot duties (block production, attestation production) land on time even when previous slot's interval-2 takes >800ms

Lean-review

  • dead-code: ✅ removed unused context import from tick.go
  • premature-abstraction: single worker, single channel, single dispatch type — sized to current need, no speculative dispatch surface
  • defensive-bloat: N/A
  • duplicates-existing-util: N/A — follows existing runFetchBatcher goroutine pattern in node.go
  • comment-bloat: 4-line comment on AggregationDispatchCh documents the capacity-1 design choice (non-obvious); worker has 5-line block on the ethlambda/zeam pattern + drop semantics; both earn their keep
  • over-validated-boundary: N/A

Stack

Stacks on PR #306 (instrumentation) and PR #308 (snapshot refactor — already merged into perf/instrument-aggregation-phases). Depends on the snapshot type from #308; consumes the per-phase histograms from #306. Pairs naturally with PR #310 (prep-vector pooling) — D + C together give the biggest combined win.

Targets perf/instrument-aggregation-phases. Will land alongside #306 + #310 before the integration branch merges to devnet-4.

…er goroutine (#311)

Before this PR, AggregateCommitteeSignatures ran SYNCHRONOUSLY on the
consensus tick loop at interval 2 (node/tick.go:52). At gean's measured
p99 of ~1.4s, the tick couldn't advance to interval 3 (safe_target),
interval 4 (accept_new_attestations), or the next slot's interval 0
(proposal) until the FFI finished. The existing comment
acknowledged this as "acceptable until prover is <800ms" — we're now
under 800ms at p50 but p99 still slips past.

ethlambda runs aggregation on a tokio::spawn_blocking worker with a
750ms deadline (aggregation.rs:395-462); zeam runs worker threads.
Spec read confirmed off-tick dispatch is permissible (forks/lstar/spec.py
aggregate(store) → (store, aggregates) is a pure transformation; the
spec doesn't model WHEN execution happens, only what state must
eventually exist before the soft deadline at interval 3 ~800ms after
dispatch or the hard deadline at interval 4 ~1600ms after dispatch).

This PR mirrors that off-tick pattern:

  Engine struct:
    + AggregationDispatchCh chan AggregationDispatch (buffered to 1)

  Engine.Run():
    + go e.runAggregationWorker(ctx)

  tick.go interval-2:
    - aggs := AggregateCommitteeSignatures(e.Store)
    - for _, agg := range aggs { e.P2P.PublishAggregatedAttestation(...) }
    + snap := snapshotAggregationInputs(e.Store)
    + select { case e.AggregationDispatchCh <- AggregationDispatch{snap, slot}: default: drop counter }

  New: runAggregationWorker (store_aggregate.go)
    - Drains AggregationDispatchCh
    - Runs aggregateFromSnapshot + applyAggregationMutations off the tick
    - Publishes aggregates to P2P
    - Records lean_aggregation_worker_total_time_seconds

  New: AggregationDispatch type (store_aggregate.go)
    Carries one slot's snapshot + slot number from tick to worker.

  New metrics (metrics.go):
    + lean_aggregation_worker_total_time_seconds (STATE_TRANSITION_BUCKETS)
    + lean_aggregation_dispatch_dropped_total counter

Channel capacity is 1 by design: a backlog of more than one slot
means the worker is too slow and the new dispatch should drop rather
than queue. Drops are spec-permissible (best-effort aggregation per
slot) and surface via the dropped counter. With current numbers
(p99 = 1.4s, slot = 4s) drops should be vanishingly rare.

Store-mutation races: applyAggregationMutations writes to KnownPayloads
(mutex'd in PushBatch) and AttestationSignatures (mutex'd in Delete).
Both already protected against concurrent gossip-handler writes; the
worker's writes serialise correctly without new locking.

Behavior change: the tick loop no longer blocks on aggregation.
Aggregation result latency unchanged; what changes is that the rest
of the tick can proceed while it runs.

Spec compliance: zero impact. Same eventual store state, same broadcast
set, same FFI calls in the same per-aggregate order.

context import removed from tick.go (no longer used after dispatch
rewrite; previously imported only for context.Background() at the
publish call site, which moved to the worker).

go build ./... + go test -count=1 ./... all packages green.

Closes #311.
@mananuf mananuf merged commit f08a6c0 into perf/instrument-aggregation-phases May 25, 2026
@mananuf mananuf deleted the feat/aggregation-worker-goroutine branch May 25, 2026 12:21