[FLINK-39728] Fix pauseOrResumeSplits race with unassigned partitions#257
[FLINK-39728] Fix pauseOrResumeSplits race with unassigned partitions#257jnh5y wants to merge 1 commit into
Conversation
Filter against consumer.assignment() before calling pause()/resume() to prevent IllegalStateException when a partition is concurrently unassigned by fetch() or removeEmptySplits(). Generated-by: Claude Opus 4.6 (Anthropic)
|
Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html) |
Savonitar
left a comment
There was a problem hiding this comment.
Hi, thank you for the fix. The filter against consumer.assignment() before each pause/resume call is the right defensive guard.
I find “concurrently” in the comments/description a bit misleading: the unassign and the pause/resume call are sequential on the SplitFetcher thread, not parallel. The race is between the mailbox thread’s view at enqueue time and the SplitFetcher’s state at execution time.
But it does not affect the proposed fix.
LGTM
Efrat19
left a comment
There was a problem hiding this comment.
Confirmed this IllegalStateException can occur when the split finished event doesn't propagate in time to abort alignment check on the sourceOperator, causing pauseOrResumeSplits to be called on a finished split
LGTM % nit
| Collection<KafkaPartitionSplit> splitsToResume) { | ||
| // Filter against current assignment to avoid IllegalStateException when a partition | ||
| // was concurrently unassigned by fetch() or removeEmptySplits(). | ||
| Set<TopicPartition> assigned = consumer.assignment(); |
There was a problem hiding this comment.
Nit: Wdyt about leaving a warn here to unmask any edge case where pauseOrResumeSplits is called for unassigned partition from an unfinished split?
Filter against consumer.assignment() before calling pause()/resume() to prevent IllegalStateException when a partition is concurrently unassigned by fetch() or removeEmptySplits().
Generated-by: Claude Opus 4.6 (Anthropic)