diff --git a/cmd/kafka-consumer/writer.go b/cmd/kafka-consumer/writer.go
index ffc8ff0629..cccab9372b 100644
--- a/cmd/kafka-consumer/writer.go
+++ b/cmd/kafka-consumer/writer.go
@@ -99,16 +99,23 @@ func (p *partitionProgress) updateWatermark(newWatermark uint64, offset kafka.Offset
 			zap.Uint64("watermark", newWatermark))
 		return
 	}
+	fields := []zap.Field{
+		zap.Int32("partition", p.partition),
+		zap.Uint64("newWatermark", newWatermark),
+		zap.Any("offset", offset),
+		zap.Uint64("watermark", watermark),
+		zap.Any("watermarkOffset", p.watermarkOffset),
+	}
+
+	// TiCDC only guarantees at-least-once delivery. Duplicate MQ delivery can
+	// replay old resolved/checkpoint markers, making the resolved ts appear to
+	// fall back. This is unexpected but tolerable, so the consumer keeps the
+	// larger watermark.
 	if offset > p.watermarkOffset {
-		log.Panic("partition resolved ts fallback",
-			zap.Int32("partition", p.partition),
-			zap.Uint64("newWatermark", newWatermark), zap.Any("offset", offset),
-			zap.Uint64("watermark", watermark), zap.Any("watermarkOffset", p.watermarkOffset))
+		log.Warn("partition resolved ts fall back from newer offset: unexpected but tolerable under at-least-once delivery, ignore it", fields...)
+		return
 	}
-	log.Warn("partition resolved ts fall back, ignore it, since consumer read old offset message",
-		zap.Int32("partition", p.partition),
-		zap.Uint64("newWatermark", newWatermark), zap.Any("offset", offset),
-		zap.Uint64("watermark", watermark), zap.Any("watermarkOffset", p.watermarkOffset))
+	log.Warn("partition resolved ts fall back, ignore it since consumer read old offset message", fields...)
 }
 
 func (p *partitionProgress) loadWatermark() uint64 {
@@ -120,7 +127,11 @@ type writer struct {
 
 	ddlList            []*model.DDLEvent
 	ddlWithMaxCommitTs *model.DDLEvent
-	ddlSink            ddlsink.Sink
+	// ddlKeysWithMaxCommitTs records every logical DDL seen at the current
+	// maximum CommitTs, so replayed prefixes of split DDL sequences can be
+	// ignored without collapsing distinct DDLs that share the same CommitTs.
+	ddlKeysWithMaxCommitTs map[ddlEventKey]struct{}
+	ddlSink                ddlsink.Sink
 
 	// sinkFactory is used to create table sink for each table.
 	sinkFactory *eventsinkfactory.SinkFactory
@@ -129,6 +140,25 @@ type writer struct {
 	eventRouter *dispatcher.EventRouter
 }
 
+// ddlEventKey identifies a logical DDL even if the Kafka replay decodes it
+// into a fresh DDLEvent object. The MQ codecs preserve StartTs/CommitTs/Query/Seq,
+// which is enough to distinguish split DDLs while still recognizing replays.
+type ddlEventKey struct {
+	startTs  uint64
+	commitTs uint64
+	query    string
+	seq      uint64
+}
+
+func newDDLEventKey(ddl *model.DDLEvent) ddlEventKey {
+	return ddlEventKey{
+		startTs:  ddl.StartTs,
+		commitTs: ddl.CommitTs,
+		query:    ddl.Query,
+		seq:      ddl.Seq,
+	}
+}
+
 func newWriter(ctx context.Context, o *option) *writer {
 	w := &writer{
 		option: o,
@@ -201,17 +231,23 @@ func (w *writer) appendDDL(ddl *model.DDLEvent, offset kafka.Offset) {
 		return
 	}
 
-	// A rename tables DDL job contains multiple DDL events with same CommitTs.
-	// So to tell if a DDL is redundant or not, we must check the equivalence of
-	// the current DDL and the DDL with max CommitTs.
-	if ddl == w.ddlWithMaxCommitTs {
-		log.Warn("ignore redundant DDL, the DDL is equal to ddlWithMaxCommitTs",
+	ddlKey := newDDLEventKey(ddl)
+	if w.ddlWithMaxCommitTs == nil || ddl.CommitTs > w.ddlWithMaxCommitTs.CommitTs {
+		w.ddlKeysWithMaxCommitTs = make(map[ddlEventKey]struct{})
+	}
+
+	// The DDL with max CommitTs may be one event in a split DDL job, so we must
+	// remember every logical DDL already seen at that CommitTs rather than only
+	// comparing against the last decoded event object.
+	if _, duplicated := w.ddlKeysWithMaxCommitTs[ddlKey]; duplicated {
+		log.Warn("ignore redundant DDL, the DDL has already been seen at max CommitTs",
 			zap.Uint64("commitTs", ddl.CommitTs), zap.String("DDL", ddl.Query))
 		return
 	}
 
 	w.ddlList = append(w.ddlList, ddl)
 	w.ddlWithMaxCommitTs = ddl
+	w.ddlKeysWithMaxCommitTs[ddlKey] = struct{}{}
 	log.Info("DDL message received", zap.Any("offset", offset),
 		zap.Uint64("commitTs", ddl.CommitTs), zap.String("DDL", ddl.Query))
 }
diff --git a/cmd/kafka-consumer/writer_test.go b/cmd/kafka-consumer/writer_test.go
new file mode 100644
index 0000000000..cd1ff784f8
--- /dev/null
+++ b/cmd/kafka-consumer/writer_test.go
@@ -0,0 +1,152 @@
+// Copyright 2026 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+	"testing"
+
+	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
+	"github.com/pingcap/tiflow/cdc/model"
+	"github.com/stretchr/testify/require"
+)
+
+func TestUpdateWatermarkIgnoresFallbackFromNewerOffset(t *testing.T) {
+	// Scenario: at-least-once delivery replays an older resolved event with a newer Kafka offset.
+	// Steps: advance the partition watermark, then feed a smaller resolved ts from a larger offset.
+	// Expectation: the consumer keeps the existing watermark instead of panicking.
+	progress := &partitionProgress{partition: 1}
+	progress.updateWatermark(120, kafka.Offset(10))
+
+	require.NotPanics(t, func() {
+		progress.updateWatermark(100, kafka.Offset(20))
+	})
+	require.Equal(t, uint64(120), progress.watermark)
+	require.Equal(t, kafka.Offset(10), progress.watermarkOffset)
+}
+
+func TestAppendDDLIgnoresEquivalentReplay(t *testing.T) {
+	// Scenario: Kafka replays the same DDL as a freshly decoded event object.
+	// Steps: append one DDL, then append a separate object with identical logical fields.
+	// Expectation: the writer keeps only one pending DDL.
+	w := &writer{}
+	ddl := &model.DDLEvent{
+		StartTs:  100,
+		CommitTs: 120,
+		Query:    "create table t(id int primary key)",
+		Seq:      1,
+	}
+	dup := &model.DDLEvent{
+		StartTs:  100,
+		CommitTs: 120,
+		Query:    "create table t(id int primary key)",
+		Seq:      1,
+	}
+
+	w.appendDDL(ddl, kafka.Offset(1))
+	w.appendDDL(dup, kafka.Offset(2))
+
+	require.Len(t, w.ddlList, 1)
+	require.Same(t, ddl, w.ddlWithMaxCommitTs)
+}
+
+func TestAppendDDLKeepsSplitDDLSequenceDistinct(t *testing.T) {
+	// Scenario: one logical DDL job is split into multiple DDL events with the same commitTs.
+	// Steps: append two DDLs whose startTs, commitTs, and query text match but Seq differs.
+	// Expectation: both events remain queued because Seq is part of the DDL identity.
+	w := &writer{}
+	first := &model.DDLEvent{
+		StartTs:  200,
+		CommitTs: 220,
+		Query:    "rename tables split ddl",
+		Seq:      1,
+	}
+	second := &model.DDLEvent{
+		StartTs:  200,
+		CommitTs: 220,
+		Query:    "rename tables split ddl",
+		Seq:      2,
+	}
+
+	w.appendDDL(first, kafka.Offset(3))
+	w.appendDDL(second, kafka.Offset(4))
+
+	require.Len(t, w.ddlList, 2)
+	require.Same(t, second, w.ddlWithMaxCommitTs)
+}
+
+func TestAppendDDLIgnoresReplayedSplitDDLSequence(t *testing.T) {
+	// Scenario: Kafka replays an already queued split DDL sequence after later parts were seen.
+	// Steps: append the original split DDLs, then replay the same logical DDLs as fresh objects.
+	// Expectation: the queue keeps only the original sequence and does not append duplicates.
+	w := &writer{}
+	first := &model.DDLEvent{
+		StartTs:  300,
+		CommitTs: 320,
+		Query:    "rename table test.t1 to test.t2",
+		Seq:      0,
+	}
+	second := &model.DDLEvent{
+		StartTs:  300,
+		CommitTs: 320,
+		Query:    "rename table test.t3 to test.t4",
+		Seq:      1,
+	}
+	replayFirst := &model.DDLEvent{
+		StartTs:  300,
+		CommitTs: 320,
+		Query:    "rename table test.t1 to test.t2",
+		Seq:      0,
+	}
+	replaySecond := &model.DDLEvent{
+		StartTs:  300,
+		CommitTs: 320,
+		Query:    "rename table test.t3 to test.t4",
+		Seq:      1,
+	}
+
+	w.appendDDL(first, kafka.Offset(10))
+	w.appendDDL(second, kafka.Offset(11))
+	w.appendDDL(replayFirst, kafka.Offset(12))
+	w.appendDDL(replaySecond, kafka.Offset(13))
+
+	require.Len(t, w.ddlList, 2)
+	require.Same(t, first, w.ddlList[0])
+	require.Same(t, second, w.ddlList[1])
+	require.Same(t, second, w.ddlWithMaxCommitTs)
+}
+
+func TestAppendDDLKeepsDifferentDDLsWithSameCommitTsWithoutSeq(t *testing.T) {
+	// Scenario: a multi-DDL job can emit different DDLs that share the same commitTs while Seq stays at zero.
+	// Steps: append two distinct DDLs with the same timestamps but different queries and default Seq.
+	// Expectation: both DDLs remain queued because Query is still part of the logical DDL identity.
+	w := &writer{}
+	first := &model.DDLEvent{
+		StartTs:  400,
+		CommitTs: 420,
+		Query:    "create table test.t1(id int primary key)",
+	}
+	second := &model.DDLEvent{
+		StartTs:  400,
+		CommitTs: 420,
+		Query:    "create table test.t2(id int primary key)",
+	}
+
+	w.appendDDL(first, kafka.Offset(20))
+	w.appendDDL(second, kafka.Offset(21))
+
+	require.Len(t, w.ddlList, 2)
+	require.Same(t, first, w.ddlList[0])
+	require.Same(t, second, w.ddlList[1])
+	require.Same(t, second, w.ddlWithMaxCommitTs)
+}