From 7b8e058f32ac89b25b6b766a3a8f79e15a2767f2 Mon Sep 17 00:00:00 2001
From: wecharyu
Date: Fri, 12 Dec 2025 15:51:15 +0800
Subject: [PATCH 1/6] GH-48467: [C++][Parquet] Add configuration to limit the
 row group size

---
 .../parquet/arrow/arrow_reader_writer_test.cc | 44 +++++++++++++++++--
 cpp/src/parquet/arrow/writer.cc               | 37 ++++++++++------
 cpp/src/parquet/arrow/writer.h                |  6 +--
 cpp/src/parquet/file_writer.cc                | 20 +++++++++
 cpp/src/parquet/file_writer.h                 |  5 +++
 cpp/src/parquet/properties.h                  | 33 ++++++++++----
 6 files changed, 117 insertions(+), 28 deletions(-)

diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
index 0831fb62675..d5bc7845fd1 100644
--- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
+++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
@@ -378,19 +378,19 @@ const double test_traits<::arrow::DoubleType>::value(4.2);
 template <>
 struct test_traits<::arrow::StringType> {
   static constexpr ParquetType::type parquet_enum = ParquetType::BYTE_ARRAY;
-  static std::string const value;
+  static const std::string value;
 };
 
 template <>
 struct test_traits<::arrow::BinaryType> {
   static constexpr ParquetType::type parquet_enum = ParquetType::BYTE_ARRAY;
-  static std::string const value;
+  static const std::string value;
 };
 
 template <>
 struct test_traits<::arrow::FixedSizeBinaryType> {
   static constexpr ParquetType::type parquet_enum = ParquetType::FIXED_LEN_BYTE_ARRAY;
-  static std::string const value;
+  static const std::string value;
 };
 
 const std::string test_traits<::arrow::StringType>::value("Test");  // NOLINT
@@ -5794,6 +5794,44 @@ TEST(TestArrowReadWrite, WriteRecordBatchNotProduceEmptyRowGroup) {
   }
 }
 
+TEST(TestArrowReadWrite, FlushRowGroupByBufferedSize) {
+  auto pool = ::arrow::default_memory_pool();
+  auto sink = CreateOutputStream();
+  // Limit the max bytes in a row group to 10 so that each batch produces a new group.
+  auto writer_properties = WriterProperties::Builder().max_row_group_bytes(10)->build();
+  auto arrow_writer_properties = default_arrow_writer_properties();
+
+  // Prepare schema
+  auto schema = ::arrow::schema({::arrow::field("a", ::arrow::int64())});
+  std::shared_ptr<SchemaDescriptor> parquet_schema;
+  ASSERT_OK_NO_THROW(ToParquetSchema(schema.get(), *writer_properties,
+                                     *arrow_writer_properties, &parquet_schema));
+  auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root());
+
+  auto gen = ::arrow::random::RandomArrayGenerator(/*seed=*/42);
+
+  // Create writer to write data via RecordBatch.
+  auto writer = ParquetFileWriter::Open(sink, schema_node, writer_properties);
+  std::unique_ptr<FileWriter> arrow_writer;
+  ASSERT_OK(FileWriter::Make(pool, std::move(writer), schema, arrow_writer_properties,
+                             &arrow_writer));
+  // NewBufferedRowGroup() is not called explicitly; it will be called
+  // inside WriteRecordBatch().
+  for (int i = 0; i < 5; ++i) {
+    auto record_batch =
+        gen.BatchOf({::arrow::field("a", ::arrow::int64())}, /*length=*/1);
+    ASSERT_OK_NO_THROW(arrow_writer->WriteRecordBatch(*record_batch));
+  }
+  ASSERT_OK_NO_THROW(arrow_writer->Close());
+  ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
+
+  auto file_metadata = arrow_writer->metadata();
+  EXPECT_EQ(5, file_metadata->num_row_groups());
+  for (int i = 0; i < 5; ++i) {
+    EXPECT_EQ(1, file_metadata->RowGroup(i)->num_rows());
+  }
+}
+
 TEST(TestArrowReadWrite, MultithreadedWrite) {
   const int num_columns = 20;
   const int num_rows = 1000;
diff --git a/cpp/src/parquet/arrow/writer.cc b/cpp/src/parquet/arrow/writer.cc
index 4b2b06e5e09..b2afc88d57f 100644
--- a/cpp/src/parquet/arrow/writer.cc
+++ b/cpp/src/parquet/arrow/writer.cc
@@ -442,12 +442,8 @@ class FileWriterImpl : public FileWriter {
       return Status::OK();
     }
 
-    // Max number of rows allowed in a row group.
-    const int64_t max_row_group_length = this->properties().max_row_group_length();
-
     // Initialize a new buffered row group writer if necessary.
-    if (row_group_writer_ == nullptr || !row_group_writer_->buffered() ||
-        row_group_writer_->num_rows() >= max_row_group_length) {
+    if (row_group_writer_ == nullptr || !row_group_writer_->buffered()) {
       RETURN_NOT_OK(NewBufferedRowGroup());
     }
 
@@ -480,19 +476,32 @@ class FileWriterImpl : public FileWriter {
       return Status::OK();
     };
 
+    // Max number of rows allowed in a row group.
+    const int64_t max_row_group_length = this->properties().max_row_group_length();
+    // Max number of bytes allowed in a row group.
+    const int64_t max_row_group_bytes = this->properties().max_row_group_bytes();
+
     int64_t offset = 0;
     while (offset < batch.num_rows()) {
-      const int64_t batch_size =
-          std::min(max_row_group_length - row_group_writer_->num_rows(),
-                   batch.num_rows() - offset);
+      int64_t group_rows = row_group_writer_->num_rows();
+      int64_t batch_size =
+          std::min(max_row_group_length - group_rows, batch.num_rows() - offset);
+      if (group_rows > 0) {
+        int64_t buffered_bytes = row_group_writer_->current_buffered_bytes();
+        double avg_row_bytes = buffered_bytes * 1.0 / group_rows;
+        batch_size = std::min(
+            batch_size,
+            static_cast<int64_t>((max_row_group_bytes - buffered_bytes) / avg_row_bytes));
+      }
+      if (batch_size <= 0) {
+        // Current row group is full, write remaining rows in a new group.
+        if (offset < batch.num_rows()) {
+          RETURN_NOT_OK(NewBufferedRowGroup());
+        }
+        continue;
+      }
       RETURN_NOT_OK(WriteBatch(offset, batch_size));
       offset += batch_size;
-
-      // Flush current row group writer and create a new writer if it is full.
-      if (row_group_writer_->num_rows() >= max_row_group_length &&
-          offset < batch.num_rows()) {
-        RETURN_NOT_OK(NewBufferedRowGroup());
-      }
     }
 
     return Status::OK();
diff --git a/cpp/src/parquet/arrow/writer.h b/cpp/src/parquet/arrow/writer.h
index 8ec8796ffd1..9b1491ecf90 100644
--- a/cpp/src/parquet/arrow/writer.h
+++ b/cpp/src/parquet/arrow/writer.h
@@ -111,9 +111,9 @@ class PARQUET_EXPORT FileWriter {
   /// Multiple RecordBatches can be written into the same row group
   /// through this method.
   ///
-  /// WriterProperties.max_row_group_length() is respected and a new
-  /// row group will be created if the current row group exceeds the
-  /// limit.
+  /// WriterProperties.max_row_group_length() and WriterProperties.max_row_group_bytes()
+  /// are respected and a new row group will be created if the current row group exceeds
+  /// the limits.
   ///
   /// Batches get flushed to the output stream once NewBufferedRowGroup()
   /// or Close() is called.
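The doc comment above defines the new contract for buffered writes. For context, a minimal caller-side sketch of how the two caps compose, assuming the full series is applied; the WriteWithByteLimit helper name is hypothetical, and the FileWriter::Open overload returning arrow::Result is the one the later tests in this series use:

    // Hypothetical helper: cap each row group at ~1 MiB of estimated
    // encoded and compressed bytes, on top of the default 1Mi-row cap.
    arrow::Status WriteWithByteLimit(const std::shared_ptr<arrow::Table>& table,
                                     std::shared_ptr<arrow::io::OutputStream> sink) {
      auto writer_properties = parquet::WriterProperties::Builder()
                                   .max_row_group_bytes(1024 * 1024)
                                   ->build();
      ARROW_ASSIGN_OR_RAISE(
          auto writer,
          parquet::arrow::FileWriter::Open(*table->schema(),
                                           arrow::default_memory_pool(), sink,
                                           writer_properties,
                                           parquet::default_arrow_writer_properties()));
      // A new row group starts when either max_row_group_length or
      // max_row_group_bytes would be exceeded.
      ARROW_RETURN_NOT_OK(writer->WriteTable(*table, /*chunk_size=*/1024));
      return writer->Close();
    }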
diff --git a/cpp/src/parquet/file_writer.cc b/cpp/src/parquet/file_writer.cc index ddec2c0a560..7763fc06802 100644 --- a/cpp/src/parquet/file_writer.cc +++ b/cpp/src/parquet/file_writer.cc @@ -68,6 +68,12 @@ int64_t RowGroupWriter::total_compressed_bytes_written() const { return contents_->total_compressed_bytes_written(); } +int64_t RowGroupWriter::current_buffered_bytes() const { + return contents_->total_compressed_bytes() + + contents_->total_compressed_bytes_written() + + contents_->estimated_buffered_value_bytes(); +} + bool RowGroupWriter::buffered() const { return contents_->buffered(); } int RowGroupWriter::current_column() { return contents_->current_column(); } @@ -195,6 +201,20 @@ class RowGroupSerializer : public RowGroupWriter::Contents { return total_compressed_bytes_written; } + int64_t estimated_buffered_value_bytes() const override { + if (closed_) { + return 0; + } + int64_t estimated_buffered_value_bytes = 0; + for (size_t i = 0; i < column_writers_.size(); i++) { + if (column_writers_[i]) { + estimated_buffered_value_bytes += + column_writers_[i]->estimated_buffered_value_bytes(); + } + } + return estimated_buffered_value_bytes; + } + bool buffered() const override { return buffered_row_group_; } void Close() override { diff --git a/cpp/src/parquet/file_writer.h b/cpp/src/parquet/file_writer.h index d5ea1d7c98a..19900c3aca4 100644 --- a/cpp/src/parquet/file_writer.h +++ b/cpp/src/parquet/file_writer.h @@ -58,6 +58,8 @@ class PARQUET_EXPORT RowGroupWriter { virtual int64_t total_compressed_bytes() const = 0; /// \brief total compressed bytes written by the page writer virtual int64_t total_compressed_bytes_written() const = 0; + /// \brief estimated size of the values that are not written to a page yet + virtual int64_t estimated_buffered_value_bytes() const = 0; virtual bool buffered() const = 0; }; @@ -99,6 +101,9 @@ class PARQUET_EXPORT RowGroupWriter { int64_t total_compressed_bytes() const; /// \brief total compressed bytes written by the page writer int64_t total_compressed_bytes_written() const; + /// \brief including compressed bytes in page writer and uncompressed data + /// value buffer. + int64_t current_buffered_bytes() const; /// Returns whether the current RowGroupWriter is in the buffered mode and is created /// by calling ParquetFileWriter::AppendBufferedRowGroup. 
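The arithmetic behind the split is worth spelling out: current_buffered_bytes() sums compressed pages already written out, compressed pages still held by the page writer, and an estimate for values not yet encoded into any page. Below is a standalone restatement of the batch-splitting step from writer.cc above; the helper name is hypothetical and not part of the patch:

    #include <algorithm>
    #include <cstdint>

    // How many more rows fit under the byte cap, estimated from the rows
    // already buffered in the current row group. Requires group_rows > 0,
    // mirroring the `if (group_rows > 0)` guard in the patch.
    int64_t RowsUntilByteCap(int64_t buffered_bytes, int64_t group_rows,
                             int64_t max_row_group_bytes, int64_t rows_remaining) {
      double avg_row_bytes = static_cast<double>(buffered_bytes) / group_rows;
      int64_t fit = static_cast<int64_t>(
          (max_row_group_bytes - buffered_bytes) / avg_row_bytes);
      // A result <= 0 means the group is already at (or past) the cap and a
      // new buffered row group should be started first.
      return std::min(rows_remaining, fit);
    }

For example, with 96 bytes buffered over 8 rows and a 128-byte cap, the average is 12 bytes per row, so only (128 - 96) / 12 = 2 more rows are admitted before the group is cut.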
diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h index eb5aee29695..bdcbb9e46d2 100644 --- a/cpp/src/parquet/properties.h +++ b/cpp/src/parquet/properties.h @@ -160,6 +160,7 @@ static constexpr bool DEFAULT_IS_DICTIONARY_ENABLED = true; static constexpr int64_t DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT = kDefaultDataPageSize; static constexpr int64_t DEFAULT_WRITE_BATCH_SIZE = 1024; static constexpr int64_t DEFAULT_MAX_ROW_GROUP_LENGTH = 1024 * 1024; +static constexpr int64_t DEFAULT_MAX_ROW_GROUP_BYTES = 128 * 1024 * 1024; static constexpr bool DEFAULT_ARE_STATISTICS_ENABLED = true; static constexpr int64_t DEFAULT_MAX_STATISTICS_SIZE = 4096; static constexpr Encoding::type DEFAULT_ENCODING = Encoding::UNKNOWN; @@ -293,6 +294,7 @@ class PARQUET_EXPORT WriterProperties { dictionary_pagesize_limit_(DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT), write_batch_size_(DEFAULT_WRITE_BATCH_SIZE), max_row_group_length_(DEFAULT_MAX_ROW_GROUP_LENGTH), + max_row_group_bytes_(DEFAULT_MAX_ROW_GROUP_BYTES), pagesize_(kDefaultDataPageSize), max_rows_per_page_(kDefaultMaxRowsPerPage), version_(ParquetVersion::PARQUET_2_6), @@ -309,6 +311,7 @@ class PARQUET_EXPORT WriterProperties { dictionary_pagesize_limit_(properties.dictionary_pagesize_limit()), write_batch_size_(properties.write_batch_size()), max_row_group_length_(properties.max_row_group_length()), + max_row_group_bytes_(properties.max_row_group_bytes()), pagesize_(properties.data_pagesize()), max_rows_per_page_(properties.max_rows_per_page()), version_(properties.version()), @@ -418,6 +421,13 @@ class PARQUET_EXPORT WriterProperties { return this; } + /// Specify the max number of bytes to put in a single row group. + /// Default 128MB. + Builder* max_row_group_bytes(int64_t max_row_group_bytes) { + max_row_group_bytes_ = max_row_group_bytes; + return this; + } + /// Specify the data page size. /// Default 1MB. 
  Builder* data_pagesize(int64_t pg_size) {
@@ -779,11 +789,12 @@ class PARQUET_EXPORT WriterProperties {
 
     return std::shared_ptr<WriterProperties>(new WriterProperties(
         pool_, dictionary_pagesize_limit_, write_batch_size_, max_row_group_length_,
-        pagesize_, max_rows_per_page_, version_, created_by_, page_checksum_enabled_,
-        size_statistics_level_, std::move(file_encryption_properties_),
-        default_column_properties_, column_properties, data_page_version_,
-        store_decimal_as_integer_, std::move(sorting_columns_),
-        content_defined_chunking_enabled_, content_defined_chunking_options_));
+        max_row_group_bytes_, pagesize_, max_rows_per_page_, version_, created_by_,
+        page_checksum_enabled_, size_statistics_level_,
+        std::move(file_encryption_properties_), default_column_properties_,
+        column_properties, data_page_version_, store_decimal_as_integer_,
+        std::move(sorting_columns_), content_defined_chunking_enabled_,
+        content_defined_chunking_options_));
   }
 
  private:
@@ -793,6 +804,7 @@ class PARQUET_EXPORT WriterProperties {
   MemoryPool* pool_;
   int64_t dictionary_pagesize_limit_;
   int64_t write_batch_size_;
   int64_t max_row_group_length_;
+  int64_t max_row_group_bytes_;
   int64_t pagesize_;
   int64_t max_rows_per_page_;
   ParquetVersion::type version_;
@@ -828,6 +840,8 @@ class PARQUET_EXPORT WriterProperties {
 
   inline int64_t max_row_group_length() const { return max_row_group_length_; }
 
+  inline int64_t max_row_group_bytes() const { return max_row_group_bytes_; }
+
   inline int64_t data_pagesize() const { return pagesize_; }
 
   inline int64_t max_rows_per_page() const { return max_rows_per_page_; }
@@ -946,9 +960,10 @@ class PARQUET_EXPORT WriterProperties {
  private:
   explicit WriterProperties(
       MemoryPool* pool, int64_t dictionary_pagesize_limit, int64_t write_batch_size,
-      int64_t max_row_group_length, int64_t pagesize, int64_t max_rows_per_page,
-      ParquetVersion::type version, const std::string& created_by,
-      bool page_write_checksum_enabled, SizeStatisticsLevel size_statistics_level,
+      int64_t max_row_group_length, int64_t max_row_group_bytes, int64_t pagesize,
+      int64_t max_rows_per_page, ParquetVersion::type version,
+      const std::string& created_by, bool page_write_checksum_enabled,
+      SizeStatisticsLevel size_statistics_level,
       std::shared_ptr<FileEncryptionProperties> file_encryption_properties,
       const ColumnProperties& default_column_properties,
       const std::unordered_map<std::string, ColumnProperties>& column_properties,
@@ -959,6 +974,7 @@ class PARQUET_EXPORT WriterProperties {
         dictionary_pagesize_limit_(dictionary_pagesize_limit),
         write_batch_size_(write_batch_size),
         max_row_group_length_(max_row_group_length),
+        max_row_group_bytes_(max_row_group_bytes),
         pagesize_(pagesize),
         max_rows_per_page_(max_rows_per_page),
         parquet_data_page_version_(data_page_version),
@@ -978,6 +994,7 @@ class PARQUET_EXPORT WriterProperties {
   int64_t dictionary_pagesize_limit_;
   int64_t write_batch_size_;
   int64_t max_row_group_length_;
+  int64_t max_row_group_bytes_;
   int64_t pagesize_;
   int64_t max_rows_per_page_;
   ParquetDataPageVersion parquet_data_page_version_;

From 8142f17956ad6645fed7aeb88fd56f58867e06d5 Mon Sep 17 00:00:00 2001
From: wecharyu
Date: Fri, 12 Dec 2025 16:07:45 +0800
Subject: [PATCH 2/6] refine code

---
 cpp/src/parquet/arrow/writer.cc | 12 +++++-------
 1 file changed, 5 insertions(+), 7 deletions(-)

diff --git a/cpp/src/parquet/arrow/writer.cc b/cpp/src/parquet/arrow/writer.cc
index b2afc88d57f..db904bc5dba 100644
--- a/cpp/src/parquet/arrow/writer.cc
+++ b/cpp/src/parquet/arrow/writer.cc
@@ -493,15 +493,13 @@ class FileWriterImpl : public FileWriter {
             batch_size,
             static_cast<int64_t>((max_row_group_bytes -
buffered_bytes) / avg_row_bytes));
       }
-      if (batch_size <= 0) {
+      if (batch_size > 0) {
+        RETURN_NOT_OK(WriteBatch(offset, batch_size));
+        offset += batch_size;
+      } else if (offset < batch.num_rows()) {
         // Current row group is full, write remaining rows in a new group.
-        if (offset < batch.num_rows()) {
-          RETURN_NOT_OK(NewBufferedRowGroup());
-        }
-        continue;
+        RETURN_NOT_OK(NewBufferedRowGroup());
       }
-      RETURN_NOT_OK(WriteBatch(offset, batch_size));
-      offset += batch_size;
     }
 
     return Status::OK();

From e19db37e537421c45eaa0ecbfa9da0e10218c085 Mon Sep 17 00:00:00 2001
From: wecharyu
Date: Fri, 12 Dec 2025 18:48:19 +0800
Subject: [PATCH 3/6] Make WriteTable respect max_row_group_bytes

---
 .../parquet/arrow/arrow_reader_writer_test.cc | 38 ++++++++++++++++---
 cpp/src/parquet/arrow/writer.cc               | 15 ++++++--
 2 files changed, 45 insertions(+), 8 deletions(-)

diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
index d5bc7845fd1..962c9c0e30b 100644
--- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
+++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
@@ -5794,7 +5794,7 @@ TEST(TestArrowReadWrite, WriteRecordBatchNotProduceEmptyRowGroup) {
   }
 }
 
-TEST(TestArrowReadWrite, FlushRowGroupByBufferedSize) {
+TEST(TestArrowReadWrite, WriteRecordBatchFlushRowGroupByBufferedSize) {
   auto pool = ::arrow::default_memory_pool();
   auto sink = CreateOutputStream();
   // Limit the max bytes in a row group to 10 so that each batch produces a new group.
@@ -5811,10 +5811,9 @@ TEST(TestArrowReadWrite, FlushRowGroupByBufferedSize) {
   auto gen = ::arrow::random::RandomArrayGenerator(/*seed=*/42);
 
   // Create writer to write data via RecordBatch.
-  auto writer = ParquetFileWriter::Open(sink, schema_node, writer_properties);
-  std::unique_ptr<FileWriter> arrow_writer;
-  ASSERT_OK(FileWriter::Make(pool, std::move(writer), schema, arrow_writer_properties,
-                             &arrow_writer));
+  ASSERT_OK_AND_ASSIGN(auto arrow_writer, parquet::arrow::FileWriter::Open(
+                                              *schema, pool, sink, writer_properties,
+                                              arrow_writer_properties));
   // NewBufferedRowGroup() is not called explicitly; it will be called
   // inside WriteRecordBatch().
   for (int i = 0; i < 5; ++i) {
@@ -5832,6 +5831,35 @@ TEST(TestArrowReadWrite, FlushRowGroupByBufferedSize) {
   }
 }
 
+TEST(TestArrowReadWrite, WriteTableFlushRowGroupByBufferedSize) {
+  auto pool = ::arrow::default_memory_pool();
+  auto sink = CreateOutputStream();
+  // Limit the max bytes in a row group to 100, so the first table generates one row
+  // group and the second table generates 5 row groups.
+  auto writer_properties = WriterProperties::Builder().max_row_group_bytes(100)->build();
+  auto arrow_writer_properties = default_arrow_writer_properties();
+
+  // Prepare schema
+  auto schema = ::arrow::schema({::arrow::field("a", ::arrow::int64())});
+  auto table = ::arrow::Table::Make(
+      schema, {::arrow::ArrayFromJSON(::arrow::int64(), R"([1, 2, 3, 4, 5])")});
+  ASSERT_OK_AND_ASSIGN(auto arrow_writer, parquet::arrow::FileWriter::Open(
+                                              *schema, pool, sink, writer_properties,
+                                              arrow_writer_properties));
+  for (int i = 0; i < 2; ++i) {
+    ASSERT_OK_NO_THROW(arrow_writer->WriteTable(*table, 5));
+  }
+  ASSERT_OK_NO_THROW(arrow_writer->Close());
+  ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
+
+  auto file_metadata = arrow_writer->metadata();
+  EXPECT_EQ(6, file_metadata->num_row_groups());
+  EXPECT_EQ(5, file_metadata->RowGroup(0)->num_rows());
+  for (int i = 1; i < 6; ++i) {
+    EXPECT_EQ(1, file_metadata->RowGroup(i)->num_rows());
+  }
+}
+
 TEST(TestArrowReadWrite, MultithreadedWrite) {
   const int num_columns = 20;
   const int num_rows = 1000;
diff --git a/cpp/src/parquet/arrow/writer.cc b/cpp/src/parquet/arrow/writer.cc
index db904bc5dba..df00a2f9f53 100644
--- a/cpp/src/parquet/arrow/writer.cc
+++ b/cpp/src/parquet/arrow/writer.cc
@@ -395,15 +395,24 @@ class FileWriterImpl : public FileWriter {
     RETURN_NOT_OK(CheckClosed());
     RETURN_NOT_OK(table.Validate());
 
-    if (chunk_size <= 0 && table.num_rows() > 0) {
-      return Status::Invalid("chunk size per row_group must be greater than 0");
-    } else if (!table.schema()->Equals(*schema_, false)) {
+    if (!table.schema()->Equals(*schema_, false)) {
       return Status::Invalid("table schema does not match this writer's. table:'",
                              table.schema()->ToString(), "' this:'", schema_->ToString(),
                              "'");
     } else if (chunk_size > this->properties().max_row_group_length()) {
       chunk_size = this->properties().max_row_group_length();
     }
+    // max_row_group_bytes is applied only after the row group has accumulated data.
+    if (row_group_writer_ != nullptr && row_group_writer_->num_rows() > 0) {
+      double avg_row_size = row_group_writer_->current_buffered_bytes() * 1.0 /
+                            row_group_writer_->num_rows();
+      chunk_size = std::min(
+          chunk_size,
+          static_cast<int64_t>(this->properties().max_row_group_bytes() / avg_row_size));
+    }
+    if (chunk_size <= 0 && table.num_rows() > 0) {
+      return Status::Invalid("rows per row_group must be greater than 0");
+    }
 
     auto WriteRowGroup = [&](int64_t offset, int64_t size) {
       RETURN_NOT_OK(NewRowGroup());

From 13fe7b132f9fe748f985a078d3c2b06d36bdcbd2 Mon Sep 17 00:00:00 2001
From: wecharyu
Date: Tue, 23 Dec 2025 16:53:13 +0800
Subject: [PATCH 4/6] address comments

---
 cpp/src/parquet/arrow/writer.cc | 10 +++++-----
 cpp/src/parquet/file_writer.cc  |  6 +++---
 cpp/src/parquet/file_writer.h   |  4 ++--
 cpp/src/parquet/properties.h    |  4 ++++
 4 files changed, 14 insertions(+), 10 deletions(-)

diff --git a/cpp/src/parquet/arrow/writer.cc b/cpp/src/parquet/arrow/writer.cc
index df00a2f9f53..c85628a285e 100644
--- a/cpp/src/parquet/arrow/writer.cc
+++ b/cpp/src/parquet/arrow/writer.cc
@@ -404,8 +404,8 @@ class FileWriterImpl : public FileWriter {
     }
     // max_row_group_bytes is applied only after the row group has accumulated data.
     if (row_group_writer_ != nullptr && row_group_writer_->num_rows() > 0) {
-      double avg_row_size = row_group_writer_->current_buffered_bytes() * 1.0 /
-                            row_group_writer_->num_rows();
+      double avg_row_size =
+          row_group_writer_->total_buffered_bytes() * 1.0 / row_group_writer_->num_rows();
       chunk_size = std::min(
           chunk_size,
           static_cast<int64_t>(this->properties().max_row_group_bytes() / avg_row_size));
@@ -496,11 +496,11 @@ class FileWriterImpl : public FileWriter {
       int64_t batch_size =
           std::min(max_row_group_length - group_rows, batch.num_rows() - offset);
       if (group_rows > 0) {
-        int64_t buffered_bytes = row_group_writer_->current_buffered_bytes();
-        double avg_row_bytes = buffered_bytes * 1.0 / group_rows;
+        int64_t buffered_bytes = row_group_writer_->total_buffered_bytes();
+        double avg_row_size = buffered_bytes * 1.0 / group_rows;
         batch_size = std::min(
             batch_size,
-            static_cast<int64_t>((max_row_group_bytes - buffered_bytes) / avg_row_bytes));
+            static_cast<int64_t>((max_row_group_bytes - buffered_bytes) / avg_row_size));
       }
       if (batch_size > 0) {
         RETURN_NOT_OK(WriteBatch(offset, batch_size));
diff --git a/cpp/src/parquet/file_writer.cc b/cpp/src/parquet/file_writer.cc
index 7763fc06802..a24c8eddb8d 100644
--- a/cpp/src/parquet/file_writer.cc
+++ b/cpp/src/parquet/file_writer.cc
@@ -68,10 +68,10 @@ int64_t RowGroupWriter::total_compressed_bytes_written() const {
   return contents_->total_compressed_bytes_written();
 }
 
-int64_t RowGroupWriter::current_buffered_bytes() const {
+int64_t RowGroupWriter::total_buffered_bytes() const {
   return contents_->total_compressed_bytes() +
          contents_->total_compressed_bytes_written() +
-         contents_->estimated_buffered_value_bytes();
+         contents_->EstimatedBufferedValueBytes();
 }
 
 bool RowGroupWriter::buffered() const { return contents_->buffered(); }
 
 int RowGroupWriter::current_column() { return contents_->current_column(); }
 
@@ -201,7 +201,7 @@ class RowGroupSerializer : public RowGroupWriter::Contents {
     return total_compressed_bytes_written;
   }
 
-  int64_t estimated_buffered_value_bytes() const override {
+  int64_t EstimatedBufferedValueBytes() const override {
     if (closed_) {
       return 0;
     }
diff --git a/cpp/src/parquet/file_writer.h b/cpp/src/parquet/file_writer.h
index 19900c3aca4..872d5946b10 100644
--- a/cpp/src/parquet/file_writer.h
+++ b/cpp/src/parquet/file_writer.h
@@ -59,7 +59,7 @@ class PARQUET_EXPORT RowGroupWriter {
     /// \brief total compressed bytes written by the page writer
     virtual int64_t total_compressed_bytes_written() const = 0;
     /// \brief estimated size of the values that are not written to a page yet
-    virtual int64_t estimated_buffered_value_bytes() const = 0;
+    virtual int64_t EstimatedBufferedValueBytes() const = 0;
 
     virtual bool buffered() const = 0;
   };
@@ -103,7 +103,7 @@ class PARQUET_EXPORT RowGroupWriter {
   int64_t total_compressed_bytes_written() const;
   /// \brief including compressed bytes in page writer and uncompressed data
   /// value buffer.
-  int64_t current_buffered_bytes() const;
+  int64_t total_buffered_bytes() const;
 
   /// Returns whether the current RowGroupWriter is in the buffered mode and is created
   /// by calling ParquetFileWriter::AppendBufferedRowGroup.
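After the renames in this patch, total_buffered_bytes() still sums the same three buckets; a toy breakdown with made-up numbers shows what gets compared against max_row_group_bytes:

    // Made-up numbers, for illustration only.
    int64_t compressed_in_page_writer = 64 * 1024;  // total_compressed_bytes()
    int64_t compressed_written = 256 * 1024;        // total_compressed_bytes_written()
    int64_t buffered_values = 8 * 1024;             // EstimatedBufferedValueBytes()
    // total_buffered_bytes() would report 328 KiB here; once this approaches
    // max_row_group_bytes, the writer starts a new row group.
    int64_t total = compressed_in_page_writer + compressed_written + buffered_values;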
diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h index bdcbb9e46d2..f9d221ab099 100644 --- a/cpp/src/parquet/properties.h +++ b/cpp/src/parquet/properties.h @@ -26,6 +26,7 @@ #include "arrow/io/caching.h" #include "arrow/type_fwd.h" #include "arrow/util/compression.h" +#include "arrow/util/logging.h" #include "arrow/util/type_fwd.h" #include "parquet/encryption/encryption.h" #include "parquet/exception.h" @@ -417,13 +418,16 @@ class PARQUET_EXPORT WriterProperties { /// Specify the max number of rows to put in a single row group. /// Default 1Mi rows. Builder* max_row_group_length(int64_t max_row_group_length) { + ARROW_CHECK_GT(max_row_group_length, 0) << "max_row_group_length must be positive"; max_row_group_length_ = max_row_group_length; return this; } /// Specify the max number of bytes to put in a single row group. + /// The size is estimated based on encoded and compressed data. /// Default 128MB. Builder* max_row_group_bytes(int64_t max_row_group_bytes) { + ARROW_CHECK_GT(max_row_group_bytes, 0) << "max_row_group_bytes must be positive"; max_row_group_bytes_ = max_row_group_bytes; return this; } From 0e6e3038931f7a7e5c194a74721712496d31855b Mon Sep 17 00:00:00 2001 From: wecharyu Date: Thu, 15 Jan 2026 00:08:33 +0800 Subject: [PATCH 5/6] Prefer to estimate avg row size from all closed row groups --- .../parquet/arrow/arrow_reader_writer_test.cc | 4 +- cpp/src/parquet/arrow/writer.cc | 43 ++++++++++--------- cpp/src/parquet/arrow/writer.h | 3 ++ cpp/src/parquet/file_writer.cc | 30 ++++++++++++- cpp/src/parquet/file_writer.h | 12 ++++-- cpp/src/parquet/properties.cc | 14 ++++++ cpp/src/parquet/properties.h | 13 +----- 7 files changed, 81 insertions(+), 38 deletions(-) diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc index 962c9c0e30b..abe043d77fb 100644 --- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc +++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc @@ -5797,8 +5797,8 @@ TEST(TestArrowReadWrite, WriteRecordBatchNotProduceEmptyRowGroup) { TEST(TestArrowReadWrite, WriteRecordBatchFlushRowGroupByBufferedSize) { auto pool = ::arrow::default_memory_pool(); auto sink = CreateOutputStream(); - // Limit the max bytes in a row group to 10 so that each batch produces a new group. - auto writer_properties = WriterProperties::Builder().max_row_group_bytes(10)->build(); + // Limit the max bytes in a row group to 100 so that each batch produces a new group. + auto writer_properties = WriterProperties::Builder().max_row_group_bytes(100)->build(); auto arrow_writer_properties = default_arrow_writer_properties(); // Prepare schema diff --git a/cpp/src/parquet/arrow/writer.cc b/cpp/src/parquet/arrow/writer.cc index c85628a285e..6bc181ff4ed 100644 --- a/cpp/src/parquet/arrow/writer.cc +++ b/cpp/src/parquet/arrow/writer.cc @@ -395,23 +395,19 @@ class FileWriterImpl : public FileWriter { RETURN_NOT_OK(CheckClosed()); RETURN_NOT_OK(table.Validate()); - if (!table.schema()->Equals(*schema_, false)) { + if (chunk_size <= 0 && table.num_rows() > 0) { + return Status::Invalid("rows per row_group must be greater than 0"); + } else if (!table.schema()->Equals(*schema_, /*check_metadata=*/false)) { return Status::Invalid("table schema does not match this writer's. 
table:'",
                             table.schema()->ToString(), "' this:'", schema_->ToString(),
                             "'");
     } else if (chunk_size > this->properties().max_row_group_length()) {
       chunk_size = this->properties().max_row_group_length();
     }
-    // max_row_group_bytes is applied only after the row group has accumulated data.
-    if (row_group_writer_ != nullptr && row_group_writer_->num_rows() > 0) {
-      double avg_row_size =
-          row_group_writer_->total_buffered_bytes() * 1.0 / row_group_writer_->num_rows();
+    if (auto avg_row_size = EstimateCompressedBytesPerRow()) {
       chunk_size = std::min(
-          chunk_size,
-          static_cast<int64_t>(this->properties().max_row_group_bytes() / avg_row_size));
-    }
-    if (chunk_size <= 0 && table.num_rows() > 0) {
-      return Status::Invalid("rows per row_group must be greater than 0");
+          chunk_size, static_cast<int64_t>(this->properties().max_row_group_bytes() /
+                                           avg_row_size.value()));
     }
 
     auto WriteRowGroup = [&](int64_t offset, int64_t size) {
       RETURN_NOT_OK(NewRowGroup());
@@ -485,22 +481,18 @@ class FileWriterImpl : public FileWriter {
       return Status::OK();
     };
 
-    // Max number of rows allowed in a row group.
     const int64_t max_row_group_length = this->properties().max_row_group_length();
-    // Max number of bytes allowed in a row group.
     const int64_t max_row_group_bytes = this->properties().max_row_group_bytes();
 
     int64_t offset = 0;
     while (offset < batch.num_rows()) {
-      int64_t group_rows = row_group_writer_->num_rows();
-      int64_t batch_size =
-          std::min(max_row_group_length - group_rows, batch.num_rows() - offset);
-      if (group_rows > 0) {
-        int64_t buffered_bytes = row_group_writer_->total_buffered_bytes();
-        double avg_row_size = buffered_bytes * 1.0 / group_rows;
+      int64_t batch_size = std::min(max_row_group_length - row_group_writer_->num_rows(),
+                                    batch.num_rows() - offset);
+      if (auto avg_row_size = EstimateCompressedBytesPerRow()) {
+        int64_t buffered_bytes = row_group_writer_->EstimatedTotalCompressedBytes();
         batch_size = std::min(
-            batch_size,
-            static_cast<int64_t>((max_row_group_bytes - buffered_bytes) / avg_row_size));
+            batch_size, static_cast<int64_t>((max_row_group_bytes - buffered_bytes) /
+                                             avg_row_size.value()));
       }
       if (batch_size > 0) {
         RETURN_NOT_OK(WriteBatch(offset, batch_size));
@@ -532,6 +524,17 @@ class FileWriterImpl : public FileWriter {
     return Status::OK();
   }
 
+  std::optional<double> EstimateCompressedBytesPerRow() const override {
+    if (auto value = writer_->EstimateCompressedBytesPerRow()) {
+      return value;
+    }
+    if (row_group_writer_ != nullptr && row_group_writer_->num_rows() > 0) {
+      return static_cast<double>(row_group_writer_->EstimatedTotalCompressedBytes()) /
+             row_group_writer_->num_rows();
+    }
+    return std::nullopt;
+  }
+
  private:
   friend class FileWriter;
 
diff --git a/cpp/src/parquet/arrow/writer.h b/cpp/src/parquet/arrow/writer.h
index 9b1491ecf90..e95985b9a3c 100644
--- a/cpp/src/parquet/arrow/writer.h
+++ b/cpp/src/parquet/arrow/writer.h
@@ -139,6 +139,9 @@ class PARQUET_EXPORT FileWriter {
   /// `store_schema` being unusable during read.
   virtual ::arrow::Status AddKeyValueMetadata(
       const std::shared_ptr<const ::arrow::KeyValueMetadata>& key_value_metadata) = 0;
+  /// \brief Estimate compressed bytes per row from closed row groups or the active row
+  /// group.
+  virtual std::optional<double> EstimateCompressedBytesPerRow() const = 0;
   /// \brief Return the file metadata, only available after calling Close().
   virtual const std::shared_ptr<FileMetaData> metadata() const = 0;
 };
diff --git a/cpp/src/parquet/file_writer.cc b/cpp/src/parquet/file_writer.cc
index a24c8eddb8d..1949d3c5ab5 100644
--- a/cpp/src/parquet/file_writer.cc
+++ b/cpp/src/parquet/file_writer.cc
@@ -68,7 +68,7 @@ int64_t RowGroupWriter::total_compressed_bytes_written() const {
   return contents_->total_compressed_bytes_written();
 }
 
-int64_t RowGroupWriter::total_buffered_bytes() const {
+int64_t RowGroupWriter::EstimatedTotalCompressedBytes() const {
   return contents_->total_compressed_bytes() +
          contents_->total_compressed_bytes_written() +
          contents_->EstimatedBufferedValueBytes();
@@ -349,6 +349,7 @@ class FileSerializer : public ParquetFileWriter::Contents {
     if (row_group_writer_) {
       num_rows_ += row_group_writer_->num_rows();
       row_group_writer_->Close();
+      compressed_bytes_ += row_group_writer_->total_compressed_bytes_written();
     }
     row_group_writer_.reset();
 
@@ -372,6 +373,8 @@ class FileSerializer : public ParquetFileWriter::Contents {
 
   int64_t num_rows() const override { return num_rows_; }
 
+  int64_t compressed_bytes() const override { return compressed_bytes_; }
+
   const std::shared_ptr<WriterProperties>& properties() const override {
     return properties_;
   }
@@ -380,6 +383,7 @@ class FileSerializer : public ParquetFileWriter::Contents {
     if (row_group_writer_) {
       num_rows_ += row_group_writer_->num_rows();
       row_group_writer_->Close();
+      compressed_bytes_ += row_group_writer_->total_compressed_bytes_written();
     }
     int16_t row_group_ordinal = -1;  // row group ordinal not set
     if (file_encryptor_ != nullptr) {
@@ -435,6 +439,7 @@ class FileSerializer : public ParquetFileWriter::Contents {
         properties_(std::move(properties)),
         num_row_groups_(0),
         num_rows_(0),
+        compressed_bytes_(0),
         metadata_(FileMetaDataBuilder::Make(&schema_, properties_)) {
     PARQUET_ASSIGN_OR_THROW(int64_t position, sink_->Tell());
     if (position == 0) {
@@ -488,6 +493,7 @@ class FileSerializer : public ParquetFileWriter::Contents {
   const std::shared_ptr<WriterProperties> properties_;
   int num_row_groups_;
   int64_t num_rows_;
+  int64_t compressed_bytes_;
   std::unique_ptr<FileMetaDataBuilder> metadata_;
   // Only one of the row group writers is active at a time
   std::unique_ptr<RowGroupWriter> row_group_writer_;
@@ -660,6 +666,28 @@ void ParquetFileWriter::AddKeyValueMetadata(
   }
 }
 
+std::optional<double> ParquetFileWriter::EstimateCompressedBytesPerRow() const {
+  if (contents_ && contents_->num_rows() > 0) {
+    // Use written row groups to estimate.
+    return static_cast<double>(contents_->compressed_bytes()) / contents_->num_rows();
+  }
+  if (file_metadata_) {
+    // Use closed file metadata to estimate.
+    int64_t total_compressed_bytes = 0;
+    int64_t total_rows = 0;
+    for (int i = 0; i < file_metadata_->num_row_groups(); i++) {
+      const auto row_group = file_metadata_->RowGroup(i);
+      total_compressed_bytes += row_group->total_compressed_size();
+      total_rows += row_group->num_rows();
+    }
+    if (total_compressed_bytes == 0 || total_rows == 0) {
+      return std::nullopt;
+    }
+    return static_cast<double>(total_compressed_bytes) / total_rows;
+  }
+  return std::nullopt;
+}
+
 const std::shared_ptr<WriterProperties>& ParquetFileWriter::properties() const {
   if (contents_) {
     return contents_->properties();
diff --git a/cpp/src/parquet/file_writer.h b/cpp/src/parquet/file_writer.h
index 872d5946b10..349be3ae97e 100644
--- a/cpp/src/parquet/file_writer.h
+++ b/cpp/src/parquet/file_writer.h
@@ -58,7 +58,8 @@ class PARQUET_EXPORT RowGroupWriter {
     virtual int64_t total_compressed_bytes() const = 0;
     /// \brief total compressed bytes written by the page writer
     virtual int64_t total_compressed_bytes_written() const = 0;
-    /// \brief estimated size of the values that are not written to a page yet
+    /// \brief estimated bytes of values that are buffered by the page writer
+    /// but not written to a page yet
     virtual int64_t EstimatedBufferedValueBytes() const = 0;
 
     virtual bool buffered() const = 0;
@@ -101,9 +102,8 @@ class PARQUET_EXPORT RowGroupWriter {
   int64_t total_compressed_bytes() const;
   /// \brief total compressed bytes written by the page writer
   int64_t total_compressed_bytes_written() const;
-  /// \brief including compressed bytes in page writer and uncompressed data
-  /// value buffer.
-  int64_t total_buffered_bytes() const;
+  /// \brief Estimate total compressed bytes including written and buffered bytes.
+  int64_t EstimatedTotalCompressedBytes() const;
 
   /// Returns whether the current RowGroupWriter is in the buffered mode and is created
   /// by calling ParquetFileWriter::AppendBufferedRowGroup.
@@ -156,6 +156,7 @@ class PARQUET_EXPORT ParquetFileWriter {
     virtual RowGroupWriter* AppendBufferedRowGroup() = 0;
 
     virtual int64_t num_rows() const = 0;
+    virtual int64_t compressed_bytes() const = 0;
     virtual int num_columns() const = 0;
     virtual int num_row_groups() const = 0;
 
@@ -212,6 +213,9 @@ class PARQUET_EXPORT ParquetFileWriter {
   void AddKeyValueMetadata(
       const std::shared_ptr<const KeyValueMetadata>& key_value_metadata);
 
+  /// Estimate compressed bytes per row from closed row groups.
+  std::optional<double> EstimateCompressedBytesPerRow() const;
+
   /// Number of columns.
   ///
   /// This number is fixed during the lifetime of the writer as it is determined via
diff --git a/cpp/src/parquet/properties.cc b/cpp/src/parquet/properties.cc
index 8a374fcdaa3..8fc9a0ba1ce 100644
--- a/cpp/src/parquet/properties.cc
+++ b/cpp/src/parquet/properties.cc
@@ -67,6 +67,20 @@ std::shared_ptr<ArrowWriterProperties> default_arrow_writer_properties() {
   return default_writer_properties;
 }
 
+WriterProperties::Builder* WriterProperties::Builder::max_row_group_length(
+    int64_t max_row_group_length) {
+  ARROW_CHECK_GT(max_row_group_length, 0) << "max_row_group_length must be positive";
+  max_row_group_length_ = max_row_group_length;
+  return this;
+}
+
+WriterProperties::Builder* WriterProperties::Builder::max_row_group_bytes(
+    int64_t max_row_group_bytes) {
+  ARROW_CHECK_GT(max_row_group_bytes, 0) << "max_row_group_bytes must be positive";
+  max_row_group_bytes_ = max_row_group_bytes;
+  return this;
+}
+
 void WriterProperties::Builder::CopyColumnSpecificProperties(
     const WriterProperties& properties) {
   for (const auto& [col_path, col_props] : properties.column_properties_) {
diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h
index f9d221ab099..2c2c654aa61 100644
--- a/cpp/src/parquet/properties.h
+++ b/cpp/src/parquet/properties.h
@@ -26,7 +26,6 @@
 #include "arrow/io/caching.h"
 #include "arrow/type_fwd.h"
 #include "arrow/util/compression.h"
-#include "arrow/util/logging.h"
 #include "arrow/util/type_fwd.h"
 #include "parquet/encryption/encryption.h"
 #include "parquet/exception.h"
@@ -417,20 +416,12 @@ class PARQUET_EXPORT WriterProperties {
 
   /// Specify the max number of rows to put in a single row group.
   /// Default 1Mi rows.
-  Builder* max_row_group_length(int64_t max_row_group_length) {
-    ARROW_CHECK_GT(max_row_group_length, 0) << "max_row_group_length must be positive";
-    max_row_group_length_ = max_row_group_length;
-    return this;
-  }
+  Builder* max_row_group_length(int64_t max_row_group_length);
 
   /// Specify the max number of bytes to put in a single row group.
   /// The size is estimated based on encoded and compressed data.
   /// Default 128MB.
-  Builder* max_row_group_bytes(int64_t max_row_group_bytes) {
-    ARROW_CHECK_GT(max_row_group_bytes, 0) << "max_row_group_bytes must be positive";
-    max_row_group_bytes_ = max_row_group_bytes;
-    return this;
-  }
+  Builder* max_row_group_bytes(int64_t max_row_group_bytes);
 
   /// Specify the data page size.
   /// Default 1MB.

From ea88cc76f20f718d45ede3efb1a2960b684960e4 Mon Sep 17 00:00:00 2001
From: wecharyu
Date: Fri, 16 Jan 2026 00:23:56 +0800
Subject: [PATCH 6/6] address comments

---
 cpp/src/parquet/arrow/writer.cc |  2 +-
 cpp/src/parquet/arrow/writer.h  |  4 ++--
 cpp/src/parquet/file_writer.cc  | 13 +++++++------
 cpp/src/parquet/file_writer.h   |  5 +++--
 4 files changed, 13 insertions(+), 11 deletions(-)

diff --git a/cpp/src/parquet/arrow/writer.cc b/cpp/src/parquet/arrow/writer.cc
index 6bc181ff4ed..6438821c0eb 100644
--- a/cpp/src/parquet/arrow/writer.cc
+++ b/cpp/src/parquet/arrow/writer.cc
@@ -396,7 +396,7 @@ class FileWriterImpl : public FileWriter {
     RETURN_NOT_OK(table.Validate());
 
     if (chunk_size <= 0 && table.num_rows() > 0) {
-      return Status::Invalid("rows per row_group must be greater than 0");
+      return Status::Invalid("chunk size per row_group must be greater than 0");
     } else if (!table.schema()->Equals(*schema_, /*check_metadata=*/false)) {
       return Status::Invalid("table schema does not match this writer's.
table:'",
                             table.schema()->ToString(), "' this:'", schema_->ToString(),
diff --git a/cpp/src/parquet/arrow/writer.h b/cpp/src/parquet/arrow/writer.h
index e95985b9a3c..33eca70e31f 100644
--- a/cpp/src/parquet/arrow/writer.h
+++ b/cpp/src/parquet/arrow/writer.h
@@ -139,8 +139,8 @@ class PARQUET_EXPORT FileWriter {
   /// `store_schema` being unusable during read.
   virtual ::arrow::Status AddKeyValueMetadata(
       const std::shared_ptr<const ::arrow::KeyValueMetadata>& key_value_metadata) = 0;
-  /// \brief Estimate compressed bytes per row from closed row groups or the active row
-  /// group.
+  /// \brief Estimate compressed bytes per row from data written so far.
+  /// \note std::nullopt will be returned if no rows have been written.
   virtual std::optional<double> EstimateCompressedBytesPerRow() const = 0;
   /// \brief Return the file metadata, only available after calling Close().
   virtual const std::shared_ptr<FileMetaData> metadata() const = 0;
diff --git a/cpp/src/parquet/file_writer.cc b/cpp/src/parquet/file_writer.cc
index 1949d3c5ab5..1fecca8e488 100644
--- a/cpp/src/parquet/file_writer.cc
+++ b/cpp/src/parquet/file_writer.cc
@@ -349,7 +349,7 @@ class FileSerializer : public ParquetFileWriter::Contents {
     if (row_group_writer_) {
       num_rows_ += row_group_writer_->num_rows();
       row_group_writer_->Close();
-      compressed_bytes_ += row_group_writer_->total_compressed_bytes_written();
+      written_compressed_bytes_ += row_group_writer_->total_compressed_bytes_written();
     }
     row_group_writer_.reset();
 
@@ -373,7 +373,7 @@ class FileSerializer : public ParquetFileWriter::Contents {
 
   int64_t num_rows() const override { return num_rows_; }
 
-  int64_t compressed_bytes() const override { return compressed_bytes_; }
+  int64_t written_compressed_bytes() const override { return written_compressed_bytes_; }
 
   const std::shared_ptr<WriterProperties>& properties() const override {
     return properties_;
@@ -383,7 +383,7 @@ class FileSerializer : public ParquetFileWriter::Contents {
     if (row_group_writer_) {
       num_rows_ += row_group_writer_->num_rows();
       row_group_writer_->Close();
-      compressed_bytes_ += row_group_writer_->total_compressed_bytes_written();
+      written_compressed_bytes_ += row_group_writer_->total_compressed_bytes_written();
     }
     int16_t row_group_ordinal = -1;  // row group ordinal not set
     if (file_encryptor_ != nullptr) {
@@ -439,7 +439,7 @@ class FileSerializer : public ParquetFileWriter::Contents {
         properties_(std::move(properties)),
         num_row_groups_(0),
         num_rows_(0),
-        compressed_bytes_(0),
+        written_compressed_bytes_(0),
         metadata_(FileMetaDataBuilder::Make(&schema_, properties_)) {
     PARQUET_ASSIGN_OR_THROW(int64_t position, sink_->Tell());
     if (position == 0) {
@@ -493,7 +493,7 @@ class FileSerializer : public ParquetFileWriter::Contents {
   const std::shared_ptr<WriterProperties> properties_;
   int num_row_groups_;
   int64_t num_rows_;
-  int64_t compressed_bytes_;
+  int64_t written_compressed_bytes_;
   std::unique_ptr<FileMetaDataBuilder> metadata_;
   // Only one of the row group writers is active at a time
   std::unique_ptr<RowGroupWriter> row_group_writer_;
@@ -669,7 +669,8 @@ void ParquetFileWriter::AddKeyValueMetadata(
 std::optional<double> ParquetFileWriter::EstimateCompressedBytesPerRow() const {
   if (contents_ && contents_->num_rows() > 0) {
     // Use written row groups to estimate.
-    return static_cast<double>(contents_->compressed_bytes()) / contents_->num_rows();
+    return static_cast<double>(contents_->written_compressed_bytes()) /
+           contents_->num_rows();
   }
   if (file_metadata_) {
     // Use closed file metadata to estimate.
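A sketch of how a caller might consume the finished estimator, assuming the final API of this series; here file_writer is a parquet::ParquetFileWriter* and the sizing logic is illustrative, not part of the patch:

    // Internally the estimate prefers closed row groups and falls back to
    // closed-file metadata; std::nullopt means nothing has been written yet.
    if (std::optional<double> bytes_per_row =
            file_writer->EstimateCompressedBytesPerRow()) {
      int64_t rows_per_group = static_cast<int64_t>(
          file_writer->properties()->max_row_group_bytes() / *bytes_per_row);
      // ... size subsequent row groups to roughly rows_per_group rows ...
    }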
diff --git a/cpp/src/parquet/file_writer.h b/cpp/src/parquet/file_writer.h
index 349be3ae97e..26500e6ca61 100644
--- a/cpp/src/parquet/file_writer.h
+++ b/cpp/src/parquet/file_writer.h
@@ -156,7 +156,7 @@ class PARQUET_EXPORT ParquetFileWriter {
     virtual RowGroupWriter* AppendBufferedRowGroup() = 0;
 
     virtual int64_t num_rows() const = 0;
-    virtual int64_t compressed_bytes() const = 0;
+    virtual int64_t written_compressed_bytes() const = 0;
     virtual int num_columns() const = 0;
     virtual int num_row_groups() const = 0;
 
@@ -213,7 +213,8 @@ class PARQUET_EXPORT ParquetFileWriter {
   void AddKeyValueMetadata(
       const std::shared_ptr<const KeyValueMetadata>& key_value_metadata);
 
-  /// Estimate compressed bytes per row from closed row groups.
+  /// \brief Estimate compressed bytes per row from closed row groups.
+  /// \return Estimated bytes or std::nullopt when no row group has been written.
   std::optional<double> EstimateCompressedBytesPerRow() const;
 
   /// Number of columns.
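To check the resulting layout end to end, the file can be reopened and per-group sizes inspected; a sketch in the style of the tests above, assuming buffer and pool from the test scope:

    ASSERT_OK_AND_ASSIGN(
        auto reader, parquet::arrow::OpenFile(
                         std::make_shared<::arrow::io::BufferReader>(buffer), pool));
    auto metadata = reader->parquet_reader()->metadata();
    // The byte cap is enforced from an estimate taken before encoding, so very
    // small caps mostly influence how often groups are cut rather than their
    // exact on-disk size; inspect the per-group footprints to see the effect.
    for (int i = 0; i < metadata->num_row_groups(); ++i) {
      std::cout << "row group " << i << ": "
                << metadata->RowGroup(i)->total_compressed_size() << " bytes\n";
    }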