diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkClientTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkClientTestHarness.java index 97bd0e1175d76..6f7530f27b39b 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkClientTestHarness.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkClientTestHarness.java @@ -55,6 +55,7 @@ import org.apache.hudi.exception.HoodieMetadataException; import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hudi.index.HoodieIndex; +import org.apache.hudi.index.inmemory.HoodieInMemoryHashIndex; import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory; import org.apache.hudi.metadata.FileSystemBackedTableMetadata; import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter; @@ -237,6 +238,12 @@ protected void initQueryIndexConf() { * Cleanups Spark contexts ({@link JavaSparkContext} and {@link SQLContext}). */ protected void cleanupSparkContexts() { + // HoodieInMemoryHashIndex holds a JVM-static record-location map that survives + // sparkSession.stop(), leaking record keys and locations across sequential tests + // in the same JVM. A stale entry causes tagLocation to demote a not-matched + // INSERT into a no-op UPDATE on a non-existent file group. + HoodieInMemoryHashIndex.clear(); + if (sparkSession != null) { sparkSession.stop(); sparkSession = null; diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestVectorDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestVectorDataSource.scala index 81eb776b54823..3758a22442d29 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestVectorDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestVectorDataSource.scala @@ -18,8 +18,10 @@ package org.apache.hudi.functional import org.apache.hudi.DataSourceWriteOptions._ +import org.apache.hudi.common.model.HoodieFileFormat import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaType} -import org.apache.hudi.testutils.HoodieSparkClientTestBase +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.testutils.{DataSourceTestUtils, HoodieSparkClientTestBase} import org.apache.hadoop.fs.Path import org.apache.parquet.hadoop.ParquetFileReader @@ -28,6 +30,7 @@ import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession} import org.apache.spark.sql.types._ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import org.junit.jupiter.api.Assertions._ +import org.junit.jupiter.api.condition.DisabledIfSystemProperty import scala.collection.JavaConverters._ @@ -674,6 +677,263 @@ class TestVectorDataSource extends HoodieSparkClientTestBase { assertTrue(r7.getSeq[Double](1).forall(_ == 1.0), "key_7 should have original value 1.0") } + @Test + def testMorLogOnlyCompactionPreservesVectorMetadata(): Unit = { + val path = basePath + "/mor_log_only_vec" + val tableName = "mor_log_only_vec_test" + try { + spark.sql( + s""" + |create table $tableName ( + | id int, + | embedding VECTOR(3), + | ts long + |) using hudi + | location '$path' + | tblproperties ( + | primaryKey = 'id', + | type = 'mor', + | preCombineField = 'ts', + | hoodie.index.type = 'INMEMORY', + | hoodie.compact.inline = 'true', + | hoodie.clean.commits.retained = '1' + | ) + """.stripMargin) + + def readOrdered(): 
Seq[Row] = + spark.sql(s"select id, embedding, ts from $tableName order by id").collect().toSeq + + def embeddingOf(id: Int, rows: Seq[Row]): Seq[Float] = + rows.find(_.getInt(0) == id).get.getSeq[Float](1) + + spark.sql( + s"insert into $tableName values " + + "(1, array(cast(0.1 as float), cast(0.2 as float), cast(0.3 as float)), 1000)") + spark.sql( + s"insert into $tableName values " + + "(2, array(cast(0.4 as float), cast(0.5 as float), cast(0.6 as float)), 1000)") + spark.sql( + s"insert into $tableName values " + + "(3, array(cast(0.7 as float), cast(0.8 as float), cast(0.9 as float)), 1000)") + // 3 commits will not trigger compaction, so it should be log only. + assertTrue(DataSourceTestUtils.isLogFileOnly(path)) + val afterInserts = readOrdered() + assertEquals(3, afterInserts.size) + assertEquals(Seq(0.1f, 0.2f, 0.3f), embeddingOf(1, afterInserts)) + assertEquals(Seq(0.4f, 0.5f, 0.6f), embeddingOf(2, afterInserts)) + assertEquals(Seq(0.7f, 0.8f, 0.9f), embeddingOf(3, afterInserts)) + + spark.sql( + s""" + |merge into $tableName h0 + |using ( + | select 1 as id, + | array(cast(0.11 as float), cast(0.22 as float), cast(0.33 as float)) as embedding, + | 1001L as ts + |) s0 + | on h0.id = s0.id + | when matched then update set * + |""".stripMargin) + // 4 commits will not trigger compaction, so it should be log only. + assertTrue(DataSourceTestUtils.isLogFileOnly(path)) + val afterUpdate = readOrdered() + assertEquals(Seq(0.11f, 0.22f, 0.33f), embeddingOf(1, afterUpdate)) + assertEquals(Seq(0.4f, 0.5f, 0.6f), embeddingOf(2, afterUpdate)) + assertEquals(Seq(0.7f, 0.8f, 0.9f), embeddingOf(3, afterUpdate)) + + spark.sql( + s""" + |merge into $tableName h0 + |using ( + | select 4 as id, + | array(cast(0.44 as float), cast(0.55 as float), cast(0.66 as float)) as embedding, + | 1000L as ts + |) s0 + | on h0.id = s0.id + | when not matched then insert * + |""".stripMargin) + + // 5 commits will trigger compaction. + assertFalse(DataSourceTestUtils.isLogFileOnly(path)) + val afterCompaction = readOrdered() + assertEquals(4, afterCompaction.size) + assertEquals(Seq(0.11f, 0.22f, 0.33f), embeddingOf(1, afterCompaction)) + assertEquals(Seq(0.4f, 0.5f, 0.6f), embeddingOf(2, afterCompaction)) + assertEquals(Seq(0.7f, 0.8f, 0.9f), embeddingOf(3, afterCompaction)) + assertEquals(Seq(0.44f, 0.55f, 0.66f), embeddingOf(4, afterCompaction)) + + // VECTOR custom-type descriptor must survive the compacted base-file read path. + val embeddingField = spark.table(tableName).schema.find(_.name == "embedding").get + assertTrue(embeddingField.metadata.contains(HoodieSchema.TYPE_METADATA_FIELD), + s"Expected VECTOR type metadata on embedding field after compaction, " + + s"got: ${embeddingField.metadata}") + + // 6th commit drives an auto-clean that retires the now-superseded log-only slice. + // Inline compaction on commit 5 ran AFTER its own postCommit clean, so the prior + // slice was not yet superseded when that clean fired and no .clean instant was + // written. This deltacommit's postCommit clean sees the post-compaction base + // file and writes the .clean instant. 
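+    // Only the clean fires on this commit: assuming the default
+    // hoodie.compact.inline.max.delta.commits (5), a single deltacommit since the
+    // last compaction is not enough to schedule another inline compaction.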
+ spark.sql( + s""" + |merge into $tableName h0 + |using ( + | select 2 as id, + | array(cast(0.222 as float), cast(0.555 as float), cast(0.888 as float)) as embedding, + | 1002L as ts + |) s0 + | on h0.id = s0.id + | when matched then update set * + |""".stripMargin) + val afterCleanup = readOrdered() + assertEquals(Seq(0.11f, 0.22f, 0.33f), embeddingOf(1, afterCleanup)) + assertEquals(Seq(0.222f, 0.555f, 0.888f), embeddingOf(2, afterCleanup)) + assertEquals(Seq(0.7f, 0.8f, 0.9f), embeddingOf(3, afterCleanup)) + assertEquals(Seq(0.44f, 0.55f, 0.66f), embeddingOf(4, afterCleanup)) + + val metaClient = HoodieTableMetaClient.builder() + .setBasePath(path).setConf(storageConf).build() + metaClient.reloadActiveTimeline() + assertTrue(metaClient.getActiveTimeline.getCleanerTimeline.countInstants() > 0, + "Expected at least one .clean instant on the timeline after compaction") + } finally { + spark.sql(s"drop table if exists $tableName") + } + } + + @Test + @DisabledIfSystemProperty(named = "lance.skip.tests", matches = "true") + def testMorLogOnlyCompactionPreservesVectorMetadataLance(): Unit = { + val path = basePath + "/mor_log_only_vec_lance" + val tableName = "mor_log_only_vec_lance_test" + try { + spark.sql( + s""" + |create table $tableName ( + | id int, + | embedding VECTOR(3), + | ts long + |) using hudi + | location '$path' + | tblproperties ( + | primaryKey = 'id', + | type = 'mor', + | preCombineField = 'ts', + | hoodie.index.type = 'INMEMORY', + | hoodie.compact.inline = 'true', + | hoodie.clean.commits.retained = '1', + | 'hoodie.table.base.file.format' = 'LANCE' + | ) + """.stripMargin) + + def readOrdered(): Seq[Row] = + spark.sql(s"select id, embedding, ts from $tableName order by id").collect().toSeq + + def embeddingOf(id: Int, rows: Seq[Row]): Seq[Float] = + rows.find(_.getInt(0) == id).get.getSeq[Float](1) + + spark.sql( + s"insert into $tableName values " + + "(1, array(cast(0.1 as float), cast(0.2 as float), cast(0.3 as float)), 1000)") + spark.sql( + s"insert into $tableName values " + + "(2, array(cast(0.4 as float), cast(0.5 as float), cast(0.6 as float)), 1000)") + spark.sql( + s"insert into $tableName values " + + "(3, array(cast(0.7 as float), cast(0.8 as float), cast(0.9 as float)), 1000)") + // 3 commits will not trigger compaction, so it should be log only. + assertTrue(DataSourceTestUtils.isLogFileOnly(path)) + val afterInserts = readOrdered() + assertEquals(3, afterInserts.size) + assertEquals(Seq(0.1f, 0.2f, 0.3f), embeddingOf(1, afterInserts)) + assertEquals(Seq(0.4f, 0.5f, 0.6f), embeddingOf(2, afterInserts)) + assertEquals(Seq(0.7f, 0.8f, 0.9f), embeddingOf(3, afterInserts)) + + spark.sql( + s""" + |merge into $tableName h0 + |using ( + | select 1 as id, + | array(cast(0.11 as float), cast(0.22 as float), cast(0.33 as float)) as embedding, + | 1001L as ts + |) s0 + | on h0.id = s0.id + | when matched then update set * + |""".stripMargin) + // 4 commits will not trigger compaction, so it should be log only. 
+ assertTrue(DataSourceTestUtils.isLogFileOnly(path)) + val afterUpdate = readOrdered() + assertEquals(Seq(0.11f, 0.22f, 0.33f), embeddingOf(1, afterUpdate)) + assertEquals(Seq(0.4f, 0.5f, 0.6f), embeddingOf(2, afterUpdate)) + assertEquals(Seq(0.7f, 0.8f, 0.9f), embeddingOf(3, afterUpdate)) + + spark.sql( + s""" + |merge into $tableName h0 + |using ( + | select 4 as id, + | array(cast(0.44 as float), cast(0.55 as float), cast(0.66 as float)) as embedding, + | 1000L as ts + |) s0 + | on h0.id = s0.id + | when not matched then insert * + |""".stripMargin) + + // 5 commits will trigger compaction. + assertFalse(DataSourceTestUtils.isLogFileOnly(path)) + val afterCompaction = readOrdered() + assertEquals(4, afterCompaction.size) + assertEquals(Seq(0.11f, 0.22f, 0.33f), embeddingOf(1, afterCompaction)) + assertEquals(Seq(0.4f, 0.5f, 0.6f), embeddingOf(2, afterCompaction)) + assertEquals(Seq(0.7f, 0.8f, 0.9f), embeddingOf(3, afterCompaction)) + assertEquals(Seq(0.44f, 0.55f, 0.66f), embeddingOf(4, afterCompaction)) + + val embeddingField = spark.table(tableName).schema.find(_.name == "embedding").get + assertTrue(embeddingField.metadata.contains(HoodieSchema.TYPE_METADATA_FIELD), + s"Expected VECTOR type metadata on embedding field after compaction, " + + s"got: ${embeddingField.metadata}") + + // 6th commit drives an auto-clean that retires the now-superseded log-only slice. + // Inline compaction on commit 5 ran AFTER its own postCommit clean, so the prior + // slice was not yet superseded when that clean fired and no .clean instant was + // written. This deltacommit's postCommit clean writes the .clean instant. + spark.sql( + s""" + |merge into $tableName h0 + |using ( + | select 2 as id, + | array(cast(0.222 as float), cast(0.555 as float), cast(0.888 as float)) as embedding, + | 1002L as ts + |) s0 + | on h0.id = s0.id + | when matched then update set * + |""".stripMargin) + val afterCleanup = readOrdered() + assertEquals(Seq(0.11f, 0.22f, 0.33f), embeddingOf(1, afterCleanup)) + assertEquals(Seq(0.222f, 0.555f, 0.888f), embeddingOf(2, afterCleanup)) + assertEquals(Seq(0.7f, 0.8f, 0.9f), embeddingOf(3, afterCleanup)) + assertEquals(Seq(0.44f, 0.55f, 0.66f), embeddingOf(4, afterCleanup)) + + val metaClient = HoodieTableMetaClient.builder() + .setBasePath(path).setConf(storageConf).build() + metaClient.reloadActiveTimeline() + assertTrue(metaClient.getActiveTimeline.getCleanerTimeline.countInstants() > 0, + "Expected at least one .clean instant on the timeline after compaction") + + // The table is configured for Lance, and compaction must have produced a + // .lance base file (not parquet) — otherwise the Lance variant degenerates + // into a parquet test. 
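+      // Note: the table is non-partitioned, so base files are expected directly under the
+      // table base path and the non-recursive listFiles() scan below should be sufficient.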
+ assertEquals(HoodieFileFormat.LANCE, metaClient.getTableConfig.getBaseFileFormat, + "Expected Lance base file format") + val lanceBaseFiles = new java.io.File(path).listFiles() + .filter(f => f.isFile && f.getName.endsWith(".lance")) + assertTrue(lanceBaseFiles.nonEmpty, + s"Expected at least one .lance base file under $path after compaction, " + + s"found: ${new java.io.File(path).listFiles().map(_.getName).mkString(", ")}") + } finally { + spark.sql(s"drop table if exists $tableName") + } + } + @Test def testDimensionMismatchOnWrite(): Unit = { // Schema declares VECTOR(8) but data has arrays of length 4 diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestBlobDataType.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestBlobDataType.scala new file mode 100644 index 0000000000000..f9dd9262c4b04 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestBlobDataType.scala @@ -0,0 +1,518 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.spark.sql.hudi.dml.schema + +import org.apache.hudi.blob.BlobTestHelpers +import org.apache.hudi.common.model.HoodieFileFormat +import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaType} +import org.apache.hudi.testutils.DataSourceTestUtils +import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient + +import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase + +import java.io.File + +class TestBlobDataType extends HoodieSparkSqlTestBase { + + private val referenceStructType = + "struct" + + private def inlineBlobLiteral(hex: String): String = + s"""named_struct( + | 'type', 'INLINE', + | 'data', cast(X'$hex' as binary), + | 'reference', cast(null as $referenceStructType) + |)""".stripMargin + + private def outOfLineBlobLiteral(path: String, offset: Long, length: Long): String = + s"""named_struct( + | 'type', 'OUT_OF_LINE', + | 'data', cast(null as binary), + | 'reference', named_struct( + | 'external_path', '$path', + | 'offset', cast($offset as bigint), + | 'length', cast($length as bigint), + | 'managed', false + | ) + |)""".stripMargin + + test("Test Query Log Only MOR Table With BLOB INLINE column triggers compaction") { + withRecordType()(withTempDir { tmp => + val tablePath = new File(tmp, "hudi").getCanonicalPath + val tableName = generateTableName + spark.sql( + s""" + |create table $tableName ( + | id int, + | data blob, + | ts long + |) using hudi + | location '$tablePath' + | tblproperties ( + | primaryKey = 'id', + | type = 'mor', + | preCombineField = 'ts', + | hoodie.index.type = 'INMEMORY', + | hoodie.compact.inline = 'true', + | hoodie.clean.commits.retained = '1' + | ) + """.stripMargin) + + spark.sql(s"insert into $tableName values (1, ${inlineBlobLiteral("01")}, 1000)") + spark.sql(s"insert into $tableName values (2, ${inlineBlobLiteral("02")}, 1000)") + spark.sql(s"insert into $tableName values (3, ${inlineBlobLiteral("03")}, 1000)") + // 3 commits will not trigger compaction, so it should be log only. + assertResult(true)(DataSourceTestUtils.isLogFileOnly(tablePath)) + + spark.sql( + s""" + |merge into $tableName h0 + |using ( + | select 1 as id, ${inlineBlobLiteral("11")} as data, 1001L as ts + |) s0 + | on h0.id = s0.id + | when matched then update set * + |""".stripMargin) + // 4 commits will not trigger compaction, so it should be log only. + assertResult(true)(DataSourceTestUtils.isLogFileOnly(tablePath)) + + spark.sql( + s""" + |merge into $tableName h0 + |using ( + | select 4 as id, ${inlineBlobLiteral("04")} as data, 1000L as ts + |) s0 + | on h0.id = s0.id + | when not matched then insert * + |""".stripMargin) + + // 5 commits will trigger compaction. + assertResult(false)(DataSourceTestUtils.isLogFileOnly(tablePath)) + + // read_blob() on an INLINE column returns the inline bytes directly, verify the + // post-compaction bytes match what was written. + val bytesById = spark.sql( + s"select id, read_blob(data) as bytes from $tableName order by id" + ).collect().map(r => r.getInt(0) -> r.getAs[Array[Byte]]("bytes")).toMap + assertResult(4)(bytesById.size) + assert(bytesById(1).sameElements(Array(0x11.toByte))) + assert(bytesById(2).sameElements(Array(0x02.toByte))) + assert(bytesById(3).sameElements(Array(0x03.toByte))) + assert(bytesById(4).sameElements(Array(0x04.toByte))) + + // Verify inline shape: type='INLINE', data non-null, reference null. 
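+      // On the Parquet read path an INLINE row carries a null `reference` struct; the Lance
+      // variant of this test below tolerates a non-null struct with all-null leaves instead
+      // (see that test's comment).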
+ spark.sql(s"select id, data from $tableName order by id").collect().foreach { row => + val blob = row.getStruct(1) + assertResult("INLINE")(blob.getString(blob.fieldIndex(HoodieSchema.Blob.TYPE))) + assert(!blob.isNullAt(blob.fieldIndex(HoodieSchema.Blob.INLINE_DATA_FIELD))) + assert(blob.isNullAt(blob.fieldIndex(HoodieSchema.Blob.EXTERNAL_REFERENCE))) + } + + // BLOB custom-type descriptor must survive the compacted base-file read path. + val blobField = spark.table(tableName).schema.find(_.name == "data").get + assert(blobField.metadata.contains(HoodieSchema.TYPE_METADATA_FIELD), + s"Expected BLOB type metadata on data field after compaction, " + + s"got: ${blobField.metadata}") + assertResult(HoodieSchemaType.BLOB.name())( + blobField.metadata.getString(HoodieSchema.TYPE_METADATA_FIELD)) + + // 6th commit drives an auto-clean that retires the now-superseded log-only slice. + // Inline compaction on commit 5 ran AFTER its own postCommit clean, so the prior + // slice was not yet superseded when that clean fired and no .clean instant was + // written. This deltacommit's postCommit clean writes the .clean instant. + spark.sql( + s""" + |merge into $tableName h0 + |using ( + | select 2 as id, ${inlineBlobLiteral("22")} as data, 1002L as ts + |) s0 + | on h0.id = s0.id + | when matched then update set * + |""".stripMargin) + val updatedBytesById = spark.sql( + s"select id, read_blob(data) as bytes from $tableName order by id" + ).collect().map(r => r.getInt(0) -> r.getAs[Array[Byte]]("bytes")).toMap + assert(updatedBytesById(2).sameElements(Array(0x22.toByte))) + + val metaClient = createMetaClient(spark, tablePath) + metaClient.reloadActiveTimeline() + assert(metaClient.getActiveTimeline.getCleanerTimeline.countInstants() > 0, + "Expected at least one .clean instant on the timeline after compaction") + }) + } + + test("Test Query Log Only MOR Table With BLOB OUT_OF_LINE column triggers compaction") { + withRecordType()(withTempDir { tmp => + val tablePath = new File(tmp, "hudi").getCanonicalPath + val blobDir = new File(tmp, "blobs") + blobDir.mkdirs() + // createTestFile writes bytes where byte[i] = i % 256, assertBytesContent + // checks round-trip against that pattern. + val file1 = BlobTestHelpers.createTestFile(blobDir.toPath, "blob1.bin", 100) + val file2 = BlobTestHelpers.createTestFile(blobDir.toPath, "blob2.bin", 100) + val file3 = BlobTestHelpers.createTestFile(blobDir.toPath, "blob3.bin", 100) + val file4 = BlobTestHelpers.createTestFile(blobDir.toPath, "blob4.bin", 100) + val file1Updated = BlobTestHelpers.createTestFile(blobDir.toPath, "blob1_updated.bin", 80) + val file2Updated = BlobTestHelpers.createTestFile(blobDir.toPath, "blob2_updated.bin", 60) + + val tableName = generateTableName + spark.sql( + s""" + |create table $tableName ( + | id int, + | data blob, + | ts long + |) using hudi + | location '$tablePath' + | tblproperties ( + | primaryKey = 'id', + | type = 'mor', + | preCombineField = 'ts', + | hoodie.index.type = 'INMEMORY', + | hoodie.compact.inline = 'true', + | hoodie.clean.commits.retained = '1' + | ) + """.stripMargin) + + spark.sql( + s"insert into $tableName values (1, ${outOfLineBlobLiteral(file1, 0L, 100L)}, 1000)") + spark.sql( + s"insert into $tableName values (2, ${outOfLineBlobLiteral(file2, 0L, 100L)}, 1000)") + spark.sql( + s"insert into $tableName values (3, ${outOfLineBlobLiteral(file3, 0L, 100L)}, 1000)") + // 3 commits will not trigger compaction, so it should be log only. 
+ assertResult(true)(DataSourceTestUtils.isLogFileOnly(tablePath)) + + spark.sql( + s""" + |merge into $tableName h0 + |using ( + | select 1 as id, ${outOfLineBlobLiteral(file1Updated, 0L, 80L)} as data, 1001L as ts + |) s0 + | on h0.id = s0.id + | when matched then update set * + |""".stripMargin) + // 4 commits will not trigger compaction, so it should be log only. + assertResult(true)(DataSourceTestUtils.isLogFileOnly(tablePath)) + + spark.sql( + s""" + |merge into $tableName h0 + |using ( + | select 4 as id, ${outOfLineBlobLiteral(file4, 0L, 100L)} as data, 1000L as ts + |) s0 + | on h0.id = s0.id + | when not matched then insert * + |""".stripMargin) + + // 5 commits will trigger compaction. + assertResult(false)(DataSourceTestUtils.isLogFileOnly(tablePath)) + + // read_blob() on an OUT_OF_LINE column must dereference external_path and read + // the referenced byte range, verify bytes from the compacted base-file plan. + val bytesById = spark.sql( + s"select id, read_blob(data) as bytes from $tableName order by id" + ).collect().map(r => r.getInt(0) -> r.getAs[Array[Byte]]("bytes")).toMap + assertResult(4)(bytesById.size) + assertResult(80)(bytesById(1).length) + BlobTestHelpers.assertBytesContent(bytesById(1)) + assertResult(100)(bytesById(2).length) + BlobTestHelpers.assertBytesContent(bytesById(2)) + assertResult(100)(bytesById(3).length) + BlobTestHelpers.assertBytesContent(bytesById(3)) + assertResult(100)(bytesById(4).length) + BlobTestHelpers.assertBytesContent(bytesById(4)) + + // Verify out-of-line shape: type='OUT_OF_LINE', data null, reference non-null. + spark.sql(s"select id, data from $tableName order by id").collect().foreach { row => + val blob = row.getStruct(1) + assertResult("OUT_OF_LINE")(blob.getString(blob.fieldIndex(HoodieSchema.Blob.TYPE))) + assert(blob.isNullAt(blob.fieldIndex(HoodieSchema.Blob.INLINE_DATA_FIELD))) + assert(!blob.isNullAt(blob.fieldIndex(HoodieSchema.Blob.EXTERNAL_REFERENCE))) + } + + // BLOB custom-type descriptor must survive the compacted base-file read path. + val blobField = spark.table(tableName).schema.find(_.name == "data").get + assert(blobField.metadata.contains(HoodieSchema.TYPE_METADATA_FIELD), + s"Expected BLOB type metadata on data field after compaction, " + + s"got: ${blobField.metadata}") + assertResult(HoodieSchemaType.BLOB.name())( + blobField.metadata.getString(HoodieSchema.TYPE_METADATA_FIELD)) + + // 6th commit drives an auto-clean that retires the now-superseded log-only slice. + // Inline compaction on commit 5 ran AFTER its own postCommit clean, so the prior + // slice was not yet superseded when that clean fired and no .clean instant was + // written. This deltacommit's postCommit clean writes the .clean instant. 
+ spark.sql( + s""" + |merge into $tableName h0 + |using ( + | select 2 as id, ${outOfLineBlobLiteral(file2Updated, 0L, 60L)} as data, 1002L as ts + |) s0 + | on h0.id = s0.id + | when matched then update set * + |""".stripMargin) + val updatedBytesById = spark.sql( + s"select id, read_blob(data) as bytes from $tableName order by id" + ).collect().map(r => r.getInt(0) -> r.getAs[Array[Byte]]("bytes")).toMap + assertResult(60)(updatedBytesById(2).length) + BlobTestHelpers.assertBytesContent(updatedBytesById(2)) + + val metaClient = createMetaClient(spark, tablePath) + metaClient.reloadActiveTimeline() + assert(metaClient.getActiveTimeline.getCleanerTimeline.countInstants() > 0, + "Expected at least one .clean instant on the timeline after compaction") + }) + } + + test("Test Query Log Only MOR Table With BLOB INLINE column triggers compaction (Lance)") { + assume(System.getProperty("lance.skip.tests") != "true", + "Lance tests disabled via -Dlance.skip.tests=true") + // Covers log-only MOR ingest of INLINE blobs on a Lance base format, the inline + // compaction trigger, and the post-compaction inline read shape. + + withRecordType()(withTempDir { tmp => + val tablePath = new File(tmp, "hudi").getCanonicalPath + val tableName = generateTableName + spark.sql( + s""" + |create table $tableName ( + | id int, + | data blob, + | ts long + |) using hudi + | location '$tablePath' + | tblproperties ( + | primaryKey = 'id', + | type = 'mor', + | preCombineField = 'ts', + | hoodie.index.type = 'INMEMORY', + | hoodie.compact.inline = 'true', + | hoodie.clean.commits.retained = '1', + | 'hoodie.table.base.file.format' = 'LANCE' + | ) + """.stripMargin) + + // Verify the LANCE config was actually persisted to hoodie.properties. + assertResult(HoodieFileFormat.LANCE)( + createMetaClient(spark, tablePath).getTableConfig.getBaseFileFormat) + + spark.sql(s"insert into $tableName values (1, ${inlineBlobLiteral("01")}, 1000)") + spark.sql(s"insert into $tableName values (2, ${inlineBlobLiteral("02")}, 1000)") + spark.sql(s"insert into $tableName values (3, ${inlineBlobLiteral("03")}, 1000)") + assertResult(true)(DataSourceTestUtils.isLogFileOnly(tablePath)) + + spark.sql( + s""" + |merge into $tableName h0 + |using ( + | select 1 as id, ${inlineBlobLiteral("11")} as data, 1001L as ts + |) s0 + | on h0.id = s0.id + | when matched then update set * + |""".stripMargin) + assertResult(true)(DataSourceTestUtils.isLogFileOnly(tablePath)) + + spark.sql( + s""" + |merge into $tableName h0 + |using ( + | select 4 as id, ${inlineBlobLiteral("04")} as data, 1000L as ts + |) s0 + | on h0.id = s0.id + | when not matched then insert * + |""".stripMargin) + + assertResult(false)(DataSourceTestUtils.isLogFileOnly(tablePath)) + + val bytesById = spark.sql( + s"select id, read_blob(data) as bytes from $tableName order by id" + ).collect().map(r => r.getInt(0) -> r.getAs[Array[Byte]]("bytes")).toMap + assertResult(4)(bytesById.size) + assert(bytesById(1).sameElements(Array(0x11.toByte))) + assert(bytesById(2).sameElements(Array(0x02.toByte))) + assert(bytesById(3).sameElements(Array(0x03.toByte))) + assert(bytesById(4).sameElements(Array(0x04.toByte))) + + spark.sql(s"select id, data from $tableName order by id").collect().foreach { row => + val blob = row.getStruct(1) + assertResult("INLINE")(blob.getString(blob.fieldIndex(HoodieSchema.Blob.TYPE))) + assert(!blob.isNullAt(blob.fieldIndex(HoodieSchema.Blob.INLINE_DATA_FIELD))) + // Lance materializes the `reference` struct as non-null with all-null leaves for + // INLINE rows 
(vs. a null struct on Parquet). `type` is the canonical INLINE + // discriminator per RFC-100; tolerate either shape and just check the leaves. + val refIdx = blob.fieldIndex(HoodieSchema.Blob.EXTERNAL_REFERENCE) + if (!blob.isNullAt(refIdx)) { + val ref = blob.getStruct(refIdx) + assert(ref.isNullAt(ref.fieldIndex(HoodieSchema.Blob.EXTERNAL_REFERENCE_PATH))) + } + } + + val blobField = spark.table(tableName).schema.find(_.name == "data").get + assert(blobField.metadata.contains(HoodieSchema.TYPE_METADATA_FIELD), + s"Expected BLOB type metadata on data field after compaction, " + + s"got: ${blobField.metadata}") + assertResult(HoodieSchemaType.BLOB.name())( + blobField.metadata.getString(HoodieSchema.TYPE_METADATA_FIELD)) + + // 6th commit drives an auto-clean that retires the now-superseded log-only slice. + spark.sql( + s""" + |merge into $tableName h0 + |using ( + | select 2 as id, ${inlineBlobLiteral("22")} as data, 1002L as ts + |) s0 + | on h0.id = s0.id + | when matched then update set * + |""".stripMargin) + val updatedBytesById = spark.sql( + s"select id, read_blob(data) as bytes from $tableName order by id" + ).collect().map(r => r.getInt(0) -> r.getAs[Array[Byte]]("bytes")).toMap + assert(updatedBytesById(2).sameElements(Array(0x22.toByte))) + + val metaClient = createMetaClient(spark, tablePath) + metaClient.reloadActiveTimeline() + assert(metaClient.getActiveTimeline.getCleanerTimeline.countInstants() > 0, + "Expected at least one .clean instant on the timeline after compaction") + }) + } + + test("Test Query Log Only MOR Table With BLOB OUT_OF_LINE column triggers compaction (Lance)") { + assume(System.getProperty("lance.skip.tests") != "true", + "Lance tests disabled via -Dlance.skip.tests=true") + // Lance writer has no BLOB handling today (RFC-100 Phase 2). Expected to fail + // until support lands in HoodieSparkLanceWriter; this test pins the gap. + + withRecordType()(withTempDir { tmp => + val tablePath = new File(tmp, "hudi").getCanonicalPath + val blobDir = new File(tmp, "blobs") + blobDir.mkdirs() + val file1 = BlobTestHelpers.createTestFile(blobDir.toPath, "blob1.bin", 100) + val file2 = BlobTestHelpers.createTestFile(blobDir.toPath, "blob2.bin", 100) + val file3 = BlobTestHelpers.createTestFile(blobDir.toPath, "blob3.bin", 100) + val file4 = BlobTestHelpers.createTestFile(blobDir.toPath, "blob4.bin", 100) + val file1Updated = BlobTestHelpers.createTestFile(blobDir.toPath, "blob1_updated.bin", 80) + val file2Updated = BlobTestHelpers.createTestFile(blobDir.toPath, "blob2_updated.bin", 60) + + val tableName = generateTableName + spark.sql( + s""" + |create table $tableName ( + | id int, + | data blob, + | ts long + |) using hudi + | location '$tablePath' + | tblproperties ( + | primaryKey = 'id', + | type = 'mor', + | preCombineField = 'ts', + | hoodie.index.type = 'INMEMORY', + | hoodie.compact.inline = 'true', + | hoodie.clean.commits.retained = '1', + | 'hoodie.table.base.file.format' = 'LANCE' + | ) + """.stripMargin) + + // Verify the LANCE config was actually persisted to hoodie.properties. 
+ assertResult(HoodieFileFormat.LANCE)( + createMetaClient(spark, tablePath).getTableConfig.getBaseFileFormat) + + spark.sql( + s"insert into $tableName values (1, ${outOfLineBlobLiteral(file1, 0L, 100L)}, 1000)") + spark.sql( + s"insert into $tableName values (2, ${outOfLineBlobLiteral(file2, 0L, 100L)}, 1000)") + spark.sql( + s"insert into $tableName values (3, ${outOfLineBlobLiteral(file3, 0L, 100L)}, 1000)") + assertResult(true)(DataSourceTestUtils.isLogFileOnly(tablePath)) + + spark.sql( + s""" + |merge into $tableName h0 + |using ( + | select 1 as id, ${outOfLineBlobLiteral(file1Updated, 0L, 80L)} as data, 1001L as ts + |) s0 + | on h0.id = s0.id + | when matched then update set * + |""".stripMargin) + assertResult(true)(DataSourceTestUtils.isLogFileOnly(tablePath)) + + spark.sql( + s""" + |merge into $tableName h0 + |using ( + | select 4 as id, ${outOfLineBlobLiteral(file4, 0L, 100L)} as data, 1000L as ts + |) s0 + | on h0.id = s0.id + | when not matched then insert * + |""".stripMargin) + + assertResult(false)(DataSourceTestUtils.isLogFileOnly(tablePath)) + + val bytesById = spark.sql( + s"select id, read_blob(data) as bytes from $tableName order by id" + ).collect().map(r => r.getInt(0) -> r.getAs[Array[Byte]]("bytes")).toMap + assertResult(4)(bytesById.size) + assertResult(80)(bytesById(1).length) + BlobTestHelpers.assertBytesContent(bytesById(1)) + assertResult(100)(bytesById(2).length) + BlobTestHelpers.assertBytesContent(bytesById(2)) + assertResult(100)(bytesById(3).length) + BlobTestHelpers.assertBytesContent(bytesById(3)) + assertResult(100)(bytesById(4).length) + BlobTestHelpers.assertBytesContent(bytesById(4)) + + spark.sql(s"select id, data from $tableName order by id").collect().foreach { row => + val blob = row.getStruct(1) + assertResult("OUT_OF_LINE")(blob.getString(blob.fieldIndex(HoodieSchema.Blob.TYPE))) + assert(blob.isNullAt(blob.fieldIndex(HoodieSchema.Blob.INLINE_DATA_FIELD))) + assert(!blob.isNullAt(blob.fieldIndex(HoodieSchema.Blob.EXTERNAL_REFERENCE))) + } + + val blobField = spark.table(tableName).schema.find(_.name == "data").get + assert(blobField.metadata.contains(HoodieSchema.TYPE_METADATA_FIELD), + s"Expected BLOB type metadata on data field after compaction, " + + s"got: ${blobField.metadata}") + assertResult(HoodieSchemaType.BLOB.name())( + blobField.metadata.getString(HoodieSchema.TYPE_METADATA_FIELD)) + + // 6th commit drives an auto-clean that retires the now-superseded log-only slice. 
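+      // This update lands in a new log file on top of the compacted Lance base file, so the
+      // read below exercises the base-plus-log merge path for OUT_OF_LINE blobs.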
+ spark.sql( + s""" + |merge into $tableName h0 + |using ( + | select 2 as id, ${outOfLineBlobLiteral(file2Updated, 0L, 60L)} as data, 1002L as ts + |) s0 + | on h0.id = s0.id + | when matched then update set * + |""".stripMargin) + val updatedBytesById = spark.sql( + s"select id, read_blob(data) as bytes from $tableName order by id" + ).collect().map(r => r.getInt(0) -> r.getAs[Array[Byte]]("bytes")).toMap + assertResult(60)(updatedBytesById(2).length) + BlobTestHelpers.assertBytesContent(updatedBytesById(2)) + + val metaClient = createMetaClient(spark, tablePath) + metaClient.reloadActiveTimeline() + assert(metaClient.getActiveTimeline.getCleanerTimeline.countInstants() > 0, + "Expected at least one .clean instant on the timeline after compaction") + }) + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestVariantDataType.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestVariantDataType.scala index f6372d4f797e2..f9c2dbc32c6c2 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestVariantDataType.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestVariantDataType.scala @@ -20,10 +20,13 @@ package org.apache.spark.sql.hudi.dml.schema import org.apache.hudi.HoodieSparkUtils +import org.apache.hudi.common.model.HoodieFileFormat import org.apache.hudi.common.schema.HoodieSchema import org.apache.hudi.common.testutils.HoodieTestUtils import org.apache.hudi.common.util.StringUtils import org.apache.hudi.internal.schema.HoodieSchemaException +import org.apache.hudi.testutils.DataSourceTestUtils +import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient import org.apache.spark.sql.{Row, SaveMode} import org.apache.spark.sql.catalyst.TableIdentifier @@ -118,6 +121,234 @@ class TestVariantDataType extends HoodieSparkSqlTestBase { } } + test("Test Query Log Only MOR Table With VARIANT column triggers compaction") { + assume(HoodieSparkUtils.gteqSpark4_0, "Variant type requires Spark 4.0 or higher") + + withRecordType()(withTempDir { tmp => + val tableName = generateTableName + spark.sql( + s""" + |create table $tableName ( + | id int, + | v variant, + | ts long + |) using hudi + | location '${tmp.getCanonicalPath}' + | tblproperties ( + | primaryKey = 'id', + | type = 'mor', + | preCombineField = 'ts', + | hoodie.index.type = 'INMEMORY', + | hoodie.compact.inline = 'true', + | hoodie.clean.commits.retained = '1' + | ) + """.stripMargin) + + spark.sql( + s"insert into $tableName values " + + "(1, parse_json('{\"key\":\"value1\"}'), 1000)") + spark.sql( + s"insert into $tableName values " + + "(2, parse_json('{\"key\":\"value2\"}'), 1000)") + spark.sql( + s"insert into $tableName values " + + "(3, parse_json('{\"key\":\"value3\"}'), 1000)") + // 3 commits will not trigger compaction, so it should be log only. + assertResult(true)(DataSourceTestUtils.isLogFileOnly(tmp.getCanonicalPath)) + checkAnswer(s"select id, cast(v as string), ts from $tableName order by id")( + Seq(1, "{\"key\":\"value1\"}", 1000), + Seq(2, "{\"key\":\"value2\"}", 1000), + Seq(3, "{\"key\":\"value3\"}", 1000) + ) + + spark.sql( + s""" + |merge into $tableName h0 + |using ( + | select 1 as id, + | parse_json('{"key":"v1-merged"}') as v, + | 1001L as ts + |) s0 + | on h0.id = s0.id + | when matched then update set * + |""".stripMargin) + // 4 commits will not trigger compaction, so it should be log only. 
+ assertResult(true)(DataSourceTestUtils.isLogFileOnly(tmp.getCanonicalPath)) + checkAnswer(s"select id, cast(v as string), ts from $tableName order by id")( + Seq(1, "{\"key\":\"v1-merged\"}", 1001), + Seq(2, "{\"key\":\"value2\"}", 1000), + Seq(3, "{\"key\":\"value3\"}", 1000) + ) + + spark.sql( + s""" + |merge into $tableName h0 + |using ( + | select 4 as id, + | parse_json('{"key":"value4"}') as v, + | 1000L as ts + |) s0 + | on h0.id = s0.id + | when not matched then insert * + |""".stripMargin) + + // 5 commits will trigger compaction. + assertResult(false)(DataSourceTestUtils.isLogFileOnly(tmp.getCanonicalPath)) + checkAnswer(s"select id, cast(v as string), ts from $tableName order by id")( + Seq(1, "{\"key\":\"v1-merged\"}", 1001), + Seq(2, "{\"key\":\"value2\"}", 1000), + Seq(3, "{\"key\":\"value3\"}", 1000), + Seq(4, "{\"key\":\"value4\"}", 1000) + ) + + // VARIANT must round-trip as native VariantType through the compacted base-file read path. + val variantField = spark.table(tableName).schema.find(_.name == "v").get + assertResult("variant")(variantField.dataType.typeName) + + // 6th commit drives an auto-clean that retires the now-superseded log-only slice. + // Inline compaction on commit 5 ran AFTER its own postCommit clean, so the prior + // slice was not yet superseded when that clean fired and no .clean instant was + // written. This deltacommit's postCommit clean writes the .clean instant. + spark.sql( + s""" + |merge into $tableName h0 + |using ( + | select 2 as id, + | parse_json('{"key":"v2-merged"}') as v, + | 1002L as ts + |) s0 + | on h0.id = s0.id + | when matched then update set * + |""".stripMargin) + checkAnswer(s"select id, cast(v as string), ts from $tableName order by id")( + Seq(1, "{\"key\":\"v1-merged\"}", 1001), + Seq(2, "{\"key\":\"v2-merged\"}", 1002), + Seq(3, "{\"key\":\"value3\"}", 1000), + Seq(4, "{\"key\":\"value4\"}", 1000) + ) + + val metaClient = createMetaClient(spark, tmp.getCanonicalPath) + metaClient.reloadActiveTimeline() + assert(metaClient.getActiveTimeline.getCleanerTimeline.countInstants() > 0, + "Expected at least one .clean instant on the timeline after compaction") + }) + } + + test("Test Query Log Only MOR Table With VARIANT column triggers compaction (Lance)") { + assume(HoodieSparkUtils.gteqSpark4_0, "Variant type requires Spark 4.0 or higher") + assume(System.getProperty("lance.skip.tests") != "true", + "Lance tests disabled via -Dlance.skip.tests=true") + // Lance writer has no VARIANT handling today (RFC-100 Phase 2). Expected to fail + // until support lands in HoodieSparkLanceWriter; this test pins the gap. + + withRecordType()(withTempDir { tmp => + val tableName = generateTableName + spark.sql( + s""" + |create table $tableName ( + | id int, + | v variant, + | ts long + |) using hudi + | location '${tmp.getCanonicalPath}' + | tblproperties ( + | primaryKey = 'id', + | type = 'mor', + | preCombineField = 'ts', + | hoodie.index.type = 'INMEMORY', + | hoodie.compact.inline = 'true', + | hoodie.clean.commits.retained = '1', + | 'hoodie.table.base.file.format' = 'LANCE' + | ) + """.stripMargin) + + // Verify the LANCE config was actually persisted to hoodie.properties. 
+ assertResult(HoodieFileFormat.LANCE)( + createMetaClient(spark, tmp.getCanonicalPath).getTableConfig.getBaseFileFormat) + + spark.sql( + s"insert into $tableName values " + + "(1, parse_json('{\"key\":\"value1\"}'), 1000)") + spark.sql( + s"insert into $tableName values " + + "(2, parse_json('{\"key\":\"value2\"}'), 1000)") + spark.sql( + s"insert into $tableName values " + + "(3, parse_json('{\"key\":\"value3\"}'), 1000)") + assertResult(true)(DataSourceTestUtils.isLogFileOnly(tmp.getCanonicalPath)) + checkAnswer(s"select id, cast(v as string), ts from $tableName order by id")( + Seq(1, "{\"key\":\"value1\"}", 1000), + Seq(2, "{\"key\":\"value2\"}", 1000), + Seq(3, "{\"key\":\"value3\"}", 1000) + ) + + spark.sql( + s""" + |merge into $tableName h0 + |using ( + | select 1 as id, + | parse_json('{"key":"v1-merged"}') as v, + | 1001L as ts + |) s0 + | on h0.id = s0.id + | when matched then update set * + |""".stripMargin) + assertResult(true)(DataSourceTestUtils.isLogFileOnly(tmp.getCanonicalPath)) + checkAnswer(s"select id, cast(v as string), ts from $tableName order by id")( + Seq(1, "{\"key\":\"v1-merged\"}", 1001), + Seq(2, "{\"key\":\"value2\"}", 1000), + Seq(3, "{\"key\":\"value3\"}", 1000) + ) + + spark.sql( + s""" + |merge into $tableName h0 + |using ( + | select 4 as id, + | parse_json('{"key":"value4"}') as v, + | 1000L as ts + |) s0 + | on h0.id = s0.id + | when not matched then insert * + |""".stripMargin) + + assertResult(false)(DataSourceTestUtils.isLogFileOnly(tmp.getCanonicalPath)) + checkAnswer(s"select id, cast(v as string), ts from $tableName order by id")( + Seq(1, "{\"key\":\"v1-merged\"}", 1001), + Seq(2, "{\"key\":\"value2\"}", 1000), + Seq(3, "{\"key\":\"value3\"}", 1000), + Seq(4, "{\"key\":\"value4\"}", 1000) + ) + + val variantField = spark.table(tableName).schema.find(_.name == "v").get + assertResult("variant")(variantField.dataType.typeName) + + // 6th commit drives an auto-clean that retires the now-superseded log-only slice. + spark.sql( + s""" + |merge into $tableName h0 + |using ( + | select 2 as id, + | parse_json('{"key":"v2-merged"}') as v, + | 1002L as ts + |) s0 + | on h0.id = s0.id + | when matched then update set * + |""".stripMargin) + checkAnswer(s"select id, cast(v as string), ts from $tableName order by id")( + Seq(1, "{\"key\":\"v1-merged\"}", 1001), + Seq(2, "{\"key\":\"v2-merged\"}", 1002), + Seq(3, "{\"key\":\"value3\"}", 1000), + Seq(4, "{\"key\":\"value4\"}", 1000) + ) + + val metaClient = createMetaClient(spark, tmp.getCanonicalPath) + metaClient.reloadActiveTimeline() + assert(metaClient.getActiveTimeline.getCleanerTimeline.countInstants() > 0, + "Expected at least one .clean instant on the timeline after compaction") + }) + } + test("Test toHiveCompatibleSchema converts VariantType to physical struct") { assume(HoodieSparkUtils.gteqSpark4_0, "Variant type requires Spark 4.0 or higher")