Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
201 changes: 30 additions & 171 deletions ringbuf/reader.go
Original file line number Diff line number Diff line change
@@ -1,145 +1,52 @@
package ringbuf

import (
"errors"
"fmt"
"os"
"sync"
"time"
"unsafe"

"github.com/cilium/ebpf"
"github.com/cilium/ebpf/internal/platform"
"github.com/cilium/ebpf/internal/sys"
)

var (
ErrClosed = os.ErrClosed
errEOR = errors.New("end of ring")
errBusy = errors.New("sample not committed yet")
)

// poller abstracts platform-specific event notification.
type poller interface {
Wait(deadline time.Time) error
Flush() error
Close() error
// Wraps an eventRing to provide a copy-and-advance read for Reader.
type safeRing struct {
eventRing
}

// eventRing abstracts platform-specific ring buffer memory access.
type eventRing interface {
size() int
AvailableBytes() uint64
readRecord(rec *Record) error
Close() error
}
func (sr *safeRing) readRecord(rec *Record) error {
buf := rec.RawSample

// ringbufHeader from 'struct bpf_ringbuf_hdr' in kernel/bpf/ringbuf.c
type ringbufHeader struct {
Len uint32
_ uint32 // pg_off, only used by kernel internals
}

const ringbufHeaderSize = int(unsafe.Sizeof(ringbufHeader{}))

func (rh *ringbufHeader) isBusy() bool {
return rh.Len&sys.BPF_RINGBUF_BUSY_BIT != 0
}
defer func() { sr.advanceTo(sr.pendingPosition()) }()

func (rh *ringbufHeader) isDiscard() bool {
return rh.Len&sys.BPF_RINGBUF_DISCARD_BIT != 0
}

func (rh *ringbufHeader) dataLen() int {
return int(rh.Len & ^uint32(sys.BPF_RINGBUF_BUSY_BIT|sys.BPF_RINGBUF_DISCARD_BIT))
}
err := sr.readRecordUnsafe(rec)
if err != nil {
return err
}

type Record struct {
RawSample []byte
n := len(rec.RawSample)
if cap(buf) < n {
buf = make([]byte, n)
} else {
buf = buf[:n]
}
copy(buf, rec.RawSample)
rec.RawSample = buf

// The minimum number of bytes remaining in the ring buffer after this Record has been read.
Remaining int
return nil
}

// Reader allows reading bpf_ringbuf_output
// from user space.
// Allows reading bpf_ringbuf_output from user space.
type Reader struct {
poller poller

// mu protects read/write access to the Reader structure
mu sync.Mutex
ring eventRing
haveData bool
deadline time.Time
bufferSize int
pendingErr error
readerBase
safeRing *safeRing
}

// NewReader creates a new BPF ringbuf reader.
// Creates a new BPF ringbuf reader.
func NewReader(ringbufMap *ebpf.Map) (*Reader, error) {
if ringbufMap.Type() != ebpf.RingBuf && ringbufMap.Type() != ebpf.WindowsRingBuf {
return nil, fmt.Errorf("invalid Map type: %s", ringbufMap.Type())
}

maxEntries := int(ringbufMap.MaxEntries())
if maxEntries == 0 || (maxEntries&(maxEntries-1)) != 0 {
return nil, fmt.Errorf("ringbuffer map size %d is zero or not a power of two", maxEntries)
}

poller, err := newPoller(ringbufMap.FD())
if err != nil {
r := new(Reader)
if err := initReaderBase(&r.readerBase, ringbufMap); err != nil {
return nil, err
}

ring, err := newRingBufEventRing(ringbufMap.FD(), maxEntries)
if err != nil {
poller.Close()
return nil, fmt.Errorf("failed to create ringbuf ring: %w", err)
}

return &Reader{
poller: poller,
ring: ring,
bufferSize: ring.size(),
// On Windows, the wait handle is only set when the reader is created,
// so we miss any wakeups that happened before.
// Do an opportunistic read to get any pending samples.
haveData: platform.IsWindows,
}, nil
}

// Close frees resources used by the reader.
//
// It interrupts calls to Read.
func (r *Reader) Close() error {
if err := r.poller.Close(); err != nil {
if errors.Is(err, os.ErrClosed) {
return nil
}
return err
}

// Acquire the lock. This ensures that Read isn't running.
r.mu.Lock()
defer r.mu.Unlock()

var err error
if r.ring != nil {
err = r.ring.Close()
r.ring = nil
}

return err
}

// SetDeadline controls how long Read and ReadInto will block waiting for samples.
//
// Passing a zero time.Time will remove the deadline.
func (r *Reader) SetDeadline(t time.Time) {
r.mu.Lock()
defer r.mu.Unlock()

r.deadline = t
r.safeRing = &safeRing{r.ring}
return r, nil
}

// Read the next record from the BPF ringbuf.
Expand All @@ -166,55 +73,7 @@ func (r *Reader) ReadInto(rec *Record) error {
return fmt.Errorf("ringbuffer: %w", ErrClosed)
}

for {
if !r.haveData {
if pe := r.pendingErr; pe != nil {
r.pendingErr = nil
return pe
}

err := r.poller.Wait(r.deadline)
if errors.Is(err, os.ErrDeadlineExceeded) || errors.Is(err, ErrFlushed) {
// Ignoring this for reading a valid entry after timeout or flush.
// This can occur if the producer submitted to the ring buffer
// with BPF_RB_NO_WAKEUP.
r.pendingErr = err
} else if err != nil {
return err
}
r.haveData = true
}

for {
err := r.ring.readRecord(rec)
// Not using errors.Is which is quite a bit slower
// For a tight loop it might make a difference
if err == errBusy {
continue
}
if err == errEOR {
r.haveData = false
break
}
return err
}
}
}

// BufferSize returns the size in bytes of the ring buffer
func (r *Reader) BufferSize() int {
return r.bufferSize
}

// Flush unblocks Read/ReadInto and successive Read/ReadInto calls will return pending samples at this point,
// until you receive a ErrFlushed error.
func (r *Reader) Flush() error {
return r.poller.Flush()
}

// AvailableBytes returns the amount of data available to read in the ring buffer in bytes.
func (r *Reader) AvailableBytes() int {
// Don't need to acquire the lock here since the implementation of AvailableBytes
// performs atomic loads on the producer and consumer positions.
return int(r.ring.AvailableBytes())
return r.readWaitLocked(func() error {
return r.safeRing.readRecord(rec)
})
}
163 changes: 163 additions & 0 deletions ringbuf/reader_base.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
package ringbuf

import (
"errors"
"fmt"
"os"
"sync"
"time"

"github.com/cilium/ebpf"
"github.com/cilium/ebpf/internal/platform"
)

var (
ErrClosed = os.ErrClosed
errEOR = errors.New("end of ring")
errBusy = errors.New("sample not committed yet")
)

type poller interface {
Wait(deadline time.Time) error
Flush() error
Close() error
}

type Record struct {
RawSample []byte

// The minimum number of bytes remaining in the ring buffer after this Record has been read.
Remaining int
}

type readerBase struct {
poller poller

// mu protects read/write access to the reader state.
mu sync.Mutex
ring eventRing
haveData bool
deadline time.Time
bufferSize int
pendingErr error
}

func initReaderBase(b *readerBase, ringbufMap *ebpf.Map) error {
if ringbufMap.Type() != ebpf.RingBuf && ringbufMap.Type() != ebpf.WindowsRingBuf {
return fmt.Errorf("invalid Map type: %s", ringbufMap.Type())
}

maxEntries := int(ringbufMap.MaxEntries())
if maxEntries == 0 || (maxEntries&(maxEntries-1)) != 0 {
return fmt.Errorf("ringbuffer map size %d is zero or not a power of two", maxEntries)
}

poller, err := newPoller(ringbufMap.FD())
if err != nil {
return err
}

ring, err := newRingBufEventRing(ringbufMap.FD(), maxEntries)
if err != nil {
poller.Close()
return fmt.Errorf("failed to create ringbuf ring: %w", err)
}

b.poller = poller
b.ring = ring
b.bufferSize = ring.size()
// On Windows, the wait handle is only set when the reader is created,
// so we miss any wakeups that happened before.
// Do an opportunistic read to get any pending samples.
b.haveData = platform.IsWindows
return nil
}

// Close frees resources used by the reader.
//
// It interrupts calls to Read.
func (b *readerBase) Close() error {
if err := b.poller.Close(); err != nil {
if errors.Is(err, os.ErrClosed) {
return nil
}
return err
}

// Acquire the lock. This ensures that Read isn't running.
b.mu.Lock()
defer b.mu.Unlock()

var err error
if b.ring != nil {
err = b.ring.Close()
b.ring = nil
}

return err
}

// SetDeadline controls how long Read and ReadInto will block waiting for samples.
//
// Passing a zero time.Time will remove the deadline.
func (b *readerBase) SetDeadline(t time.Time) {
b.mu.Lock()
defer b.mu.Unlock()

b.deadline = t
}

// BufferSize returns the size in bytes of the ring buffer.
func (b *readerBase) BufferSize() int {
return b.bufferSize
}

// Flush unblocks Read/ReadInto and successive Read/ReadInto calls will return pending samples at this point,
// until you receive a ErrFlushed error.
func (b *readerBase) Flush() error {
return b.poller.Flush()
}

// AvailableBytes returns the amount of data available to read in the ring buffer in bytes.
func (b *readerBase) AvailableBytes() int {
// Don't need to acquire the lock here since the implementation of AvailableBytes
// performs atomic loads on the producer and consumer positions.
return int(b.ring.AvailableBytes())
}

// Polls for data and calls read in a loop. Must be called with b.mu held.
func (b *readerBase) readWaitLocked(read func() error) error {
for {
if !b.haveData {
if pe := b.pendingErr; pe != nil {
b.pendingErr = nil
return pe
}

err := b.poller.Wait(b.deadline)
if errors.Is(err, os.ErrDeadlineExceeded) || errors.Is(err, ErrFlushed) {
// Ignoring this for reading a valid entry after timeout or flush.
// This can occur if the producer submitted to the ring buffer
// with BPF_RB_NO_WAKEUP.
b.pendingErr = err
} else if err != nil {
return err
}
b.haveData = true
}

for {
err := read()
// Not using errors.Is which is quite a bit slower
// For a tight loop it might make a difference
if err == errBusy {
continue
}
if err == errEOR {
b.haveData = false
break
}
return err
}
}
}
Loading
Loading