diff --git a/.github/workflows/bot.yml b/.github/workflows/bot.yml
index 2143edd92bcb2..06468d0c4fa23 100644
--- a/.github/workflows/bot.yml
+++ b/.github/workflows/bot.yml
@@ -33,7 +33,7 @@ env:
   JAVA_UT_FILTER1: -Dtest=!TestCOWDataSource,!TestMORDataSource,!TestHoodieFileSystemViews
   JAVA_UT_FILTER2: -Dtest=TestCOWDataSource,TestMORDataSource,TestHoodieFileSystemViews
   SCALA_TEST_DML_FILTER: -DwildcardSuites=org.apache.spark.sql.hudi.dml
-  SCALA_TEST_OTHERS_FILTER: -DwildcardSuites=org.apache.hudi,org.apache.spark.hudi,org.apache.spark.sql.avro,org.apache.spark.sql.execution,org.apache.spark.sql.hudi.analysis,org.apache.spark.sql.hudi.catalog,org.apache.spark.sql.hudi.command,org.apache.spark.sql.hudi.common,org.apache.spark.sql.hudi.ddl,org.apache.spark.sql.hudi.procedure,org.apache.spark.sql.hudi.feature
+  SCALA_TEST_OTHERS_FILTER: -DwildcardSuites=org.apache.hudi,org.apache.spark.hudi,org.apache.spark.sql.avro,org.apache.spark.sql.execution,org.apache.spark.sql.hudi.analysis,org.apache.spark.sql.hudi.blob,org.apache.spark.sql.hudi.catalog,org.apache.spark.sql.hudi.command,org.apache.spark.sql.hudi.common,org.apache.spark.sql.hudi.ddl,org.apache.spark.sql.hudi.procedure,org.apache.spark.sql.hudi.feature
   FLINK_IT_FILTER1: -Dit.test=ITTestHoodieDataSource
   FLINK_IT_FILTER2: -Dit.test=!ITTestHoodieDataSource
diff --git a/azure-pipelines-20230430.yml b/azure-pipelines-20230430.yml
index cccd89ec6cc73..3edac6046b1a3 100644
--- a/azure-pipelines-20230430.yml
+++ b/azure-pipelines-20230430.yml
@@ -75,6 +75,7 @@ parameters:
       - 'org.apache.spark.sql.avro'
       - 'org.apache.spark.sql.execution'
       - 'org.apache.spark.sql.hudi.analysis'
+      - 'org.apache.spark.sql.hudi.blob'
       - 'org.apache.spark.sql.hudi.catalog'
       - 'org.apache.spark.sql.hudi.command'
       - 'org.apache.spark.sql.hudi.common'
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/BatchedBlobReader.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/BatchedBlobReader.scala
index 0328cdd0c5c22..a1c299cf2611d 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/BatchedBlobReader.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/BatchedBlobReader.scala
@@ -274,7 +274,7 @@ class BatchedBlobReader(
    * @param rows Sequence of row information
    * @return Sequence of merged ranges
    */
-  private def identifyConsecutiveRanges[R](rows: Seq[RowInfo[R]]): Seq[MergedRange[R]] = {
+  private[blob] def identifyConsecutiveRanges[R](rows: Seq[RowInfo[R]]): Seq[MergedRange[R]] = {
     // Group by file path
     val byFile = rows.groupBy(_.filePath)
 
@@ -299,7 +299,7 @@ class BatchedBlobReader(
    * @param maxGap Maximum gap to consider for merging
    * @return Sequence of merged ranges
    */
-  private def mergeRanges[R](rows: Seq[RowInfo[R]], maxGap: Int): Seq[MergedRange[R]] = {
+  private[blob] def mergeRanges[R](rows: Seq[RowInfo[R]], maxGap: Int): Seq[MergedRange[R]] = {
     val result = ArrayBuffer[MergedRange[R]]()
     var currentFilePath: String = null
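
The visibility widening above exists so the new tests can call identifyConsecutiveRanges and mergeRanges directly. The contract those tests pin down: rows are grouped per file and sorted by offset, consecutive blobs are merged while the gap between one blob's end and the next blob's start is at most maxGap (inclusive), and overlapping inputs are rejected. The following is a simplified, self-contained sketch of that rule, not the production implementation; BlobRef, Range, mergeSorted and identifyRanges are illustrative stand-ins for the package-private RowInfo / MergedRange types and methods.

import scala.collection.mutable.ArrayBuffer

// Simplified stand-ins for RowInfo / MergedRange (the real types also carry the original row).
case class BlobRef(filePath: String, offset: Long, length: Long, index: Long)
case class Range(filePath: String, startOffset: Long, endOffset: Long, rows: Seq[BlobRef])

// Merge rule the tests assert: keep extending the current range while the gap to the next
// blob is <= maxGap (inclusive boundary); a blob starting before the current end is an overlap.
def mergeSorted(sorted: Seq[BlobRef], maxGap: Long): Seq[Range] = {
  val out = ArrayBuffer[Range]()
  sorted.foreach { r =>
    out.lastOption match {
      case Some(cur) if cur.filePath == r.filePath && r.offset - cur.endOffset <= maxGap =>
        require(r.offset >= cur.endOffset, "Overlapping blob ranges detected")
        out(out.size - 1) = cur.copy(endOffset = r.offset + r.length, rows = cur.rows :+ r)
      case _ =>
        out += Range(r.filePath, r.offset, r.offset + r.length, Seq(r))
    }
  }
  out.toSeq
}

// Per-file grouping plus offset sort before merging, mirroring identifyConsecutiveRanges.
def identifyRanges(rows: Seq[BlobRef], maxGap: Long): Seq[Range] =
  rows.groupBy(_.filePath).values.flatMap(rs => mergeSorted(rs.sortBy(_.offset), maxGap)).toSeq

// Example: offsets 0, 1124, 2248 (length 100, maxGap 1024) collapse into a single range
// covering [0, 2348), which is what testGapEqualToThresholdMerges below asserts for the real reader.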
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestSparkSqlHudiPackageStructure.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestSparkSqlHudiPackageStructure.java
index 2937b59dc529b..ddce176eb7384 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestSparkSqlHudiPackageStructure.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestSparkSqlHudiPackageStructure.java
@@ -53,6 +53,7 @@ public class TestSparkSqlHudiPackageStructure {
    */
   private static final Set<String> ALLOWED_PACKAGES = new HashSet<>(Arrays.asList(
       "org.apache.spark.sql.hudi.analysis",
+      "org.apache.spark.sql.hudi.blob",
       "org.apache.spark.sql.hudi.catalog",
       "org.apache.spark.sql.hudi.command",
       "org.apache.spark.sql.hudi.common",
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/blob/CountingHoodieStorage.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/blob/CountingHoodieStorage.scala
new file mode 100644
index 0000000000000..79399b52fa22e
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/blob/CountingHoodieStorage.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.hudi.blob
+
+import org.apache.hudi.io.SeekableDataInputStream
+import org.apache.hudi.storage.{HoodieStorage, StorageConfiguration, StoragePath, StoragePathFilter, StoragePathInfo}
+
+import java.io.{InputStream, OutputStream}
+import java.net.URI
+import java.util.{List => JList}
+import java.util.concurrent.atomic.AtomicInteger
+
+/**
+ * Test-only {@link HoodieStorage} decorator that counts I/O operations performed by
+ * batched blob reads. Delegates every method to an underlying storage, but tracks:
+ *
+ *  - openSeekableCount — number of times openSeekable was called
+ *  - seekCount — number of times seek was called on a returned SeekableDataInputStream
+ *
+ * Use this wrapper in tests to assert that batching reduces the number of physical
+ * I/O operations versus a non-batching configuration. Counters are reset via
+ * {@link #reset()}.
+ */
+class CountingHoodieStorage(delegate: HoodieStorage) extends HoodieStorage(delegate.getConf) {
+
+  val openSeekableCount = new AtomicInteger(0)
+  val seekCount = new AtomicInteger(0)
+
+  def reset(): Unit = {
+    openSeekableCount.set(0)
+    seekCount.set(0)
+  }
+
+  override def openSeekable(path: StoragePath, bufferSize: Int, wrapStream: Boolean): SeekableDataInputStream = {
+    openSeekableCount.incrementAndGet()
+    val inner = delegate.openSeekable(path, bufferSize, wrapStream)
+    new CountingSeekableDataInputStream(inner, seekCount)
+  }
+
+  // --- pass-through delegation -------------------------------------------------
+
+  override def newInstance(path: StoragePath, storageConf: StorageConfiguration[_]): HoodieStorage =
+    new CountingHoodieStorage(delegate.newInstance(path, storageConf))
+
+  override def getScheme: String = delegate.getScheme
+  override def getDefaultBlockSize(path: StoragePath): Int = delegate.getDefaultBlockSize(path)
+  override def getDefaultBufferSize: Int = delegate.getDefaultBufferSize
+  override def getDefaultReplication(path: StoragePath): Short = delegate.getDefaultReplication(path)
+  override def getUri: URI = delegate.getUri
+
+  override def create(path: StoragePath, overwrite: Boolean): OutputStream =
+    delegate.create(path, overwrite)
+  override def create(path: StoragePath, overwrite: Boolean, bufferSize: Integer, replication: java.lang.Short, sizeThreshold: java.lang.Long): OutputStream =
+    delegate.create(path, overwrite, bufferSize, replication, sizeThreshold)
+
+  override def open(path: StoragePath): InputStream = delegate.open(path)
+  override def append(path: StoragePath): OutputStream = delegate.append(path)
+
+  override def exists(path: StoragePath): Boolean = delegate.exists(path)
+  override def getPathInfo(path: StoragePath): StoragePathInfo = delegate.getPathInfo(path)
+  override def createDirectory(path: StoragePath): Boolean = delegate.createDirectory(path)
+
+  override def listDirectEntries(path: StoragePath): JList[StoragePathInfo] = delegate.listDirectEntries(path)
+  override def listFiles(path: StoragePath): JList[StoragePathInfo] = delegate.listFiles(path)
+  override def listDirectEntries(path: StoragePath, filter: StoragePathFilter): JList[StoragePathInfo] =
+    delegate.listDirectEntries(path, filter)
+
+  override def setModificationTime(path: StoragePath, modificationTimeInMillisEpoch: Long): Unit =
+    delegate.setModificationTime(path, modificationTimeInMillisEpoch)
+
+  override def globEntries(pathPattern: StoragePath, filter: StoragePathFilter): JList[StoragePathInfo] =
+    delegate.globEntries(pathPattern, filter)
+
+  override def rename(oldPath: StoragePath, newPath: StoragePath): Boolean =
+    delegate.rename(oldPath, newPath)
+
+  override def deleteDirectory(path: StoragePath): Boolean = delegate.deleteDirectory(path)
+  override def deleteFile(path: StoragePath): Boolean = delegate.deleteFile(path)
+
+  override def getFileSystem: AnyRef = delegate.getFileSystem
+  override def getRawStorage: HoodieStorage = delegate.getRawStorage
+
+  override def close(): Unit = delegate.close()
+}
+
+/**
+ * Counting wrapper around a {@link SeekableDataInputStream}. Counts seeks; final
+ * readFully methods on DataInputStream delegate to the underlying stream's
+ * read(...) which the wrapped stream services directly.
+ */
+private class CountingSeekableDataInputStream(
+    delegate: SeekableDataInputStream,
+    seekCount: AtomicInteger)
+  extends SeekableDataInputStream(delegate) {
+
+  override def seek(pos: Long): Unit = {
+    seekCount.incrementAndGet()
+    delegate.seek(pos)
+  }
+
+  override def getPos: Long = delegate.getPos
+
+  override def close(): Unit = delegate.close()
+}
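
The counters above line up one-to-one with merged ranges because of the read pattern the I/O tests assume: each merged range is serviced by a single open, a single seek to the range start, and one contiguous read, after which the individual blobs are sliced out of the shared buffer. The snippet below is an illustrative plain-JDK sketch of that pattern, consistent with the counts asserted in TestBatchedBlobReaderIO; it is not the BatchedBlobReader implementation, and readMergedRange and Blob are hypothetical names.

import java.io.{DataInputStream, FileInputStream}
import java.util.Arrays

// Hypothetical helper: service one merged range with exactly one open + one seek + one read,
// which is why the tests expect openSeekableCount == seekCount == number of merged ranges.
case class Blob(offset: Long, length: Long)

def readMergedRange(path: String, startOffset: Long, endOffset: Long, blobs: Seq[Blob]): Seq[Array[Byte]] = {
  val in = new FileInputStream(path)                 // one physical open per merged range
  try {
    in.getChannel.position(startOffset)              // one seek per merged range
    val span = new Array[Byte]((endOffset - startOffset).toInt)
    new DataInputStream(in).readFully(span)          // one contiguous read covering every blob in the range
    blobs.map { b =>                                 // carve each blob out of the shared span
      val from = (b.offset - startOffset).toInt
      Arrays.copyOfRange(span, from, from + b.length.toInt)
    }
  } finally {
    in.close()
  }
}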
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/blob/TestBatchedBlobReaderIO.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/blob/TestBatchedBlobReaderIO.scala
new file mode 100644
index 0000000000000..54541ad1aaf76
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/blob/TestBatchedBlobReaderIO.scala
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.hudi.blob
+
+import org.apache.hudi.blob.BlobTestHelpers._
+import org.apache.hudi.storage.{HoodieStorage, HoodieStorageUtils}
+import org.apache.hudi.testutils.HoodieClientTestBase
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.types.{BinaryType, StructField, StructType}
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Test
+
+/**
+ * Integration tests asserting that batched blob reads actually reduce storage I/O.
+ *
+ * Each scenario drives BatchedBlobReader.processPartition directly with a
+ * CountingHoodieStorage wrapper around a real local-FS storage, then asserts
+ * the resulting openSeekable / seek call counts.
+ *
+ * Scope: only the merge-result-to-physical-I/O mapping is exercised here. The
+ * gap/threshold merging algorithm itself is covered by TestBatchedBlobReaderMerge.
+ *
+ * Lives in the same package as BatchedBlobReader to access the package-private
+ * RowAccessor / RowBuilder implicits.
+ */
+class TestBatchedBlobReaderIO extends HoodieClientTestBase {
+
+  private val DEFAULT_LOOKAHEAD = 50
+
+  /** Build rows by routing through Spark so the nested blob struct is encoded correctly. */
+  private def buildRows(entries: Seq[(String, Long, Long)]): (Array[Row], StructType) = {
+    val df = sparkSession.createDataFrame(entries).toDF("external_path", "offset", "length")
+      .withColumn("data", blobStructCol("data", col("external_path"), col("offset"), col("length")))
+      .select("data")
+    (df.collect(), df.schema)
+  }
+
+  /** Run processPartition with the counting wrapper and drain the output iterator. */
+  private def runWithCounting(
+      rows: Array[Row],
+      inputSchema: StructType,
+      maxGap: Int,
+      lookahead: Int = DEFAULT_LOOKAHEAD): CountingHoodieStorage = {
+    val raw: HoodieStorage = HoodieStorageUtils.getStorage(storageConf)
+    val counting = new CountingHoodieStorage(raw)
+    try {
+      val outputSchema = inputSchema.add(StructField(BatchedBlobReader.DATA_COL, BinaryType, nullable = true))
+      val reader = new BatchedBlobReader(counting, maxGap, lookahead)
+      import RowAccessor.rowAccessor
+      import RowBuilder.rowBuilder
+      val out = reader.processPartition[Row](rows.iterator, structColIdx = 0, outputSchema)
+      while (out.hasNext) {
+        val r = out.next()
+        r.getAs[Array[Byte]](BatchedBlobReader.DATA_COL)
+      }
+      counting
+    } finally {
+      counting.close()
+    }
+  }
+
+  /**
+   * End-to-end proof that batching reduces I/O: the same input collapses from
+   * N opens (maxGap=0, gaps exceed threshold) to a single open (maxGap=4096).
+   */
+  @Test
+  def testBatchingReducesIOForSingleFile(): Unit = {
+    val filePath = createTestFile(tempDir, "single-file.bin", 50000)
+    val entries = (0 until 20).map(i => (filePath, (i * 150).toLong, 100L))
+    val (rows, schema) = buildRows(entries)
+
+    val batched = runWithCounting(rows, schema, maxGap = 4096)
+    assertEquals(1, batched.openSeekableCount.get, "20 blobs with 50-byte gaps should batch to one open")
+    assertEquals(1, batched.seekCount.get)
+
+    val nonBatched = runWithCounting(rows, schema, maxGap = 0)
+    assertEquals(20, nonBatched.openSeekableCount.get, "maxGap=0 with 50-byte gaps should produce one open per blob")
+    assertEquals(20, nonBatched.seekCount.get)
+  }
+
+  /**
+   * Mixed small/large gaps in one file produce exactly one I/O per merged group.
+   * Threshold-boundary correctness is covered by TestBatchedBlobReaderMerge.
+   */
+  @Test
+  def testMixedGapsProduceOneIOPerMergedGroup(): Unit = {
+    val filePath = createTestFile(tempDir, "mixed-gaps.bin", 20000)
+    // Two batchable groups separated by a 9500-byte gap (> maxGap=4096).
+    val entries = Seq(
+      (filePath, 0L, 100L),
+      (filePath, 200L, 100L),
+      (filePath, 400L, 100L),
+      (filePath, 10000L, 100L),
+      (filePath, 10200L, 100L),
+      (filePath, 10400L, 100L))
+    val (rows, schema) = buildRows(entries)
+
+    val counters = runWithCounting(rows, schema, maxGap = 4096)
+    assertEquals(2, counters.openSeekableCount.get, "Two merged ranges -> two opens")
+    assertEquals(2, counters.seekCount.get)
+  }
+
+  /**
+   * Multi-file routing at the I/O level: interleaved input + mixed gap patterns
+   * across three files should still produce per-file batched opens.
+   */
+  @Test
+  def testMultiFileBatchingAtIOLevel(): Unit = {
+    val fileA = createTestFile(tempDir, "fileA.bin", 1000) // contiguous
+    val fileB = createTestFile(tempDir, "fileB.bin", 2000) // small gaps within threshold
+    val fileC = createTestFile(tempDir, "fileC.bin", 50000) // large gaps above threshold
+
+    // Interleaved order so we also confirm grouping isn't order-dependent at the I/O layer.
+    val entries =
+      (0 until 4).flatMap { i =>
+        Seq(
+          (fileA, (i * 100).toLong, 100L), // contiguous -> 1 merged
+          (fileB, (i * 150).toLong, 100L), // 50-byte gaps -> 1 merged
+          (fileC, (i * 8192).toLong, 100L)) // 8KB gaps -> 4 ranges
+      }
+    val (rows, schema) = buildRows(entries)
+
+    val counters = runWithCounting(rows, schema, maxGap = 1024)
+    assertEquals(6, counters.openSeekableCount.get, "1 (A) + 1 (B) + 4 (C) merged ranges -> 6 opens")
+    assertEquals(6, counters.seekCount.get)
+  }
+}
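
The expected counts in testMultiFileBatchingAtIOLevel follow directly from the gap arithmetic: with 100-byte blobs, fileA is laid out with a 100-byte stride (zero gap), fileB with a 150-byte stride (50-byte gap), and fileC with an 8192-byte stride (8092-byte gap), so at maxGap = 1024 only fileC's blobs stay separate. A small standalone check of that arithmetic; the expectedRanges helper below is illustrative, not part of the patch.

// For n equally spaced blobs of a given length, gap = stride - length; they collapse into
// one merged range when gap <= maxGap, otherwise each blob remains its own range.
def expectedRanges(n: Int, stride: Long, length: Long, maxGap: Long): Int =
  if (stride - length <= maxGap) 1 else n

val opens =
  expectedRanges(4, stride = 100, length = 100, maxGap = 1024) +   // fileA: gap 0    -> 1
  expectedRanges(4, stride = 150, length = 100, maxGap = 1024) +   // fileB: gap 50   -> 1
  expectedRanges(4, stride = 8192, length = 100, maxGap = 1024)    // fileC: gap 8092 -> 4
assert(opens == 6)  // matches the asserted openSeekableCount above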
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/blob/TestBatchedBlobReaderMerge.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/blob/TestBatchedBlobReaderMerge.scala
new file mode 100644
index 0000000000000..73813e9de33e3
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/blob/TestBatchedBlobReaderMerge.scala
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.hudi.blob
+
+import org.apache.spark.sql.Row
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Test
+
+/**
+ * Direct unit tests for the merge algorithm in {@link BatchedBlobReader}.
+ *
+ * These tests bypass Spark and storage entirely: they construct a reader with
+ * a null storage (the merge methods do not touch I/O) and call mergeRanges /
+ * identifyConsecutiveRanges with crafted RowInfo inputs to assert the
+ * structure of the merged output (counts, boundaries, ordering, grouping).
+ *
+ * This complements TestBatchedBlobReader (which verifies data correctness end-to-end)
+ * by proving the batching algorithm produces the expected merged ranges for
+ * each input pattern.
+ */
+class TestBatchedBlobReaderMerge {
+
+  private def reader(maxGapBytes: Int = 4096) =
+    new BatchedBlobReader(storage = null, maxGapBytes = maxGapBytes, lookaheadRows = 50)
+
+  private def row(filePath: String, offset: Long, length: Long, index: Long = 0L): RowInfo[Row] =
+    RowInfo[Row](
+      originalRow = null.asInstanceOf[Row],
+      filePath = filePath,
+      offset = offset,
+      length = length,
+      index = index)
+
+  @Test
+  def testEmptyInputProducesNoRanges(): Unit = {
+    val merged = reader().mergeRanges(Seq.empty[RowInfo[Row]], maxGap = 100)
+    assertTrue(merged.isEmpty)
+  }
+
+  @Test
+  def testSingleRowProducesSingleRange(): Unit = {
+    val merged = reader().mergeRanges(Seq(row("/f", 1000, 200, index = 7)), maxGap = 100)
+    assertEquals(1, merged.size)
+    val r = merged.head
+    assertEquals("/f", r.filePath)
+    assertEquals(1000L, r.startOffset)
+    assertEquals(1200L, r.endOffset)
+    assertEquals(1, r.rows.size)
+    assertEquals(7L, r.rows.head.index)
+  }
+
+  @Test
+  def testContiguousRangesMergeIntoOne(): Unit = {
+    // Zero-gap reads (offset, length): (0, 100), (100, 100), (200, 100) — gap == 0 <= maxGap == 0 → merge
+    val rows = Seq(
+      row("/f", 0, 100, index = 0),
+      row("/f", 100, 100, index = 1),
+      row("/f", 200, 100, index = 2))
+    val merged = reader().mergeRanges(rows, maxGap = 0)
+    assertEquals(1, merged.size)
+    assertEquals(0L, merged.head.startOffset)
+    assertEquals(300L, merged.head.endOffset)
+    assertEquals(3, merged.head.rows.size)
+  }
+
+  @Test
+  def testGapEqualToThresholdMerges(): Unit = {
+    // Gaps of exactly maxGap → inclusive boundary, merge into one
+    val rows = Seq(
+      row("/f", 0, 100),
+      row("/f", 1124, 100), // gap = 1024 == maxGap
+      row("/f", 2248, 100)) // gap = 1024 == maxGap
+    val merged = reader().mergeRanges(rows, maxGap = 1024)
+    assertEquals(1, merged.size)
+    assertEquals(0L, merged.head.startOffset)
+    assertEquals(2348L, merged.head.endOffset)
+  }
+
+  @Test
+  def testGapOneOverThresholdSplits(): Unit = {
+    // Gap of maxGap + 1 → split
+    val rows = Seq(
+      row("/f", 0, 100),
+      row("/f", 1125, 100)) // gap = 1025 > maxGap = 1024
+    val merged = reader().mergeRanges(rows, maxGap = 1024)
+    assertEquals(2, merged.size)
+    assertEquals(0L, merged.head.startOffset)
+    assertEquals(100L, merged.head.endOffset)
+    assertEquals(1125L, merged(1).startOffset)
+    assertEquals(1225L, merged(1).endOffset)
+  }
+
+  @Test
+  def testLargeGapsProduceSeparateRanges(): Unit = {
+    val rows = Seq(
+      row("/f", 0, 100, index = 0),
+      row("/f", 10000, 100, index = 1),
+      row("/f", 20000, 100, index = 2),
+      row("/f", 30000, 100, index = 3))
+    val merged = reader().mergeRanges(rows, maxGap = 1000)
+    assertEquals(4, merged.size)
+    merged.foreach(r => assertEquals(1, r.rows.size))
+  }
+
+  @Test
+  def testUnsortedInputIsSortedBeforeMerge(): Unit = {
+    // identifyConsecutiveRanges sorts by offset within each file
+    val rows = Seq(
+      row("/f", 200, 100, index = 2),
+      row("/f", 0, 100, index = 0),
+      row("/f", 100, 100, index = 1))
+    val merged = reader(maxGapBytes = 0).identifyConsecutiveRanges(rows)
+    assertEquals(1, merged.size, "Single merged range expected after sort + merge")
+    assertEquals(0L, merged.head.startOffset)
+    assertEquals(300L, merged.head.endOffset)
+    // Internal rows in the merged range follow sorted order
+    val offsetsInMerged = merged.head.rows.map(_.offset)
+    assertEquals(Seq(0L, 100L, 200L), offsetsInMerged)
+  }
+
+  @Test
+  def testMultipleFilesProduceRangePerFile(): Unit = {
+    val rows = Seq(
+      row("/a", 0, 100, index = 0),
+      row("/b", 0, 100, index = 1),
+      row("/a", 100, 100, index = 2),
+      row("/b", 100, 100, index = 3))
+    val merged = reader(maxGapBytes = 4096).identifyConsecutiveRanges(rows)
+    assertEquals(2, merged.size, "One merged range per file")
+    val byFile = merged.groupBy(_.filePath)
+    assertEquals(Set("/a", "/b"), byFile.keySet)
+    assertEquals(1, byFile("/a").size)
+    assertEquals(1, byFile("/b").size)
+    // Each file's merged range covers offsets 0..200
+    byFile.values.foreach { rs =>
+      assertEquals(0L, rs.head.startOffset)
+      assertEquals(200L, rs.head.endOffset)
+      assertEquals(2, rs.head.rows.size)
+    }
+  }
+
+  @Test
+  def testRowIndicesPreservedInMergedRange(): Unit = {
+    // Indices retained so the downstream sort-by-index can reconstruct input order
+    val rows = Seq(
+      row("/f", 0, 100, index = 42),
+      row("/f", 100, 100, index = 7),
+      row("/f", 200, 100, index = 19))
+    val merged = reader().mergeRanges(rows, maxGap = 4096)
+    assertEquals(1, merged.size)
+    val indices = merged.head.rows.map(_.index)
+    assertEquals(Seq(42L, 7L, 19L), indices)
+  }
+
+  @Test
+  def testOverlappingRangesThrow(): Unit = {
+    // (offset, length) = (0, 100) followed by (50, 100): blobs [0, 100) and [50, 150) overlap
+    val rows = Seq(
+      row("/f", 0, 100, index = 0),
+      row("/f", 50, 100, index = 1))
+    val ex = assertThrows(
+      classOf[IllegalArgumentException],
+      () => reader().mergeRanges(rows, maxGap = 4096))
+    assertTrue(ex.getMessage.contains("Overlapping blob ranges detected"))
+  }
+}