From 402ef5653bd7524e08df9cec046f795531d04080 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20van=20Eeden?= Date: Fri, 28 Nov 2025 11:26:52 +0100 Subject: [PATCH 1/3] DM: Get DB flavor from source instead of guessing it --- dm/loader/util.go | 15 ++++++++++++++- dm/pkg/dumpling/utils.go | 7 ++++--- dm/syncer/checkpoint.go | 15 ++++++++++++++- 3 files changed, 32 insertions(+), 5 deletions(-) diff --git a/dm/loader/util.go b/dm/loader/util.go index 688d3c4518..92f987b592 100644 --- a/dm/loader/util.go +++ b/dm/loader/util.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/tiflow/dm/config" + "github.com/pingcap/tiflow/dm/pkg/conn" "github.com/pingcap/tiflow/dm/pkg/dumpling" "github.com/pingcap/tiflow/dm/pkg/ha" "github.com/pingcap/tiflow/dm/pkg/log" @@ -45,6 +46,18 @@ func percent(a int64, b int64, finish bool) string { } func getMydumpMetadata(ctx context.Context, cli *clientv3.Client, cfg *config.SubTaskConfig, workerName string) (string, string, error) { + flavor := "" + baseDB, err := conn.GetUpstreamDB(&cfg.From) + if err != nil { + log.L().Warn("set up db connect failed", zap.Any("db", cfg.From), zap.Error(err)) + } else { + flavor, err = conn.GetFlavor(ctx, baseDB) + if err != nil { + log.L().Warn("failed to get database flavor", zap.Any("db", cfg.From), zap.Error(err)) + } + baseDB.Close() + } + metafile := "metadata" failpoint.Inject("TestRemoveMetaFile", func() { err := storage.RemoveAll(ctx, cfg.LoaderConfig.Dir, nil) @@ -52,7 +65,7 @@ func getMydumpMetadata(ctx context.Context, cli *clientv3.Client, cfg *config.Su log.L().Warn("TestRemoveMetaFile Error", log.ShortError(err)) } }) - loc, _, err := dumpling.ParseMetaData(ctx, cfg.LoaderConfig.Dir, metafile, cfg.ExtStorage) + loc, _, err := dumpling.ParseMetaData(ctx, cfg.LoaderConfig.Dir, metafile, cfg.ExtStorage, flavor) if err == nil { return loc.Position.String(), loc.GTIDSetStr(), nil } diff --git a/dm/pkg/dumpling/utils.go b/dm/pkg/dumpling/utils.go index 654c9a59af..477919d064 100644 --- a/dm/pkg/dumpling/utils.go +++ b/dm/pkg/dumpling/utils.go @@ -48,6 +48,7 @@ func ParseMetaData( dir string, filename string, extStorage brstorage.ExternalStorage, + flavor string, ) (*binlog.Location, *binlog.Location, error) { fd, err := storage.OpenFile(ctx, dir, filename, extStorage) if err != nil { @@ -55,11 +56,11 @@ func ParseMetaData( } defer fd.Close() - return ParseMetaDataByReader(filename, fd) + return ParseMetaDataByReader(filename, fd, flavor) } // ParseMetaDataByReader parses mydumper's output meta file by created reader and returns binlog location. -func ParseMetaDataByReader(filename string, rd io.Reader) (*binlog.Location, *binlog.Location, error) { +func ParseMetaDataByReader(filename string, rd io.Reader, flavor string) (*binlog.Location, *binlog.Location, error) { invalidErr := fmt.Errorf("file %s invalid format", filename) var ( @@ -156,7 +157,7 @@ func ParseMetaDataByReader(filename string, rd io.Reader) (*binlog.Location, *bi return nil, nil, terror.ErrMetadataNoBinlogLoc.Generate(filename) } - gset, err := gtid.ParserGTID("", gtidStr) + gset, err := gtid.ParserGTID(flavor, gtidStr) if err != nil { return nil, nil, invalidErr } diff --git a/dm/syncer/checkpoint.go b/dm/syncer/checkpoint.go index d227c64c0e..af003319e1 100644 --- a/dm/syncer/checkpoint.go +++ b/dm/syncer/checkpoint.go @@ -1302,7 +1302,20 @@ func (cp *RemoteCheckPoint) genUpdateSQL(cpSchema, cpTable string, location binl func (cp *RemoteCheckPoint) parseMetaData(ctx context.Context) (*binlog.Location, *binlog.Location, error) { // `metadata` is mydumper's output meta file name filename := "metadata" - loc, loc2, err := dumpling.ParseMetaData(ctx, cp.cfg.LoaderConfig.Dir, filename, cp.cfg.ExtStorage) + + flavor := "" + baseDB, err := conn.GetUpstreamDB(&cp.cfg.From) + if err != nil { + log.L().Warn("set up db connect failed", zap.Any("db", cp.cfg.From), zap.Error(err)) + } else { + flavor, err = conn.GetFlavor(ctx, baseDB) + if err != nil { + log.L().Warn("failed to get database flavor", zap.Any("db", cp.cfg.From), zap.Error(err)) + } + baseDB.Close() + } + + loc, loc2, err := dumpling.ParseMetaData(ctx, cp.cfg.LoaderConfig.Dir, filename, cp.cfg.ExtStorage, flavor) if err != nil { toPrint, err2 := storage.ReadFile(ctx, cp.cfg.LoaderConfig.Dir, filename, nil) if err2 != nil { From 7cd48266510b30d3866a73c8724ee2d4dc09e160 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20van=20Eeden?= Date: Fri, 28 Nov 2025 12:43:29 +0100 Subject: [PATCH 2/3] Update tests --- dm/pkg/dumpling/utils_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dm/pkg/dumpling/utils_test.go b/dm/pkg/dumpling/utils_test.go index 7aa373e6b5..0cc8e27375 100644 --- a/dm/pkg/dumpling/utils_test.go +++ b/dm/pkg/dumpling/utils_test.go @@ -255,7 +255,7 @@ Finished dump at: 2020-09-30 12:16:49 for _, tc := range testCases { err2 := os.WriteFile(f.Name(), []byte(tc.source), 0o644) require.NoError(t, err2) - loc, loc2, err2 := ParseMetaData(ctx, fdir, fname, nil) + loc, loc2, err2 := ParseMetaData(ctx, fdir, fname, nil, "mysql") require.NoError(t, err2) require.Equal(t, tc.pos, loc.Position) gs, _ := gtid.ParserGTID("mysql", tc.gsetStr) @@ -274,7 +274,7 @@ Finished dump at: 2020-12-02 17:13:56 ` err = os.WriteFile(f.Name(), []byte(noBinlogLoc), 0o644) require.NoError(t, err) - _, _, err = ParseMetaData(ctx, fdir, fname, nil) + _, _, err = ParseMetaData(ctx, fdir, fname, nil, "mysql") require.True(t, terror.ErrMetadataNoBinlogLoc.Equal(err)) } From da54b00357cdff70b4aa32cfb67a47df10df01dd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20van=20Eeden?= Date: Sun, 30 Nov 2025 12:25:07 +0100 Subject: [PATCH 3/3] Update flavor source based on review --- dm/loader/util.go | 15 +-------------- dm/syncer/checkpoint.go | 15 +-------------- 2 files changed, 2 insertions(+), 28 deletions(-) diff --git a/dm/loader/util.go b/dm/loader/util.go index 92f987b592..bacd9cb018 100644 --- a/dm/loader/util.go +++ b/dm/loader/util.go @@ -23,7 +23,6 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/tiflow/dm/config" - "github.com/pingcap/tiflow/dm/pkg/conn" "github.com/pingcap/tiflow/dm/pkg/dumpling" "github.com/pingcap/tiflow/dm/pkg/ha" "github.com/pingcap/tiflow/dm/pkg/log" @@ -46,18 +45,6 @@ func percent(a int64, b int64, finish bool) string { } func getMydumpMetadata(ctx context.Context, cli *clientv3.Client, cfg *config.SubTaskConfig, workerName string) (string, string, error) { - flavor := "" - baseDB, err := conn.GetUpstreamDB(&cfg.From) - if err != nil { - log.L().Warn("set up db connect failed", zap.Any("db", cfg.From), zap.Error(err)) - } else { - flavor, err = conn.GetFlavor(ctx, baseDB) - if err != nil { - log.L().Warn("failed to get database flavor", zap.Any("db", cfg.From), zap.Error(err)) - } - baseDB.Close() - } - metafile := "metadata" failpoint.Inject("TestRemoveMetaFile", func() { err := storage.RemoveAll(ctx, cfg.LoaderConfig.Dir, nil) @@ -65,7 +52,7 @@ func getMydumpMetadata(ctx context.Context, cli *clientv3.Client, cfg *config.Su log.L().Warn("TestRemoveMetaFile Error", log.ShortError(err)) } }) - loc, _, err := dumpling.ParseMetaData(ctx, cfg.LoaderConfig.Dir, metafile, cfg.ExtStorage, flavor) + loc, _, err := dumpling.ParseMetaData(ctx, cfg.LoaderConfig.Dir, metafile, cfg.ExtStorage, cfg.Flavor) if err == nil { return loc.Position.String(), loc.GTIDSetStr(), nil } diff --git a/dm/syncer/checkpoint.go b/dm/syncer/checkpoint.go index af003319e1..869a0ff0b2 100644 --- a/dm/syncer/checkpoint.go +++ b/dm/syncer/checkpoint.go @@ -1302,20 +1302,7 @@ func (cp *RemoteCheckPoint) genUpdateSQL(cpSchema, cpTable string, location binl func (cp *RemoteCheckPoint) parseMetaData(ctx context.Context) (*binlog.Location, *binlog.Location, error) { // `metadata` is mydumper's output meta file name filename := "metadata" - - flavor := "" - baseDB, err := conn.GetUpstreamDB(&cp.cfg.From) - if err != nil { - log.L().Warn("set up db connect failed", zap.Any("db", cp.cfg.From), zap.Error(err)) - } else { - flavor, err = conn.GetFlavor(ctx, baseDB) - if err != nil { - log.L().Warn("failed to get database flavor", zap.Any("db", cp.cfg.From), zap.Error(err)) - } - baseDB.Close() - } - - loc, loc2, err := dumpling.ParseMetaData(ctx, cp.cfg.LoaderConfig.Dir, filename, cp.cfg.ExtStorage, flavor) + loc, loc2, err := dumpling.ParseMetaData(ctx, cp.cfg.LoaderConfig.Dir, filename, cp.cfg.ExtStorage, cp.cfg.Flavor) if err != nil { toPrint, err2 := storage.ReadFile(ctx, cp.cfg.LoaderConfig.Dir, filename, nil) if err2 != nil {