feat(metadata): Allow users to safely execute compaction plans on metadata table concurrently through a table service platform (rather than only inline during write)#18295
Conversation
| if (activeTimeline.filterPendingCompactionTimeline().countInstants() > 0) { | ||
| writeClient.runAnyPendingCompactions(); | ||
| ranServices = true; | ||
| if (tsmConfig.isEnabledAndActionSupported(ActionType.compaction)) { |
There was a problem hiding this comment.
In our org's internal build we have a separate config for "disabling" compaction/logcompaction inline. But since it seems in OSS we use HoodieTableServiceManagerConfig for data table, I decided to stick with that usage/convention
| + "in streaming manner rather than two disjoint writes. By default " | ||
| + "streaming writes to metadata table is enabled for SPARK engine for incremental operations and disabled for all other cases."); | ||
|
|
||
| public static final ConfigProperty<String> METADATA_WRITE_CONCURRENCY_MODE = ConfigProperty |
There was a problem hiding this comment.
I was debating on making this a boolean like "is executing table service plans on MDT concurrently" and (if true) infer the concurrency mode of data table. But I wasn't sure if that would be more confusing to read/use
There was a problem hiding this comment.
Hmm based on looking at NBCC further, maybe I can just do it that way ^ (and make MDT write inherit the OCC/NBCV mode from data table). Let me know your thoughts
nsivabalan
left a comment
There was a problem hiding this comment.
Will review tests in next iteration
| + "streaming writes to metadata table is enabled for SPARK engine for incremental operations and disabled for all other cases."); | ||
|
|
||
| public static final ConfigProperty<String> METADATA_WRITE_CONCURRENCY_MODE = ConfigProperty | ||
| .key(METADATA_PREFIX + ".write.concurrency.mode") |
There was a problem hiding this comment.
can we add tests to TestHoodieMetadataConfig
There was a problem hiding this comment.
Sure added some basic tests
There was a problem hiding this comment.
🤖 The PR does add TestHoodieMetadataConfig.java with 79 new lines covering the new configs — looks like that's already been addressed.
| String.valueOf(HoodieLockConfig.LOCK_HEARTBEAT_INTERVAL_MS.defaultValue())))); | ||
|
|
||
| String lockProviderClass = writeConfig.getLockProviderClass(); | ||
| if (lockProviderClass == null) { |
There was a problem hiding this comment.
shouldn't we throw here?
how come data table's write config does not have any lock provider class set?
There was a problem hiding this comment.
Sure let me add more assertions
There was a problem hiding this comment.
🤖 It does throw — checkState(lockProviderClass != null, "Lock provider class must be set...") fires before getLockConfigForBuiltInLockProvider is ever called, so null is an explicit error condition that results in IllegalStateException.
There was a problem hiding this comment.
🤖 Agreed — if multi-writer mode is explicitly configured on the MDT, the data table must have a lock provider. Silently falling back to no-lock would mask a misconfiguration that could lead to data corruption. Throwing here (or at least a checkState) would be the right call.
There was a problem hiding this comment.
🤖 Agree this should throw rather than silently proceeding. If OCC is requested for MDT but the data table has no lock provider configured, that's a misconfiguration that will fail confusingly later. The current code does have a checkState(lockProviderClass != null, ...) a few lines below (line ~168 in the new code), so it does throw — but it's a checkState IllegalStateException rather than a more descriptive error. Might be worth confirming that path is hit in this scenario.
| } | ||
| } else { | ||
| // For any custom lock provider, pass through all lock-prefixed properties | ||
| // so provider-specific configs are preserved. |
There was a problem hiding this comment.
can we simplify and do this for all? Is there any case, where this may not work?
btw, how are alternate keys if any are accounted here?
There was a problem hiding this comment.
bcoz, if a new lock provider is added tomorrow, how do we ensure this code is updated?
for eg, I already don't see dynamo db, storage based lock provider not being accounted here.
There was a problem hiding this comment.
Hmm initially I wanted to follow the same approach we are doing here for metrics reporter setup in MDT write config. But I updated PR to go with another approach, adding all configs that are hoodie.lock.* or that are in data table write config but not explictly set by MDT write config. I wasn't sure of another way to handle user-provided lock provider classes (or future lock configs).
To be honest though I would prefer to revert this and do it the original approach, and just correct it to also handle dynamo db and storage based (I missed those initially since I was looking in HoodieLockConfig and didn't see DynamoDbBasedLockConfig). And require future lock providers to also update this code, the same way we already require for metrics reporter.
There was a problem hiding this comment.
🤖 Alternate keys are handled by the prefix-based approach — copyPropsWithPrefix iterates all data table props and copies anything that starts with the provider's dedicated prefix (e.g. hoodie.write.lock.dynamodb., hoodie.write.lock.storage.), so any non-standard keys under that prefix come along automatically. The per-provider dispatch is still needed to select the right prefix.
There was a problem hiding this comment.
🤖 Both are actually accounted for in the current implementation — DynamoDBBasedLockProvider (and the implicit partition key variant) are referenced via the string constants DYNAMODB_BASED_LOCK_PROVIDER_CLASS / DYNAMODB_BASED_IMPLICIT_PARTITION_KEY_LOCK_PROVIDER_CLASS (since the class lives in hudi-aws and can't be imported directly), and StorageBasedLockProvider is handled via direct class reference.
There was a problem hiding this comment.
🤖 Good point. Beyond just the maintenance burden of enumerating providers, an alternative approach could be to copy all properties with the hoodie.write.lock. prefix from the data table config, rather than selecting provider-specific prefixes. That way any new lock provider (DynamoDB, storage-based, or future ones) would automatically get its config propagated. The only thing to be careful about is not overwriting MDT-specific settings like base path.
There was a problem hiding this comment.
🤖 This is a great point. The current allowlist approach in deriveLockConfigForDifferentTable is fragile — any new lock provider added to Hudi will hit the else throw HoodieException branch. A more defensive approach could be: copy ALL properties with the hoodie.write.lock. prefix by default, and only reject providers known to derive lock identity from base path (the implicit providers). That way new providers work out of the box unless they have the implicit-identity problem.
There was a problem hiding this comment.
🤖 Exactly right — DynamoDB and StorageBasedLockProvider are handled (StorageBasedLockProvider is rejected, DynamoDB is supported with explicit partition key). But the broader concern stands: any future lock provider will hit the catch-all throw HoodieException instead of working. The allowlist should ideally be inverted into a denylist of known-implicit providers.
| LOG.info("Skipping execution of log compaction on MDT as it is delegated to table service manager."); | ||
| } else { | ||
| writeClient.logCompact(logCompactionInstantTime, true); | ||
| } |
There was a problem hiding this comment.
looks like we are only supporting compaction and log compactions for now.
can we call out in the config property documentation for the new configs added
There was a problem hiding this comment.
Sure updated config documentation
nsivabalan
left a comment
There was a problem hiding this comment.
will review tests once we align on the re-entrancy.
| } | ||
| lockProps.setProperty(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), metadataWriteConcurrencyMode.name()); | ||
| String lockProviderClass = writeConfig.getLockProviderClass(); | ||
| checkState(lockProviderClass != null, "Lock provider class must be set for metadata table"); |
There was a problem hiding this comment.
minor. Lock provider class must be set for data table to enable async executions of table services in metadata table
| } | ||
| lockProps.setProperty(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), metadataWriteConcurrencyMode.name()); | ||
| String lockProviderClass = writeConfig.getLockProviderClass(); | ||
| checkState(lockProviderClass != null, "Lock provider class must be set for metadata table"); |
There was a problem hiding this comment.
also, wondering if we should check that its not InProcessLockProvider as well
| concurrencyMode = WriteConcurrencyMode.SINGLE_WRITER; | ||
| lockConfig = HoodieLockConfig.newBuilder().build(); | ||
| failedWritesCleaningPolicy = HoodieFailedWritesCleaningPolicy.LAZY; | ||
| mergeMetdataLockConfigAtEnd = true; |
There was a problem hiding this comment.
can we validate the lock provider class alone here and fail fast.
|
|
||
| WriteConcurrencyMode concurrencyMode; | ||
| HoodieLockConfig lockConfig; | ||
| final boolean mergeMetdataLockConfigAtEnd; |
There was a problem hiding this comment.
`deriveMetadataLockConfigsFromDataTableConfigs
| TypedProperties dataTableProps = writeConfig.getProps(); | ||
| TypedProperties mdtProps = metadataWriteConfig.getProps(); | ||
| for (String key : dataTableProps.stringPropertyNames()) { | ||
| if (key.startsWith(LockConfiguration.LOCK_PREFIX) || !mdtProps.containsKey(key)) { |
There was a problem hiding this comment.
oh, something that striked me just now. Don't we need to have reentrancy support for all lock providers if we are going w/ this solution?
if not, how can we support this for any LP?
bcoz, the writes to mdt happens w/n data table lock, irrespective of whether we mdt is single writer or not. If its not a single writer, then writes to mdt will take a new set of locks we are configuring.
So, if a given LP does not support re-entrancy, this may go into deadlock right?
There was a problem hiding this comment.
Oh writers to data table should not be setting METADATA_WRITE_CONCURRENCY_MODE , this should only be set by a table service user application that intends to execute compaction plans on the MDT (and does not hold any table lock while executing the plans) . Example usage would be
HoodieBackedTableMetadataWriter metadataTableWriter =
(HoodieBackedTableMetadataWriter)
SparkHoodieBackedTableMetadataWriter.create(
getJavaSparkContext().hadoopConfiguration(),
writeConfig, // User specifies METADATA_WRITE_CONCURRENCY_MODE
dataTableWriteClient.getEngineContext());
metaClient = metadataTableWriter.getMetadataMetaClient();
writeClient = (SparkRDDWriteClient) metadataTableWriter.getWriteClient();
final List<HoodieInstant> pendingCompactionInstants =
metaClient.getActiveTimeline().filterPendingCompactionTimeline().getInstants();
for (HoodieInstant pendingCompactionInstant : pendingCompactionInstants) {
writeClient.compact(pendingCompactionInstant.getTimestamp());
I'm open to making a wrapper API in HUDI lib itself for that. But even if we do that, I can't see a way around createMetadataWriteConfig , except maybe creating a new function like createMetadataWriteConfigForTableServiceExecution and then creating a new static helper function in HoodieBackedTableMetadataWriter that uses that API to execute pending table service plans in MDT?
At the end of the day our core problem is that we want a way to have a concurrent writer execute pending compaction plans on MDT while making sure that the data table lock is
- The data table lock is held during the necessary checks (starting heartbeat, transitioning instant states, committing the compaction, etc)
- . . . but not held the whole time during plan execution (since that will block ingestion)
So if there's a more ergonomic way to achieve that (other than this PR) then we should definitely consider it.
There was a problem hiding this comment.
sure. I get it now. can we clarify these in the documentation. ie, "one should not enable or set these configs for regular ingestion writers..... "
| checkState(lockProviderClass != null, "Lock provider class must be set for metadata table"); | ||
| lockProps.setProperty(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), lockProviderClass); | ||
| metadataWriteConfig = HoodieWriteConfig.newBuilder() | ||
| .withProperties(metadataWriteConfig.getProps()) |
There was a problem hiding this comment.
minor. we could use mdtProps
| .defaultValue("") | ||
| .markAdvanced() | ||
| .withDocumentation("Comma-separated list of table service actions on the metadata table " | ||
| + "that should be delegated to the table service manager. Currently supported actions are: compaction, logcompaction."); |
There was a problem hiding this comment.
should we throw if someone sets any other actions?
| */ | ||
| @Override | ||
| void compactIfNecessary(BaseHoodieWriteClient<?,I,?,O> writeClient, Option<String> latestDeltaCommitTimeOpt) { | ||
| HoodieTableServiceManagerConfig tsmConfig = metadataWriteConfig.getTableServiceManagerConfig(); |
There was a problem hiding this comment.
what about the latest table version?
There was a problem hiding this comment.
Oh does hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java not cover that? I assumed so since I didn't see any other child class override compactIfNecessary
| // First lets create the MDT write config with default single writer lock configs. | ||
| // Then, once all MDT-specific write configs are set, we can derive lock configs | ||
| // from the data table and re-build the MDT write config with the merged lock config. | ||
| concurrencyMode = WriteConcurrencyMode.SINGLE_WRITER; |
There was a problem hiding this comment.
The concurrency mode must be WriteConcurrencyMode.NON_BLOCKING_CONCURRENCY_CONTROL when isStreamingWritesToMetadataEnabled is enabled no matter whether the concurrency mode of MDT is specified explicitly or not. cc @nsivabalan
There was a problem hiding this comment.
Oh that should still be the case, since https://github.com/apache/hudi/pull/18295/changes#diff-222263167c64d14376f622a020c075f95600cb456409742875925559391226beR161 if metadataWriteConcurrencyMode is set by user to be OCC/NBCC then we anyway fail if isStreamingWritesToMetadataEnabled is enabled. We don't necessarily have to implement it this way, but I thought it would make it easier to follow (see comment #18295 (comment) ) if we enforce this. Since iiuc isStreamingWritesToMetadataEnabled is meant for writes/table services on data table (to allow files in data table/MDT partitions to be written together) and that anyway should not apply to the use case we want to support here: to allow concurrent applications to directly execute a table service plan on MDT
There was a problem hiding this comment.
that anyway should not apply to the use case we want to support here: to allow concurrent applications to directly execute a table service plan on MDT
For the case we want to support in this PR(separate plan execution for compaction & log compaction), the NBCC should work and I'm wondering if we could got some smart inference to avoid confusing errors throwing from conf mismatch.
| checkState(!InProcessLockProvider.class.getCanonicalName().equals(lockProviderClass), | ||
| "InProcessLockProvider cannot be used for metadata table multi-writer mode as it does not support cross-process locking. " | ||
| + "Configure a distributed lock provider on the data table."); | ||
| // First lets create the MDT write config with default single writer lock configs. |
There was a problem hiding this comment.
can we infer the MDT log config in one shot instead of putting the default and override later, the write config props are all visible right?
| Properties lockProps = new Properties(); | ||
| TypedProperties dataTableProps = writeConfig.getProps(); | ||
| TypedProperties mdtProps = metadataWriteConfig.getProps(); | ||
| for (String key : dataTableProps.stringPropertyNames()) { |
There was a problem hiding this comment.
wondering why we need the props override, the mdtProps at line 398 never contains any property with prefix LockConfiguration.LOCK_PREFIX?
There was a problem hiding this comment.
The issue I'm running into, which I'm discussing with @nsivabalan in #18295 , is that we need to "inherit" the lock configs from data table. But we can't directly add all the props from data table, since that will override the non-lock related ones that are explictly set in MDT. Originally when I first raised this PR, I was iterating through lock provider types the same way we already handle all metrics reporter type here. But then the issue is that this would have to be updated anytime we add a new lock config or lock provider
There was a problem hiding this comment.
is that we need to "inherit" the lock configs from data table. But we can't directly add all the props from data table
+1 to this part, I see that the props we want to set up into the MDT config is as following:
- key.startsWith(LockConfiguration.LOCK_PREFIX)
- HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key()
- HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key()
we can prepare this props in one shot from data table write config, and set it in the last step of mdt builder using .withProperties(lockProps).
Two concens here:
- it seems we want to include all the options that are contained in the data table config but not in the metadata table config with check
!mdtProps.containsKey(key), are there any risk that wong config are leaked from DT into MDT and cause unexpected behaviors? looks dangerous though. - get the props from HoodieWriteConfig and rebuild with the props generally works but there is risk for properties lost, we have some POJO configs within the
HoodieWriteConfigthat instantiated within the constructor of theHoodieWriteConfig, those config could infer options that are not included in theHoodieWriteConfigprops.(this is anti-pattern and tech debt though)
There was a problem hiding this comment.
Yeah because of the possible edge cases I noticed and that you called out in the above comment, I am leaning towards instead changing this PR to back to my first attempt - creating a new lock config for MDT write config to use by
iterating through lock provider types the same way we already handle all metrics reporter type here. But then the issue is that this would have to be updated anytime we add a new lock config or lock provider
Since even though that increases the amount of code by a lot (to handle every lock provider case for lock providers implemented in HUDI), at least it will be easier to follow and mainly update.
And for lock provider classes that are not in packages accessible to this class, we resort to the heuristic/hack of putting in all props (from data table write config) with
key.startsWith(LockConfiguration.LOCK_PREFIX)
There was a problem hiding this comment.
Lets brainstorm on this more today.
| if (metadataWriteConcurrencyMode.supportsMultiWriter()) { | ||
| // Configuring Multi-writer directly on metadata table is intended for executing table service plans, not for writes. | ||
| checkState(!isStreamingWritesToMetadataEnabled, | ||
| "Streaming writes to metadata table must be disabled when using multi-writer concurrency mode " |
There was a problem hiding this comment.
NBCC also belongs to multi-writer and it works with isStreamingWritesToMetadataEnabled
There was a problem hiding this comment.
Oh hmm I might be misunderstanding. My understanding was that NBCC is a "type" of multiwriter concurrency mode (like OCC), that can be set regardless of wether streamingWritesToMetadataEnabled is set. But if streamingWritesToMetadataEnabled is set, then concurrency mode must be NBCC (in data table and MDT). So I decided to allow both OCC/NBCC (allow all multiwriter concurrency modes) but explicitly not allow streamingWritesToMetadataEnabled to be enabled in data table config https://github.com/apache/hudi/pull/18295/changes/BASE..cc60d66f90ea53f922b194a249fd76fd2f923533#r2914296574 . Since I figured it might be a bit confusing/misleading, since METADATA_PREFIX + ".write.concurrency.mode is not supposed to be meant for clients writing to the data table
There was a problem hiding this comment.
We have a check method: WriteConcurrencyMode#supportsMultiWriter, OCC and NBCC both means to support multiple writers.
What if the user just set up the MDT concurrency mode as NBCC and also isStreamingWritesToMetadataEnabled as true, it should work for your case since NBCC is more flexible than OCC(the current patch only executes the plans instead of schedule).
| * Only built-in lock providers are supported. | ||
| */ | ||
| @VisibleForTesting | ||
| static HoodieLockConfig buildMdtLockConfig(String lockProviderClass, HoodieWriteConfig writeConfig) { |
There was a problem hiding this comment.
does it makes sense we move this util into HoodieLockConfig
| @Deprecated | ||
| public static final String LOCK_PROVIDER_CLASS_PROP = LOCK_PROVIDER_CLASS_NAME.key(); | ||
|
|
||
| // Lock provider class names from modules not directly accessible in hudi-client-common. |
There was a problem hiding this comment.
Instead of introducing lock-provider-specific changes, could we move metadata compaction or any table service into a higher-level API on SparkRDDWriteClient for metadata-table table services?
Since compaction already has explicit scheduling/execution phases, it seems cleaner to encapsulate the metadata-table service flow there and acquire the main-table lock only around the finalization/publication step. That would keep concurrency control at the table-service orchestration layer rather than coupling behavior to individual lock providers.
There was a problem hiding this comment.
+1, since we have the completion time based file slicing, the compaction scheduling and execution and be alway done in a separate job without breaking the semantics.
There was a problem hiding this comment.
Won't we still need this change so that we can create a MDT write client that will implicitly take data table lock when calling compact APIs? Since although in the data table write client we can construct a MDT write client and call compact on it, afaik it would just perform all the execution/completion steps with single_writer mode. Is there a way to work around this?
@nsivabalan what are your thoughts?
There was a problem hiding this comment.
in theory, this might look neat, but in reality, this may not land neatly as we think. I did bring this up surya last week when he proposed this.
anyways, let me clarify. As of now, all touch points to mdt from data table write client is fairly standard.
- txnManager.beginTxn
- mdtWriter = getMedataWriter
- mdtWriter. apply updates
- mdtWriter.close
- txnManager.endTxn
but w/ above proposal, we might end up something like
SparkRDDWriteClient.performAsyncTableServicesInMDT() {
w/o acquiring any data table lock
1. mdtWriter = getMedataWriter
2. Execute compaction for pending compaction instants. // this means that we need to expose apis in mdt writer just for compaction execution, but not completing it.
3. txnManager.beginTxn // data table lock
4. complete the compaction for mdt // again, we need to expose apis in mdt writer just to complete the compaction.
5. txnManager.endTxn // data table lock
}
This does not seem elegant and unnecessarily complicates the layering.
But if you folks have any other better alternative, let us know. Happy to explore.
There was a problem hiding this comment.
This does not seem elegant and unnecessarily complicates the layering.
since we execute MDT table services in a separate job, adding a few APIs on the MDT writer is not that too bad from my side, since the MDT itself is now the main API we are operating on now.
There was a problem hiding this comment.
yeah, that's in my mind too.
There was a problem hiding this comment.
but the issue is, we can't confine those APIs just for the standalone tool right. it will go in w/ the standard write client we have. And hence, anyone can use it otherwise as well.
lets chat through this and take a call. I don't see lot of users might use this feature for now. So, unless we plan to make this a major one, and make it robust(either by adding separate locks for metadata table), don't wanna pollute the write client meant for data table.
There was a problem hiding this comment.
we can mark these APIs as expremental or internal and doc it well, the tests would guard the correctness, a separate compaction job for MDT makes sense in general as the volumn scales to super large, just like what we do to the data table.
There was a problem hiding this comment.
I synced up w/ @yihua on this and generally we feel, adding a new public api to write client will confuse users and we got to be very thorough in adding any such public apis.
my recommendation is to take it up as a follow up. Consider different locks for data table and mdt table or list down all possible approaches, go over pros and cons and then add standalone tool for MDT table services if need be or take the write client approach we feel we have compelling reasons.
There was a problem hiding this comment.
Given that MDT write operations are critical, it is not quite safe to expose new APIs of SparkRDDWriteClient.performAsyncTableServicesInMDT, and users not aware of what they are doing can easily corrupt MDT. So, given that it is an advanced usage, I would suggest avoiding a new public API and leveraging the lock config.
yihua
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Instead of throwing an error when streaming writes and multi-writer are both configured on the metadata table, silently force streaming writes off since multi-writer mode is for separate table service execution and is not compatible with streaming writes.
|
hey @kbuci : can you take a look at ethan's last set of feedback. |
| + ". The inferred default from table name is not propagated" | ||
| + " when building a lock config for a different table."); | ||
| } | ||
| } else if (DYNAMODB_BASED_IMPLICIT_PARTITION_KEY_LOCK_PROVIDER_CLASS.equals(lockProviderClass)) { |
There was a problem hiding this comment.
if the data table has DYNAMODB_BASED_IMPLICIT_PARTITION_KEY_LOCK_PROVIDER_CLASS, we can retrieve the key and explicitly set it for mdt right?
that way, we don't need to fail here.
Same applies for other implicit lock providers
There was a problem hiding this comment.
We cannot support lock providers that inherit lock keys from table name/basepath , since if we set the table name/basepath in the returned lock config, then that will cause final MDT lock config to have table name/basepath of data table. Which will cause issues
yihua
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for contributing! The approach of allowing external table service platforms to execute MDT compaction/logcompaction is well-structured, with good validation of lock providers and clear separation between scheduling and execution. The main concern is a pre-existing substring matching bug in HoodieTableServiceManagerConfig.isEnabledAndActionSupported() that breaks the partial delegation scenario this PR enables — please see the inline comments.
| if (activeTimeline.filterPendingCompactionTimeline().countInstants() > 0) { | ||
| writeClient.runAnyPendingCompactions(); | ||
| ranServices = true; | ||
| if (writeClient.shouldDelegateToTableServiceManager(writeClient.getConfig(), ActionType.compaction)) { |
There was a problem hiding this comment.
🤖 The underlying isEnabledAndActionSupported uses String.contains() for substring matching, which means if the user configures only logcompaction as the delegated action, ActionType.compaction will also match (since "logcompaction".contains("compaction") == true). This breaks the partial delegation scenario. Could you either fix isEnabledAndActionSupported to do exact token matching, or add a note/workaround here?
- Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
| EnumSet.of(ActionType.compaction, ActionType.logcompaction); | ||
|
|
||
| public static final ConfigProperty<String> TABLE_SERVICE_MANAGER_ACTIONS = ConfigProperty | ||
| .key(METADATA_PREFIX + ".table.service.manager.actions") |
There was a problem hiding this comment.
🤖 The validateTableServiceManagerActions in the builder only runs when withTableServiceManagerActions() is called explicitly. When configs are loaded via fromProperties() (which is the common path for real deployments), validation is skipped entirely. Could you add validation in the build() method as well?
- Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
| checkState(metadataWriteConcurrencyMode == writeConfig.getWriteConcurrencyMode(), | ||
| "If multiwriter is used on metadata table, its concurrency mode (" + metadataWriteConcurrencyMode | ||
| + ") must match the data table concurrency mode (" + writeConfig.getWriteConcurrencyMode() + ")"); | ||
| String lockProviderClass = writeConfig.getLockProviderClass(); |
There was a problem hiding this comment.
🤖 When metadataWriteConcurrencyMode.supportsMultiWriter() is true but the data table is SINGLE_WRITER, this checkState will fail. Is there a clear enough error message for the user here? A user might set hoodie.metadata.write.concurrency.mode=OPTIMISTIC_CONCURRENCY_CONTROL without realizing the data table also needs multi-writer. Have you considered inferring the data table requirement or improving the error guidance?
- Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
| String.valueOf(HoodieLockConfig.LOCK_HEARTBEAT_INTERVAL_MS.defaultValue())))); | ||
|
|
||
| String lockProviderClass = writeConfig.getLockProviderClass(); | ||
| if (lockProviderClass == null) { |
There was a problem hiding this comment.
🤖 Agree this should throw rather than silently proceeding. If OCC is requested for MDT but the data table has no lock provider configured, that's a misconfiguration that will fail confusingly later. The current code does have a checkState(lockProviderClass != null, ...) a few lines below (line ~168 in the new code), so it does throw — but it's a checkState IllegalStateException rather than a more descriptive error. Might be worth confirming that path is hit in this scenario.
| } | ||
| } else { | ||
| // For any custom lock provider, pass through all lock-prefixed properties | ||
| // so provider-specific configs are preserved. |
There was a problem hiding this comment.
🤖 This is a great point. The current allowlist approach in deriveLockConfigForDifferentTable is fragile — any new lock provider added to Hudi will hit the else throw HoodieException branch. A more defensive approach could be: copy ALL properties with the hoodie.write.lock. prefix by default, and only reject providers known to derive lock identity from base path (the implicit providers). That way new providers work out of the box unless they have the implicit-identity problem.
| } | ||
| } else { | ||
| // For any custom lock provider, pass through all lock-prefixed properties | ||
| // so provider-specific configs are preserved. |
There was a problem hiding this comment.
🤖 Exactly right — DynamoDB and StorageBasedLockProvider are handled (StorageBasedLockProvider is rejected, DynamoDB is supported with explicit partition key). But the broader concern stands: any future lock provider will hit the catch-all throw HoodieException instead of working. The allowlist should ideally be inverted into a denylist of known-implicit providers.
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #18295 +/- ##
============================================
+ Coverage 68.84% 68.88% +0.03%
- Complexity 28208 28275 +67
============================================
Files 2460 2464 +4
Lines 135206 135569 +363
Branches 16384 16446 +62
============================================
+ Hits 93089 93382 +293
- Misses 34752 34801 +49
- Partials 7365 7386 +21
Flags with carried forward coverage won't be shown. Click here to find out more.
🚀 New features to boost your workflow:
|
yihua
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for addressing the feedback! The substring matching bug in isEnabledAndActionSupported is now fixed with proper comma-split token matching (and a dedicated test for the logcompaction-only case), and config validation has been moved into build() so it now catches both fromProperties() misconfigurations and the enabled-with-empty-actions case. Both prior BUG findings are resolved; the remaining QUESTION about UX guidance for concurrency mode mismatch is still open but non-blocking. LGTM on this incremental change.
yihua
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
CodeRabbit Walkthrough: Adds clustering-expiration/heartbeat rollback logic, metadata-table service delegation, distributed metric registries, Flink reader refactors with global record-limit push-down, VECTOR nesting validation, compaction/log-compaction scheduling helpers, lock-config derivation, Catalog-backed partition listing, many handler/factory abstractions for Flink table services, and extensive tests and utilities across modules.
Greptile Summary: This PR adds support for safely executing metadata-table (MDT) compaction plans from an external table service platform (rather than inline during writes), along with clustering expiration/rollback via heartbeat-based detection, column pruning for incremental reads, distributed metrics support, log-compaction scheduling improvements, and VECTOR schema validation for nested types.
Key changes:
- MDT table service delegation: New
METADATA_WRITE_CONCURRENCY_MODE,TABLE_SERVICE_MANAGER_ENABLED, andTABLE_SERVICE_MANAGER_ACTIONSconfigs allow MDT compaction/log-compaction to be scheduled but not executed inline, delegating execution to an external table service platform.HoodieBackedTableMetadataWriternow implementsRunsTableService. - Clustering expiration: New
ENABLE_EXPIRATIONS/EXPIRATION_THRESHOLD_MINSconfigs enable LAZY-clean-policy writers to roll back stale pending clustering instants whose heartbeats have expired, usingHoodieClusteringJob.rollbackFailedClusteringForPartitions. - MDT multi-writer lock config:
HoodieLockConfig.deriveLockConfigForDifferentTablederives a lock config for the MDT from the data table's lock provider, with explicit validation and rejection of implicit-path providers. - Column pruning for incremental reads:
IncrementalRelationV1andV2now implementPrunedScaninstead ofTableScan, withIncrementalRelationUtilproviding schema pruning and column-dropping helpers. - Log-compaction gating:
ScheduleCompactionActionExecutor.needLogCompactgates scheduling on delta commits since both the last compaction and the last log compaction. - Distributed metrics:
HoodieEngineContext.getMetricRegistry()overridden inHoodieSparkEngineContextto returnDistributedRegistryinstances. - VECTOR schema validation:
HoodieSchemaandHoodieSparkSchemaConvertersnow reject VECTOR types in nested positions. ParquetReaderIterator: Auto-closes on exhaustion with idempotent close guard.
Greptile Confidence Score: 3/5
PR introduces several independently useful features but has multiple open structural issues that need resolution before merge
Several previous-thread issues remain unresolved (Registry key mismatch, static DISTRIBUTED_REGISTRY_MAP, fail-fast in rollbackFailedClusteringForPartitions, inflightInstantsStream duplication). Two new issues are identified: the hasInstantExpired timezone dead-code affecting non-UTC tables, and a potential PUSH_DOWN_INCR_FILTERS regression from the TableScan→PrunedScan upgrade. The core MDT delegation and lock-config derivation logic are sound, and tests are substantial.
BaseHoodieTableServiceClient.java (timezone bug), IncrementalRelationUtil.scala (PUSH_DOWN_INCR_FILTERS regression), Registry.java (key mismatch), HoodieSparkEngineContext.java (static registry map)
Sequence Diagram (Greptile):
sequenceDiagram
participant DT as Data Table Writer
participant MDT_W as HoodieBackedTableMetadataWriter
participant MDT_C as MDT Write Client
participant TSM as Table Service Manager (External)
DT->>MDT_W: syncDataTableCommit()
MDT_W->>MDT_W: Check pending compactions on MDT timeline
alt shouldDelegateToTableServiceManager(compaction)?
MDT_W->>MDT_C: scheduleCompactionAtInstant()
MDT_C-->>MDT_W: plan scheduled (REQUESTED state)
MDT_W-->>DT: skip inline execution
note over TSM: TSM independently picks up and executes MDT compaction plan
TSM->>MDT_C: compact(instantTime)
else inline execution
MDT_W->>MDT_C: scheduleCompactionAtInstant()
MDT_W->>MDT_C: compact(instantTime)
end
Note over DT,MDT_C: Clustering Expiration Flow
DT->>DT: rollFailedWrites (LAZY policy)
DT->>DT: filterPendingClusteringTimeline()
DT->>DT: isClusteringInstantEligibleForRollback()
alt eligible for rollback
DT->>DT: rollback(clusteringInstant)
end
CodeRabbit: yihua#27 (review)
Greptile: yihua#27 (review)
| + " must be explicitly set when using " + lockProviderClass | ||
| + ". The inferred default from table name is not propagated" | ||
| + " when building a lock config for a different table."); | ||
| } |
There was a problem hiding this comment.
Validate required lock-identity properties as non-blank, not just present.
Current checks use containsKey(...), so empty values pass validation and can still produce invalid lock identities (for example an empty ZK lock key or filesystem lock path).
Proposed fix
- if (!lockProps.containsKey(LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY)) {
+ if (isBlank(lockProps.getProperty(LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY))) {
throw new IllegalArgumentException(LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY
+ " must be explicitly set when using " + lockProviderClass
+ ". Without it, the lock path is derived from the table's base path,"
+ " which would resolve to a different location when building a lock config for a different table.");
}
@@
- if (!lockProps.containsKey(LockConfiguration.ZK_LOCK_KEY_PROP_KEY)) {
+ if (isBlank(lockProps.getProperty(LockConfiguration.ZK_LOCK_KEY_PROP_KEY))) {
throw new IllegalArgumentException(LockConfiguration.ZK_LOCK_KEY_PROP_KEY
+ " must be explicitly set when using " + lockProviderClass
+ ". The inferred default from table name is not propagated"
+ " when building a lock config for a different table.");
}
@@
- if (!lockProps.containsKey(LockConfiguration.HIVE_DATABASE_NAME_PROP_KEY)) {
+ if (isBlank(lockProps.getProperty(LockConfiguration.HIVE_DATABASE_NAME_PROP_KEY))) {
throw new IllegalArgumentException(LockConfiguration.HIVE_DATABASE_NAME_PROP_KEY
+ " must be explicitly set when using " + lockProviderClass);
}
- if (!lockProps.containsKey(LockConfiguration.HIVE_TABLE_NAME_PROP_KEY)) {
+ if (isBlank(lockProps.getProperty(LockConfiguration.HIVE_TABLE_NAME_PROP_KEY))) {
throw new IllegalArgumentException(LockConfiguration.HIVE_TABLE_NAME_PROP_KEY
+ " must be explicitly set when using " + lockProviderClass);
}
@@
- if (!lockProps.containsKey(DYNAMODB_LOCK_PARTITION_KEY_PROP_KEY)) {
+ if (isBlank(lockProps.getProperty(DYNAMODB_LOCK_PARTITION_KEY_PROP_KEY))) {
throw new IllegalArgumentException(DYNAMODB_LOCK_PARTITION_KEY_PROP_KEY
+ " must be explicitly set when using " + lockProviderClass
+ ". The inferred default from table name is not propagated"
+ " when building a lock config for a different table.");
}
@@
private static void copyPropsWithPrefix(TypedProperties source, Properties target, String prefix) {
for (String key : source.stringPropertyNames()) {
if (key.startsWith(prefix)) {
target.setProperty(key, source.getProperty(key));
}
}
}
+
+ private static boolean isBlank(String value) {
+ return value == null || value.trim().isEmpty();
+ }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java`
around lines 308 - 345, The current validations in HoodieLockConfig use
lockProps.containsKey(...) which allows empty values; update each validation
after copyPropsWithPrefix(...) to fetch the value (e.g.,
lockProps.get(LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY)) and throw
IllegalArgumentException when the value is null or blank
(value.trim().isEmpty()), not just when the key is missing. Apply this non-blank
check for the keys LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY,
LockConfiguration.ZK_LOCK_KEY_PROP_KEY,
LockConfiguration.HIVE_DATABASE_NAME_PROP_KEY,
LockConfiguration.HIVE_TABLE_NAME_PROP_KEY and
DYNAMODB_LOCK_PARTITION_KEY_PROP_KEY in the same conditional branches (those
that reference ZookeeperBasedLockProvider,
ZookeeperBasedImplicitBasePathLockProvider,
HIVE_METASTORE_BASED_LOCK_PROVIDER_CLASS and
DYNAMODB_BASED_LOCK_PROVIDER_CLASS), leaving copyPropsWithPrefix(...) usage
intact.
— CodeRabbit (original) (source:comment#3095603949)
Describe the issue this Pull Request addresses
Enable support for executing metadata table (MDT) compaction/logcompaction plans from a concurrent writer that operates independently of the primary write path. Additionally, allow users to configure a table service manager to skip inline execution of compaction/logcompaction on the metadata table, so that these operations can be handled by a dedicated async "table service platform".
Summary and Changelog
Summary: Adds configuration-driven support for multi-writer concurrency on the metadata table and table service manager delegation of MDT compaction/logcompaction.
Changelog:
hoodie.metadata.write.concurrency.modeconfig toHoodieMetadataConfigto control the write concurrency mode for the metadata table. When set toOPTIMISTIC_CONCURRENCY_CONTROL, the MDT write config inherits the lock configuration from the data table, enabling a concurrent writer to execute table service plans on the MDT.hoodie.metadata.table.service.manager.enabledandhoodie.metadata.table.service.manager.actionsconfigs toHoodieMetadataConfig, allowing users to delegate specific table service actions (compaction, logcompaction) on the metadata table to an external table service manager.HoodieBackedTableMetadataWriter(table version 8+) andHoodieBackedTableMetadataWriterTableVersionSix.Impact
hoodie.metadata.write.concurrency.mode,hoodie.metadata.table.service.manager.enabled,hoodie.metadata.table.service.manager.actions— all marked as advanced with safe defaults (single writer, TSM disabled).Risk Level
Low. All new behavior is gated behind new configuration properties that default to the existing behavior (single writer, no TSM delegation). Existing write paths are unaffected unless the user explicitly opts in.
Documentation Update
New config descriptions are included in the code. The following configs are added:
hoodie.metadata.write.concurrency.mode— Controls write concurrency mode for the metadata table. Default:SINGLE_WRITER.hoodie.metadata.table.service.manager.enabled— Enables table service manager delegation for the metadata table. Default:false.hoodie.metadata.table.service.manager.actions— Comma-separated list of actions to delegate (e.g.,compaction,logcompaction). Default: empty.Contributor's checklist