-
Notifications
You must be signed in to change notification settings - Fork 2.5k
feat(metadata): Allow users to safely execute compaction plans on metadata table concurrently through a table service platform (rather than only inline during write) #18295
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
d5865df
5b6316f
9f187e0
dca344e
1dbbed5
eb14986
1d71d4e
2b36662
63576bd
3b62360
b1f68c2
7cbe14e
fa3a162
91a61ff
84eb330
f6ae715
6745fd9
9041c22
90d4b1d
64a7c85
1487955
7a92049
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -20,12 +20,19 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.hudi.client.transaction.BucketIndexConcurrentFileWritesConflictResolutionStrategy; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.hudi.client.transaction.ConflictResolutionStrategy; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.hudi.client.transaction.lock.StorageBasedLockProvider; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.hudi.client.transaction.lock.ZookeeperBasedImplicitBasePathLockProvider; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.hudi.common.config.ConfigClassProperty; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.hudi.common.config.ConfigGroups; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.hudi.common.config.ConfigProperty; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.hudi.common.config.HoodieConfig; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.hudi.common.config.LockConfiguration; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.hudi.common.config.TypedProperties; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.hudi.common.lock.LockProvider; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.hudi.common.util.Option; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.hudi.exception.HoodieException; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.hudi.index.HoodieIndex; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import java.io.File; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -241,6 +248,17 @@ public class HoodieLockConfig extends HoodieConfig { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| @Deprecated | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| public static final String LOCK_PROVIDER_CLASS_PROP = LOCK_PROVIDER_CLASS_NAME.key(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Lock provider class names from modules not directly accessible in hudi-client-common. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| public static final String HIVE_METASTORE_BASED_LOCK_PROVIDER_CLASS = | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "org.apache.hudi.hive.transaction.lock.HiveMetastoreBasedLockProvider"; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| public static final String DYNAMODB_BASED_LOCK_PROVIDER_CLASS = | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider"; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| public static final String DYNAMODB_BASED_IMPLICIT_PARTITION_KEY_LOCK_PROVIDER_CLASS = | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "org.apache.hudi.aws.transaction.lock.DynamoDBBasedImplicitPartitionKeyLockProvider"; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private static final String DYNAMODB_BASED_LOCK_PROPERTY_PREFIX = LOCK_PREFIX + "dynamodb."; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private static final String DYNAMODB_LOCK_PARTITION_KEY_PROP_KEY = DYNAMODB_BASED_LOCK_PROPERTY_PREFIX + "partition_key"; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private static final String STORAGE_BASED_LOCK_PROPERTY_PREFIX = LOCK_PREFIX + "storage."; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private HoodieLockConfig() { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| super(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -249,6 +267,109 @@ public static HoodieLockConfig.Builder newBuilder() { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return new HoodieLockConfig.Builder(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /** | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * Derive a {@link HoodieLockConfig} for a different table by copying lock-related configs from the | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * given write config. Only built-in lock providers with explicitly set lock keys/paths are supported; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * providers that implicitly derive their lock identity from the table's base path are rejected. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * @param lockProviderClass the lock provider class name | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * @param writeConfig the source write config to copy lock properties from | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * @return a new HoodieLockConfig with the relevant lock properties | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * @throws IllegalArgumentException if required lock keys are missing or the provider derives its lock identity implicitly | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * @throws HoodieException if the lock provider is not a built-in provider | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| public static HoodieLockConfig deriveLockConfigForDifferentTable(String lockProviderClass, HoodieWriteConfig writeConfig) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| TypedProperties dataProps = writeConfig.getProps(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Properties lockProps = new Properties(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| lockProps.put(LOCK_PROVIDER_CLASS_NAME.key(), lockProviderClass); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Common lock configs used by all providers | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| lockProps.put(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS.key(), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| writeConfig.getStringOrDefault(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS)); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| lockProps.put(LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS.key(), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| writeConfig.getStringOrDefault(LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS)); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| lockProps.put(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS.key(), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| writeConfig.getStringOrDefault(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS)); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| lockProps.put(LOCK_ACQUIRE_NUM_RETRIES.key(), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| writeConfig.getStringOrDefault(LOCK_ACQUIRE_NUM_RETRIES)); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| lockProps.put(LOCK_ACQUIRE_CLIENT_NUM_RETRIES.key(), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| writeConfig.getStringOrDefault(LOCK_ACQUIRE_CLIENT_NUM_RETRIES)); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| lockProps.put(LOCK_ACQUIRE_WAIT_TIMEOUT_MS.key(), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| String.valueOf(writeConfig.getIntOrDefault(LOCK_ACQUIRE_WAIT_TIMEOUT_MS))); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| lockProps.put(LOCK_HEARTBEAT_INTERVAL_MS.key(), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| String.valueOf(writeConfig.getIntOrDefault(LOCK_HEARTBEAT_INTERVAL_MS))); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Provider-specific configs with lock key validation. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Providers whose lock identity (key/path) is inferred from table name or base path at build time | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // won't carry those inferred values into a rebuilt config for a different table, so we either | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // require them to be set explicitly or reject providers that offer no explicit override. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (FileSystemBasedLockProvider.class.getCanonicalName().equals(lockProviderClass)) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| copyPropsWithPrefix(dataProps, lockProps, LockConfiguration.FILESYSTEM_BASED_LOCK_PROPERTY_PREFIX); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (!lockProps.containsKey(LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY)) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| throw new IllegalArgumentException(LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| + " must be explicitly set when using " + lockProviderClass | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| + ". Without it, the lock path is derived from the table's base path," | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| + " which would resolve to a different location when building a lock config for a different table."); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } else if (ZookeeperBasedLockProvider.class.getCanonicalName().equals(lockProviderClass)) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
kbuci marked this conversation as resolved.
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| copyPropsWithPrefix(dataProps, lockProps, LockConfiguration.ZOOKEEPER_BASED_LOCK_PROPERTY_PREFIX); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (!lockProps.containsKey(LockConfiguration.ZK_LOCK_KEY_PROP_KEY)) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| throw new IllegalArgumentException(LockConfiguration.ZK_LOCK_KEY_PROP_KEY | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| + " must be explicitly set when using " + lockProviderClass | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| + ". The inferred default from table name is not propagated" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| + " when building a lock config for a different table."); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } else if (ZookeeperBasedImplicitBasePathLockProvider.class.getCanonicalName().equals(lockProviderClass)) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| throw new IllegalArgumentException(lockProviderClass | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| + " is not supported because it derives its lock identity from the table's base path," | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| + " which would resolve to a different lock when building a lock config for a different table." | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| + " Use " + ZookeeperBasedLockProvider.class.getCanonicalName() + " with an explicit " | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| + LockConfiguration.ZK_LOCK_KEY_PROP_KEY + " instead."); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } else if (HIVE_METASTORE_BASED_LOCK_PROVIDER_CLASS.equals(lockProviderClass)) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
kbuci marked this conversation as resolved.
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| copyPropsWithPrefix(dataProps, lockProps, LockConfiguration.HIVE_METASTORE_LOCK_PROPERTY_PREFIX); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (!lockProps.containsKey(LockConfiguration.HIVE_DATABASE_NAME_PROP_KEY)) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| throw new IllegalArgumentException(LockConfiguration.HIVE_DATABASE_NAME_PROP_KEY | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| + " must be explicitly set when using " + lockProviderClass); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (!lockProps.containsKey(LockConfiguration.HIVE_TABLE_NAME_PROP_KEY)) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| throw new IllegalArgumentException(LockConfiguration.HIVE_TABLE_NAME_PROP_KEY | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| + " must be explicitly set when using " + lockProviderClass); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } else if (DYNAMODB_BASED_LOCK_PROVIDER_CLASS.equals(lockProviderClass)) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| copyPropsWithPrefix(dataProps, lockProps, DYNAMODB_BASED_LOCK_PROPERTY_PREFIX); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (!lockProps.containsKey(DYNAMODB_LOCK_PARTITION_KEY_PROP_KEY)) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| throw new IllegalArgumentException(DYNAMODB_LOCK_PARTITION_KEY_PROP_KEY | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| + " must be explicitly set when using " + lockProviderClass | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| + ". The inferred default from table name is not propagated" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| + " when building a lock config for a different table."); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Validate required lock-identity properties as non-blank, not just present. Current checks use Proposed fix- if (!lockProps.containsKey(LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY)) {
+ if (isBlank(lockProps.getProperty(LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY))) {
throw new IllegalArgumentException(LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY
+ " must be explicitly set when using " + lockProviderClass
+ ". Without it, the lock path is derived from the table's base path,"
+ " which would resolve to a different location when building a lock config for a different table.");
}
@@
- if (!lockProps.containsKey(LockConfiguration.ZK_LOCK_KEY_PROP_KEY)) {
+ if (isBlank(lockProps.getProperty(LockConfiguration.ZK_LOCK_KEY_PROP_KEY))) {
throw new IllegalArgumentException(LockConfiguration.ZK_LOCK_KEY_PROP_KEY
+ " must be explicitly set when using " + lockProviderClass
+ ". The inferred default from table name is not propagated"
+ " when building a lock config for a different table.");
}
@@
- if (!lockProps.containsKey(LockConfiguration.HIVE_DATABASE_NAME_PROP_KEY)) {
+ if (isBlank(lockProps.getProperty(LockConfiguration.HIVE_DATABASE_NAME_PROP_KEY))) {
throw new IllegalArgumentException(LockConfiguration.HIVE_DATABASE_NAME_PROP_KEY
+ " must be explicitly set when using " + lockProviderClass);
}
- if (!lockProps.containsKey(LockConfiguration.HIVE_TABLE_NAME_PROP_KEY)) {
+ if (isBlank(lockProps.getProperty(LockConfiguration.HIVE_TABLE_NAME_PROP_KEY))) {
throw new IllegalArgumentException(LockConfiguration.HIVE_TABLE_NAME_PROP_KEY
+ " must be explicitly set when using " + lockProviderClass);
}
@@
- if (!lockProps.containsKey(DYNAMODB_LOCK_PARTITION_KEY_PROP_KEY)) {
+ if (isBlank(lockProps.getProperty(DYNAMODB_LOCK_PARTITION_KEY_PROP_KEY))) {
throw new IllegalArgumentException(DYNAMODB_LOCK_PARTITION_KEY_PROP_KEY
+ " must be explicitly set when using " + lockProviderClass
+ ". The inferred default from table name is not propagated"
+ " when building a lock config for a different table.");
}
@@
private static void copyPropsWithPrefix(TypedProperties source, Properties target, String prefix) {
for (String key : source.stringPropertyNames()) {
if (key.startsWith(prefix)) {
target.setProperty(key, source.getProperty(key));
}
}
}
+
+ private static boolean isBlank(String value) {
+ return value == null || value.trim().isEmpty();
+ }🤖 Prompt for AI Agents— CodeRabbit (original) (source:comment#3095603949) |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } else if (DYNAMODB_BASED_IMPLICIT_PARTITION_KEY_LOCK_PROVIDER_CLASS.equals(lockProviderClass)) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if the data table has Same applies for other implicit lock providers
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We cannot support lock providers that inherit lock keys from table name/basepath , since if we set the table name/basepath in the returned lock config, then that will cause final MDT lock config to have table name/basepath of data table. Which will cause issues |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| throw new IllegalArgumentException(lockProviderClass | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| + " is not supported because it derives its lock identity from the table's base path," | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| + " which would resolve to a different lock when building a lock config for a different table." | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| + " Use " + DYNAMODB_BASED_LOCK_PROVIDER_CLASS + " with an explicit " | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| + DYNAMODB_LOCK_PARTITION_KEY_PROP_KEY + " instead."); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } else if (StorageBasedLockProvider.class.getCanonicalName().equals(lockProviderClass)) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| throw new IllegalArgumentException(lockProviderClass | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| + " is not supported because it derives its lock identity from the table's base path," | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| + " which would resolve to a different lock when building a lock config for a different table," | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| + " and does not provide an explicit lock path override."); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } else { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| throw new HoodieException(writeConfig.getWriteConcurrencyMode() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| + " is only supported for built-in lock providers. Unsupported lock provider: " + lockProviderClass); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return newBuilder().fromProperties(lockProps).build(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Preserve inferred provider inputs when rebuilding the lock config. This helper creates a fresh Minimal fix sketch public static HoodieLockConfig getLockConfigForBuiltInLockProvider(String lockProviderClass, HoodieWriteConfig writeConfig) {
TypedProperties dataProps = writeConfig.getProps();
Properties lockProps = new Properties();
lockProps.put(LOCK_PROVIDER_CLASS_NAME.key(), lockProviderClass);
+ if (writeConfig.contains(HoodieWriteConfig.TBL_NAME)) {
+ lockProps.put(HoodieWriteConfig.TBL_NAME.key(),
+ writeConfig.getString(HoodieWriteConfig.TBL_NAME));
+ }📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents— CodeRabbit (original) (source:comment#3055465495) |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private static void copyPropsWithPrefix(TypedProperties source, Properties target, String prefix) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| for (String key : source.stringPropertyNames()) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (key.startsWith(prefix)) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| target.setProperty(key, source.getProperty(key)); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| public static class Builder { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private final HoodieLockConfig lockConfig = new HoodieLockConfig(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -25,9 +25,11 @@ | |
| import org.apache.hudi.avro.model.HoodieRestorePlan; | ||
| import org.apache.hudi.avro.model.HoodieRollbackMetadata; | ||
| import org.apache.hudi.client.BaseHoodieWriteClient; | ||
| import org.apache.hudi.client.RunsTableService; | ||
| import org.apache.hudi.client.WriteStatus; | ||
| import org.apache.hudi.common.config.HoodieConfig; | ||
| import org.apache.hudi.common.config.HoodieMetadataConfig; | ||
| import org.apache.hudi.common.model.ActionType; | ||
| import org.apache.hudi.common.data.HoodieData; | ||
| import org.apache.hudi.common.engine.EngineType; | ||
| import org.apache.hudi.common.engine.HoodieEngineContext; | ||
|
|
@@ -156,7 +158,7 @@ | |
| * | ||
| * @param <I> Type of input for the write client | ||
| */ | ||
| public abstract class HoodieBackedTableMetadataWriter<I, O> implements HoodieTableMetadataWriter<I, O> { | ||
| public abstract class HoodieBackedTableMetadataWriter<I, O> implements HoodieTableMetadataWriter<I, O>, RunsTableService { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🤖 Having |
||
|
|
||
| static final Logger LOG = LoggerFactory.getLogger(HoodieBackedTableMetadataWriter.class); | ||
|
|
||
|
|
@@ -2153,12 +2155,20 @@ static HoodieActiveTimeline runPendingTableServicesOperationsAndRefreshTimeline( | |
| // finish off any pending log compaction or compactions operations if any from previous attempt. | ||
| boolean ranServices = false; | ||
| if (activeTimeline.filterPendingCompactionTimeline().countInstants() > 0) { | ||
| writeClient.runAnyPendingCompactions(); | ||
| ranServices = true; | ||
| if (writeClient.shouldDelegateToTableServiceManager(writeClient.getConfig(), ActionType.compaction)) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🤖 The underlying - Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @kbuci : lets attend to this |
||
| LOG.info("Skipping pending compactions on MDT as they are delegated to table service manager."); | ||
| } else { | ||
| writeClient.runAnyPendingCompactions(); | ||
| ranServices = true; | ||
| } | ||
| } | ||
| if (activeTimeline.filterPendingLogCompactionTimeline().countInstants() > 0) { | ||
| writeClient.runAnyPendingLogCompactions(); | ||
| ranServices = true; | ||
| if (writeClient.shouldDelegateToTableServiceManager(writeClient.getConfig(), ActionType.logcompaction)) { | ||
| LOG.info("Skipping pending log compactions on MDT as they are delegated to table service manager."); | ||
| } else { | ||
| writeClient.runAnyPendingLogCompactions(); | ||
| ranServices = true; | ||
| } | ||
| } | ||
| return ranServices ? metadataMetaClient.reloadActiveTimeline() : activeTimeline; | ||
| } catch (Exception e) { | ||
|
|
@@ -2201,7 +2211,11 @@ void compactIfNecessary(BaseHoodieWriteClient<?,I,?,O> writeClient, Option<Strin | |
| LOG.info("Compaction with same {} time is already present in the timeline.", compactionInstantTime); | ||
| } else if (writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty())) { | ||
| LOG.info("Compaction is scheduled for timestamp {}", compactionInstantTime); | ||
| writeClient.compact(compactionInstantTime, true); | ||
| if (shouldDelegateToTableServiceManager(metadataWriteConfig, ActionType.compaction)) { | ||
| LOG.info("Skipping execution of compaction on MDT as it is delegated to table service manager."); | ||
| } else { | ||
| writeClient.compact(compactionInstantTime, true); | ||
| } | ||
| } | ||
| } catch (Exception e) { | ||
| metrics.ifPresent(m -> m.incrementMetric(HoodieMetadataMetrics.COMPACTION_FAILURES, 1)); | ||
|
|
@@ -2217,7 +2231,11 @@ void compactIfNecessary(BaseHoodieWriteClient<?,I,?,O> writeClient, Option<Strin | |
| Option<String> scheduledLogCompaction = writeClient.scheduleLogCompaction(Option.empty()); | ||
| if (scheduledLogCompaction.isPresent()) { | ||
| LOG.info("Log compaction is scheduled for timestamp {}", scheduledLogCompaction.get()); | ||
| writeClient.logCompact(scheduledLogCompaction.get(), true); | ||
| if (shouldDelegateToTableServiceManager(metadataWriteConfig, ActionType.logcompaction)) { | ||
| LOG.info("Skipping execution of log compaction on MDT as it is delegated to table service manager."); | ||
| } else { | ||
| writeClient.logCompact(scheduledLogCompaction.get(), true); | ||
| } | ||
| } | ||
| } | ||
| } catch (Exception e) { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,6 +21,7 @@ | |
| import org.apache.hudi.avro.model.HoodieRollbackMetadata; | ||
| import org.apache.hudi.client.BaseHoodieWriteClient; | ||
| import org.apache.hudi.common.config.HoodieMetadataConfig; | ||
| import org.apache.hudi.common.model.ActionType; | ||
| import org.apache.hudi.common.engine.HoodieEngineContext; | ||
| import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; | ||
| import org.apache.hudi.common.table.HoodieTableMetaClient; | ||
|
|
@@ -289,7 +290,11 @@ void compactIfNecessary(BaseHoodieWriteClient<?,I,?,O> writeClient, Option<Strin | |
| LOG.info("Compaction with same {} time is already present in the timeline.", compactionInstantTime); | ||
| } else if (writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty())) { | ||
| LOG.info("Compaction is scheduled for timestamp {}", compactionInstantTime); | ||
| writeClient.compact(compactionInstantTime, true); | ||
| if (shouldDelegateToTableServiceManager(metadataWriteConfig, ActionType.compaction)) { | ||
| LOG.info("Skipping execution of compaction on MDT as it is delegated to table service manager."); | ||
| } else { | ||
| writeClient.compact(compactionInstantTime, true); | ||
| } | ||
| } else if (metadataWriteConfig.isLogCompactionEnabled()) { | ||
| // Schedule and execute log compaction with suffixes based on the same instant time. This ensures that any future | ||
| // delta commits synced over will not have an instant time lesser than the last completed instant on the | ||
|
|
@@ -299,7 +304,11 @@ void compactIfNecessary(BaseHoodieWriteClient<?,I,?,O> writeClient, Option<Strin | |
| LOG.info("Log compaction with same {} time is already present in the timeline.", logCompactionInstantTime); | ||
| } else if (writeClient.scheduleLogCompactionAtInstant(logCompactionInstantTime, Option.empty())) { | ||
| LOG.info("Log compaction is scheduled for timestamp {}", logCompactionInstantTime); | ||
| writeClient.logCompact(logCompactionInstantTime, true); | ||
| if (shouldDelegateToTableServiceManager(metadataWriteConfig, ActionType.logcompaction)) { | ||
| LOG.info("Skipping execution of log compaction on MDT as it is delegated to table service manager."); | ||
| } else { | ||
| writeClient.logCompact(logCompactionInstantTime, true); | ||
| } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. looks like we are only supporting compaction and log compactions for now.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure updated config documentation |
||
| } | ||
| } | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of introducing lock-provider-specific changes, could we move metadata compaction or any table service into a higher-level API on SparkRDDWriteClient for metadata-table table services?
Since compaction already has explicit scheduling/execution phases, it seems cleaner to encapsulate the metadata-table service flow there and acquire the main-table lock only around the finalization/publication step. That would keep concurrency control at the table-service orchestration layer rather than coupling behavior to individual lock providers.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1, since we have the completion time based file slicing, the compaction scheduling and execution and be alway done in a separate job without breaking the semantics.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Won't we still need this change so that we can create a MDT write client that will implicitly take data table lock when calling
compactAPIs? Since although in the data table write client we can construct a MDT write client and callcompacton it, afaik it would just perform all the execution/completion steps with single_writer mode. Is there a way to work around this?@nsivabalan what are your thoughts?
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in theory, this might look neat, but in reality, this may not land neatly as we think. I did bring this up surya last week when he proposed this.
anyways, let me clarify. As of now, all touch points to mdt from data table write client is fairly standard.
but w/ above proposal, we might end up something like
This does not seem elegant and unnecessarily complicates the layering.
But if you folks have any other better alternative, let us know. Happy to explore.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
since we execute MDT table services in a separate job, adding a few APIs on the MDT writer is not that too bad from my side, since the MDT itself is now the main API we are operating on now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, that's in my mind too.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
but the issue is, we can't confine those APIs just for the standalone tool right. it will go in w/ the standard write client we have. And hence, anyone can use it otherwise as well.
lets chat through this and take a call. I don't see lot of users might use this feature for now. So, unless we plan to make this a major one, and make it robust(either by adding separate locks for metadata table), don't wanna pollute the write client meant for data table.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can mark these APIs as expremental or internal and doc it well, the tests would guard the correctness, a separate compaction job for MDT makes sense in general as the volumn scales to super large, just like what we do to the data table.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I synced up w/ @yihua on this and generally we feel, adding a new public api to write client will confuse users and we got to be very thorough in adding any such public apis.
my recommendation is to take it up as a follow up. Consider different locks for data table and mdt table or list down all possible approaches, go over pros and cons and then add standalone tool for MDT table services if need be or take the write client approach we feel we have compelling reasons.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given that MDT write operations are critical, it is not quite safe to expose new APIs of
SparkRDDWriteClient.performAsyncTableServicesInMDT, and users not aware of what they are doing can easily corrupt MDT. So, given that it is an advanced usage, I would suggest avoiding a new public API and leveraging the lock config.