diff --git a/parquet/benches/arrow_statistics.rs b/parquet/benches/arrow_statistics.rs
index a4aa9d137e9c..6da816bde9aa 100644
--- a/parquet/benches/arrow_statistics.rs
+++ b/parquet/benches/arrow_statistics.rs
@@ -79,7 +79,7 @@ fn create_parquet_file(
         )])),
     };

-    let mut props = WriterProperties::builder().set_max_row_group_size(row_groups);
+    let mut props = WriterProperties::builder().set_max_row_group_row_count(Some(row_groups));
     if let Some(limit) = data_page_row_count_limit {
         props = props
             .set_data_page_row_count_limit(*limit)
diff --git a/parquet/benches/parquet_round_trip.rs b/parquet/benches/parquet_round_trip.rs
index b239c3ccc759..7f46b02b78e2 100644
--- a/parquet/benches/parquet_round_trip.rs
+++ b/parquet/benches/parquet_round_trip.rs
@@ -299,7 +299,7 @@ fn file_from_spec(spec: ParquetFileSpec, buffer: &mut Vec<u8>) {
     let schema = schema(spec.column_type, spec.num_columns);

     let props = WriterProperties::builder()
-        .set_max_row_group_size(spec.rows_per_row_group)
+        .set_max_row_group_row_count(Some(spec.rows_per_row_group))
         .set_data_page_row_count_limit(spec.rows_per_page)
         .set_encoding(spec.encoding)
         .set_dictionary_enabled(spec.use_dict)
diff --git a/parquet/benches/row_group_index_reader.rs b/parquet/benches/row_group_index_reader.rs
index f9e52684401b..1fadfb5d4b7c 100644
--- a/parquet/benches/row_group_index_reader.rs
+++ b/parquet/benches/row_group_index_reader.rs
@@ -87,7 +87,7 @@ fn create_test_file(num_row_groups: usize, rows_per_group: usize) -> Bytes {
     let mut buffer = Vec::new();
     let props = WriterProperties::builder()
         .set_compression(Compression::SNAPPY)
-        .set_max_row_group_size(rows_per_group)
+        .set_max_row_group_row_count(Some(rows_per_group))
         .build();

     let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), Some(props)).unwrap();
diff --git a/parquet/src/arrow/array_reader/list_array.rs b/parquet/src/arrow/array_reader/list_array.rs
index ff1b414c27bb..1d5c68c22e11 100644
--- a/parquet/src/arrow/array_reader/list_array.rs
+++ b/parquet/src/arrow/array_reader/list_array.rs
@@ -541,7 +541,7 @@ mod tests {
         let file = tempfile::tempfile().unwrap();

         let props = WriterProperties::builder()
-            .set_max_row_group_size(200)
+            .set_max_row_group_row_count(Some(200))
             .build();

         let writer = ArrowWriter::try_new(
diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index d0398418007a..aec3337837f4 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -4633,7 +4633,7 @@ pub(crate) mod tests {
             schema.clone(),
             Some(
                 WriterProperties::builder()
-                    .set_max_row_group_size(row_group_size)
+                    .set_max_row_group_row_count(Some(row_group_size))
                     .build(),
             ),
         )
@@ -5629,7 +5629,7 @@ pub(crate) mod tests {
         let batch = RecordBatch::try_from_iter([("col", Arc::new(array) as ArrayRef)])?;
         let mut buffer = Vec::new();
         let options = WriterProperties::builder()
-            .set_max_row_group_size(50)
+            .set_max_row_group_row_count(Some(50))
             .build();
         let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema().clone(), Some(options))?;
         // write in 10 row batches as the size limits are enforced after each batch
@@ -5795,7 +5795,7 @@ pub(crate) mod tests {

         let mut buffer = Vec::new();
         let options = WriterProperties::builder()
-            .set_max_row_group_size(2)
+            .set_max_row_group_row_count(Some(2))
             .build();
         let mut writer = ArrowWriter::try_new(&mut buffer, batch1.schema(), Some(options)).unwrap();
         writer.write(&batch1).unwrap();
@@ -5854,7 +5854,7 @@ pub(crate) mod tests {

         let mut buffer = Vec::new();
         let options = WriterProperties::builder()
-            .set_max_row_group_size(3)
+            .set_max_row_group_row_count(Some(3))
             .build();
         let mut writer = ArrowWriter::try_new(&mut buffer, batch1.schema(), Some(options)).unwrap();
         writer.write(&batch1).unwrap();
@@ -5903,7 +5903,7 @@ pub(crate) mod tests {
     fn test_read_row_group_indices_with_selection() -> Result<()> {
         let mut buffer = Vec::new();
         let options = WriterProperties::builder()
-            .set_max_row_group_size(10)
+            .set_max_row_group_row_count(Some(10))
             .build();

         let schema = Arc::new(Schema::new(vec![Field::new(
diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs
index 99d0455b31b9..a164257667f5 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -187,8 +187,11 @@ pub struct ArrowWriter<W: Write> {
     /// Creates new [`ArrowRowGroupWriter`] instances as required
     row_group_writer_factory: ArrowRowGroupWriterFactory,

-    /// The length of arrays to write to each row group
-    max_row_group_size: usize,
+    /// The maximum number of rows to write to each row group, or None for unlimited
+    max_row_group_row_count: Option<usize>,
+
+    /// The maximum size in bytes for a row group, or None for unlimited
+    max_row_group_bytes: Option<usize>,
 }

 impl<W: Write> std::fmt::Debug for ArrowWriter<W> {
@@ -199,7 +202,8 @@ impl<W: Write> std::fmt::Debug for ArrowWriter<W> {
             .field("in_progress_size", &format_args!("{buffered_memory} bytes"))
             .field("in_progress_rows", &self.in_progress_rows())
             .field("arrow_schema", &self.arrow_schema)
-            .field("max_row_group_size", &self.max_row_group_size)
+            .field("max_row_group_row_count", &self.max_row_group_row_count)
+            .field("max_row_group_bytes", &self.max_row_group_bytes)
             .finish()
     }
 }
@@ -247,7 +251,8 @@ impl<W: Write> ArrowWriter<W> {
             add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
         }

-        let max_row_group_size = props.max_row_group_size();
+        let max_row_group_row_count = props.max_row_group_row_count();
+        let max_row_group_bytes = props.max_row_group_bytes();
         let props_ptr = Arc::new(props);

         let file_writer =
@@ -261,7 +266,8 @@ impl<W: Write> ArrowWriter<W> {
             in_progress: None,
             arrow_schema,
             row_group_writer_factory,
-            max_row_group_size,
+            max_row_group_row_count,
+            max_row_group_bytes,
         })
     }

@@ -313,9 +319,13 @@ impl<W: Write> ArrowWriter<W> {

     /// Encodes the provided [`RecordBatch`]
     ///
-    /// If this would cause the current row group to exceed [`WriterProperties::max_row_group_size`]
-    /// rows, the contents of `batch` will be written to one or more row groups such that all but
-    /// the final row group in the file contain [`WriterProperties::max_row_group_size`] rows.
+    /// If this would cause the current row group to exceed [`WriterProperties::max_row_group_row_count`]
+    /// rows or [`WriterProperties::max_row_group_bytes`] bytes, the contents of `batch` will be
+    /// written to one or more row groups such that limits are respected.
+    ///
+    /// If both limits are `None`, all data is written to a single row group.
+    /// If one limit is set, that limit is respected.
+    /// If both limits are set, the lower bound (whichever triggers first) is respected.
     ///
     /// This will fail if the `batch`'s schema does not match the writer's schema.
     pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
@@ -331,18 +341,58 @@ impl<W: Write> ArrowWriter<W> {
             ),
         };

-        // If would exceed max_row_group_size, split batch
-        if in_progress.buffered_rows + batch.num_rows() > self.max_row_group_size {
-            let to_write = self.max_row_group_size - in_progress.buffered_rows;
-            let a = batch.slice(0, to_write);
-            let b = batch.slice(to_write, batch.num_rows() - to_write);
-            self.write(&a)?;
-            return self.write(&b);
+        if let Some(max_rows) = self.max_row_group_row_count {
+            if in_progress.buffered_rows + batch.num_rows() > max_rows {
+                let to_write = max_rows - in_progress.buffered_rows;
+                let a = batch.slice(0, to_write);
+                let b = batch.slice(to_write, batch.num_rows() - to_write);
+                self.write(&a)?;
+                return self.write(&b);
+            }
+        }
+
+        // Check byte limit: if we have buffered data, use measured average row size
+        // to split batch proactively before exceeding byte limit
+        if let Some(max_bytes) = self.max_row_group_bytes {
+            if in_progress.buffered_rows > 0 {
+                let current_bytes = in_progress.get_estimated_total_bytes();
+
+                if current_bytes >= max_bytes {
+                    self.flush()?;
+                    return self.write(batch);
+                }
+
+                let avg_row_bytes = current_bytes / in_progress.buffered_rows;
+                if avg_row_bytes > 0 {
+                    // At this point, `current_bytes < max_bytes` (checked above)
+                    let remaining_bytes = max_bytes - current_bytes;
+                    let rows_that_fit = remaining_bytes / avg_row_bytes;
+
+                    if batch.num_rows() > rows_that_fit {
+                        if rows_that_fit > 0 {
+                            let a = batch.slice(0, rows_that_fit);
+                            let b = batch.slice(rows_that_fit, batch.num_rows() - rows_that_fit);
+                            self.write(&a)?;
+                            return self.write(&b);
+                        } else {
+                            self.flush()?;
+                            return self.write(batch);
+                        }
+                    }
+                }
+            }
         }

         in_progress.write(batch)?;
-        if in_progress.buffered_rows >= self.max_row_group_size {
+        let should_flush = self
+            .max_row_group_row_count
+            .is_some_and(|max| in_progress.buffered_rows >= max)
+            || self
+                .max_row_group_bytes
+                .is_some_and(|max| in_progress.get_estimated_total_bytes() >= max);
+
+        if should_flush {
             self.flush()?
         }
         Ok(())
@@ -914,6 +964,14 @@ impl ArrowRowGroupWriter {
         Ok(())
     }

+    /// Returns the estimated total encoded bytes for this row group
+    fn get_estimated_total_bytes(&self) -> usize {
+        self.writers
+            .iter()
+            .map(|x| x.get_estimated_total_bytes())
+            .sum()
+    }
+
     fn close(self) -> Result<Vec<ArrowColumnChunk>> {
         self.writers
             .into_iter()
@@ -2354,7 +2412,7 @@ mod tests {
         let mut props = WriterProperties::builder().set_writer_version(version);

         if let Some(size) = max_row_group_size {
-            props = props.set_max_row_group_size(size)
+            props = props.set_max_row_group_row_count(Some(size))
         }

         let props = props.build();
@@ -2485,7 +2543,7 @@ mod tests {
         for row_group_size in row_group_sizes {
             let props = WriterProperties::builder()
                 .set_writer_version(version)
-                .set_max_row_group_size(row_group_size)
+                .set_max_row_group_row_count(Some(row_group_size))
                 .set_dictionary_enabled(dictionary_size != 0)
                 .set_dictionary_page_size_limit(dictionary_size.max(1))
                 .set_encoding(*encoding)
@@ -3147,7 +3205,7 @@ mod tests {
         for row_group_size in row_group_sizes {
             let props = WriterProperties::builder()
                 .set_writer_version(WriterVersion::PARQUET_2_0)
-                .set_max_row_group_size(row_group_size)
+                .set_max_row_group_row_count(Some(row_group_size))
                 .set_dictionary_enabled(false)
                 .set_encoding(*encoding)
                 .set_data_page_size_limit(data_page_size_limit)
@@ -3775,7 +3833,7 @@ mod tests {
         let file = tempfile::tempfile().unwrap();

         let props = WriterProperties::builder()
-            .set_max_row_group_size(200)
+            .set_max_row_group_row_count(Some(200))
             .build();

         let mut writer =
@@ -3917,7 +3975,7 @@ mod tests {
         // Write data
         let file = tempfile::tempfile().unwrap();
         let props = WriterProperties::builder()
-            .set_max_row_group_size(6)
+            .set_max_row_group_row_count(Some(6))
             .build();

         let mut writer =
@@ -4518,4 +4576,198 @@ mod tests {
         assert_eq!(get_dict_page_size(col0_meta), 1024 * 1024);
         assert_eq!(get_dict_page_size(col1_meta), 1024 * 1024 * 4);
     }
+
+    struct WriteBatchesShape {
+        num_batches: usize,
+        rows_per_batch: usize,
+        row_size: usize,
+    }
+
+    /// Helper function to write batches with the provided `WriteBatchesShape` into an `ArrowWriter`
+    fn write_batches(
+        WriteBatchesShape {
+            num_batches,
+            rows_per_batch,
+            row_size,
+        }: WriteBatchesShape,
+        props: WriterProperties,
+    ) -> ParquetRecordBatchReaderBuilder<File> {
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "str",
+            ArrowDataType::Utf8,
+            false,
+        )]));
+        let file = tempfile::tempfile().unwrap();
+        let mut writer =
+            ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();
+
+        for batch_idx in 0..num_batches {
+            let strings: Vec<String> = (0..rows_per_batch)
+                .map(|i| format!("{:0>width$}", batch_idx * 10 + i, width = row_size))
+                .collect();
+            let array = StringArray::from(strings);
+            let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
+            writer.write(&batch).unwrap();
+        }
+        writer.close().unwrap();
+        ParquetRecordBatchReaderBuilder::try_new(file).unwrap()
+    }
+
+    #[test]
+    // When both limits are None, all data should go into a single row group
+    fn test_row_group_limit_none_writes_single_row_group() {
+        let props = WriterProperties::builder()
+            .set_max_row_group_row_count(None)
+            .set_max_row_group_bytes(None)
+            .build();
+
+        let builder = write_batches(
+            WriteBatchesShape {
+                num_batches: 1,
+                rows_per_batch: 1000,
+                row_size: 4,
+            },
+            props,
+        );
+
+        assert_eq!(
+            &row_group_sizes(builder.metadata()),
+            &[1000],
+            "With no limits, all rows should be in a single row group"
+        );
+    }
+
+    #[test]
+    // When only max_row_group_row_count is set, respect the row limit
+    fn test_row_group_limit_rows_only() {
+        let props = WriterProperties::builder()
+            .set_max_row_group_row_count(Some(300))
+            .set_max_row_group_bytes(None)
+            .build();
+
+        let builder = write_batches(
+            WriteBatchesShape {
+                num_batches: 1,
+                rows_per_batch: 1000,
+                row_size: 4,
+            },
+            props,
+        );
+
+        assert_eq!(
+            &row_group_sizes(builder.metadata()),
+            &[300, 300, 300, 100],
+            "Row groups should be split by row count"
+        );
+    }
+
+    #[test]
+    // When only max_row_group_bytes is set, respect the byte limit
+    fn test_row_group_limit_bytes_only() {
+        let props = WriterProperties::builder()
+            .set_max_row_group_row_count(None)
+            // Set byte limit to approximately fit ~30 rows worth of data (~100 bytes each)
+            .set_max_row_group_bytes(Some(3500))
+            .build();
+
+        let builder = write_batches(
+            WriteBatchesShape {
+                num_batches: 10,
+                rows_per_batch: 10,
+                row_size: 100,
+            },
+            props,
+        );
+
+        let sizes = row_group_sizes(builder.metadata());
+
+        assert!(
+            sizes.len() > 1,
+            "Should have multiple row groups due to byte limit, got {sizes:?}",
+        );
+
+        let total_rows: i64 = sizes.iter().sum();
+        assert_eq!(total_rows, 100, "Total rows should be preserved");
+    }
+
+    #[test]
+    // When both limits are set, the row limit triggers first
+    fn test_row_group_limit_both_row_wins_single_batch() {
+        let props = WriterProperties::builder()
+            .set_max_row_group_row_count(Some(200)) // Will trigger at 200 rows
+            .set_max_row_group_bytes(Some(1024 * 1024)) // 1MB - won't trigger for small int data
+            .build();
+
+        let builder = write_batches(
+            WriteBatchesShape {
+                num_batches: 1,
+                row_size: 4,
+                rows_per_batch: 1000,
+            },
+            props,
+        );
+
+        assert_eq!(
+            &row_group_sizes(builder.metadata()),
+            &[200, 200, 200, 200, 200],
+            "Row limit should trigger before byte limit"
+        );
+    }
+
+    #[test]
+    // When both limits are set, the row limit triggers first
+    fn test_row_group_limit_both_row_wins_multiple_batches() {
+        let props = WriterProperties::builder()
+            .set_max_row_group_row_count(Some(5)) // Will trigger every 5 rows
+            .set_max_row_group_bytes(Some(9999)) // Won't trigger
+            .build();
+
+        let builder = write_batches(
+            WriteBatchesShape {
+                num_batches: 10,
+                rows_per_batch: 10,
+                row_size: 100,
+            },
+            props,
+        );
+
+        assert_eq!(
+            &row_group_sizes(builder.metadata()),
+            &[5; 20],
+            "Row limit should trigger before byte limit"
+        );
+    }
+
+    #[test]
+    // When both limits are set, the byte limit triggers first
+    fn test_row_group_limit_both_bytes_wins() {
+        let props = WriterProperties::builder()
+            .set_max_row_group_row_count(Some(1000)) // Won't trigger for 100 rows
+            .set_max_row_group_bytes(Some(3500)) // Will trigger at ~30-35 rows
+            .build();
+
+        let builder = write_batches(
+            WriteBatchesShape {
+                num_batches: 10,
+                rows_per_batch: 10,
+                row_size: 100,
+            },
+            props,
+        );
+
+        let sizes = row_group_sizes(builder.metadata());
+
+        assert!(
+            sizes.len() > 1,
+            "Byte limit should trigger before row limit, got {sizes:?}",
+        );
+
+        assert!(
+            sizes.iter().all(|&s| s < 1000),
+            "No row group should hit the row limit"
+        );
+
+        let total_rows: i64 = sizes.iter().sum();
+        assert_eq!(total_rows, 100, "Total rows should be preserved");
+    }
 }
diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs
index fe846d15d9ad..9e45a0c3168c 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -1233,7 +1233,7 @@ mod tests {
         let mut buf = Vec::with_capacity(1024);

         let props = WriterProperties::builder()
-            .set_max_row_group_size(3)
+            .set_max_row_group_row_count(Some(3))
             .build();
         let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), Some(props)).unwrap();
         writer.write(&data).unwrap();
@@ -1466,7 +1466,7 @@ mod tests {
         let props = WriterProperties::builder()
             .set_data_page_row_count_limit(256)
             .set_write_batch_size(256)
-            .set_max_row_group_size(1024);
+            .set_max_row_group_row_count(Some(1024));

         // Write data
         let mut file = tempfile().unwrap();
@@ -1704,7 +1704,7 @@ mod tests {
         let props = WriterProperties::builder()
             .set_data_page_row_count_limit(1)
             .set_write_batch_size(1)
-            .set_max_row_group_size(10)
+            .set_max_row_group_row_count(Some(10))
             .set_write_page_header_statistics(true)
             .build();
         let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), Some(props)).unwrap();
diff --git a/parquet/src/arrow/async_writer/mod.rs b/parquet/src/arrow/async_writer/mod.rs
index 9018c09f2a89..a050ef77c46c 100644
--- a/parquet/src/arrow/async_writer/mod.rs
+++ b/parquet/src/arrow/async_writer/mod.rs
@@ -340,7 +340,7 @@ mod tests {
         let reader = get_test_reader();

         let write_props = WriterProperties::builder()
-            .set_max_row_group_size(64)
+            .set_max_row_group_row_count(Some(64))
             .build();

         let mut async_buffer = Vec::new();
diff --git a/parquet/src/arrow/push_decoder/mod.rs b/parquet/src/arrow/push_decoder/mod.rs
index 50451aee120e..cdb0715edb55 100644
--- a/parquet/src/arrow/push_decoder/mod.rs
+++ b/parquet/src/arrow/push_decoder/mod.rs
@@ -1142,7 +1142,7 @@ mod test {

         let mut output = Vec::new();
         let writer_options = WriterProperties::builder()
-            .set_max_row_group_size(200)
+            .set_max_row_group_row_count(Some(200))
             .set_data_page_row_count_limit(100)
             .build();
         let mut writer =
diff --git a/parquet/src/bin/parquet-fromcsv.rs b/parquet/src/bin/parquet-fromcsv.rs
index bba07b9b056e..4a59385a4479 100644
--- a/parquet/src/bin/parquet-fromcsv.rs
+++ b/parquet/src/bin/parquet-fromcsv.rs
@@ -297,7 +297,8 @@ fn configure_writer_properties(args: &Args) -> WriterProperties {
         properties_builder = properties_builder.set_writer_version(writer_version);
     }
     if let Some(max_row_group_size) = args.max_row_group_size {
-        properties_builder = properties_builder.set_max_row_group_size(max_row_group_size);
+        properties_builder =
+            properties_builder.set_max_row_group_row_count(Some(max_row_group_size));
     }
     if let Some(enable_bloom_filter) = args.enable_bloom_filter {
         properties_builder = properties_builder.set_bloom_filter_enabled(enable_bloom_filter);
     }
diff --git a/parquet/src/bin/parquet-rewrite.rs b/parquet/src/bin/parquet-rewrite.rs
index 31058e552d15..5bdd432be530 100644
--- a/parquet/src/bin/parquet-rewrite.rs
+++ b/parquet/src/bin/parquet-rewrite.rs
@@ -352,7 +352,8 @@ fn main() {
     }

     if let Some(value) = args.max_row_group_size {
-        writer_properties_builder = writer_properties_builder.set_max_row_group_size(value);
+        writer_properties_builder =
+            writer_properties_builder.set_max_row_group_row_count(Some(value));
     }
     if let Some(value) = args.data_page_row_count_limit {
         writer_properties_builder = writer_properties_builder.set_data_page_row_count_limit(value);
     }
diff --git a/parquet/src/file/metadata/push_decoder.rs b/parquet/src/file/metadata/push_decoder.rs
index 34b7fec2c0c5..abc788426260 100644
--- a/parquet/src/file/metadata/push_decoder.rs
+++ b/parquet/src/file/metadata/push_decoder.rs
@@ -653,7 +653,7 @@ mod tests {

         let mut output = Vec::new();
         let writer_options = WriterProperties::builder()
-            .set_max_row_group_size(200)
+            .set_max_row_group_row_count(Some(200))
             .set_data_page_row_count_limit(100)
             .build();
         let mut writer =
diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs
index 38a5a804c0b7..e165f0f72bd7 100644
--- a/parquet/src/file/properties.rs
+++ b/parquet/src/file/properties.rs
@@ -43,8 +43,8 @@ pub const DEFAULT_DATA_PAGE_ROW_COUNT_LIMIT: usize = 20_000;
 pub const DEFAULT_STATISTICS_ENABLED: EnabledStatistics = EnabledStatistics::Page;
 /// Default value for [`WriterProperties::write_page_header_statistics`]
 pub const DEFAULT_WRITE_PAGE_HEADER_STATISTICS: bool = false;
-/// Default value for [`WriterProperties::max_row_group_size`]
-pub const DEFAULT_MAX_ROW_GROUP_SIZE: usize = 1024 * 1024;
+/// Default value for [`WriterProperties::max_row_group_row_count`]
+pub const DEFAULT_MAX_ROW_GROUP_ROW_COUNT: usize = 1024 * 1024;
 /// Default value for [`WriterProperties::bloom_filter_position`]
 pub const DEFAULT_BLOOM_FILTER_POSITION: BloomFilterPosition = BloomFilterPosition::AfterRowGroup;
 /// Default value for [`WriterProperties::created_by`]
@@ -156,7 +156,8 @@ pub struct WriterProperties {
     data_page_size_limit: usize,
     data_page_row_count_limit: usize,
     write_batch_size: usize,
-    max_row_group_size: usize,
+    max_row_group_row_count: Option<usize>,
+    max_row_group_bytes: Option<usize>,
     bloom_filter_position: BloomFilterPosition,
     writer_version: WriterVersion,
     created_by: String,
@@ -247,11 +248,26 @@ impl WriterProperties {
         self.write_batch_size
     }

-    /// Returns maximum number of rows in a row group.
+    /// Returns maximum number of rows in a row group, or `usize::MAX` if unlimited.
     ///
     /// For more details see [`WriterPropertiesBuilder::set_max_row_group_size`]
+    #[deprecated(since = "58.0.0", note = "Use `max_row_group_row_count` instead")]
     pub fn max_row_group_size(&self) -> usize {
-        self.max_row_group_size
+        self.max_row_group_row_count.unwrap_or(usize::MAX)
+    }
+
+    /// Returns maximum number of rows in a row group, or `None` if unlimited.
+    ///
+    /// For more details see [`WriterPropertiesBuilder::set_max_row_group_row_count`]
+    pub fn max_row_group_row_count(&self) -> Option<usize> {
+        self.max_row_group_row_count
+    }
+
+    /// Returns maximum size of a row group in bytes, or `None` if unlimited.
+    ///
+    /// For more details see [`WriterPropertiesBuilder::set_max_row_group_bytes`]
+    pub fn max_row_group_bytes(&self) -> Option<usize> {
+        self.max_row_group_bytes
     }

     /// Returns bloom filter position.
@@ -445,7 +461,8 @@ pub struct WriterPropertiesBuilder {
     data_page_size_limit: usize,
     data_page_row_count_limit: usize,
     write_batch_size: usize,
-    max_row_group_size: usize,
+    max_row_group_row_count: Option<usize>,
+    max_row_group_bytes: Option<usize>,
     bloom_filter_position: BloomFilterPosition,
     writer_version: WriterVersion,
     created_by: String,
@@ -468,7 +485,8 @@ impl Default for WriterPropertiesBuilder {
             data_page_size_limit: DEFAULT_PAGE_SIZE,
             data_page_row_count_limit: DEFAULT_DATA_PAGE_ROW_COUNT_LIMIT,
             write_batch_size: DEFAULT_WRITE_BATCH_SIZE,
-            max_row_group_size: DEFAULT_MAX_ROW_GROUP_SIZE,
+            max_row_group_row_count: Some(DEFAULT_MAX_ROW_GROUP_ROW_COUNT),
+            max_row_group_bytes: None,
             bloom_filter_position: DEFAULT_BLOOM_FILTER_POSITION,
             writer_version: DEFAULT_WRITER_VERSION,
             created_by: DEFAULT_CREATED_BY.to_string(),
@@ -493,7 +511,8 @@ impl WriterPropertiesBuilder {
             data_page_size_limit: self.data_page_size_limit,
             data_page_row_count_limit: self.data_page_row_count_limit,
             write_batch_size: self.write_batch_size,
-            max_row_group_size: self.max_row_group_size,
+            max_row_group_row_count: self.max_row_group_row_count,
+            max_row_group_bytes: self.max_row_group_bytes,
             bloom_filter_position: self.bloom_filter_position,
             writer_version: self.writer_version,
             created_by: self.created_by,
@@ -569,13 +588,43 @@ impl WriterPropertiesBuilder {
     }

     /// Sets maximum number of rows in a row group (defaults to `1024 * 1024`
-    /// via [`DEFAULT_MAX_ROW_GROUP_SIZE`]).
+    /// via [`DEFAULT_MAX_ROW_GROUP_ROW_COUNT`]).
     ///
     /// # Panics
     /// If the value is set to 0.
+    #[deprecated(since = "58.0.0", note = "Use `set_max_row_group_row_count` instead")]
     pub fn set_max_row_group_size(mut self, value: usize) -> Self {
         assert!(value > 0, "Cannot have a 0 max row group size");
-        self.max_row_group_size = value;
+        self.max_row_group_row_count = Some(value);
+        self
+    }
+
+    /// Sets maximum number of rows in a row group, or `None` for unlimited.
+    ///
+    /// If both `max_row_group_row_count` and `max_row_group_bytes` are set,
+    /// the row group with the smaller limit will be produced.
+    ///
+    /// # Panics
+    /// If the value is `Some(0)`.
+    pub fn set_max_row_group_row_count(mut self, value: Option<usize>) -> Self {
+        assert_ne!(value, Some(0), "Cannot have a 0 max row group row count");
+        self.max_row_group_row_count = value;
+        self
+    }
+
+    /// Sets maximum size of a row group in bytes, or `None` for unlimited.
+    ///
+    /// Row groups are flushed when their estimated encoded size exceeds this threshold.
+    /// This is similar to the official Java implementation for `parquet.block.size`'s behavior.
+    ///
+    /// If both `max_row_group_row_count` and `max_row_group_bytes` are set,
+    /// the row group with the smaller limit will be produced.
+    ///
+    /// # Panics
+    /// If the value is `Some(0)`.
+    pub fn set_max_row_group_bytes(mut self, value: Option<usize>) -> Self {
+        assert_ne!(value, Some(0), "Cannot have a 0 max row group bytes");
+        self.max_row_group_bytes = value;
         self
     }

@@ -952,7 +1001,8 @@ impl From<WriterProperties> for WriterPropertiesBuilder {
             data_page_size_limit: props.data_page_size_limit,
             data_page_row_count_limit: props.data_page_row_count_limit,
             write_batch_size: props.write_batch_size,
-            max_row_group_size: props.max_row_group_size,
+            max_row_group_row_count: props.max_row_group_row_count,
+            max_row_group_bytes: props.max_row_group_bytes,
             bloom_filter_position: props.bloom_filter_position,
             writer_version: props.writer_version,
             created_by: props.created_by,
@@ -1333,7 +1383,11 @@ mod tests {
             DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT
         );
         assert_eq!(props.write_batch_size(), DEFAULT_WRITE_BATCH_SIZE);
-        assert_eq!(props.max_row_group_size(), DEFAULT_MAX_ROW_GROUP_SIZE);
+        assert_eq!(
+            props.max_row_group_row_count(),
+            Some(DEFAULT_MAX_ROW_GROUP_ROW_COUNT)
+        );
+        assert_eq!(props.max_row_group_bytes(), None);
         assert_eq!(props.bloom_filter_position(), DEFAULT_BLOOM_FILTER_POSITION);
         assert_eq!(props.writer_version(), DEFAULT_WRITER_VERSION);
         assert_eq!(props.created_by(), DEFAULT_CREATED_BY);
@@ -1418,7 +1472,7 @@ mod tests {
             .set_data_page_size_limit(10)
             .set_dictionary_page_size_limit(20)
             .set_write_batch_size(30)
-            .set_max_row_group_size(40)
+            .set_max_row_group_row_count(Some(40))
             .set_created_by("default".to_owned())
             .set_key_value_metadata(Some(vec![KeyValue::new(
                 "key".to_string(),
@@ -1444,7 +1498,7 @@ mod tests {
         assert_eq!(props.data_page_size_limit(), 10);
         assert_eq!(props.dictionary_page_size_limit(), 20);
         assert_eq!(props.write_batch_size(), 30);
-        assert_eq!(props.max_row_group_size(), 40);
+        assert_eq!(props.max_row_group_row_count(), Some(40));
         assert_eq!(props.created_by(), "default");
         assert_eq!(
             props.key_value_metadata(),
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index 35948af022f1..7d69904451d3 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -2198,7 +2198,7 @@ mod tests {
         let props = Arc::new(
             WriterProperties::builder()
                 .set_statistics_enabled(EnabledStatistics::None)
-                .set_max_row_group_size(1)
+                .set_max_row_group_row_count(Some(1))
                 .build(),
         );
         let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
diff --git a/parquet/tests/arrow_reader/io/mod.rs b/parquet/tests/arrow_reader/io/mod.rs
index 52cf61ddc4a3..1f5dd2da1750 100644
--- a/parquet/tests/arrow_reader/io/mod.rs
+++ b/parquet/tests/arrow_reader/io/mod.rs
@@ -219,7 +219,7 @@ static TEST_FILE_DATA: LazyLock = LazyLock::new(|| {

     let mut output = Vec::new();
     let writer_options = WriterProperties::builder()
-        .set_max_row_group_size(200)
+        .set_max_row_group_row_count(Some(200))
         .set_data_page_row_count_limit(100)
         .build();
     let mut writer =
diff --git a/parquet/tests/arrow_reader/mod.rs b/parquet/tests/arrow_reader/mod.rs
index ffc36655b39a..39214e0488b6 100644
--- a/parquet/tests/arrow_reader/mod.rs
+++ b/parquet/tests/arrow_reader/mod.rs
@@ -1130,7 +1130,7 @@ async fn make_test_file_rg(scenario: Scenario, row_per_group: usize) -> NamedTempFile {
         .expect("tempfile creation");

     let mut builder = WriterProperties::builder()
-        .set_max_row_group_size(row_per_group)
+        .set_max_row_group_row_count(Some(row_per_group))
         .set_bloom_filter_enabled(true)
         .set_statistics_enabled(EnabledStatistics::Page);
     if scenario.truncate_stats() {
diff --git a/parquet/tests/arrow_reader/predicate_cache.rs b/parquet/tests/arrow_reader/predicate_cache.rs
index 73613c83d257..85dba68c9c69 100644
--- a/parquet/tests/arrow_reader/predicate_cache.rs
+++ b/parquet/tests/arrow_reader/predicate_cache.rs
@@ -218,7 +218,7 @@ static TEST_FILE_DATA: LazyLock = LazyLock::new(|| {

     let mut output = Vec::new();
     let writer_options = WriterProperties::builder()
-        .set_max_row_group_size(200)
+        .set_max_row_group_row_count(Some(200))
         .set_data_page_row_count_limit(100)
         .build();
     let mut writer =
diff --git a/parquet/tests/arrow_reader/row_filter/async.rs b/parquet/tests/arrow_reader/row_filter/async.rs
index f756d6e89383..6fa616d714f1 100644
--- a/parquet/tests/arrow_reader/row_filter/async.rs
+++ b/parquet/tests/arrow_reader/row_filter/async.rs
@@ -417,7 +417,7 @@ async fn test_predicate_pushdown_with_skipped_pages() {
     ]));

     let props = WriterProperties::builder()
-        .set_max_row_group_size(300)
+        .set_max_row_group_row_count(Some(300))
        .set_data_page_row_count_limit(33)
         .build();

diff --git a/parquet/tests/arrow_reader/statistics.rs b/parquet/tests/arrow_reader/statistics.rs
index 2f7859d06355..4f7ddcff4ad1 100644
--- a/parquet/tests/arrow_reader/statistics.rs
+++ b/parquet/tests/arrow_reader/statistics.rs
@@ -116,7 +116,7 @@ fn build_parquet_file(
         .tempfile()
         .expect("tempfile creation");

-    let mut builder = WriterProperties::builder().set_max_row_group_size(row_per_group);
+    let mut builder = WriterProperties::builder().set_max_row_group_row_count(Some(row_per_group));
     if let Some(enable_stats) = enable_stats {
         builder = builder.set_statistics_enabled(enable_stats);
     }
@@ -2903,7 +2903,7 @@ mod test {
     fn parquet_metadata(schema: SchemaRef, batch: RecordBatch) -> Arc<ParquetMetaData> {
         let props = WriterProperties::builder()
             .set_statistics_enabled(EnabledStatistics::Chunk)
-            .set_max_row_group_size(ROWS_PER_ROW_GROUP)
+            .set_max_row_group_row_count(Some(ROWS_PER_ROW_GROUP))
             .build();

         let mut buffer = Vec::new();
diff --git a/parquet/tests/encryption/encryption.rs b/parquet/tests/encryption/encryption.rs
index b401742dcb30..c26a0edee682 100644
--- a/parquet/tests/encryption/encryption.rs
+++ b/parquet/tests/encryption/encryption.rs
@@ -432,7 +432,7 @@ fn uniform_encryption_roundtrip(

     let props = WriterProperties::builder()
         // Ensure multiple row groups
-        .set_max_row_group_size(50)
+        .set_max_row_group_row_count(Some(50))
         // Ensure multiple pages per row group
         .set_write_batch_size(20)
         .set_data_page_row_count_limit(20)
@@ -537,7 +537,7 @@ fn uniform_encryption_page_skipping(page_index: bool) -> parquet::errors::Result<()> {

     let props = WriterProperties::builder()
         // Ensure multiple row groups
-        .set_max_row_group_size(50)
+        .set_max_row_group_row_count(Some(50))
         // Ensure multiple pages per row group
         .set_write_batch_size(20)
         .set_data_page_row_count_limit(20)
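
Usage sketch (not part of the patch): assuming the new `set_max_row_group_row_count` and `set_max_row_group_bytes` setters land as shown above, both limits can be combined on one writer and whichever threshold is reached first flushes the row group. The concrete limits, helper name, and `arrow_array` imports below are illustrative only.

    use std::sync::Arc;

    use arrow_array::{ArrayRef, Int32Array, RecordBatch};
    use parquet::arrow::ArrowWriter;
    use parquet::errors::Result;
    use parquet::file::properties::WriterProperties;

    fn write_with_row_group_limits() -> Result<Vec<u8>> {
        // Illustrative limits: flush a row group after 100_000 rows or once its
        // estimated encoded size reaches ~128 MiB, whichever happens first.
        let props = WriterProperties::builder()
            .set_max_row_group_row_count(Some(100_000))
            .set_max_row_group_bytes(Some(128 * 1024 * 1024))
            .build();

        let batch = RecordBatch::try_from_iter([(
            "id",
            Arc::new(Int32Array::from_iter_values(0..1_000)) as ArrayRef,
        )])?;

        let mut buffer = Vec::new();
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), Some(props))?;
        writer.write(&batch)?;
        writer.close()?;
        Ok(buffer)
    }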