Skip to content
Merged
3 changes: 3 additions & 0 deletions internal/pkg/bulk/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ func (a actionT) String() string {
return actionStrings[a]
}

// reset zeros blk's fields so it can be reused from the pool. Only freeBlk
// should call this; tests rely on this invariant to use reset as a proxy
// for observing that freeBlk ran.
func (blk *bulkT) reset() {
blk.action = 0
blk.flags = 0
Expand Down
92 changes: 92 additions & 0 deletions internal/pkg/bulk/dispatch_bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package bulk

import (
"context"
"testing"
)

// BenchmarkDispatchAbortQueue measures allocation behavior when dispatch
// aborts in the first select() of dispatch (blk never enqueued) due to
// context cancellation. This models agents disconnecting while their
// checkin request is still waiting to enter the bulk engine's channel.
func BenchmarkDispatchAbortQueue(b *testing.B) {
	bulker := NewBulker(nil, nil, WithBlockQueueSize(1))

	// Occupy the single queue slot so every dispatch blocks in the
	// first select().
	bulker.ch <- &bulkT{ch: make(chan respT, 1)}

	// A context cancelled up front makes the abort deterministic.
	ctx, cancel := context.WithCancel(context.Background())
	cancel()

	b.ReportAllocs()

	for b.Loop() {
		item := bulker.newBlk(ActionSearch, optionsT{})
		_, _ = item.buf.WriteString(`{"index":"test"}`)
		bulker.dispatch(ctx, item)
	}
}

// BenchmarkDispatchAbortResponse measures the per-dispatch allocation cost
// when dispatch aborts in the second select() of dispatch (blk enqueued,
// waiting for a response) due to context cancellation. The drain goroutine
// spawned by dispatch cleans up asynchronously, so its freeBlk work does
// not show up in the alloc counts reported here; see
// TestDrainAndFreeAbortedBlkResponse for verification that the drain
// completes correctly.
func BenchmarkDispatchAbortResponse(b *testing.B) {
	bulker := NewBulker(nil, nil, WithBlockQueueSize(1))

	// Keep bulker.ch empty so the first select() always succeeds and
	// every iteration reaches the second select().
	done := make(chan struct{})
	defer close(done)
	go func() {
		for {
			select {
			case <-done:
				return
			case <-bulker.ch:
			}
		}
	}()

	// Already cancelled: the second select() aborts at once.
	ctx, cancel := context.WithCancel(context.Background())
	cancel()

	b.ReportAllocs()

	for b.Loop() {
		item := bulker.newBlk(ActionSearch, optionsT{})
		_, _ = item.buf.WriteString(`{"index":"test"}`)
		bulker.dispatch(ctx, item)
	}
}

// BenchmarkDispatchSuccess measures allocation behavior on the success path
// for comparison with the abort benchmarks.
//
// A single persistent responder goroutine stands in for the Run loop —
// mirroring the drain pattern in BenchmarkDispatchAbortResponse — so the
// cost of spawning a goroutine (and its escaping closure) is not charged
// to every timed iteration; the measured region covers only newBlk,
// dispatch, and freeBlk.
func BenchmarkDispatchSuccess(b *testing.B) {
	bulker := NewBulker(nil, nil, WithBlockQueueSize(1))

	// Simulate the Run loop: drain the channel and respond to each blk.
	done := make(chan struct{})
	defer close(done)
	go func() {
		for {
			select {
			case item := <-bulker.ch:
				item.ch <- respT{}
			case <-done:
				return
			}
		}
	}()

	b.ReportAllocs()

	for b.Loop() {
		blk := bulker.newBlk(ActionSearch, optionsT{})
		_, _ = blk.buf.WriteString(`{"index":"test"}`)

		resp := bulker.dispatch(context.Background(), blk)
		if resp.err != nil {
			b.Fatal(resp.err)
		}
		bulker.freeBlk(blk)
	}
}
125 changes: 125 additions & 0 deletions internal/pkg/bulk/dispatch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package bulk

import (
"context"
"testing"

"github.com/stretchr/testify/require"
)

// TestDispatchAbortQueueFreesBlk verifies that an abort in dispatch's first
// select() (blk never enqueued) returns blk to the pool for reuse.
func TestDispatchAbortQueueFreesBlk(t *testing.T) {
	bulker := NewBulker(nil, nil, WithBlockQueueSize(1))

	// Occupy the single queue slot so dispatch blocks in its first select().
	bulker.ch <- &bulkT{ch: make(chan respT, 1)}

	// Cancelling before dispatch guarantees the first select() aborts and
	// blk is never enqueued.
	ctx, cancel := context.WithCancel(context.Background())
	cancel()

	blk := bulker.newBlk(ActionSearch, optionsT{})
	_, werr := blk.buf.WriteString(`{"index":"test"}`)
	require.NoError(t, werr)

	res := bulker.dispatch(ctx, blk)
	require.Error(t, res.err, "expected error from cancelled context")

	// The freed blk should now come back out of the pool in its reset
	// state rather than a fresh allocation.
	reused, ok := bulker.blkPool.Get().(*bulkT)
	require.True(t, ok)
	require.Zero(t, reused.buf.Len(), "expected reused blk to have reset buf")
	require.Zero(t, reused.action, "expected reused blk to have reset action")
}

// TestDispatchAbortResponseReachesSecondSelect verifies the abort path taken
// once the Run loop has already accepted the blk.
func TestDispatchAbortResponseReachesSecondSelect(t *testing.T) {
	// When dispatch aborts in its second select() (blk enqueued, waiting
	// for response), it must return ctx.Err() after the Run loop has
	// already taken the blk.
	b := NewBulker(nil, nil, WithBlockQueueSize(1))

	ctx, cancel := context.WithCancel(context.Background())

	blk := b.newBlk(ActionSearch, optionsT{})
	_, err := blk.buf.WriteString(`{"index":"test"}`)
	require.NoError(t, err)

	// Drain b.ch and signal once dispatch has enqueued. After this signal,
	// dispatch is guaranteed to be in (or about to enter) its second
	// select(), so cancelling ctx now deterministically triggers the
	// abort from that select().
	enqueued := make(chan struct{})
	go func() {
		<-b.ch
		close(enqueued)
	}()

	// Run dispatch in a goroutine so we can cancel its ctx after it has
	// crossed into the second select().
	respCh := make(chan respT, 1)
	go func() {
		respCh <- b.dispatch(ctx, blk)
	}()

	<-enqueued
	cancel()

	resp := <-respCh
	require.ErrorIs(t, resp.err, context.Canceled)

	// Unblock the drain goroutine that dispatch spawned so it doesn't
	// linger past the test. blk.ch appears to be buffered (cap 1, per
	// newBlk usage elsewhere in these tests), so this send should not
	// block even if the drain already timed out — confirm against newBlk.
	blk.ch <- respT{}
}

// TestDrainAndFreeAbortedBlkResponse verifies that a late Run-loop response
// delivered to an abandoned blk is drained and the blk freed (fields reset
// and the object returned to the pool).
func TestDrainAndFreeAbortedBlkResponse(t *testing.T) {
	bulker := NewBulker(nil, nil, WithBlockQueueSize(1))

	blk := bulker.newBlk(ActionSearch, optionsT{})
	_, werr := blk.buf.WriteString(`{"index":"test"}`)
	require.NoError(t, werr)
	require.NotZero(t, blk.action)
	require.NotZero(t, blk.buf.Len())

	// Pre-load the late response; blk.ch is buffered with capacity 1, so
	// this send cannot block.
	blk.ch <- respT{}

	// Invoke synchronously: no other goroutine touches blk, so the field
	// reads below are race-free. blk.reset() runs only inside freeBlk, so
	// observing zeroed fields proves freeBlk executed.
	bulker.drainAndFreeAbortedBlk(blk)

	require.Zero(t, blk.action)
	require.Zero(t, blk.buf.Len())
}

// TestDispatchSuccess exercises the happy path: blk is enqueued, answered,
// and dispatch returns the response without error.
func TestDispatchSuccess(t *testing.T) {
	bulker := NewBulker(nil, nil, WithBlockQueueSize(1))

	blk := bulker.newBlk(ActionSearch, optionsT{})
	_, werr := blk.buf.WriteString(`{"index":"test"}`)
	require.NoError(t, werr)

	// Stand in for a fast ES response: reply as soon as the blk is queued.
	go func() {
		queued := <-bulker.ch
		queued.ch <- respT{data: nil, err: nil}
	}()

	res := bulker.dispatch(context.Background(), blk)
	require.NoError(t, res.err)
}
23 changes: 23 additions & 0 deletions internal/pkg/bulk/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,13 @@
defaultApikeyMaxReqSize = 100 * 1024 * 1024
defaultFlushContextTimeout = time.Minute * 1
defaultMaxPendingBulkDispatches int64 = 0 // 0 means no limit

// dispatchAbortDrainTimeout bounds how long the drain helper waits for
// a late response from the Run loop on an abort from the second
// select() of the dispatch method before giving up on freeing blk.
// Three flush intervals give enough headroom for the blk to be picked
// up, sent to ES, and responded to even under heavy load.
dispatchAbortDrainTimeout = 3 * defaultFlushInterval
)

func NewBulker(es esapi.Transport, tracer *apm.Tracer, opts ...BulkOpt) *Bulker {
Expand Down Expand Up @@ -268,7 +275,7 @@
defer b.remoteOutputMutex.RUnlock()
curCfg := b.remoteOutputConfigMap[name]

hasChanged := false

Check failure on line 278 in internal/pkg/bulk/engine.go

View workflow job for this annotation

GitHub Actions / lint (linux)

QF1007: could merge conditional assignment into variable declaration (staticcheck)

// when output config first added, not reporting change
if curCfg != nil && !reflect.DeepEqual(curCfg, newCfg) {
Expand Down Expand Up @@ -632,6 +639,7 @@
Bool("refresh", blk.flags.Has(flagRefresh)).
Dur("rtt", time.Since(start)).
Msg("Dispatch abort queue")
b.freeBlk(blk)
return respT{err: ctx.Err()}
}

Expand All @@ -655,7 +663,22 @@
Bool("refresh", blk.flags.Has(flagRefresh)).
Dur("rtt", time.Since(start)).
Msg("Dispatch abort response")
// blk is in the Run loop's queue; drain the response and free
// asynchronously so the caller can return immediately.
go b.drainAndFreeAbortedBlk(blk)
}

return respT{err: ctx.Err()}
}

// drainAndFreeAbortedBlk waits for the Run loop to deliver a response to an
// abandoned blk and then returns the blk to the pool. If no response arrives
// within dispatchAbortDrainTimeout (e.g. during shutdown), it gives up
// instead of blocking forever; in that case blk is simply not reclaimed.
func (b *Bulker) drainAndFreeAbortedBlk(blk *bulkT) {
	// Explicit timer (rather than time.After) so the timer is released
	// promptly via Stop when the response wins the race.
	deadline := time.NewTimer(dispatchAbortDrainTimeout)
	defer deadline.Stop()

	select {
	case <-blk.ch:
		b.freeBlk(blk)
	case <-deadline.C:
	}
}
Loading