@@ -55,6 +55,7 @@
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.inmemory.HoodieInMemoryHashIndex;
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
import org.apache.hudi.metadata.FileSystemBackedTableMetadata;
import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
@@ -237,6 +238,12 @@ protected void initQueryIndexConf() {
* Cleans up Spark contexts ({@link JavaSparkContext} and {@link SQLContext}).
*/
protected void cleanupSparkContexts() {
// HoodieInMemoryHashIndex holds a JVM-static record-location map that survives
// sparkSession.stop(), leaking record keys and locations across sequential tests
// in the same JVM. A stale entry causes tagLocation to demote a not-matched
// INSERT into a no-op UPDATE on a non-existent file group.
HoodieInMemoryHashIndex.clear();

if (sparkSession != null) {
sparkSession.stop();
sparkSession = null;
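To make the leak concrete, here is a minimal Scala sketch of the failure mode the comment above describes. The object and field names are illustrative stand-ins, not Hudi's internals (the real index keys records by HoodieKey); the point is only that object-level (JVM-static) state is created once per JVM and survives sparkSession.stop().

import java.util.concurrent.ConcurrentHashMap

// Illustrative stand-in for a JVM-static index: the map lives on a Scala
// object, so it is created once per JVM and is never torn down when a
// SparkSession stops.
object InMemoryIndexSketch {
  val recordLocations = new ConcurrentHashMap[String, String]()
  def clear(): Unit = recordLocations.clear()
}

object LeakDemo extends App {
  // "Test A" tags record key k1 to file group fg-A, then stops its session.
  InMemoryIndexSketch.recordLocations.put("k1", "fg-A")

  // "Test B" runs later in the same JVM against a brand-new table. Without
  // an explicit reset, k1 still resolves to the long-gone fg-A, so a fresh
  // INSERT of k1 would be tagged as an UPDATE to a non-existent file group.
  assert(InMemoryIndexSketch.recordLocations.containsKey("k1"))

  // The fix mirrored in cleanupSparkContexts(): clear the static state.
  InMemoryIndexSketch.clear()
  assert(InMemoryIndexSketch.recordLocations.isEmpty)
}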
@@ -18,8 +18,10 @@
package org.apache.hudi.functional

import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.model.HoodieFileFormat
import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaType}
import org.apache.hudi.testutils.HoodieSparkClientTestBase
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.testutils.{DataSourceTestUtils, HoodieSparkClientTestBase}

import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
@@ -28,6 +30,7 @@ import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
import org.apache.spark.sql.types._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.condition.DisabledIfSystemProperty

import scala.collection.JavaConverters._

@@ -674,6 +677,263 @@ class TestVectorDataSource extends HoodieSparkClientTestBase {
assertTrue(r7.getSeq[Double](1).forall(_ == 1.0), "key_7 should have original value 1.0")
}

@Test
def testMorLogOnlyCompactionPreservesVectorMetadata(): Unit = {
val path = basePath + "/mor_log_only_vec"
val tableName = "mor_log_only_vec_test"
try {
spark.sql(
s"""
|create table $tableName (
| id int,
| embedding VECTOR(3),
| ts long
|) using hudi
| location '$path'
| tblproperties (
| primaryKey = 'id',
| type = 'mor',
| preCombineField = 'ts',
| hoodie.index.type = 'INMEMORY',
| hoodie.compact.inline = 'true',
| hoodie.clean.commits.retained = '1'
| )
""".stripMargin)

def readOrdered(): Seq[Row] =
spark.sql(s"select id, embedding, ts from $tableName order by id").collect().toSeq

def embeddingOf(id: Int, rows: Seq[Row]): Seq[Float] =
rows.find(_.getInt(0) == id).get.getSeq[Float](1)

spark.sql(
s"insert into $tableName values " +
"(1, array(cast(0.1 as float), cast(0.2 as float), cast(0.3 as float)), 1000)")
spark.sql(
s"insert into $tableName values " +
"(2, array(cast(0.4 as float), cast(0.5 as float), cast(0.6 as float)), 1000)")
spark.sql(
s"insert into $tableName values " +
"(3, array(cast(0.7 as float), cast(0.8 as float), cast(0.9 as float)), 1000)")
// 3 commits will not trigger compaction, so it should be log only.
assertTrue(DataSourceTestUtils.isLogFileOnly(path))
val afterInserts = readOrdered()
assertEquals(3, afterInserts.size)
assertEquals(Seq(0.1f, 0.2f, 0.3f), embeddingOf(1, afterInserts))
assertEquals(Seq(0.4f, 0.5f, 0.6f), embeddingOf(2, afterInserts))
assertEquals(Seq(0.7f, 0.8f, 0.9f), embeddingOf(3, afterInserts))

spark.sql(
s"""
|merge into $tableName h0
|using (
| select 1 as id,
| array(cast(0.11 as float), cast(0.22 as float), cast(0.33 as float)) as embedding,
| 1001L as ts
|) s0
| on h0.id = s0.id
| when matched then update set *
|""".stripMargin)
// 4 commits will not trigger compaction, so it should be log only.
assertTrue(DataSourceTestUtils.isLogFileOnly(path))
val afterUpdate = readOrdered()
assertEquals(Seq(0.11f, 0.22f, 0.33f), embeddingOf(1, afterUpdate))
assertEquals(Seq(0.4f, 0.5f, 0.6f), embeddingOf(2, afterUpdate))
assertEquals(Seq(0.7f, 0.8f, 0.9f), embeddingOf(3, afterUpdate))

spark.sql(
s"""
|merge into $tableName h0
|using (
| select 4 as id,
| array(cast(0.44 as float), cast(0.55 as float), cast(0.66 as float)) as embedding,
| 1000L as ts
|) s0
| on h0.id = s0.id
| when not matched then insert *
|""".stripMargin)

// The 5th deltacommit reaches hoodie.compact.inline.max.delta.commits (default 5), so inline compaction runs.
assertFalse(DataSourceTestUtils.isLogFileOnly(path))
val afterCompaction = readOrdered()
assertEquals(4, afterCompaction.size)
assertEquals(Seq(0.11f, 0.22f, 0.33f), embeddingOf(1, afterCompaction))
assertEquals(Seq(0.4f, 0.5f, 0.6f), embeddingOf(2, afterCompaction))
assertEquals(Seq(0.7f, 0.8f, 0.9f), embeddingOf(3, afterCompaction))
assertEquals(Seq(0.44f, 0.55f, 0.66f), embeddingOf(4, afterCompaction))

// VECTOR custom-type descriptor must survive the compacted base-file read path.
val embeddingField = spark.table(tableName).schema.find(_.name == "embedding").get
assertTrue(embeddingField.metadata.contains(HoodieSchema.TYPE_METADATA_FIELD),
s"Expected VECTOR type metadata on embedding field after compaction, " +
s"got: ${embeddingField.metadata}")

// 6th commit drives an auto-clean that retires the now-superseded log-only slice.
// Inline compaction on commit 5 ran AFTER its own postCommit clean, so the prior
// slice was not yet superseded when that clean fired and no .clean instant was
// written. This deltacommit's postCommit clean sees the post-compaction base
// file and writes the .clean instant.
spark.sql(
s"""
|merge into $tableName h0
|using (
| select 2 as id,
| array(cast(0.222 as float), cast(0.555 as float), cast(0.888 as float)) as embedding,
| 1002L as ts
|) s0
| on h0.id = s0.id
| when matched then update set *
|""".stripMargin)
val afterCleanup = readOrdered()
assertEquals(Seq(0.11f, 0.22f, 0.33f), embeddingOf(1, afterCleanup))
assertEquals(Seq(0.222f, 0.555f, 0.888f), embeddingOf(2, afterCleanup))
assertEquals(Seq(0.7f, 0.8f, 0.9f), embeddingOf(3, afterCleanup))
assertEquals(Seq(0.44f, 0.55f, 0.66f), embeddingOf(4, afterCleanup))

val metaClient = HoodieTableMetaClient.builder()
.setBasePath(path).setConf(storageConf).build()
metaClient.reloadActiveTimeline()
assertTrue(metaClient.getActiveTimeline.getCleanerTimeline.countInstants() > 0,
"Expected at least one .clean instant on the timeline after compaction")
} finally {
spark.sql(s"drop table if exists $tableName")
}
}

@Test
@DisabledIfSystemProperty(named = "lance.skip.tests", matches = "true")
def testMorLogOnlyCompactionPreservesVectorMetadataLance(): Unit = {
val path = basePath + "/mor_log_only_vec_lance"
val tableName = "mor_log_only_vec_lance_test"
try {
spark.sql(
s"""
|create table $tableName (
| id int,
| embedding VECTOR(3),
| ts long
|) using hudi
| location '$path'
| tblproperties (
| primaryKey = 'id',
| type = 'mor',
| preCombineField = 'ts',
| hoodie.index.type = 'INMEMORY',
| hoodie.compact.inline = 'true',
| hoodie.clean.commits.retained = '1',
| 'hoodie.table.base.file.format' = 'LANCE'
| )
""".stripMargin)

def readOrdered(): Seq[Row] =
Contributor:

🤖 nit: readOrdered and embeddingOf are defined identically inside both testMorLogOnlyCompactionPreservesVectorMetadata and this Lance variant. Could you hoist them to private class-level helpers so changes only need to be made in one place?
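A sketch of the suggested hoisting; making these private methods on TestVectorDataSource and threading tableName through as a parameter are assumptions, since the current locals close over each test's tableName:

// Hypothetical class-level helpers replacing the per-test local copies;
// call sites would become readOrdered(tableName), embeddingOf is unchanged.
private def readOrdered(tableName: String): Seq[Row] =
  spark.sql(s"select id, embedding, ts from $tableName order by id").collect().toSeq

private def embeddingOf(id: Int, rows: Seq[Row]): Seq[Float] =
  rows.find(_.getInt(0) == id).get.getSeq[Float](1)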

spark.sql(s"select id, embedding, ts from $tableName order by id").collect().toSeq

def embeddingOf(id: Int, rows: Seq[Row]): Seq[Float] =
rows.find(_.getInt(0) == id).get.getSeq[Float](1)

spark.sql(
s"insert into $tableName values " +
"(1, array(cast(0.1 as float), cast(0.2 as float), cast(0.3 as float)), 1000)")
spark.sql(
s"insert into $tableName values " +
"(2, array(cast(0.4 as float), cast(0.5 as float), cast(0.6 as float)), 1000)")
spark.sql(
s"insert into $tableName values " +
"(3, array(cast(0.7 as float), cast(0.8 as float), cast(0.9 as float)), 1000)")
// 3 commits will not trigger compaction, so it should be log only.
assertTrue(DataSourceTestUtils.isLogFileOnly(path))
val afterInserts = readOrdered()
assertEquals(3, afterInserts.size)
assertEquals(Seq(0.1f, 0.2f, 0.3f), embeddingOf(1, afterInserts))
assertEquals(Seq(0.4f, 0.5f, 0.6f), embeddingOf(2, afterInserts))
assertEquals(Seq(0.7f, 0.8f, 0.9f), embeddingOf(3, afterInserts))

spark.sql(
s"""
|merge into $tableName h0
|using (
| select 1 as id,
| array(cast(0.11 as float), cast(0.22 as float), cast(0.33 as float)) as embedding,
| 1001L as ts
|) s0
| on h0.id = s0.id
| when matched then update set *
|""".stripMargin)
// 4 commits will not trigger compaction, so it should be log only.
assertTrue(DataSourceTestUtils.isLogFileOnly(path))
val afterUpdate = readOrdered()
assertEquals(Seq(0.11f, 0.22f, 0.33f), embeddingOf(1, afterUpdate))
assertEquals(Seq(0.4f, 0.5f, 0.6f), embeddingOf(2, afterUpdate))
assertEquals(Seq(0.7f, 0.8f, 0.9f), embeddingOf(3, afterUpdate))

spark.sql(
s"""
|merge into $tableName h0
|using (
| select 4 as id,
| array(cast(0.44 as float), cast(0.55 as float), cast(0.66 as float)) as embedding,
| 1000L as ts
|) s0
| on h0.id = s0.id
| when not matched then insert *
|""".stripMargin)

// The 5th deltacommit reaches hoodie.compact.inline.max.delta.commits (default 5), so inline compaction runs.
assertFalse(DataSourceTestUtils.isLogFileOnly(path))
val afterCompaction = readOrdered()
assertEquals(4, afterCompaction.size)
assertEquals(Seq(0.11f, 0.22f, 0.33f), embeddingOf(1, afterCompaction))
assertEquals(Seq(0.4f, 0.5f, 0.6f), embeddingOf(2, afterCompaction))
assertEquals(Seq(0.7f, 0.8f, 0.9f), embeddingOf(3, afterCompaction))
assertEquals(Seq(0.44f, 0.55f, 0.66f), embeddingOf(4, afterCompaction))

val embeddingField = spark.table(tableName).schema.find(_.name == "embedding").get
assertTrue(embeddingField.metadata.contains(HoodieSchema.TYPE_METADATA_FIELD),
s"Expected VECTOR type metadata on embedding field after compaction, " +
s"got: ${embeddingField.metadata}")

// 6th commit drives an auto-clean that retires the now-superseded log-only slice.
// Inline compaction on commit 5 ran AFTER its own postCommit clean, so the prior
// slice was not yet superseded when that clean fired and no .clean instant was
// written. This deltacommit's postCommit clean writes the .clean instant.
spark.sql(
s"""
|merge into $tableName h0
|using (
| select 2 as id,
| array(cast(0.222 as float), cast(0.555 as float), cast(0.888 as float)) as embedding,
| 1002L as ts
|) s0
| on h0.id = s0.id
| when matched then update set *
|""".stripMargin)
val afterCleanup = readOrdered()
assertEquals(Seq(0.11f, 0.22f, 0.33f), embeddingOf(1, afterCleanup))
assertEquals(Seq(0.222f, 0.555f, 0.888f), embeddingOf(2, afterCleanup))
assertEquals(Seq(0.7f, 0.8f, 0.9f), embeddingOf(3, afterCleanup))
assertEquals(Seq(0.44f, 0.55f, 0.66f), embeddingOf(4, afterCleanup))

val metaClient = HoodieTableMetaClient.builder()
.setBasePath(path).setConf(storageConf).build()
metaClient.reloadActiveTimeline()
assertTrue(metaClient.getActiveTimeline.getCleanerTimeline.countInstants() > 0,
"Expected at least one .clean instant on the timeline after compaction")

// The table is configured for Lance, and compaction must have produced a
// .lance base file (not parquet) — otherwise the Lance variant degenerates
// into a parquet test.
assertEquals(HoodieFileFormat.LANCE, metaClient.getTableConfig.getBaseFileFormat,
"Expected Lance base file format")
val lanceBaseFiles = new java.io.File(path).listFiles()
.filter(f => f.isFile && f.getName.endsWith(".lance"))
assertTrue(lanceBaseFiles.nonEmpty,
s"Expected at least one .lance base file under $path after compaction, " +
s"found: ${new java.io.File(path).listFiles().map(_.getName).mkString(", ")}")
} finally {
spark.sql(s"drop table if exists $tableName")
}
}

@Test
def testDimensionMismatchOnWrite(): Unit = {
// Schema declares VECTOR(8) but data has arrays of length 4