3 changes: 1 addition & 2 deletions cdc/sink/ddlsink/mq/kafka_ddl_sink.go
@@ -83,7 +83,6 @@ func NewKafkaDDLSink(
topic,
options.DeriveTopicConfig(),
adminClient,
options.KeepConnAliveInterval,
)
if err != nil {
return nil, errors.Trace(err)
@@ -113,7 +112,7 @@ func NewKafkaDDLSink(
}

ddlProducer := producerCreator(ctx, changefeedID, syncProducer)
s := newDDLSink(changefeedID, ddlProducer, adminClient, topicManager, eventRouter, encoderBuilder.Build(), protocol, syncProducer)
s := newDDLSink(changefeedID, ddlProducer, adminClient, topicManager, eventRouter, encoderBuilder.Build(), protocol)
log.Info("DDL sink producer client created", zap.Duration("duration", time.Since(start)))
return s, nil
}
29 changes: 8 additions & 21 deletions cdc/sink/ddlsink/mq/mq_ddl_sink.go
@@ -72,8 +72,6 @@ type DDLSink struct {
statistics *metrics.Statistics
// admin is used to query kafka cluster information.
admin kafka.ClusterAdminClient
// connRefresherForDDL is used to refresh the connection for DDL events.
connRefresherForDDL kafka.SyncProducer
}

func newDDLSink(
@@ -84,18 +82,16 @@ func newDDLSink(
eventRouter *dispatcher.EventRouter,
encoder codec.RowEventEncoder,
protocol config.Protocol,
connRefresherForDDL kafka.SyncProducer,
) *DDLSink {
return &DDLSink{
id: changefeedID,
protocol: protocol,
eventRouter: eventRouter,
topicManager: topicManager,
encoder: encoder,
producer: producer,
statistics: metrics.NewStatistics(changefeedID, sink.RowSink),
admin: adminClient,
connRefresherForDDL: connRefresherForDDL,
id: changefeedID,
protocol: protocol,
eventRouter: eventRouter,
topicManager: topicManager,
encoder: encoder,
producer: producer,
statistics: metrics.NewStatistics(changefeedID, sink.RowSink),
admin: adminClient,
}
}

@@ -145,15 +141,6 @@ func (k *DDLSink) WriteDDLEvent(ctx context.Context, ddl *model.DDLEvent) error
func (k *DDLSink) WriteCheckpointTs(ctx context.Context,
ts uint64, tables []*model.TableInfo,
) error {
// This operation is used to keep the kafka connection alive.
// For more details, see https://github.com/pingcap/tiflow/pull/12173
if k.connRefresherForDDL != nil {
// The implementation is saramaSyncProducer.HeartbeatBrokers. And
// there is a keepConnAliveInterval in the saramaSyncProducer, so
// we don't need to worry about the heartbeat is too frequent.
k.connRefresherForDDL.HeartbeatBrokers()
}

var (
err error
partitionNum int32
97 changes: 0 additions & 97 deletions cdc/sink/ddlsink/mq/mq_ddl_sink_test.go
@@ -17,16 +17,12 @@ import (
"context"
"fmt"
"net/url"
"sync"
"testing"

mm "github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/ddlsink/mq/ddlproducer"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/sink/codec"
"github.com/pingcap/tiflow/pkg/sink/codec/common"
"github.com/pingcap/tiflow/pkg/sink/kafka"
"github.com/stretchr/testify/require"
)
@@ -296,96 +292,3 @@ func TestGetDLLDispatchRuleByProtocol(t *testing.T) {
require.Equal(t, PartitionAll, getDDLDispatchRule(config.ProtocolSimple))
require.Equal(t, PartitionAll, getDDLDispatchRule(config.ProtocolDebezium))
}

// mockSyncProducer is used to count the calls to HeartbeatBrokers.
type mockSyncProducer struct {
kafka.MockSaramaSyncProducer
heartbeatCount int
mu sync.Mutex
}

func (m *mockSyncProducer) HeartbeatBrokers() {
m.mu.Lock()
defer m.mu.Unlock()
m.heartbeatCount++
}

func (m *mockSyncProducer) GetHeartbeatCount() int {
m.mu.Lock()
defer m.mu.Unlock()
return m.heartbeatCount
}

// mockEncoder is a mock implementation of codec.RowEventEncoder.
// It is used to prevent the test from needing to set up a real, complex encoder.
type mockEncoder struct{}

// This line ensures at compile time that mockEncoder correctly implements the interface.
var _ codec.RowEventEncoder = (*mockEncoder)(nil)

// A predefined error that our mock encoder will return.
var errMockEncoder = errors.New("mock encoder error")

// EncodeCheckpointEvent returns a specific error to halt the execution
// of the function under test right after the heartbeat logic.
func (m *mockEncoder) EncodeCheckpointEvent(ts uint64) (*common.Message, error) {
return nil, errMockEncoder
}

// The following methods are part of the RowEventEncoder interface.
// They can be left with a minimal implementation as they are not called
// in the tested code path.
func (m *mockEncoder) AppendRowChangedEvent(
_ context.Context, _ string, _ *model.RowChangedEvent, _ func(),
) error {
return nil
}

func (m *mockEncoder) Build() []*common.Message {
return nil
}

func (m *mockEncoder) EncodeDDLEvent(event *model.DDLEvent) (*common.Message, error) {
return nil, nil
}
func (m *mockEncoder) SetMaxMessageBytes(bytes int) {}

func TestDDLSinkHeartbeat(t *testing.T) {
t.Parallel()

ctx := context.Background()
changefeedID := model.DefaultChangeFeedID("test-ddl-sink")
// Protocol is needed by newDDLSink but not used in the tested path.
proto := config.ProtocolOpen
encoder := &mockEncoder{}

// Case 1: DDL Sink with a connection refresher.
t.Run("DDLSinkWithConnectionRefresher", func(t *testing.T) {
t.Parallel()
producer := &mockSyncProducer{}
// Other dependencies for newDDLSink can be nil as they are not used
// before our mock encoder returns an error.
ddlSink := newDDLSink(changefeedID, nil, nil, nil, nil, encoder, proto, producer)

require.Equal(t, 0, producer.GetHeartbeatCount())

// WriteCheckpointTs should first call HeartbeatBrokers, then fail at the encoder.
err := ddlSink.WriteCheckpointTs(ctx, 12345, nil)

// Assert that the heartbeat was called exactly once.
require.Equal(t, 1, producer.GetHeartbeatCount())
// Assert that the function failed with the mock encoder's specific error.
require.ErrorIs(t, err, errMockEncoder)
})

// Case 2: DDLSink with a nil connection refresher (e.g., for Pulsar).
t.Run("DDLSinkWithNilConnectionRefresher", func(t *testing.T) {
t.Parallel()
// Create the sink with a nil refresher.
ddlSinkNilRefresher := newDDLSink(changefeedID, nil, nil, nil, nil, encoder, proto, nil)

// The call should not panic and should fail with the mock encoder's error.
err := ddlSinkNilRefresher.WriteCheckpointTs(ctx, 12347, nil)
require.ErrorIs(t, err, errMockEncoder)
})
}
2 changes: 1 addition & 1 deletion cdc/sink/ddlsink/mq/pulsar_ddl_sink.go
@@ -100,7 +100,7 @@ func NewPulsarDDLSink(
return nil, errors.Trace(err)
}

s := newDDLSink(changefeedID, p, nil, topicManager, eventRouter, encoderBuilder.Build(), protocol, nil)
s := newDDLSink(changefeedID, p, nil, topicManager, eventRouter, encoderBuilder.Build(), protocol)

return s, nil
}
1 change: 0 additions & 1 deletion cdc/sink/dmlsink/mq/kafka_dml_sink.go
@@ -87,7 +87,6 @@ func NewKafkaDMLSink(
topic,
options.DeriveTopicConfig(),
adminClient,
options.KeepConnAliveInterval,
)
if err != nil {
return nil, errors.Trace(err)
23 changes: 7 additions & 16 deletions cdc/sink/dmlsink/mq/manager/kafka_manager.go
@@ -48,9 +48,6 @@ type kafkaTopicManager struct {

topics sync.Map

metaRefreshTicker *time.Ticker
keepConnAliveTicker *time.Ticker

// cancel is used to cancel the background goroutine.
cancel context.CancelFunc
}
@@ -62,15 +59,12 @@ func NewKafkaTopicManager(
changefeedID model.ChangeFeedID,
admin kafka.ClusterAdminClient,
cfg *kafka.AutoCreateTopicConfig,
keepConnAliveInterval time.Duration,
) *kafkaTopicManager {
mgr := &kafkaTopicManager{
defaultTopic: defaultTopic,
changefeedID: changefeedID,
admin: admin,
cfg: cfg,
metaRefreshTicker: time.NewTicker(metaRefreshInterval),
keepConnAliveTicker: time.NewTicker(keepConnAliveInterval),
defaultTopic: defaultTopic,
changefeedID: changefeedID,
admin: admin,
cfg: cfg,
}

ctx, mgr.cancel = context.WithCancel(ctx)
@@ -100,7 +94,8 @@ func (m *kafkaTopicManager) GetPartitionNum(
}

func (m *kafkaTopicManager) backgroundRefreshMeta(ctx context.Context) {
defer m.keepConnAliveTicker.Stop()
ticker := time.NewTicker(metaRefreshInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
@@ -109,17 +104,13 @@ func (m *kafkaTopicManager) backgroundRefreshMeta(ctx context.Context) {
zap.String("changefeed", m.changefeedID.ID),
)
return
case <-m.metaRefreshTicker.C:
case <-ticker.C:
// We ignore the error here, because the error may be caused by the
// network problem, and we can try to get the metadata next time.
topicPartitionNums, _ := m.fetchAllTopicsPartitionsNum(ctx)
for topic, partitionNum := range topicPartitionNums {
m.tryUpdatePartitionsAndLogging(topic, partitionNum)
}
case <-m.keepConnAliveTicker.C:
// This operation is used to keep the kafka connection alive.
// For more details, see https://github.com/pingcap/tiflow/pull/12173
m.admin.HeartbeatBrokers()
}
}
}
64 changes: 4 additions & 60 deletions cdc/sink/dmlsink/mq/manager/kafka_manager_test.go
@@ -15,19 +15,13 @@ package manager

import (
"context"
"math"
"sync"
"testing"
"time"

"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/sink/kafka"
"github.com/stretchr/testify/require"
)

// A fake interval to keep the connection alive in kafka topic manager.
const FOREVER = time.Duration(math.MaxInt64)

func TestCreateTopic(t *testing.T) {
t.Parallel()

@@ -41,7 +35,7 @@ func TestCreateTopic(t *testing.T) {

changefeedID := model.DefaultChangeFeedID("test")
ctx := context.Background()
manager := NewKafkaTopicManager(ctx, kafka.DefaultMockTopicName, changefeedID, adminClient, cfg, FOREVER)
manager := NewKafkaTopicManager(ctx, kafka.DefaultMockTopicName, changefeedID, adminClient, cfg)
defer manager.Close()
partitionNum, err := manager.CreateTopicAndWaitUntilVisible(ctx, kafka.DefaultMockTopicName)
require.NoError(t, err)
@@ -56,7 +50,7 @@

// Try to create a topic without auto create.
cfg.AutoCreate = false
manager = NewKafkaTopicManager(ctx, "new-topic2", changefeedID, adminClient, cfg, FOREVER)
manager = NewKafkaTopicManager(ctx, "new-topic2", changefeedID, adminClient, cfg)
defer manager.Close()
_, err = manager.CreateTopicAndWaitUntilVisible(ctx, "new-topic2")
require.Regexp(
@@ -73,7 +67,7 @@
PartitionNum: 2,
ReplicationFactor: 4,
}
manager = NewKafkaTopicManager(ctx, topic, changefeedID, adminClient, cfg, FOREVER)
manager = NewKafkaTopicManager(ctx, topic, changefeedID, adminClient, cfg)
defer manager.Close()
_, err = manager.CreateTopicAndWaitUntilVisible(ctx, topic)
require.Regexp(
@@ -97,7 +91,7 @@ func TestCreateTopicWithDelay(t *testing.T) {
topic := "new_topic"
changefeedID := model.DefaultChangeFeedID("test")
ctx := context.Background()
manager := NewKafkaTopicManager(ctx, topic, changefeedID, adminClient, cfg, FOREVER)
manager := NewKafkaTopicManager(ctx, topic, changefeedID, adminClient, cfg)
defer manager.Close()
partitionNum, err := manager.createTopic(ctx, topic)
require.NoError(t, err)
@@ -107,53 +101,3 @@
require.NoError(t, err)
require.Equal(t, int32(2), partitionNum)
}

// mockAdminClientForHeartbeat is used to count the calls to HeartbeatBrokers.
type mockAdminClientForHeartbeat struct {
kafka.ClusterAdminClientMockImpl
heartbeatCount int
mu sync.Mutex
}

func (m *mockAdminClientForHeartbeat) HeartbeatBrokers() {
m.mu.Lock()
defer m.mu.Unlock()
m.heartbeatCount++
}

func (m *mockAdminClientForHeartbeat) GetHeartbeatCount() int {
m.mu.Lock()
defer m.mu.Unlock()
return m.heartbeatCount
}

func TestKafkaManagerHeartbeat(t *testing.T) {
t.Parallel()

adminClient := &mockAdminClientForHeartbeat{}
cfg := &kafka.AutoCreateTopicConfig{AutoCreate: false}
changefeedID := model.DefaultChangeFeedID("test-heartbeat")
ctx, cancel := context.WithCancel(context.Background())

// Use a short interval for testing.
keepAliveInterval := 50 * time.Millisecond
manager := NewKafkaTopicManager(ctx, "topic", changefeedID, adminClient, cfg, keepAliveInterval)

// Ensure the manager is closed and the context is canceled at the end of the test.
defer manager.Close()
defer cancel()

// Wait for a sufficient amount of time to ensure the heartbeat ticker triggers several times.
// Waiting for 175ms should be enough to trigger 3 times (at 50ms, 100ms, 150ms).
// Use Eventually to avoid test flakiness.
require.Eventually(t, func() bool {
return adminClient.GetHeartbeatCount() >= 2
}, 2*time.Second, 50*time.Millisecond, "HeartbeatBrokers should be called periodically")

// Verify that closing the manager stops the heartbeat.
countBeforeClose := adminClient.GetHeartbeatCount()
manager.Close()
// Wait for a short period to ensure no new heartbeats occur.
time.Sleep(keepAliveInterval * 2)
require.Equal(t, countBeforeClose, adminClient.GetHeartbeatCount(), "Heartbeat should stop after manager is closed")
}
4 changes: 1 addition & 3 deletions cdc/sink/util/helper.go
@@ -17,7 +17,6 @@ import (
"context"
"net/url"
"strings"
"time"

"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/manager"
@@ -105,10 +104,9 @@ func GetTopicManagerAndTryCreateTopic(
topic string,
topicCfg *kafka.AutoCreateTopicConfig,
adminClient kafka.ClusterAdminClient,
keepConnAliveInterval time.Duration,
) (manager.TopicManager, error) {
topicManager := manager.NewKafkaTopicManager(
ctx, topic, changefeedID, adminClient, topicCfg, keepConnAliveInterval,
ctx, topic, changefeedID, adminClient, topicCfg,
)

if _, err := topicManager.CreateTopicAndWaitUntilVisible(ctx, topic); err != nil {
2 changes: 1 addition & 1 deletion cdc/sink/util/helper_test.go
@@ -44,7 +44,7 @@ func TestPartition(t *testing.T) {
ctx := context.Background()

manager, err := GetTopicManagerAndTryCreateTopic(
ctx, changefeedID, kafka.DefaultMockTopicName, cfg, adminClient, FOREVER)
ctx, changefeedID, kafka.DefaultMockTopicName, cfg, adminClient)
require.NoError(t, err)
defer manager.Close()
