From b465f155b49de37c123f9f699bde26ff85beeb9f Mon Sep 17 00:00:00 2001 From: voon Date: Sun, 26 Apr 2026 16:52:51 +0800 Subject: [PATCH 1/2] test(schema): Add Lance MOR log-only compaction tests for custom types Mirror the parquet MOR log-only compaction tests for VECTOR, VARIANT, and BLOB onto the Lance base file format, and extend all variants with a 6th deltacommit so the cleaner has a chance to retire the post-compaction log-only slice and write a .clean instant. - VECTOR Lance: passes; verifies HoodieFileFormat.LANCE on the table config and that a .lance base file exists under the table path after compaction. - VARIANT Lance / BLOB INLINE Lance / BLOB OUT_OF_LINE Lance: gated by -Dlance.skip.tests; expected to fail at HoodieSparkLanceWriter -> LanceArrowUtils.toArrowType (RFC-100 Phase 2 gap). Each asserts the LANCE format config sticks to hoodie.properties immediately after CREATE TABLE so the table-level invariant is checked even when the writer fails downstream. - All 8 tests (4 parquet + 4 Lance) now drive a 6th merge-update after the compaction-triggering 5th commit. The 5th commit's auto-clean runs before inline compaction, so the prior log slice is not yet superseded; the 6th commit's postCommit clean retires it and writes the .clean instant. The cleaner-timeline assertion uses reloadActiveTimeline() to avoid a stale cached view. --- .../HoodieSparkClientTestHarness.java | 7 + .../functional/TestVectorDataSource.scala | 136 ++++++++++ .../hudi/dml/schema/TestBlobDataType.scala | 235 ++++++++++++++++++ .../hudi/dml/schema/TestVariantDataType.scala | 116 +++++++++ 4 files changed, 494 insertions(+) diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkClientTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkClientTestHarness.java index 97bd0e1175d76..6f7530f27b39b 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkClientTestHarness.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkClientTestHarness.java @@ -55,6 +55,7 @@ import org.apache.hudi.exception.HoodieMetadataException; import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hudi.index.HoodieIndex; +import org.apache.hudi.index.inmemory.HoodieInMemoryHashIndex; import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory; import org.apache.hudi.metadata.FileSystemBackedTableMetadata; import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter; @@ -237,6 +238,12 @@ protected void initQueryIndexConf() { * Cleanups Spark contexts ({@link JavaSparkContext} and {@link SQLContext}). */ protected void cleanupSparkContexts() { + // HoodieInMemoryHashIndex holds a JVM-static record-location map that survives + // sparkSession.stop(), leaking record keys and locations across sequential tests + // in the same JVM. A stale entry causes tagLocation to demote a not-matched + // INSERT into a no-op UPDATE on a non-existent file group. 
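+    // (Assumed internals, for illustration only: the index is understood to keep something like a
+    // static ConcurrentMap<HoodieKey, HoodieRecordLocation>; clear() below empties that map so the
+    // next test's tagLocation starts from an empty view instead of a stale one.)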
+ HoodieInMemoryHashIndex.clear(); + if (sparkSession != null) { sparkSession.stop(); sparkSession = null; diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestVectorDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestVectorDataSource.scala index daf0c8a6c19b2..9bcd75dbc28aa 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestVectorDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestVectorDataSource.scala @@ -18,6 +18,7 @@ package org.apache.hudi.functional import org.apache.hudi.DataSourceWriteOptions._ +import org.apache.hudi.common.model.HoodieFileFormat import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaType} import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.testutils.{DataSourceTestUtils, HoodieSparkClientTestBase} @@ -29,6 +30,7 @@ import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession} import org.apache.spark.sql.types._ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import org.junit.jupiter.api.Assertions._ +import org.junit.jupiter.api.condition.DisabledIfSystemProperty import scala.collection.JavaConverters._ @@ -801,6 +803,140 @@ class TestVectorDataSource extends HoodieSparkClientTestBase { } } + @Test + @DisabledIfSystemProperty(named = "lance.skip.tests", matches = "true") + def testMorLogOnlyCompactionPreservesVectorMetadataLance(): Unit = { + val path = basePath + "/mor_log_only_vec_lance" + val tableName = "mor_log_only_vec_lance_test" + try { + spark.sql( + s""" + |create table $tableName ( + | id int, + | embedding VECTOR(3), + | ts long + |) using hudi + | location '$path' + | tblproperties ( + | primaryKey = 'id', + | type = 'mor', + | preCombineField = 'ts', + | hoodie.index.type = 'INMEMORY', + | hoodie.compact.inline = 'true', + | hoodie.clean.commits.retained = '1', + | 'hoodie.table.base.file.format' = 'LANCE' + | ) + """.stripMargin) + + def readOrdered(): Seq[Row] = + spark.sql(s"select id, embedding, ts from $tableName order by id").collect().toSeq + + def embeddingOf(id: Int, rows: Seq[Row]): Seq[Float] = + rows.find(_.getInt(0) == id).get.getSeq[Float](1) + + spark.sql( + s"insert into $tableName values " + + "(1, array(cast(0.1 as float), cast(0.2 as float), cast(0.3 as float)), 1000)") + spark.sql( + s"insert into $tableName values " + + "(2, array(cast(0.4 as float), cast(0.5 as float), cast(0.6 as float)), 1000)") + spark.sql( + s"insert into $tableName values " + + "(3, array(cast(0.7 as float), cast(0.8 as float), cast(0.9 as float)), 1000)") + // 3 commits will not trigger compaction, so it should be log only. + assertTrue(DataSourceTestUtils.isLogFileOnly(path)) + val afterInserts = readOrdered() + assertEquals(3, afterInserts.size) + assertEquals(Seq(0.1f, 0.2f, 0.3f), embeddingOf(1, afterInserts)) + assertEquals(Seq(0.4f, 0.5f, 0.6f), embeddingOf(2, afterInserts)) + assertEquals(Seq(0.7f, 0.8f, 0.9f), embeddingOf(3, afterInserts)) + + spark.sql( + s""" + |merge into $tableName h0 + |using ( + | select 1 as id, + | array(cast(0.11 as float), cast(0.22 as float), cast(0.33 as float)) as embedding, + | 1001L as ts + |) s0 + | on h0.id = s0.id + | when matched then update set * + |""".stripMargin) + // 4 commits will not trigger compaction, so it should be log only. 
+ assertTrue(DataSourceTestUtils.isLogFileOnly(path)) + val afterUpdate = readOrdered() + assertEquals(Seq(0.11f, 0.22f, 0.33f), embeddingOf(1, afterUpdate)) + assertEquals(Seq(0.4f, 0.5f, 0.6f), embeddingOf(2, afterUpdate)) + assertEquals(Seq(0.7f, 0.8f, 0.9f), embeddingOf(3, afterUpdate)) + + spark.sql( + s""" + |merge into $tableName h0 + |using ( + | select 4 as id, + | array(cast(0.44 as float), cast(0.55 as float), cast(0.66 as float)) as embedding, + | 1000L as ts + |) s0 + | on h0.id = s0.id + | when not matched then insert * + |""".stripMargin) + + // 5 commits will trigger compaction. + assertFalse(DataSourceTestUtils.isLogFileOnly(path)) + val afterCompaction = readOrdered() + assertEquals(4, afterCompaction.size) + assertEquals(Seq(0.11f, 0.22f, 0.33f), embeddingOf(1, afterCompaction)) + assertEquals(Seq(0.4f, 0.5f, 0.6f), embeddingOf(2, afterCompaction)) + assertEquals(Seq(0.7f, 0.8f, 0.9f), embeddingOf(3, afterCompaction)) + assertEquals(Seq(0.44f, 0.55f, 0.66f), embeddingOf(4, afterCompaction)) + + val embeddingField = spark.table(tableName).schema.find(_.name == "embedding").get + assertTrue(embeddingField.metadata.contains(HoodieSchema.TYPE_METADATA_FIELD), + s"Expected VECTOR type metadata on embedding field after compaction, " + + s"got: ${embeddingField.metadata}") + + // 6th commit drives an auto-clean that retires the now-superseded log-only slice. + // Inline compaction on commit 5 ran AFTER its own postCommit clean, so the prior + // slice was not yet superseded when that clean fired and no .clean instant was + // written. This deltacommit's postCommit clean writes the .clean instant. + spark.sql( + s""" + |merge into $tableName h0 + |using ( + | select 2 as id, + | array(cast(0.222 as float), cast(0.555 as float), cast(0.888 as float)) as embedding, + | 1002L as ts + |) s0 + | on h0.id = s0.id + | when matched then update set * + |""".stripMargin) + val afterCleanup = readOrdered() + assertEquals(Seq(0.11f, 0.22f, 0.33f), embeddingOf(1, afterCleanup)) + assertEquals(Seq(0.222f, 0.555f, 0.888f), embeddingOf(2, afterCleanup)) + assertEquals(Seq(0.7f, 0.8f, 0.9f), embeddingOf(3, afterCleanup)) + assertEquals(Seq(0.44f, 0.55f, 0.66f), embeddingOf(4, afterCleanup)) + + val metaClient = HoodieTableMetaClient.builder() + .setBasePath(path).setConf(storageConf).build() + metaClient.reloadActiveTimeline() + assertTrue(metaClient.getActiveTimeline.getCleanerTimeline.countInstants() > 0, + "Expected at least one .clean instant on the timeline after compaction") + + // The table is configured for Lance, and compaction must have produced a + // .lance base file (not parquet) — otherwise the Lance variant degenerates + // into a parquet test. 
+ assertEquals(HoodieFileFormat.LANCE, metaClient.getTableConfig.getBaseFileFormat, + "Expected Lance base file format") + val lanceBaseFiles = new java.io.File(path).listFiles() + .filter(f => f.isFile && f.getName.endsWith(".lance")) + assertTrue(lanceBaseFiles.nonEmpty, + s"Expected at least one .lance base file under $path after compaction, " + + s"found: ${new java.io.File(path).listFiles().map(_.getName).mkString(", ")}") + } finally { + spark.sql(s"drop table if exists $tableName") + } + } + @Test def testDimensionMismatchOnWrite(): Unit = { // Schema declares VECTOR(8) but data has arrays of length 4 diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestBlobDataType.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestBlobDataType.scala index a1197fd7a89b6..29bb03cc4791d 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestBlobDataType.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestBlobDataType.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.hudi.dml.schema import org.apache.hudi.blob.BlobTestHelpers +import org.apache.hudi.common.model.HoodieFileFormat import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaType} import org.apache.hudi.testutils.DataSourceTestUtils import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient @@ -282,4 +283,238 @@ class TestBlobDataType extends HoodieSparkSqlTestBase { "Expected at least one .clean instant on the timeline after compaction") }) } + + test("Test Query Log Only MOR Table With BLOB INLINE column triggers compaction (Lance)") { + assume(System.getProperty("lance.skip.tests") != "true", + "Lance tests disabled via -Dlance.skip.tests=true") + // Covers log-only MOR ingest of INLINE blobs on a Lance base format, the inline + // compaction trigger, and the post-compaction inline read shape. + + withRecordType()(withTempDir { tmp => + val tablePath = new File(tmp, "hudi").getCanonicalPath + val tableName = generateTableName + spark.sql( + s""" + |create table $tableName ( + | id int, + | data blob, + | ts long + |) using hudi + | location '$tablePath' + | tblproperties ( + | primaryKey = 'id', + | type = 'mor', + | preCombineField = 'ts', + | hoodie.index.type = 'INMEMORY', + | hoodie.compact.inline = 'true', + | hoodie.clean.commits.retained = '1', + | 'hoodie.table.base.file.format' = 'LANCE' + | ) + """.stripMargin) + + // Verify the LANCE config was actually persisted to hoodie.properties. 
+ assertResult(HoodieFileFormat.LANCE)( + createMetaClient(spark, tablePath).getTableConfig.getBaseFileFormat) + + spark.sql(s"insert into $tableName values (1, ${inlineBlobLiteral("01")}, 1000)") + spark.sql(s"insert into $tableName values (2, ${inlineBlobLiteral("02")}, 1000)") + spark.sql(s"insert into $tableName values (3, ${inlineBlobLiteral("03")}, 1000)") + assertResult(true)(DataSourceTestUtils.isLogFileOnly(tablePath)) + + spark.sql( + s""" + |merge into $tableName h0 + |using ( + | select 1 as id, ${inlineBlobLiteral("11")} as data, 1001L as ts + |) s0 + | on h0.id = s0.id + | when matched then update set * + |""".stripMargin) + assertResult(true)(DataSourceTestUtils.isLogFileOnly(tablePath)) + + spark.sql( + s""" + |merge into $tableName h0 + |using ( + | select 4 as id, ${inlineBlobLiteral("04")} as data, 1000L as ts + |) s0 + | on h0.id = s0.id + | when not matched then insert * + |""".stripMargin) + + assertResult(false)(DataSourceTestUtils.isLogFileOnly(tablePath)) + + val bytesById = spark.sql( + s"select id, read_blob(data) as bytes from $tableName order by id" + ).collect().map(r => r.getInt(0) -> r.getAs[Array[Byte]]("bytes")).toMap + assertResult(4)(bytesById.size) + assert(bytesById(1).sameElements(Array(0x11.toByte))) + assert(bytesById(2).sameElements(Array(0x02.toByte))) + assert(bytesById(3).sameElements(Array(0x03.toByte))) + assert(bytesById(4).sameElements(Array(0x04.toByte))) + + spark.sql(s"select id, data from $tableName order by id").collect().foreach { row => + val blob = row.getStruct(1) + assertResult("INLINE")(blob.getString(blob.fieldIndex(HoodieSchema.Blob.TYPE))) + assert(!blob.isNullAt(blob.fieldIndex(HoodieSchema.Blob.INLINE_DATA_FIELD))) + // Lance materializes the `reference` struct as non-null with all-null leaves for + // INLINE rows (vs. a null struct on Parquet). `type` is the canonical INLINE + // discriminator per RFC-100; tolerate either shape and just check the leaves. + val refIdx = blob.fieldIndex(HoodieSchema.Blob.EXTERNAL_REFERENCE) + if (!blob.isNullAt(refIdx)) { + val ref = blob.getStruct(refIdx) + assert(ref.isNullAt(ref.fieldIndex(HoodieSchema.Blob.EXTERNAL_REFERENCE_PATH))) + } + } + + val blobField = spark.table(tableName).schema.find(_.name == "data").get + assert(blobField.metadata.contains(HoodieSchema.TYPE_METADATA_FIELD), + s"Expected BLOB type metadata on data field after compaction, " + + s"got: ${blobField.metadata}") + assertResult(HoodieSchemaType.BLOB.name())( + blobField.metadata.getString(HoodieSchema.TYPE_METADATA_FIELD)) + + // 6th commit drives an auto-clean that retires the now-superseded log-only slice. 
+ spark.sql( + s""" + |merge into $tableName h0 + |using ( + | select 2 as id, ${inlineBlobLiteral("22")} as data, 1002L as ts + |) s0 + | on h0.id = s0.id + | when matched then update set * + |""".stripMargin) + val updatedBytesById = spark.sql( + s"select id, read_blob(data) as bytes from $tableName order by id" + ).collect().map(r => r.getInt(0) -> r.getAs[Array[Byte]]("bytes")).toMap + assert(updatedBytesById(2).sameElements(Array(0x22.toByte))) + + val metaClient = createMetaClient(spark, tablePath) + metaClient.reloadActiveTimeline() + assert(metaClient.getActiveTimeline.getCleanerTimeline.countInstants() > 0, + "Expected at least one .clean instant on the timeline after compaction") + }) + } + + test("Test Query Log Only MOR Table With BLOB OUT_OF_LINE column triggers compaction (Lance)") { + assume(System.getProperty("lance.skip.tests") != "true", + "Lance tests disabled via -Dlance.skip.tests=true") + // Lance writer has no BLOB handling today (RFC-100 Phase 2). Expected to fail + // until support lands in HoodieSparkLanceWriter; this test pins the gap. + + withRecordType()(withTempDir { tmp => + val tablePath = new File(tmp, "hudi").getCanonicalPath + val blobDir = new File(tmp, "blobs") + blobDir.mkdirs() + val file1 = BlobTestHelpers.createTestFile(blobDir.toPath, "blob1.bin", 100) + val file2 = BlobTestHelpers.createTestFile(blobDir.toPath, "blob2.bin", 100) + val file3 = BlobTestHelpers.createTestFile(blobDir.toPath, "blob3.bin", 100) + val file4 = BlobTestHelpers.createTestFile(blobDir.toPath, "blob4.bin", 100) + val file1Updated = BlobTestHelpers.createTestFile(blobDir.toPath, "blob1_updated.bin", 80) + val file2Updated = BlobTestHelpers.createTestFile(blobDir.toPath, "blob2_updated.bin", 60) + + val tableName = generateTableName + spark.sql( + s""" + |create table $tableName ( + | id int, + | data blob, + | ts long + |) using hudi + | location '$tablePath' + | tblproperties ( + | primaryKey = 'id', + | type = 'mor', + | preCombineField = 'ts', + | hoodie.index.type = 'INMEMORY', + | hoodie.compact.inline = 'true', + | hoodie.clean.commits.retained = '1', + | 'hoodie.table.base.file.format' = 'LANCE' + | ) + """.stripMargin) + + // Verify the LANCE config was actually persisted to hoodie.properties. 
+ assertResult(HoodieFileFormat.LANCE)( + createMetaClient(spark, tablePath).getTableConfig.getBaseFileFormat) + + spark.sql( + s"insert into $tableName values (1, ${outOfLineBlobLiteral(file1, 0L, 100L)}, 1000)") + spark.sql( + s"insert into $tableName values (2, ${outOfLineBlobLiteral(file2, 0L, 100L)}, 1000)") + spark.sql( + s"insert into $tableName values (3, ${outOfLineBlobLiteral(file3, 0L, 100L)}, 1000)") + assertResult(true)(DataSourceTestUtils.isLogFileOnly(tablePath)) + + spark.sql( + s""" + |merge into $tableName h0 + |using ( + | select 1 as id, ${outOfLineBlobLiteral(file1Updated, 0L, 80L)} as data, 1001L as ts + |) s0 + | on h0.id = s0.id + | when matched then update set * + |""".stripMargin) + assertResult(true)(DataSourceTestUtils.isLogFileOnly(tablePath)) + + spark.sql( + s""" + |merge into $tableName h0 + |using ( + | select 4 as id, ${outOfLineBlobLiteral(file4, 0L, 100L)} as data, 1000L as ts + |) s0 + | on h0.id = s0.id + | when not matched then insert * + |""".stripMargin) + + assertResult(false)(DataSourceTestUtils.isLogFileOnly(tablePath)) + + val bytesById = spark.sql( + s"select id, read_blob(data) as bytes from $tableName order by id" + ).collect().map(r => r.getInt(0) -> r.getAs[Array[Byte]]("bytes")).toMap + assertResult(4)(bytesById.size) + assertResult(80)(bytesById(1).length) + BlobTestHelpers.assertBytesContent(bytesById(1)) + assertResult(100)(bytesById(2).length) + BlobTestHelpers.assertBytesContent(bytesById(2)) + assertResult(100)(bytesById(3).length) + BlobTestHelpers.assertBytesContent(bytesById(3)) + assertResult(100)(bytesById(4).length) + BlobTestHelpers.assertBytesContent(bytesById(4)) + + spark.sql(s"select id, data from $tableName order by id").collect().foreach { row => + val blob = row.getStruct(1) + assertResult("OUT_OF_LINE")(blob.getString(blob.fieldIndex(HoodieSchema.Blob.TYPE))) + assert(blob.isNullAt(blob.fieldIndex(HoodieSchema.Blob.INLINE_DATA_FIELD))) + assert(!blob.isNullAt(blob.fieldIndex(HoodieSchema.Blob.EXTERNAL_REFERENCE))) + } + + val blobField = spark.table(tableName).schema.find(_.name == "data").get + assert(blobField.metadata.contains(HoodieSchema.TYPE_METADATA_FIELD), + s"Expected BLOB type metadata on data field after compaction, " + + s"got: ${blobField.metadata}") + assertResult(HoodieSchemaType.BLOB.name())( + blobField.metadata.getString(HoodieSchema.TYPE_METADATA_FIELD)) + + // 6th commit drives an auto-clean that retires the now-superseded log-only slice. 
+ spark.sql( + s""" + |merge into $tableName h0 + |using ( + | select 2 as id, ${outOfLineBlobLiteral(file2Updated, 0L, 60L)} as data, 1002L as ts + |) s0 + | on h0.id = s0.id + | when matched then update set * + |""".stripMargin) + val updatedBytesById = spark.sql( + s"select id, read_blob(data) as bytes from $tableName order by id" + ).collect().map(r => r.getInt(0) -> r.getAs[Array[Byte]]("bytes")).toMap + assertResult(60)(updatedBytesById(2).length) + BlobTestHelpers.assertBytesContent(updatedBytesById(2)) + + val metaClient = createMetaClient(spark, tablePath) + metaClient.reloadActiveTimeline() + assert(metaClient.getActiveTimeline.getCleanerTimeline.countInstants() > 0, + "Expected at least one .clean instant on the timeline after compaction") + }) + } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestVariantDataType.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestVariantDataType.scala index e364772e0596e..0f5f0928f8128 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestVariantDataType.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestVariantDataType.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.hudi.dml.schema import org.apache.hudi.HoodieSparkUtils +import org.apache.hudi.common.model.HoodieFileFormat import org.apache.hudi.common.schema.HoodieSchema import org.apache.hudi.common.testutils.HoodieTestUtils import org.apache.hudi.common.util.StringUtils @@ -235,6 +236,121 @@ class TestVariantDataType extends HoodieSparkSqlTestBase { }) } + test("Test Query Log Only MOR Table With VARIANT column triggers compaction (Lance)") { + assume(HoodieSparkUtils.gteqSpark4_0, "Variant type requires Spark 4.0 or higher") + assume(System.getProperty("lance.skip.tests") != "true", + "Lance tests disabled via -Dlance.skip.tests=true") + // Lance writer has no VARIANT handling today (RFC-100 Phase 2). Expected to fail + // until support lands in HoodieSparkLanceWriter; this test pins the gap. + + withRecordType()(withTempDir { tmp => + val tableName = generateTableName + spark.sql( + s""" + |create table $tableName ( + | id int, + | v variant, + | ts long + |) using hudi + | location '${tmp.getCanonicalPath}' + | tblproperties ( + | primaryKey = 'id', + | type = 'mor', + | preCombineField = 'ts', + | hoodie.index.type = 'INMEMORY', + | hoodie.compact.inline = 'true', + | hoodie.clean.commits.retained = '1', + | 'hoodie.table.base.file.format' = 'LANCE' + | ) + """.stripMargin) + + // Verify the LANCE config was actually persisted to hoodie.properties. 
+ assertResult(HoodieFileFormat.LANCE)( + createMetaClient(spark, tmp.getCanonicalPath).getTableConfig.getBaseFileFormat) + + spark.sql( + s"insert into $tableName values " + + "(1, parse_json('{\"key\":\"value1\"}'), 1000)") + spark.sql( + s"insert into $tableName values " + + "(2, parse_json('{\"key\":\"value2\"}'), 1000)") + spark.sql( + s"insert into $tableName values " + + "(3, parse_json('{\"key\":\"value3\"}'), 1000)") + assertResult(true)(DataSourceTestUtils.isLogFileOnly(tmp.getCanonicalPath)) + checkAnswer(s"select id, cast(v as string), ts from $tableName order by id")( + Seq(1, "{\"key\":\"value1\"}", 1000), + Seq(2, "{\"key\":\"value2\"}", 1000), + Seq(3, "{\"key\":\"value3\"}", 1000) + ) + + spark.sql( + s""" + |merge into $tableName h0 + |using ( + | select 1 as id, + | parse_json('{"key":"v1-merged"}') as v, + | 1001L as ts + |) s0 + | on h0.id = s0.id + | when matched then update set * + |""".stripMargin) + assertResult(true)(DataSourceTestUtils.isLogFileOnly(tmp.getCanonicalPath)) + checkAnswer(s"select id, cast(v as string), ts from $tableName order by id")( + Seq(1, "{\"key\":\"v1-merged\"}", 1001), + Seq(2, "{\"key\":\"value2\"}", 1000), + Seq(3, "{\"key\":\"value3\"}", 1000) + ) + + spark.sql( + s""" + |merge into $tableName h0 + |using ( + | select 4 as id, + | parse_json('{"key":"value4"}') as v, + | 1000L as ts + |) s0 + | on h0.id = s0.id + | when not matched then insert * + |""".stripMargin) + + assertResult(false)(DataSourceTestUtils.isLogFileOnly(tmp.getCanonicalPath)) + checkAnswer(s"select id, cast(v as string), ts from $tableName order by id")( + Seq(1, "{\"key\":\"v1-merged\"}", 1001), + Seq(2, "{\"key\":\"value2\"}", 1000), + Seq(3, "{\"key\":\"value3\"}", 1000), + Seq(4, "{\"key\":\"value4\"}", 1000) + ) + + val variantField = spark.table(tableName).schema.find(_.name == "v").get + assertResult("variant")(variantField.dataType.typeName) + + // 6th commit drives an auto-clean that retires the now-superseded log-only slice. + spark.sql( + s""" + |merge into $tableName h0 + |using ( + | select 2 as id, + | parse_json('{"key":"v2-merged"}') as v, + | 1002L as ts + |) s0 + | on h0.id = s0.id + | when matched then update set * + |""".stripMargin) + checkAnswer(s"select id, cast(v as string), ts from $tableName order by id")( + Seq(1, "{\"key\":\"v1-merged\"}", 1001), + Seq(2, "{\"key\":\"v2-merged\"}", 1002), + Seq(3, "{\"key\":\"value3\"}", 1000), + Seq(4, "{\"key\":\"value4\"}", 1000) + ) + + val metaClient = createMetaClient(spark, tmp.getCanonicalPath) + metaClient.reloadActiveTimeline() + assert(metaClient.getActiveTimeline.getCleanerTimeline.countInstants() > 0, + "Expected at least one .clean instant on the timeline after compaction") + }) + } + test("Test toHiveCompatibleSchema converts VariantType to physical struct") { assume(HoodieSparkUtils.gteqSpark4_0, "Variant type requires Spark 4.0 or higher") From b4f6f5e7d20ccebd46ecf6fb7e1dbdfc9cfdb035 Mon Sep 17 00:00:00 2001 From: voon Date: Sun, 26 Apr 2026 17:30:39 +0800 Subject: [PATCH 2/2] feat(blob): Accept partial {type,data} or {type,reference} structs on write INLINE writes now accept the natural `{type, data}` shape and OUT_OF_LINE writes accept `{type, reference}`; the missing sibling field is auto-padded with null at the writer ingest boundary so the canonical 3-field BLOB layout is preserved on disk. Padding recurses through StructType, ArrayType, and MapType (via Spark's transform / transform_values) so nested partial blobs are handled too. Already-canonical inputs are a no-op. 
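For illustration, the accepted write-side shapes look roughly like the sketch below (names such
as `srcDf`, `payload`, `bytes`, `path`, and `len` are hypothetical; the blob field names follow the
{type, data, reference} layout with the reference sub-struct {external_path, offset, length,
managed}, and the column must still carry the BLOB type metadata for the padding to apply):

    import org.apache.spark.sql.functions.{col, lit, struct}

    // INLINE: caller supplies only {type, data}; the writer pads reference = null.
    val inlineDf = srcDf.withColumn("payload",
      struct(
        lit("INLINE").as("type"),
        col("bytes").cast("binary").as("data")))

    // OUT_OF_LINE: caller supplies only {type, reference}; the writer pads data = null.
    val outOfLineDf = srcDf.withColumn("payload",
      struct(
        lit("OUT_OF_LINE").as("type"),
        struct(
          col("path").as("external_path"),
          lit(0L).as("offset"),
          col("len").cast("bigint").as("length"),
          lit(false).as("managed")).as("reference")))

    // Both land on disk in the canonical {type, data, reference} shape.

On the SQL path, an INSERT whose source yields a partial 2-field struct against a BLOB-tagged
target column is reshaped the same way before output resolution (see InsertBlobCanonicalizer
below).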
--- .../avro/HoodieSparkSchemaConverters.scala | 181 +++++++++++++++ .../apache/hudi/HoodieSparkSqlWriter.scala | 7 +- .../spark/sql/hudi/blob/ReadBlobRule.scala | 179 ++++++++++----- .../command/InsertBlobCanonicalizer.scala | 146 ++++++++++++ .../apache/hudi/blob/BlobTestHelpers.scala | 28 +++ .../apache/hudi/blob/TestReadBlobSQL.scala | 216 ++++++++++++++++++ .../hudi/dml/schema/TestBlobDataType.scala | 108 +++++++++ .../InsertIntoHoodieTableCommand.scala | 8 +- .../InsertIntoHoodieTableCommand.scala | 8 +- 9 files changed, 817 insertions(+), 64 deletions(-) create mode 100644 hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/InsertBlobCanonicalizer.scala diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala index c1c3fe1f68378..f8807789559bc 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala @@ -24,6 +24,8 @@ import org.apache.hudi.common.schema.HoodieSchema.TimePrecision import org.apache.hudi.internal.schema.HoodieSchemaException import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.sql.{Column, DataFrame} +import org.apache.spark.sql.functions.{col, lit, struct, transform, transform_values, when} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.sql.types.Decimal.minBytesForPrecision @@ -82,6 +84,178 @@ object HoodieSparkSchemaConverters extends SparkAdapterSupport { def validateCustomTypeStructures(structType: StructType): Unit = validateCustomTypeStructuresRecursive(structType) + /** + * Pads partial BLOB columns - anywhere they appear in the schema tree - to the canonical + * 3-field layout `{type, data, reference}` so the writer's row encoder always sees the + * full shape. Recurses through nested `StructType`, `ArrayType`, and `MapType` to mirror + * the validator's coverage. + * + * RFC-100 BLOB columns are physically a 3-field struct, but for INLINE writes only + * `{type, data}` is meaningful and for OUT_OF_LINE writes only `{type, reference}` is + * meaningful. This helper accepts either partial form on input and rewrites each row to + * the canonical 3-field shape with `lit(null)` filling in the missing field. Null blob + * structs (and null array elements / map values containing blobs) round-trip as null. + * Already-canonical blob columns pass through unchanged (idempotent). 
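+   *
+   * Illustrative sketch (the `payload` column name is hypothetical, and the column must carry
+   * the BLOB type metadata for padding to apply):
+   * {{{
+   *   // input row:   payload = {type: "INLINE", data: <bytes>}
+   *   val padded = HoodieSparkSchemaConverters.padPartialBlobColumns(df)
+   *   // output row:  payload = {type: "INLINE", data: <bytes>, reference: null}
+   * }}}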
+ * + * @param df the DataFrame whose BLOB columns may be partial at any nesting depth + * @return the input DataFrame if no partial blob columns were found, or a projected + * DataFrame with each partial blob column rewritten to canonical shape + */ + def padPartialBlobColumns(df: DataFrame): DataFrame = { + val caseSensitive = SQLConf.get.caseSensitiveAnalysis + if (!df.schema.fields.exists(f => fieldNeedsPad(f, caseSensitive))) { + df + } else { + val projected: Seq[Column] = df.schema.fields.map { f => + if (fieldNeedsPad(f, caseSensitive)) { + padField(f, col(s"`${f.name}`"), caseSensitive).as(f.name, f.metadata) + } else { + col(s"`${f.name}`") + } + } + df.select(projected: _*) + } + } + + /** + * Returns true if the field itself is a partial blob field that needs padding, + * or if any partial blob field exists somewhere inside its data type. + */ + private def fieldNeedsPad(field: StructField, caseSensitive: Boolean): Boolean = + isPartialBlobField(field, caseSensitive) || typeNeedsPad(field.dataType, caseSensitive) + + /** + * Returns true if the data type contains, anywhere within it, a BLOB-tagged StructField + * whose struct shape is a partial 2-field accepted layout. + */ + private def typeNeedsPad(dataType: DataType, caseSensitive: Boolean): Boolean = dataType match { + case s: StructType => s.fields.exists(f => fieldNeedsPad(f, caseSensitive)) + case ArrayType(elementType, _) => typeNeedsPad(elementType, caseSensitive) + case MapType(_, valueType, _) => typeNeedsPad(valueType, caseSensitive) + case _ => false + } + + /** + * Returns true if `field` is tagged `hudi_type=BLOB` and its struct shape is one of the + * accepted partial layouts: `{type, data}` or `{type, reference}`. Canonical 3-field + * structs return false (no padding needed). Anything else also returns false - the strict + * validator will reject those downstream. + */ + private def isPartialBlobField(field: StructField, caseSensitive: Boolean): Boolean = { + if (!field.metadata.contains(HoodieSchema.TYPE_METADATA_FIELD)) return false + val descriptorType = HoodieSchema + .parseTypeDescriptor(field.metadata.getString(HoodieSchema.TYPE_METADATA_FIELD)) + .getType + if (descriptorType != HoodieSchemaType.BLOB) return false + field.dataType match { + case st: StructType if !isCanonicalBlobStruct(st) => isAcceptedPartialBlobStruct(st, caseSensitive) + case _ => false + } + } + + private def isAcceptedPartialBlobStruct(st: StructType, caseSensitive: Boolean): Boolean = { + if (st.length != 2) return false + val key: String => String = + if (caseSensitive) identity else (_: String).toLowerCase(Locale.ROOT) + val names = st.fields.map(f => key(f.name)).toSet + val typeKey = key(HoodieSchema.Blob.TYPE) + val dataKey = key(HoodieSchema.Blob.INLINE_DATA_FIELD) + val refKey = key(HoodieSchema.Blob.EXTERNAL_REFERENCE) + names == Set(typeKey, dataKey) || names == Set(typeKey, refKey) + } + + /** + * Builds a Column expression that rewrites the value at `sourceCol` (which has the same + * shape as `field.dataType`) to its post-padding canonical shape. Used by + * [[padPartialBlobColumns]] and recursively by itself for nested struct fields. + * + * The caller is responsible for `.as(field.name, field.metadata)` on the returned column; + * this method produces an unaliased value expression so it can also be used inside + * `transform`/`transform_values` lambdas. 
+ */ + private def padField(field: StructField, sourceCol: Column, caseSensitive: Boolean): Column = { + if (isPartialBlobField(field, caseSensitive)) { + padBlobStructValue(sourceCol, field.dataType.asInstanceOf[StructType], caseSensitive) + } else { + padDataType(field.dataType, sourceCol, caseSensitive) + } + } + + /** + * Builds a Column expression that rewrites a value at `sourceCol` (whose shape is + * `dataType`) so any partial blob structs nested anywhere inside are padded to canonical. + * If no padding is needed inside `dataType`, returns `sourceCol` directly. + */ + private def padDataType(dataType: DataType, sourceCol: Column, caseSensitive: Boolean): Column = { + if (!typeNeedsPad(dataType, caseSensitive)) return sourceCol + dataType match { + case s: StructType => + val rebuiltFields: Seq[Column] = s.fields.map { f => + val childExpr = sourceCol.getField(f.name) + padField(f, childExpr, caseSensitive).as(f.name) + } + // Preserve null-struct semantics: a null source struct must round-trip as null, + // not as a non-null struct with all-null fields produced by `struct(...)`. + when(sourceCol.isNull, lit(null).cast(rebuiltType(s, caseSensitive))) + .otherwise(struct(rebuiltFields: _*)) + + case ArrayType(elementType, _) => + // transform() preserves the array's null-ness; the lambda handles null elements. + transform(sourceCol, (x: Column) => padDataType(elementType, x, caseSensitive)) + + case MapType(_, valueType, _) => + // transform_values() preserves the map's null-ness; lambda handles null values. + transform_values(sourceCol, (_: Column, v: Column) => padDataType(valueType, v, caseSensitive)) + + case _ => sourceCol + } + } + + /** + * Rewrites a (possibly null) blob-struct value at `blobCol` to the canonical 3-field + * shape, padding the missing sibling field with `lit(null)`. Preserves null-struct + * semantics: a null source struct round-trips as null. + */ + private def padBlobStructValue(blobCol: Column, st: StructType, caseSensitive: Boolean): Column = { + val key: String => String = + if (caseSensitive) identity else (_: String).toLowerCase(Locale.ROOT) + val present = st.fields.map(f => key(f.name)).toSet + val typeCol = blobCol.getField(HoodieSchema.Blob.TYPE).as(HoodieSchema.Blob.TYPE) + val dataCol = if (present.contains(key(HoodieSchema.Blob.INLINE_DATA_FIELD))) { + blobCol.getField(HoodieSchema.Blob.INLINE_DATA_FIELD).as(HoodieSchema.Blob.INLINE_DATA_FIELD) + } else { + lit(null).cast(BinaryType).as(HoodieSchema.Blob.INLINE_DATA_FIELD) + } + val refCol = if (present.contains(key(HoodieSchema.Blob.EXTERNAL_REFERENCE))) { + blobCol.getField(HoodieSchema.Blob.EXTERNAL_REFERENCE).as(HoodieSchema.Blob.EXTERNAL_REFERENCE) + } else { + lit(null).cast(expectedBlobReferenceStructType).as(HoodieSchema.Blob.EXTERNAL_REFERENCE) + } + when(blobCol.isNull, lit(null).cast(expectedBlobStructType)) + .otherwise(struct(typeCol, dataCol, refCol)) + } + + /** + * Returns the post-padding DataType corresponding to `dataType`: every accepted partial + * blob struct is replaced by `expectedBlobStructType`; nested struct/array/map containers + * are rebuilt with their inner types similarly transformed. Used to provide the + * `lit(null).cast(...)` target type when guarding null-struct semantics. 
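+   *
+   * For example (illustrative): a field tagged BLOB whose current type is `struct<type, data>`
+   * maps to the canonical `struct<type, data, reference>`, while enclosing struct/array/map
+   * containers are rebuilt around their transformed children and all other leaves pass through
+   * unchanged.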
+ */ + private def rebuiltType(dataType: DataType, caseSensitive: Boolean): DataType = dataType match { + case s: StructType => + StructType(s.fields.map { f => + val newType = + if (isPartialBlobField(f, caseSensitive)) expectedBlobStructType + else rebuiltType(f.dataType, caseSensitive) + f.copy(dataType = newType) + }) + case ArrayType(elementType, containsNull) => + ArrayType(rebuiltType(elementType, caseSensitive), containsNull) + case MapType(keyType, valueType, valueContainsNull) => + MapType(keyType, rebuiltType(valueType, caseSensitive), valueContainsNull) + case other => other + } + private def validateCustomTypeStructuresRecursive(dataType: DataType): Unit = dataType match { case s: StructType => s.fields.foreach { f => @@ -409,6 +583,13 @@ object HoodieSparkSchemaConverters extends SparkAdapterSupport { private lazy val expectedBlobStructType: StructType = toSqlType(HoodieSchema.createBlob())._1.asInstanceOf[StructType] + // Spark type of the canonical reference sub-struct ({external_path, offset, length, managed}). + // Used by padPartialBlobColumns to construct lit(null).cast(...) for the missing reference + // field when a user supplies an INLINE-only `{type, data}` blob struct. + private lazy val expectedBlobReferenceStructType: DataType = + expectedBlobStructType.fields + .find(_.name == HoodieSchema.Blob.EXTERNAL_REFERENCE).get.dataType + /** * Validates that a StructType matches the expected blob schema structure defined in {@link HoodieSchema.Blob}. * diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index aaa79929256f6..2c4b36c2d2b27 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -70,6 +70,7 @@ import org.apache.hadoop.hive.shims.ShimLoader import org.apache.spark.{SPARK_VERSION, SparkContext} import org.apache.spark.api.java.{JavaRDD, JavaSparkContext} import org.apache.spark.sql._ +import org.apache.spark.sql.avro.HoodieSparkSchemaConverters import org.apache.spark.sql.execution.{QueryExecution, SQLExecution} import org.apache.spark.sql.internal.StaticSQLConf import org.apache.spark.sql.types.StructType @@ -356,11 +357,15 @@ class HoodieSparkSqlWriterInternal { val shouldReconcileSchema = parameters(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean val latestTableSchemaOpt = getLatestTableSchema(tableMetaClient, schemaFromCatalog) - val df = if (preppedWriteOperation || preppedSparkSqlWrites || preppedSparkSqlMergeInto || sourceDf.isStreaming) { + val dfPreBlobPad = if (preppedWriteOperation || preppedSparkSqlWrites || preppedSparkSqlMergeInto || sourceDf.isStreaming) { sourceDf } else { sourceDf.drop(HoodieRecord.HOODIE_META_COLUMNS.asScala.toSeq: _*) } + // RFC-100 BLOB QoL: accept partial INLINE-only `{type,data}` or OUT_OF_LINE-only + // `{type,reference}` user inputs by padding the missing sibling field with null at the + // ingest boundary. No-op for already-canonical 3-field structs. 
+ val df = HoodieSparkSchemaConverters.padPartialBlobColumns(dfPreBlobPad) // NOTE: We need to make sure that upon conversion of the schemas b/w Catalyst's [[StructType]] and // Avro's [[Schema]] we're preserving corresponding "record-name" and "record-namespace" that // play crucial role in establishing compatibility b/w schemas diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/ReadBlobRule.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/ReadBlobRule.scala index b91d08674cb95..6723b37a89683 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/ReadBlobRule.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/ReadBlobRule.scala @@ -19,9 +19,8 @@ package org.apache.spark.sql.hudi.blob -import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Expression, ExprId, NamedExpression} +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Expression, NamedExpression} import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.types.{DataType, StructType} @@ -35,7 +34,14 @@ import scala.collection.mutable.ArrayBuffer * Replaces [[ReadBlobExpression]] markers with [[BatchedBlobRead]] nodes * that read blob data during physical execution. * + * Supports both top-level column references and nested expressions that + * resolve to a BlobType struct (e.g. `read_blob(parent.payload)` or + * `read_blob(arr[0])`). Non-attribute sources are lifted to a synthetic + * top-level alias via an injected `Project` so the downstream + * [[BatchedBlobRead]] always sees a plain top-level attribute. 
+ * * Example: `SELECT id, read_blob(image_data) FROM table` + * Example: `SELECT id, read_blob(doc.payload) FROM table` * * @param spark SparkSession for accessing configuration */ @@ -46,34 +52,29 @@ case class ReadBlobRule(spark: SparkSession) extends Rule[LogicalPlan] { if containsReadBlobExpression(projectList) && containsReadBlobInExpression(condition) && !child.isInstanceOf[BatchedBlobRead] => - val projectBlobCols = extractAllBlobColumns(projectList) - val filterBlobCols = extractBlobColumnsFromExpression(condition) - val blobColumns = (projectBlobCols ++ filterBlobCols) - .foldLeft((mutable.LinkedHashSet.empty[ExprId], ArrayBuffer.empty[AttributeReference])) { - case ((seen, acc), a) if seen.add(a.exprId) => (seen, acc += a) - case ((seen, acc), _) => (seen, acc) - }._2.toSeq - val (wrappedPlan, blobToDataAttr) = wrapWithBlobReads(blobColumns, child) - val newCondition = replaceReadBlobExpression(condition, blobToDataAttr) - val newProjectList = transformNamedExpressions(projectList, blobToDataAttr) + val sources = dedupBlobSources( + extractAllBlobSourceExprs(projectList) ++ extractBlobSourceExprsFromExpression(condition)) + val (wrappedPlan, sourceToDataAttr) = wrapWithBlobReads(sources, child) + val newCondition = replaceReadBlobExpression(condition, sourceToDataAttr) + val newProjectList = transformNamedExpressions(projectList, sourceToDataAttr) Project(newProjectList, Filter(newCondition, wrappedPlan)) case Filter(condition, child) if containsReadBlobInExpression(condition) && !child.isInstanceOf[BatchedBlobRead] => - val blobColumns = extractBlobColumnsFromExpression(condition) - val (wrappedPlan, blobToDataAttr) = wrapWithBlobReads(blobColumns, child) - val newCondition = replaceReadBlobExpression(condition, blobToDataAttr) + val sources = extractBlobSourceExprsFromExpression(condition) + val (wrappedPlan, sourceToDataAttr) = wrapWithBlobReads(sources, child) + val newCondition = replaceReadBlobExpression(condition, sourceToDataAttr) Project(child.output, Filter(newCondition, wrappedPlan)) case Project(projectList, child) if containsReadBlobExpression(projectList) && !child.isInstanceOf[BatchedBlobRead] => - val blobColumns = extractAllBlobColumns(projectList) - val (wrappedPlan, blobToDataAttr) = wrapWithBlobReads(blobColumns, child) - val newProjectList = transformNamedExpressions(projectList, blobToDataAttr) + val sources = extractAllBlobSourceExprs(projectList) + val (wrappedPlan, sourceToDataAttr) = wrapWithBlobReads(sources, child) + val newProjectList = transformNamedExpressions(projectList, sourceToDataAttr) Project(newProjectList, wrappedPlan) case node if containsReadBlobInAnyExpression(node) => @@ -86,39 +87,72 @@ case class ReadBlobRule(spark: SparkSession) extends Rule[LogicalPlan] { plan.expressions.exists(containsReadBlobInExpression) } + /** + * Wraps `child` with one [[BatchedBlobRead]] per distinct blob source expression. + * Non-attribute sources are first projected to a synthetic top-level alias so the + * downstream nodes can address them as plain top-level attributes. + * + * @return the augmented plan plus a mapping from each source expression's + * canonicalized form to the [[BatchedBlobRead.dataAttr]] that should + * replace the corresponding `read_blob()` call. 
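+   *
+   * For example, `read_blob(doc.payload)` has a non-attribute source, so it is first lifted into
+   * the injected Project under a synthetic alias (`_blob_src_` plus the hex hash of the
+   * canonicalized source) and the [[BatchedBlobRead]] is stacked on that attribute, whereas
+   * `read_blob(image_data)` reuses the existing top-level attribute with no lift.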
+ */ private def wrapWithBlobReads( - blobColumns: Seq[AttributeReference], - child: LogicalPlan): (LogicalPlan, Map[ExprId, Attribute]) = { - if (blobColumns.isEmpty) { + sources: Seq[Expression], + child: LogicalPlan): (LogicalPlan, Map[Expression, Attribute]) = { + if (sources.isEmpty) { throw new IllegalStateException("read_blob() found but no valid blob column reference extracted.") } - blobColumns.foldLeft((child: LogicalPlan, Map.empty[ExprId, Attribute])) { - case ((currentPlan, mapping), blobAttr) => - // Type compatibility check (early fail for non-struct columns) - blobAttr.dataType match { - case struct: StructType if DataType.equalsIgnoreCaseAndNullability(struct, org.apache.spark.sql.types.BlobType.dataType) => - // Valid blob column - case _ => - throw new IllegalArgumentException( - s"Blob column '${blobAttr.name}' must be compatible with BlobType (type, data, reference struct), found: ${blobAttr.dataType}") - } - val blobRead = BatchedBlobRead(currentPlan, blobAttr) - (blobRead, mapping + (blobAttr.exprId -> blobRead.dataAttr)) + + // Per source: either reuse an existing top-level AttributeReference, or synthesize a + // fresh Alias that we will project onto the plan to lift the nested expression into a + // top-level attribute. We keep the original source expression so we can key the final + // mapping by its canonicalized form. + case class Binding(source: Expression, blobAttr: AttributeReference, lift: Option[Alias]) + val bindings: Seq[Binding] = sources.map { + case attr: AttributeReference => + Binding(attr, attr, None) + case other => + // Deterministic alias name from the canonicalized form so identical nested + // sub-trees (e.g. across SELECT and WHERE) get the same alias name. + val aliasName = s"_blob_src_${Integer.toHexString(other.canonicalized.hashCode())}" + val alias = Alias(other, aliasName)() + Binding(other, alias.toAttribute.asInstanceOf[AttributeReference], Some(alias)) } - } - private def extractBlobColumnsFromExpression(expr: Expression): Seq[AttributeReference] = { - val seen = mutable.LinkedHashSet.empty[ExprId] - val result = ArrayBuffer.empty[AttributeReference] - collectBlobColumns(expr, seen, result) - result.toSeq + // Type-compatibility check: every blob source must resolve to a BlobType struct. + bindings.foreach { b => + b.blobAttr.dataType match { + case struct: StructType + if DataType.equalsIgnoreCaseAndNullability(struct, org.apache.spark.sql.types.BlobType.dataType) => + // Valid blob source. + case _ => + throw new IllegalArgumentException( + s"Blob source '${b.source}' must be compatible with BlobType (type, data, reference struct), " + + s"found: ${b.blobAttr.dataType}") + } + } + + // Inject the synthetic Project once if any source needed lifting. + val lifts = bindings.flatMap(_.lift) + val planWithLifts: LogicalPlan = if (lifts.isEmpty) child + else Project(child.output ++ lifts, child) + + // Stack one BatchedBlobRead per distinct blob source. + val (finalPlan, mapping) = bindings.foldLeft( + (planWithLifts, Map.empty[Expression, Attribute])) { + case ((currentPlan, acc), b) => + val blobRead = BatchedBlobRead(currentPlan, b.blobAttr) + (blobRead, acc + (b.source.canonicalized -> blobRead.dataAttr)) + } + + (finalPlan, mapping) } /** * Check if any expression in the project list contains a ReadBlobExpression. 
*/ private def containsReadBlobExpression(projectList: Seq[Expression]): Boolean = { - projectList.exists(expr => containsReadBlobInExpression(expr)) + projectList.exists(containsReadBlobInExpression) } private def containsReadBlobInExpression(expr: Expression): Boolean = { @@ -128,46 +162,69 @@ case class ReadBlobRule(spark: SparkSession) extends Rule[LogicalPlan] { } } - private def extractAllBlobColumns(expressions: Seq[Expression]): Seq[AttributeReference] = { - val seen = mutable.LinkedHashSet.empty[ExprId] - val result = ArrayBuffer.empty[AttributeReference] - expressions.foreach(collectBlobColumns(_, seen, result)) + private def extractAllBlobSourceExprs(expressions: Seq[Expression]): Seq[Expression] = { + val seen = mutable.LinkedHashSet.empty[Expression] + val result = ArrayBuffer.empty[Expression] + expressions.foreach(collectBlobSources(_, seen, result)) result.toSeq } - private def collectBlobColumns( + private def extractBlobSourceExprsFromExpression(expr: Expression): Seq[Expression] = { + val seen = mutable.LinkedHashSet.empty[Expression] + val result = ArrayBuffer.empty[Expression] + collectBlobSources(expr, seen, result) + result.toSeq + } + + /** + * Walks `expr` collecting the inner expression of every `ReadBlobExpression`. + * Dedup is by canonicalized form so identical sub-trees only produce one + * [[BatchedBlobRead]] each. + */ + private def collectBlobSources( expr: Expression, - seen: mutable.Set[ExprId], - result: ArrayBuffer[AttributeReference]): Unit = expr match { - case ReadBlobExpression(attr: AttributeReference) => - if (seen.add(attr.exprId)) result += attr + seen: mutable.Set[Expression], + result: ArrayBuffer[Expression]): Unit = expr match { + case ReadBlobExpression(inner) => + if (seen.add(inner.canonicalized)) result += inner case other => - other.children.foreach(collectBlobColumns(_, seen, result)) + other.children.foreach(collectBlobSources(_, seen, result)) + } + + /** + * Dedup two source-expression lists (e.g. from a Project + Filter) by canonicalized + * form, preserving first-seen order. + */ + private def dedupBlobSources(sources: Seq[Expression]): Seq[Expression] = { + val seen = mutable.LinkedHashSet.empty[Expression] + val result = ArrayBuffer.empty[Expression] + sources.foreach { s => + if (seen.add(s.canonicalized)) result += s + } + result.toSeq } private def transformNamedExpressions( expressions: Seq[NamedExpression], - blobToDataAttr: Map[ExprId, Attribute]): Seq[NamedExpression] = { + sourceToDataAttr: Map[Expression, Attribute]): Seq[NamedExpression] = { expressions.map { case alias @ Alias(childExpr, name) => - val rewritten = replaceReadBlobExpression(childExpr, blobToDataAttr) + val rewritten = replaceReadBlobExpression(childExpr, sourceToDataAttr) Alias(rewritten, name)(alias.exprId, alias.qualifier, alias.explicitMetadata) case attr: AttributeReference => attr case other => - replaceReadBlobExpression(other, blobToDataAttr).asInstanceOf[NamedExpression] + replaceReadBlobExpression(other, sourceToDataAttr).asInstanceOf[NamedExpression] } } private def replaceReadBlobExpression( expr: Expression, - blobToDataAttr: Map[ExprId, Attribute]): Expression = expr match { - case ReadBlobExpression(attr: AttributeReference) => - blobToDataAttr.getOrElse(attr.exprId, throw new IllegalArgumentException( - s"read_blob() called on column '${attr.name}' (exprId=${attr.exprId}) which was not registered for blob reading. 
" + - s"Available blob columns: ${blobToDataAttr.keys.mkString(", ")}")) - case ReadBlobExpression(_) => - throw new IllegalStateException("read_blob() must be called on a direct column reference") + sourceToDataAttr: Map[Expression, Attribute]): Expression = expr match { + case ReadBlobExpression(inner) => + sourceToDataAttr.getOrElse(inner.canonicalized, throw new IllegalArgumentException( + s"read_blob() called on expression '$inner' which was not registered for blob reading. " + + s"Available blob sources: ${sourceToDataAttr.keys.mkString(", ")}")) case other => - other.mapChildren(replaceReadBlobExpression(_, blobToDataAttr)) + other.mapChildren(replaceReadBlobExpression(_, sourceToDataAttr)) } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/InsertBlobCanonicalizer.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/InsertBlobCanonicalizer.scala new file mode 100644 index 0000000000000..1385cf5904009 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/InsertBlobCanonicalizer.scala @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.hudi.command + +import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaType} + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} +import org.apache.spark.sql.types._ + +import java.util.Locale + +/** + * Rewrites partial 2-field BLOB struct expressions to the canonical 3-field shape + * before Spark's [[org.apache.spark.sql.catalyst.analysis.TableOutputResolver]] runs, + * so SQL inserts of `{type, reference}` (OUT_OF_LINE) are not rejected by its positional + * struct check. Top-level BLOB columns only; nested partial BLOBs in SQL still need the + * canonical `named_struct` literal. + */ +object InsertBlobCanonicalizer { + + /** + * Idempotent. For each top-level column whose target is BLOB-tagged and whose source + * is a partial 2-field accepted layout, projects a canonical 3-field + * [[CreateNamedStruct]] onto the query. 
+ */ + def padPartialBlobsAgainstTarget( + query: LogicalPlan, + expectedSchema: StructType): LogicalPlan = { + val sourceAttrs = query.output + if (sourceAttrs.length != expectedSchema.length) { + query + } else { + val newProjectList: Seq[NamedExpression] = sourceAttrs.zip(expectedSchema.fields).map { + case (sourceAttr, targetField) => + canonicalizeIfPartialBlob(sourceAttr, targetField) match { + case Some(canonical) => + // Use a fresh ExprId: reshaping changes the column's dataType, and reusing + // sourceAttr.exprId would put two attributes with the same id but different + // types in the plan tree (the upstream child still emits the partial struct), + // which Spark's plan validator rejects (PLAN_VALIDATION_FAILED_RULE_IN_BATCH). + Alias(canonical, sourceAttr.name)(qualifier = sourceAttr.qualifier) + case None => + sourceAttr + } + } + + if (newProjectList.zip(sourceAttrs).forall { case (a, b) => a eq b }) query + else Project(newProjectList, query) + } + } + + private def canonicalizeIfPartialBlob( + sourceAttr: Attribute, + targetField: StructField): Option[Expression] = { + if (!isBlobMetadata(targetField.metadata)) { + None + } else { + (sourceAttr.dataType, targetField.dataType) match { + case (sourceStruct: StructType, targetStruct: StructType) + if isPartialBlobLayout(sourceStruct) && isCanonicalBlobLayout(targetStruct) => + Some(canonicalizeBlobExpr(sourceAttr, sourceStruct, targetStruct)) + case _ => None + } + } + } + + private def isBlobMetadata(metadata: Metadata): Boolean = { + metadata.contains(HoodieSchema.TYPE_METADATA_FIELD) && + HoodieSchema.parseTypeDescriptor(metadata.getString(HoodieSchema.TYPE_METADATA_FIELD)) + .getType == HoodieSchemaType.BLOB + } + + private def isCanonicalBlobLayout(st: StructType): Boolean = { + st.length == 3 && + st.fields.exists(_.name.equalsIgnoreCase(HoodieSchema.Blob.TYPE)) && + st.fields.exists(_.name.equalsIgnoreCase(HoodieSchema.Blob.INLINE_DATA_FIELD)) && + st.fields.exists(_.name.equalsIgnoreCase(HoodieSchema.Blob.EXTERNAL_REFERENCE)) + } + + private def isPartialBlobLayout(st: StructType): Boolean = { + if (st.length != 2) { + false + } else { + val names = st.fields.map(_.name.toLowerCase(Locale.ROOT)).toSet + val typeKey = HoodieSchema.Blob.TYPE.toLowerCase(Locale.ROOT) + val dataKey = HoodieSchema.Blob.INLINE_DATA_FIELD.toLowerCase(Locale.ROOT) + val refKey = HoodieSchema.Blob.EXTERNAL_REFERENCE.toLowerCase(Locale.ROOT) + names == Set(typeKey, dataKey) || names == Set(typeKey, refKey) + } + } + + private def canonicalizeBlobExpr( + sourceExpr: Expression, + sourceStruct: StructType, + targetStruct: StructType): Expression = { + val nameToOrdinal = sourceStruct.fields.zipWithIndex.map { + case (f, i) => f.name.toLowerCase(Locale.ROOT) -> i + }.toMap + + def lookupOrdinal(name: String): Option[Int] = + nameToOrdinal.get(name.toLowerCase(Locale.ROOT)) + + val typeOrd = lookupOrdinal(HoodieSchema.Blob.TYPE).get + val typeFieldExpr = GetStructField(sourceExpr, typeOrd, Some(HoodieSchema.Blob.TYPE)) + + val dataExpr: Expression = lookupOrdinal(HoodieSchema.Blob.INLINE_DATA_FIELD) match { + case Some(ord) => GetStructField(sourceExpr, ord, Some(HoodieSchema.Blob.INLINE_DATA_FIELD)) + case None => Literal(null, BinaryType) + } + + val refTargetType = targetStruct.fields + .find(_.name.equalsIgnoreCase(HoodieSchema.Blob.EXTERNAL_REFERENCE)).get.dataType + val refExpr: Expression = lookupOrdinal(HoodieSchema.Blob.EXTERNAL_REFERENCE) match { + case Some(ord) => GetStructField(sourceExpr, ord, 
Some(HoodieSchema.Blob.EXTERNAL_REFERENCE)) + case None => Literal(null, refTargetType) + } + + // Preserve null-struct semantics: a null source struct round-trips as null, + // not as a non-null struct with all-null fields produced by CreateNamedStruct. + If(IsNull(sourceExpr), + Literal(null, targetStruct), + CreateNamedStruct(Seq( + Literal(HoodieSchema.Blob.TYPE), typeFieldExpr, + Literal(HoodieSchema.Blob.INLINE_DATA_FIELD), dataExpr, + Literal(HoodieSchema.Blob.EXTERNAL_REFERENCE), refExpr + ))) + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/blob/BlobTestHelpers.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/blob/BlobTestHelpers.scala index b94229aa478e6..dd9f1b498b570 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/blob/BlobTestHelpers.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/blob/BlobTestHelpers.scala @@ -45,6 +45,34 @@ object BlobTestHelpers { ).as(name, blobMetadata) } + /** + * Minimal `{type, data}` shape - exercises the writer's auto-padding path that + * fills in `reference: null` for INLINE blobs. + */ + def inlineBlobStructColMinimal(name: String, bytesCol: Column): Column = { + struct( + lit(HoodieSchema.Blob.INLINE).as(HoodieSchema.Blob.TYPE), + bytesCol.cast("binary").as(HoodieSchema.Blob.INLINE_DATA_FIELD) + ).as(name, blobMetadata) + } + + /** + * Minimal `{type, reference}` shape - exercises the writer's auto-padding path + * that fills in `data: null` for OUT_OF_LINE blobs. + */ + def outOfLineBlobStructColMinimal( + name: String, filePathCol: Column, offsetCol: Column, lengthCol: Column): Column = { + struct( + lit(HoodieSchema.Blob.OUT_OF_LINE).as(HoodieSchema.Blob.TYPE), + struct( + filePathCol.as(HoodieSchema.Blob.EXTERNAL_REFERENCE_PATH), + offsetCol.as(HoodieSchema.Blob.EXTERNAL_REFERENCE_OFFSET), + lengthCol.cast("bigint").as(HoodieSchema.Blob.EXTERNAL_REFERENCE_LENGTH), + lit(false).as(HoodieSchema.Blob.EXTERNAL_REFERENCE_IS_MANAGED) + ).as(HoodieSchema.Blob.EXTERNAL_REFERENCE) + ).as(name, blobMetadata) + } + def wholeFileBlobStructCol(name: String, filePathCol: Column): Column = { struct( lit(HoodieSchema.Blob.OUT_OF_LINE).as(HoodieSchema.Blob.TYPE), diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/blob/TestReadBlobSQL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/blob/TestReadBlobSQL.scala index 533c9589e338c..2da88988deb6f 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/blob/TestReadBlobSQL.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/blob/TestReadBlobSQL.scala @@ -452,6 +452,222 @@ class TestReadBlobSQL extends HoodieClientTestBase { assertTrue(thrown.getCause.isInstanceOf[HoodieIOException]) } + /** + * INLINE BLOB writes accept the minimal `{type, data}` struct (no `reference` field) at + * the DataFrame entry. The writer pads each row to the canonical 3-field shape with + * `reference = null`. Round-trips the bytes through `read_blob()` to confirm the padded + * row materializes correctly. 
+ */ + @Test + def testInlineBlobWriteAcceptsMinimalStruct(): Unit = { + val tablePath = s"$tempDir/hudi_inline_min_struct_table" + val payload = Array[Byte](0xA, 0xB, 0xC, 0xD) + + val minimalInlineSchema = StructType(Seq( + StructField("id", IntegerType, nullable = false), + StructField("payload", StructType(Seq( + StructField(HoodieSchema.Blob.TYPE, StringType, nullable = true), + StructField(HoodieSchema.Blob.INLINE_DATA_FIELD, BinaryType, nullable = true) + )), nullable = true, blobMetadata) + )) + val minimalRow = Row(1, Row(HoodieSchema.Blob.INLINE, payload)) + val minimalDf = sparkSession.createDataFrame( + Collections.singletonList(minimalRow), minimalInlineSchema) + + minimalDf.write.format("hudi") + .option("hoodie.table.name", "blob_inline_min_struct") + .option("hoodie.datasource.write.recordkey.field", "id") + .option("hoodie.datasource.write.operation", "bulk_insert") + .mode("overwrite") + .save(tablePath) + + val readBack = sparkSession.read.format("hudi").load(tablePath) + .select("id", "payload") + .collect() + assertEquals(1, readBack.length) + val payloadStruct = readBack(0).getStruct(readBack(0).fieldIndex("payload")) + assertEquals(HoodieSchema.Blob.INLINE, + payloadStruct.getString(payloadStruct.fieldIndex(HoodieSchema.Blob.TYPE))) + assertArrayEquals(payload, + payloadStruct.getAs[Array[Byte]](HoodieSchema.Blob.INLINE_DATA_FIELD)) + // The padder filled `reference` with null on the way in. + assertTrue(payloadStruct.isNullAt( + payloadStruct.fieldIndex(HoodieSchema.Blob.EXTERNAL_REFERENCE))) + + // read_blob() resolves INLINE bytes directly. + sparkSession.read.format("hudi").load(tablePath) + .createOrReplaceTempView("blob_min_inline_view") + val readBlobBytes = sparkSession.sql( + "SELECT id, read_blob(payload) AS bytes FROM blob_min_inline_view") + .collect().head.getAs[Array[Byte]]("bytes") + assertArrayEquals(payload, readBlobBytes) + } + + /** + * OUT_OF_LINE BLOB writes accept the minimal `{type, reference}` struct (no `data` + * field) at the DataFrame entry. The writer pads each row to the canonical 3-field + * shape with `data = null`. + */ + @Test + def testOutOfLineBlobWriteAcceptsMinimalStruct(): Unit = { + val extFile = createTestFile(tempDir, "minstruct_ool.bin", 1000) + val tablePath = s"$tempDir/hudi_oolline_min_struct_table" + + // Minimal {type, reference} schema — no `data` field. 
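+    // The `reference` struct below follows the canonical external-reference layout
+    // (path, offset, length, managed flag); only the sibling `data` field is omitted,
+    // which is what the padder is expected to fill back in with null.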
+ val refStructType = StructType(Seq( + StructField(HoodieSchema.Blob.EXTERNAL_REFERENCE_PATH, StringType, nullable = true), + StructField(HoodieSchema.Blob.EXTERNAL_REFERENCE_OFFSET, LongType, nullable = true), + StructField(HoodieSchema.Blob.EXTERNAL_REFERENCE_LENGTH, LongType, nullable = true), + StructField(HoodieSchema.Blob.EXTERNAL_REFERENCE_IS_MANAGED, BooleanType, nullable = true) + )) + val minimalSchema = StructType(Seq( + StructField("id", IntegerType, nullable = false), + StructField("payload", StructType(Seq( + StructField(HoodieSchema.Blob.TYPE, StringType, nullable = true), + StructField(HoodieSchema.Blob.EXTERNAL_REFERENCE, refStructType, nullable = true) + )), nullable = true, blobMetadata) + )) + val refRow = Row(extFile, 0L, 100L, false) + val minimalRow = Row(1, Row(HoodieSchema.Blob.OUT_OF_LINE, refRow)) + val minimalDf = sparkSession.createDataFrame( + Collections.singletonList(minimalRow), minimalSchema) + + minimalDf.write.format("hudi") + .option("hoodie.table.name", "blob_oolline_min_struct") + .option("hoodie.datasource.write.recordkey.field", "id") + .option("hoodie.datasource.write.operation", "bulk_insert") + .mode("overwrite") + .save(tablePath) + + val readBack = sparkSession.read.format("hudi").load(tablePath) + .select("id", "payload") + .collect() + assertEquals(1, readBack.length) + val payloadStruct = readBack(0).getStruct(readBack(0).fieldIndex("payload")) + assertEquals(HoodieSchema.Blob.OUT_OF_LINE, + payloadStruct.getString(payloadStruct.fieldIndex(HoodieSchema.Blob.TYPE))) + // The padder filled `data` with null on the way in. + assertTrue(payloadStruct.isNullAt( + payloadStruct.fieldIndex(HoodieSchema.Blob.INLINE_DATA_FIELD))) + val ref = payloadStruct.getStruct( + payloadStruct.fieldIndex(HoodieSchema.Blob.EXTERNAL_REFERENCE)) + assertTrue(ref.getString(ref.fieldIndex(HoodieSchema.Blob.EXTERNAL_REFERENCE_PATH)) + .endsWith("minstruct_ool.bin")) + + // read_blob() dereferences the external file and reads the byte range. + sparkSession.read.format("hudi").load(tablePath) + .createOrReplaceTempView("blob_min_oolline_view") + val readBlobBytes = sparkSession.sql( + "SELECT id, read_blob(payload) AS bytes FROM blob_min_oolline_view") + .collect().head.getAs[Array[Byte]]("bytes") + assertEquals(100, readBlobBytes.length) + assertBytesContent(readBlobBytes, expectedOffset = 0) + } + + /** + * Partial INLINE blob structs nested inside a non-blob struct, an array of structs, and + * a map of string -> struct are all padded by the writer. Exercises the recursive pad + * paths in `padPartialBlobColumns` (struct rebuild, `transform`, `transform_values`). 
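+   * Roughly, the shapes exercised below are:
+   *   doc:      struct<doc_id: int, payload: partial blob>
+   *   items:    array<struct<idx: int, payload: partial blob>>
+   *   by_label: map<string, struct<payload: partial blob>>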
+ */ + @Test + def testNestedPartialBlobWritesAcceptedThroughStructArrayAndMap(): Unit = { + val tablePath = s"$tempDir/hudi_nested_partial_blob_table" + + val payloadStruct = StructType(Seq( + StructField(HoodieSchema.Blob.TYPE, StringType, nullable = true), + StructField(HoodieSchema.Blob.INLINE_DATA_FIELD, BinaryType, nullable = true) + )) + val nestedStructField = StructField( + "doc", StructType(Seq( + StructField("doc_id", IntegerType, nullable = false), + StructField("payload", payloadStruct, nullable = true, blobMetadata) + )), nullable = true) + val nestedArrayField = StructField( + "items", ArrayType(StructType(Seq( + StructField("idx", IntegerType, nullable = false), + StructField("payload", payloadStruct, nullable = true, blobMetadata) + )), containsNull = true), nullable = true) + val nestedMapField = StructField( + "by_label", MapType(StringType, StructType(Seq( + StructField("payload", payloadStruct, nullable = true, blobMetadata) + )), valueContainsNull = true), nullable = true) + + val schema = StructType(Seq( + StructField("id", IntegerType, nullable = false), + nestedStructField, + nestedArrayField, + nestedMapField + )) + + val structPayload = Array[Byte](0x1, 0x2) + val arrPayload0 = Array[Byte](0x3, 0x4) + val arrPayload1 = Array[Byte](0x5, 0x6) + val mapPayload = Array[Byte](0x7, 0x8) + + val docRow = Row(101, Row(HoodieSchema.Blob.INLINE, structPayload)) + val itemsRow = Seq( + Row(0, Row(HoodieSchema.Blob.INLINE, arrPayload0)), + Row(1, Row(HoodieSchema.Blob.INLINE, arrPayload1)) + ) + val byLabelRow = Map("a" -> Row(Row(HoodieSchema.Blob.INLINE, mapPayload))) + val row = Row(1, docRow, itemsRow, byLabelRow) + + val df = sparkSession.createDataFrame(Collections.singletonList(row), schema) + + df.write.format("hudi") + .option("hoodie.table.name", "blob_nested_partial") + .option("hoodie.datasource.write.recordkey.field", "id") + .option("hoodie.datasource.write.operation", "bulk_insert") + .mode("overwrite") + .save(tablePath) + + val rows = sparkSession.read.format("hudi").load(tablePath) + .select("id", "doc", "items", "by_label").collect() + assertEquals(1, rows.length) + val readBack = rows(0) + + // Struct-nested blob: padded `reference` is null, bytes round-trip. + val docReadBack = readBack.getStruct(readBack.fieldIndex("doc")) + val docPayload = docReadBack.getStruct(docReadBack.fieldIndex("payload")) + assertEquals(HoodieSchema.Blob.INLINE, + docPayload.getString(docPayload.fieldIndex(HoodieSchema.Blob.TYPE))) + assertArrayEquals(structPayload, + docPayload.getAs[Array[Byte]](HoodieSchema.Blob.INLINE_DATA_FIELD)) + assertTrue(docPayload.isNullAt(docPayload.fieldIndex(HoodieSchema.Blob.EXTERNAL_REFERENCE))) + + // Array-nested blob: each element's blob is independently padded. + val items = readBack.getList[Row](readBack.fieldIndex("items")) + assertEquals(2, items.size()) + val itemPayloads = (0 until 2).map { i => + val itemBlob = items.get(i).getStruct(items.get(i).fieldIndex("payload")) + assertEquals(HoodieSchema.Blob.INLINE, + itemBlob.getString(itemBlob.fieldIndex(HoodieSchema.Blob.TYPE))) + assertTrue(itemBlob.isNullAt(itemBlob.fieldIndex(HoodieSchema.Blob.EXTERNAL_REFERENCE))) + itemBlob.getAs[Array[Byte]](HoodieSchema.Blob.INLINE_DATA_FIELD) + } + assertArrayEquals(arrPayload0, itemPayloads(0)) + assertArrayEquals(arrPayload1, itemPayloads(1)) + + // Map-nested blob: padded value's blob is canonical. 
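+    // Row.getJavaMap exposes the MapType column as a java.util.Map, keyed by label.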
+ val byLabel = readBack.getJavaMap[String, Row](readBack.fieldIndex("by_label")) + val mapVal = byLabel.get("a") + val mapBlob = mapVal.getStruct(mapVal.fieldIndex("payload")) + assertEquals(HoodieSchema.Blob.INLINE, + mapBlob.getString(mapBlob.fieldIndex(HoodieSchema.Blob.TYPE))) + assertTrue(mapBlob.isNullAt(mapBlob.fieldIndex(HoodieSchema.Blob.EXTERNAL_REFERENCE))) + assertArrayEquals(mapPayload, + mapBlob.getAs[Array[Byte]](HoodieSchema.Blob.INLINE_DATA_FIELD)) + + // read_blob() resolves the struct-nested blob directly: the rule lifts the + // nested expression to a synthetic top-level alias before BatchedBlobRead. + sparkSession.read.format("hudi").load(tablePath) + .createOrReplaceTempView("blob_nested_view") + val sqlBytes = sparkSession.sql( + "SELECT read_blob(doc.payload) AS bytes FROM blob_nested_view") + .collect().head.getAs[Array[Byte]]("bytes") + assertArrayEquals(structPayload, sqlBytes) + } + @Test def testReadBlobOnNonBlobColumn(): Unit = { val df = sparkSession.createDataFrame(Seq( diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestBlobDataType.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestBlobDataType.scala index 29bb03cc4791d..e6b284442b7d7 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestBlobDataType.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestBlobDataType.scala @@ -517,4 +517,112 @@ class TestBlobDataType extends HoodieSparkSqlTestBase { "Expected at least one .clean instant on the timeline after compaction") }) } + + /** + * INLINE BLOB column accepts the minimal `named_struct('type','INLINE','data', X'..')` + * literal at INSERT time, no `reference` field required. The writer pads the missing + * `reference` with null. Both the canonical 3-field literal and the minimal 2-field + * literal coexist in the same table. + */ + test("Test Inline BLOB SQL insert accepts minimal named_struct") { + withRecordType()(withTempDir { tmp => + val tablePath = new File(tmp, "hudi").getCanonicalPath + val tableName = generateTableName + spark.sql( + s""" + |create table $tableName ( + | id int, + | data blob, + | ts long + |) using hudi + | location '$tablePath' + | tblproperties ( + | primaryKey = 'id', + | type = 'cow', + | preCombineField = 'ts', + | hoodie.index.type = 'INMEMORY' + | ) + """.stripMargin) + + val minimalLiteral = + """named_struct( + | 'type', 'INLINE', + | 'data', cast(X'AB' as binary) + |)""".stripMargin + + // Minimal 2-field named_struct now writes successfully. + spark.sql(s"insert into $tableName values (1, $minimalLiteral, 1000)") + // Canonical 3-field literal still writes successfully. + spark.sql(s"insert into $tableName values (2, ${inlineBlobLiteral("CD")}, 1000)") + + val bytesById = spark.sql(s"select id, read_blob(data) as bytes from $tableName order by id") + .collect().map(r => r.getInt(0) -> r.getAs[Array[Byte]]("bytes")).toMap + assertResult(2)(bytesById.size) + assert(bytesById(1).sameElements(Array(0xAB.toByte))) + assert(bytesById(2).sameElements(Array(0xCD.toByte))) + + // Confirm structural shape: type=INLINE, data populated, reference null on both rows. 
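+      // Both the row written from the minimal 2-field literal and the row written from
+      // the canonical 3-field literal should read back in the same canonical layout.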
+ spark.sql(s"select id, data from $tableName order by id").collect().foreach { row => + val blob = row.getStruct(1) + assertResult("INLINE")(blob.getString(blob.fieldIndex(HoodieSchema.Blob.TYPE))) + assert(!blob.isNullAt(blob.fieldIndex(HoodieSchema.Blob.INLINE_DATA_FIELD))) + assert(blob.isNullAt(blob.fieldIndex(HoodieSchema.Blob.EXTERNAL_REFERENCE))) + } + }) + } + + /** + * OUT_OF_LINE BLOB column accepts the minimal `named_struct('type','OUT_OF_LINE', + * 'reference', named_struct(...))` literal — no `data` field required. The writer + * pads the missing `data` with null. + */ + test("Test OUT_OF_LINE BLOB SQL insert accepts minimal named_struct") { + withRecordType()(withTempDir { tmp => + val tablePath = new File(tmp, "hudi").getCanonicalPath + val blobDir = new File(tmp, "blobs") + blobDir.mkdirs() + val file1 = BlobTestHelpers.createTestFile(blobDir.toPath, "min_ool.bin", 100) + + val tableName = generateTableName + spark.sql( + s""" + |create table $tableName ( + | id int, + | data blob, + | ts long + |) using hudi + | location '$tablePath' + | tblproperties ( + | primaryKey = 'id', + | type = 'cow', + | preCombineField = 'ts', + | hoodie.index.type = 'INMEMORY' + | ) + """.stripMargin) + + val minimalLiteral = + s"""named_struct( + | 'type', 'OUT_OF_LINE', + | 'reference', named_struct( + | 'external_path', '$file1', + | 'offset', cast(0 as bigint), + | 'length', cast(100 as bigint), + | 'managed', false + | ) + |)""".stripMargin + + spark.sql(s"insert into $tableName values (1, $minimalLiteral, 1000)") + + val bytes = spark.sql(s"select id, read_blob(data) as bytes from $tableName") + .collect().head.getAs[Array[Byte]]("bytes") + assertResult(100)(bytes.length) + BlobTestHelpers.assertBytesContent(bytes) + + val blob = spark.sql(s"select data from $tableName").collect().head.getStruct(0) + assertResult("OUT_OF_LINE")(blob.getString(blob.fieldIndex(HoodieSchema.Blob.TYPE))) + // Padder filled `data` with null. + assert(blob.isNullAt(blob.fieldIndex(HoodieSchema.Blob.INLINE_DATA_FIELD))) + assert(!blob.isNullAt(blob.fieldIndex(HoodieSchema.Blob.EXTERNAL_REFERENCE))) + }) + } } diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala index 3fc336167e893..28d0b22e72288 100644 --- a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala @@ -168,7 +168,13 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig wi // since such columns wouldn't be otherwise specified w/in the query itself and therefore couldn't be matched // positionally for example val expectedQueryColumns = catalogTable.tableSchemaWithoutMetaFields.filterNot(f => staticPartitionValues.contains(f.name)) - val coercedQueryOutput = coerceQueryOutputColumns(StructType(expectedQueryColumns), cleanedQuery, catalogTable, conf) + // Canonicalize partial BLOB struct expressions before TableOutputResolver runs, + // so OUT_OF_LINE-style `{type, reference}` literals (which Spark's positional + // struct check would otherwise reject for not aligning with the canonical + // `{type, data, reference}` layout) are accepted on the SQL ingest path. 
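+    // For example, a minimal `named_struct('type', 'OUT_OF_LINE', 'reference', named_struct(...))`
+    // literal is reshaped into the full `{type, data, reference}` struct with `data` set to null.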
+ val canonicalizedQuery = InsertBlobCanonicalizer.padPartialBlobsAgainstTarget( + cleanedQuery, StructType(expectedQueryColumns)) + val coercedQueryOutput = coerceQueryOutputColumns(StructType(expectedQueryColumns), canonicalizedQuery, catalogTable, conf) // After potential reshaping validate that the output of the query conforms to the table's schema validate(removeMetaFields(coercedQueryOutput.schema), partitionsSpec, catalogTable) diff --git a/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala index b1a2a3a2c6e7d..746c1aec91b86 100644 --- a/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala @@ -168,7 +168,13 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig wi // since such columns wouldn't be otherwise specified w/in the query itself and therefore couldn't be matched // positionally for example val expectedQueryColumns = catalogTable.tableSchemaWithoutMetaFields.filterNot(f => staticPartitionValues.contains(f.name)) - val coercedQueryOutput = coerceQueryOutputColumns(StructType(expectedQueryColumns), cleanedQuery, catalogTable, conf) + // Canonicalize partial BLOB struct expressions before TableOutputResolver runs, + // so OUT_OF_LINE-style `{type, reference}` literals (which Spark's positional + // struct check would otherwise reject for not aligning with the canonical + // `{type, data, reference}` layout) are accepted on the SQL ingest path. + val canonicalizedQuery = InsertBlobCanonicalizer.padPartialBlobsAgainstTarget( + cleanedQuery, StructType(expectedQueryColumns)) + val coercedQueryOutput = coerceQueryOutputColumns(StructType(expectedQueryColumns), canonicalizedQuery, catalogTable, conf) // After potential reshaping validate that the output of the query conforms to the table's schema validate(removeMetaFields(coercedQueryOutput.schema), partitionsSpec, catalogTable)