Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion deployments/ticdc/docker/integration-test.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ RUN ./download-integration-test-binaries.sh $BRANCH $COMMUNITY $VERSION $OS $ARC
RUN ls ./bin

# Download go into /usr/local dir.
ENV GOLANG_VERSION 1.25.8
ENV GOLANG_VERSION 1.25.9
ENV GOLANG_DOWNLOAD_URL https://dl.google.com/go/go$GOLANG_VERSION.linux-amd64.tar.gz
RUN curl -fsSL "$GOLANG_DOWNLOAD_URL" -o golang.tar.gz \
&& tar -C /usr/local -xzf golang.tar.gz \
Expand Down
2 changes: 1 addition & 1 deletion dm/_utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ ErrBinlogGTIDMySQLNotValid,[code=11059:class=functional:scope=internal:level=hig
ErrBinlogGTIDMariaDBNotValid,[code=11060:class=functional:scope=internal:level=high], "Message: GTID set string %s for MariaDB not valid"
ErrBinlogMariaDBServerIDMismatch,[code=11061:class=functional:scope=internal:level=high], "Message: server_id mismatch, in GTID (%d), in event header/server_id (%d)"
ErrBinlogOnlyOneGTIDSupport,[code=11062:class=functional:scope=internal:level=high], "Message: only one GTID in set is supported, but got %d (%s)"
ErrBinlogOnlyOneIntervalInUUID,[code=11063:class=functional:scope=internal:level=high], "Message: only one Interval in UUIDSet is supported, but got %d (%s)"
ErrBinlogOnlyOneIntervalInUUID,[code=11063:class=functional:scope=internal:level=high], "Message: only one Interval in MysqlGTIDSet is supported, but got %d (%s)"
ErrBinlogIntervalValueNotValid,[code=11064:class=functional:scope=internal:level=high], "Message: Interval's Stop should equal to Start+1, but got %+v (%s)"
ErrBinlogEmptyQuery,[code=11065:class=functional:scope=internal:level=high], "Message: empty query not valid"
ErrBinlogTableMapEvNotValid,[code=11066:class=functional:scope=internal:level=high], "Message: empty schema (% X) or table (% X) or column type (% X)"
Expand Down
15 changes: 8 additions & 7 deletions dm/checker/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@ import (

_ "github.com/go-sql-driver/mysql" // for mysql
"github.com/pingcap/tidb/dumpling/export"
"github.com/pingcap/tidb/lightning/pkg/checkpoints"
"github.com/pingcap/tidb/lightning/pkg/importer"
"github.com/pingcap/tidb/lightning/pkg/importer/opts"
"github.com/pingcap/tidb/lightning/pkg/precheck"
"github.com/pingcap/tidb/pkg/lightning/checkpoints"
"github.com/pingcap/tidb/pkg/lightning/common"
"github.com/pingcap/tidb/pkg/lightning/importdef"
"github.com/pingcap/tidb/pkg/lightning/mydump"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/types"
Expand Down Expand Up @@ -878,7 +879,7 @@ func sameTableNameDetection(tables map[filter.Table][]filter.Table) error {
// lightningPrecheckAdaptor implements the importer.PreRestoreInfoGetter interface.
type lightningPrecheckAdaptor struct {
importer.TargetInfoGetter
allTables map[string]*checkpoints.TidbDBInfo
allTables map[string]*importdef.DBInfo
sourceDataResult importer.EstimateSourceDataSizeResult
}

Expand All @@ -888,18 +889,18 @@ func newLightningPrecheckAdaptor(
) *lightningPrecheckAdaptor {
var (
sourceDataResult importer.EstimateSourceDataSizeResult
allTables = make(map[string]*checkpoints.TidbDBInfo)
allTables = make(map[string]*importdef.DBInfo)
)
if info != nil {
sourceDataResult.SizeWithIndex = info.totalDataSize.Load()
}
for db, tables := range info.db2TargetTables {
allTables[db] = &checkpoints.TidbDBInfo{
allTables[db] = &importdef.DBInfo{
Name: db,
Tables: make(map[string]*checkpoints.TidbTableInfo),
Tables: make(map[string]*importdef.TableInfo),
}
for _, table := range tables {
allTables[db].Tables[table.Name] = &checkpoints.TidbTableInfo{
allTables[db].Tables[table.Name] = &importdef.TableInfo{
DB: db,
Name: table.Name,
}
Expand All @@ -912,7 +913,7 @@ func newLightningPrecheckAdaptor(
}
}

func (l *lightningPrecheckAdaptor) GetAllTableStructures(ctx context.Context, opts ...opts.GetPreInfoOption) (map[string]*checkpoints.TidbDBInfo, error) {
func (l *lightningPrecheckAdaptor) GetAllTableStructures(ctx context.Context, opts ...opts.GetPreInfoOption) (map[string]*importdef.DBInfo, error) {
// re-use with other checker? or in fact we only use other information than structure?
return l.allTables, nil
}
Expand Down
2 changes: 1 addition & 1 deletion dm/errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ workaround = ""
tags = ["internal", "high"]

[error.DM-functional-11063]
message = "only one Interval in UUIDSet is supported, but got %d (%s)"
message = "only one Interval in MysqlGTIDSet is supported, but got %d (%s)"
description = ""
workaround = ""
tags = ["internal", "high"]
Expand Down
4 changes: 2 additions & 2 deletions dm/loader/lightning.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/dumpling/export"
"github.com/pingcap/tidb/lightning/pkg/checkpoints"
"github.com/pingcap/tidb/lightning/pkg/errormanager"
"github.com/pingcap/tidb/lightning/pkg/importinto"
lserver "github.com/pingcap/tidb/lightning/pkg/server"
"github.com/pingcap/tidb/pkg/lightning/checkpoints"
"github.com/pingcap/tidb/pkg/lightning/common"
lcfg "github.com/pingcap/tidb/pkg/lightning/config"
"github.com/pingcap/tidb/pkg/lightning/errormanager"
"github.com/pingcap/tidb/pkg/parser/mysql"
tidbpromutil "github.com/pingcap/tidb/pkg/util/promutil"
"github.com/pingcap/tiflow/dm/config"
Expand Down
69 changes: 41 additions & 28 deletions dm/pkg/binlog/event/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

gmysql "github.com/go-mysql-org/go-mysql/mysql"
"github.com/go-mysql-org/go-mysql/replication"
"github.com/google/uuid"
"github.com/pingcap/tiflow/dm/pkg/gtid"
"github.com/pingcap/tiflow/dm/pkg/terror"
)
Expand Down Expand Up @@ -118,12 +119,19 @@ func GenCommonGTIDEvent(flavor string, serverID uint32, latestPos uint32, gSet g

switch flavor {
case gmysql.MySQLFlavor:
uuidSet := singleGTID.(*gmysql.UUIDSet)
interval := uuidSet.Intervals[0]
mSet := singleGTID.(*gmysql.MysqlGTIDSet)
var sid uuid.UUID
var interval gmysql.Interval
for u, tags := range *mSet {
sid = u
for _, intervals := range tags {
interval = intervals[0]
}
}
Comment on lines +125 to +130
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The logic for extracting the single SID and Interval from a MysqlGTIDSet is repeated here and in newGenerator (in generator.go). Since DM requires these sets to have exactly one entry (as enforced by verifySingleGTID), consider moving this extraction logic into a helper function to improve code reuse and maintainability.

if anonymous {
gtidEv, err = GenAnonymousGTIDEvent(header, latestPos, defaultGTIDFlags, defaultLastCommitted, defaultSequenceNumber)
} else {
gtidEv, err = GenGTIDEvent(header, latestPos, defaultGTIDFlags, uuidSet.SID.String(), interval.Start, defaultLastCommitted, defaultSequenceNumber)
gtidEv, err = GenGTIDEvent(header, latestPos, defaultGTIDFlags, sid.String(), interval.Start, defaultLastCommitted, defaultSequenceNumber)
}
case gmysql.MariaDBFlavor:
mariaGTID := singleGTID.(*gmysql.MariadbGTID)
Expand Down Expand Up @@ -153,20 +161,21 @@ func GTIDIncrease(flavor string, gSet gmysql.GTIDSet) (gmysql.GTIDSet, error) {

switch flavor {
case gmysql.MySQLFlavor:
uuidSet := singleGTID.(*gmysql.UUIDSet)
uuidSet.Intervals[0].Start++
uuidSet.Intervals[0].Stop++
gtidSet := new(gmysql.MysqlGTIDSet)
gtidSet.Sets = map[string]*gmysql.UUIDSet{uuidSet.SID.String(): uuidSet}
clone = gtidSet
mSet := clone.(*gmysql.MysqlGTIDSet)
for u, tags := range *mSet {
for tag, intervals := range tags {
intervals[0].Start++
intervals[0].Stop++
(*mSet)[u][tag] = intervals
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This assignment is redundant because intervals is a slice, which is a reference type in Go. Modifying its elements on lines 167-168 already updates the data within the map. Removing this line would clarify that the modification is happening in-place on the cloned set.

}
}
clone = mSet
case gmysql.MariaDBFlavor:
mariaGTID := singleGTID.(*gmysql.MariadbGTID)
mariaGTID.SequenceNumber++
gtidSet := new(gmysql.MariadbGTIDSet)
gtidSet.Sets = map[uint32]map[uint32]*gmysql.MariadbGTID{
mariaGTID.DomainID: {
mariaGTID.ServerID: mariaGTID,
},
gtidSet.Sets = map[uint32]*gmysql.MariadbGTID{
mariaGTID.DomainID: mariaGTID,
}
clone = gtidSet
Comment on lines 174 to 180
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

In the MariaDB flavor case, mariaGTID is a pointer to an element within the original gSet. Modifying mariaGTID.SequenceNumber directly causes a side effect on the input gSet. To ensure GTIDIncrease is pure and doesn't modify its input, you should work on a copy of the GTID struct.

Suggested change
mariaGTID := singleGTID.(*gmysql.MariadbGTID)
mariaGTID.SequenceNumber++
gtidSet := new(gmysql.MariadbGTIDSet)
gtidSet.Sets = map[uint32]map[uint32]*gmysql.MariadbGTID{
mariaGTID.DomainID: {
mariaGTID.ServerID: mariaGTID,
},
gtidSet.Sets = map[uint32]*gmysql.MariadbGTID{
mariaGTID.DomainID: mariaGTID,
}
clone = gtidSet
mariaGTID := *singleGTID.(*gmysql.MariadbGTID)
mariaGTID.SequenceNumber++
gtidSet := new(gmysql.MariadbGTIDSet)
gtidSet.Sets = map[uint32]*gmysql.MariadbGTID{
mariaGTID.DomainID: &mariaGTID,
}
clone = gtidSet

default:
Expand All @@ -187,36 +196,40 @@ func verifySingleGTID(flavor string, gSet gmysql.GTIDSet) (interface{}, error) {
if !ok {
return nil, terror.ErrBinlogGTIDMySQLNotValid.Generate(gSet)
}
if len(mysqlGTIDs.Sets) != 1 {
return nil, terror.ErrBinlogOnlyOneGTIDSupport.Generate(len(mysqlGTIDs.Sets), gSet)
if len(*mysqlGTIDs) != 1 {
return nil, terror.ErrBinlogOnlyOneGTIDSupport.Generate(len(*mysqlGTIDs), gSet)
}
var sid uuid.UUID
var tags map[gmysql.Tag]gmysql.IntervalSlice
for sid, tags = range *mysqlGTIDs {
}
var uuidSet *gmysql.UUIDSet
for _, uuidSet = range mysqlGTIDs.Sets {
_ = sid
Comment on lines +202 to +206
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The variable sid is declared and assigned but not used, necessitating a blank assignment _ = sid to satisfy the compiler. This can be simplified by using a blank identifier directly in the range loop.

		var tags map[gmysql.Tag]gmysql.IntervalSlice
		for _, tags = range *mysqlGTIDs {
		}

if len(tags) != 1 {
return nil, terror.ErrBinlogOnlyOneGTIDSupport.Generate(len(tags), gSet)
}
intervals := uuidSet.Intervals
if intervals.Len() != 1 {
return nil, terror.ErrBinlogOnlyOneIntervalInUUID.Generate(intervals.Len(), gSet)
var intervals gmysql.IntervalSlice
for _, intervals = range tags {
}
if len(intervals) != 1 {
return nil, terror.ErrBinlogOnlyOneIntervalInUUID.Generate(len(intervals), gSet)
}
interval := intervals[0]
if interval.Stop != interval.Start+1 {
return nil, terror.ErrBinlogIntervalValueNotValid.Generate(interval, gSet)
}
return uuidSet, nil
return mysqlGTIDs, nil
case gmysql.MariaDBFlavor:
mariaGTIDs, ok := gSet.(*gmysql.MariadbGTIDSet)
if !ok {
return nil, terror.ErrBinlogGTIDMariaDBNotValid.Generate(gSet)
}
gtidCount := 0
var mariaGTID *gmysql.MariadbGTID
for _, set := range mariaGTIDs.Sets {
gtidCount += len(set)
for _, mariaGTID = range set {
}
}
gtidCount := len(mariaGTIDs.Sets)
if gtidCount != 1 {
return nil, terror.ErrBinlogOnlyOneGTIDSupport.Generate(gtidCount, gSet)
}
var mariaGTID *gmysql.MariadbGTID
for _, mariaGTID = range mariaGTIDs.Sets {
}
return mariaGTID, nil
default:
return nil, terror.ErrBinlogGTIDSetNotValid.Generate(gSet, flavor)
Expand Down
37 changes: 16 additions & 21 deletions dm/pkg/binlog/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -730,32 +730,27 @@ func GenMariaDBGTIDListEvent(header *replication.EventHeader, latestPos uint32,
payload := new(bytes.Buffer)

// Number of GTIDs, 4 bytes
numOfGTIDs := uint32(0)
for _, set := range mariaDBGSet.Sets {
numOfGTIDs += uint32(len(set))
}
numOfGTIDs := uint32(len(mariaDBGSet.Sets))
err := binary.Write(payload, binary.LittleEndian, numOfGTIDs)
if err != nil {
return nil, terror.ErrBinlogWriteBinaryData.AnnotateDelegate(err, "write Number of GTIDs %d", numOfGTIDs)
}

for _, set := range mariaDBGSet.Sets {
for _, mGTID := range set {
// Replication Domain ID, 4 bytes
err = binary.Write(payload, binary.LittleEndian, mGTID.DomainID)
if err != nil {
return nil, terror.ErrBinlogWriteBinaryData.AnnotateDelegate(err, "write Replication Domain ID %d", mGTID.DomainID)
}
// Server_ID, 4 bytes
err = binary.Write(payload, binary.LittleEndian, mGTID.ServerID)
if err != nil {
return nil, terror.ErrBinlogWriteBinaryData.AnnotateDelegate(err, "write Server_ID %d", mGTID.ServerID)
}
// GTID sequence, 8 bytes
err = binary.Write(payload, binary.LittleEndian, mGTID.SequenceNumber)
if err != nil {
return nil, terror.ErrBinlogWriteBinaryData.AnnotateDelegate(err, "write GTID sequence %d", mGTID.SequenceNumber)
}
for _, mGTID := range mariaDBGSet.Sets {
// Replication Domain ID, 4 bytes
err = binary.Write(payload, binary.LittleEndian, mGTID.DomainID)
if err != nil {
return nil, terror.ErrBinlogWriteBinaryData.AnnotateDelegate(err, "write Replication Domain ID %d", mGTID.DomainID)
}
// Server_ID, 4 bytes
err = binary.Write(payload, binary.LittleEndian, mGTID.ServerID)
if err != nil {
return nil, terror.ErrBinlogWriteBinaryData.AnnotateDelegate(err, "write Server_ID %d", mGTID.ServerID)
}
// GTID sequence, 8 bytes
err = binary.Write(payload, binary.LittleEndian, mGTID.SequenceNumber)
if err != nil {
return nil, terror.ErrBinlogWriteBinaryData.AnnotateDelegate(err, "write GTID sequence %d", mGTID.SequenceNumber)
}
}

Expand Down
8 changes: 3 additions & 5 deletions dm/pkg/binlog/event/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -663,10 +663,10 @@ func TestGenMariaDBGTIDListEvent(t *testing.T) {
require.True(t, ok)
require.NotNil(t, gtidListEvBody)
require.Len(t, gtidListEvBody.GTIDs, 1)
require.Equal(t, *mGSet.Sets[gtidListEvBody.GTIDs[0].DomainID][gtidListEvBody.GTIDs[0].ServerID], gtidListEvBody.GTIDs[0])
require.Equal(t, *mGSet.Sets[gtidListEvBody.GTIDs[0].DomainID], gtidListEvBody.GTIDs[0])

// valid gSet with multi GTIDs
gSet, err = gtid.ParserGTID(gmysql.MariaDBFlavor, "1-2-12,2-2-3,3-3-8,3-4-4")
gSet, err = gtid.ParserGTID(gmysql.MariaDBFlavor, "1-2-12,2-2-3,3-3-8,4-4-4")
require.Nil(t, err)
require.NotNil(t, gSet)
mGSet, ok = gSet.(*gmysql.MariadbGTIDSet)
Expand All @@ -683,9 +683,7 @@ func TestGenMariaDBGTIDListEvent(t *testing.T) {
require.NotNil(t, gtidListEvBody)
require.Len(t, gtidListEvBody.GTIDs, 4)
for _, mGTID := range gtidListEvBody.GTIDs {
set, ok := mGSet.Sets[mGTID.DomainID]
require.True(t, ok)
mGTID2, ok := set[mGTID.ServerID]
mGTID2, ok := mGSet.Sets[mGTID.DomainID]
require.True(t, ok)
require.Equal(t, *mGTID2, mGTID)
}
Expand Down
29 changes: 21 additions & 8 deletions dm/pkg/binlog/event/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/coreos/go-semver/semver"
gmysql "github.com/go-mysql-org/go-mysql/mysql"
"github.com/go-mysql-org/go-mysql/replication"
"github.com/google/uuid"
"github.com/pingcap/tiflow/dm/pkg/gtid"
"github.com/pingcap/tiflow/dm/pkg/terror"
)
Expand Down Expand Up @@ -56,14 +57,30 @@ func newGenerator(flavor, version string, serverID uint32, latestPos uint32, lat
var anonymousGTID bool
switch flavor {
case gmysql.MySQLFlavor:
uuidSet := singleGTID.(*gmysql.UUIDSet)
mSet := singleGTID.(*gmysql.MysqlGTIDSet)
prevGSet, ok := previousGTIDs.(*gmysql.MysqlGTIDSet)
if !ok || prevGSet == nil {
return nil, terror.ErrBinlogGTIDMySQLNotValid.Generate(previousGTIDs)
}

var sid uuid.UUID
var tag gmysql.Tag
var intervals gmysql.IntervalSlice
for u, tags := range *mSet {
sid = u
for t, i := range tags {
tag = t
intervals = i
}
}

// latestGTID should be one of the latest previousGTIDs
prevGTID, ok := prevGSet.Sets[uuidSet.SID.String()]
if !ok || prevGTID.Intervals.Len() != 1 || prevGTID.Intervals[0].Stop != uuidSet.Intervals[0].Stop {
prevTags, ok := (*prevGSet)[sid]
if !ok {
return nil, terror.ErrBinlogLatestGTIDNotInPrev.Generate(latestGTID, previousGTIDs)
}
prevIntervals, ok := prevTags[tag]
if !ok || len(prevIntervals) != 1 || prevIntervals[0].Stop != intervals[0].Stop {
return nil, terror.ErrBinlogLatestGTIDNotInPrev.Generate(latestGTID, previousGTIDs)
}

Expand All @@ -86,11 +103,7 @@ func newGenerator(flavor, version string, serverID uint32, latestPos uint32, lat
if !ok || prevGSet == nil {
return nil, terror.ErrBinlogGTIDMariaDBNotValid.Generate(previousGTIDs)
}
set, ok := prevGSet.Sets[mariaGTID.DomainID]
if !ok {
return nil, terror.ErrBinlogLatestGTIDNotInPrev.Generate(latestGTID, previousGTIDs)
}
prevGTID, ok := set[mariaGTID.ServerID]
prevGTID, ok := prevGSet.Sets[mariaGTID.DomainID]
if !ok || prevGTID.ServerID != mariaGTID.ServerID || prevGTID.SequenceNumber != mariaGTID.SequenceNumber {
return nil, terror.ErrBinlogLatestGTIDNotInPrev.Generate(latestGTID, previousGTIDs)
}
Expand Down
2 changes: 1 addition & 1 deletion dm/pkg/conn/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ func TestAddGSetWithPurged(t *testing.T) {
getGSetFromString(t, "3ccc475b-2343-11e7-be21-6c0b84d59f30:6-14"),
mariaGTID,
nil,
errors.New("invalid GTID format, must UUID:interval[:interval]"),
errors.New("invalid GTID format, must UUID[:tag]:interval[[:tag]:interval]"),
},
}

Expand Down
2 changes: 1 addition & 1 deletion dm/pkg/terror/error_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -813,7 +813,7 @@ var (
ErrBinlogGTIDMariaDBNotValid = New(codeBinlogGTIDMariaDBNotValid, ClassFunctional, ScopeInternal, LevelHigh, "GTID set string %s for MariaDB not valid", "")
ErrBinlogMariaDBServerIDMismatch = New(codeBinlogMariaDBServerIDMismatch, ClassFunctional, ScopeInternal, LevelHigh, "server_id mismatch, in GTID (%d), in event header/server_id (%d)", "")
ErrBinlogOnlyOneGTIDSupport = New(codeBinlogOnlyOneGTIDSupport, ClassFunctional, ScopeInternal, LevelHigh, "only one GTID in set is supported, but got %d (%s)", "")
ErrBinlogOnlyOneIntervalInUUID = New(codeBinlogOnlyOneIntervalInUUID, ClassFunctional, ScopeInternal, LevelHigh, "only one Interval in UUIDSet is supported, but got %d (%s)", "")
ErrBinlogOnlyOneIntervalInUUID = New(codeBinlogOnlyOneIntervalInUUID, ClassFunctional, ScopeInternal, LevelHigh, "only one Interval in MysqlGTIDSet is supported, but got %d (%s)", "")
ErrBinlogIntervalValueNotValid = New(codeBinlogIntervalValueNotValid, ClassFunctional, ScopeInternal, LevelHigh, "Interval's Stop should equal to Start+1, but got %+v (%s)", "")
ErrBinlogEmptyQuery = New(codeBinlogEmptyQuery, ClassFunctional, ScopeInternal, LevelHigh, "empty query not valid", "")
ErrBinlogTableMapEvNotValid = New(codeBinlogTableMapEvNotValid, ClassFunctional, ScopeInternal, LevelHigh, "empty schema (% X) or table (% X) or column type (% X)", "")
Expand Down
2 changes: 1 addition & 1 deletion dm/relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ func (r *Relay) preprocessEvent(e *replication.BinlogEvent, parser2 *parser.Pars
// when RawModeEnabled not true, XIDEvent will be parsed.
result.GTIDSet = ev.GSet
result.CanSaveGTID = true // need save GTID for XID
case *replication.GenericEvent:
case *replication.GenericEvent, *replication.HeartbeatEvent:
// handle some un-parsed events
if e.Header.EventType == replication.HEARTBEAT_EVENT {
// ignore artificial heartbeat event
Expand Down
4 changes: 3 additions & 1 deletion dm/syncer/data_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -722,7 +722,9 @@ func (v *DataValidator) doValidate() {
v.sendError(terror.ErrValidatorPersistData.Delegate(err))
return
}
case *replication.GenericEvent:
case *replication.GenericEvent, *replication.HeartbeatEvent:
// go-mysql v1.15+ decodes upstream heartbeats as HeartbeatEvent;
// DM's own injected heartbeats still arrive as GenericEvent.
if e.Header.EventType == replication.HEARTBEAT_EVENT {
if err = v.checkAndPersistCheckpointAndData(locationForFlush); err != nil {
v.sendError(terror.ErrValidatorPersistData.Delegate(err))
Expand Down
Loading
Loading