Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 145 additions & 0 deletions parquet/src/column/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,20 @@ where
});

if let Some(rows) = rows {
// If there is a pending partial record from a previous page,
// count it before considering the whole-page skip. When the
// next page provides num_rows (e.g. a V2 data page or via
// offset index), its records are self-contained, so the
// partial from the previous page is complete at this boundary.
if let Some(decoder) = self.rep_level_decoder.as_mut() {
if decoder.flush_partial() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is correct. If all pages are V2, then has_partial will never be true, because V2 pages must start on a new record (R=0). If all pages are V1 this will never trigger because num_rows will be None. This case only applies when switching from V1 to V2, in which case it's appropriate to call flush_partial because, as said above, V2 pages must start at a new record.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So that means that this bug requires a parquet file with a column chunk that has both V1 and V2 pages?

Copy link
Contributor

@alamb alamb Feb 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made such a file (that triggers the error with a RowSelection): 🤯 #9395

remaining_records -= 1;
if remaining_records == 0 {
return Ok(num_records);
}
}
}

if rows <= remaining_records {
self.page_reader.skip_next_page()?;
remaining_records -= rows;
Expand Down Expand Up @@ -1361,4 +1375,135 @@ mod tests {
);
}
}

/// Regression test for <https://github.com/apache/arrow-rs/issues/9370>
///
/// Mirrors the production failure: a list column (rep_level = 1) whose
/// column chunk consists entirely of DataPage v2 pages, read without an
/// offset index (so `at_record_boundary` returns false for every
/// non-last page).
///
/// Once a `skip_records` call exhausts the levels of a loaded v2 page
/// without seeing a record delimiter, the rep level decoder is left
/// with `has_partial=true`. Because `has_record_delimiter` is false,
/// nothing flushes that partial during level-based processing. When the
/// next v2 page is then peeked and exposes `num_rows`, the
/// whole-page-skip shortcut must flush the pending partial first.
/// Otherwise:
///
/// 1. the skip consumes one record too many (N+1 instead of N), and
/// 2. the stale `has_partial` makes the following `read_records` emit a
///    "phantom" record carrying 0 values.
#[test]
fn test_skip_records_v2_page_skip_accounts_for_partial() {
    use crate::encodings::levels::LevelEncoder;

    const MAX_REP: i16 = 1;
    const MAX_DEF: i16 = 1;

    // Descriptor for a list element leaf column (rep=1, def=1).
    let element = SchemaType::primitive_type_builder("element", PhysicalType::INT32)
        .with_repetition(Repetition::REQUIRED)
        .build()
        .unwrap();
    let descr = Arc::new(ColumnDescriptor::new(
        Arc::new(element),
        MAX_DEF,
        MAX_REP,
        ColumnPath::new(vec!["list".to_string(), "element".to_string()]),
    ));

    // Encode a level slice with the v2 (RLE, no length prefix) encoder.
    let encode = |max_level: i16, levels: &[i16]| -> Vec<u8> {
        let mut enc = LevelEncoder::v2(max_level, levels.len());
        enc.put(levels);
        enc.consume()
    };

    // Assemble a DataPage v2 for this column from raw levels and values.
    let v2_page = |reps: &[i16], defs: &[i16], vals: &[i32], num_rows: u32| -> Page {
        let rep_bytes = encode(MAX_REP, reps);
        let def_bytes = encode(MAX_DEF, defs);

        let mut data = Vec::new();
        data.extend_from_slice(&rep_bytes);
        data.extend_from_slice(&def_bytes);
        for v in vals {
            data.extend_from_slice(&v.to_le_bytes());
        }

        Page::DataPageV2 {
            buf: Bytes::from(data),
            num_values: reps.len() as u32,
            encoding: Encoding::PLAIN,
            num_nulls: 0,
            num_rows,
            def_levels_byte_len: def_bytes.len() as u32,
            rep_levels_byte_len: rep_bytes.len() as u32,
            is_compressed: false,
            statistics: None,
        }
    };

    // Every page is DataPage v2, matching the production scenario where
    // parquet-rs writes only v2 data pages and no offset index is
    // loaded (so at_record_boundary() is false for non-last pages).
    //
    // Five 2-element records across three pages:
    //   page 1: [10,20], [30,40]
    //   page 2: [50,60], [70,80]
    //   page 3: [90,100]
    let pages: VecDeque<Page> = vec![
        v2_page(&[0, 1, 0, 1], &[1, 1, 1, 1], &[10, 20, 30, 40], 2),
        v2_page(&[0, 1, 0, 1], &[1, 1, 1, 1], &[50, 60, 70, 80], 2),
        v2_page(&[0, 1], &[1, 1], &[90, 100], 1),
    ]
    .into();

    let untyped: ColumnReader =
        get_column_reader(descr, Box::new(InMemoryPageReader::new(pages)));
    let mut reader = get_typed_column_reader::<Int32Type>(untyped);

    // Step 1 — skip record [10,20]:
    // Peeking page 1 gives num_rows=2 > remaining=1, so the page is
    // LOADED rather than whole-page-skipped. The level-based skip
    // consumes rep levels [0,1], stopping at the 0 that starts [30,40].
    assert_eq!(reader.skip_records(1).unwrap(), 1);

    // Step 2 — skip records [30,40] and [50,60]:
    // Page 1 still holds 2 levels ([0,1]) for [30,40]. The leading 0 is
    // not treated as a delimiter (has_partial=false, idx==0), so all
    // levels are consumed with 0 records counted and has_partial=true.
    // has_record_delimiter is false, so nothing flushes at the page
    // boundary. Page 1 is exhausted and page 2 (v2, num_rows=2) is
    // peeked.
    //
    // With the fix: flush_partial drops remaining 2→1, page 2 is NOT
    // whole-page-skipped (rows=2 > remaining=1); it is loaded and one
    // record ([50,60]) is skipped via levels.
    //
    // Without the fix: rows(2) <= remaining(2) whole-page-skips page 2,
    // over-counting by one, and has_partial stays stale.
    assert_eq!(reader.skip_records(2).unwrap(), 2);

    // Step 3 — read one record.
    let mut vals = Vec::new();
    let mut defs = Vec::new();
    let mut reps = Vec::new();

    let (records, values_read, levels_read) = reader
        .read_records(1, Some(&mut defs), Some(&mut reps), &mut vals)
        .unwrap();

    // Without the fix this yields (1, 0, 0): the rep=0 on page 3
    // "completes" the phantom partial, producing a record with no
    // values. With the fix it reads the 4th record, [70, 80].
    assert_eq!(records, 1, "should read exactly 1 record");
    assert_eq!(levels_read, 2, "should read 2 levels for the record");
    assert_eq!(values_read, 2, "should read 2 non-null values");
    assert_eq!(vals, vec![70, 80], "should contain 4th record's values");
    assert_eq!(reps, vec![0, 1], "rep levels for a 2-element list");
    assert_eq!(defs, vec![1, 1], "def levels (all non-null)");
}
}
2 changes: 0 additions & 2 deletions parquet/tests/arrow_reader/row_filter/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,6 @@ fn test_row_filter_full_page_skip_is_handled() {
/// Without the fix, the list column over-skips by one record, causing
/// struct children to disagree on record counts.
#[test]
#[should_panic(expected = "StructArrayReader out of sync in read_records, expected 1 read, got 0")]
fn test_row_selection_list_column_v2_page_boundary_skip() {
use arrow_array::builder::{Int32Builder, ListBuilder};

Expand Down Expand Up @@ -327,7 +326,6 @@ fn test_row_selection_list_column_v2_page_boundary_skip() {
/// bug causes one leaf to over-skip by one record while the other stays
/// correct.
#[test]
#[should_panic(expected = "Not all children array length are the same!")]
fn test_list_struct_page_boundary_desync_produces_length_mismatch() {
use arrow_array::Array;
use arrow_array::builder::{Int32Builder, ListBuilder, StringBuilder, StructBuilder};
Expand Down
Loading