[OSS PR #17928] feat: Add ReverseOrderHoodieRecordPayload and configurable ordering behavior#36
Conversation
📝 WalkthroughWalkthroughIntroduces configurable payload ordering behavior and sentinel emission, adds a reverse-order payload implementation and tests, and implements multiple client/config/rollback/metrics/spark/io changes across the codebase (clustering expiration, cleaner/archival limits, rollback refactors, distributed registries, variant shredding, and schema/vector validations). Changes
Estimated code review effort🎯 5 (Critical) | ⏱️ ~120 minutes Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Comment |
Greptile SummaryThis PR adds a Key changes:
Confidence Score: 4/5Safe to merge with minor cleanup; no logic bugs or breaking changes detected. The core logic is sound: the SENTINEL return correctly aligns with the existing HoodieAvroRecordMerger sentinel check; the reverse-order comparison is correctly inverted for both preCombine and compareOrderingVal; the new property default preserves prior <= semantics for backward compatibility. All issues found are P2 style/documentation nits. TestReverseOrderHoodieRecordPayload.java — inconsistent orderingVal vs ts field values in test constructor args could mask future ordering bugs. Important Files Changed
Flowchart%%{init: {'theme': 'neutral'}}%%
flowchart TD
A[combineAndGetUpdateValue called] --> B{needUpdatingPersistedRecord?}
B -- No --> C[return Option.of SENTINEL]
B -- Yes --> D{isDeleteRecord?}
D -- Yes --> E[return Option.empty]
D -- No --> F[return incomingRecord]
B --> G[needUpdatingPersistedRecord logic]
G --> H{orderingFields configured?}
H -- No --> I[return true - always update]
H -- Yes --> J[read persistedOrderingVal from currentValue]
J --> K[read incomingOrderingVal from record or orderingVal fallback]
K --> L{incoming empty delete with default ordering?}
L -- Yes --> I
L -- No --> M[read UPDATE_ON_SAME_PAYLOAD_ORDERING_FIELD prop default=true]
M --> N[compareOrderingVal]
N --> O{DefaultHoodieRecordPayload}
O --> P{updateOnSameOrderingField}
P -- true --> Q[return persistedVal le incomingVal]
P -- false --> R[return persistedVal lt incomingVal]
N --> S{ReverseOrderHoodieRecordPayload}
S --> T{updateOnSameOrderingField}
T -- true --> U[return persistedVal ge incomingVal]
T -- false --> V[return persistedVal gt incomingVal]
Reviews (1): Last reviewed commit: "Re-applying changes from https://github...." | Re-trigger Greptile |
| ReverseOrderHoodieRecordPayload payload1 = new ReverseOrderHoodieRecordPayload(record1, 1); | ||
| ReverseOrderHoodieRecordPayload payload2 = new ReverseOrderHoodieRecordPayload(record2, 2); |
There was a problem hiding this comment.
Inconsistent
orderingVal vs ts field values in test
payload1 is constructed with orderingVal=1 but its underlying record has ts=0L; payload2 uses orderingVal=2 while its record has ts=1L. This matters because:
preCombineuses the constructororderingVal(1 vs 2).combineAndGetUpdateValuereads thetsfield directly from the record (0 vs 1).
The test passes today because both pairs happen to agree on relative order, but the inconsistency could mask bugs in edge cases (e.g. a tie at one level but not the other). Compare with TestDefaultHoodieRecordPayload.testActiveRecords, which correctly aligns payload1 = new DefaultHoodieRecordPayload(record1, 0L) with record1.ts = 0L.
| ReverseOrderHoodieRecordPayload payload1 = new ReverseOrderHoodieRecordPayload(record1, 1); | |
| ReverseOrderHoodieRecordPayload payload2 = new ReverseOrderHoodieRecordPayload(record2, 2); | |
| ReverseOrderHoodieRecordPayload payload1 = new ReverseOrderHoodieRecordPayload(record1, 0L); | |
| ReverseOrderHoodieRecordPayload payload2 = new ReverseOrderHoodieRecordPayload(record2, 1L); |
| assertFalse(payload2.combineAndGetUpdateValue(record1, schema, props).isPresent()); | ||
| } | ||
| } | ||
| } No newline at end of file |
There was a problem hiding this comment.
Missing newline at end of file
The file is missing a trailing newline (No newline at end of file in the diff). Most Java style guides and POSIX require files to end with a newline character.
| } | |
| } |
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
| */ | ||
| public static final String UPDATE_ON_SAME_PAYLOAD_ORDERING_FIELD_PROP_KEY = "hoodie.payload.update.on.same.ordering.field"; | ||
|
|
||
| public static final String DEFAULT_UPDATE_ON_SAME_PAYLOAD_ORDERING_FIELD_PROP_VALUE = "true"; |
There was a problem hiding this comment.
Missing Javadoc on the default-value constant
DEFAULT_UPDATE_ON_SAME_PAYLOAD_ORDERING_FIELD_PROP_VALUE has no Javadoc, making it less discoverable relative to the property key constant just above it. A brief doc explaining what the default means (i.e. same ordering value → update, preserving prior <= semantics) would help.
| public static final String DEFAULT_UPDATE_ON_SAME_PAYLOAD_ORDERING_FIELD_PROP_VALUE = "true"; | |
| /** | |
| * Default value for {@link #UPDATE_ON_SAME_PAYLOAD_ORDERING_FIELD_PROP_KEY}. | |
| * When {@code true} (the default), records whose ordering field equals the persisted record's | |
| * ordering field are treated as updates — preserving the original {@code <=} comparison semantics. | |
| */ | |
| public static final String DEFAULT_UPDATE_ON_SAME_PAYLOAD_ORDERING_FIELD_PROP_VALUE = "true"; |
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
| * Compares the ordering between persisted entry and input payload. | ||
| * If updateOnSameOrderingField is true, then incoming record is returned when payload ordering field is the same. | ||
| * @param persistedOrderingVal record present in Disk | ||
| * @param incomingOrderingVal record part of input payload | ||
| * @return true if the older record(persisted entry) is older than incoming record. |
There was a problem hiding this comment.
Javadoc
@return description is misleading for subclasses
The @return tag reads "true if the older record(persisted entry) is older than incoming record" — this is a tautology ("older … is older"). More importantly, for ReverseOrderHoodieRecordPayload the override has the opposite semantics (returns true when the persisted entry is newer), so the inherited doc actively misleads readers of that subclass.
Suggested improvement:
| * Compares the ordering between persisted entry and input payload. | |
| * If updateOnSameOrderingField is true, then incoming record is returned when payload ordering field is the same. | |
| * @param persistedOrderingVal record present in Disk | |
| * @param incomingOrderingVal record part of input payload | |
| * @return true if the older record(persisted entry) is older than incoming record. | |
| /** | |
| * Compares the ordering between the persisted entry and the incoming payload to decide | |
| * whether the persisted record should be replaced. | |
| * If {@code updateOnSameOrderingField} is {@code true}, a tie is treated as an update. | |
| * | |
| * @param persistedOrderingVal ordering value of the record currently on disk | |
| * @param incomingOrderingVal ordering value of the incoming record | |
| * @param updateOnSameOrderingField if {@code true}, equal ordering values are treated as an update | |
| * @return {@code true} if the incoming record should replace the persisted record | |
| */ |
There was a problem hiding this comment.
🧹 Nitpick comments (1)
hudi-common/src/test/java/org/apache/hudi/common/model/TestReverseOrderHoodieRecordPayload.java (1)
59-129: Add an equality-case test for reverse-order tie handling.
ReverseOrderHoodieRecordPayload.compareOrderingVal(...)has distinct behavior forcompareVal == 0based onhoodie.payload.update.on.same.ordering.field, but this suite only exercises strictly lower/higher ordering values. A regression in the reverse-order tie path would still pass.➕ Suggested coverage
+ `@ParameterizedTest` + `@ValueSource`(booleans = {false, true}) + public void testEqualOrderingRespectsUpdateOnSameOrderingField(boolean updateOnSame) throws IOException { + props.setProperty(HoodiePayloadProps.UPDATE_ON_SAME_PAYLOAD_ORDERING_FIELD_PROP_KEY, String.valueOf(updateOnSame)); + + GenericRecord currentRecord = new GenericData.Record(schema); + currentRecord.put("id", "1"); + currentRecord.put("partition", "partition0"); + currentRecord.put("ts", 100L); + currentRecord.put("_hoodie_is_deleted", false); + + GenericRecord incomingRecord = new GenericData.Record(schema); + incomingRecord.put("id", "1"); + incomingRecord.put("partition", "partition0"); + incomingRecord.put("ts", 100L); + incomingRecord.put("_hoodie_is_deleted", false); + + ReverseOrderHoodieRecordPayload payload = new ReverseOrderHoodieRecordPayload(incomingRecord, 100L); + if (updateOnSame) { + assertEquals(incomingRecord, payload.combineAndGetUpdateValue(currentRecord, schema, props).get()); + } else { + assertEquals(SENTINEL, payload.combineAndGetUpdateValue(currentRecord, schema, props).get()); + } + }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@hudi-common/src/test/java/org/apache/hudi/common/model/TestReverseOrderHoodieRecordPayload.java` around lines 59 - 129, Add a new unit test in TestReverseOrderHoodieRecordPayload that exercises the tie (compareVal == 0) path by creating two GenericRecord instances with identical ordering field values and constructing ReverseOrderHoodieRecordPayload instances for them; toggle the configuration property "hoodie.payload.update.on.same.ordering.field" (true and false) via OrderingFieldsTestUtils.setOrderingFieldsConfig(props, ...) and assert the observed behavior via payload.preCombine(...) and payload.combineAndGetUpdateValue(...), thereby validating ReverseOrderHoodieRecordPayload.compareOrderingVal(...) tie handling under both config values.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In
`@hudi-common/src/test/java/org/apache/hudi/common/model/TestReverseOrderHoodieRecordPayload.java`:
- Around line 59-129: Add a new unit test in TestReverseOrderHoodieRecordPayload
that exercises the tie (compareVal == 0) path by creating two GenericRecord
instances with identical ordering field values and constructing
ReverseOrderHoodieRecordPayload instances for them; toggle the configuration
property "hoodie.payload.update.on.same.ordering.field" (true and false) via
OrderingFieldsTestUtils.setOrderingFieldsConfig(props, ...) and assert the
observed behavior via payload.preCombine(...) and
payload.combineAndGetUpdateValue(...), thereby validating
ReverseOrderHoodieRecordPayload.compareOrderingVal(...) tie handling under both
config values.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: d40f8d8f-bf70-4a79-a50c-9bc8b20d6616
📒 Files selected for processing (5)
hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.javahudi-common/src/main/java/org/apache/hudi/common/model/HoodiePayloadProps.javahudi-common/src/main/java/org/apache/hudi/common/model/ReverseOrderHoodieRecordPayload.javahudi-common/src/test/java/org/apache/hudi/common/model/TestDefaultHoodieRecordPayload.javahudi-common/src/test/java/org/apache/hudi/common/model/TestReverseOrderHoodieRecordPayload.java
There was a problem hiding this comment.
Actionable comments posted: 9
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java (1)
163-170:⚠️ Potential issue | 🟠 MajorCarry
MAX_COMMITS_TO_CLEANinto the derived MDT cleaner config.Line 168 forwards the clean trigger threshold, but the new per-run clean cap is dropped. If the data table is configured to bound cleaner work, metadata-table cleaning still runs unbounded and can become the long pole on the write path.
Proposed fix
HoodieCleanConfig.Builder cleanConfigBuilder = HoodieCleanConfig.newBuilder() .withAsyncClean(DEFAULT_METADATA_ASYNC_CLEAN) .withAutoClean(false) .withCleanerParallelism(MDT_DEFAULT_PARALLELISM) .withFailedWritesCleaningPolicy(failedWritesCleaningPolicy) .withMaxCommitsBeforeCleaning(writeConfig.getCleanTriggerMaxCommits()) + .withMaxCommitsToClean(writeConfig.getMaxCommitsToClean()) .withCleanOptimizationWithLocalEngineEnabled( writeConfig.isCleanOptimizationWithLocalEngineEnabled());🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java` around lines 163 - 170, The derived metadata-table cleaner config is missing the per-run cap (MAX_COMMITS_TO_CLEAN) so cleaning can run unbounded; update the HoodieCleanConfig.Builder used in HoodieMetadataWriteUtils to also set the per-run cap from the write config (e.g., call the builder method that sets MAX_COMMITS_TO_CLEAN with writeConfig.getMaxCommitsToClean() or the corresponding getter) in addition to the existing withMaxCommitsBeforeCleaning(...) so the MDT honors the same max-commits-to-clean bound as the data table.
🧹 Nitpick comments (8)
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/common/table/log/TestLogReaderUtils.java (1)
124-130: Strengthen Line 129 assertion to verifythirdCommitis actually represented.Using only map-size comparison can still pass when
thirdCommitdata is missing. Add a content assertion that at least one returned commit time equalsthirdCommit.✅ Proposed test hardening
assertNotNull(allLogFiles); + boolean includesThirdCommit = allLogFiles.values().stream() + .flatMap(List::stream) + .anyMatch(commitTime -> commitTime.equals(thirdCommit)); + assertTrue(includesThirdCommit, + "Expected at least one log file associated with third commit"); assertTrue(allLogFiles.size() >= logFilesWithMaxCommit.size(), "Should have same or more log files when including third commit");🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/common/table/log/TestLogReaderUtils.java` around lines 124 - 130, The assertion only checks map size and can miss that thirdCommit is actually represented; update the test in TestLogReaderUtils after calling LogReaderUtils.getAllLogFilesWithMaxCommit(metaClient, fsView, partitions, thirdCommit, context()) to also assert that at least one entry in the returned allLogFiles contains a log file or commit time equal to thirdCommit (e.g., iterate allLogFiles.keySet() or values to confirm a commit time matches thirdCommit), ensuring thirdCommit is present in the returned results in addition to the size check.hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java (1)
160-161: Minor typo in comment.Double period at end of line 161: "...without changing the _hoodie_commit_time field.."
📝 Suggested fix
- * so current value in disk is rewritten as it is without changing the _hoodie_commit_time field.. + * so current value in disk is rewritten as it is without changing the _hoodie_commit_time field.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java` around lines 160 - 161, Fix the minor punctuation typo in the Javadoc comment inside the DefaultHoodieRecordPayload class: find the comment block that ends with "...without changing the _hoodie_commit_time field.." and change the trailing double period to a single period so it reads "...without changing the _hoodie_commit_time field.".hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java (1)
573-591: Shredded variant field writers lack HoodieSchema for timestamp precision.The field writers for the shredded struct are created with
nullHoodieSchema (line 577). This means nested timestamps withintyped_valuewill use default micros precision rather than the precision specified in the original schema.If the original Variant's
typed_valuecontains timestamp fields with millis precision, they would be incorrectly written as micros.Consider propagating the HoodieSchema for the shredded struct fields:
private ValueWriter makeShreddedVariantWriter(StructType shreddedStructType) { - // Create writers for the shredded struct fields - // The shreddedStructType contains: metadata (binary), value (binary), typed_value (optional) - ValueWriter[] shreddedFieldWriters = Arrays.stream(shreddedStructType.fields()) - .map(field -> makeWriter(null, field.dataType())) - .toArray(ValueWriter[]::new); + // Note: We pass null for HoodieSchema since the shredded schema is generated + // dynamically and typed_value fields are inferred from Spark types. + // This means timestamp precision defaults to micros. + // TODO: Consider extracting HoodieSchema for typed_value fields if millis precision is needed. + ValueWriter[] shreddedFieldWriters = Arrays.stream(shreddedStructType.fields()) + .map(field -> makeWriter(null, field.dataType())) + .toArray(ValueWriter[]::new);At minimum, add a comment explaining this limitation. If millis-precision timestamps in shredded Variants are a supported use case, the schema information should be threaded through.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java` around lines 573 - 591, In makeShreddedVariantWriter, the shreddedFieldWriters are created by calling makeWriter(null, field.dataType()), which loses HoodieSchema and causes timestamp precision to default to micros; update makeShreddedVariantWriter to obtain or construct the correct HoodieSchema for the shreddedStructType/typed_value and pass that schema into makeWriter for each field (i.e., replace the null with the appropriate HoodieSchema or thread the original Variant schema into createShreddedVariantWriter so nested timestamp precision is preserved), or if millis-precision shredded Variants are not supported, add a clear comment in makeShreddedVariantWriter documenting the limitation and why null is used.docker/build_docker_images.sh (2)
108-112: Quote--build-argassignments to avoid shell splitting/globbingLine 109–Line 111 use unquoted expansions (
SC2086). Quoting removes word-splitting/globbing risk and clears lint warnings.Proposed fix
if ! docker build \ - --build-arg HADOOP_VERSION=${HADOOP_VERSION} \ - --build-arg SPARK_VERSION=${SPARK_VERSION} \ - --build-arg HIVE_VERSION=${HIVE_VERSION} \ + --build-arg "HADOOP_VERSION=${HADOOP_VERSION}" \ + --build-arg "SPARK_VERSION=${SPARK_VERSION}" \ + --build-arg "HIVE_VERSION=${HIVE_VERSION}" \ "$IMAGE_CONTEXT" -t "$TAG_LATEST" -t "$TAG_VERSIONED"; then🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@docker/build_docker_images.sh` around lines 108 - 112, The docker build invocation uses unquoted build-arg expansions which can cause word-splitting/globbing; update the docker build command so each build-arg is quoted (e.g., --build-arg HADOOP_VERSION="${HADOOP_VERSION}", --build-arg SPARK_VERSION="${SPARK_VERSION}", --build-arg HIVE_VERSION="${HIVE_VERSION}") while keeping the existing quoted IMAGE_CONTEXT and tags, then re-run the script to verify the shellcheck SC2086 warning is resolved.
70-82: ValidateVERSION_TAGformat before using it in image tagsLine 71–Line 81 set
VERSION_TAG, but there’s no Docker tag format check. Invalid values fail later duringdocker buildwith less actionable errors.Proposed fix
if [ -n "$VERSION_TAG_ARG" ]; then VERSION_TAG="$VERSION_TAG_ARG" else @@ fi fi + +if [[ ! "$VERSION_TAG" =~ ^[A-Za-z0-9_][A-Za-z0-9_.-]{0,127}$ ]]; then + echo "Error: Invalid Docker tag '$VERSION_TAG'" >&2 + exit 1 +fi🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@docker/build_docker_images.sh` around lines 70 - 82, After computing VERSION_TAG (set from VERSION_TAG_ARG or get_hudi_project_version) add a validation step to ensure the value is a valid Docker tag: check it matches allowed characters (e.g. regex /^[A-Za-z0-9_.-]+$/) and is within Docker's length limit (<=128); if the check fails print a clear error to stderr and exit 1 so docker build won't run with an invalid tag. Insert this validation immediately after VERSION_TAG is assigned in build_docker_images.sh (the block that calls get_hudi_project_version) and use the existing variable names (VERSION_TAG, VERSION_TAG_ARG, get_hudi_project_version) in the message to aid debugging.hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackHelper.java (1)
109-117: Fail fast on any unsupported V8+ log-block rollback request.If this helper is supposed to reject
logBlocksToBeDeleted, validate that before branching. Right now the guard only runs whenfilesToBeDeletedis empty, so a mixed request would delete the files and silently drop the log-block work.Possible cleanup
List<Pair<String, HoodieRollbackStat>> maybeDeleteAndCollectStats(HoodieEngineContext context, HoodieInstant instantToRollback, List<SerializableHoodieRollbackRequest> rollbackRequests, boolean doDelete, int numPartitions) { return context.flatMap(rollbackRequests, (SerializableFunction<SerializableHoodieRollbackRequest, Stream<Pair<String, HoodieRollbackStat>>>) rollbackRequest -> { + checkArgument(rollbackRequest.getLogBlocksToBeDeleted().isEmpty(), + "V8+ rollback should not have logBlocksToBeDeleted, but found for partition: " + + rollbackRequest.getPartitionPath() + ", fileId: " + rollbackRequest.getFileId()); List<String> filesToBeDeleted = rollbackRequest.getFilesToBeDeleted(); if (!filesToBeDeleted.isEmpty()) { List<HoodieRollbackStat> rollbackStats = deleteFiles(metaClient, filesToBeDeleted, doDelete); return rollbackStats.stream().map(entry -> Pair.of(entry.getPartitionPath(), entry)); } else { - checkArgument(rollbackRequest.getLogBlocksToBeDeleted().isEmpty(), - "V8+ rollback should not have logBlocksToBeDeleted, but found for partition: " - + rollbackRequest.getPartitionPath() + ", fileId: " + rollbackRequest.getFileId()); return Stream.of( Pair.of(rollbackRequest.getPartitionPath(), HoodieRollbackStat.newBuilder()🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackHelper.java` around lines 109 - 117, The current logic only checks for unsupported V8+ log-block rollbacks when filesToBeDeleted is empty, allowing mixed requests to proceed and delete files; move or add a pre-check on SerializableHoodieRollbackRequest.getLogBlocksToBeDeleted() at the start of the lambda used in context.flatMap so any non-empty logBlocksToBeDeleted immediately triggers the checkArgument/exception before calling deleteFiles(metaClient, filesToBeDeleted, doDelete) or mapping results; ensure you reference SerializableHoodieRollbackRequest.getFilesToBeDeleted(), getLogBlocksToBeDeleted(), deleteFiles(...) and the existing checkArgument to enforce failing fast for any log-block rollback entries.hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackHelperV1.java (2)
161-168: Consider adding defensive error handling for empty timeline.Line 163 calls
lastInstant().get()which will throwNoSuchElementExceptionif the timeline is empty. While the timeline should always have instants during rollback, a descriptive exception would aid debugging:metaClient.getActiveTimeline().lastInstant() .orElseThrow(() -> new HoodieRollbackException("Cannot generate rollback header: active timeline has no instants")) .requestedTime()🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackHelperV1.java` around lines 161 - 168, generateHeader currently calls metaClient.getActiveTimeline().lastInstant().get().requestedTime() which will throw NoSuchElementException if the timeline is empty; replace the unsafe get() with a defensive check that throws a clearer HoodieRollbackException (or appropriate runtime exception) when lastInstant() is empty so the error message explains "Cannot generate rollback header: active timeline has no instants", then use the requestedTime() from the present instant to populate the HEADER_INSTANT_TIME entry in generateHeader.
314-338: Consider consolidating write token setting.The write token is set twice: once on line 317 with a new token, then potentially overwritten on line 335 with the precomputed token. While this works correctly (the precomputed token triggers rollover), the intent could be clearer:
♻️ Suggested refactor for clarity
HoodieLogFormat.WriterBuilder writerBuilder = HoodieLogFormat.newWriterBuilder() .onParentPath(FSUtils.constructAbsolutePath(metaClient.getBasePath(), partitionPath)) .withFileId(fileId) - .withLogWriteToken(CommonClientUtils.generateWriteToken(taskContextSupplier)) .withInstantTime(tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT) ? instantToRollback.requestedTime() : rollbackRequest.getLatestBaseInstant() ) .withStorage(metaClient.getStorage()) .withFileCreationCallback(getRollbackLogMarkerCallback(writeMarkers, partitionPath, fileId)) .withTableVersion(tableVersion) .withFileExtension(HoodieLogFile.DELTA_EXTENSION); String logVersionKey = logVersionLookupKey(partitionPath, fileId, rollbackRequest.getLatestBaseInstant()); Pair<Integer, String> preComputedVersion = logVersionMap.get(logVersionKey); if (preComputedVersion != null) { + // Use precomputed version/token to trigger rollover to N+1 writerBuilder.withLogVersion(preComputedVersion.getLeft()) .withLogWriteToken(preComputedVersion.getRight()); + } else { + // Fall back to generating a new token; build() will list for latest version + writerBuilder.withLogWriteToken(CommonClientUtils.generateWriteToken(taskContextSupplier)); }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackHelperV1.java` around lines 314 - 338, The code currently calls withLogWriteToken twice on writerBuilder (once unconditionally with CommonClientUtils.generateWriteToken(taskContextSupplier) and again when preComputedVersion != null), which is confusing; change it so the write token is set exactly once: remove the unconditional withLogWriteToken(...) from the initial writerBuilder chain and, after computing preComputedVersion via logVersionLookupKey/logVersionMap, call writerBuilder.withLogWriteToken(preComputedVersion.getRight()) when preComputedVersion != null, otherwise call writerBuilder.withLogWriteToken(CommonClientUtils.generateWriteToken(taskContextSupplier)); keep all other builder calls (e.g., withInstantTime, withLogVersion) unchanged so semantics remain identical.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@docker/build_docker_images.sh`:
- Around line 39-49: The argument parsing loop currently assigns $2 blindly to
HADOOP_VERSION, SPARK_VERSION, HIVE_VERSION, and VERSION_TAG_ARG; modify the
case branches in the while loop to validate that a next argument exists and is
not another flag (e.g., does not start with '-') before assigning and shifting:
for each option (--hadoop-version, --spark-version, --hive-version,
--version-tag) check that "$2" is present and not a flag, otherwise print a
clear error and exit; keep the overall while/case structure and variable names
(HADOOP_VERSION, SPARK_VERSION, HIVE_VERSION, VERSION_TAG_ARG) and ensure shifts
are only performed after successful validation.
In
`@hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java`:
- Around line 739-742: The heartbeat for clustering is only started in the
scheduling branch (where heartbeatClient.start(instantTime) is called), so
recovered or externally scheduled runs that go straight to cluster(...) never
start the heartbeat and can be rolled back by expiration cleaning; move
heartbeat ownership into the clustering execution path by starting the heartbeat
at the beginning of the cluster(...) method when
config.isExpirationOfClusteringEnabled() is true (instead of in the scheduling
branch) and ensure heartbeatClient.stop() is invoked in a finally block around
the execution/commit in cluster(...) so the heartbeat is always stopped
regardless of success or failure.
- Around line 1210-1215: hasInstantExpired currently parses instantTime with
HoodieInstantTimeGenerator.parseDateFromInstantTime which uses
ZoneId.systemDefault(), causing timezone mismatch with
metaClient.getTableConfig().getTimelineTimezone().getZoneId(); fix by changing
parseDateFromInstantTime to accept a ZoneId (or add an overload that does) and
use that ZoneId when parsing, then update hasInstantExpired to call the new
parse method with metaClient.getTableConfig().getTimelineTimezone().getZoneId();
ensure any other callers of parseDateFromInstantTime are updated to pass the
appropriate timezone or kept working via the existing default overload.
In
`@hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java`:
- Around line 279-285: HoodieClusteringConfig.EXPIRATION_THRESHOLD_MINS
currently allows negative values; add validation to reject negatives where the
property is parsed/used (e.g., in the HoodieClusteringConfig builder/constructor
or the code path that retrieves EXPIRATION_THRESHOLD_MINS) by checking the long
value returned by EXPIRATION_THRESHOLD_MINS.get() (or the config accessor you
use) and throwing an IllegalArgumentException (or using ConfigProperty
validation) when value < 0, ensuring any callers (builder methods) surface a
clear error message like "hoodie.clustering.expiration.threshold.mins must be >=
0".
In
`@hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java`:
- Around line 159-187: The keys in logBlocksToBeDeleted must be normalized to
filename-only; update the branch that handles latestLogFile (inside
MarkerBasedRollbackStrategy) to use the log file name instead of the full path
(e.g., use latestLogFile.getPathInfo().getPath().getName() or equivalent) so it
matches the other branch that uses logFileToRollback.getPath().getName(); ensure
the value remains the path length and keep variable names (logBlocksToBeDeleted,
latestLogFile, logFileToRollback) unchanged.
In
`@hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java`:
- Around line 92-97: The static DISTRIBUTED_REGISTRY_MAP causes registries to
leak across multiple HoodieSparkEngineContext/JVM Spark applications; change it
from a static field to be scoped per Spark context (e.g., make it an instance
field on HoodieSparkEngineContext or a map keyed by the JavaSparkContext/appId)
and update getMetricRegistry() and DistributedRegistry creation to use the
per-context map; additionally ensure registries are removed/closed when the
Spark context stops (hook into context stop/close or provide a dispose method)
so accumulators/registries don't persist across applications; apply the same
change for the other static registry cache referenced around the 281-288 region.
In
`@hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java`:
- Around line 310-312: The test pins HoodieTableConfig.VERSION to SIX but still
uses the default tableType (COPY_ON_WRITE), so switch the test to use
MERGE_ON_READ to exercise the append-marker path; update the setup where
initMetaClient(tableType, props) is called by setting tableType to
HoodieTableType.MERGE_ON_READ (or equivalent enum/constants used in this test
class) before calling initMetaClient in TestMarkerBasedRollbackStrategy so the
append-marker behavior runs under the production-valid MOR configuration.
In
`@hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java`:
- Around line 198-209: The compareOrderingVal method lacks a null check for
incomingOrderingVal; add a guard at the start of compareOrderingVal (in class
DefaultHoodieRecordPayload) that returns false when incomingOrderingVal is null
(i.e., if persistedOrderingVal != null and incomingOrderingVal == null, do not
update), then proceed with the existing compareTo logic and
updateOnSameOrderingField handling; reference compareOrderingVal and callers
such as needUpdatingPersistedRecord when making this defensive change.
---
Outside diff comments:
In
`@hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java`:
- Around line 163-170: The derived metadata-table cleaner config is missing the
per-run cap (MAX_COMMITS_TO_CLEAN) so cleaning can run unbounded; update the
HoodieCleanConfig.Builder used in HoodieMetadataWriteUtils to also set the
per-run cap from the write config (e.g., call the builder method that sets
MAX_COMMITS_TO_CLEAN with writeConfig.getMaxCommitsToClean() or the
corresponding getter) in addition to the existing
withMaxCommitsBeforeCleaning(...) so the MDT honors the same
max-commits-to-clean bound as the data table.
---
Nitpick comments:
In `@docker/build_docker_images.sh`:
- Around line 108-112: The docker build invocation uses unquoted build-arg
expansions which can cause word-splitting/globbing; update the docker build
command so each build-arg is quoted (e.g., --build-arg
HADOOP_VERSION="${HADOOP_VERSION}", --build-arg
SPARK_VERSION="${SPARK_VERSION}", --build-arg HIVE_VERSION="${HIVE_VERSION}")
while keeping the existing quoted IMAGE_CONTEXT and tags, then re-run the script
to verify the shellcheck SC2086 warning is resolved.
- Around line 70-82: After computing VERSION_TAG (set from VERSION_TAG_ARG or
get_hudi_project_version) add a validation step to ensure the value is a valid
Docker tag: check it matches allowed characters (e.g. regex /^[A-Za-z0-9_.-]+$/)
and is within Docker's length limit (<=128); if the check fails print a clear
error to stderr and exit 1 so docker build won't run with an invalid tag. Insert
this validation immediately after VERSION_TAG is assigned in
build_docker_images.sh (the block that calls get_hudi_project_version) and use
the existing variable names (VERSION_TAG, VERSION_TAG_ARG,
get_hudi_project_version) in the message to aid debugging.
In
`@hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackHelper.java`:
- Around line 109-117: The current logic only checks for unsupported V8+
log-block rollbacks when filesToBeDeleted is empty, allowing mixed requests to
proceed and delete files; move or add a pre-check on
SerializableHoodieRollbackRequest.getLogBlocksToBeDeleted() at the start of the
lambda used in context.flatMap so any non-empty logBlocksToBeDeleted immediately
triggers the checkArgument/exception before calling deleteFiles(metaClient,
filesToBeDeleted, doDelete) or mapping results; ensure you reference
SerializableHoodieRollbackRequest.getFilesToBeDeleted(),
getLogBlocksToBeDeleted(), deleteFiles(...) and the existing checkArgument to
enforce failing fast for any log-block rollback entries.
In
`@hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackHelperV1.java`:
- Around line 161-168: generateHeader currently calls
metaClient.getActiveTimeline().lastInstant().get().requestedTime() which will
throw NoSuchElementException if the timeline is empty; replace the unsafe get()
with a defensive check that throws a clearer HoodieRollbackException (or
appropriate runtime exception) when lastInstant() is empty so the error message
explains "Cannot generate rollback header: active timeline has no instants",
then use the requestedTime() from the present instant to populate the
HEADER_INSTANT_TIME entry in generateHeader.
- Around line 314-338: The code currently calls withLogWriteToken twice on
writerBuilder (once unconditionally with
CommonClientUtils.generateWriteToken(taskContextSupplier) and again when
preComputedVersion != null), which is confusing; change it so the write token is
set exactly once: remove the unconditional withLogWriteToken(...) from the
initial writerBuilder chain and, after computing preComputedVersion via
logVersionLookupKey/logVersionMap, call
writerBuilder.withLogWriteToken(preComputedVersion.getRight()) when
preComputedVersion != null, otherwise call
writerBuilder.withLogWriteToken(CommonClientUtils.generateWriteToken(taskContextSupplier));
keep all other builder calls (e.g., withInstantTime, withLogVersion) unchanged
so semantics remain identical.
In
`@hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java`:
- Around line 573-591: In makeShreddedVariantWriter, the shreddedFieldWriters
are created by calling makeWriter(null, field.dataType()), which loses
HoodieSchema and causes timestamp precision to default to micros; update
makeShreddedVariantWriter to obtain or construct the correct HoodieSchema for
the shreddedStructType/typed_value and pass that schema into makeWriter for each
field (i.e., replace the null with the appropriate HoodieSchema or thread the
original Variant schema into createShreddedVariantWriter so nested timestamp
precision is preserved), or if millis-precision shredded Variants are not
supported, add a clear comment in makeShreddedVariantWriter documenting the
limitation and why null is used.
In
`@hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/common/table/log/TestLogReaderUtils.java`:
- Around line 124-130: The assertion only checks map size and can miss that
thirdCommit is actually represented; update the test in TestLogReaderUtils after
calling LogReaderUtils.getAllLogFilesWithMaxCommit(metaClient, fsView,
partitions, thirdCommit, context()) to also assert that at least one entry in
the returned allLogFiles contains a log file or commit time equal to thirdCommit
(e.g., iterate allLogFiles.keySet() or values to confirm a commit time matches
thirdCommit), ensuring thirdCommit is present in the returned results in
addition to the size check.
In
`@hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java`:
- Around line 160-161: Fix the minor punctuation typo in the Javadoc comment
inside the DefaultHoodieRecordPayload class: find the comment block that ends
with "...without changing the _hoodie_commit_time field.." and change the
trailing double period to a single period so it reads "...without changing the
_hoodie_commit_time field.".
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: e905b2c1-032f-4389-80fb-028db6559f8b
📒 Files selected for processing (42)
docker/build_docker_images.shhudi-cli/src/main/java/org/apache/hudi/cli/utils/InputStreamConsumer.javahudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.javahudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.javahudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/ArchivalUtils.javahudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCleanConfig.javahudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.javahudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.javahudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedAppendHandle.javahudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedMergeHandle.javahudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.javahudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.javahudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.javahudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.javahudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.javahudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.javahudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackHelper.javahudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackHelperFactory.javahudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackHelperV1.javahudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.javahudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/DistributedRegistryUtil.javahudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/TestBaseHoodieTableServiceClient.javahudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.javahudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataWriteUtils.javahudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/rollback/TestRollbackHelper.javahudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.javahudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.javahudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.javahudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.javahudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metrics/DistributedRegistry.javahudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scalahudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scalahudi-client/hudi-spark-client/src/test/java/org/apache/hudi/common/table/log/TestLogReaderUtils.javahudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metrics/TestDistributedRegistry.javahudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.javahudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.javahudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.javahudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.javahudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileFormat.javahudi-common/src/main/java/org/apache/hudi/common/model/HoodiePayloadProps.javahudi-common/src/main/java/org/apache/hudi/common/model/ReverseOrderHoodieRecordPayload.javahudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java
💤 Files with no reviewable changes (2)
- hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
- hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java
✅ Files skipped from review due to trivial changes (2)
- hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePayloadProps.java
- hudi-cli/src/main/java/org/apache/hudi/cli/utils/InputStreamConsumer.java
| # Parse command-line arguments | ||
| while [[ "$#" -gt 0 ]]; do | ||
| case $1 in | ||
| --hadoop-version) HADOOP_VERSION="$2"; shift ;; | ||
| --spark-version) SPARK_VERSION="$2"; shift ;; | ||
| --hive-version) HIVE_VERSION="$2"; shift ;; | ||
| --version-tag) VERSION_TAG_ARG="$2"; shift ;; | ||
| *) echo "Unknown parameter passed: $1"; exit 1 ;; | ||
| esac | ||
| shift | ||
| done |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
# Examine the docker/build_docker_images.sh file structure
head -100 docker/build_docker_images.sh | tail -80Repository: yihua/hudi
Length of output: 2935
🏁 Script executed:
# Get exact line counts and view around lines 39-49
wc -l docker/build_docker_images.sh
sed -n '30,60p' docker/build_docker_images.shRepository: yihua/hudi
Length of output: 919
🏁 Script executed:
# Check if there are other tests or usage examples
find . -name "*.sh" -type f -exec grep -l "build_docker_images" {} \;Repository: yihua/hudi
Length of output: 85
Validate option values before assignment and shifting
The argument parsing at lines 42–46 assigns $2 without checking it exists or validating it's not another flag. Calling --hadoop-version --spark-version 3.5.3 sets HADOOP_VERSION="--spark-version" and causes a confusing error on the remaining argument. Calling --hadoop-version (no value) silently sets an empty version, overwriting defaults.
Proposed fix
-while [[ "$#" -gt 0 ]]; do
- case $1 in
- --hadoop-version) HADOOP_VERSION="$2"; shift ;;
- --spark-version) SPARK_VERSION="$2"; shift ;;
- --hive-version) HIVE_VERSION="$2"; shift ;;
- --version-tag) VERSION_TAG_ARG="$2"; shift ;;
- *) echo "Unknown parameter passed: $1"; exit 1 ;;
- esac
- shift
-done
+while [[ "$#" -gt 0 ]]; do
+ case "$1" in
+ --hadoop-version|--spark-version|--hive-version|--version-tag)
+ if [[ $# -lt 2 || "$2" == --* ]]; then
+ echo "Error: Missing value for $1" >&2
+ exit 1
+ fi
+ case "$1" in
+ --hadoop-version) HADOOP_VERSION="$2" ;;
+ --spark-version) SPARK_VERSION="$2" ;;
+ --hive-version) HIVE_VERSION="$2" ;;
+ --version-tag) VERSION_TAG_ARG="$2" ;;
+ esac
+ shift 2
+ ;;
+ *)
+ echo "Unknown parameter passed: $1" >&2
+ exit 1
+ ;;
+ esac
+done📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| # Parse command-line arguments | |
| while [[ "$#" -gt 0 ]]; do | |
| case $1 in | |
| --hadoop-version) HADOOP_VERSION="$2"; shift ;; | |
| --spark-version) SPARK_VERSION="$2"; shift ;; | |
| --hive-version) HIVE_VERSION="$2"; shift ;; | |
| --version-tag) VERSION_TAG_ARG="$2"; shift ;; | |
| *) echo "Unknown parameter passed: $1"; exit 1 ;; | |
| esac | |
| shift | |
| done | |
| # Parse command-line arguments | |
| while [[ "$#" -gt 0 ]]; do | |
| case "$1" in | |
| --hadoop-version|--spark-version|--hive-version|--version-tag) | |
| if [[ $# -lt 2 || "$2" == --* ]]; then | |
| echo "Error: Missing value for $1" >&2 | |
| exit 1 | |
| fi | |
| case "$1" in | |
| --hadoop-version) HADOOP_VERSION="$2" ;; | |
| --spark-version) SPARK_VERSION="$2" ;; | |
| --hive-version) HIVE_VERSION="$2" ;; | |
| --version-tag) VERSION_TAG_ARG="$2" ;; | |
| esac | |
| shift 2 | |
| ;; | |
| *) | |
| echo "Unknown parameter passed: $1" >&2 | |
| exit 1 | |
| ;; | |
| esac | |
| done |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@docker/build_docker_images.sh` around lines 39 - 49, The argument parsing
loop currently assigns $2 blindly to HADOOP_VERSION, SPARK_VERSION,
HIVE_VERSION, and VERSION_TAG_ARG; modify the case branches in the while loop to
validate that a next argument exists and is not another flag (e.g., does not
start with '-') before assigning and shifting: for each option
(--hadoop-version, --spark-version, --hive-version, --version-tag) check that
"$2" is present and not a flag, otherwise print a clear error and exit; keep the
overall while/case structure and variable names (HADOOP_VERSION, SPARK_VERSION,
HIVE_VERSION, VERSION_TAG_ARG) and ensure shifts are only performed after
successful validation.
| if (option.isPresent() && config.isExpirationOfClusteringEnabled()) { | ||
| heartbeatClient.start(instantTime); | ||
| log.info("Started heartbeat for clustering instant {}", instantTime); | ||
| } |
There was a problem hiding this comment.
Start the clustering heartbeat from execution, not just scheduling.
Recovered or externally scheduled clustering instants can reach cluster(...) without passing through this branch in the same process. Those runs never call heartbeatClient.start(...), so the new expiration-based lazy cleanup can roll back an actively running clustering once the instant ages past the threshold. Move heartbeat ownership into cluster(...) and stop it from a finally block around execution/commit.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java`
around lines 739 - 742, The heartbeat for clustering is only started in the
scheduling branch (where heartbeatClient.start(instantTime) is called), so
recovered or externally scheduled runs that go straight to cluster(...) never
start the heartbeat and can be rolled back by expiration cleaning; move
heartbeat ownership into the clustering execution path by starting the heartbeat
at the beginning of the cluster(...) method when
config.isExpirationOfClusteringEnabled() is true (instead of in the scheduling
branch) and ensure heartbeatClient.stop() is invoked in a finally block around
the execution/commit in cluster(...) so the heartbeat is always stopped
regardless of success or failure.
| private static boolean hasInstantExpired(HoodieTableMetaClient metaClient, String instantTime, long expirationMins) throws ParseException { | ||
| ZoneId zoneId = metaClient.getTableConfig().getTimelineTimezone().getZoneId(); | ||
| long currentTimeMs = ZonedDateTime.ofInstant(java.time.Instant.now(), zoneId).toInstant().toEpochMilli(); | ||
| long instantTimeMs = HoodieInstantTimeGenerator.parseDateFromInstantTime(instantTime).toInstant().toEpochMilli(); | ||
| long ageMs = currentTimeMs - instantTimeMs; | ||
| return ageMs >= TimeUnit.MINUTES.toMillis(expirationMins); |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
fd -i 'HoodieInstantTimeGenerator.java' | while read -r file; do
echo "=== $file ==="
rg -n -C3 'parseDateFromInstantTime|formatDate|getZoneId|timelineTimezone' "$file"
done
echo "=== Related call sites ==="
rg -n -C2 'parseDateFromInstantTime\(|getTimelineTimezone\(' --glob '*.java'Repository: yihua/hudi
Length of output: 22717
🏁 Script executed:
sed -n '86,100p' hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.javaRepository: yihua/hudi
Length of output: 894
🏁 Script executed:
rg -A10 'public static Date parseDateFromInstantTime' hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.javaRepository: yihua/hudi
Length of output: 594
🏁 Script executed:
rg -B5 -A10 'commitTimeZone|LocalDateTime.parse' hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java | head -60Repository: yihua/hudi
Length of output: 3278
Fix timezone handling in instant expiration calculation.
parseDateFromInstantTime() always uses ZoneId.systemDefault() when parsing the instant time string, but currentTimeMs correctly uses the table's configured timelineTimezone. When the table's timeline timezone differs from the system default, this causes a timezone mismatch that produces incorrect age calculations.
For example, if a table uses UTC but the system runs in a different timezone, the same instant time string is interpreted as different absolute times, leading to wrong expiration decisions.
Root cause: HoodieInstantTimeGenerator.parseDateFromInstantTime() must be modified to accept and use the table's timezone instead of hardcoding ZoneId.systemDefault(). Alternatively, hasInstantExpired() should parse the instant time with the correct timezone before computing the age.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java`
around lines 1210 - 1215, hasInstantExpired currently parses instantTime with
HoodieInstantTimeGenerator.parseDateFromInstantTime which uses
ZoneId.systemDefault(), causing timezone mismatch with
metaClient.getTableConfig().getTimelineTimezone().getZoneId(); fix by changing
parseDateFromInstantTime to accept a ZoneId (or add an overload that does) and
use that ZoneId when parsing, then update hasInstantExpired to call the new
parse method with metaClient.getTableConfig().getTimelineTimezone().getZoneId();
ensure any other callers of parseDateFromInstantTime are updated to pass the
appropriate timezone or kept working via the existing default overload.
| public static final ConfigProperty<Long> EXPIRATION_THRESHOLD_MINS = ConfigProperty | ||
| .key("hoodie.clustering.expiration.threshold.mins") | ||
| .defaultValue(60L) | ||
| .markAdvanced() | ||
| .withDocumentation("When hoodie.clustering.enable.expirations is enabled, a clustering instant will not be " | ||
| + "considered expired unless its instant creation time is at least this many minutes old. This serves as a guardrail to avoid " | ||
| + "unnecessary work in rolling back clustering instants that other writers are already attempting to roll back."); |
There was a problem hiding this comment.
Validate negative clustering expiration thresholds.
EXPIRATION_THRESHOLD_MINS is a safety guardrail, but the builder never rejects negative values. A negative threshold effectively makes every sufficiently heartbeat-stale clustering instant “old enough,” which can trigger rollback of clustering work that was never meant to be eligible yet.
Proposed fix
private void validate() {
boolean inlineCluster = clusteringConfig.getBoolean(HoodieClusteringConfig.INLINE_CLUSTERING);
boolean inlineClusterSchedule = clusteringConfig.getBoolean(HoodieClusteringConfig.SCHEDULE_INLINE_CLUSTERING);
ValidationUtils.checkArgument(!(inlineCluster && inlineClusterSchedule), String.format("Either of inline clustering (%s) or "
+ "schedule inline clustering (%s) can be enabled. Both can't be set to true at the same time. %s,%s", HoodieClusteringConfig.INLINE_CLUSTERING.key(),
HoodieClusteringConfig.SCHEDULE_INLINE_CLUSTERING.key(), inlineCluster, inlineClusterSchedule));
+
+ if (clusteringConfig.getBooleanOrDefault(HoodieClusteringConfig.ENABLE_EXPIRATIONS)) {
+ long expirationThresholdMins = clusteringConfig.getLong(HoodieClusteringConfig.EXPIRATION_THRESHOLD_MINS);
+ ValidationUtils.checkArgument(expirationThresholdMins >= 0,
+ HoodieClusteringConfig.EXPIRATION_THRESHOLD_MINS.key() + " must be >= 0, but was " + expirationThresholdMins);
+ }
if (isConsistentHashingBucketIndex()) {🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java`
around lines 279 - 285, HoodieClusteringConfig.EXPIRATION_THRESHOLD_MINS
currently allows negative values; add validation to reject negatives where the
property is parsed/used (e.g., in the HoodieClusteringConfig builder/constructor
or the code path that retrieves EXPIRATION_THRESHOLD_MINS) by checking the long
value returned by EXPIRATION_THRESHOLD_MINS.get() (or the config accessor you
use) and throwing an IllegalArgumentException (or using ConfigProperty
validation) when value < 0, ensuring any callers (builder methods) surface a
clear error message like "hoodie.clustering.expiration.threshold.mins must be >=
0".
| private boolean needLogCompact(Pair<Integer, String> latestDeltaCommitInfoSinceCompact) { | ||
| Option<Pair<Integer, String>> latestDeltaCommitInfoSinceLogCompactOption = getLatestDeltaCommitInfoSinceLogCompaction(); | ||
| int numDeltaCommitsSinceLatestCompaction = latestDeltaCommitInfoSinceCompact.getLeft(); | ||
| int numDeltaCommitsSinceLatestLogCompaction = latestDeltaCommitInfoSinceLogCompactOption.isPresent() | ||
| ? latestDeltaCommitInfoSinceLogCompactOption.get().getLeft() | ||
| : 0; | ||
|
|
||
| int numDeltaCommitsSince = Math.min(numDeltaCommitsSinceLatestCompaction, numDeltaCommitsSinceLatestLogCompaction); | ||
| boolean shouldLogCompact = numDeltaCommitsSince >= config.getLogCompactionBlocksThreshold(); | ||
| if (shouldLogCompact) { | ||
| log.info("There have been {} delta commits since last compaction or log compaction, triggering log compaction.", numDeltaCommitsSince); | ||
| } | ||
| return shouldLogCompact; | ||
| } |
There was a problem hiding this comment.
Critical bug: Log compaction will never trigger when no prior log compaction exists.
When latestDeltaCommitInfoSinceLogCompactOption is not present (no previous log compaction), numDeltaCommitsSinceLatestLogCompaction defaults to 0. Combined with Math.min, this means:
Math.min(numDeltaCommitsSinceLatestCompaction, 0)→ always returns00 >= threshold→ alwaysfalse(assuming threshold > 0)
This prevents log compaction from ever being triggered initially.
The default should represent "no recent log compaction" (large value), not "just happened" (0).
🐛 Proposed fix
int numDeltaCommitsSinceLatestLogCompaction = latestDeltaCommitInfoSinceLogCompactOption.isPresent()
? latestDeltaCommitInfoSinceLogCompactOption.get().getLeft()
- : 0;
+ : Integer.MAX_VALUE;
int numDeltaCommitsSince = Math.min(numDeltaCommitsSinceLatestCompaction, numDeltaCommitsSinceLatestLogCompaction);Alternatively, if the intention is to use just the compaction count when no log compaction exists:
int numDeltaCommitsSinceLatestLogCompaction = latestDeltaCommitInfoSinceLogCompactOption.isPresent()
? latestDeltaCommitInfoSinceLogCompactOption.get().getLeft()
- : 0;
+ : numDeltaCommitsSinceLatestCompaction;Additionally, the log message states "since last compaction or log compaction" but the Math.min logic effectively represents "and" semantics. Consider updating the message for accuracy.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| private boolean needLogCompact(Pair<Integer, String> latestDeltaCommitInfoSinceCompact) { | |
| Option<Pair<Integer, String>> latestDeltaCommitInfoSinceLogCompactOption = getLatestDeltaCommitInfoSinceLogCompaction(); | |
| int numDeltaCommitsSinceLatestCompaction = latestDeltaCommitInfoSinceCompact.getLeft(); | |
| int numDeltaCommitsSinceLatestLogCompaction = latestDeltaCommitInfoSinceLogCompactOption.isPresent() | |
| ? latestDeltaCommitInfoSinceLogCompactOption.get().getLeft() | |
| : 0; | |
| int numDeltaCommitsSince = Math.min(numDeltaCommitsSinceLatestCompaction, numDeltaCommitsSinceLatestLogCompaction); | |
| boolean shouldLogCompact = numDeltaCommitsSince >= config.getLogCompactionBlocksThreshold(); | |
| if (shouldLogCompact) { | |
| log.info("There have been {} delta commits since last compaction or log compaction, triggering log compaction.", numDeltaCommitsSince); | |
| } | |
| return shouldLogCompact; | |
| } | |
| private boolean needLogCompact(Pair<Integer, String> latestDeltaCommitInfoSinceCompact) { | |
| Option<Pair<Integer, String>> latestDeltaCommitInfoSinceLogCompactOption = getLatestDeltaCommitInfoSinceLogCompaction(); | |
| int numDeltaCommitsSinceLatestCompaction = latestDeltaCommitInfoSinceCompact.getLeft(); | |
| int numDeltaCommitsSinceLatestLogCompaction = latestDeltaCommitInfoSinceLogCompactOption.isPresent() | |
| ? latestDeltaCommitInfoSinceLogCompactOption.get().getLeft() | |
| : Integer.MAX_VALUE; | |
| int numDeltaCommitsSince = Math.min(numDeltaCommitsSinceLatestCompaction, numDeltaCommitsSinceLatestLogCompaction); | |
| boolean shouldLogCompact = numDeltaCommitsSince >= config.getLogCompactionBlocksThreshold(); | |
| if (shouldLogCompact) { | |
| log.info("There have been {} delta commits since last compaction or log compaction, triggering log compaction.", numDeltaCommitsSince); | |
| } | |
| return shouldLogCompact; | |
| } |
| private boolean needLogCompact(Pair<Integer, String> latestDeltaCommitInfoSinceCompact) { | |
| Option<Pair<Integer, String>> latestDeltaCommitInfoSinceLogCompactOption = getLatestDeltaCommitInfoSinceLogCompaction(); | |
| int numDeltaCommitsSinceLatestCompaction = latestDeltaCommitInfoSinceCompact.getLeft(); | |
| int numDeltaCommitsSinceLatestLogCompaction = latestDeltaCommitInfoSinceLogCompactOption.isPresent() | |
| ? latestDeltaCommitInfoSinceLogCompactOption.get().getLeft() | |
| : 0; | |
| int numDeltaCommitsSince = Math.min(numDeltaCommitsSinceLatestCompaction, numDeltaCommitsSinceLatestLogCompaction); | |
| boolean shouldLogCompact = numDeltaCommitsSince >= config.getLogCompactionBlocksThreshold(); | |
| if (shouldLogCompact) { | |
| log.info("There have been {} delta commits since last compaction or log compaction, triggering log compaction.", numDeltaCommitsSince); | |
| } | |
| return shouldLogCompact; | |
| } | |
| private boolean needLogCompact(Pair<Integer, String> latestDeltaCommitInfoSinceCompact) { | |
| Option<Pair<Integer, String>> latestDeltaCommitInfoSinceLogCompactOption = getLatestDeltaCommitInfoSinceLogCompaction(); | |
| int numDeltaCommitsSinceLatestCompaction = latestDeltaCommitInfoSinceCompact.getLeft(); | |
| int numDeltaCommitsSinceLatestLogCompaction = latestDeltaCommitInfoSinceLogCompactOption.isPresent() | |
| ? latestDeltaCommitInfoSinceLogCompactOption.get().getLeft() | |
| : numDeltaCommitsSinceLatestCompaction; | |
| int numDeltaCommitsSince = Math.min(numDeltaCommitsSinceLatestCompaction, numDeltaCommitsSinceLatestLogCompaction); | |
| boolean shouldLogCompact = numDeltaCommitsSince >= config.getLogCompactionBlocksThreshold(); | |
| if (shouldLogCompact) { | |
| log.info("There have been {} delta commits since last compaction or log compaction, triggering log compaction.", numDeltaCommitsSince); | |
| } | |
| return shouldLogCompact; | |
| } |
| if (latestLogFileOption.isPresent()) { | ||
| HoodieLogFile latestLogFile = latestLogFileOption.get(); | ||
| // NOTE: Markers don't carry information about the cumulative size of the blocks that have been appended, | ||
| // therefore we simply stub this value. | ||
| logBlocksToBeDeleted = Collections.singletonMap(latestLogFile.getPathInfo().getPath().toString(), latestLogFile.getPathInfo().getLength()); | ||
| } | ||
| return new HoodieRollbackRequest(relativePartitionPath, fileId, baseCommitTime, Collections.emptyList(), logBlocksToBeDeleted); | ||
| } catch (IOException ioException) { | ||
| throw new HoodieIOException( | ||
| "Failed to get latestLogFile for fileId: " + fileId + " in partition: " + partitionPath, | ||
| ioException); | ||
| } | ||
| } else { | ||
| HoodieLogFile logFileToRollback = new HoodieLogFile(fullFilePath); | ||
| fileId = logFileToRollback.getFileId(); | ||
| // For tbl version 6, deltaCommitTime is the commit time of the base file for the file slice | ||
| baseCommitTime = logFileToRollback.getDeltaCommitTime(); | ||
| try { | ||
| StoragePathInfo pathInfo = table.getMetaClient().getStorage().getPathInfo(logFileToRollback.getPath()); | ||
| if (pathInfo != null) { | ||
| if (baseCommitTime.equals(instantToRollback.requestedTime())) { | ||
| // delete the log file that creates a new file group | ||
| return new HoodieRollbackRequest(relativePartitionPath, EMPTY_STRING, EMPTY_STRING, | ||
| Collections.singletonList(fullDeletePath.toString()), | ||
| Collections.singletonList(logFileToRollback.getPath().toString()), | ||
| Collections.emptyMap()); | ||
| } | ||
| if (latestLogFileOption.isPresent()) { | ||
| HoodieLogFile latestLogFile = latestLogFileOption.get(); | ||
| // NOTE: Markers don't carry information about the cumulative size of the blocks that have been appended, | ||
| // therefore we simply stub this value. | ||
| logBlocksToBeDeleted = Collections.singletonMap(latestLogFile.getPathInfo().getPath().toString(), latestLogFile.getPathInfo().getLength()); | ||
| } | ||
| return new HoodieRollbackRequest(relativePartitionPath, fileId, baseCommitTime, Collections.emptyList(), logBlocksToBeDeleted); | ||
| } catch (IOException ioException) { | ||
| throw new HoodieIOException( | ||
| "Failed to get latestLogFile for fileId: " + fileId + " in partition: " + partitionPath, | ||
| ioException); | ||
| } | ||
| } else { | ||
| HoodieLogFile logFileToRollback = new HoodieLogFile(fullFilePath); | ||
| fileId = logFileToRollback.getFileId(); | ||
| // For tbl version 6, deltaCommitTime is the commit time of the base file for the file slice | ||
| baseCommitTime = logFileToRollback.getDeltaCommitTime(); | ||
| try { | ||
| StoragePathInfo pathInfo = table.getMetaClient().getStorage().getPathInfo(logFileToRollback.getPath()); | ||
| if (pathInfo != null) { | ||
| if (baseCommitTime.equals(instantToRollback.requestedTime())) { | ||
| // delete the log file that creates a new file group | ||
| return new HoodieRollbackRequest(relativePartitionPath, EMPTY_STRING, EMPTY_STRING, | ||
| Collections.singletonList(logFileToRollback.getPath().toString()), | ||
| Collections.emptyMap()); | ||
| } | ||
| // append a rollback block to the log block that is added to an existing file group | ||
| logBlocksToBeDeleted = Collections.singletonMap( | ||
| logFileToRollback.getPath().getName(), pathInfo.getLength()); | ||
| } else { | ||
| log.debug( | ||
| "File info of {} is null indicating the file does not exist;" | ||
| + " there is no need to include it in the rollback.", | ||
| fullFilePath); | ||
| } | ||
| } catch (FileNotFoundException e) { | ||
| // append a rollback block to the log block that is added to an existing file group | ||
| logBlocksToBeDeleted = Collections.singletonMap( | ||
| logFileToRollback.getPath().getName(), pathInfo.getLength()); |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Verify whether rollback consumers expect logBlocksToBeDeleted keys to be full paths or file names.
# Expected result:
# - If consumers join the key with partition/base paths, Line 163 should likely use getName().
# - If consumers treat the key as a fully qualified path, Line 187 should likely use toString().
rg -n -C4 'logBlocksToBeDeleted|getLogBlocksToBeDeleted\(\)' .Repository: yihua/hudi
Length of output: 37662
Normalize logBlocksToBeDeleted keys to one format.
Line 163 stores the full path, while Line 187 stores only the file name. This inconsistency complicates downstream handling — test utilities like TestHoodieSparkMergeOnReadTableCompaction and TestRollbackHelper expect filename-only keys (joining with partition path), while RollbackHelperV1 compensates by extracting filenames from full paths. Standardize both locations to use the same format.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java`
around lines 159 - 187, The keys in logBlocksToBeDeleted must be normalized to
filename-only; update the branch that handles latestLogFile (inside
MarkerBasedRollbackStrategy) to use the log file name instead of the full path
(e.g., use latestLogFile.getPathInfo().getPath().getName() or equivalent) so it
matches the other branch that uses logFileToRollback.getPath().getName(); ensure
the value remains the path length and keep variable names (logBlocksToBeDeleted,
latestLogFile, logFileToRollback) unchanged.
| /** | ||
| * Map of all distributed registries created via getMetricRegistry(). | ||
| * This map is passed to Spark executors to make the registries available there. | ||
| */ | ||
| private static final Map<String, Registry> DISTRIBUTED_REGISTRY_MAP = new ConcurrentHashMap<>(); | ||
|
|
There was a problem hiding this comment.
Scope the registry cache to the Spark context.
Because DISTRIBUTED_REGISTRY_MAP is static, a later HoodieSparkEngineContext in the same JVM can reuse a DistributedRegistry that was already registered against a different or already-stopped JavaSparkContext. That makes the accumulator lifecycle cross-application and can silently drop metrics while also bloating every task closure with stale registries.
Suggested fix
- private static final Map<String, Registry> DISTRIBUTED_REGISTRY_MAP = new ConcurrentHashMap<>();
+ private final Map<String, Registry> distributedRegistryMap = new ConcurrentHashMap<>();- final Map<String, Registry> registries = DISTRIBUTED_REGISTRY_MAP;
+ final Map<String, Registry> registries = distributedRegistryMap;- return DISTRIBUTED_REGISTRY_MAP.computeIfAbsent(prefixedName, key -> {
+ return distributedRegistryMap.computeIfAbsent(prefixedName, key -> {
Registry registry = Registry.getRegistryOfClass(tableName, registryName, DistributedRegistry.class.getName());
((DistributedRegistry) registry).register(javaSparkContext);
return registry;
});Also applies to: 281-288
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java`
around lines 92 - 97, The static DISTRIBUTED_REGISTRY_MAP causes registries to
leak across multiple HoodieSparkEngineContext/JVM Spark applications; change it
from a static field to be scoped per Spark context (e.g., make it an instance
field on HoodieSparkEngineContext or a map keyed by the JavaSparkContext/appId)
and update getMetricRegistry() and DistributedRegistry creation to use the
per-context map; additionally ensure registries are removed/closed when the
Spark context stops (hook into context stop/close or provide a dispose method)
so accumulators/registries don't persist across applications; apply the same
change for the other static registry cache referenced around the 281-288 region.
| Properties props = new Properties(); | ||
| props.put(HoodieTableConfig.VERSION.key(), HoodieTableVersion.SIX.versionCode()); | ||
| initMetaClient(tableType, props); |
There was a problem hiding this comment.
Switch this fallback test to MERGE_ON_READ too.
These new props pin the version to SIX, but tableType is still the default COPY_ON_WRITE here. Since APPEND markers are only valid on MOR, this can pass without covering the production-valid path exercised by the other append-marker tests.
Suggested test setup change
`@Test`
public void testMarkerBasedRollbackFallbackToTimelineServerWhenDirectMarkerFails() throws Exception {
+ tearDown();
+ tableType = HoodieTableType.MERGE_ON_READ;
+ setUp();
Properties props = new Properties();
props.put(HoodieTableConfig.VERSION.key(), HoodieTableVersion.SIX.versionCode());
initMetaClient(tableType, props);📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| Properties props = new Properties(); | |
| props.put(HoodieTableConfig.VERSION.key(), HoodieTableVersion.SIX.versionCode()); | |
| initMetaClient(tableType, props); | |
| tearDown(); | |
| tableType = HoodieTableType.MERGE_ON_READ; | |
| setUp(); | |
| Properties props = new Properties(); | |
| props.put(HoodieTableConfig.VERSION.key(), HoodieTableVersion.SIX.versionCode()); | |
| initMetaClient(tableType, props); |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java`
around lines 310 - 312, The test pins HoodieTableConfig.VERSION to SIX but still
uses the default tableType (COPY_ON_WRITE), so switch the test to use
MERGE_ON_READ to exercise the append-marker path; update the setup where
initMetaClient(tableType, props) is called by setting tableType to
HoodieTableType.MERGE_ON_READ (or equivalent enum/constants used in this test
class) before calling initMetaClient in TestMarkerBasedRollbackStrategy so the
append-marker behavior runs under the production-valid MOR configuration.
| protected boolean compareOrderingVal(Comparable persistedOrderingVal, Comparable incomingOrderingVal, | ||
| boolean updateOnSameOrderingField) { | ||
| if (persistedOrderingVal == null) { | ||
| return true; | ||
| } else { | ||
| int compareVal = persistedOrderingVal.compareTo(incomingOrderingVal); | ||
| if (updateOnSameOrderingField) { | ||
| return compareVal <= 0; | ||
| } else { | ||
| return compareVal < 0; | ||
| } | ||
| } |
There was a problem hiding this comment.
Consider handling null incomingOrderingVal.
The method handles persistedOrderingVal == null but not incomingOrderingVal == null. If incomingOrderingVal is null, compareTo may throw NPE depending on the Comparable implementation.
While the caller in needUpdatingPersistedRecord uses orElse(orderingVal) to provide a fallback, this method is protected and could be called from subclasses with different inputs.
🛡️ Proposed defensive fix
protected boolean compareOrderingVal(Comparable persistedOrderingVal, Comparable incomingOrderingVal,
boolean updateOnSameOrderingField) {
if (persistedOrderingVal == null) {
return true;
+ } else if (incomingOrderingVal == null) {
+ return false; // Cannot compare, keep persisted
} else {
int compareVal = persistedOrderingVal.compareTo(incomingOrderingVal);📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| protected boolean compareOrderingVal(Comparable persistedOrderingVal, Comparable incomingOrderingVal, | |
| boolean updateOnSameOrderingField) { | |
| if (persistedOrderingVal == null) { | |
| return true; | |
| } else { | |
| int compareVal = persistedOrderingVal.compareTo(incomingOrderingVal); | |
| if (updateOnSameOrderingField) { | |
| return compareVal <= 0; | |
| } else { | |
| return compareVal < 0; | |
| } | |
| } | |
| protected boolean compareOrderingVal(Comparable persistedOrderingVal, Comparable incomingOrderingVal, | |
| boolean updateOnSameOrderingField) { | |
| if (persistedOrderingVal == null) { | |
| return true; | |
| } else if (incomingOrderingVal == null) { | |
| return false; // Cannot compare, keep persisted | |
| } else { | |
| int compareVal = persistedOrderingVal.compareTo(incomingOrderingVal); | |
| if (updateOnSameOrderingField) { | |
| return compareVal <= 0; | |
| } else { | |
| return compareVal < 0; | |
| } | |
| } |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java`
around lines 198 - 209, The compareOrderingVal method lacks a null check for
incomingOrderingVal; add a guard at the start of compareOrderingVal (in class
DefaultHoodieRecordPayload) that returns false when incomingOrderingVal is null
(i.e., if persistedOrderingVal != null and incomingOrderingVal == null, do not
update), then proceed with the existing compareTo logic and
updateOnSameOrderingField handling; reference compareOrderingVal and callers
such as needUpdatingPersistedRecord when making this defensive change.
Mirror of apache#17928 for automated bot review.
Original author: @suryaprasanna
Base branch: master
Summary by CodeRabbit
New Features
Tests