From 58c647d73b072794ac863d0a34f78462a3c32af1 Mon Sep 17 00:00:00 2001 From: voon Date: Thu, 14 May 2026 11:08:16 +0800 Subject: [PATCH 1/3] test(spark): Add merge-algorithm and I/O-count tests for read_blob batching MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Existing tests for the BatchedBlobReader (PR #18098) only assert byte correctness of the data returned, so they cannot detect a regression that causes the batching optimization to stop reducing I/O. Add two test classes that close that gap: - TestBatchedBlobReaderMerge: direct unit tests on mergeRanges and identifyConsecutiveRanges (newly package-private), asserting merged range counts, gap-threshold boundaries, multi-file grouping, sort, index preservation, and overlap rejection. No Spark, no I/O. - TestBatchedBlobReaderIO: integration tests that drive processPartition with a CountingHoodieStorage wrapper around a real storage, asserting openSeekable/seek counts across four scenarios — many blobs in one file, contiguous zero-gap blobs, threshold-controlled small/large gaps including the inclusive boundary, and multi-file queries (per-file batching, mixed gap patterns, interleaved input order). 
--- .../sql/hudi/blob/BatchedBlobReader.scala | 4 +- .../sql/hudi/blob/CountingHoodieStorage.scala | 121 +++++++++ .../hudi/blob/TestBatchedBlobReaderIO.scala | 237 ++++++++++++++++++ .../blob/TestBatchedBlobReaderMerge.scala | 183 ++++++++++++++ 4 files changed, 543 insertions(+), 2 deletions(-) create mode 100644 hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/blob/CountingHoodieStorage.scala create mode 100644 hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/blob/TestBatchedBlobReaderIO.scala create mode 100644 hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/blob/TestBatchedBlobReaderMerge.scala diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/BatchedBlobReader.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/BatchedBlobReader.scala index 0328cdd0c5c22..a1c299cf2611d 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/BatchedBlobReader.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/BatchedBlobReader.scala @@ -274,7 +274,7 @@ class BatchedBlobReader( * @param rows Sequence of row information * @return Sequence of merged ranges */ - private def identifyConsecutiveRanges[R](rows: Seq[RowInfo[R]]): Seq[MergedRange[R]] = { + private[blob] def identifyConsecutiveRanges[R](rows: Seq[RowInfo[R]]): Seq[MergedRange[R]] = { // Group by file path val byFile = rows.groupBy(_.filePath) @@ -299,7 +299,7 @@ class BatchedBlobReader( * @param maxGap Maximum gap to consider for merging * @return Sequence of merged ranges */ - private def mergeRanges[R](rows: Seq[RowInfo[R]], maxGap: Int): Seq[MergedRange[R]] = { + private[blob] def mergeRanges[R](rows: Seq[RowInfo[R]], maxGap: Int): Seq[MergedRange[R]] = { val result = ArrayBuffer[MergedRange[R]]() var currentFilePath: String = null diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/blob/CountingHoodieStorage.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/blob/CountingHoodieStorage.scala new file mode 100644 index 0000000000000..79399b52fa22e --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/blob/CountingHoodieStorage.scala @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.hudi.blob + +import org.apache.hudi.io.SeekableDataInputStream +import org.apache.hudi.storage.{HoodieStorage, StorageConfiguration, StoragePath, StoragePathFilter, StoragePathInfo} + +import java.io.{InputStream, OutputStream} +import java.net.URI +import java.util.{List => JList} +import java.util.concurrent.atomic.AtomicInteger + +/** + * Test-only {@link HoodieStorage} decorator that counts I/O operations performed by + * batched blob reads. 
Delegates every method to an underlying storage, but tracks: + * + * - openSeekableCount — number of times openSeekable was called + * - seekCount — number of times seek was called on a returned SeekableDataInputStream + * + * Use this wrapper in tests to assert that batching reduces the number of physical + * I/O operations versus a non-batching configuration. Counters are reset via + * {@link #reset()}. + */ +class CountingHoodieStorage(delegate: HoodieStorage) extends HoodieStorage(delegate.getConf) { + + val openSeekableCount = new AtomicInteger(0) + val seekCount = new AtomicInteger(0) + + def reset(): Unit = { + openSeekableCount.set(0) + seekCount.set(0) + } + + override def openSeekable(path: StoragePath, bufferSize: Int, wrapStream: Boolean): SeekableDataInputStream = { + openSeekableCount.incrementAndGet() + val inner = delegate.openSeekable(path, bufferSize, wrapStream) + new CountingSeekableDataInputStream(inner, seekCount) + } + + // --- pass-through delegation ------------------------------------------------- + + override def newInstance(path: StoragePath, storageConf: StorageConfiguration[_]): HoodieStorage = + new CountingHoodieStorage(delegate.newInstance(path, storageConf)) + + override def getScheme: String = delegate.getScheme + override def getDefaultBlockSize(path: StoragePath): Int = delegate.getDefaultBlockSize(path) + override def getDefaultBufferSize: Int = delegate.getDefaultBufferSize + override def getDefaultReplication(path: StoragePath): Short = delegate.getDefaultReplication(path) + override def getUri: URI = delegate.getUri + + override def create(path: StoragePath, overwrite: Boolean): OutputStream = + delegate.create(path, overwrite) + override def create(path: StoragePath, overwrite: Boolean, bufferSize: Integer, replication: java.lang.Short, sizeThreshold: java.lang.Long): OutputStream = + delegate.create(path, overwrite, bufferSize, replication, sizeThreshold) + + override def open(path: StoragePath): InputStream = 
delegate.open(path) + override def append(path: StoragePath): OutputStream = delegate.append(path) + + override def exists(path: StoragePath): Boolean = delegate.exists(path) + override def getPathInfo(path: StoragePath): StoragePathInfo = delegate.getPathInfo(path) + override def createDirectory(path: StoragePath): Boolean = delegate.createDirectory(path) + + override def listDirectEntries(path: StoragePath): JList[StoragePathInfo] = delegate.listDirectEntries(path) + override def listFiles(path: StoragePath): JList[StoragePathInfo] = delegate.listFiles(path) + override def listDirectEntries(path: StoragePath, filter: StoragePathFilter): JList[StoragePathInfo] = + delegate.listDirectEntries(path, filter) + + override def setModificationTime(path: StoragePath, modificationTimeInMillisEpoch: Long): Unit = + delegate.setModificationTime(path, modificationTimeInMillisEpoch) + + override def globEntries(pathPattern: StoragePath, filter: StoragePathFilter): JList[StoragePathInfo] = + delegate.globEntries(pathPattern, filter) + + override def rename(oldPath: StoragePath, newPath: StoragePath): Boolean = + delegate.rename(oldPath, newPath) + + override def deleteDirectory(path: StoragePath): Boolean = delegate.deleteDirectory(path) + override def deleteFile(path: StoragePath): Boolean = delegate.deleteFile(path) + + override def getFileSystem: AnyRef = delegate.getFileSystem + override def getRawStorage: HoodieStorage = delegate.getRawStorage + + override def close(): Unit = delegate.close() +} + +/** + * Counting wrapper around a {@link SeekableDataInputStream}. Counts seeks; final + * readFully methods on DataInputStream delegate to the underlying stream's + * read(...) which the wrapped stream services directly. 
+ */ +private class CountingSeekableDataInputStream( + delegate: SeekableDataInputStream, + seekCount: AtomicInteger) + extends SeekableDataInputStream(delegate) { + + override def seek(pos: Long): Unit = { + seekCount.incrementAndGet() + delegate.seek(pos) + } + + override def getPos: Long = delegate.getPos + + override def close(): Unit = delegate.close() +} diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/blob/TestBatchedBlobReaderIO.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/blob/TestBatchedBlobReaderIO.scala new file mode 100644 index 0000000000000..d9bd9ad953cfe --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/blob/TestBatchedBlobReaderIO.scala @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.spark.sql.hudi.blob + +import org.apache.hudi.blob.BlobTestHelpers._ +import org.apache.hudi.storage.{HoodieStorage, HoodieStorageUtils} +import org.apache.hudi.testutils.HoodieClientTestBase + +import org.apache.spark.sql.Row +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.types.{BinaryType, StructField, StructType} +import org.junit.jupiter.api.Assertions._ +import org.junit.jupiter.api.Test + +/** + * Integration tests asserting that batched blob reads actually reduce storage I/O. + * + * Each scenario drives BatchedBlobReader.processPartition directly with a + * CountingHoodieStorage wrapper around a real local-FS storage, then asserts + * the resulting openSeekable / seek call counts. Where possible, the same input + * is re-run with a non-batching configuration (maxGapBytes = 0) to establish + * a baseline and prove the reduction. + * + * Lives in the same package as BatchedBlobReader to access the package-private + * RowAccessor / RowBuilder implicits. + */ +class TestBatchedBlobReaderIO extends HoodieClientTestBase { + + private val DEFAULT_LOOKAHEAD = 50 + + /** Build rows by routing through Spark so the nested blob struct is encoded correctly. */ + private def buildRows(entries: Seq[(String, Long, Long)]): (Array[Row], StructType) = { + val df = sparkSession.createDataFrame(entries).toDF("external_path", "offset", "length") + .withColumn("data", blobStructCol("data", col("external_path"), col("offset"), col("length"))) + .select("data") + (df.collect(), df.schema) + } + + /** Run processPartition with the counting wrapper and drain the output iterator. 
*/ + private def runWithCounting( + rows: Array[Row], + inputSchema: StructType, + maxGap: Int, + lookahead: Int = DEFAULT_LOOKAHEAD): CountingHoodieStorage = { + val raw: HoodieStorage = HoodieStorageUtils.getStorage(storageConf) + val counting = new CountingHoodieStorage(raw) + try { + val outputSchema = inputSchema.add(StructField(BatchedBlobReader.DATA_COL, BinaryType, nullable = true)) + val reader = new BatchedBlobReader(counting, maxGap, lookahead) + import RowAccessor.rowAccessor + import RowBuilder.rowBuilder + val out = reader.processPartition[Row](rows.iterator, structColIdx = 0, outputSchema) + // Force materialization (the underlying reads happen here). + while (out.hasNext) { + val r = out.next() + // Touch the data column so any lazy work completes. + r.getAs[Array[Byte]](BatchedBlobReader.DATA_COL) + } + counting + } finally { + counting.close() + } + } + + // --- Scenario (1): Multiple blobs in the same out-of-line file -------------- + + @Test + def testManyBlobsInSingleFileBatchable(): Unit = { + val filePath = createTestFile(tempDir, "many-batch.bin", 50000) + val entries = (0 until 20).map(i => (filePath, (i * 150).toLong, 100L)) + val (rows, schema) = buildRows(entries) + + val counters = runWithCounting(rows, schema, maxGap = 4096) + assertEquals(1, counters.openSeekableCount.get, "20 blobs with 50-byte gaps should batch to one open") + assertEquals(1, counters.seekCount.get, "...and one seek") + } + + @Test + def testManyBlobsInSingleFileNoBatching(): Unit = { + val filePath = createTestFile(tempDir, "many-nobatch.bin", 50000) + val entries = (0 until 20).map(i => (filePath, (i * 150).toLong, 100L)) + val (rows, schema) = buildRows(entries) + + val counters = runWithCounting(rows, schema, maxGap = 0) + assertEquals(20, counters.openSeekableCount.get, "maxGap=0 with 50-byte gaps should produce one open per blob") + assertEquals(20, counters.seekCount.get) + } + + // --- Scenario (2): Contiguous out-of-line blobs (zero gap) ------------------ 
+ + @Test + def testContiguousBlobsMergeWithZeroGap(): Unit = { + val filePath = createTestFile(tempDir, "contig.bin", 1000) + val entries = (0 until 6).map(i => (filePath, (i * 100).toLong, 100L)) + val (rows, schema) = buildRows(entries) + + // Zero gap, zero threshold -> gap (0) <= maxGap (0) -> all merge + val counters = runWithCounting(rows, schema, maxGap = 0) + assertEquals(1, counters.openSeekableCount.get, "Six contiguous blobs should merge into one read") + assertEquals(1, counters.seekCount.get) + } + + // --- Scenario (3): Out-of-line blobs with gaps (threshold-controlled) ------- + + @Test + def testSmallGapsWithinThresholdBatch(): Unit = { + val filePath = createTestFile(tempDir, "small-gaps.bin", 10000) + val entries = Seq( + (filePath, 0L, 100L), + (filePath, 120L, 100L), + (filePath, 240L, 100L), + (filePath, 360L, 100L), + (filePath, 480L, 100L), + (filePath, 600L, 100L)) + val (rows, schema) = buildRows(entries) + + val counters = runWithCounting(rows, schema, maxGap = 4096) + assertEquals(1, counters.openSeekableCount.get) + assertEquals(1, counters.seekCount.get) + } + + @Test + def testLargeGapsBeyondThresholdDoNotBatch(): Unit = { + val filePath = createTestFile(tempDir, "large-gaps.bin", 50000) + val entries = Seq( + (filePath, 0L, 100L), + (filePath, 8192L, 100L), + (filePath, 16384L, 100L), + (filePath, 24576L, 100L)) + val (rows, schema) = buildRows(entries) + + val counters = runWithCounting(rows, schema, maxGap = 1024) + assertEquals(4, counters.openSeekableCount.get, "Gaps above threshold must not batch") + assertEquals(4, counters.seekCount.get) + } + + @Test + def testThresholdBoundary(): Unit = { + val filePath = createTestFile(tempDir, "boundary.bin", 5000) + // Gap between consecutive blobs = exactly 1024 bytes: + // [0,100), [1124,100), [2248,100), [3372,100) + val entries = (0 until 4).map(i => (filePath, (i * 1124).toLong, 100L)) + val (rows, schema) = buildRows(entries) + + val inclusive = runWithCounting(rows, schema, maxGap 
= 1024) + assertEquals(1, inclusive.openSeekableCount.get, "gap == maxGap is inclusive -> merge") + + val exclusive = runWithCounting(rows, schema, maxGap = 1023) + assertEquals(4, exclusive.openSeekableCount.get, "gap > maxGap -> no merge") + } + + @Test + def testMixedSmallAndLargeGapsInOneFile(): Unit = { + val filePath = createTestFile(tempDir, "mixed-gaps.bin", 20000) + // Two batchable groups separated by a big gap. + val entries = Seq( + (filePath, 0L, 100L), + (filePath, 200L, 100L), + (filePath, 400L, 100L), + // Large jump: gap = 10000 - 500 = 9500 > 4096 + (filePath, 10000L, 100L), + (filePath, 10200L, 100L), + (filePath, 10400L, 100L)) + val (rows, schema) = buildRows(entries) + + val counters = runWithCounting(rows, schema, maxGap = 4096) + assertEquals(2, counters.openSeekableCount.get, "Two merged ranges -> two opens") + assertEquals(2, counters.seekCount.get) + } + + // --- Scenario (4): Blobs from multiple files in the same query -------------- + + @Test + def testBlobsFromMultipleFilesBatchedPerFile(): Unit = { + val files = (0 until 3).map(i => createTestFile(tempDir, s"multi-$i.bin", 1000)) + val entries = files.flatMap { f => + (0 until 4).map(i => (f, (i * 100).toLong, 100L)) + } + val (rows, schema) = buildRows(entries) + + val counters = runWithCounting(rows, schema, maxGap = 4096) + assertEquals(3, counters.openSeekableCount.get, "One batched open per file") + assertEquals(3, counters.seekCount.get) + } + + @Test + def testMultipleFilesWithMixedGapPatterns(): Unit = { + val fileA = createTestFile(tempDir, "fileA.bin", 1000) // contiguous + val fileB = createTestFile(tempDir, "fileB.bin", 2000) // small gaps + val fileC = createTestFile(tempDir, "fileC.bin", 50000) // large gaps + + val entries = + (0 until 4).map(i => (fileA, (i * 100).toLong, 100L)) ++ // contiguous -> 1 merged + (0 until 4).map(i => (fileB, (i * 150).toLong, 100L)) ++ // 50-byte gaps -> 1 merged + (0 until 4).map(i => (fileC, (i * 8192).toLong, 100L)) // 8KB gaps -> 4 
ranges + val (rows, schema) = buildRows(entries) + + val counters = runWithCounting(rows, schema, maxGap = 1024) + assertEquals(6, counters.openSeekableCount.get, "1 (A) + 1 (B) + 4 (C) merged ranges") + assertEquals(6, counters.seekCount.get) + } + + @Test + def testInterleavedFileOrderStillBatchesPerFile(): Unit = { + val fileA = createTestFile(tempDir, "interA.bin", 1000) + val fileB = createTestFile(tempDir, "interB.bin", 1000) + val entries = Seq( + (fileA, 0L, 100L), + (fileB, 0L, 100L), + (fileA, 100L, 100L), + (fileB, 100L, 100L), + (fileA, 200L, 100L), + (fileB, 200L, 100L)) + val (rows, schema) = buildRows(entries) + + val counters = runWithCounting(rows, schema, maxGap = 4096) + assertEquals(2, counters.openSeekableCount.get, "Interleaved input still groups per file") + assertEquals(2, counters.seekCount.get) + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/blob/TestBatchedBlobReaderMerge.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/blob/TestBatchedBlobReaderMerge.scala new file mode 100644 index 0000000000000..73813e9de33e3 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/blob/TestBatchedBlobReaderMerge.scala @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.hudi.blob + +import org.apache.spark.sql.Row +import org.junit.jupiter.api.Assertions._ +import org.junit.jupiter.api.Test + +/** + * Direct unit tests for the merge algorithm in {@link BatchedBlobReader}. + * + * These tests bypass Spark and storage entirely: they construct a reader with + * a null storage (the merge methods do not touch I/O) and call mergeRanges / + * identifyConsecutiveRanges with crafted RowInfo inputs to assert the + * structure of the merged output (counts, boundaries, ordering, grouping). + * + * This complements TestBatchedBlobReader (which verifies data correctness end-to-end) + * by proving the batching algorithm produces the expected merged ranges for + * each input pattern. + */ +class TestBatchedBlobReaderMerge { + + private def reader(maxGapBytes: Int = 4096) = + new BatchedBlobReader(storage = null, maxGapBytes = maxGapBytes, lookaheadRows = 50) + + private def row(filePath: String, offset: Long, length: Long, index: Long = 0L): RowInfo[Row] = + RowInfo[Row]( + originalRow = null.asInstanceOf[Row], + filePath = filePath, + offset = offset, + length = length, + index = index) + + @Test + def testEmptyInputProducesNoRanges(): Unit = { + val merged = reader().mergeRanges(Seq.empty[RowInfo[Row]], maxGap = 100) + assertTrue(merged.isEmpty) + } + + @Test + def testSingleRowProducesSingleRange(): Unit = { + val merged = reader().mergeRanges(Seq(row("/f", 1000, 200, index = 7)), maxGap = 100) + assertEquals(1, merged.size) + val r = merged.head + assertEquals("/f", r.filePath) + assertEquals(1000L, r.startOffset) + assertEquals(1200L, r.endOffset) + assertEquals(1, r.rows.size) + assertEquals(7L, r.rows.head.index) + } + + @Test + def testContiguousRangesMergeIntoOne(): Unit = { + // Zero-gap reads: [0,100), [100,100), [200,100) — gap == 0 <= maxGap == 0 → merge + val rows = Seq( + row("/f", 0, 100, 
index = 0), + row("/f", 100, 100, index = 1), + row("/f", 200, 100, index = 2)) + val merged = reader().mergeRanges(rows, maxGap = 0) + assertEquals(1, merged.size) + assertEquals(0L, merged.head.startOffset) + assertEquals(300L, merged.head.endOffset) + assertEquals(3, merged.head.rows.size) + } + + @Test + def testGapEqualToThresholdMerges(): Unit = { + // Gaps of exactly maxGap → inclusive boundary, merge into one + val rows = Seq( + row("/f", 0, 100), + row("/f", 1124, 100), // gap = 1024 == maxGap + row("/f", 2248, 100)) // gap = 1024 == maxGap + val merged = reader().mergeRanges(rows, maxGap = 1024) + assertEquals(1, merged.size) + assertEquals(0L, merged.head.startOffset) + assertEquals(2348L, merged.head.endOffset) + } + + @Test + def testGapOneOverThresholdSplits(): Unit = { + // Gap of maxGap + 1 → split + val rows = Seq( + row("/f", 0, 100), + row("/f", 1125, 100)) // gap = 1025 > maxGap = 1024 + val merged = reader().mergeRanges(rows, maxGap = 1024) + assertEquals(2, merged.size) + assertEquals(0L, merged.head.startOffset) + assertEquals(100L, merged.head.endOffset) + assertEquals(1125L, merged(1).startOffset) + assertEquals(1225L, merged(1).endOffset) + } + + @Test + def testLargeGapsProduceSeparateRanges(): Unit = { + val rows = Seq( + row("/f", 0, 100, index = 0), + row("/f", 10000, 100, index = 1), + row("/f", 20000, 100, index = 2), + row("/f", 30000, 100, index = 3)) + val merged = reader().mergeRanges(rows, maxGap = 1000) + assertEquals(4, merged.size) + merged.foreach(r => assertEquals(1, r.rows.size)) + } + + @Test + def testUnsortedInputIsSortedBeforeMerge(): Unit = { + // identifyConsecutiveRanges sorts by offset within each file + val rows = Seq( + row("/f", 200, 100, index = 2), + row("/f", 0, 100, index = 0), + row("/f", 100, 100, index = 1)) + val merged = reader(maxGapBytes = 0).identifyConsecutiveRanges(rows) + assertEquals(1, merged.size, "Single merged range expected after sort + merge") + assertEquals(0L, merged.head.startOffset) + 
assertEquals(300L, merged.head.endOffset) + // Internal rows in the merged range follow sorted order + val offsetsInMerged = merged.head.rows.map(_.offset) + assertEquals(Seq(0L, 100L, 200L), offsetsInMerged) + } + + @Test + def testMultipleFilesProduceRangePerFile(): Unit = { + val rows = Seq( + row("/a", 0, 100, index = 0), + row("/b", 0, 100, index = 1), + row("/a", 100, 100, index = 2), + row("/b", 100, 100, index = 3)) + val merged = reader(maxGapBytes = 4096).identifyConsecutiveRanges(rows) + assertEquals(2, merged.size, "One merged range per file") + val byFile = merged.groupBy(_.filePath) + assertEquals(Set("/a", "/b"), byFile.keySet) + assertEquals(1, byFile("/a").size) + assertEquals(1, byFile("/b").size) + // Each file's merged range covers offsets 0..200 + byFile.values.foreach { rs => + assertEquals(0L, rs.head.startOffset) + assertEquals(200L, rs.head.endOffset) + assertEquals(2, rs.head.rows.size) + } + } + + @Test + def testRowIndicesPreservedInMergedRange(): Unit = { + // Indices retained so the downstream sort-by-index can reconstruct input order + val rows = Seq( + row("/f", 0, 100, index = 42), + row("/f", 100, 100, index = 7), + row("/f", 200, 100, index = 19)) + val merged = reader().mergeRanges(rows, maxGap = 4096) + assertEquals(1, merged.size) + val indices = merged.head.rows.map(_.index) + assertEquals(Seq(42L, 7L, 19L), indices) + } + + @Test + def testOverlappingRangesThrow(): Unit = { + // [0,100) followed by [50,100) -> overlap + val rows = Seq( + row("/f", 0, 100, index = 0), + row("/f", 50, 100, index = 1)) + val ex = assertThrows( + classOf[IllegalArgumentException], + () => reader().mergeRanges(rows, maxGap = 4096)) + assertTrue(ex.getMessage.contains("Overlapping blob ranges detected")) + } +} From 895845e1b19173b98e3cef14c0ca0a2675a4c969 Mon Sep 17 00:00:00 2001 From: voon Date: Thu, 14 May 2026 18:47:37 +0800 Subject: [PATCH 2/3] test(spark): Allow org.apache.spark.sql.hudi.blob test package TestSparkSqlHudiPackageStructure 
rejects Scala test classes under org.apache.spark.sql.hudi.* that are not in the curated allow-list, which is mirrored in two CI configs that drive the wildcard suite filter. The new BatchedBlobReader merge/IO tests live in the .blob package so they can reach private[blob] helpers (mergeRanges, identifyConsecutiveRanges, RowAccessor) without widening internal visibility - matching the same-package pattern already used for the .analysis, .catalog, and .command production sub-packages. Add 'org.apache.spark.sql.hudi.blob' to all three places: - ALLOWED_PACKAGES in TestSparkSqlHudiPackageStructure - job6HudiSparkDdlOthersWildcardSuites in azure-pipelines-20230430.yml - SCALA_TEST_OTHERS_FILTER in .github/workflows/bot.yml --- .github/workflows/bot.yml | 2 +- azure-pipelines-20230430.yml | 1 + .../java/org/apache/hudi/TestSparkSqlHudiPackageStructure.java | 1 + 3 files changed, 3 insertions(+), 1 deletion(-) diff --git a/.github/workflows/bot.yml b/.github/workflows/bot.yml index 2143edd92bcb2..06468d0c4fa23 100644 --- a/.github/workflows/bot.yml +++ b/.github/workflows/bot.yml @@ -33,7 +33,7 @@ env: JAVA_UT_FILTER1: -Dtest=!TestCOWDataSource,!TestMORDataSource,!TestHoodieFileSystemViews JAVA_UT_FILTER2: -Dtest=TestCOWDataSource,TestMORDataSource,TestHoodieFileSystemViews SCALA_TEST_DML_FILTER: -DwildcardSuites=org.apache.spark.sql.hudi.dml - SCALA_TEST_OTHERS_FILTER: -DwildcardSuites=org.apache.hudi,org.apache.spark.hudi,org.apache.spark.sql.avro,org.apache.spark.sql.execution,org.apache.spark.sql.hudi.analysis,org.apache.spark.sql.hudi.catalog,org.apache.spark.sql.hudi.command,org.apache.spark.sql.hudi.common,org.apache.spark.sql.hudi.ddl,org.apache.spark.sql.hudi.procedure,org.apache.spark.sql.hudi.feature + SCALA_TEST_OTHERS_FILTER: 
-DwildcardSuites=org.apache.hudi,org.apache.spark.hudi,org.apache.spark.sql.avro,org.apache.spark.sql.execution,org.apache.spark.sql.hudi.analysis,org.apache.spark.sql.hudi.blob,org.apache.spark.sql.hudi.catalog,org.apache.spark.sql.hudi.command,org.apache.spark.sql.hudi.common,org.apache.spark.sql.hudi.ddl,org.apache.spark.sql.hudi.procedure,org.apache.spark.sql.hudi.feature FLINK_IT_FILTER1: -Dit.test=ITTestHoodieDataSource FLINK_IT_FILTER2: -Dit.test=!ITTestHoodieDataSource diff --git a/azure-pipelines-20230430.yml b/azure-pipelines-20230430.yml index cccd89ec6cc73..3edac6046b1a3 100644 --- a/azure-pipelines-20230430.yml +++ b/azure-pipelines-20230430.yml @@ -75,6 +75,7 @@ parameters: - 'org.apache.spark.sql.avro' - 'org.apache.spark.sql.execution' - 'org.apache.spark.sql.hudi.analysis' + - 'org.apache.spark.sql.hudi.blob' - 'org.apache.spark.sql.hudi.catalog' - 'org.apache.spark.sql.hudi.command' - 'org.apache.spark.sql.hudi.common' diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestSparkSqlHudiPackageStructure.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestSparkSqlHudiPackageStructure.java index 2937b59dc529b..ddce176eb7384 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestSparkSqlHudiPackageStructure.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestSparkSqlHudiPackageStructure.java @@ -53,6 +53,7 @@ public class TestSparkSqlHudiPackageStructure { */ private static final Set ALLOWED_PACKAGES = new HashSet<>(Arrays.asList( "org.apache.spark.sql.hudi.analysis", + "org.apache.spark.sql.hudi.blob", "org.apache.spark.sql.hudi.catalog", "org.apache.spark.sql.hudi.command", "org.apache.spark.sql.hudi.common", From 2ff7167f18ce1531cb83c7b391e5e3712f0a3a89 Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Thu, 14 May 2026 14:22:23 -0700 Subject: [PATCH 3/3] test(spark): Simplify I/O-count tests for read_blob batching Drop tests that re-verify the merge 
algorithm at the I/O layer when TestBatchedBlobReaderMerge already covers it. Keep only the cases that prove the merge result maps 1:1 to physical I/O ops: - batching reduces I/O end-to-end (single-file, batched vs baseline) - mixed gaps in one file produce one I/O per merged group - multi-file routing with interleaved input and mixed gap patterns Reduces TestBatchedBlobReaderIO from 10 to 3 tests (~70% fewer Spark DataFrame builds and real file reads, the dominant runtime cost). --- .../hudi/blob/TestBatchedBlobReaderIO.scala | 167 ++++-------------- 1 file changed, 38 insertions(+), 129 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/blob/TestBatchedBlobReaderIO.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/blob/TestBatchedBlobReaderIO.scala index d9bd9ad953cfe..54541ad1aaf76 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/blob/TestBatchedBlobReaderIO.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/blob/TestBatchedBlobReaderIO.scala @@ -34,9 +34,10 @@ import org.junit.jupiter.api.Test * * Each scenario drives BatchedBlobReader.processPartition directly with a * CountingHoodieStorage wrapper around a real local-FS storage, then asserts - * the resulting openSeekable / seek call counts. Where possible, the same input - * is re-run with a non-batching configuration (maxGapBytes = 0) to establish - * a baseline and prove the reduction. + * the resulting openSeekable / seek call counts. + * + * Scope: only the merge-result-to-physical-I/O mapping is exercised here. The + * gap/threshold merging algorithm itself is covered by TestBatchedBlobReaderMerge. * * Lives in the same package as BatchedBlobReader to access the package-private * RowAccessor / RowBuilder implicits. 
@@ -67,10 +68,8 @@ class TestBatchedBlobReaderIO extends HoodieClientTestBase { import RowAccessor.rowAccessor import RowBuilder.rowBuilder val out = reader.processPartition[Row](rows.iterator, structColIdx = 0, outputSchema) - // Force materialization (the underlying reads happen here). while (out.hasNext) { val r = out.next() - // Touch the data column so any lazy work completes. r.getAs[Array[Byte]](BatchedBlobReader.DATA_COL) } counting @@ -79,102 +78,37 @@ class TestBatchedBlobReaderIO extends HoodieClientTestBase { } } - // --- Scenario (1): Multiple blobs in the same out-of-line file -------------- - - @Test - def testManyBlobsInSingleFileBatchable(): Unit = { - val filePath = createTestFile(tempDir, "many-batch.bin", 50000) - val entries = (0 until 20).map(i => (filePath, (i * 150).toLong, 100L)) - val (rows, schema) = buildRows(entries) - - val counters = runWithCounting(rows, schema, maxGap = 4096) - assertEquals(1, counters.openSeekableCount.get, "20 blobs with 50-byte gaps should batch to one open") - assertEquals(1, counters.seekCount.get, "...and one seek") - } - + /** + * End-to-end proof that batching reduces I/O: the same input collapses from + * N opens (maxGap=0, gaps exceed threshold) to a single open (maxGap=4096). 
+ */ @Test - def testManyBlobsInSingleFileNoBatching(): Unit = { - val filePath = createTestFile(tempDir, "many-nobatch.bin", 50000) + def testBatchingReducesIOForSingleFile(): Unit = { + val filePath = createTestFile(tempDir, "single-file.bin", 50000) val entries = (0 until 20).map(i => (filePath, (i * 150).toLong, 100L)) val (rows, schema) = buildRows(entries) - val counters = runWithCounting(rows, schema, maxGap = 0) - assertEquals(20, counters.openSeekableCount.get, "maxGap=0 with 50-byte gaps should produce one open per blob") - assertEquals(20, counters.seekCount.get) - } - - // --- Scenario (2): Contiguous out-of-line blobs (zero gap) ------------------ - - @Test - def testContiguousBlobsMergeWithZeroGap(): Unit = { - val filePath = createTestFile(tempDir, "contig.bin", 1000) - val entries = (0 until 6).map(i => (filePath, (i * 100).toLong, 100L)) - val (rows, schema) = buildRows(entries) - - // Zero gap, zero threshold -> gap (0) <= maxGap (0) -> all merge - val counters = runWithCounting(rows, schema, maxGap = 0) - assertEquals(1, counters.openSeekableCount.get, "Six contiguous blobs should merge into one read") - assertEquals(1, counters.seekCount.get) - } - - // --- Scenario (3): Out-of-line blobs with gaps (threshold-controlled) ------- - - @Test - def testSmallGapsWithinThresholdBatch(): Unit = { - val filePath = createTestFile(tempDir, "small-gaps.bin", 10000) - val entries = Seq( - (filePath, 0L, 100L), - (filePath, 120L, 100L), - (filePath, 240L, 100L), - (filePath, 360L, 100L), - (filePath, 480L, 100L), - (filePath, 600L, 100L)) - val (rows, schema) = buildRows(entries) - - val counters = runWithCounting(rows, schema, maxGap = 4096) - assertEquals(1, counters.openSeekableCount.get) - assertEquals(1, counters.seekCount.get) - } - - @Test - def testLargeGapsBeyondThresholdDoNotBatch(): Unit = { - val filePath = createTestFile(tempDir, "large-gaps.bin", 50000) - val entries = Seq( - (filePath, 0L, 100L), - (filePath, 8192L, 100L), - (filePath, 16384L, 
100L), - (filePath, 24576L, 100L)) - val (rows, schema) = buildRows(entries) - - val counters = runWithCounting(rows, schema, maxGap = 1024) - assertEquals(4, counters.openSeekableCount.get, "Gaps above threshold must not batch") - assertEquals(4, counters.seekCount.get) - } - - @Test - def testThresholdBoundary(): Unit = { - val filePath = createTestFile(tempDir, "boundary.bin", 5000) - // Gap between consecutive blobs = exactly 1024 bytes: - // [0,100), [1124,100), [2248,100), [3372,100) - val entries = (0 until 4).map(i => (filePath, (i * 1124).toLong, 100L)) - val (rows, schema) = buildRows(entries) - - val inclusive = runWithCounting(rows, schema, maxGap = 1024) - assertEquals(1, inclusive.openSeekableCount.get, "gap == maxGap is inclusive -> merge") + val batched = runWithCounting(rows, schema, maxGap = 4096) + assertEquals(1, batched.openSeekableCount.get, "20 blobs with 50-byte gaps should batch to one open") + assertEquals(1, batched.seekCount.get) - val exclusive = runWithCounting(rows, schema, maxGap = 1023) - assertEquals(4, exclusive.openSeekableCount.get, "gap > maxGap -> no merge") + val nonBatched = runWithCounting(rows, schema, maxGap = 0) + assertEquals(20, nonBatched.openSeekableCount.get, "maxGap=0 with 50-byte gaps should produce one open per blob") + assertEquals(20, nonBatched.seekCount.get) } + /** + * Mixed small/large gaps in one file produce exactly one I/O per merged group. + * Threshold-boundary correctness is covered by TestBatchedBlobReaderMerge. + */ @Test - def testMixedSmallAndLargeGapsInOneFile(): Unit = { + def testMixedGapsProduceOneIOPerMergedGroup(): Unit = { val filePath = createTestFile(tempDir, "mixed-gaps.bin", 20000) - // Two batchable groups separated by a big gap. + // Two batchable groups separated by a 9500-byte gap (> maxGap=4096). 
val entries = Seq( (filePath, 0L, 100L), (filePath, 200L, 100L), (filePath, 400L, 100L), - // Large jump: gap = 10000 - 500 = 9500 > 4096 (filePath, 10000L, 100L), (filePath, 10200L, 100L), (filePath, 10400L, 100L)) @@ -185,53 +119,28 @@ class TestBatchedBlobReaderIO extends HoodieClientTestBase { assertEquals(2, counters.seekCount.get) } - // --- Scenario (4): Blobs from multiple files in the same query -------------- - + /** + * Multi-file routing at the I/O level: interleaved input + mixed gap patterns + * across three files should still produce per-file batched opens. + */ @Test - def testBlobsFromMultipleFilesBatchedPerFile(): Unit = { - val files = (0 until 3).map(i => createTestFile(tempDir, s"multi-$i.bin", 1000)) - val entries = files.flatMap { f => - (0 until 4).map(i => (f, (i * 100).toLong, 100L)) - } - val (rows, schema) = buildRows(entries) - - val counters = runWithCounting(rows, schema, maxGap = 4096) - assertEquals(3, counters.openSeekableCount.get, "One batched open per file") - assertEquals(3, counters.seekCount.get) - } - - @Test - def testMultipleFilesWithMixedGapPatterns(): Unit = { - val fileA = createTestFile(tempDir, "fileA.bin", 1000) // contiguous - val fileB = createTestFile(tempDir, "fileB.bin", 2000) // small gaps - val fileC = createTestFile(tempDir, "fileC.bin", 50000) // large gaps + def testMultiFileBatchingAtIOLevel(): Unit = { + val fileA = createTestFile(tempDir, "fileA.bin", 1000) // contiguous + val fileB = createTestFile(tempDir, "fileB.bin", 2000) // small gaps within threshold + val fileC = createTestFile(tempDir, "fileC.bin", 50000) // large gaps above threshold + // Interleaved order so we also confirm grouping isn't order-dependent at the I/O layer. 
val entries = - (0 until 4).map(i => (fileA, (i * 100).toLong, 100L)) ++ // contiguous -> 1 merged - (0 until 4).map(i => (fileB, (i * 150).toLong, 100L)) ++ // 50-byte gaps -> 1 merged - (0 until 4).map(i => (fileC, (i * 8192).toLong, 100L)) // 8KB gaps -> 4 ranges + (0 until 4).flatMap { i => + Seq( + (fileA, (i * 100).toLong, 100L), // contiguous -> 1 merged + (fileB, (i * 150).toLong, 100L), // 50-byte gaps -> 1 merged + (fileC, (i * 8192).toLong, 100L)) // 8KB gaps -> 4 ranges + } val (rows, schema) = buildRows(entries) val counters = runWithCounting(rows, schema, maxGap = 1024) - assertEquals(6, counters.openSeekableCount.get, "1 (A) + 1 (B) + 4 (C) merged ranges") + assertEquals(6, counters.openSeekableCount.get, "1 (A) + 1 (B) + 4 (C) merged ranges -> 6 opens") assertEquals(6, counters.seekCount.get) } - - @Test - def testInterleavedFileOrderStillBatchesPerFile(): Unit = { - val fileA = createTestFile(tempDir, "interA.bin", 1000) - val fileB = createTestFile(tempDir, "interB.bin", 1000) - val entries = Seq( - (fileA, 0L, 100L), - (fileB, 0L, 100L), - (fileA, 100L, 100L), - (fileB, 100L, 100L), - (fileA, 200L, 100L), - (fileB, 200L, 100L)) - val (rows, schema) = buildRows(entries) - - val counters = runWithCounting(rows, schema, maxGap = 4096) - assertEquals(2, counters.openSeekableCount.get, "Interleaved input still groups per file") - assertEquals(2, counters.seekCount.get) - } }