kafka-consumer(ticdc): tolerate replayed resolved and DDL events (#12596) #12619
ti-chi-bot wants to merge 1 commit into
Conversation
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
This cherry pick PR is for a release branch and has not yet been approved by triage owners.

[APPROVALNOTIFIER] This PR is NOT APPROVED.
Code Review
This pull request introduces a Kafka consumer writer and partition progress tracking system to handle DDL and DML events. The review feedback highlights critical concurrency concerns, specifically data races on shared fields in the writer and partitionProgress structs that require synchronization. Furthermore, the reviewer identified a likely compilation error regarding the Seq field, a performance issue with a busy-wait loop in syncFlushRowChangedEvents, and suggested several minor improvements to code structure and log clarity.
type writer struct {
	option *option

	ddlList            []*model.DDLEvent
	ddlWithMaxCommitTs *model.DDLEvent
	// ddlKeysWithMaxCommitTs records every logical DDL seen at the current
	// maximum CommitTs, so replayed prefixes of split DDL sequences can be
	// ignored without collapsing distinct DDLs that share the same CommitTs.
	ddlKeysWithMaxCommitTs map[ddlEventKey]struct{}
	ddlSink                ddlsink.Sink

	// sinkFactory is used to create table sink for each table.
	sinkFactory *eventsinkfactory.SinkFactory
	progresses  []*partitionProgress

	eventRouter *dispatcher.EventRouter
}
The writer struct is accessed concurrently from multiple partition processing goroutines (via WriteMessage), but it lacks synchronization for its shared fields such as ddlList, ddlWithMaxCommitTs, and ddlKeysWithMaxCommitTs. This will lead to data races and inconsistent state. You should add a sync.Mutex to protect these fields, similar to the implementation in the original Consumer struct in main.go.
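For reference, a minimal self-contained sketch of the mutex-based protection the reviewer is describing; ddlEvent, ddlMu, and popDDL are stand-in names for illustration, not identifiers from the PR:

package consumer

import "sync"

// ddlEvent stands in for model.DDLEvent in this sketch.
type ddlEvent struct {
	CommitTs uint64
}

type writer struct {
	ddlMu sync.Mutex // guards the DDL bookkeeping shared by partition goroutines

	ddlList            []*ddlEvent
	ddlWithMaxCommitTs *ddlEvent
}

// appendDDL may be called concurrently from per-partition goroutines, so every
// access to the shared fields happens under ddlMu.
func (w *writer) appendDDL(ddl *ddlEvent) {
	w.ddlMu.Lock()
	defer w.ddlMu.Unlock()
	w.ddlList = append(w.ddlList, ddl)
	if w.ddlWithMaxCommitTs == nil || ddl.CommitTs > w.ddlWithMaxCommitTs.CommitTs {
		w.ddlWithMaxCommitTs = ddl
	}
}

// popDDL removes and returns the oldest pending DDL under the same mutex.
func (w *writer) popDDL() *ddlEvent {
	w.ddlMu.Lock()
	defer w.ddlMu.Unlock()
	if len(w.ddlList) == 0 {
		return nil
	}
	ddl := w.ddlList[0]
	w.ddlList = w.ddlList[1:]
	return ddl
}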
startTs:  ddl.StartTs,
commitTs: ddl.CommitTs,
query:    ddl.Query,
seq:      ddl.Seq,
The review summary above flags this assignment of ddl.Seq as a likely compilation error, so the seq field should be double-checked.
watermark       uint64
watermarkOffset kafka.Offset
As noted in the review summary, watermark and watermarkOffset are shared partitionProgress fields accessed from multiple goroutines and need synchronization to avoid data races.
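If these fields are only advanced by the partition goroutine and read elsewhere for progress reporting, one minimal way to avoid the race is to make them atomic. A sketch under that assumption, with illustrative method names and Offset standing in for kafka.Offset (an int64-based type in confluent-kafka-go):

package consumer

import "sync/atomic"

// Offset stands in for kafka.Offset in this sketch.
type Offset int64

type partitionProgress struct {
	watermark       atomic.Uint64 // largest resolved ts seen on this partition
	watermarkOffset atomic.Int64  // kafka offset that carried that resolved ts
}

func (p *partitionProgress) advanceWatermark(ts uint64, offset Offset) {
	p.watermark.Store(ts)
	p.watermarkOffset.Store(int64(offset))
}

func (p *partitionProgress) loadWatermark() (uint64, Offset) {
	return p.watermark.Load(), Offset(p.watermarkOffset.Load())
}

Note that per-field atomics do not make the ts/offset pair consistent as a unit; if readers need both values from the same update, a mutex around the pair is the safer choice.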
for {
	select {
	case <-ctx.Done():
		log.Warn("sync flush row changed event canceled", zap.Error(ctx.Err()))
		return
	default:
	}
	flushedResolvedTs := true
	for _, tableSink := range progress.tableSinkMap {
		if err := tableSink.UpdateResolvedTs(resolvedTs); err != nil {
			log.Panic("Failed to update resolved ts", zap.Error(err))
		}
		if tableSink.GetCheckpointTs().Less(resolvedTs) {
			flushedResolvedTs = false
		}
	}
	if flushedResolvedTs {
		return
	}
}
The loop in syncFlushRowChangedEvents is a busy-wait loop that can cause high CPU usage if the downstream sink takes time to progress. It repeatedly calls UpdateResolvedTs and checks the checkpoint without any yielding. Adding a small sleep inside the loop would significantly reduce CPU overhead.
for {
	select {
	case <-ctx.Done():
		log.Warn("sync flush row changed event canceled", zap.Error(ctx.Err()))
		return
	default:
	}
	flushedResolvedTs := true
	for _, tableSink := range progress.tableSinkMap {
		if err := tableSink.UpdateResolvedTs(resolvedTs); err != nil {
			log.Panic("Failed to update resolved ts", zap.Error(err))
		}
		if tableSink.GetCheckpointTs().Less(resolvedTs) {
			flushedResolvedTs = false
		}
	}
	if flushedResolvedTs {
		return
	}
	time.Sleep(10 * time.Millisecond)
}

for i := 0; i < int(o.partitionNum); i++ {
	if err != nil {
		log.Panic("cannot create the decoder", zap.Error(err))
	}
	w.progresses[i] = newPartitionProgress(int32(i), decoder)
}
The error check for NewDecoder is redundant inside the loop since err is not modified within the loop. It should be checked once outside the loop for better clarity.
The suggested change:

if err != nil {
	log.Panic("cannot create the decoder", zap.Error(err))
}
for i := 0; i < int(o.partitionNum); i++ {
	w.progresses[i] = newPartitionProgress(int32(i), decoder)
}
// For DDLs a and b received in that order, a.CommitTs < b.CommitTs should hold.
func (w *writer) appendDDL(ddl *model.DDLEvent, offset kafka.Offset) {
	// DDL CommitTs fallback, just crash it to indicate the bug.
	if w.ddlWithMaxCommitTs != nil && ddl.CommitTs < w.ddlWithMaxCommitTs.CommitTs {
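To tie this back to the ddlKeysWithMaxCommitTs field documented in the struct above, here is a self-contained sketch (stand-in types and names, not the PR's exact code) of how replayed DDLs at the current maximum CommitTs can be skipped while older and newer CommitTs values are still handled by the caller:

package consumer

// ddlKey identifies a logical DDL in this sketch; the PR's ddlEventKey may
// carry different fields.
type ddlKey struct {
	commitTs uint64
	query    string
}

type ddlAction int

const (
	applyDDL    ddlAction = iota // new DDL, append it
	skipDDL                      // replayed duplicate at the current max CommitTs, ignore
	fallbackDDL                  // CommitTs went backwards, treated as a bug by the caller
)

// classifyDDL decides how to handle a newly decoded DDL given the current
// maximum CommitTs and the set of DDL keys already seen at that CommitTs.
func classifyDDL(maxCommitTs uint64, seenAtMax map[ddlKey]struct{}, ddl ddlKey) ddlAction {
	switch {
	case ddl.commitTs > maxCommitTs:
		return applyDDL // strictly newer: caller resets seenAtMax for the new max
	case ddl.commitTs == maxCommitTs:
		if _, ok := seenAtMax[ddl]; ok {
			return skipDDL // replayed prefix of a split DDL sequence
		}
		return applyDDL // distinct DDL sharing the same CommitTs
	default:
		return fallbackDDL
	}
}

Keeping a set rather than only the last DDL matters because, as the struct comment says, several distinct DDLs can legitimately share one CommitTs, so only exact repeats are dropped.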
if o.upstreamTiDBDSN != "" {
	db, err = openDB(ctx, o.upstreamTiDBDSN)
	if err != nil {
		log.Panic("cannot open the upstream TiDB, handle key only enabled",
@ti-chi-bot: The following test failed.
This is an automated cherry-pick of #12596
What problem does this PR solve?
Issue Number: close #12595
What is changed and how it works?
cmd/kafka-consumer now treats replayed resolved ts and DDL events as duplicate delivery instead of a fatal error.

Check List
Tests
Questions
Will it cause performance regression or break compatibility?
No. This only makes the standalone Kafka consumer tolerate duplicate MQ delivery in line with TiCDC's at-least-once behavior.
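As an illustration of what tolerating duplicate MQ delivery means for resolved events, a minimal sketch (stand-in types and names, not the PR's code) where a resolved ts that does not advance the partition watermark is logged and dropped instead of being treated as fatal:

package consumer

import "log"

// partitionProgress stands in for the consumer's per-partition state.
type partitionProgress struct {
	watermark uint64 // largest resolved ts seen so far on this partition
}

// onResolvedTs drops a replayed resolved ts instead of treating it as fatal.
func (p *partitionProgress) onResolvedTs(ts uint64) {
	if ts < p.watermark {
		log.Printf("resolved ts fallback, ignoring replayed message: got %d, watermark %d", ts, p.watermark)
		return
	}
	p.watermark = ts
}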
Do you need to update user documentation, design documentation or monitoring documentation?
No.
Release note