Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func (c *captureImpl) reset(ctx context.Context) (*vars.GlobalVars, error) {
if err != nil {
return nil, errors.Trace(err)
}
log.Info("reset session successfully", zap.Any("session", sess))
log.Info("reset session successfully", zap.Int64("leaseID", int64(lease.ID)))

c.captureMu.Lock()
defer c.captureMu.Unlock()
Expand Down Expand Up @@ -291,7 +291,11 @@ func (c *captureImpl) reset(ctx context.Context) (*vars.GlobalVars, error) {
c.processorManager = c.newProcessorManager(
c.info, c.upstreamManager, &c.liveness, c.config.Debug.Scheduler, globalVars)

log.Info("capture initialized", zap.Any("capture", c.info))
log.Info("capture initialized",
zap.String("captureID", c.info.ID),
zap.String("advertiseAddr", c.info.AdvertiseAddr),
zap.String("version", c.info.Version),
zap.String("gitHash", c.info.GitHash))
return globalVars, nil
}

Expand Down
103 changes: 80 additions & 23 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,12 @@ func (m *mounter) DecodeEvent(ctx context.Context, event *model.PolymorphicEvent

func (m *mounter) unmarshalAndMountRowChanged(ctx context.Context, raw *model.RawKVEntry) (*model.RowChangedEvent, error) {
if !bytes.HasPrefix(raw.Key, tablePrefix) {
log.Error("unexpected key prefix found in row kv entry", zap.String("key", hex.EncodeToString(raw.Key)), zap.Any("eventCommitTs", raw.CRTs), zap.Any("eventStartTs", raw.StartTs))
log.Error("unexpected key prefix found in row kv entry",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.String("key", hex.EncodeToString(raw.Key)),
zap.Uint64("eventCommitTs", raw.CRTs),
zap.Uint64("eventStartTs", raw.StartTs))
return nil, nil
}
// checksumKey is only used to calculate raw checksum if necessary.
Expand Down Expand Up @@ -177,6 +182,8 @@ func (m *mounter) unmarshalAndMountRowChanged(ctx context.Context, raw *model.Ra
return nil, nil
}
log.Error("can not found table schema",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Uint64("ts", raw.CRTs),
zap.String("key", hex.EncodeToString(raw.Key)),
zap.Int64("tableID", physicalTableID))
Expand Down Expand Up @@ -216,8 +223,17 @@ func (m *mounter) unmarshalAndMountRowChanged(ctx context.Context, raw *model.Ra
return nil, nil
}()
if err != nil && !cerror.ShouldFailChangefeed(err) {
log.Error("failed to mount and unmarshals entry, start to print debug info", zap.Error(err))
snap.PrintStatus(log.Error)
log.Error("failed to mount and unmarshals entry, start to print debug info",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Error(err))
snap.PrintStatus(func(msg string, fields ...zap.Field) {
fields = append([]zap.Field{
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
}, fields...)
log.Error(msg, fields...)
})
}
return row, err
}
Expand Down Expand Up @@ -369,7 +385,8 @@ func datum2Column(
return nil, nil, nil, errors.Trace(err)
}
if warn != "" {
log.Warn(warn, zap.String("table", tableInfo.TableName.String()),
log.Warn(warn,
zap.String("table", tableInfo.TableName.String()),
zap.String("column", colInfo.Name.String()))
}

Expand Down Expand Up @@ -427,8 +444,12 @@ func (m *mounter) verifyColumnChecksum(
checksum, err := calculateColumnChecksum(columnInfos, rawColumns, m.tz)
if err != nil {
log.Error("failed to calculate the checksum",
zap.Uint32("first", first), zap.Any("columnInfos", columnInfos),
zap.Any("rawColumns", rawColumns), zap.Error(err))
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Uint32("first", first),
zap.Any("columnInfos", columnInfos),
zap.Any("rawColumns", rawColumns),
zap.Error(err))
return 0, false, err
}

Expand All @@ -444,16 +465,28 @@ func (m *mounter) verifyColumnChecksum(

if !skipFail {
log.Error("cannot found the extra checksum, the first checksum mismatched",
zap.Uint32("checksum", checksum), zap.Uint32("first", first), zap.Uint32("extra", extra),
zap.Any("columnInfos", columnInfos), zap.Any("rawColumns", rawColumns), zap.Any("tz", m.tz))
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Uint32("checksum", checksum),
zap.Uint32("first", first),
zap.Uint32("extra", extra),
zap.Any("columnInfos", columnInfos),
zap.Any("rawColumns", rawColumns),
zap.Any("tz", m.tz))
return checksum, false, nil
}

if time.Since(m.lastSkipOldValueTime) > time.Minute {
log.Warn("checksum mismatch on the old value, "+
"this may caused by Add Column / Drop Column executed, skip verification",
zap.Uint32("checksum", checksum), zap.Uint32("first", first), zap.Uint32("extra", extra),
zap.Any("columnInfos", columnInfos), zap.Any("rawColumns", rawColumns), zap.Any("tz", m.tz))
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Uint32("checksum", checksum),
zap.Uint32("first", first),
zap.Uint32("extra", extra),
zap.Any("columnInfos", columnInfos),
zap.Any("rawColumns", rawColumns),
zap.Any("tz", m.tz))
m.lastSkipOldValueTime = time.Now()
}
return checksum, true, nil
Expand Down Expand Up @@ -559,7 +592,9 @@ func verifyRawBytesChecksum(
datum, err := newDatum(col.Value, columnInfo.FieldType)
if err != nil {
log.Error("build datum for raw checksum calculation failed",
zap.Any("col", col), zap.Any("columnInfo", columnInfo), zap.Error(err))
zap.Any("col", col),
zap.Any("columnInfo", columnInfo),
zap.Error(err))
return 0, false, errors.Trace(err)
}
datums = append(datums, &datum)
Expand All @@ -575,9 +610,12 @@ func verifyRawBytesChecksum(

log.Error("raw bytes checksum mismatch",
zap.Int("version", decoder.ChecksumVersion()),
zap.Uint32("expected", expected), zap.Uint32("obtained", obtained),
zap.Any("tableInfo", tableInfo), zap.Any("columns", columns),
zap.Any("handle", handle.String()), zap.Any("tz", tz))
zap.Uint32("expected", expected),
zap.Uint32("obtained", obtained),
zap.Any("tableInfo", tableInfo),
zap.Any("columns", columns),
zap.String("handle", handle.String()),
zap.Any("tz", tz))

return expected, false, nil
}
Expand Down Expand Up @@ -611,8 +649,14 @@ func (m *mounter) verifyChecksum(
expected, matched, err := verifyRawBytesChecksum(tableInfo, columns, decoder, handle, key, m.tz)
if err != nil {
log.Error("calculate raw checksum failed",
zap.Int("version", version), zap.Any("tz", m.tz), zap.Any("handle", handle.String()),
zap.Any("key", key), zap.Any("columns", columns), zap.Error(err))
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Int("version", version),
zap.Any("tz", m.tz),
zap.String("handle", handle.String()),
zap.Binary("key", key),
zap.Any("columns", columns),
zap.Error(err))
return 0, false, errors.Trace(err)
}
if !matched {
Expand All @@ -621,8 +665,12 @@ func (m *mounter) verifyChecksum(
columnChecksum, err := calculateColumnChecksum(columnInfos, rawColumns, m.tz)
if err != nil {
log.Error("failed to calculate column-level checksum, after raw checksum verification passed",
zap.Any("columnsInfo", columnInfos), zap.Any("rawColumns", rawColumns),
zap.Any("tz", m.tz), zap.Error(err))
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Any("columnsInfo", columnInfos),
zap.Any("rawColumns", rawColumns),
zap.Any("tz", m.tz),
zap.Error(err))
return 0, false, errors.Trace(err)
}
return columnChecksum, true, nil
Expand Down Expand Up @@ -673,8 +721,12 @@ func (m *mounter) mountRowKVEntry(

if !matched {
log.Error("previous columns checksum mismatch",
zap.Uint32("checksum", preChecksum), zap.Any("tableInfo", tableInfo),
zap.Any("preCols", preCols), zap.Any("rawCols", preRawCols))
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Uint32("checksum", preChecksum),
zap.Any("tableInfo", tableInfo),
zap.Any("preCols", preCols),
zap.Any("rawCols", preRawCols))
if m.integrity.ErrorHandle() {
return nil, rawRow, cerror.ErrCorruptedDataMutation.
GenWithStackByArgs(m.changefeedID.Namespace, m.changefeedID.ID)
Expand All @@ -700,8 +752,12 @@ func (m *mounter) mountRowKVEntry(
}
if !matched {
log.Error("current columns checksum mismatch",
zap.Uint32("checksum", currentChecksum), zap.Any("tableInfo", tableInfo),
zap.Any("cols", cols), zap.Any("rawCols", rawCols))
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Uint32("checksum", currentChecksum),
zap.Any("tableInfo", tableInfo),
zap.Any("cols", cols),
zap.Any("rawCols", rawCols))
if m.integrity.ErrorHandle() {
return nil, rawRow, cerror.ErrCorruptedDataMutation.
GenWithStackByArgs(m.changefeedID.Namespace, m.changefeedID.ID)
Expand Down Expand Up @@ -900,7 +956,8 @@ func getDefaultOrZeroValue(
default:
d = table.GetZeroValue(col)
if d.IsNull() {
log.Error("meet unsupported column type", zap.String("columnInfo", col.FieldType.String()))
log.Error("meet unsupported column type",
zap.String("columnInfo", col.FieldType.String()))
}
}
}
Expand Down
30 changes: 7 additions & 23 deletions cdc/entry/schema/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,10 @@ func NewSnapshotFromMeta(
tag := negative(currentTs)
for _, dbinfo := range dbinfos {
if filter.ShouldIgnoreSchema(dbinfo.Name.O) {
log.Debug("ignore database", zap.Stringer("db", dbinfo.Name), zap.Stringer("changefeed", id))
log.Debug("ignore database",
zap.String("namespace", id.Namespace),
zap.String("changefeed", id.ID),
zap.Stringer("db", dbinfo.Name))
continue
}
vid := newVersionedID(dbinfo.ID, tag)
Expand Down Expand Up @@ -211,10 +214,11 @@ func NewSnapshotFromMeta(

snap.inner.currentTs = currentTs
log.Info("schema snapshot created",
zap.Stringer("changefeed", id),
zap.String("namespace", id.Namespace),
zap.String("changefeed", id.ID),
zap.Int("tables", tableCount),
zap.Uint64("currentTs", currentTs),
zap.Any("duration", time.Since(start).Seconds()))
zap.Duration("duration", time.Since(start)))
return snap, nil
}

Expand Down Expand Up @@ -732,7 +736,6 @@ func (s *snapshot) dropSchema(id int64, currentTs uint64) error {
s.doDropTable(tbInfo, currentTs)
}
s.currentTs = currentTs
log.Debug("drop schema success", zap.String("name", dbInfo.Name.O), zap.Int64("id", dbInfo.ID))
return nil
}

Expand All @@ -747,7 +750,6 @@ func (s *snapshot) createSchema(dbInfo *timodel.DBInfo, currentTs uint64) error
}
s.doCreateSchema(dbInfo, currentTs)
s.currentTs = currentTs
log.Debug("create schema success", zap.String("name", dbInfo.Name.O), zap.Int64("id", dbInfo.ID))
return nil
}

Expand All @@ -764,7 +766,6 @@ func (s *snapshot) replaceSchema(dbInfo *timodel.DBInfo, currentTs uint64) error
s.schemaNameToID.ReplaceOrInsert(newVersionedEntityName(-1, old.Name.O, tag))
}
s.currentTs = currentTs
log.Debug("replace schema success", zap.String("name", dbInfo.Name.O), zap.Int64("id", dbInfo.ID))
return nil
}

Expand All @@ -786,10 +787,6 @@ func (s *snapshot) dropTable(id int64, currentTs uint64) error {
}
s.doDropTable(tbInfo, currentTs)
s.currentTs = currentTs
log.Debug("drop table success",
zap.String("schema", tbInfo.TableName.Schema),
zap.String("table", tbInfo.TableName.Table),
zap.Int64("id", tbInfo.ID))
return nil
}

Expand Down Expand Up @@ -833,10 +830,6 @@ func (s *snapshot) truncateTable(id int64, tbInfo *model.TableInfo, currentTs ui
s.truncatedTables.ReplaceOrInsert(newVersionedID(id, tag))
}
s.currentTs = currentTs
log.Debug("truncate table success",
zap.String("schema", tbInfo.TableName.Schema),
zap.String("table", tbInfo.TableName.Table),
zap.Int64("id", tbInfo.ID))
return
}

Expand All @@ -850,8 +843,6 @@ func (s *snapshot) createTable(tbInfo *model.TableInfo, currentTs uint64) error
}
s.doCreateTable(tbInfo, currentTs)
s.currentTs = currentTs
log.Debug("create table success", zap.Int64("id", tbInfo.ID),
zap.String("name", fmt.Sprintf("%s.%s", tbInfo.TableName.Schema, tbInfo.TableName.Table)))
return nil
}

Expand All @@ -865,7 +856,6 @@ func (s *snapshot) replaceTable(tbInfo *model.TableInfo, currentTs uint64) error
}
s.doCreateTable(tbInfo, currentTs)
s.currentTs = currentTs
log.Debug("replace table success", zap.String("name", tbInfo.Name.O), zap.Int64("id", tbInfo.ID))
return nil
}

Expand Down Expand Up @@ -947,12 +937,6 @@ func (s *snapshot) updatePartition(tbInfo *model.TableInfo, isTruncate bool, cur
}
}
s.currentTs = currentTs

log.Debug("adjust partition success",
zap.String("schema", tbInfo.TableName.Schema),
zap.String("table", tbInfo.TableName.Table),
zap.Any("partitions", newPi.Definitions),
)
return nil
}

Expand Down
Loading