Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -143,12 +143,12 @@ protected JavaRDD<String> toBatch(KinesisOffsetGen.KinesisShardRange[] shardRang
KinesisSource.ShardRecordIterator recordIt = KinesisSource.readShardRecords(
client, readConfig.getStreamName(), range, readConfig.getStartingPosition(),
readConfig.getMaxRecordsPerRequest(), readConfig.getIntervalMilliSeconds(),
readConfig.getMaxRecordsPerShard(), readConfig.isEnableDeaggregation(),
readConfig.getMaxRecordsPerShard(), readConfig.isDeaggregationEnabled(),
readConfig.getRetryInitialIntervalMs(), readConfig.getRetryMaxIntervalMs(),
readConfig.getThrottleTimeoutMs());

String shardId = range.getShardId();
boolean addMetaFields = readConfig.isShouldAddMetaFields();
boolean addMetaFields = readConfig.isMetaFieldsEnabled();
List<String> jsonRecords = new ArrayList<>();
long numNull = 0;
java.time.Instant lastArrivalTimestamp = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ private void fetchNextPage() {
response = client.getRecords(
GetRecordsRequest.builder()
.shardIterator(shardIteratorStr)
.limit(Math.min(currentMaxRecords, (int) (maxTotalRecords - totalConsumed)))
.limit(Math.min(currentMaxRecords, Math.toIntExact(maxTotalRecords - totalConsumed)))
Comment thread
linliu-code marked this conversation as resolved.
.build());
lastSuccessTimeMs = System.currentTimeMillis();
break;
Expand Down Expand Up @@ -290,7 +290,9 @@ private void fetchNextPage() {

// Process records first (done above), then decide whether to stop.
// millisBehindLatest can be 0 in LocalStack even when the response contained records.
if (response.millisBehindLatest() == 0) {
// It is documented as nullable on AWS SDK responses, so guard against NPE on auto-unbox.
Long millisBehind = response.millisBehindLatest();
if (millisBehind != null && millisBehind == 0) {
fetchingDone = true;
}
if (shardIteratorStr == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import software.amazon.awssdk.services.kinesis.model.Shard;

import java.math.BigInteger;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -162,6 +163,17 @@ public static Pair<Option<String>, Option<String>> parseCheckpointValue(String v
Option.ofNullable(getEndSeqFromValue(value)));
}

/**
 * Orders two Kinesis sequence numbers by their numeric value.
 *
 * <p>Sequence numbers arrive as decimal strings of varying length, so a plain
 * {@link String#compareTo} would rank {@code "9"} above {@code "10"}. Both arguments are
 * therefore parsed into {@link BigInteger} values before they are compared.
 *
 * @param a the first sequence number, as a decimal string
 * @param b the second sequence number, as a decimal string
 * @return negative if a &lt; b, zero if equal, positive if a &gt; b
 * @throws NumberFormatException if either argument is not a valid decimal integer
 */
public static int compareSequenceNumbers(String a, String b) {
    // Parse in argument order so a malformed first argument is reported first.
    final BigInteger first = new BigInteger(a);
    final BigInteger second = new BigInteger(b);
    return first.compareTo(second);
}

/**
* Build checkpoint value without arrival time: "lastSeq" or "lastSeq|endSeq".
*/
Expand Down Expand Up @@ -262,7 +274,7 @@ public boolean hasUnreadRecords(boolean useLatestWhenNoCheckpoint) {
return !useLatestWhenNoCheckpoint;
}
// CASE 3: Closed shard: lastSeq >= endSeq means fully consumed
if (lastSeq.compareTo(endSeq) >= 0) {
if (CheckpointUtils.compareSequenceNumbers(lastSeq, endSeq) >= 0) {
return false;
}
// CASE 4: lastSeq < endSeq: may have unread records
Expand Down Expand Up @@ -459,9 +471,11 @@ void checkDataLossOnExpiredShards(List<String> expiredShardIds, Map<String, Stri
Option<String> lastSeqOpt = seqs.getLeft();
Option<String> endSeqOpt = seqs.getRight();
// endSeq absent = was open shard; conservatively assume not fully consumed.
// endSeq present: fully consumed iff lastSeq >= endSeq.
// endSeq present with non-empty lastSeq: fully consumed iff lastSeq >= endSeq.
// Empty lastSeq (e.g. "|endSeq" checkpoint) is treated as not consumed.
boolean fullyConsumed = endSeqOpt.isPresent()
&& lastSeqOpt.map(last -> last.compareTo(endSeqOpt.get()) >= 0).orElse(false);
&& lastSeqOpt.map(last -> !last.isEmpty()
&& CheckpointUtils.compareSequenceNumbers(last, endSeqOpt.get()) >= 0).orElse(false);
if (fullyConsumed) {
log.info("Expired shard {} was fully consumed (lastSeq >= endSeq); pruning from checkpoint", shardId);
} else {
Expand Down Expand Up @@ -495,7 +509,7 @@ void checkDataLossOnAvailableShards(List<Shard> shards, Map<String, String> from
continue;
}
// lastSeq < shardStartSeq: records between the checkpoint and the trim horizon were dropped.
if (lastSeq.compareTo(shardStartSeq) < 0) {
if (CheckpointUtils.compareSequenceNumbers(lastSeq, shardStartSeq) < 0) {
reportDataLoss("Shard " + shardId + " checkpoint lastSeq=" + lastSeq
+ " is before the shard's earliest available sequence number " + shardStartSeq
+ ". Records may have been trimmed due to retention expiry (data loss).");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ public class KinesisReadConfig implements Serializable {
private final String accessKey; // null if not set
private final String secretKey; // null if not set
private final KinesisSourceConfig.KinesisStartingPositionStrategy startingPosition;
private final boolean shouldAddMetaFields;
private final boolean enableDeaggregation;
private final boolean metaFieldsEnabled;
private final boolean deaggregationEnabled;
private final int maxRecordsPerRequest;
private final long intervalMilliSeconds;
private final long maxRecordsPerShard;
Expand Down
Loading