From f4e6dcb3653a1efee503c1c30b753f300eaacecc Mon Sep 17 00:00:00 2001
From: mananuf
Date: Mon, 25 May 2026 13:09:34 +0100
Subject: [PATCH] perf(node): pool prep vectors in aggregateFromSnapshot to cut GC pressure (#309)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Previously, aggregateFromSnapshot declared four nil slices per data
root, grew them via append, and discarded them at iteration end:

    var childProofs []xmss.ChildProof // grew via append doubling
    var rawPubkeys []xmss.CPubKey
    var rawSigs []xmss.CSig
    var rawIDs []uint64

At a typical 5-20 data roots per pass × 4 slices, that's 20-80
short-lived slices per pass that the garbage collector has to clean up.
A 400 ms FFI call is a generous window for the runtime to schedule a
10-30 ms STW pause inside — the suspected dominant source of gean's p99
tail above 1 s. ethlambda pays nothing equivalent (Rust ownership,
stack-allocated buffers dropped at scope exit per
crates/blockchain/src/aggregation.rs:238).

This PR adds sync.Pool reuse for all four slice types, plus capacity
hints in the pool New functions sized to the typical working set
(8 children, 32 raw sigs). The pattern mirrors the existing
xmss/proof_pool.go — the pool stores *[]T, with typed get/put wrappers;
put resets length to 0, preserving capacity.

Refactor of aggregateFromSnapshot:

- Wraps the per-data-root loop body in func(){}() so the deferred pool
  Put calls fire per iteration rather than accumulating until function
  return
- Replaces continue with return inside the anonymous func (the continue
  statements inside the inner signature loop are unchanged)
- Slice access via *bufPtr; append targets *bufPtr = append(*bufPtr, x)

Secondary win: defer xmss.FreeSignature(parsed) inside the sig loop now
also fires at the end of each data-root iteration — previously those
C-side handles accumulated across ALL data roots in a pass, only being
freed at function return. Tighter memory lifetime.

Also widens the capacity hint on allIDs (was len(rawIDs); now
len(rawIDs) + len(covered)).

go build ./... + go test ./node/... green.

Closes #309.
--- node/aggregation_pool.go | 35 ++++++ node/store_aggregate.go | 225 ++++++++++++++++++++------------------- 2 files changed, 151 insertions(+), 109 deletions(-) create mode 100644 node/aggregation_pool.go diff --git a/node/aggregation_pool.go b/node/aggregation_pool.go new file mode 100644 index 0000000..b801275 --- /dev/null +++ b/node/aggregation_pool.go @@ -0,0 +1,35 @@ +package node + +import ( + "sync" + + "github.com/geanlabs/gean/xmss" +) + +// Per-aggregate scratch slices reused across calls via sync.Pool. Cuts the +// young-gen GC pressure from re-allocating these four slices per data root +// (~5-20 per pass × every interval-2 tick). Initial capacities are first-fit +// guesses; pool reuse grows them to the working set and preserves the +// capacity across passes. +// +// Same pattern as xmss/proof_pool.go: pool stores *[]T, get/put wrappers +// reset length on return so callers always see an empty slice. + +var ( + childProofsPool = sync.Pool{New: func() any { s := make([]xmss.ChildProof, 0, 8); return &s }} + rawPubkeysPool = sync.Pool{New: func() any { s := make([]xmss.CPubKey, 0, 32); return &s }} + rawSigsPool = sync.Pool{New: func() any { s := make([]xmss.CSig, 0, 32); return &s }} + rawIDsPool = sync.Pool{New: func() any { s := make([]uint64, 0, 32); return &s }} +) + +func getChildProofsBuf() *[]xmss.ChildProof { return childProofsPool.Get().(*[]xmss.ChildProof) } +func putChildProofsBuf(b *[]xmss.ChildProof) { *b = (*b)[:0]; childProofsPool.Put(b) } + +func getRawPubkeysBuf() *[]xmss.CPubKey { return rawPubkeysPool.Get().(*[]xmss.CPubKey) } +func putRawPubkeysBuf(b *[]xmss.CPubKey) { *b = (*b)[:0]; rawPubkeysPool.Put(b) } + +func getRawSigsBuf() *[]xmss.CSig { return rawSigsPool.Get().(*[]xmss.CSig) } +func putRawSigsBuf(b *[]xmss.CSig) { *b = (*b)[:0]; rawSigsPool.Put(b) } + +func getRawIDsBuf() *[]uint64 { return rawIDsPool.Get().(*[]uint64) } +func putRawIDsBuf(b *[]uint64) { *b = (*b)[:0]; rawIDsPool.Put(b) } diff --git 
a/node/store_aggregate.go b/node/store_aggregate.go index f3e4507..0a81e9a 100644 --- a/node/store_aggregate.go +++ b/node/store_aggregate.go @@ -127,138 +127,145 @@ func aggregateFromSnapshot(snap *AggregationSnapshot, cache *xmss.PubKeyCache) ( } for dataRoot := range dataRoots { - prepStart := time.Now() - gossipEntry := snap.attSigs[dataRoot] - newEntry := snap.newEntries[dataRoot] - knownEntry := snap.knownEntries[dataRoot] - - // Need attestation data from any available source. - var attData *types.AttestationData - if gossipEntry != nil { - attData = gossipEntry.Data - } else if newEntry != nil { - attData = newEntry.Data - } else if knownEntry != nil { - attData = knownEntry.Data - } - if attData == nil { - continue - } - - targetState := snap.targetStates[attData.Target.Root] - if targetState == nil { - continue - } - - // Phase 1: Select — greedy pick existing child proofs. - var childProofs []xmss.ChildProof - covered := make(map[uint64]bool) + // Anonymous func per iteration so pooled scratch slices (and the + // defer xmss.FreeSignature inside the sig loop) release per data + // root rather than accumulating until aggregateFromSnapshot returns. + func() { + childProofsBuf := getChildProofsBuf() + defer putChildProofsBuf(childProofsBuf) + rawPubkeysBuf := getRawPubkeysBuf() + defer putRawPubkeysBuf(rawPubkeysBuf) + rawSigsBuf := getRawSigsBuf() + defer putRawSigsBuf(rawSigsBuf) + rawIDsBuf := getRawIDsBuf() + defer putRawIDsBuf(rawIDsBuf) + + prepStart := time.Now() + gossipEntry := snap.attSigs[dataRoot] + newEntry := snap.newEntries[dataRoot] + knownEntry := snap.knownEntries[dataRoot] + + // Need attestation data from any available source. 
+ var attData *types.AttestationData + if gossipEntry != nil { + attData = gossipEntry.Data + } else if newEntry != nil { + attData = newEntry.Data + } else if knownEntry != nil { + attData = knownEntry.Data + } + if attData == nil { + return + } - selectChildProofs(newEntry, targetState, &childProofs, covered, cache) - selectChildProofs(knownEntry, targetState, &childProofs, covered, cache) + targetState := snap.targetStates[attData.Target.Root] + if targetState == nil { + return + } - // Phase 2: Fill — collect raw gossip signatures for uncovered validators. - var rawPubkeys []xmss.CPubKey - var rawSigs []xmss.CSig - var rawIDs []uint64 + // Phase 1: Select — greedy pick existing child proofs. + covered := make(map[uint64]bool) + selectChildProofs(newEntry, targetState, childProofsBuf, covered, cache) + selectChildProofs(knownEntry, targetState, childProofsBuf, covered, cache) + + // Phase 2: Fill — collect raw gossip signatures for uncovered validators. + if gossipEntry != nil && len(gossipEntry.Signatures) > 0 { + sortedSigs := make([]AttestationSignatureEntry, len(gossipEntry.Signatures)) + copy(sortedSigs, gossipEntry.Signatures) + sort.Slice(sortedSigs, func(i, j int) bool { + return sortedSigs[i].ValidatorID < sortedSigs[j].ValidatorID + }) - if gossipEntry != nil && len(gossipEntry.Signatures) > 0 { - sortedSigs := make([]AttestationSignatureEntry, len(gossipEntry.Signatures)) - copy(sortedSigs, gossipEntry.Signatures) - sort.Slice(sortedSigs, func(i, j int) bool { - return sortedSigs[i].ValidatorID < sortedSigs[j].ValidatorID - }) + for _, sigEntry := range sortedSigs { + if covered[sigEntry.ValidatorID] { + continue + } + if sigEntry.ValidatorID >= uint64(len(targetState.Validators)) { + continue + } - for _, sigEntry := range sortedSigs { - if covered[sigEntry.ValidatorID] { - continue - } - if sigEntry.ValidatorID >= uint64(len(targetState.Validators)) { - continue - } + sigHandle := sigEntry.SigHandle + if sigHandle == nil { + parsed, err := 
xmss.ParseSignature(sigEntry.Signature[:]) + if err != nil { + continue + } + defer xmss.FreeSignature(parsed) + sigHandle = parsed + } - sigHandle := sigEntry.SigHandle - if sigHandle == nil { - parsed, err := xmss.ParseSignature(sigEntry.Signature[:]) + pk, err := cache.Get(targetState.Validators[sigEntry.ValidatorID].AttestationPubkey) if err != nil { continue } - defer xmss.FreeSignature(parsed) - sigHandle = parsed - } - pk, err := cache.Get(targetState.Validators[sigEntry.ValidatorID].AttestationPubkey) - if err != nil { - continue + *rawPubkeysBuf = append(*rawPubkeysBuf, pk) + *rawSigsBuf = append(*rawSigsBuf, sigHandle) + *rawIDsBuf = append(*rawIDsBuf, sigEntry.ValidatorID) } - - rawPubkeys = append(rawPubkeys, pk) - rawSigs = append(rawSigs, sigHandle) - rawIDs = append(rawIDs, sigEntry.ValidatorID) } - } - // Prover requires at least 2 total inputs. - totalInputs := len(rawIDs) + len(childProofs) - if totalInputs < 2 { - continue - } + // Prover requires at least 2 total inputs. + if len(*rawIDsBuf)+len(*childProofsBuf) < 2 { + return + } - // Phase 3: Aggregate — produce recursive proof. - dataRootHash, _ := attData.HashTreeRoot() - slot := uint32(attData.Slot) + // Phase 3: Aggregate — produce recursive proof. 
+ dataRootHash, _ := attData.HashTreeRoot() + slot := uint32(attData.Slot) - ObserveAggregationPrepTime(time.Since(prepStart).Seconds()) + ObserveAggregationPrepTime(time.Since(prepStart).Seconds()) - aggStart := time.Now() - proofBytes, err := xmss.AggregateWithChildren(rawPubkeys, rawSigs, childProofs, dataRootHash, slot) - aggDuration := time.Since(aggStart) - if err != nil { - logger.Error(logger.Signature, "aggregate: failed slot=%d raw=%d children=%d duration=%v: %v", - slot, len(rawIDs), len(childProofs), aggDuration, err) - continue - } + aggStart := time.Now() + proofBytes, err := xmss.AggregateWithChildren(*rawPubkeysBuf, *rawSigsBuf, *childProofsBuf, dataRootHash, slot) + aggDuration := time.Since(aggStart) + if err != nil { + logger.Error(logger.Signature, "aggregate: failed slot=%d raw=%d children=%d duration=%v: %v", + slot, len(*rawIDsBuf), len(*childProofsBuf), aggDuration, err) + return + } - allIDs := make([]uint64, 0, len(rawIDs)) - allIDs = append(allIDs, rawIDs...) - for vid := range covered { - allIDs = append(allIDs, vid) - } + allIDs := make([]uint64, 0, len(*rawIDsBuf)+len(covered)) + allIDs = append(allIDs, (*rawIDsBuf)...) 
+ for vid := range covered { + allIDs = append(allIDs, vid) + } - participants := AggregationBitsFromIndices(allIDs) - proof := &types.AggregatedSignatureProof{ - Participants: participants, - ProofData: proofBytes, - } + participants := AggregationBitsFromIndices(allIDs) + proof := &types.AggregatedSignatureProof{ + Participants: participants, + ProofData: proofBytes, + } - logger.Info(logger.Signature, "aggregate: slot=%d raw=%d children=%d total=%d proof=%d bytes duration=%v", - slot, len(rawIDs), len(childProofs), len(allIDs), len(proofBytes), aggDuration) + logger.Info(logger.Signature, "aggregate: slot=%d raw=%d children=%d total=%d proof=%d bytes duration=%v", + slot, len(*rawIDsBuf), len(*childProofsBuf), len(allIDs), len(proofBytes), aggDuration) - if AggregateMetricsFunc != nil { - AggregateMetricsFunc(aggDuration.Seconds(), len(allIDs)) - } + if AggregateMetricsFunc != nil { + AggregateMetricsFunc(aggDuration.Seconds(), len(allIDs)) + } - postStart := time.Now() - newAggregates = append(newAggregates, &types.SignedAggregatedAttestation{ - Data: attData, - Proof: proof, - }) + postStart := time.Now() + newAggregates = append(newAggregates, &types.SignedAggregatedAttestation{ + Data: attData, + Proof: proof, + }) - mut.PayloadEntries = append(mut.PayloadEntries, PayloadKV{ - DataRoot: dataRoot, - Data: attData, - Proof: proof, - }) + mut.PayloadEntries = append(mut.PayloadEntries, PayloadKV{ + DataRoot: dataRoot, + Data: attData, + Proof: proof, + }) - if gossipEntry != nil { - for _, sig := range gossipEntry.Signatures { - mut.KeysToDelete = append(mut.KeysToDelete, AttestationDeleteKey{ - ValidatorID: sig.ValidatorID, - DataRoot: dataRoot, - }) + if gossipEntry != nil { + for _, sig := range gossipEntry.Signatures { + mut.KeysToDelete = append(mut.KeysToDelete, AttestationDeleteKey{ + ValidatorID: sig.ValidatorID, + DataRoot: dataRoot, + }) + } } - } - ObserveAggregationPostTime(time.Since(postStart).Seconds()) + 
ObserveAggregationPostTime(time.Since(postStart).Seconds()) + }() } return newAggregates, mut