From 8b75c4df8c3c2e9d7154f3d87741b08f28cc304e Mon Sep 17 00:00:00 2001
From: wlwilliamx <53336371+wlwilliamx@users.noreply.github.com>
Date: Fri, 24 Apr 2026 16:50:57 +0800
Subject: [PATCH] This is an automated cherry-pick of #12596

Signed-off-by: ti-chi-bot
---
 cmd/kafka-consumer/writer.go      |  96 ++++++++++++++++++-
 cmd/kafka-consumer/writer_test.go | 152 ++++++++++++++++++++++++++++++
 2 files changed, 243 insertions(+), 5 deletions(-)
 create mode 100644 cmd/kafka-consumer/writer_test.go

diff --git a/cmd/kafka-consumer/writer.go b/cmd/kafka-consumer/writer.go
index a277348605..8f5ce08908 100644
--- a/cmd/kafka-consumer/writer.go
+++ b/cmd/kafka-consumer/writer.go
@@ -77,17 +77,60 @@ type partitionProgress struct {
 	// tableSinkMap -> [tableID]tableSink
 	tableSinkMap sync.Map
 	eventGroups  map[int64]*eventsGroup
 	decoder      codec.RowEventDecoder
 }
 
+func newPartitionProgress(partition int32, decoder codec.RowEventDecoder) *partitionProgress {
+	return &partitionProgress{
+		partition:   partition,
+		eventGroups: make(map[int64]*eventsGroup),
+		decoder:     decoder,
+	}
+}
+
+func (p *partitionProgress) updateWatermark(newWatermark uint64, offset kafka.Offset) {
+	watermark := p.loadWatermark()
+	if newWatermark >= watermark {
+		p.watermark = newWatermark
+		p.watermarkOffset = offset
+		log.Info("watermark received", zap.Int32("partition", p.partition), zap.Any("offset", offset),
+			zap.Uint64("watermark", newWatermark))
+		return
+	}
+	fields := []zap.Field{
+		zap.Int32("partition", p.partition),
+		zap.Uint64("newWatermark", newWatermark),
+		zap.Any("offset", offset),
+		zap.Uint64("watermark", watermark),
+		zap.Any("watermarkOffset", p.watermarkOffset),
+	}
+
+	// TiCDC only guarantees at-least-once delivery. Duplicate MQ delivery can
+	// replay old resolved/checkpoint markers, making the resolved ts appear to
+	// fall back. This is unexpected but tolerable, so the consumer keeps the
+	// larger watermark.
+	if offset > p.watermarkOffset {
+		log.Warn("partition resolved ts fall back from newer offset: unexpected but tolerable under at-least-once delivery, ignore it", fields...)
+		return
+	}
+	log.Warn("partition resolved ts fall back, ignore it since consumer read old offset message", fields...)
+}
+
+func (p *partitionProgress) loadWatermark() uint64 {
+	return p.watermark
+}
 
 type writer struct {
 	option *option
 
 	ddlList            []*model.DDLEvent
 	ddlWithMaxCommitTs *model.DDLEvent
+	// ddlKeysWithMaxCommitTs records every logical DDL seen at the current
+	// maximum CommitTs, so replayed prefixes of split DDL sequences can be
+	// ignored without collapsing distinct DDLs that share the same CommitTs.
+	ddlKeysWithMaxCommitTs map[ddlEventKey]struct{}
 	ddlSink              ddlsink.Sink
 	fakeTableIDGenerator *fakeTableIDGenerator
 
 	// sinkFactory is used to create table sink for each table.
 	sinkFactory *eventsinkfactory.SinkFactory
@@ -96,6 +139,25 @@ type writer struct {
 	eventRouter *dispatcher.EventRouter
 }
 
+// ddlEventKey identifies a logical DDL even if the Kafka replay decodes it
+// into a fresh DDLEvent object. The MQ codecs preserve StartTs/CommitTs/Query/Seq,
+// which is enough to distinguish split DDLs while still recognizing replays.
+type ddlEventKey struct {
+	startTs  uint64
+	commitTs uint64
+	query    string
+	seq      uint64
+}
+
+func newDDLEventKey(ddl *model.DDLEvent) ddlEventKey {
+	return ddlEventKey{
+		startTs:  ddl.StartTs,
+		commitTs: ddl.CommitTs,
+		query:    ddl.Query,
+		seq:      ddl.Seq,
+	}
+}
+
 func newWriter(ctx context.Context, o *option) *writer {
 	w := &writer{
 		option: o,
@@ -170 +232 @@
-func (w *writer) appendDDL(ddl *model.DDLEvent) {
+func (w *writer) appendDDL(ddl *model.DDLEvent, offset kafka.Offset) {
@@ -174,17 +236,25 @@ func (w *writer) appendDDL(ddl *model.DDLEvent) {
 		return
 	}
 
-	// A rename tables DDL job contains multiple DDL events with same CommitTs.
-	// So to tell if a DDL is redundant or not, we must check the equivalence of
-	// the current DDL and the DDL with max CommitTs.
-	if ddl == w.ddlWithMaxCommitTs {
-		log.Warn("ignore redundant DDL, the DDL is equal to ddlWithMaxCommitTs",
+	ddlKey := newDDLEventKey(ddl)
+	if w.ddlWithMaxCommitTs == nil || ddl.CommitTs > w.ddlWithMaxCommitTs.CommitTs {
+		w.ddlKeysWithMaxCommitTs = make(map[ddlEventKey]struct{})
+	}
+
+	// The DDL with max CommitTs may be one event in a split DDL job, so we must
+	// remember every logical DDL already seen at that CommitTs rather than only
+	// comparing against the last decoded event object.
+	if _, duplicated := w.ddlKeysWithMaxCommitTs[ddlKey]; duplicated {
+		log.Warn("ignore redundant DDL, the DDL has already been seen at max CommitTs",
 			zap.Uint64("commitTs", ddl.CommitTs), zap.String("DDL", ddl.Query))
 		return
 	}
 
 	w.ddlList = append(w.ddlList, ddl)
 	w.ddlWithMaxCommitTs = ddl
+	w.ddlKeysWithMaxCommitTs[ddlKey] = struct{}{}
+	log.Info("DDL message received", zap.Any("offset", offset),
+		zap.Uint64("commitTs", ddl.CommitTs), zap.String("DDL", ddl.Query))
 }
 
 func (w *writer) getFrontDDL() *model.DDLEvent {
diff --git a/cmd/kafka-consumer/writer_test.go b/cmd/kafka-consumer/writer_test.go
new file mode 100644
index 0000000000..cd1ff784f8
--- /dev/null
+++ b/cmd/kafka-consumer/writer_test.go
@@ -0,0 +1,152 @@
+// Copyright 2026 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+	"testing"
+
+	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
+	"github.com/pingcap/tiflow/cdc/model"
+	"github.com/stretchr/testify/require"
+)
+
+func TestUpdateWatermarkIgnoresFallbackFromNewerOffset(t *testing.T) {
+	// Scenario: at-least-once delivery replays an older resolved event with a newer Kafka offset.
+	// Steps: advance the partition watermark, then feed a smaller resolved ts from a larger offset.
+	// Expectation: the consumer keeps the existing watermark instead of panicking.
+	progress := &partitionProgress{partition: 1}
+	progress.updateWatermark(120, kafka.Offset(10))
+
+	require.NotPanics(t, func() {
+		progress.updateWatermark(100, kafka.Offset(20))
+	})
+	require.Equal(t, uint64(120), progress.watermark)
+	require.Equal(t, kafka.Offset(10), progress.watermarkOffset)
+}
+
+func TestAppendDDLIgnoresEquivalentReplay(t *testing.T) {
+	// Scenario: Kafka replays the same DDL as a freshly decoded event object.
+	// Steps: append one DDL, then append a separate object with identical logical fields.
+	// Expectation: the writer keeps only one pending DDL.
+	w := &writer{}
+	ddl := &model.DDLEvent{
+		StartTs:  100,
+		CommitTs: 120,
+		Query:    "create table t(id int primary key)",
+		Seq:      1,
+	}
+	dup := &model.DDLEvent{
+		StartTs:  100,
+		CommitTs: 120,
+		Query:    "create table t(id int primary key)",
+		Seq:      1,
+	}
+
+	w.appendDDL(ddl, kafka.Offset(1))
+	w.appendDDL(dup, kafka.Offset(2))
+
+	require.Len(t, w.ddlList, 1)
+	require.Same(t, ddl, w.ddlWithMaxCommitTs)
+}
+
+func TestAppendDDLKeepsSplitDDLSequenceDistinct(t *testing.T) {
+	// Scenario: one logical DDL job is split into multiple DDL events with the same commitTs.
+	// Steps: append two DDLs whose startTs, commitTs, and query text match but Seq differs.
+	// Expectation: both events remain queued because Seq is part of the DDL identity.
+	w := &writer{}
+	first := &model.DDLEvent{
+		StartTs:  200,
+		CommitTs: 220,
+		Query:    "rename tables split ddl",
+		Seq:      1,
+	}
+	second := &model.DDLEvent{
+		StartTs:  200,
+		CommitTs: 220,
+		Query:    "rename tables split ddl",
+		Seq:      2,
+	}
+
+	w.appendDDL(first, kafka.Offset(3))
+	w.appendDDL(second, kafka.Offset(4))
+
+	require.Len(t, w.ddlList, 2)
+	require.Same(t, second, w.ddlWithMaxCommitTs)
+}
+
+func TestAppendDDLIgnoresReplayedSplitDDLSequence(t *testing.T) {
+	// Scenario: Kafka replays an already queued split DDL sequence after later parts were seen.
+	// Steps: append the original split DDLs, then replay the same logical DDLs as fresh objects.
+	// Expectation: the queue keeps only the original sequence and does not append duplicates.
+	w := &writer{}
+	first := &model.DDLEvent{
+		StartTs:  300,
+		CommitTs: 320,
+		Query:    "rename table test.t1 to test.t2",
+		Seq:      0,
+	}
+	second := &model.DDLEvent{
+		StartTs:  300,
+		CommitTs: 320,
+		Query:    "rename table test.t3 to test.t4",
+		Seq:      1,
+	}
+	replayFirst := &model.DDLEvent{
+		StartTs:  300,
+		CommitTs: 320,
+		Query:    "rename table test.t1 to test.t2",
+		Seq:      0,
+	}
+	replaySecond := &model.DDLEvent{
+		StartTs:  300,
+		CommitTs: 320,
+		Query:    "rename table test.t3 to test.t4",
+		Seq:      1,
+	}
+
+	w.appendDDL(first, kafka.Offset(10))
+	w.appendDDL(second, kafka.Offset(11))
+	w.appendDDL(replayFirst, kafka.Offset(12))
+	w.appendDDL(replaySecond, kafka.Offset(13))
+
+	require.Len(t, w.ddlList, 2)
+	require.Same(t, first, w.ddlList[0])
+	require.Same(t, second, w.ddlList[1])
+	require.Same(t, second, w.ddlWithMaxCommitTs)
+}
+
+func TestAppendDDLKeepsDifferentDDLsWithSameCommitTsWithoutSeq(t *testing.T) {
+	// Scenario: a multi-DDL job can emit different DDLs that share the same commitTs while Seq stays at zero.
+	// Steps: append two distinct DDLs with the same timestamps but different queries and default Seq.
+	// Expectation: both DDLs remain queued because Query is still part of the logical DDL identity.
+	w := &writer{}
+	first := &model.DDLEvent{
+		StartTs:  400,
+		CommitTs: 420,
+		Query:    "create table test.t1(id int primary key)",
+	}
+	second := &model.DDLEvent{
+		StartTs:  400,
+		CommitTs: 420,
+		Query:    "create table test.t2(id int primary key)",
+	}
+
+	w.appendDDL(first, kafka.Offset(20))
+	w.appendDDL(second, kafka.Offset(21))
+
+	require.Len(t, w.ddlList, 2)
+	require.Same(t, first, w.ddlList[0])
+	require.Same(t, second, w.ddlList[1])
+	require.Same(t, second, w.ddlWithMaxCommitTs)
+}