diff --git a/arrow-array/src/builder/primitive_builder.rs b/arrow-array/src/builder/primitive_builder.rs index 049cef241c83..dc96486bca6a 100644 --- a/arrow-array/src/builder/primitive_builder.rs +++ b/arrow-array/src/builder/primitive_builder.rs @@ -260,6 +260,26 @@ impl PrimitiveBuilder { self.values_builder.extend_from_slice(values); } + /// Appends values from an iterator of `Option<T::Native>` items + /// + /// A `Some` item is appended as a valid (non-null) value, while a `None` + /// item is appended as null, with `T::Native::default()` stored as the + /// placeholder in the values buffer + #[inline] + pub fn extend_from_iter_option>>(&mut self, iter: I) { + let iter = iter.into_iter(); + self.values_builder.extend(iter.map(|v| match v { + Some(v) => { + self.null_buffer_builder.append_non_null(); + v + } + None => { + self.null_buffer_builder.append_null(); + T::Native::default() + } + })); + } + /// Appends array values and null to this builder as is /// (this means that underlying null values are copied as is). /// diff --git a/parquet/src/arrow/arrow_reader/statistics.rs b/parquet/src/arrow/arrow_reader/statistics.rs index 2f46c96be6b7..19d3e34f5243 100644 --- a/parquet/src/arrow/arrow_reader/statistics.rs +++ b/parquet/src/arrow/arrow_reader/statistics.rs @@ -22,15 +22,19 @@ use crate::arrow::buffer::bit_util::sign_extend_be; use crate::arrow::parquet_column; use crate::basic::Type as PhysicalType; -use crate::data_type::{ByteArray, FixedLenByteArray}; use crate::errors::{ParquetError, Result}; use crate::file::metadata::{ParquetColumnIndex, ParquetOffsetIndex, RowGroupMetaData}; -use crate::file::page_index::column_index::{ColumnIndexIterators, ColumnIndexMetaData}; +use crate::file::page_index::column_index::ColumnIndexMetaData; use crate::file::statistics::Statistics as ParquetStatistics; use crate::schema::types::SchemaDescriptor; use arrow_array::builder::{ - BinaryViewBuilder, BooleanBuilder, FixedSizeBinaryBuilder, LargeStringBuilder, StringBuilder, - StringViewBuilder, + BinaryBuilder, BinaryViewBuilder, BooleanBuilder, Date32Builder, Date64Builder, + 
Decimal32Builder, Decimal64Builder, FixedSizeBinaryBuilder, Float16Builder, Float32Builder, + Float64Builder, Int8Builder, Int16Builder, Int32Builder, Int64Builder, LargeBinaryBuilder, + LargeStringBuilder, StringBuilder, StringViewBuilder, Time32MillisecondBuilder, + Time32SecondBuilder, Time64MicrosecondBuilder, Time64NanosecondBuilder, + TimestampMicrosecondBuilder, TimestampMillisecondBuilder, TimestampNanosecondBuilder, + TimestampSecondBuilder, UInt8Builder, UInt16Builder, UInt32Builder, UInt64Builder, }; use arrow_array::{ ArrayRef, BinaryArray, BooleanArray, Date32Array, Date64Array, Decimal32Array, Decimal64Array, @@ -38,9 +42,9 @@ use arrow_array::{ Int16Array, Int32Array, Int64Array, LargeBinaryArray, Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, UInt8Array, - UInt16Array, UInt32Array, UInt64Array, new_empty_array, new_null_array, + UInt16Array, UInt32Array, UInt64Array, new_null_array, }; -use arrow_buffer::i256; +use arrow_buffer::{NullBufferBuilder, i256}; use arrow_schema::{DataType, Field, Schema, TimeUnit}; use half::f16; use paste::paste; @@ -596,473 +600,641 @@ macro_rules! get_statistics { }}} } -macro_rules! make_data_page_stats_iterator { - ($iterator_type: ident, $func: ident, $stat_value_type: ty) => { - struct $iterator_type<'a, I> - where - I: Iterator, - { - iter: I, - } - - impl<'a, I> $iterator_type<'a, I> - where - I: Iterator, - { - fn new(iter: I) -> Self { - Self { iter } - } - } - - impl<'a, I> Iterator for $iterator_type<'a, I> - where - I: Iterator, - { - type Item = Vec>; - - fn next(&mut self) -> Option { - let next = self.iter.next(); - match next { - Some((len, index)) => match index { - // No matching `Index` found; - // thus no statistics that can be extracted. 
- // We return vec![None; len] to effectively - // create an arrow null-array with the length - // corresponding to the number of entries in - // `ParquetOffsetIndex` per row group per column. - ColumnIndexMetaData::NONE => Some(vec![None; len]), - _ => Some(<$stat_value_type>::$func(&index).collect::>()), - }, - _ => None, - } - } - - fn size_hint(&self) -> (usize, Option) { - self.iter.size_hint() - } - } - }; -} - -make_data_page_stats_iterator!(MinBooleanDataPageStatsIterator, min_values_iter, bool); -make_data_page_stats_iterator!(MaxBooleanDataPageStatsIterator, max_values_iter, bool); -make_data_page_stats_iterator!(MinInt32DataPageStatsIterator, min_values_iter, i32); -make_data_page_stats_iterator!(MaxInt32DataPageStatsIterator, max_values_iter, i32); -make_data_page_stats_iterator!(MinInt64DataPageStatsIterator, min_values_iter, i64); -make_data_page_stats_iterator!(MaxInt64DataPageStatsIterator, max_values_iter, i64); -make_data_page_stats_iterator!( - MinFloat16DataPageStatsIterator, - min_values_iter, - FixedLenByteArray -); -make_data_page_stats_iterator!( - MaxFloat16DataPageStatsIterator, - max_values_iter, - FixedLenByteArray -); -make_data_page_stats_iterator!(MinFloat32DataPageStatsIterator, min_values_iter, f32); -make_data_page_stats_iterator!(MaxFloat32DataPageStatsIterator, max_values_iter, f32); -make_data_page_stats_iterator!(MinFloat64DataPageStatsIterator, min_values_iter, f64); -make_data_page_stats_iterator!(MaxFloat64DataPageStatsIterator, max_values_iter, f64); -make_data_page_stats_iterator!( - MinByteArrayDataPageStatsIterator, - min_values_iter, - ByteArray -); -make_data_page_stats_iterator!( - MaxByteArrayDataPageStatsIterator, - max_values_iter, - ByteArray -); -make_data_page_stats_iterator!( - MaxFixedLenByteArrayDataPageStatsIterator, - max_values_iter, - FixedLenByteArray -); - -make_data_page_stats_iterator!( - MinFixedLenByteArrayDataPageStatsIterator, - min_values_iter, - FixedLenByteArray -); - -macro_rules! 
get_decimal_page_stats_iterator { - ($iterator_type: ident, $func: ident, $stat_value_type: ident, $convert_func: ident) => { - struct $iterator_type<'a, I> - where - I: Iterator, - { - iter: I, - } - - impl<'a, I> $iterator_type<'a, I> - where - I: Iterator, - { - fn new(iter: I) -> Self { - Self { iter } - } - } - - impl<'a, I> Iterator for $iterator_type<'a, I> - where - I: Iterator, - { - type Item = Vec>; - - // Some(native_index.$func().map(|v| v.map($conv)).collect::>()) - fn next(&mut self) -> Option { - let next = self.iter.next(); - match next { - Some((len, index)) => match index { - ColumnIndexMetaData::INT32(native_index) => Some( - native_index - .$func() - .map(|x| x.map(|x| $stat_value_type::from(*x))) - .collect::>(), - ), - ColumnIndexMetaData::INT64(native_index) => Some( - native_index - .$func() - .map(|x| x.map(|x| $stat_value_type::try_from(*x).unwrap())) - .collect::>(), - ), - ColumnIndexMetaData::BYTE_ARRAY(native_index) => Some( - native_index - .$func() - .map(|x| x.map(|x| $convert_func(x))) - .collect::>(), - ), - ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(native_index) => Some( - native_index - .$func() - .map(|x| x.map(|x| $convert_func(x))) - .collect::>(), - ), - _ => Some(vec![None; len]), - }, - _ => None, - } - } - - fn size_hint(&self) -> (usize, Option) { - self.iter.size_hint() - } - } - }; -} - -get_decimal_page_stats_iterator!( - MinDecimal32DataPageStatsIterator, - min_values_iter, - i32, - from_bytes_to_i32 -); - -get_decimal_page_stats_iterator!( - MaxDecimal32DataPageStatsIterator, - max_values_iter, - i32, - from_bytes_to_i32 -); - -get_decimal_page_stats_iterator!( - MinDecimal64DataPageStatsIterator, - min_values_iter, - i64, - from_bytes_to_i64 -); - -get_decimal_page_stats_iterator!( - MaxDecimal64DataPageStatsIterator, - max_values_iter, - i64, - from_bytes_to_i64 -); - -get_decimal_page_stats_iterator!( - MinDecimal128DataPageStatsIterator, - min_values_iter, - i128, - from_bytes_to_i128 -); - 
-get_decimal_page_stats_iterator!( - MaxDecimal128DataPageStatsIterator, - max_values_iter, - i128, - from_bytes_to_i128 -); - -get_decimal_page_stats_iterator!( - MinDecimal256DataPageStatsIterator, - min_values_iter, - i256, - from_bytes_to_i256 -); - -get_decimal_page_stats_iterator!( - MaxDecimal256DataPageStatsIterator, - max_values_iter, - i256, - from_bytes_to_i256 -); - macro_rules! get_data_page_statistics { ($stat_type_prefix: ident, $data_type: ident, $iterator: ident, $physical_type: ident) => { - paste! { - match $data_type { + { + let chunks: Vec<(usize, &ColumnIndexMetaData)> = $iterator.collect(); + let capacity: usize = chunks.iter().map(|c| c.0).sum(); + paste! { + match $data_type { DataType::Boolean => { - let iterator = [<$stat_type_prefix BooleanDataPageStatsIterator>]::new($iterator); - let mut builder = BooleanBuilder::new(); - for x in iterator { - for x in x.into_iter() { - let Some(x) = x else { - builder.append_null(); // no statistics value - continue; - }; - builder.append_value(x); + let mut b = BooleanBuilder::with_capacity(capacity); + for (len, index) in chunks { + match index { + ColumnIndexMetaData::BOOLEAN(index) => { + for val in index.[<$stat_type_prefix:lower _values_iter>]() { + b.append_option(val.copied()); + } + } + _ => b.append_nulls(len), + } + } + Ok(Arc::new(b.finish())) + }, + DataType::UInt8 => { + let mut b = UInt8Builder::with_capacity(capacity); + for (len, index) in chunks { + match index { + ColumnIndexMetaData::INT32(index) => { + b.extend_from_iter_option( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.and_then(|&x| u8::try_from(x).ok())), + ); + } + _ => b.append_nulls(len), + } + } + Ok(Arc::new(b.finish())) + }, + DataType::UInt16 => { + let mut b = UInt16Builder::with_capacity(capacity); + for (len, index) in chunks { + match index { + ColumnIndexMetaData::INT32(index) => { + b.extend_from_iter_option( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.and_then(|&x| 
u16::try_from(x).ok())), + ); + } + _ => b.append_nulls(len), + } + } + Ok(Arc::new(b.finish())) + }, + DataType::UInt32 => { + let mut b = UInt32Builder::with_capacity(capacity); + for (len, index) in chunks { + match index { + ColumnIndexMetaData::INT32(index) => { + b.extend_from_iter_option( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.map(|&x| x as u32)), + ); + } + _ => b.append_nulls(len), + } + } + Ok(Arc::new(b.finish())) + }, + DataType::UInt64 => { + let mut b = UInt64Builder::with_capacity(capacity); + for (len, index) in chunks { + match index { + ColumnIndexMetaData::INT64(index) => { + b.extend_from_iter_option( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.map(|&x| x as u64)), + ); + } + _ => b.append_nulls(len), + } + } + Ok(Arc::new(b.finish())) + }, + DataType::Int8 => { + let mut b = Int8Builder::with_capacity(capacity); + for (len, index) in chunks { + match index { + ColumnIndexMetaData::INT32(index) => { + b.extend_from_iter_option( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.and_then(|&x| i8::try_from(x).ok())), + ); + } + _ => b.append_nulls(len), + } + } + Ok(Arc::new(b.finish())) + }, + DataType::Int16 => { + let mut b = Int16Builder::with_capacity(capacity); + for (len, index) in chunks { + match index { + ColumnIndexMetaData::INT32(index) => { + b.extend_from_iter_option( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.and_then(|&x| i16::try_from(x).ok())), + ); + } + _ => b.append_nulls(len), + } + } + Ok(Arc::new(b.finish())) + }, + DataType::Int32 => { + let mut b = Int32Builder::with_capacity(capacity); + for (len, index) in chunks { + match index { + ColumnIndexMetaData::INT32(index) => { + b.extend_from_iter_option( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.copied()), + ); + } + _ => b.append_nulls(len), + } + } + Ok(Arc::new(b.finish())) + }, + DataType::Int64 => { + let mut b = Int64Builder::with_capacity(capacity); 
+ for (len, index) in chunks { + match index { + ColumnIndexMetaData::INT64(index) => { + b.extend_from_iter_option( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.copied()), + ); + } + _ => b.append_nulls(len), + } + } + Ok(Arc::new(b.finish())) + }, + DataType::Float16 => { + let mut b = Float16Builder::with_capacity(capacity); + for (len, index) in chunks { + match index { + ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(index) => { + b.extend_from_iter_option( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.and_then(|x| from_bytes_to_f16(x))), + ); + } + _ => b.append_nulls(len), } } - Ok(Arc::new(builder.finish())) + Ok(Arc::new(b.finish())) + }, + DataType::Float32 => { + let mut b = Float32Builder::with_capacity(capacity); + for (len, index) in chunks { + match index { + ColumnIndexMetaData::FLOAT(index) => { + b.extend_from_iter_option( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.copied()), + ); + } + _ => b.append_nulls(len), + } + } + Ok(Arc::new(b.finish())) + }, + DataType::Float64 => { + let mut b = Float64Builder::with_capacity(capacity); + for (len, index) in chunks { + match index { + ColumnIndexMetaData::DOUBLE(index) => { + b.extend_from_iter_option( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.copied()), + ); + } + _ => b.append_nulls(len), + } + } + Ok(Arc::new(b.finish())) + }, + DataType::Binary => { + let mut b = BinaryBuilder::with_capacity(capacity, capacity * 10); + for (len, index) in chunks { + match index { + ColumnIndexMetaData::BYTE_ARRAY(index) => { + for val in index.[<$stat_type_prefix:lower _values_iter>]() { + b.append_option(val.map(|x| x.as_ref())); + } + } + _ => b.append_nulls(len), + } + } + Ok(Arc::new(b.finish())) + }, + DataType::LargeBinary => { + let mut b = LargeBinaryBuilder::with_capacity(capacity, capacity * 10); + for (len, index) in chunks { + match index { + ColumnIndexMetaData::BYTE_ARRAY(index) => { + for val in 
index.[<$stat_type_prefix:lower _values_iter>]() { + b.append_option(val.map(|x| x.as_ref())); + } + } + _ => b.append_nulls(len), + } + } + Ok(Arc::new(b.finish())) }, - DataType::UInt8 => Ok(Arc::new( - UInt8Array::from_iter( - [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator) - .map(|x| { - x.into_iter().map(|x| { - x.and_then(|x| u8::try_from(x).ok()) - }) - }) - .flatten() - ) - )), - DataType::UInt16 => Ok(Arc::new( - UInt16Array::from_iter( - [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator) - .map(|x| { - x.into_iter().map(|x| { - x.and_then(|x| u16::try_from(x).ok()) - }) - }) - .flatten() - ) - )), - DataType::UInt32 => Ok(Arc::new( - UInt32Array::from_iter( - [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator) - .map(|x| { - x.into_iter().map(|x| { - x.and_then(|x| Some(x as u32)) - }) - }) - .flatten() - ))), - DataType::UInt64 => Ok(Arc::new( - UInt64Array::from_iter( - [<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator) - .map(|x| { - x.into_iter().map(|x| { - x.and_then(|x| Some(x as u64)) - }) - }) - .flatten() - ))), - DataType::Int8 => Ok(Arc::new( - Int8Array::from_iter( - [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator) - .map(|x| { - x.into_iter().map(|x| { - x.and_then(|x| i8::try_from(x).ok()) - }) - }) - .flatten() - ) - )), - DataType::Int16 => Ok(Arc::new( - Int16Array::from_iter( - [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator) - .map(|x| { - x.into_iter().map(|x| { - x.and_then(|x| i16::try_from(x).ok()) - }) - }) - .flatten() - ) - )), - DataType::Int32 => Ok(Arc::new(Int32Array::from_iter([<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator).flatten()))), - DataType::Int64 => Ok(Arc::new(Int64Array::from_iter([<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten()))), - DataType::Float16 => Ok(Arc::new( - Float16Array::from_iter( - [<$stat_type_prefix Float16DataPageStatsIterator>]::new($iterator) - .map(|x| { - 
x.into_iter().map(|x| { - x.and_then(|x| from_bytes_to_f16(x.data())) - }) - }) - .flatten() - ) - )), - DataType::Float32 => Ok(Arc::new(Float32Array::from_iter([<$stat_type_prefix Float32DataPageStatsIterator>]::new($iterator).flatten()))), - DataType::Float64 => Ok(Arc::new(Float64Array::from_iter([<$stat_type_prefix Float64DataPageStatsIterator>]::new($iterator).flatten()))), - DataType::Binary => Ok(Arc::new(BinaryArray::from_iter([<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).flatten()))), - DataType::LargeBinary => Ok(Arc::new(LargeBinaryArray::from_iter([<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator).flatten()))), DataType::Utf8 => { - let mut builder = StringBuilder::new(); - let iterator = [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator); - for x in iterator { - for x in x.into_iter() { - let Some(x) = x else { - builder.append_null(); // no statistics value - continue; - }; - - let Ok(x) = std::str::from_utf8(x.data()) else { - builder.append_null(); - continue; - }; - - builder.append_value(x); + let mut b = StringBuilder::with_capacity(capacity, capacity * 10); + for (len, index) in chunks { + match index { + ColumnIndexMetaData::BYTE_ARRAY(index) => { + for val in index.[<$stat_type_prefix:lower _values_iter>]() { + match val { + Some(x) => match std::str::from_utf8(x.as_ref()) { + Ok(s) => b.append_value(s), + _ => b.append_null(), + } + None => b.append_null(), + } + } + } + _ => b.append_nulls(len), } } - Ok(Arc::new(builder.finish())) + Ok(Arc::new(b.finish())) }, DataType::LargeUtf8 => { - let mut builder = LargeStringBuilder::new(); - let iterator = [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator); - for x in iterator { - for x in x.into_iter() { - let Some(x) = x else { - builder.append_null(); // no statistics value - continue; - }; - - let Ok(x) = std::str::from_utf8(x.data()) else { - builder.append_null(); - continue; - }; - - builder.append_value(x); + let 
mut b = LargeStringBuilder::with_capacity(capacity, capacity * 10); + for (len, index) in chunks { + match index { + ColumnIndexMetaData::BYTE_ARRAY(index) => { + for val in index.[<$stat_type_prefix:lower _values_iter>]() { + match val { + Some(x) => match std::str::from_utf8(x.as_ref()) { + Ok(s) => b.append_value(s), + _ => b.append_null(), + } + None => b.append_null(), + } + } + } + _ => b.append_nulls(len), } } - Ok(Arc::new(builder.finish())) + Ok(Arc::new(b.finish())) }, DataType::Dictionary(_, value_type) => { - [<$stat_type_prefix:lower _ page_statistics>](value_type, $iterator, $physical_type) + [<$stat_type_prefix:lower _ page_statistics>](value_type, chunks.into_iter(), $physical_type) }, DataType::Timestamp(unit, timezone) => { - let iter = [<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten(); - Ok(match unit { - TimeUnit::Second => Arc::new(TimestampSecondArray::from_iter(iter).with_timezone_opt(timezone.clone())), - TimeUnit::Millisecond => Arc::new(TimestampMillisecondArray::from_iter(iter).with_timezone_opt(timezone.clone())), - TimeUnit::Microsecond => Arc::new(TimestampMicrosecondArray::from_iter(iter).with_timezone_opt(timezone.clone())), - TimeUnit::Nanosecond => Arc::new(TimestampNanosecondArray::from_iter(iter).with_timezone_opt(timezone.clone())), - }) + match unit { + TimeUnit::Second => { + let mut b = TimestampSecondBuilder::with_capacity(capacity); + for (len, index) in chunks { + match index { + ColumnIndexMetaData::INT64(index) => { + b.extend_from_iter_option( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.copied()), + ); + } + _ => b.append_nulls(len), + } + } + Ok(Arc::new(b.finish().with_timezone_opt(timezone.clone()))) + } + TimeUnit::Millisecond => { + let mut b = TimestampMillisecondBuilder::with_capacity(capacity); + for (len, index) in chunks { + match index { + ColumnIndexMetaData::INT64(index) => { + b.extend_from_iter_option( + index.[<$stat_type_prefix:lower _values_iter>]() + 
.map(|val| val.copied()), + ); + } + _ => b.append_nulls(len), + } + } + Ok(Arc::new(b.finish().with_timezone_opt(timezone.clone()))) + } + TimeUnit::Microsecond => { + let mut b = TimestampMicrosecondBuilder::with_capacity(capacity); + for (len, index) in chunks { + match index { + ColumnIndexMetaData::INT64(index) => { + b.extend_from_iter_option( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.copied()), + ); + } + _ => b.append_nulls(len), + } + } + Ok(Arc::new(b.finish().with_timezone_opt(timezone.clone()))) + } + TimeUnit::Nanosecond => { + let mut b = TimestampNanosecondBuilder::with_capacity(capacity); + for (len, index) in chunks { + match index { + ColumnIndexMetaData::INT64(index) => { + b.extend_from_iter_option( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.copied()), + ); + } + _ => b.append_nulls(len), + } + } + Ok(Arc::new(b.finish().with_timezone_opt(timezone.clone()))) + } + } + }, + DataType::Date32 => { + let mut b = Date32Builder::with_capacity(capacity); + for (len, index) in chunks { + match index { + ColumnIndexMetaData::INT32(index) => { + b.extend_from_iter_option( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.copied()), + ); + } + _ => b.append_nulls(len), + } + } + Ok(Arc::new(b.finish())) + }, + DataType::Date64 if $physical_type == Some(PhysicalType::INT32)=> { + let mut b = Date64Builder::with_capacity(capacity); + for (len, index) in chunks { + match index { + ColumnIndexMetaData::INT32(index) => { + b.extend_from_iter_option( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.map(|&x| (x as i64) * 24 * 60 * 60 * 1000)), + ); + } + _ => b.append_nulls(len), + } + } + Ok(Arc::new(b.finish())) + }, + DataType::Date64 if $physical_type == Some(PhysicalType::INT64) => { + let mut b = Date64Builder::with_capacity(capacity); + for (len, index) in chunks { + match index { + ColumnIndexMetaData::INT64(index) => { + b.extend_from_iter_option( + 
index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.copied()), + ); + } + _ => b.append_nulls(len), + } + } + Ok(Arc::new(b.finish())) + }, + DataType::Decimal32(precision, scale) => { + let mut b = Decimal32Builder::with_capacity(capacity); + for (len, index) in chunks { + match index { + ColumnIndexMetaData::INT32(index) => { + b.extend_from_iter_option( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.copied()), + ); + } + ColumnIndexMetaData::INT64(index) => { + b.extend_from_iter_option( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.and_then(|&x| i32::try_from(x).ok())), + ); + } + ColumnIndexMetaData::BYTE_ARRAY(index) => { + b.extend_from_iter_option( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.map(|x| from_bytes_to_i32(x.as_ref()))), + ); + } + ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(index) => { + b.extend_from_iter_option( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.map(|x| from_bytes_to_i32(x.as_ref()))), + ); + } + _ => b.append_nulls(len), + } + } + Ok(Arc::new(b.with_precision_and_scale(*precision, *scale)?.finish())) + }, + DataType::Decimal64(precision, scale) => { + let mut b = Decimal64Builder::with_capacity(capacity); + for (len, index) in chunks { + match index { + ColumnIndexMetaData::INT32(index) => { + b.extend_from_iter_option( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.map(|x| *x as i64)), + ); + } + ColumnIndexMetaData::INT64(index) => { + b.extend_from_iter_option( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.copied()), + ); + } + ColumnIndexMetaData::BYTE_ARRAY(index) => { + b.extend_from_iter_option( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.map(|x| from_bytes_to_i64(x.as_ref()))), + ); + } + ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(index) => { + b.extend_from_iter_option( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.map(|x| 
from_bytes_to_i64(x.as_ref()))), + ); + } + _ => b.append_nulls(len), + } + } + Ok(Arc::new(b.with_precision_and_scale(*precision, *scale)?.finish())) + }, + DataType::Decimal128(precision, scale) => { + let mut b = Decimal128Array::builder(capacity); + for (len, index) in chunks { + match index { + ColumnIndexMetaData::INT32(index) => { + b.extend_from_iter_option( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.map(|x| *x as i128)), + ); + } + ColumnIndexMetaData::INT64(index) => { + b.extend_from_iter_option( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.map(|x| *x as i128)), + ); + } + ColumnIndexMetaData::BYTE_ARRAY(index) => { + b.extend_from_iter_option( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.map(|x| from_bytes_to_i128(x.as_ref()))), + ); + } + ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(index) => { + b.extend_from_iter_option( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.map(|x| from_bytes_to_i128(x.as_ref()))), + ); + } + _ => b.append_nulls(len), + } + } + Ok(Arc::new(b.with_precision_and_scale(*precision, *scale)?.finish())) + }, + DataType::Decimal256(precision, scale) => { + let mut b = Decimal256Array::builder(capacity); + for (len, index) in chunks { + match index { + ColumnIndexMetaData::INT32(index) => { + b.extend_from_iter_option( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.map(|x| i256::from_i128(*x as i128))), + ); + } + ColumnIndexMetaData::INT64(index) => { + b.extend_from_iter_option( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.map(|x| i256::from_i128(*x as i128))), + ); + } + ColumnIndexMetaData::BYTE_ARRAY(index) => { + b.extend_from_iter_option( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.map(|x| from_bytes_to_i256(x.as_ref()))), + ); + } + ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(index) => { + b.extend_from_iter_option( + index.[<$stat_type_prefix:lower _values_iter>]() + 
.map(|val| val.map(|x| from_bytes_to_i256(x.as_ref()))), + ); + } + _ => b.append_nulls(len), + } + } + Ok(Arc::new(b.with_precision_and_scale(*precision, *scale)?.finish())) }, - DataType::Date32 => Ok(Arc::new(Date32Array::from_iter([<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator).flatten()))), - DataType::Date64 if $physical_type == Some(PhysicalType::INT32)=> Ok( - Arc::new( - Date64Array::from_iter([<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator) - .map(|x| { - x.into_iter() - .map(|x| { - x.and_then(|x| i64::try_from(x).ok()) - }) - .map(|x| x.map(|x| x * 24 * 60 * 60 * 1000)) - }).flatten() - ) - ) - ), - DataType::Date64 if $physical_type == Some(PhysicalType::INT64) => Ok(Arc::new(Date64Array::from_iter([<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten()))), - DataType::Decimal32(precision, scale) => Ok(Arc::new( - Decimal32Array::from_iter([<$stat_type_prefix Decimal32DataPageStatsIterator>]::new($iterator).flatten()).with_precision_and_scale(*precision, *scale)?)), - DataType::Decimal64(precision, scale) => Ok(Arc::new( - Decimal64Array::from_iter([<$stat_type_prefix Decimal64DataPageStatsIterator>]::new($iterator).flatten()).with_precision_and_scale(*precision, *scale)?)), - DataType::Decimal128(precision, scale) => Ok(Arc::new( - Decimal128Array::from_iter([<$stat_type_prefix Decimal128DataPageStatsIterator>]::new($iterator).flatten()).with_precision_and_scale(*precision, *scale)?)), - DataType::Decimal256(precision, scale) => Ok(Arc::new( - Decimal256Array::from_iter([<$stat_type_prefix Decimal256DataPageStatsIterator>]::new($iterator).flatten()).with_precision_and_scale(*precision, *scale)?)), DataType::Time32(unit) => { - Ok(match unit { - TimeUnit::Second => Arc::new(Time32SecondArray::from_iter( - [<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator).flatten(), - )), - TimeUnit::Millisecond => Arc::new(Time32MillisecondArray::from_iter( - [<$stat_type_prefix 
Int32DataPageStatsIterator>]::new($iterator).flatten(), - )), + match unit { + TimeUnit::Second => { + let mut b = Time32SecondBuilder::with_capacity(capacity); + for (len, index) in chunks { + match index { + ColumnIndexMetaData::INT32(index) => { + b.extend_from_iter_option( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.copied()), + ); + } + _ => b.append_nulls(len), + } + } + Ok(Arc::new(b.finish())) + } + TimeUnit::Millisecond => { + let mut b = Time32MillisecondBuilder::with_capacity(capacity); + for (len, index) in chunks { + match index { + ColumnIndexMetaData::INT32(index) => { + b.extend_from_iter_option( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.copied()), + ); + } + _ => b.append_nulls(len), + } + } + Ok(Arc::new(b.finish())) + } _ => { - // don't know how to extract statistics, so return an empty array - new_empty_array(&DataType::Time32(unit.clone())) + Ok(new_null_array($data_type, capacity)) } - }) + } } DataType::Time64(unit) => { - Ok(match unit { - TimeUnit::Microsecond => Arc::new(Time64MicrosecondArray::from_iter( - [<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten(), - )), - TimeUnit::Nanosecond => Arc::new(Time64NanosecondArray::from_iter( - [<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten(), - )), + match unit { + TimeUnit::Microsecond => { + let mut b = Time64MicrosecondBuilder::with_capacity(capacity); + for (len, index) in chunks { + match index { + ColumnIndexMetaData::INT64(index) => { + b.extend_from_iter_option( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| val.copied()), + ); + } + _ => b.append_nulls(len), + } + } + Ok(Arc::new(b.finish())) + } + TimeUnit::Nanosecond => { + let mut b = Time64NanosecondBuilder::with_capacity(capacity); + for (len, index) in chunks { + match index { + ColumnIndexMetaData::INT64(index) => { + b.extend_from_iter_option( + index.[<$stat_type_prefix:lower _values_iter>]() + .map(|val| 
val.copied()), + ); + } + _ => b.append_nulls(len), + } + } + Ok(Arc::new(b.finish())) + } _ => { - // don't know how to extract statistics, so return an empty array - new_empty_array(&DataType::Time64(unit.clone())) + Ok(new_null_array($data_type, capacity)) } - }) + } }, DataType::FixedSizeBinary(size) => { - let mut builder = FixedSizeBinaryBuilder::new(*size); - let iterator = [<$stat_type_prefix FixedLenByteArrayDataPageStatsIterator>]::new($iterator); - for x in iterator { - for x in x.into_iter() { - let Some(x) = x else { - builder.append_null(); // no statistics value - continue; - }; - - if x.len() == *size as usize { - let _ = builder.append_value(x.data()); - } else { - builder.append_null(); + let mut b = FixedSizeBinaryBuilder::with_capacity(capacity, *size); + for (len, index) in chunks { + match index { + ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(index) => { + for val in index.[<$stat_type_prefix:lower _values_iter>]() { + match val { + Some(v) => { + if v.len() == *size as usize { + let _ = b.append_value(v.as_ref())?; + } else { + b.append_null(); + } + } + None => b.append_null(), + } + } } + _ => b.append_nulls(len), } } - Ok(Arc::new(builder.finish())) + Ok(Arc::new(b.finish())) }, DataType::Utf8View => { - let mut builder = StringViewBuilder::new(); - let iterator = [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator); - for x in iterator { - for x in x.into_iter() { - let Some(x) = x else { - builder.append_null(); // no statistics value - continue; - }; - - let Ok(x) = std::str::from_utf8(x.data()) else { - builder.append_null(); - continue; - }; - - builder.append_value(x); + let mut b = StringViewBuilder::with_capacity(capacity); + for (len, index) in chunks { + match index { + ColumnIndexMetaData::BYTE_ARRAY(index) => { + for val in index.[<$stat_type_prefix:lower _values_iter>]() { + match val { + Some(x) => match std::str::from_utf8(x.as_ref()) { + Ok(s) => b.append_value(s), + _ => b.append_null(), + } + None => 
b.append_null(), + } + } + } + _ => { + for _ in 0..len { b.append_null(); } + } } } - Ok(Arc::new(builder.finish())) + Ok(Arc::new(b.finish())) }, DataType::BinaryView => { - let mut builder = BinaryViewBuilder::new(); - let iterator = [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator); - for x in iterator { - for x in x.into_iter() { - let Some(x) = x else { - builder.append_null(); // no statistics value - continue; - }; - - builder.append_value(x); + let mut b = BinaryViewBuilder::with_capacity(capacity); + for (len, index) in chunks { + match index { + ColumnIndexMetaData::BYTE_ARRAY(index) => { + for val in index.[<$stat_type_prefix:lower _values_iter>]() { + match val { + Some(v) => b.append_value(v.as_ref()), + None => b.append_null(), + } + } + } + _ => { + for _ in 0..len { b.append_null(); } + } } } - Ok(Arc::new(builder.finish())) + Ok(Arc::new(b.finish())) }, DataType::Date64 | // required to cover $physical_type match guard DataType::Null | @@ -1077,12 +1249,12 @@ macro_rules! 
get_data_page_statistics { DataType::Union(_, _) | DataType::Map(_, _) | DataType::RunEndEncoded(_, _) => { - let len = $iterator.count(); // don't know how to extract statistics, so return a null array - Ok(new_null_array($data_type, len)) + Ok(new_null_array($data_type, capacity)) }, } } + } } } /// Extracts the min statistics from an iterator of [`ParquetStatistics`] to an @@ -1142,14 +1314,25 @@ pub(crate) fn null_counts_page_statistics<'a, I>(iterator: I) -> Result, { - let iter = iterator.flat_map(|(len, index)| match index { - ColumnIndexMetaData::NONE => vec![None; len], - column_index => column_index.null_counts().map_or(vec![None; len], |v| { - v.iter().map(|i| Some(*i as u64)).collect::>() - }), - }); - - Ok(UInt64Array::from_iter(iter)) + let chunks: Vec<_> = iterator.collect(); + let total_capacity: usize = chunks.iter().map(|(len, _)| *len).sum(); + let mut values = Vec::with_capacity(total_capacity); + let mut nulls = NullBufferBuilder::new(total_capacity); + for (len, index) in chunks { + match index.null_counts() { + Some(counts) => { + values.extend(counts.iter().map(|&x| x as u64)); + nulls.append_n_non_nulls(len); + } + None => { + values.resize(values.len() + len, 0); + nulls.append_n_nulls(len); + } + } + } + let null_buffer = nulls.build(); + let array = UInt64Array::new(values.into(), null_buffer); + Ok(array) } /// Extracts Parquet statistics as Arrow arrays @@ -1588,10 +1771,7 @@ impl<'a> StatisticsConverter<'a> { { let Some(parquet_index) = self.parquet_column_index else { let num_row_groups = row_group_indices.into_iter().count(); - return Ok(UInt64Array::from_iter(std::iter::repeat_n( - None, - num_row_groups, - ))); + return Ok(UInt64Array::new_null(num_row_groups)); }; let iter = row_group_indices.into_iter().map(|rg_index| { @@ -1639,7 +1819,8 @@ impl<'a> StatisticsConverter<'a> { return Ok(None); }; - let mut row_count_total = Vec::new(); + let mut row_counts = Vec::new(); + let mut nulls = NullBufferBuilder::new(0); for rg_idx in 
row_group_indices { let page_locations = &column_offset_index[*rg_idx][parquet_index].page_locations(); @@ -1649,17 +1830,22 @@ impl<'a> StatisticsConverter<'a> { // append the last page row count let num_rows_in_row_group = &row_group_metadatas[*rg_idx].num_rows(); - let row_count_per_page = row_count_per_page - .chain(std::iter::once(Some( - *num_rows_in_row_group as u64 - - page_locations.last().unwrap().first_row_index as u64, - ))) - .collect::>(); - - row_count_total.extend(row_count_per_page); + let row_count_per_page = row_count_per_page.chain(std::iter::once(Some( + *num_rows_in_row_group as u64 + - page_locations.last().unwrap().first_row_index as u64, + ))); + + row_counts.extend(row_count_per_page.clone().map(|x| x.unwrap_or(0))); + for val in row_count_per_page { + if val.is_some() { + nulls.append_non_null(); + } else { + nulls.append_null(); + } + } } - Ok(Some(UInt64Array::from_iter(row_count_total))) + Ok(Some(UInt64Array::new(row_counts.into(), nulls.build()))) } /// Returns a null array of data_type with one element per row group diff --git a/parquet/src/file/page_index/column_index.rs b/parquet/src/file/page_index/column_index.rs index a41fefef2600..d0050443323a 100644 --- a/parquet/src/file/page_index/column_index.rs +++ b/parquet/src/file/page_index/column_index.rs @@ -195,6 +195,7 @@ impl PrimitiveColumnIndex { /// Returns the min value for the page indexed by `idx` /// /// It is `None` when all values are null + #[inline] pub fn min_value(&self, idx: usize) -> Option<&T> { if self.null_pages[idx] { None @@ -206,6 +207,7 @@ impl PrimitiveColumnIndex { /// Returns the max value for the page indexed by `idx` /// /// It is `None` when all values are null + #[inline] pub fn max_value(&self, idx: usize) -> Option<&T> { if self.null_pages[idx] { None @@ -383,26 +385,14 @@ impl ByteArrayColumnIndex { /// /// Values may be `None` when [`ColumnIndex::is_null_page()`] is `true`. 
pub fn min_values_iter(&self) -> impl Iterator> { - (0..self.num_pages() as usize).map(|i| { - if self.is_null_page(i) { - None - } else { - self.min_value(i) - } - }) + (0..self.num_pages() as usize).map(|i| self.min_value(i)) } /// Returns an iterator over the max values. /// /// Values may be `None` when [`ColumnIndex::is_null_page()`] is `true`. pub fn max_values_iter(&self) -> impl Iterator> { - (0..self.num_pages() as usize).map(|i| { - if self.is_null_page(i) { - None - } else { - self.max_value(i) - } - }) + (0..self.num_pages() as usize).map(|i| self.max_value(i)) } } @@ -596,6 +586,7 @@ impl ColumnIndexMetaData { } /// Returns whether the page indexed by `idx` consists of all null values + #[inline] pub fn is_null_page(&self, idx: usize) -> bool { colidx_enum_func!(self, is_null_page, idx) }