diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKinesisSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKinesisSource.java index 7ad947830097b..3a297467c82e5 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKinesisSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKinesisSource.java @@ -143,12 +143,12 @@ protected JavaRDD toBatch(KinesisOffsetGen.KinesisShardRange[] shardRang KinesisSource.ShardRecordIterator recordIt = KinesisSource.readShardRecords( client, readConfig.getStreamName(), range, readConfig.getStartingPosition(), readConfig.getMaxRecordsPerRequest(), readConfig.getIntervalMilliSeconds(), - readConfig.getMaxRecordsPerShard(), readConfig.isEnableDeaggregation(), + readConfig.getMaxRecordsPerShard(), readConfig.isDeaggregationEnabled(), readConfig.getRetryInitialIntervalMs(), readConfig.getRetryMaxIntervalMs(), readConfig.getThrottleTimeoutMs()); String shardId = range.getShardId(); - boolean addMetaFields = readConfig.isShouldAddMetaFields(); + boolean addMetaFields = readConfig.isMetaFieldsEnabled(); List jsonRecords = new ArrayList<>(); long numNull = 0; java.time.Instant lastArrivalTimestamp = null; diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KinesisSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KinesisSource.java index 919bac890581f..a5ec35994d8e0 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KinesisSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KinesisSource.java @@ -237,7 +237,7 @@ private void fetchNextPage() { response = client.getRecords( GetRecordsRequest.builder() .shardIterator(shardIteratorStr) - .limit(Math.min(currentMaxRecords, (int) (maxTotalRecords - totalConsumed))) + .limit(Math.min(currentMaxRecords, Math.toIntExact(maxTotalRecords - totalConsumed))) .build()); lastSuccessTimeMs = System.currentTimeMillis(); break; @@ -290,7 +290,9 @@ private void fetchNextPage() { // Process records first (done above), then decide whether to stop. // millisBehindLatest can be 0 in LocalStack even when the response contained records. - if (response.millisBehindLatest() == 0) { + // It is documented as nullable on AWS SDK responses, so guard against NPE on auto-unbox. + Long millisBehind = response.millisBehindLatest(); + if (millisBehind != null && millisBehind == 0) { fetchingDone = true; } if (shardIteratorStr == null) { diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KinesisOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KinesisOffsetGen.java index b0f1779a50be2..27d160c3de0fc 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KinesisOffsetGen.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KinesisOffsetGen.java @@ -42,6 +42,7 @@ import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; import software.amazon.awssdk.services.kinesis.model.Shard; +import java.math.BigInteger; import java.net.URI; import java.util.ArrayList; import java.util.Arrays; @@ -162,6 +163,17 @@ public static Pair, Option> parseCheckpointValue(String v Option.ofNullable(getEndSeqFromValue(value))); } + /** + * Compares two Kinesis sequence numbers numerically. + * Kinesis sequence numbers are 128-bit integers represented as decimal strings whose lengths + * can vary, so lexicographic {@link String#compareTo} is not correct for numeric ordering. + * + * @return negative if a < b, zero if equal, positive if a > b + */ + public static int compareSequenceNumbers(String a, String b) { + return new BigInteger(a).compareTo(new BigInteger(b)); + } + /** * Build checkpoint value without arrival time: "lastSeq" or "lastSeq|endSeq". */ @@ -262,7 +274,7 @@ public boolean hasUnreadRecords(boolean useLatestWhenNoCheckpoint) { return !useLatestWhenNoCheckpoint; } // CASE 3: Closed shard: lastSeq >= endSeq means fully consumed - if (lastSeq.compareTo(endSeq) >= 0) { + if (CheckpointUtils.compareSequenceNumbers(lastSeq, endSeq) >= 0) { return false; } // CASE 4: lastSeq < endSeq: may have unread records @@ -459,9 +471,11 @@ void checkDataLossOnExpiredShards(List expiredShardIds, Map lastSeqOpt = seqs.getLeft(); Option endSeqOpt = seqs.getRight(); // endSeq absent = was open shard; conservatively assume not fully consumed. - // endSeq present: fully consumed iff lastSeq >= endSeq. + // endSeq present with non-empty lastSeq: fully consumed iff lastSeq >= endSeq. + // Empty lastSeq (e.g. "|endSeq" checkpoint) is treated as not consumed. boolean fullyConsumed = endSeqOpt.isPresent() - && lastSeqOpt.map(last -> last.compareTo(endSeqOpt.get()) >= 0).orElse(false); + && lastSeqOpt.map(last -> !last.isEmpty() + && CheckpointUtils.compareSequenceNumbers(last, endSeqOpt.get()) >= 0).orElse(false); if (fullyConsumed) { log.info("Expired shard {} was fully consumed (lastSeq >= endSeq); pruning from checkpoint", shardId); } else { @@ -495,7 +509,7 @@ void checkDataLossOnAvailableShards(List shards, Map from continue; } // lastSeq < shardStartSeq: records between the checkpoint and the trim horizon were dropped. - if (lastSeq.compareTo(shardStartSeq) < 0) { + if (CheckpointUtils.compareSequenceNumbers(lastSeq, shardStartSeq) < 0) { reportDataLoss("Shard " + shardId + " checkpoint lastSeq=" + lastSeq + " is before the shard's earliest available sequence number " + shardStartSeq + ". Records may have been trimmed due to retention expiry (data loss)."); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KinesisReadConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KinesisReadConfig.java index e6bf12c5f8b1d..7fe53f24d0f93 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KinesisReadConfig.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KinesisReadConfig.java @@ -41,8 +41,8 @@ public class KinesisReadConfig implements Serializable { private final String accessKey; // null if not set private final String secretKey; // null if not set private final KinesisSourceConfig.KinesisStartingPositionStrategy startingPosition; - private final boolean shouldAddMetaFields; - private final boolean enableDeaggregation; + private final boolean metaFieldsEnabled; + private final boolean deaggregationEnabled; private final int maxRecordsPerRequest; private final long intervalMilliSeconds; private final long maxRecordsPerShard;