From e8978fe6d4a4a0892f1adb5af9e4aa9f7f859a4e Mon Sep 17 00:00:00 2001 From: yoni Date: Wed, 4 Feb 2026 19:09:33 +0200 Subject: [PATCH 01/12] feat: add row group limit by byte --- parquet/src/arrow/arrow_writer/mod.rs | 268 ++++++++++++++++++++++++-- parquet/src/file/properties.rs | 65 ++++++- 2 files changed, 310 insertions(+), 23 deletions(-) diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index 99d0455b31b9..7ea97a49fa07 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -187,8 +187,11 @@ pub struct ArrowWriter { /// Creates new [`ArrowRowGroupWriter`] instances as required row_group_writer_factory: ArrowRowGroupWriterFactory, - /// The length of arrays to write to each row group - max_row_group_size: usize, + /// The maximum number of rows to write to each row group, or None for unlimited + max_row_group_row_count: Option, + + /// The maximum size in bytes for a row group, or None for unlimited + max_row_group_bytes: Option, } impl std::fmt::Debug for ArrowWriter { @@ -199,7 +202,8 @@ impl std::fmt::Debug for ArrowWriter { .field("in_progress_size", &format_args!("{buffered_memory} bytes")) .field("in_progress_rows", &self.in_progress_rows()) .field("arrow_schema", &self.arrow_schema) - .field("max_row_group_size", &self.max_row_group_size) + .field("max_row_group_row_count", &self.max_row_group_row_count) + .field("max_row_group_bytes", &self.max_row_group_bytes) .finish() } } @@ -247,7 +251,8 @@ impl ArrowWriter { add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props); } - let max_row_group_size = props.max_row_group_size(); + let max_row_group_row_count = props.max_row_group_row_count(); + let max_row_group_bytes = props.max_row_group_bytes(); let props_ptr = Arc::new(props); let file_writer = @@ -261,7 +266,8 @@ impl ArrowWriter { in_progress: None, arrow_schema, row_group_writer_factory, - max_row_group_size, + max_row_group_row_count, + max_row_group_bytes, }) } @@ -314,8 +320,12 @@ impl ArrowWriter { /// Encodes the provided [`RecordBatch`] /// /// If this would cause the current row group to exceed [`WriterProperties::max_row_group_size`] - /// rows, the contents of `batch` will be written to one or more row groups such that all but - /// the final row group in the file contain [`WriterProperties::max_row_group_size`] rows. + /// rows or [`WriterProperties::max_row_group_bytes`] bytes, the contents of `batch` will be + /// written to one or more row groups such that limits are respected. + /// + /// If both limits are `None`, all data is written to a single row group. + /// If one limit is set, that limit is respected. + /// If both limits are set, the lower bound (whichever triggers first) is respected. /// /// This will fail if the `batch`'s schema does not match the writer's schema. 
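To make the limit semantics above concrete, here is a minimal usage sketch (commentary, not part of the patch) using the two builder methods this change introduces; the 4096-row and 8 MiB values are arbitrary, and `file` / `batch` are placeholder variables.

let props = WriterProperties::builder()
    .set_max_row_group_row_count(Some(4096))        // flush after at most 4096 rows...
    .set_max_row_group_bytes(Some(8 * 1024 * 1024)) // ...or ~8 MiB of estimated encoded data
    .build();
let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props))?;
writer.write(&batch)?; // the batch is split across row groups as soon as either limit is hit
writer.close()?;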
pub fn write(&mut self, batch: &RecordBatch) -> Result<()> { @@ -331,18 +341,57 @@ impl ArrowWriter { ), }; - // If would exceed max_row_group_size, split batch - if in_progress.buffered_rows + batch.num_rows() > self.max_row_group_size { - let to_write = self.max_row_group_size - in_progress.buffered_rows; - let a = batch.slice(0, to_write); - let b = batch.slice(to_write, batch.num_rows() - to_write); - self.write(&a)?; - return self.write(&b); + if let Some(max_rows) = self.max_row_group_row_count { + if in_progress.buffered_rows + batch.num_rows() > max_rows { + let to_write = max_rows - in_progress.buffered_rows; + let a = batch.slice(0, to_write); + let b = batch.slice(to_write, batch.num_rows() - to_write); + self.write(&a)?; + return self.write(&b); + } + } + + // Check byte limit: if we have buffered data, use measured average row size + // to split batch proactively before exceeding byte limit + if let Some(max_bytes) = self.max_row_group_bytes { + if in_progress.buffered_rows > 0 { + let current_bytes = in_progress.get_estimated_total_bytes(); + + if current_bytes >= max_bytes { + self.flush()?; + return self.write(batch); + } + + let avg_row_bytes = current_bytes / in_progress.buffered_rows; + if avg_row_bytes > 0 { + let remaining_bytes = max_bytes - current_bytes; + let rows_that_fit = remaining_bytes / avg_row_bytes; + + if batch.num_rows() > rows_that_fit { + if rows_that_fit > 0 { + let a = batch.slice(0, rows_that_fit); + let b = batch.slice(rows_that_fit, batch.num_rows() - rows_that_fit); + self.write(&a)?; + return self.write(&b); + } else { + self.flush()?; + return self.write(batch); + } + } + } + } } in_progress.write(batch)?; - if in_progress.buffered_rows >= self.max_row_group_size { + let should_flush = self + .max_row_group_row_count + .is_some_and(|max| in_progress.buffered_rows >= max) + || self + .max_row_group_bytes + .is_some_and(|max| in_progress.get_estimated_total_bytes() >= max); + + if should_flush { self.flush()? } Ok(()) @@ -914,6 +963,14 @@ impl ArrowRowGroupWriter { Ok(()) } + /// Returns the estimated total encoded bytes for this row group + fn get_estimated_total_bytes(&self) -> usize { + self.writers + .iter() + .map(|x| x.get_estimated_total_bytes()) + .sum() + } + fn close(self) -> Result> { self.writers .into_iter() @@ -4518,4 +4575,185 @@ mod tests { assert_eq!(get_dict_page_size(col0_meta), 1024 * 1024); assert_eq!(get_dict_page_size(col1_meta), 1024 * 1024 * 4); } + + /// Helper to create a test batch with the given number of rows. + /// Each row is approximately 4 bytes (one i32). 
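The proactive byte-based split in `write` above boils down to one piece of arithmetic; the helper below is an illustrative restatement, not code from the patch. For example, with a 3500-byte limit, 20 buffered rows and 2000 estimated bytes, the average row is 100 bytes, so at most 15 more rows are accepted before the incoming batch is split: `rows_that_fit(3500, 2000, 20) == Some(15)`.

/// Illustrative only: how many more rows fit under `max_bytes`, given the
/// average encoded row size measured from the rows already buffered.
/// Returns `None` when there is no usable measurement yet, in which case the
/// writer accepts the whole batch (as in the code above).
fn rows_that_fit(max_bytes: usize, current_bytes: usize, buffered_rows: usize) -> Option<usize> {
    let avg_row_bytes = current_bytes.checked_div(buffered_rows)?;
    if avg_row_bytes == 0 {
        return None;
    }
    Some(max_bytes.saturating_sub(current_bytes) / avg_row_bytes)
}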
+ fn create_test_batch(num_rows: usize) -> RecordBatch { + let schema = Arc::new(Schema::new(vec![Field::new( + "int", + ArrowDataType::Int32, + false, + )])); + let array = Int32Array::from((0..num_rows as i32).collect::>()); + RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap() + } + + #[test] + fn test_row_group_limit_none_writes_single_row_group() { + // When both limits are None, all data should go into a single row group + let batch = create_test_batch(1000); + + let props = WriterProperties::builder() + .set_max_row_group_row_count(None) + .set_max_row_group_bytes(None) + .build(); + + let file = tempfile::tempfile().unwrap(); + let mut writer = + ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props)).unwrap(); + + writer.write(&batch).unwrap(); + writer.close().unwrap(); + + let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap(); + assert_eq!( + &row_group_sizes(builder.metadata()), + &[1000], + "With no limits, all rows should be in a single row group" + ); + } + + #[test] + fn test_row_group_limit_rows_only() { + // When only max_row_group_size is set, respect the row limit + let batch = create_test_batch(1000); + + let props = WriterProperties::builder() + .set_max_row_group_size(300) + .set_max_row_group_bytes(None) + .build(); + + let file = tempfile::tempfile().unwrap(); + let mut writer = + ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props)).unwrap(); + + writer.write(&batch).unwrap(); + writer.close().unwrap(); + + let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap(); + assert_eq!( + &row_group_sizes(builder.metadata()), + &[300, 300, 300, 100], + "Row groups should be split by row count" + ); + } + + #[test] + fn test_row_group_limit_bytes_only() { + // When only max_row_group_bytes is set, respect the byte limit + // Create batches with string data for more predictable byte sizes + // Write in multiple small batches so byte-based splitting can work + // (first batch establishes the avg row size, subsequent batches are split) + let schema = Arc::new(Schema::new(vec![Field::new( + "str", + ArrowDataType::Utf8, + false, + )])); + + // Set byte limit to approximately fit ~30 rows worth of data (~100 bytes each) + let props = WriterProperties::builder() + .set_max_row_group_row_count(None) + .set_max_row_group_bytes(Some(3500)) + .build(); + + let file = tempfile::tempfile().unwrap(); + let mut writer = + ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap(); + + // Write 10 batches of 10 rows each (100 rows total) + // Each string is ~100 bytes + for batch_idx in 0..10 { + let strings: Vec = (0..10) + .map(|i| format!("{:0>100}", batch_idx * 10 + i)) + .collect(); + let array = StringArray::from(strings); + let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap(); + writer.write(&batch).unwrap(); + } + writer.close().unwrap(); + + let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap(); + let sizes = row_group_sizes(builder.metadata()); + + assert!( + sizes.len() > 1, + "Should have multiple row groups due to byte limit, got {sizes:?}", + ); + + let total_rows: i64 = sizes.iter().sum(); + assert_eq!(total_rows, 100, "Total rows should be preserved"); + } + + #[test] + fn test_row_group_limit_both_row_wins() { + // When both limits are set, the row limit triggers first + let batch = create_test_batch(1000); + + let props = WriterProperties::builder() + .set_max_row_group_size(200) // Will trigger at 200 rows + 
.set_max_row_group_bytes(Some(1024 * 1024)) // 1MB - won't trigger for small int data + .build(); + + let file = tempfile::tempfile().unwrap(); + let mut writer = + ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props)).unwrap(); + + writer.write(&batch).unwrap(); + writer.close().unwrap(); + + let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap(); + assert_eq!( + &row_group_sizes(builder.metadata()), + &[200, 200, 200, 200, 200], + "Row limit should trigger before byte limit" + ); + } + + #[test] + fn test_row_group_limit_both_bytes_wins() { + // When both limits are set, the byte limit triggers first + // Write in multiple small batches so byte-based splitting can work + let schema = Arc::new(Schema::new(vec![Field::new( + "str", + ArrowDataType::Utf8, + false, + )])); + + let props = WriterProperties::builder() + .set_max_row_group_size(1000) // Won't trigger for 100 rows + .set_max_row_group_bytes(Some(3500)) // Will trigger at ~30-35 rows + .build(); + + let file = tempfile::tempfile().unwrap(); + let mut writer = + ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap(); + + // Write 10 batches of 10 rows each (100 rows total) + // Each string is ~100 bytes + for batch_idx in 0..10 { + let strings: Vec = (0..10) + .map(|i| format!("{:0>100}", batch_idx * 10 + i)) + .collect(); + let array = StringArray::from(strings); + let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap(); + writer.write(&batch).unwrap(); + } + writer.close().unwrap(); + + let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap(); + let sizes = row_group_sizes(builder.metadata()); + + assert!( + sizes.len() > 1, + "Byte limit should trigger before row limit, got {sizes:?}", + ); + + assert!( + sizes.iter().all(|&s| s < 1000), + "No row group should hit the row limit" + ); + + let total_rows: i64 = sizes.iter().sum(); + assert_eq!(total_rows, 100, "Total rows should be preserved"); + } } diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs index 38a5a804c0b7..472aa22618ae 100644 --- a/parquet/src/file/properties.rs +++ b/parquet/src/file/properties.rs @@ -45,6 +45,8 @@ pub const DEFAULT_STATISTICS_ENABLED: EnabledStatistics = EnabledStatistics::Pag pub const DEFAULT_WRITE_PAGE_HEADER_STATISTICS: bool = false; /// Default value for [`WriterProperties::max_row_group_size`] pub const DEFAULT_MAX_ROW_GROUP_SIZE: usize = 1024 * 1024; +/// Default value for [`WriterProperties::max_row_group_bytes`] (128 MB, same as parquet-mr's parquet.block.size) +pub const DEFAULT_MAX_ROW_GROUP_BYTES: usize = 128 * 1024 * 1024; /// Default value for [`WriterProperties::bloom_filter_position`] pub const DEFAULT_BLOOM_FILTER_POSITION: BloomFilterPosition = BloomFilterPosition::AfterRowGroup; /// Default value for [`WriterProperties::created_by`] @@ -156,7 +158,8 @@ pub struct WriterProperties { data_page_size_limit: usize, data_page_row_count_limit: usize, write_batch_size: usize, - max_row_group_size: usize, + max_row_group_row_count: Option, + max_row_group_bytes: Option, bloom_filter_position: BloomFilterPosition, writer_version: WriterVersion, created_by: String, @@ -247,11 +250,25 @@ impl WriterProperties { self.write_batch_size } - /// Returns maximum number of rows in a row group. + /// Returns maximum number of rows in a row group, or `usize::MAX` if unlimited. 
/// /// For more details see [`WriterPropertiesBuilder::set_max_row_group_size`] pub fn max_row_group_size(&self) -> usize { - self.max_row_group_size + self.max_row_group_row_count.unwrap_or(usize::MAX) + } + + /// Returns maximum number of rows in a row group, or `None` if unlimited. + /// + /// For more details see [`WriterPropertiesBuilder::set_max_row_group_size`] + pub fn max_row_group_row_count(&self) -> Option { + self.max_row_group_row_count + } + + /// Returns maximum size of a row group in bytes, or `None` if unlimited. + /// + /// For more details see [`WriterPropertiesBuilder::set_max_row_group_bytes`] + pub fn max_row_group_bytes(&self) -> Option { + self.max_row_group_bytes } /// Returns bloom filter position. @@ -445,7 +462,8 @@ pub struct WriterPropertiesBuilder { data_page_size_limit: usize, data_page_row_count_limit: usize, write_batch_size: usize, - max_row_group_size: usize, + max_row_group_row_count: Option, + max_row_group_bytes: Option, bloom_filter_position: BloomFilterPosition, writer_version: WriterVersion, created_by: String, @@ -468,7 +486,8 @@ impl Default for WriterPropertiesBuilder { data_page_size_limit: DEFAULT_PAGE_SIZE, data_page_row_count_limit: DEFAULT_DATA_PAGE_ROW_COUNT_LIMIT, write_batch_size: DEFAULT_WRITE_BATCH_SIZE, - max_row_group_size: DEFAULT_MAX_ROW_GROUP_SIZE, + max_row_group_row_count: Some(DEFAULT_MAX_ROW_GROUP_SIZE), + max_row_group_bytes: None, bloom_filter_position: DEFAULT_BLOOM_FILTER_POSITION, writer_version: DEFAULT_WRITER_VERSION, created_by: DEFAULT_CREATED_BY.to_string(), @@ -493,7 +512,8 @@ impl WriterPropertiesBuilder { data_page_size_limit: self.data_page_size_limit, data_page_row_count_limit: self.data_page_row_count_limit, write_batch_size: self.write_batch_size, - max_row_group_size: self.max_row_group_size, + max_row_group_row_count: self.max_row_group_row_count, + max_row_group_bytes: self.max_row_group_bytes, bloom_filter_position: self.bloom_filter_position, writer_version: self.writer_version, created_by: self.created_by, @@ -575,7 +595,34 @@ impl WriterPropertiesBuilder { /// If the value is set to 0. pub fn set_max_row_group_size(mut self, value: usize) -> Self { assert!(value > 0, "Cannot have a 0 max row group size"); - self.max_row_group_size = value; + self.max_row_group_row_count = Some(value); + self + } + + /// Sets maximum number of rows in a row group, or `None` for unlimited. + /// + /// # Panics + /// If the value is `Some(0)`. + pub fn set_max_row_group_row_count(mut self, value: Option) -> Self { + if let Some(v) = value { + assert!(v > 0, "Cannot have a 0 max row group size"); + } + self.max_row_group_row_count = value; + self + } + + /// Sets maximum size of a row group in bytes, or `None` for unlimited. + /// + /// Row groups are flushed when their estimated encoded size exceeds this threshold. + /// This is similar to parquet-mr's `parquet.block.size` behavior. + /// + /// # Panics + /// If the value is `Some(0)`. 
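As a quick illustration of how the legacy getter relates to the two new `Option`-based accessors and their defaults (a sketch, not part of the patch):

let props = WriterProperties::builder()
    .set_max_row_group_row_count(None)
    .build();
assert_eq!(props.max_row_group_row_count(), None);
assert_eq!(props.max_row_group_size(), usize::MAX); // legacy getter reports "unlimited" as usize::MAX
assert_eq!(props.max_row_group_bytes(), None);      // the byte limit stays off unless set explicitly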
+ pub fn set_max_row_group_bytes(mut self, value: Option) -> Self { + if let Some(v) = value { + assert!(v > 0, "Cannot have a 0 max row group bytes"); + } + self.max_row_group_bytes = value; self } @@ -952,7 +999,8 @@ impl From for WriterPropertiesBuilder { data_page_size_limit: props.data_page_size_limit, data_page_row_count_limit: props.data_page_row_count_limit, write_batch_size: props.write_batch_size, - max_row_group_size: props.max_row_group_size, + max_row_group_row_count: props.max_row_group_row_count, + max_row_group_bytes: props.max_row_group_bytes, bloom_filter_position: props.bloom_filter_position, writer_version: props.writer_version, created_by: props.created_by, @@ -1334,6 +1382,7 @@ mod tests { ); assert_eq!(props.write_batch_size(), DEFAULT_WRITE_BATCH_SIZE); assert_eq!(props.max_row_group_size(), DEFAULT_MAX_ROW_GROUP_SIZE); + assert_eq!(props.max_row_group_bytes(), None); assert_eq!(props.bloom_filter_position(), DEFAULT_BLOOM_FILTER_POSITION); assert_eq!(props.writer_version(), DEFAULT_WRITER_VERSION); assert_eq!(props.created_by(), DEFAULT_CREATED_BY); From 0e07315f5dda108ac075f34b1839d7ed14f7dc45 Mon Sep 17 00:00:00 2001 From: yoni Date: Thu, 5 Feb 2026 16:17:05 +0200 Subject: [PATCH 02/12] [CR] Update comment to avoid referring parquet-mr specifically --- arrow-ord/src/sort.rs | 51 +++++++++++- arrow-string/src/length.rs | 83 ++++++++++++++++++- parquet/src/arrow/arrow_reader/mod.rs | 50 ----------- parquet/src/arrow/async_reader/mod.rs | 58 ------------- parquet/src/file/properties.rs | 2 +- .../tests/arrow_reader/bloom_filter/async.rs | 82 ++++++++++++++++++ .../tests/arrow_reader/bloom_filter/mod.rs | 20 +++++ .../tests/arrow_reader/bloom_filter/sync.rs | 74 +++++++++++++++++ parquet/tests/arrow_reader/io/mod.rs | 70 ++++++++++++++++ parquet/tests/arrow_reader/mod.rs | 1 + parquet/tests/arrow_reader/predicate_cache.rs | 52 +----------- 11 files changed, 379 insertions(+), 164 deletions(-) create mode 100644 parquet/tests/arrow_reader/bloom_filter/async.rs create mode 100644 parquet/tests/arrow_reader/bloom_filter/mod.rs create mode 100644 parquet/tests/arrow_reader/bloom_filter/sync.rs diff --git a/arrow-ord/src/sort.rs b/arrow-ord/src/sort.rs index f7f553abb6cf..773d9cf12e13 100644 --- a/arrow-ord/src/sort.rs +++ b/arrow-ord/src/sort.rs @@ -256,7 +256,9 @@ fn can_sort_to_indices(data_type: &DataType) -> bool { ) || match data_type { DataType::List(f) if can_rank(f.data_type()) => true, + DataType::ListView(f) if can_rank(f.data_type()) => true, DataType::LargeList(f) if can_rank(f.data_type()) => true, + DataType::LargeListView(f) if can_rank(f.data_type()) => true, DataType::FixedSizeList(f, _) if can_rank(f.data_type()) => true, DataType::Dictionary(_, values) if can_rank(values.as_ref()) => true, DataType::RunEndEncoded(_, f) if can_sort_to_indices(f.data_type()) => true, @@ -286,7 +288,9 @@ pub fn sort_to_indices( DataType::BinaryView => sort_byte_view(array.as_binary_view(), v, n, options, limit), DataType::FixedSizeBinary(_) => sort_fixed_size_binary(array.as_fixed_size_binary(), v, n, options, limit), DataType::List(_) => sort_list(array.as_list::(), v, n, options, limit)?, + DataType::ListView(_) => sort_list_view(array.as_list_view::(), v, n, options, limit)?, DataType::LargeList(_) => sort_list(array.as_list::(), v, n, options, limit)?, + DataType::LargeListView(_) => sort_list_view(array.as_list_view::(), v, n, options, limit)?, DataType::FixedSizeList(_, _) => sort_fixed_size_list(array.as_fixed_size_list(), v, n, options, limit)?, 
DataType::Dictionary(_, _) => downcast_dictionary_array!{ array => sort_dictionary(array, v, n, options, limit)?, @@ -581,6 +585,28 @@ fn sort_list( Ok(sort_impl(options, &mut valids, &null_indices, limit, Ord::cmp).into()) } +fn sort_list_view( + array: &GenericListViewArray, + value_indices: Vec, + null_indices: Vec, + options: SortOptions, + limit: Option, +) -> Result { + let rank = child_rank(array.values().as_ref(), options)?; + let offsets = array.offsets(); + let sizes = array.sizes(); + let mut valids = value_indices + .into_iter() + .map(|index| { + let start = offsets[index as usize].as_usize(); + let size = sizes[index as usize].as_usize(); + let end = start + size; + (index, &rank[start..end]) + }) + .collect::>(); + Ok(sort_impl(options, &mut valids, &null_indices, limit, Ord::cmp).into()) +} + fn sort_fixed_size_list( array: &FixedSizeListArray, value_indices: Vec, @@ -1373,16 +1399,37 @@ mod tests { assert_eq!(&sorted, &expected); + // for ListView + let input = Arc::new(ListViewArray::from_iter_primitive::(data.clone())); + let sorted = match limit { + Some(_) => sort_limit(&(input as ArrayRef), options, limit).unwrap(), + _ => sort(&(input as ArrayRef), options).unwrap(), + }; + let expected = Arc::new(ListViewArray::from_iter_primitive::( + expected_data.clone(), + )) as ArrayRef; + assert_eq!(&sorted, &expected); + // for LargeList - let input = Arc::new(LargeListArray::from_iter_primitive::(data)); + let input = Arc::new(LargeListArray::from_iter_primitive::(data.clone())); let sorted = match limit { Some(_) => sort_limit(&(input as ArrayRef), options, limit).unwrap(), _ => sort(&(input as ArrayRef), options).unwrap(), }; let expected = Arc::new(LargeListArray::from_iter_primitive::( - expected_data, + expected_data.clone(), )) as ArrayRef; + assert_eq!(&sorted, &expected); + // for LargeListView + let input = Arc::new(LargeListViewArray::from_iter_primitive::(data)); + let sorted = match limit { + Some(_) => sort_limit(&(input as ArrayRef), options, limit).unwrap(), + _ => sort(&(input as ArrayRef), options).unwrap(), + }; + let expected = Arc::new(LargeListViewArray::from_iter_primitive::( + expected_data, + )) as ArrayRef; assert_eq!(&sorted, &expected); } diff --git a/arrow-string/src/length.rs b/arrow-string/src/length.rs index de9aa5367058..15b2f2eca12d 100644 --- a/arrow-string/src/length.rs +++ b/arrow-string/src/length.rs @@ -49,8 +49,8 @@ fn bit_length_impl( /// For list array, length is the number of elements in each list. /// For string array and binary array, length is the number of bytes of each value. /// -/// * this only accepts ListArray/LargeListArray, StringArray/LargeStringArray/StringViewArray, BinaryArray/LargeBinaryArray, and FixedSizeListArray, -/// or DictionaryArray with above Arrays as values +/// * this only accepts ListArray/LargeListArray, StringArray/LargeStringArray/StringViewArray, BinaryArray/LargeBinaryArray, FixedSizeListArray, +/// and ListViewArray/LargeListViewArray, or DictionaryArray with above Arrays as values /// * length of null is null. 
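Both the `sort_list_view` ranking above and the `length` implementation below lean on the list-view layout, where element `i` spans `values[offsets[i] .. offsets[i] + sizes[i]]` and offsets need not be ordered. A small sketch follows (not part of the patch; it assumes `ListViewArray::new` accepts out-of-order offsets, which the list-view format permits):

let field = Arc::new(Field::new_list_field(DataType::Int32, true));
let values = Int32Array::from(vec![1, 2, 3, 4, 5]);
let offsets = ScalarBuffer::from(vec![2i32, 0]); // element 0 starts at values[2]
let sizes = ScalarBuffer::from(vec![3i32, 2]);   // so the lists are [3, 4, 5] and [1, 2]
let list = ListViewArray::new(field, offsets, sizes, Arc::new(values), None);
let lens = length(&list).unwrap();
let lens = lens.as_any().downcast_ref::<Int32Array>().unwrap();
assert_eq!(lens, &Int32Array::from(vec![3, 2])); // `length` simply surfaces the sizes buffer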
pub fn length(array: &dyn Array) -> Result { if let Some(d) = array.as_any_dictionary_opt() { @@ -67,6 +67,20 @@ pub fn length(array: &dyn Array) -> Result { let list = array.as_list::(); Ok(length_impl::(list.offsets(), list.nulls())) } + DataType::ListView(_) => { + let list = array.as_list_view::(); + Ok(Arc::new(Int32Array::new( + list.sizes().clone(), + list.nulls().cloned(), + ))) + } + DataType::LargeListView(_) => { + let list = array.as_list_view::(); + Ok(Arc::new(Int64Array::new( + list.sizes().clone(), + list.nulls().cloned(), + ))) + } DataType::Utf8 => { let list = array.as_string::(); Ok(length_impl::(list.offsets(), list.nulls())) @@ -170,7 +184,7 @@ pub fn bit_length(array: &dyn Array) -> Result { #[cfg(test)] mod tests { use super::*; - use arrow_buffer::Buffer; + use arrow_buffer::{Buffer, ScalarBuffer}; use arrow_data::ArrayData; use arrow_schema::Field; @@ -398,6 +412,69 @@ mod tests { length_list_helper!(i64, Int64Array, Float32Type, value, result) } + #[test] + fn length_test_list_view() { + // Create a ListViewArray with values [0, 1, 2], [3, 4, 5], [6, 7] + let field = Arc::new(Field::new_list_field(DataType::Int32, true)); + let values = Int32Array::from(vec![0, 1, 2, 3, 4, 5, 6, 7]); + let offsets = ScalarBuffer::from(vec![0i32, 3, 6]); + let sizes = ScalarBuffer::from(vec![3i32, 3, 2]); + let list_array = ListViewArray::new(field, offsets, sizes, Arc::new(values), None); + + let result = length(&list_array).unwrap(); + let result = result.as_any().downcast_ref::().unwrap(); + let expected: Int32Array = vec![3, 3, 2].into(); + assert_eq!(&expected, result); + } + + #[test] + fn length_test_large_list_view() { + // Create a LargeListViewArray with values [0, 1, 2], [3, 4, 5], [6, 7] + let field = Arc::new(Field::new_list_field(DataType::Int32, true)); + let values = Int32Array::from(vec![0, 1, 2, 3, 4, 5, 6, 7]); + let offsets = ScalarBuffer::from(vec![0i64, 3, 6]); + let sizes = ScalarBuffer::from(vec![3i64, 3, 2]); + let list_array = LargeListViewArray::new(field, offsets, sizes, Arc::new(values), None); + + let result = length(&list_array).unwrap(); + let result = result.as_any().downcast_ref::().unwrap(); + let expected: Int64Array = vec![3i64, 3, 2].into(); + assert_eq!(&expected, result); + } + + #[test] + fn length_null_list_view() { + // Create a ListViewArray with nulls: [], null, [1, 2, 3, 4], [0] + let field = Arc::new(Field::new_list_field(DataType::Int32, true)); + let values = Int32Array::from(vec![1, 2, 3, 4, 0]); + let offsets = ScalarBuffer::from(vec![0i32, 0, 0, 4]); + let sizes = ScalarBuffer::from(vec![0i32, 0, 4, 1]); + let nulls = NullBuffer::from(vec![true, false, true, true]); + let list_array = ListViewArray::new(field, offsets, sizes, Arc::new(values), Some(nulls)); + + let result = length(&list_array).unwrap(); + let result = result.as_any().downcast_ref::().unwrap(); + let expected: Int32Array = vec![Some(0), None, Some(4), Some(1)].into(); + assert_eq!(&expected, result); + } + + #[test] + fn length_null_large_list_view() { + // Create a LargeListViewArray with nulls: [], null, [1.0, 2.0, 3.0], [0.1] + let field = Arc::new(Field::new_list_field(DataType::Float32, true)); + let values = Float32Array::from(vec![1.0, 2.0, 3.0, 0.1]); + let offsets = ScalarBuffer::from(vec![0i64, 0, 0, 3]); + let sizes = ScalarBuffer::from(vec![0i64, 0, 3, 1]); + let nulls = NullBuffer::from(vec![true, false, true, true]); + let list_array = + LargeListViewArray::new(field, offsets, sizes, Arc::new(values), Some(nulls)); + + let result = 
length(&list_array).unwrap(); + let result = result.as_any().downcast_ref::().unwrap(); + let expected: Int64Array = vec![Some(0i64), None, Some(3), Some(1)].into(); + assert_eq!(&expected, result); + } + /// Tests that length is not valid for u64. #[test] fn length_wrong_type() { diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index 3a31c69ff377..d0398418007a 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -5505,56 +5505,6 @@ pub(crate) mod tests { c0.iter().zip(c1.iter()).for_each(|(l, r)| assert_eq!(l, r)); } - #[test] - fn test_get_row_group_column_bloom_filter_with_length() { - // convert to new parquet file with bloom_filter_length - let testdata = arrow::util::test_util::parquet_test_data(); - let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet"); - let file = File::open(path).unwrap(); - let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap(); - let schema = builder.schema().clone(); - let reader = builder.build().unwrap(); - - let mut parquet_data = Vec::new(); - let props = WriterProperties::builder() - .set_bloom_filter_enabled(true) - .build(); - let mut writer = ArrowWriter::try_new(&mut parquet_data, schema, Some(props)).unwrap(); - for batch in reader { - let batch = batch.unwrap(); - writer.write(&batch).unwrap(); - } - writer.close().unwrap(); - - // test the new parquet file - test_get_row_group_column_bloom_filter(parquet_data.into(), true); - } - - #[test] - fn test_get_row_group_column_bloom_filter_without_length() { - let testdata = arrow::util::test_util::parquet_test_data(); - let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet"); - let data = Bytes::from(std::fs::read(path).unwrap()); - test_get_row_group_column_bloom_filter(data, false); - } - - fn test_get_row_group_column_bloom_filter(data: Bytes, with_length: bool) { - let builder = ParquetRecordBatchReaderBuilder::try_new(data.clone()).unwrap(); - - let metadata = builder.metadata(); - assert_eq!(metadata.num_row_groups(), 1); - let row_group = metadata.row_group(0); - let column = row_group.column(0); - assert_eq!(column.bloom_filter_length().is_some(), with_length); - - let sbbf = builder - .get_row_group_column_bloom_filter(0, 0) - .unwrap() - .unwrap(); - assert!(sbbf.check(&"Hello")); - assert!(!sbbf.check(&"Hello_Not_Exists")); - } - #[test] fn test_read_unknown_logical_type() { let testdata = arrow::util::test_util::parquet_test_data(); diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index 7aa1ea3b6ff2..b4824365eae6 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -1609,14 +1609,6 @@ mod tests { let _stream = builder.build().unwrap(); } - #[tokio::test] - async fn test_get_row_group_column_bloom_filter_without_length() { - let testdata = arrow::util::test_util::parquet_test_data(); - let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet"); - let data = Bytes::from(std::fs::read(path).unwrap()); - test_get_row_group_column_bloom_filter(data, false).await; - } - #[tokio::test] async fn test_parquet_record_batch_stream_schema() { fn get_all_field_names(schema: &Schema) -> Vec<&String> { @@ -1725,56 +1717,6 @@ mod tests { } } - #[tokio::test] - async fn test_get_row_group_column_bloom_filter_with_length() { - // convert to new parquet file with bloom_filter_length - let testdata = arrow::util::test_util::parquet_test_data(); - let path = 
format!("{testdata}/data_index_bloom_encoding_stats.parquet"); - let data = Bytes::from(std::fs::read(path).unwrap()); - let async_reader = TestReader::new(data.clone()); - let builder = ParquetRecordBatchStreamBuilder::new(async_reader) - .await - .unwrap(); - let schema = builder.schema().clone(); - let stream = builder.build().unwrap(); - let batches = stream.try_collect::>().await.unwrap(); - - let mut parquet_data = Vec::new(); - let props = WriterProperties::builder() - .set_bloom_filter_enabled(true) - .build(); - let mut writer = ArrowWriter::try_new(&mut parquet_data, schema, Some(props)).unwrap(); - for batch in batches { - writer.write(&batch).unwrap(); - } - writer.close().unwrap(); - - // test the new parquet file - test_get_row_group_column_bloom_filter(parquet_data.into(), true).await; - } - - async fn test_get_row_group_column_bloom_filter(data: Bytes, with_length: bool) { - let async_reader = TestReader::new(data.clone()); - - let mut builder = ParquetRecordBatchStreamBuilder::new(async_reader) - .await - .unwrap(); - - let metadata = builder.metadata(); - assert_eq!(metadata.num_row_groups(), 1); - let row_group = metadata.row_group(0); - let column = row_group.column(0); - assert_eq!(column.bloom_filter_length().is_some(), with_length); - - let sbbf = builder - .get_row_group_column_bloom_filter(0, 0) - .await - .unwrap() - .unwrap(); - assert!(sbbf.check(&"Hello")); - assert!(!sbbf.check(&"Hello_Not_Exists")); - } - #[tokio::test] async fn test_nested_skip() { let schema = Arc::new(Schema::new(vec![ diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs index 472aa22618ae..24df2c80e271 100644 --- a/parquet/src/file/properties.rs +++ b/parquet/src/file/properties.rs @@ -614,7 +614,7 @@ impl WriterPropertiesBuilder { /// Sets maximum size of a row group in bytes, or `None` for unlimited. /// /// Row groups are flushed when their estimated encoded size exceeds this threshold. - /// This is similar to parquet-mr's `parquet.block.size` behavior. + /// This is similar to the official `parquet.block.size` behavior. /// /// # Panics /// If the value is `Some(0)`. diff --git a/parquet/tests/arrow_reader/bloom_filter/async.rs b/parquet/tests/arrow_reader/bloom_filter/async.rs new file mode 100644 index 000000000000..e230b33d2d9e --- /dev/null +++ b/parquet/tests/arrow_reader/bloom_filter/async.rs @@ -0,0 +1,82 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use crate::io::TestReader; +use bytes::Bytes; +use futures::TryStreamExt; +use parquet::{ + arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder}, + file::properties::WriterProperties, +}; + +#[tokio::test] +async fn test_get_row_group_column_bloom_filter_without_length() { + let testdata = arrow::util::test_util::parquet_test_data(); + let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet"); + let data = Bytes::from(std::fs::read(path).unwrap()); + test_get_row_group_column_bloom_filter(data, false).await; +} + +#[tokio::test] +async fn test_get_row_group_column_bloom_filter_with_length() { + // convert to new parquet file with bloom_filter_length + let testdata = arrow::util::test_util::parquet_test_data(); + let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet"); + let data = Bytes::from(std::fs::read(path).unwrap()); + let async_reader = TestReader::new(data.clone()); + let builder = ParquetRecordBatchStreamBuilder::new(async_reader) + .await + .unwrap(); + let schema = builder.schema().clone(); + let stream = builder.build().unwrap(); + let batches = stream.try_collect::>().await.unwrap(); + + let mut parquet_data = Vec::new(); + let props = WriterProperties::builder() + .set_bloom_filter_enabled(true) + .build(); + let mut writer = ArrowWriter::try_new(&mut parquet_data, schema, Some(props)).unwrap(); + for batch in batches { + writer.write(&batch).unwrap(); + } + writer.close().unwrap(); + + // test the new parquet file + test_get_row_group_column_bloom_filter(parquet_data.into(), true).await; +} + +async fn test_get_row_group_column_bloom_filter(data: Bytes, with_length: bool) { + let async_reader = TestReader::new(data.clone()); + + let mut builder = ParquetRecordBatchStreamBuilder::new(async_reader) + .await + .unwrap(); + + let metadata = builder.metadata(); + assert_eq!(metadata.num_row_groups(), 1); + let row_group = metadata.row_group(0); + let column = row_group.column(0); + assert_eq!(column.bloom_filter_length().is_some(), with_length); + + let sbbf = builder + .get_row_group_column_bloom_filter(0, 0) + .await + .unwrap() + .unwrap(); + assert!(sbbf.check(&"Hello")); + assert!(!sbbf.check(&"Hello_Not_Exists")); +} diff --git a/parquet/tests/arrow_reader/bloom_filter/mod.rs b/parquet/tests/arrow_reader/bloom_filter/mod.rs new file mode 100644 index 000000000000..a09fd4a8ac59 --- /dev/null +++ b/parquet/tests/arrow_reader/bloom_filter/mod.rs @@ -0,0 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#[cfg(feature = "async")] +mod r#async; +mod sync; diff --git a/parquet/tests/arrow_reader/bloom_filter/sync.rs b/parquet/tests/arrow_reader/bloom_filter/sync.rs new file mode 100644 index 000000000000..90d0cdc509ae --- /dev/null +++ b/parquet/tests/arrow_reader/bloom_filter/sync.rs @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::fs::File; + +use bytes::Bytes; +use parquet::{ + arrow::{ArrowWriter, arrow_reader::ParquetRecordBatchReaderBuilder}, + file::properties::WriterProperties, +}; + +#[test] +fn test_get_row_group_column_bloom_filter_with_length() { + // convert to new parquet file with bloom_filter_length + let testdata = arrow::util::test_util::parquet_test_data(); + let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet"); + let file = File::open(path).unwrap(); + let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap(); + let schema = builder.schema().clone(); + let reader = builder.build().unwrap(); + + let mut parquet_data = Vec::new(); + let props = WriterProperties::builder() + .set_bloom_filter_enabled(true) + .build(); + let mut writer = ArrowWriter::try_new(&mut parquet_data, schema, Some(props)).unwrap(); + for batch in reader { + let batch = batch.unwrap(); + writer.write(&batch).unwrap(); + } + writer.close().unwrap(); + + // test the new parquet file + test_get_row_group_column_bloom_filter(parquet_data.into(), true); +} + +#[test] +fn test_get_row_group_column_bloom_filter_without_length() { + let testdata = arrow::util::test_util::parquet_test_data(); + let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet"); + let data = Bytes::from(std::fs::read(path).unwrap()); + test_get_row_group_column_bloom_filter(data, false); +} + +fn test_get_row_group_column_bloom_filter(data: Bytes, with_length: bool) { + let builder = ParquetRecordBatchReaderBuilder::try_new(data.clone()).unwrap(); + + let metadata = builder.metadata(); + assert_eq!(metadata.num_row_groups(), 1); + let row_group = metadata.row_group(0); + let column = row_group.column(0); + assert_eq!(column.bloom_filter_length().is_some(), with_length); + + let sbbf = builder + .get_row_group_column_bloom_filter(0, 0) + .unwrap() + .unwrap(); + assert!(sbbf.check(&"Hello")); + assert!(!sbbf.check(&"Hello_Not_Exists")); +} diff --git a/parquet/tests/arrow_reader/io/mod.rs b/parquet/tests/arrow_reader/io/mod.rs index 3b11429be407..3e18d7065bf2 100644 --- a/parquet/tests/arrow_reader/io/mod.rs +++ b/parquet/tests/arrow_reader/io/mod.rs @@ -41,13 +41,21 @@ use arrow_array::cast::AsArray; use arrow_array::types::Int64Type; use arrow_array::{ArrayRef, BooleanArray, Int64Array, RecordBatch, StringViewArray}; use bytes::Bytes; +#[cfg(feature = "async")] +use futures::FutureExt; +#[cfg(feature = 
"async")] +use futures::future::BoxFuture; use parquet::arrow::arrow_reader::{ ArrowPredicateFn, ArrowReaderOptions, ParquetRecordBatchReaderBuilder, RowFilter, }; +#[cfg(feature = "async")] +use parquet::arrow::async_reader::AsyncFileReader; use parquet::arrow::{ArrowWriter, ProjectionMask}; use parquet::data_type::AsBytes; use parquet::file::FOOTER_SIZE; use parquet::file::metadata::PageIndexPolicy; +#[cfg(feature = "async")] +use parquet::file::metadata::ParquetMetaDataReader; use parquet::file::metadata::{FooterTail, ParquetMetaData, ParquetOffsetIndex}; use parquet::file::page_index::offset_index::PageLocation; use parquet::file::properties::WriterProperties; @@ -77,6 +85,68 @@ fn test_options() -> ArrowReaderOptions { ArrowReaderOptions::default().with_page_index_policy(PageIndexPolicy::from(true)) } +/// In-memory [`AsyncFileReader`] implementation for tests. +#[cfg(feature = "async")] +#[derive(Clone)] +pub(crate) struct TestReader { + data: Bytes, + metadata: Option>, + requests: Arc>>>, +} + +#[cfg(feature = "async")] +impl TestReader { + pub(crate) fn new(data: Bytes) -> Self { + Self { + data, + metadata: Default::default(), + requests: Default::default(), + } + } + + #[allow(dead_code)] + pub(crate) fn requests(&self) -> Vec> { + self.requests.lock().unwrap().clone() + } + + #[allow(dead_code)] + pub(crate) fn clear_requests(&self) { + self.requests.lock().unwrap().clear(); + } +} + +#[cfg(feature = "async")] +impl AsyncFileReader for TestReader { + fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, parquet::errors::Result> { + self.requests + .lock() + .unwrap() + .push(range.start as usize..range.end as usize); + futures::future::ready(Ok(self + .data + .slice(range.start as usize..range.end as usize))) + .boxed() + } + + fn get_metadata<'a>( + &'a mut self, + options: Option<&'a ArrowReaderOptions>, + ) -> BoxFuture<'a, parquet::errors::Result>> { + let mut metadata_reader = ParquetMetaDataReader::new(); + + if let Some(options) = options { + metadata_reader = metadata_reader + .with_column_index_policy(options.column_index_policy()) + .with_offset_index_policy(options.offset_index_policy()); + } + + self.metadata = Some(Arc::new( + metadata_reader.parse_and_finish(&self.data).unwrap(), + )); + futures::future::ready(Ok(self.metadata.clone().unwrap())).boxed() + } +} + /// Return a row filter that evaluates "b > 575" AND "b < 625" /// /// last data page in Row Group 0 and first DataPage in Row Group 1 diff --git a/parquet/tests/arrow_reader/mod.rs b/parquet/tests/arrow_reader/mod.rs index 9acfebda4895..ffc36655b39a 100644 --- a/parquet/tests/arrow_reader/mod.rs +++ b/parquet/tests/arrow_reader/mod.rs @@ -39,6 +39,7 @@ use std::sync::Arc; use tempfile::NamedTempFile; mod bad_data; +mod bloom_filter; #[cfg(feature = "crc")] mod checksum; mod int96_stats_roundtrip; diff --git a/parquet/tests/arrow_reader/predicate_cache.rs b/parquet/tests/arrow_reader/predicate_cache.rs index bf3412dd4db1..73613c83d257 100644 --- a/parquet/tests/arrow_reader/predicate_cache.rs +++ b/parquet/tests/arrow_reader/predicate_cache.rs @@ -17,6 +17,7 @@ //! 
Test for predicate cache in Parquet Arrow reader +use super::io::TestReader; use arrow::array::ArrayRef; use arrow::array::Int64Array; use arrow::compute::and; @@ -26,16 +27,12 @@ use arrow_array::types::Int64Type; use arrow_array::{RecordBatch, StringArray, StringViewArray, StructArray}; use arrow_schema::{DataType, Field}; use bytes::Bytes; -use futures::future::BoxFuture; -use futures::{FutureExt, StreamExt}; +use futures::StreamExt; use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics; use parquet::arrow::arrow_reader::{ArrowPredicateFn, ArrowReaderOptions, RowFilter}; use parquet::arrow::arrow_reader::{ArrowReaderBuilder, ParquetRecordBatchReaderBuilder}; -use parquet::arrow::async_reader::AsyncFileReader; use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder, ProjectionMask}; -use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader}; use parquet::file::properties::WriterProperties; -use std::ops::Range; use std::sync::Arc; use std::sync::LazyLock; @@ -325,48 +322,3 @@ impl ArrowReaderBuilderExt for ArrowReaderBuilder { .with_row_filter(row_filter) } } - -/// Copy paste version of the `AsyncFileReader` trait for testing purposes 🤮 -/// TODO put this in a common place -#[derive(Clone)] -struct TestReader { - data: Bytes, - metadata: Option>, -} - -impl TestReader { - fn new(data: Bytes) -> Self { - Self { - data, - metadata: Default::default(), - } - } -} - -impl AsyncFileReader for TestReader { - fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, parquet::errors::Result> { - let range = range.clone(); - futures::future::ready(Ok(self - .data - .slice(range.start as usize..range.end as usize))) - .boxed() - } - - fn get_metadata<'a>( - &'a mut self, - options: Option<&'a ArrowReaderOptions>, - ) -> BoxFuture<'a, parquet::errors::Result>> { - let mut metadata_reader = ParquetMetaDataReader::new(); - - if let Some(options) = options { - metadata_reader = metadata_reader - .with_column_index_policy(options.column_index_policy()) - .with_offset_index_policy(options.offset_index_policy()); - } - - self.metadata = Some(Arc::new( - metadata_reader.parse_and_finish(&self.data).unwrap(), - )); - futures::future::ready(Ok(self.metadata.clone().unwrap().clone())).boxed() - } -} From c30893385a5ab790b87893cd607ffd4cb4fe45b3 Mon Sep 17 00:00:00 2001 From: yoni Date: Thu, 5 Feb 2026 18:18:22 +0200 Subject: [PATCH 03/12] [CR] Comment changes, minor touch-ups --- parquet/src/arrow/arrow_writer/mod.rs | 16 ++++++++-------- parquet/src/file/properties.rs | 19 +++++++++++-------- 2 files changed, 19 insertions(+), 16 deletions(-) diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index 7ea97a49fa07..c8a24e59734a 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -4577,20 +4577,20 @@ mod tests { } /// Helper to create a test batch with the given number of rows. - /// Each row is approximately 4 bytes (one i32). + /// Each row is 4 bytes (one `i32`). 
fn create_test_batch(num_rows: usize) -> RecordBatch { let schema = Arc::new(Schema::new(vec![Field::new( "int", ArrowDataType::Int32, false, )])); - let array = Int32Array::from((0..num_rows as i32).collect::>()); + let array = Int32Array::from_iter(0..num_rows as i32); RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap() } #[test] + // When both limits are None, all data should go into a single row group fn test_row_group_limit_none_writes_single_row_group() { - // When both limits are None, all data should go into a single row group let batch = create_test_batch(1000); let props = WriterProperties::builder() @@ -4614,8 +4614,8 @@ mod tests { } #[test] + // When only max_row_group_size is set, respect the row limit fn test_row_group_limit_rows_only() { - // When only max_row_group_size is set, respect the row limit let batch = create_test_batch(1000); let props = WriterProperties::builder() @@ -4639,8 +4639,8 @@ mod tests { } #[test] + // When only max_row_group_bytes is set, respect the byte limit fn test_row_group_limit_bytes_only() { - // When only max_row_group_bytes is set, respect the byte limit // Create batches with string data for more predictable byte sizes // Write in multiple small batches so byte-based splitting can work // (first batch establishes the avg row size, subsequent batches are split) @@ -4650,9 +4650,9 @@ mod tests { false, )])); - // Set byte limit to approximately fit ~30 rows worth of data (~100 bytes each) let props = WriterProperties::builder() .set_max_row_group_row_count(None) + // Set byte limit to approximately fit ~30 rows worth of data (~100 bytes each) .set_max_row_group_bytes(Some(3500)) .build(); @@ -4685,8 +4685,8 @@ mod tests { } #[test] + // When both limits are set, the row limit triggers first fn test_row_group_limit_both_row_wins() { - // When both limits are set, the row limit triggers first let batch = create_test_batch(1000); let props = WriterProperties::builder() @@ -4710,8 +4710,8 @@ mod tests { } #[test] + // When both limits are set, the byte limit triggers first fn test_row_group_limit_both_bytes_wins() { - // When both limits are set, the byte limit triggers first // Write in multiple small batches so byte-based splitting can work let schema = Arc::new(Schema::new(vec![Field::new( "str", diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs index 24df2c80e271..eb12efbc7a2a 100644 --- a/parquet/src/file/properties.rs +++ b/parquet/src/file/properties.rs @@ -45,7 +45,8 @@ pub const DEFAULT_STATISTICS_ENABLED: EnabledStatistics = EnabledStatistics::Pag pub const DEFAULT_WRITE_PAGE_HEADER_STATISTICS: bool = false; /// Default value for [`WriterProperties::max_row_group_size`] pub const DEFAULT_MAX_ROW_GROUP_SIZE: usize = 1024 * 1024; -/// Default value for [`WriterProperties::max_row_group_bytes`] (128 MB, same as parquet-mr's parquet.block.size) +/// Default value for [`WriterProperties::max_row_group_bytes`] (128 MB, same as the official Java +/// implementation for `parquet.block.size`) pub const DEFAULT_MAX_ROW_GROUP_BYTES: usize = 128 * 1024 * 1024; /// Default value for [`WriterProperties::bloom_filter_position`] pub const DEFAULT_BLOOM_FILTER_POSITION: BloomFilterPosition = BloomFilterPosition::AfterRowGroup; @@ -601,12 +602,13 @@ impl WriterPropertiesBuilder { /// Sets maximum number of rows in a row group, or `None` for unlimited. /// + /// If both `max_row_group_row_count` and `max_row_group_bytes` are set, + /// the row group with the smallest limit will be applied. 
+ /// /// # Panics /// If the value is `Some(0)`. pub fn set_max_row_group_row_count(mut self, value: Option) -> Self { - if let Some(v) = value { - assert!(v > 0, "Cannot have a 0 max row group size"); - } + assert_ne!(value, Some(0), "Cannot have a 0 max row group bytes"); self.max_row_group_row_count = value; self } @@ -614,14 +616,15 @@ impl WriterPropertiesBuilder { /// Sets maximum size of a row group in bytes, or `None` for unlimited. /// /// Row groups are flushed when their estimated encoded size exceeds this threshold. - /// This is similar to the official `parquet.block.size` behavior. + /// This is similar to the official Java implementation for `parquet.block.size`'s behavior. + /// + /// If both `max_row_group_row_count` and `max_row_group_bytes` are set, + /// the row group with the smallest limit will be applied. /// /// # Panics /// If the value is `Some(0)`. pub fn set_max_row_group_bytes(mut self, value: Option) -> Self { - if let Some(v) = value { - assert!(v > 0, "Cannot have a 0 max row group bytes"); - } + assert_ne!(value, Some(0), "Cannot have a 0 max row group bytes"); self.max_row_group_bytes = value; self } From 148fc15aef074b7d5d97cec9fa71cf7e2a99e6b3 Mon Sep 17 00:00:00 2001 From: yoni Date: Thu, 5 Feb 2026 18:18:43 +0200 Subject: [PATCH 04/12] [CR] Deprecate set_max_row_group_size in favor of set_max_row_group_row_count --- parquet/benches/arrow_statistics.rs | 2 +- parquet/benches/parquet_round_trip.rs | 2 +- parquet/benches/row_group_index_reader.rs | 2 +- parquet/src/arrow/array_reader/list_array.rs | 2 +- parquet/src/arrow/arrow_reader/mod.rs | 10 +++++----- parquet/src/arrow/arrow_writer/mod.rs | 16 ++++++++-------- parquet/src/arrow/async_reader/mod.rs | 8 ++++---- parquet/src/arrow/async_writer/mod.rs | 2 +- parquet/src/arrow/push_decoder/mod.rs | 2 +- parquet/src/bin/parquet-fromcsv.rs | 2 +- parquet/src/bin/parquet-rewrite.rs | 2 +- parquet/src/file/metadata/push_decoder.rs | 2 +- parquet/src/file/properties.rs | 6 +++++- parquet/src/file/writer.rs | 2 +- parquet/tests/arrow_reader/io/mod.rs | 2 +- parquet/tests/arrow_reader/mod.rs | 2 +- parquet/tests/arrow_reader/predicate_cache.rs | 2 +- parquet/tests/arrow_reader/statistics.rs | 4 ++-- parquet/tests/encryption/encryption.rs | 4 ++-- 19 files changed, 39 insertions(+), 35 deletions(-) diff --git a/parquet/benches/arrow_statistics.rs b/parquet/benches/arrow_statistics.rs index a4aa9d137e9c..6da816bde9aa 100644 --- a/parquet/benches/arrow_statistics.rs +++ b/parquet/benches/arrow_statistics.rs @@ -79,7 +79,7 @@ fn create_parquet_file( )])), }; - let mut props = WriterProperties::builder().set_max_row_group_size(row_groups); + let mut props = WriterProperties::builder().set_max_row_group_row_count(Some(row_groups)); if let Some(limit) = data_page_row_count_limit { props = props .set_data_page_row_count_limit(*limit) diff --git a/parquet/benches/parquet_round_trip.rs b/parquet/benches/parquet_round_trip.rs index b239c3ccc759..7f46b02b78e2 100644 --- a/parquet/benches/parquet_round_trip.rs +++ b/parquet/benches/parquet_round_trip.rs @@ -299,7 +299,7 @@ fn file_from_spec(spec: ParquetFileSpec, buffer: &mut Vec) { let schema = schema(spec.column_type, spec.num_columns); let props = WriterProperties::builder() - .set_max_row_group_size(spec.rows_per_row_group) + .set_max_row_group_row_count(Some(spec.rows_per_row_group)) .set_data_page_row_count_limit(spec.rows_per_page) .set_encoding(spec.encoding) .set_dictionary_enabled(spec.use_dict) diff --git a/parquet/benches/row_group_index_reader.rs 
b/parquet/benches/row_group_index_reader.rs index f9e52684401b..1fadfb5d4b7c 100644 --- a/parquet/benches/row_group_index_reader.rs +++ b/parquet/benches/row_group_index_reader.rs @@ -87,7 +87,7 @@ fn create_test_file(num_row_groups: usize, rows_per_group: usize) -> Bytes { let mut buffer = Vec::new(); let props = WriterProperties::builder() .set_compression(Compression::SNAPPY) - .set_max_row_group_size(rows_per_group) + .set_max_row_group_row_count(Some(rows_per_group)) .build(); let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), Some(props)).unwrap(); diff --git a/parquet/src/arrow/array_reader/list_array.rs b/parquet/src/arrow/array_reader/list_array.rs index ff1b414c27bb..1d5c68c22e11 100644 --- a/parquet/src/arrow/array_reader/list_array.rs +++ b/parquet/src/arrow/array_reader/list_array.rs @@ -541,7 +541,7 @@ mod tests { let file = tempfile::tempfile().unwrap(); let props = WriterProperties::builder() - .set_max_row_group_size(200) + .set_max_row_group_row_count(Some(200)) .build(); let writer = ArrowWriter::try_new( diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index d0398418007a..aec3337837f4 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -4633,7 +4633,7 @@ pub(crate) mod tests { schema.clone(), Some( WriterProperties::builder() - .set_max_row_group_size(row_group_size) + .set_max_row_group_row_count(Some(row_group_size)) .build(), ), ) @@ -5629,7 +5629,7 @@ pub(crate) mod tests { let batch = RecordBatch::try_from_iter([("col", Arc::new(array) as ArrayRef)])?; let mut buffer = Vec::new(); let options = WriterProperties::builder() - .set_max_row_group_size(50) + .set_max_row_group_row_count(Some(50)) .build(); let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema().clone(), Some(options))?; // write in 10 row batches as the size limits are enforced after each batch @@ -5795,7 +5795,7 @@ pub(crate) mod tests { let mut buffer = Vec::new(); let options = WriterProperties::builder() - .set_max_row_group_size(2) + .set_max_row_group_row_count(Some(2)) .build(); let mut writer = ArrowWriter::try_new(&mut buffer, batch1.schema(), Some(options)).unwrap(); writer.write(&batch1).unwrap(); @@ -5854,7 +5854,7 @@ pub(crate) mod tests { let mut buffer = Vec::new(); let options = WriterProperties::builder() - .set_max_row_group_size(3) + .set_max_row_group_row_count(Some(3)) .build(); let mut writer = ArrowWriter::try_new(&mut buffer, batch1.schema(), Some(options)).unwrap(); writer.write(&batch1).unwrap(); @@ -5903,7 +5903,7 @@ pub(crate) mod tests { fn test_read_row_group_indices_with_selection() -> Result<()> { let mut buffer = Vec::new(); let options = WriterProperties::builder() - .set_max_row_group_size(10) + .set_max_row_group_row_count(Some(10)) .build(); let schema = Arc::new(Schema::new(vec![Field::new( diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index c8a24e59734a..a161fd311aee 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -2411,7 +2411,7 @@ mod tests { let mut props = WriterProperties::builder().set_writer_version(version); if let Some(size) = max_row_group_size { - props = props.set_max_row_group_size(size) + props = props.set_max_row_group_row_count(Some(size)) } let props = props.build(); @@ -2542,7 +2542,7 @@ mod tests { for row_group_size in row_group_sizes { let props = WriterProperties::builder() .set_writer_version(version) - 
.set_max_row_group_size(row_group_size) + .set_max_row_group_row_count(Some(row_group_size)) .set_dictionary_enabled(dictionary_size != 0) .set_dictionary_page_size_limit(dictionary_size.max(1)) .set_encoding(*encoding) @@ -3204,7 +3204,7 @@ mod tests { for row_group_size in row_group_sizes { let props = WriterProperties::builder() .set_writer_version(WriterVersion::PARQUET_2_0) - .set_max_row_group_size(row_group_size) + .set_max_row_group_row_count(Some(row_group_size)) .set_dictionary_enabled(false) .set_encoding(*encoding) .set_data_page_size_limit(data_page_size_limit) @@ -3832,7 +3832,7 @@ mod tests { let file = tempfile::tempfile().unwrap(); let props = WriterProperties::builder() - .set_max_row_group_size(200) + .set_max_row_group_row_count(Some(200)) .build(); let mut writer = @@ -3974,7 +3974,7 @@ mod tests { // Write data let file = tempfile::tempfile().unwrap(); let props = WriterProperties::builder() - .set_max_row_group_size(6) + .set_max_row_group_row_count(Some(6)) .build(); let mut writer = @@ -4619,7 +4619,7 @@ mod tests { let batch = create_test_batch(1000); let props = WriterProperties::builder() - .set_max_row_group_size(300) + .set_max_row_group_row_count(Some(300)) .set_max_row_group_bytes(None) .build(); @@ -4690,7 +4690,7 @@ mod tests { let batch = create_test_batch(1000); let props = WriterProperties::builder() - .set_max_row_group_size(200) // Will trigger at 200 rows + .set_max_row_group_row_count(Some(200)) // Will trigger at 200 rows .set_max_row_group_bytes(Some(1024 * 1024)) // 1MB - won't trigger for small int data .build(); @@ -4720,7 +4720,7 @@ mod tests { )])); let props = WriterProperties::builder() - .set_max_row_group_size(1000) // Won't trigger for 100 rows + .set_max_row_group_row_count(Some(1000)) // Won't trigger for 100 rows .set_max_row_group_bytes(Some(3500)) // Will trigger at ~30-35 rows .build(); diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index b4824365eae6..38efee99a69f 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -1448,7 +1448,7 @@ mod tests { let mut buf = Vec::with_capacity(1024); let props = WriterProperties::builder() - .set_max_row_group_size(3) + .set_max_row_group_row_count(Some(3)) .build(); let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), Some(props)).unwrap(); writer.write(&data).unwrap(); @@ -1728,7 +1728,7 @@ mod tests { let props = WriterProperties::builder() .set_data_page_row_count_limit(256) .set_write_batch_size(256) - .set_max_row_group_size(1024); + .set_max_row_group_row_count(Some(1024)); // Write data let mut file = tempfile().unwrap(); @@ -2055,7 +2055,7 @@ mod tests { let props = WriterProperties::builder() .set_data_page_row_count_limit(1) .set_write_batch_size(1) - .set_max_row_group_size(10) + .set_max_row_group_row_count(Some(10)) .set_write_page_header_statistics(true) .build(); let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), Some(props)).unwrap(); @@ -2284,7 +2284,7 @@ mod tests { ])); let props = WriterProperties::builder() - .set_max_row_group_size(300) + .set_max_row_group_row_count(Some(300)) .set_data_page_row_count_limit(33) .build(); diff --git a/parquet/src/arrow/async_writer/mod.rs b/parquet/src/arrow/async_writer/mod.rs index 9018c09f2a89..a050ef77c46c 100644 --- a/parquet/src/arrow/async_writer/mod.rs +++ b/parquet/src/arrow/async_writer/mod.rs @@ -340,7 +340,7 @@ mod tests { let reader = get_test_reader(); let write_props = WriterProperties::builder() - 
.set_max_row_group_size(64) + .set_max_row_group_row_count(Some(64)) .build(); let mut async_buffer = Vec::new(); diff --git a/parquet/src/arrow/push_decoder/mod.rs b/parquet/src/arrow/push_decoder/mod.rs index 50451aee120e..cdb0715edb55 100644 --- a/parquet/src/arrow/push_decoder/mod.rs +++ b/parquet/src/arrow/push_decoder/mod.rs @@ -1142,7 +1142,7 @@ mod test { let mut output = Vec::new(); let writer_options = WriterProperties::builder() - .set_max_row_group_size(200) + .set_max_row_group_row_count(Some(200)) .set_data_page_row_count_limit(100) .build(); let mut writer = diff --git a/parquet/src/bin/parquet-fromcsv.rs b/parquet/src/bin/parquet-fromcsv.rs index bba07b9b056e..cf9958f06956 100644 --- a/parquet/src/bin/parquet-fromcsv.rs +++ b/parquet/src/bin/parquet-fromcsv.rs @@ -297,7 +297,7 @@ fn configure_writer_properties(args: &Args) -> WriterProperties { properties_builder = properties_builder.set_writer_version(writer_version); } if let Some(max_row_group_size) = args.max_row_group_size { - properties_builder = properties_builder.set_max_row_group_size(max_row_group_size); + properties_builder = properties_builder.set_max_row_group_row_count(Some(max_row_group_size)); } if let Some(enable_bloom_filter) = args.enable_bloom_filter { properties_builder = properties_builder.set_bloom_filter_enabled(enable_bloom_filter); diff --git a/parquet/src/bin/parquet-rewrite.rs b/parquet/src/bin/parquet-rewrite.rs index 31058e552d15..615c792cce7e 100644 --- a/parquet/src/bin/parquet-rewrite.rs +++ b/parquet/src/bin/parquet-rewrite.rs @@ -352,7 +352,7 @@ fn main() { } if let Some(value) = args.max_row_group_size { - writer_properties_builder = writer_properties_builder.set_max_row_group_size(value); + writer_properties_builder = writer_properties_builder.set_max_row_group_row_count(Some(value)); } if let Some(value) = args.data_page_row_count_limit { writer_properties_builder = writer_properties_builder.set_data_page_row_count_limit(value); diff --git a/parquet/src/file/metadata/push_decoder.rs b/parquet/src/file/metadata/push_decoder.rs index 34b7fec2c0c5..abc788426260 100644 --- a/parquet/src/file/metadata/push_decoder.rs +++ b/parquet/src/file/metadata/push_decoder.rs @@ -653,7 +653,7 @@ mod tests { let mut output = Vec::new(); let writer_options = WriterProperties::builder() - .set_max_row_group_size(200) + .set_max_row_group_row_count(Some(200)) .set_data_page_row_count_limit(100) .build(); let mut writer = diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs index eb12efbc7a2a..af927db6b971 100644 --- a/parquet/src/file/properties.rs +++ b/parquet/src/file/properties.rs @@ -594,6 +594,10 @@ impl WriterPropertiesBuilder { /// /// # Panics /// If the value is set to 0. 
+ #[deprecated( + since = "57.3.0", + note = "Use `set_max_row_group_row_count` instead", + )] pub fn set_max_row_group_size(mut self, value: usize) -> Self { assert!(value > 0, "Cannot have a 0 max row group size"); self.max_row_group_row_count = Some(value); @@ -1470,7 +1474,7 @@ mod tests { .set_data_page_size_limit(10) .set_dictionary_page_size_limit(20) .set_write_batch_size(30) - .set_max_row_group_size(40) + .set_max_row_group_row_count(Some(40)) .set_created_by("default".to_owned()) .set_key_value_metadata(Some(vec![KeyValue::new( "key".to_string(), diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs index 35948af022f1..7d69904451d3 100644 --- a/parquet/src/file/writer.rs +++ b/parquet/src/file/writer.rs @@ -2198,7 +2198,7 @@ mod tests { let props = Arc::new( WriterProperties::builder() .set_statistics_enabled(EnabledStatistics::None) - .set_max_row_group_size(1) + .set_max_row_group_row_count(Some(1)) .build(), ); let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap(); diff --git a/parquet/tests/arrow_reader/io/mod.rs b/parquet/tests/arrow_reader/io/mod.rs index 3e18d7065bf2..e8c5147f9cea 100644 --- a/parquet/tests/arrow_reader/io/mod.rs +++ b/parquet/tests/arrow_reader/io/mod.rs @@ -225,7 +225,7 @@ static TEST_FILE_DATA: LazyLock = LazyLock::new(|| { let mut output = Vec::new(); let writer_options = WriterProperties::builder() - .set_max_row_group_size(200) + .set_max_row_group_row_count(Some(200)) .set_data_page_row_count_limit(100) .build(); let mut writer = diff --git a/parquet/tests/arrow_reader/mod.rs b/parquet/tests/arrow_reader/mod.rs index ffc36655b39a..39214e0488b6 100644 --- a/parquet/tests/arrow_reader/mod.rs +++ b/parquet/tests/arrow_reader/mod.rs @@ -1130,7 +1130,7 @@ async fn make_test_file_rg(scenario: Scenario, row_per_group: usize) -> NamedTem .expect("tempfile creation"); let mut builder = WriterProperties::builder() - .set_max_row_group_size(row_per_group) + .set_max_row_group_row_count(Some(row_per_group)) .set_bloom_filter_enabled(true) .set_statistics_enabled(EnabledStatistics::Page); if scenario.truncate_stats() { diff --git a/parquet/tests/arrow_reader/predicate_cache.rs b/parquet/tests/arrow_reader/predicate_cache.rs index 73613c83d257..85dba68c9c69 100644 --- a/parquet/tests/arrow_reader/predicate_cache.rs +++ b/parquet/tests/arrow_reader/predicate_cache.rs @@ -218,7 +218,7 @@ static TEST_FILE_DATA: LazyLock = LazyLock::new(|| { let mut output = Vec::new(); let writer_options = WriterProperties::builder() - .set_max_row_group_size(200) + .set_max_row_group_row_count(Some(200)) .set_data_page_row_count_limit(100) .build(); let mut writer = diff --git a/parquet/tests/arrow_reader/statistics.rs b/parquet/tests/arrow_reader/statistics.rs index 2f7859d06355..4f7ddcff4ad1 100644 --- a/parquet/tests/arrow_reader/statistics.rs +++ b/parquet/tests/arrow_reader/statistics.rs @@ -116,7 +116,7 @@ fn build_parquet_file( .tempfile() .expect("tempfile creation"); - let mut builder = WriterProperties::builder().set_max_row_group_size(row_per_group); + let mut builder = WriterProperties::builder().set_max_row_group_row_count(Some(row_per_group)); if let Some(enable_stats) = enable_stats { builder = builder.set_statistics_enabled(enable_stats); } @@ -2903,7 +2903,7 @@ mod test { fn parquet_metadata(schema: SchemaRef, batch: RecordBatch) -> Arc { let props = WriterProperties::builder() .set_statistics_enabled(EnabledStatistics::Chunk) - .set_max_row_group_size(ROWS_PER_ROW_GROUP) + .set_max_row_group_row_count(Some(ROWS_PER_ROW_GROUP)) 
.build(); let mut buffer = Vec::new(); diff --git a/parquet/tests/encryption/encryption.rs b/parquet/tests/encryption/encryption.rs index b401742dcb30..c26a0edee682 100644 --- a/parquet/tests/encryption/encryption.rs +++ b/parquet/tests/encryption/encryption.rs @@ -432,7 +432,7 @@ fn uniform_encryption_roundtrip( let props = WriterProperties::builder() // Ensure multiple row groups - .set_max_row_group_size(50) + .set_max_row_group_row_count(Some(50)) // Ensure multiple pages per row group .set_write_batch_size(20) .set_data_page_row_count_limit(20) @@ -537,7 +537,7 @@ fn uniform_encryption_page_skipping(page_index: bool) -> parquet::errors::Result let props = WriterProperties::builder() // Ensure multiple row groups - .set_max_row_group_size(50) + .set_max_row_group_row_count(Some(50)) // Ensure multiple pages per row group .set_write_batch_size(20) .set_data_page_row_count_limit(20) From 8069ee7d71f52b4f7a9c396c393c79f1f61f8392 Mon Sep 17 00:00:00 2001 From: yoni Date: Thu, 5 Feb 2026 18:54:10 +0200 Subject: [PATCH 05/12] Simplify tests: Extract batches writing into a helper function --- parquet/src/arrow/arrow_writer/mod.rs | 130 ++++++++++---------------- 1 file changed, 48 insertions(+), 82 deletions(-) diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index a161fd311aee..f0deb3e72622 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -4576,36 +4576,48 @@ mod tests { assert_eq!(get_dict_page_size(col1_meta), 1024 * 1024 * 4); } - /// Helper to create a test batch with the given number of rows. - /// Each row is 4 bytes (one `i32`). - fn create_test_batch(num_rows: usize) -> RecordBatch { + struct WriteBatchesShape { + num_batches: usize, + rows_per_batch: usize, + row_size: usize, + } + + /// Helper function to write batches with the provided `WriteBatchesShape` into an `ArrowWriter` + fn write_batches(WriteBatchesShape {num_batches, rows_per_batch, row_size}: WriteBatchesShape, props: WriterProperties) -> ParquetRecordBatchReaderBuilder { let schema = Arc::new(Schema::new(vec![Field::new( - "int", - ArrowDataType::Int32, + "str", + ArrowDataType::Utf8, false, )])); - let array = Int32Array::from_iter(0..num_rows as i32); - RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap() + let file = tempfile::tempfile().unwrap(); + let mut writer = ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap(); + + for batch_idx in 0..num_batches { + let strings: Vec = (0..rows_per_batch) + .map(|i| format!("{:0>width$}", batch_idx * 10 + i, width = row_size)) + .collect(); + let array = StringArray::from(strings); + let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap(); + writer.write(&batch).unwrap(); + } + writer.close().unwrap(); + ParquetRecordBatchReaderBuilder::try_new(file).unwrap() } #[test] // When both limits are None, all data should go into a single row group fn test_row_group_limit_none_writes_single_row_group() { - let batch = create_test_batch(1000); - let props = WriterProperties::builder() .set_max_row_group_row_count(None) .set_max_row_group_bytes(None) .build(); - let file = tempfile::tempfile().unwrap(); - let mut writer = - ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props)).unwrap(); + let builder = write_batches(WriteBatchesShape { + num_batches: 1, + rows_per_batch: 1000, + row_size: 4 + }, props); - writer.write(&batch).unwrap(); - writer.close().unwrap(); - - let builder = 
ParquetRecordBatchReaderBuilder::try_new(file).unwrap(); assert_eq!( &row_group_sizes(builder.metadata()), &[1000], @@ -4616,21 +4628,17 @@ mod tests { #[test] // When only max_row_group_size is set, respect the row limit fn test_row_group_limit_rows_only() { - let batch = create_test_batch(1000); - let props = WriterProperties::builder() .set_max_row_group_row_count(Some(300)) .set_max_row_group_bytes(None) .build(); - let file = tempfile::tempfile().unwrap(); - let mut writer = - ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props)).unwrap(); - - writer.write(&batch).unwrap(); - writer.close().unwrap(); + let builder = write_batches(WriteBatchesShape { + num_batches: 1, + rows_per_batch: 1000, + row_size: 4 + }, props); - let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap(); assert_eq!( &row_group_sizes(builder.metadata()), &[300, 300, 300, 100], @@ -4641,38 +4649,18 @@ mod tests { #[test] // When only max_row_group_bytes is set, respect the byte limit fn test_row_group_limit_bytes_only() { - // Create batches with string data for more predictable byte sizes - // Write in multiple small batches so byte-based splitting can work - // (first batch establishes the avg row size, subsequent batches are split) - let schema = Arc::new(Schema::new(vec![Field::new( - "str", - ArrowDataType::Utf8, - false, - )])); - let props = WriterProperties::builder() .set_max_row_group_row_count(None) // Set byte limit to approximately fit ~30 rows worth of data (~100 bytes each) .set_max_row_group_bytes(Some(3500)) .build(); - let file = tempfile::tempfile().unwrap(); - let mut writer = - ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap(); - - // Write 10 batches of 10 rows each (100 rows total) - // Each string is ~100 bytes - for batch_idx in 0..10 { - let strings: Vec = (0..10) - .map(|i| format!("{:0>100}", batch_idx * 10 + i)) - .collect(); - let array = StringArray::from(strings); - let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap(); - writer.write(&batch).unwrap(); - } - writer.close().unwrap(); + let builder = write_batches(WriteBatchesShape { + num_batches: 10, + rows_per_batch: 10, + row_size: 100 + }, props); - let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap(); let sizes = row_group_sizes(builder.metadata()); assert!( @@ -4687,21 +4675,17 @@ mod tests { #[test] // When both limits are set, the row limit triggers first fn test_row_group_limit_both_row_wins() { - let batch = create_test_batch(1000); - let props = WriterProperties::builder() .set_max_row_group_row_count(Some(200)) // Will trigger at 200 rows .set_max_row_group_bytes(Some(1024 * 1024)) // 1MB - won't trigger for small int data .build(); - let file = tempfile::tempfile().unwrap(); - let mut writer = - ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props)).unwrap(); - - writer.write(&batch).unwrap(); - writer.close().unwrap(); + let builder = write_batches(WriteBatchesShape { + num_batches: 1, + row_size: 4, + rows_per_batch: 1000 + }, props); - let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap(); assert_eq!( &row_group_sizes(builder.metadata()), &[200, 200, 200, 200, 200], @@ -4712,35 +4696,17 @@ mod tests { #[test] // When both limits are set, the byte limit triggers first fn test_row_group_limit_both_bytes_wins() { - // Write in multiple small batches so byte-based splitting can work - let schema = Arc::new(Schema::new(vec![Field::new( - "str", - ArrowDataType::Utf8, - 
false, - )])); - let props = WriterProperties::builder() .set_max_row_group_row_count(Some(1000)) // Won't trigger for 100 rows .set_max_row_group_bytes(Some(3500)) // Will trigger at ~30-35 rows .build(); - let file = tempfile::tempfile().unwrap(); - let mut writer = - ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap(); + let builder = write_batches(WriteBatchesShape { + num_batches: 10, + rows_per_batch: 10, + row_size: 100 + }, props); - // Write 10 batches of 10 rows each (100 rows total) - // Each string is ~100 bytes - for batch_idx in 0..10 { - let strings: Vec = (0..10) - .map(|i| format!("{:0>100}", batch_idx * 10 + i)) - .collect(); - let array = StringArray::from(strings); - let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap(); - writer.write(&batch).unwrap(); - } - writer.close().unwrap(); - - let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap(); let sizes = row_group_sizes(builder.metadata()); assert!( From a2227f7bfe1c4bf5635178a6f3d003f7da36d93c Mon Sep 17 00:00:00 2001 From: yoni Date: Thu, 5 Feb 2026 18:58:09 +0200 Subject: [PATCH 06/12] [CR] Add a test where both limits are set with multiple batches --- parquet/src/arrow/arrow_writer/mod.rs | 26 +++++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index f0deb3e72622..522a03ccf643 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -4674,7 +4674,7 @@ mod tests { #[test] // When both limits are set, the row limit triggers first - fn test_row_group_limit_both_row_wins() { + fn test_row_group_limit_both_row_wins_single_batch() { let props = WriterProperties::builder() .set_max_row_group_row_count(Some(200)) // Will trigger at 200 rows .set_max_row_group_bytes(Some(1024 * 1024)) // 1MB - won't trigger for small int data @@ -4693,6 +4693,30 @@ mod tests { ); } + + #[test] + // When both limits are set, the row limit triggers first + fn test_row_group_limit_both_row_wins_multiple_batches() { + let props = WriterProperties::builder() + .set_max_row_group_row_count(Some(5)) // Will trigger every 5 rows + .set_max_row_group_bytes(Some(9999)) // Won't trigger + .build(); + + let builder = write_batches(WriteBatchesShape { + num_batches: 10, + rows_per_batch: 10, + row_size: 100 + }, props); + + let sizes = row_group_sizes(builder.metadata()); + + assert_eq!( + &row_group_sizes(builder.metadata()), + &[5; 20], + "Row limit should trigger before byte limit" + ); + } + #[test] // When both limits are set, the byte limit triggers first fn test_row_group_limit_both_bytes_wins() { From 8d71637b23e090fa515b54e833639834505fd13c Mon Sep 17 00:00:00 2001 From: yoni Date: Thu, 5 Feb 2026 19:07:57 +0200 Subject: [PATCH 07/12] Fix deprecated use leftovers --- parquet/src/file/properties.rs | 2 +- parquet/tests/arrow_reader/row_filter/async.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs index af927db6b971..e58e3bc9146c 100644 --- a/parquet/src/file/properties.rs +++ b/parquet/src/file/properties.rs @@ -260,7 +260,7 @@ impl WriterProperties { /// Returns maximum number of rows in a row group, or `None` if unlimited. 
/// - /// For more details see [`WriterPropertiesBuilder::set_max_row_group_size`] + /// For more details see [`WriterPropertiesBuilder::set_max_row_group_row_count`] pub fn max_row_group_row_count(&self) -> Option { self.max_row_group_row_count } diff --git a/parquet/tests/arrow_reader/row_filter/async.rs b/parquet/tests/arrow_reader/row_filter/async.rs index f756d6e89383..6fa616d714f1 100644 --- a/parquet/tests/arrow_reader/row_filter/async.rs +++ b/parquet/tests/arrow_reader/row_filter/async.rs @@ -417,7 +417,7 @@ async fn test_predicate_pushdown_with_skipped_pages() { ])); let props = WriterProperties::builder() - .set_max_row_group_size(300) + .set_max_row_group_row_count(Some(300)) .set_data_page_row_count_limit(33) .build(); From d8cfb7f33424748304c2a573e476de9c48fb2d68 Mon Sep 17 00:00:00 2001 From: yoni Date: Sun, 8 Feb 2026 10:54:29 +0200 Subject: [PATCH 08/12] fmt + clippy --- parquet/src/arrow/arrow_writer/mod.rs | 93 +++++++++++++++++---------- parquet/src/bin/parquet-fromcsv.rs | 3 +- parquet/src/bin/parquet-rewrite.rs | 3 +- parquet/src/file/properties.rs | 5 +- 4 files changed, 63 insertions(+), 41 deletions(-) diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index 522a03ccf643..7ebee6e7172b 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -4583,14 +4583,22 @@ mod tests { } /// Helper function to write batches with the provided `WriteBatchesShape` into an `ArrowWriter` - fn write_batches(WriteBatchesShape {num_batches, rows_per_batch, row_size}: WriteBatchesShape, props: WriterProperties) -> ParquetRecordBatchReaderBuilder { + fn write_batches( + WriteBatchesShape { + num_batches, + rows_per_batch, + row_size, + }: WriteBatchesShape, + props: WriterProperties, + ) -> ParquetRecordBatchReaderBuilder { let schema = Arc::new(Schema::new(vec![Field::new( "str", ArrowDataType::Utf8, false, )])); let file = tempfile::tempfile().unwrap(); - let mut writer = ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap(); + let mut writer = + ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap(); for batch_idx in 0..num_batches { let strings: Vec = (0..rows_per_batch) @@ -4612,11 +4620,14 @@ mod tests { .set_max_row_group_bytes(None) .build(); - let builder = write_batches(WriteBatchesShape { - num_batches: 1, - rows_per_batch: 1000, - row_size: 4 - }, props); + let builder = write_batches( + WriteBatchesShape { + num_batches: 1, + rows_per_batch: 1000, + row_size: 4, + }, + props, + ); assert_eq!( &row_group_sizes(builder.metadata()), @@ -4633,11 +4644,14 @@ mod tests { .set_max_row_group_bytes(None) .build(); - let builder = write_batches(WriteBatchesShape { - num_batches: 1, - rows_per_batch: 1000, - row_size: 4 - }, props); + let builder = write_batches( + WriteBatchesShape { + num_batches: 1, + rows_per_batch: 1000, + row_size: 4, + }, + props, + ); assert_eq!( &row_group_sizes(builder.metadata()), @@ -4655,11 +4669,14 @@ mod tests { .set_max_row_group_bytes(Some(3500)) .build(); - let builder = write_batches(WriteBatchesShape { - num_batches: 10, - rows_per_batch: 10, - row_size: 100 - }, props); + let builder = write_batches( + WriteBatchesShape { + num_batches: 10, + rows_per_batch: 10, + row_size: 100, + }, + props, + ); let sizes = row_group_sizes(builder.metadata()); @@ -4680,11 +4697,14 @@ mod tests { .set_max_row_group_bytes(Some(1024 * 1024)) // 1MB - won't trigger for small int data .build(); - let builder = 
write_batches(WriteBatchesShape { - num_batches: 1, - row_size: 4, - rows_per_batch: 1000 - }, props); + let builder = write_batches( + WriteBatchesShape { + num_batches: 1, + row_size: 4, + rows_per_batch: 1000, + }, + props, + ); assert_eq!( &row_group_sizes(builder.metadata()), @@ -4693,7 +4713,6 @@ mod tests { ); } - #[test] // When both limits are set, the row limit triggers first fn test_row_group_limit_both_row_wins_multiple_batches() { @@ -4702,13 +4721,14 @@ mod tests { .set_max_row_group_bytes(Some(9999)) // Won't trigger .build(); - let builder = write_batches(WriteBatchesShape { - num_batches: 10, - rows_per_batch: 10, - row_size: 100 - }, props); - - let sizes = row_group_sizes(builder.metadata()); + let builder = write_batches( + WriteBatchesShape { + num_batches: 10, + rows_per_batch: 10, + row_size: 100, + }, + props, + ); assert_eq!( &row_group_sizes(builder.metadata()), @@ -4725,11 +4745,14 @@ mod tests { .set_max_row_group_bytes(Some(3500)) // Will trigger at ~30-35 rows .build(); - let builder = write_batches(WriteBatchesShape { - num_batches: 10, - rows_per_batch: 10, - row_size: 100 - }, props); + let builder = write_batches( + WriteBatchesShape { + num_batches: 10, + rows_per_batch: 10, + row_size: 100, + }, + props, + ); let sizes = row_group_sizes(builder.metadata()); diff --git a/parquet/src/bin/parquet-fromcsv.rs b/parquet/src/bin/parquet-fromcsv.rs index cf9958f06956..4a59385a4479 100644 --- a/parquet/src/bin/parquet-fromcsv.rs +++ b/parquet/src/bin/parquet-fromcsv.rs @@ -297,7 +297,8 @@ fn configure_writer_properties(args: &Args) -> WriterProperties { properties_builder = properties_builder.set_writer_version(writer_version); } if let Some(max_row_group_size) = args.max_row_group_size { - properties_builder = properties_builder.set_max_row_group_row_count(Some(max_row_group_size)); + properties_builder = + properties_builder.set_max_row_group_row_count(Some(max_row_group_size)); } if let Some(enable_bloom_filter) = args.enable_bloom_filter { properties_builder = properties_builder.set_bloom_filter_enabled(enable_bloom_filter); diff --git a/parquet/src/bin/parquet-rewrite.rs b/parquet/src/bin/parquet-rewrite.rs index 615c792cce7e..5bdd432be530 100644 --- a/parquet/src/bin/parquet-rewrite.rs +++ b/parquet/src/bin/parquet-rewrite.rs @@ -352,7 +352,8 @@ fn main() { } if let Some(value) = args.max_row_group_size { - writer_properties_builder = writer_properties_builder.set_max_row_group_row_count(Some(value)); + writer_properties_builder = + writer_properties_builder.set_max_row_group_row_count(Some(value)); } if let Some(value) = args.data_page_row_count_limit { writer_properties_builder = writer_properties_builder.set_data_page_row_count_limit(value); diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs index e58e3bc9146c..bb5ea2c5d0ea 100644 --- a/parquet/src/file/properties.rs +++ b/parquet/src/file/properties.rs @@ -594,10 +594,7 @@ impl WriterPropertiesBuilder { /// /// # Panics /// If the value is set to 0. 
- #[deprecated( - since = "57.3.0", - note = "Use `set_max_row_group_row_count` instead", - )] + #[deprecated(since = "57.3.0", note = "Use `set_max_row_group_row_count` instead")] pub fn set_max_row_group_size(mut self, value: usize) -> Self { assert!(value > 0, "Cannot have a 0 max row group size"); self.max_row_group_row_count = Some(value); From a3e4a1f0046efd3049e69d39dbb76741e98f1922 Mon Sep 17 00:00:00 2001 From: yoni Date: Sun, 8 Feb 2026 11:04:38 +0200 Subject: [PATCH 09/12] [CR] Fixes: - Change deprecation notice to 58.0.0 - Improve wording in comments - Cleanup references to the newly deprecated API --- parquet/src/arrow/arrow_writer/mod.rs | 2 +- parquet/src/file/properties.rs | 11 ++++------- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index 7ebee6e7172b..4fabdae2d39d 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -319,7 +319,7 @@ impl ArrowWriter { /// Encodes the provided [`RecordBatch`] /// - /// If this would cause the current row group to exceed [`WriterProperties::max_row_group_size`] + /// If this would cause the current row group to exceed [`WriterProperties::max_row_group_row_count`] /// rows or [`WriterProperties::max_row_group_bytes`] bytes, the contents of `batch` will be /// written to one or more row groups such that limits are respected. /// diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs index bb5ea2c5d0ea..46ac1147b6eb 100644 --- a/parquet/src/file/properties.rs +++ b/parquet/src/file/properties.rs @@ -45,9 +45,6 @@ pub const DEFAULT_STATISTICS_ENABLED: EnabledStatistics = EnabledStatistics::Pag pub const DEFAULT_WRITE_PAGE_HEADER_STATISTICS: bool = false; /// Default value for [`WriterProperties::max_row_group_size`] pub const DEFAULT_MAX_ROW_GROUP_SIZE: usize = 1024 * 1024; -/// Default value for [`WriterProperties::max_row_group_bytes`] (128 MB, same as the official Java -/// implementation for `parquet.block.size`) -pub const DEFAULT_MAX_ROW_GROUP_BYTES: usize = 128 * 1024 * 1024; /// Default value for [`WriterProperties::bloom_filter_position`] pub const DEFAULT_BLOOM_FILTER_POSITION: BloomFilterPosition = BloomFilterPosition::AfterRowGroup; /// Default value for [`WriterProperties::created_by`] @@ -594,7 +591,7 @@ impl WriterPropertiesBuilder { /// /// # Panics /// If the value is set to 0. - #[deprecated(since = "57.3.0", note = "Use `set_max_row_group_row_count` instead")] + #[deprecated(since = "58.0.0", note = "Use `set_max_row_group_row_count` instead")] pub fn set_max_row_group_size(mut self, value: usize) -> Self { assert!(value > 0, "Cannot have a 0 max row group size"); self.max_row_group_row_count = Some(value); @@ -604,12 +601,12 @@ impl WriterPropertiesBuilder { /// Sets maximum number of rows in a row group, or `None` for unlimited. /// /// If both `max_row_group_row_count` and `max_row_group_bytes` are set, - /// the row group with the smallest limit will be applied. + /// the row group with the smaller limit will be produced. /// /// # Panics /// If the value is `Some(0)`. pub fn set_max_row_group_row_count(mut self, value: Option) -> Self { - assert_ne!(value, Some(0), "Cannot have a 0 max row group bytes"); + assert_ne!(value, Some(0), "Cannot have a 0 max row group row count"); self.max_row_group_row_count = value; self } @@ -620,7 +617,7 @@ impl WriterPropertiesBuilder { /// This is similar to the official Java implementation for `parquet.block.size`'s behavior. 
/// /// If both `max_row_group_row_count` and `max_row_group_bytes` are set, - /// the row group with the smallest limit will be applied. + /// the row group with the smaller limit will be produced. /// /// # Panics /// If the value is `Some(0)`. From f3be44ca7e7834809e6bcd2392cba8e1d99d2d68 Mon Sep 17 00:00:00 2001 From: yoni Date: Sun, 8 Feb 2026 14:31:34 +0200 Subject: [PATCH 10/12] [CR] Rename const to match property name --- parquet/src/file/properties.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs index 46ac1147b6eb..84a9abb91554 100644 --- a/parquet/src/file/properties.rs +++ b/parquet/src/file/properties.rs @@ -44,7 +44,7 @@ pub const DEFAULT_STATISTICS_ENABLED: EnabledStatistics = EnabledStatistics::Pag /// Default value for [`WriterProperties::write_page_header_statistics`] pub const DEFAULT_WRITE_PAGE_HEADER_STATISTICS: bool = false; /// Default value for [`WriterProperties::max_row_group_size`] -pub const DEFAULT_MAX_ROW_GROUP_SIZE: usize = 1024 * 1024; +pub const DEFAULT_MAX_ROW_GROUP_ROW_COUNT: usize = 1024 * 1024; /// Default value for [`WriterProperties::bloom_filter_position`] pub const DEFAULT_BLOOM_FILTER_POSITION: BloomFilterPosition = BloomFilterPosition::AfterRowGroup; /// Default value for [`WriterProperties::created_by`] @@ -484,7 +484,7 @@ impl Default for WriterPropertiesBuilder { data_page_size_limit: DEFAULT_PAGE_SIZE, data_page_row_count_limit: DEFAULT_DATA_PAGE_ROW_COUNT_LIMIT, write_batch_size: DEFAULT_WRITE_BATCH_SIZE, - max_row_group_row_count: Some(DEFAULT_MAX_ROW_GROUP_SIZE), + max_row_group_row_count: Some(DEFAULT_MAX_ROW_GROUP_ROW_COUNT), max_row_group_bytes: None, bloom_filter_position: DEFAULT_BLOOM_FILTER_POSITION, writer_version: DEFAULT_WRITER_VERSION, @@ -587,7 +587,7 @@ impl WriterPropertiesBuilder { } /// Sets maximum number of rows in a row group (defaults to `1024 * 1024` - /// via [`DEFAULT_MAX_ROW_GROUP_SIZE`]). + /// via [`DEFAULT_MAX_ROW_GROUP_ROW_COUNT`]). /// /// # Panics /// If the value is set to 0. 
@@ -1382,7 +1382,7 @@ mod tests { DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT ); assert_eq!(props.write_batch_size(), DEFAULT_WRITE_BATCH_SIZE); - assert_eq!(props.max_row_group_size(), DEFAULT_MAX_ROW_GROUP_SIZE); + assert_eq!(props.max_row_group_size(), DEFAULT_MAX_ROW_GROUP_ROW_COUNT); assert_eq!(props.max_row_group_bytes(), None); assert_eq!(props.bloom_filter_position(), DEFAULT_BLOOM_FILTER_POSITION); assert_eq!(props.writer_version(), DEFAULT_WRITER_VERSION); From 029afdc3e4de3e1163efdba845818d8b2495e253 Mon Sep 17 00:00:00 2001 From: yoni Date: Sun, 8 Feb 2026 14:37:48 +0200 Subject: [PATCH 11/12] [CR] Deprecate max_row_group_size getter --- parquet/src/file/properties.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs index 84a9abb91554..e165f0f72bd7 100644 --- a/parquet/src/file/properties.rs +++ b/parquet/src/file/properties.rs @@ -43,7 +43,7 @@ pub const DEFAULT_DATA_PAGE_ROW_COUNT_LIMIT: usize = 20_000; pub const DEFAULT_STATISTICS_ENABLED: EnabledStatistics = EnabledStatistics::Page; /// Default value for [`WriterProperties::write_page_header_statistics`] pub const DEFAULT_WRITE_PAGE_HEADER_STATISTICS: bool = false; -/// Default value for [`WriterProperties::max_row_group_size`] +/// Default value for [`WriterProperties::max_row_group_row_count`] pub const DEFAULT_MAX_ROW_GROUP_ROW_COUNT: usize = 1024 * 1024; /// Default value for [`WriterProperties::bloom_filter_position`] pub const DEFAULT_BLOOM_FILTER_POSITION: BloomFilterPosition = BloomFilterPosition::AfterRowGroup; @@ -251,6 +251,7 @@ impl WriterProperties { /// Returns maximum number of rows in a row group, or `usize::MAX` if unlimited. /// /// For more details see [`WriterPropertiesBuilder::set_max_row_group_size`] + #[deprecated(since = "58.0.0", note = "Use `max_row_group_row_count` instead")] pub fn max_row_group_size(&self) -> usize { self.max_row_group_row_count.unwrap_or(usize::MAX) } @@ -1382,7 +1383,10 @@ mod tests { DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT ); assert_eq!(props.write_batch_size(), DEFAULT_WRITE_BATCH_SIZE); - assert_eq!(props.max_row_group_size(), DEFAULT_MAX_ROW_GROUP_ROW_COUNT); + assert_eq!( + props.max_row_group_row_count(), + Some(DEFAULT_MAX_ROW_GROUP_ROW_COUNT) + ); assert_eq!(props.max_row_group_bytes(), None); assert_eq!(props.bloom_filter_position(), DEFAULT_BLOOM_FILTER_POSITION); assert_eq!(props.writer_version(), DEFAULT_WRITER_VERSION); @@ -1494,7 +1498,7 @@ mod tests { assert_eq!(props.data_page_size_limit(), 10); assert_eq!(props.dictionary_page_size_limit(), 20); assert_eq!(props.write_batch_size(), 30); - assert_eq!(props.max_row_group_size(), 40); + assert_eq!(props.max_row_group_row_count(), Some(40)); assert_eq!(props.created_by(), "default"); assert_eq!( props.key_value_metadata(), From d6728ad4edcbd679c9694bcd928f364184e64596 Mon Sep 17 00:00:00 2001 From: yoni Date: Mon, 9 Feb 2026 22:24:12 +0200 Subject: [PATCH 12/12] [CR] Clarify safe subtraction in ArrowWriter::write --- parquet/src/arrow/arrow_writer/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index 4fabdae2d39d..a164257667f5 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -364,6 +364,7 @@ impl ArrowWriter { let avg_row_bytes = current_bytes / in_progress.buffered_rows; if avg_row_bytes > 0 { + // At this point, `current_bytes < max_bytes` (checked above) let remaining_bytes = max_bytes - 
current_bytes; let rows_that_fit = remaining_bytes / avg_row_bytes;
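For reference, a minimal end-to-end sketch of the two limits this series introduces. The parquet calls mirror the tests in the patches above (`set_max_row_group_row_count`, `set_max_row_group_bytes`, `ArrowWriter::try_new`); the arrow crate paths (`arrow_array`, `arrow_schema`), the function name, and the column name are illustrative assumptions, not part of the patch.

    use std::sync::Arc;

    use arrow_array::{Int32Array, RecordBatch};
    use arrow_schema::{DataType, Field, Schema};
    use parquet::arrow::ArrowWriter;
    use parquet::file::properties::WriterProperties;

    fn write_with_row_group_limits() {
        // One Int32 column, 1000 rows (~4 bytes per row)
        let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, false)]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(Int32Array::from_iter_values(0..1_000))],
        )
        .unwrap();

        let props = WriterProperties::builder()
            // Close a row group after at most 300 rows...
            .set_max_row_group_row_count(Some(300))
            // ...or once its estimated encoded size reaches 1 MiB, whichever triggers first
            .set_max_row_group_bytes(Some(1024 * 1024))
            .build();

        let mut buffer: Vec<u8> = Vec::new();
        let mut writer = ArrowWriter::try_new(&mut buffer, schema, Some(props)).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
    }

With small integer rows the byte limit never fires, so this should produce row groups of 300/300/300/100 rows, as in test_row_group_limit_rows_only; setting both limits to None would instead buffer everything into a single row group, and when both are set the writer flushes on whichever limit is reached first.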