diff --git a/cdc/sink/ddlsink/mq/kafka_ddl_sink.go b/cdc/sink/ddlsink/mq/kafka_ddl_sink.go index a47cd57c80..81591d701c 100644 --- a/cdc/sink/ddlsink/mq/kafka_ddl_sink.go +++ b/cdc/sink/ddlsink/mq/kafka_ddl_sink.go @@ -83,7 +83,6 @@ func NewKafkaDDLSink( topic, options.DeriveTopicConfig(), adminClient, - options.KeepConnAliveInterval, ) if err != nil { return nil, errors.Trace(err) @@ -113,7 +112,7 @@ func NewKafkaDDLSink( } ddlProducer := producerCreator(ctx, changefeedID, syncProducer) - s := newDDLSink(changefeedID, ddlProducer, adminClient, topicManager, eventRouter, encoderBuilder.Build(), protocol, syncProducer) + s := newDDLSink(changefeedID, ddlProducer, adminClient, topicManager, eventRouter, encoderBuilder.Build(), protocol) log.Info("DDL sink producer client created", zap.Duration("duration", time.Since(start))) return s, nil } diff --git a/cdc/sink/ddlsink/mq/mq_ddl_sink.go b/cdc/sink/ddlsink/mq/mq_ddl_sink.go index 0bf09117d9..deb0a767c8 100644 --- a/cdc/sink/ddlsink/mq/mq_ddl_sink.go +++ b/cdc/sink/ddlsink/mq/mq_ddl_sink.go @@ -72,8 +72,6 @@ type DDLSink struct { statistics *metrics.Statistics // admin is used to query kafka cluster information. admin kafka.ClusterAdminClient - // connRefresherForDDL is used to refresh the connection for DDL events. - connRefresherForDDL kafka.SyncProducer } func newDDLSink( @@ -84,18 +82,16 @@ func newDDLSink( eventRouter *dispatcher.EventRouter, encoder codec.RowEventEncoder, protocol config.Protocol, - connRefresherForDDL kafka.SyncProducer, ) *DDLSink { return &DDLSink{ - id: changefeedID, - protocol: protocol, - eventRouter: eventRouter, - topicManager: topicManager, - encoder: encoder, - producer: producer, - statistics: metrics.NewStatistics(changefeedID, sink.RowSink), - admin: adminClient, - connRefresherForDDL: connRefresherForDDL, + id: changefeedID, + protocol: protocol, + eventRouter: eventRouter, + topicManager: topicManager, + encoder: encoder, + producer: producer, + statistics: metrics.NewStatistics(changefeedID, sink.RowSink), + admin: adminClient, } } @@ -145,15 +141,6 @@ func (k *DDLSink) WriteDDLEvent(ctx context.Context, ddl *model.DDLEvent) error func (k *DDLSink) WriteCheckpointTs(ctx context.Context, ts uint64, tables []*model.TableInfo, ) error { - // This operation is used to keep the kafka connection alive. - // For more details, see https://github.com/pingcap/tiflow/pull/12173 - if k.connRefresherForDDL != nil { - // The implementation is saramaSyncProducer.HeartbeatBrokers. And - // there is a keepConnAliveInterval in the saramaSyncProducer, so - // we don't need to worry about the heartbeat is too frequent. - k.connRefresherForDDL.HeartbeatBrokers() - } - var ( err error partitionNum int32 diff --git a/cdc/sink/ddlsink/mq/mq_ddl_sink_test.go b/cdc/sink/ddlsink/mq/mq_ddl_sink_test.go index 2cf800f674..c3dfc873c1 100644 --- a/cdc/sink/ddlsink/mq/mq_ddl_sink_test.go +++ b/cdc/sink/ddlsink/mq/mq_ddl_sink_test.go @@ -17,16 +17,12 @@ import ( "context" "fmt" "net/url" - "sync" "testing" mm "github.com/pingcap/tidb/pkg/meta/model" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sink/ddlsink/mq/ddlproducer" "github.com/pingcap/tiflow/pkg/config" - "github.com/pingcap/tiflow/pkg/errors" - "github.com/pingcap/tiflow/pkg/sink/codec" - "github.com/pingcap/tiflow/pkg/sink/codec/common" "github.com/pingcap/tiflow/pkg/sink/kafka" "github.com/stretchr/testify/require" ) @@ -296,96 +292,3 @@ func TestGetDLLDispatchRuleByProtocol(t *testing.T) { require.Equal(t, PartitionAll, getDDLDispatchRule(config.ProtocolSimple)) require.Equal(t, PartitionAll, getDDLDispatchRule(config.ProtocolDebezium)) } - -// mockSyncProducer is used to count the calls to HeartbeatBrokers. -type mockSyncProducer struct { - kafka.MockSaramaSyncProducer - heartbeatCount int - mu sync.Mutex -} - -func (m *mockSyncProducer) HeartbeatBrokers() { - m.mu.Lock() - defer m.mu.Unlock() - m.heartbeatCount++ -} - -func (m *mockSyncProducer) GetHeartbeatCount() int { - m.mu.Lock() - defer m.mu.Unlock() - return m.heartbeatCount -} - -// mockEncoder is a mock implementation of codec.RowEventEncoder. -// It is used to prevent the test from needing to set up a real, complex encoder. -type mockEncoder struct{} - -// This line ensures at compile time that mockEncoder correctly implements the interface. -var _ codec.RowEventEncoder = (*mockEncoder)(nil) - -// A predefined error that our mock encoder will return. -var errMockEncoder = errors.New("mock encoder error") - -// EncodeCheckpointEvent returns a specific error to halt the execution -// of the function under test right after the heartbeat logic. -func (m *mockEncoder) EncodeCheckpointEvent(ts uint64) (*common.Message, error) { - return nil, errMockEncoder -} - -// The following methods are part of the RowEventEncoder interface. -// They can be left with a minimal implementation as they are not called -// in the tested code path. -func (m *mockEncoder) AppendRowChangedEvent( - _ context.Context, _ string, _ *model.RowChangedEvent, _ func(), -) error { - return nil -} - -func (m *mockEncoder) Build() []*common.Message { - return nil -} - -func (m *mockEncoder) EncodeDDLEvent(event *model.DDLEvent) (*common.Message, error) { - return nil, nil -} -func (m *mockEncoder) SetMaxMessageBytes(bytes int) {} - -func TestDDLSinkHeartbeat(t *testing.T) { - t.Parallel() - - ctx := context.Background() - changefeedID := model.DefaultChangeFeedID("test-ddl-sink") - // Protocol is needed by newDDLSink but not used in the tested path. - proto := config.ProtocolOpen - encoder := &mockEncoder{} - - // Case 1: DDL Sink with a connection refresher. - t.Run("DDLSinkWithConnectionRefresher", func(t *testing.T) { - t.Parallel() - producer := &mockSyncProducer{} - // Other dependencies for newDDLSink can be nil as they are not used - // before our mock encoder returns an error. - ddlSink := newDDLSink(changefeedID, nil, nil, nil, nil, encoder, proto, producer) - - require.Equal(t, 0, producer.GetHeartbeatCount()) - - // WriteCheckpointTs should first call HeartbeatBrokers, then fail at the encoder. - err := ddlSink.WriteCheckpointTs(ctx, 12345, nil) - - // Assert that the heartbeat was called exactly once. - require.Equal(t, 1, producer.GetHeartbeatCount()) - // Assert that the function failed with the mock encoder's specific error. - require.ErrorIs(t, err, errMockEncoder) - }) - - // Case 2: DDLSink with a nil connection refresher (e.g., for Pulsar). - t.Run("DDLSinkWithNilConnectionRefresher", func(t *testing.T) { - t.Parallel() - // Create the sink with a nil refresher. - ddlSinkNilRefresher := newDDLSink(changefeedID, nil, nil, nil, nil, encoder, proto, nil) - - // The call should not panic and should fail with the mock encoder's error. - err := ddlSinkNilRefresher.WriteCheckpointTs(ctx, 12347, nil) - require.ErrorIs(t, err, errMockEncoder) - }) -} diff --git a/cdc/sink/ddlsink/mq/pulsar_ddl_sink.go b/cdc/sink/ddlsink/mq/pulsar_ddl_sink.go index b4df662b11..058ee65482 100644 --- a/cdc/sink/ddlsink/mq/pulsar_ddl_sink.go +++ b/cdc/sink/ddlsink/mq/pulsar_ddl_sink.go @@ -100,7 +100,7 @@ func NewPulsarDDLSink( return nil, errors.Trace(err) } - s := newDDLSink(changefeedID, p, nil, topicManager, eventRouter, encoderBuilder.Build(), protocol, nil) + s := newDDLSink(changefeedID, p, nil, topicManager, eventRouter, encoderBuilder.Build(), protocol) return s, nil } diff --git a/cdc/sink/dmlsink/mq/kafka_dml_sink.go b/cdc/sink/dmlsink/mq/kafka_dml_sink.go index f399db3dfc..a0b0c4193c 100644 --- a/cdc/sink/dmlsink/mq/kafka_dml_sink.go +++ b/cdc/sink/dmlsink/mq/kafka_dml_sink.go @@ -87,7 +87,6 @@ func NewKafkaDMLSink( topic, options.DeriveTopicConfig(), adminClient, - options.KeepConnAliveInterval, ) if err != nil { return nil, errors.Trace(err) diff --git a/cdc/sink/dmlsink/mq/manager/kafka_manager.go b/cdc/sink/dmlsink/mq/manager/kafka_manager.go index d39aba2422..63accee354 100644 --- a/cdc/sink/dmlsink/mq/manager/kafka_manager.go +++ b/cdc/sink/dmlsink/mq/manager/kafka_manager.go @@ -48,9 +48,6 @@ type kafkaTopicManager struct { topics sync.Map - metaRefreshTicker *time.Ticker - keepConnAliveTicker *time.Ticker - // cancel is used to cancel the background goroutine. cancel context.CancelFunc } @@ -62,15 +59,12 @@ func NewKafkaTopicManager( changefeedID model.ChangeFeedID, admin kafka.ClusterAdminClient, cfg *kafka.AutoCreateTopicConfig, - keepConnAliveInterval time.Duration, ) *kafkaTopicManager { mgr := &kafkaTopicManager{ - defaultTopic: defaultTopic, - changefeedID: changefeedID, - admin: admin, - cfg: cfg, - metaRefreshTicker: time.NewTicker(metaRefreshInterval), - keepConnAliveTicker: time.NewTicker(keepConnAliveInterval), + defaultTopic: defaultTopic, + changefeedID: changefeedID, + admin: admin, + cfg: cfg, } ctx, mgr.cancel = context.WithCancel(ctx) @@ -100,7 +94,8 @@ func (m *kafkaTopicManager) GetPartitionNum( } func (m *kafkaTopicManager) backgroundRefreshMeta(ctx context.Context) { - defer m.keepConnAliveTicker.Stop() + ticker := time.NewTicker(metaRefreshInterval) + defer ticker.Stop() for { select { case <-ctx.Done(): @@ -109,17 +104,13 @@ func (m *kafkaTopicManager) backgroundRefreshMeta(ctx context.Context) { zap.String("changefeed", m.changefeedID.ID), ) return - case <-m.metaRefreshTicker.C: + case <-ticker.C: // We ignore the error here, because the error may be caused by the // network problem, and we can try to get the metadata next time. topicPartitionNums, _ := m.fetchAllTopicsPartitionsNum(ctx) for topic, partitionNum := range topicPartitionNums { m.tryUpdatePartitionsAndLogging(topic, partitionNum) } - case <-m.keepConnAliveTicker.C: - // This operation is used to keep the kafka connection alive. - // For more details, see https://github.com/pingcap/tiflow/pull/12173 - m.admin.HeartbeatBrokers() } } } diff --git a/cdc/sink/dmlsink/mq/manager/kafka_manager_test.go b/cdc/sink/dmlsink/mq/manager/kafka_manager_test.go index dd4a7f58db..d4fb885502 100644 --- a/cdc/sink/dmlsink/mq/manager/kafka_manager_test.go +++ b/cdc/sink/dmlsink/mq/manager/kafka_manager_test.go @@ -15,19 +15,13 @@ package manager import ( "context" - "math" - "sync" "testing" - "time" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/sink/kafka" "github.com/stretchr/testify/require" ) -// A fake interval to keep the connection alive in kafka topic manager. -const FOREVER = time.Duration(math.MaxInt64) - func TestCreateTopic(t *testing.T) { t.Parallel() @@ -41,7 +35,7 @@ func TestCreateTopic(t *testing.T) { changefeedID := model.DefaultChangeFeedID("test") ctx := context.Background() - manager := NewKafkaTopicManager(ctx, kafka.DefaultMockTopicName, changefeedID, adminClient, cfg, FOREVER) + manager := NewKafkaTopicManager(ctx, kafka.DefaultMockTopicName, changefeedID, adminClient, cfg) defer manager.Close() partitionNum, err := manager.CreateTopicAndWaitUntilVisible(ctx, kafka.DefaultMockTopicName) require.NoError(t, err) @@ -56,7 +50,7 @@ func TestCreateTopic(t *testing.T) { // Try to create a topic without auto create. cfg.AutoCreate = false - manager = NewKafkaTopicManager(ctx, "new-topic2", changefeedID, adminClient, cfg, FOREVER) + manager = NewKafkaTopicManager(ctx, "new-topic2", changefeedID, adminClient, cfg) defer manager.Close() _, err = manager.CreateTopicAndWaitUntilVisible(ctx, "new-topic2") require.Regexp( @@ -73,7 +67,7 @@ func TestCreateTopic(t *testing.T) { PartitionNum: 2, ReplicationFactor: 4, } - manager = NewKafkaTopicManager(ctx, topic, changefeedID, adminClient, cfg, FOREVER) + manager = NewKafkaTopicManager(ctx, topic, changefeedID, adminClient, cfg) defer manager.Close() _, err = manager.CreateTopicAndWaitUntilVisible(ctx, topic) require.Regexp( @@ -97,7 +91,7 @@ func TestCreateTopicWithDelay(t *testing.T) { topic := "new_topic" changefeedID := model.DefaultChangeFeedID("test") ctx := context.Background() - manager := NewKafkaTopicManager(ctx, topic, changefeedID, adminClient, cfg, FOREVER) + manager := NewKafkaTopicManager(ctx, topic, changefeedID, adminClient, cfg) defer manager.Close() partitionNum, err := manager.createTopic(ctx, topic) require.NoError(t, err) @@ -107,53 +101,3 @@ func TestCreateTopicWithDelay(t *testing.T) { require.NoError(t, err) require.Equal(t, int32(2), partitionNum) } - -// mockAdminClientForHeartbeat is used to count the calls to HeartbeatBrokers. -type mockAdminClientForHeartbeat struct { - kafka.ClusterAdminClientMockImpl - heartbeatCount int - mu sync.Mutex -} - -func (m *mockAdminClientForHeartbeat) HeartbeatBrokers() { - m.mu.Lock() - defer m.mu.Unlock() - m.heartbeatCount++ -} - -func (m *mockAdminClientForHeartbeat) GetHeartbeatCount() int { - m.mu.Lock() - defer m.mu.Unlock() - return m.heartbeatCount -} - -func TestKafkaManagerHeartbeat(t *testing.T) { - t.Parallel() - - adminClient := &mockAdminClientForHeartbeat{} - cfg := &kafka.AutoCreateTopicConfig{AutoCreate: false} - changefeedID := model.DefaultChangeFeedID("test-heartbeat") - ctx, cancel := context.WithCancel(context.Background()) - - // Use a short interval for testing. - keepAliveInterval := 50 * time.Millisecond - manager := NewKafkaTopicManager(ctx, "topic", changefeedID, adminClient, cfg, keepAliveInterval) - - // Ensure the manager is closed and the context is canceled at the end of the test. - defer manager.Close() - defer cancel() - - // Wait for a sufficient amount of time to ensure the heartbeat ticker triggers several times. - // Waiting for 175ms should be enough to trigger 3 times (at 50ms, 100ms, 150ms). - // Use Eventually to avoid test flakiness. - require.Eventually(t, func() bool { - return adminClient.GetHeartbeatCount() >= 2 - }, 2*time.Second, 50*time.Millisecond, "HeartbeatBrokers should be called periodically") - - // Verify that closing the manager stops the heartbeat. - countBeforeClose := adminClient.GetHeartbeatCount() - manager.Close() - // Wait for a short period to ensure no new heartbeats occur. - time.Sleep(keepAliveInterval * 2) - require.Equal(t, countBeforeClose, adminClient.GetHeartbeatCount(), "Heartbeat should stop after manager is closed") -} diff --git a/cdc/sink/util/helper.go b/cdc/sink/util/helper.go index bf07e661ae..b7a32aed43 100644 --- a/cdc/sink/util/helper.go +++ b/cdc/sink/util/helper.go @@ -17,7 +17,6 @@ import ( "context" "net/url" "strings" - "time" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/manager" @@ -105,10 +104,9 @@ func GetTopicManagerAndTryCreateTopic( topic string, topicCfg *kafka.AutoCreateTopicConfig, adminClient kafka.ClusterAdminClient, - keepConnAliveInterval time.Duration, ) (manager.TopicManager, error) { topicManager := manager.NewKafkaTopicManager( - ctx, topic, changefeedID, adminClient, topicCfg, keepConnAliveInterval, + ctx, topic, changefeedID, adminClient, topicCfg, ) if _, err := topicManager.CreateTopicAndWaitUntilVisible(ctx, topic); err != nil { diff --git a/cdc/sink/util/helper_test.go b/cdc/sink/util/helper_test.go index d0efc5acc9..680df59abc 100644 --- a/cdc/sink/util/helper_test.go +++ b/cdc/sink/util/helper_test.go @@ -44,7 +44,7 @@ func TestPartition(t *testing.T) { ctx := context.Background() manager, err := GetTopicManagerAndTryCreateTopic( - ctx, changefeedID, kafka.DefaultMockTopicName, cfg, adminClient, FOREVER) + ctx, changefeedID, kafka.DefaultMockTopicName, cfg, adminClient) require.NoError(t, err) defer manager.Close() diff --git a/go.mod b/go.mod index aceb438e44..a61e289e6e 100644 --- a/go.mod +++ b/go.mod @@ -60,7 +60,7 @@ require ( github.com/jcmturner/gokrb5/v8 v8.4.4 github.com/jmoiron/sqlx v1.3.3 github.com/kami-zh/go-capturer v0.0.0-20171211120116-e492ea43421d - github.com/klauspost/compress v1.18.2 + github.com/klauspost/compress v1.18.5 github.com/labstack/gommon v0.4.0 github.com/linkedin/goavro/v2 v2.14.0 github.com/mailru/easyjson v0.7.7 @@ -68,7 +68,7 @@ require ( github.com/modern-go/reflect2 v1.0.2 github.com/olekukonko/tablewriter v0.0.5 github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2 - github.com/pierrec/lz4/v4 v4.1.22 + github.com/pierrec/lz4/v4 v4.1.26 github.com/pingcap/check v0.0.0-20211026125417-57bd13f7b5f0 github.com/pingcap/errors v0.11.5-0.20260310054046-9c8b3586e4b2 github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 @@ -80,7 +80,7 @@ require ( github.com/prometheus/client_golang v1.23.0 github.com/prometheus/client_model v0.6.2 github.com/r3labs/diff v1.1.0 - github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 + github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 github.com/robfig/cron v1.2.0 github.com/segmentio/kafka-go v0.4.41-0.20230526171612-f057b1d369cd github.com/shirou/gopsutil/v3 v3.24.5 @@ -113,11 +113,11 @@ require ( go.uber.org/ratelimit v0.2.0 go.uber.org/zap v1.27.1 golang.org/x/exp v0.0.0-20250408133849-7e4ce0ab07d0 - golang.org/x/net v0.50.0 + golang.org/x/net v0.53.0 golang.org/x/oauth2 v0.33.0 - golang.org/x/sync v0.19.0 - golang.org/x/sys v0.41.0 - golang.org/x/text v0.34.0 + golang.org/x/sync v0.20.0 + golang.org/x/sys v0.43.0 + golang.org/x/text v0.36.0 golang.org/x/time v0.14.0 google.golang.org/genproto/googleapis/api v0.0.0-20251022142026-3a174f9686a8 google.golang.org/genproto/googleapis/rpc v0.0.0-20251022142026-3a174f9686a8 @@ -180,6 +180,7 @@ require ( github.com/cncf/xds/go v0.0.0-20251022180443-0feb69152e9f // indirect github.com/cockroachdb/fifo v0.0.0-20240606204812-0bbfbd93a7ce // indirect github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 // indirect + github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect github.com/envoyproxy/go-control-plane/envoy v1.35.0 // indirect github.com/envoyproxy/protoc-gen-validate v1.2.1 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect @@ -301,8 +302,7 @@ require ( github.com/dgraph-io/ristretto v0.1.1 // indirect github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da // indirect github.com/dvsekhvalnov/jose2go v1.6.0 // indirect - github.com/eapache/go-resiliency v1.4.0 // indirect - github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect + github.com/eapache/go-resiliency v1.7.0 // indirect github.com/eapache/queue v1.1.0 // indirect github.com/emirpasic/gods v1.18.1 // indirect github.com/gin-contrib/sse v0.1.0 // indirect @@ -422,9 +422,9 @@ require ( go.opentelemetry.io/otel/sdk v1.38.0 // indirect go.opentelemetry.io/otel/trace v1.38.0 // indirect go.opentelemetry.io/proto/otlp v1.7.0 // indirect - golang.org/x/crypto v0.48.0 // indirect + golang.org/x/crypto v0.50.0 // indirect golang.org/x/mod v0.33.0 // indirect - golang.org/x/term v0.40.0 + golang.org/x/term v0.42.0 golang.org/x/tools v0.42.0 // indirect google.golang.org/api v0.230.0 // indirect gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22 // indirect @@ -439,7 +439,7 @@ require ( sourcegraph.com/sourcegraph/appdash-data v0.0.0-20151005221446-73f23eafcf67 // indirect ) -replace github.com/IBM/sarama v1.41.2 => github.com/pingcap/sarama v1.41.2-pingcap-20251202-x +replace github.com/IBM/sarama v1.41.2 => github.com/pingcap/sarama v1.41.2-pingcap-20260508 // Fix https://github.com/pingcap/tiflow/issues/4961 replace github.com/benbjohnson/clock v1.3.5 => github.com/benbjohnson/clock v1.1.0 diff --git a/go.sum b/go.sum index 1cc75fead9..a8280cfc52 100644 --- a/go.sum +++ b/go.sum @@ -337,8 +337,8 @@ github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkp github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/dvsekhvalnov/jose2go v1.6.0 h1:Y9gnSnP4qEI0+/uQkHvFXeD2PLPJeXEL+ySMEA2EjTY= github.com/dvsekhvalnov/jose2go v1.6.0/go.mod h1:QsHjhyTlD/lAVqn/NSbVZmSCGeDehTB/mPZadG+mhXU= -github.com/eapache/go-resiliency v1.4.0 h1:3OK9bWpPk5q6pbFAaYSEwD9CLUSHG8bnZuqX2yMt3B0= -github.com/eapache/go-resiliency v1.4.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= +github.com/eapache/go-resiliency v1.7.0 h1:n3NRTnBn5N0Cbi/IeOHuQn9s2UwVUH7Ga0ZWcP+9JTA= +github.com/eapache/go-resiliency v1.7.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws= github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0= github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= @@ -680,8 +680,8 @@ github.com/klauspost/asmfmt v1.3.2 h1:4Ri7ox3EwapiOjCki+hw14RyKk201CN4rzyCJRFLpK github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE= github.com/klauspost/compress v1.9.5/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= -github.com/klauspost/compress v1.18.2 h1:iiPHWW0YrcFgpBYhsA6D1+fqHssJscY/Tm/y2Uqnapk= -github.com/klauspost/compress v1.18.2/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= +github.com/klauspost/compress v1.18.5 h1:/h1gH5Ce+VWNLSWqPzOVn6XBO+vJbCNGvjoaGBFW2IE= +github.com/klauspost/compress v1.18.5/go.mod h1:cwPg85FWrGar70rWktvGQj8/hthj3wpl0PGDogxkrSQ= github.com/klauspost/cpuid v1.2.1/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek= github.com/klauspost/cpuid v1.3.1 h1:5JNjFYYQrZeKRJ0734q51WCEEn2huer72Dc7K+R/b6s= github.com/klauspost/cpuid v1.3.1/go.mod h1:bYW4mA6ZgKPob1/Dlai2LviZJO7KGI3uoWLd42rAQw4= @@ -860,8 +860,8 @@ github.com/philhofer/fwd v1.2.0/go.mod h1:RqIHx9QI14HlwKwm98g9Re5prTQ6LdeRQn+gXJ github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM= github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= -github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU= -github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pierrec/lz4/v4 v4.1.26 h1:GrpZw1gZttORinvzBdXPUXATeqlJjqUG/D87TKMnhjY= +github.com/pierrec/lz4/v4 v4.1.26/go.mod h1:EoQMVJgeeEOMsCqCzqFm2O0cJvljX2nGZjcRIPL34O4= github.com/pingcap/badger v1.5.1-0.20241015064302-38533b6cbf8d h1:eHcokyHxm7HVM+7+Qy1zZwC7NhX9wVNX8oQDcSZw1qI= github.com/pingcap/badger v1.5.1-0.20241015064302-38533b6cbf8d/go.mod h1:KiO2zumBCWx7yoVYoFRpb+DNrwEPk1pR1LF7NvOACMQ= github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8/go.mod h1:B1+S9LNcuMyLH/4HMTViQOJevkGiik3wW2AN9zb2fNQ= @@ -888,8 +888,8 @@ github.com/pingcap/log v1.1.1-0.20250917021125-19901e015dc9 h1:qG9BSvlWFEE5otQGa github.com/pingcap/log v1.1.1-0.20250917021125-19901e015dc9/go.mod h1:ORfBOFp1eteu2odzsyaxI+b8TzJwgjwyQcGhI+9SfEA= github.com/pingcap/metering_sdk v0.0.0-20251110022152-dac449ac5389 h1:bqbE3bwFSrUDSiN5M4EG+IXmm5eWLJnGRy/caXnxuHA= github.com/pingcap/metering_sdk v0.0.0-20251110022152-dac449ac5389/go.mod h1:zie1N5PRttgtqkZmRtpIDM7CuyWtvlX9LTxRd3fVSc4= -github.com/pingcap/sarama v1.41.2-pingcap-20251202-x h1:9Vi3qqyDNZxG6fnXQhpeTsnwzSBWNpMeb8o02JkL9JM= -github.com/pingcap/sarama v1.41.2-pingcap-20251202-x/go.mod h1:xdpu7sd6OE1uxNdjYTSKUfY8FaKkJES9/+EyjSgiGQk= +github.com/pingcap/sarama v1.41.2-pingcap-20260508 h1:3ZFtYLUGMMZeA6U0iz3EyFnNGPHu3qOuPLj5wXxHmeU= +github.com/pingcap/sarama v1.41.2-pingcap-20260508/go.mod h1:PIL6ZKKKhm19IbQpmpJcFnybAi1yXtgLAitDAeBdNCw= github.com/pingcap/sysutil v1.0.1-0.20240311050922-ae81ee01f3a5 h1:T4pXRhBflzDeAhmOQHNPRRogMYxP13V7BkYw3ZsoSfE= github.com/pingcap/sysutil v1.0.1-0.20240311050922-ae81ee01f3a5/go.mod h1:rlimy0GcTvjiJqvD5mXTRr8O2eNZPBrcUgiWVYp9530= github.com/pingcap/tidb v1.1.0-beta.0.20260325043212-0c4df2e19ecc h1:2Ateg1PUQqozi+TtbL+Bx62qn4IpQ48VTOpqCh2d3AI= @@ -938,8 +938,8 @@ github.com/qri-io/jsonschema v0.2.1 h1:NNFoKms+kut6ABPf6xiKNM5214jzxAhDBrPHCJ97W github.com/qri-io/jsonschema v0.2.1/go.mod h1:g7DPkiOsK1xv6T/Ao5scXRkd+yTFygcANPBaaqW+VrI= github.com/r3labs/diff v1.1.0 h1:V53xhrbTHrWFWq3gI4b94AjgEJOerO1+1l0xyHOBi8M= github.com/r3labs/diff v1.1.0/go.mod h1:7WjXasNzi0vJetRcB/RqNl5dlIsmXcTTLmF5IoH6Xig= -github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= -github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 h1:bsUq1dX0N8AOIL7EB/X911+m4EHsnWEHeJ0c+3TTBrg= +github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= @@ -1281,8 +1281,8 @@ golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOM golang.org/x/crypto v0.24.0/go.mod h1:Z1PMYSOR5nyMcyAVAIQSKCDwalqy85Aqn1x3Ws4L5DM= golang.org/x/crypto v0.39.0/go.mod h1:L+Xg3Wf6HoL4Bn4238Z6ft6KfEpN0tJGo53AAPC632U= golang.org/x/crypto v0.40.0/go.mod h1:Qr1vMER5WyS2dfPHAlsOj01wgLbsyWtFn/aY+5+ZdxY= -golang.org/x/crypto v0.48.0 h1:/VRzVqiRSggnhY7gNRxPauEQ5Drw9haKdM0jqfcCFts= -golang.org/x/crypto v0.48.0/go.mod h1:r0kV5h3qnFPlQnBSrULhlsRfryS2pmewsg+XfMgkVos= +golang.org/x/crypto v0.50.0 h1:zO47/JPrL6vsNkINmLoo/PH1gcxpls50DNogFvB5ZGI= +golang.org/x/crypto v0.50.0/go.mod h1:3muZ7vA7PBCE6xgPX7nkzzjiUq87kRItoJQM1Yo8S+Q= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20250408133849-7e4ce0ab07d0 h1:R84qjqJb5nVJMxqWYb3np9L5ZsaDtB+a39EqjV0JSUM= @@ -1349,8 +1349,8 @@ golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE= golang.org/x/net v0.41.0/go.mod h1:B/K4NNqkfmg07DQYrbwvSluqCJOOXwUjeb/5lOisjbA= golang.org/x/net v0.42.0/go.mod h1:FF1RA5d3u7nAYA4z2TkclSCKh68eSXtiFwcWQpPXdt8= -golang.org/x/net v0.50.0 h1:ucWh9eiCGyDR3vtzso0WMQinm2Dnt8cFMuQa9K33J60= -golang.org/x/net v0.50.0/go.mod h1:UgoSli3F/pBgdJBHCTc+tp3gmrU4XswgGRgtnwWTfyM= +golang.org/x/net v0.53.0 h1:d+qAbo5L0orcWAr0a9JweQpjXF19LMXJE8Ey7hwOdUA= +golang.org/x/net v0.53.0/go.mod h1:JvMuJH7rrdiCfbeHoo3fCQU24Lf5JJwT9W3sJFulfgs= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.33.0 h1:4Q+qn+E5z8gPRJfmRy7C2gGG3T4jIprK6aSYgTXGRpo= @@ -1369,8 +1369,8 @@ golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sync v0.15.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sync v0.16.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= -golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= -golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4= +golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0= golang.org/x/sys v0.0.0-20180816055513-1c9583448a9c/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -1430,8 +1430,8 @@ golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/sys v0.34.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= -golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k= -golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/sys v0.43.0 h1:Rlag2XtaFTxp19wS8MXlJwTvoh8ArU6ezoyFsMyCTNI= +golang.org/x/sys v0.43.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= golang.org/x/telemetry v0.0.0-20240521205824-bda55230c457/go.mod h1:pRgIJT+bRLFKnoM1ldnzKoxTIn14Yxz928LQRYYgIN0= golang.org/x/telemetry v0.0.0-20250710130107-8d8967aff50b/go.mod h1:4ZwOYna0/zsOKwuR5X/m0QFOJpSZvAxFfkQT+Erd9D4= golang.org/x/telemetry v0.0.0-20260209163413-e7419c687ee4 h1:bTLqdHv7xrGlFbvf5/TXNxy/iUwwdkjhqQTJDjW7aj0= @@ -1451,8 +1451,8 @@ golang.org/x/term v0.18.0/go.mod h1:ILwASektA3OnRv7amZ1xhE/KTR+u50pbXfZ03+6Nx58= golang.org/x/term v0.21.0/go.mod h1:ooXLefLobQVslOqselCNF4SxFAaoS6KujMbsGzSDmX0= golang.org/x/term v0.32.0/go.mod h1:uZG1FhGx848Sqfsq4/DlJr3xGGsYMu/L5GW4abiaEPQ= golang.org/x/term v0.33.0/go.mod h1:s18+ql9tYWp1IfpV9DmCtQDDSRBUjKaw9M1eAv5UeF0= -golang.org/x/term v0.40.0 h1:36e4zGLqU4yhjlmxEaagx2KuYbJq3EwY8K943ZsHcvg= -golang.org/x/term v0.40.0/go.mod h1:w2P8uVp06p2iyKKuvXIm7N/y0UCRt3UfJTfZ7oOpglM= +golang.org/x/term v0.42.0 h1:UiKe+zDFmJobeJ5ggPwOshJIVt6/Ft0rcfrXZDLWAWY= +golang.org/x/term v0.42.0/go.mod h1:Dq/D+snpsbazcBG5+F9Q1n2rXV8Ma+71xEjTRufARgY= golang.org/x/text v0.28.0 h1:rhazDwis8INMIwQ4tpjLDzUhx6RlXqZNPEM0huQojng= golang.org/x/text v0.28.0/go.mod h1:U8nCwOR8jO/marOQ0QbDiOngZVEBB7MAiitBuMjXiNU= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= diff --git a/pkg/logutil/log.go b/pkg/logutil/log.go index 138db763b9..2af26bc93a 100644 --- a/pkg/logutil/log.go +++ b/pkg/logutil/log.go @@ -17,6 +17,7 @@ import ( "bytes" "context" "io" + stdlog "log" "os" "strconv" "strings" @@ -228,6 +229,10 @@ func initSaramaLogger(level zapcore.Level) error { return errors.Trace(err) } sarama.Logger = logger + sarama.DebugLogger = stdlog.New(io.Discard, "[Sarama] ", stdlog.LstdFlags) + if level.Enabled(zapcore.DebugLevel) { + sarama.DebugLogger = logger + } return nil } diff --git a/pkg/sink/kafka/admin.go b/pkg/sink/kafka/admin.go index df36e17cf8..5fccac26b8 100644 --- a/pkg/sink/kafka/admin.go +++ b/pkg/sink/kafka/admin.go @@ -193,7 +193,3 @@ func (a *saramaAdminClient) Close() { } } } - -func (a *saramaAdminClient) HeartbeatBrokers() { - KeepConnAlive(a.client) -} diff --git a/pkg/sink/kafka/cluster_admin_client.go b/pkg/sink/kafka/cluster_admin_client.go index ed665ed1a2..3a19560de3 100644 --- a/pkg/sink/kafka/cluster_admin_client.go +++ b/pkg/sink/kafka/cluster_admin_client.go @@ -52,9 +52,6 @@ type ClusterAdminClient interface { // CreateTopic creates a new topic. CreateTopic(ctx context.Context, detail *TopicDetail, validateOnly bool) error - // HeartbeatBroker sends a heartbeat to all brokers to keep the kafka connection alive. - HeartbeatBrokers() - // Close shuts down the admin client and releases any owned underlying client connections. Close() } diff --git a/pkg/sink/kafka/cluster_admin_client_mock_impl.go b/pkg/sink/kafka/cluster_admin_client_mock_impl.go index f7a4b4f7f4..5995c34204 100644 --- a/pkg/sink/kafka/cluster_admin_client_mock_impl.go +++ b/pkg/sink/kafka/cluster_admin_client_mock_impl.go @@ -42,9 +42,6 @@ const ( // defaultMinInsyncReplicas specifies the default `min.insync.replicas` for broker and topic. defaultMinInsyncReplicas = "1" - - // 10 minutes, identical to kafka broker's `connections.max.idle.ms` - defaultBrokerConnectionsMaxIdleMs = "600000" ) var ( @@ -54,8 +51,6 @@ var ( TopicMaxMessageBytes = defaultMaxMessageBytes // MinInSyncReplicas is the `min.insync.replicas` MinInSyncReplicas = defaultMinInsyncReplicas - // BrokerConnectionsMaxIdleMs is the broker's `connections.max.idle.ms` - BrokerConnectionsMaxIdleMs = defaultBrokerConnectionsMaxIdleMs ) type topicDetail struct { @@ -86,7 +81,6 @@ func NewClusterAdminClientMockImpl() *ClusterAdminClientMockImpl { brokerConfigs := make(map[string]string) brokerConfigs[BrokerMessageMaxBytesConfigName] = BrokerMessageMaxBytes brokerConfigs[MinInsyncReplicasConfigName] = MinInSyncReplicas - brokerConfigs[BrokerConnectionsMaxIdleMsConfigName] = BrokerConnectionsMaxIdleMs topicConfigs := make(map[string]map[string]string) topicConfigs[DefaultMockTopicName] = make(map[string]string) @@ -106,9 +100,6 @@ func (c *ClusterAdminClientMockImpl) GetAllBrokers(context.Context) ([]Broker, e return nil, nil } -// HeartbeatBrokers implement the ClusterAdminClient interface -func (c *ClusterAdminClientMockImpl) HeartbeatBrokers() {} - // GetBrokerConfig implement the ClusterAdminClient interface func (c *ClusterAdminClientMockImpl) GetBrokerConfig( _ context.Context, diff --git a/pkg/sink/kafka/factory.go b/pkg/sink/kafka/factory.go index d58b8ca56e..df1d95399a 100644 --- a/pkg/sink/kafka/factory.go +++ b/pkg/sink/kafka/factory.go @@ -57,9 +57,6 @@ type SyncProducer interface { // SendMessages will return an error. SendMessages(ctx context.Context, topic string, partitionNum int32, message *common.Message) error - // HeartbeatBrokers sends heartbeat to all brokers to keep the connection alive. - HeartbeatBrokers() - // Close shuts down the producer and releases the client owned by this wrapper. // You must call this function before the producer passes out of scope, as it // may otherwise leak memory. @@ -85,11 +82,9 @@ type AsyncProducer interface { } type saramaSyncProducer struct { - id model.ChangeFeedID - producer sarama.SyncProducer - client sarama.Client - keepConnAliveInterval time.Duration - lastHeartbeatTime time.Time // used to check if we need to send heartbeat + id model.ChangeFeedID + producer sarama.SyncProducer + client sarama.Client } func (p *saramaSyncProducer) SendMessage( @@ -120,18 +115,6 @@ func (p *saramaSyncProducer) SendMessages(_ context.Context, topic string, parti return cerror.WrapError(cerror.ErrKafkaSendMessage, err) } -func (p *saramaSyncProducer) HeartbeatBrokers() { - // We only send heartbeat to brokers when the last heartbeat time is - // older than the keep connection alive interval, to avoid sending heartbeat - // too frequently. - // This function will be called periodically in DDLSink.WriteCheckpointTs. - if time.Since(p.lastHeartbeatTime) < p.keepConnAliveInterval { - return - } - p.lastHeartbeatTime = time.Now() - KeepConnAlive(p.client) -} - func (p *saramaSyncProducer) Close() { start := time.Now() err := p.producer.Close() @@ -165,11 +148,10 @@ func (p *saramaSyncProducer) Close() { } type saramaAsyncProducer struct { - client sarama.Client - producer sarama.AsyncProducer - changefeedID model.ChangeFeedID - keepConnAliveInterval time.Duration - failpointCh chan error + client sarama.Client + producer sarama.AsyncProducer + changefeedID model.ChangeFeedID + failpointCh chan error } func (p *saramaAsyncProducer) Close() { @@ -225,8 +207,6 @@ func (p *saramaAsyncProducer) Close() { func (p *saramaAsyncProducer) AsyncRunCallback( ctx context.Context, ) error { - ticker := time.NewTicker(p.keepConnAliveInterval) - defer ticker.Stop() for { select { case <-ctx.Done(): @@ -247,8 +227,6 @@ func (p *saramaAsyncProducer) AsyncRunCallback( callback() } } - case <-ticker.C: - p.heartbeatBrokers() case err := <-p.producer.Errors(): // We should not wrap a nil pointer if the pointer // is of a subtype of `error` because Go would store the type info @@ -280,8 +258,3 @@ func (p *saramaAsyncProducer) AsyncSend(ctx context.Context, topic string, parti } return nil } - -// heartbeatBrokers sends heartbeat to all brokers to keep the connection alive. -func (p *saramaAsyncProducer) heartbeatBrokers() { - KeepConnAlive(p.client) -} diff --git a/pkg/sink/kafka/mock_factory.go b/pkg/sink/kafka/mock_factory.go index 03c76bea1b..2d5b479b4f 100644 --- a/pkg/sink/kafka/mock_factory.go +++ b/pkg/sink/kafka/mock_factory.go @@ -119,9 +119,6 @@ func (m *MockSaramaSyncProducer) SendMessages(ctx context.Context, topic string, return m.Producer.SendMessages(msgs) } -// HeartbeatBrokers implement the SyncProducer interface. -func (m *MockSaramaSyncProducer) HeartbeatBrokers() {} - // Close implement the SyncProducer interface. func (m *MockSaramaSyncProducer) Close() { m.Producer.Close() diff --git a/pkg/sink/kafka/options.go b/pkg/sink/kafka/options.go index 3b09999ba0..31f20d0234 100644 --- a/pkg/sink/kafka/options.go +++ b/pkg/sink/kafka/options.go @@ -62,10 +62,6 @@ const ( // See: https://kafka.apache.org/documentation/#brokerconfigs_min.insync.replicas and // https://kafka.apache.org/documentation/#topicconfigs_min.insync.replicas MinInsyncReplicasConfigName = "min.insync.replicas" - // BrokerConnectionsMaxIdleMsConfigName specifies the maximum idle time of a connection to a broker. - // Broker will close the connection if it is idle for this long. - // See: https://kafka.apache.org/documentation/#brokerconfigs_connections.max.idle.ms - BrokerConnectionsMaxIdleMsConfigName = "connections.max.idle.ms" ) const ( @@ -171,10 +167,9 @@ type Options struct { SASL *security.SASL // Timeout for network configurations, default to `10s` - DialTimeout time.Duration - WriteTimeout time.Duration - ReadTimeout time.Duration - KeepConnAliveInterval time.Duration + DialTimeout time.Duration + WriteTimeout time.Duration + ReadTimeout time.Duration } // NewOptions returns a default Kafka configuration @@ -182,18 +177,17 @@ func NewOptions() *Options { return &Options{ Version: "2.4.0", // MaxMessageBytes will be used to initialize producer - MaxMessageBytes: config.DefaultMaxMessageBytes, - ReplicationFactor: 1, - Compression: "none", - RequiredAcks: WaitForAll, - Credential: &security.Credential{}, - InsecureSkipVerify: false, - SASL: &security.SASL{}, - AutoCreate: true, - DialTimeout: 10 * time.Second, - WriteTimeout: 10 * time.Second, - ReadTimeout: 10 * time.Second, - KeepConnAliveInterval: 5 * time.Minute, + MaxMessageBytes: config.DefaultMaxMessageBytes, + ReplicationFactor: 1, + Compression: "none", + RequiredAcks: WaitForAll, + Credential: &security.Credential{}, + InsecureSkipVerify: false, + SASL: &security.SASL{}, + AutoCreate: true, + DialTimeout: 10 * time.Second, + WriteTimeout: 10 * time.Second, + ReadTimeout: 10 * time.Second, } } @@ -594,21 +588,6 @@ func AdjustOptions( } } - // adjust keepConnAliveInterval by `connections.max.idle.ms` broker config. - idleMs, err := admin.GetBrokerConfig(ctx, BrokerConnectionsMaxIdleMsConfigName) - if err != nil { - log.Warn("GetBrokerConfig failed for connections.max.idle.ms", zap.Error(err)) - } else { - idleMsInt, err := strconv.Atoi(idleMs) - if err != nil || idleMsInt <= 0 { - log.Warn("invalid broker config", - zap.String("configName", BrokerConnectionsMaxIdleMsConfigName), zap.String("configValue", idleMs)) - return errors.Trace(err) - } - options.KeepConnAliveInterval = time.Duration(idleMsInt/3) * time.Millisecond - log.Info("Adjust KeepConnAliveInterval", zap.Duration("KeepConnAliveInterval", options.KeepConnAliveInterval)) - } - info, exists := topics[topic] // once we have found the topic, no matter `auto-create-topic`, // make sure user input parameters are valid. diff --git a/pkg/sink/kafka/options_test.go b/pkg/sink/kafka/options_test.go index ce9ec3f0a5..b665b823a0 100644 --- a/pkg/sink/kafka/options_test.go +++ b/pkg/sink/kafka/options_test.go @@ -785,75 +785,3 @@ func TestMerge(t *testing.T) { require.Equal(t, "cert.pem", c.Credential.CertPath) require.Equal(t, "key.pem", c.Credential.KeyPath) } - -// mockAdminClientForAdjust mocks the ClusterAdminClient to test AdjustOptions. -type mockAdminClientForAdjust struct { - ClusterAdminClientMockImpl // We know there is a Mock implementation from the diff - brokerConfigValue string - shouldError bool -} - -// GetBrokerConfig simulates the behavior of getting configuration from a broker. -func (m *mockAdminClientForAdjust) GetBrokerConfig(_ context.Context, configName string) (string, error) { - if m.shouldError { - return "", errors.New("mock error: cannot get broker config") - } - if configName == BrokerConnectionsMaxIdleMsConfigName { - return m.brokerConfigValue, nil - } - return "", errors.Errorf("unexpected config name: %s", configName) -} - -func TestAdjustOptionsKeepAlive(t *testing.T) { - t.Parallel() - ctx := context.Background() - - // Case 1: Successful adjustment. - // The broker returns a valid idle time, KeepConnAliveInterval should be set to 1/3 of it. - t.Run("SuccessfulAdjustment", func(t *testing.T) { - t.Parallel() - o := NewOptions() - adminClient := &mockAdminClientForAdjust{ - ClusterAdminClientMockImpl: *NewClusterAdminClientMockImpl(), - brokerConfigValue: "300000", // 300,000 ms = 300 s - } - err := AdjustOptions(ctx, adminClient, o, adminClient.GetDefaultMockTopicName()) - require.NoError(t, err) - // Expected value is 300000ms / 3 = 100000ms = 100s - require.Equal(t, 100*time.Second, o.KeepConnAliveInterval) - }) - - // Case 2: Broker returns an invalid (non-integer) config value. - t.Run("InvalidNonIntegerConfig", func(t *testing.T) { - t.Parallel() - o := NewOptions() - adminClient := &mockAdminClientForAdjust{ - ClusterAdminClientMockImpl: *NewClusterAdminClientMockImpl(), - brokerConfigValue: "not-a-number", - } - err := AdjustOptions(ctx, adminClient, o, adminClient.GetDefaultMockTopicName()) - require.Error(t, err) - // The error should be a type conversion error. - _, ok := errors.Cause(err).(*strconv.NumError) - require.True(t, ok, "error should be of type strconv.NumError") - }) - - // Case 3: Broker returns an invalid (zero or negative) config value. - // According to the code in the diff, this case will log a warning and return a nil error, - // and the configuration item will not be updated. - t.Run("InvalidZeroOrNegativeConfig", func(t *testing.T) { - t.Parallel() - for _, val := range []string{"0", "-1000"} { - o := NewOptions() - defaultInterval := o.KeepConnAliveInterval - adminClient := &mockAdminClientForAdjust{ - ClusterAdminClientMockImpl: *NewClusterAdminClientMockImpl(), - brokerConfigValue: val, - } - err := AdjustOptions(ctx, adminClient, o, adminClient.GetDefaultMockTopicName()) - require.NoError(t, err, "should not return error for zero or negative idle time") - // KeepConnAliveInterval should remain its default value. - require.Equal(t, defaultInterval, o.KeepConnAliveInterval, "interval should not be changed") - } - }) -} diff --git a/pkg/sink/kafka/sarama.go b/pkg/sink/kafka/sarama.go index ae729f2f98..fe47e17d6a 100644 --- a/pkg/sink/kafka/sarama.go +++ b/pkg/sink/kafka/sarama.go @@ -53,15 +53,11 @@ func NewSaramaConfig(ctx context.Context, o *Options) (*sarama.Config, error) { // set it as the read timeout. config.Admin.Timeout = 10 * time.Second - // Producer.Retry take effect when the producer try to send message to kafka - // brokers. If kafka cluster is healthy, just the default value should be enough. - // For kafka cluster with a bad network condition, producer should not try to - // waster too much time on sending a message, get response no matter success - // or fail as soon as possible is preferred. - // According to the https://github.com/IBM/sarama/issues/2619, - // sarama may send message out of order even set the `config.Net.MaxOpenRequest` to 1, - // when the kafka cluster is unhealthy and trigger the internal retry mechanism. - config.Producer.Retry.Max = 0 + // Keep a bounded producer retry budget to tolerate transient broker-side + // connection failures such as stale connections or broken pipe errors. + // The PingCAP Sarama fork includes the partition-muting ordering fix, while + // Net.MaxOpenRequests=1 below remains an extra ordering guard. + config.Producer.Retry.Max = 5 // make sure sarama producer flush messages as soon as possible. config.Producer.Flush.Bytes = 0 @@ -255,16 +251,3 @@ func getKafkaVersionFromBroker(config *sarama.Config, requestVersion int16, addr } return KafkaVersion, nil } - -// KeepConnAlive sends a heartbeat request to all brokers to keep the connection alive. -func KeepConnAlive(client sarama.Client) { - // We don't care about the response and error here, even the connection - // is unestablished, we just need to keep the connection alive WHEN it's established. - // The connection will be established when a producer send messages. - // This is a workaround for the issue that sarama doesn't keep the connection alive - // when the connection is idle for a long time and we have disabled the retry in sarama. - brokers := client.Brokers() - for _, b := range brokers { - _, _ = b.Heartbeat(&sarama.HeartbeatRequest{}) - } -} diff --git a/pkg/sink/kafka/sarama_factory.go b/pkg/sink/kafka/sarama_factory.go index df07bd97a4..1e50253641 100644 --- a/pkg/sink/kafka/sarama_factory.go +++ b/pkg/sink/kafka/sarama_factory.go @@ -121,11 +121,9 @@ func (f *saramaFactory) SyncProducer(ctx context.Context) (SyncProducer, error) } return &saramaSyncProducer{ - id: f.changefeedID, - producer: p, - client: client, - keepConnAliveInterval: f.option.KeepConnAliveInterval, - lastHeartbeatTime: time.Now().Add(-f.option.KeepConnAliveInterval), + id: f.changefeedID, + producer: p, + client: client, }, nil } @@ -151,11 +149,10 @@ func (f *saramaFactory) AsyncProducer( return nil, errors.Trace(err) } return &saramaAsyncProducer{ - client: client, - producer: p, - changefeedID: f.changefeedID, - keepConnAliveInterval: f.option.KeepConnAliveInterval, - failpointCh: failpointCh, + client: client, + producer: p, + changefeedID: f.changefeedID, + failpointCh: failpointCh, }, nil } diff --git a/pkg/sink/kafka/sarama_factory_test.go b/pkg/sink/kafka/sarama_factory_test.go index 9f29de9614..8f9acb8fc8 100644 --- a/pkg/sink/kafka/sarama_factory_test.go +++ b/pkg/sink/kafka/sarama_factory_test.go @@ -16,12 +16,9 @@ package kafka import ( "context" stdErrors "errors" - "sync" "testing" - "time" "github.com/IBM/sarama" - "github.com/IBM/sarama/mocks" "github.com/pingcap/tiflow/cdc/model" "github.com/stretchr/testify/require" ) @@ -48,11 +45,7 @@ func TestSyncProducer(t *testing.T) { t.Parallel() leader := sarama.NewMockBroker(t, 1) - - metadataResponse := new(sarama.MetadataResponse) - metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) - // Response for `sarama.NewClient` - leader.Returns(metadataResponse) + setProducerMockHandlers(t, leader) defer leader.Close() @@ -80,11 +73,7 @@ func TestAsyncProducer(t *testing.T) { t.Parallel() leader := sarama.NewMockBroker(t, 1) - - metadataResponse := new(sarama.MetadataResponse) - metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) - // Response for `sarama.NewClient` - leader.Returns(metadataResponse) + setProducerMockHandlers(t, leader) defer leader.Close() @@ -106,146 +95,18 @@ func TestAsyncProducer(t *testing.T) { async.Close() } -// mockClientForHeartbeat is a mock sarama.Client used to verify -// that the Brokers() method is called as part of the heartbeat logic. -type mockClientForHeartbeat struct { - sarama.Client // Embed the interface to avoid implementing all methods. - brokersCalled chan struct{} -} - -// Brokers is the mocked method. It sends a signal to a channel when called. -func (c *mockClientForHeartbeat) Brokers() []*sarama.Broker { - // The function under test iterates over the returned slice. - // Returning nil is fine, as we only want to check if this method was called. - c.brokersCalled <- struct{}{} - return nil -} - -// Close closes the signal channel. -func (c *mockClientForHeartbeat) Close() error { - close(c.brokersCalled) - return nil -} - -func TestSaramaSyncProducerHeartbeatThrottling(t *testing.T) { - t.Parallel() - - mockClient := &mockClientForHeartbeat{ - // Use a buffered channel to prevent blocking in case of unexpected calls. - brokersCalled: make(chan struct{}, 10), - } - - producer := &saramaSyncProducer{ - id: model.DefaultChangeFeedID("test-sync-producer"), - producer: nil, // Not needed for this test. - client: mockClient, - keepConnAliveInterval: 100 * time.Millisecond, - // Set the last heartbeat time to a long time ago to ensure the first call is not throttled. - lastHeartbeatTime: time.Now().Add(-200 * time.Millisecond), - } - - // First call, should trigger a call to Brokers(). - producer.HeartbeatBrokers() - select { - case <-mockClient.brokersCalled: - // Expected behavior. - case <-time.After(100 * time.Millisecond): - t.Fatal("HeartbeatBrokers should have called client.Brokers(), but it did not") - } - - // Second call immediately after, should be throttled. - producer.HeartbeatBrokers() - select { - case <-mockClient.brokersCalled: - t.Fatal("client.Brokers() was called, but it should have been throttled") - case <-time.After(50 * time.Millisecond): - // Expected behavior, no call was made. - } - - // Wait for the interval to pass. - time.Sleep(producer.keepConnAliveInterval) - - // Third call, should trigger a call to Brokers() again. - producer.HeartbeatBrokers() - select { - case <-mockClient.brokersCalled: - // Expected behavior. - case <-time.After(100 * time.Millisecond): - t.Fatal("HeartbeatBrokers should have called client.Brokers() again, but it did not") - } -} - -func TestSaramaAsyncProducerHeartbeat(t *testing.T) { - t.Parallel() - - mockClient := &mockClientForHeartbeat{ - brokersCalled: make(chan struct{}, 10), - } - // Using a test config is better practice for initializing mocks. - config := mocks.NewTestConfig() - config.Producer.Return.Successes = true - config.Producer.Return.Errors = true - mockAsyncProducer := mocks.NewAsyncProducer(t, config) - - producer := &saramaAsyncProducer{ - client: mockClient, - producer: mockAsyncProducer, // Use the mock producer. - changefeedID: model.DefaultChangeFeedID("test-async-producer"), - keepConnAliveInterval: 50 * time.Millisecond, // Use a short interval for testing. - failpointCh: make(chan error, 1), - } - - ctx, cancel := context.WithCancel(context.Background()) - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - _ = producer.AsyncRunCallback(ctx) - }() - - // Check for the first heartbeat. - select { - case <-mockClient.brokersCalled: - // Good, first heartbeat received. - case <-time.After(producer.keepConnAliveInterval * 4): // Use a more generous timeout - t.Fatal("Timed out waiting for the first heartbeat") - } - - // Check for the second heartbeat. - select { - case <-mockClient.brokersCalled: - // Good, second heartbeat received. - case <-time.After(producer.keepConnAliveInterval * 4): // Use a more generous timeout - t.Fatal("Timed out waiting for the second heartbeat") - } - - // Stop the AsyncRunCallback goroutine. - cancel() - wg.Wait() - - // Manually close the mock producer's input channel to fix a deadlock issue. - // This deterministically signals the mock's internal goroutine to shut down. - // This is safe because AsyncRunCallback has already exited and will no longer - // try to close or write to the producer. - close(mockAsyncProducer.Input()) - - // Drain the producer's channels to ensure its internal goroutine has shut down - // before the test finishes. This prevents both leaks and deadlocks. - var drainWg sync.WaitGroup - drainWg.Add(2) - go func() { - defer drainWg.Done() - for range mockAsyncProducer.Successes() { - // Drain successes - } - }() - go func() { - defer drainWg.Done() - for range mockAsyncProducer.Errors() { - // Drain errors - } - }() - drainWg.Wait() +func setProducerMockHandlers(t *testing.T, broker *sarama.MockBroker) { + broker.SetHandlerByMap(map[string]sarama.MockResponse{ + "ApiVersionsRequest": sarama.NewMockApiVersionsResponse(t).SetApiKeys([]sarama.ApiVersionsResponseKey{ + {ApiKey: 0, MinVersion: 0, MaxVersion: 8}, // Produce + {ApiKey: 1, MinVersion: 0, MaxVersion: 11}, // Fetch + {ApiKey: 2, MinVersion: 0, MaxVersion: 5}, // ListOffsets + {ApiKey: 3, MinVersion: 0, MaxVersion: 10}, // Metadata + {ApiKey: 18, MinVersion: 0, MaxVersion: 3}, // ApiVersions + }), + "MetadataRequest": sarama.NewMockMetadataResponse(t). + SetBroker(broker.Addr(), broker.BrokerID()), + }) } type testSaramaClient struct { diff --git a/pkg/sink/kafka/v2/admin.go b/pkg/sink/kafka/v2/admin.go index 41a7d63112..eca433f974 100644 --- a/pkg/sink/kafka/v2/admin.go +++ b/pkg/sink/kafka/v2/admin.go @@ -230,10 +230,6 @@ func (a *admin) CreateTopic( return nil } -// HeartbeatBrokers is a no-op for the admin client. -// Just satisfy the ClusterAdminClient interface. -func (a *admin) HeartbeatBrokers() {} - func (a *admin) Close() { log.Info("admin client start closing", zap.String("namespace", a.changefeedID.Namespace), diff --git a/pkg/sink/kafka/v2/factory.go b/pkg/sink/kafka/v2/factory.go index defecacdc1..a4b346b210 100644 --- a/pkg/sink/kafka/v2/factory.go +++ b/pkg/sink/kafka/v2/factory.go @@ -306,10 +306,6 @@ func (s *syncWriter) SendMessages(ctx context.Context, topic string, partitionNu return cerror.WrapError(cerror.ErrKafkaSendMessage, err) } -// HeartbeatBrokers is a no-op for the sync producer. -// Just to satisfy the interface. -func (s *syncWriter) HeartbeatBrokers() {} - // Close shuts down the producer; you must call this function before a producer // object passes out of scope, as it may otherwise leak memory. // You must call this before calling Close on the underlying client.