Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 17 additions & 15 deletions sync_diff_inspector/chunk/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,17 @@ type Bound struct {
// ChunkID is to identify the sequence of chunks
type ChunkID struct {
TableIndex int `json:"table-index"`
// we especially treat random split has only one bucket
// which is the whole table
// range is [left, right]
// One chunk may cross multiple buckets, which is expressed as:
// [BucketIndexLeft, BucketIndexRight]
// For table with no stats collected, we treat it as a single bucket.
BucketIndexLeft int `json:"bucket-index-left"`
BucketIndexRight int `json:"bucket-index-right"`
ChunkIndex int `json:"chunk-index"`
// `ChunkCnt` is the number of chunks in this bucket
// We can compare `ChunkIndex` and `ChunkCnt` to know
// whether this chunk is the last one

// ChunkIndex is the index of this chunk in the bucket range.
ChunkIndex int `json:"chunk-index"`

// ChunkCnt is the number of chunks in the bucket range.
// The count is > 1 only when BucketIndexLeft <= BucketIndexRight.
ChunkCnt int `json:"chunk-count"`
}

Expand Down Expand Up @@ -467,24 +469,24 @@ func (c *Range) CopyAndUpdate(column, lower, upper string, updateLower, updateUp
return newChunk
}

// Notice: chunk may contain not only one bucket, which can be expressed as a range [3, 5],
//
// And `lastBucketID` means the `5` and `firstBucketID` means the `3`.
func InitChunks(chunks []*Range, t ChunkType, firstBucketID, lastBucketID int, index int, collation, limits string, chunkCnt int) {
if chunks == nil {
return
}
// InitChunks fills ChunkID/Type/Where/Args for a batch of ranges
func InitChunks(
chunks []*Range,
t ChunkType,
firstBucketID, lastBucketID int,
index, chunkCnt int,
collation, limits string) {
for _, chunk := range chunks {
conditions, args := chunk.ToString(collation)
chunk.Where = fmt.Sprintf("((%s) AND (%s))", conditions, limits)
chunk.Args = args
chunk.Type = t
chunk.Index = &ChunkID{
BucketIndexLeft: firstBucketID,
BucketIndexRight: lastBucketID,
ChunkIndex: index,
ChunkCnt: chunkCnt,
}
chunk.Type = t
index++
}
}
Expand Down
6 changes: 3 additions & 3 deletions sync_diff_inspector/splitter/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type BucketIterator struct {

chunkSize int64
chunks []*chunk.Range
nextChunk uint
nextChunk int

chunksCh chan []*chunk.Range
errCh chan error
Expand Down Expand Up @@ -94,7 +94,7 @@ func (s *BucketIterator) GetIndexID() int64 {

func (s *BucketIterator) Next() (*chunk.Range, error) {
var ok bool
if uint(len(s.chunks)) <= s.nextChunk {
if len(s.chunks) <= s.nextChunk {
select {
case err := <-s.errCh:
return nil, errors.Trace(err)
Expand Down Expand Up @@ -228,7 +228,7 @@ func (s *BucketIterator) splitChunkForBucket(ctx context.Context, firstBucketID,
}
return
}
chunk.InitChunks(chunks, chunk.Bucket, firstBucketID, lastBucketID, beginIndex, s.table.Collation, s.table.Range, bucketChunkCnt)
chunk.InitChunks(chunks, chunk.Bucket, firstBucketID, lastBucketID, beginIndex, bucketChunkCnt, s.table.Collation, s.table.Range)
progress.UpdateTotal(s.progressID, len(chunks), false)
s.chunksCh <- chunks
})
Expand Down
186 changes: 134 additions & 52 deletions sync_diff_inspector/splitter/random.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,37 @@ import (
"github.com/pingcap/tidb-tools/sync_diff_inspector/progress"
"github.com/pingcap/tidb-tools/sync_diff_inspector/source/common"
"github.com/pingcap/tidb-tools/sync_diff_inspector/utils"
lightningcommon "github.com/pingcap/tidb/pkg/lightning/common"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/util/mathutil"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

type RandomIterator struct {
table *common.TableDiff
chunkSize int64
chunks []*chunk.Range
nextChunk uint

dbConn *sql.DB
chunksCh chan *chunk.Range
firstErr lightningcommon.OnceError
eg *errgroup.Group
egCtx context.Context
cancel context.CancelFunc

dbConn *sql.DB
progressID string

splitColumns []*model.ColumnInfo
splitRange *chunk.Range
totalChunkCnt int
beginIndex int

coarseChunks []*chunk.Range
coarseSplitCounts []int
}

const randomSplitCoarseChunks = 256

func NewRandomIterator(ctx context.Context, progressID string, table *common.TableDiff, dbConn *sql.DB) (*RandomIterator, error) {
return NewRandomIteratorWithCheckpoint(ctx, progressID, table, dbConn, nil)
}
Expand Down Expand Up @@ -105,13 +123,14 @@ NEXTINDEX:
if startRange != nil {
c := startRange.GetChunk()
if c.IsLastChunkForTable() {
return &RandomIterator{
iter := &RandomIterator{
table: table,
chunkSize: 0,
chunks: nil,
nextChunk: 0,
dbConn: dbConn,
}, nil
chunksCh: make(chan *chunk.Range),
}
close(iter.chunksCh)
return iter, nil
}
// The sequences in `chunk.Range.Bounds` should be equivalent.
for _, bound := range c.Bounds {
Expand All @@ -121,7 +140,6 @@ NEXTINDEX:
// Recover the chunkIndex. Let it be next to the checkpoint node.
beginIndex = c.Index.ChunkIndex + 1
bucketChunkCnt = c.Index.ChunkCnt
// For chunk splitted by random splitter, the checkpoint chunk records the tableCnt.
chunkCnt = bucketChunkCnt - beginIndex
} else {
cnt, err := dbutil.GetRowCount(ctx, dbConn, table.Schema, table.Table, table.Range, nil)
Expand All @@ -130,75 +148,99 @@ NEXTINDEX:
}

chunkSize = table.ChunkSize
// We can use config file to fix chunkSize,
// otherwise chunkSize is 0.
if chunkSize <= 0 {
if len(table.Info.Indices) != 0 {
chunkSize = utils.CalculateChunkSize(cnt)
} else {
// no index
// will use table scan
// so we use one chunk
// plus 1 to avoid chunkSize is 0
// while chunkCnt = (2cnt)/(cnt+1) <= 1
// If table has no index, we use one chunk.
chunkSize = cnt + 1
}
}

log.Info("get chunk size for table", zap.Int64("chunk size", chunkSize),
zap.String("db", table.Schema), zap.String("table", table.Table))

// When cnt is 0, chunkCnt should be also 0.
// When cnt is in [1, chunkSize], chunkCnt should be 1.
chunkCnt = int((cnt + chunkSize - 1) / chunkSize)
log.Info("split range by random", zap.Int64("row count", cnt), zap.Int("split chunk num", chunkCnt))
log.Info("split range by random",
zap.Int64("row count", cnt),
zap.Int("split chunk num", chunkCnt))
bucketChunkCnt = chunkCnt
}

chunks, err := splitRangeByRandom(ctx, dbConn, chunkRange, chunkCnt, table.Schema, table.Table, fields, table.Range, table.Collation)
if err != nil {
return nil, errors.Trace(err)
rctx, cancel := context.WithCancel(ctx)
eg, egctx := errgroup.WithContext(rctx)

iter := &RandomIterator{
table: table,
chunkSize: chunkSize,
dbConn: dbConn,
egCtx: egctx,
cancel: cancel,
eg: eg,
progressID: progressID,
splitColumns: fields,
splitRange: chunkRange,
totalChunkCnt: bucketChunkCnt,
beginIndex: beginIndex,
chunksCh: make(chan *chunk.Range, DefaultChannelBuffer),
}
chunk.InitChunks(chunks, chunk.Random, 0, 0, beginIndex, table.Collation, table.Range, bucketChunkCnt)

failpoint.Inject("ignore-last-n-chunk-in-bucket", func(v failpoint.Value) {
log.Info("failpoint ignore-last-n-chunk-in-bucket injected (random splitter)", zap.Int("n", v.(int)))
if len(chunks) <= 1+v.(int) {
failpoint.Return(nil, nil)
// First split the chunkRange to coarse chunks.
if chunkCnt <= randomSplitCoarseChunks {
iter.coarseChunks = []*chunk.Range{chunkRange}
iter.coarseSplitCounts = []int{max(chunkCnt, 1)}
} else {
coarseChunks, err := splitRangeByRandom(
rctx, dbConn, chunkRange, randomSplitCoarseChunks,
table.Schema, table.Table, fields, table.Range, table.Collation)
if err != nil {
return nil, errors.Trace(err)
}
chunks = chunks[:(len(chunks) - v.(int))]
iter.coarseChunks = coarseChunks
iter.coarseSplitCounts = mathutil.Divide2Batches(chunkCnt, len(coarseChunks))
}

// Then start to produce fine chunks from coarse chunks in background.
iter.eg.Go(func() error {
return iter.produceChunks()
})

progress.StartTable(progressID, len(chunks), true)
return &RandomIterator{
table: table,
chunkSize: chunkSize,
chunks: chunks,
nextChunk: 0,
dbConn: dbConn,
}, nil
progress.StartTable(progressID, 0, false)
return iter, nil

}

func (s *RandomIterator) Next() (*chunk.Range, error) {
if uint(len(s.chunks)) <= s.nextChunk {
return nil, nil
}
c := s.chunks[s.nextChunk]
s.nextChunk = s.nextChunk + 1
failpoint.Inject("print-chunk-info", func() {
lowerBounds := make([]string, len(c.Bounds))
upperBounds := make([]string, len(c.Bounds))
for i, bound := range c.Bounds {
lowerBounds[i] = bound.Lower
upperBounds[i] = bound.Upper
select {
case <-s.egCtx.Done():
return nil, s.firstErr.Get()
case c, ok := <-s.chunksCh:
err := s.firstErr.Get()
if !ok || err != nil {
return nil, err
}
log.Info("failpoint print-chunk-info injected (random splitter)", zap.Strings("lowerBounds", lowerBounds), zap.Strings("upperBounds", upperBounds), zap.String("indexCode", c.Index.ToString()))
})
return c, nil

failpoint.Inject("print-chunk-info", func() {
lowerBounds := make([]string, len(c.Bounds))
upperBounds := make([]string, len(c.Bounds))
for i, bound := range c.Bounds {
lowerBounds[i] = bound.Lower
upperBounds[i] = bound.Upper
}
log.Info("failpoint print-chunk-info injected (random splitter)",
zap.Strings("lowerBounds", lowerBounds),
zap.Strings("upperBounds", upperBounds),
zap.String("indexCode", c.Index.ToString()))
})
return c, nil
}
}

// Close stops the background chunk producer and waits for it to exit.
// It is safe to call on an iterator that was never fully started
// (cancel is nil in that case, e.g. the checkpoint-finished path).
func (s *RandomIterator) Close() {
	if s.cancel == nil {
		return
	}
	s.cancel()
	// The producer's error (if any) was already recorded in firstErr;
	// here we only wait for the goroutine to finish.
	_ = s.eg.Wait()
}

// GetSplitFields returns fields to split chunks, order by pk, uk, index, columns.
Expand Down Expand Up @@ -262,7 +304,7 @@ func splitRangeByRandom(ctx context.Context, db *sql.DB, chunk *chunk.Range, cou
return nil, errors.Trace(err)
}
log.Debug("get split values by random", zap.Stringer("chunk", chunk), zap.Int("random values num", len(randomValues)))
for i := 0; i <= len(randomValues); i++ {
for i := range len(randomValues) {
newChunk := chunk.Copy()

for j, column := range columns {
Expand All @@ -280,6 +322,46 @@ func splitRangeByRandom(ctx context.Context, db *sql.DB, chunk *chunk.Range, cou
}
chunks = append(chunks, newChunk)
}
log.Debug("split range by random", zap.Stringer("origin chunk", chunk), zap.Int("split num", len(chunks)))
log.Debug("split range by random",
zap.Stringer("origin chunk", chunk),
zap.Int("split num", len(chunks)))
return chunks, nil
}

// produceChunks splits each coarse chunk into fine chunks by random
// sampling and streams them to s.chunksCh for Next() to consume.
// It runs in the errgroup goroutine started by the constructor; the
// first error is recorded in s.firstErr and also returned so the
// errgroup cancels s.egCtx. The channel is always closed on exit so
// Next() can observe end-of-stream.
func (s *RandomIterator) produceChunks() error {
	defer close(s.chunksCh)
	for i := range s.coarseChunks {
		// Split coarse chunk i into coarseSplitCounts[i] fine chunks.
		fineChunks, err := splitRangeByRandom(
			s.egCtx, s.dbConn, s.coarseChunks[i], s.coarseSplitCounts[i],
			s.table.Schema, s.table.Table,
			s.splitColumns, s.table.Range, s.table.Collation)
		if err != nil {
			// Record the first error for Next() before returning it
			// to the errgroup (which cancels egCtx).
			s.firstErr.Set(err)
			return err
		}

		isLastCoarseChunk := i == len(s.coarseChunks)-1
		if isLastCoarseChunk {
			// Test hook: drop the last n fine chunks of the final coarse
			// chunk to simulate an incomplete split (checkpoint testing).
			failpoint.Inject("ignore-last-n-chunk-in-bucket", func(v failpoint.Value) {
				log.Info("failpoint ignore-last-n-chunk-in-bucket injected (random splitter)", zap.Int("n", v.(int)))
				if len(fineChunks) <= 1+v.(int) {
					fineChunks = nil
					return
				}
				fineChunks = fineChunks[:(len(fineChunks) - v.(int))]
			})
		}

		// Assign sequential chunk IDs starting at beginIndex; random split
		// uses a single pseudo-bucket, hence bucket indexes 0, 0.
		chunk.InitChunks(fineChunks, chunk.Random, 0, 0, s.beginIndex, s.totalChunkCnt, s.table.Collation, s.table.Range)
		s.beginIndex += len(fineChunks)
		// Grow the progress total incrementally; mark it final on the
		// last coarse chunk.
		progress.UpdateTotal(s.progressID, len(fineChunks), isLastCoarseChunk)
		for _, fineChunk := range fineChunks {
			select {
			case <-s.egCtx.Done():
				// Cancelled by Close() or a sibling error; not an error here.
				return nil
			case s.chunksCh <- fineChunk:
			}
		}
	}
	return nil
}