fix: Follow-ups to JsonKinesisSource: numeric sequence comparison and call-site fixes#18689
Conversation
hudi-agent
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for the follow-up! These are targeted correctness fixes on the JsonKinesisSource path: numeric sequence-number comparison via BigInteger, null-safe handling of millisBehindLatest, a Math.toIntExact guard for the per-call limit, and explicit getter names on KinesisReadConfig. I traced each call site of compareSequenceNumbers and confirmed the empty/null lastSeq cases are guarded everywhere it's invoked. No correctness issues found. A few style/readability suggestions in the inline comments. Please take a look, and this should be ready for a Hudi committer or PMC member to take it from here. One small naming inconsistency between the backing field and its newly-added getter in KinesisReadConfig; the rest of the changes are clean.
cc @yihua
| } | ||
|
|
||
| public boolean isDeaggregationEnabled() { | ||
| return enableDeaggregation; |
There was a problem hiding this comment.
🤖 nit: the getter is isDeaggregationEnabled() (adjective form) but the backing field is enableDeaggregation (verb form) — could you rename the field to deaggregationEnabled to keep them consistent?
- AI-generated; verify before applying. React 👍/👎 to flag quality.
a90b3db to
45e1219
Compare
hudi-agent
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for working on this! The follow-up correctness fixes (BigInteger sequence-number ordering, null-safe millisBehindLatest, and explicit accessor renames) look good and the empty-lastSeq guard in checkDataLossOnExpiredShards is a sensible defensive addition. One question worth checking on the Math.toIntExact change in the inline comment. Please take a look at any inline comments, and this should be ready for a Hudi committer or PMC member to take it from here. One small redundancy to consider in KinesisReadConfig — otherwise the changes look clean.
- KinesisOffsetGen: add CheckpointUtils.compareSequenceNumbers using BigInteger and use it in hasUnreadRecords, checkDataLossOnExpiredShards, and checkDataLossOnAvailableShards. Kinesis sequence numbers are 128-bit decimal strings whose lengths can vary, so lexicographic String.compareTo is not correct for numeric ordering. Also treat empty lastSeq (e.g. a "|endSeq" checkpoint) as not-consumed in checkDataLossOnExpiredShards. - KinesisSource: null-guard response.millisBehindLatest() before unboxing (the AWS SDK documents the field as nullable), and use Math.toIntExact on the per-call limit so we fail loudly instead of silently truncating if the remaining record budget exceeds Integer.MAX_VALUE. - KinesisReadConfig: replace Lombok-generated isShouldAddMetaFields and isEnableDeaggregation with explicit shouldAddMetaFields and isDeaggregationEnabled, and update call sites in JsonKinesisSource.
45e1219 to
33b430d
Compare
hudi-agent
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for the follow-up! Numeric BigInteger comparison for sequence numbers, the null-safe millisBehindLatest check, and the KinesisReadConfig getter naming cleanup all look reasonable, and I traced the new compareSequenceNumbers call sites — each has prior guards against null/empty inputs. No new issues flagged from this automated pass beyond what earlier rounds already raised — a Hudi committer or PMC member can take it from here for a final review. One small naming/consistency suggestion in KinesisReadConfig; the rest of the changes are clean.
cc @yihua
| private final String accessKey; // null if not set | ||
| private final String secretKey; // null if not set | ||
| private final KinesisSourceConfig.KinesisStartingPositionStrategy startingPosition; | ||
| @Getter(AccessLevel.NONE) |
There was a problem hiding this comment.
🤖 nit: could you rename the field to metaFieldsEnabled (mirroring deaggregationEnabled) and let Lombok generate isMetaFieldsEnabled() automatically? That would let you drop the @Getter(AccessLevel.NONE) suppression and the hand-written getter below, keeping the boolean accessor style consistent across the class.
- AI-generated; verify before applying. React 👍/👎 to flag quality.
Codecov Report✅ All modified and coverable lines are covered by tests.
Additional details and impacted files@@ Coverage Diff @@
## master #18689 +/- ##
=============================================
- Coverage 68.08% 54.00% -14.08%
+ Complexity 28940 12456 -16484
=============================================
Files 2519 1434 -1085
Lines 140646 72161 -68485
Branches 17427 8245 -9182
=============================================
- Hits 95757 38973 -56784
+ Misses 37030 29690 -7340
+ Partials 7859 3498 -4361
Flags with carried forward coverage won't be shown. Click here to find out more. 🚀 New features to boost your workflow:
|
Describe the issue this Pull Request addresses
Follow-up to #18224 (JsonKinesisSource). Small correctness fixes and a naming consistency tweak that surfaced after the initial merge.
Summary and Changelog
Numeric sequence-number comparison. Kinesis sequence numbers are 128-bit integers represented as decimal strings whose lengths can vary, so
String.compareTois not correct for ordering them. IntroduceKinesisOffsetGen.CheckpointUtils.compareSequenceNumbers(a, b)usingBigInteger, and use it in:KinesisShardRange.hasUnreadRecords(closed-shard fully-consumed check)KinesisOffsetGen.checkDataLossOnExpiredShards(also adds an empty-lastSeqguard for|endSeqcheckpoints — a closed shard that has not yet had any record consumed is treated as not consumed)KinesisOffsetGen.checkDataLossOnAvailableShards(trim-horizon check)Null-safe
millisBehindLatest.GetRecordsResponse#millisBehindLatest()is documented as nullable in the AWS SDK; the previousresponse.millisBehindLatest() == 0would NPE on auto-unbox if a server returned null. Extract to aLongand null-check before comparing.Integer-overflow guard on per-call limit.
(int) (maxTotalRecords - totalConsumed)could silently truncate if the remaining budget exceededInteger.MAX_VALUE. Switch toMath.toIntExact(...)so it fails loudly instead.KinesisReadConfiggetter naming consistency.enableDeaggregation→deaggregationEnabledso Lombok's@Getterproduces the desiredisDeaggregationEnabled()(adjective form), letting us drop the manual override.shouldAddMetaFields()method (with@Getter(AccessLevel.NONE)on the field) since Lombok would otherwise generateisShouldAddMetaFields(), which doesn't read well.JsonKinesisSource(isEnableDeaggregation()→isDeaggregationEnabled(),isShouldAddMetaFields()→shouldAddMetaFields()).Impact
No user-visible behavior change in the common case. The numeric-comparison fix only matters when comparing two Kinesis sequence numbers of different decimal lengths;
Math.toIntExactand the null guard are defensive and only change behavior on edge-case inputs that previously truncated or NPE'd. TheKinesisReadConfigchange is internal — the public getters remainisDeaggregationEnabled()/shouldAddMetaFields().Risk Level
Low
Documentation Update
none
Contributor's checklist
TestKinesisCheckpointUtils,TestJsonKinesisSource,TestShardRecordIterator,TestKinesisShardRange,TestKinesisSourceFiltering,TestKinesisDataLossChecks) all pass: 80 tests, 0 failures, 0 errors, 2 pre-existing skips.