Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions pkg/eventservice/dispatcher_stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,13 +433,16 @@ type changefeedStatus struct {
lastAdjustTime atomic.Time
lastTrendAdjustTime atomic.Time
usageWindow *memoryUsageWindow
syncPointInterval atomic.Int64
// syncPointInterval is a changefeed-level configuration and should not be changed
// after the status is created.
syncPointInterval time.Duration
}

func newChangefeedStatus(changefeedID common.ChangeFeedID) *changefeedStatus {
func newChangefeedStatus(changefeedID common.ChangeFeedID, syncPointInterval time.Duration) *changefeedStatus {
status := &changefeedStatus{
changefeedID: changefeedID,
usageWindow: newMemoryUsageWindow(memoryUsageWindowDuration),
changefeedID: changefeedID,
usageWindow: newMemoryUsageWindow(memoryUsageWindowDuration),
syncPointInterval: syncPointInterval,
}
status.scanInterval.Store(int64(defaultScanInterval))
status.lastAdjustTime.Store(time.Now())
Expand All @@ -466,5 +469,5 @@ func (c *changefeedStatus) isEmpty() bool {
}

func (c *changefeedStatus) isSyncpointEnabled() bool {
return c.syncPointInterval.Load() > 0
return c.syncPointInterval > 0
}
8 changes: 4 additions & 4 deletions pkg/eventservice/dispatcher_stat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestNewDispatcherStat(t *testing.T) {
info.syncPointInterval = syncPointInterval

workerCount := uint64(1)
status := newChangefeedStatus(info.GetChangefeedID())
status := newChangefeedStatus(info.GetChangefeedID(), getSyncPointInterval(info))
stat := newDispatcherStat(info, workerCount, workerCount, nil, status)

require.Equal(t, info.GetID(), stat.id)
Expand All @@ -63,7 +63,7 @@ func TestDispatcherStatResolvedTs(t *testing.T) {
t.Parallel()

info := newMockDispatcherInfo(t, 100, common.NewDispatcherID(), 1, eventpb.ActionType_ACTION_TYPE_REGISTER)
status := newChangefeedStatus(info.GetChangefeedID())
status := newChangefeedStatus(info.GetChangefeedID(), getSyncPointInterval(info))
stat := newDispatcherStat(info, 1, 1, nil, status)

// Test normal update
Expand All @@ -80,7 +80,7 @@ func TestDispatcherStatGetDataRange(t *testing.T) {
t.Parallel()

info := newMockDispatcherInfo(t, 100, common.NewDispatcherID(), 1, eventpb.ActionType_ACTION_TYPE_REGISTER)
status := newChangefeedStatus(info.GetChangefeedID())
status := newChangefeedStatus(info.GetChangefeedID(), getSyncPointInterval(info))
stat := newDispatcherStat(info, 1, 1, nil, status)
stat.setHandshaked()

Expand Down Expand Up @@ -120,7 +120,7 @@ func TestDispatcherStatGetDataRange(t *testing.T) {
func TestDispatcherStatUpdateWatermark(t *testing.T) {
startTs := uint64(100)
info := newMockDispatcherInfo(t, startTs, common.NewDispatcherID(), 1, eventpb.ActionType_ACTION_TYPE_REGISTER)
status := newChangefeedStatus(info.GetChangefeedID())
status := newChangefeedStatus(info.GetChangefeedID(), getSyncPointInterval(info))
stat := newDispatcherStat(info, 1, 1, nil, status)

// Case 1: no new events, only watermark change
Expand Down
21 changes: 15 additions & 6 deletions pkg/eventservice/event_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -965,8 +965,7 @@ func (c *eventBroker) addDispatcher(info DispatcherInfo) error {
span := info.GetTableSpan()
changefeedID := info.GetChangefeedID()

status := c.getOrSetChangefeedStatus(changefeedID)
status.SetSyncPointConfig(info)
status := c.getOrSetChangefeedStatus(changefeedID, getSyncPointInterval(info))
dispatcher := newDispatcherStat(info, uint64(len(c.taskChan)), uint64(len(c.messageCh)), nil, status)
dispatcherPtr := &atomic.Pointer[dispatcherStat]{}
dispatcherPtr.Store(dispatcher)
Expand Down Expand Up @@ -1156,8 +1155,7 @@ func (c *eventBroker) resetDispatcher(dispatcherInfo DispatcherInfo) error {
return err
}
}
status := c.getOrSetChangefeedStatus(changefeedID)
status.SetSyncPointConfig(dispatcherInfo)
status := c.getOrSetChangefeedStatus(changefeedID, getSyncPointInterval(dispatcherInfo))

newStat := newDispatcherStat(dispatcherInfo, uint64(len(c.taskChan)), uint64(len(c.messageCh)), tableInfo, status)
newStat.copyStatistics(oldStat)
Expand Down Expand Up @@ -1197,10 +1195,21 @@ func (c *eventBroker) resetDispatcher(dispatcherInfo DispatcherInfo) error {
return nil
}

func (c *eventBroker) getOrSetChangefeedStatus(changefeedID common.ChangeFeedID) *changefeedStatus {
func getSyncPointInterval(info DispatcherInfo) time.Duration {
if !info.SyncPointEnabled() {
return 0
}
interval := info.GetSyncPointInterval()
if interval <= 0 {
return 0
}
return interval
}

func (c *eventBroker) getOrSetChangefeedStatus(changefeedID common.ChangeFeedID, syncPointInterval time.Duration) *changefeedStatus {
stat, ok := c.changefeedMap.Load(changefeedID)
if !ok {
stat = newChangefeedStatus(changefeedID)
stat = newChangefeedStatus(changefeedID, syncPointInterval)
log.Info("new changefeed status", zap.Stringer("changefeedID", changefeedID))
c.changefeedMap.Store(changefeedID, stat)
metrics.EventServiceScanWindowBaseTsGaugeVec.WithLabelValues(changefeedID.String()).Set(0)
Expand Down
15 changes: 7 additions & 8 deletions pkg/eventservice/event_broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func TestCheckNeedScan(t *testing.T) {
broker.close()

disInfo := newMockDispatcherInfoForTest(t)
changefeedStatus := broker.getOrSetChangefeedStatus(disInfo.GetChangefeedID())
changefeedStatus := broker.getOrSetChangefeedStatus(disInfo.GetChangefeedID(), getSyncPointInterval(disInfo))

info := newMockDispatcherInfoForTest(t)
info.startTs = 100
Expand Down Expand Up @@ -155,7 +155,7 @@ func TestOnNotify(t *testing.T) {
}

// Case 4: Do scan, it will update the sentResolvedTs.
status := broker.getOrSetChangefeedStatus(disInfo.GetChangefeedID())
status := broker.getOrSetChangefeedStatus(disInfo.GetChangefeedID(), getSyncPointInterval(disInfo))
status.availableMemoryQuota.Store(node.ID(task.info.GetServerID()), atomic.NewUint64(broker.scanLimitInBytes))

broker.doScan(context.TODO(), task)
Expand All @@ -179,8 +179,7 @@ func TestScanRangeCappedByScanWindow(t *testing.T) {

info := newMockDispatcherInfoForTest(t)
info.epoch = 1
changefeedStatus := broker.getOrSetChangefeedStatus(info.GetChangefeedID())
changefeedStatus.SetSyncPointConfig(info)
changefeedStatus := broker.getOrSetChangefeedStatus(info.GetChangefeedID(), getSyncPointInterval(info))

disp := newDispatcherStat(info, 1, 1, nil, changefeedStatus)
disp.seq.Store(1)
Expand All @@ -206,7 +205,7 @@ func TestHandleCongestionControlV2AdjustsScanInterval(t *testing.T) {
defer broker.close()

changefeedID := common.NewChangefeedID4Test("default", "test")
status := broker.getOrSetChangefeedStatus(changefeedID)
status := broker.getOrSetChangefeedStatus(changefeedID, 0)

status.scanInterval.Store(int64(40 * time.Second))
status.lastAdjustTime.Store(time.Now())
Expand All @@ -223,7 +222,7 @@ func TestHandleCongestionControlV2ResetsScanIntervalOnMemoryRelease(t *testing.T
defer broker.close()

changefeedID := common.NewChangefeedID4Test("default", "test")
status := broker.getOrSetChangefeedStatus(changefeedID)
status := broker.getOrSetChangefeedStatus(changefeedID, 0)

status.scanInterval.Store(int64(40 * time.Second))

Expand All @@ -239,7 +238,7 @@ func TestHandleCongestionControlV1DoesNotAdjustScanInterval(t *testing.T) {
defer broker.close()

changefeedID := common.NewChangefeedID4Test("default", "test")
status := broker.getOrSetChangefeedStatus(changefeedID)
status := broker.getOrSetChangefeedStatus(changefeedID, 0)

status.scanInterval.Store(int64(40 * time.Second))
status.lastAdjustTime.Store(time.Now())
Expand Down Expand Up @@ -532,7 +531,7 @@ func TestSendHandshakeIfNeedConcurrency(t *testing.T) {

// Create a mock dispatcher info
dispInfo := newMockDispatcherInfoForTest(t)
changefeedStatus := broker.getOrSetChangefeedStatus(dispInfo.GetChangefeedID())
changefeedStatus := broker.getOrSetChangefeedStatus(dispInfo.GetChangefeedID(), getSyncPointInterval(dispInfo))

// Test 1: Sequential calls should only send one handshake
t.Run("Sequential calls", func(t *testing.T) {
Expand Down
8 changes: 4 additions & 4 deletions pkg/eventservice/event_scanner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestEventScanner(t *testing.T) {

disInfo := newMockDispatcherInfoForTest(t)
disInfo.startTs = uint64(100)
changefeedStatus := broker.getOrSetChangefeedStatus(disInfo.GetChangefeedID())
changefeedStatus := broker.getOrSetChangefeedStatus(disInfo.GetChangefeedID(), getSyncPointInterval(disInfo))
tableID := disInfo.GetTableSpan().TableID
dispatcherID := disInfo.GetID()

Expand Down Expand Up @@ -371,7 +371,7 @@ func TestEventScannerWithDeleteTable(t *testing.T) {

disInfo := newMockDispatcherInfoForTest(t)
disInfo.startTs = uint64(100)
changefeedStatus := broker.getOrSetChangefeedStatus(disInfo.GetChangefeedID())
changefeedStatus := broker.getOrSetChangefeedStatus(disInfo.GetChangefeedID(), getSyncPointInterval(disInfo))
tableID := disInfo.GetTableSpan().TableID
dispatcherID := disInfo.GetID()

Expand Down Expand Up @@ -449,7 +449,7 @@ func TestEventScannerWithDDL(t *testing.T) {

disInfo := newMockDispatcherInfoForTest(t)
disInfo.startTs = uint64(100)
changefeedStatus := broker.getOrSetChangefeedStatus(disInfo.GetChangefeedID())
changefeedStatus := broker.getOrSetChangefeedStatus(disInfo.GetChangefeedID(), getSyncPointInterval(disInfo))
tableID := disInfo.GetTableSpan().TableID
dispatcherID := disInfo.GetID()

Expand Down Expand Up @@ -1537,7 +1537,7 @@ func TestGetTableInfo4Txn(t *testing.T) {

disInfo := newMockDispatcherInfoForTest(t)
disInfo.startTs = uint64(100)
changefeedStatus := broker.getOrSetChangefeedStatus(disInfo.GetChangefeedID())
changefeedStatus := broker.getOrSetChangefeedStatus(disInfo.GetChangefeedID(), getSyncPointInterval(disInfo))
tableID := disInfo.GetTableSpan().TableID

disp := newDispatcherStat(disInfo, 1, 1, nil, changefeedStatus)
Expand Down
8 changes: 4 additions & 4 deletions pkg/eventservice/metrics_collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestCollectSlowestDispatchersByCheckpointTs(t *testing.T) {
for i := 0; i < 20; i++ {
checkpointTs := uint64(100 + i*10)
info := newMockDispatcherInfo(t, checkpointTs, common.NewDispatcherID(), int64(i+1), eventpb.ActionType_ACTION_TYPE_REGISTER)
status := newChangefeedStatus(info.GetChangefeedID())
status := newChangefeedStatus(info.GetChangefeedID(), getSyncPointInterval(info))
stat := newDispatcherStat(info, 1, 1, nil, status)
stat.checkpointTs.Store(checkpointTs)
stat.receivedResolvedTs.Store(checkpointTs + 10)
Expand Down Expand Up @@ -104,7 +104,7 @@ func TestCollectSlowestDispatchersLessThan10(t *testing.T) {
for i := 0; i < 5; i++ {
checkpointTs := uint64(100 + i*10)
info := newMockDispatcherInfo(t, checkpointTs, common.NewDispatcherID(), int64(i+1), eventpb.ActionType_ACTION_TYPE_REGISTER)
status := newChangefeedStatus(info.GetChangefeedID())
status := newChangefeedStatus(info.GetChangefeedID(), getSyncPointInterval(info))
stat := newDispatcherStat(info, 1, 1, nil, status)
stat.checkpointTs.Store(checkpointTs)
stat.receivedResolvedTs.Store(checkpointTs + 10)
Expand Down Expand Up @@ -147,12 +147,12 @@ func TestDispatcherHeapItem(t *testing.T) {

// Create two dispatchers with different checkpointTs
info1 := newMockDispatcherInfo(t, 100, common.NewDispatcherID(), 1, eventpb.ActionType_ACTION_TYPE_REGISTER)
status1 := newChangefeedStatus(info1.GetChangefeedID())
status1 := newChangefeedStatus(info1.GetChangefeedID(), getSyncPointInterval(info1))
stat1 := newDispatcherStat(info1, 1, 1, nil, status1)
stat1.checkpointTs.Store(100)

info2 := newMockDispatcherInfo(t, 200, common.NewDispatcherID(), 2, eventpb.ActionType_ACTION_TYPE_REGISTER)
status2 := newChangefeedStatus(info2.GetChangefeedID())
status2 := newChangefeedStatus(info2.GetChangefeedID(), getSyncPointInterval(info2))
stat2 := newDispatcherStat(info2, 1, 1, nil, status2)
stat2.checkpointTs.Store(200)

Expand Down
11 changes: 2 additions & 9 deletions pkg/eventservice/scan_window.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ func (c *changefeedStatus) adjustScanInterval(now time.Time, usage memoryUsageSt
zap.Float64("trendDelta", trendDelta),
zap.Int("usageSamples", usage.cnt),
zap.Bool("syncPointEnabled", c.isSyncpointEnabled()),
zap.Int64("syncPointInterval", c.syncPointInterval.Load()))
zap.Duration("syncPointInterval", c.syncPointInterval))
}
}

Expand All @@ -317,7 +317,7 @@ func (c *changefeedStatus) maxScanInterval() time.Duration {
return maxScanInterval
}

interval := time.Duration(c.syncPointInterval.Load())
interval := c.syncPointInterval
if interval <= 0 {
return maxScanInterval
}
Expand Down Expand Up @@ -392,13 +392,6 @@ func (c *changefeedStatus) storeMinSentTs(value uint64) {
metrics.EventServiceScanWindowBaseTsGaugeVec.WithLabelValues(c.changefeedID.String()).Set(float64(value))
}

func (c *changefeedStatus) SetSyncPointConfig(info DispatcherInfo) {
if !info.SyncPointEnabled() {
return
}
c.syncPointInterval.Store(int64(info.GetSyncPointInterval()))
}

func scaleDuration(d time.Duration, numerator int64, denominator int64) time.Duration {
if numerator <= 0 || denominator <= 0 {
return d
Expand Down
Loading