diff --git a/.github/workflows/bot.yml b/.github/workflows/bot.yml
index 2143edd92bcb2..06468d0c4fa23 100644
--- a/.github/workflows/bot.yml
+++ b/.github/workflows/bot.yml
@@ -33,7 +33,7 @@ env:
   JAVA_UT_FILTER1: -Dtest=!TestCOWDataSource,!TestMORDataSource,!TestHoodieFileSystemViews
   JAVA_UT_FILTER2: -Dtest=TestCOWDataSource,TestMORDataSource,TestHoodieFileSystemViews
   SCALA_TEST_DML_FILTER: -DwildcardSuites=org.apache.spark.sql.hudi.dml
-  SCALA_TEST_OTHERS_FILTER: -DwildcardSuites=org.apache.hudi,org.apache.spark.hudi,org.apache.spark.sql.avro,org.apache.spark.sql.execution,org.apache.spark.sql.hudi.analysis,org.apache.spark.sql.hudi.catalog,org.apache.spark.sql.hudi.command,org.apache.spark.sql.hudi.common,org.apache.spark.sql.hudi.ddl,org.apache.spark.sql.hudi.procedure,org.apache.spark.sql.hudi.feature
+  SCALA_TEST_OTHERS_FILTER: -DwildcardSuites=org.apache.hudi,org.apache.spark.hudi,org.apache.spark.sql.avro,org.apache.spark.sql.execution,org.apache.spark.sql.hudi.analysis,org.apache.spark.sql.hudi.blob,org.apache.spark.sql.hudi.catalog,org.apache.spark.sql.hudi.command,org.apache.spark.sql.hudi.common,org.apache.spark.sql.hudi.ddl,org.apache.spark.sql.hudi.procedure,org.apache.spark.sql.hudi.feature
   FLINK_IT_FILTER1: -Dit.test=ITTestHoodieDataSource
   FLINK_IT_FILTER2: -Dit.test=!ITTestHoodieDataSource
diff --git a/azure-pipelines-20230430.yml b/azure-pipelines-20230430.yml
index cccd89ec6cc73..3edac6046b1a3 100644
--- a/azure-pipelines-20230430.yml
+++ b/azure-pipelines-20230430.yml
@@ -75,6 +75,7 @@ parameters:
       - 'org.apache.spark.sql.avro'
       - 'org.apache.spark.sql.execution'
       - 'org.apache.spark.sql.hudi.analysis'
+      - 'org.apache.spark.sql.hudi.blob'
       - 'org.apache.spark.sql.hudi.catalog'
       - 'org.apache.spark.sql.hudi.command'
       - 'org.apache.spark.sql.hudi.common'
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/BatchedBlobReader.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/BatchedBlobReader.scala
index 0328cdd0c5c22..a1c299cf2611d 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/BatchedBlobReader.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/BatchedBlobReader.scala
@@ -274,7 +274,7 @@ class BatchedBlobReader(
    * @param rows Sequence of row information
    * @return Sequence of merged ranges
    */
-  private def identifyConsecutiveRanges[R](rows: Seq[RowInfo[R]]): Seq[MergedRange[R]] = {
+  private[blob] def identifyConsecutiveRanges[R](rows: Seq[RowInfo[R]]): Seq[MergedRange[R]] = {
     // Group by file path
     val byFile = rows.groupBy(_.filePath)
@@ -299,7 +299,7 @@ class BatchedBlobReader(
    * @param maxGap Maximum gap to consider for merging
    * @return Sequence of merged ranges
    */
-  private def mergeRanges[R](rows: Seq[RowInfo[R]], maxGap: Int): Seq[MergedRange[R]] = {
+  private[blob] def mergeRanges[R](rows: Seq[RowInfo[R]], maxGap: Int): Seq[MergedRange[R]] = {
     val result = ArrayBuffer[MergedRange[R]]()
     var currentFilePath: String = null
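Editor's note: the only change to BatchedBlobReader itself is visibility; identifyConsecutiveRanges and mergeRanges become private[blob] so the new unit suite further down can call them directly. As orientation for those tests, below is a minimal, self-contained sketch of gap-based range merging under the same rules (per-file grouping, inclusive maxGap boundary, overlap rejection). SketchRow and SketchRange are simplified stand-ins, not Hudi's RowInfo/MergedRange, and this is not the Hudi implementation.

// Illustrative stand-ins only; Hudi's RowInfo/MergedRange carry more fields.
object BlobMergeSketch {
  case class SketchRow(filePath: String, offset: Long, length: Long, index: Long)
  case class SketchRange(filePath: String, startOffset: Long, endOffset: Long, rows: Seq[SketchRow])

  // Merge offset-sorted rows per file whenever the gap to the current range is <= maxGap
  // (inclusive boundary), rejecting overlapping inputs.
  def mergeSketch(rows: Seq[SketchRow], maxGap: Long): Seq[SketchRange] =
    rows.groupBy(_.filePath).toSeq.flatMap { case (path, fileRows) =>
      fileRows.sortBy(_.offset).foldLeft(List.empty[SketchRange]) { (acc, r) =>
        acc match {
          case head :: _ if r.offset < head.endOffset =>
            throw new IllegalArgumentException(s"Overlapping blob ranges detected in $path")
          case head :: tail if r.offset - head.endOffset <= maxGap =>
            // Close enough to the current range: extend it so one read covers both rows.
            head.copy(endOffset = r.offset + r.length, rows = head.rows :+ r) :: tail
          case _ =>
            // First row for this file, or the gap is too large: start a new range.
            SketchRange(path, r.offset, r.offset + r.length, Seq(r)) :: acc
        }
      }.reverse
    }
}

With maxGap = 1024, rows at offsets 0 and 1124 (100 bytes each) leave a gap of exactly 1024 and collapse into a single [0, 1224) read, which is the inclusive-boundary behaviour testGapEqualToThresholdMerges and testThresholdBoundaryInclusiveMerge pin down.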
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestSparkSqlHudiPackageStructure.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestSparkSqlHudiPackageStructure.java
index 2937b59dc529b..ddce176eb7384 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestSparkSqlHudiPackageStructure.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestSparkSqlHudiPackageStructure.java
@@ -53,6 +53,7 @@ public class TestSparkSqlHudiPackageStructure {
    */
   private static final Set ALLOWED_PACKAGES = new HashSet<>(Arrays.asList(
       "org.apache.spark.sql.hudi.analysis",
+      "org.apache.spark.sql.hudi.blob",
       "org.apache.spark.sql.hudi.catalog",
       "org.apache.spark.sql.hudi.command",
       "org.apache.spark.sql.hudi.common",
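Editor's note: the TestReadBlobBatching suite that follows imports assertBytesContent, blobMetadata, blobStructCol, createTestFile and withSparkConfig from BlobTestHelpers, which is not part of this diff. To make the assertions easier to follow, here is one plausible shape for three of those helpers; the signatures are inferred from how the tests call them, and the bodies (including the rolling byte pattern) are assumptions, not the actual Hudi test utility.

// Hypothetical sketch of a few BlobTestHelpers members; the real helper object is not in this diff.
object BlobTestHelpersSketch {
  import java.io.File
  import java.nio.file.{Files, Path}

  import org.apache.spark.sql.SparkSession
  import org.junit.jupiter.api.Assertions.assertEquals

  // Write `size` bytes where byte i == (i % 256).toByte, so the content at any offset is predictable.
  def createTestFile(dir: Path, name: String, size: Int): String = {
    val bytes = Array.tabulate[Byte](size)(i => (i % 256).toByte)
    val file = new File(dir.toFile, name)
    Files.write(file.toPath, bytes)
    file.getAbsolutePath
  }

  // A blob read from `expectedOffset` must reproduce the same rolling pattern.
  def assertBytesContent(data: Array[Byte], expectedOffset: Int): Unit =
    data.indices.foreach { i =>
      assertEquals(((expectedOffset + i) % 256).toByte, data(i), s"byte $i differs")
    }

  // Apply runtime confs for the duration of `body`, then restore the previous values.
  def withSparkConfig(spark: SparkSession, confs: Map[String, String])(body: => Unit): Unit = {
    val previous = confs.keys.map(k => k -> spark.conf.getOption(k)).toMap
    confs.foreach { case (k, v) => spark.conf.set(k, v) }
    try body finally previous.foreach {
      case (k, Some(v)) => spark.conf.set(k, v)
      case (k, None) => spark.conf.unset(k)
    }
  }
}

The two-parameter-list shape of withSparkConfig matches how the tests invoke it, and a restore-in-finally pattern keeps one test's batching config from leaking into the next.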
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/blob/TestReadBlobBatching.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/blob/TestReadBlobBatching.scala
new file mode 100644
index 0000000000000..9cbd8e908220a
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/blob/TestReadBlobBatching.scala
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.blob
+
+import org.apache.hudi.blob.BlobTestHelpers.{assertBytesContent, blobMetadata, blobStructCol, createTestFile, withSparkConfig}
+import org.apache.hudi.testutils.HoodieClientTestBase
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.types.{BlobType, IntegerType, LongType, StringType, StructField, StructType}
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+
+/**
+ * SQL-driven correctness tests for read_blob() batching scenarios on
+ * Hudi-backed tables.
+ *
+ * Complements:
+ *  - TestBatchedBlobReaderMerge: unit-level coverage of the merge algorithm.
+ *  - TestReadBlobSQL: general SQL surface (joins, subqueries, config plumbing,
+ *    null handling, error paths).
+ *  - TestBatchedBlobReader: byte-level correctness via the readBatched API.
+ *
+ * Each test writes a Hudi table containing a blob column referencing external
+ * data files, reads it back via the Hudi reader, and runs SELECT read_blob(...)
+ * with a batching configuration chosen to drive specific merge behaviors.
+ * Writing through Hudi (rather than a temp view) exercises HoodieFileIndex and
+ * BatchedBlobReadExec serialization — the production read path. Each test
+ * asserts the returned bytes match the deterministic pattern at each row's
+ * recorded offset — i.e. the query succeeds and batching does not corrupt
+ * output.
+ */
+class TestReadBlobBatching extends HoodieClientTestBase {
+
+  /**
+   * Write a Hudi table at `tablePath` with rows (id, external_path, offset,
+   * file_info) and register the loaded table as the given temp view.
+   *
+   * Coerces the input to the canonical BlobType schema with a nullable
+   * reference struct, which HoodieSparkSchemaConverters.validateBlobStructure
+   * requires on write.
+   */
+  private def writeHudiBlobTable(
+      tablePath: String,
+      tableName: String,
+      viewName: String,
+      entries: Seq[(Int, String, Long, Long)]): Unit = {
+    val rawDf = sparkSession.createDataFrame(entries)
+      .toDF("id", "external_path", "offset", "length")
+      .withColumn("file_info",
+        blobStructCol("file_info", col("external_path"), col("offset"), col("length")))
+      .select("id", "external_path", "offset", "file_info")
+
+    val canonicalSchema = StructType(Seq(
+      StructField("id", IntegerType, nullable = false),
+      StructField("external_path", StringType, nullable = true),
+      StructField("offset", LongType, nullable = true),
+      StructField("file_info", BlobType().asInstanceOf[StructType],
+        nullable = true, blobMetadata)
+    ))
+    val df = sparkSession.createDataFrame(rawDf.rdd, canonicalSchema)
+
+    df.write.format("hudi")
+      .option("hoodie.table.name", tableName)
+      .option("hoodie.datasource.write.recordkey.field", "id")
+      .option("hoodie.datasource.write.operation", "bulk_insert")
+      .mode("overwrite")
+      .save(tablePath)
+
+    sparkSession.read.format("hudi").load(tablePath)
+      .createOrReplaceTempView(viewName)
+  }
+
+  /**
+   * Validate that each returned blob equals the deterministic pattern at its
+   * recorded offset.
+   */
+  private def assertBlobBytesMatchExpected(rows: Array[Row], expectedLength: Int): Unit = {
+    rows.foreach { row =>
+      val offset = row.getAs[Long]("offset")
+      val data = row.getAs[Array[Byte]]("data")
+      assertEquals(expectedLength, data.length, s"length mismatch at offset $offset")
+      assertBytesContent(data, expectedOffset = offset.toInt)
+    }
+  }
+
+  /**
+   * Scenario (1) — many blobs in a single file batch into one read.
+   *
+   * 20 small reads with 50-byte gaps under maxGap=4096 must all merge into a
+   * single underlying range. Validates that high-ratio batching returns the
+   * correct bytes for every row when driven by the Hudi read plan.
+   */
+  @Test
+  def testManyBlobsInSingleFileBatched(): Unit = {
+    val filePath = createTestFile(tempDir, "many.bin", 50000)
+    val tablePath = s"$tempDir/many_blobs_table"
+    val entries = (0 until 20).map(i => (i, filePath, (i * 150).toLong, 100L))
+    writeHudiBlobTable(tablePath, "many_blobs", "many_blobs_view", entries)
+
+    withSparkConfig(sparkSession, Map(
+      "hoodie.blob.batching.max.gap.bytes" -> "4096"
+    )) {
+      val rows = sparkSession.sql(
+        """SELECT id, external_path, offset, read_blob(file_info) AS data
+          |FROM many_blobs_view
+          |ORDER BY id""".stripMargin).collect()
+      assertEquals(20, rows.length)
+      assertBlobBytesMatchExpected(rows, expectedLength = 100)
+    }
+  }
+
+  /**
+   * Scenario (2) — mixed small and large gaps in one file.
+   *
+   * Two batchable groups separated by a 9500-byte gap (above maxGap=4096) must
+   * produce correct bytes for all six rows regardless of the gap-driven split
+   * inside the reader.
+   */
+  @Test
+  def testMixedGapsInSingleFile(): Unit = {
+    val filePath = createTestFile(tempDir, "mixed.bin", 20000)
+    val tablePath = s"$tempDir/mixed_gaps_table"
+    val entries = Seq(
+      (1, filePath, 0L, 100L),
+      (2, filePath, 200L, 100L),
+      (3, filePath, 400L, 100L),
+      (4, filePath, 10000L, 100L),
+      (5, filePath, 10200L, 100L),
+      (6, filePath, 10400L, 100L))
+    writeHudiBlobTable(tablePath, "mixed_gaps", "mixed_gaps_view", entries)
+
+    withSparkConfig(sparkSession, Map(
+      "hoodie.blob.batching.max.gap.bytes" -> "4096"
+    )) {
+      val rows = sparkSession.sql(
+        """SELECT id, external_path, offset, read_blob(file_info) AS data
+          |FROM mixed_gaps_view
+          |ORDER BY id""".stripMargin).collect()
+      assertEquals(6, rows.length)
+      assertBlobBytesMatchExpected(rows, expectedLength = 100)
+    }
+  }
+
+  /**
+   * Scenario (3) — threshold boundary at SQL level.
+   *
+   * Four reads with gaps of exactly maxGap (1024). The merge algorithm treats
+   * gap == maxGap as inclusive (covered by TestBatchedBlobReaderMerge); this
+   * test confirms the SQL surface returns correct bytes at that boundary.
+   */
+  @Test
+  def testThresholdBoundaryInclusiveMerge(): Unit = {
+    val filePath = createTestFile(tempDir, "boundary.bin", 5000)
+    val tablePath = s"$tempDir/boundary_table"
+    // Gap between consecutive reads = 1024 bytes (1124 - 100, etc.).
+    val entries = (0 until 4).map(i => (i, filePath, (i * 1124).toLong, 100L))
+    writeHudiBlobTable(tablePath, "boundary", "boundary_view", entries)
+
+    withSparkConfig(sparkSession, Map(
+      "hoodie.blob.batching.max.gap.bytes" -> "1024"
+    )) {
+      val rows = sparkSession.sql(
+        """SELECT id, external_path, offset, read_blob(file_info) AS data
+          |FROM boundary_view
+          |ORDER BY id""".stripMargin).collect()
+      assertEquals(4, rows.length)
+      assertBlobBytesMatchExpected(rows, expectedLength = 100)
+    }
+  }
+
+  /**
+   * Scenario (4) — multi-file with interleaved input + mixed gap patterns.
+   *
+   * Three files with different access patterns (contiguous, small gaps within
+   * threshold, large gaps above threshold) written into one Hudi table in
+   * interleaved order. Confirms per-file routing through the Hudi read plan
+   * returns correct bytes regardless of input ordering.
+   */
+  @Test
+  def testMultiFileInterleavedWithMixedGaps(): Unit = {
+    val fileA = createTestFile(tempDir, "fileA.bin", 1000)   // contiguous
+    val fileB = createTestFile(tempDir, "fileB.bin", 2000)   // 50-byte gaps
+    val fileC = createTestFile(tempDir, "fileC.bin", 50000)  // 8KB gaps
+    val tablePath = s"$tempDir/multi_file_table"
+
+    val entries = (0 until 4).flatMap { i =>
+      Seq(
+        (i * 3, fileA, (i * 100).toLong, 100L),
+        (i * 3 + 1, fileB, (i * 150).toLong, 100L),
+        (i * 3 + 2, fileC, (i * 8192).toLong, 100L))
+    }
+    writeHudiBlobTable(tablePath, "multi_file", "multi_file_view", entries)
+
+    withSparkConfig(sparkSession, Map(
+      "hoodie.blob.batching.max.gap.bytes" -> "1024"
+    )) {
+      val rows = sparkSession.sql(
+        """SELECT id, external_path, offset, read_blob(file_info) AS data
+          |FROM multi_file_view
+          |ORDER BY id""".stripMargin).collect()
+      assertEquals(12, rows.length)
+      assertBlobBytesMatchExpected(rows, expectedLength = 100)
+    }
+  }
+}
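Editor's note: outside the test harness, the read path the suite above drives looks roughly like the snippet below from a Spark session. It is illustrative only: `spark` is assumed to be a session with the Hudi bundle on the classpath, "/tmp/blob_table" is a placeholder path, and whether the batching option is picked up from the runtime conf in a given deployment mirrors the assumption the tests make via withSparkConfig.

// Illustrative only; not part of this change.
spark.conf.set("hoodie.blob.batching.max.gap.bytes", "4096") // merge blob reads whose gap is <= 4 KB
spark.read.format("hudi").load("/tmp/blob_table").createOrReplaceTempView("blob_table")
spark.sql("SELECT id, read_blob(file_info) AS data FROM blob_table ORDER BY id").show()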
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/blob/TestBatchedBlobReaderMerge.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/blob/TestBatchedBlobReaderMerge.scala
new file mode 100644
index 0000000000000..219f487a3c662
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/blob/TestBatchedBlobReaderMerge.scala
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.hudi.blob
+
+import org.apache.spark.sql.Row
+import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue}
+import org.junit.jupiter.api.Test
+
+/**
+ * Direct unit tests for the merge algorithm in {@link BatchedBlobReader}.
+ *
+ * These tests bypass Spark and storage entirely: they construct a reader with
+ * a null storage (the merge methods do not touch I/O) and call mergeRanges /
+ * identifyConsecutiveRanges with crafted RowInfo inputs to assert the
+ * structure of the merged output (counts, boundaries, ordering, grouping).
+ *
+ * This complements TestBatchedBlobReader (which verifies data correctness
+ * end-to-end) by proving the batching algorithm produces the expected merged
+ * ranges for each input pattern.
+ */
+class TestBatchedBlobReaderMerge {
+
+  private def reader(maxGapBytes: Int = 4096) =
+    new BatchedBlobReader(storage = null, maxGapBytes = maxGapBytes, lookaheadRows = 50)
+
+  private def row(filePath: String, offset: Long, length: Long, index: Long = 0L): RowInfo[Row] =
+    RowInfo[Row](
+      originalRow = null.asInstanceOf[Row],
+      filePath = filePath,
+      offset = offset,
+      length = length,
+      index = index)
+
+  @Test
+  def testEmptyInputProducesNoRanges(): Unit = {
+    val merged = reader().mergeRanges(Seq.empty[RowInfo[Row]], maxGap = 100)
+    assertTrue(merged.isEmpty)
+  }
+
+  @Test
+  def testSingleRowProducesSingleRange(): Unit = {
+    val merged = reader().mergeRanges(Seq(row("/f", 1000, 200, index = 7)), maxGap = 100)
+    assertEquals(1, merged.size)
+    val r = merged.head
+    assertEquals("/f", r.filePath)
+    assertEquals(1000L, r.startOffset)
+    assertEquals(1200L, r.endOffset)
+    assertEquals(1, r.rows.size)
+    assertEquals(7L, r.rows.head.index)
+  }
+
+  @Test
+  def testContiguousRangesMergeIntoOne(): Unit = {
+    // Zero-gap reads: [0,100), [100,200), [200,300) — gap == 0 <= maxGap == 0 → merge
+    val rows = Seq(
+      row("/f", 0, 100, index = 0),
+      row("/f", 100, 100, index = 1),
+      row("/f", 200, 100, index = 2))
+    val merged = reader().mergeRanges(rows, maxGap = 0)
+    assertEquals(1, merged.size)
+    assertEquals(0L, merged.head.startOffset)
+    assertEquals(300L, merged.head.endOffset)
+    assertEquals(3, merged.head.rows.size)
+  }
+
+  @Test
+  def testGapEqualToThresholdMerges(): Unit = {
+    // Gaps of exactly maxGap → inclusive boundary, merge into one
+    val rows = Seq(
+      row("/f", 0, 100),
+      row("/f", 1124, 100),  // gap = 1024 == maxGap
+      row("/f", 2248, 100))  // gap = 1024 == maxGap
+    val merged = reader().mergeRanges(rows, maxGap = 1024)
+    assertEquals(1, merged.size)
+    assertEquals(0L, merged.head.startOffset)
+    assertEquals(2348L, merged.head.endOffset)
+  }
+
+  @Test
+  def testGapOneOverThresholdSplits(): Unit = {
+    // Gap of maxGap + 1 → split
+    val rows = Seq(
+      row("/f", 0, 100),
+      row("/f", 1125, 100))  // gap = 1025 > maxGap = 1024
+    val merged = reader().mergeRanges(rows, maxGap = 1024)
+    assertEquals(2, merged.size)
+    assertEquals(0L, merged.head.startOffset)
+    assertEquals(100L, merged.head.endOffset)
+    assertEquals(1125L, merged(1).startOffset)
+    assertEquals(1225L, merged(1).endOffset)
+  }
+
+  @Test
+  def testLargeGapsProduceSeparateRanges(): Unit = {
+    val rows = Seq(
+      row("/f", 0, 100, index = 0),
+      row("/f", 10000, 100, index = 1),
+      row("/f", 20000, 100, index = 2),
+      row("/f", 30000, 100, index = 3))
+    val merged = reader().mergeRanges(rows, maxGap = 1000)
+    assertEquals(4, merged.size)
+    merged.foreach(r => assertEquals(1, r.rows.size))
+  }
+
+  @Test
+  def testUnsortedInputIsSortedBeforeMerge(): Unit = {
+    // identifyConsecutiveRanges sorts by offset within each file
+    val rows = Seq(
+      row("/f", 200, 100, index = 2),
+      row("/f", 0, 100, index = 0),
+      row("/f", 100, 100, index = 1))
+    val merged = reader(maxGapBytes = 0).identifyConsecutiveRanges(rows)
+    assertEquals(1, merged.size, "Single merged range expected after sort + merge")
+    assertEquals(0L, merged.head.startOffset)
+    assertEquals(300L, merged.head.endOffset)
+    // Internal rows in the merged range follow sorted order
+    val offsetsInMerged = merged.head.rows.map(_.offset)
+    assertEquals(Seq(0L, 100L, 200L), offsetsInMerged)
+  }
+
+  @Test
+  def testMultipleFilesProduceRangePerFile(): Unit = {
+    val rows = Seq(
+      row("/a", 0, 100, index = 0),
+      row("/b", 0, 100, index = 1),
+      row("/a", 100, 100, index = 2),
+      row("/b", 100, 100, index = 3))
+    val merged = reader(maxGapBytes = 4096).identifyConsecutiveRanges(rows)
+    assertEquals(2, merged.size, "One merged range per file")
+    val byFile = merged.groupBy(_.filePath)
+    assertEquals(Set("/a", "/b"), byFile.keySet)
+    assertEquals(1, byFile("/a").size)
+    assertEquals(1, byFile("/b").size)
+    // Each file's merged range covers offsets 0..200
+    byFile.values.foreach { rs =>
+      assertEquals(0L, rs.head.startOffset)
+      assertEquals(200L, rs.head.endOffset)
+      assertEquals(2, rs.head.rows.size)
+    }
+  }
+
+  @Test
+  def testRowIndicesPreservedInMergedRange(): Unit = {
+    // Indices retained so the downstream sort-by-index can reconstruct input order
+    val rows = Seq(
+      row("/f", 0, 100, index = 42),
+      row("/f", 100, 100, index = 7),
+      row("/f", 200, 100, index = 19))
+    val merged = reader().mergeRanges(rows, maxGap = 4096)
+    assertEquals(1, merged.size)
+    val indices = merged.head.rows.map(_.index)
+    assertEquals(Seq(42L, 7L, 19L), indices)
+  }
+
+  @Test
+  def testOverlappingRangesThrow(): Unit = {
+    // [0,100) followed by [50,150) -> overlap
+    val rows = Seq(
+      row("/f", 0, 100, index = 0),
+      row("/f", 50, 100, index = 1))
+    val ex = assertThrows(
+      classOf[IllegalArgumentException],
+      () => reader().mergeRanges(rows, maxGap = 4096))
+    assertTrue(ex.getMessage.contains("Overlapping blob ranges detected"))
+  }
+}