@@ -107,13 +107,13 @@ public class HoodieReaderConfig extends HoodieConfig {
   public static final String BLOB_INLINE_READ_MODE_DESCRIPTOR = "DESCRIPTOR";
   public static final ConfigProperty<String> BLOB_INLINE_READ_MODE = ConfigProperty
       .key("hoodie.read.blob.inline.mode")
-      .defaultValue(BLOB_INLINE_READ_MODE_CONTENT)
+      .defaultValue(BLOB_INLINE_READ_MODE_DESCRIPTOR)
Contributor:

🤖 This is a user-visible behavior change for SELECT * on INLINE blob columns. The config carries sinceVersion("1.2.0") — has 1.2.0 already shipped CONTENT as the default? If yes, the flip should bump sinceVersion/document the breaking change explicitly; if no, it would help to call that out so reviewers can confirm no released version locks in the old default. Either way, a short "Migration / Upgrade" subsection in the RFC pointing users to set hoodie.read.blob.inline.mode=CONTENT to preserve prior behavior would make this easier to roll out.
       .markAdvanced()
       .sinceVersion("1.2.0")
       .withValidValues(BLOB_INLINE_READ_MODE_CONTENT, BLOB_INLINE_READ_MODE_DESCRIPTOR)
       .withDocumentation("How Hudi interprets INLINE BLOB values on read. "
-          + "CONTENT (default) returns the raw inline bytes. "
-          + "DESCRIPTOR returns an OUT_OF_LINE-shaped reference pointing at the backing "
-          + "Lance file with the INLINE payload's position and size, so callers can defer "
-          + "the byte read via read_blob().");
+          + "DESCRIPTOR (default) returns an OUT_OF_LINE-shaped reference pointing at the "
+          + "backing Lance file with the INLINE payload's position and size, so callers can "
+          + "defer the byte read via read_blob(). "
+          + "CONTENT returns the raw inline bytes directly in the data field on every read.");
 }
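A minimal migration sketch along the lines the reviewer suggests: pinning the pre-flip CONTENT behavior on a per-read basis. The option key and value come from this diff; the table path and surrounding Spark session are illustrative.

// Hedged sketch: preserve the old CONTENT behavior after the default flips to
// DESCRIPTOR, so plain reads keep returning raw inline bytes in `data`.
// The table path below is illustrative, not from this PR.
val contentModeDf = spark.read.format("hudi")
  .option("hoodie.read.blob.inline.mode", "CONTENT")
  .load("/path/to/hudi/blob_table")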
@@ -208,16 +208,44 @@ class BatchedBlobReader(
     // Dispatch based on storage_type (field 0)
     val storageType = accessor.getString(blobStruct, 0)
     if (storageType == HoodieSchema.Blob.INLINE) {
-      // Case 1: Inline — bytes are in field 1
-      val bytes = accessor.getBytes(blobStruct, 1)
-      batch += RowInfo[R](
-        originalRow = row,
-        filePath = "",
-        offset = -1,
-        length = -1,
-        index = rowIndex,
-        inlineBytes = Some(bytes)
-      )
+      // INLINE rows can arrive in two shapes:
+      // - CONTENT-mode read: inline_data populated with raw bytes (field 1).
+      // - DESCRIPTOR-mode read: inline_data is null and the reference struct
+      //   (field 2) carries a synthetic positional pointer into the base file.
+      // read_blob() must return bytes in both cases, so we fall back to the
+      // descriptor's range read when inline_data is absent.
+      val inlineIsNull = accessor.isNullAt(blobStruct, 1)
+      if (!inlineIsNull) {
+        val bytes = accessor.getBytes(blobStruct, 1)
+        batch += RowInfo[R](
+          originalRow = row,
+          filePath = "",
+          offset = -1,
+          length = -1,
+          index = rowIndex,
+          inlineBytes = Some(bytes)
+        )
+      } else {
+        require(!accessor.isNullAt(blobStruct, 2),
+          s"INLINE blob at row $rowIndex has null inline_data and null reference; cannot resolve bytes")
+        val referenceStruct = accessor.getStruct(blobStruct, 2, HoodieSchema.Blob.getReferenceFieldCount)
+        val filePath = accessor.getString(referenceStruct, 0)
+        require(filePath != null && filePath.nonEmpty,
+          s"INLINE blob descriptor at row $rowIndex must have non-empty external_path")
Contributor:

🤖 The INLINE-with-null-inline_data branch silently performs a remote pread against the synthetic descriptor's external_path. Could the RFC discuss the failure mode and security implications of trusting that path? If a malformed/forged base file produces a descriptor pointing at an arbitrary file or with an is_managed=true claim that isn't validated against the table's known file set, read_blob() could end up reading bytes outside the table. Some validation (e.g. is_managed must be true and the path must lie under the table base) seems worth calling out as a design invariant.
+        require(!accessor.isNullAt(referenceStruct, 1) && !accessor.isNullAt(referenceStruct, 2),
+          s"INLINE blob descriptor at row $rowIndex must set both offset and length")
+        val offset = accessor.getLong(referenceStruct, 1)
+        val length = accessor.getLong(referenceStruct, 2)
+        require(offset >= 0, s"INLINE blob descriptor offset must be non-negative for '$filePath': $offset")
+        require(length >= 0, s"INLINE blob descriptor length must be non-negative for '$filePath': $length")
+        batch += RowInfo[R](
+          originalRow = row,
+          filePath = filePath,
+          offset = offset,
+          length = length,
+          index = rowIndex
Contributor:

🤖 The require(...) calls here will throw IllegalArgumentException mid-batch on any malformed INLINE+DESCRIPTOR row (null inline_data and null/incomplete reference). On a corrupted file this kills the whole task and surfaces as a confusing user error far from the cause. Would it be worth using a Hudi-typed exception with the file path + row index, or at least a top-level wrapper that names the column and source file? Same question for the offset/length sign checks — those would only fire on writer bugs, but the diagnostic is what determines whether anyone can debug it.
+        )
+      }
     } else if (storageType == HoodieSchema.Blob.OUT_OF_LINE) {
       // Case 2 or 3: Out-of-line — get reference struct (field 2)
       require(!accessor.isNullAt(blobStruct, 2), s"Out-of-line blob at row $rowIndex must set reference")
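One way to make the invariant from the path-trust comment above concrete is a containment check on external_path before any range read is issued. A minimal sketch under stated assumptions: validateDescriptorPath is a hypothetical helper, it compares plain URI-style strings rather than going through Hudi's storage layer, and the is_managed check the comment mentions is elided.

// Hypothetical helper (not in this diff): reject descriptors whose
// external_path escapes the table base path. A real check would also
// normalize ".." segments and validate the is_managed flag against the
// table's known file set.
def validateDescriptorPath(tableBasePath: String, externalPath: String): Unit = {
  val base = tableBasePath.stripSuffix("/") + "/"
  require(externalPath.startsWith(base),
    s"Blob descriptor path '$externalPath' must lie under table base '$tableBasePath'")
}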
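For the diagnostics concern in the require(...) comment above, a top-level wrapper that names the source file and row is one cheap option. A minimal sketch, assuming a hypothetical HoodieBlobResolutionException; the real reader might prefer an existing Hudi exception type.

// Hypothetical wrapper (not in this diff): convert the per-row
// IllegalArgumentException from require(...) into a typed error carrying
// the base file path and row index, so corruption is debuggable.
class HoodieBlobResolutionException(msg: String, cause: Throwable)
  extends RuntimeException(msg, cause)

def resolveRowSafely[A](baseFilePath: String, rowIndex: Long)(resolve: => A): A =
  try resolve catch {
    case e: IllegalArgumentException =>
      throw new HoodieBlobResolutionException(
        s"Malformed blob descriptor in '$baseFilePath' at row $rowIndex: ${e.getMessage}", e)
  }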
@@ -36,7 +36,7 @@ import org.apache.arrow.memory.RootAllocator
 import org.apache.arrow.vector.types.pojo.ArrowType
 import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
 import org.apache.spark.sql.types._
-import org.junit.jupiter.api.{AfterEach, BeforeEach}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.junit.jupiter.api.Assertions.{assertArrayEquals, assertEquals, assertFalse, assertNotNull, assertTrue}
 import org.junit.jupiter.api.condition.DisabledIfSystemProperty
 import org.junit.jupiter.params.ParameterizedTest
@@ -851,8 +851,11 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
     // Writer-side: prove the bytes actually routed through Lance's dedicated blob writer.
     assertLanceBlobEncoding(tablePath)

-    // Reader-side: in CONTENT mode the INLINE bytes come back directly in `data`.
-    val readRows = spark.read.format("hudi").load(tablePath)
+    // Reader-side: in CONTENT mode the INLINE bytes come back directly in `data`. Set the mode
+    // explicitly — the default is DESCRIPTOR, which would surface a reference instead.
+    val readRows = spark.read.format("hudi")
+      .option("hoodie.read.blob.inline.mode", "CONTENT")
+      .load(tablePath)
       .select($"id", $"payload")
       .orderBy($"id")
       .collect()
@@ -972,6 +975,139 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
       assertArrayEquals(expectedPayloads(i), bytes,
         s"read_blob() bytes mismatch for id=$i")
     }
+
+    // Mixed-usage regression: read_blob(payload) co-projected with payload.reference.* in a
+    // single SELECT. The INLINE-descriptor fallback in BatchedBlobReader is load-bearing: if
+    // read_blob() forced the scan to CONTENT shape, reference.* would all be null here.
+    val mixed = spark.sql(
+      s"""SELECT id,
+         | read_blob(payload) AS bytes,
+         | payload.reference.external_path AS ext_path,
+         | payload.reference.offset AS off,
+         | payload.reference.length AS len
+         | FROM $viewName ORDER BY id""".stripMargin).collect()
+    assertEquals(numRows, mixed.length)
+    mixed.zipWithIndex.foreach { case (row, i) =>
+      assertArrayEquals(expectedPayloads(i), row.getAs[Array[Byte]]("bytes"),
+        s"read_blob() bytes must survive co-projection with reference.* (id=$i)")
+      val ext = row.getAs[String]("ext_path")
+      assertNotNull(ext, s"reference.external_path must survive read_blob co-usage (id=$i)")
+      assertTrue(ext.endsWith(".lance"), s"external_path should point at .lance, got: $ext (id=$i)")
+      assertFalse(row.isNullAt(row.fieldIndex("off")),
+        s"reference.offset must not be null when co-selected with read_blob (id=$i)")
+      assertTrue(row.getLong(row.fieldIndex("off")) >= 0L)
+      assertEquals(payloadLen.toLong, row.getLong(row.fieldIndex("len")),
+        s"reference.length must equal payload length (id=$i)")
+    }
   }
+
+  /**
+   * Compaction must preserve INLINE blob bytes under the DESCRIPTOR default. MOR compaction reads
+   * the base file via {@link HoodieSparkLanceReader}, which hard-pins CONTENT regardless of the
+   * user-facing {@code hoodie.read.blob.inline.mode}. If that pin were to honor the default
+   * (DESCRIPTOR), compaction would read null {@code data} and rewrite a base file without bytes,
+   * silently corrupting untouched rows. This test inserts INLINE blobs, upserts a subset to force
+   * compaction, and asserts that touched rows carry the new bytes while untouched rows retain the
+   * originals.
+   */
+  @Test
+  def testBlobInlineCompactionRoundTrip(): Unit = {
+    val tableType = HoodieTableType.MERGE_ON_READ
+    val tableName = "test_lance_blob_inline_compact_mor"
+    val tablePath = s"$basePath/$tableName"
+
+    val payloadLen = 1024
+    val numRows = 6
+    val initialPayloads: Seq[Array[Byte]] = (0 until numRows).map { i =>
+      (0 until payloadLen).map(j => ((i + j) % 256).toByte).toArray
+    }
+    val sparkSess = spark
+    import sparkSess.implicits._
+
+    val canonicalSchema = StructType(Seq(
+      StructField("id", IntegerType, nullable = false),
+      StructField("payload", BlobType().asInstanceOf[StructType], nullable = true,
+        BlobTestHelpers.blobMetadata)
+    ))
+    def asInlineDf(idToBytes: Seq[(Int, Array[Byte])]): DataFrame = {
+      val rawDf = idToBytes.toDF("id", "bytes")
+        .select($"id", BlobTestHelpers.inlineBlobStructCol("payload", $"bytes"))
+      spark.createDataFrame(rawDf.rdd, canonicalSchema)
+    }
+
+    // First commit: bulk_insert ids 0..5 with the initial pattern. Lands in a base file.
+    writeDataframe(tableType, tableName, tablePath,
+      asInlineDf(initialPayloads.zipWithIndex.map { case (b, i) => (i, b) }),
+      saveMode = SaveMode.Overwrite,
+      operation = Some("bulk_insert"),
+      extraOptions = Map(PRECOMBINE_FIELD.key() -> "id"))
+
+    assertLanceBlobEncoding(tablePath)
+
+    // Second commit: upsert ids 0..2 with all-0xEE payloads, triggering inline compaction. The
+    // compactor reads the base file + log via the CONTENT-pinned reader and rewrites a new base
+    // file. Ids 3..5 are untouched: their bytes must survive the compaction read/rewrite even
+    // though the user-facing default is now DESCRIPTOR.
+    val updatedPayloadByte: Byte = 0xEE.toByte
+    val updatedIds = 0 until 3
+    val updatedPayloads = updatedIds.map(i => (i, Array.fill[Byte](payloadLen)(updatedPayloadByte)))
+    writeDataframe(tableType, tableName, tablePath,
+      asInlineDf(updatedPayloads),
+      operation = Some("upsert"),
+      extraOptions = Map(PRECOMBINE_FIELD.key() -> "id",
+        "hoodie.compact.inline" -> "true",
+        "hoodie.compact.inline.max.delta.commits" -> "1"))
+
+    val metaClient = HoodieTableMetaClient.builder()
+      .setConf(HoodieTestUtils.getDefaultStorageConf)
+      .setBasePath(tablePath)
+      .build()
+    val compactionCommits = metaClient.reloadActiveTimeline().filterCompletedInstants()
+      .getInstants.asScala.filter(_.getAction == "commit")
+    assertTrue(compactionCommits.nonEmpty, "Compaction commit should be present after upsert")
+
+    val expected: Map[Int, Array[Byte]] = (
+      updatedIds.map(i => i -> Array.fill[Byte](payloadLen)(updatedPayloadByte)) ++
+        (updatedIds.length until numRows).map(i => i -> initialPayloads(i))
+    ).toMap
+
+    // Verify via the realistic user-facing path. After the flip, a plain read yields the
+    // DESCRIPTOR shape: INLINE type, null `data`, populated reference. This confirms the new
+    // default is in effect end-to-end.
+    val readRows = spark.read.format("hudi")
+      .load(tablePath)
+      .select($"id", $"payload")
+      .orderBy($"id")
+      .collect()
+    assertEquals(numRows, readRows.length)
+    readRows.foreach { row =>
+      val id = row.getInt(row.fieldIndex("id"))
+      val payload = row.getStruct(row.fieldIndex("payload"))
+      assertEquals(HoodieSchema.Blob.INLINE,
+        payload.getString(payload.fieldIndex(HoodieSchema.Blob.TYPE)),
+        s"Type must remain INLINE post-compaction (id=$id)")
+      assertTrue(payload.isNullAt(payload.fieldIndex(HoodieSchema.Blob.INLINE_DATA_FIELD)),
+        s"DESCRIPTOR default should null `data` on plain read (id=$id)")
+      assertNotNull(payload.getStruct(payload.fieldIndex(HoodieSchema.Blob.EXTERNAL_REFERENCE)),
+        s"DESCRIPTOR default should populate reference on plain read (id=$id)")
+    }
+
+    // read_blob() is the canonical bytes-materializing path under the DESCRIPTOR default. The
+    // bytes can only come back if (a) HoodieSparkLanceReader's CONTENT pin held during the
+    // compactor's base-file read — otherwise untouched ids 3..5 would have been rewritten with
+    // null `data` — and (b) the compacted base file retained Lance blob encoding so the
+    // synthesized descriptor's positional read resolves. Failure on either side surfaces here.
+    val viewName = s"${tableName}_view"
+    spark.read.format("hudi").load(tablePath).createOrReplaceTempView(viewName)
+    val materialized = spark.sql(
+      s"SELECT id, read_blob(payload) AS bytes FROM $viewName ORDER BY id").collect()
+    assertEquals(numRows, materialized.length)
+    materialized.foreach { row =>
+      val id = row.getInt(row.fieldIndex("id"))
+      val bytes = row.getAs[Array[Byte]]("bytes")
+      assertArrayEquals(expected(id), bytes,
+        s"read_blob() must return correct bytes post-compaction (id=$id)")
+    }
+  }

   /**