From b4273fc32122095f90da98746a386f205ff0c695 Mon Sep 17 00:00:00 2001
From: mengxian-li
Date: Wed, 1 Apr 2026 10:07:54 -0700
Subject: [PATCH 1/2] [dm/syncer]: Fix secondsBehindMaster spurious drop to 0
 between DML batches
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

## Problem

`query-status` occasionally reported `secondsBehindMaster = 0` even when DM
was significantly behind the upstream, before jumping back to the real lag
value.

## Root Cause

`updateReplicationLagMetric()` runs every 100ms and computes lag from the
minimum non-zero entry in `workerJobTSArray`. After each DML batch completes,
`successFunc` calls `updateReplicationJobTS(nil, idx)`, which resets that
worker's TS to 0. In the brief window between one batch finishing and the
next starting, all worker TSes can therefore be 0 simultaneously. If the cron
fires in that window, `minTS` is 0, `lag` is computed as 0, and
`secondsBehindMaster` is stored as 0: a false reading.

## Fix

Add a `lastNonZeroMinTS` field that remembers the most recent minimum event
timestamp from when at least one worker was active. In
`updateReplicationLagMetric`, fall back to this value when all current worker
TSes are 0 (idle between batches) instead of reporting 0. Reset
`lastNonZeroMinTS` in `reset()` alongside `secondsBehindMaster` so task
restarts begin cleanly.

This is safe for the genuine catch-up case: when DM is truly caught up, the
last processed event has a recent timestamp, so
`calcReplicationLag(lastNonZeroMinTS)` returns a near-zero value, which is
correct.

Co-authored-by: Claude Sonnet 4.6
---
 dm/syncer/syncer.go | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/dm/syncer/syncer.go b/dm/syncer/syncer.go
index 308dc4c6d8..a768259c13 100644
--- a/dm/syncer/syncer.go
+++ b/dm/syncer/syncer.go
@@ -234,6 +234,10 @@ type Syncer struct {
 	// the minimal timestamp of currently processing binlog events.
 	// this lag will consider time difference between upstream and DM nodes
 	secondsBehindMaster atomic.Int64
+	// lastNonZeroMinTS is the last known minimum event timestamp across all workers
+	// when at least one worker was active. Used as a fallback in updateReplicationLagMetric
+	// to avoid spurious drops to 0 between DML batches when all worker TSes are reset to 0.
+	lastNonZeroMinTS atomic.Int64
 	// stores the last job TS(binlog event timestamp) of each worker,
 	// if there's no active job, the corresponding worker's TS is reset to 0.
 	// since DML worker runs jobs in batch, the TS is the TS of the first job in the batch.
@@ -626,6 +630,7 @@ func (s *Syncer) reset() {
 		s.streamerController.Close()
 	}
 	s.secondsBehindMaster.Store(0)
+	s.lastNonZeroMinTS.Store(0)
 	for _, jobTS := range s.workerJobTSArray {
 		jobTS.Store(0)
 	}
@@ -965,7 +970,13 @@ func (s *Syncer) updateReplicationLagMetric() {
 		}
 	}
 	if minTS != int64(0) {
+		s.lastNonZeroMinTS.Store(minTS)
 		lag = s.calcReplicationLag(minTS)
+	} else if lastTS := s.lastNonZeroMinTS.Load(); lastTS != int64(0) {
+		// All workers are between batches (their TSes were reset to 0 after the
+		// last batch completed). Use the most recent known event timestamp so we
+		// don't emit a spurious drop to 0 while no worker is actively processing.
+		lag = s.calcReplicationLag(lastTS)
 	}
 
 	s.metricsProxies.Metrics.ReplicationLagHistogram.Observe(float64(lag))
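To make the race concrete before the test patch, here is a minimal standalone
sketch (not DM code: `workerTS`, `observe`, and the simplified `calcLag` are
illustrative stand-ins for `workerJobTSArray`, `updateReplicationLagMetric`,
and `calcReplicationLag`) showing how the idle window produces the false 0 and
how the fallback prevents it:

```go
package main

import (
	"fmt"
	"time"
)

// calcLag is a simplified stand-in for calcReplicationLag (tsOffset omitted).
func calcLag(now, eventTS int64) int64 { return now - eventTS }

func main() {
	now := time.Now().Unix()
	workerTS := []int64{now - 10, 0} // worker 0 mid-batch (event 10s old), worker 1 idle
	var lastNonZeroMinTS int64

	observe := func() int64 {
		var minTS int64
		for _, ts := range workerTS {
			if ts != 0 && (minTS == 0 || ts < minTS) {
				minTS = ts
			}
		}
		switch {
		case minTS != 0: // at least one worker active: the normal path
			lastNonZeroMinTS = minTS
			return calcLag(now, minTS)
		case lastNonZeroMinTS != 0: // idle window between batches: the fallback
			return calcLag(now, lastNonZeroMinTS)
		default: // nothing processed yet
			return 0
		}
	}

	fmt.Println(observe()) // 10: computed from the active worker's TS
	workerTS[0] = 0        // successFunc resets the TS once the batch completes
	fmt.Println(observe()) // still 10 with the fallback; 0 without it (the bug)
}
```

Without the `lastNonZeroMinTS` branch, the second `observe()` call would
return 0, which is exactly the spurious reading described above.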
From 40b80debf1c601e9039e01936c87e35494a423b5 Mon Sep 17 00:00:00 2001
From: mengxian-li
Date: Wed, 1 Apr 2026 10:27:16 -0700
Subject: [PATCH 2/2] [dm/syncer]: Add tests for secondsBehindMaster
 spurious-zero fix
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Add TestUpdateReplicationLagMetric to cover the four key cases of
updateReplicationLagMetric:

1. No events seen yet (lastNonZeroMinTS=0) → lag is 0.
2. Active worker TS → lag is computed from that TS and lastNonZeroMinTS is
   updated.
3. All workers reset to 0 between batches → lag does NOT drop to 0; the
   lastNonZeroMinTS fallback is used (the bug scenario).
4. Newer active TS arrives → lastNonZeroMinTS advances and lag decreases.

Also assert that reset() clears lastNonZeroMinTS alongside
secondsBehindMaster in the existing TestRun reset assertions.

Co-authored-by: Claude Sonnet 4.6
---
 dm/syncer/syncer_test.go | 55 ++++++++++++++++++++++++++++++++++++++++
 1 file changed, 55 insertions(+)

diff --git a/dm/syncer/syncer_test.go b/dm/syncer/syncer_test.go
index 5d5b86df18..9609fd51a7 100644
--- a/dm/syncer/syncer_test.go
+++ b/dm/syncer/syncer_test.go
@@ -965,9 +965,11 @@ func (s *testSyncerSuite) TestRun(c *check.C) {
 	resultCh = make(chan pb.ProcessResult)
 	// simulate `syncer.Resume` here, but doesn't reset database conns
 	syncer.secondsBehindMaster.Store(100)
+	syncer.lastNonZeroMinTS.Store(100)
 	syncer.workerJobTSArray[ddlJobIdx].Store(100)
 	syncer.reset()
 	c.Assert(syncer.secondsBehindMaster.Load(), check.Equals, int64(0))
+	c.Assert(syncer.lastNonZeroMinTS.Load(), check.Equals, int64(0))
 	c.Assert(syncer.workerJobTSArray[ddlJobIdx].Load(), check.Equals, int64(0))
 	mockStreamerProducer = &MockStreamProducer{s.generateEvents(events2, c)}
 	mockStreamer, err = mockStreamerProducer.GenerateStreamFrom(binlog.MustZeroLocation(mysql.MySQLFlavor))
@@ -2003,3 +2005,56 @@ func TestCheckCanUpdateCfg(t *testing.T) {
 	cfg2.SyncerConfig.Compact = !cfg.SyncerConfig.Compact
 	require.NoError(t, syncer.CheckCanUpdateCfg(cfg))
 }
+
+// TestUpdateReplicationLagMetric verifies the secondsBehindMaster calculation,
+// specifically that it does not drop spuriously to 0 between DML batches when
+// all workerJobTSArray entries are temporarily reset to 0.
+func TestUpdateReplicationLagMetric(t *testing.T) {
+	cfg := genDefaultSubTaskConfig4Test()
+	cfg.WorkerCount = 2
+	syncer := NewSyncer(cfg, nil, nil)
+	syncer.metricsProxies = metrics.DefaultMetricsProxies.CacheForOneTask("task", "worker", "source")
+
+	// Use tsOffset=0 so lag = time.Now().Unix() - eventTS.
+	syncer.tsOffset.Store(0)
+
+	// --- case 1: no events processed yet, all TSes are 0 → lag should be 0.
+	syncer.updateReplicationLagMetric()
+	require.Equal(t, int64(0), syncer.secondsBehindMaster.Load(),
+		"lag should be 0 before any events are processed")
+	require.Equal(t, int64(0), syncer.lastNonZeroMinTS.Load(),
+		"lastNonZeroMinTS should remain 0 when no events have been seen")
+
+	// --- case 2: a worker has an active TS → lag is computed from that TS.
+	eventTS := time.Now().Unix() - 10 // simulates an event 10 seconds old
+	syncer.workerJobTSArray[dmlWorkerJobIdx(0)].Store(eventTS)
+
+	syncer.updateReplicationLagMetric()
+	lag := syncer.secondsBehindMaster.Load()
+	require.Greater(t, lag, int64(0),
+		"lag should be positive when a worker has an active event TS")
+	require.Equal(t, eventTS, syncer.lastNonZeroMinTS.Load(),
+		"lastNonZeroMinTS should be updated to the active worker's TS")
+
+	// --- case 3: all workers reset to 0 (between batches) → lag must NOT drop to 0.
+	// This is the bug scenario: successFunc calls updateReplicationJobTS(nil, idx)
+	// after each batch, briefly making all TSes 0 before the next batch starts.
+	syncer.workerJobTSArray[dmlWorkerJobIdx(0)].Store(0)
+
+	syncer.updateReplicationLagMetric()
+	lagAfterReset := syncer.secondsBehindMaster.Load()
+	require.Greater(t, lagAfterReset, int64(0),
+		"lag must not spuriously drop to 0 when workers are idle between batches")
+	require.Equal(t, eventTS, syncer.lastNonZeroMinTS.Load(),
+		"lastNonZeroMinTS should be unchanged when workers are idle")
+
+	// --- case 4: a newer worker TS arrives → lastNonZeroMinTS advances.
+	newerEventTS := time.Now().Unix() - 3
+	syncer.workerJobTSArray[dmlWorkerJobIdx(1)].Store(newerEventTS)
+
+	syncer.updateReplicationLagMetric()
+	require.Equal(t, newerEventTS, syncer.lastNonZeroMinTS.Load(),
+		"lastNonZeroMinTS should update when a newer active TS is present")
+	require.Less(t, syncer.secondsBehindMaster.Load(), lag,
+		"lag should decrease as newer events are processed")
+}
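One more note on the "safe for the genuine catch-up case" claim in patch 1:
assuming the lag arithmetic the test's comment implies (lag =
time.Now().Unix() - tsOffset - eventTS), a caught-up syncer's
`lastNonZeroMinTS` is at most one batch behind the wall clock, so the
fallback reports a near-zero lag rather than freezing at a stale value. A
hedged sketch of that arithmetic (`replicationLag` is an illustrative
stand-in, not the real `calcReplicationLag`):

```go
package main

import (
	"fmt"
	"time"
)

// replicationLag restates the arithmetic the test's comment assumes:
// lag = time.Now().Unix() - tsOffset - eventTS. Illustrative only.
func replicationLag(tsOffset, eventTS int64) int64 {
	return time.Now().Unix() - tsOffset - eventTS
}

func main() {
	// Caught-up case: the last processed event is ~0s old, so falling back
	// to lastNonZeroMinTS between batches still reports ~0, which is correct.
	lastNonZeroMinTS := time.Now().Unix() // just processed the newest event
	fmt.Println(replicationLag(0, lastNonZeroMinTS)) // ~0

	// Lagging case: the last processed event is 120s old; the fallback keeps
	// reporting ~120 during the idle window instead of a spurious 0.
	staleTS := time.Now().Unix() - 120
	fmt.Println(replicationLag(0, staleTS)) // ~120
}
```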