From df84e3d2b532373877cd01c011e6a0628b0ec17f Mon Sep 17 00:00:00 2001 From: wlwilliamx Date: Wed, 8 Apr 2026 18:07:39 +0800 Subject: [PATCH 1/3] fix --- cmd/kafka-consumer/writer.go | 28 +++++++--- cmd/kafka-consumer/writer_test.go | 86 +++++++++++++++++++++++++++++++ 2 files changed, 106 insertions(+), 8 deletions(-) create mode 100644 cmd/kafka-consumer/writer_test.go diff --git a/cmd/kafka-consumer/writer.go b/cmd/kafka-consumer/writer.go index ffc8ff0629..6132b91719 100644 --- a/cmd/kafka-consumer/writer.go +++ b/cmd/kafka-consumer/writer.go @@ -99,13 +99,10 @@ func (p *partitionProgress) updateWatermark(newWatermark uint64, offset kafka.Of zap.Uint64("watermark", newWatermark)) return } - if offset > p.watermarkOffset { - log.Panic("partition resolved ts fallback", - zap.Int32("partition", p.partition), - zap.Uint64("newWatermark", newWatermark), zap.Any("offset", offset), - zap.Uint64("watermark", watermark), zap.Any("watermarkOffset", p.watermarkOffset)) - } - log.Warn("partition resolved ts fall back, ignore it, since consumer read old offset message", + // TiCDC only guarantees at-least-once delivery. A replayed resolved event can + // be appended to Kafka again with a newer offset, so any fallback watermark + // must be treated as a duplicate instead of a fatal ordering bug. + log.Warn("partition resolved ts fall back, ignore it since duplicate delivery may replay an old resolved ts", zap.Int32("partition", p.partition), zap.Uint64("newWatermark", newWatermark), zap.Any("offset", offset), zap.Uint64("watermark", watermark), zap.Any("watermarkOffset", p.watermarkOffset)) @@ -129,6 +126,21 @@ type writer struct { eventRouter *dispatcher.EventRouter } +// isEquivalentDDLEvent checks whether two DDL events represent the same logical +// DDL, even if they were decoded from different replayed Kafka messages. +// Seq keeps split DDLs with the same commitTs distinct. +func isEquivalentDDLEvent(left, right *model.DDLEvent) bool { + if left == nil || right == nil { + return left == right + } + return left.StartTs == right.StartTs && + left.CommitTs == right.CommitTs && + left.Query == right.Query && + left.Seq == right.Seq && + left.Type == right.Type && + left.IsBootstrap == right.IsBootstrap +} + func newWriter(ctx context.Context, o *option) *writer { w := &writer{ option: o, @@ -204,7 +216,7 @@ func (w *writer) appendDDL(ddl *model.DDLEvent, offset kafka.Offset) { // A rename tables DDL job contains multiple DDL events with same CommitTs. // So to tell if a DDL is redundant or not, we must check the equivalence of // the current DDL and the DDL with max CommitTs. - if ddl == w.ddlWithMaxCommitTs { + if isEquivalentDDLEvent(ddl, w.ddlWithMaxCommitTs) { log.Warn("ignore redundant DDL, the DDL is equal to ddlWithMaxCommitTs", zap.Uint64("commitTs", ddl.CommitTs), zap.String("DDL", ddl.Query)) return diff --git a/cmd/kafka-consumer/writer_test.go b/cmd/kafka-consumer/writer_test.go new file mode 100644 index 0000000000..40c2e79492 --- /dev/null +++ b/cmd/kafka-consumer/writer_test.go @@ -0,0 +1,86 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "testing" + + "github.com/confluentinc/confluent-kafka-go/v2/kafka" + "github.com/pingcap/tiflow/cdc/model" + "github.com/stretchr/testify/require" +) + +func TestUpdateWatermarkIgnoresFallbackFromNewerOffset(t *testing.T) { + // Scenario: at-least-once delivery replays an older resolved event with a newer Kafka offset. + // Steps: advance the partition watermark, then feed a smaller resolved ts from a larger offset. + // Expectation: the consumer keeps the existing watermark instead of panicking. + progress := &partitionProgress{partition: 1} + progress.updateWatermark(120, kafka.Offset(10)) + + require.NotPanics(t, func() { + progress.updateWatermark(100, kafka.Offset(20)) + }) + require.Equal(t, uint64(120), progress.watermark) + require.Equal(t, kafka.Offset(10), progress.watermarkOffset) +} + +func TestAppendDDLIgnoresEquivalentReplay(t *testing.T) { + // Scenario: Kafka replays the same DDL as a freshly decoded event object. + // Steps: append one DDL, then append a separate object with identical logical fields. + // Expectation: the writer keeps only one pending DDL. + w := &writer{} + ddl := &model.DDLEvent{ + StartTs: 100, + CommitTs: 120, + Query: "create table t(id int primary key)", + Seq: 1, + } + dup := &model.DDLEvent{ + StartTs: 100, + CommitTs: 120, + Query: "create table t(id int primary key)", + Seq: 1, + } + + w.appendDDL(ddl, kafka.Offset(1)) + w.appendDDL(dup, kafka.Offset(2)) + + require.Len(t, w.ddlList, 1) + require.Same(t, ddl, w.ddlWithMaxCommitTs) +} + +func TestAppendDDLKeepsSplitDDLSequenceDistinct(t *testing.T) { + // Scenario: one logical DDL job is split into multiple DDL events with the same commitTs. + // Steps: append two DDLs whose startTs, commitTs, and query text match but Seq differs. + // Expectation: both events remain queued because Seq is part of the DDL identity. + w := &writer{} + first := &model.DDLEvent{ + StartTs: 200, + CommitTs: 220, + Query: "rename tables split ddl", + Seq: 1, + } + second := &model.DDLEvent{ + StartTs: 200, + CommitTs: 220, + Query: "rename tables split ddl", + Seq: 2, + } + + w.appendDDL(first, kafka.Offset(3)) + w.appendDDL(second, kafka.Offset(4)) + + require.Len(t, w.ddlList, 2) + require.Same(t, second, w.ddlWithMaxCommitTs) +} From e430765c72b4473490b2523b92af4c957211695a Mon Sep 17 00:00:00 2001 From: wlwilliamx Date: Thu, 9 Apr 2026 16:13:12 +0800 Subject: [PATCH 2/3] fix appendDDL --- cmd/kafka-consumer/writer.go | 52 +++++++++++++++--------- cmd/kafka-consumer/writer_test.go | 66 +++++++++++++++++++++++++++++++ 2 files changed, 99 insertions(+), 19 deletions(-) diff --git a/cmd/kafka-consumer/writer.go b/cmd/kafka-consumer/writer.go index 6132b91719..c252561d36 100644 --- a/cmd/kafka-consumer/writer.go +++ b/cmd/kafka-consumer/writer.go @@ -117,7 +117,11 @@ type writer struct { ddlList []*model.DDLEvent ddlWithMaxCommitTs *model.DDLEvent - ddlSink ddlsink.Sink + // ddlKeysWithMaxCommitTs records every logical DDL seen at the current + // maximum CommitTs, so replayed prefixes of split DDL sequences can be + // ignored without collapsing distinct DDLs that share the same CommitTs. + ddlKeysWithMaxCommitTs map[ddlEventKey]struct{} + ddlSink ddlsink.Sink // sinkFactory is used to create table sink for each table. sinkFactory *eventsinkfactory.SinkFactory @@ -126,19 +130,23 @@ type writer struct { eventRouter *dispatcher.EventRouter } -// isEquivalentDDLEvent checks whether two DDL events represent the same logical -// DDL, even if they were decoded from different replayed Kafka messages. -// Seq keeps split DDLs with the same commitTs distinct. -func isEquivalentDDLEvent(left, right *model.DDLEvent) bool { - if left == nil || right == nil { - return left == right - } - return left.StartTs == right.StartTs && - left.CommitTs == right.CommitTs && - left.Query == right.Query && - left.Seq == right.Seq && - left.Type == right.Type && - left.IsBootstrap == right.IsBootstrap +// ddlEventKey identifies a logical DDL even if the Kafka replay decodes it +// into a fresh DDLEvent object. The MQ codecs preserve StartTs/CommitTs/Query/Seq, +// which is enough to distinguish split DDLs while still recognizing replays. +type ddlEventKey struct { + startTs uint64 + commitTs uint64 + query string + seq uint64 +} + +func newDDLEventKey(ddl *model.DDLEvent) ddlEventKey { + return ddlEventKey{ + startTs: ddl.StartTs, + commitTs: ddl.CommitTs, + query: ddl.Query, + seq: ddl.Seq, + } } func newWriter(ctx context.Context, o *option) *writer { @@ -213,17 +221,23 @@ func (w *writer) appendDDL(ddl *model.DDLEvent, offset kafka.Offset) { return } - // A rename tables DDL job contains multiple DDL events with same CommitTs. - // So to tell if a DDL is redundant or not, we must check the equivalence of - // the current DDL and the DDL with max CommitTs. - if isEquivalentDDLEvent(ddl, w.ddlWithMaxCommitTs) { - log.Warn("ignore redundant DDL, the DDL is equal to ddlWithMaxCommitTs", + ddlKey := newDDLEventKey(ddl) + if w.ddlWithMaxCommitTs == nil || ddl.CommitTs > w.ddlWithMaxCommitTs.CommitTs { + w.ddlKeysWithMaxCommitTs = make(map[ddlEventKey]struct{}) + } + + // The DDL with max CommitTs may be one event in a split DDL job, so we must + // remember every logical DDL already seen at that CommitTs rather than only + // comparing against the last decoded event object. + if _, duplicated := w.ddlKeysWithMaxCommitTs[ddlKey]; duplicated { + log.Warn("ignore redundant DDL, the DDL has already been seen at max CommitTs", zap.Uint64("commitTs", ddl.CommitTs), zap.String("DDL", ddl.Query)) return } w.ddlList = append(w.ddlList, ddl) w.ddlWithMaxCommitTs = ddl + w.ddlKeysWithMaxCommitTs[ddlKey] = struct{}{} log.Info("DDL message received", zap.Any("offset", offset), zap.Uint64("commitTs", ddl.CommitTs), zap.String("DDL", ddl.Query)) } diff --git a/cmd/kafka-consumer/writer_test.go b/cmd/kafka-consumer/writer_test.go index 40c2e79492..cd1ff784f8 100644 --- a/cmd/kafka-consumer/writer_test.go +++ b/cmd/kafka-consumer/writer_test.go @@ -84,3 +84,69 @@ func TestAppendDDLKeepsSplitDDLSequenceDistinct(t *testing.T) { require.Len(t, w.ddlList, 2) require.Same(t, second, w.ddlWithMaxCommitTs) } + +func TestAppendDDLIgnoresReplayedSplitDDLSequence(t *testing.T) { + // Scenario: Kafka replays an already queued split DDL sequence after later parts were seen. + // Steps: append the original split DDLs, then replay the same logical DDLs as fresh objects. + // Expectation: the queue keeps only the original sequence and does not append duplicates. + w := &writer{} + first := &model.DDLEvent{ + StartTs: 300, + CommitTs: 320, + Query: "rename table test.t1 to test.t2", + Seq: 0, + } + second := &model.DDLEvent{ + StartTs: 300, + CommitTs: 320, + Query: "rename table test.t3 to test.t4", + Seq: 1, + } + replayFirst := &model.DDLEvent{ + StartTs: 300, + CommitTs: 320, + Query: "rename table test.t1 to test.t2", + Seq: 0, + } + replaySecond := &model.DDLEvent{ + StartTs: 300, + CommitTs: 320, + Query: "rename table test.t3 to test.t4", + Seq: 1, + } + + w.appendDDL(first, kafka.Offset(10)) + w.appendDDL(second, kafka.Offset(11)) + w.appendDDL(replayFirst, kafka.Offset(12)) + w.appendDDL(replaySecond, kafka.Offset(13)) + + require.Len(t, w.ddlList, 2) + require.Same(t, first, w.ddlList[0]) + require.Same(t, second, w.ddlList[1]) + require.Same(t, second, w.ddlWithMaxCommitTs) +} + +func TestAppendDDLKeepsDifferentDDLsWithSameCommitTsWithoutSeq(t *testing.T) { + // Scenario: a multi-DDL job can emit different DDLs that share the same commitTs while Seq stays at zero. + // Steps: append two distinct DDLs with the same timestamps but different queries and default Seq. + // Expectation: both DDLs remain queued because Query is still part of the logical DDL identity. + w := &writer{} + first := &model.DDLEvent{ + StartTs: 400, + CommitTs: 420, + Query: "create table test.t1(id int primary key)", + } + second := &model.DDLEvent{ + StartTs: 400, + CommitTs: 420, + Query: "create table test.t2(id int primary key)", + } + + w.appendDDL(first, kafka.Offset(20)) + w.appendDDL(second, kafka.Offset(21)) + + require.Len(t, w.ddlList, 2) + require.Same(t, first, w.ddlList[0]) + require.Same(t, second, w.ddlList[1]) + require.Same(t, second, w.ddlWithMaxCommitTs) +} From 8e122016d7aaf577d81b0a4326f32f358fd86f05 Mon Sep 17 00:00:00 2001 From: wlwilliamx Date: Tue, 14 Apr 2026 19:29:28 +0800 Subject: [PATCH 3/3] kafka-consumer(ticdc): warn resolved fallback from newer offset --- cmd/kafka-consumer/writer.go | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/cmd/kafka-consumer/writer.go b/cmd/kafka-consumer/writer.go index c252561d36..cccab9372b 100644 --- a/cmd/kafka-consumer/writer.go +++ b/cmd/kafka-consumer/writer.go @@ -99,13 +99,23 @@ func (p *partitionProgress) updateWatermark(newWatermark uint64, offset kafka.Of zap.Uint64("watermark", newWatermark)) return } - // TiCDC only guarantees at-least-once delivery. A replayed resolved event can - // be appended to Kafka again with a newer offset, so any fallback watermark - // must be treated as a duplicate instead of a fatal ordering bug. - log.Warn("partition resolved ts fall back, ignore it since duplicate delivery may replay an old resolved ts", + fields := []zap.Field{ zap.Int32("partition", p.partition), - zap.Uint64("newWatermark", newWatermark), zap.Any("offset", offset), - zap.Uint64("watermark", watermark), zap.Any("watermarkOffset", p.watermarkOffset)) + zap.Uint64("newWatermark", newWatermark), + zap.Any("offset", offset), + zap.Uint64("watermark", watermark), + zap.Any("watermarkOffset", p.watermarkOffset), + } + + // TiCDC only guarantees at-least-once delivery. Duplicate MQ delivery can + // replay old resolved/checkpoint markers, making the resolved ts appear to + // fall back. This is unexpected but tolerable, so the consumer keeps the + // larger watermark. + if offset > p.watermarkOffset { + log.Warn("partition resolved ts fall back from newer offset: unexpected but tolerable under at-least-once delivery, ignore it", fields...) + return + } + log.Warn("partition resolved ts fall back, ignore it since consumer read old offset message", fields...) } func (p *partitionProgress) loadWatermark() uint64 {