HasRestoredToEnd() stuck forever when changelog is empty (beginOffset == endOffset) and no checkpoint exists #463
Description
HasRestoredToEnd() in StoreChangelogReader.cs returns false indefinitely when two conditions are combined:
- No `.checkpoint` file exists (fresh PVC, state lost on shutdown, etc.)
- Changelog topic is empty after segment deletion (`beginOffset == endOffset`)
This causes tasks to remain stuck in RESTORING state forever, processing 0 records while lag accumulates.
Root Cause
When there is no checkpoint, the restore consumer is assigned at BEGINNING with currentOffset = null. On an empty changelog topic where beginOffset == endOffset (e.g., both at 22512), all checks in HasRestoredToEnd() fail:
```csharp
// StoreChangelogReader.cs:260-279
long? endOffset = changelogMetadata.RestoreEndOffset;

if (endOffset == null || endOffset == Offset.Unset || endOffset == 0)
    return true; // ❌ endOffset = 22512 → fails

if (changelogMetadata.CurrentOffset >= endOffset
    || changelogMetadata.BeginOffset > changelogMetadata.RestoreEndOffset)
    return true; // ❌ currentOffset is null, and 22512 > 22512 is false → fails

// Falls through to Position(), which returns Unset → returns false forever
```
The fix for #195 added the `beginOffset > endOffset` check, but a Kafka topic's offsets are monotonic, so begin can never exceed end. The actual state of an empty topic after segment deletion is `begin == end`, not `begin > end`.
Why this only happens without a checkpoint
| | Checkpoint exists | No checkpoint |
|---|---|---|
| Changelog has records | ✅ Normal restore | ✅ Normal restore |
| Changelog empty (begin == end) | ✅ `currentOffset >= endOffset` passes | ❌ Stuck forever |
With a checkpoint, `currentOffset` is set to a valid value and `currentOffset >= endOffset` succeeds. Without one, `currentOffset` stays `null`, and the only fallback is the `beginOffset > endOffset` check, which fails because `begin == end`.
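The decision logic can be reduced to a small sketch (a hypothetical `HasRestoredToEnd` helper, simplified from the real method) showing that with `begin == end` and no checkpoint the current `>` check never fires, while the proposed `>=` does:

```csharp
using System;

static class RestoreCheckSketch
{
    // Simplified model of the guard in StoreChangelogReader.HasRestoredToEnd().
    // currentOffset is null when no .checkpoint file was found.
    // useFix toggles the original ">" check vs. the proposed ">=" check.
    public static bool HasRestoredToEnd(long? currentOffset, long beginOffset,
                                        long? endOffset, bool useFix)
    {
        if (endOffset == null || endOffset == 0)
            return true; // nothing to restore

        // Lifted comparison: null >= endOffset evaluates to false in C#.
        if (currentOffset >= endOffset)
            return true; // only reachable with a checkpoint

        // Original check: begin > end can never happen (offsets are monotonic).
        // Fixed check: begin >= end also covers an emptied topic (begin == end).
        if (useFix ? beginOffset >= endOffset : beginOffset > endOffset)
            return true;

        return false; // falls through -> task stays in RESTORING
    }

    public static void Main()
    {
        // Empty changelog after segment deletion, no checkpoint:
        Console.WriteLine(HasRestoredToEnd(null, 22512, 22512, useFix: false)); // False: stuck
        Console.WriteLine(HasRestoredToEnd(null, 22512, 22512, useFix: true));  // True: restore completes
    }
}
```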
Reproduction
- Application with state stores and changelog topics with `cleanup.policy=delete`
- Retention expires → all segments deleted → `beginOffset == endOffset`
- Pod restarts without `.checkpoint` file
- Tasks stuck in RESTORING, processing 0 records indefinitely
Common in low-traffic environments (integration/staging) where changelogs have few records.
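For reference, a hypothetical topic configuration that drives a low-traffic changelog into the empty state quickly (the short retention/segment values are chosen only for illustration):

```properties
# Hypothetical settings on the changelog topic to reproduce the empty-topic state.
# With delete cleanup, aggressive retention, and fast segment rolls, all records
# age out and beginOffset advances until it equals endOffset.
cleanup.policy=delete
retention.ms=60000
segment.ms=60000
```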
Observed behavior
```
Store STATE-STORE-0000: checkpointOffset=NONE, beginOffset=22512, endOffset=22512
Store STATE-STORE-0006: checkpointOffset=NONE, beginOffset=368, endOffset=368
Store STATE-STORE-0007: checkpointOffset=NONE, beginOffset=243, endOffset=243
Store STATE-STORE-0019: checkpointOffset=NONE, beginOffset=232, endOffset=232
Store STATE-STORE-0023: checkpointOffset=NONE, beginOffset=835, endOffset=835
Processed 0 total records, ran 0 punctuators   ← repeats forever
```
All 5 changelog topics exhibited beginOffset == endOffset. After applying the fix, restore completed instantly and the enricher processed all 11,000+ accumulated records.
Proposed Fix
Change `>` to `>=`:
```csharp
if (changelogMetadata.CurrentOffset >= endOffset
    || changelogMetadata.BeginOffset >= changelogMetadata.RestoreEndOffset)
    return true;
```
Environment
- Streamiz.Kafka.Net v1.8.0
- Confluent Cloud (3 brokers)