From edbaaa8ea4803637546fe82a1379af2910ccdbb7 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Mon, 2 Mar 2026 11:50:20 +0800 Subject: [PATCH] set syncpoint config on creation --- pkg/eventservice/dispatcher_stat.go | 13 +++-- pkg/eventservice/dispatcher_stat_test.go | 8 +-- pkg/eventservice/event_broker.go | 21 +++++-- pkg/eventservice/event_broker_test.go | 15 +++-- pkg/eventservice/event_scanner_test.go | 8 +-- pkg/eventservice/metrics_collector_test.go | 8 +-- pkg/eventservice/scan_window.go | 11 +--- pkg/eventservice/scan_window_test.go | 65 ++++++++-------------- 8 files changed, 66 insertions(+), 83 deletions(-) diff --git a/pkg/eventservice/dispatcher_stat.go b/pkg/eventservice/dispatcher_stat.go index 16076d7afc..9bf31cd6a7 100644 --- a/pkg/eventservice/dispatcher_stat.go +++ b/pkg/eventservice/dispatcher_stat.go @@ -433,13 +433,16 @@ type changefeedStatus struct { lastAdjustTime atomic.Time lastTrendAdjustTime atomic.Time usageWindow *memoryUsageWindow - syncPointInterval atomic.Int64 + // syncPointInterval is a changefeed-level configuration and should not be changed + // after the status is created. + syncPointInterval time.Duration } -func newChangefeedStatus(changefeedID common.ChangeFeedID) *changefeedStatus { +func newChangefeedStatus(changefeedID common.ChangeFeedID, syncPointInterval time.Duration) *changefeedStatus { status := &changefeedStatus{ - changefeedID: changefeedID, - usageWindow: newMemoryUsageWindow(memoryUsageWindowDuration), + changefeedID: changefeedID, + usageWindow: newMemoryUsageWindow(memoryUsageWindowDuration), + syncPointInterval: syncPointInterval, } status.scanInterval.Store(int64(defaultScanInterval)) status.lastAdjustTime.Store(time.Now()) @@ -466,5 +469,5 @@ func (c *changefeedStatus) isEmpty() bool { } func (c *changefeedStatus) isSyncpointEnabled() bool { - return c.syncPointInterval.Load() > 0 + return c.syncPointInterval > 0 } diff --git a/pkg/eventservice/dispatcher_stat_test.go b/pkg/eventservice/dispatcher_stat_test.go index c28ccf8369..7447a71f17 100644 --- a/pkg/eventservice/dispatcher_stat_test.go +++ b/pkg/eventservice/dispatcher_stat_test.go @@ -43,7 +43,7 @@ func TestNewDispatcherStat(t *testing.T) { info.syncPointInterval = syncPointInterval workerCount := uint64(1) - status := newChangefeedStatus(info.GetChangefeedID()) + status := newChangefeedStatus(info.GetChangefeedID(), getSyncPointInterval(info)) stat := newDispatcherStat(info, workerCount, workerCount, nil, status) require.Equal(t, info.GetID(), stat.id) @@ -63,7 +63,7 @@ func TestDispatcherStatResolvedTs(t *testing.T) { t.Parallel() info := newMockDispatcherInfo(t, 100, common.NewDispatcherID(), 1, eventpb.ActionType_ACTION_TYPE_REGISTER) - status := newChangefeedStatus(info.GetChangefeedID()) + status := newChangefeedStatus(info.GetChangefeedID(), getSyncPointInterval(info)) stat := newDispatcherStat(info, 1, 1, nil, status) // Test normal update @@ -80,7 +80,7 @@ func TestDispatcherStatGetDataRange(t *testing.T) { t.Parallel() info := newMockDispatcherInfo(t, 100, common.NewDispatcherID(), 1, eventpb.ActionType_ACTION_TYPE_REGISTER) - status := newChangefeedStatus(info.GetChangefeedID()) + status := newChangefeedStatus(info.GetChangefeedID(), getSyncPointInterval(info)) stat := newDispatcherStat(info, 1, 1, nil, status) stat.setHandshaked() @@ -120,7 +120,7 @@ func TestDispatcherStatGetDataRange(t *testing.T) { func TestDispatcherStatUpdateWatermark(t *testing.T) { startTs := uint64(100) info := newMockDispatcherInfo(t, startTs, common.NewDispatcherID(), 1, eventpb.ActionType_ACTION_TYPE_REGISTER) - status := newChangefeedStatus(info.GetChangefeedID()) + status := newChangefeedStatus(info.GetChangefeedID(), getSyncPointInterval(info)) stat := newDispatcherStat(info, 1, 1, nil, status) // Case 1: no new events, only watermark change diff --git a/pkg/eventservice/event_broker.go b/pkg/eventservice/event_broker.go index 1003dd9e8e..4bd70100f1 100644 --- a/pkg/eventservice/event_broker.go +++ b/pkg/eventservice/event_broker.go @@ -965,8 +965,7 @@ func (c *eventBroker) addDispatcher(info DispatcherInfo) error { span := info.GetTableSpan() changefeedID := info.GetChangefeedID() - status := c.getOrSetChangefeedStatus(changefeedID) - status.SetSyncPointConfig(info) + status := c.getOrSetChangefeedStatus(changefeedID, getSyncPointInterval(info)) dispatcher := newDispatcherStat(info, uint64(len(c.taskChan)), uint64(len(c.messageCh)), nil, status) dispatcherPtr := &atomic.Pointer[dispatcherStat]{} dispatcherPtr.Store(dispatcher) @@ -1156,8 +1155,7 @@ func (c *eventBroker) resetDispatcher(dispatcherInfo DispatcherInfo) error { return err } } - status := c.getOrSetChangefeedStatus(changefeedID) - status.SetSyncPointConfig(dispatcherInfo) + status := c.getOrSetChangefeedStatus(changefeedID, getSyncPointInterval(dispatcherInfo)) newStat := newDispatcherStat(dispatcherInfo, uint64(len(c.taskChan)), uint64(len(c.messageCh)), tableInfo, status) newStat.copyStatistics(oldStat) @@ -1197,10 +1195,21 @@ func (c *eventBroker) resetDispatcher(dispatcherInfo DispatcherInfo) error { return nil } -func (c *eventBroker) getOrSetChangefeedStatus(changefeedID common.ChangeFeedID) *changefeedStatus { +func getSyncPointInterval(info DispatcherInfo) time.Duration { + if !info.SyncPointEnabled() { + return 0 + } + interval := info.GetSyncPointInterval() + if interval <= 0 { + return 0 + } + return interval +} + +func (c *eventBroker) getOrSetChangefeedStatus(changefeedID common.ChangeFeedID, syncPointInterval time.Duration) *changefeedStatus { stat, ok := c.changefeedMap.Load(changefeedID) if !ok { - stat = newChangefeedStatus(changefeedID) + stat = newChangefeedStatus(changefeedID, syncPointInterval) log.Info("new changefeed status", zap.Stringer("changefeedID", changefeedID)) c.changefeedMap.Store(changefeedID, stat) metrics.EventServiceScanWindowBaseTsGaugeVec.WithLabelValues(changefeedID.String()).Set(0) diff --git a/pkg/eventservice/event_broker_test.go b/pkg/eventservice/event_broker_test.go index 76903952e8..00e3a7278b 100644 --- a/pkg/eventservice/event_broker_test.go +++ b/pkg/eventservice/event_broker_test.go @@ -65,7 +65,7 @@ func TestCheckNeedScan(t *testing.T) { broker.close() disInfo := newMockDispatcherInfoForTest(t) - changefeedStatus := broker.getOrSetChangefeedStatus(disInfo.GetChangefeedID()) + changefeedStatus := broker.getOrSetChangefeedStatus(disInfo.GetChangefeedID(), getSyncPointInterval(disInfo)) info := newMockDispatcherInfoForTest(t) info.startTs = 100 @@ -155,7 +155,7 @@ func TestOnNotify(t *testing.T) { } // Case 4: Do scan, it will update the sentResolvedTs. - status := broker.getOrSetChangefeedStatus(disInfo.GetChangefeedID()) + status := broker.getOrSetChangefeedStatus(disInfo.GetChangefeedID(), getSyncPointInterval(disInfo)) status.availableMemoryQuota.Store(node.ID(task.info.GetServerID()), atomic.NewUint64(broker.scanLimitInBytes)) broker.doScan(context.TODO(), task) @@ -179,8 +179,7 @@ func TestScanRangeCappedByScanWindow(t *testing.T) { info := newMockDispatcherInfoForTest(t) info.epoch = 1 - changefeedStatus := broker.getOrSetChangefeedStatus(info.GetChangefeedID()) - changefeedStatus.SetSyncPointConfig(info) + changefeedStatus := broker.getOrSetChangefeedStatus(info.GetChangefeedID(), getSyncPointInterval(info)) disp := newDispatcherStat(info, 1, 1, nil, changefeedStatus) disp.seq.Store(1) @@ -206,7 +205,7 @@ func TestHandleCongestionControlV2AdjustsScanInterval(t *testing.T) { defer broker.close() changefeedID := common.NewChangefeedID4Test("default", "test") - status := broker.getOrSetChangefeedStatus(changefeedID) + status := broker.getOrSetChangefeedStatus(changefeedID, 0) status.scanInterval.Store(int64(40 * time.Second)) status.lastAdjustTime.Store(time.Now()) @@ -223,7 +222,7 @@ func TestHandleCongestionControlV2ResetsScanIntervalOnMemoryRelease(t *testing.T defer broker.close() changefeedID := common.NewChangefeedID4Test("default", "test") - status := broker.getOrSetChangefeedStatus(changefeedID) + status := broker.getOrSetChangefeedStatus(changefeedID, 0) status.scanInterval.Store(int64(40 * time.Second)) @@ -239,7 +238,7 @@ func TestHandleCongestionControlV1DoesNotAdjustScanInterval(t *testing.T) { defer broker.close() changefeedID := common.NewChangefeedID4Test("default", "test") - status := broker.getOrSetChangefeedStatus(changefeedID) + status := broker.getOrSetChangefeedStatus(changefeedID, 0) status.scanInterval.Store(int64(40 * time.Second)) status.lastAdjustTime.Store(time.Now()) @@ -532,7 +531,7 @@ func TestSendHandshakeIfNeedConcurrency(t *testing.T) { // Create a mock dispatcher info dispInfo := newMockDispatcherInfoForTest(t) - changefeedStatus := broker.getOrSetChangefeedStatus(dispInfo.GetChangefeedID()) + changefeedStatus := broker.getOrSetChangefeedStatus(dispInfo.GetChangefeedID(), getSyncPointInterval(dispInfo)) // Test 1: Sequential calls should only send one handshake t.Run("Sequential calls", func(t *testing.T) { diff --git a/pkg/eventservice/event_scanner_test.go b/pkg/eventservice/event_scanner_test.go index 46723a726f..9f7093c2a8 100644 --- a/pkg/eventservice/event_scanner_test.go +++ b/pkg/eventservice/event_scanner_test.go @@ -61,7 +61,7 @@ func TestEventScanner(t *testing.T) { disInfo := newMockDispatcherInfoForTest(t) disInfo.startTs = uint64(100) - changefeedStatus := broker.getOrSetChangefeedStatus(disInfo.GetChangefeedID()) + changefeedStatus := broker.getOrSetChangefeedStatus(disInfo.GetChangefeedID(), getSyncPointInterval(disInfo)) tableID := disInfo.GetTableSpan().TableID dispatcherID := disInfo.GetID() @@ -371,7 +371,7 @@ func TestEventScannerWithDeleteTable(t *testing.T) { disInfo := newMockDispatcherInfoForTest(t) disInfo.startTs = uint64(100) - changefeedStatus := broker.getOrSetChangefeedStatus(disInfo.GetChangefeedID()) + changefeedStatus := broker.getOrSetChangefeedStatus(disInfo.GetChangefeedID(), getSyncPointInterval(disInfo)) tableID := disInfo.GetTableSpan().TableID dispatcherID := disInfo.GetID() @@ -449,7 +449,7 @@ func TestEventScannerWithDDL(t *testing.T) { disInfo := newMockDispatcherInfoForTest(t) disInfo.startTs = uint64(100) - changefeedStatus := broker.getOrSetChangefeedStatus(disInfo.GetChangefeedID()) + changefeedStatus := broker.getOrSetChangefeedStatus(disInfo.GetChangefeedID(), getSyncPointInterval(disInfo)) tableID := disInfo.GetTableSpan().TableID dispatcherID := disInfo.GetID() @@ -1537,7 +1537,7 @@ func TestGetTableInfo4Txn(t *testing.T) { disInfo := newMockDispatcherInfoForTest(t) disInfo.startTs = uint64(100) - changefeedStatus := broker.getOrSetChangefeedStatus(disInfo.GetChangefeedID()) + changefeedStatus := broker.getOrSetChangefeedStatus(disInfo.GetChangefeedID(), getSyncPointInterval(disInfo)) tableID := disInfo.GetTableSpan().TableID disp := newDispatcherStat(disInfo, 1, 1, nil, changefeedStatus) diff --git a/pkg/eventservice/metrics_collector_test.go b/pkg/eventservice/metrics_collector_test.go index 7f8e1b3eea..3e6914599b 100644 --- a/pkg/eventservice/metrics_collector_test.go +++ b/pkg/eventservice/metrics_collector_test.go @@ -42,7 +42,7 @@ func TestCollectSlowestDispatchersByCheckpointTs(t *testing.T) { for i := 0; i < 20; i++ { checkpointTs := uint64(100 + i*10) info := newMockDispatcherInfo(t, checkpointTs, common.NewDispatcherID(), int64(i+1), eventpb.ActionType_ACTION_TYPE_REGISTER) - status := newChangefeedStatus(info.GetChangefeedID()) + status := newChangefeedStatus(info.GetChangefeedID(), getSyncPointInterval(info)) stat := newDispatcherStat(info, 1, 1, nil, status) stat.checkpointTs.Store(checkpointTs) stat.receivedResolvedTs.Store(checkpointTs + 10) @@ -104,7 +104,7 @@ func TestCollectSlowestDispatchersLessThan10(t *testing.T) { for i := 0; i < 5; i++ { checkpointTs := uint64(100 + i*10) info := newMockDispatcherInfo(t, checkpointTs, common.NewDispatcherID(), int64(i+1), eventpb.ActionType_ACTION_TYPE_REGISTER) - status := newChangefeedStatus(info.GetChangefeedID()) + status := newChangefeedStatus(info.GetChangefeedID(), getSyncPointInterval(info)) stat := newDispatcherStat(info, 1, 1, nil, status) stat.checkpointTs.Store(checkpointTs) stat.receivedResolvedTs.Store(checkpointTs + 10) @@ -147,12 +147,12 @@ func TestDispatcherHeapItem(t *testing.T) { // Create two dispatchers with different checkpointTs info1 := newMockDispatcherInfo(t, 100, common.NewDispatcherID(), 1, eventpb.ActionType_ACTION_TYPE_REGISTER) - status1 := newChangefeedStatus(info1.GetChangefeedID()) + status1 := newChangefeedStatus(info1.GetChangefeedID(), getSyncPointInterval(info1)) stat1 := newDispatcherStat(info1, 1, 1, nil, status1) stat1.checkpointTs.Store(100) info2 := newMockDispatcherInfo(t, 200, common.NewDispatcherID(), 2, eventpb.ActionType_ACTION_TYPE_REGISTER) - status2 := newChangefeedStatus(info2.GetChangefeedID()) + status2 := newChangefeedStatus(info2.GetChangefeedID(), getSyncPointInterval(info2)) stat2 := newDispatcherStat(info2, 1, 1, nil, status2) stat2.checkpointTs.Store(200) diff --git a/pkg/eventservice/scan_window.go b/pkg/eventservice/scan_window.go index 82b3aefe7a..ab3848733f 100644 --- a/pkg/eventservice/scan_window.go +++ b/pkg/eventservice/scan_window.go @@ -308,7 +308,7 @@ func (c *changefeedStatus) adjustScanInterval(now time.Time, usage memoryUsageSt zap.Float64("trendDelta", trendDelta), zap.Int("usageSamples", usage.cnt), zap.Bool("syncPointEnabled", c.isSyncpointEnabled()), - zap.Int64("syncPointInterval", c.syncPointInterval.Load())) + zap.Duration("syncPointInterval", c.syncPointInterval)) } } @@ -317,7 +317,7 @@ func (c *changefeedStatus) maxScanInterval() time.Duration { return maxScanInterval } - interval := time.Duration(c.syncPointInterval.Load()) + interval := c.syncPointInterval if interval <= 0 { return maxScanInterval } @@ -392,13 +392,6 @@ func (c *changefeedStatus) storeMinSentTs(value uint64) { metrics.EventServiceScanWindowBaseTsGaugeVec.WithLabelValues(c.changefeedID.String()).Set(float64(value)) } -func (c *changefeedStatus) SetSyncPointConfig(info DispatcherInfo) { - if !info.SyncPointEnabled() { - return - } - c.syncPointInterval.Store(int64(info.GetSyncPointInterval())) -} - func scaleDuration(d time.Duration, numerator int64, denominator int64) time.Duration { if numerator <= 0 || denominator <= 0 { return d diff --git a/pkg/eventservice/scan_window_test.go b/pkg/eventservice/scan_window_test.go index 7fb13547e5..1625223c31 100644 --- a/pkg/eventservice/scan_window_test.go +++ b/pkg/eventservice/scan_window_test.go @@ -17,7 +17,6 @@ import ( "testing" "time" - "github.com/pingcap/ticdc/eventpb" "github.com/pingcap/ticdc/pkg/common" "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/oracle" @@ -27,8 +26,7 @@ import ( func TestAdjustScanIntervalVeryLowBypassesSyncPointCap(t *testing.T) { t.Parallel() - status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test")) - status.syncPointInterval.Store(int64(1 * time.Minute)) + status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 1*time.Minute) now := time.Now() status.lastAdjustTime.Store(now.Add(-scanIntervalAdjustCooldown - time.Second)) @@ -46,8 +44,7 @@ func TestAdjustScanIntervalVeryLowBypassesSyncPointCap(t *testing.T) { func TestAdjustScanIntervalLowRespectsSyncPointCap(t *testing.T) { t.Parallel() - status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test")) - status.syncPointInterval.Store(int64(1 * time.Minute)) + status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 1*time.Minute) now := time.Now() status.lastAdjustTime.Store(now.Add(-scanIntervalAdjustCooldown - time.Second)) @@ -63,7 +60,7 @@ func TestAdjustScanIntervalLowRespectsSyncPointCap(t *testing.T) { func TestAdjustScanIntervalDecreaseIgnoresCooldown(t *testing.T) { t.Parallel() - status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test")) + status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 0) now := time.Now() status.lastAdjustTime.Store(now) @@ -75,7 +72,7 @@ func TestAdjustScanIntervalDecreaseIgnoresCooldown(t *testing.T) { func TestAdjustScanIntervalCriticalPressure(t *testing.T) { t.Parallel() - status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test")) + status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 0) now := time.Now() status.lastAdjustTime.Store(now) @@ -88,7 +85,7 @@ func TestAdjustScanIntervalCriticalPressure(t *testing.T) { func TestUpdateMemoryUsageResetsScanIntervalOnMemoryRelease(t *testing.T) { t.Parallel() - status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test")) + status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 0) now := time.Now() status.scanInterval.Store(int64(40 * time.Second)) @@ -99,8 +96,7 @@ func TestUpdateMemoryUsageResetsScanIntervalOnMemoryRelease(t *testing.T) { func TestAdjustScanIntervalIncreaseWithJitteredSamples(t *testing.T) { t.Parallel() - status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test")) - status.syncPointInterval.Store(int64(1 * time.Minute)) + status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 1*time.Minute) start := time.Now() status.lastAdjustTime.Store(start.Add(-scanIntervalAdjustCooldown - time.Second)) @@ -119,7 +115,7 @@ func TestAdjustScanIntervalIncreaseWithJitteredSamples(t *testing.T) { func TestAdjustScanIntervalDecreasesWhenUsageIncreasing(t *testing.T) { t.Parallel() - status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test")) + status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 0) now := time.Now() status.lastAdjustTime.Store(now) @@ -135,7 +131,7 @@ func TestAdjustScanIntervalDecreasesWhenUsageIncreasing(t *testing.T) { func TestAdjustScanIntervalDecreasesWhenUsageIncreasingAboveThirtyPercent(t *testing.T) { t.Parallel() - status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test")) + status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 0) now := time.Now() status.lastAdjustTime.Store(now) status.lastTrendAdjustTime.Store(now.Add(-scanTrendAdjustCooldown - time.Second)) @@ -152,7 +148,7 @@ func TestAdjustScanIntervalDecreasesWhenUsageIncreasingAboveThirtyPercent(t *tes func TestRefreshMinSentResolvedTsMinAndSkipRules(t *testing.T) { t.Parallel() - status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test")) + status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 0) stale := &dispatcherStat{} stale.seq.Store(1) @@ -212,7 +208,7 @@ func TestRefreshMinSentResolvedTsMinAndSkipRules(t *testing.T) { func TestRefreshMinSentResolvedTsStaleFallback(t *testing.T) { t.Parallel() - status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test")) + status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 0) stale := &dispatcherStat{} stale.seq.Store(1) @@ -230,7 +226,7 @@ func TestRefreshMinSentResolvedTsStaleFallback(t *testing.T) { func TestGetScanMaxTsFallbackInterval(t *testing.T) { t.Parallel() - status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test")) + status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 0) baseTime := time.Unix(1234, 0) baseTs := oracle.GoTimeToTS(baseTime) @@ -246,38 +242,21 @@ func TestGetScanMaxTsFallbackInterval(t *testing.T) { require.Equal(t, uint64(0), status.getScanMaxTs()) } -func TestUpdateSyncPointConfigUsesMinimumInterval(t *testing.T) { +func TestGetSyncPointInterval(t *testing.T) { t.Parallel() - status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test")) - - disabled := newMockDispatcherInfo(t, 0, common.NewDispatcherID(), 1, eventpb.ActionType_ACTION_TYPE_REGISTER) + disabled := newMockDispatcherInfo(t, 0, common.NewDispatcherID(), 1, 0) disabled.enableSyncPoint = false disabled.syncPointInterval = 2 * time.Minute - status.SetSyncPointConfig(disabled) - require.Equal(t, int64(0), status.syncPointInterval.Load()) - - first := newMockDispatcherInfo(t, 0, common.NewDispatcherID(), 1, eventpb.ActionType_ACTION_TYPE_REGISTER) - first.enableSyncPoint = true - first.syncPointInterval = 2 * time.Minute - status.SetSyncPointConfig(first) - require.Equal(t, int64(2*time.Minute), status.syncPointInterval.Load()) - - second := newMockDispatcherInfo(t, 0, common.NewDispatcherID(), 1, eventpb.ActionType_ACTION_TYPE_REGISTER) - second.enableSyncPoint = true - second.syncPointInterval = 1 * time.Minute - status.SetSyncPointConfig(second) - require.Equal(t, int64(1*time.Minute), status.syncPointInterval.Load()) - - third := newMockDispatcherInfo(t, 0, common.NewDispatcherID(), 1, eventpb.ActionType_ACTION_TYPE_REGISTER) - third.enableSyncPoint = true - third.syncPointInterval = 3 * time.Minute - status.SetSyncPointConfig(third) - require.Equal(t, int64(1*time.Minute), status.syncPointInterval.Load()) - - invalid := newMockDispatcherInfo(t, 0, common.NewDispatcherID(), 1, eventpb.ActionType_ACTION_TYPE_REGISTER) + require.Equal(t, time.Duration(0), getSyncPointInterval(disabled)) + + enabled := newMockDispatcherInfo(t, 0, common.NewDispatcherID(), 1, 0) + enabled.enableSyncPoint = true + enabled.syncPointInterval = 2 * time.Minute + require.Equal(t, 2*time.Minute, getSyncPointInterval(enabled)) + + invalid := newMockDispatcherInfo(t, 0, common.NewDispatcherID(), 1, 0) invalid.enableSyncPoint = true invalid.syncPointInterval = 0 - status.SetSyncPointConfig(invalid) - require.Equal(t, int64(1*time.Minute), status.syncPointInterval.Load()) + require.Equal(t, time.Duration(0), getSyncPointInterval(invalid)) }