From 33b430d36649b581760b42b58f936e3dddc28695 Mon Sep 17 00:00:00 2001 From: Lin Liu Date: Mon, 4 May 2026 09:57:35 -0700 Subject: [PATCH 1/2] [MINOR] Follow-ups to JsonKinesisSource - KinesisOffsetGen: add CheckpointUtils.compareSequenceNumbers using BigInteger and use it in hasUnreadRecords, checkDataLossOnExpiredShards, and checkDataLossOnAvailableShards. Kinesis sequence numbers are 128-bit decimal strings whose lengths can vary, so lexicographic String.compareTo is not correct for numeric ordering. Also treat empty lastSeq (e.g. a "|endSeq" checkpoint) as not-consumed in checkDataLossOnExpiredShards. - KinesisSource: null-guard response.millisBehindLatest() before unboxing (the AWS SDK documents the field as nullable), and use Math.toIntExact on the per-call limit so we fail loudly instead of silently truncating if the remaining record budget exceeds Integer.MAX_VALUE. - KinesisReadConfig: replace Lombok-generated isShouldAddMetaFields and isEnableDeaggregation with explicit shouldAddMetaFields and isDeaggregationEnabled, and update call sites in JsonKinesisSource. --- .../utilities/sources/JsonKinesisSource.java | 4 ++-- .../hudi/utilities/sources/KinesisSource.java | 6 +++-- .../sources/helpers/KinesisOffsetGen.java | 22 +++++++++++++++---- .../sources/helpers/KinesisReadConfig.java | 8 ++++++- 4 files changed, 31 insertions(+), 9 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKinesisSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKinesisSource.java index 7ad947830097b..b7c5c51de8c0a 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKinesisSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKinesisSource.java @@ -143,12 +143,12 @@ protected JavaRDD toBatch(KinesisOffsetGen.KinesisShardRange[] shardRang KinesisSource.ShardRecordIterator recordIt = KinesisSource.readShardRecords( client, readConfig.getStreamName(), range, readConfig.getStartingPosition(), readConfig.getMaxRecordsPerRequest(), readConfig.getIntervalMilliSeconds(), - readConfig.getMaxRecordsPerShard(), readConfig.isEnableDeaggregation(), + readConfig.getMaxRecordsPerShard(), readConfig.isDeaggregationEnabled(), readConfig.getRetryInitialIntervalMs(), readConfig.getRetryMaxIntervalMs(), readConfig.getThrottleTimeoutMs()); String shardId = range.getShardId(); - boolean addMetaFields = readConfig.isShouldAddMetaFields(); + boolean addMetaFields = readConfig.shouldAddMetaFields(); List jsonRecords = new ArrayList<>(); long numNull = 0; java.time.Instant lastArrivalTimestamp = null; diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KinesisSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KinesisSource.java index 919bac890581f..a5ec35994d8e0 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KinesisSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KinesisSource.java @@ -237,7 +237,7 @@ private void fetchNextPage() { response = client.getRecords( GetRecordsRequest.builder() .shardIterator(shardIteratorStr) - .limit(Math.min(currentMaxRecords, (int) (maxTotalRecords - totalConsumed))) + .limit(Math.min(currentMaxRecords, Math.toIntExact(maxTotalRecords - totalConsumed))) .build()); lastSuccessTimeMs = System.currentTimeMillis(); break; @@ -290,7 +290,9 @@ private void fetchNextPage() { // Process records first (done above), then decide whether to stop. // millisBehindLatest can be 0 in LocalStack even when the response contained records. - if (response.millisBehindLatest() == 0) { + // It is documented as nullable on AWS SDK responses, so guard against NPE on auto-unbox. + Long millisBehind = response.millisBehindLatest(); + if (millisBehind != null && millisBehind == 0) { fetchingDone = true; } if (shardIteratorStr == null) { diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KinesisOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KinesisOffsetGen.java index b0f1779a50be2..27d160c3de0fc 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KinesisOffsetGen.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KinesisOffsetGen.java @@ -42,6 +42,7 @@ import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; import software.amazon.awssdk.services.kinesis.model.Shard; +import java.math.BigInteger; import java.net.URI; import java.util.ArrayList; import java.util.Arrays; @@ -162,6 +163,17 @@ public static Pair, Option> parseCheckpointValue(String v Option.ofNullable(getEndSeqFromValue(value))); } + /** + * Compares two Kinesis sequence numbers numerically. + * Kinesis sequence numbers are 128-bit integers represented as decimal strings whose lengths + * can vary, so lexicographic {@link String#compareTo} is not correct for numeric ordering. + * + * @return negative if a < b, zero if equal, positive if a > b + */ + public static int compareSequenceNumbers(String a, String b) { + return new BigInteger(a).compareTo(new BigInteger(b)); + } + /** * Build checkpoint value without arrival time: "lastSeq" or "lastSeq|endSeq". */ @@ -262,7 +274,7 @@ public boolean hasUnreadRecords(boolean useLatestWhenNoCheckpoint) { return !useLatestWhenNoCheckpoint; } // CASE 3: Closed shard: lastSeq >= endSeq means fully consumed - if (lastSeq.compareTo(endSeq) >= 0) { + if (CheckpointUtils.compareSequenceNumbers(lastSeq, endSeq) >= 0) { return false; } // CASE 4: lastSeq < endSeq: may have unread records @@ -459,9 +471,11 @@ void checkDataLossOnExpiredShards(List expiredShardIds, Map lastSeqOpt = seqs.getLeft(); Option endSeqOpt = seqs.getRight(); // endSeq absent = was open shard; conservatively assume not fully consumed. - // endSeq present: fully consumed iff lastSeq >= endSeq. + // endSeq present with non-empty lastSeq: fully consumed iff lastSeq >= endSeq. + // Empty lastSeq (e.g. "|endSeq" checkpoint) is treated as not consumed. boolean fullyConsumed = endSeqOpt.isPresent() - && lastSeqOpt.map(last -> last.compareTo(endSeqOpt.get()) >= 0).orElse(false); + && lastSeqOpt.map(last -> !last.isEmpty() + && CheckpointUtils.compareSequenceNumbers(last, endSeqOpt.get()) >= 0).orElse(false); if (fullyConsumed) { log.info("Expired shard {} was fully consumed (lastSeq >= endSeq); pruning from checkpoint", shardId); } else { @@ -495,7 +509,7 @@ void checkDataLossOnAvailableShards(List shards, Map from continue; } // lastSeq < shardStartSeq: records between the checkpoint and the trim horizon were dropped. - if (lastSeq.compareTo(shardStartSeq) < 0) { + if (CheckpointUtils.compareSequenceNumbers(lastSeq, shardStartSeq) < 0) { reportDataLoss("Shard " + shardId + " checkpoint lastSeq=" + lastSeq + " is before the shard's earliest available sequence number " + shardStartSeq + ". Records may have been trimmed due to retention expiry (data loss)."); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KinesisReadConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KinesisReadConfig.java index e6bf12c5f8b1d..31f4e07500ecf 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KinesisReadConfig.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KinesisReadConfig.java @@ -20,6 +20,7 @@ import org.apache.hudi.utilities.config.KinesisSourceConfig; +import lombok.AccessLevel; import lombok.AllArgsConstructor; import lombok.Getter; @@ -41,12 +42,17 @@ public class KinesisReadConfig implements Serializable { private final String accessKey; // null if not set private final String secretKey; // null if not set private final KinesisSourceConfig.KinesisStartingPositionStrategy startingPosition; + @Getter(AccessLevel.NONE) private final boolean shouldAddMetaFields; - private final boolean enableDeaggregation; + private final boolean deaggregationEnabled; private final int maxRecordsPerRequest; private final long intervalMilliSeconds; private final long maxRecordsPerShard; private final long retryInitialIntervalMs; private final long retryMaxIntervalMs; private final long throttleTimeoutMs; + + public boolean shouldAddMetaFields() { + return shouldAddMetaFields; + } } From 3a1e9be9e9c1fdf35f989ebe0047b444be9a67e4 Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Fri, 15 May 2026 13:44:37 -0700 Subject: [PATCH 2/2] Use Lombok-generated getter for metaFieldsEnabled in KinesisReadConfig --- .../apache/hudi/utilities/sources/JsonKinesisSource.java | 2 +- .../hudi/utilities/sources/helpers/KinesisReadConfig.java | 8 +------- 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKinesisSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKinesisSource.java index b7c5c51de8c0a..3a297467c82e5 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKinesisSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKinesisSource.java @@ -148,7 +148,7 @@ protected JavaRDD toBatch(KinesisOffsetGen.KinesisShardRange[] shardRang readConfig.getThrottleTimeoutMs()); String shardId = range.getShardId(); - boolean addMetaFields = readConfig.shouldAddMetaFields(); + boolean addMetaFields = readConfig.isMetaFieldsEnabled(); List jsonRecords = new ArrayList<>(); long numNull = 0; java.time.Instant lastArrivalTimestamp = null; diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KinesisReadConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KinesisReadConfig.java index 31f4e07500ecf..7fe53f24d0f93 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KinesisReadConfig.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KinesisReadConfig.java @@ -20,7 +20,6 @@ import org.apache.hudi.utilities.config.KinesisSourceConfig; -import lombok.AccessLevel; import lombok.AllArgsConstructor; import lombok.Getter; @@ -42,8 +41,7 @@ public class KinesisReadConfig implements Serializable { private final String accessKey; // null if not set private final String secretKey; // null if not set private final KinesisSourceConfig.KinesisStartingPositionStrategy startingPosition; - @Getter(AccessLevel.NONE) - private final boolean shouldAddMetaFields; + private final boolean metaFieldsEnabled; private final boolean deaggregationEnabled; private final int maxRecordsPerRequest; private final long intervalMilliSeconds; @@ -51,8 +49,4 @@ public class KinesisReadConfig implements Serializable { private final long retryInitialIntervalMs; private final long retryMaxIntervalMs; private final long throttleTimeoutMs; - - public boolean shouldAddMetaFields() { - return shouldAddMetaFields; - } }