From 58c647d73b072794ac863d0a34f78462a3c32af1 Mon Sep 17 00:00:00 2001 From: voon Date: Thu, 14 May 2026 11:08:16 +0800 Subject: [PATCH 1/6] test(spark): Add merge-algorithm and I/O-count tests for read_blob batching MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Existing tests for the BatchedBlobReader (PR #18098) only assert byte correctness of the data returned, so they cannot detect a regression that causes the batching optimization to stop reducing I/O. Add two test classes that close that gap: - TestBatchedBlobReaderMerge: direct unit tests on mergeRanges and identifyConsecutiveRanges (newly package-private), asserting merged range counts, gap-threshold boundaries, multi-file grouping, sort, index preservation, and overlap rejection. No Spark, no I/O. - TestBatchedBlobReaderIO: integration tests that drive processPartition with a CountingHoodieStorage wrapper around a real storage, asserting openSeekable/seek counts across four scenarios — many blobs in one file, contiguous zero-gap blobs, threshold-controlled small/large gaps including the inclusive boundary, and multi-file queries (per-file batching, mixed gap patterns, interleaved input order). --- .../sql/hudi/blob/BatchedBlobReader.scala | 4 +- .../sql/hudi/blob/CountingHoodieStorage.scala | 121 +++++++++ .../hudi/blob/TestBatchedBlobReaderIO.scala | 237 ++++++++++++++++++ .../blob/TestBatchedBlobReaderMerge.scala | 183 ++++++++++++++ 4 files changed, 543 insertions(+), 2 deletions(-) create mode 100644 hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/blob/CountingHoodieStorage.scala create mode 100644 hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/blob/TestBatchedBlobReaderIO.scala create mode 100644 hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/blob/TestBatchedBlobReaderMerge.scala diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/BatchedBlobReader.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/BatchedBlobReader.scala index 0328cdd0c5c22..a1c299cf2611d 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/BatchedBlobReader.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/BatchedBlobReader.scala @@ -274,7 +274,7 @@ class BatchedBlobReader( * @param rows Sequence of row information * @return Sequence of merged ranges */ - private def identifyConsecutiveRanges[R](rows: Seq[RowInfo[R]]): Seq[MergedRange[R]] = { + private[blob] def identifyConsecutiveRanges[R](rows: Seq[RowInfo[R]]): Seq[MergedRange[R]] = { // Group by file path val byFile = rows.groupBy(_.filePath) @@ -299,7 +299,7 @@ class BatchedBlobReader( * @param maxGap Maximum gap to consider for merging * @return Sequence of merged ranges */ - private def mergeRanges[R](rows: Seq[RowInfo[R]], maxGap: Int): Seq[MergedRange[R]] = { + private[blob] def mergeRanges[R](rows: Seq[RowInfo[R]], maxGap: Int): Seq[MergedRange[R]] = { val result = ArrayBuffer[MergedRange[R]]() var currentFilePath: String = null diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/blob/CountingHoodieStorage.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/blob/CountingHoodieStorage.scala new file mode 100644 index 0000000000000..79399b52fa22e --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/blob/CountingHoodieStorage.scala @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.hudi.blob + +import org.apache.hudi.io.SeekableDataInputStream +import org.apache.hudi.storage.{HoodieStorage, StorageConfiguration, StoragePath, StoragePathFilter, StoragePathInfo} + +import java.io.{InputStream, OutputStream} +import java.net.URI +import java.util.{List => JList} +import java.util.concurrent.atomic.AtomicInteger + +/** + * Test-only {@link HoodieStorage} decorator that counts I/O operations performed by + * batched blob reads. Delegates every method to an underlying storage, but tracks: + * + * - openSeekableCount — number of times openSeekable was called + * - seekCount — number of times seek was called on a returned SeekableDataInputStream + * + * Use this wrapper in tests to assert that batching reduces the number of physical + * I/O operations versus a non-batching configuration. Counters are reset via + * {@link #reset()}. + */ +class CountingHoodieStorage(delegate: HoodieStorage) extends HoodieStorage(delegate.getConf) { + + val openSeekableCount = new AtomicInteger(0) + val seekCount = new AtomicInteger(0) + + def reset(): Unit = { + openSeekableCount.set(0) + seekCount.set(0) + } + + override def openSeekable(path: StoragePath, bufferSize: Int, wrapStream: Boolean): SeekableDataInputStream = { + openSeekableCount.incrementAndGet() + val inner = delegate.openSeekable(path, bufferSize, wrapStream) + new CountingSeekableDataInputStream(inner, seekCount) + } + + // --- pass-through delegation ------------------------------------------------- + + override def newInstance(path: StoragePath, storageConf: StorageConfiguration[_]): HoodieStorage = + new CountingHoodieStorage(delegate.newInstance(path, storageConf)) + + override def getScheme: String = delegate.getScheme + override def getDefaultBlockSize(path: StoragePath): Int = delegate.getDefaultBlockSize(path) + override def getDefaultBufferSize: Int = delegate.getDefaultBufferSize + override def getDefaultReplication(path: StoragePath): Short = delegate.getDefaultReplication(path) + override def getUri: URI = delegate.getUri + + override def create(path: StoragePath, overwrite: Boolean): OutputStream = + delegate.create(path, overwrite) + override def create(path: StoragePath, overwrite: Boolean, bufferSize: Integer, replication: java.lang.Short, sizeThreshold: java.lang.Long): OutputStream = + delegate.create(path, overwrite, bufferSize, replication, sizeThreshold) + + override def open(path: StoragePath): InputStream = delegate.open(path) + override def append(path: StoragePath): OutputStream = delegate.append(path) + + override def exists(path: StoragePath): Boolean = delegate.exists(path) + override def getPathInfo(path: StoragePath): StoragePathInfo = delegate.getPathInfo(path) + override def createDirectory(path: StoragePath): Boolean = delegate.createDirectory(path) + + override def listDirectEntries(path: StoragePath): JList[StoragePathInfo] = delegate.listDirectEntries(path) + override def listFiles(path: StoragePath): JList[StoragePathInfo] = delegate.listFiles(path) + override def listDirectEntries(path: StoragePath, filter: StoragePathFilter): JList[StoragePathInfo] = + delegate.listDirectEntries(path, filter) + + override def setModificationTime(path: StoragePath, modificationTimeInMillisEpoch: Long): Unit = + delegate.setModificationTime(path, modificationTimeInMillisEpoch) + + override def globEntries(pathPattern: StoragePath, filter: StoragePathFilter): JList[StoragePathInfo] = + delegate.globEntries(pathPattern, filter) + + override def rename(oldPath: StoragePath, newPath: StoragePath): Boolean = + delegate.rename(oldPath, newPath) + + override def deleteDirectory(path: StoragePath): Boolean = delegate.deleteDirectory(path) + override def deleteFile(path: StoragePath): Boolean = delegate.deleteFile(path) + + override def getFileSystem: AnyRef = delegate.getFileSystem + override def getRawStorage: HoodieStorage = delegate.getRawStorage + + override def close(): Unit = delegate.close() +} + +/** + * Counting wrapper around a {@link SeekableDataInputStream}. Counts seeks; final + * readFully methods on DataInputStream delegate to the underlying stream's + * read(...) which the wrapped stream services directly. + */ +private class CountingSeekableDataInputStream( + delegate: SeekableDataInputStream, + seekCount: AtomicInteger) + extends SeekableDataInputStream(delegate) { + + override def seek(pos: Long): Unit = { + seekCount.incrementAndGet() + delegate.seek(pos) + } + + override def getPos: Long = delegate.getPos + + override def close(): Unit = delegate.close() +} diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/blob/TestBatchedBlobReaderIO.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/blob/TestBatchedBlobReaderIO.scala new file mode 100644 index 0000000000000..d9bd9ad953cfe --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/blob/TestBatchedBlobReaderIO.scala @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.hudi.blob + +import org.apache.hudi.blob.BlobTestHelpers._ +import org.apache.hudi.storage.{HoodieStorage, HoodieStorageUtils} +import org.apache.hudi.testutils.HoodieClientTestBase + +import org.apache.spark.sql.Row +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.types.{BinaryType, StructField, StructType} +import org.junit.jupiter.api.Assertions._ +import org.junit.jupiter.api.Test + +/** + * Integration tests asserting that batched blob reads actually reduce storage I/O. + * + * Each scenario drives BatchedBlobReader.processPartition directly with a + * CountingHoodieStorage wrapper around a real local-FS storage, then asserts + * the resulting openSeekable / seek call counts. Where possible, the same input + * is re-run with a non-batching configuration (maxGapBytes = 0) to establish + * a baseline and prove the reduction. + * + * Lives in the same package as BatchedBlobReader to access the package-private + * RowAccessor / RowBuilder implicits. + */ +class TestBatchedBlobReaderIO extends HoodieClientTestBase { + + private val DEFAULT_LOOKAHEAD = 50 + + /** Build rows by routing through Spark so the nested blob struct is encoded correctly. */ + private def buildRows(entries: Seq[(String, Long, Long)]): (Array[Row], StructType) = { + val df = sparkSession.createDataFrame(entries).toDF("external_path", "offset", "length") + .withColumn("data", blobStructCol("data", col("external_path"), col("offset"), col("length"))) + .select("data") + (df.collect(), df.schema) + } + + /** Run processPartition with the counting wrapper and drain the output iterator. */ + private def runWithCounting( + rows: Array[Row], + inputSchema: StructType, + maxGap: Int, + lookahead: Int = DEFAULT_LOOKAHEAD): CountingHoodieStorage = { + val raw: HoodieStorage = HoodieStorageUtils.getStorage(storageConf) + val counting = new CountingHoodieStorage(raw) + try { + val outputSchema = inputSchema.add(StructField(BatchedBlobReader.DATA_COL, BinaryType, nullable = true)) + val reader = new BatchedBlobReader(counting, maxGap, lookahead) + import RowAccessor.rowAccessor + import RowBuilder.rowBuilder + val out = reader.processPartition[Row](rows.iterator, structColIdx = 0, outputSchema) + // Force materialization (the underlying reads happen here). + while (out.hasNext) { + val r = out.next() + // Touch the data column so any lazy work completes. + r.getAs[Array[Byte]](BatchedBlobReader.DATA_COL) + } + counting + } finally { + counting.close() + } + } + + // --- Scenario (1): Multiple blobs in the same out-of-line file -------------- + + @Test + def testManyBlobsInSingleFileBatchable(): Unit = { + val filePath = createTestFile(tempDir, "many-batch.bin", 50000) + val entries = (0 until 20).map(i => (filePath, (i * 150).toLong, 100L)) + val (rows, schema) = buildRows(entries) + + val counters = runWithCounting(rows, schema, maxGap = 4096) + assertEquals(1, counters.openSeekableCount.get, "20 blobs with 50-byte gaps should batch to one open") + assertEquals(1, counters.seekCount.get, "...and one seek") + } + + @Test + def testManyBlobsInSingleFileNoBatching(): Unit = { + val filePath = createTestFile(tempDir, "many-nobatch.bin", 50000) + val entries = (0 until 20).map(i => (filePath, (i * 150).toLong, 100L)) + val (rows, schema) = buildRows(entries) + + val counters = runWithCounting(rows, schema, maxGap = 0) + assertEquals(20, counters.openSeekableCount.get, "maxGap=0 with 50-byte gaps should produce one open per blob") + assertEquals(20, counters.seekCount.get) + } + + // --- Scenario (2): Contiguous out-of-line blobs (zero gap) ------------------ + + @Test + def testContiguousBlobsMergeWithZeroGap(): Unit = { + val filePath = createTestFile(tempDir, "contig.bin", 1000) + val entries = (0 until 6).map(i => (filePath, (i * 100).toLong, 100L)) + val (rows, schema) = buildRows(entries) + + // Zero gap, zero threshold -> gap (0) <= maxGap (0) -> all merge + val counters = runWithCounting(rows, schema, maxGap = 0) + assertEquals(1, counters.openSeekableCount.get, "Six contiguous blobs should merge into one read") + assertEquals(1, counters.seekCount.get) + } + + // --- Scenario (3): Out-of-line blobs with gaps (threshold-controlled) ------- + + @Test + def testSmallGapsWithinThresholdBatch(): Unit = { + val filePath = createTestFile(tempDir, "small-gaps.bin", 10000) + val entries = Seq( + (filePath, 0L, 100L), + (filePath, 120L, 100L), + (filePath, 240L, 100L), + (filePath, 360L, 100L), + (filePath, 480L, 100L), + (filePath, 600L, 100L)) + val (rows, schema) = buildRows(entries) + + val counters = runWithCounting(rows, schema, maxGap = 4096) + assertEquals(1, counters.openSeekableCount.get) + assertEquals(1, counters.seekCount.get) + } + + @Test + def testLargeGapsBeyondThresholdDoNotBatch(): Unit = { + val filePath = createTestFile(tempDir, "large-gaps.bin", 50000) + val entries = Seq( + (filePath, 0L, 100L), + (filePath, 8192L, 100L), + (filePath, 16384L, 100L), + (filePath, 24576L, 100L)) + val (rows, schema) = buildRows(entries) + + val counters = runWithCounting(rows, schema, maxGap = 1024) + assertEquals(4, counters.openSeekableCount.get, "Gaps above threshold must not batch") + assertEquals(4, counters.seekCount.get) + } + + @Test + def testThresholdBoundary(): Unit = { + val filePath = createTestFile(tempDir, "boundary.bin", 5000) + // Gap between consecutive blobs = exactly 1024 bytes: + // [0,100), [1124,100), [2248,100), [3372,100) + val entries = (0 until 4).map(i => (filePath, (i * 1124).toLong, 100L)) + val (rows, schema) = buildRows(entries) + + val inclusive = runWithCounting(rows, schema, maxGap = 1024) + assertEquals(1, inclusive.openSeekableCount.get, "gap == maxGap is inclusive -> merge") + + val exclusive = runWithCounting(rows, schema, maxGap = 1023) + assertEquals(4, exclusive.openSeekableCount.get, "gap > maxGap -> no merge") + } + + @Test + def testMixedSmallAndLargeGapsInOneFile(): Unit = { + val filePath = createTestFile(tempDir, "mixed-gaps.bin", 20000) + // Two batchable groups separated by a big gap. + val entries = Seq( + (filePath, 0L, 100L), + (filePath, 200L, 100L), + (filePath, 400L, 100L), + // Large jump: gap = 10000 - 500 = 9500 > 4096 + (filePath, 10000L, 100L), + (filePath, 10200L, 100L), + (filePath, 10400L, 100L)) + val (rows, schema) = buildRows(entries) + + val counters = runWithCounting(rows, schema, maxGap = 4096) + assertEquals(2, counters.openSeekableCount.get, "Two merged ranges -> two opens") + assertEquals(2, counters.seekCount.get) + } + + // --- Scenario (4): Blobs from multiple files in the same query -------------- + + @Test + def testBlobsFromMultipleFilesBatchedPerFile(): Unit = { + val files = (0 until 3).map(i => createTestFile(tempDir, s"multi-$i.bin", 1000)) + val entries = files.flatMap { f => + (0 until 4).map(i => (f, (i * 100).toLong, 100L)) + } + val (rows, schema) = buildRows(entries) + + val counters = runWithCounting(rows, schema, maxGap = 4096) + assertEquals(3, counters.openSeekableCount.get, "One batched open per file") + assertEquals(3, counters.seekCount.get) + } + + @Test + def testMultipleFilesWithMixedGapPatterns(): Unit = { + val fileA = createTestFile(tempDir, "fileA.bin", 1000) // contiguous + val fileB = createTestFile(tempDir, "fileB.bin", 2000) // small gaps + val fileC = createTestFile(tempDir, "fileC.bin", 50000) // large gaps + + val entries = + (0 until 4).map(i => (fileA, (i * 100).toLong, 100L)) ++ // contiguous -> 1 merged + (0 until 4).map(i => (fileB, (i * 150).toLong, 100L)) ++ // 50-byte gaps -> 1 merged + (0 until 4).map(i => (fileC, (i * 8192).toLong, 100L)) // 8KB gaps -> 4 ranges + val (rows, schema) = buildRows(entries) + + val counters = runWithCounting(rows, schema, maxGap = 1024) + assertEquals(6, counters.openSeekableCount.get, "1 (A) + 1 (B) + 4 (C) merged ranges") + assertEquals(6, counters.seekCount.get) + } + + @Test + def testInterleavedFileOrderStillBatchesPerFile(): Unit = { + val fileA = createTestFile(tempDir, "interA.bin", 1000) + val fileB = createTestFile(tempDir, "interB.bin", 1000) + val entries = Seq( + (fileA, 0L, 100L), + (fileB, 0L, 100L), + (fileA, 100L, 100L), + (fileB, 100L, 100L), + (fileA, 200L, 100L), + (fileB, 200L, 100L)) + val (rows, schema) = buildRows(entries) + + val counters = runWithCounting(rows, schema, maxGap = 4096) + assertEquals(2, counters.openSeekableCount.get, "Interleaved input still groups per file") + assertEquals(2, counters.seekCount.get) + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/blob/TestBatchedBlobReaderMerge.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/blob/TestBatchedBlobReaderMerge.scala new file mode 100644 index 0000000000000..73813e9de33e3 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/blob/TestBatchedBlobReaderMerge.scala @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.hudi.blob + +import org.apache.spark.sql.Row +import org.junit.jupiter.api.Assertions._ +import org.junit.jupiter.api.Test + +/** + * Direct unit tests for the merge algorithm in {@link BatchedBlobReader}. + * + * These tests bypass Spark and storage entirely: they construct a reader with + * a null storage (the merge methods do not touch I/O) and call mergeRanges / + * identifyConsecutiveRanges with crafted RowInfo inputs to assert the + * structure of the merged output (counts, boundaries, ordering, grouping). + * + * This complements TestBatchedBlobReader (which verifies data correctness end-to-end) + * by proving the batching algorithm produces the expected merged ranges for + * each input pattern. + */ +class TestBatchedBlobReaderMerge { + + private def reader(maxGapBytes: Int = 4096) = + new BatchedBlobReader(storage = null, maxGapBytes = maxGapBytes, lookaheadRows = 50) + + private def row(filePath: String, offset: Long, length: Long, index: Long = 0L): RowInfo[Row] = + RowInfo[Row]( + originalRow = null.asInstanceOf[Row], + filePath = filePath, + offset = offset, + length = length, + index = index) + + @Test + def testEmptyInputProducesNoRanges(): Unit = { + val merged = reader().mergeRanges(Seq.empty[RowInfo[Row]], maxGap = 100) + assertTrue(merged.isEmpty) + } + + @Test + def testSingleRowProducesSingleRange(): Unit = { + val merged = reader().mergeRanges(Seq(row("/f", 1000, 200, index = 7)), maxGap = 100) + assertEquals(1, merged.size) + val r = merged.head + assertEquals("/f", r.filePath) + assertEquals(1000L, r.startOffset) + assertEquals(1200L, r.endOffset) + assertEquals(1, r.rows.size) + assertEquals(7L, r.rows.head.index) + } + + @Test + def testContiguousRangesMergeIntoOne(): Unit = { + // Zero-gap reads: [0,100), [100,100), [200,100) — gap == 0 <= maxGap == 0 → merge + val rows = Seq( + row("/f", 0, 100, index = 0), + row("/f", 100, 100, index = 1), + row("/f", 200, 100, index = 2)) + val merged = reader().mergeRanges(rows, maxGap = 0) + assertEquals(1, merged.size) + assertEquals(0L, merged.head.startOffset) + assertEquals(300L, merged.head.endOffset) + assertEquals(3, merged.head.rows.size) + } + + @Test + def testGapEqualToThresholdMerges(): Unit = { + // Gaps of exactly maxGap → inclusive boundary, merge into one + val rows = Seq( + row("/f", 0, 100), + row("/f", 1124, 100), // gap = 1024 == maxGap + row("/f", 2248, 100)) // gap = 1024 == maxGap + val merged = reader().mergeRanges(rows, maxGap = 1024) + assertEquals(1, merged.size) + assertEquals(0L, merged.head.startOffset) + assertEquals(2348L, merged.head.endOffset) + } + + @Test + def testGapOneOverThresholdSplits(): Unit = { + // Gap of maxGap + 1 → split + val rows = Seq( + row("/f", 0, 100), + row("/f", 1125, 100)) // gap = 1025 > maxGap = 1024 + val merged = reader().mergeRanges(rows, maxGap = 1024) + assertEquals(2, merged.size) + assertEquals(0L, merged.head.startOffset) + assertEquals(100L, merged.head.endOffset) + assertEquals(1125L, merged(1).startOffset) + assertEquals(1225L, merged(1).endOffset) + } + + @Test + def testLargeGapsProduceSeparateRanges(): Unit = { + val rows = Seq( + row("/f", 0, 100, index = 0), + row("/f", 10000, 100, index = 1), + row("/f", 20000, 100, index = 2), + row("/f", 30000, 100, index = 3)) + val merged = reader().mergeRanges(rows, maxGap = 1000) + assertEquals(4, merged.size) + merged.foreach(r => assertEquals(1, r.rows.size)) + } + + @Test + def testUnsortedInputIsSortedBeforeMerge(): Unit = { + // identifyConsecutiveRanges sorts by offset within each file + val rows = Seq( + row("/f", 200, 100, index = 2), + row("/f", 0, 100, index = 0), + row("/f", 100, 100, index = 1)) + val merged = reader(maxGapBytes = 0).identifyConsecutiveRanges(rows) + assertEquals(1, merged.size, "Single merged range expected after sort + merge") + assertEquals(0L, merged.head.startOffset) + assertEquals(300L, merged.head.endOffset) + // Internal rows in the merged range follow sorted order + val offsetsInMerged = merged.head.rows.map(_.offset) + assertEquals(Seq(0L, 100L, 200L), offsetsInMerged) + } + + @Test + def testMultipleFilesProduceRangePerFile(): Unit = { + val rows = Seq( + row("/a", 0, 100, index = 0), + row("/b", 0, 100, index = 1), + row("/a", 100, 100, index = 2), + row("/b", 100, 100, index = 3)) + val merged = reader(maxGapBytes = 4096).identifyConsecutiveRanges(rows) + assertEquals(2, merged.size, "One merged range per file") + val byFile = merged.groupBy(_.filePath) + assertEquals(Set("/a", "/b"), byFile.keySet) + assertEquals(1, byFile("/a").size) + assertEquals(1, byFile("/b").size) + // Each file's merged range covers offsets 0..200 + byFile.values.foreach { rs => + assertEquals(0L, rs.head.startOffset) + assertEquals(200L, rs.head.endOffset) + assertEquals(2, rs.head.rows.size) + } + } + + @Test + def testRowIndicesPreservedInMergedRange(): Unit = { + // Indices retained so the downstream sort-by-index can reconstruct input order + val rows = Seq( + row("/f", 0, 100, index = 42), + row("/f", 100, 100, index = 7), + row("/f", 200, 100, index = 19)) + val merged = reader().mergeRanges(rows, maxGap = 4096) + assertEquals(1, merged.size) + val indices = merged.head.rows.map(_.index) + assertEquals(Seq(42L, 7L, 19L), indices) + } + + @Test + def testOverlappingRangesThrow(): Unit = { + // [0,100) followed by [50,100) -> overlap + val rows = Seq( + row("/f", 0, 100, index = 0), + row("/f", 50, 100, index = 1)) + val ex = assertThrows( + classOf[IllegalArgumentException], + () => reader().mergeRanges(rows, maxGap = 4096)) + assertTrue(ex.getMessage.contains("Overlapping blob ranges detected")) + } +} From 895845e1b19173b98e3cef14c0ca0a2675a4c969 Mon Sep 17 00:00:00 2001 From: voon Date: Thu, 14 May 2026 18:47:37 +0800 Subject: [PATCH 2/6] test(spark): Allow org.apache.spark.sql.hudi.blob test package TestSparkSqlHudiPackageStructure rejects Scala test classes under org.apache.spark.sql.hudi.* that are not in the curated allow-list, which is mirrored in two CI configs that drive the wildcard suite filter. The new BatchedBlobReader merge/IO tests live in the .blob package so they can reach private[blob] helpers (mergeRanges, identifyConsecutiveRanges, RowAccessor) without widening internal visibility - matching the same-package pattern already used for the .analysis, .catalog, and .command production sub-packages. Add 'org.apache.spark.sql.hudi.blob' to all three places: - ALLOWED_PACKAGES in TestSparkSqlHudiPackageStructure - job6HudiSparkDdlOthersWildcardSuites in azure-pipelines-20230430.yml - SCALA_TEST_OTHERS_FILTER in .github/workflows/bot.yml --- .github/workflows/bot.yml | 2 +- azure-pipelines-20230430.yml | 1 + .../java/org/apache/hudi/TestSparkSqlHudiPackageStructure.java | 1 + 3 files changed, 3 insertions(+), 1 deletion(-) diff --git a/.github/workflows/bot.yml b/.github/workflows/bot.yml index 2143edd92bcb2..06468d0c4fa23 100644 --- a/.github/workflows/bot.yml +++ b/.github/workflows/bot.yml @@ -33,7 +33,7 @@ env: JAVA_UT_FILTER1: -Dtest=!TestCOWDataSource,!TestMORDataSource,!TestHoodieFileSystemViews JAVA_UT_FILTER2: -Dtest=TestCOWDataSource,TestMORDataSource,TestHoodieFileSystemViews SCALA_TEST_DML_FILTER: -DwildcardSuites=org.apache.spark.sql.hudi.dml - SCALA_TEST_OTHERS_FILTER: -DwildcardSuites=org.apache.hudi,org.apache.spark.hudi,org.apache.spark.sql.avro,org.apache.spark.sql.execution,org.apache.spark.sql.hudi.analysis,org.apache.spark.sql.hudi.catalog,org.apache.spark.sql.hudi.command,org.apache.spark.sql.hudi.common,org.apache.spark.sql.hudi.ddl,org.apache.spark.sql.hudi.procedure,org.apache.spark.sql.hudi.feature + SCALA_TEST_OTHERS_FILTER: -DwildcardSuites=org.apache.hudi,org.apache.spark.hudi,org.apache.spark.sql.avro,org.apache.spark.sql.execution,org.apache.spark.sql.hudi.analysis,org.apache.spark.sql.hudi.blob,org.apache.spark.sql.hudi.catalog,org.apache.spark.sql.hudi.command,org.apache.spark.sql.hudi.common,org.apache.spark.sql.hudi.ddl,org.apache.spark.sql.hudi.procedure,org.apache.spark.sql.hudi.feature FLINK_IT_FILTER1: -Dit.test=ITTestHoodieDataSource FLINK_IT_FILTER2: -Dit.test=!ITTestHoodieDataSource diff --git a/azure-pipelines-20230430.yml b/azure-pipelines-20230430.yml index cccd89ec6cc73..3edac6046b1a3 100644 --- a/azure-pipelines-20230430.yml +++ b/azure-pipelines-20230430.yml @@ -75,6 +75,7 @@ parameters: - 'org.apache.spark.sql.avro' - 'org.apache.spark.sql.execution' - 'org.apache.spark.sql.hudi.analysis' + - 'org.apache.spark.sql.hudi.blob' - 'org.apache.spark.sql.hudi.catalog' - 'org.apache.spark.sql.hudi.command' - 'org.apache.spark.sql.hudi.common' diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestSparkSqlHudiPackageStructure.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestSparkSqlHudiPackageStructure.java index 2937b59dc529b..ddce176eb7384 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestSparkSqlHudiPackageStructure.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestSparkSqlHudiPackageStructure.java @@ -53,6 +53,7 @@ public class TestSparkSqlHudiPackageStructure { */ private static final Set ALLOWED_PACKAGES = new HashSet<>(Arrays.asList( "org.apache.spark.sql.hudi.analysis", + "org.apache.spark.sql.hudi.blob", "org.apache.spark.sql.hudi.catalog", "org.apache.spark.sql.hudi.command", "org.apache.spark.sql.hudi.common", From 2ff7167f18ce1531cb83c7b391e5e3712f0a3a89 Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Thu, 14 May 2026 14:22:23 -0700 Subject: [PATCH 3/6] test(spark): Simplify I/O-count tests for read_blob batching Drop tests that re-verify the merge algorithm at the I/O layer when TestBatchedBlobReaderMerge already covers it. Keep only the cases that prove the merge result maps 1:1 to physical I/O ops: - batching reduces I/O end-to-end (single-file, batched vs baseline) - mixed gaps in one file produce one I/O per merged group - multi-file routing with interleaved input and mixed gap patterns Reduces TestBatchedBlobReaderIO from 9 to 3 tests (~67% fewer Spark DataFrame builds and real file reads, the dominant runtime cost). --- .../hudi/blob/TestBatchedBlobReaderIO.scala | 167 ++++-------------- 1 file changed, 38 insertions(+), 129 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/blob/TestBatchedBlobReaderIO.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/blob/TestBatchedBlobReaderIO.scala index d9bd9ad953cfe..54541ad1aaf76 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/blob/TestBatchedBlobReaderIO.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/blob/TestBatchedBlobReaderIO.scala @@ -34,9 +34,10 @@ import org.junit.jupiter.api.Test * * Each scenario drives BatchedBlobReader.processPartition directly with a * CountingHoodieStorage wrapper around a real local-FS storage, then asserts - * the resulting openSeekable / seek call counts. Where possible, the same input - * is re-run with a non-batching configuration (maxGapBytes = 0) to establish - * a baseline and prove the reduction. + * the resulting openSeekable / seek call counts. + * + * Scope: only the merge-result-to-physical-I/O mapping is exercised here. The + * gap/threshold merging algorithm itself is covered by TestBatchedBlobReaderMerge. * * Lives in the same package as BatchedBlobReader to access the package-private * RowAccessor / RowBuilder implicits. @@ -67,10 +68,8 @@ class TestBatchedBlobReaderIO extends HoodieClientTestBase { import RowAccessor.rowAccessor import RowBuilder.rowBuilder val out = reader.processPartition[Row](rows.iterator, structColIdx = 0, outputSchema) - // Force materialization (the underlying reads happen here). while (out.hasNext) { val r = out.next() - // Touch the data column so any lazy work completes. r.getAs[Array[Byte]](BatchedBlobReader.DATA_COL) } counting @@ -79,102 +78,37 @@ class TestBatchedBlobReaderIO extends HoodieClientTestBase { } } - // --- Scenario (1): Multiple blobs in the same out-of-line file -------------- - - @Test - def testManyBlobsInSingleFileBatchable(): Unit = { - val filePath = createTestFile(tempDir, "many-batch.bin", 50000) - val entries = (0 until 20).map(i => (filePath, (i * 150).toLong, 100L)) - val (rows, schema) = buildRows(entries) - - val counters = runWithCounting(rows, schema, maxGap = 4096) - assertEquals(1, counters.openSeekableCount.get, "20 blobs with 50-byte gaps should batch to one open") - assertEquals(1, counters.seekCount.get, "...and one seek") - } - + /** + * End-to-end proof that batching reduces I/O: the same input collapses from + * N opens (maxGap=0, gaps exceed threshold) to a single open (maxGap=4096). + */ @Test - def testManyBlobsInSingleFileNoBatching(): Unit = { - val filePath = createTestFile(tempDir, "many-nobatch.bin", 50000) + def testBatchingReducesIOForSingleFile(): Unit = { + val filePath = createTestFile(tempDir, "single-file.bin", 50000) val entries = (0 until 20).map(i => (filePath, (i * 150).toLong, 100L)) val (rows, schema) = buildRows(entries) - val counters = runWithCounting(rows, schema, maxGap = 0) - assertEquals(20, counters.openSeekableCount.get, "maxGap=0 with 50-byte gaps should produce one open per blob") - assertEquals(20, counters.seekCount.get) - } - - // --- Scenario (2): Contiguous out-of-line blobs (zero gap) ------------------ - - @Test - def testContiguousBlobsMergeWithZeroGap(): Unit = { - val filePath = createTestFile(tempDir, "contig.bin", 1000) - val entries = (0 until 6).map(i => (filePath, (i * 100).toLong, 100L)) - val (rows, schema) = buildRows(entries) - - // Zero gap, zero threshold -> gap (0) <= maxGap (0) -> all merge - val counters = runWithCounting(rows, schema, maxGap = 0) - assertEquals(1, counters.openSeekableCount.get, "Six contiguous blobs should merge into one read") - assertEquals(1, counters.seekCount.get) - } - - // --- Scenario (3): Out-of-line blobs with gaps (threshold-controlled) ------- - - @Test - def testSmallGapsWithinThresholdBatch(): Unit = { - val filePath = createTestFile(tempDir, "small-gaps.bin", 10000) - val entries = Seq( - (filePath, 0L, 100L), - (filePath, 120L, 100L), - (filePath, 240L, 100L), - (filePath, 360L, 100L), - (filePath, 480L, 100L), - (filePath, 600L, 100L)) - val (rows, schema) = buildRows(entries) - - val counters = runWithCounting(rows, schema, maxGap = 4096) - assertEquals(1, counters.openSeekableCount.get) - assertEquals(1, counters.seekCount.get) - } - - @Test - def testLargeGapsBeyondThresholdDoNotBatch(): Unit = { - val filePath = createTestFile(tempDir, "large-gaps.bin", 50000) - val entries = Seq( - (filePath, 0L, 100L), - (filePath, 8192L, 100L), - (filePath, 16384L, 100L), - (filePath, 24576L, 100L)) - val (rows, schema) = buildRows(entries) - - val counters = runWithCounting(rows, schema, maxGap = 1024) - assertEquals(4, counters.openSeekableCount.get, "Gaps above threshold must not batch") - assertEquals(4, counters.seekCount.get) - } - - @Test - def testThresholdBoundary(): Unit = { - val filePath = createTestFile(tempDir, "boundary.bin", 5000) - // Gap between consecutive blobs = exactly 1024 bytes: - // [0,100), [1124,100), [2248,100), [3372,100) - val entries = (0 until 4).map(i => (filePath, (i * 1124).toLong, 100L)) - val (rows, schema) = buildRows(entries) - - val inclusive = runWithCounting(rows, schema, maxGap = 1024) - assertEquals(1, inclusive.openSeekableCount.get, "gap == maxGap is inclusive -> merge") + val batched = runWithCounting(rows, schema, maxGap = 4096) + assertEquals(1, batched.openSeekableCount.get, "20 blobs with 50-byte gaps should batch to one open") + assertEquals(1, batched.seekCount.get) - val exclusive = runWithCounting(rows, schema, maxGap = 1023) - assertEquals(4, exclusive.openSeekableCount.get, "gap > maxGap -> no merge") + val nonBatched = runWithCounting(rows, schema, maxGap = 0) + assertEquals(20, nonBatched.openSeekableCount.get, "maxGap=0 with 50-byte gaps should produce one open per blob") + assertEquals(20, nonBatched.seekCount.get) } + /** + * Mixed small/large gaps in one file produce exactly one I/O per merged group. + * Threshold-boundary correctness is covered by TestBatchedBlobReaderMerge. + */ @Test - def testMixedSmallAndLargeGapsInOneFile(): Unit = { + def testMixedGapsProduceOneIOPerMergedGroup(): Unit = { val filePath = createTestFile(tempDir, "mixed-gaps.bin", 20000) - // Two batchable groups separated by a big gap. + // Two batchable groups separated by a 9500-byte gap (> maxGap=4096). val entries = Seq( (filePath, 0L, 100L), (filePath, 200L, 100L), (filePath, 400L, 100L), - // Large jump: gap = 10000 - 500 = 9500 > 4096 (filePath, 10000L, 100L), (filePath, 10200L, 100L), (filePath, 10400L, 100L)) @@ -185,53 +119,28 @@ class TestBatchedBlobReaderIO extends HoodieClientTestBase { assertEquals(2, counters.seekCount.get) } - // --- Scenario (4): Blobs from multiple files in the same query -------------- - + /** + * Multi-file routing at the I/O level: interleaved input + mixed gap patterns + * across three files should still produce per-file batched opens. + */ @Test - def testBlobsFromMultipleFilesBatchedPerFile(): Unit = { - val files = (0 until 3).map(i => createTestFile(tempDir, s"multi-$i.bin", 1000)) - val entries = files.flatMap { f => - (0 until 4).map(i => (f, (i * 100).toLong, 100L)) - } - val (rows, schema) = buildRows(entries) - - val counters = runWithCounting(rows, schema, maxGap = 4096) - assertEquals(3, counters.openSeekableCount.get, "One batched open per file") - assertEquals(3, counters.seekCount.get) - } - - @Test - def testMultipleFilesWithMixedGapPatterns(): Unit = { - val fileA = createTestFile(tempDir, "fileA.bin", 1000) // contiguous - val fileB = createTestFile(tempDir, "fileB.bin", 2000) // small gaps - val fileC = createTestFile(tempDir, "fileC.bin", 50000) // large gaps + def testMultiFileBatchingAtIOLevel(): Unit = { + val fileA = createTestFile(tempDir, "fileA.bin", 1000) // contiguous + val fileB = createTestFile(tempDir, "fileB.bin", 2000) // small gaps within threshold + val fileC = createTestFile(tempDir, "fileC.bin", 50000) // large gaps above threshold + // Interleaved order so we also confirm grouping isn't order-dependent at the I/O layer. val entries = - (0 until 4).map(i => (fileA, (i * 100).toLong, 100L)) ++ // contiguous -> 1 merged - (0 until 4).map(i => (fileB, (i * 150).toLong, 100L)) ++ // 50-byte gaps -> 1 merged - (0 until 4).map(i => (fileC, (i * 8192).toLong, 100L)) // 8KB gaps -> 4 ranges + (0 until 4).flatMap { i => + Seq( + (fileA, (i * 100).toLong, 100L), // contiguous -> 1 merged + (fileB, (i * 150).toLong, 100L), // 50-byte gaps -> 1 merged + (fileC, (i * 8192).toLong, 100L)) // 8KB gaps -> 4 ranges + } val (rows, schema) = buildRows(entries) val counters = runWithCounting(rows, schema, maxGap = 1024) - assertEquals(6, counters.openSeekableCount.get, "1 (A) + 1 (B) + 4 (C) merged ranges") + assertEquals(6, counters.openSeekableCount.get, "1 (A) + 1 (B) + 4 (C) merged ranges -> 6 opens") assertEquals(6, counters.seekCount.get) } - - @Test - def testInterleavedFileOrderStillBatchesPerFile(): Unit = { - val fileA = createTestFile(tempDir, "interA.bin", 1000) - val fileB = createTestFile(tempDir, "interB.bin", 1000) - val entries = Seq( - (fileA, 0L, 100L), - (fileB, 0L, 100L), - (fileA, 100L, 100L), - (fileB, 100L, 100L), - (fileA, 200L, 100L), - (fileB, 200L, 100L)) - val (rows, schema) = buildRows(entries) - - val counters = runWithCounting(rows, schema, maxGap = 4096) - assertEquals(2, counters.openSeekableCount.get, "Interleaved input still groups per file") - assertEquals(2, counters.seekCount.get) - } } From 168d5e3965d9ec041d32de9aeab73f6760fa47ca Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Thu, 14 May 2026 14:34:49 -0700 Subject: [PATCH 4/6] Replace I/O-count tests with SQL-driven correctness tests TestBatchedBlobReaderMerge already catches algorithmic regressions in the merge logic via direct calls to mergeRanges/identifyConsecutiveRanges, so the I/O-count layer was duplicative. Existing TestBatchedBlobReader and TestReadBlobSQL cover the readBatched API and the SQL surface respectively at byte-level granularity. Replace TestBatchedBlobReaderIO with TestReadBlobBatching: SQL-driven tests that exercise the read_blob() path with batching configurations chosen to drive specific merge behaviors: - 20 small reads in one file batched under maxGap=4096 - mixed small/large gaps in one file (above and below threshold) - threshold-boundary case (gap == maxGap) - multi-file interleaved input with mixed gap patterns Each test asserts the returned bytes match the deterministic pattern at each row's recorded offset, validating both query success and output correctness through the SQL planner / BatchedBlobReadExec path. Move to org.apache.hudi.blob (no longer needs private[blob] access). Drop CountingHoodieStorage. The package-allowlist entry for org.apache.spark.sql.hudi.blob is retained since TestBatchedBlobReaderMerge still needs same-package access to the merge helpers. --- .../hudi/blob/TestReadBlobBatching.scala | 188 ++++++++++++++++++ .../sql/hudi/blob/CountingHoodieStorage.scala | 121 ----------- .../hudi/blob/TestBatchedBlobReaderIO.scala | 146 -------------- 3 files changed, 188 insertions(+), 267 deletions(-) create mode 100644 hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/blob/TestReadBlobBatching.scala delete mode 100644 hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/blob/CountingHoodieStorage.scala delete mode 100644 hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/blob/TestBatchedBlobReaderIO.scala diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/blob/TestReadBlobBatching.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/blob/TestReadBlobBatching.scala new file mode 100644 index 0000000000000..1ca5a83da62d3 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/blob/TestReadBlobBatching.scala @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.blob + +import org.apache.hudi.blob.BlobTestHelpers._ +import org.apache.hudi.testutils.HoodieClientTestBase + +import org.apache.spark.sql.Row +import org.apache.spark.sql.functions._ +import org.junit.jupiter.api.Assertions._ +import org.junit.jupiter.api.Test + +/** + * SQL-driven correctness tests for read_blob() batching scenarios. + * + * Complements: + * - TestBatchedBlobReaderMerge: unit-level coverage of the merge algorithm. + * - TestReadBlobSQL: general SQL surface (joins, subqueries, config plumbing, + * null handling, error paths). + * - TestBatchedBlobReader: byte-level correctness via the readBatched API. + * + * Scope: exercise the SQL path end-to-end with batching configurations chosen + * to drive specific merge behaviors (high-ratio merging, threshold boundary, + * per-file routing), and assert that the returned bytes match what the input + * specified — i.e. the query succeeds and batching does not corrupt output. + */ +class TestReadBlobBatching extends HoodieClientTestBase { + + /** + * Build a temp view named `viewName` from (id, file, offset, length) tuples + * with a blob struct column `file_info`. + */ + private def registerBlobView( + viewName: String, + entries: Seq[(Int, String, Long, Long)]): Unit = { + sparkSession.createDataFrame(entries) + .toDF("id", "external_path", "offset", "length") + .withColumn("file_info", blobStructCol("file_info", col("external_path"), col("offset"), col("length"))) + .select("id", "external_path", "offset", "file_info") + .createOrReplaceTempView(viewName) + } + + /** + * Run SELECT id, external_path, offset, read_blob(file_info) and assert each + * returned blob equals the deterministic pattern at its recorded offset. + */ + private def assertBlobBytesMatchExpected(rows: Array[Row], expectedLength: Int): Unit = { + rows.foreach { row => + val offset = row.getAs[Long]("offset") + val data = row.getAs[Array[Byte]]("data") + assertEquals(expectedLength, data.length, s"length mismatch at offset $offset") + assertBytesContent(data, expectedOffset = offset.toInt) + } + } + + /** + * Scenario (1) — many blobs in a single file batch into one read. + * + * 20 small reads with 50-byte gaps under maxGap=4096 must all merge into a + * single underlying range. Validates that high-ratio batching returns the + * correct bytes for every row. + */ + @Test + def testManyBlobsInSingleFileBatched(): Unit = { + val filePath = createTestFile(tempDir, "many.bin", 50000) + val entries = (0 until 20).map(i => (i, filePath, (i * 150).toLong, 100L)) + registerBlobView("many_blobs_view", entries) + + withSparkConfig(sparkSession, Map( + "hoodie.blob.batching.max.gap.bytes" -> "4096" + )) { + val rows = sparkSession.sql( + """SELECT id, external_path, offset, read_blob(file_info) AS data + |FROM many_blobs_view + |ORDER BY id""".stripMargin).collect() + assertEquals(20, rows.length) + assertBlobBytesMatchExpected(rows, expectedLength = 100) + } + } + + /** + * Scenario (2) — mixed small and large gaps in one file. + * + * Two batchable groups separated by a 9500-byte gap (above maxGap=4096) must + * produce correct bytes for all six rows regardless of the gap-driven split + * inside the reader. + */ + @Test + def testMixedGapsInSingleFile(): Unit = { + val filePath = createTestFile(tempDir, "mixed.bin", 20000) + val entries = Seq( + (1, filePath, 0L, 100L), + (2, filePath, 200L, 100L), + (3, filePath, 400L, 100L), + (4, filePath, 10000L, 100L), + (5, filePath, 10200L, 100L), + (6, filePath, 10400L, 100L)) + registerBlobView("mixed_gaps_view", entries) + + withSparkConfig(sparkSession, Map( + "hoodie.blob.batching.max.gap.bytes" -> "4096" + )) { + val rows = sparkSession.sql( + """SELECT id, external_path, offset, read_blob(file_info) AS data + |FROM mixed_gaps_view + |ORDER BY id""".stripMargin).collect() + assertEquals(6, rows.length) + assertBlobBytesMatchExpected(rows, expectedLength = 100) + } + } + + /** + * Scenario (3) — threshold boundary at SQL level. + * + * Four reads with gaps of exactly maxGap (1024). The merge algorithm treats + * gap == maxGap as inclusive (covered by TestBatchedBlobReaderMerge); this + * test confirms the SQL surface returns correct bytes at that boundary. + */ + @Test + def testThresholdBoundaryInclusiveMerge(): Unit = { + val filePath = createTestFile(tempDir, "boundary.bin", 5000) + // Gap between consecutive reads = 1024 bytes (1124 - 100, etc.). + val entries = (0 until 4).map(i => (i, filePath, (i * 1124).toLong, 100L)) + registerBlobView("boundary_view", entries) + + withSparkConfig(sparkSession, Map( + "hoodie.blob.batching.max.gap.bytes" -> "1024" + )) { + val rows = sparkSession.sql( + """SELECT id, external_path, offset, read_blob(file_info) AS data + |FROM boundary_view + |ORDER BY id""".stripMargin).collect() + assertEquals(4, rows.length) + assertBlobBytesMatchExpected(rows, expectedLength = 100) + } + } + + /** + * Scenario (4) — multi-file with interleaved input + mixed gap patterns. + * + * Three files with different access patterns (contiguous, small gaps within + * threshold, large gaps above threshold) queried in interleaved order. + * Confirms per-file routing through the SQL planner returns correct bytes + * regardless of input ordering. + */ + @Test + def testMultiFileInterleavedWithMixedGaps(): Unit = { + val fileA = createTestFile(tempDir, "fileA.bin", 1000) // contiguous + val fileB = createTestFile(tempDir, "fileB.bin", 2000) // 50-byte gaps + val fileC = createTestFile(tempDir, "fileC.bin", 50000) // 8KB gaps + + val entries = (0 until 4).flatMap { i => + Seq( + (i * 3, fileA, (i * 100).toLong, 100L), + (i * 3 + 1, fileB, (i * 150).toLong, 100L), + (i * 3 + 2, fileC, (i * 8192).toLong, 100L)) + } + registerBlobView("multi_file_view", entries) + + withSparkConfig(sparkSession, Map( + "hoodie.blob.batching.max.gap.bytes" -> "1024" + )) { + val rows = sparkSession.sql( + """SELECT id, external_path, offset, read_blob(file_info) AS data + |FROM multi_file_view + |ORDER BY id""".stripMargin).collect() + assertEquals(12, rows.length) + assertBlobBytesMatchExpected(rows, expectedLength = 100) + } + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/blob/CountingHoodieStorage.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/blob/CountingHoodieStorage.scala deleted file mode 100644 index 79399b52fa22e..0000000000000 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/blob/CountingHoodieStorage.scala +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.spark.sql.hudi.blob - -import org.apache.hudi.io.SeekableDataInputStream -import org.apache.hudi.storage.{HoodieStorage, StorageConfiguration, StoragePath, StoragePathFilter, StoragePathInfo} - -import java.io.{InputStream, OutputStream} -import java.net.URI -import java.util.{List => JList} -import java.util.concurrent.atomic.AtomicInteger - -/** - * Test-only {@link HoodieStorage} decorator that counts I/O operations performed by - * batched blob reads. Delegates every method to an underlying storage, but tracks: - * - * - openSeekableCount — number of times openSeekable was called - * - seekCount — number of times seek was called on a returned SeekableDataInputStream - * - * Use this wrapper in tests to assert that batching reduces the number of physical - * I/O operations versus a non-batching configuration. Counters are reset via - * {@link #reset()}. - */ -class CountingHoodieStorage(delegate: HoodieStorage) extends HoodieStorage(delegate.getConf) { - - val openSeekableCount = new AtomicInteger(0) - val seekCount = new AtomicInteger(0) - - def reset(): Unit = { - openSeekableCount.set(0) - seekCount.set(0) - } - - override def openSeekable(path: StoragePath, bufferSize: Int, wrapStream: Boolean): SeekableDataInputStream = { - openSeekableCount.incrementAndGet() - val inner = delegate.openSeekable(path, bufferSize, wrapStream) - new CountingSeekableDataInputStream(inner, seekCount) - } - - // --- pass-through delegation ------------------------------------------------- - - override def newInstance(path: StoragePath, storageConf: StorageConfiguration[_]): HoodieStorage = - new CountingHoodieStorage(delegate.newInstance(path, storageConf)) - - override def getScheme: String = delegate.getScheme - override def getDefaultBlockSize(path: StoragePath): Int = delegate.getDefaultBlockSize(path) - override def getDefaultBufferSize: Int = delegate.getDefaultBufferSize - override def getDefaultReplication(path: StoragePath): Short = delegate.getDefaultReplication(path) - override def getUri: URI = delegate.getUri - - override def create(path: StoragePath, overwrite: Boolean): OutputStream = - delegate.create(path, overwrite) - override def create(path: StoragePath, overwrite: Boolean, bufferSize: Integer, replication: java.lang.Short, sizeThreshold: java.lang.Long): OutputStream = - delegate.create(path, overwrite, bufferSize, replication, sizeThreshold) - - override def open(path: StoragePath): InputStream = delegate.open(path) - override def append(path: StoragePath): OutputStream = delegate.append(path) - - override def exists(path: StoragePath): Boolean = delegate.exists(path) - override def getPathInfo(path: StoragePath): StoragePathInfo = delegate.getPathInfo(path) - override def createDirectory(path: StoragePath): Boolean = delegate.createDirectory(path) - - override def listDirectEntries(path: StoragePath): JList[StoragePathInfo] = delegate.listDirectEntries(path) - override def listFiles(path: StoragePath): JList[StoragePathInfo] = delegate.listFiles(path) - override def listDirectEntries(path: StoragePath, filter: StoragePathFilter): JList[StoragePathInfo] = - delegate.listDirectEntries(path, filter) - - override def setModificationTime(path: StoragePath, modificationTimeInMillisEpoch: Long): Unit = - delegate.setModificationTime(path, modificationTimeInMillisEpoch) - - override def globEntries(pathPattern: StoragePath, filter: StoragePathFilter): JList[StoragePathInfo] = - delegate.globEntries(pathPattern, filter) - - override def rename(oldPath: StoragePath, newPath: StoragePath): Boolean = - delegate.rename(oldPath, newPath) - - override def deleteDirectory(path: StoragePath): Boolean = delegate.deleteDirectory(path) - override def deleteFile(path: StoragePath): Boolean = delegate.deleteFile(path) - - override def getFileSystem: AnyRef = delegate.getFileSystem - override def getRawStorage: HoodieStorage = delegate.getRawStorage - - override def close(): Unit = delegate.close() -} - -/** - * Counting wrapper around a {@link SeekableDataInputStream}. Counts seeks; final - * readFully methods on DataInputStream delegate to the underlying stream's - * read(...) which the wrapped stream services directly. - */ -private class CountingSeekableDataInputStream( - delegate: SeekableDataInputStream, - seekCount: AtomicInteger) - extends SeekableDataInputStream(delegate) { - - override def seek(pos: Long): Unit = { - seekCount.incrementAndGet() - delegate.seek(pos) - } - - override def getPos: Long = delegate.getPos - - override def close(): Unit = delegate.close() -} diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/blob/TestBatchedBlobReaderIO.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/blob/TestBatchedBlobReaderIO.scala deleted file mode 100644 index 54541ad1aaf76..0000000000000 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/blob/TestBatchedBlobReaderIO.scala +++ /dev/null @@ -1,146 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.spark.sql.hudi.blob - -import org.apache.hudi.blob.BlobTestHelpers._ -import org.apache.hudi.storage.{HoodieStorage, HoodieStorageUtils} -import org.apache.hudi.testutils.HoodieClientTestBase - -import org.apache.spark.sql.Row -import org.apache.spark.sql.functions._ -import org.apache.spark.sql.types.{BinaryType, StructField, StructType} -import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.api.Test - -/** - * Integration tests asserting that batched blob reads actually reduce storage I/O. - * - * Each scenario drives BatchedBlobReader.processPartition directly with a - * CountingHoodieStorage wrapper around a real local-FS storage, then asserts - * the resulting openSeekable / seek call counts. - * - * Scope: only the merge-result-to-physical-I/O mapping is exercised here. The - * gap/threshold merging algorithm itself is covered by TestBatchedBlobReaderMerge. - * - * Lives in the same package as BatchedBlobReader to access the package-private - * RowAccessor / RowBuilder implicits. - */ -class TestBatchedBlobReaderIO extends HoodieClientTestBase { - - private val DEFAULT_LOOKAHEAD = 50 - - /** Build rows by routing through Spark so the nested blob struct is encoded correctly. */ - private def buildRows(entries: Seq[(String, Long, Long)]): (Array[Row], StructType) = { - val df = sparkSession.createDataFrame(entries).toDF("external_path", "offset", "length") - .withColumn("data", blobStructCol("data", col("external_path"), col("offset"), col("length"))) - .select("data") - (df.collect(), df.schema) - } - - /** Run processPartition with the counting wrapper and drain the output iterator. */ - private def runWithCounting( - rows: Array[Row], - inputSchema: StructType, - maxGap: Int, - lookahead: Int = DEFAULT_LOOKAHEAD): CountingHoodieStorage = { - val raw: HoodieStorage = HoodieStorageUtils.getStorage(storageConf) - val counting = new CountingHoodieStorage(raw) - try { - val outputSchema = inputSchema.add(StructField(BatchedBlobReader.DATA_COL, BinaryType, nullable = true)) - val reader = new BatchedBlobReader(counting, maxGap, lookahead) - import RowAccessor.rowAccessor - import RowBuilder.rowBuilder - val out = reader.processPartition[Row](rows.iterator, structColIdx = 0, outputSchema) - while (out.hasNext) { - val r = out.next() - r.getAs[Array[Byte]](BatchedBlobReader.DATA_COL) - } - counting - } finally { - counting.close() - } - } - - /** - * End-to-end proof that batching reduces I/O: the same input collapses from - * N opens (maxGap=0, gaps exceed threshold) to a single open (maxGap=4096). - */ - @Test - def testBatchingReducesIOForSingleFile(): Unit = { - val filePath = createTestFile(tempDir, "single-file.bin", 50000) - val entries = (0 until 20).map(i => (filePath, (i * 150).toLong, 100L)) - val (rows, schema) = buildRows(entries) - - val batched = runWithCounting(rows, schema, maxGap = 4096) - assertEquals(1, batched.openSeekableCount.get, "20 blobs with 50-byte gaps should batch to one open") - assertEquals(1, batched.seekCount.get) - - val nonBatched = runWithCounting(rows, schema, maxGap = 0) - assertEquals(20, nonBatched.openSeekableCount.get, "maxGap=0 with 50-byte gaps should produce one open per blob") - assertEquals(20, nonBatched.seekCount.get) - } - - /** - * Mixed small/large gaps in one file produce exactly one I/O per merged group. - * Threshold-boundary correctness is covered by TestBatchedBlobReaderMerge. - */ - @Test - def testMixedGapsProduceOneIOPerMergedGroup(): Unit = { - val filePath = createTestFile(tempDir, "mixed-gaps.bin", 20000) - // Two batchable groups separated by a 9500-byte gap (> maxGap=4096). - val entries = Seq( - (filePath, 0L, 100L), - (filePath, 200L, 100L), - (filePath, 400L, 100L), - (filePath, 10000L, 100L), - (filePath, 10200L, 100L), - (filePath, 10400L, 100L)) - val (rows, schema) = buildRows(entries) - - val counters = runWithCounting(rows, schema, maxGap = 4096) - assertEquals(2, counters.openSeekableCount.get, "Two merged ranges -> two opens") - assertEquals(2, counters.seekCount.get) - } - - /** - * Multi-file routing at the I/O level: interleaved input + mixed gap patterns - * across three files should still produce per-file batched opens. - */ - @Test - def testMultiFileBatchingAtIOLevel(): Unit = { - val fileA = createTestFile(tempDir, "fileA.bin", 1000) // contiguous - val fileB = createTestFile(tempDir, "fileB.bin", 2000) // small gaps within threshold - val fileC = createTestFile(tempDir, "fileC.bin", 50000) // large gaps above threshold - - // Interleaved order so we also confirm grouping isn't order-dependent at the I/O layer. - val entries = - (0 until 4).flatMap { i => - Seq( - (fileA, (i * 100).toLong, 100L), // contiguous -> 1 merged - (fileB, (i * 150).toLong, 100L), // 50-byte gaps -> 1 merged - (fileC, (i * 8192).toLong, 100L)) // 8KB gaps -> 4 ranges - } - val (rows, schema) = buildRows(entries) - - val counters = runWithCounting(rows, schema, maxGap = 1024) - assertEquals(6, counters.openSeekableCount.get, "1 (A) + 1 (B) + 4 (C) merged ranges -> 6 opens") - assertEquals(6, counters.seekCount.get) - } -} From af94452b247e5a3a9eed233663848ed7ee247628 Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Thu, 14 May 2026 14:45:42 -0700 Subject: [PATCH 5/6] Write Hudi table instead of temp view in batching SQL tests Each scenario now bulk_inserts a Hudi table with the blob column and reads it back via sparkSession.read.format("hudi"), then runs SELECT read_blob(...) against the loaded view. This exercises HoodieFileIndex and BatchedBlobReadExec serialization in addition to the merge/batching path, matching the production read plan rather than a Spark-only temp view. Adds writeHudiBlobTable helper that coerces the input DataFrame to the canonical BlobType schema (nullable reference struct, as required by HoodieSparkSchemaConverters.validateBlobStructure) before save. --- .../hudi/blob/TestReadBlobBatching.scala | 75 ++++++++++++++----- 1 file changed, 55 insertions(+), 20 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/blob/TestReadBlobBatching.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/blob/TestReadBlobBatching.scala index 1ca5a83da62d3..ae9a06d8fd687 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/blob/TestReadBlobBatching.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/blob/TestReadBlobBatching.scala @@ -24,11 +24,13 @@ import org.apache.hudi.testutils.HoodieClientTestBase import org.apache.spark.sql.Row import org.apache.spark.sql.functions._ +import org.apache.spark.sql.types._ import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Test /** - * SQL-driven correctness tests for read_blob() batching scenarios. + * SQL-driven correctness tests for read_blob() batching scenarios on + * Hudi-backed tables. * * Complements: * - TestBatchedBlobReaderMerge: unit-level coverage of the merge algorithm. @@ -36,30 +38,59 @@ import org.junit.jupiter.api.Test * null handling, error paths). * - TestBatchedBlobReader: byte-level correctness via the readBatched API. * - * Scope: exercise the SQL path end-to-end with batching configurations chosen - * to drive specific merge behaviors (high-ratio merging, threshold boundary, - * per-file routing), and assert that the returned bytes match what the input - * specified — i.e. the query succeeds and batching does not corrupt output. + * Each test writes a Hudi table containing a blob column referencing external + * data files, reads it back via the Hudi reader, and runs SELECT read_blob(...) + * with a batching configuration chosen to drive specific merge behaviors. + * Writing through Hudi (rather than a temp view) exercises HoodieFileIndex and + * BatchedBlobReadExec serialization — the production read path. Each test + * asserts the returned bytes match the deterministic pattern at each row's + * recorded offset — i.e. the query succeeds and batching does not corrupt + * output. */ class TestReadBlobBatching extends HoodieClientTestBase { /** - * Build a temp view named `viewName` from (id, file, offset, length) tuples - * with a blob struct column `file_info`. + * Write a Hudi table at `tablePath` with rows (id, external_path, offset, + * file_info) and register the loaded table as the given temp view. + * + * Coerces the input to the canonical BlobType schema with a nullable + * reference struct, which HoodieSparkSchemaConverters.validateBlobStructure + * requires on write. */ - private def registerBlobView( + private def writeHudiBlobTable( + tablePath: String, + tableName: String, viewName: String, entries: Seq[(Int, String, Long, Long)]): Unit = { - sparkSession.createDataFrame(entries) + val rawDf = sparkSession.createDataFrame(entries) .toDF("id", "external_path", "offset", "length") - .withColumn("file_info", blobStructCol("file_info", col("external_path"), col("offset"), col("length"))) + .withColumn("file_info", + blobStructCol("file_info", col("external_path"), col("offset"), col("length"))) .select("id", "external_path", "offset", "file_info") + + val canonicalSchema = StructType(Seq( + StructField("id", IntegerType, nullable = false), + StructField("external_path", StringType, nullable = true), + StructField("offset", LongType, nullable = true), + StructField("file_info", BlobType().asInstanceOf[StructType], + nullable = true, blobMetadata) + )) + val df = sparkSession.createDataFrame(rawDf.rdd, canonicalSchema) + + df.write.format("hudi") + .option("hoodie.table.name", tableName) + .option("hoodie.datasource.write.recordkey.field", "id") + .option("hoodie.datasource.write.operation", "bulk_insert") + .mode("overwrite") + .save(tablePath) + + sparkSession.read.format("hudi").load(tablePath) .createOrReplaceTempView(viewName) } /** - * Run SELECT id, external_path, offset, read_blob(file_info) and assert each - * returned blob equals the deterministic pattern at its recorded offset. + * Validate that each returned blob equals the deterministic pattern at its + * recorded offset. */ private def assertBlobBytesMatchExpected(rows: Array[Row], expectedLength: Int): Unit = { rows.foreach { row => @@ -75,13 +106,14 @@ class TestReadBlobBatching extends HoodieClientTestBase { * * 20 small reads with 50-byte gaps under maxGap=4096 must all merge into a * single underlying range. Validates that high-ratio batching returns the - * correct bytes for every row. + * correct bytes for every row when driven by the Hudi read plan. */ @Test def testManyBlobsInSingleFileBatched(): Unit = { val filePath = createTestFile(tempDir, "many.bin", 50000) + val tablePath = s"$tempDir/many_blobs_table" val entries = (0 until 20).map(i => (i, filePath, (i * 150).toLong, 100L)) - registerBlobView("many_blobs_view", entries) + writeHudiBlobTable(tablePath, "many_blobs", "many_blobs_view", entries) withSparkConfig(sparkSession, Map( "hoodie.blob.batching.max.gap.bytes" -> "4096" @@ -105,6 +137,7 @@ class TestReadBlobBatching extends HoodieClientTestBase { @Test def testMixedGapsInSingleFile(): Unit = { val filePath = createTestFile(tempDir, "mixed.bin", 20000) + val tablePath = s"$tempDir/mixed_gaps_table" val entries = Seq( (1, filePath, 0L, 100L), (2, filePath, 200L, 100L), @@ -112,7 +145,7 @@ class TestReadBlobBatching extends HoodieClientTestBase { (4, filePath, 10000L, 100L), (5, filePath, 10200L, 100L), (6, filePath, 10400L, 100L)) - registerBlobView("mixed_gaps_view", entries) + writeHudiBlobTable(tablePath, "mixed_gaps", "mixed_gaps_view", entries) withSparkConfig(sparkSession, Map( "hoodie.blob.batching.max.gap.bytes" -> "4096" @@ -136,9 +169,10 @@ class TestReadBlobBatching extends HoodieClientTestBase { @Test def testThresholdBoundaryInclusiveMerge(): Unit = { val filePath = createTestFile(tempDir, "boundary.bin", 5000) + val tablePath = s"$tempDir/boundary_table" // Gap between consecutive reads = 1024 bytes (1124 - 100, etc.). val entries = (0 until 4).map(i => (i, filePath, (i * 1124).toLong, 100L)) - registerBlobView("boundary_view", entries) + writeHudiBlobTable(tablePath, "boundary", "boundary_view", entries) withSparkConfig(sparkSession, Map( "hoodie.blob.batching.max.gap.bytes" -> "1024" @@ -156,15 +190,16 @@ class TestReadBlobBatching extends HoodieClientTestBase { * Scenario (4) — multi-file with interleaved input + mixed gap patterns. * * Three files with different access patterns (contiguous, small gaps within - * threshold, large gaps above threshold) queried in interleaved order. - * Confirms per-file routing through the SQL planner returns correct bytes - * regardless of input ordering. + * threshold, large gaps above threshold) written into one Hudi table in + * interleaved order. Confirms per-file routing through the Hudi read plan + * returns correct bytes regardless of input ordering. */ @Test def testMultiFileInterleavedWithMixedGaps(): Unit = { val fileA = createTestFile(tempDir, "fileA.bin", 1000) // contiguous val fileB = createTestFile(tempDir, "fileB.bin", 2000) // 50-byte gaps val fileC = createTestFile(tempDir, "fileC.bin", 50000) // 8KB gaps + val tablePath = s"$tempDir/multi_file_table" val entries = (0 until 4).flatMap { i => Seq( @@ -172,7 +207,7 @@ class TestReadBlobBatching extends HoodieClientTestBase { (i * 3 + 1, fileB, (i * 150).toLong, 100L), (i * 3 + 2, fileC, (i * 8192).toLong, 100L)) } - registerBlobView("multi_file_view", entries) + writeHudiBlobTable(tablePath, "multi_file", "multi_file_view", entries) withSparkConfig(sparkSession, Map( "hoodie.blob.batching.max.gap.bytes" -> "1024" From 5f4faad4bdcf57d57a773589cbfbf88f03183774 Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Fri, 15 May 2026 11:31:10 -0700 Subject: [PATCH 6/6] Use individual imports instead of wildcard imports --- .../scala/org/apache/hudi/blob/TestReadBlobBatching.scala | 8 ++++---- .../spark/sql/hudi/blob/TestBatchedBlobReaderMerge.scala | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/blob/TestReadBlobBatching.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/blob/TestReadBlobBatching.scala index ae9a06d8fd687..9cbd8e908220a 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/blob/TestReadBlobBatching.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/blob/TestReadBlobBatching.scala @@ -19,13 +19,13 @@ package org.apache.hudi.blob -import org.apache.hudi.blob.BlobTestHelpers._ +import org.apache.hudi.blob.BlobTestHelpers.{assertBytesContent, blobMetadata, blobStructCol, createTestFile, withSparkConfig} import org.apache.hudi.testutils.HoodieClientTestBase import org.apache.spark.sql.Row -import org.apache.spark.sql.functions._ -import org.apache.spark.sql.types._ -import org.junit.jupiter.api.Assertions._ +import org.apache.spark.sql.functions.col +import org.apache.spark.sql.types.{BlobType, IntegerType, LongType, StringType, StructField, StructType} +import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test /** diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/blob/TestBatchedBlobReaderMerge.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/blob/TestBatchedBlobReaderMerge.scala index 73813e9de33e3..219f487a3c662 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/blob/TestBatchedBlobReaderMerge.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/blob/TestBatchedBlobReaderMerge.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.hudi.blob import org.apache.spark.sql.Row -import org.junit.jupiter.api.Assertions._ +import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue} import org.junit.jupiter.api.Test /**