diff --git a/parquet/src/column/reader.rs b/parquet/src/column/reader.rs index 387a0602a60..eb2d53fb7ad 100644 --- a/parquet/src/column/reader.rs +++ b/parquet/src/column/reader.rs @@ -309,6 +309,20 @@ where }); if let Some(rows) = rows { + // If there is a pending partial record from a previous page, + // count it before considering the whole-page skip. When the + // next page provides num_rows (e.g. a V2 data page or via + // offset index), its records are self-contained, so the + // partial from the previous page is complete at this boundary. + if let Some(decoder) = self.rep_level_decoder.as_mut() { + if decoder.flush_partial() { + remaining_records -= 1; + if remaining_records == 0 { + return Ok(num_records); + } + } + } + if rows <= remaining_records { self.page_reader.skip_next_page()?; remaining_records -= rows; @@ -1361,4 +1375,135 @@ mod tests { ); } } + + /// Regression test for + /// + /// Reproduces the production scenario: all DataPage v2 pages for a + /// list column (rep_level=1) read without an offset index (i.e. + /// `at_record_boundary` returns false for non-last pages). + /// + /// When a prior operation (here `skip_records(1)`) loads a v2 page, + /// and a subsequent `skip_records` exhausts the remaining levels on + /// that page, the rep level decoder is left with `has_partial=true`. + /// Because `has_record_delimiter` is false, the partial is not + /// flushed during level-based processing. When the next v2 page is + /// then peeked with `num_rows` available, the whole-page-skip + /// shortcut must flush the pending partial first. Otherwise: + /// + /// 1. The skip over-counts (skips N+1 records instead of N), and + /// 2. The stale `has_partial` causes a subsequent `read_records` to + /// produce a "phantom" record with 0 values. 
+ #[test] + fn test_skip_records_v2_page_skip_accounts_for_partial() { + use crate::encodings::levels::LevelEncoder; + + let max_rep_level: i16 = 1; + let max_def_level: i16 = 1; + + // Column descriptor for a list element column (rep=1, def=1) + let primitive_type = SchemaType::primitive_type_builder("element", PhysicalType::INT32) + .with_repetition(Repetition::REQUIRED) + .build() + .unwrap(); + let desc = Arc::new(ColumnDescriptor::new( + Arc::new(primitive_type), + max_def_level, + max_rep_level, + ColumnPath::new(vec!["list".to_string(), "element".to_string()]), + )); + + // Helper: build a DataPage v2 for this list column. + let make_v2_page = + |rep_levels: &[i16], def_levels: &[i16], values: &[i32], num_rows: u32| -> Page { + let mut rep_enc = LevelEncoder::v2(max_rep_level, rep_levels.len()); + rep_enc.put(rep_levels); + let rep_bytes = rep_enc.consume(); + + let mut def_enc = LevelEncoder::v2(max_def_level, def_levels.len()); + def_enc.put(def_levels); + let def_bytes = def_enc.consume(); + + let val_bytes: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect(); + + let mut buf = Vec::new(); + buf.extend_from_slice(&rep_bytes); + buf.extend_from_slice(&def_bytes); + buf.extend_from_slice(&val_bytes); + + Page::DataPageV2 { + buf: Bytes::from(buf), + num_values: rep_levels.len() as u32, + encoding: Encoding::PLAIN, + num_nulls: 0, + num_rows, + def_levels_byte_len: def_bytes.len() as u32, + rep_levels_byte_len: rep_bytes.len() as u32, + is_compressed: false, + statistics: None, + } + }; + + // All pages are DataPage v2 (matching the production scenario where + // parquet-rs writes only v2 data pages and no offset index is loaded, + // so at_record_boundary() returns false for non-last pages).
+ + // Page 1 (v2): 2 records × 2 elements = [10,20], [30,40] + let page1 = make_v2_page(&[0, 1, 0, 1], &[1, 1, 1, 1], &[10, 20, 30, 40], 2); + + // Page 2 (v2): 2 records × 2 elements = [50,60], [70,80] + let page2 = make_v2_page(&[0, 1, 0, 1], &[1, 1, 1, 1], &[50, 60, 70, 80], 2); + + // Page 3 (v2): 1 record × 2 elements = [90,100] + let page3 = make_v2_page(&[0, 1], &[1, 1], &[90, 100], 1); + + // 5 records total: [10,20], [30,40], [50,60], [70,80], [90,100] + let pages = VecDeque::from(vec![page1, page2, page3]); + let page_reader = InMemoryPageReader::new(pages); + let column_reader: ColumnReader = get_column_reader(desc, Box::new(page_reader)); + let mut typed_reader = get_typed_column_reader::<Int32Type>(column_reader); + + // Step 1 — skip 1 record: + // Peek page 1: num_rows=2, remaining=1 → rows(2) > remaining(1), + // so the page is LOADED (not whole-page-skipped). + // Level-based skip consumes rep levels [0,1] for record [10,20], + // stopping at the 0 that starts record [30,40]. + let skipped = typed_reader.skip_records(1).unwrap(); + assert_eq!(skipped, 1); + + // Step 2 — skip 2 more records ([30,40] and [50,60]): + // Mid-page in page 1 with 2 remaining levels [0,1] for [30,40]. + // skip_rep_levels(2, 2): the leading 0 does NOT act as a record + // delimiter (has_partial=false, idx==0), so count_records returns + // (true, 0, 2) — all levels consumed, has_partial=true, 0 records. + // + // has_record_delimiter is false → no flush at page boundary. + // Page 1 exhausted → peek page 2 (v2, num_rows=2). + // + // With fix: flush_partial → remaining 2→1, page 2 NOT skipped + // (rows=2 > remaining=1). Load page 2, skip 1 record [50,60]. + // + // Without fix: rows(2) <= remaining(2) → page 2 whole-page-skipped, + // over-counting by 1. has_partial stays true (stale from page 1).
+ let skipped = typed_reader.skip_records(2).unwrap(); + assert_eq!(skipped, 2); + + // Step 3 — read 1 record: + let mut values = Vec::new(); + let mut def_levels = Vec::new(); + let mut rep_levels = Vec::new(); + + let (records, values_read, levels_read) = typed_reader + .read_records(1, Some(&mut def_levels), Some(&mut rep_levels), &mut values) + .unwrap(); + + // Without the fix: (1, 0, 0) — phantom record from stale has_partial; + // the rep=0 on page 3 "completes" the phantom, yielding 0 values. + // With the fix: (1, 2, 2) — correctly reads record [70, 80]. + assert_eq!(records, 1, "should read exactly 1 record"); + assert_eq!(levels_read, 2, "should read 2 levels for the record"); + assert_eq!(values_read, 2, "should read 2 non-null values"); + assert_eq!(values, vec![70, 80], "should contain 4th record's values"); + assert_eq!(rep_levels, vec![0, 1], "rep levels for a 2-element list"); + assert_eq!(def_levels, vec![1, 1], "def levels (all non-null)"); + } } diff --git a/parquet/tests/arrow_reader/row_filter/sync.rs b/parquet/tests/arrow_reader/row_filter/sync.rs index e59fa392cfd..77a75220dc2 100644 --- a/parquet/tests/arrow_reader/row_filter/sync.rs +++ b/parquet/tests/arrow_reader/row_filter/sync.rs @@ -206,7 +206,6 @@ fn test_row_filter_full_page_skip_is_handled() { /// Without the fix, the list column over-skips by one record, causing /// struct children to disagree on record counts. #[test] -#[should_panic(expected = "StructArrayReader out of sync in read_records, expected 1 read, got 0")] fn test_row_selection_list_column_v2_page_boundary_skip() { use arrow_array::builder::{Int32Builder, ListBuilder}; @@ -327,7 +326,6 @@ fn test_row_selection_list_column_v2_page_boundary_skip() { /// bug causes one leaf to over-skip by one record while the other stays /// correct. 
#[test] -#[should_panic(expected = "Not all children array length are the same!")] fn test_list_struct_page_boundary_desync_produces_length_mismatch() { use arrow_array::Array; use arrow_array::builder::{Int32Builder, ListBuilder, StringBuilder, StructBuilder};