From 2b3e6efac6e4a8e833500d3bb2a8e9188c4a8cf8 Mon Sep 17 00:00:00 2001
From: Daniel Adam
Date: Fri, 24 Oct 2025 12:32:19 +0200
Subject: [PATCH 1/6] Utilize memory allocator in ReaderProperties.GetStream

---
 internal/utils/buf_reader.go | 51 +++++++++++++++++++++++++++++++-----
 parquet/file/page_reader.go | 6 ++++-
 parquet/reader_properties.go | 11 +++++---
 3 files changed, 57 insertions(+), 11 deletions(-)

diff --git a/internal/utils/buf_reader.go b/internal/utils/buf_reader.go
index c222c8bd..7762f7db 100644
--- a/internal/utils/buf_reader.go
+++ b/internal/utils/buf_reader.go
@@ -22,6 +22,8 @@ import (
	"errors"
	"fmt"
	"io"
+
+	"github.com/apache/arrow-go/v18/arrow/memory"
 )

 type Reader interface {
@@ -38,6 +40,7 @@ type byteReader struct {

 // NewByteReader creates a new ByteReader instance from the given byte slice.
 // It wraps the bytes.NewReader function to implement BufferedReader interface.
+// It is considered not to own the underlying byte slice, so the Free method is a no-op.
 func NewByteReader(buf []byte) *byteReader {
	r := bytes.NewReader(buf)
	return &byteReader{
@@ -108,10 +111,43 @@ func (r *byteReader) Reset(Reader) {}

 func (r *byteReader) BufferSize() int { return len(r.buf) }

+func (r *byteReader) Free() {}
+
+// bytesBufferReader is a byte slice with a bytes reader wrapped around it.
+// It uses an allocator to allocate and free the underlying byte slice.
+type bytesBufferReader struct {
+	alloc memory.Allocator
+	byteReader
+}
+
+// NewBytesBufferReader creates a new bytesBufferReader with the given size and allocator.
+func NewBytesBufferReader(size int, alloc memory.Allocator) *bytesBufferReader {
+	buf := alloc.Allocate(size)
+	return &bytesBufferReader{
+		alloc: alloc,
+		byteReader: byteReader{
+			bytes.NewReader(buf),
+			buf,
+			0,
+		},
+	}
+}
+
+// Buffer returns the underlying byte slice.
+func (r *bytesBufferReader) Buffer() []byte {
+	return r.buf
+}
+
+// Free releases the underlying byte slice back to the allocator.
+func (r *bytesBufferReader) Free() {
+	r.alloc.Free(r.buf)
+}
+
 // bufferedReader is similar to bufio.Reader except
 // it will expand the buffer if necessary when asked to Peek
 // more bytes than are in the buffer
 type bufferedReader struct {
+	alloc memory.Allocator // allocator used to allocate the buffer
	bufferSz int
	buf []byte
	r, w int
@@ -122,9 +158,10 @@ type bufferedReader struct {

 // NewBufferedReader returns a buffered reader with similar semantics to bufio.Reader
 // except Peek will expand the internal buffer if needed rather than return
 // an error.
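+// The allocator passed in owns the internal buffer; Free returns the buffer
+// to the allocator once the reader is no longer needed.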
-func NewBufferedReader(rd Reader, sz int) *bufferedReader { +func NewBufferedReader(rd Reader, sz int, alloc memory.Allocator) *bufferedReader { r := &bufferedReader{ - rd: rd, + alloc: alloc, + rd: rd, } r.resizeBuffer(sz) return r @@ -140,11 +177,9 @@ func (b *bufferedReader) Reset(rd Reader) { func (b *bufferedReader) resetBuffer() { if b.buf == nil { - b.buf = make([]byte, b.bufferSz) + b.buf = b.alloc.Allocate(b.bufferSz) } else if b.bufferSz > cap(b.buf) { - buf := b.buf - b.buf = make([]byte, b.bufferSz) - copy(b.buf, buf) + b.buf = b.alloc.Reallocate(b.bufferSz, b.buf) } else { b.buf = b.buf[:b.bufferSz] } @@ -298,3 +333,7 @@ func (b *bufferedReader) Read(p []byte) (n int, err error) { b.r += n return n, nil } + +func (b *bufferedReader) Free() { + b.alloc.Free(b.buf) +} diff --git a/parquet/file/page_reader.go b/parquet/file/page_reader.go index 903f2ec0..f8d787f6 100644 --- a/parquet/file/page_reader.go +++ b/parquet/file/page_reader.go @@ -382,6 +382,9 @@ func (p *serializedPageReader) Close() error { p.dictPageBuffer.Release() p.dataPageBuffer.Release() } + if p.r != nil { + p.r.Free() + } return nil } @@ -603,7 +606,8 @@ func (p *serializedPageReader) GetDictionaryPage() (*DictionaryPage, error) { readBufSize := min(int(p.dataOffset-p.baseOffset), p.r.BufferSize()) rd := utils.NewBufferedReader( io.NewSectionReader(p.r.Outer(), p.dictOffset-p.baseOffset, p.dataOffset-p.baseOffset), - readBufSize) + readBufSize, + p.mem) if err := p.readPageHeader(rd, hdr); err != nil { return nil, err } diff --git a/parquet/reader_properties.go b/parquet/reader_properties.go index 5b119dcf..11ead445 100644 --- a/parquet/reader_properties.go +++ b/parquet/reader_properties.go @@ -52,6 +52,7 @@ type BufferedReader interface { Outer() utils.Reader BufferSize() int Reset(utils.Reader) + Free() io.Reader } @@ -74,17 +75,19 @@ func (r *ReaderProperties) Allocator() memory.Allocator { return r.alloc } // into a buffer in memory and return a bytes.NewReader for that buffer. 
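+// On failure, the buffer allocated for the direct read is released back to
+// the allocator before the error is returned, so no allocator memory leaks
+// on the error paths.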
func (r *ReaderProperties) GetStream(source io.ReaderAt, start, nbytes int64) (BufferedReader, error) { if r.BufferedStreamEnabled { - return utils.NewBufferedReader(io.NewSectionReader(source, start, nbytes), int(r.BufferSize)), nil + return utils.NewBufferedReader(io.NewSectionReader(source, start, nbytes), int(r.BufferSize), r.alloc), nil } - data := make([]byte, nbytes) - n, err := source.ReadAt(data, start) + buf := utils.NewBytesBufferReader(int(nbytes), r.alloc) + n, err := source.ReadAt(buf.Buffer(), start) if err != nil { + buf.Free() return nil, fmt.Errorf("parquet: tried reading from file, but got error: %w", err) } if n != int(nbytes) { + buf.Free() return nil, fmt.Errorf("parquet: tried reading %d bytes starting at position %d from file but only got %d", nbytes, start, n) } - return utils.NewByteReader(data), nil + return buf, nil } From 18d3486c65e9d53c6e2504167795447e243e7b3b Mon Sep 17 00:00:00 2001 From: Daniel Adam Date: Fri, 24 Oct 2025 13:13:21 +0200 Subject: [PATCH 2/6] Ensure that decompressBuffer doesn't get reallocated by io.CopyN --- internal/utils/buf_reader.go | 6 +++ parquet/file/page_reader.go | 2 + parquet/file/row_group_reader.go | 2 + parquet/reader_writer_properties_test.go | 65 +++++++++++++++++++++++- 4 files changed, 73 insertions(+), 2 deletions(-) diff --git a/internal/utils/buf_reader.go b/internal/utils/buf_reader.go index 7762f7db..46a16da2 100644 --- a/internal/utils/buf_reader.go +++ b/internal/utils/buf_reader.go @@ -122,6 +122,9 @@ type bytesBufferReader struct { // NewBytesBufferReader creates a new bytesBufferReader with the given size and allocator. func NewBytesBufferReader(size int, alloc memory.Allocator) *bytesBufferReader { + if alloc == nil { + alloc = memory.DefaultAllocator + } buf := alloc.Allocate(size) return &bytesBufferReader{ alloc: alloc, @@ -159,6 +162,9 @@ type bufferedReader struct { // except Peek will expand the internal buffer if needed rather than return // an error. 
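+// Passing a nil allocator is allowed and falls back to memory.DefaultAllocator.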
func NewBufferedReader(rd Reader, sz int, alloc memory.Allocator) *bufferedReader { + if alloc == nil { + alloc = memory.DefaultAllocator + } r := &bufferedReader{ alloc: alloc, rd: rd, diff --git a/parquet/file/page_reader.go b/parquet/file/page_reader.go index f8d787f6..e1a5e5a3 100644 --- a/parquet/file/page_reader.go +++ b/parquet/file/page_reader.go @@ -608,6 +608,7 @@ func (p *serializedPageReader) GetDictionaryPage() (*DictionaryPage, error) { io.NewSectionReader(p.r.Outer(), p.dictOffset-p.baseOffset, p.dataOffset-p.baseOffset), readBufSize, p.mem) + defer rd.Free() if err := p.readPageHeader(rd, hdr); err != nil { return nil, err } @@ -818,6 +819,7 @@ func (p *serializedPageReader) Next() bool { firstRowIdx := p.rowsSeen p.rowsSeen += int64(dataHeader.GetNumValues()) + data, err := p.decompress(p.r, lenCompressed, buf.Bytes()) if err != nil { p.err = err diff --git a/parquet/file/row_group_reader.go b/parquet/file/row_group_reader.go index ea5f7098..bf75db46 100644 --- a/parquet/file/row_group_reader.go +++ b/parquet/file/row_group_reader.go @@ -134,11 +134,13 @@ func (r *RowGroupReader) GetColumnPageReader(i int) (PageReader, error) { } if r.fileDecryptor == nil { + stream.Free() return nil, xerrors.New("column in rowgroup is encrypted, but no file decryptor") } const encryptedRowGroupsLimit = 32767 if i > encryptedRowGroupsLimit { + stream.Free() return nil, xerrors.New("encrypted files cannot contain more than 32767 column chunks") } diff --git a/parquet/reader_writer_properties_test.go b/parquet/reader_writer_properties_test.go index 00b26a83..a8e9b752 100644 --- a/parquet/reader_writer_properties_test.go +++ b/parquet/reader_writer_properties_test.go @@ -18,6 +18,7 @@ package parquet_test import ( "bytes" + "errors" "testing" "github.com/apache/arrow-go/v18/arrow/memory" @@ -67,7 +68,67 @@ func TestReaderPropsGetStreamInsufficient(t *testing.T) { buf := memory.NewBufferBytes([]byte(data)) rdr := bytes.NewReader(buf.Bytes()) - props := parquet.NewReaderProperties(nil) - _, err := props.GetStream(rdr, 12, 15) + props1 := parquet.NewReaderProperties(nil) + _, err := props1.GetStream(rdr, 12, 15) + assert.Error(t, err) +} + +type mockReaderAt struct{} + +func (m *mockReaderAt) ReadAt(p []byte, off int64) (int, error) { + return 0, errors.New("mock error") +} + +func TestReaderPropsGetStreamWithAllocator(t *testing.T) { + pool := memory.NewCheckedAllocator(memory.NewGoAllocator()) + defer pool.AssertSize(t, 0) + + data := "data to read" + buf := memory.NewBufferBytes([]byte(data)) + rdr := bytes.NewReader(buf.Bytes()) + + // no leak on success + props := parquet.NewReaderProperties(pool) + bufRdr, err := props.GetStream(rdr, 0, int64(len(data))) + assert.NoError(t, err) + bufRdr.Free() + + // no leak on reader error + _, err = props.GetStream(&mockReaderAt{}, 0, 10) + assert.Error(t, err) + + // no leak on insufficient read + _, err = props.GetStream(rdr, 0, int64(len(data)+10)) assert.Error(t, err) } + +func TestReaderPropsGetStreamBufferedWithAllocator(t *testing.T) { + pool := memory.NewCheckedAllocator(memory.NewGoAllocator()) + defer pool.AssertSize(t, 0) + + data := "data to read" + rdr := bytes.NewReader(memory.NewBufferBytes([]byte(data)).Bytes()) + + props := parquet.NewReaderProperties(pool) + props.BufferedStreamEnabled = true + + buf := make([]byte, len(data)) + bufRdr, err := props.GetStream(rdr, 0, int64(len(data))) + assert.NoError(t, err) + _, err = bufRdr.Read(buf) + assert.NoError(t, err) + bufRdr.Free() + + bufRdr, err = props.GetStream(&mockReaderAt{}, 0, 10) + 
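+	// with BufferedStreamEnabled the section reader is wrapped lazily, so the
+	// failing ReadAt of mockReaderAt only surfaces on the first Read below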
assert.NoError(t, err) + _, err = bufRdr.Read(buf) + assert.Error(t, err) + bufRdr.Free() + + bufRdr, err = props.GetStream(rdr, 0, int64(len(data)+10)) + assert.NoError(t, err) + n, err := bufRdr.Read(buf) + assert.NoError(t, err) + assert.NotEqual(t, len(data)+10, n) + bufRdr.Free() +} From b7339669f2a4a99d49e94ae92b34dc74f665c957 Mon Sep 17 00:00:00 2001 From: Daniel Adam Date: Tue, 28 Oct 2025 13:01:59 +0100 Subject: [PATCH 3/6] Read uncompressed data directly into the page buffer --- internal/utils/buf_reader.go | 2 + parquet/file/page_reader.go | 73 ++++++++++++++++++++++++++++-------- parquet/reader_properties.go | 2 + 3 files changed, 61 insertions(+), 16 deletions(-) diff --git a/internal/utils/buf_reader.go b/internal/utils/buf_reader.go index 46a16da2..7ba904d1 100644 --- a/internal/utils/buf_reader.go +++ b/internal/utils/buf_reader.go @@ -111,6 +111,8 @@ func (r *byteReader) Reset(Reader) {} func (r *byteReader) BufferSize() int { return len(r.buf) } +func (r *byteReader) Buffered() int { return len(r.buf) - r.pos } + func (r *byteReader) Free() {} // bytesBufferReader is a byte slice with a bytes reader wrapped around it. diff --git a/parquet/file/page_reader.go b/parquet/file/page_reader.go index e1a5e5a3..172a57e5 100644 --- a/parquet/file/page_reader.go +++ b/parquet/file/page_reader.go @@ -374,6 +374,8 @@ type serializedPageReader struct { dataPageBuffer *memory.Buffer dictPageBuffer *memory.Buffer err error + + isCompressed bool } func (p *serializedPageReader) Close() error { @@ -402,6 +404,7 @@ func (p *serializedPageReader) init(compressType compress.Compression, ctx *Cryp return err } p.codec = codec + p.isCompressed = compressType != compress.Codecs.Uncompressed if ctx != nil { p.cryptoCtx = *ctx @@ -444,6 +447,7 @@ func NewPageReader(r parquet.BufferedReader, nrows int64, compressType compress. 
dictPageBuffer: memory.NewResizableBuffer(mem), } rdr.decompressBuffer.ResizeNoShrink(defaultPageHeaderSize) + rdr.isCompressed = compressType != compress.Codecs.Uncompressed if ctx != nil { rdr.cryptoCtx = *ctx rdr.initDecryption() @@ -460,6 +464,8 @@ func (p *serializedPageReader) Reset(r parquet.BufferedReader, nrows int64, comp if p.err != nil { return } + p.isCompressed = compressType != compress.Codecs.Uncompressed + if ctx != nil { p.cryptoCtx = *ctx p.initDecryption() @@ -502,6 +508,36 @@ func (p *serializedPageReader) Page() Page { return p.curPage } +func (p *serializedPageReader) stealFromBuffer(br parquet.BufferedReader, lenUncompressed int) ([]byte, error) { + data, err := br.Peek(lenUncompressed) + if err != nil { + return nil, err + } + if p.cryptoCtx.DataDecryptor != nil { + data = p.cryptoCtx.DataDecryptor.Decrypt(data) + } + // advance the reader + _, err = br.Discard(lenUncompressed) + if err != nil && err != io.EOF { + return nil, err + } + return data, nil +} + +func (p *serializedPageReader) readUncompressed(br parquet.BufferedReader, lenUncompressed int, buf []byte) ([]byte, error) { + n, err := io.ReadFull(br, buf[:lenUncompressed]) + if err != nil { + return nil, err + } + if n != lenUncompressed { + return nil, fmt.Errorf("parquet: expected to read %d bytes but only read %d", lenUncompressed, n) + } + if p.cryptoCtx.DataDecryptor != nil { + buf = p.cryptoCtx.DataDecryptor.Decrypt(buf) + } + return buf, nil +} + func (p *serializedPageReader) decompress(rd io.Reader, lenCompressed int, buf []byte) ([]byte, error) { p.decompressBuffer.ResizeNoShrink(lenCompressed) @@ -634,12 +670,9 @@ func (p *serializedPageReader) GetDictionaryPage() (*DictionaryPage, error) { return nil, errors.New("parquet: invalid page header (negative number of values)") } - p.dictPageBuffer.ResizeNoShrink(lenUncompressed) - buf := memory.NewBufferBytes(p.dictPageBuffer.Bytes()) - - data, err := p.decompress(rd, lenCompressed, buf.Bytes()) + data, err := p.getPageBytes(rd, p.isCompressed, lenCompressed, lenUncompressed, p.dictPageBuffer) if err != nil { - return nil, err + return nil, fmt.Errorf("parquet: could not read dictionary page data: %w", err) } if len(data) != lenUncompressed { return nil, fmt.Errorf("parquet: metadata said %d bytes uncompressed dictionary page, got %d bytes", lenUncompressed, len(data)) @@ -647,7 +680,7 @@ func (p *serializedPageReader) GetDictionaryPage() (*DictionaryPage, error) { return &DictionaryPage{ page: page{ - buf: buf, + buf: memory.NewBufferBytes(data), typ: hdr.Type, nvals: dictHeader.GetNumValues(), encoding: dictHeader.GetEncoding(), @@ -743,6 +776,20 @@ func (p *serializedPageReader) SeekToPageWithRow(rowIdx int64) error { return p.err } +func (p *serializedPageReader) getPageBytes( + r parquet.BufferedReader, isCompressed bool, lenCompressed, lenUncompressed int, buffer *memory.Buffer, +) ([]byte, error) { + if isCompressed { + buffer.ResizeNoShrink(lenUncompressed) + return p.decompress(r, lenCompressed, buffer.Bytes()) + } + if r.Buffered() >= lenCompressed { + return p.stealFromBuffer(r, lenCompressed) + } + buffer.ResizeNoShrink(lenUncompressed) + return p.readUncompressed(r, lenCompressed, buffer.Bytes()) +} + func (p *serializedPageReader) Next() bool { // Loop here because there may be unhandled page types that we skip until // finding a page that we do know what to do with @@ -782,10 +829,7 @@ func (p *serializedPageReader) Next() bool { return false } - p.dictPageBuffer.ResizeNoShrink(lenUncompressed) - buf := 
memory.NewBufferBytes(p.dictPageBuffer.Bytes()) - - data, err := p.decompress(p.r, lenCompressed, buf.Bytes()) + data, err := p.getPageBytes(p.r, p.isCompressed, lenCompressed, lenUncompressed, p.dictPageBuffer) if err != nil { p.err = err return false @@ -798,7 +842,7 @@ func (p *serializedPageReader) Next() bool { // make dictionary page p.curPage = &DictionaryPage{ page: page{ - buf: buf, + buf: memory.NewBufferBytes(data), typ: p.curPageHdr.Type, nvals: dictHeader.GetNumValues(), encoding: dictHeader.GetEncoding(), @@ -814,13 +858,10 @@ func (p *serializedPageReader) Next() bool { return false } - p.dataPageBuffer.ResizeNoShrink(lenUncompressed) - buf := memory.NewBufferBytes(p.dataPageBuffer.Bytes()) - firstRowIdx := p.rowsSeen p.rowsSeen += int64(dataHeader.GetNumValues()) - data, err := p.decompress(p.r, lenCompressed, buf.Bytes()) + data, err := p.getPageBytes(p.r, p.isCompressed, lenCompressed, lenUncompressed, p.dataPageBuffer) if err != nil { p.err = err return false @@ -833,7 +874,7 @@ func (p *serializedPageReader) Next() bool { // make datapagev1 p.curPage = &DataPageV1{ page: page{ - buf: buf, + buf: memory.NewBufferBytes(data), typ: p.curPageHdr.Type, nvals: dataHeader.GetNumValues(), encoding: dataHeader.GetEncoding(), diff --git a/parquet/reader_properties.go b/parquet/reader_properties.go index 11ead445..254cf217 100644 --- a/parquet/reader_properties.go +++ b/parquet/reader_properties.go @@ -50,6 +50,8 @@ type BufferedReader interface { Peek(int) ([]byte, error) Discard(int) (int, error) Outer() utils.Reader + // Buffered returns the number of bytes already read and stored in the buffer + Buffered() int BufferSize() int Reset(utils.Reader) Free() From 442bd0333e0a3f6c2207bdb8185478c1f70f279a Mon Sep 17 00:00:00 2001 From: Daniel Adam Date: Thu, 30 Oct 2025 19:05:14 +0100 Subject: [PATCH 4/6] Fix encryption for DataPageV2 --- parquet/file/column_reader_test.go | 152 +++++++++++++++++++++++++++++ 1 file changed, 152 insertions(+) diff --git a/parquet/file/column_reader_test.go b/parquet/file/column_reader_test.go index 581f8957..5e06b650 100644 --- a/parquet/file/column_reader_test.go +++ b/parquet/file/column_reader_test.go @@ -33,7 +33,9 @@ import ( "github.com/apache/arrow-go/v18/arrow/memory" "github.com/apache/arrow-go/v18/internal/utils" "github.com/apache/arrow-go/v18/parquet" + "github.com/apache/arrow-go/v18/parquet/compress" "github.com/apache/arrow-go/v18/parquet/file" + "github.com/apache/arrow-go/v18/parquet/internal/encryption" "github.com/apache/arrow-go/v18/parquet/internal/testutils" "github.com/apache/arrow-go/v18/parquet/pqarrow" "github.com/apache/arrow-go/v18/parquet/schema" @@ -42,6 +44,17 @@ import ( "github.com/stretchr/testify/suite" ) +const ( + FooterEncryptionKey = "0123456789012345" + ColumnEncryptionKey1 = "1234567890123450" + ColumnEncryptionKey2 = "1234567890123451" + ColumnEncryptionKey3 = "1234567890123452" + FooterEncryptionKeyID = "kf" + ColumnEncryptionKey1ID = "kc1" + ColumnEncryptionKey2ID = "kc2" + ColumnEncryptionKey3ID = "kc3" +) + func initValues(values reflect.Value) { if values.Kind() != reflect.Slice { panic("must init values with slice") @@ -814,6 +827,145 @@ func TestFullSeekRow(t *testing.T) { } } +func checkDecryptedValues(t *testing.T, writerProps *parquet.WriterProperties, readProps *parquet.ReaderProperties) { + sc := arrow.NewSchema([]arrow.Field{ + {Name: "c0", Type: arrow.PrimitiveTypes.Int64, Nullable: true}, + {Name: "c1", Type: arrow.BinaryTypes.String, Nullable: true}, + {Name: "c2", Type: 
arrow.ListOf(arrow.PrimitiveTypes.Int64), Nullable: true}, + }, nil) + + tbl, err := array.TableFromJSON(mem, sc, []string{`[ + {"c0": 1, "c1": "a", "c2": [1]}, + {"c0": 2, "c1": "b", "c2": [1, 2]}, + {"c0": 3, "c1": "c", "c2": [null]}, + {"c0": null, "c1": "d", "c2": []}, + {"c0": 5, "c1": null, "c2": [3, 3, 3]}, + {"c0": 6, "c1": "f", "c2": null} + ]`}) + require.NoError(t, err) + defer tbl.Release() + + schema := tbl.Schema() + arrWriterProps := pqarrow.NewArrowWriterProperties() + + var buf bytes.Buffer + wr, err := pqarrow.NewFileWriter(schema, &buf, writerProps, arrWriterProps) + require.NoError(t, err) + + require.NoError(t, wr.WriteTable(tbl, tbl.NumRows())) + require.NoError(t, wr.Close()) + + rdr, err := file.NewParquetReader(bytes.NewReader(buf.Bytes()), file.WithReadProps(readProps)) + require.NoError(t, err) + defer rdr.Close() + + rgr := rdr.RowGroup(0) + col0, err := rgr.Column(0) + require.NoError(t, err) + + icr := col0.(*file.Int64ColumnChunkReader) + // require.NoError(t, icr.SeekToRow(3)) // TODO: this causes a panic currently + + vals := make([]int64, 6) + defLvls := make([]int16, 6) + repLvls := make([]int16, 6) + + totalLvls, read, err := icr.ReadBatch(6, vals, defLvls, repLvls) + require.NoError(t, err) + assert.EqualValues(t, 6, totalLvls) + assert.EqualValues(t, 5, read) + assert.Equal(t, []int64{1, 2, 3, 5, 6}, vals[:read]) + assert.Equal(t, []int16{1, 1, 1, 0, 1, 1}, defLvls[:totalLvls]) + assert.Equal(t, []int16{0, 0, 0, 0, 0, 0}, repLvls[:totalLvls]) + + col1, err := rgr.Column(1) + require.NoError(t, err) + + scr := col1.(*file.ByteArrayColumnChunkReader) + + bavals := make([]parquet.ByteArray, 6) + badefLvls := make([]int16, 6) + barepLvls := make([]int16, 6) + + totalLvls, read, err = scr.ReadBatch(6, bavals, badefLvls, barepLvls) + require.NoError(t, err) + assert.EqualValues(t, 6, totalLvls) + assert.EqualValues(t, 5, read) + expectedBAs := []parquet.ByteArray{ + []byte("a"), + []byte("b"), + []byte("c"), + []byte("d"), + []byte("f"), + } + assert.Equal(t, expectedBAs, bavals[:read]) + assert.Equal(t, []int16{1, 1, 1, 1, 0, 1}, badefLvls[:totalLvls]) + assert.Equal(t, []int16{0, 0, 0, 0, 0, 0}, barepLvls[:totalLvls]) + + col2, err := rgr.Column(2) + require.NoError(t, err) + + lcr := col2.(*file.Int64ColumnChunkReader) + vals = make([]int64, 10) + defLvls = make([]int16, 10) + repLvls = make([]int16, 10) + totalLvls, read, err = lcr.ReadBatch(6, vals, defLvls, repLvls) + require.NoError(t, err) + + assert.EqualValues(t, 6, totalLvls) + assert.EqualValues(t, 4, read) + + assert.Equal(t, []int64{1, 1, 2, 3}, vals[:read]) + assert.Equal(t, []int16{3, 3, 3, 2, 1, 3}, defLvls[:totalLvls]) + assert.Equal(t, []int16{0, 0, 1, 0, 0, 0}, repLvls[:totalLvls]) +} + +func TestDecryptColumns(t *testing.T) { + encryptCols := make(parquet.ColumnPathToEncryptionPropsMap) + encryptCols["c0"] = parquet.NewColumnEncryptionProperties("c0", parquet.WithKey(ColumnEncryptionKey1), parquet.WithKeyID(ColumnEncryptionKey1ID)) + encryptCols["c1"] = parquet.NewColumnEncryptionProperties("c1", parquet.WithKey(ColumnEncryptionKey2), parquet.WithKeyID(ColumnEncryptionKey2ID)) + encryptCols["c2.list.element"] = parquet.NewColumnEncryptionProperties("c2.list.element", parquet.WithKey(ColumnEncryptionKey3), parquet.WithKeyID(ColumnEncryptionKey3ID)) + encryptProps := parquet.NewFileEncryptionProperties(FooterEncryptionKey, parquet.WithFooterKeyMetadata(FooterEncryptionKeyID), + parquet.WithEncryptedColumns(encryptCols), parquet.WithAlg(parquet.AesCtr)) + + stringKr1 := 
make(encryption.StringKeyIDRetriever) + stringKr1.PutKey(FooterEncryptionKeyID, FooterEncryptionKey) + stringKr1.PutKey(ColumnEncryptionKey1ID, ColumnEncryptionKey1) + stringKr1.PutKey(ColumnEncryptionKey2ID, ColumnEncryptionKey2) + stringKr1.PutKey(ColumnEncryptionKey3ID, ColumnEncryptionKey3) + decryptProps := parquet.NewFileDecryptionProperties(parquet.WithKeyRetriever(stringKr1)) + + tests := []struct { + name string + dataPageVersion parquet.DataPageVersion + bufferedStream bool + compression compress.Compression + }{ + {"DataPageV2_BufferedRead", parquet.DataPageV2, true, compress.Codecs.Uncompressed}, + {"DataPageV2_DirectRead", parquet.DataPageV2, false, compress.Codecs.Uncompressed}, + {"DataPageV2_BufferedRead_Compressed", parquet.DataPageV2, true, compress.Codecs.Snappy}, + {"DataPageV2_DirectRead_Compressed", parquet.DataPageV2, false, compress.Codecs.Snappy}, + {"DataPageV1_BufferedRead", parquet.DataPageV1, true, compress.Codecs.Uncompressed}, + {"DataPageV1_DirectRead", parquet.DataPageV1, false, compress.Codecs.Uncompressed}, + {"DataPageV1_BufferedRead_Compressed", parquet.DataPageV1, true, compress.Codecs.Snappy}, + {"DataPageV1_DirectRead_Compressed", parquet.DataPageV1, false, compress.Codecs.Snappy}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + writerProps := parquet.NewWriterProperties( + parquet.WithDataPageVersion(tt.dataPageVersion), + parquet.WithEncryptionProperties(encryptProps.Clone("")), + parquet.WithCompression(tt.compression), + ) + readProps := parquet.NewReaderProperties(nil) + readProps.FileDecryptProps = decryptProps.Clone("") + readProps.BufferedStreamEnabled = tt.bufferedStream + checkDecryptedValues(t, writerProps, readProps) + }) + } +} + func BenchmarkReadInt32Column(b *testing.B) { // generate parquet with RLE-dictionary encoded int32 column tempdir := b.TempDir() From ee5ac32e79f6df23c30ac967301aa53bdf347fa9 Mon Sep 17 00:00:00 2001 From: Daniel Adam Date: Mon, 26 Jan 2026 15:37:37 +0100 Subject: [PATCH 5/6] perf(parquet): optimize page reader for memory reuse and add comprehensive benchmarks - Refactor page reader to unify V1/V2 page reading logic with better buffer management - Implement readOrStealData to optimize buffered vs direct reads - Reduce buffer allocations by strategically reusing decompress/data buffers Benchmark infrastructure improvements: - Add TestMain with setup/teardown for shared benchmark files - Create 8 benchmark file variants (V1/V2, plain/snappy, encrypted/unencrypted) - Add BenchmarkReadInt32 and BenchmarkReadInt32Buffered with sub-benchmarks - Move file creation to shared setup to eliminate per-benchmark overhead --- parquet/file/column_reader_test.go | 289 +++++++++++++++++++++++++++-- parquet/file/page_reader.go | 254 ++++++++++++++----------- 2 files changed, 419 insertions(+), 124 deletions(-) diff --git a/parquet/file/column_reader_test.go b/parquet/file/column_reader_test.go index 5e06b650..196c4d3b 100644 --- a/parquet/file/column_reader_test.go +++ b/parquet/file/column_reader_test.go @@ -966,44 +966,213 @@ func TestDecryptColumns(t *testing.T) { } } -func BenchmarkReadInt32Column(b *testing.B) { - // generate parquet with RLE-dictionary encoded int32 column - tempdir := b.TempDir() - filepath := filepath.Join(tempdir, "rle-dict-int32.parquet") +var ( + benchmarkFilesOnce sync.Once + benchmarkTempDir string + benchmarkFiles = make(map[string]string) + + benchmarkKeyRetriever = func() encryption.StringKeyIDRetriever { + stringKr := make(encryption.StringKeyIDRetriever) + 
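+		// map the key IDs embedded in the file metadata back to the raw AES keys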
stringKr.PutKey(FooterEncryptionKeyID, FooterEncryptionKey) + stringKr.PutKey(ColumnEncryptionKey1ID, ColumnEncryptionKey1) + return stringKr + } +) + +func TestMain(m *testing.M) { + // Create benchmark files + err := setupBenchmarkFiles() + if err != nil { + fmt.Fprintf(os.Stderr, "Failed to set up benchmark files: %v\n", err) + os.Exit(1) + } + + // Run the tests/benchmarks + code := m.Run() + + // Cleanup temp directory after everything is done + if benchmarkTempDir != "" { + os.RemoveAll(benchmarkTempDir) + } + os.Exit(code) +} + +func setupBenchmarkFiles() error { + var err error + benchmarkTempDir, err = os.MkdirTemp("", "parquet-bench-*") + if err != nil { + return fmt.Errorf("failed to create temp dir: %v", err) + } + + // Create V1 file + v1File := filepath.Join(benchmarkTempDir, "rle-dict-int32.v1.parquet") props := parquet.NewWriterProperties( parquet.WithDictionaryDefault(true), parquet.WithDataPageSize(128*1024*1024), // 128MB parquet.WithBatchSize(128*1024*1024), - parquet.WithMaxRowGroupLength(100_000), + parquet.WithMaxRowGroupLength(1_000_000), + parquet.WithDataPageVersion(parquet.DataPageV1), + parquet.WithVersion(parquet.V2_LATEST), + ) + if err := createBenchmarkFile(v1File, props); err != nil { + return err + } + benchmarkFiles["v1"] = v1File + + // Create V2 file + v2File := filepath.Join(benchmarkTempDir, "rle-dict-int32.v2.parquet") + props = parquet.NewWriterProperties( + parquet.WithDictionaryDefault(true), + parquet.WithDataPageSize(128*1024*1024), // 128MB + parquet.WithBatchSize(128*1024*1024), + parquet.WithMaxRowGroupLength(1_000_000), + parquet.WithDataPageVersion(parquet.DataPageV2), + parquet.WithVersion(parquet.V2_LATEST), + ) + if err := createBenchmarkFile(v2File, props); err != nil { + return err + } + benchmarkFiles["v2"] = v2File + + // Create V1 Snappy file + v1SnappyFile := filepath.Join(benchmarkTempDir, "rle-dict-int32.v1.snappy.parquet") + props = parquet.NewWriterProperties( + parquet.WithDictionaryDefault(true), + parquet.WithDataPageSize(128*1024*1024), // 128MB + parquet.WithBatchSize(128*1024*1024), + parquet.WithMaxRowGroupLength(1_000_000), + parquet.WithCompression(compress.Codecs.Snappy), + parquet.WithDataPageVersion(parquet.DataPageV1), + parquet.WithVersion(parquet.V2_LATEST), + ) + if err = createBenchmarkFile(v1SnappyFile, props); err != nil { + return err + } + benchmarkFiles["v1-snappy"] = v1SnappyFile + + // Create V2 Snappy file + v2SnappyFile := filepath.Join(benchmarkTempDir, "rle-dict-int32.v2.snappy.parquet") + props = parquet.NewWriterProperties( + parquet.WithDictionaryDefault(true), + parquet.WithDataPageSize(128*1024*1024), // 128MB + parquet.WithBatchSize(128*1024*1024), + parquet.WithMaxRowGroupLength(1_000_000), + parquet.WithCompression(compress.Codecs.Snappy), + parquet.WithDataPageVersion(parquet.DataPageV2), + parquet.WithVersion(parquet.V2_LATEST), + ) + if err = createBenchmarkFile(v2SnappyFile, props); err != nil { + return err + } + benchmarkFiles["v2-snappy"] = v2SnappyFile + + // Create encrypted files + encryptCols := make(parquet.ColumnPathToEncryptionPropsMap) + encryptCols["col"] = parquet.NewColumnEncryptionProperties("col", parquet.WithKey(ColumnEncryptionKey1), parquet.WithKeyID(ColumnEncryptionKey1ID)) + encryptProps := parquet.NewFileEncryptionProperties(FooterEncryptionKey, parquet.WithFooterKeyMetadata(FooterEncryptionKeyID), + parquet.WithEncryptedColumns(encryptCols), parquet.WithAlg(parquet.AesCtr)) + + // Create V1 encrypted file + v1EncFile := filepath.Join(benchmarkTempDir, 
"rle-dict-int32.v1.encrypted.parquet") + props = parquet.NewWriterProperties( + parquet.WithDictionaryDefault(true), + parquet.WithDataPageSize(128*1024*1024), // 128MB + parquet.WithBatchSize(128*1024*1024), + parquet.WithMaxRowGroupLength(1_000_000), + parquet.WithDataPageVersion(parquet.DataPageV1), + parquet.WithVersion(parquet.V2_LATEST), + parquet.WithEncryptionProperties(encryptProps.Clone("")), + ) + if err := createBenchmarkFile(v1EncFile, props); err != nil { + return err + } + benchmarkFiles["v1-encrypted"] = v1EncFile + + // Create V2 encrypted file + v2EncFile := filepath.Join(benchmarkTempDir, "rle-dict-int32.v2.encrypted.parquet") + props = parquet.NewWriterProperties( + parquet.WithDictionaryDefault(true), + parquet.WithDataPageSize(128*1024*1024), // 128MB + parquet.WithBatchSize(128*1024*1024), + parquet.WithMaxRowGroupLength(1_000_000), + parquet.WithDataPageVersion(parquet.DataPageV2), + parquet.WithVersion(parquet.V2_LATEST), + parquet.WithEncryptionProperties(encryptProps.Clone("")), + ) + if err := createBenchmarkFile(v2EncFile, props); err != nil { + return err + } + benchmarkFiles["v2-encrypted"] = v2EncFile + + // Create V1 Snappy encrypted file + v1SnappyEncFile := filepath.Join(benchmarkTempDir, "rle-dict-int32.v1.snappy.encrypted.parquet") + props = parquet.NewWriterProperties( + parquet.WithDictionaryDefault(true), + parquet.WithDataPageSize(128*1024*1024), // 128MB + parquet.WithBatchSize(128*1024*1024), + parquet.WithMaxRowGroupLength(1_000_000), + parquet.WithCompression(compress.Codecs.Snappy), + parquet.WithDataPageVersion(parquet.DataPageV1), + parquet.WithVersion(parquet.V2_LATEST), + parquet.WithEncryptionProperties(encryptProps.Clone("")), + ) + if err := createBenchmarkFile(v1SnappyEncFile, props); err != nil { + return err + } + benchmarkFiles["v1-snappy-encrypted"] = v1SnappyEncFile + + // Create V2 Snappy encrypted file + v2SnappyEncFile := filepath.Join(benchmarkTempDir, "rle-dict-int32.v2.snappy.encrypted.parquet") + props = parquet.NewWriterProperties( + parquet.WithDictionaryDefault(true), + parquet.WithDataPageSize(128*1024*1024), // 128MB + parquet.WithBatchSize(128*1024*1024), + parquet.WithMaxRowGroupLength(1_000_000), + parquet.WithCompression(compress.Codecs.Snappy), parquet.WithDataPageVersion(parquet.DataPageV2), parquet.WithVersion(parquet.V2_LATEST), + parquet.WithEncryptionProperties(encryptProps.Clone("")), ) + if err := createBenchmarkFile(v2SnappyEncFile, props); err != nil { + return err + } + benchmarkFiles["v2-snappy-encrypted"] = v2SnappyEncFile + + return nil +} + +func createBenchmarkFile(filepath string, props *parquet.WriterProperties) error { outFile, err := os.Create(filepath) - require.NoError(b, err) + if err != nil { + return fmt.Errorf("failed to create benchmark file: %v", err) + } + defer outFile.Close() sc, err := schema.NewGroupNode("schema", parquet.Repetitions.Required, schema.FieldList{ schema.NewInt32Node("col", parquet.Repetitions.Required, -1), }, -1) - require.NoError(b, err) + if err != nil { + return fmt.Errorf("failed to create schema: %v", err) + } writer := file.NewParquetWriter(outFile, sc, file.WithWriterProps(props)) - // 10 row groups of 100 000 rows = 1 000 000 rows in total + // 10 row groups of 1_000_000 rows = 10_000_000 rows in total value := int32(1) for range 10 { rgWriter := writer.AppendBufferedRowGroup() cwr, _ := rgWriter.Column(0) cw := cwr.(*file.Int32ColumnChunkWriter) - valuesIn := make([]int32, 0, 100_000) + valuesIn := make([]int32, 0, 1_000_000) repeats := 1 - for len(valuesIn) 
< 100_000 { + for len(valuesIn) < 1_000_000 { repeatedValue := make([]int32, repeats) for i := range repeatedValue { repeatedValue[i] = value } - if len(valuesIn)+len(repeatedValue) > 100_000 { - repeatedValue = repeatedValue[:100_000-len(valuesIn)] + if len(valuesIn)+len(repeatedValue) > 1_000_000 { + repeatedValue = repeatedValue[:1_000_000-len(valuesIn)] } valuesIn = append(valuesIn, repeatedValue[:]...) // repeat values from 1 to 50 times @@ -1013,16 +1182,27 @@ func BenchmarkReadInt32Column(b *testing.B) { cw.WriteBatch(valuesIn, nil, nil) rgWriter.Close() } - err = writer.Close() - require.NoError(b, err) + if err := writer.Close(); err != nil { + return fmt.Errorf("failed to close parquet writer: %v", err) + } + return nil +} + +func benchmarkReadInt32ColumnWithDecryption(b *testing.B, filepath string, readProps *parquet.ReaderProperties) { + var reader *file.Reader + var err error - reader, err := file.OpenParquetFile(filepath, false) + if readProps != nil { + reader, err = file.OpenParquetFile(filepath, false, file.WithReadProps(readProps)) + } else { + reader, err = file.OpenParquetFile(filepath, false) + } require.NoError(b, err) defer reader.Close() numValues := reader.NumRows() values := make([]int32, numValues) - b.StopTimer() + b.ResetTimer() for i := 0; i < b.N; i++ { startIndex := 0 @@ -1034,9 +1214,7 @@ func BenchmarkReadInt32Column(b *testing.B) { cr, ok := colReader.(*file.Int32ColumnChunkReader) require.True(b, ok) - b.StartTimer() - _, valuesRead, err := cr.ReadBatch(rgReader.NumRows(), values, nil, nil) - b.StopTimer() + _, valuesRead, err := cr.ReadBatch(rgReader.NumRows(), values[startIndex:], nil, nil) require.NoError(b, err) startIndex += valuesRead @@ -1045,3 +1223,76 @@ func BenchmarkReadInt32Column(b *testing.B) { require.Equal(b, numValues, int64(startIndex)) } } + +func BenchmarkReadInt32(b *testing.B) { + b.Run("V1Page", func(b *testing.B) { + benchmarkReadInt32ColumnWithDecryption(b, benchmarkFiles["v1"], nil) + }) + b.Run("V2Page", func(b *testing.B) { + benchmarkReadInt32ColumnWithDecryption(b, benchmarkFiles["v2"], nil) + }) + b.Run("V1PageSnappy", func(b *testing.B) { + benchmarkReadInt32ColumnWithDecryption(b, benchmarkFiles["v1-snappy"], nil) + }) + b.Run("V2PageSnappy", func(b *testing.B) { + benchmarkReadInt32ColumnWithDecryption(b, benchmarkFiles["v2-snappy"], nil) + }) + + // Set up decryption properties + decryptProps := parquet.NewFileDecryptionProperties(parquet.WithKeyRetriever(benchmarkKeyRetriever())) + readProps := parquet.NewReaderProperties(mem) + b.Run("V1PageEncrypted", func(b *testing.B) { + readProps.FileDecryptProps = decryptProps.Clone("") + benchmarkReadInt32ColumnWithDecryption(b, benchmarkFiles["v1-encrypted"], readProps) + }) + b.Run("V2PageEncrypted", func(b *testing.B) { + readProps.FileDecryptProps = decryptProps.Clone("") + benchmarkReadInt32ColumnWithDecryption(b, benchmarkFiles["v2-encrypted"], readProps) + }) + + b.Run("V1PageSnappyEncrypted", func(b *testing.B) { + readProps.FileDecryptProps = decryptProps.Clone("") + benchmarkReadInt32ColumnWithDecryption(b, benchmarkFiles["v1-snappy-encrypted"], readProps) + }) + + b.Run("V2PageSnappyEncrypted", func(b *testing.B) { + readProps.FileDecryptProps = decryptProps.Clone("") + benchmarkReadInt32ColumnWithDecryption(b, benchmarkFiles["v2-snappy-encrypted"], readProps) + }) +} + +func BenchmarkReadInt32Buffered(b *testing.B) { + readProps := parquet.NewReaderProperties(mem) + readProps.BufferedStreamEnabled = true + b.Run("V1Page", func(b *testing.B) { + 
benchmarkReadInt32ColumnWithDecryption(b, benchmarkFiles["v1"], readProps) + }) + b.Run("V2Page", func(b *testing.B) { + benchmarkReadInt32ColumnWithDecryption(b, benchmarkFiles["v2"], readProps) + }) + b.Run("V1PageSnappy", func(b *testing.B) { + benchmarkReadInt32ColumnWithDecryption(b, benchmarkFiles["v1-snappy"], readProps) + }) + b.Run("V2PageSnappy", func(b *testing.B) { + benchmarkReadInt32ColumnWithDecryption(b, benchmarkFiles["v2-snappy"], readProps) + }) + + // Set up decryption properties + decryptProps := parquet.NewFileDecryptionProperties(parquet.WithKeyRetriever(benchmarkKeyRetriever())) + b.Run("V1PageEncrypted", func(b *testing.B) { + readProps.FileDecryptProps = decryptProps.Clone("") + benchmarkReadInt32ColumnWithDecryption(b, benchmarkFiles["v1-encrypted"], readProps) + }) + b.Run("V2PageEncrypted", func(b *testing.B) { + readProps.FileDecryptProps = decryptProps.Clone("") + benchmarkReadInt32ColumnWithDecryption(b, benchmarkFiles["v2-encrypted"], readProps) + }) + b.Run("V1PageSnappyEncrypted", func(b *testing.B) { + readProps.FileDecryptProps = decryptProps.Clone("") + benchmarkReadInt32ColumnWithDecryption(b, benchmarkFiles["v1-snappy-encrypted"], readProps) + }) + b.Run("V2PageSnappyEncrypted", func(b *testing.B) { + readProps.FileDecryptProps = decryptProps.Clone("") + benchmarkReadInt32ColumnWithDecryption(b, benchmarkFiles["v2-snappy-encrypted"], readProps) + }) +} diff --git a/parquet/file/page_reader.go b/parquet/file/page_reader.go index 172a57e5..0e12e7b1 100644 --- a/parquet/file/page_reader.go +++ b/parquet/file/page_reader.go @@ -508,36 +508,6 @@ func (p *serializedPageReader) Page() Page { return p.curPage } -func (p *serializedPageReader) stealFromBuffer(br parquet.BufferedReader, lenUncompressed int) ([]byte, error) { - data, err := br.Peek(lenUncompressed) - if err != nil { - return nil, err - } - if p.cryptoCtx.DataDecryptor != nil { - data = p.cryptoCtx.DataDecryptor.Decrypt(data) - } - // advance the reader - _, err = br.Discard(lenUncompressed) - if err != nil && err != io.EOF { - return nil, err - } - return data, nil -} - -func (p *serializedPageReader) readUncompressed(br parquet.BufferedReader, lenUncompressed int, buf []byte) ([]byte, error) { - n, err := io.ReadFull(br, buf[:lenUncompressed]) - if err != nil { - return nil, err - } - if n != lenUncompressed { - return nil, fmt.Errorf("parquet: expected to read %d bytes but only read %d", lenUncompressed, n) - } - if p.cryptoCtx.DataDecryptor != nil { - buf = p.cryptoCtx.DataDecryptor.Decrypt(buf) - } - return buf, nil -} - func (p *serializedPageReader) decompress(rd io.Reader, lenCompressed int, buf []byte) ([]byte, error) { p.decompressBuffer.ResizeNoShrink(lenCompressed) @@ -558,55 +528,6 @@ func (p *serializedPageReader) decompress(rd io.Reader, lenCompressed int, buf [ return p.codec.Decode(buf, data), nil } -func (p *serializedPageReader) readV2Encrypted(rd io.Reader, lenCompressed int, levelsBytelen int, compressed bool, buf []byte) error { - // if encrypted, we need to decrypt before decompressing - p.decompressBuffer.ResizeNoShrink(lenCompressed) - - n, err := io.ReadFull(rd, p.decompressBuffer.Bytes()[:lenCompressed]) - if err != nil { - return err - } - if n != lenCompressed { - return fmt.Errorf("parquet: expected to read %d compressed bytes, got %d", lenCompressed, n) - } - - data := p.cryptoCtx.DataDecryptor.Decrypt(p.decompressBuffer.Bytes()[:lenCompressed]) - // encrypted + uncompressed -> just copy the decrypted data to output buffer - if !compressed { - copy(buf, data) - 
return nil - } - - // definition + repetition levels are always uncompressed - if levelsBytelen > 0 { - copy(buf, data[:levelsBytelen]) - data = data[levelsBytelen:] - } - p.codec.Decode(buf[levelsBytelen:], data) - return nil -} - -func (p *serializedPageReader) readV2Unencrypted(rd io.Reader, lenCompressed int, levelsBytelen int, compressed bool, buf []byte) error { - if !compressed { - // uncompressed, just read into the buffer - if _, err := io.ReadFull(rd, buf); err != nil { - return err - } - return nil - } - - // definition + repetition levels are always uncompressed - if levelsBytelen > 0 { - if _, err := io.ReadFull(rd, buf[:levelsBytelen]); err != nil { - return err - } - } - if _, err := p.decompress(p.r, lenCompressed-levelsBytelen, buf[levelsBytelen:]); err != nil { - return err - } - return nil -} - type dataheader interface { IsSetStatistics() bool GetStatistics() *format.Statistics @@ -670,7 +591,7 @@ func (p *serializedPageReader) GetDictionaryPage() (*DictionaryPage, error) { return nil, errors.New("parquet: invalid page header (negative number of values)") } - data, err := p.getPageBytes(rd, p.isCompressed, lenCompressed, lenUncompressed, p.dictPageBuffer) + data, err := p.getPageBytesV1(rd, p.isCompressed, lenCompressed, lenUncompressed, p.dictPageBuffer) if err != nil { return nil, fmt.Errorf("parquet: could not read dictionary page data: %w", err) } @@ -776,18 +697,151 @@ func (p *serializedPageReader) SeekToPageWithRow(rowIdx int64) error { return p.err } -func (p *serializedPageReader) getPageBytes( +// readOrStealData attempts to steal data from the buffered reader if enough is buffered, +// otherwise reads from the underlying reader into the provided buffer. +func (p *serializedPageReader) readOrStealData(r parquet.BufferedReader, lenCompressed int, buffer *memory.Buffer) ([]byte, error) { + // if enough data is buffered, steal it to avoid an extra copy + if r.Buffered() >= lenCompressed { + data, err := r.Peek(lenCompressed) + if err != nil { + return nil, err + } + // advance the reader + _, err = r.Discard(lenCompressed) + if err != nil && err != io.EOF { + return nil, err + } + return data, nil + } + + buffer.ResizeNoShrink(lenCompressed) + // Read directly into the memory.Buffer's backing slice + n, err := io.ReadFull(r, buffer.Bytes()[:lenCompressed]) + if err != nil { + return nil, err + } + if n != lenCompressed { + return nil, fmt.Errorf("parquet: expected to read %d compressed bytes, got %d", lenCompressed, n) + } + return buffer.Bytes()[:lenCompressed], nil +} + +func (p *serializedPageReader) getPageBytesV1( r parquet.BufferedReader, isCompressed bool, lenCompressed, lenUncompressed int, buffer *memory.Buffer, ) ([]byte, error) { + // 8 possible cases: + // 1. enough data buffered (r.Buffered() >= lenCompressed) + // a. encrypted + // i. compressed -> resize output buffer to lenUncompressed, steal (Peek+Discard) buffer, decrypt (allocates new buffer), decompress into output buffer (has enough allocated size) and return output buffer + // ii. not compressed -> resize output buffer to lenCompressed, steal buffer, decrypt (allocates new buffer) and return new buffer + // b. not encrypted + // i. compressed -> resize output buffer to lenUncompressed, steal buffer, decompress into output buffer (has enough allocated size) and return output buffer + // ii. not compressed -> steal from buffer and return + // 2. not enough data buffered + // a. encrypted + // i. 
compressed -> resize output buffer to lenUncompressed, read into output buffer, decrypt (allocates new buffer), decompress into output buffer (has enough allocated size) and return output buffer
+	//      ii. not compressed -> resize output buffer to lenCompressed, read into output buffer, decrypt (allocates new buffer) and return new buffer
+	//   b. not encrypted
+	//      i. compressed -> resize decompress buffer to lenCompressed, read into decompress buffer, decompress into output buffer (has enough allocated size) and return output buffer
+	//      ii. not compressed -> read into output buffer and return
+
+	// Determine which buffer to read into
+	var readBuffer *memory.Buffer
+	if isCompressed {
+		readBuffer = p.decompressBuffer
+	} else {
+		readBuffer = buffer
+	}
+
+	// Read or steal data
+	data, err := p.readOrStealData(r, lenCompressed, readBuffer)
+	if err != nil {
+		return nil, err
+	}
+
+	// Decrypt if needed
+	if p.cryptoCtx.DataDecryptor != nil {
+		data = p.cryptoCtx.DataDecryptor.Decrypt(data)
+	}
+
+	// Decompress if needed
	if isCompressed {
		buffer.ResizeNoShrink(lenUncompressed)
-		return p.decompress(r, lenCompressed, buffer.Bytes())
+		return p.codec.Decode(buffer.Bytes(), data), nil
	}
-	if r.Buffered() >= lenCompressed {
-		return p.stealFromBuffer(r, lenCompressed)
+
+	return data, nil
+}
+
+func (p *serializedPageReader) readV2UnencryptedCompressedWithLevels(r parquet.BufferedReader, lenCompressed, lenUncompressed, levelsBytelen int, buffer *memory.Buffer) ([]byte, error) {
+	// Special case: unencrypted + compressed + has levels
+	// Read levels directly into output buffer, compressed data into decompress buffer
+	buffer.ResizeNoShrink(lenUncompressed)
+	// Read directly into the memory.Buffer's backing slice
+	n, err := io.ReadFull(r, buffer.Bytes()[:levelsBytelen])
+	if err != nil {
+		return nil, err
+	}
+	if n != levelsBytelen {
+		return nil, fmt.Errorf("parquet: expected to read %d level bytes, got %d", levelsBytelen, n)
+	}
+
+	p.decompressBuffer.ResizeNoShrink(lenCompressed - levelsBytelen)
+	n, err = io.ReadFull(r, p.decompressBuffer.Bytes()[:lenCompressed-levelsBytelen])
+	if err != nil {
+		return nil, err
	}
+	if n != lenCompressed-levelsBytelen {
+		return nil, fmt.Errorf("parquet: expected to read %d compressed bytes, got %d", lenCompressed-levelsBytelen, n)
+	}
+
+	p.codec.Decode(buffer.Bytes()[levelsBytelen:], p.decompressBuffer.Bytes())
+	return buffer.Bytes(), nil
+}
+
+func (p *serializedPageReader) getPageBytesV2(
+	r parquet.BufferedReader, isCompressed bool, lenCompressed, lenUncompressed, levelsBytelen int, buffer *memory.Buffer,
+) ([]byte, error) {
+	// Special case: unencrypted + compressed + has levels - read levels and compressed data separately
+	if r.Buffered() < lenCompressed && p.cryptoCtx.DataDecryptor == nil && isCompressed && levelsBytelen > 0 {
+		return p.readV2UnencryptedCompressedWithLevels(r, lenCompressed, lenUncompressed, levelsBytelen, buffer)
+	}
+
+	// Determine which buffer to read into
+	var readBuffer *memory.Buffer
+	if p.cryptoCtx.DataDecryptor == nil && isCompressed {
+		readBuffer = p.decompressBuffer
+	} else {
+		readBuffer = buffer
+	}
+
+	var data []byte
+	var err error
+	// Read or steal data
+	data, err = p.readOrStealData(r, lenCompressed, readBuffer)
+	if err != nil {
+		return nil, err
+	}
+
+	// Decrypt if needed
+	if p.cryptoCtx.DataDecryptor != nil {
+		data = p.cryptoCtx.DataDecryptor.Decrypt(data)
+	}
+
+	// Handle uncompressed case
+	if !isCompressed {
+		return data, nil
+	}
+
+	// Decompress - handle levels if present
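+	// the rep/def levels at the front of a V2 page are stored uncompressed,
+	// so they are copied through verbatim and only the remainder is decoded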
buffer.ResizeNoShrink(lenUncompressed) - return p.readUncompressed(r, lenCompressed, buffer.Bytes()) + if levelsBytelen > 0 { + copy(buffer.Bytes(), data[:levelsBytelen]) + p.codec.Decode(buffer.Bytes()[levelsBytelen:], data[levelsBytelen:]) + return buffer.Bytes(), nil + } + + return p.codec.Decode(buffer.Bytes(), data), nil } func (p *serializedPageReader) Next() bool { @@ -829,7 +883,7 @@ func (p *serializedPageReader) Next() bool { return false } - data, err := p.getPageBytes(p.r, p.isCompressed, lenCompressed, lenUncompressed, p.dictPageBuffer) + data, err := p.getPageBytesV1(p.r, p.isCompressed, lenCompressed, lenUncompressed, p.dictPageBuffer) if err != nil { p.err = err return false @@ -861,13 +915,13 @@ func (p *serializedPageReader) Next() bool { firstRowIdx := p.rowsSeen p.rowsSeen += int64(dataHeader.GetNumValues()) - data, err := p.getPageBytes(p.r, p.isCompressed, lenCompressed, lenUncompressed, p.dataPageBuffer) + data, err := p.getPageBytesV1(p.r, p.isCompressed, lenCompressed, lenUncompressed, p.dataPageBuffer) if err != nil { p.err = err return false } if len(data) != lenUncompressed { - p.err = fmt.Errorf("parquet: metadata said %d bytes uncompressed data page, got %d bytes", lenUncompressed, len(data)) + p.err = fmt.Errorf("parquet: metadata said %d bytes uncompressed V1 data page, got %d bytes", lenUncompressed, len(data)) return false } @@ -898,9 +952,6 @@ func (p *serializedPageReader) Next() bool { return false } - p.dataPageBuffer.ResizeNoShrink(lenUncompressed) - buf := memory.NewBufferBytes(p.dataPageBuffer.Bytes()) - compressed := dataHeader.GetIsCompressed() // extract stats firstRowIdx := p.rowsSeen @@ -911,27 +962,20 @@ func (p *serializedPageReader) Next() bool { return false } - if p.cryptoCtx.DataDecryptor != nil { - if err := p.readV2Encrypted(p.r, lenCompressed, levelsBytelen, compressed, buf.Bytes()); err != nil { - p.err = err - return false - } - } else { - if err := p.readV2Unencrypted(p.r, lenCompressed, levelsBytelen, compressed, buf.Bytes()); err != nil { - p.err = err - return false - } + data, err := p.getPageBytesV2(p.r, compressed, lenCompressed, lenUncompressed, levelsBytelen, p.dataPageBuffer) + if err != nil { + p.err = err + return false } - - if buf.Len() != lenUncompressed { - p.err = fmt.Errorf("parquet: metadata said %d bytes uncompressed data page, got %d bytes", lenUncompressed, buf.Len()) + if len(data) != lenUncompressed { + p.err = fmt.Errorf("parquet: metadata said %d bytes uncompressed V2 data page, got %d bytes", lenUncompressed, len(data)) return false } // make datapage v2 p.curPage = &DataPageV2{ page: page{ - buf: buf, + buf: memory.NewBufferBytes(data), typ: p.curPageHdr.Type, nvals: dataHeader.GetNumValues(), encoding: dataHeader.GetEncoding(), From 68ebe437cf611a7c98fe1f6406c09864743fa6cd Mon Sep 17 00:00:00 2001 From: Daniel Adam Date: Thu, 29 Jan 2026 10:32:17 +0100 Subject: [PATCH 6/6] Add BufferedReaderV2 interface for advanced buffer management - Introduce BufferedReaderV2 interface extending BufferedReader with Buffered() and Free() methods. - Enables explicit resource management and buffer introspection for readers using custom allocators. - Update GetStreamV2 to return BufferedReaderV2, allowing callers to free allocated buffers when done. 
- Keep BufferedReader and GetStream for backwards compatibility
---
 internal/utils/buf_reader.go | 8 +++-
 parquet/file/page_reader.go | 58 ++++++++++++-----------
 parquet/file/row_group_reader.go | 2 +-
 parquet/internal/testutils/pagebuilder.go | 12 +++--
 parquet/reader_properties.go | 34 +++++++++++--
 parquet/reader_writer_properties_test.go | 12 ++---
 6 files changed, 82 insertions(+), 44 deletions(-)

diff --git a/internal/utils/buf_reader.go b/internal/utils/buf_reader.go
index 7ba904d1..7ec4ae16 100644
--- a/internal/utils/buf_reader.go
+++ b/internal/utils/buf_reader.go
@@ -40,7 +40,7 @@ type byteReader struct {

 // NewByteReader creates a new ByteReader instance from the given byte slice.
 // It wraps the bytes.NewReader function to implement BufferedReader interface.
-// It is considered not to own the underlying byte slice, so the Free method is a no-op.
+// It is considered not to own the underlying byte slice.
 func NewByteReader(buf []byte) *byteReader {
	r := bytes.NewReader(buf)
	return &byteReader{
@@ -113,7 +113,11 @@ func (r *byteReader) BufferSize() int { return len(r.buf) }

 func (r *byteReader) Buffered() int { return len(r.buf) - r.pos }

-func (r *byteReader) Free() {}
+func (r *byteReader) Free() {
+	r.r = nil
+	r.buf = nil
+	r.pos = 0
+}

 // bytesBufferReader is a byte slice with a bytes reader wrapped around it.
 // It uses an allocator to allocate and free the underlying byte slice.
diff --git a/parquet/file/page_reader.go b/parquet/file/page_reader.go
index 0e12e7b1..de558462 100644
--- a/parquet/file/page_reader.go
+++ b/parquet/file/page_reader.go
@@ -349,7 +349,7 @@ func (d *DictionaryPage) Release() {
 func (d *DictionaryPage) IsSorted() bool { return d.sorted }

 type serializedPageReader struct {
-	r parquet.BufferedReader
+	r parquet.BufferedReaderV2
	chunk *metadata.ColumnChunkMetaData
	colIdx int
	pgIndexReader *metadata.RowGroupPageIndexReader
@@ -421,6 +421,25 @@ func (p *serializedPageReader) init(compressType compress.Compression, ctx *Cryp
	return nil
 }

+type bufferedReaderV2Adapter struct {
+	parquet.BufferedReader
+}
+
+func (b *bufferedReaderV2Adapter) Buffered() int {
+	return 0
+}
+
+func (b *bufferedReaderV2Adapter) Free() {
+	// no-op
+}
+
+func getBufferedReaderV2(r parquet.BufferedReader) parquet.BufferedReaderV2 {
+	if brV2, ok := r.(parquet.BufferedReaderV2); ok {
+		return brV2
+	}
+	return &bufferedReaderV2Adapter{BufferedReader: r}
+}
+
 // NewPageReader returns a page reader for the data which can be read from the provided reader and compression.
 //
 // Deprecated: This function isn't properly safe for public API use and should not be utilized
@@ -436,7 +455,7 @@ func NewPageReader(r parquet.BufferedReader, nrows int64, compressType compress.
	}

	rdr := &serializedPageReader{
-		r: r,
+		r: getBufferedReaderV2(r),
		maxPageHeaderSize: defaultMaxPageHeaderSize,
		nrows: nrows,
		mem: mem,
@@ -458,7 +477,10 @@ func NewPageReader(r parquet.BufferedReader, nrows int64, compressType compress.
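+// Reset frees the buffer held by the previous reader (unless it is the same
+// reader) before installing the new one, so reused page readers do not leak
+// allocator memory.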
func (p *serializedPageReader) Reset(r parquet.BufferedReader, nrows int64, compressType compress.Compression, ctx *CryptoContext) { p.rowsSeen, p.pageOrd, p.nrows = 0, 0, nrows p.curPageHdr, p.curPage, p.err = nil, nil, nil - p.r = r + if p.r != nil && p.r != r { + p.r.Free() + } + p.r = getBufferedReaderV2(r) p.codec, p.err = compress.GetCodec(compressType) if p.err != nil { @@ -508,26 +530,6 @@ func (p *serializedPageReader) Page() Page { return p.curPage } -func (p *serializedPageReader) decompress(rd io.Reader, lenCompressed int, buf []byte) ([]byte, error) { - p.decompressBuffer.ResizeNoShrink(lenCompressed) - - // Read directly into the memory.Buffer's backing slice - n, err := io.ReadFull(rd, p.decompressBuffer.Bytes()[:lenCompressed]) - if err != nil { - return nil, err - } - if n != lenCompressed { - return nil, fmt.Errorf("parquet: expected to read %d compressed bytes, got %d", lenCompressed, n) - } - - data := p.decompressBuffer.Bytes()[:lenCompressed] - if p.cryptoCtx.DataDecryptor != nil { - data = p.cryptoCtx.DataDecryptor.Decrypt(data) - } - - return p.codec.Decode(buf, data), nil -} - type dataheader interface { IsSetStatistics() bool GetStatistics() *format.Statistics @@ -613,7 +615,7 @@ func (p *serializedPageReader) GetDictionaryPage() (*DictionaryPage, error) { return nil, nil } -func (p *serializedPageReader) readPageHeader(rd parquet.BufferedReader, hdr *format.PageHeader) error { +func (p *serializedPageReader) readPageHeader(rd parquet.BufferedReaderV2, hdr *format.PageHeader) error { allowedPgSz := defaultPageHeaderSize for { view, err := rd.Peek(allowedPgSz) @@ -699,7 +701,7 @@ func (p *serializedPageReader) SeekToPageWithRow(rowIdx int64) error { // readOrStealData attempts to steal data from the buffered reader if enough is buffered, // otherwise reads from the underlying reader into the provided buffer. -func (p *serializedPageReader) readOrStealData(r parquet.BufferedReader, lenCompressed int, buffer *memory.Buffer) ([]byte, error) { +func (p *serializedPageReader) readOrStealData(r parquet.BufferedReaderV2, lenCompressed int, buffer *memory.Buffer) ([]byte, error) { // if enough data is buffered, steal it to avoid an extra copy if r.Buffered() >= lenCompressed { data, err := r.Peek(lenCompressed) @@ -727,7 +729,7 @@ func (p *serializedPageReader) readOrStealData(r parquet.BufferedReader, lenComp } func (p *serializedPageReader) getPageBytesV1( - r parquet.BufferedReader, isCompressed bool, lenCompressed, lenUncompressed int, buffer *memory.Buffer, + r parquet.BufferedReaderV2, isCompressed bool, lenCompressed, lenUncompressed int, buffer *memory.Buffer, ) ([]byte, error) { // 8 possible cases: // 1. 
enough data buffered (r.Buffered() >= lenCompressed) @@ -773,7 +775,7 @@ func (p *serializedPageReader) getPageBytesV1( return data, nil } -func (p *serializedPageReader) readV2UnencryptedCompressedWithLevels(r parquet.BufferedReader, lenCompressed, lenUncompressed, levelsBytelen int, buffer *memory.Buffer) ([]byte, error) { +func (p *serializedPageReader) readV2UnencryptedCompressedWithLevels(r parquet.BufferedReaderV2, lenCompressed, lenUncompressed, levelsBytelen int, buffer *memory.Buffer) ([]byte, error) { // Special case: unencrypted + compressed + has levels // Read levels directly into output buffer, compressed data into decompress buffer buffer.ResizeNoShrink(lenUncompressed) @@ -800,7 +802,7 @@ func (p *serializedPageReader) readV2UnencryptedCompressedWithLevels(r parquet.B } func (p *serializedPageReader) getPageBytesV2( - r parquet.BufferedReader, isCompressed bool, lenCompressed, lenUncompressed, levelsBytelen int, buffer *memory.Buffer, + r parquet.BufferedReaderV2, isCompressed bool, lenCompressed, lenUncompressed, levelsBytelen int, buffer *memory.Buffer, ) ([]byte, error) { // Special case: unencrypted + compressed + has levels - read levels and compressed data separately if r.Buffered() < lenCompressed && p.cryptoCtx.DataDecryptor == nil && isCompressed && levelsBytelen > 0 { diff --git a/parquet/file/row_group_reader.go b/parquet/file/row_group_reader.go index bf75db46..10a9705d 100644 --- a/parquet/file/row_group_reader.go +++ b/parquet/file/row_group_reader.go @@ -114,7 +114,7 @@ func (r *RowGroupReader) GetColumnPageReader(i int) (PageReader, error) { colLen += padding } - stream, err := r.props.GetStream(r.r, colStart, colLen) + stream, err := r.props.GetStreamV2(r.r, colStart, colLen) if err != nil { return nil, err } diff --git a/parquet/internal/testutils/pagebuilder.go b/parquet/internal/testutils/pagebuilder.go index 95ddf57f..e84e03f5 100644 --- a/parquet/internal/testutils/pagebuilder.go +++ b/parquet/internal/testutils/pagebuilder.go @@ -136,7 +136,8 @@ type DictionaryPageBuilder struct { func NewDictionaryPageBuilder(d *schema.Column) *DictionaryPageBuilder { return &DictionaryPageBuilder{ encoding.NewEncoder(d.PhysicalType(), parquet.Encodings.Plain, true, d, mem).(encoding.DictEncoder), - 0, false} + 0, false, + } } func (d *DictionaryPageBuilder) AppendValues(values interface{}) encoding.Buffer { @@ -243,9 +244,12 @@ func (m *MockPageReader) Err() error { } func (m *MockPageReader) Reset(parquet.BufferedReader, int64, compress.Compression, *file.CryptoContext) { + // no-op } -func (m *MockPageReader) SetMaxPageHeaderSize(int) {} +func (m *MockPageReader) SetMaxPageHeaderSize(int) { + // no-op +} func (m *MockPageReader) Page() file.Page { return m.TestData().Get("pages").Data().([]file.Page)[m.curpage-1] @@ -278,8 +282,8 @@ func (m *MockPageReader) Next() bool { } func PaginatePlain(version parquet.DataPageVersion, d *schema.Column, values reflect.Value, defLevels, repLevels []int16, - maxDef, maxRep int16, lvlsPerPage int, valuesPerPage []int, enc parquet.Encoding) []file.Page { - + maxDef, maxRep int16, lvlsPerPage int, valuesPerPage []int, enc parquet.Encoding, +) []file.Page { var ( npages = len(valuesPerPage) defLvlStart = 0 diff --git a/parquet/reader_properties.go b/parquet/reader_properties.go index 254cf217..440adf21 100644 --- a/parquet/reader_properties.go +++ b/parquet/reader_properties.go @@ -46,15 +46,20 @@ type ReaderProperties struct { BufferedStreamEnabled bool } +type BufferedReaderV2 interface { + BufferedReader + // Buffered returns 
the number of bytes already read and stored in the buffer + Buffered() int + // Free releases any resources held by the BufferedReader + Free() +} + type BufferedReader interface { Peek(int) ([]byte, error) Discard(int) (int, error) Outer() utils.Reader - // Buffered returns the number of bytes already read and stored in the buffer - Buffered() int BufferSize() int Reset(utils.Reader) - Free() io.Reader } @@ -76,6 +81,29 @@ func (r *ReaderProperties) Allocator() memory.Allocator { return r.alloc } // If BufferedStreamEnabled is true, it creates an io.SectionReader, otherwise it will read the entire section // into a buffer in memory and return a bytes.NewReader for that buffer. func (r *ReaderProperties) GetStream(source io.ReaderAt, start, nbytes int64) (BufferedReader, error) { + if r.BufferedStreamEnabled { + return utils.NewBufferedReader(io.NewSectionReader(source, start, nbytes), int(r.BufferSize), nil), nil + } + + data := make([]byte, nbytes) + n, err := source.ReadAt(data, start) + if err != nil { + return nil, fmt.Errorf("parquet: tried reading from file, but got error: %w", err) + } + if n != int(nbytes) { + return nil, fmt.Errorf("parquet: tried reading %d bytes starting at position %d from file but only got %d", nbytes, start, n) + } + + return utils.NewByteReader(data), nil +} + +// GetStreamV2 returns a section of the underlying reader based on whether or not BufferedStream is enabled. +// +// If BufferedStreamEnabled is true, it creates an io.SectionReader, otherwise it will read the entire section +// into a buffer in memory and return a bytes.NewReader for that buffer. +// In comparison with GetStream, this version uses r.alloc to allocate the buffer for reading data and returns BufferedReaderV2, +// to allow freeing the allocated buffer when no longer needed with the Free() method. 
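+// GetStream remains available for callers that do not need explicit buffer management.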
+func (r *ReaderProperties) GetStreamV2(source io.ReaderAt, start, nbytes int64) (BufferedReaderV2, error) { if r.BufferedStreamEnabled { return utils.NewBufferedReader(io.NewSectionReader(source, start, nbytes), int(r.BufferSize), r.alloc), nil } diff --git a/parquet/reader_writer_properties_test.go b/parquet/reader_writer_properties_test.go index a8e9b752..161a7b50 100644 --- a/parquet/reader_writer_properties_test.go +++ b/parquet/reader_writer_properties_test.go @@ -89,16 +89,16 @@ func TestReaderPropsGetStreamWithAllocator(t *testing.T) { // no leak on success props := parquet.NewReaderProperties(pool) - bufRdr, err := props.GetStream(rdr, 0, int64(len(data))) + bufRdr, err := props.GetStreamV2(rdr, 0, int64(len(data))) assert.NoError(t, err) bufRdr.Free() // no leak on reader error - _, err = props.GetStream(&mockReaderAt{}, 0, 10) + _, err = props.GetStreamV2(&mockReaderAt{}, 0, 10) assert.Error(t, err) // no leak on insufficient read - _, err = props.GetStream(rdr, 0, int64(len(data)+10)) + _, err = props.GetStreamV2(rdr, 0, int64(len(data)+10)) assert.Error(t, err) } @@ -113,19 +113,19 @@ func TestReaderPropsGetStreamBufferedWithAllocator(t *testing.T) { props.BufferedStreamEnabled = true buf := make([]byte, len(data)) - bufRdr, err := props.GetStream(rdr, 0, int64(len(data))) + bufRdr, err := props.GetStreamV2(rdr, 0, int64(len(data))) assert.NoError(t, err) _, err = bufRdr.Read(buf) assert.NoError(t, err) bufRdr.Free() - bufRdr, err = props.GetStream(&mockReaderAt{}, 0, 10) + bufRdr, err = props.GetStreamV2(&mockReaderAt{}, 0, 10) assert.NoError(t, err) _, err = bufRdr.Read(buf) assert.Error(t, err) bufRdr.Free() - bufRdr, err = props.GetStream(rdr, 0, int64(len(data)+10)) + bufRdr, err = props.GetStreamV2(rdr, 0, int64(len(data)+10)) assert.NoError(t, err) n, err := bufRdr.Read(buf) assert.NoError(t, err)
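
---

Note (not part of the patches above): a minimal usage sketch of the allocator-aware API this series introduces. The payload bytes and the printed value are illustrative only; the remaining symbols are either added by these patches (GetStreamV2, BufferedReaderV2.Free) or existing arrow-go API (parquet.NewReaderProperties, memory.NewCheckedAllocator, CurrentAlloc).

	package main

	import (
		"bytes"
		"fmt"

		"github.com/apache/arrow-go/v18/arrow/memory"
		"github.com/apache/arrow-go/v18/parquet"
	)

	func main() {
		pool := memory.NewCheckedAllocator(memory.NewGoAllocator())
		props := parquet.NewReaderProperties(pool)

		payload := []byte("any io.ReaderAt will do")
		src := bytes.NewReader(payload)

		// Direct mode (BufferedStreamEnabled == false): the whole section is
		// read up front into a pool-owned buffer.
		rdr, err := props.GetStreamV2(src, 0, int64(len(payload)))
		if err != nil {
			panic(err) // on error paths GetStreamV2 already freed the buffer
		}
		out := make([]byte, len(payload))
		_, _ = rdr.Read(out)
		rdr.Free() // hand the buffer back to the pool

		fmt.Println(pool.CurrentAlloc()) // 0: nothing leaked
	}

The same pairing of GetStreamV2 with a deferred Free applies in buffered mode; callers on the old GetStream keep the previous semantics and never need to call Free.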