96 changes: 91 additions & 5 deletions cmd/kafka-consumer/writer.go
@@ -77,17 +77,74 @@ type partitionProgress struct {
// tableSinkMap -> [tableID]tableSink
tableSinkMap sync.Map

<<<<<<< HEAD
eventGroups map[int64]*eventsGroup
decoder codec.RowEventDecoder
=======
tableSinkMap map[model.TableID]tablesink.TableSink
eventGroups map[model.TableID]*eventsGroup
decoder codec.RowEventDecoder
}
Comment on lines +80 to +87

critical

Unresolved merge conflict markers found in partitionProgress. Additionally, the struct is missing the partition field which is initialized in newPartitionProgress (line 91) and used in updateWatermark (line 103). Please resolve the conflicts and add the missing field to ensure the code compiles and functions correctly.
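A minimal sketch of one possible resolution, assuming the watermark and watermarkOffset fields implied by updateWatermark, loadWatermark, and the new tests (field layout not verified against the original file):

type partitionProgress struct {
	partition       int32
	watermark       uint64
	watermarkOffset kafka.Offset

	// tableSinkMap -> [tableID]tableSink
	tableSinkMap map[model.TableID]tablesink.TableSink
	eventGroups  map[model.TableID]*eventsGroup
	decoder      codec.RowEventDecoder
}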


func newPartitionProgress(partition int32, decoder codec.RowEventDecoder) *partitionProgress {
return &partitionProgress{
partition: partition,
eventGroups: make(map[model.TableID]*eventsGroup),
tableSinkMap: make(map[model.TableID]tablesink.TableSink),
decoder: decoder,
}
}

func (p *partitionProgress) updateWatermark(newWatermark uint64, offset kafka.Offset) {
watermark := p.loadWatermark()
if newWatermark >= watermark {
p.watermark = newWatermark
p.watermarkOffset = offset
Comment on lines +101 to +102

high

Direct assignment to p.watermark and p.watermarkOffset causes a data race. These fields are accessed concurrently by other goroutines (e.g., in getMinWatermark and checkOldMessage) using atomic operations or direct reads. Use atomic operations to ensure thread safety.

		atomic.StoreUint64(&p.watermark, newWatermark)
		atomic.StoreInt64((*int64)(&p.watermarkOffset), int64(offset))

log.Info("watermark received", zap.Int32("partition", p.partition), zap.Any("offset", offset),
zap.Uint64("watermark", newWatermark))
return
}
fields := []zap.Field{
zap.Int32("partition", p.partition),
zap.Uint64("newWatermark", newWatermark),
zap.Any("offset", offset),
zap.Uint64("watermark", watermark),
zap.Any("watermarkOffset", p.watermarkOffset),
}

// TiCDC only guarantees at-least-once delivery. Duplicate MQ delivery can
// replay old resolved/checkpoint markers, making the resolved ts appear to
// fall back. This is unexpected but tolerable, so the consumer keeps the
// larger watermark.
if offset > p.watermarkOffset {
log.Warn("partition resolved ts fall back from newer offset: unexpected but tolerable under at-least-once delivery, ignore it", fields...)
return
}
log.Warn("partition resolved ts fall back, ignore it since consumer read old offset message", fields...)
}

func (p *partitionProgress) loadWatermark() uint64 {
return p.watermark
>>>>>>> 431c2afbed (kafka-consumer(ticdc): tolerate replayed resolved and DDL events (#12596))
}
Comment on lines +126 to 129

high

The loadWatermark method should use atomic.LoadUint64 to safely read the watermark value, as it is updated concurrently. Additionally, the conflict marker at line 128 must be removed.

func (p *partitionProgress) loadWatermark() uint64 {
	return atomic.LoadUint64(&p.watermark)
}
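
For illustration, concurrent readers such as getMinWatermark would then go through loadWatermark rather than reading the field directly. A sketch only (requires the math import); the progresses field name is assumed, not taken from this diff:

// getMinWatermark returns the smallest watermark across all partitions.
func (w *writer) getMinWatermark() uint64 {
	result := uint64(math.MaxUint64)
	for _, p := range w.progresses {
		// loadWatermark performs an atomic read, so this is safe while
		// updateWatermark runs concurrently in other goroutines.
		if watermark := p.loadWatermark(); watermark < result {
			result = watermark
		}
	}
	return result
}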


type writer struct {
option *option

<<<<<<< HEAD
ddlList []*model.DDLEvent
ddlWithMaxCommitTs *model.DDLEvent
ddlSink ddlsink.Sink
fakeTableIDGenerator *fakeTableIDGenerator
=======
ddlList []*model.DDLEvent
ddlWithMaxCommitTs *model.DDLEvent
// ddlKeysWithMaxCommitTs records every logical DDL seen at the current
// maximum CommitTs, so replayed prefixes of split DDL sequences can be
// ignored without collapsing distinct DDLs that share the same CommitTs.
ddlKeysWithMaxCommitTs map[ddlEventKey]struct{}
ddlSink ddlsink.Sink
>>>>>>> 431c2afbed (kafka-consumer(ticdc): tolerate replayed resolved and DDL events (#12596))
Comment on lines +134 to +147

critical

Unresolved merge conflict markers found in the writer struct definition. This will prevent the code from compiling and indicates an incomplete cherry-pick process.
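A sketch of one way to resolve this block, keeping the incoming side from the cherry-picked commit; whether fakeTableIDGenerator from HEAD must also be retained depends on whether it is still referenced elsewhere in this file:

	ddlList            []*model.DDLEvent
	ddlWithMaxCommitTs *model.DDLEvent
	// ddlKeysWithMaxCommitTs records every logical DDL seen at the current
	// maximum CommitTs, so replayed prefixes of split DDL sequences can be
	// ignored without collapsing distinct DDLs that share the same CommitTs.
	ddlKeysWithMaxCommitTs map[ddlEventKey]struct{}
	ddlSink                ddlsink.Sink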


// sinkFactory is used to create table sink for each table.
sinkFactory *eventsinkfactory.SinkFactory
@@ -96,6 +153,25 @@ type writer struct {
eventRouter *dispatcher.EventRouter
}

// ddlEventKey identifies a logical DDL even if the Kafka replay decodes it
// into a fresh DDLEvent object. The MQ codecs preserve StartTs/CommitTs/Query/Seq,
// which is enough to distinguish split DDLs while still recognizing replays.
type ddlEventKey struct {
startTs uint64
commitTs uint64
query string
seq uint64
}

func newDDLEventKey(ddl *model.DDLEvent) ddlEventKey {
return ddlEventKey{
startTs: ddl.StartTs,
commitTs: ddl.CommitTs,
query: ddl.Query,
seq: ddl.Seq,
}
}

func newWriter(ctx context.Context, o *option) *writer {
w := &writer{
option: o,
@@ -174,17 +250,27 @@ func (w *writer) appendDDL(ddl *model.DDLEvent) {
return
}

// A rename tables DDL job contains multiple DDL events with same CommitTs.
// So to tell if a DDL is redundant or not, we must check the equivalence of
// the current DDL and the DDL with max CommitTs.
if ddl == w.ddlWithMaxCommitTs {
log.Warn("ignore redundant DDL, the DDL is equal to ddlWithMaxCommitTs",
ddlKey := newDDLEventKey(ddl)
if w.ddlWithMaxCommitTs == nil || ddl.CommitTs > w.ddlWithMaxCommitTs.CommitTs {
w.ddlKeysWithMaxCommitTs = make(map[ddlEventKey]struct{})
}

// The DDL with max CommitTs may be one event in a split DDL job, so we must
// remember every logical DDL already seen at that CommitTs rather than only
// comparing against the last decoded event object.
if _, duplicated := w.ddlKeysWithMaxCommitTs[ddlKey]; duplicated {
log.Warn("ignore redundant DDL, the DDL has already been seen at max CommitTs",
zap.Uint64("commitTs", ddl.CommitTs), zap.String("DDL", ddl.Query))
return
}

w.ddlList = append(w.ddlList, ddl)
w.ddlWithMaxCommitTs = ddl
<<<<<<< HEAD
=======
w.ddlKeysWithMaxCommitTs[ddlKey] = struct{}{}
log.Info("DDL message received", zap.Any("offset", offset), zap.Uint64("commitTs", ddl.CommitTs), zap.String("DDL", ddl.Query))
>>>>>>> 431c2afbed (kafka-consumer(ticdc): tolerate replayed resolved and DDL events (#12596))
Comment on lines +269 to +273

critical

Unresolved merge conflict markers in appendDDL. The logic should be resolved to correctly update the deduplication map and log the DDL event using the provided offset.

	w.ddlKeysWithMaxCommitTs[ddlKey] = struct{}{}
	log.Info("DDL message received", zap.Any("offset", offset), zap.Uint64("commitTs", ddl.CommitTs), zap.String("DDL", ddl.Query))

}

func (w *writer) getFrontDDL() *model.DDLEvent {
152 changes: 152 additions & 0 deletions cmd/kafka-consumer/writer_test.go
@@ -0,0 +1,152 @@
// Copyright 2026 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"testing"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/pingcap/tiflow/cdc/model"
"github.com/stretchr/testify/require"
)

func TestUpdateWatermarkIgnoresFallbackFromNewerOffset(t *testing.T) {
// Scenario: at-least-once delivery replays an older resolved event with a newer Kafka offset.
// Steps: advance the partition watermark, then feed a smaller resolved ts from a larger offset.
// Expectation: the consumer keeps the existing watermark instead of panicking.
progress := &partitionProgress{partition: 1}
progress.updateWatermark(120, kafka.Offset(10))

require.NotPanics(t, func() {
progress.updateWatermark(100, kafka.Offset(20))
})
require.Equal(t, uint64(120), progress.watermark)
require.Equal(t, kafka.Offset(10), progress.watermarkOffset)
}

func TestAppendDDLIgnoresEquivalentReplay(t *testing.T) {
// Scenario: Kafka replays the same DDL as a freshly decoded event object.
// Steps: append one DDL, then append a separate object with identical logical fields.
// Expectation: the writer keeps only one pending DDL.
w := &writer{}
ddl := &model.DDLEvent{
StartTs: 100,
CommitTs: 120,
Query: "create table t(id int primary key)",
Seq: 1,
}
dup := &model.DDLEvent{
StartTs: 100,
CommitTs: 120,
Query: "create table t(id int primary key)",
Seq: 1,
}

w.appendDDL(ddl, kafka.Offset(1))
w.appendDDL(dup, kafka.Offset(2))

require.Len(t, w.ddlList, 1)
require.Same(t, ddl, w.ddlWithMaxCommitTs)
}

func TestAppendDDLKeepsSplitDDLSequenceDistinct(t *testing.T) {
// Scenario: one logical DDL job is split into multiple DDL events with the same commitTs.
// Steps: append two DDLs whose startTs, commitTs, and query text match but Seq differs.
// Expectation: both events remain queued because Seq is part of the DDL identity.
w := &writer{}
first := &model.DDLEvent{
StartTs: 200,
CommitTs: 220,
Query: "rename tables split ddl",
Seq: 1,
}
second := &model.DDLEvent{
StartTs: 200,
CommitTs: 220,
Query: "rename tables split ddl",
Seq: 2,
}

w.appendDDL(first, kafka.Offset(3))
w.appendDDL(second, kafka.Offset(4))

require.Len(t, w.ddlList, 2)
require.Same(t, second, w.ddlWithMaxCommitTs)
}

func TestAppendDDLIgnoresReplayedSplitDDLSequence(t *testing.T) {
// Scenario: Kafka replays an already queued split DDL sequence after later parts were seen.
// Steps: append the original split DDLs, then replay the same logical DDLs as fresh objects.
// Expectation: the queue keeps only the original sequence and does not append duplicates.
w := &writer{}
first := &model.DDLEvent{
StartTs: 300,
CommitTs: 320,
Query: "rename table test.t1 to test.t2",
Seq: 0,
}
second := &model.DDLEvent{
StartTs: 300,
CommitTs: 320,
Query: "rename table test.t3 to test.t4",
Seq: 1,
}
replayFirst := &model.DDLEvent{
StartTs: 300,
CommitTs: 320,
Query: "rename table test.t1 to test.t2",
Seq: 0,
}
replaySecond := &model.DDLEvent{
StartTs: 300,
CommitTs: 320,
Query: "rename table test.t3 to test.t4",
Seq: 1,
}

w.appendDDL(first, kafka.Offset(10))
w.appendDDL(second, kafka.Offset(11))
w.appendDDL(replayFirst, kafka.Offset(12))
w.appendDDL(replaySecond, kafka.Offset(13))

require.Len(t, w.ddlList, 2)
require.Same(t, first, w.ddlList[0])
require.Same(t, second, w.ddlList[1])
require.Same(t, second, w.ddlWithMaxCommitTs)
}

func TestAppendDDLKeepsDifferentDDLsWithSameCommitTsWithoutSeq(t *testing.T) {
// Scenario: a multi-DDL job can emit different DDLs that share the same commitTs while Seq stays at zero.
// Steps: append two distinct DDLs with the same timestamps but different queries and default Seq.
// Expectation: both DDLs remain queued because Query is still part of the logical DDL identity.
w := &writer{}
first := &model.DDLEvent{
StartTs: 400,
CommitTs: 420,
Query: "create table test.t1(id int primary key)",
}
second := &model.DDLEvent{
StartTs: 400,
CommitTs: 420,
Query: "create table test.t2(id int primary key)",
}

w.appendDDL(first, kafka.Offset(20))
w.appendDDL(second, kafka.Offset(21))

require.Len(t, w.ddlList, 2)
require.Same(t, first, w.ddlList[0])
require.Same(t, second, w.ddlList[1])
require.Same(t, second, w.ddlWithMaxCommitTs)
}