11 changes: 11 additions & 0 deletions dm/syncer/syncer.go
@@ -234,6 +234,10 @@ type Syncer struct {
// the minimal timestamp of currently processing binlog events.
// this lag will consider time difference between upstream and DM nodes
secondsBehindMaster atomic.Int64
// lastNonZeroMinTS is the last known minimum event timestamp across all workers
// when at least one worker was active. Used as a fallback in updateReplicationLagMetric
// to avoid spurious drops to 0 between DML batches when all worker TSes are reset to 0.
lastNonZeroMinTS atomic.Int64
// stores the last job TS(binlog event timestamp) of each worker,
// if there's no active job, the corresponding worker's TS is reset to 0.
// since DML worker runs jobs in batch, the TS is the TS of the first job in the batch.
@@ -626,6 +630,7 @@ func (s *Syncer) reset() {
s.streamerController.Close()
}
s.secondsBehindMaster.Store(0)
s.lastNonZeroMinTS.Store(0)
for _, jobTS := range s.workerJobTSArray {
jobTS.Store(0)
}
@@ -965,7 +970,13 @@ func (s *Syncer) updateReplicationLagMetric() {
}
}
if minTS != int64(0) {
s.lastNonZeroMinTS.Store(minTS)
lag = s.calcReplicationLag(minTS)
} else if lastTS := s.lastNonZeroMinTS.Load(); lastTS != int64(0) {
// All workers are between batches (their TSes were reset to 0 after the
// last batch completed). Use the most recent known event timestamp so we
// don't emit a spurious drop to 0 while no worker is actively processing.
lag = s.calcReplicationLag(lastTS)
Comment on lines 972 to +979
When DM is actually caught up and the upstream is simply idle, this fallback branch is still taken. In that state the lag computed from calcReplicationLag(lastTS) keeps growing with wall-clock time, so secondsBehindMaster starts reporting artificial lag even though there is no backlog.
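One way to bound that fallback is sketched standalone below, with hypothetical names (lagTracker, lastNonZeroSeenAt, fallbackGracePeriod are illustrations only, not part of this patch or of DM): record when the fallback TS was captured and reuse it only for a short grace window between batches, reporting 0 once the window has passed.

```go
package lagsketch

import (
	"sync/atomic"
	"time"
)

// lagTracker is a hypothetical illustration of a "grace window" fallback; it is
// not DM code, only a sketch of the idea discussed above.
type lagTracker struct {
	lastNonZeroMinTS  atomic.Int64 // last non-zero minimum event TS across workers (Unix seconds)
	lastNonZeroSeenAt atomic.Int64 // wall-clock time (Unix nanos) when that TS was stored
}

// fallbackGracePeriod bounds how long a stale TS may be reused between batches.
const fallbackGracePeriod = 5 * time.Second

// lag returns the replication lag in seconds for the given minimum event TS
// across workers; minTS == 0 means no worker currently holds an active job.
func (t *lagTracker) lag(minTS int64) int64 {
	now := time.Now()
	if minTS != 0 {
		t.lastNonZeroMinTS.Store(minTS)
		t.lastNonZeroSeenAt.Store(now.UnixNano())
		return now.Unix() - minTS
	}
	if lastTS := t.lastNonZeroMinTS.Load(); lastTS != 0 &&
		now.Sub(time.Unix(0, t.lastNonZeroSeenAt.Load())) < fallbackGracePeriod {
		// Between batches: bridge the brief gap instead of dropping to 0.
		return now.Unix() - lastTS
	}
	// Caught up (or the upstream is idle): report 0 rather than a value that
	// keeps growing with wall-clock time.
	return 0
}
```

A real fix would also need to reconcile this with calcReplicationLag's tsOffset handling; the sketch only shows the bounded-fallback shape.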

}

s.metricsProxies.Metrics.ReplicationLagHistogram.Observe(float64(lag))
55 changes: 55 additions & 0 deletions dm/syncer/syncer_test.go
@@ -965,9 +965,11 @@ func (s *testSyncerSuite) TestRun(c *check.C) {
resultCh = make(chan pb.ProcessResult)
// simulate `syncer.Resume` here, but doesn't reset database conns
syncer.secondsBehindMaster.Store(100)
syncer.lastNonZeroMinTS.Store(100)
syncer.workerJobTSArray[ddlJobIdx].Store(100)
syncer.reset()
c.Assert(syncer.secondsBehindMaster.Load(), check.Equals, int64(0))
c.Assert(syncer.lastNonZeroMinTS.Load(), check.Equals, int64(0))
c.Assert(syncer.workerJobTSArray[ddlJobIdx].Load(), check.Equals, int64(0))
mockStreamerProducer = &MockStreamProducer{s.generateEvents(events2, c)}
mockStreamer, err = mockStreamerProducer.GenerateStreamFrom(binlog.MustZeroLocation(mysql.MySQLFlavor))
@@ -2003,3 +2005,56 @@ func TestCheckCanUpdateCfg(t *testing.T) {
cfg2.SyncerConfig.Compact = !cfg.SyncerConfig.Compact
require.NoError(t, syncer.CheckCanUpdateCfg(cfg))
}

// TestUpdateReplicationLagMetric verifies the secondsBehindMaster calculation,
// specifically that it does not drop spuriously to 0 between DML batches when
// all workerJobTSArray entries are temporarily reset to 0.
func TestUpdateReplicationLagMetric(t *testing.T) {
cfg := genDefaultSubTaskConfig4Test()
cfg.WorkerCount = 2
syncer := NewSyncer(cfg, nil, nil)
syncer.metricsProxies = metrics.DefaultMetricsProxies.CacheForOneTask("task", "worker", "source")

// Use tsOffset=0 so lag = time.Now().Unix() - eventTS.
syncer.tsOffset.Store(0)

// --- case 1: no events processed yet, all TSes are 0 → lag should be 0.
syncer.updateReplicationLagMetric()
require.Equal(t, int64(0), syncer.secondsBehindMaster.Load(),
"lag should be 0 before any events are processed")
require.Equal(t, int64(0), syncer.lastNonZeroMinTS.Load(),
"lastNonZeroMinTS should remain 0 when no events have been seen")

// --- case 2: a worker has an active TS → lag is computed from that TS.
eventTS := time.Now().Unix() - 10 // simulates an event 10 seconds old
syncer.workerJobTSArray[dmlWorkerJobIdx(0)].Store(eventTS)

syncer.updateReplicationLagMetric()
lag := syncer.secondsBehindMaster.Load()
require.Greater(t, lag, int64(0),
"lag should be positive when a worker has an active event TS")
require.Equal(t, eventTS, syncer.lastNonZeroMinTS.Load(),
"lastNonZeroMinTS should be updated to the active worker's TS")

// --- case 3: all workers reset to 0 (between batches) → lag must NOT drop to 0.
// This is the bug scenario: successFunc calls updateReplicationJobTS(nil, idx)
// after each batch, briefly making all TSes 0 before the next batch starts.
syncer.workerJobTSArray[dmlWorkerJobIdx(0)].Store(0)

syncer.updateReplicationLagMetric()
lagAfterReset := syncer.secondsBehindMaster.Load()
require.Greater(t, lagAfterReset, int64(0),
"lag must not spuriously drop to 0 when workers are idle between batches")
require.Equal(t, eventTS, syncer.lastNonZeroMinTS.Load(),
"lastNonZeroMinTS should be unchanged when workers are idle")

// --- case 4: a newer worker TS arrives → lastNonZeroMinTS advances.
newerEventTS := time.Now().Unix() - 3
syncer.workerJobTSArray[dmlWorkerJobIdx(1)].Store(newerEventTS)

syncer.updateReplicationLagMetric()
require.Equal(t, newerEventTS, syncer.lastNonZeroMinTS.Load(),
"lastNonZeroMinTS should update when a newer active TS is present")
require.Less(t, syncer.secondsBehindMaster.Load(), lag,
"lag should decrease as newer events are processed")
}