Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func (c *captureImpl) reset(ctx context.Context) (*vars.GlobalVars, error) {
if err != nil {
return nil, errors.Trace(err)
}
log.Info("reset session successfully", zap.Any("session", sess))
log.Info("reset session successfully", zap.Int64("leaseID", int64(lease.ID)))

c.captureMu.Lock()
defer c.captureMu.Unlock()
Expand Down Expand Up @@ -291,7 +291,11 @@ func (c *captureImpl) reset(ctx context.Context) (*vars.GlobalVars, error) {
c.processorManager = c.newProcessorManager(
c.info, c.upstreamManager, &c.liveness, c.config.Debug.Scheduler, globalVars)

log.Info("capture initialized", zap.Any("capture", c.info))
log.Info("capture initialized",
zap.String("captureID", c.info.ID),
zap.String("advertiseAddr", c.info.AdvertiseAddr),
zap.String("version", c.info.Version),
zap.String("gitHash", c.info.GitHash))
return globalVars, nil
}

Expand Down
6 changes: 1 addition & 5 deletions cdc/kv/shared_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -824,18 +824,14 @@ func (s *SharedClient) handleResolveLockTasks(ctx context.Context) error {
}

func (s *SharedClient) logSlowRegions(ctx context.Context) error {
ticker := time.NewTicker(30 * time.Second)
ticker := time.NewTicker(5 * time.Minute)
Comment thread
3AceShowHand marked this conversation as resolved.
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
}
log.Info("event feed starts to check locked regions",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID))

currTime := s.pdClock.CurrentTime()
s.totalSpans.RLock()
var slowInitializeRegionCount int
Expand Down
6 changes: 1 addition & 5 deletions cdc/model/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,7 @@ func (info *ChangeFeedInfo) NeedBlockGC() bool {

func (info *ChangeFeedInfo) isFailedByGC() bool {
if info.Error == nil {
log.Panic("changefeed info is not consistent",
zap.Any("state", info.State), zap.Any("error", info.Error))
log.Panic("changefeed info is not consistent", zap.Any("state", info.State))
}
return cerror.IsChangefeedGCFastFailErrorCode(errors.RFCErrorCode(info.Error.Code))
}
Expand Down Expand Up @@ -381,9 +380,6 @@ func (info *ChangeFeedInfo) RmUnusedFields() {
}

func (info *ChangeFeedInfo) rmMQOnlyFields() {
log.Info("since the downstream is not a MQ, remove MQ only fields",
zap.String("namespace", info.Namespace),
zap.String("changefeed", info.ID))
info.Config.Sink.DispatchRules = nil
info.Config.Sink.SchemaRegistry = nil
info.Config.Sink.EncoderConcurrency = nil
Expand Down
2 changes: 1 addition & 1 deletion cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -1193,7 +1193,7 @@ func trySplitAndSortUpdateEvent(
split := false
for _, e := range events {
if e == nil {
log.Warn("skip emit nil event", zap.Any("event", e))
log.Warn("skip emit nil event")
continue
}

Expand Down
7 changes: 0 additions & 7 deletions cdc/scheduler/internal/v3/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,13 +159,6 @@ func (t *p2pTransport) Send(
return errors.Trace(err)
}
}

if len(msgs) != 0 {
log.Debug("schedulerv3: all messages sent",
zap.String("namespace", t.changefeed.Namespace),
zap.String("changefeed", t.changefeed.ID),
zap.Int("len", len(msgs)))
}
return nil
}

Expand Down
6 changes: 0 additions & 6 deletions cdc/sink/dmlsink/cloudstorage/encoding_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,8 @@ import (
"sync/atomic"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/sink/codec"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

Expand Down Expand Up @@ -52,10 +50,6 @@ func newEncodingWorker(
}

func (w *encodingWorker) run(ctx context.Context) error {
log.Debug("encoding worker started", zap.Int("workerID", w.id),
zap.String("namespace", w.changeFeedID.Namespace),
zap.String("changefeed", w.changeFeedID.ID))

eg, ctx := errgroup.WithContext(ctx)
eg.Go(func() error {
for {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ require (
sourcegraph.com/sourcegraph/appdash-data v0.0.0-20151005221446-73f23eafcf67 // indirect
)

replace github.com/IBM/sarama v1.41.2 => github.com/pingcap/sarama v1.41.2-pingcap-20250415
replace github.com/IBM/sarama v1.41.2 => github.com/pingcap/sarama v1.41.2-pingcap-20251202-x

// Fix https://github.com/pingcap/tiflow/issues/4961
replace github.com/benbjohnson/clock v1.3.5 => github.com/benbjohnson/clock v1.1.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -888,8 +888,8 @@ github.com/pingcap/log v1.1.1-0.20250917021125-19901e015dc9 h1:qG9BSvlWFEE5otQGa
github.com/pingcap/log v1.1.1-0.20250917021125-19901e015dc9/go.mod h1:ORfBOFp1eteu2odzsyaxI+b8TzJwgjwyQcGhI+9SfEA=
github.com/pingcap/metering_sdk v0.0.0-20251110022152-dac449ac5389 h1:bqbE3bwFSrUDSiN5M4EG+IXmm5eWLJnGRy/caXnxuHA=
github.com/pingcap/metering_sdk v0.0.0-20251110022152-dac449ac5389/go.mod h1:zie1N5PRttgtqkZmRtpIDM7CuyWtvlX9LTxRd3fVSc4=
github.com/pingcap/sarama v1.41.2-pingcap-20250415 h1:jc/31lgAuSWLh8zr3y5bL0atpBFHAjch5H1Nnlb04J0=
github.com/pingcap/sarama v1.41.2-pingcap-20250415/go.mod h1:Kwi9CT6CuDYad3KS7HqjsbmD2DWkIKI7qI6a8PKlGb4=
github.com/pingcap/sarama v1.41.2-pingcap-20251202-x h1:9Vi3qqyDNZxG6fnXQhpeTsnwzSBWNpMeb8o02JkL9JM=
github.com/pingcap/sarama v1.41.2-pingcap-20251202-x/go.mod h1:xdpu7sd6OE1uxNdjYTSKUfY8FaKkJES9/+EyjSgiGQk=
github.com/pingcap/sysutil v1.0.1-0.20240311050922-ae81ee01f3a5 h1:T4pXRhBflzDeAhmOQHNPRRogMYxP13V7BkYw3ZsoSfE=
github.com/pingcap/sysutil v1.0.1-0.20240311050922-ae81ee01f3a5/go.mod h1:rlimy0GcTvjiJqvD5mXTRr8O2eNZPBrcUgiWVYp9530=
github.com/pingcap/tidb v1.1.0-beta.0.20260325043212-0c4df2e19ecc h1:2Ateg1PUQqozi+TtbL+Bx62qn4IpQ48VTOpqCh2d3AI=
Expand Down
5 changes: 0 additions & 5 deletions pkg/config/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,6 @@ func (s *SinkConfig) ShouldSendAllBootstrapAtStart() bool {
return false
}
should := s.ShouldSendBootstrapMsg() && util.GetOrZero(s.SendAllBootstrapAtStart)
log.Info("should send all bootstrap at start", zap.Bool("should", should))
return should
}

Expand Down Expand Up @@ -845,10 +844,6 @@ func (s *SinkConfig) validateAndAdjustSinkURI(sinkURI *url.URL) error {
return err
}

log.Info("succeed to parse parameter from sink uri",
zap.String("protocol", util.GetOrZero(s.Protocol)),
zap.String("txnAtomicity", string(util.GetOrZero(s.TxnAtomicity))))

// Check that protocol config is compatible with the scheme.
if sink.IsMySQLCompatibleScheme(sinkURI.Scheme) && s.Protocol != nil {
return cerror.ErrSinkURIInvalid.GenWithStackByArgs(fmt.Sprintf("protocol %s "+
Expand Down
2 changes: 0 additions & 2 deletions pkg/migrate/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,9 +279,7 @@ func (m *migrator) migrate(ctx context.Context, etcdNoMetaVersion bool, oldVersi
log.Error("update meta version failed, etcd meta data migration failed", zap.Error(err))
return cerror.WrapError(cerror.ErrEtcdMigrateFailed, err)
}
log.Info("etcd data migration successful")
cleanOldData(ctx, m.cli.GetEtcdClient())
log.Info("clean old etcd data successful")
return nil
}

Expand Down
1 change: 0 additions & 1 deletion pkg/sink/pulsar/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,5 @@ func setupAuthentication(config *config.PulsarConfig) (bool, pulsar.Authenticati
func NewMockCreatorFactory(config *config.PulsarConfig, changefeedID model.ChangeFeedID,
sinkConfig *config.SinkConfig,
) (pulsar.Client, error) {
log.Info("mock pulsar client factory created", zap.Any("changfeedID", changefeedID))
return nil, nil
}
3 changes: 0 additions & 3 deletions pkg/txnutil/gc/gc_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,6 @@ func (m *gcManager) TryUpdateGCSafePoint(
failpoint.Inject("InjectActualGCSafePoint", func(val failpoint.Value) {
actual = uint64(val.(int))
})
if actual == checkpointTs {
log.Info("update gc safe point success", zap.Uint64("gcSafePointTs", checkpointTs))
}
if actual > checkpointTs {
log.Warn("update gc safe point failed, the gc safe point is larger than checkpointTs",
zap.Uint64("actual", actual), zap.Uint64("checkpointTs", checkpointTs))
Expand Down
24 changes: 0 additions & 24 deletions pkg/txnutil/lock_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,14 @@ package txnutil
import (
"bytes"
"context"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
tikverr "github.com/tikv/client-go/v2/error"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/txnkv"
"go.uber.org/zap"
)

// LockResolver resolves lock in the given region.
Expand All @@ -52,26 +49,6 @@ func NewLockerResolver(
const scanLockLimit = 1024

func (r *resolver) Resolve(ctx context.Context, regionID uint64, maxVersion uint64) (err error) {
var totalLocks []*txnkv.Lock

start := time.Now()

defer func() {
// Only log when there are locks or error to avoid log flooding.
if len(totalLocks) != 0 || err != nil {
cost := time.Since(start)
log.Info("resolve lock finishes",
zap.Uint64("regionID", regionID),
zap.Int("lockCount", len(totalLocks)),
zap.Any("locks", totalLocks),
zap.Uint64("maxVersion", maxVersion),
zap.String("namespace", r.changefeed.Namespace),
zap.String("changefeed", r.changefeed.ID),
zap.Duration("duration", cost),
zap.Error(err))
}
}()

// TODO test whether this function will kill active transaction
req := tikvrpc.NewRequest(tikvrpc.CmdScanLock, &kvrpcpb.ScanLockRequest{
MaxVersion: maxVersion,
Expand Down Expand Up @@ -130,7 +107,6 @@ func (r *resolver) Resolve(ctx context.Context, regionID uint64, maxVersion uint
for i := range locksInfo {
locks[i] = txnkv.NewLock(locksInfo[i])
}
totalLocks = append(totalLocks, locks...)

_, err1 := r.kvStorage.GetLockResolver().ResolveLocks(bo, 0, locks)
if err1 != nil {
Expand Down