From 14483fb4d5990cf363fc56535b48f9b8655ad4dd Mon Sep 17 00:00:00 2001 From: Ori Shussman Date: Sun, 12 Apr 2026 07:30:43 +0000 Subject: [PATCH] ringbuf unsafe reader Signed-off-by: Ori Shussman --- ringbuf/reader.go | 201 ++++-------------------- ringbuf/reader_base.go | 163 ++++++++++++++++++++ ringbuf/ring.go | 84 ++++++++-- ringbuf/unsafe_reader.go | 169 ++++++++++++++++++++ ringbuf/unsafe_reader_test.go | 283 ++++++++++++++++++++++++++++++++++ 5 files changed, 716 insertions(+), 184 deletions(-) create mode 100644 ringbuf/reader_base.go create mode 100644 ringbuf/unsafe_reader.go create mode 100644 ringbuf/unsafe_reader_test.go diff --git a/ringbuf/reader.go b/ringbuf/reader.go index d1483d78a..bf2f8dfb5 100644 --- a/ringbuf/reader.go +++ b/ringbuf/reader.go @@ -1,145 +1,52 @@ package ringbuf import ( - "errors" "fmt" - "os" - "sync" - "time" - "unsafe" "github.com/cilium/ebpf" - "github.com/cilium/ebpf/internal/platform" - "github.com/cilium/ebpf/internal/sys" ) -var ( - ErrClosed = os.ErrClosed - errEOR = errors.New("end of ring") - errBusy = errors.New("sample not committed yet") -) - -// poller abstracts platform-specific event notification. -type poller interface { - Wait(deadline time.Time) error - Flush() error - Close() error +// Wraps an eventRing to provide a copy-and-advance read for Reader. +type safeRing struct { + eventRing } -// eventRing abstracts platform-specific ring buffer memory access. 
-type eventRing interface { - size() int - AvailableBytes() uint64 - readRecord(rec *Record) error - Close() error -} +func (sr *safeRing) readRecord(rec *Record) error { + buf := rec.RawSample -// ringbufHeader from 'struct bpf_ringbuf_hdr' in kernel/bpf/ringbuf.c -type ringbufHeader struct { - Len uint32 - _ uint32 // pg_off, only used by kernel internals -} - -const ringbufHeaderSize = int(unsafe.Sizeof(ringbufHeader{})) - -func (rh *ringbufHeader) isBusy() bool { - return rh.Len&sys.BPF_RINGBUF_BUSY_BIT != 0 -} + defer func() { sr.advanceTo(sr.pendingPosition()) }() -func (rh *ringbufHeader) isDiscard() bool { - return rh.Len&sys.BPF_RINGBUF_DISCARD_BIT != 0 -} - -func (rh *ringbufHeader) dataLen() int { - return int(rh.Len & ^uint32(sys.BPF_RINGBUF_BUSY_BIT|sys.BPF_RINGBUF_DISCARD_BIT)) -} + err := sr.readRecordUnsafe(rec) + if err != nil { + return err + } -type Record struct { - RawSample []byte + n := len(rec.RawSample) + if cap(buf) < n { + buf = make([]byte, n) + } else { + buf = buf[:n] + } + copy(buf, rec.RawSample) + rec.RawSample = buf - // The minimum number of bytes remaining in the ring buffer after this Record has been read. - Remaining int + return nil } -// Reader allows reading bpf_ringbuf_output -// from user space. +// Allows reading bpf_ringbuf_output from user space. type Reader struct { - poller poller - - // mu protects read/write access to the Reader structure - mu sync.Mutex - ring eventRing - haveData bool - deadline time.Time - bufferSize int - pendingErr error + readerBase + safeRing *safeRing } -// NewReader creates a new BPF ringbuf reader. +// Creates a new BPF ringbuf reader. 
func NewReader(ringbufMap *ebpf.Map) (*Reader, error) { - if ringbufMap.Type() != ebpf.RingBuf && ringbufMap.Type() != ebpf.WindowsRingBuf { - return nil, fmt.Errorf("invalid Map type: %s", ringbufMap.Type()) - } - - maxEntries := int(ringbufMap.MaxEntries()) - if maxEntries == 0 || (maxEntries&(maxEntries-1)) != 0 { - return nil, fmt.Errorf("ringbuffer map size %d is zero or not a power of two", maxEntries) - } - - poller, err := newPoller(ringbufMap.FD()) - if err != nil { + r := new(Reader) + if err := initReaderBase(&r.readerBase, ringbufMap); err != nil { return nil, err } - - ring, err := newRingBufEventRing(ringbufMap.FD(), maxEntries) - if err != nil { - poller.Close() - return nil, fmt.Errorf("failed to create ringbuf ring: %w", err) - } - - return &Reader{ - poller: poller, - ring: ring, - bufferSize: ring.size(), - // On Windows, the wait handle is only set when the reader is created, - // so we miss any wakeups that happened before. - // Do an opportunistic read to get any pending samples. - haveData: platform.IsWindows, - }, nil -} - -// Close frees resources used by the reader. -// -// It interrupts calls to Read. -func (r *Reader) Close() error { - if err := r.poller.Close(); err != nil { - if errors.Is(err, os.ErrClosed) { - return nil - } - return err - } - - // Acquire the lock. This ensures that Read isn't running. - r.mu.Lock() - defer r.mu.Unlock() - - var err error - if r.ring != nil { - err = r.ring.Close() - r.ring = nil - } - - return err -} - -// SetDeadline controls how long Read and ReadInto will block waiting for samples. -// -// Passing a zero time.Time will remove the deadline. -func (r *Reader) SetDeadline(t time.Time) { - r.mu.Lock() - defer r.mu.Unlock() - - r.deadline = t + r.safeRing = &safeRing{r.ring} + return r, nil } // Read the next record from the BPF ringbuf. 
@@ -166,55 +73,7 @@ func (r *Reader) ReadInto(rec *Record) error { return fmt.Errorf("ringbuffer: %w", ErrClosed) } - for { - if !r.haveData { - if pe := r.pendingErr; pe != nil { - r.pendingErr = nil - return pe - } - - err := r.poller.Wait(r.deadline) - if errors.Is(err, os.ErrDeadlineExceeded) || errors.Is(err, ErrFlushed) { - // Ignoring this for reading a valid entry after timeout or flush. - // This can occur if the producer submitted to the ring buffer - // with BPF_RB_NO_WAKEUP. - r.pendingErr = err - } else if err != nil { - return err - } - r.haveData = true - } - - for { - err := r.ring.readRecord(rec) - // Not using errors.Is which is quite a bit slower - // For a tight loop it might make a difference - if err == errBusy { - continue - } - if err == errEOR { - r.haveData = false - break - } - return err - } - } -} - -// BufferSize returns the size in bytes of the ring buffer -func (r *Reader) BufferSize() int { - return r.bufferSize -} - -// Flush unblocks Read/ReadInto and successive Read/ReadInto calls will return pending samples at this point, -// until you receive a ErrFlushed error. -func (r *Reader) Flush() error { - return r.poller.Flush() -} - -// AvailableBytes returns the amount of data available to read in the ring buffer in bytes. -func (r *Reader) AvailableBytes() int { - // Don't need to acquire the lock here since the implementation of AvailableBytes - // performs atomic loads on the producer and consumer positions. 
- return int(r.ring.AvailableBytes()) + return r.readWaitLocked(func() error { + return r.safeRing.readRecord(rec) + }) } diff --git a/ringbuf/reader_base.go b/ringbuf/reader_base.go new file mode 100644 index 000000000..fa83fc028 --- /dev/null +++ b/ringbuf/reader_base.go @@ -0,0 +1,163 @@ +package ringbuf + +import ( + "errors" + "fmt" + "os" + "sync" + "time" + + "github.com/cilium/ebpf" + "github.com/cilium/ebpf/internal/platform" +) + +var ( + ErrClosed = os.ErrClosed + errEOR = errors.New("end of ring") + errBusy = errors.New("sample not committed yet") +) + +type poller interface { + Wait(deadline time.Time) error + Flush() error + Close() error +} + +type Record struct { + RawSample []byte + + // The minimum number of bytes remaining in the ring buffer after this Record has been read. + Remaining int +} + +type readerBase struct { + poller poller + + // mu protects read/write access to the reader state. + mu sync.Mutex + ring eventRing + haveData bool + deadline time.Time + bufferSize int + pendingErr error +} + +func initReaderBase(b *readerBase, ringbufMap *ebpf.Map) error { + if ringbufMap.Type() != ebpf.RingBuf && ringbufMap.Type() != ebpf.WindowsRingBuf { + return fmt.Errorf("invalid Map type: %s", ringbufMap.Type()) + } + + maxEntries := int(ringbufMap.MaxEntries()) + if maxEntries == 0 || (maxEntries&(maxEntries-1)) != 0 { + return fmt.Errorf("ringbuffer map size %d is zero or not a power of two", maxEntries) + } + + poller, err := newPoller(ringbufMap.FD()) + if err != nil { + return err + } + + ring, err := newRingBufEventRing(ringbufMap.FD(), maxEntries) + if err != nil { + poller.Close() + return fmt.Errorf("failed to create ringbuf ring: %w", err) + } + + b.poller = poller + b.ring = ring + b.bufferSize = ring.size() + // On Windows, the wait handle is only set when the reader is created, + // so we miss any wakeups that happened before. + // Do an opportunistic read to get any pending samples. 
+ b.haveData = platform.IsWindows + return nil +} + +// Close frees resources used by the reader. +// +// It interrupts calls to Read. +func (b *readerBase) Close() error { + if err := b.poller.Close(); err != nil { + if errors.Is(err, os.ErrClosed) { + return nil + } + return err + } + + // Acquire the lock. This ensures that Read isn't running. + b.mu.Lock() + defer b.mu.Unlock() + + var err error + if b.ring != nil { + err = b.ring.Close() + b.ring = nil + } + + return err +} + +// SetDeadline controls how long Read and ReadInto will block waiting for samples. +// +// Passing a zero time.Time will remove the deadline. +func (b *readerBase) SetDeadline(t time.Time) { + b.mu.Lock() + defer b.mu.Unlock() + + b.deadline = t +} + +// BufferSize returns the size in bytes of the ring buffer. +func (b *readerBase) BufferSize() int { + return b.bufferSize +} + +// Flush unblocks Read/ReadInto and successive Read/ReadInto calls will return pending samples at this point, +// until you receive a ErrFlushed error. +func (b *readerBase) Flush() error { + return b.poller.Flush() +} + +// AvailableBytes returns the amount of data available to read in the ring buffer in bytes. +func (b *readerBase) AvailableBytes() int { + // Don't need to acquire the lock here since the implementation of AvailableBytes + // performs atomic loads on the producer and consumer positions. + return int(b.ring.AvailableBytes()) +} + +// Polls for data and calls read in a loop. Must be called with b.mu held. +func (b *readerBase) readWaitLocked(read func() error) error { + for { + if !b.haveData { + if pe := b.pendingErr; pe != nil { + b.pendingErr = nil + return pe + } + + err := b.poller.Wait(b.deadline) + if errors.Is(err, os.ErrDeadlineExceeded) || errors.Is(err, ErrFlushed) { + // Ignoring this for reading a valid entry after timeout or flush. + // This can occur if the producer submitted to the ring buffer + // with BPF_RB_NO_WAKEUP. 
+ b.pendingErr = err + } else if err != nil { + return err + } + b.haveData = true + } + + for { + err := read() + // Not using errors.Is which is quite a bit slower + // For a tight loop it might make a difference + if err == errBusy { + continue + } + if err == errEOR { + b.haveData = false + break + } + return err + } + } +} diff --git a/ringbuf/ring.go b/ringbuf/ring.go index abb704cd5..7062696e2 100644 --- a/ringbuf/ring.go +++ b/ringbuf/ring.go @@ -10,11 +10,45 @@ import ( "github.com/cilium/ebpf/internal/sys" ) +type eventRing interface { + size() int + AvailableBytes() uint64 + readRecordUnsafe(rec *Record) error + advanceTo(pos uintptr) + pendingPosition() uintptr + Close() error +} + +// ringbufHeader from 'struct bpf_ringbuf_hdr' in kernel/bpf/ringbuf.c +type ringbufHeader struct { + Len uint32 + _ uint32 // pg_off, only used by kernel internals +} + +const ringbufHeaderSize = int(unsafe.Sizeof(ringbufHeader{})) + +func (rh *ringbufHeader) isBusy() bool { + return rh.Len&sys.BPF_RINGBUF_BUSY_BIT != 0 +} + +func (rh *ringbufHeader) isDiscard() bool { + return rh.Len&sys.BPF_RINGBUF_DISCARD_BIT != 0 +} + +func (rh *ringbufHeader) dataLen() int { + return int(rh.Len & ^uint32(sys.BPF_RINGBUF_BUSY_BIT|sys.BPF_RINGBUF_DISCARD_BIT)) +} + type ringReader struct { // These point into mmap'ed memory and must be accessed atomically. prod_pos, cons_pos *uintptr mask uintptr ring []byte + + // Logical consumer position tracking deferred advancement. + // Only valid when hasPending is true. + pendingCons uintptr + hasPending bool } func newRingReader(cons_ptr, prod_ptr *uintptr, ring []byte) *ringReader { @@ -41,10 +75,17 @@ func (rr *ringReader) AvailableBytes() uint64 { return uint64(prod - cons) } -// Read a record from an event ring. -func (rr *ringReader) readRecord(rec *Record) error { +// Reads the next non-discard record from the ring buffer. +// +// Sets rec.RawSample to a slice of the mmap'd ring buffer memory and does +// not advance the consumer position. 
Call advanceTo to release the space. +func (rr *ringReader) readRecordUnsafe(rec *Record) error { prod := atomic.LoadUintptr(rr.prod_pos) - cons := atomic.LoadUintptr(rr.cons_pos) + + cons := rr.pendingCons + if !rr.hasPending { + cons = atomic.LoadUintptr(rr.cons_pos) + } for { if remaining := prod - cons; remaining == 0 { @@ -81,21 +122,38 @@ func (rr *ringReader) readRecord(rec *Record) error { if header.isDiscard() { // when the record header indicates that the data should be - // discarded, we skip it by just updating the consumer position + // discarded, we skip it by just updating the pending position // to the next record. - atomic.StoreUintptr(rr.cons_pos, cons) + rr.pendingCons = cons + rr.hasPending = true continue } - if n := header.dataLen(); cap(rec.RawSample) < n { - rec.RawSample = make([]byte, n) - } else { - rec.RawSample = rec.RawSample[:n] - } - - copy(rec.RawSample, rr.ring[start:]) + n := header.dataLen() + rec.RawSample = rr.ring[start : start+uintptr(n)] rec.Remaining = int(prod - cons) - atomic.StoreUintptr(rr.cons_pos, cons) + rr.pendingCons = cons + rr.hasPending = true return nil } } + +// Sets the consumer position to pos, releasing ring buffer space up to that +// point. If pos matches the current pending read cursor, resets the pending +// state so the next read starts from the committed position. +func (rr *ringReader) advanceTo(pos uintptr) { + atomic.StoreUintptr(rr.cons_pos, pos) + if rr.hasPending && pos == rr.pendingCons { + rr.hasPending = false + } +} + +// Returns the current read cursor position. This is the consumer position +// that includes all records read so far (including discards) but not yet +// committed. 
+func (rr *ringReader) pendingPosition() uintptr { + if rr.hasPending { + return rr.pendingCons + } + return atomic.LoadUintptr(rr.cons_pos) +} diff --git a/ringbuf/unsafe_reader.go b/ringbuf/unsafe_reader.go new file mode 100644 index 000000000..aafe311d6 --- /dev/null +++ b/ringbuf/unsafe_reader.go @@ -0,0 +1,169 @@ +package ringbuf + +import ( + "fmt" + + "github.com/cilium/ebpf" +) + +// CommitToken is an opaque token returned by [UnsafeReader.Read]. +// Pass it to [UnsafeReader.Commit] to release the corresponding ring buffer space. +type CommitToken struct { + consPos uintptr +} + +type UnsafeRecord struct { + Record +} + +type pendingItem struct { + consPos uintptr // value of ring.pendingPosition() taken right after the record was read + committed bool // true once the record was released via Commit or consumed by ReadFunc +} + +// UnsafeReader allows zero-copy reading from a BPF ring buffer. +// +// Records returned by [UnsafeReader.Read] point directly into the memory-mapped +// ring buffer region. The data is valid until the corresponding [CommitToken] is +// committed via [UnsafeReader.Commit] or [UnsafeReader.CommitAll]. After +// committing, the kernel may overwrite the underlying memory at any time. +type UnsafeReader struct { + readerBase + pending []pendingItem +} + +// NewUnsafeReader creates a new zero-copy BPF ringbuf reader. +func NewUnsafeReader(ringbufMap *ebpf.Map) (*UnsafeReader, error) { + r := new(UnsafeReader) + if err := initReaderBase(&r.readerBase, ringbufMap); err != nil { + return nil, err + } + return r, nil +} + +// Read returns the next record from the BPF ringbuf without copying. +// +// rec.RawSample points directly into the memory-mapped ring buffer region and +// must not be modified. The slice is valid until the returned [CommitToken] is +// passed to [UnsafeReader.Commit] or [UnsafeReader.CommitAll]. +// +// Calling [UnsafeReader.Close] interrupts the method with [os.ErrClosed]. Calling [UnsafeReader.Flush] +// makes it return all records currently in the ring buffer, followed by [ErrFlushed]. +// +// Returns [os.ErrDeadlineExceeded] if a deadline was set and after all records +// have been read from the ring. 
+func (r *UnsafeReader) Read(rec *UnsafeRecord) (CommitToken, error) { + r.mu.Lock() + defer r.mu.Unlock() + + if r.ring == nil { + return CommitToken{}, fmt.Errorf("ringbuffer: %w", ErrClosed) + } + + var token CommitToken + err := r.readWaitLocked(func() error { + err := r.ring.readRecordUnsafe(&rec.Record) + if err != nil { + return err + } + + pos := r.ring.pendingPosition() + token = CommitToken{pos} + r.pending = append(r.pending, pendingItem{consPos: pos}) + return nil + }) + return token, err +} + +// ReadFunc reads the next record without copying and calls f with the raw sample data. +// The consumer position for this record is advanced automatically after f +// returns (subject to preceding uncommitted records). +// +// The data slice passed to f points into the memory-mapped ring buffer region, +// must not be modified, and is only valid for the duration of the callback. +// +// Calling [UnsafeReader.Close] interrupts the method with [os.ErrClosed]. Calling [UnsafeReader.Flush] +// makes it return all records currently in the ring buffer, followed by [ErrFlushed]. +// +// Returns [os.ErrDeadlineExceeded] if a deadline was set and after all records +// have been read from the ring. +func (r *UnsafeReader) ReadFunc(f func(data []byte)) error { + r.mu.Lock() + defer r.mu.Unlock() + + if r.ring == nil { + return fmt.Errorf("ringbuffer: %w", ErrClosed) + } + + return r.readWaitLocked(func() error { + var rec Record + err := r.ring.readRecordUnsafe(&rec) + if err != nil { + return err + } + + f(rec.RawSample) // f runs with r.mu held, so it must not call methods that lock r.mu (e.g. Commit) + + pos := r.ring.pendingPosition() + r.pending = append(r.pending, pendingItem{consPos: pos, committed: true}) + r.advanceContiguous() + + return nil + }) +} + +// Commit releases ring buffer space associated with the given token. +// +// The consumer position is only advanced when all preceding tokens have also +// been committed. 
For example, if tokens A, B, C were obtained in order and +// B and C are committed first, the consumer position does not advance until A +// is also committed — at which point it advances past C. +// +// The RawSample slice associated with a committed token must not be used +// after this call. A token that matches no pending read is ignored. +func (r *UnsafeReader) Commit(token CommitToken) { + r.mu.Lock() + defer r.mu.Unlock() + + for i := range r.pending { + if r.pending[i].consPos == token.consPos { + r.pending[i].committed = true + break + } + } + + r.advanceContiguous() +} + +// CommitAll releases all ring buffer space from preceding [UnsafeReader.Read] calls. +// +// All RawSample slices from previous Read calls are invalid after this call. +// No-op if there are no pending reads or the reader has been closed. +func (r *UnsafeReader) CommitAll() { + r.mu.Lock() + defer r.mu.Unlock() + + if len(r.pending) == 0 || r.ring == nil { + return + } + + r.ring.advanceTo(r.pending[len(r.pending)-1].consPos) + r.pending = r.pending[:0] +} + +// advanceContiguous advances the consumer position past all contiguously committed items +// from the front of the pending list. 
+func (r *UnsafeReader) advanceContiguous() { + i := 0 + for i < len(r.pending) && r.pending[i].committed { + i++ + } + if i == 0 || r.ring == nil { + return + } + + r.ring.advanceTo(r.pending[i-1].consPos) + + n := copy(r.pending, r.pending[i:]) + r.pending = r.pending[:n] +} diff --git a/ringbuf/unsafe_reader_test.go b/ringbuf/unsafe_reader_test.go new file mode 100644 index 000000000..a3d2be921 --- /dev/null +++ b/ringbuf/unsafe_reader_test.go @@ -0,0 +1,283 @@ +package ringbuf + +import ( + "errors" + "os" + "testing" + "time" + + "github.com/go-quicktest/qt" + + "github.com/cilium/ebpf/internal/testutils" +) + +func TestUnsafeReaderSingle(t *testing.T) { + testutils.SkipOnOldKernel(t, "5.8", "BPF ring buffer") + + prog, events := mustOutputSamplesProg(t, sampleMessage{size: 5}) + + rd, err := NewUnsafeReader(events) + qt.Assert(t, qt.IsNil(err)) + defer rd.Close() + + mustRun(t, prog) + + var rec UnsafeRecord + token, err := rd.Read(&rec) + qt.Assert(t, qt.IsNil(err)) + qt.Assert(t, qt.Equals(len(rec.RawSample), 5)) + qt.Assert(t, qt.DeepEquals(rec.RawSample, []byte{1, 2, 3, 4, 4})) + + rd.Commit(token) + qt.Assert(t, qt.Equals(rd.AvailableBytes(), 0)) +} + +func TestUnsafeReaderMultiWithDiscards(t *testing.T) { + testutils.SkipOnOldKernel(t, "5.8", "BPF ring buffer") + + prog, events := mustOutputSamplesProg(t, + sampleMessage{size: 5}, + sampleMessage{size: 10, discard: true}, + sampleMessage{size: 15}, + ) + + rd, err := NewUnsafeReader(events) + qt.Assert(t, qt.IsNil(err)) + defer rd.Close() + + mustRun(t, prog) + + var rec UnsafeRecord + + tok1, err := rd.Read(&rec) + qt.Assert(t, qt.IsNil(err)) + qt.Assert(t, qt.Equals(len(rec.RawSample), 5)) + + tok2, err := rd.Read(&rec) + qt.Assert(t, qt.IsNil(err)) + qt.Assert(t, qt.Equals(len(rec.RawSample), 15)) + + qt.Assert(t, qt.Not(qt.Equals(rd.AvailableBytes(), 0))) + + rd.Commit(tok1) + rd.Commit(tok2) + qt.Assert(t, qt.Equals(rd.AvailableBytes(), 0)) +} + +func TestUnsafeReaderCommitOrdering(t *testing.T) { 
+ testutils.SkipOnOldKernel(t, "5.8", "BPF ring buffer") + + prog, events := mustOutputSamplesProg(t, + sampleMessage{size: 5}, + sampleMessage{size: 7}, + sampleMessage{size: 9}, + ) + + rd, err := NewUnsafeReader(events) + qt.Assert(t, qt.IsNil(err)) + defer rd.Close() + + mustRun(t, prog) + + var rec UnsafeRecord + tok1, err := rd.Read(&rec) + qt.Assert(t, qt.IsNil(err)) + tok2, err := rd.Read(&rec) + qt.Assert(t, qt.IsNil(err)) + tok3, err := rd.Read(&rec) + qt.Assert(t, qt.IsNil(err)) + + // Commit out of order: tok2 first, then tok3. Consumer should not advance + // because tok1 is still outstanding. + rd.Commit(tok2) + qt.Assert(t, qt.Not(qt.Equals(rd.AvailableBytes(), 0))) + + rd.Commit(tok3) + qt.Assert(t, qt.Not(qt.Equals(rd.AvailableBytes(), 0))) + + // Commit tok1: now all three are contiguously committed, consumer advances. + rd.Commit(tok1) + qt.Assert(t, qt.Equals(rd.AvailableBytes(), 0)) +} + +func TestUnsafeReaderCommitAll(t *testing.T) { + testutils.SkipOnOldKernel(t, "5.8", "BPF ring buffer") + + prog, events := mustOutputSamplesProg(t, + sampleMessage{size: 5}, + sampleMessage{size: 7}, + ) + + rd, err := NewUnsafeReader(events) + qt.Assert(t, qt.IsNil(err)) + defer rd.Close() + + mustRun(t, prog) + + var rec UnsafeRecord + _, err = rd.Read(&rec) + qt.Assert(t, qt.IsNil(err)) + _, err = rd.Read(&rec) + qt.Assert(t, qt.IsNil(err)) + + qt.Assert(t, qt.Not(qt.Equals(rd.AvailableBytes(), 0))) + + rd.CommitAll() + qt.Assert(t, qt.Equals(rd.AvailableBytes(), 0)) +} + +func TestUnsafeReaderCommitAllNoOp(t *testing.T) { + testutils.SkipOnOldKernel(t, "5.8", "BPF ring buffer") + + _, events := mustOutputSamplesProg(t, sampleMessage{size: 5}) + + rd, err := NewUnsafeReader(events) + qt.Assert(t, qt.IsNil(err)) + defer rd.Close() + + rd.CommitAll() +} + +func TestUnsafeReaderReadFunc(t *testing.T) { + testutils.SkipOnOldKernel(t, "5.8", "BPF ring buffer") + + prog, events := mustOutputSamplesProg(t, sampleMessage{size: 5}) + + rd, err := 
NewUnsafeReader(events) + qt.Assert(t, qt.IsNil(err)) + defer rd.Close() + + mustRun(t, prog) + + var got []byte + err = rd.ReadFunc(func(data []byte) { + got = make([]byte, len(data)) + copy(got, data) + }) + qt.Assert(t, qt.IsNil(err)) + qt.Assert(t, qt.DeepEquals(got, []byte{1, 2, 3, 4, 4})) + qt.Assert(t, qt.Equals(rd.AvailableBytes(), 0)) +} + +func TestUnsafeReaderDeadline(t *testing.T) { + testutils.SkipOnOldKernel(t, "5.8", "BPF ring buffer") + + _, events := mustOutputSamplesProg(t, sampleMessage{size: 5}) + + rd, err := NewUnsafeReader(events) + qt.Assert(t, qt.IsNil(err)) + defer rd.Close() + + rd.SetDeadline(time.Now().Add(-time.Second)) + + var rec UnsafeRecord + _, err = rd.Read(&rec) + qt.Assert(t, qt.ErrorIs(err, os.ErrDeadlineExceeded)) +} + +func TestUnsafeReaderClose(t *testing.T) { + testutils.SkipOnOldKernel(t, "5.8", "BPF ring buffer") + + _, events := mustOutputSamplesProg(t, sampleMessage{size: 5}) + + rd, err := NewUnsafeReader(events) + qt.Assert(t, qt.IsNil(err)) + + qt.Assert(t, qt.IsNil(rd.Close())) + + var rec UnsafeRecord + _, err = rd.Read(&rec) + qt.Assert(t, qt.ErrorIs(err, ErrClosed)) + + qt.Assert(t, qt.IsNil(rd.Close())) +} + +func TestUnsafeReaderBlocking(t *testing.T) { + testutils.SkipOnOldKernel(t, "5.8", "BPF ring buffer") + + prog, events := mustOutputSamplesProg(t, sampleMessage{size: 5}) + + mustRun(t, prog) + + rd, err := NewUnsafeReader(events) + qt.Assert(t, qt.IsNil(err)) + defer rd.Close() + + var rec UnsafeRecord + _, err = rd.Read(&rec) + qt.Assert(t, qt.IsNil(err)) + rd.CommitAll() + + errs := make(chan error, 1) + go func() { + _, err := rd.Read(&rec) + errs <- err + }() + + select { + case err := <-errs: + t.Fatal("Read returns error instead of blocking:", err) + case <-time.After(100 * time.Millisecond): + } + + qt.Assert(t, qt.IsNil(rd.Close())) + + select { + case err := <-errs: + if !errors.Is(err, ErrClosed) { + t.Fatal("Expected os.ErrClosed from interrupted Read, got:", err) + } + case 
<-time.After(time.Second): + t.Fatal("Close doesn't interrupt Read") + } +} + +func BenchmarkUnsafeReader(b *testing.B) { + testutils.SkipOnOldKernel(b, "5.8", "BPF ring buffer") + + prog, events := mustOutputSamplesProg(b, sampleMessage{size: 80}) + + rd, err := NewUnsafeReader(events) + if err != nil { + b.Fatal(err) + } + defer rd.Close() + + b.ReportAllocs() + + var rec UnsafeRecord + for b.Loop() { + b.StopTimer() + mustRun(b, prog) + b.StartTimer() + + if _, err := rd.Read(&rec); err != nil { + b.Fatal(err) + } + rd.CommitAll() + } +} + +func BenchmarkUnsafeReaderReadFunc(b *testing.B) { + testutils.SkipOnOldKernel(b, "5.8", "BPF ring buffer") + + prog, events := mustOutputSamplesProg(b, sampleMessage{size: 80}) + + rd, err := NewUnsafeReader(events) + if err != nil { + b.Fatal(err) + } + defer rd.Close() + + b.ReportAllocs() + + for b.Loop() { + b.StopTimer() + mustRun(b, prog) + b.StartTimer() + + if err := rd.ReadFunc(func(data []byte) {}); err != nil { + b.Fatal(err) + } + } +}