From f96e14cb481517f0a4171d2574d6d13b4f0284b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Thu, 29 Jan 2026 19:04:33 +0100 Subject: [PATCH 01/17] Optimize data page statistics --- parquet/src/arrow/arrow_reader/statistics.rs | 1031 ++++++++++-------- 1 file changed, 578 insertions(+), 453 deletions(-) diff --git a/parquet/src/arrow/arrow_reader/statistics.rs b/parquet/src/arrow/arrow_reader/statistics.rs index 2f46c96be6b7..528871a251f8 100644 --- a/parquet/src/arrow/arrow_reader/statistics.rs +++ b/parquet/src/arrow/arrow_reader/statistics.rs @@ -22,15 +22,19 @@ use crate::arrow::buffer::bit_util::sign_extend_be; use crate::arrow::parquet_column; use crate::basic::Type as PhysicalType; -use crate::data_type::{ByteArray, FixedLenByteArray}; use crate::errors::{ParquetError, Result}; use crate::file::metadata::{ParquetColumnIndex, ParquetOffsetIndex, RowGroupMetaData}; -use crate::file::page_index::column_index::{ColumnIndexIterators, ColumnIndexMetaData}; +use crate::file::page_index::column_index::ColumnIndexMetaData; use crate::file::statistics::Statistics as ParquetStatistics; use crate::schema::types::SchemaDescriptor; use arrow_array::builder::{ - BinaryViewBuilder, BooleanBuilder, FixedSizeBinaryBuilder, LargeStringBuilder, StringBuilder, - StringViewBuilder, + BinaryBuilder, BinaryViewBuilder, BooleanBuilder, Date32Builder, Date64Builder, + Decimal32Builder, Decimal64Builder, FixedSizeBinaryBuilder, Float16Builder, Float32Builder, + Float64Builder, Int8Builder, Int16Builder, Int32Builder, Int64Builder, LargeBinaryBuilder, + LargeStringBuilder, StringBuilder, StringViewBuilder, Time32MillisecondBuilder, + Time32SecondBuilder, Time64MicrosecondBuilder, Time64NanosecondBuilder, + TimestampMicrosecondBuilder, TimestampMillisecondBuilder, TimestampNanosecondBuilder, + TimestampSecondBuilder, UInt8Builder, UInt16Builder, UInt32Builder, UInt64Builder, }; use arrow_array::{ ArrayRef, BinaryArray, BooleanArray, Date32Array, Date64Array, Decimal32Array, Decimal64Array, @@ -38,7 +42,7 @@ use arrow_array::{ Int16Array, Int32Array, Int64Array, LargeBinaryArray, Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, UInt8Array, - UInt16Array, UInt32Array, UInt64Array, new_empty_array, new_null_array, + UInt16Array, UInt32Array, UInt64Array, new_null_array, }; use arrow_buffer::i256; use arrow_schema::{DataType, Field, Schema, TimeUnit}; @@ -596,473 +600,572 @@ macro_rules! get_statistics { }}} } -macro_rules! make_data_page_stats_iterator { - ($iterator_type: ident, $func: ident, $stat_value_type: ty) => { - struct $iterator_type<'a, I> - where - I: Iterator, - { - iter: I, - } - - impl<'a, I> $iterator_type<'a, I> - where - I: Iterator, - { - fn new(iter: I) -> Self { - Self { iter } - } - } - - impl<'a, I> Iterator for $iterator_type<'a, I> - where - I: Iterator, - { - type Item = Vec>; - - fn next(&mut self) -> Option { - let next = self.iter.next(); - match next { - Some((len, index)) => match index { - // No matching `Index` found; - // thus no statistics that can be extracted. - // We return vec![None; len] to effectively - // create an arrow null-array with the length - // corresponding to the number of entries in - // `ParquetOffsetIndex` per row group per column. - ColumnIndexMetaData::NONE => Some(vec![None; len]), - _ => Some(<$stat_value_type>::$func(&index).collect::>()), - }, - _ => None, - } - } - - fn size_hint(&self) -> (usize, Option) { - self.iter.size_hint() - } - } - }; -} - -make_data_page_stats_iterator!(MinBooleanDataPageStatsIterator, min_values_iter, bool); -make_data_page_stats_iterator!(MaxBooleanDataPageStatsIterator, max_values_iter, bool); -make_data_page_stats_iterator!(MinInt32DataPageStatsIterator, min_values_iter, i32); -make_data_page_stats_iterator!(MaxInt32DataPageStatsIterator, max_values_iter, i32); -make_data_page_stats_iterator!(MinInt64DataPageStatsIterator, min_values_iter, i64); -make_data_page_stats_iterator!(MaxInt64DataPageStatsIterator, max_values_iter, i64); -make_data_page_stats_iterator!( - MinFloat16DataPageStatsIterator, - min_values_iter, - FixedLenByteArray -); -make_data_page_stats_iterator!( - MaxFloat16DataPageStatsIterator, - max_values_iter, - FixedLenByteArray -); -make_data_page_stats_iterator!(MinFloat32DataPageStatsIterator, min_values_iter, f32); -make_data_page_stats_iterator!(MaxFloat32DataPageStatsIterator, max_values_iter, f32); -make_data_page_stats_iterator!(MinFloat64DataPageStatsIterator, min_values_iter, f64); -make_data_page_stats_iterator!(MaxFloat64DataPageStatsIterator, max_values_iter, f64); -make_data_page_stats_iterator!( - MinByteArrayDataPageStatsIterator, - min_values_iter, - ByteArray -); -make_data_page_stats_iterator!( - MaxByteArrayDataPageStatsIterator, - max_values_iter, - ByteArray -); -make_data_page_stats_iterator!( - MaxFixedLenByteArrayDataPageStatsIterator, - max_values_iter, - FixedLenByteArray -); - -make_data_page_stats_iterator!( - MinFixedLenByteArrayDataPageStatsIterator, - min_values_iter, - FixedLenByteArray -); - -macro_rules! get_decimal_page_stats_iterator { - ($iterator_type: ident, $func: ident, $stat_value_type: ident, $convert_func: ident) => { - struct $iterator_type<'a, I> - where - I: Iterator, - { - iter: I, - } - - impl<'a, I> $iterator_type<'a, I> - where - I: Iterator, - { - fn new(iter: I) -> Self { - Self { iter } - } - } - - impl<'a, I> Iterator for $iterator_type<'a, I> - where - I: Iterator, - { - type Item = Vec>; - - // Some(native_index.$func().map(|v| v.map($conv)).collect::>()) - fn next(&mut self) -> Option { - let next = self.iter.next(); - match next { - Some((len, index)) => match index { - ColumnIndexMetaData::INT32(native_index) => Some( - native_index - .$func() - .map(|x| x.map(|x| $stat_value_type::from(*x))) - .collect::>(), - ), - ColumnIndexMetaData::INT64(native_index) => Some( - native_index - .$func() - .map(|x| x.map(|x| $stat_value_type::try_from(*x).unwrap())) - .collect::>(), - ), - ColumnIndexMetaData::BYTE_ARRAY(native_index) => Some( - native_index - .$func() - .map(|x| x.map(|x| $convert_func(x))) - .collect::>(), - ), - ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(native_index) => Some( - native_index - .$func() - .map(|x| x.map(|x| $convert_func(x))) - .collect::>(), - ), - _ => Some(vec![None; len]), - }, - _ => None, - } - } - - fn size_hint(&self) -> (usize, Option) { - self.iter.size_hint() - } - } - }; -} - -get_decimal_page_stats_iterator!( - MinDecimal32DataPageStatsIterator, - min_values_iter, - i32, - from_bytes_to_i32 -); - -get_decimal_page_stats_iterator!( - MaxDecimal32DataPageStatsIterator, - max_values_iter, - i32, - from_bytes_to_i32 -); - -get_decimal_page_stats_iterator!( - MinDecimal64DataPageStatsIterator, - min_values_iter, - i64, - from_bytes_to_i64 -); - -get_decimal_page_stats_iterator!( - MaxDecimal64DataPageStatsIterator, - max_values_iter, - i64, - from_bytes_to_i64 -); - -get_decimal_page_stats_iterator!( - MinDecimal128DataPageStatsIterator, - min_values_iter, - i128, - from_bytes_to_i128 -); - -get_decimal_page_stats_iterator!( - MaxDecimal128DataPageStatsIterator, - max_values_iter, - i128, - from_bytes_to_i128 -); - -get_decimal_page_stats_iterator!( - MinDecimal256DataPageStatsIterator, - min_values_iter, - i256, - from_bytes_to_i256 -); - -get_decimal_page_stats_iterator!( - MaxDecimal256DataPageStatsIterator, - max_values_iter, - i256, - from_bytes_to_i256 -); - macro_rules! get_data_page_statistics { ($stat_type_prefix: ident, $data_type: ident, $iterator: ident, $physical_type: ident) => { - paste! { - match $data_type { + { + let chunks: Vec<(usize, &ColumnIndexMetaData)> = $iterator.collect(); + let capacity: usize = chunks.iter().map(|c| c.0).sum(); + paste! { + match $data_type { DataType::Boolean => { - let iterator = [<$stat_type_prefix BooleanDataPageStatsIterator>]::new($iterator); - let mut builder = BooleanBuilder::new(); - for x in iterator { - for x in x.into_iter() { - let Some(x) = x else { - builder.append_null(); // no statistics value - continue; - }; - builder.append_value(x); + let mut b = BooleanBuilder::with_capacity(capacity); + for (len, index) in chunks { + match index { + ColumnIndexMetaData::BOOLEAN(index) => { + for val in index.[<$stat_type_prefix:lower _values_iter>]() { + b.append_option(val.copied()); + } + } + _ => b.append_nulls(len), + } + } + Ok(Arc::new(b.finish())) + }, + DataType::UInt8 => { + let mut b = UInt8Builder::with_capacity(capacity); + for (len, index) in chunks { + match index { + ColumnIndexMetaData::INT32(index) => { + for val in index.[<$stat_type_prefix:lower _values_iter>]() { + b.append_option(val.and_then(|&x| u8::try_from(x).ok())); + } + } + _ => b.append_nulls(len), + } + } + Ok(Arc::new(b.finish())) + }, + DataType::UInt16 => { + let mut b = UInt16Builder::with_capacity(capacity); + for (len, index) in chunks { + match index { + ColumnIndexMetaData::INT32(index) => { + for val in index.[<$stat_type_prefix:lower _values_iter>]() { + b.append_option(val.and_then(|&x| u16::try_from(x).ok())); + } + } + _ => b.append_nulls(len), + } + } + Ok(Arc::new(b.finish())) + }, + DataType::UInt32 => { + let mut builder = UInt32Builder::with_capacity(capacity); + for (len, index) in chunks { + match index { + ColumnIndexMetaData::INT32(index) => { + for val in index.[<$stat_type_prefix:lower _values_iter>]() { + builder.append_option(val.map(|&x| x as u32)); + } + } + _ => builder.append_nulls(len), } } Ok(Arc::new(builder.finish())) }, - DataType::UInt8 => Ok(Arc::new( - UInt8Array::from_iter( - [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator) - .map(|x| { - x.into_iter().map(|x| { - x.and_then(|x| u8::try_from(x).ok()) - }) - }) - .flatten() - ) - )), - DataType::UInt16 => Ok(Arc::new( - UInt16Array::from_iter( - [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator) - .map(|x| { - x.into_iter().map(|x| { - x.and_then(|x| u16::try_from(x).ok()) - }) - }) - .flatten() - ) - )), - DataType::UInt32 => Ok(Arc::new( - UInt32Array::from_iter( - [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator) - .map(|x| { - x.into_iter().map(|x| { - x.and_then(|x| Some(x as u32)) - }) - }) - .flatten() - ))), - DataType::UInt64 => Ok(Arc::new( - UInt64Array::from_iter( - [<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator) - .map(|x| { - x.into_iter().map(|x| { - x.and_then(|x| Some(x as u64)) - }) - }) - .flatten() - ))), - DataType::Int8 => Ok(Arc::new( - Int8Array::from_iter( - [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator) - .map(|x| { - x.into_iter().map(|x| { - x.and_then(|x| i8::try_from(x).ok()) - }) - }) - .flatten() - ) - )), - DataType::Int16 => Ok(Arc::new( - Int16Array::from_iter( - [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator) - .map(|x| { - x.into_iter().map(|x| { - x.and_then(|x| i16::try_from(x).ok()) - }) - }) - .flatten() - ) - )), - DataType::Int32 => Ok(Arc::new(Int32Array::from_iter([<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator).flatten()))), - DataType::Int64 => Ok(Arc::new(Int64Array::from_iter([<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten()))), - DataType::Float16 => Ok(Arc::new( - Float16Array::from_iter( - [<$stat_type_prefix Float16DataPageStatsIterator>]::new($iterator) - .map(|x| { - x.into_iter().map(|x| { - x.and_then(|x| from_bytes_to_f16(x.data())) - }) - }) - .flatten() - ) - )), - DataType::Float32 => Ok(Arc::new(Float32Array::from_iter([<$stat_type_prefix Float32DataPageStatsIterator>]::new($iterator).flatten()))), - DataType::Float64 => Ok(Arc::new(Float64Array::from_iter([<$stat_type_prefix Float64DataPageStatsIterator>]::new($iterator).flatten()))), - DataType::Binary => Ok(Arc::new(BinaryArray::from_iter([<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).flatten()))), - DataType::LargeBinary => Ok(Arc::new(LargeBinaryArray::from_iter([<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).flatten()))), - DataType::Utf8 => { - let mut builder = StringBuilder::new(); - let iterator = [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator); - for x in iterator { - for x in x.into_iter() { - let Some(x) = x else { - builder.append_null(); // no statistics value - continue; - }; - - let Ok(x) = std::str::from_utf8(x.data()) else { - builder.append_null(); - continue; - }; - - builder.append_value(x); + DataType::UInt64 => { + let mut builder = UInt64Builder::with_capacity(capacity); + for (len, index) in chunks { + match index { + ColumnIndexMetaData::INT64(index) => { + for val in index.[<$stat_type_prefix:lower _values_iter>]() { + builder.append_option(val.map(|&x| x as u64)); + } + } + _ => builder.append_nulls(len), } } Ok(Arc::new(builder.finish())) }, - DataType::LargeUtf8 => { - let mut builder = LargeStringBuilder::new(); - let iterator = [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator); - for x in iterator { - for x in x.into_iter() { - let Some(x) = x else { - builder.append_null(); // no statistics value - continue; - }; - - let Ok(x) = std::str::from_utf8(x.data()) else { - builder.append_null(); - continue; - }; - - builder.append_value(x); + DataType::Int8 => { + let mut b = Int8Builder::with_capacity(capacity); + for (len, index) in chunks { + match index { + ColumnIndexMetaData::INT32(index) => { + for val in index.[<$stat_type_prefix:lower _values_iter>]() { + b.append_option(val.and_then(|&x| i8::try_from(x).ok())); + } + } + _ => b.append_nulls(len), + } + } + Ok(Arc::new(b.finish())) + }, + DataType::Int16 => { + let mut b = Int16Builder::with_capacity(capacity); + for (len, index) in chunks { + match index { + ColumnIndexMetaData::INT32(index) => { + for val in index.[<$stat_type_prefix:lower _values_iter>]() { + b.append_option(val.and_then(|&x| i16::try_from(x).ok())); + } + } + _ => b.append_nulls(len), + } + } + Ok(Arc::new(b.finish())) + }, + DataType::Int32 => { + let mut builder = Int32Builder::with_capacity(capacity); + for (len, index) in chunks { + match index { + ColumnIndexMetaData::INT32(index) => { + for val in index.[<$stat_type_prefix:lower _values_iter>]() { + builder.append_option(val.copied()); + } + } + _ => builder.append_nulls(len), + } + } + Ok(Arc::new(builder.finish())) + }, + DataType::Int64 => { + let mut builder = Int64Builder::with_capacity(capacity); + for (len, index) in chunks { + match index { + ColumnIndexMetaData::INT64(index) => { + for val in index.[<$stat_type_prefix:lower _values_iter>]() { + builder.append_option(val.copied()); + } + } + _ => builder.append_nulls(len), + } + } + Ok(Arc::new(builder.finish())) + }, + DataType::Float16 => { + let mut b = Float16Builder::with_capacity(capacity); + for (len, index) in chunks { + match index { + ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(index) => { + for val in index.[<$stat_type_prefix:lower _values_iter>]() { + b.append_option(val.and_then(|x| from_bytes_to_f16(x.as_ref()))); + } + } + _ => b.append_nulls(len), + } + } + Ok(Arc::new(b.finish())) + }, + DataType::Float32 => { + let mut builder = Float32Builder::with_capacity(capacity); + for (len, index) in chunks { + match index { + ColumnIndexMetaData::FLOAT(index) => { + for val in index.[<$stat_type_prefix:lower _values_iter>]() { + builder.append_option(val.copied()); + } + } + _ => builder.append_nulls(len), } } Ok(Arc::new(builder.finish())) }, + DataType::Float64 => { + let mut builder = Float64Builder::with_capacity(capacity); + for (len, index) in chunks { + match index { + ColumnIndexMetaData::DOUBLE(index) => { + for val in index.[<$stat_type_prefix:lower _values_iter>]() { + builder.append_option(val.copied()); + } + } + _ => builder.append_nulls(len), + } + } + Ok(Arc::new(builder.finish())) + }, + DataType::Binary => { + let mut b = BinaryBuilder::with_capacity(capacity, capacity * 10); + for (len, index) in chunks { + match index { + ColumnIndexMetaData::BYTE_ARRAY(index) => { + for val in index.[<$stat_type_prefix:lower _values_iter>]() { + b.append_option(val.map(|x| x.as_ref())); + } + } + _ => b.append_nulls(len), + } + } + Ok(Arc::new(b.finish())) + }, + DataType::LargeBinary => { + let mut b = LargeBinaryBuilder::with_capacity(capacity, capacity * 10); + for (len, index) in chunks { + match index { + ColumnIndexMetaData::BYTE_ARRAY(index) => { + for val in index.[<$stat_type_prefix:lower _values_iter>]() { + b.append_option(val.map(|x| x.as_ref())); + } + } + _ => b.append_nulls(len), + } + } + Ok(Arc::new(b.finish())) + }, + DataType::Utf8 => { + let mut b = StringBuilder::with_capacity(capacity, capacity * 10); + for (len, index) in chunks { + match index { + ColumnIndexMetaData::BYTE_ARRAY(index) => { + for val in index.[<$stat_type_prefix:lower _values_iter>]() { + match val { + Some(x) => match std::str::from_utf8(x.as_ref()) { + Ok(s) => b.append_value(s), + _ => b.append_null(), + } + None => b.append_null(), + } + } + } + _ => b.append_nulls(len), + } + } + Ok(Arc::new(b.finish())) + }, + DataType::LargeUtf8 => { + let mut b = LargeStringBuilder::with_capacity(capacity, capacity * 10); + for (len, index) in chunks { + match index { + ColumnIndexMetaData::BYTE_ARRAY(index) => { + for val in index.[<$stat_type_prefix:lower _values_iter>]() { + match val { + Some(x) => match std::str::from_utf8(x.as_ref()) { + Ok(s) => b.append_value(s), + _ => b.append_null(), + } + None => b.append_null(), + } + } + } + _ => b.append_nulls(len), + } + } + Ok(Arc::new(b.finish())) + }, DataType::Dictionary(_, value_type) => { - [<$stat_type_prefix:lower _ page_statistics>](value_type, $iterator, $physical_type) + [<$stat_type_prefix:lower _ page_statistics>](value_type, chunks.into_iter(), $physical_type) }, DataType::Timestamp(unit, timezone) => { - let iter = [<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten(); - Ok(match unit { - TimeUnit::Second => Arc::new(TimestampSecondArray::from_iter(iter).with_timezone_opt(timezone.clone())), - TimeUnit::Millisecond => Arc::new(TimestampMillisecondArray::from_iter(iter).with_timezone_opt(timezone.clone())), - TimeUnit::Microsecond => Arc::new(TimestampMicrosecondArray::from_iter(iter).with_timezone_opt(timezone.clone())), - TimeUnit::Nanosecond => Arc::new(TimestampNanosecondArray::from_iter(iter).with_timezone_opt(timezone.clone())), - }) + match unit { + TimeUnit::Second => { + let mut b = TimestampSecondBuilder::with_capacity(capacity); + for (len, index) in chunks { + match index { + ColumnIndexMetaData::INT64(index) => { + for val in index.[<$stat_type_prefix:lower _values_iter>]() { + b.append_option(val.copied()); + } + } + _ => b.append_nulls(len), + } + } + Ok(Arc::new(b.finish().with_timezone_opt(timezone.clone()))) + } + TimeUnit::Millisecond => { + let mut b = TimestampMillisecondBuilder::with_capacity(capacity); + for (len, index) in chunks { + match index { + ColumnIndexMetaData::INT64(index) => { + for val in index.[<$stat_type_prefix:lower _values_iter>]() { + b.append_option(val.copied()); + } + } + _ => b.append_nulls(len), + } + } + Ok(Arc::new(b.finish().with_timezone_opt(timezone.clone()))) + } + TimeUnit::Microsecond => { + let mut b = TimestampMicrosecondBuilder::with_capacity(capacity); + for (len, index) in chunks { + match index { + ColumnIndexMetaData::INT64(index) => { + for val in index.[<$stat_type_prefix:lower _values_iter>]() { + b.append_option(val.copied()); + } + } + _ => b.append_nulls(len), + } + } + Ok(Arc::new(b.finish().with_timezone_opt(timezone.clone()))) + } + TimeUnit::Nanosecond => { + let mut b = TimestampNanosecondBuilder::with_capacity(capacity); + for (len, index) in chunks { + match index { + ColumnIndexMetaData::INT64(index) => { + for val in index.[<$stat_type_prefix:lower _values_iter>]() { + b.append_option(val.copied()); + } + } + _ => b.append_nulls(len), + } + } + Ok(Arc::new(b.finish().with_timezone_opt(timezone.clone()))) + } + } + }, + DataType::Date32 => { + let mut builder = Date32Builder::with_capacity(capacity); + for (len, index) in chunks { + match index { + ColumnIndexMetaData::INT32(index) => { + for val in index.[<$stat_type_prefix:lower _values_iter>]() { + builder.append_option(val.copied()); + } + } + _ => builder.append_nulls(len), + } + } + Ok(Arc::new(builder.finish())) + }, + DataType::Date64 if $physical_type == Some(PhysicalType::INT32)=> { + let mut b = Date64Builder::with_capacity(capacity); + for (len, index) in chunks { + match index { + ColumnIndexMetaData::INT32(index) => { + for val in index.[<$stat_type_prefix:lower _values_iter>]() { + b.append_option(val.map(|&x| (x as i64) * 24 * 60 * 60 * 1000)); + } + } + _ => b.append_nulls(len), + } + } + Ok(Arc::new(b.finish())) + }, + DataType::Date64 if $physical_type == Some(PhysicalType::INT64) => { + let mut builder = Date64Builder::with_capacity(capacity); + for (len, index) in chunks { + match index { + ColumnIndexMetaData::INT64(index) => { + for val in index.[<$stat_type_prefix:lower _values_iter>]() { + builder.append_option(val.copied()); + } + } + _ => builder.append_nulls(len), + } + } + Ok(Arc::new(builder.finish())) + }, + DataType::Decimal32(precision, scale) => { + let mut b = Decimal32Builder::with_capacity(capacity); + for (len, index) in chunks { + match index { + ColumnIndexMetaData::INT32(index) => { + for val in index.[<$stat_type_prefix:lower _values_iter>]() { + b.append_option(val.copied()); + } + } + _ => b.append_nulls(len), + } + } + Ok(Arc::new(b.with_precision_and_scale(*precision, *scale)?.finish())) + }, + DataType::Decimal64(precision, scale) => { + let mut b = Decimal64Builder::with_capacity(capacity); + for (len, index) in chunks { + match index { + ColumnIndexMetaData::INT32(index) => { + for val in index.[<$stat_type_prefix:lower _values_iter>]() { + b.append_option(val.map(|x| *x as i64)); + } + } + ColumnIndexMetaData::INT64(index) => { + for val in index.[<$stat_type_prefix:lower _values_iter>]() { + b.append_option(val.copied()); + } + } + _ => b.append_nulls(len), + } + } + Ok(Arc::new(b.with_precision_and_scale(*precision, *scale)?.finish())) + }, + DataType::Decimal128(precision, scale) => { + let mut b = Decimal128Array::builder(capacity); + for (len, index) in chunks { + match index { + ColumnIndexMetaData::INT32(index) => { + for val in index.[<$stat_type_prefix:lower _values_iter>]() { + b.append_option(val.map(|x| *x as i128)); + } + } + ColumnIndexMetaData::INT64(index) => { + for val in index.[<$stat_type_prefix:lower _values_iter>]() { + b.append_option(val.map(|x| *x as i128)); + } + } + ColumnIndexMetaData::BYTE_ARRAY(index) => { + for val in index.[<$stat_type_prefix:lower _values_iter>]() { + b.append_option(val.map(|x| from_bytes_to_i128(x.as_ref()))); + } + } + ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(index) => { + for val in index.[<$stat_type_prefix:lower _values_iter>]() { + b.append_option(val.map(|x| from_bytes_to_i128(x.as_ref()))); + } + } + _ => b.append_nulls(len), + } + } + Ok(Arc::new(b.with_precision_and_scale(*precision, *scale)?.finish())) + }, + DataType::Decimal256(precision, scale) => { + let mut b = Decimal256Array::builder(capacity); + for (len, index) in chunks { + match index { + ColumnIndexMetaData::INT32(index) => { + for val in index.[<$stat_type_prefix:lower _values_iter>]() { + b.append_option(val.map(|x| i256::from_i128(*x as i128))); + } + } + ColumnIndexMetaData::INT64(index) => { + for val in index.[<$stat_type_prefix:lower _values_iter>]() { + b.append_option(val.map(|x| i256::from_i128(*x as i128))); + } + } + ColumnIndexMetaData::BYTE_ARRAY(index) => { + for val in index.[<$stat_type_prefix:lower _values_iter>]() { + b.append_option(val.map(|x| from_bytes_to_i256(x.as_ref()))); + } + } + ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(index) => { + for val in index.[<$stat_type_prefix:lower _values_iter>]() { + b.append_option(val.map(|x| from_bytes_to_i256(x.as_ref()))); + } + } + _ => b.append_nulls(len), + } + } + Ok(Arc::new(b.with_precision_and_scale(*precision, *scale)?.finish())) }, - DataType::Date32 => Ok(Arc::new(Date32Array::from_iter([<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator).flatten()))), - DataType::Date64 if $physical_type == Some(PhysicalType::INT32)=> Ok( - Arc::new( - Date64Array::from_iter([<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator) - .map(|x| { - x.into_iter() - .map(|x| { - x.and_then(|x| i64::try_from(x).ok()) - }) - .map(|x| x.map(|x| x * 24 * 60 * 60 * 1000)) - }).flatten() - ) - ) - ), - DataType::Date64 if $physical_type == Some(PhysicalType::INT64) => Ok(Arc::new(Date64Array::from_iter([<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten()))), - DataType::Decimal32(precision, scale) => Ok(Arc::new( - Decimal32Array::from_iter([<$stat_type_prefix Decimal32DataPageStatsIterator>]::new($iterator).flatten()).with_precision_and_scale(*precision, *scale)?)), - DataType::Decimal64(precision, scale) => Ok(Arc::new( - Decimal64Array::from_iter([<$stat_type_prefix Decimal64DataPageStatsIterator>]::new($iterator).flatten()).with_precision_and_scale(*precision, *scale)?)), - DataType::Decimal128(precision, scale) => Ok(Arc::new( - Decimal128Array::from_iter([<$stat_type_prefix Decimal128DataPageStatsIterator>]::new($iterator).flatten()).with_precision_and_scale(*precision, *scale)?)), - DataType::Decimal256(precision, scale) => Ok(Arc::new( - Decimal256Array::from_iter([<$stat_type_prefix Decimal256DataPageStatsIterator>]::new($iterator).flatten()).with_precision_and_scale(*precision, *scale)?)), DataType::Time32(unit) => { - Ok(match unit { - TimeUnit::Second => Arc::new(Time32SecondArray::from_iter( - [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator).flatten(), - )), - TimeUnit::Millisecond => Arc::new(Time32MillisecondArray::from_iter( - [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator).flatten(), - )), + match unit { + TimeUnit::Second => { + let mut b = Time32SecondBuilder::with_capacity(capacity); + for (len, index) in chunks { + match index { + ColumnIndexMetaData::INT32(index) => { + for val in index.[<$stat_type_prefix:lower _values_iter>]() { + b.append_option(val.copied()); + } + } + _ => b.append_nulls(len), + } + } + Ok(Arc::new(b.finish())) + } + TimeUnit::Millisecond => { + let mut b = Time32MillisecondBuilder::with_capacity(capacity); + for (len, index) in chunks { + match index { + ColumnIndexMetaData::INT32(index) => { + for val in index.[<$stat_type_prefix:lower _values_iter>]() { + b.append_option(val.copied()); + } + } + _ => b.append_nulls(len), + } + } + Ok(Arc::new(b.finish())) + } _ => { - // don't know how to extract statistics, so return an empty array - new_empty_array(&DataType::Time32(unit.clone())) + Ok(new_null_array($data_type, capacity)) } - }) + } } DataType::Time64(unit) => { - Ok(match unit { - TimeUnit::Microsecond => Arc::new(Time64MicrosecondArray::from_iter( - [<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten(), - )), - TimeUnit::Nanosecond => Arc::new(Time64NanosecondArray::from_iter( - [<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten(), - )), + match unit { + TimeUnit::Microsecond => { + let mut b = Time64MicrosecondBuilder::with_capacity(capacity); + for (len, index) in chunks { + match index { + ColumnIndexMetaData::INT64(index) => { + for val in index.[<$stat_type_prefix:lower _values_iter>]() { + b.append_option(val.copied()); + } + } + _ => b.append_nulls(len), + } + } + Ok(Arc::new(b.finish())) + } + TimeUnit::Nanosecond => { + let mut b = Time64NanosecondBuilder::with_capacity(capacity); + for (len, index) in chunks { + match index { + ColumnIndexMetaData::INT64(index) => { + for val in index.[<$stat_type_prefix:lower _values_iter>]() { + b.append_option(val.copied()); + } + } + _ => b.append_nulls(len), + } + } + Ok(Arc::new(b.finish())) + } _ => { - // don't know how to extract statistics, so return an empty array - new_empty_array(&DataType::Time64(unit.clone())) + Ok(new_null_array($data_type, capacity)) } - }) + } }, DataType::FixedSizeBinary(size) => { - let mut builder = FixedSizeBinaryBuilder::new(*size); - let iterator = [<$stat_type_prefix FixedLenByteArrayDataPageStatsIterator>]::new($iterator); - for x in iterator { - for x in x.into_iter() { - let Some(x) = x else { - builder.append_null(); // no statistics value - continue; - }; - - if x.len() == *size as usize { - let _ = builder.append_value(x.data()); - } else { - builder.append_null(); + let mut b = FixedSizeBinaryBuilder::with_capacity(capacity, *size); + for (len, index) in chunks { + match index { + ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(index) => { + for val in index.[<$stat_type_prefix:lower _values_iter>]() { + match val { + Some(v) => b.append_value(v.as_ref())?, + None => b.append_null(), + } + } } + _ => b.append_nulls(len), } } - Ok(Arc::new(builder.finish())) + Ok(Arc::new(b.finish())) }, DataType::Utf8View => { - let mut builder = StringViewBuilder::new(); - let iterator = [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator); - for x in iterator { - for x in x.into_iter() { - let Some(x) = x else { - builder.append_null(); // no statistics value - continue; - }; - - let Ok(x) = std::str::from_utf8(x.data()) else { - builder.append_null(); - continue; - }; - - builder.append_value(x); + let mut b = StringViewBuilder::with_capacity(capacity); + for (len, index) in chunks { + match index { + ColumnIndexMetaData::BYTE_ARRAY(index) => { + for val in index.[<$stat_type_prefix:lower _values_iter>]() { + match val { + Some(x) => match std::str::from_utf8(x.as_ref()) { + Ok(s) => b.append_value(s), + _ => b.append_null(), + } + None => b.append_null(), + } + } + } + _ => { + for _ in 0..len { b.append_null(); } + } } } - Ok(Arc::new(builder.finish())) + Ok(Arc::new(b.finish())) }, DataType::BinaryView => { - let mut builder = BinaryViewBuilder::new(); - let iterator = [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator); - for x in iterator { - for x in x.into_iter() { - let Some(x) = x else { - builder.append_null(); // no statistics value - continue; - }; - - builder.append_value(x); + let mut b = BinaryViewBuilder::with_capacity(capacity); + for (len, index) in chunks { + match index { + ColumnIndexMetaData::BYTE_ARRAY(index) => { + for val in index.[<$stat_type_prefix:lower _values_iter>]() { + match val { + Some(v) => b.append_value(v.as_ref()), + None => b.append_null(), + } + } + } + _ => { + for _ in 0..len { b.append_null(); } + } } } - Ok(Arc::new(builder.finish())) + Ok(Arc::new(b.finish())) }, DataType::Date64 | // required to cover $physical_type match guard DataType::Null | @@ -1077,12 +1180,12 @@ macro_rules! get_data_page_statistics { DataType::Union(_, _) | DataType::Map(_, _) | DataType::RunEndEncoded(_, _) => { - let len = $iterator.count(); // don't know how to extract statistics, so return a null array - Ok(new_null_array($data_type, len)) + Ok(new_null_array($data_type, capacity)) }, } } + } } } /// Extracts the min statistics from an iterator of [`ParquetStatistics`] to an @@ -1142,14 +1245,22 @@ pub(crate) fn null_counts_page_statistics<'a, I>(iterator: I) -> Result, { - let iter = iterator.flat_map(|(len, index)| match index { - ColumnIndexMetaData::NONE => vec![None; len], - column_index => column_index.null_counts().map_or(vec![None; len], |v| { - v.iter().map(|i| Some(*i as u64)).collect::>() - }), - }); - - Ok(UInt64Array::from_iter(iter)) + let chunks: Vec<_> = iterator.collect(); + let total_capacity: usize = chunks.iter().map(|(len, _)| *len).sum(); + let mut builder = UInt64Builder::with_capacity(total_capacity); + for (len, index) in chunks { + match index.null_counts() { + Some(counts) => { + for &count in counts { + builder.append_value(count as u64); + } + } + None => { + builder.append_nulls(len); + } + } + } + Ok(builder.finish()) } /// Extracts Parquet statistics as Arrow arrays @@ -1252,17 +1363,31 @@ impl<'a> StatisticsConverter<'a> { return Ok(None); }; - let mut builder = UInt64Array::builder(10); - for metadata in metadatas.into_iter() { - let row_count = metadata.num_rows(); - let row_count: u64 = row_count.try_into().map_err(|e| { - arrow_err!(format!( - "Parquet row count {row_count} too large to convert to u64: {e}" - )) - })?; - builder.append_value(row_count); + let mut e = None; + let row_counts: Vec = metadatas + .into_iter() + .map(|metadata| { + let row_count: i64 = metadata.num_rows(); + let row_count: std::result::Result = row_count.try_into(); + + match row_count { + Ok(v) => v, + Err(_) => { + e = Some(ParquetError::General(format!( + "Row count {} too large to convert to u64", + metadata.num_rows() + ))); + 0u64 // placeholder, error will be returned later + } + } + }) + .collect(); + if let Some(e) = e { + return Err(arrow_err!(format!( + "Parquet row count too large to convert to u64: {e}" + ))); } - Ok(Some(builder.finish())) + Ok(Some(UInt64Array::from(row_counts))) } /// Create a new `StatisticsConverter` to extract statistics for a column From 679e6f7dfea0c645fb66b35ea37ef7b141830c4a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Thu, 29 Jan 2026 21:11:49 +0100 Subject: [PATCH 02/17] Revert row group change --- parquet/src/arrow/arrow_reader/statistics.rs | 38 ++++++-------------- 1 file changed, 10 insertions(+), 28 deletions(-) diff --git a/parquet/src/arrow/arrow_reader/statistics.rs b/parquet/src/arrow/arrow_reader/statistics.rs index 528871a251f8..f11340c97fab 100644 --- a/parquet/src/arrow/arrow_reader/statistics.rs +++ b/parquet/src/arrow/arrow_reader/statistics.rs @@ -1359,35 +1359,17 @@ impl<'a> StatisticsConverter<'a> { where I: IntoIterator, { - let Some(_) = self.parquet_column_index else { - return Ok(None); - }; - - let mut e = None; - let row_counts: Vec = metadatas - .into_iter() - .map(|metadata| { - let row_count: i64 = metadata.num_rows(); - let row_count: std::result::Result = row_count.try_into(); - - match row_count { - Ok(v) => v, - Err(_) => { - e = Some(ParquetError::General(format!( - "Row count {} too large to convert to u64", - metadata.num_rows() - ))); - 0u64 // placeholder, error will be returned later - } - } - }) - .collect(); - if let Some(e) = e { - return Err(arrow_err!(format!( - "Parquet row count too large to convert to u64: {e}" - ))); + let mut builder = UInt64Array::builder(10); + for metadata in metadatas.into_iter() { + let row_count = metadata.num_rows(); + let row_count: u64 = row_count.try_into().map_err(|e| { + arrow_err!(format!( + "Parquet row count {row_count} too large to convert to u64: {e}" + )) + })?; + builder.append_value(row_count); } - Ok(Some(UInt64Array::from(row_counts))) + Ok(Some(builder.finish())) } /// Create a new `StatisticsConverter` to extract statistics for a column From 8b7f23268ccabd2424b42d749563f60f7cd18720 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Thu, 29 Jan 2026 21:21:16 +0100 Subject: [PATCH 03/17] Fix --- parquet/src/arrow/arrow_reader/statistics.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/parquet/src/arrow/arrow_reader/statistics.rs b/parquet/src/arrow/arrow_reader/statistics.rs index f11340c97fab..7fec11d4ebc8 100644 --- a/parquet/src/arrow/arrow_reader/statistics.rs +++ b/parquet/src/arrow/arrow_reader/statistics.rs @@ -1359,6 +1359,9 @@ impl<'a> StatisticsConverter<'a> { where I: IntoIterator, { + if self.parquet_column_index.is_none() { + return Ok(None); + } let mut builder = UInt64Array::builder(10); for metadata in metadatas.into_iter() { let row_count = metadata.num_rows(); From f900620aa35a067cdff81d0970e8913e2266a76e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Thu, 29 Jan 2026 21:25:46 +0100 Subject: [PATCH 04/17] Fix --- parquet/src/arrow/arrow_reader/statistics.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/parquet/src/arrow/arrow_reader/statistics.rs b/parquet/src/arrow/arrow_reader/statistics.rs index 7fec11d4ebc8..29ba6c7cf688 100644 --- a/parquet/src/arrow/arrow_reader/statistics.rs +++ b/parquet/src/arrow/arrow_reader/statistics.rs @@ -1359,9 +1359,10 @@ impl<'a> StatisticsConverter<'a> { where I: IntoIterator, { - if self.parquet_column_index.is_none() { + let Some(_) = self.parquet_column_index else { return Ok(None); - } + }; + let mut builder = UInt64Array::builder(10); for metadata in metadatas.into_iter() { let row_count = metadata.num_rows(); From 37b1f306c64d98f693f8865516ad83dcf39b42ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Fri, 30 Jan 2026 08:29:23 +0100 Subject: [PATCH 05/17] Fix decimals --- parquet/src/arrow/arrow_reader/statistics.rs | 25 ++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/parquet/src/arrow/arrow_reader/statistics.rs b/parquet/src/arrow/arrow_reader/statistics.rs index 29ba6c7cf688..4ad50af73779 100644 --- a/parquet/src/arrow/arrow_reader/statistics.rs +++ b/parquet/src/arrow/arrow_reader/statistics.rs @@ -957,6 +957,21 @@ macro_rules! get_data_page_statistics { b.append_option(val.copied()); } } + ColumnIndexMetaData::INT64(index) => { + for val in index.[<$stat_type_prefix:lower _values_iter>]() { + b.append_option(val.and_then(|&x| i32::try_from(x).ok())); + } + } + ColumnIndexMetaData::BYTE_ARRAY(index) => { + for val in index.[<$stat_type_prefix:lower _values_iter>]() { + b.append_option(val.map(|x| from_bytes_to_i32(x.as_ref()))); + } + } + ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(index) => { + for val in index.[<$stat_type_prefix:lower _values_iter>]() { + b.append_option(val.map(|x| from_bytes_to_i32(x.as_ref()))); + } + } _ => b.append_nulls(len), } } @@ -976,6 +991,16 @@ macro_rules! get_data_page_statistics { b.append_option(val.copied()); } } + ColumnIndexMetaData::BYTE_ARRAY(index) => { + for val in index.[<$stat_type_prefix:lower _values_iter>]() { + b.append_option(val.map(|x| from_bytes_to_i64(x.as_ref()))); + } + } + ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(index) => { + for val in index.[<$stat_type_prefix:lower _values_iter>]() { + b.append_option(val.map(|x| from_bytes_to_i64(x.as_ref()))); + } + } _ => b.append_nulls(len), } } From bc7a7a6f2346860135350dbdf251a872cf8b50be Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Fri, 30 Jan 2026 09:11:14 +0100 Subject: [PATCH 06/17] Opts --- parquet/src/arrow/arrow_reader/statistics.rs | 42 ++++++++++++-------- parquet/src/file/page_index/column_index.rs | 2 + 2 files changed, 27 insertions(+), 17 deletions(-) diff --git a/parquet/src/arrow/arrow_reader/statistics.rs b/parquet/src/arrow/arrow_reader/statistics.rs index 4ad50af73779..f24bc550ca3f 100644 --- a/parquet/src/arrow/arrow_reader/statistics.rs +++ b/parquet/src/arrow/arrow_reader/statistics.rs @@ -44,7 +44,7 @@ use arrow_array::{ TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, UInt8Array, UInt16Array, UInt32Array, UInt64Array, new_null_array, }; -use arrow_buffer::i256; +use arrow_buffer::{NullBufferBuilder, i256}; use arrow_schema::{DataType, Field, Schema, TimeUnit}; use half::f16; use paste::paste; @@ -1272,20 +1272,22 @@ where { let chunks: Vec<_> = iterator.collect(); let total_capacity: usize = chunks.iter().map(|(len, _)| *len).sum(); - let mut builder = UInt64Builder::with_capacity(total_capacity); + let mut values = Vec::with_capacity(total_capacity); + let mut nulls = NullBufferBuilder::new_with_len(total_capacity); for (len, index) in chunks { match index.null_counts() { Some(counts) => { - for &count in counts { - builder.append_value(count as u64); - } + values.extend(counts.iter().map(|&x| x as u64)); + nulls.append_n_non_nulls(len); } None => { - builder.append_nulls(len); + nulls.append_n_nulls(len); } } } - Ok(builder.finish()) + let null_buffer = nulls.build(); + let array = UInt64Array::new(values.into(), null_buffer); + Ok(array) } /// Extracts Parquet statistics as Arrow arrays @@ -1775,7 +1777,8 @@ impl<'a> StatisticsConverter<'a> { return Ok(None); }; - let mut row_count_total = Vec::new(); + let mut row_counts = Vec::new(); + let mut nulls = NullBufferBuilder::new_with_len(0); for rg_idx in row_group_indices { let page_locations = &column_offset_index[*rg_idx][parquet_index].page_locations(); @@ -1785,17 +1788,22 @@ impl<'a> StatisticsConverter<'a> { // append the last page row count let num_rows_in_row_group = &row_group_metadatas[*rg_idx].num_rows(); - let row_count_per_page = row_count_per_page - .chain(std::iter::once(Some( - *num_rows_in_row_group as u64 - - page_locations.last().unwrap().first_row_index as u64, - ))) - .collect::>(); - - row_count_total.extend(row_count_per_page); + let row_count_per_page = row_count_per_page.chain(std::iter::once(Some( + *num_rows_in_row_group as u64 + - page_locations.last().unwrap().first_row_index as u64, + ))); + + row_counts.extend(row_count_per_page.clone().map(|x| x.unwrap_or(0))); + for val in row_count_per_page { + if val.is_some() { + nulls.append_non_null(); + } else { + nulls.append_null(); + } + } } - Ok(Some(UInt64Array::from_iter(row_count_total))) + Ok(Some(UInt64Array::new(row_counts.into(), nulls.build()))) } /// Returns a null array of data_type with one element per row group diff --git a/parquet/src/file/page_index/column_index.rs b/parquet/src/file/page_index/column_index.rs index a41fefef2600..128b7df04b4f 100644 --- a/parquet/src/file/page_index/column_index.rs +++ b/parquet/src/file/page_index/column_index.rs @@ -195,6 +195,7 @@ impl PrimitiveColumnIndex { /// Returns the min value for the page indexed by `idx` /// /// It is `None` when all values are null + #[inline] pub fn min_value(&self, idx: usize) -> Option<&T> { if self.null_pages[idx] { None @@ -206,6 +207,7 @@ impl PrimitiveColumnIndex { /// Returns the max value for the page indexed by `idx` /// /// It is `None` when all values are null + #[inline] pub fn max_value(&self, idx: usize) -> Option<&T> { if self.null_pages[idx] { None From f9626514f7f85c7276c0440e9c9d0cfcc72b188f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Fri, 30 Jan 2026 09:23:10 +0100 Subject: [PATCH 07/17] Fix --- parquet/src/arrow/arrow_reader/statistics.rs | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/parquet/src/arrow/arrow_reader/statistics.rs b/parquet/src/arrow/arrow_reader/statistics.rs index f24bc550ca3f..8c83aaca3d91 100644 --- a/parquet/src/arrow/arrow_reader/statistics.rs +++ b/parquet/src/arrow/arrow_reader/statistics.rs @@ -1273,7 +1273,7 @@ where let chunks: Vec<_> = iterator.collect(); let total_capacity: usize = chunks.iter().map(|(len, _)| *len).sum(); let mut values = Vec::with_capacity(total_capacity); - let mut nulls = NullBufferBuilder::new_with_len(total_capacity); + let mut nulls = NullBufferBuilder::new(total_capacity); for (len, index) in chunks { match index.null_counts() { Some(counts) => { @@ -1726,10 +1726,7 @@ impl<'a> StatisticsConverter<'a> { { let Some(parquet_index) = self.parquet_column_index else { let num_row_groups = row_group_indices.into_iter().count(); - return Ok(UInt64Array::from_iter(std::iter::repeat_n( - None, - num_row_groups, - ))); + return Ok(UInt64Array::new_null(num_row_groups)); }; let iter = row_group_indices.into_iter().map(|rg_index| { @@ -1778,7 +1775,7 @@ impl<'a> StatisticsConverter<'a> { }; let mut row_counts = Vec::new(); - let mut nulls = NullBufferBuilder::new_with_len(0); + let mut nulls = NullBufferBuilder::new(0); for rg_idx in row_group_indices { let page_locations = &column_offset_index[*rg_idx][parquet_index].page_locations(); From 715668668fb2b2b5777036a45dcf7165d4afca25 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Fri, 30 Jan 2026 09:26:25 +0100 Subject: [PATCH 08/17] Fix --- parquet/src/arrow/arrow_reader/statistics.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/parquet/src/arrow/arrow_reader/statistics.rs b/parquet/src/arrow/arrow_reader/statistics.rs index 8c83aaca3d91..6717c6915b5b 100644 --- a/parquet/src/arrow/arrow_reader/statistics.rs +++ b/parquet/src/arrow/arrow_reader/statistics.rs @@ -1281,6 +1281,7 @@ where nulls.append_n_non_nulls(len); } None => { + values.resize(values.len() + len, 0); nulls.append_n_nulls(len); } } From a95253646f901c4a53eb579b93df3f0f73099fcc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Fri, 30 Jan 2026 10:21:12 +0100 Subject: [PATCH 09/17] Optimize --- arrow-array/src/builder/primitive_builder.rs | 19 ++ parquet/src/arrow/arrow_reader/statistics.rs | 253 ++++++++++--------- parquet/src/file/page_index/column_index.rs | 7 + 3 files changed, 165 insertions(+), 114 deletions(-) diff --git a/arrow-array/src/builder/primitive_builder.rs b/arrow-array/src/builder/primitive_builder.rs index 049cef241c83..b2e5ee3efd65 100644 --- a/arrow-array/src/builder/primitive_builder.rs +++ b/arrow-array/src/builder/primitive_builder.rs @@ -260,6 +260,25 @@ impl PrimitiveBuilder { self.values_builder.extend_from_slice(values); } + /// Appends values from a iter of type `T` and a validity boolean iter + /// + /// # Panics + /// + /// Panics if `values` and `is_valid` have different lengths + #[inline] + pub fn extend_from_iter_option>>(&mut self, iter: I) { + let iter = iter.into_iter(); + self.values_builder.extend(iter.map(|v| + match v { + Some(v) => {self.null_buffer_builder.append_non_null(); v}, + None => { + self.null_buffer_builder.append_null(); + T::Native::default() + } + } + )); + } + /// Appends array values and null to this builder as is /// (this means that underlying null values are copied as is). /// diff --git a/parquet/src/arrow/arrow_reader/statistics.rs b/parquet/src/arrow/arrow_reader/statistics.rs index 6717c6915b5b..e7e7c7e44624 100644 --- a/parquet/src/arrow/arrow_reader/statistics.rs +++ b/parquet/src/arrow/arrow_reader/statistics.rs @@ -626,9 +626,10 @@ macro_rules! get_data_page_statistics { for (len, index) in chunks { match index { ColumnIndexMetaData::INT32(index) => { - for val in index.[<$stat_type_prefix:lower _values_iter>]() { - b.append_option(val.and_then(|&x| u8::try_from(x).ok())); - } + b.extend_from_iter_opt( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.and_then(|&x| u8::try_from(x).ok())), + ); } _ => b.append_nulls(len), } @@ -640,9 +641,10 @@ macro_rules! get_data_page_statistics { for (len, index) in chunks { match index { ColumnIndexMetaData::INT32(index) => { - for val in index.[<$stat_type_prefix:lower _values_iter>]() { - b.append_option(val.and_then(|&x| u16::try_from(x).ok())); - } + b.extend_from_iter_opt( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.and_then(|&x| u16::try_from(x).ok())), + ); } _ => b.append_nulls(len), } @@ -654,9 +656,10 @@ macro_rules! get_data_page_statistics { for (len, index) in chunks { match index { ColumnIndexMetaData::INT32(index) => { - for val in index.[<$stat_type_prefix:lower _values_iter>]() { - builder.append_option(val.map(|&x| x as u32)); - } + builder.extend_from_iter_opt( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.map(|&x| x as u32)), + ); } _ => builder.append_nulls(len), } @@ -668,9 +671,10 @@ macro_rules! get_data_page_statistics { for (len, index) in chunks { match index { ColumnIndexMetaData::INT64(index) => { - for val in index.[<$stat_type_prefix:lower _values_iter>]() { - builder.append_option(val.map(|&x| x as u64)); - } + builder.extend_from_iter_opt( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.map(|&x| x as u64)), + ); } _ => builder.append_nulls(len), } @@ -682,9 +686,10 @@ macro_rules! get_data_page_statistics { for (len, index) in chunks { match index { ColumnIndexMetaData::INT32(index) => { - for val in index.[<$stat_type_prefix:lower _values_iter>]() { - b.append_option(val.and_then(|&x| i8::try_from(x).ok())); - } + b.extend_from_iter_opt( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.and_then(|&x| i8::try_from(x).ok())), + ); } _ => b.append_nulls(len), } @@ -696,9 +701,10 @@ macro_rules! get_data_page_statistics { for (len, index) in chunks { match index { ColumnIndexMetaData::INT32(index) => { - for val in index.[<$stat_type_prefix:lower _values_iter>]() { - b.append_option(val.and_then(|&x| i16::try_from(x).ok())); - } + b.extend_from_iter_opt( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.and_then(|&x| i16::try_from(x).ok())), + ); } _ => b.append_nulls(len), } @@ -710,9 +716,9 @@ macro_rules! get_data_page_statistics { for (len, index) in chunks { match index { ColumnIndexMetaData::INT32(index) => { - for val in index.[<$stat_type_prefix:lower _values_iter>]() { - builder.append_option(val.copied()); - } + let values = index.[<$stat_type_prefix:lower _values>](); + let nulls = index.null_pages(); + builder.append_values(values, nulls); } _ => builder.append_nulls(len), } @@ -724,9 +730,9 @@ macro_rules! get_data_page_statistics { for (len, index) in chunks { match index { ColumnIndexMetaData::INT64(index) => { - for val in index.[<$stat_type_prefix:lower _values_iter>]() { - builder.append_option(val.copied()); - } + let values = index.[<$stat_type_prefix:lower _values>](); + let nulls = index.null_pages(); + builder.append_values(values, nulls); } _ => builder.append_nulls(len), } @@ -738,9 +744,10 @@ macro_rules! get_data_page_statistics { for (len, index) in chunks { match index { ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(index) => { - for val in index.[<$stat_type_prefix:lower _values_iter>]() { - b.append_option(val.and_then(|x| from_bytes_to_f16(x.as_ref()))); - } + b.extend_from_iter_opt( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.and_then(|x| from_bytes_to_f16(x))), + ); } _ => b.append_nulls(len), } @@ -752,9 +759,10 @@ macro_rules! get_data_page_statistics { for (len, index) in chunks { match index { ColumnIndexMetaData::FLOAT(index) => { - for val in index.[<$stat_type_prefix:lower _values_iter>]() { - builder.append_option(val.copied()); - } + builder.extend_from_iter_opt( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.copied()), + ); } _ => builder.append_nulls(len), } @@ -766,9 +774,9 @@ macro_rules! get_data_page_statistics { for (len, index) in chunks { match index { ColumnIndexMetaData::DOUBLE(index) => { - for val in index.[<$stat_type_prefix:lower _values_iter>]() { - builder.append_option(val.copied()); - } + let values = index.[<$stat_type_prefix:lower _values>](); + let nulls = index.null_pages(); + builder.append_values(values, nulls); } _ => builder.append_nulls(len), } @@ -853,9 +861,9 @@ macro_rules! get_data_page_statistics { for (len, index) in chunks { match index { ColumnIndexMetaData::INT64(index) => { - for val in index.[<$stat_type_prefix:lower _values_iter>]() { - b.append_option(val.copied()); - } + let values = index.[<$stat_type_prefix:lower _values>](); + let nulls = index.null_pages(); + b.append_values(values, nulls); } _ => b.append_nulls(len), } @@ -867,9 +875,9 @@ macro_rules! get_data_page_statistics { for (len, index) in chunks { match index { ColumnIndexMetaData::INT64(index) => { - for val in index.[<$stat_type_prefix:lower _values_iter>]() { - b.append_option(val.copied()); - } + let values = index.[<$stat_type_prefix:lower _values>](); + let nulls = index.null_pages(); + b.append_values(values, nulls); } _ => b.append_nulls(len), } @@ -881,9 +889,9 @@ macro_rules! get_data_page_statistics { for (len, index) in chunks { match index { ColumnIndexMetaData::INT64(index) => { - for val in index.[<$stat_type_prefix:lower _values_iter>]() { - b.append_option(val.copied()); - } + let values = index.[<$stat_type_prefix:lower _values>](); + let nulls = index.null_pages(); + b.append_values(values, nulls); } _ => b.append_nulls(len), } @@ -895,9 +903,9 @@ macro_rules! get_data_page_statistics { for (len, index) in chunks { match index { ColumnIndexMetaData::INT64(index) => { - for val in index.[<$stat_type_prefix:lower _values_iter>]() { - b.append_option(val.copied()); - } + let values = index.[<$stat_type_prefix:lower _values>](); + let nulls = index.null_pages(); + b.append_values(values, nulls); } _ => b.append_nulls(len), } @@ -911,9 +919,9 @@ macro_rules! get_data_page_statistics { for (len, index) in chunks { match index { ColumnIndexMetaData::INT32(index) => { - for val in index.[<$stat_type_prefix:lower _values_iter>]() { - builder.append_option(val.copied()); - } + let values = index.[<$stat_type_prefix:lower _values>](); + let nulls = index.null_pages(); + builder.append_values(values, nulls); } _ => builder.append_nulls(len), } @@ -925,9 +933,10 @@ macro_rules! get_data_page_statistics { for (len, index) in chunks { match index { ColumnIndexMetaData::INT32(index) => { - for val in index.[<$stat_type_prefix:lower _values_iter>]() { - b.append_option(val.map(|&x| (x as i64) * 24 * 60 * 60 * 1000)); - } + b.extend_from_iter_opt( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.map(|&x| (x as i64) * 24 * 60 * 60 * 1000)), + ); } _ => b.append_nulls(len), } @@ -939,9 +948,9 @@ macro_rules! get_data_page_statistics { for (len, index) in chunks { match index { ColumnIndexMetaData::INT64(index) => { - for val in index.[<$stat_type_prefix:lower _values_iter>]() { - builder.append_option(val.copied()); - } + let values = index.[<$stat_type_prefix:lower _values>](); + let nulls = index.null_pages(); + builder.append_values(values, nulls); } _ => builder.append_nulls(len), } @@ -953,24 +962,27 @@ macro_rules! get_data_page_statistics { for (len, index) in chunks { match index { ColumnIndexMetaData::INT32(index) => { - for val in index.[<$stat_type_prefix:lower _values_iter>]() { - b.append_option(val.copied()); - } + let values = index.[<$stat_type_prefix:lower _values>](); + let nulls = index.null_pages(); + b.append_values(values, nulls); } ColumnIndexMetaData::INT64(index) => { - for val in index.[<$stat_type_prefix:lower _values_iter>]() { - b.append_option(val.and_then(|&x| i32::try_from(x).ok())); - } + b.extend_from_iter_opt( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.and_then(|&x| i32::try_from(x).ok())), + ); } ColumnIndexMetaData::BYTE_ARRAY(index) => { - for val in index.[<$stat_type_prefix:lower _values_iter>]() { - b.append_option(val.map(|x| from_bytes_to_i32(x.as_ref()))); - } + b.extend_from_iter_opt( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.map(|x| from_bytes_to_i32(x.as_ref()))), + ); } ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(index) => { - for val in index.[<$stat_type_prefix:lower _values_iter>]() { - b.append_option(val.map(|x| from_bytes_to_i32(x.as_ref()))); - } + b.extend_from_iter_opt( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.map(|x| from_bytes_to_i32(x.as_ref()))), + ); } _ => b.append_nulls(len), } @@ -982,24 +994,28 @@ macro_rules! get_data_page_statistics { for (len, index) in chunks { match index { ColumnIndexMetaData::INT32(index) => { - for val in index.[<$stat_type_prefix:lower _values_iter>]() { - b.append_option(val.map(|x| *x as i64)); - } + b.extend_from_iter_opt( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.map(|x| *x as i64)), + ); } ColumnIndexMetaData::INT64(index) => { - for val in index.[<$stat_type_prefix:lower _values_iter>]() { - b.append_option(val.copied()); - } + b.extend_from_iter_opt( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.copied()), + ); } ColumnIndexMetaData::BYTE_ARRAY(index) => { - for val in index.[<$stat_type_prefix:lower _values_iter>]() { - b.append_option(val.map(|x| from_bytes_to_i64(x.as_ref()))); - } + b.extend_from_iter_opt( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.map(|x| from_bytes_to_i64(x.as_ref()))), + ); } ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(index) => { - for val in index.[<$stat_type_prefix:lower _values_iter>]() { - b.append_option(val.map(|x| from_bytes_to_i64(x.as_ref()))); - } + b.extend_from_iter_opt( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.map(|x| from_bytes_to_i64(x.as_ref()))), + ); } _ => b.append_nulls(len), } @@ -1011,24 +1027,28 @@ macro_rules! get_data_page_statistics { for (len, index) in chunks { match index { ColumnIndexMetaData::INT32(index) => { - for val in index.[<$stat_type_prefix:lower _values_iter>]() { - b.append_option(val.map(|x| *x as i128)); - } + b.extend_from_iter_opt( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.map(|x| *x as i128)), + ); } ColumnIndexMetaData::INT64(index) => { - for val in index.[<$stat_type_prefix:lower _values_iter>]() { - b.append_option(val.map(|x| *x as i128)); - } + b.extend_from_iter_opt( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.map(|x| *x as i128)), + ); } ColumnIndexMetaData::BYTE_ARRAY(index) => { - for val in index.[<$stat_type_prefix:lower _values_iter>]() { - b.append_option(val.map(|x| from_bytes_to_i128(x.as_ref()))); - } + b.extend_from_iter_opt( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.map(|x| from_bytes_to_i128(x.as_ref()))), + ); } ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(index) => { - for val in index.[<$stat_type_prefix:lower _values_iter>]() { - b.append_option(val.map(|x| from_bytes_to_i128(x.as_ref()))); - } + b.extend_from_iter_opt( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.map(|x| from_bytes_to_i128(x.as_ref()))), + ); } _ => b.append_nulls(len), } @@ -1040,24 +1060,28 @@ macro_rules! get_data_page_statistics { for (len, index) in chunks { match index { ColumnIndexMetaData::INT32(index) => { - for val in index.[<$stat_type_prefix:lower _values_iter>]() { - b.append_option(val.map(|x| i256::from_i128(*x as i128))); - } + b.extend_from_iter_opt( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.map(|x| i256::from_i128(*x as i128))), + ); } ColumnIndexMetaData::INT64(index) => { - for val in index.[<$stat_type_prefix:lower _values_iter>]() { - b.append_option(val.map(|x| i256::from_i128(*x as i128))); - } + b.extend_from_iter_opt( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.map(|x| i256::from_i128(*x as i128))), + ); } ColumnIndexMetaData::BYTE_ARRAY(index) => { - for val in index.[<$stat_type_prefix:lower _values_iter>]() { - b.append_option(val.map(|x| from_bytes_to_i256(x.as_ref()))); - } + b.extend_from_iter_opt( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.map(|x| from_bytes_to_i256(x.as_ref()))), + ); } ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(index) => { - for val in index.[<$stat_type_prefix:lower _values_iter>]() { - b.append_option(val.map(|x| from_bytes_to_i256(x.as_ref()))); - } + b.extend_from_iter_opt( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.map(|x| from_bytes_to_i256(x.as_ref()))), + ); } _ => b.append_nulls(len), } @@ -1071,9 +1095,10 @@ macro_rules! get_data_page_statistics { for (len, index) in chunks { match index { ColumnIndexMetaData::INT32(index) => { - for val in index.[<$stat_type_prefix:lower _values_iter>]() { - b.append_option(val.copied()); - } + let values = index.[<$stat_type_prefix:lower _values>](); + let nulls = index.null_pages(); + b.append_values(values, nulls); + } _ => b.append_nulls(len), } @@ -1085,9 +1110,9 @@ macro_rules! get_data_page_statistics { for (len, index) in chunks { match index { ColumnIndexMetaData::INT32(index) => { - for val in index.[<$stat_type_prefix:lower _values_iter>]() { - b.append_option(val.copied()); - } + let values = index.[<$stat_type_prefix:lower _values>](); + let nulls = index.null_pages(); + b.append_values(values, nulls); } _ => b.append_nulls(len), } @@ -1106,9 +1131,9 @@ macro_rules! get_data_page_statistics { for (len, index) in chunks { match index { ColumnIndexMetaData::INT64(index) => { - for val in index.[<$stat_type_prefix:lower _values_iter>]() { - b.append_option(val.copied()); - } + let values = index.[<$stat_type_prefix:lower _values>](); + let nulls = index.null_pages(); + b.append_values(values, nulls); } _ => b.append_nulls(len), } @@ -1120,9 +1145,9 @@ macro_rules! get_data_page_statistics { for (len, index) in chunks { match index { ColumnIndexMetaData::INT64(index) => { - for val in index.[<$stat_type_prefix:lower _values_iter>]() { - b.append_option(val.copied()); - } + let values = index.[<$stat_type_prefix:lower _values>](); + let nulls = index.null_pages(); + b.append_values(values, nulls); } _ => b.append_nulls(len), } diff --git a/parquet/src/file/page_index/column_index.rs b/parquet/src/file/page_index/column_index.rs index 128b7df04b4f..8b8825c0857e 100644 --- a/parquet/src/file/page_index/column_index.rs +++ b/parquet/src/file/page_index/column_index.rs @@ -191,6 +191,12 @@ impl PrimitiveColumnIndex { } }) } + /// Returns the null pages. + /// + /// Values may be `None` when [`ColumnIndex::is_null_page()`] is `true`. + pub fn null_pages(&self) -> &[bool] { + &self.column_index.null_pages + } /// Returns the min value for the page indexed by `idx` /// @@ -598,6 +604,7 @@ impl ColumnIndexMetaData { } /// Returns whether the page indexed by `idx` consists of all null values + #[inline] pub fn is_null_page(&self, idx: usize) -> bool { colidx_enum_func!(self, is_null_page, idx) } From f57c82ee47122ac658c6ba38fb65edcb4512d142 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Fri, 30 Jan 2026 10:21:43 +0100 Subject: [PATCH 10/17] Optimize --- arrow-array/src/builder/primitive_builder.rs | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/arrow-array/src/builder/primitive_builder.rs b/arrow-array/src/builder/primitive_builder.rs index b2e5ee3efd65..fa8a8e4919d5 100644 --- a/arrow-array/src/builder/primitive_builder.rs +++ b/arrow-array/src/builder/primitive_builder.rs @@ -268,15 +268,16 @@ impl PrimitiveBuilder { #[inline] pub fn extend_from_iter_option>>(&mut self, iter: I) { let iter = iter.into_iter(); - self.values_builder.extend(iter.map(|v| - match v { - Some(v) => {self.null_buffer_builder.append_non_null(); v}, - None => { - self.null_buffer_builder.append_null(); - T::Native::default() - } + self.values_builder.extend(iter.map(|v| match v { + Some(v) => { + self.null_buffer_builder.append_non_null(); + v } - )); + None => { + self.null_buffer_builder.append_null(); + T::Native::default() + } + })); } /// Appends array values and null to this builder as is From c0c46ca6e803f641e76175f96422a7c0b4c2808f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Fri, 30 Jan 2026 10:24:48 +0100 Subject: [PATCH 11/17] Optimize --- parquet/src/arrow/arrow_reader/statistics.rs | 48 ++++++++++---------- 1 file changed, 24 insertions(+), 24 deletions(-) diff --git a/parquet/src/arrow/arrow_reader/statistics.rs b/parquet/src/arrow/arrow_reader/statistics.rs index e7e7c7e44624..51137f7e7a83 100644 --- a/parquet/src/arrow/arrow_reader/statistics.rs +++ b/parquet/src/arrow/arrow_reader/statistics.rs @@ -626,7 +626,7 @@ macro_rules! get_data_page_statistics { for (len, index) in chunks { match index { ColumnIndexMetaData::INT32(index) => { - b.extend_from_iter_opt( + b.extend_from_iter_option( index.[<$stat_type_prefix:lower _values_iter>]() .map(|val| val.and_then(|&x| u8::try_from(x).ok())), ); @@ -641,7 +641,7 @@ macro_rules! get_data_page_statistics { for (len, index) in chunks { match index { ColumnIndexMetaData::INT32(index) => { - b.extend_from_iter_opt( + b.extend_from_iter_option( index.[<$stat_type_prefix:lower _values_iter>]() .map(|val| val.and_then(|&x| u16::try_from(x).ok())), ); @@ -656,7 +656,7 @@ macro_rules! get_data_page_statistics { for (len, index) in chunks { match index { ColumnIndexMetaData::INT32(index) => { - builder.extend_from_iter_opt( + builder.extend_from_iter_option( index.[<$stat_type_prefix:lower _values_iter>]() .map(|val| val.map(|&x| x as u32)), ); @@ -671,7 +671,7 @@ macro_rules! get_data_page_statistics { for (len, index) in chunks { match index { ColumnIndexMetaData::INT64(index) => { - builder.extend_from_iter_opt( + builder.extend_from_iter_option( index.[<$stat_type_prefix:lower _values_iter>]() .map(|val| val.map(|&x| x as u64)), ); @@ -686,7 +686,7 @@ macro_rules! get_data_page_statistics { for (len, index) in chunks { match index { ColumnIndexMetaData::INT32(index) => { - b.extend_from_iter_opt( + b.extend_from_iter_option( index.[<$stat_type_prefix:lower _values_iter>]() .map(|val| val.and_then(|&x| i8::try_from(x).ok())), ); @@ -701,7 +701,7 @@ macro_rules! get_data_page_statistics { for (len, index) in chunks { match index { ColumnIndexMetaData::INT32(index) => { - b.extend_from_iter_opt( + b.extend_from_iter_option( index.[<$stat_type_prefix:lower _values_iter>]() .map(|val| val.and_then(|&x| i16::try_from(x).ok())), ); @@ -744,7 +744,7 @@ macro_rules! get_data_page_statistics { for (len, index) in chunks { match index { ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(index) => { - b.extend_from_iter_opt( + b.extend_from_iter_option( index.[<$stat_type_prefix:lower _values_iter>]() .map(|val| val.and_then(|x| from_bytes_to_f16(x))), ); @@ -759,7 +759,7 @@ macro_rules! get_data_page_statistics { for (len, index) in chunks { match index { ColumnIndexMetaData::FLOAT(index) => { - builder.extend_from_iter_opt( + builder.extend_from_iter_option( index.[<$stat_type_prefix:lower _values_iter>]() .map(|val| val.copied()), ); @@ -933,7 +933,7 @@ macro_rules! get_data_page_statistics { for (len, index) in chunks { match index { ColumnIndexMetaData::INT32(index) => { - b.extend_from_iter_opt( + b.extend_from_iter_option( index.[<$stat_type_prefix:lower _values_iter>]() .map(|val| val.map(|&x| (x as i64) * 24 * 60 * 60 * 1000)), ); @@ -967,19 +967,19 @@ macro_rules! get_data_page_statistics { b.append_values(values, nulls); } ColumnIndexMetaData::INT64(index) => { - b.extend_from_iter_opt( + b.extend_from_iter_option( index.[<$stat_type_prefix:lower _values_iter>]() .map(|val| val.and_then(|&x| i32::try_from(x).ok())), ); } ColumnIndexMetaData::BYTE_ARRAY(index) => { - b.extend_from_iter_opt( + b.extend_from_iter_option( index.[<$stat_type_prefix:lower _values_iter>]() .map(|val| val.map(|x| from_bytes_to_i32(x.as_ref()))), ); } ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(index) => { - b.extend_from_iter_opt( + b.extend_from_iter_option( index.[<$stat_type_prefix:lower _values_iter>]() .map(|val| val.map(|x| from_bytes_to_i32(x.as_ref()))), ); @@ -994,25 +994,25 @@ macro_rules! get_data_page_statistics { for (len, index) in chunks { match index { ColumnIndexMetaData::INT32(index) => { - b.extend_from_iter_opt( + b.extend_from_iter_option( index.[<$stat_type_prefix:lower _values_iter>]() .map(|val| val.map(|x| *x as i64)), ); } ColumnIndexMetaData::INT64(index) => { - b.extend_from_iter_opt( + b.extend_from_iter_option( index.[<$stat_type_prefix:lower _values_iter>]() .map(|val| val.copied()), ); } ColumnIndexMetaData::BYTE_ARRAY(index) => { - b.extend_from_iter_opt( + b.extend_from_iter_option( index.[<$stat_type_prefix:lower _values_iter>]() .map(|val| val.map(|x| from_bytes_to_i64(x.as_ref()))), ); } ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(index) => { - b.extend_from_iter_opt( + b.extend_from_iter_option( index.[<$stat_type_prefix:lower _values_iter>]() .map(|val| val.map(|x| from_bytes_to_i64(x.as_ref()))), ); @@ -1027,25 +1027,25 @@ macro_rules! get_data_page_statistics { for (len, index) in chunks { match index { ColumnIndexMetaData::INT32(index) => { - b.extend_from_iter_opt( + b.extend_from_iter_option( index.[<$stat_type_prefix:lower _values_iter>]() .map(|val| val.map(|x| *x as i128)), ); } ColumnIndexMetaData::INT64(index) => { - b.extend_from_iter_opt( + b.extend_from_iter_option( index.[<$stat_type_prefix:lower _values_iter>]() .map(|val| val.map(|x| *x as i128)), ); } ColumnIndexMetaData::BYTE_ARRAY(index) => { - b.extend_from_iter_opt( + b.extend_from_iter_option( index.[<$stat_type_prefix:lower _values_iter>]() .map(|val| val.map(|x| from_bytes_to_i128(x.as_ref()))), ); } ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(index) => { - b.extend_from_iter_opt( + b.extend_from_iter_option( index.[<$stat_type_prefix:lower _values_iter>]() .map(|val| val.map(|x| from_bytes_to_i128(x.as_ref()))), ); @@ -1060,25 +1060,25 @@ macro_rules! get_data_page_statistics { for (len, index) in chunks { match index { ColumnIndexMetaData::INT32(index) => { - b.extend_from_iter_opt( + b.extend_from_iter_option( index.[<$stat_type_prefix:lower _values_iter>]() .map(|val| val.map(|x| i256::from_i128(*x as i128))), ); } ColumnIndexMetaData::INT64(index) => { - b.extend_from_iter_opt( + b.extend_from_iter_option( index.[<$stat_type_prefix:lower _values_iter>]() .map(|val| val.map(|x| i256::from_i128(*x as i128))), ); } ColumnIndexMetaData::BYTE_ARRAY(index) => { - b.extend_from_iter_opt( + b.extend_from_iter_option( index.[<$stat_type_prefix:lower _values_iter>]() .map(|val| val.map(|x| from_bytes_to_i256(x.as_ref()))), ); } ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(index) => { - b.extend_from_iter_opt( + b.extend_from_iter_option( index.[<$stat_type_prefix:lower _values_iter>]() .map(|val| val.map(|x| from_bytes_to_i256(x.as_ref()))), ); From 571b169bf2136b5c86acd807bff0e744cd5aae31 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Fri, 30 Jan 2026 10:28:49 +0100 Subject: [PATCH 12/17] Optimize --- parquet/src/file/page_index/column_index.rs | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/parquet/src/file/page_index/column_index.rs b/parquet/src/file/page_index/column_index.rs index 8b8825c0857e..a921029d4e27 100644 --- a/parquet/src/file/page_index/column_index.rs +++ b/parquet/src/file/page_index/column_index.rs @@ -392,11 +392,7 @@ impl ByteArrayColumnIndex { /// Values may be `None` when [`ColumnIndex::is_null_page()`] is `true`. pub fn min_values_iter(&self) -> impl Iterator> { (0..self.num_pages() as usize).map(|i| { - if self.is_null_page(i) { - None - } else { - self.min_value(i) - } + self.min_value(i) }) } @@ -405,11 +401,7 @@ impl ByteArrayColumnIndex { /// Values may be `None` when [`ColumnIndex::is_null_page()`] is `true`. pub fn max_values_iter(&self) -> impl Iterator> { (0..self.num_pages() as usize).map(|i| { - if self.is_null_page(i) { - None - } else { - self.max_value(i) - } + self.max_value(i) }) } } From 1f49cc748f12bf9d75131aa6cf4af359284d2fe3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Fri, 30 Jan 2026 11:29:19 +0100 Subject: [PATCH 13/17] Optimize --- parquet/src/arrow/arrow_reader/statistics.rs | 153 ++++++++++--------- parquet/src/file/page_index/column_index.rs | 10 +- 2 files changed, 86 insertions(+), 77 deletions(-) diff --git a/parquet/src/arrow/arrow_reader/statistics.rs b/parquet/src/arrow/arrow_reader/statistics.rs index 51137f7e7a83..a560e46aee94 100644 --- a/parquet/src/arrow/arrow_reader/statistics.rs +++ b/parquet/src/arrow/arrow_reader/statistics.rs @@ -652,34 +652,34 @@ macro_rules! get_data_page_statistics { Ok(Arc::new(b.finish())) }, DataType::UInt32 => { - let mut builder = UInt32Builder::with_capacity(capacity); + let mut b = UInt32Builder::with_capacity(capacity); for (len, index) in chunks { match index { ColumnIndexMetaData::INT32(index) => { - builder.extend_from_iter_option( + b.extend_from_iter_option( index.[<$stat_type_prefix:lower _values_iter>]() .map(|val| val.map(|&x| x as u32)), ); } - _ => builder.append_nulls(len), + _ => b.append_nulls(len), } } - Ok(Arc::new(builder.finish())) + Ok(Arc::new(b.finish())) }, DataType::UInt64 => { - let mut builder = UInt64Builder::with_capacity(capacity); + let mut b = UInt64Builder::with_capacity(capacity); for (len, index) in chunks { match index { ColumnIndexMetaData::INT64(index) => { - builder.extend_from_iter_option( + b.extend_from_iter_option( index.[<$stat_type_prefix:lower _values_iter>]() .map(|val| val.map(|&x| x as u64)), ); } - _ => builder.append_nulls(len), + _ => b.append_nulls(len), } } - Ok(Arc::new(builder.finish())) + Ok(Arc::new(b.finish())) }, DataType::Int8 => { let mut b = Int8Builder::with_capacity(capacity); @@ -712,32 +712,34 @@ macro_rules! get_data_page_statistics { Ok(Arc::new(b.finish())) }, DataType::Int32 => { - let mut builder = Int32Builder::with_capacity(capacity); + let mut b = Int32Builder::with_capacity(capacity); for (len, index) in chunks { match index { ColumnIndexMetaData::INT32(index) => { - let values = index.[<$stat_type_prefix:lower _values>](); - let nulls = index.null_pages(); - builder.append_values(values, nulls); + b.extend_from_iter_option( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.copied()), + ); } - _ => builder.append_nulls(len), + _ => b.append_nulls(len), } } - Ok(Arc::new(builder.finish())) + Ok(Arc::new(b.finish())) }, DataType::Int64 => { - let mut builder = Int64Builder::with_capacity(capacity); + let mut b = Int64Builder::with_capacity(capacity); for (len, index) in chunks { match index { ColumnIndexMetaData::INT64(index) => { - let values = index.[<$stat_type_prefix:lower _values>](); - let nulls = index.null_pages(); - builder.append_values(values, nulls); + b.extend_from_iter_option( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.copied()), + ); } - _ => builder.append_nulls(len), + _ => b.append_nulls(len), } } - Ok(Arc::new(builder.finish())) + Ok(Arc::new(b.finish())) }, DataType::Float16 => { let mut b = Float16Builder::with_capacity(capacity); @@ -755,33 +757,34 @@ macro_rules! get_data_page_statistics { Ok(Arc::new(b.finish())) }, DataType::Float32 => { - let mut builder = Float32Builder::with_capacity(capacity); + let mut b = Float32Builder::with_capacity(capacity); for (len, index) in chunks { match index { ColumnIndexMetaData::FLOAT(index) => { - builder.extend_from_iter_option( + b.extend_from_iter_option( index.[<$stat_type_prefix:lower _values_iter>]() .map(|val| val.copied()), ); } - _ => builder.append_nulls(len), + _ => b.append_nulls(len), } } - Ok(Arc::new(builder.finish())) + Ok(Arc::new(b.finish())) }, DataType::Float64 => { - let mut builder = Float64Builder::with_capacity(capacity); + let mut b = Float64Builder::with_capacity(capacity); for (len, index) in chunks { match index { ColumnIndexMetaData::DOUBLE(index) => { - let values = index.[<$stat_type_prefix:lower _values>](); - let nulls = index.null_pages(); - builder.append_values(values, nulls); + b.extend_from_iter_option( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.copied()), + ); } - _ => builder.append_nulls(len), + _ => b.append_nulls(len), } } - Ok(Arc::new(builder.finish())) + Ok(Arc::new(b.finish())) }, DataType::Binary => { let mut b = BinaryBuilder::with_capacity(capacity, capacity * 10); @@ -861,9 +864,10 @@ macro_rules! get_data_page_statistics { for (len, index) in chunks { match index { ColumnIndexMetaData::INT64(index) => { - let values = index.[<$stat_type_prefix:lower _values>](); - let nulls = index.null_pages(); - b.append_values(values, nulls); + b.extend_from_iter_option( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.copied()), + ); } _ => b.append_nulls(len), } @@ -875,9 +879,10 @@ macro_rules! get_data_page_statistics { for (len, index) in chunks { match index { ColumnIndexMetaData::INT64(index) => { - let values = index.[<$stat_type_prefix:lower _values>](); - let nulls = index.null_pages(); - b.append_values(values, nulls); + b.extend_from_iter_option( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.copied()), + ); } _ => b.append_nulls(len), } @@ -889,9 +894,10 @@ macro_rules! get_data_page_statistics { for (len, index) in chunks { match index { ColumnIndexMetaData::INT64(index) => { - let values = index.[<$stat_type_prefix:lower _values>](); - let nulls = index.null_pages(); - b.append_values(values, nulls); + b.extend_from_iter_option( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.copied()), + ); } _ => b.append_nulls(len), } @@ -903,9 +909,10 @@ macro_rules! get_data_page_statistics { for (len, index) in chunks { match index { ColumnIndexMetaData::INT64(index) => { - let values = index.[<$stat_type_prefix:lower _values>](); - let nulls = index.null_pages(); - b.append_values(values, nulls); + b.extend_from_iter_option( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.copied()), + ); } _ => b.append_nulls(len), } @@ -915,18 +922,19 @@ macro_rules! get_data_page_statistics { } }, DataType::Date32 => { - let mut builder = Date32Builder::with_capacity(capacity); + let mut b = Date32Builder::with_capacity(capacity); for (len, index) in chunks { match index { ColumnIndexMetaData::INT32(index) => { - let values = index.[<$stat_type_prefix:lower _values>](); - let nulls = index.null_pages(); - builder.append_values(values, nulls); + b.extend_from_iter_option( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.copied()), + ); } - _ => builder.append_nulls(len), + _ => b.append_nulls(len), } } - Ok(Arc::new(builder.finish())) + Ok(Arc::new(b.finish())) }, DataType::Date64 if $physical_type == Some(PhysicalType::INT32)=> { let mut b = Date64Builder::with_capacity(capacity); @@ -944,27 +952,29 @@ macro_rules! get_data_page_statistics { Ok(Arc::new(b.finish())) }, DataType::Date64 if $physical_type == Some(PhysicalType::INT64) => { - let mut builder = Date64Builder::with_capacity(capacity); + let mut b = Date64Builder::with_capacity(capacity); for (len, index) in chunks { match index { ColumnIndexMetaData::INT64(index) => { - let values = index.[<$stat_type_prefix:lower _values>](); - let nulls = index.null_pages(); - builder.append_values(values, nulls); + b.extend_from_iter_option( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.copied()), + ); } - _ => builder.append_nulls(len), + _ => b.append_nulls(len), } } - Ok(Arc::new(builder.finish())) + Ok(Arc::new(b.finish())) }, DataType::Decimal32(precision, scale) => { let mut b = Decimal32Builder::with_capacity(capacity); for (len, index) in chunks { match index { ColumnIndexMetaData::INT32(index) => { - let values = index.[<$stat_type_prefix:lower _values>](); - let nulls = index.null_pages(); - b.append_values(values, nulls); + b.extend_from_iter_option( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.copied()), + ); } ColumnIndexMetaData::INT64(index) => { b.extend_from_iter_option( @@ -1095,10 +1105,10 @@ macro_rules! get_data_page_statistics { for (len, index) in chunks { match index { ColumnIndexMetaData::INT32(index) => { - let values = index.[<$stat_type_prefix:lower _values>](); - let nulls = index.null_pages(); - b.append_values(values, nulls); - + b.extend_from_iter_option( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.copied()), + ); } _ => b.append_nulls(len), } @@ -1110,9 +1120,10 @@ macro_rules! get_data_page_statistics { for (len, index) in chunks { match index { ColumnIndexMetaData::INT32(index) => { - let values = index.[<$stat_type_prefix:lower _values>](); - let nulls = index.null_pages(); - b.append_values(values, nulls); + b.extend_from_iter_option( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.copied()), + ); } _ => b.append_nulls(len), } @@ -1131,9 +1142,10 @@ macro_rules! get_data_page_statistics { for (len, index) in chunks { match index { ColumnIndexMetaData::INT64(index) => { - let values = index.[<$stat_type_prefix:lower _values>](); - let nulls = index.null_pages(); - b.append_values(values, nulls); + b.extend_from_iter_option( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.copied()), + ); } _ => b.append_nulls(len), } @@ -1145,9 +1157,10 @@ macro_rules! get_data_page_statistics { for (len, index) in chunks { match index { ColumnIndexMetaData::INT64(index) => { - let values = index.[<$stat_type_prefix:lower _values>](); - let nulls = index.null_pages(); - b.append_values(values, nulls); + b.extend_from_iter_option( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.copied()), + ); } _ => b.append_nulls(len), } diff --git a/parquet/src/file/page_index/column_index.rs b/parquet/src/file/page_index/column_index.rs index a921029d4e27..ff30fe7cb09e 100644 --- a/parquet/src/file/page_index/column_index.rs +++ b/parquet/src/file/page_index/column_index.rs @@ -82,7 +82,7 @@ impl ColumnIndex { /// Returns whether the page indexed by `idx` consists of all null values pub fn is_null_page(&self, idx: usize) -> bool { - self.null_pages[idx] + !self.null_pages[idx] } } @@ -391,18 +391,14 @@ impl ByteArrayColumnIndex { /// /// Values may be `None` when [`ColumnIndex::is_null_page()`] is `true`. pub fn min_values_iter(&self) -> impl Iterator> { - (0..self.num_pages() as usize).map(|i| { - self.min_value(i) - }) + (0..self.num_pages() as usize).map(|i| self.min_value(i)) } /// Returns an iterator over the max values. /// /// Values may be `None` when [`ColumnIndex::is_null_page()`] is `true`. pub fn max_values_iter(&self) -> impl Iterator> { - (0..self.num_pages() as usize).map(|i| { - self.max_value(i) - }) + (0..self.num_pages() as usize).map(|i| self.max_value(i)) } } From 42b04ba87dfa318bba05cb1de2fdfadf1c9b057b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Fri, 30 Jan 2026 11:36:56 +0100 Subject: [PATCH 14/17] Fix --- parquet/src/file/page_index/column_index.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet/src/file/page_index/column_index.rs b/parquet/src/file/page_index/column_index.rs index ff30fe7cb09e..0e84743927ee 100644 --- a/parquet/src/file/page_index/column_index.rs +++ b/parquet/src/file/page_index/column_index.rs @@ -82,7 +82,7 @@ impl ColumnIndex { /// Returns whether the page indexed by `idx` consists of all null values pub fn is_null_page(&self, idx: usize) -> bool { - !self.null_pages[idx] + self.null_pages[idx] } } From 4655af25f5f69c6b55482f92db56609851c6485f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Fri, 30 Jan 2026 16:21:37 +0100 Subject: [PATCH 15/17] Fix --- parquet/src/file/page_index/column_index.rs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/parquet/src/file/page_index/column_index.rs b/parquet/src/file/page_index/column_index.rs index 0e84743927ee..d0050443323a 100644 --- a/parquet/src/file/page_index/column_index.rs +++ b/parquet/src/file/page_index/column_index.rs @@ -191,12 +191,6 @@ impl PrimitiveColumnIndex { } }) } - /// Returns the null pages. - /// - /// Values may be `None` when [`ColumnIndex::is_null_page()`] is `true`. - pub fn null_pages(&self) -> &[bool] { - &self.column_index.null_pages - } /// Returns the min value for the page indexed by `idx` /// From e833993517386885437a01b78c8a8d42e1b9c74b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sat, 31 Jan 2026 13:05:39 +0100 Subject: [PATCH 16/17] Update parquet/src/arrow/arrow_reader/statistics.rs Co-authored-by: Andrew Lamb --- parquet/src/arrow/arrow_reader/statistics.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/parquet/src/arrow/arrow_reader/statistics.rs b/parquet/src/arrow/arrow_reader/statistics.rs index a560e46aee94..19d3e34f5243 100644 --- a/parquet/src/arrow/arrow_reader/statistics.rs +++ b/parquet/src/arrow/arrow_reader/statistics.rs @@ -1179,7 +1179,13 @@ macro_rules! get_data_page_statistics { ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(index) => { for val in index.[<$stat_type_prefix:lower _values_iter>]() { match val { - Some(v) => b.append_value(v.as_ref())?, + Some(v) => { + if v.len() == *size as usize { + let _ = b.append_value(v.as_ref())?; + } else { + b.append_null(); + } + } None => b.append_null(), } } From 36c391534ac84617741e0bf808c26a3a5c12b041 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sat, 31 Jan 2026 13:12:48 +0100 Subject: [PATCH 17/17] Fix comment --- arrow-array/src/builder/primitive_builder.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arrow-array/src/builder/primitive_builder.rs b/arrow-array/src/builder/primitive_builder.rs index fa8a8e4919d5..dc96486bca6a 100644 --- a/arrow-array/src/builder/primitive_builder.rs +++ b/arrow-array/src/builder/primitive_builder.rs @@ -260,7 +260,7 @@ impl PrimitiveBuilder { self.values_builder.extend_from_slice(values); } - /// Appends values from a iter of type `T` and a validity boolean iter + /// Appends values from a iter of type `Option` /// /// # Panics ///