diff --git a/dm/syncer/syncer.go b/dm/syncer/syncer.go index 308dc4c6d8..a768259c13 100644 --- a/dm/syncer/syncer.go +++ b/dm/syncer/syncer.go @@ -234,6 +234,10 @@ type Syncer struct { // the minimal timestamp of currently processing binlog events. // this lag will consider time difference between upstream and DM nodes secondsBehindMaster atomic.Int64 + // lastNonZeroMinTS is the last known minimum event timestamp across all workers + // when at least one worker was active. Used as a fallback in updateReplicationLagMetric + // to avoid spurious drops to 0 between DML batches when all worker TSes are reset to 0. + lastNonZeroMinTS atomic.Int64 // stores the last job TS(binlog event timestamp) of each worker, // if there's no active job, the corresponding worker's TS is reset to 0. // since DML worker runs jobs in batch, the TS is the TS of the first job in the batch. @@ -626,6 +630,7 @@ func (s *Syncer) reset() { s.streamerController.Close() } s.secondsBehindMaster.Store(0) + s.lastNonZeroMinTS.Store(0) for _, jobTS := range s.workerJobTSArray { jobTS.Store(0) } @@ -965,7 +970,13 @@ func (s *Syncer) updateReplicationLagMetric() { } } if minTS != int64(0) { + s.lastNonZeroMinTS.Store(minTS) lag = s.calcReplicationLag(minTS) + } else if lastTS := s.lastNonZeroMinTS.Load(); lastTS != int64(0) { + // All workers are between batches (their TSes were reset to 0 after the + // last batch completed). Use the most recent known event timestamp so we + // don't emit a spurious drop to 0 while no worker is actively processing. + lag = s.calcReplicationLag(lastTS) } s.metricsProxies.Metrics.ReplicationLagHistogram.Observe(float64(lag)) diff --git a/dm/syncer/syncer_test.go b/dm/syncer/syncer_test.go index 5d5b86df18..9609fd51a7 100644 --- a/dm/syncer/syncer_test.go +++ b/dm/syncer/syncer_test.go @@ -965,9 +965,11 @@ func (s *testSyncerSuite) TestRun(c *check.C) { resultCh = make(chan pb.ProcessResult) // simulate `syncer.Resume` here, but doesn't reset database conns syncer.secondsBehindMaster.Store(100) + syncer.lastNonZeroMinTS.Store(100) syncer.workerJobTSArray[ddlJobIdx].Store(100) syncer.reset() c.Assert(syncer.secondsBehindMaster.Load(), check.Equals, int64(0)) + c.Assert(syncer.lastNonZeroMinTS.Load(), check.Equals, int64(0)) c.Assert(syncer.workerJobTSArray[ddlJobIdx].Load(), check.Equals, int64(0)) mockStreamerProducer = &MockStreamProducer{s.generateEvents(events2, c)} mockStreamer, err = mockStreamerProducer.GenerateStreamFrom(binlog.MustZeroLocation(mysql.MySQLFlavor)) @@ -2003,3 +2005,56 @@ func TestCheckCanUpdateCfg(t *testing.T) { cfg2.SyncerConfig.Compact = !cfg.SyncerConfig.Compact require.NoError(t, syncer.CheckCanUpdateCfg(cfg)) } + +// TestUpdateReplicationLagMetric verifies the secondsBehindMaster calculation, +// specifically that it does not drop spuriously to 0 between DML batches when +// all workerJobTSArray entries are temporarily reset to 0. +func TestUpdateReplicationLagMetric(t *testing.T) { + cfg := genDefaultSubTaskConfig4Test() + cfg.WorkerCount = 2 + syncer := NewSyncer(cfg, nil, nil) + syncer.metricsProxies = metrics.DefaultMetricsProxies.CacheForOneTask("task", "worker", "source") + + // Use tsOffset=0 so lag = time.Now().Unix() - eventTS. + syncer.tsOffset.Store(0) + + // --- case 1: no events processed yet, all TSes are 0 → lag should be 0. + syncer.updateReplicationLagMetric() + require.Equal(t, int64(0), syncer.secondsBehindMaster.Load(), + "lag should be 0 before any events are processed") + require.Equal(t, int64(0), syncer.lastNonZeroMinTS.Load(), + "lastNonZeroMinTS should remain 0 when no events have been seen") + + // --- case 2: a worker has an active TS → lag is computed from that TS. + eventTS := time.Now().Unix() - 10 // simulates an event 10 seconds old + syncer.workerJobTSArray[dmlWorkerJobIdx(0)].Store(eventTS) + + syncer.updateReplicationLagMetric() + lag := syncer.secondsBehindMaster.Load() + require.Greater(t, lag, int64(0), + "lag should be positive when a worker has an active event TS") + require.Equal(t, eventTS, syncer.lastNonZeroMinTS.Load(), + "lastNonZeroMinTS should be updated to the active worker's TS") + + // --- case 3: all workers reset to 0 (between batches) → lag must NOT drop to 0. + // This is the bug scenario: successFunc calls updateReplicationJobTS(nil, idx) + // after each batch, briefly making all TSes 0 before the next batch starts. + syncer.workerJobTSArray[dmlWorkerJobIdx(0)].Store(0) + + syncer.updateReplicationLagMetric() + lagAfterReset := syncer.secondsBehindMaster.Load() + require.Greater(t, lagAfterReset, int64(0), + "lag must not spuriously drop to 0 when workers are idle between batches") + require.Equal(t, eventTS, syncer.lastNonZeroMinTS.Load(), + "lastNonZeroMinTS should be unchanged when workers are idle") + + // --- case 4: a newer worker TS arrives → lastNonZeroMinTS advances. + newerEventTS := time.Now().Unix() - 3 + syncer.workerJobTSArray[dmlWorkerJobIdx(1)].Store(newerEventTS) + + syncer.updateReplicationLagMetric() + require.Equal(t, newerEventTS, syncer.lastNonZeroMinTS.Load(), + "lastNonZeroMinTS should update when a newer active TS is present") + require.Less(t, syncer.secondsBehindMaster.Load(), lag, + "lag should decrease as newer events are processed") +}