diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java index 08e240af98f54..232068b3addf4 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java @@ -20,12 +20,19 @@ import org.apache.hudi.client.transaction.BucketIndexConcurrentFileWritesConflictResolutionStrategy; import org.apache.hudi.client.transaction.ConflictResolutionStrategy; import org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy; +import org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider; +import org.apache.hudi.client.transaction.lock.StorageBasedLockProvider; +import org.apache.hudi.client.transaction.lock.ZookeeperBasedImplicitBasePathLockProvider; +import org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider; import org.apache.hudi.common.config.ConfigClassProperty; import org.apache.hudi.common.config.ConfigGroups; import org.apache.hudi.common.config.ConfigProperty; import org.apache.hudi.common.config.HoodieConfig; +import org.apache.hudi.common.config.LockConfiguration; +import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.lock.LockProvider; import org.apache.hudi.common.util.Option; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.index.HoodieIndex; import java.io.File; @@ -241,6 +248,16 @@ public class HoodieLockConfig extends HoodieConfig { @Deprecated public static final String LOCK_PROVIDER_CLASS_PROP = LOCK_PROVIDER_CLASS_NAME.key(); + // Lock provider class names from modules not directly accessible in hudi-client-common. 
+ public static final String HIVE_METASTORE_BASED_LOCK_PROVIDER_CLASS = + "org.apache.hudi.hive.transaction.lock.HiveMetastoreBasedLockProvider"; + public static final String DYNAMODB_BASED_LOCK_PROVIDER_CLASS = + "org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider"; + public static final String DYNAMODB_BASED_IMPLICIT_PARTITION_KEY_LOCK_PROVIDER_CLASS = + "org.apache.hudi.aws.transaction.lock.DynamoDBBasedImplicitPartitionKeyLockProvider"; + private static final String DYNAMODB_BASED_LOCK_PROPERTY_PREFIX = LOCK_PREFIX + "dynamodb."; + private static final String DYNAMODB_LOCK_PARTITION_KEY_PROP_KEY = DYNAMODB_BASED_LOCK_PROPERTY_PREFIX + "partition_key"; + private HoodieLockConfig() { super(); } @@ -249,6 +266,109 @@ public static HoodieLockConfig.Builder newBuilder() { return new HoodieLockConfig.Builder(); } + /** + * Derive a {@link HoodieLockConfig} for a different table by copying lock-related configs from the + * given write config. Only built-in lock providers with explicitly set lock keys/paths are supported; + * providers that implicitly derive their lock identity from the table's base path are rejected. 
+ * + * @param lockProviderClass the lock provider class name + * @param writeConfig the source write config to copy lock properties from + * @return a new HoodieLockConfig with the relevant lock properties + * @throws IllegalArgumentException if required lock keys are missing or the provider derives its lock identity implicitly + * @throws HoodieException if the lock provider is not a built-in provider + */ + public static HoodieLockConfig deriveLockConfigForDifferentTable(String lockProviderClass, HoodieWriteConfig writeConfig) { + TypedProperties dataProps = writeConfig.getProps(); + Properties lockProps = new Properties(); + lockProps.put(LOCK_PROVIDER_CLASS_NAME.key(), lockProviderClass); + + // Common lock configs used by all providers + lockProps.put(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS.key(), + writeConfig.getStringOrDefault(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS)); + lockProps.put(LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS.key(), + writeConfig.getStringOrDefault(LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS)); + lockProps.put(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS.key(), + writeConfig.getStringOrDefault(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS)); + lockProps.put(LOCK_ACQUIRE_NUM_RETRIES.key(), + writeConfig.getStringOrDefault(LOCK_ACQUIRE_NUM_RETRIES)); + lockProps.put(LOCK_ACQUIRE_CLIENT_NUM_RETRIES.key(), + writeConfig.getStringOrDefault(LOCK_ACQUIRE_CLIENT_NUM_RETRIES)); + lockProps.put(LOCK_ACQUIRE_WAIT_TIMEOUT_MS.key(), + String.valueOf(writeConfig.getIntOrDefault(LOCK_ACQUIRE_WAIT_TIMEOUT_MS))); + lockProps.put(LOCK_HEARTBEAT_INTERVAL_MS.key(), + String.valueOf(writeConfig.getIntOrDefault(LOCK_HEARTBEAT_INTERVAL_MS))); + + // Provider-specific configs with lock key validation. 
+ // Providers whose lock identity (key/path) is inferred from table name or base path at build time + // won't carry those inferred values into a rebuilt config for a different table, so we either + // require them to be set explicitly or reject providers that offer no explicit override. + if (FileSystemBasedLockProvider.class.getCanonicalName().equals(lockProviderClass)) { + copyPropsWithPrefix(dataProps, lockProps, LockConfiguration.FILESYSTEM_BASED_LOCK_PROPERTY_PREFIX); + if (!lockProps.containsKey(LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY)) { + throw new IllegalArgumentException(LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY + + " must be explicitly set when using " + lockProviderClass + + ". Without it, the lock path is derived from the table's base path," + + " which would resolve to a different location when building a lock config for a different table."); + } + } else if (ZookeeperBasedLockProvider.class.getCanonicalName().equals(lockProviderClass)) { + copyPropsWithPrefix(dataProps, lockProps, LockConfiguration.ZOOKEEPER_BASED_LOCK_PROPERTY_PREFIX); + if (!lockProps.containsKey(LockConfiguration.ZK_LOCK_KEY_PROP_KEY)) { + throw new IllegalArgumentException(LockConfiguration.ZK_LOCK_KEY_PROP_KEY + + " must be explicitly set when using " + lockProviderClass + + ". The inferred default from table name is not propagated" + + " when building a lock config for a different table."); + } + } else if (ZookeeperBasedImplicitBasePathLockProvider.class.getCanonicalName().equals(lockProviderClass)) { + throw new IllegalArgumentException(lockProviderClass + + " is not supported because it derives its lock identity from the table's base path," + + " which would resolve to a different lock when building a lock config for a different table." 
+ + " Use " + ZookeeperBasedLockProvider.class.getCanonicalName() + " with an explicit " + + LockConfiguration.ZK_LOCK_KEY_PROP_KEY + " instead."); + } else if (HIVE_METASTORE_BASED_LOCK_PROVIDER_CLASS.equals(lockProviderClass)) { + copyPropsWithPrefix(dataProps, lockProps, LockConfiguration.HIVE_METASTORE_LOCK_PROPERTY_PREFIX); + if (!lockProps.containsKey(LockConfiguration.HIVE_DATABASE_NAME_PROP_KEY)) { + throw new IllegalArgumentException(LockConfiguration.HIVE_DATABASE_NAME_PROP_KEY + + " must be explicitly set when using " + lockProviderClass); + } + if (!lockProps.containsKey(LockConfiguration.HIVE_TABLE_NAME_PROP_KEY)) { + throw new IllegalArgumentException(LockConfiguration.HIVE_TABLE_NAME_PROP_KEY + + " must be explicitly set when using " + lockProviderClass); + } + } else if (DYNAMODB_BASED_LOCK_PROVIDER_CLASS.equals(lockProviderClass)) { + copyPropsWithPrefix(dataProps, lockProps, DYNAMODB_BASED_LOCK_PROPERTY_PREFIX); + if (!lockProps.containsKey(DYNAMODB_LOCK_PARTITION_KEY_PROP_KEY)) { + throw new IllegalArgumentException(DYNAMODB_LOCK_PARTITION_KEY_PROP_KEY + + " must be explicitly set when using " + lockProviderClass + + ". The inferred default from table name is not propagated" + + " when building a lock config for a different table."); + } + } else if (DYNAMODB_BASED_IMPLICIT_PARTITION_KEY_LOCK_PROVIDER_CLASS.equals(lockProviderClass)) { + throw new IllegalArgumentException(lockProviderClass + + " is not supported because it derives its lock identity from the table's base path," + + " which would resolve to a different lock when building a lock config for a different table." 
+ + " Use " + DYNAMODB_BASED_LOCK_PROVIDER_CLASS + " with an explicit " + + DYNAMODB_LOCK_PARTITION_KEY_PROP_KEY + " instead."); + } else if (StorageBasedLockProvider.class.getCanonicalName().equals(lockProviderClass)) { + throw new IllegalArgumentException(lockProviderClass + + " is not supported because it derives its lock identity from the table's base path," + + " which would resolve to a different lock when building a lock config for a different table," + + " and does not provide an explicit lock path override."); + } else { + throw new HoodieException(writeConfig.getWriteConcurrencyMode() + + " is only supported for built-in lock providers. Unsupported lock provider: " + lockProviderClass); + } + + return newBuilder().fromProperties(lockProps).build(); + } + + private static void copyPropsWithPrefix(TypedProperties source, Properties target, String prefix) { + for (String key : source.stringPropertyNames()) { + if (key.startsWith(prefix)) { + target.setProperty(key, source.getProperty(key)); + } + } + } + public static class Builder { private final HoodieLockConfig lockConfig = new HoodieLockConfig(); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index 8da89553fcf9d..4c1c90fe27911 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -25,9 +25,11 @@ import org.apache.hudi.avro.model.HoodieRestorePlan; import org.apache.hudi.avro.model.HoodieRollbackMetadata; import org.apache.hudi.client.BaseHoodieWriteClient; +import org.apache.hudi.client.RunsTableService; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.config.HoodieConfig; import 
org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.model.ActionType; import org.apache.hudi.common.data.HoodieData; import org.apache.hudi.common.engine.EngineType; import org.apache.hudi.common.engine.HoodieEngineContext; @@ -156,7 +158,7 @@ * * @param Type of input for the write client */ -public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMetadataWriter { +public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMetadataWriter, RunsTableService { static final Logger LOG = LoggerFactory.getLogger(HoodieBackedTableMetadataWriter.class); @@ -2153,12 +2155,20 @@ static HoodieActiveTimeline runPendingTableServicesOperationsAndRefreshTimeline( // finish off any pending log compaction or compactions operations if any from previous attempt. boolean ranServices = false; if (activeTimeline.filterPendingCompactionTimeline().countInstants() > 0) { - writeClient.runAnyPendingCompactions(); - ranServices = true; + if (writeClient.shouldDelegateToTableServiceManager(writeClient.getConfig(), ActionType.compaction)) { + LOG.info("Skipping pending compactions on MDT as they are delegated to table service manager."); + } else { + writeClient.runAnyPendingCompactions(); + ranServices = true; + } } if (activeTimeline.filterPendingLogCompactionTimeline().countInstants() > 0) { - writeClient.runAnyPendingLogCompactions(); - ranServices = true; + if (writeClient.shouldDelegateToTableServiceManager(writeClient.getConfig(), ActionType.logcompaction)) { + LOG.info("Skipping pending log compactions on MDT as they are delegated to table service manager."); + } else { + writeClient.runAnyPendingLogCompactions(); + ranServices = true; + } } return ranServices ? 
metadataMetaClient.reloadActiveTimeline() : activeTimeline; } catch (Exception e) { @@ -2201,7 +2211,11 @@ void compactIfNecessary(BaseHoodieWriteClient writeClient, Option m.incrementMetric(HoodieMetadataMetrics.COMPACTION_FAILURES, 1)); @@ -2217,7 +2231,11 @@ void compactIfNecessary(BaseHoodieWriteClient writeClient, Option scheduledLogCompaction = writeClient.scheduleLogCompaction(Option.empty()); if (scheduledLogCompaction.isPresent()) { LOG.info("Log compaction is scheduled for timestamp {}", scheduledLogCompaction.get()); - writeClient.logCompact(scheduledLogCompaction.get(), true); + if (shouldDelegateToTableServiceManager(metadataWriteConfig, ActionType.logcompaction)) { + LOG.info("Skipping execution of log compaction on MDT as it is delegated to table service manager."); + } else { + writeClient.logCompact(scheduledLogCompaction.get(), true); + } } } } catch (Exception e) { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriterTableVersionSix.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriterTableVersionSix.java index a7f51400300ec..2d4717a80054b 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriterTableVersionSix.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriterTableVersionSix.java @@ -21,6 +21,7 @@ import org.apache.hudi.avro.model.HoodieRollbackMetadata; import org.apache.hudi.client.BaseHoodieWriteClient; import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.model.ActionType; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; import org.apache.hudi.common.table.HoodieTableMetaClient; @@ -289,7 +290,11 @@ void compactIfNecessary(BaseHoodieWriteClient writeClient, Option writeClient, Option + 
HoodieLockConfig.deriveLockConfigForDifferentTable( + ZookeeperBasedImplicitBasePathLockProvider.class.getCanonicalName(), writeConfig)); + assertTrue(ex.getMessage().contains("derives its lock identity from the table's base path")); + assertTrue(ex.getMessage().contains(ZookeeperBasedLockProvider.class.getCanonicalName())); + } + + @Test + public void testGetLockConfigForHiveMetastoreLockProvider() { + Properties lockProps = new Properties(); + lockProps.put(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), + HoodieLockConfig.HIVE_METASTORE_BASED_LOCK_PROVIDER_CLASS); + lockProps.put(HoodieLockConfig.HIVE_DATABASE_NAME.key(), "my_database"); + lockProps.put(HoodieLockConfig.HIVE_TABLE_NAME.key(), "my_table"); + lockProps.put(HoodieLockConfig.HIVE_METASTORE_URI.key(), "thrift://hms-host:9083"); + + HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder() + .withPath("/tmp/base_path/") + .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL) + .withLockConfig(HoodieLockConfig.newBuilder().fromProperties(lockProps).build()) + .build(); + + HoodieLockConfig lockConfig = HoodieLockConfig.deriveLockConfigForDifferentTable( + HoodieLockConfig.HIVE_METASTORE_BASED_LOCK_PROVIDER_CLASS, writeConfig); + assertEquals(HoodieLockConfig.HIVE_METASTORE_BASED_LOCK_PROVIDER_CLASS, + lockConfig.getString(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME)); + assertEquals("my_database", lockConfig.getString(HoodieLockConfig.HIVE_DATABASE_NAME)); + assertEquals("my_table", lockConfig.getString(HoodieLockConfig.HIVE_TABLE_NAME)); + assertEquals("thrift://hms-host:9083", lockConfig.getString(HoodieLockConfig.HIVE_METASTORE_URI)); + } + + @Test + public void testGetLockConfigForDynamoDBLockProvider() { + Properties lockProps = new Properties(); + lockProps.put(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), + HoodieLockConfig.DYNAMODB_BASED_LOCK_PROVIDER_CLASS); + lockProps.put("hoodie.write.lock.dynamodb.table", "my_lock_table"); + 
lockProps.put("hoodie.write.lock.dynamodb.region", "us-west-2"); + lockProps.put("hoodie.write.lock.dynamodb.partition_key", "test_table"); + + HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder() + .withPath("/tmp/base_path/") + .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL) + .withLockConfig(HoodieLockConfig.newBuilder().fromProperties(lockProps).build()) + .build(); + + HoodieLockConfig lockConfig = HoodieLockConfig.deriveLockConfigForDifferentTable( + HoodieLockConfig.DYNAMODB_BASED_LOCK_PROVIDER_CLASS, writeConfig); + assertEquals(HoodieLockConfig.DYNAMODB_BASED_LOCK_PROVIDER_CLASS, + lockConfig.getString(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME)); + assertEquals("my_lock_table", lockConfig.getProps().getProperty("hoodie.write.lock.dynamodb.table")); + assertEquals("us-west-2", lockConfig.getProps().getProperty("hoodie.write.lock.dynamodb.region")); + assertEquals("test_table", lockConfig.getProps().getProperty("hoodie.write.lock.dynamodb.partition_key")); + } + + @Test + public void testGetLockConfigRejectsStorageBasedLockProvider() { + Properties lockProps = new Properties(); + lockProps.put(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), + StorageBasedLockProvider.class.getCanonicalName()); + lockProps.put("hoodie.write.lock.storage.validity.timeout.secs", "600"); + lockProps.put("hoodie.write.lock.storage.renew.interval.secs", "60"); + + HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder() + .withPath("/tmp/base_path/") + .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL) + .withLockConfig(HoodieLockConfig.newBuilder().fromProperties(lockProps).build()) + .build(); + + IllegalArgumentException ex = assertThrows(IllegalArgumentException.class, () -> + HoodieLockConfig.deriveLockConfigForDifferentTable( + StorageBasedLockProvider.class.getCanonicalName(), writeConfig)); + assertTrue(ex.getMessage().contains("derives its lock identity from the table's base path")); + 
assertTrue(ex.getMessage().contains("does not provide an explicit lock path override")); + } + + @Test + public void testGetLockConfigCopiesCommonRetryConfigs() { + HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder() + .withPath("/tmp/base_path/") + .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL) + .withLockConfig(HoodieLockConfig.newBuilder() + .withLockProvider(FileSystemBasedLockProvider.class) + .withFileSystemLockPath("/tmp/lock_dir") + .withNumRetries(5) + .withRetryWaitTimeInMillis(2000L) + .withRetryMaxWaitTimeInMillis(32000L) + .withClientNumRetries(10) + .withClientRetryWaitTimeInMillis(3000L) + .withLockWaitTimeInMillis(90000L) + .withHeartbeatIntervalInMillis(45000L) + .build()) + .build(); + + HoodieLockConfig lockConfig = HoodieLockConfig.deriveLockConfigForDifferentTable( + FileSystemBasedLockProvider.class.getCanonicalName(), writeConfig); + assertEquals("5", lockConfig.getString(HoodieLockConfig.LOCK_ACQUIRE_NUM_RETRIES)); + assertEquals("2000", lockConfig.getString(HoodieLockConfig.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS)); + assertEquals("32000", lockConfig.getString(HoodieLockConfig.LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS)); + assertEquals("10", lockConfig.getString(HoodieLockConfig.LOCK_ACQUIRE_CLIENT_NUM_RETRIES)); + assertEquals("3000", lockConfig.getString(HoodieLockConfig.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS)); + assertEquals("90000", lockConfig.getString(HoodieLockConfig.LOCK_ACQUIRE_WAIT_TIMEOUT_MS)); + assertEquals("45000", lockConfig.getString(HoodieLockConfig.LOCK_HEARTBEAT_INTERVAL_MS)); + } + + @Test + public void testGetLockConfigRejectsCustomLockProvider() { + String customLockProviderClass = "com.example.custom.MyCustomLockProvider"; + Properties lockProps = new Properties(); + lockProps.put(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), customLockProviderClass); + + HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder() + .withPath("/tmp/base_path/") + 
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL) + .withLockConfig(HoodieLockConfig.newBuilder().fromProperties(lockProps).build()) + .build(); + + HoodieException ex = assertThrows(HoodieException.class, () -> + HoodieLockConfig.deriveLockConfigForDifferentTable(customLockProviderClass, writeConfig)); + assertTrue(ex.getMessage().contains("only supported for built-in lock providers")); + assertTrue(ex.getMessage().contains(customLockProviderClass)); + } + + @Test + public void testGetLockConfigRejectsZookeeperProviderWithoutLockKey() { + HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder() + .withPath("/tmp/base_path/") + .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL) + .withLockConfig(HoodieLockConfig.newBuilder() + .withLockProvider(ZookeeperBasedLockProvider.class) + .withZkQuorum("zk-host:2181") + .withZkBasePath("/hudi/locks") + .withZkPort("2181") + .build()) + .build(); + + IllegalArgumentException ex = assertThrows(IllegalArgumentException.class, () -> + HoodieLockConfig.deriveLockConfigForDifferentTable( + ZookeeperBasedLockProvider.class.getCanonicalName(), writeConfig)); + assertTrue(ex.getMessage().contains(LockConfiguration.ZK_LOCK_KEY_PROP_KEY)); + } + + @Test + public void testGetLockConfigRejectsFileSystemProviderWithoutLockPath() { + HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder() + .withPath("/tmp/base_path/") + .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL) + .withLockConfig(HoodieLockConfig.newBuilder() + .withLockProvider(FileSystemBasedLockProvider.class) + .withFileSystemLockExpire(10) + .build()) + .build(); + + IllegalArgumentException ex = assertThrows(IllegalArgumentException.class, () -> + HoodieLockConfig.deriveLockConfigForDifferentTable( + FileSystemBasedLockProvider.class.getCanonicalName(), writeConfig)); + assertTrue(ex.getMessage().contains(LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY)); + } + + @Test + public 
void testGetLockConfigRejectsDynamoDBProviderWithoutPartitionKey() { + Properties lockProps = new Properties(); + lockProps.put(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), + HoodieLockConfig.DYNAMODB_BASED_LOCK_PROVIDER_CLASS); + lockProps.put("hoodie.write.lock.dynamodb.table", "my_lock_table"); + lockProps.put("hoodie.write.lock.dynamodb.region", "us-west-2"); + + HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder() + .withPath("/tmp/base_path/") + .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL) + .withLockConfig(HoodieLockConfig.newBuilder().fromProperties(lockProps).build()) + .build(); + + IllegalArgumentException ex = assertThrows(IllegalArgumentException.class, () -> + HoodieLockConfig.deriveLockConfigForDifferentTable( + HoodieLockConfig.DYNAMODB_BASED_LOCK_PROVIDER_CLASS, writeConfig)); + assertTrue(ex.getMessage().contains("hoodie.write.lock.dynamodb.partition_key")); + } + + @Test + public void testGetLockConfigRejectsDynamoDBImplicitPartitionKeyLockProvider() { + Properties lockProps = new Properties(); + lockProps.put(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), + HoodieLockConfig.DYNAMODB_BASED_IMPLICIT_PARTITION_KEY_LOCK_PROVIDER_CLASS); + lockProps.put("hoodie.write.lock.dynamodb.table", "my_lock_table"); + lockProps.put("hoodie.write.lock.dynamodb.region", "us-west-2"); + + HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder() + .withPath("/tmp/base_path/") + .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL) + .withLockConfig(HoodieLockConfig.newBuilder().fromProperties(lockProps).build()) + .build(); + + IllegalArgumentException ex = assertThrows(IllegalArgumentException.class, () -> + HoodieLockConfig.deriveLockConfigForDifferentTable( + HoodieLockConfig.DYNAMODB_BASED_IMPLICIT_PARTITION_KEY_LOCK_PROVIDER_CLASS, writeConfig)); + assertTrue(ex.getMessage().contains("derives its lock identity from the table's base path")); + 
assertTrue(ex.getMessage().contains(HoodieLockConfig.DYNAMODB_BASED_LOCK_PROVIDER_CLASS)); + } + + @Test + public void testGetLockConfigRejectsHiveMetastoreProviderWithoutDatabase() { + Properties lockProps = new Properties(); + lockProps.put(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), + HoodieLockConfig.HIVE_METASTORE_BASED_LOCK_PROVIDER_CLASS); + lockProps.put(HoodieLockConfig.HIVE_TABLE_NAME.key(), "my_table"); + lockProps.put(HoodieLockConfig.HIVE_METASTORE_URI.key(), "thrift://hms-host:9083"); + + HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder() + .withPath("/tmp/base_path/") + .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL) + .withLockConfig(HoodieLockConfig.newBuilder().fromProperties(lockProps).build()) + .build(); + + IllegalArgumentException ex = assertThrows(IllegalArgumentException.class, () -> + HoodieLockConfig.deriveLockConfigForDifferentTable( + HoodieLockConfig.HIVE_METASTORE_BASED_LOCK_PROVIDER_CLASS, writeConfig)); + assertTrue(ex.getMessage().contains(LockConfiguration.HIVE_DATABASE_NAME_PROP_KEY)); + } + + @Test + public void testGetLockConfigRejectsHiveMetastoreProviderWithoutTable() { + Properties lockProps = new Properties(); + lockProps.put(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), + HoodieLockConfig.HIVE_METASTORE_BASED_LOCK_PROVIDER_CLASS); + lockProps.put(HoodieLockConfig.HIVE_DATABASE_NAME.key(), "my_database"); + lockProps.put(HoodieLockConfig.HIVE_METASTORE_URI.key(), "thrift://hms-host:9083"); + + HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder() + .withPath("/tmp/base_path/") + .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL) + .withLockConfig(HoodieLockConfig.newBuilder().fromProperties(lockProps).build()) + .build(); + + IllegalArgumentException ex = assertThrows(IllegalArgumentException.class, () -> + HoodieLockConfig.deriveLockConfigForDifferentTable( + HoodieLockConfig.HIVE_METASTORE_BASED_LOCK_PROVIDER_CLASS, writeConfig)); + 
assertTrue(ex.getMessage().contains(LockConfiguration.HIVE_TABLE_NAME_PROP_KEY)); + } +} diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieBackedTableMetadataWriter.java index 5b69c1af65285..4065f113d518f 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieBackedTableMetadataWriter.java @@ -20,6 +20,7 @@ import org.apache.hudi.client.BaseHoodieWriteClient; import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.config.HoodieTableServiceManagerConfig; import org.apache.hudi.common.data.HoodieData; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; @@ -47,6 +48,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.stream.Stream; import static org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR; @@ -95,6 +97,8 @@ void runPendingTableServicesOperations(boolean hasPendingCompaction, boolean has HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class); HoodieActiveTimeline initialTimeline = mock(HoodieActiveTimeline.class, RETURNS_DEEP_STUBS); BaseHoodieWriteClient writeClient = mock(BaseHoodieWriteClient.class); + HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp/").build(); + when(writeClient.getConfig()).thenReturn(writeConfig); if (requiresRefresh) { when(metaClient.reloadActiveTimeline()).thenReturn(initialTimeline); } else { @@ -123,6 +127,63 @@ void runPendingTableServicesOperations(boolean hasPendingCompaction, boolean has verify(metaClient, times(expectedTimelineReloads)).reloadActiveTimeline(); } + @Test + void 
runPendingTableServicesSkipsExecutionWhenTSMEnabled() { + HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class); + HoodieActiveTimeline initialTimeline = mock(HoodieActiveTimeline.class, RETURNS_DEEP_STUBS); + BaseHoodieWriteClient writeClient = mock(BaseHoodieWriteClient.class); + + Properties tsmProps = new Properties(); + tsmProps.put(HoodieTableServiceManagerConfig.TABLE_SERVICE_MANAGER_ENABLED.key(), "true"); + tsmProps.put(HoodieTableServiceManagerConfig.TABLE_SERVICE_MANAGER_ACTIONS.key(), "compaction,logcompaction"); + HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp/").withProperties(tsmProps).build(); + when(writeClient.getConfig()).thenReturn(writeConfig); + when(writeClient.shouldDelegateToTableServiceManager(any(), any())).thenCallRealMethod(); + + when(metaClient.getActiveTimeline()).thenReturn(initialTimeline); + when(initialTimeline.filterPendingCompactionTimeline().countInstants()).thenReturn(1); + when(initialTimeline.filterPendingLogCompactionTimeline().countInstants()).thenReturn(1); + + HoodieActiveTimeline result = HoodieBackedTableMetadataWriter.runPendingTableServicesOperationsAndRefreshTimeline( + metaClient, writeClient, false, Option.empty()); + + // TSM-delegated actions should not be executed + verify(writeClient, times(0)).runAnyPendingCompactions(); + verify(writeClient, times(0)).runAnyPendingLogCompactions(); + // No services ran, so no timeline reload needed + assertSame(initialTimeline, result); + } + + @Test + void runPendingTableServicesPartialTSMDelegation() { + HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class); + HoodieActiveTimeline initialTimeline = mock(HoodieActiveTimeline.class, RETURNS_DEEP_STUBS); + BaseHoodieWriteClient writeClient = mock(BaseHoodieWriteClient.class); + + // Only compaction is delegated to TSM, logcompaction is not + Properties tsmProps = new Properties(); + tsmProps.put(HoodieTableServiceManagerConfig.TABLE_SERVICE_MANAGER_ENABLED.key(), 
"true"); + tsmProps.put(HoodieTableServiceManagerConfig.TABLE_SERVICE_MANAGER_ACTIONS.key(), "compaction"); + HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp/").withProperties(tsmProps).build(); + when(writeClient.getConfig()).thenReturn(writeConfig); + when(writeClient.shouldDelegateToTableServiceManager(any(), any())).thenCallRealMethod(); + + when(metaClient.getActiveTimeline()).thenReturn(initialTimeline); + when(initialTimeline.filterPendingCompactionTimeline().countInstants()).thenReturn(1); + when(initialTimeline.filterPendingLogCompactionTimeline().countInstants()).thenReturn(1); + + HoodieActiveTimeline reloadedTimeline = mock(HoodieActiveTimeline.class); + when(metaClient.reloadActiveTimeline()).thenReturn(reloadedTimeline); + + HoodieActiveTimeline result = HoodieBackedTableMetadataWriter.runPendingTableServicesOperationsAndRefreshTimeline( + metaClient, writeClient, false, Option.empty()); + + // Compaction delegated to TSM → skipped; logcompaction not delegated → executed + verify(writeClient, times(0)).runAnyPendingCompactions(); + verify(writeClient, times(1)).runAnyPendingLogCompactions(); + assertSame(reloadedTimeline, result); + } + @Test void rollbackFailedWrites_reloadsTimelineOnWritesRolledBack() { HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("file://tmp/") @@ -376,6 +437,7 @@ void testPerformTableServicesWithFailureHandling( when(writeConfig.getMetadataConfig()).thenReturn(metadataConfig); when(metadataConfig.shouldFailOnTableServiceFailures()).thenReturn(shouldFailOnTableServiceFailures); when(writeConfig.getTableName()).thenReturn("test_table"); + when(writeConfig.getTableServiceManagerConfig()).thenReturn(HoodieTableServiceManagerConfig.newBuilder().build()); // Set up timeline mocks when(metaClient.reloadActiveTimeline()).thenReturn(timeline); diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataWriteUtils.java 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataWriteUtils.java index 0175cb60d663c..06c116967e175 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataWriteUtils.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataWriteUtils.java @@ -18,22 +18,32 @@ package org.apache.hudi.metadata; +import org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider; import org.apache.hudi.client.transaction.lock.InProcessLockProvider; +import org.apache.hudi.client.transaction.lock.StorageBasedLockProvider; +import org.apache.hudi.client.transaction.lock.ZookeeperBasedImplicitBasePathLockProvider; +import org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider; import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.model.ActionType; import org.apache.hudi.common.model.HoodieCleaningPolicy; import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; import org.apache.hudi.common.model.WriteConcurrencyMode; import org.apache.hudi.common.table.HoodieTableVersion; import org.apache.hudi.config.HoodieCleanConfig; +import org.apache.hudi.config.HoodieLockConfig; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieException; import org.junit.jupiter.api.Test; import java.util.Properties; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; public class TestHoodieMetadataWriteUtils { @@ -106,6 +116,390 @@ public void testCreateMetadataWriteConfigForNBCC() { WriteConcurrencyMode.SINGLE_WRITER, null); } + @Test + public void 
testCreateMetadataWriteConfigForOCC() { + String dataTableBasePath = "/tmp/base_path/"; + HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder() + .withPath(dataTableBasePath) + .withCleanConfig(HoodieCleanConfig.newBuilder() + .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS) + .retainCommits(5).build()) + .withMetadataConfig(HoodieMetadataConfig.newBuilder() + .withStreamingWriteEnabled(false) + .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL).build()) + .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL) + .withLockConfig(HoodieLockConfig.newBuilder() + .withLockProvider(FileSystemBasedLockProvider.class) + .withFileSystemLockPath("/tmp/lock_dir") + .build()) + .build(); + + HoodieWriteConfig metadataWriteConfig = HoodieMetadataWriteUtils.createMetadataWriteConfig( + writeConfig, HoodieFailedWritesCleaningPolicy.EAGER, HoodieTableVersion.EIGHT); + // HoodieWriteConfig builder auto-adjusts failed writes policy to LAZY for multi-writer modes + validateMetadataWriteConfig(metadataWriteConfig, HoodieFailedWritesCleaningPolicy.LAZY, + WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL, FileSystemBasedLockProvider.class.getCanonicalName()); + // MDT base path should NOT be overwritten to data table's base path + String expectedMdtBasePath = HoodieTableMetadata.getMetadataTableBasePath(dataTableBasePath); + assertEquals(expectedMdtBasePath, metadataWriteConfig.getBasePath()); + } + + @Test + public void testCreateMetadataWriteConfigRejectsInProcessLockProvider() { + HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder() + .withPath("/tmp/base_path/") + .withMetadataConfig(HoodieMetadataConfig.newBuilder() + .withStreamingWriteEnabled(false) + .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL).build()) + .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL) + .withLockConfig(HoodieLockConfig.newBuilder() + 
.withLockProvider(InProcessLockProvider.class).build()) + .build(); + + IllegalStateException ex = assertThrows(IllegalStateException.class, () -> + HoodieMetadataWriteUtils.createMetadataWriteConfig( + writeConfig, HoodieFailedWritesCleaningPolicy.EAGER, HoodieTableVersion.EIGHT)); + assertTrue(ex.getMessage().contains("InProcessLockProvider cannot be used")); + } + + @Test + public void testCreateMetadataWriteConfigForcesStreamingWritesOffWithMultiWriter() { + HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder() + .withPath("/tmp/base_path/") + .withCleanConfig(HoodieCleanConfig.newBuilder() + .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS) + .retainCommits(5).build()) + .withMetadataConfig(HoodieMetadataConfig.newBuilder() + .withStreamingWriteEnabled(true) + .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL).build()) + .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL) + .withLockConfig(HoodieLockConfig.newBuilder() + .withLockProvider(FileSystemBasedLockProvider.class) + .withFileSystemLockPath("/tmp/lock_dir") + .build()) + .build(); + + HoodieWriteConfig metadataWriteConfig = HoodieMetadataWriteUtils.createMetadataWriteConfig( + writeConfig, HoodieFailedWritesCleaningPolicy.EAGER, HoodieTableVersion.EIGHT); + // Multi-writer takes precedence over streaming writes; streaming writes are forced off + validateMetadataWriteConfig(metadataWriteConfig, HoodieFailedWritesCleaningPolicy.LAZY, + WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL, FileSystemBasedLockProvider.class.getCanonicalName()); + } + + @Test + public void testCreateMetadataWriteConfigWithTableServiceManager() { + HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder() + .withPath("/tmp/base_path/") + .withMetadataConfig(HoodieMetadataConfig.newBuilder() + .withTableServiceManagerEnabled(true) + .withTableServiceManagerActions("compaction,logcompaction") + .build()) + .build(); + + HoodieWriteConfig 
metadataWriteConfig = HoodieMetadataWriteUtils.createMetadataWriteConfig( + writeConfig, HoodieFailedWritesCleaningPolicy.EAGER, HoodieTableVersion.EIGHT); + assertTrue(metadataWriteConfig.getTableServiceManagerConfig().isTableServiceManagerEnabled()); + assertTrue(metadataWriteConfig.getTableServiceManagerConfig().isEnabledAndActionSupported(ActionType.compaction)); + assertTrue(metadataWriteConfig.getTableServiceManagerConfig().isEnabledAndActionSupported(ActionType.logcompaction)); + assertFalse(metadataWriteConfig.getTableServiceManagerConfig().isEnabledAndActionSupported(ActionType.clean)); + } + + @Test + public void testCreateMetadataWriteConfigWithTableServiceManagerLogCompactionOnly() { + HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder() + .withPath("/tmp/base_path/") + .withMetadataConfig(HoodieMetadataConfig.newBuilder() + .withTableServiceManagerEnabled(true) + .withTableServiceManagerActions("logcompaction") + .build()) + .build(); + + HoodieWriteConfig metadataWriteConfig = HoodieMetadataWriteUtils.createMetadataWriteConfig( + writeConfig, HoodieFailedWritesCleaningPolicy.EAGER, HoodieTableVersion.EIGHT); + assertTrue(metadataWriteConfig.getTableServiceManagerConfig().isTableServiceManagerEnabled()); + assertFalse(metadataWriteConfig.getTableServiceManagerConfig().isEnabledAndActionSupported(ActionType.compaction), + "compaction should not match when only logcompaction is configured"); + assertTrue(metadataWriteConfig.getTableServiceManagerConfig().isEnabledAndActionSupported(ActionType.logcompaction)); + } + + @Test + public void testCreateMetadataWriteConfigWithTableServiceManagerDisabled() { + HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder() + .withPath("/tmp/base_path/") + .build(); + + HoodieWriteConfig metadataWriteConfig = HoodieMetadataWriteUtils.createMetadataWriteConfig( + writeConfig, HoodieFailedWritesCleaningPolicy.EAGER, HoodieTableVersion.EIGHT); + 
assertFalse(metadataWriteConfig.getTableServiceManagerConfig().isTableServiceManagerEnabled()); + assertFalse(metadataWriteConfig.getTableServiceManagerConfig().isEnabledAndActionSupported(ActionType.compaction)); + } + + @Test + public void testCreateMetadataWriteConfigForOCCWithZookeeperLockProvider() { + HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder() + .withPath("/tmp/base_path/") + .withCleanConfig(HoodieCleanConfig.newBuilder() + .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS) + .retainCommits(5).build()) + .withMetadataConfig(HoodieMetadataConfig.newBuilder() + .withStreamingWriteEnabled(false) + .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL).build()) + .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL) + .withLockConfig(HoodieLockConfig.newBuilder() + .withLockProvider(ZookeeperBasedLockProvider.class) + .withZkQuorum("zk-host:2181") + .withZkBasePath("/hudi/locks") + .withZkLockKey("test_table") + .withZkPort("2181") + .withZkSessionTimeoutInMs(30000L) + .withZkConnectionTimeoutInMs(15000L) + .build()) + .build(); + + HoodieWriteConfig metadataWriteConfig = HoodieMetadataWriteUtils.createMetadataWriteConfig( + writeConfig, HoodieFailedWritesCleaningPolicy.EAGER, HoodieTableVersion.EIGHT); + validateMetadataWriteConfig(metadataWriteConfig, HoodieFailedWritesCleaningPolicy.LAZY, + WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL, ZookeeperBasedLockProvider.class.getCanonicalName()); + assertEquals("zk-host:2181", metadataWriteConfig.getProps().getString(HoodieLockConfig.ZK_CONNECT_URL.key())); + assertEquals("/hudi/locks", metadataWriteConfig.getProps().getString(HoodieLockConfig.ZK_BASE_PATH.key())); + assertEquals("test_table", metadataWriteConfig.getProps().getString(HoodieLockConfig.ZK_LOCK_KEY.key())); + assertEquals("2181", metadataWriteConfig.getProps().getString(HoodieLockConfig.ZK_PORT.key())); + assertEquals(30000, 
metadataWriteConfig.getProps().getInteger(HoodieLockConfig.ZK_SESSION_TIMEOUT_MS.key())); + assertEquals(15000, metadataWriteConfig.getProps().getInteger(HoodieLockConfig.ZK_CONNECTION_TIMEOUT_MS.key())); + } + + @Test + public void testCreateMetadataWriteConfigForOCCWithHiveMetastoreLockProvider() { + String hmsLockProviderClass = "org.apache.hudi.hive.transaction.lock.HiveMetastoreBasedLockProvider"; + Properties lockProps = new Properties(); + lockProps.put(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), hmsLockProviderClass); + lockProps.put(HoodieLockConfig.HIVE_DATABASE_NAME.key(), "my_database"); + lockProps.put(HoodieLockConfig.HIVE_TABLE_NAME.key(), "my_table"); + lockProps.put(HoodieLockConfig.HIVE_METASTORE_URI.key(), "thrift://hms-host:9083"); + + HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder() + .withPath("/tmp/base_path/") + .withCleanConfig(HoodieCleanConfig.newBuilder() + .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS) + .retainCommits(5).build()) + .withMetadataConfig(HoodieMetadataConfig.newBuilder() + .withStreamingWriteEnabled(false) + .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL).build()) + .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL) + .withLockConfig(HoodieLockConfig.newBuilder() + .fromProperties(lockProps) + .build()) + .build(); + + HoodieWriteConfig metadataWriteConfig = HoodieMetadataWriteUtils.createMetadataWriteConfig( + writeConfig, HoodieFailedWritesCleaningPolicy.EAGER, HoodieTableVersion.EIGHT); + validateMetadataWriteConfig(metadataWriteConfig, HoodieFailedWritesCleaningPolicy.LAZY, + WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL, hmsLockProviderClass); + assertEquals("my_database", metadataWriteConfig.getProps().getString(HoodieLockConfig.HIVE_DATABASE_NAME.key())); + assertEquals("my_table", metadataWriteConfig.getProps().getString(HoodieLockConfig.HIVE_TABLE_NAME.key())); + assertEquals("thrift://hms-host:9083", 
metadataWriteConfig.getProps().getString(HoodieLockConfig.HIVE_METASTORE_URI.key())); + } + + @Test + public void testCreateMetadataWriteConfigForOCCWithFileSystemLockProvider() { + HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder() + .withPath("/tmp/base_path/") + .withCleanConfig(HoodieCleanConfig.newBuilder() + .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS) + .retainCommits(5).build()) + .withMetadataConfig(HoodieMetadataConfig.newBuilder() + .withStreamingWriteEnabled(false) + .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL).build()) + .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL) + .withLockConfig(HoodieLockConfig.newBuilder() + .withLockProvider(FileSystemBasedLockProvider.class) + .withFileSystemLockPath("/tmp/lock_dir") + .withFileSystemLockExpire(10) + .build()) + .build(); + + HoodieWriteConfig metadataWriteConfig = HoodieMetadataWriteUtils.createMetadataWriteConfig( + writeConfig, HoodieFailedWritesCleaningPolicy.EAGER, HoodieTableVersion.EIGHT); + validateMetadataWriteConfig(metadataWriteConfig, HoodieFailedWritesCleaningPolicy.LAZY, + WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL, FileSystemBasedLockProvider.class.getCanonicalName()); + assertEquals("/tmp/lock_dir", metadataWriteConfig.getProps().getString(HoodieLockConfig.FILESYSTEM_LOCK_PATH.key())); + assertEquals(10, metadataWriteConfig.getProps().getInteger(HoodieLockConfig.FILESYSTEM_LOCK_EXPIRE.key())); + } + + @Test + public void testCreateMetadataWriteConfigRejectsCustomLockProvider() { + String customLockProviderClass = "com.example.custom.MyCustomLockProvider"; + Properties lockProps = new Properties(); + lockProps.put(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), customLockProviderClass); + + HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder() + .withPath("/tmp/base_path/") + .withMetadataConfig(HoodieMetadataConfig.newBuilder() + .withStreamingWriteEnabled(false) + 
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL).build()) + .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL) + .withLockConfig(HoodieLockConfig.newBuilder() + .fromProperties(lockProps) + .build()) + .build(); + + HoodieException ex = assertThrows(HoodieException.class, () -> + HoodieMetadataWriteUtils.createMetadataWriteConfig( + writeConfig, HoodieFailedWritesCleaningPolicy.EAGER, HoodieTableVersion.EIGHT)); + assertTrue(ex.getMessage().contains("only supported for built-in lock providers")); + assertTrue(ex.getMessage().contains(customLockProviderClass)); + } + + @Test + public void testCreateMetadataWriteConfigRejectsConcurrencyModeMismatch() { + HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder() + .withPath("/tmp/base_path/") + .withMetadataConfig(HoodieMetadataConfig.newBuilder() + .withStreamingWriteEnabled(false) + .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL).build()) + .build(); + + IllegalStateException ex = assertThrows(IllegalStateException.class, () -> + HoodieMetadataWriteUtils.createMetadataWriteConfig( + writeConfig, HoodieFailedWritesCleaningPolicy.EAGER, HoodieTableVersion.EIGHT)); + assertTrue(ex.getMessage().contains("must match the data table concurrency mode")); + } + + @Test + public void testCreateMetadataWriteConfigForOCCPreservesMdtSpecificValues() { + String dataTableBasePath = "/tmp/base_path/"; + + HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder() + .withPath(dataTableBasePath) + .withCleanConfig(HoodieCleanConfig.newBuilder() + .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS) + .retainCommits(5).build()) + .withMetadataConfig(HoodieMetadataConfig.newBuilder() + .withStreamingWriteEnabled(false) + .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL).build()) + .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL) + 
.withLockConfig(HoodieLockConfig.newBuilder() + .withLockProvider(FileSystemBasedLockProvider.class) + .withFileSystemLockPath("/tmp/lock_dir") + .withFileSystemLockExpire(10) + .build()) + .build(); + + HoodieWriteConfig metadataWriteConfig = HoodieMetadataWriteUtils.createMetadataWriteConfig( + writeConfig, HoodieFailedWritesCleaningPolicy.EAGER, HoodieTableVersion.EIGHT); + + validateMetadataWriteConfig(metadataWriteConfig, HoodieFailedWritesCleaningPolicy.LAZY, + WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL, FileSystemBasedLockProvider.class.getCanonicalName()); + + // MDT-specific values must be preserved + String expectedMdtBasePath = HoodieTableMetadata.getMetadataTableBasePath(dataTableBasePath); + assertEquals(expectedMdtBasePath, metadataWriteConfig.getBasePath()); + assertFalse(metadataWriteConfig.isAutoClean(), "Auto clean should be disabled for MDT"); + assertFalse(metadataWriteConfig.inlineCompactionEnabled(), "Inline compaction should be disabled for MDT"); + assertFalse(metadataWriteConfig.isMetadataTableEnabled(), "Metadata listing should be disabled for MDT"); + assertNotEquals(dataTableBasePath, metadataWriteConfig.getBasePath(), + "MDT base path should not be overwritten to data table base path"); + } + + @Test + public void testCreateMetadataWriteConfigForOCCWithDynamoDBLockProvider() { + Properties lockProps = new Properties(); + lockProps.put(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), + HoodieLockConfig.DYNAMODB_BASED_LOCK_PROVIDER_CLASS); + lockProps.put("hoodie.write.lock.dynamodb.table", "my_lock_table"); + lockProps.put("hoodie.write.lock.dynamodb.region", "us-west-2"); + lockProps.put("hoodie.write.lock.dynamodb.partition_key", "test_table"); + + HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder() + .withPath("/tmp/base_path/") + .withMetadataConfig(HoodieMetadataConfig.newBuilder() + .withStreamingWriteEnabled(false) + .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL).build()) + 
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL) + .withLockConfig(HoodieLockConfig.newBuilder() + .fromProperties(lockProps) + .build()) + .build(); + + HoodieWriteConfig metadataWriteConfig = HoodieMetadataWriteUtils.createMetadataWriteConfig( + writeConfig, HoodieFailedWritesCleaningPolicy.EAGER, HoodieTableVersion.EIGHT); + validateMetadataWriteConfig(metadataWriteConfig, HoodieFailedWritesCleaningPolicy.LAZY, + WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL, + HoodieLockConfig.DYNAMODB_BASED_LOCK_PROVIDER_CLASS); + assertEquals("my_lock_table", metadataWriteConfig.getProps().getString("hoodie.write.lock.dynamodb.table")); + assertEquals("us-west-2", metadataWriteConfig.getProps().getString("hoodie.write.lock.dynamodb.region")); + assertEquals("test_table", metadataWriteConfig.getProps().getString("hoodie.write.lock.dynamodb.partition_key")); + } + + @Test + public void testCreateMetadataWriteConfigRejectsStorageBasedLockProvider() { + Properties lockProps = new Properties(); + lockProps.put(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), + StorageBasedLockProvider.class.getCanonicalName()); + lockProps.put("hoodie.write.lock.storage.validity.timeout.secs", "600"); + lockProps.put("hoodie.write.lock.storage.renew.interval.secs", "60"); + + HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder() + .withPath("/tmp/base_path/") + .withMetadataConfig(HoodieMetadataConfig.newBuilder() + .withStreamingWriteEnabled(false) + .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL).build()) + .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL) + .withLockConfig(HoodieLockConfig.newBuilder() + .fromProperties(lockProps) + .build()) + .build(); + + IllegalArgumentException ex = assertThrows(IllegalArgumentException.class, () -> + HoodieMetadataWriteUtils.createMetadataWriteConfig( + writeConfig, HoodieFailedWritesCleaningPolicy.EAGER, HoodieTableVersion.EIGHT)); + 
assertTrue(ex.getMessage().contains("derives its lock identity from the table's base path")); + } + + @Test + public void testCreateMetadataWriteConfigRejectsZookeeperImplicitBasePathLockProvider() { + HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder() + .withPath("/tmp/base_path/") + .withMetadataConfig(HoodieMetadataConfig.newBuilder() + .withStreamingWriteEnabled(false) + .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL).build()) + .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL) + .withLockConfig(HoodieLockConfig.newBuilder() + .withLockProvider(ZookeeperBasedImplicitBasePathLockProvider.class) + .withZkQuorum("zk-host:2181") + .withZkPort("2181") + .build()) + .build(); + + IllegalArgumentException ex = assertThrows(IllegalArgumentException.class, () -> + HoodieMetadataWriteUtils.createMetadataWriteConfig( + writeConfig, HoodieFailedWritesCleaningPolicy.EAGER, HoodieTableVersion.EIGHT)); + assertTrue(ex.getMessage().contains("derives its lock identity from the table's base path")); + } + + @Test + public void testCreateMetadataWriteConfigRejectsCustomLockProviders() { + String customLockProviderClass = "com.example.custom.DistributedLockProvider"; + Properties lockProps = new Properties(); + lockProps.put(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), customLockProviderClass); + + HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder() + .withPath("/tmp/base_path/") + .withMetadataConfig(HoodieMetadataConfig.newBuilder() + .withStreamingWriteEnabled(false) + .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL).build()) + .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL) + .withLockConfig(HoodieLockConfig.newBuilder() + .fromProperties(lockProps) + .build()) + .build(); + + HoodieException ex = assertThrows(HoodieException.class, () -> + HoodieMetadataWriteUtils.createMetadataWriteConfig( + writeConfig, 
HoodieFailedWritesCleaningPolicy.EAGER, HoodieTableVersion.EIGHT)); + assertTrue(ex.getMessage().contains("only supported for built-in lock providers")); + } + private void validateMetadataWriteConfig(HoodieWriteConfig metadataWriteConfig, HoodieFailedWritesCleaningPolicy expectedPolicy, WriteConcurrencyMode expectedWriteConcurrencyMode, String expectedLockProviderClass) { assertEquals(expectedPolicy, metadataWriteConfig.getFailedWritesCleanPolicy()); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java index 5fa3201d2ffea..cd4d03ee4ee70 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java @@ -20,6 +20,8 @@ import org.apache.hudi.common.bloom.BloomFilterTypeCode; import org.apache.hudi.common.engine.EngineType; +import org.apache.hudi.common.model.ActionType; +import org.apache.hudi.common.model.WriteConcurrencyMode; import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; @@ -32,10 +34,12 @@ import java.io.FileReader; import java.io.IOException; import java.util.Arrays; +import java.util.EnumSet; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.Set; import java.util.stream.Collectors; import static org.apache.hudi.common.util.ValidationUtils.checkArgument; @@ -81,6 +85,31 @@ public final class HoodieMetadataConfig extends HoodieConfig { + "in streaming manner rather than two disjoint writes. 
By default " + "streaming writes to metadata table is enabled for SPARK engine for incremental operations and disabled for all other cases."); + public static final ConfigProperty<String> METADATA_WRITE_CONCURRENCY_MODE = ConfigProperty + .key(METADATA_PREFIX + ".write.concurrency.mode") + .defaultValue(WriteConcurrencyMode.SINGLE_WRITER.name()) + .markAdvanced() + .withDocumentation("Change this to OPTIMISTIC_CONCURRENCY_CONTROL when MDT operations are being performed " + + "from an external concurrent writer (such as a table service platform) so that appropriate locks are taken."); + + public static final ConfigProperty<Boolean> TABLE_SERVICE_MANAGER_ENABLED = ConfigProperty + .key(METADATA_PREFIX + ".table.service.manager.enabled") + .defaultValue(false) + .markAdvanced() + .withDocumentation("If true, delegate specified table service actions on the metadata table to the table service manager " + + "instead of executing them inline. This prevents the current writer from executing compaction/logcompaction " + + "on the metadata table, allowing a separate async pipeline to handle them."); + + public static final Set<ActionType> SUPPORTED_TABLE_SERVICE_MANAGER_ACTIONS = + EnumSet.of(ActionType.compaction, ActionType.logcompaction); + + public static final ConfigProperty<String> TABLE_SERVICE_MANAGER_ACTIONS = ConfigProperty + .key(METADATA_PREFIX + ".table.service.manager.actions") + .defaultValue("") + .markAdvanced() + .withDocumentation("Comma-separated list of table service actions on the metadata table " + + "that should be delegated to the table service manager. 
Currently supported actions are: compaction, logcompaction."); + public static final ConfigProperty<Integer> STREAMING_WRITE_DATATABLE_WRITE_STATUSES_COALESCE_DIVISOR = ConfigProperty .key(METADATA_PREFIX + ".streaming.write.datatable.write.statuses.coalesce.divisor") .defaultValue(5000) @@ -678,6 +707,18 @@ public boolean isStreamingWriteEnabled() { return getBoolean(STREAMING_WRITE_ENABLED); } + public String getWriteConcurrencyMode() { + return getString(METADATA_WRITE_CONCURRENCY_MODE); + } + + public boolean isTableServiceManagerEnabled() { + return getBoolean(TABLE_SERVICE_MANAGER_ENABLED); + } + + public String getTableServiceManagerActions() { + return getString(TABLE_SERVICE_MANAGER_ACTIONS); + } + public int getStreamingWritesCoalesceDivisorForDataTableWrites() { return getInt(HoodieMetadataConfig.STREAMING_WRITE_DATATABLE_WRITE_STATUSES_COALESCE_DIVISOR); } @@ -1006,6 +1047,24 @@ public Builder withStreamingWriteEnabled(boolean enabled) { return this; } + public Builder withWriteConcurrencyMode(WriteConcurrencyMode mode) { + metadataConfig.setValue(METADATA_WRITE_CONCURRENCY_MODE, mode.name()); + return this; + } + + public Builder withTableServiceManagerEnabled(boolean enabled) { + metadataConfig.setValue(TABLE_SERVICE_MANAGER_ENABLED, String.valueOf(enabled)); + return this; + } + + public Builder withTableServiceManagerActions(String actions) { + if (!actions.isEmpty()) { + validateTableServiceManagerActions(actions); + } + metadataConfig.setValue(TABLE_SERVICE_MANAGER_ACTIONS, actions); + return this; + } + public Builder withMetadataIndexBloomFilter(boolean enable) { metadataConfig.setValue(ENABLE_METADATA_INDEX_BLOOM_FILTER, String.valueOf(enable)); return this; @@ -1297,6 +1356,17 @@ public HoodieMetadataConfig build() { metadataConfig.setDefaultValue(STREAMING_WRITE_ENABLED, getDefaultForStreamingWriteEnabled(engineType)); // fix me: disable when schema on read is enabled. 
metadataConfig.setDefaults(HoodieMetadataConfig.class.getName()); + + String tsmActions = metadataConfig.getString(TABLE_SERVICE_MANAGER_ACTIONS); + if (tsmActions != null && !tsmActions.isEmpty()) { + validateTableServiceManagerActions(tsmActions); + } + if (metadataConfig.getBoolean(TABLE_SERVICE_MANAGER_ENABLED) + && (tsmActions == null || tsmActions.isEmpty())) { + throw new IllegalArgumentException(TABLE_SERVICE_MANAGER_ENABLED.key() + " is set to true but " + + TABLE_SERVICE_MANAGER_ACTIONS.key() + " is empty. Specify at least one action to delegate" + + " (supported: " + SUPPORTED_TABLE_SERVICE_MANAGER_ACTIONS + ")."); + } return metadataConfig; } @@ -1347,6 +1417,23 @@ private boolean getDefaultSecondaryIndexEnable(EngineType engineType) { throw new HoodieNotSupportedException("Unsupported engine " + engineType); } } + + private static void validateTableServiceManagerActions(String actions) { + for (String action : actions.split(",")) { + String trimmed = action.trim(); + ActionType actionType; + try { + actionType = ActionType.valueOf(trimmed); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException("Unknown metadata table service manager action: " + trimmed + + ". Supported actions are: " + SUPPORTED_TABLE_SERVICE_MANAGER_ACTIONS, e); + } + if (!SUPPORTED_TABLE_SERVICE_MANAGER_ACTIONS.contains(actionType)) { + throw new IllegalArgumentException("Unsupported metadata table service manager action: " + trimmed + + ". 
Supported actions are: " + SUPPORTED_TABLE_SERVICE_MANAGER_ACTIONS); + } + } + } } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieTableServiceManagerConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieTableServiceManagerConfig.java index 8e59ce1efa0e7..a2cef4558e1b6 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieTableServiceManagerConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieTableServiceManagerConfig.java @@ -22,7 +22,10 @@ import javax.annotation.concurrent.Immutable; +import java.util.Arrays; import java.util.Properties; +import java.util.Set; +import java.util.stream.Collectors; /** * Configurations used by the Hudi Table Service Manager. @@ -171,9 +174,13 @@ public int getConnectionRetryDelay() { } public boolean isEnabledAndActionSupported(ActionType actionType) { - boolean isActionSupported = getTableServiceManagerActions().contains(actionType.name()); + Set<String> actions = Arrays.stream(getTableServiceManagerActions().split(",")) + .map(String::trim) + .filter(s -> !s.isEmpty()) + .collect(Collectors.toSet()); + boolean isActionSupported = actions.contains(actionType.name()); if (actionType.equals(ActionType.clustering)) { - isActionSupported = isActionSupported || getTableServiceManagerActions().contains(ActionType.replacecommit.name()); + isActionSupported = isActionSupported || actions.contains(ActionType.replacecommit.name()); } return isTableServiceManagerEnabled() && isActionSupported; } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/config/TestHoodieMetadataConfig.java b/hudi-common/src/test/java/org/apache/hudi/common/config/TestHoodieMetadataConfig.java index 028c69683ef22..b66b432abc24d 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/config/TestHoodieMetadataConfig.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/config/TestHoodieMetadataConfig.java @@ -19,12 +19,15 @@ package 
org.apache.hudi.common.config; +import org.apache.hudi.common.model.WriteConcurrencyMode; + import org.junit.jupiter.api.Test; import java.util.Properties; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; /** @@ -171,4 +174,99 @@ void testFailOnTableServiceFailures() { assertEquals("hoodie.metadata.write.fail.on.table.service.failures", HoodieMetadataConfig.FAIL_ON_TABLE_SERVICE_FAILURES.key()); } + + @Test + void testWriteConcurrencyMode() { + HoodieMetadataConfig config = HoodieMetadataConfig.newBuilder().build(); + assertEquals(WriteConcurrencyMode.SINGLE_WRITER.name(), config.getWriteConcurrencyMode()); + + HoodieMetadataConfig occConfig = HoodieMetadataConfig.newBuilder() + .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL) + .build(); + assertEquals(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name(), occConfig.getWriteConcurrencyMode()); + + Properties props = new Properties(); + props.put(HoodieMetadataConfig.METADATA_WRITE_CONCURRENCY_MODE.key(), + WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name()); + HoodieMetadataConfig propsConfig = HoodieMetadataConfig.newBuilder() + .fromProperties(props) + .build(); + assertEquals(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name(), propsConfig.getWriteConcurrencyMode()); + } + + @Test + void testTableServiceManagerEnabled() { + HoodieMetadataConfig config = HoodieMetadataConfig.newBuilder().build(); + assertFalse(config.isTableServiceManagerEnabled()); + + HoodieMetadataConfig enabledConfig = HoodieMetadataConfig.newBuilder() + .withTableServiceManagerEnabled(true) + .withTableServiceManagerActions("compaction") + .build(); + assertTrue(enabledConfig.isTableServiceManagerEnabled()); + + HoodieMetadataConfig disabledConfig = HoodieMetadataConfig.newBuilder() + 
.withTableServiceManagerEnabled(false) + .build(); + assertFalse(disabledConfig.isTableServiceManagerEnabled()); + } + + @Test + void testTableServiceManagerActions() { + HoodieMetadataConfig config = HoodieMetadataConfig.newBuilder().build(); + assertEquals("", config.getTableServiceManagerActions()); + + HoodieMetadataConfig configWithActions = HoodieMetadataConfig.newBuilder() + .withTableServiceManagerActions("compaction,logcompaction") + .build(); + assertEquals("compaction,logcompaction", configWithActions.getTableServiceManagerActions()); + + Properties props = new Properties(); + props.put(HoodieMetadataConfig.TABLE_SERVICE_MANAGER_ACTIONS.key(), "compaction"); + HoodieMetadataConfig propsConfig = HoodieMetadataConfig.newBuilder() + .fromProperties(props) + .build(); + assertEquals("compaction", propsConfig.getTableServiceManagerActions()); + } + + @Test + void testTableServiceManagerActionsRejectsUnsupportedActions() { + assertThrows(IllegalArgumentException.class, () -> + HoodieMetadataConfig.newBuilder() + .withTableServiceManagerActions("clean") + .build()); + + assertThrows(IllegalArgumentException.class, () -> + HoodieMetadataConfig.newBuilder() + .withTableServiceManagerActions("clustering") + .build()); + + assertThrows(IllegalArgumentException.class, () -> + HoodieMetadataConfig.newBuilder() + .withTableServiceManagerActions("compaction,clean") + .build()); + + assertThrows(IllegalArgumentException.class, () -> + HoodieMetadataConfig.newBuilder() + .withTableServiceManagerActions("nonexistent") + .build()); + } + + @Test + void testTableServiceManagerActionsValidatedInBuildFromProperties() { + Properties props = new Properties(); + props.put(HoodieMetadataConfig.TABLE_SERVICE_MANAGER_ACTIONS.key(), "clean"); + assertThrows(IllegalArgumentException.class, () -> + HoodieMetadataConfig.newBuilder() + .fromProperties(props) + .build()); + } + + @Test + void testTableServiceManagerEnabledWithEmptyActionsRejected() { + 
assertThrows(IllegalArgumentException.class, () -> + HoodieMetadataConfig.newBuilder() + .withTableServiceManagerEnabled(true) + .build()); + } }