test(spark): Add tests for batch-mode blob reads #18736
Conversation
…tching

Existing tests for the BatchedBlobReader (PR apache#18098) only assert byte correctness of the data returned, so they cannot detect a regression that causes the batching optimization to stop reducing I/O. Add two test classes that close that gap:

- TestBatchedBlobReaderMerge: direct unit tests on mergeRanges and identifyConsecutiveRanges (newly package-private), asserting merged range counts, gap-threshold boundaries, multi-file grouping, sort, index preservation, and overlap rejection. No Spark, no I/O.
- TestBatchedBlobReaderIO: integration tests that drive processPartition with a CountingHoodieStorage wrapper around a real storage, asserting openSeekable/seek counts across four scenarios: many blobs in one file, contiguous zero-gap blobs, threshold-controlled small/large gaps including the inclusive boundary, and multi-file queries (per-file batching, mixed gap patterns, interleaved input order).
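For orientation, here is a hedged sketch of the gap-merge rule these scenarios pin down. Names and signatures are simplified stand-ins, not the actual BatchedBlobReader API; the point is the inclusive-threshold comparison the commit message describes:

```scala
// Simplified model of the range-merge rule under test: two byte ranges
// in the same file merge when the gap between them is at most
// maxGapBytes (inclusive boundary). Input is assumed sorted by
// (filePath, start). All names here are illustrative.
case class Range(filePath: String, start: Long, end: Long)

def mergeSorted(ranges: Seq[Range], maxGapBytes: Long): Seq[Range] =
  ranges.foldLeft(List.empty[Range]) {
    case (prev :: rest, next)
        if prev.filePath == next.filePath &&
           next.start - prev.end <= maxGapBytes =>
      // Gap within threshold: extend the previous merged range.
      prev.copy(end = math.max(prev.end, next.end)) :: rest
    case (acc, next) =>
      // New file, or gap above threshold: start a new merged range.
      next :: acc
  }.reverse
```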
hudi-agent
left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for the contribution! This PR adds unit tests for BatchedBlobReader's merge algorithm and integration tests that count I/O operations via a CountingHoodieStorage decorator, plus a minimal private → private[blob] visibility relaxation on two helper methods. No correctness issues found. A few style/readability suggestions in the inline comments. Please take a look, and this should be ready for a Hudi committer or PMC member to take it from here. One naming nit in the merge-test helper — everything else reads cleanly.
cc @yihua
```scala
private def reader(maxGapBytes: Int = 4096) =
  new BatchedBlobReader(storage = null, maxGapBytes = maxGapBytes, lookaheadRows = 50)

private def row(filePath: String, offset: Long, length: Long, index: Long = 0L): RowInfo[Row] =
```
🤖 nit: the helper is named row but returns a RowInfo[Row], and org.apache.spark.sql.Row is imported at the top — could you rename it to rowInfo to avoid the double-take when reading call sites like Seq(row("/f", 1000, 200))?
TestSparkSqlHudiPackageStructure rejects Scala test classes under org.apache.spark.sql.hudi.* that are not in the curated allow-list, which is mirrored in two CI configs that drive the wildcard suite filter. The new BatchedBlobReader merge/IO tests live in the .blob package so they can reach private[blob] helpers (mergeRanges, identifyConsecutiveRanges, RowAccessor) without widening internal visibility, matching the same-package pattern already used for the .analysis, .catalog, and .command production sub-packages. Add 'org.apache.spark.sql.hudi.blob' to all three places:

- ALLOWED_PACKAGES in TestSparkSqlHudiPackageStructure
- job6HudiSparkDdlOthersWildcardSuites in azure-pipelines-20230430.yml
- SCALA_TEST_OTHERS_FILTER in .github/workflows/bot.yml
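A minimal sketch of why same-package placement grants access, shown as two files in one block for brevity. The member signature is an assumption simplified from the commit text, not the real one:

```scala
// File: BatchedBlobReader.scala (production sources)
package org.apache.spark.sql.hudi.blob

class BatchedBlobReader(maxGapBytes: Int) {
  // Visible to any class under org.apache.spark.sql.hudi.blob,
  // including test classes declared in the same package.
  private[blob] def mergeRanges(ranges: Seq[(Long, Long)]): Seq[(Long, Long)] = ???
}

// File: TestBatchedBlobReaderMerge.scala (test sources, same package)
package org.apache.spark.sql.hudi.blob

class TestBatchedBlobReaderMerge {
  def demo(): Unit = {
    val reader = new BatchedBlobReader(maxGapBytes = 4096)
    reader.mergeRanges(Seq((0L, 10L))) // compiles: private[blob] access
  }
}
```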
hudi-agent
left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for the contribution! This PR adds a focused suite of merge-algorithm unit tests and I/O-count integration tests for BatchedBlobReader, plus a minimal private → private[blob] visibility relaxation needed to call the merge helpers directly. The test math (gap boundaries, file-grouping, interleaved input) checks out, and the CountingHoodieStorage decorator delegates correctly — close() propagation is safe because the underlying HoodieHadoopStorage.close() is intentionally a no-op. No issues flagged from this automated pass — a Hudi committer or PMC member can take it from here for a final review.
cc @yihua
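The decorator idea the review refers to can be sketched as follows. This is a simplified stand-in: the real HoodieStorage interface has many more methods, all of which the test wrapper would delegate unchanged, and the method signature here is an assumption:

```scala
import java.util.concurrent.atomic.AtomicInteger

// Simplified stand-in for the storage interface under test.
trait SeekableStorage {
  def openSeekable(path: String): Array[Byte]
}

// Counts physical opens so a test can assert that N logical blob
// reads collapsed into M merged I/O operations, while delegating
// the actual read to the wrapped storage.
class CountingStorage(underlying: SeekableStorage) extends SeekableStorage {
  val openSeekableCount = new AtomicInteger(0)

  override def openSeekable(path: String): Array[Byte] = {
    openSeekableCount.incrementAndGet()
    underlying.openSeekable(path)
  }
}
```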
Drop tests that re-verify the merge algorithm at the I/O layer when TestBatchedBlobReaderMerge already covers it. Keep only the cases that prove the merge result maps 1:1 to physical I/O ops:

- batching reduces I/O end-to-end (single-file, batched vs baseline)
- mixed gaps in one file produce one I/O per merged group
- multi-file routing with interleaved input and mixed gap patterns

Reduces TestBatchedBlobReaderIO from 9 to 3 tests (~67% fewer Spark DataFrame builds and real file reads, the dominant runtime cost).
```scala
def testSingleRowProducesSingleRange(): Unit = {
  val merged = reader().mergeRanges(Seq(row("/f", 1000, 200, index = 7)), maxGap = 100)
  assertEquals(1, merged.size)
  val r = merged.head
  assertEquals("/f", r.filePath)
  assertEquals(1000L, r.startOffset)
  assertEquals(1200L, r.endOffset)
  assertEquals(1, r.rows.size)
  assertEquals(7L, r.rows.head.index)
}
```
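For comparison, a hedged sketch of the inclusive-boundary case the commit messages describe, written in the same style and reusing the reader/row helpers shown above (the exact assertion names in the real suite may differ):

```scala
def testGapEqualToThresholdMerges(): Unit = {
  // First range ends at 1200, second starts at 1300: gap of 100 equals
  // maxGap, so the inclusive threshold should merge them into one range.
  val merged = reader().mergeRanges(
    Seq(row("/f", 1000, 200), row("/f", 1300, 50)), maxGap = 100)
  assertEquals(1, merged.size)

  // One byte past the threshold should split into two ranges.
  val split = reader().mergeRanges(
    Seq(row("/f", 1000, 200), row("/f", 1301, 50)), maxGap = 100)
  assertEquals(2, split.size)
}
```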
It would be better to test this with SQL so that it works end-to-end.
TestBatchedBlobReaderMerge already catches algorithmic regressions in the merge logic via direct calls to mergeRanges/identifyConsecutiveRanges, so the I/O-count layer was duplicative. Existing TestBatchedBlobReader and TestReadBlobSQL cover the readBatched API and the SQL surface respectively at byte-level granularity.

Replace TestBatchedBlobReaderIO with TestReadBlobBatching: SQL-driven tests that exercise the read_blob() path with batching configurations chosen to drive specific merge behaviors:

- 20 small reads in one file batched under maxGap=4096
- mixed small/large gaps in one file (above and below threshold)
- threshold-boundary case (gap == maxGap)
- multi-file interleaved input with mixed gap patterns

Each test asserts the returned bytes match the deterministic pattern at each row's recorded offset, validating both query success and output correctness through the SQL planner / BatchedBlobReadExec path.

Move to org.apache.hudi.blob (no longer needs private[blob] access). Drop CountingHoodieStorage. The package-allowlist entry for org.apache.spark.sql.hudi.blob is retained since TestBatchedBlobReaderMerge still needs same-package access to the merge helpers.
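A hedged sketch of what one such SQL-driven scenario could look like. The config key appears elsewhere in this thread, but the read_blob() argument list, table layout, and the expectedPatternAt helper are assumptions for illustration only:

```scala
// Assumes a SparkSession `spark` with the Hudi extensions registered,
// and a table blob_meta recording (id, file_path, offset, length).
spark.sql("SET hoodie.blob.batching.max.gap.bytes=4096")

val rows = spark.sql(
  """SELECT id, offset, read_blob(file_path, offset, length) AS payload
    |FROM blob_meta
    |ORDER BY id""".stripMargin).collect()

// Assert bytes, not I/O counts: each payload must equal the
// deterministic pattern recorded for that row's offset.
// expectedPatternAt is a hypothetical helper of the test fixture.
rows.foreach { r =>
  val payload = r.getAs[Array[Byte]]("payload")
  assert(payload.sameElements(expectedPatternAt(r.getAs[Long]("offset"))))
}
```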
yihua
left a comment
Existing coverage before this PR
TestBatchedBlobReader (calls readBatched API, validates bytes):
- testBasicBatchedRead — contiguous reads in one file
- testGapThresholdSmallGaps — 4 reads, 20-byte gaps, maxGapBytes=4096 → directly covers new IO scenario 3
- testGapThresholdLargeGaps — 4 reads, 9.9KB gaps, maxGapBytes=1000 → directly covers new IO scenario 4
- testNoBatchingDifferentFiles — multi-file, one read each
- testMixedScenario — batchable + non-batchable groups across 2 files → covers new IO scenarios 6, 7, 8
- testPreserveInputOrder, testEmptyDataset, testOverlappingRangesThrowsException, etc.
TestReadBlobSQL (uses read_blob() SQL function):
- testConfigurationParameters — sets hoodie.blob.batching.max.gap.bytes=10000 + lookahead.size=100, 3 reads with 4.9KB gap (only existing SQL test that exercises the batching config knob)
- testReadBlobMultipleFiles — multi-file reads via SQL
- testReadOutOfLineBlobOnHudiBackedTable, testBasicReadBlobSQL, joins, subqueries, etc.
So, it would be good to add SQL-level tests for batch reading of blobs. We should rewrite TestBatchedBlobReaderIO as a SQL-driven test focused on batching scenarios that the existing TestReadBlobSQL doesn't already exercise. Validate bytes, not I/O counts.
TestBatchedBlobReaderMerge is the only place that catches a merge-algorithm regression, and we can keep it.
Each scenario now bulk_inserts a Hudi table with the blob column and
reads it back via sparkSession.read.format("hudi"), then runs
SELECT read_blob(...) against the loaded view. This exercises
HoodieFileIndex and BatchedBlobReadExec serialization in addition to
the merge/batching path, matching the production read plan rather than
a Spark-only temp view.
Adds writeHudiBlobTable helper that coerces the input DataFrame to the
canonical BlobType schema (nullable reference struct, as required by
HoodieSparkSchemaConverters.validateBlobStructure) before save.
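A hedged sketch of the write-then-read round trip this commit describes. The option keys shown are standard Hudi write options, but the helper body is a simplification: the coercion to the canonical BlobType schema is elided because the exact struct is not shown in this thread, and the table name is assumed:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Sketch of the writeHudiBlobTable helper: bulk_insert the DataFrame
// into a Hudi table, then load it back through the Hudi relation so
// the query plan goes through HoodieFileIndex rather than a
// Spark-only temp view.
def writeHudiBlobTable(spark: SparkSession, df: DataFrame, basePath: String): DataFrame = {
  df.write.format("hudi")
    .option("hoodie.table.name", "blob_test") // assumed table name
    .option("hoodie.datasource.write.operation", "bulk_insert")
    .mode("overwrite")
    .save(basePath)

  spark.read.format("hudi").load(basePath)
}
```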
Codecov Report ✅ All modified and coverable lines are covered by tests.

Additional details and impacted files:

```
@@              Coverage Diff              @@
##             master   #18736       +/-   ##
=============================================
- Coverage     68.14%   54.01%   -14.14%
+ Complexity    29051    12460    -16591
=============================================
  Files          2516     1434     -1082
  Lines        140935    72161    -68774
  Branches      17472     8245     -9227
=============================================
- Hits          96047    38978    -57069
+ Misses        36993    29688     -7305
+ Partials       7895     3495     -4400
```
Flags with carried forward coverage won't be shown.
Describe the issue this Pull Request addresses
The existing tests for BatchedBlobReader (introduced in #18098) cover byte-level correctness via the readBatched API (TestBatchedBlobReader) and the read_blob() SQL surface (TestReadBlobSQL), but they do not exercise the merge/batching algorithm itself. A regression that silently disables the I/O reduction — a bad gap-threshold comparison, a broken range merge, sort/group breakage — would still pass byte-level assertions. This PR adds focused coverage for the batch-mode path.

Summary and Changelog
- TestBatchedBlobReaderMerge — unit tests against mergeRanges and identifyConsecutiveRanges. Asserts merged-range counts, gap-threshold inclusive/exclusive boundaries, multi-file grouping, sort, index preservation, and rejection of overlapping ranges. No Spark, no I/O.
- TestReadBlobBatching — SQL-driven correctness tests on Hudi-backed tables. Each scenario bulk_inserts a Hudi table containing the blob column, reads it back via sparkSession.read.format("hudi"), and runs SELECT read_blob(...) with a batching configuration chosen to exercise specific merge behaviors:
  - 20 small reads in one file batched under maxGap=4096 (high-ratio merge)
  - mixed small/large gaps in one file (above and below threshold)
  - threshold-boundary case (gap == maxGap)
  - multi-file interleaved input with mixed gap patterns

Driving the query through a real Hudi table exercises HoodieFileIndex and BatchedBlobReadExec serialization (the production read plan) on top of the batching path. Each test asserts the returned bytes match the deterministic pattern at each row's recorded offset — validating both query success and output correctness.

- Relaxes the visibility of two BatchedBlobReader helpers from private to package-private so the merge tests can call them directly.
- Adds org.apache.spark.sql.hudi.blob to the test-package allowlist (TestSparkSqlHudiPackageStructure, the Azure pipeline filter, and the GitHub Actions filter) so the merge tests can use private[blob] access without widening internal visibility further.

Impact
Tests only. No production code logic changes. The visibility relaxation on the two helpers is scoped to the same package as the reader.
Risk Level
none
Documentation Update
none
Contributor's checklist