From daef0027304b70ccf29863ee41db96a1726fbaa7 Mon Sep 17 00:00:00 2001 From: graymon Date: Mon, 6 Apr 2026 03:56:51 +0000 Subject: [PATCH 1/3] ringbuf: refactor reader internals for shared record path No functional change intended. This commit only reshapes internal call paths so upcoming zero-copy APIs can reuse the same read/poll machinery. Introduce two internal APIs: - readWithPoll: centralizes the Reader poll/wait loop (deadline, flush, pending errors, busy/EOR handling) and executes a caller-supplied record read callback. - readRecordFunc: centralizes ring record decoding and yields a zero-copy sample view plus remaining bytes and next consumer position to a caller-supplied callback. Signed-off-by: graymon --- ringbuf/read_internal.go | 48 ++++++++++++++++++++++ ringbuf/reader.go | 88 +++++++++++++++++----------------------- ringbuf/ring.go | 22 ++++------ 3 files changed, 95 insertions(+), 63 deletions(-) create mode 100644 ringbuf/read_internal.go diff --git a/ringbuf/read_internal.go b/ringbuf/read_internal.go new file mode 100644 index 000000000..5ee849d4d --- /dev/null +++ b/ringbuf/read_internal.go @@ -0,0 +1,48 @@ +package ringbuf + +import ( + "errors" + "fmt" + "os" + "time" +) + +func readWithPoll(poller poller, ring eventRing, deadline time.Time, haveData *bool, pendingErr *error, read func() error) error { + if ring == nil { + return fmt.Errorf("ringbuffer: %w", ErrClosed) + } + + for { + if !*haveData { + if pe := *pendingErr; pe != nil { + *pendingErr = nil + return pe + } + + err := poller.Wait(deadline) + if errors.Is(err, os.ErrDeadlineExceeded) || errors.Is(err, ErrFlushed) { + // Ignoring this for reading a valid entry after timeout or flush. + // This can occur if the producer submitted to the ring buffer + // with BPF_RB_NO_WAKEUP. + *pendingErr = err + } else if err != nil { + return err + } + *haveData = true + } + + for { + err := read() + // Not using errors.Is which is quite a bit slower. 
+ // For a tight loop it might make a difference. + if err == errBusy { + continue + } + if err == errEOR { + *haveData = false + break + } + return err + } + } +} diff --git a/ringbuf/reader.go b/ringbuf/reader.go index d1483d78a..e2a54c093 100644 --- a/ringbuf/reader.go +++ b/ringbuf/reader.go @@ -30,7 +30,8 @@ type poller interface { type eventRing interface { size() int AvailableBytes() uint64 - readRecord(rec *Record) error + readRecordFunc(func(sample []byte, remaining int, cons uintptr) error) error + commitRecord(cons uintptr) Close() error } @@ -77,40 +78,49 @@ type Reader struct { // NewReader creates a new BPF ringbuf reader. func NewReader(ringbufMap *ebpf.Map) (*Reader, error) { + poller, ring, err := newReaderResources(ringbufMap) + if err != nil { + return nil, err + } + + return &Reader{ + poller: poller, + ring: ring, + bufferSize: ring.size(), + // On Windows, the wait handle is only set when the reader is created, + // so we miss any wakeups that happened before. + // Do an opportunistic read to get any pending samples. 
+ haveData: platform.IsWindows, + }, nil +} + +func newReaderResources(ringbufMap *ebpf.Map) (poller, eventRing, error) { if ringbufMap.Type() != ebpf.RingBuf && ringbufMap.Type() != ebpf.WindowsRingBuf { - return nil, fmt.Errorf("invalid Map type: %s", ringbufMap.Type()) + return nil, nil, fmt.Errorf("invalid Map type: %s", ringbufMap.Type()) } maxEntries := int(ringbufMap.MaxEntries()) if maxEntries == 0 || (maxEntries&(maxEntries-1)) != 0 { - return nil, fmt.Errorf("ringbuffer map size %d is zero or not a power of two", maxEntries) + return nil, nil, fmt.Errorf("ringbuffer map size %d is zero or not a power of two", maxEntries) } poller, err := newPoller(ringbufMap.FD()) if err != nil { - return nil, err + return nil, nil, err } ring, err := newRingBufEventRing(ringbufMap.FD(), maxEntries) if err != nil { poller.Close() - return nil, fmt.Errorf("failed to create ringbuf ring: %w", err) + return nil, nil, fmt.Errorf("failed to create ringbuf ring: %w", err) } - return &Reader{ - poller: poller, - ring: ring, - bufferSize: ring.size(), - // On Windows, the wait handle is only set when the reader is created, - // so we miss any wakeups that happened before. - // Do an opportunistic read to get any pending samples. - haveData: platform.IsWindows, - }, nil + return poller, ring, nil } // Close frees resources used by the reader. // -// It interrupts calls to Read. +// It interrupts calls to Read and ReadInto. 
func (r *Reader) Close() error { if err := r.poller.Close(); err != nil { if errors.Is(err, os.ErrClosed) { @@ -162,43 +172,21 @@ func (r *Reader) ReadInto(rec *Record) error { r.mu.Lock() defer r.mu.Unlock() - if r.ring == nil { - return fmt.Errorf("ringbuffer: %w", ErrClosed) - } - - for { - if !r.haveData { - if pe := r.pendingErr; pe != nil { - r.pendingErr = nil - return pe + return readWithPoll(r.poller, r.ring, r.deadline, &r.haveData, &r.pendingErr, func() error { + return r.ring.readRecordFunc(func(sample []byte, remaining int, cons uintptr) error { + n := len(sample) + if cap(rec.RawSample) < n { + rec.RawSample = make([]byte, n) + } else { + rec.RawSample = rec.RawSample[:n] } - err := r.poller.Wait(r.deadline) - if errors.Is(err, os.ErrDeadlineExceeded) || errors.Is(err, ErrFlushed) { - // Ignoring this for reading a valid entry after timeout or flush. - // This can occur if the producer submitted to the ring buffer - // with BPF_RB_NO_WAKEUP. - r.pendingErr = err - } else if err != nil { - return err - } - r.haveData = true - } - - for { - err := r.ring.readRecord(rec) - // Not using errors.Is which is quite a bit slower - // For a tight loop it might make a difference - if err == errBusy { - continue - } - if err == errEOR { - r.haveData = false - break - } - return err - } - } + copy(rec.RawSample, sample) + rec.Remaining = remaining + r.ring.commitRecord(cons) + return nil + }) + }) } // BufferSize returns the size in bytes of the ring buffer diff --git a/ringbuf/ring.go b/ringbuf/ring.go index abb704cd5..dd89338d2 100644 --- a/ringbuf/ring.go +++ b/ringbuf/ring.go @@ -41,8 +41,12 @@ func (rr *ringReader) AvailableBytes() uint64 { return uint64(prod - cons) } -// Read a record from an event ring. -func (rr *ringReader) readRecord(rec *Record) error { +func (rr *ringReader) commitRecord(cons uintptr) { + atomic.StoreUintptr(rr.cons_pos, cons) +} + +// Read a record from an event ring and invoke f with a zero-copy sample view. 
+func (rr *ringReader) readRecordFunc(f func(sample []byte, remaining int, cons uintptr) error) error { prod := atomic.LoadUintptr(rr.prod_pos) cons := atomic.LoadUintptr(rr.cons_pos) @@ -83,19 +87,11 @@ func (rr *ringReader) readRecord(rec *Record) error { // when the record header indicates that the data should be // discarded, we skip it by just updating the consumer position // to the next record. - atomic.StoreUintptr(rr.cons_pos, cons) + rr.commitRecord(cons) continue } - if n := header.dataLen(); cap(rec.RawSample) < n { - rec.RawSample = make([]byte, n) - } else { - rec.RawSample = rec.RawSample[:n] - } - - copy(rec.RawSample, rr.ring[start:]) - rec.Remaining = int(prod - cons) - atomic.StoreUintptr(rr.cons_pos, cons) - return nil + n := header.dataLen() + return f(rr.ring[start:start+uintptr(n)], int(prod-cons), cons) } } From f85872e6e19a6b1ccd714d47764bc3f271128481 Mon Sep 17 00:00:00 2001 From: graymon Date: Mon, 6 Apr 2026 03:56:55 +0000 Subject: [PATCH 2/3] ringbuf: add UnsafeReader zero-copy API MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add a new ringbuf.UnsafeReader dedicated to zero-copy consumption. It supports two ways of reading: either call Read/ReadInto and then Commit explicitly, or use ReadFunc and have the record committed automatically after the callback returns. The new type reuses the same internal polling and ring decoding paths as Reader (readWithPoll + readRecordFunc), and keeps Reader’s existing copy-based behavior unchanged. This API is intentionally unsafe for concurrent use. 
Signed-off-by: graymon --- ringbuf/doc.go | 3 + ringbuf/unsafe_reader.go | 170 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 173 insertions(+) create mode 100644 ringbuf/unsafe_reader.go diff --git a/ringbuf/doc.go b/ringbuf/doc.go index eb9617e63..3988886c5 100644 --- a/ringbuf/doc.go +++ b/ringbuf/doc.go @@ -3,4 +3,7 @@ // BPF allows submitting custom events to a BPF ring buffer map set up // by userspace. This is very useful to push things like packet samples // from BPF to a daemon running in user space. +// +// UnsafeReader provides zero-copy access to samples and is not safe for +// concurrent use. package ringbuf diff --git a/ringbuf/unsafe_reader.go b/ringbuf/unsafe_reader.go new file mode 100644 index 000000000..c5b1c9833 --- /dev/null +++ b/ringbuf/unsafe_reader.go @@ -0,0 +1,170 @@ +package ringbuf + +import ( + "errors" + "os" + "time" + + "github.com/cilium/ebpf" + "github.com/cilium/ebpf/internal/platform" +) + +// UnsafeReader allows reading ringbuf records without copying sample data. +// +// UnsafeReader is not safe for concurrent use. +// +// Samples returned from Read / ReadInto remain valid only until Commit is called. +// Samples passed to ReadFunc are valid only during callback execution. +type UnsafeReader struct { + poller poller + ring eventRing + + haveData bool + deadline time.Time + bufferSize int + pendingErr error + + pendingCons uintptr + pendingRead bool +} + +// NewUnsafeReader creates a new BPF ringbuf reader exposing zero-copy APIs. +func NewUnsafeReader(ringbufMap *ebpf.Map) (*UnsafeReader, error) { + poller, ring, err := newReaderResources(ringbufMap) + if err != nil { + return nil, err + } + + return &UnsafeReader{ + poller: poller, + ring: ring, + bufferSize: ring.size(), + // On Windows, the wait handle is only set when the reader is created, + // so we miss any wakeups that happened before. + // Do an opportunistic read to get any pending samples. 
+ haveData: platform.IsWindows, + }, nil +} + +// SetDeadline controls how long Read, ReadInto and ReadFunc will block waiting for samples. +// +// Passing a zero time.Time will remove the deadline. +func (r *UnsafeReader) SetDeadline(t time.Time) { + r.deadline = t +} + +// Read the next record from the BPF ringbuf. +// +// Calling [UnsafeReader.Close] interrupts the method with [os.ErrClosed]. +// Calling [UnsafeReader.Flush] makes it return all records currently in the ring +// buffer, followed by [ErrFlushed]. +// +// Returns [os.ErrDeadlineExceeded] once a set deadline has passed and all records +// have been read from the ring. +// +// The returned sample aliases the ring buffer and remains valid until Commit is +// called. +func (r *UnsafeReader) Read() (Record, error) { + var rec Record + err := r.ReadInto(&rec) + return rec, err +} + +// ReadInto is like Read except that it allows reusing Record. +// +// ReadInto does not copy sample bytes. rec.RawSample aliases ring buffer memory +// and remains valid until Commit is called. +func (r *UnsafeReader) ReadInto(rec *Record) error { + if r.pendingRead { + return errors.New("ringbuffer: previous record must be committed") + } + + return readWithPoll(r.poller, r.ring, r.deadline, &r.haveData, &r.pendingErr, func() error { + return r.ring.readRecordFunc(func(sample []byte, remaining int, cons uintptr) error { + rec.RawSample = sample + rec.Remaining = remaining + r.pendingRead = true + r.pendingCons = cons + return nil + }) + }) +} + +// ReadFunc reads and processes one record via callback. +// +// The callback receives a sample view into ring buffer memory, which is valid +// only for the duration of the callback. The consumed record is committed even +// if the callback returns an error. +// +// The returned value is the minimum number of bytes remaining in the ring buffer +// after this record has been consumed. 
+func (r *UnsafeReader) ReadFunc(f func(sample []byte, remaining int) error) (int, error) { + if r.pendingRead { + return 0, errors.New("ringbuffer: previous record must be committed") + } + + var ( + rec Record + err error + ) + + err = readWithPoll(r.poller, r.ring, r.deadline, &r.haveData, &r.pendingErr, func() error { + return r.ring.readRecordFunc(func(sample []byte, remaining int, cons uintptr) error { + defer r.ring.commitRecord(cons) + + callErr := f(sample, remaining) + rec.Remaining = remaining + return callErr + }) + }) + return rec.Remaining, err +} + +// Commit advances the reader past the most recently read record. +func (r *UnsafeReader) Commit() error { + if !r.pendingRead { + return errors.New("ringbuffer: no pending record to commit") + } + + r.ring.commitRecord(r.pendingCons) + r.pendingCons = 0 + r.pendingRead = false + return nil +} + +// Close frees resources used by the reader. +// +// It interrupts calls to Read, ReadInto and ReadFunc. +func (r *UnsafeReader) Close() error { + if err := r.poller.Close(); err != nil { + if errors.Is(err, os.ErrClosed) { + return nil + } + return err + } + + var err error + if r.ring != nil { + err = r.ring.Close() + r.ring = nil + } + + return err +} + +// BufferSize returns the size in bytes of the ring buffer. +func (r *UnsafeReader) BufferSize() int { + return r.bufferSize +} + +// Flush unblocks Read/ReadInto/ReadFunc and successive Read/ReadInto/ReadFunc calls return pending +// samples at this point, until ErrFlushed is returned. +func (r *UnsafeReader) Flush() error { + return r.poller.Flush() +} + +// AvailableBytes returns the amount of data available to read in the ring +// buffer in bytes. 
+func (r *UnsafeReader) AvailableBytes() int { + return int(r.ring.AvailableBytes()) +} From 8aaa63c71d97d1b9a2f9f9ee75199ee544e78fdc Mon Sep 17 00:00:00 2001 From: graymon Date: Mon, 6 Apr 2026 03:57:02 +0000 Subject: [PATCH 3/3] ringbuf: add tests for UnsafeReader Signed-off-by: graymon --- ringbuf/unsafe_reader_test.go | 121 ++++++++++++++++++++++++++++++++++ 1 file changed, 121 insertions(+) create mode 100644 ringbuf/unsafe_reader_test.go diff --git a/ringbuf/unsafe_reader_test.go b/ringbuf/unsafe_reader_test.go new file mode 100644 index 000000000..670861d3f --- /dev/null +++ b/ringbuf/unsafe_reader_test.go @@ -0,0 +1,121 @@ +package ringbuf + +import ( + "encoding/binary" + "errors" + "testing" + "time" + + "github.com/go-quicktest/qt" + + "github.com/cilium/ebpf/internal" + "github.com/cilium/ebpf/internal/sys" +) + +type testPoller struct{} + +func (testPoller) Wait(deadline time.Time) error { return nil } +func (testPoller) Flush() error { return nil } +func (testPoller) Close() error { return nil } + +type testEventRing struct { + rr *ringReader +} + +func (r *testEventRing) size() int { + return r.rr.size() +} + +func (r *testEventRing) AvailableBytes() uint64 { + return r.rr.AvailableBytes() +} + +func (r *testEventRing) readRecordFunc(f func(sample []byte, remaining int, cons uintptr) error) error { + return r.rr.readRecordFunc(f) +} + +func (r *testEventRing) commitRecord(cons uintptr) { + r.rr.commitRecord(cons) +} + +func (r *testEventRing) Close() error { + return nil +} + +func newUnsafeReaderForRecords(t *testing.T, samples ...[]byte) (*UnsafeReader, *uintptr, *uintptr) { + t.Helper() + + data := make([]byte, 512) + var ( + prod uintptr + cons uintptr + ) + + offset := uintptr(0) + for _, sample := range samples { + binary.LittleEndian.PutUint32(data[offset:], uint32(len(sample))) + offset += sys.BPF_RINGBUF_HDR_SZ + copy(data[offset:offset+uintptr(len(sample))], sample) + offset += uintptr(internal.Align(len(sample), 8)) + } + prod = offset + + 
rr := newRingReader(&cons, &prod, data) + er := &testEventRing{rr: rr} + reader := &UnsafeReader{ + poller: testPoller{}, + ring: er, + haveData: true, + bufferSize: rr.size(), + } + + return reader, &cons, &prod +} + +func TestUnsafeReaderReadIntoRequiresCommit(t *testing.T) { + reader, cons, _ := newUnsafeReaderForRecords(t, []byte{1, 2, 3}, []byte{4, 5}) + + var rec Record + err := reader.ReadInto(&rec) + qt.Assert(t, qt.IsNil(err)) + qt.Assert(t, qt.DeepEquals(rec.RawSample, []byte{1, 2, 3})) + qt.Assert(t, qt.Equals(*cons, uintptr(0))) + + err = reader.ReadInto(&rec) + qt.Assert(t, qt.ErrorMatches(err, "ringbuffer: previous record must be committed")) + + err = reader.Commit() + qt.Assert(t, qt.IsNil(err)) + qt.Assert(t, qt.Not(qt.Equals(*cons, uintptr(0)))) + firstCons := *cons + + err = reader.ReadInto(&rec) + qt.Assert(t, qt.IsNil(err)) + qt.Assert(t, qt.DeepEquals(rec.RawSample, []byte{4, 5})) + qt.Assert(t, qt.Equals(*cons, firstCons)) + + err = reader.Commit() + qt.Assert(t, qt.IsNil(err)) + qt.Assert(t, qt.Not(qt.Equals(*cons, firstCons))) +} + +func TestUnsafeReaderReadFuncAlwaysCommits(t *testing.T) { + reader, cons, prod := newUnsafeReaderForRecords(t, []byte{1, 2, 3}) + + wantErr := errors.New("callback failed") + remaining, err := reader.ReadFunc(func(sample []byte, remaining int) error { + qt.Assert(t, qt.DeepEquals(sample, []byte{1, 2, 3})) + qt.Assert(t, qt.Equals(remaining, 0)) + return wantErr + }) + + qt.Assert(t, qt.Equals(remaining, 0)) + qt.Assert(t, qt.Equals(err, wantErr)) + qt.Assert(t, qt.Equals(*cons, *prod)) +} + +func TestUnsafeReaderCommitWithoutPendingRecord(t *testing.T) { + reader, _, _ := newUnsafeReaderForRecords(t) + err := reader.Commit() + qt.Assert(t, qt.ErrorMatches(err, "ringbuffer: no pending record to commit")) +}