diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java index 942d1aeabb503..05eb1641c14f6 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java @@ -107,13 +107,13 @@ public class HoodieReaderConfig extends HoodieConfig { public static final String BLOB_INLINE_READ_MODE_DESCRIPTOR = "DESCRIPTOR"; public static final ConfigProperty BLOB_INLINE_READ_MODE = ConfigProperty .key("hoodie.read.blob.inline.mode") - .defaultValue(BLOB_INLINE_READ_MODE_CONTENT) + .defaultValue(BLOB_INLINE_READ_MODE_DESCRIPTOR) .markAdvanced() .sinceVersion("1.2.0") .withValidValues(BLOB_INLINE_READ_MODE_CONTENT, BLOB_INLINE_READ_MODE_DESCRIPTOR) .withDocumentation("How Hudi interprets INLINE BLOB values on read. " - + "CONTENT (default) returns the raw inline bytes. " - + "DESCRIPTOR returns an OUT_OF_LINE-shaped reference pointing at the backing " - + "Lance file with the INLINE payload's position and size, so callers can defer " - + "the byte read via read_blob()."); + + "DESCRIPTOR (default) returns an OUT_OF_LINE-shaped reference pointing at the " + + "backing Lance file with the INLINE payload's position and size, so callers can " + + "defer the byte read via read_blob(). " + + "CONTENT returns the raw inline bytes directly in the data field on every read."); } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/BatchedBlobReader.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/BatchedBlobReader.scala index 0328cdd0c5c22..dc77ce03b619f 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/BatchedBlobReader.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/BatchedBlobReader.scala @@ -208,16 +208,44 @@ class BatchedBlobReader( // Dispatch based on storage_type (field 0) val storageType = accessor.getString(blobStruct, 0) if (storageType == HoodieSchema.Blob.INLINE) { - // Case 1: Inline — bytes are in field 1 - val bytes = accessor.getBytes(blobStruct, 1) - batch += RowInfo[R]( - originalRow = row, - filePath = "", - offset = -1, - length = -1, - index = rowIndex, - inlineBytes = Some(bytes) - ) + // INLINE rows can arrive in two shapes: + // - CONTENT-mode read: inline_data populated with raw bytes (field 1). + // - DESCRIPTOR-mode read: inline_data is null and the reference struct + // (field 2) carries a synthetic positional pointer into the base file. + // read_blob() must return bytes in both cases, so we fall back to the + // descriptor's range read when inline_data is absent. 
+ val inlineIsNull = accessor.isNullAt(blobStruct, 1) + if (!inlineIsNull) { + val bytes = accessor.getBytes(blobStruct, 1) + batch += RowInfo[R]( + originalRow = row, + filePath = "", + offset = -1, + length = -1, + index = rowIndex, + inlineBytes = Some(bytes) + ) + } else { + require(!accessor.isNullAt(blobStruct, 2), + s"INLINE blob at row $rowIndex has null inline_data and null reference; cannot resolve bytes") + val referenceStruct = accessor.getStruct(blobStruct, 2, HoodieSchema.Blob.getReferenceFieldCount) + val filePath = accessor.getString(referenceStruct, 0) + require(filePath != null && filePath.nonEmpty, + s"INLINE blob descriptor at row $rowIndex must have non-empty external_path") + require(!accessor.isNullAt(referenceStruct, 1) && !accessor.isNullAt(referenceStruct, 2), + s"INLINE blob descriptor at row $rowIndex must set both offset and length") + val offset = accessor.getLong(referenceStruct, 1) + val length = accessor.getLong(referenceStruct, 2) + require(offset >= 0, s"INLINE blob descriptor offset must be non-negative for '$filePath': $offset") + require(length >= 0, s"INLINE blob descriptor length must be non-negative for '$filePath': $length") + batch += RowInfo[R]( + originalRow = row, + filePath = filePath, + offset = offset, + length = length, + index = rowIndex + ) + } } else if (storageType == HoodieSchema.Blob.OUT_OF_LINE) { // Case 2 or 3: Out-of-line — get reference struct (field 2) require(!accessor.isNullAt(blobStruct, 2), s"Out-of-line blob at row $rowIndex must set reference") diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala index 1cd5647d99495..ed9258035c3b8 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala @@ -36,7 +36,7 @@ import org.apache.arrow.memory.RootAllocator import org.apache.arrow.vector.types.pojo.ArrowType import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession} import org.apache.spark.sql.types._ -import org.junit.jupiter.api.{AfterEach, BeforeEach} +import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import org.junit.jupiter.api.Assertions.{assertArrayEquals, assertEquals, assertFalse, assertNotNull, assertTrue} import org.junit.jupiter.api.condition.DisabledIfSystemProperty import org.junit.jupiter.params.ParameterizedTest @@ -851,8 +851,11 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { // Writer-side: prove the bytes actually routed through Lance's dedicated blob writer. assertLanceBlobEncoding(tablePath) - // Reader-side: in CONTENT mode the INLINE bytes come back directly in `data`. - val readRows = spark.read.format("hudi").load(tablePath) + // Reader-side: in CONTENT mode the INLINE bytes come back directly in `data`. Set the mode + // explicitly — the default is DESCRIPTOR, which would surface a reference instead. + val readRows = spark.read.format("hudi") + .option("hoodie.read.blob.inline.mode", "CONTENT") + .load(tablePath) .select($"id", $"payload") .orderBy($"id") .collect() @@ -972,6 +975,139 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { assertArrayEquals(expectedPayloads(i), bytes, s"read_blob() bytes mismatch for id=$i") } + + // Mixed-usage regression: read_blob(payload) co-projected with payload.reference.* in a + // single SELECT. 
The INLINE-descriptor fallback in BatchedBlobReader is load-bearing: if + // read_blob() forced the scan to CONTENT shape, reference.* would all be null here. + val mixed = spark.sql( + s"""SELECT id, + | read_blob(payload) AS bytes, + | payload.reference.external_path AS ext_path, + | payload.reference.offset AS off, + | payload.reference.length AS len + | FROM $viewName ORDER BY id""".stripMargin).collect() + assertEquals(numRows, mixed.length) + mixed.zipWithIndex.foreach { case (row, i) => + assertArrayEquals(expectedPayloads(i), row.getAs[Array[Byte]]("bytes"), + s"read_blob() bytes must survive co-projection with reference.* (id=$i)") + val ext = row.getAs[String]("ext_path") + assertNotNull(ext, s"reference.external_path must survive read_blob co-usage (id=$i)") + assertTrue(ext.endsWith(".lance"), s"external_path should point at .lance, got: $ext (id=$i)") + assertFalse(row.isNullAt(row.fieldIndex("off")), + s"reference.offset must not be null when co-selected with read_blob (id=$i)") + assertTrue(row.getLong(row.fieldIndex("off")) >= 0L) + assertEquals(payloadLen.toLong, row.getLong(row.fieldIndex("len")), + s"reference.length must equal payload length (id=$i)") + } + } + + /** + * Compaction must preserve INLINE blob bytes under the DESCRIPTOR default. MOR compaction reads + * the base file via {@link HoodieSparkLanceReader}, which hard-pins CONTENT regardless of the + * user-facing {@code hoodie.read.blob.inline.mode}. If that pin were to honor the default + * (DESCRIPTOR), compaction would read null {@code data} and rewrite a base file without bytes, + * silently corrupting untouched rows. This test inserts INLINE blobs, upserts a subset to force + * compaction, and asserts that touched rows carry the new bytes while untouched rows retain the + * originals. + */ + @Test + def testBlobInlineCompactionRoundTrip(): Unit = { + val tableType = HoodieTableType.MERGE_ON_READ + val tableName = "test_lance_blob_inline_compact_mor" + val tablePath = s"$basePath/$tableName" + + val payloadLen = 1024 + val numRows = 6 + val initialPayloads: Seq[Array[Byte]] = (0 until numRows).map { i => + (0 until payloadLen).map(j => ((i + j) % 256).toByte).toArray + } + val sparkSess = spark + import sparkSess.implicits._ + + val canonicalSchema = StructType(Seq( + StructField("id", IntegerType, nullable = false), + StructField("payload", BlobType().asInstanceOf[StructType], nullable = true, + BlobTestHelpers.blobMetadata) + )) + def asInlineDf(idToBytes: Seq[(Int, Array[Byte])]): DataFrame = { + val rawDf = idToBytes.toDF("id", "bytes") + .select($"id", BlobTestHelpers.inlineBlobStructCol("payload", $"bytes")) + spark.createDataFrame(rawDf.rdd, canonicalSchema) + } + + // First commit: bulk_insert ids 0..5 with the initial pattern. Lands in a base file. + writeDataframe(tableType, tableName, tablePath, + asInlineDf(initialPayloads.zipWithIndex.map { case (b, i) => (i, b) }), + saveMode = SaveMode.Overwrite, + operation = Some("bulk_insert"), + extraOptions = Map(PRECOMBINE_FIELD.key() -> "id")) + + assertLanceBlobEncoding(tablePath) + + // Second commit: upsert ids 0..2 with all-0xEE payloads, triggering inline compaction. The + // compactor reads the base file + log via the CONTENT-pinned reader and rewrites a new base + // file. Ids 3..5 are untouched: their bytes must survive the compaction read/rewrite even + // though the user-facing default is now DESCRIPTOR.
+ val updatedPayloadByte: Byte = 0xEE.toByte + val updatedIds = 0 until 3 + val updatedPayloads = updatedIds.map(i => (i, Array.fill[Byte](payloadLen)(updatedPayloadByte))) + writeDataframe(tableType, tableName, tablePath, + asInlineDf(updatedPayloads), + operation = Some("upsert"), + extraOptions = Map(PRECOMBINE_FIELD.key() -> "id", + "hoodie.compact.inline" -> "true", + "hoodie.compact.inline.max.delta.commits" -> "1")) + + val metaClient = HoodieTableMetaClient.builder() + .setConf(HoodieTestUtils.getDefaultStorageConf) + .setBasePath(tablePath) + .build() + val compactionCommits = metaClient.reloadActiveTimeline().filterCompletedInstants() + .getInstants.asScala.filter(_.getAction == "commit") + assertTrue(compactionCommits.nonEmpty, "Compaction commit should be present after upsert") + + val expected: Map[Int, Array[Byte]] = ( + updatedIds.map(i => i -> Array.fill[Byte](payloadLen)(updatedPayloadByte)) ++ + (updatedIds.length until numRows).map(i => i -> initialPayloads(i)) + ).toMap + + // Verify via the realistic user-facing path. After the flip, a plain read yields the + // DESCRIPTOR shape: INLINE type, null `data`, populated reference. This confirms the new + // default is in effect end-to-end. + val readRows = spark.read.format("hudi") + .load(tablePath) + .select($"id", $"payload") + .orderBy($"id") + .collect() + assertEquals(numRows, readRows.length) + readRows.foreach { row => + val id = row.getInt(row.fieldIndex("id")) + val payload = row.getStruct(row.fieldIndex("payload")) + assertEquals(HoodieSchema.Blob.INLINE, + payload.getString(payload.fieldIndex(HoodieSchema.Blob.TYPE)), + s"Type must remain INLINE post-compaction (id=$id)") + assertTrue(payload.isNullAt(payload.fieldIndex(HoodieSchema.Blob.INLINE_DATA_FIELD)), + s"DESCRIPTOR default should null `data` on plain read (id=$id)") + assertNotNull(payload.getStruct(payload.fieldIndex(HoodieSchema.Blob.EXTERNAL_REFERENCE)), + s"DESCRIPTOR default should populate reference on plain read (id=$id)") + } + + // read_blob() is the canonical bytes-materializing path under the DESCRIPTOR default. The + // bytes can only come back if (a) HoodieSparkLanceReader's CONTENT pin held during the + // compactor's base-file read — otherwise untouched ids 3..5 would have been rewritten with + // null `data` — and (b) the compacted base file retained Lance blob encoding so the + // synthesized descriptor's positional read resolves. Failure on either side surfaces here. + val viewName = s"${tableName}_view" + spark.read.format("hudi").load(tablePath).createOrReplaceTempView(viewName) + val materialized = spark.sql( + s"SELECT id, read_blob(payload) AS bytes FROM $viewName ORDER BY id").collect() + assertEquals(numRows, materialized.length) + materialized.foreach { row => + val id = row.getInt(row.fieldIndex("id")) + val bytes = row.getAs[Array[Byte]]("bytes") + assertArrayEquals(expected(id), bytes, + s"read_blob() must return correct bytes post-compaction (id=$id)") + } } /** diff --git a/rfc/rfc-100/rfc-100.md b/rfc/rfc-100/rfc-100.md index 2c637ccb2db2d..0ae5b3412531d 100644 --- a/rfc/rfc-100/rfc-100.md +++ b/rfc/rfc-100/rfc-100.md @@ -139,9 +139,9 @@ Selecting the blob column directly (e.g. `SELECT image_blob FROM t` or `SELECT * **Reader Configuration** -- `hoodie.read.blob.inline.mode` — values `CONTENT` (default) | `DESCRIPTOR`. +- `hoodie.read.blob.inline.mode` — values `DESCRIPTOR` (default) | `CONTENT`. 
+ - `DESCRIPTOR` (default): the engine returns an `OUT_OF_LINE`-shaped descriptor in the `reference` field where the underlying file format supports it (Lance today), enabling lazy byte materialization via `read_blob`. For file formats without a native descriptor for inline payloads (Parquet), both `data` and `reference` are returned `NULL`, and the caller must use `read_blob` to retrieve bytes. - `CONTENT`: the engine eagerly materializes inline bytes into the struct's `data` field. - - `DESCRIPTOR`: the engine returns an `OUT_OF_LINE`-shaped descriptor in the `reference` field where the underlying file format supports it (Lance today), enabling lazy byte materialization via `read_blob`. For file formats without a native descriptor for inline payloads (Parquet), both `data` and `reference` are returned `NULL`, and the caller must use `read_blob` to retrieve bytes. - This config governs `INLINE` reads only. For `OUT_OF_LINE` storage, the engine always returns a populated `reference` regardless of this setting. **Behavior matrix** @@ -151,44 +151,64 @@ Selecting the blob column directly (e.g. `SELECT image_blob FROM t` or `SELECT * | `SELECT read_blob(col) FROM table` | INLINE | Parquet | (any) | n/a | n/a | Yes — returns bytes | | `SELECT read_blob(col) FROM table` | INLINE | Lance | (any) | n/a | n/a | Yes — returns bytes | | `SELECT read_blob(col) FROM table` | OUT_OF_LINE | (any) | (any) | n/a | n/a | Yes — returns bytes | -| `SELECT col FROM table` | INLINE | Parquet | `CONTENT` (default) | bytes | NULL | Yes — via `data` | -| `SELECT col FROM table` | INLINE | Parquet | `DESCRIPTOR` | **NULL** | **NULL** | No — must call `read_blob` | -| `SELECT col FROM table` | INLINE | Lance | `CONTENT` (default) | bytes | NULL | Yes — via `data` | -| `SELECT col FROM table` | INLINE | Lance | `DESCRIPTOR` | NULL | populated (Lance blob enc.) | No — descriptor visible; use `read_blob` for bytes| +| `SELECT col FROM table` | INLINE | Parquet | `CONTENT` | bytes | NULL | Yes — via `data` | +| `SELECT col FROM table` | INLINE | Parquet | `DESCRIPTOR` (default) | bytes¹ | NULL¹ | Yes — via `data`¹ | +| `SELECT col FROM table` | INLINE | Lance | `CONTENT` | bytes | NULL | Yes — via `data` | +| `SELECT col FROM table` | INLINE | Lance | `DESCRIPTOR` (default) | NULL | populated (Lance blob enc.) | No — descriptor visible; use `read_blob` for bytes| | `SELECT col FROM table` | OUT_OF_LINE | (any) | (irrelevant) | NULL | populated | No — must call `read_blob` | +¹ **Parquet + DESCRIPTOR is a no-op today.** The Parquet reader path does not currently honor `hoodie.read.blob.inline.mode`, so on Parquet tables the config is ignored and INLINE rows always come back in `CONTENT` shape regardless of the setting. The default flip to `DESCRIPTOR` therefore has no observable effect on Parquet tables. The matrix row above shows what users will actually see today, not the eventual spec. + +**Eventual spec for Parquet + DESCRIPTOR (not yet implemented)** + +When Parquet gains DESCRIPTOR support, INLINE rows in `DESCRIPTOR` mode are intended to return `data = NULL` and `reference = NULL` (no native positional handle exists in Parquet for an inline byte array), with `read_blob` materializing bytes through a format-specific path. Until then, treat `DESCRIPTOR` as effectively `CONTENT` on Parquet. 
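To make the two modes concrete, here is a minimal Spark sketch of the reader-side behavior described above. It assumes a Lance-backed table whose blob column is named `payload` (the path and column names are illustrative), uses the struct field names from the matrix (`data`, `reference.external_path`), and assumes `read_blob` is registered as described earlier in this RFC:

```scala
// Sketch only: exercising hoodie.read.blob.inline.mode from Spark.
// Assumes an active SparkSession bound to `spark`; tablePath and the
// `payload` column name are illustrative.
val tablePath = "/tmp/hudi_blob_table"

// CONTENT: INLINE bytes are eagerly materialized into the struct's `data` field.
spark.read.format("hudi")
  .option("hoodie.read.blob.inline.mode", "CONTENT")
  .load(tablePath)
  .selectExpr("id", "payload.data")      // bytes come back directly
  .show()

// DESCRIPTOR (the default): a plain SELECT surfaces the descriptor on Lance;
// read_blob(payload) is what materializes bytes on demand.
spark.read.format("hudi")
  .load(tablePath)                       // no option set, so DESCRIPTOR applies
  .createOrReplaceTempView("blob_view")
spark.sql(
  """SELECT id,
    |       payload.reference.external_path AS ext_path,  -- populated on Lance
    |       read_blob(payload) AS bytes                    -- deferred byte materialization
    |FROM blob_view""".stripMargin)
  .show()
```

On Parquet tables the option is currently a no-op (see footnote ¹ below), so both reads above come back in `CONTENT` shape there.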
+ **Why Parquet and Lance differ in `DESCRIPTOR` mode** -Lance's native blob encoding stores blobs in a way that already exposes a `(file, offset, length)` descriptor cheaply, so `DESCRIPTOR` mode surfaces it directly in the `reference` field — effectively letting INLINE blobs be read with the same deferred-materialization path used for OUT_OF_LINE references. Parquet has no equivalent native descriptor for an inline byte array, so both fields are `NULL` in `DESCRIPTOR` mode and the caller must use `read_blob` to materialize bytes. +Lance's native blob encoding stores blobs in a way that already exposes a `(file, offset, length)` descriptor cheaply, so `DESCRIPTOR` mode surfaces it directly in the `reference` field — effectively letting INLINE blobs be read with the same deferred-materialization path used for OUT_OF_LINE references. Parquet has no equivalent native descriptor for an inline byte array, which is why the DESCRIPTOR path is currently a no-op there. **Visual** +What the user gets back, grouped by storage type (set at write time) and then by query shape: + +```mermaid +flowchart TD + ST{storage_type} + + ST -->|OUT_OF_LINE| QO{Query} + QO -->|"SELECT col"| OOL["type = OUT_OF_LINE
inline_data = NULL
reference = user-supplied"] + QO -->|"SELECT read_blob(col)"| RBO(["bytes — materialized
via the external reference"]) + + ST -->|INLINE| QI{Query} + QI -->|"SELECT col"| M{hoodie.read.blob.inline.mode} + M -->|CONTENT| CONT["type = INLINE
inline_data = bytes
reference = NULL"] + M -->|DESCRIPTOR default| F{file format} + F -->|Lance| LD["type = INLINE
inline_data = NULL
reference = synthetic managed
path, offset, length, is_managed=true"] + F -->|"Parquet (today: mode no-op)"| PD["Parquet reader does not
implement DESCRIPTOR yet —
returns CONTENT shape:
inline_data = bytes, reference = NULL"] + + QI -->|"SELECT read_blob(col)"| RM{hoodie.read.blob.inline.mode} + RM -->|CONTENT| RBC(["bytes from inline_data on the row
1 hop"]) + RM -->|DESCRIPTOR default| RF{file format} + RF -->|Lance| RBL(["bytes via raw pread at
synth reference offset/length
2 hops"]) + RF -->|"Parquet (today: mode no-op)"| RBP(["bytes from inline_data on the row
1 hop — same as CONTENT"]) ``` - ┌──────────────────────────────────────────────────────────────────┐ - │ read_blob(col) ── universal, always materializes bytes ──│ - │ │ │ - │ ▼ │ - │ ┌─────────────┐ INLINE ───► read inline payload │ - │ │ Hudi reader │ ──┤ │ - │ └─────────────┘ OUT_OF_LINE ► follow reference → read bytes │ - └──────────────────────────────────────────────────────────────────┘ - - ┌──────────────────────────────────────────────────────────────────┐ - │ SELECT col (returns Blob struct as-is) │ - │ │ │ - │ ▼ │ - │ storage = OUT_OF_LINE ─────────────► data=NULL, reference=set │ - │ │ - │ storage = INLINE, │ - │ inline.mode = CONTENT (default) ───► data=, ref=NULL │ - │ │ - │ storage = INLINE, │ - │ inline.mode = DESCRIPTOR │ - │ ├─ Parquet ─────────────────────► data=NULL, ref=NULL │ - │ └─ Lance ─────────────────────► data=NULL, ref=set │ - └──────────────────────────────────────────────────────────────────┘ + +`read_blob(col)` byte resolution — works regardless of storage type or mode, with hop count depending on the row shape that arrives: + +```mermaid +flowchart LR + RB[/"SELECT read_blob(col)"/] --> Scan["Scan emits Blob struct"] + Scan --> Shape{"row shape"} + Shape -->|"inline_data populated
(CONTENT mode)"| Direct["BatchedBlobReader:
read inline_data off the row
1 hop total"] + Shape -->|"reference populated
(OUT_OF_LINE, or DESCRIPTOR + Lance)"| Indirect["BatchedBlobReader:
openSeekable(external_path)
seek(offset), readFully(length)
2 hops total"] + Direct --> Bytes(["bytes"]) + Indirect --> Bytes ``` +Notes: +- For DESCRIPTOR + Lance, hop 2 is a raw filesystem `pread` against the `.lance` file at the descriptor's `(offset, length)`. It bypasses Lance's decoder entirely — the blob encoding (`lance-encoding:blob=true` on a `LargeBinary` column) stores blob bytes contiguously at the position Lance reports, so direct byte access is safe. +- Plain `SELECT col` (no `read_blob`) is always 1 hop. DESCRIPTOR's win is that hop 1 skips blob decoding when bytes aren't needed. + ### 3. Writer #### Phase 1: External Blob Support The writer will be updated to support writing blob data as out-of-line references.
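For orientation, a sketch of what producing an out-of-line reference could look like from Spark, using only the blob struct fields this RFC exposes on the read path (`storage_type`, `data`, and `reference.external_path` / `offset` / `length`). Everything else below (column and table names, write options, and the assumption that a plain struct column is accepted as a blob write) is illustrative, not the writer API this phase will define:

```scala
// Sketch only, not the PR's writer API: builds rows in the OUT_OF_LINE shape
// described above and writes them through the Hudi datasource.
// Assumes an active SparkSession bound to `spark`; names and paths are illustrative.
import org.apache.spark.sql.functions.{col, concat, lit, struct}

val outOfLine = spark.range(0, 3).toDF("id")
  .withColumn("image_blob", struct(
    lit("OUT_OF_LINE").as("storage_type"),
    lit(null).cast("binary").as("data"),        // bytes stay outside the table
    struct(
      concat(lit("s3://bucket/blobs/"), col("id").cast("string"), lit(".bin"))
        .as("external_path"),                   // user-supplied external location
      lit(0L).as("offset"),
      lit(1048576L).as("length")
    ).as("reference")
  ))

// Real writes would also need the usual Hudi key/precombine/table options; omitted here.
outOfLine.write.format("hudi")
  .option("hoodie.table.name", "blob_demo")
  .mode("append")
  .save("/tmp/hudi_blob_demo")
```

Rows written this way would then read back through the behavior-matrix row for `OUT_OF_LINE` storage: `data` is `NULL`, `reference` is returned as supplied, and `read_blob` follows it to materialize bytes.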