kafka-consumer(ticdc): tolerate replayed resolved and DDL events (#12596) #12620
ti-chi-bot wants to merge 1 commit into release-8.5
Conversation
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
This cherry-pick PR targets a release branch and has not yet been approved by triage owners.
@wlwilliamx This PR has conflicts; I have put it on hold.
@ti-chi-bot: If you want to know how to resolve the conflicts, please read the guide in the TiDB Dev Guide.
[APPROVALNOTIFIER] This PR is NOT APPROVED.
Code Review
This pull request implements logic to tolerate replayed resolved and DDL events in the Kafka consumer by introducing DDL deduplication and watermark fallback handling. The review identifies critical issues including multiple unresolved merge conflict markers throughout the code, a missing field in the partitionProgress struct, and potential data races due to non-atomic access of shared state.
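To illustrate the tolerance behavior this review summarizes, here is a minimal, self-contained sketch of treating a non-advancing resolved event as a replay rather than a fatal error. The struct and method names are simplified assumptions, not the actual TiCDC code:

```go
package main

import "fmt"

// Hypothetical, simplified stand-in for the consumer state under review.
type partitionProgress struct {
	watermark uint64
}

// updateWatermark treats a non-advancing resolved event as a replay and
// ignores it instead of aborting, matching at-least-once MQ delivery.
func (p *partitionProgress) updateWatermark(newWatermark uint64) bool {
	if newWatermark < p.watermark {
		return false // replayed resolved event: tolerate, do not fatal
	}
	p.watermark = newWatermark
	return true
}

func main() {
	p := &partitionProgress{}
	fmt.Println(p.updateWatermark(10), p.updateWatermark(7), p.watermark) // prints "true false 10"
}
```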
```go
<<<<<<< HEAD
	eventGroups map[int64]*eventsGroup
	decoder     codec.RowEventDecoder
=======
	tableSinkMap map[model.TableID]tablesink.TableSink
	eventGroups  map[model.TableID]*eventsGroup
	decoder      codec.RowEventDecoder
}
```
Unresolved merge conflict markers found in partitionProgress. Additionally, the struct is missing the partition field which is initialized in newPartitionProgress (line 91) and used in updateWatermark (line 103). Please resolve the conflicts and add the missing field to ensure the code compiles and functions correctly.
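As a standalone illustration, here is a hedged sketch of one way the resolved struct could look with the missing `partition` field added. The sink and decoder fields are elided, and `tableID` stands in for `model.TableID`; none of this is the actual TiCDC code:

```go
package main

// tableID is an illustrative stand-in for model.TableID.
type tableID = int64

type eventsGroup struct{}

type partitionProgress struct {
	partition       int32 // the field the review notes is missing
	watermark       uint64
	watermarkOffset int64
	eventGroups     map[tableID]*eventsGroup
	// tableSinkMap and decoder omitted for brevity
}

func newPartitionProgress(partition int32) *partitionProgress {
	return &partitionProgress{
		partition:   partition,
		eventGroups: make(map[tableID]*eventsGroup),
	}
}

func main() {
	_ = newPartitionProgress(3)
}
```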
```go
<<<<<<< HEAD
	ddlList              []*model.DDLEvent
	ddlWithMaxCommitTs   *model.DDLEvent
	ddlSink              ddlsink.Sink
	fakeTableIDGenerator *fakeTableIDGenerator
=======
	ddlList            []*model.DDLEvent
	ddlWithMaxCommitTs *model.DDLEvent
	// ddlKeysWithMaxCommitTs records every logical DDL seen at the current
	// maximum CommitTs, so replayed prefixes of split DDL sequences can be
	// ignored without collapsing distinct DDLs that share the same CommitTs.
	ddlKeysWithMaxCommitTs map[ddlEventKey]struct{}
	ddlSink                ddlsink.Sink
>>>>>>> 431c2afbed (kafka-consumer(ticdc): tolerate replayed resolved and DDL events (#12596))
```
```go
<<<<<<< HEAD
=======
	w.ddlKeysWithMaxCommitTs[ddlKey] = struct{}{}
	log.Info("DDL message received", zap.Any("offset", offset), zap.Uint64("commitTs", ddl.CommitTs), zap.String("DDL", ddl.Query))
>>>>>>> 431c2afbed (kafka-consumer(ticdc): tolerate replayed resolved and DDL events (#12596))
```
Unresolved merge conflict markers in appendDDL. The logic should be resolved to correctly update the deduplication map and log the DDL event using the provided offset.
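As a self-contained sketch of the deduplication this comment describes: DDLs at the current maximum CommitTs are remembered by a (CommitTs, Query) key, so a replayed prefix of a split DDL sequence is skipped while distinct DDLs sharing a CommitTs are all kept. The type and field names below mirror the diff but the surrounding code is illustrative, not the actual TiCDC implementation:

```go
package main

// ddlEventKey identifies a logical DDL at a given CommitTs.
type ddlEventKey struct {
	commitTs uint64
	query    string
}

type ddlEvent struct {
	CommitTs uint64
	Query    string
}

type writer struct {
	ddlList                []*ddlEvent
	ddlKeysWithMaxCommitTs map[ddlEventKey]struct{}
	maxCommitTs            uint64
}

func newWriter() *writer {
	return &writer{ddlKeysWithMaxCommitTs: make(map[ddlEventKey]struct{})}
}

// appendDDL keeps a DDL only if it has not already been seen at the current
// maximum CommitTs; stale or replayed events are ignored instead of fatal.
func (w *writer) appendDDL(ddl *ddlEvent) bool {
	key := ddlEventKey{ddl.CommitTs, ddl.Query}
	switch {
	case ddl.CommitTs < w.maxCommitTs:
		return false // stale replay, ignore
	case ddl.CommitTs == w.maxCommitTs:
		if _, seen := w.ddlKeysWithMaxCommitTs[key]; seen {
			return false // replayed duplicate at the same CommitTs
		}
	default:
		w.maxCommitTs = ddl.CommitTs
		w.ddlKeysWithMaxCommitTs = make(map[ddlEventKey]struct{})
	}
	w.ddlKeysWithMaxCommitTs[key] = struct{}{}
	w.ddlList = append(w.ddlList, ddl)
	return true
}

func main() {
	w := newWriter()
	w.appendDDL(&ddlEvent{CommitTs: 5, Query: "create table t1"})
	w.appendDDL(&ddlEvent{CommitTs: 5, Query: "create table t2"})
	w.appendDDL(&ddlEvent{CommitTs: 5, Query: "create table t1"}) // replay, skipped
}
```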
```go
w.ddlKeysWithMaxCommitTs[ddlKey] = struct{}{}
log.Info("DDL message received", zap.Any("offset", offset), zap.Uint64("commitTs", ddl.CommitTs), zap.String("DDL", ddl.Query))
```

```go
	p.watermark = newWatermark
	p.watermarkOffset = offset
```
Direct assignment to p.watermark and p.watermarkOffset causes a data race. These fields are accessed concurrently by other goroutines (e.g., in getMinWatermark and checkOldMessage) using atomic operations or direct reads. Use atomic operations to ensure thread safety.
```go
atomic.StoreUint64(&p.watermark, newWatermark)
atomic.StoreInt64((*int64)(&p.watermarkOffset), int64(offset))
```

```go
func (p *partitionProgress) loadWatermark() uint64 {
	return p.watermark
>>>>>>> 431c2afbed (kafka-consumer(ticdc): tolerate replayed resolved and DDL events (#12596))
}
```
Codecov Report ✅ All modified and coverable lines are covered by tests.

```
@@           Coverage Diff            @@
##           release-8.5     #12620   +/- ##
================================================
  Coverage            ?    53.4200%
================================================
  Files               ?        1026
  Lines               ?      137924
  Branches            ?           0
================================================
  Hits                ?       73679
  Misses              ?       58726
  Partials            ?        5519
```
@ti-chi-bot: The following tests failed.
This is an automated cherry-pick of #12596
What problem does this PR solve?
Issue Number: close #12595
What is changed and how it works?
Treat replayed resolved and DDL events in cmd/kafka-consumer as duplicate delivery instead of a fatal error.
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
No. This only makes the standalone Kafka consumer tolerate duplicate MQ delivery in line with TiCDC's at-least-once behavior.
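The at-least-once tolerance described above can be sketched as a consumer loop that counts a message at or below the last handled offset as duplicate delivery and skips it rather than aborting. The message type, offsets, and counters here are illustrative only:

```go
package main

import "fmt"

type message struct {
	offset int64
	data   string
}

type consumer struct {
	nextOffset int64
	duplicates int
	handled    []string
}

// handle skips a replayed message instead of treating it as fatal.
func (c *consumer) handle(m message) {
	if m.offset < c.nextOffset {
		c.duplicates++ // duplicate MQ delivery: count and skip
		return
	}
	c.handled = append(c.handled, m.data)
	c.nextOffset = m.offset + 1
}

func main() {
	c := &consumer{}
	for _, m := range []message{{0, "a"}, {1, "b"}, {1, "b"}, {2, "c"}} {
		c.handle(m)
	}
	fmt.Println(len(c.handled), c.duplicates) // prints "3 1"
}
```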
Do you need to update user documentation, design documentation or monitoring documentation?
No.
Release note