diff --git a/sync_diff_inspector/chunk/chunk.go b/sync_diff_inspector/chunk/chunk.go
index 3e690d36f..3eadafcc6 100644
--- a/sync_diff_inspector/chunk/chunk.go
+++ b/sync_diff_inspector/chunk/chunk.go
@@ -59,15 +59,17 @@ type Bound struct {
 // ChunkID is to identify the sequence of chunks
 type ChunkID struct {
 	TableIndex int `json:"table-index"`
-	// we especially treat random split has only one bucket
-	// which is the whole table
-	// range is [left, right]
+	// One chunk may cross multiple buckets, which is expressed as the range
+	// [BucketIndexLeft, BucketIndexRight].
+	// For a table with no stats collected, we treat the whole table as a single bucket.
 	BucketIndexLeft  int `json:"bucket-index-left"`
 	BucketIndexRight int `json:"bucket-index-right"`
-	ChunkIndex int `json:"chunk-index"`
-	// `ChunkCnt` is the number of chunks in this bucket
-	// We can compare `ChunkIndex` and `ChunkCnt` to know
-	// whether this chunk is the last one
+
+	// ChunkIndex is the index of this chunk in the bucket range.
+	ChunkIndex int `json:"chunk-index"`
+
+	// ChunkCnt is the number of chunks in the bucket range.
+	// The count is > 1 only when BucketIndexLeft == BucketIndexRight.
 	ChunkCnt int `json:"chunk-count"`
 }
@@ -467,24 +469,24 @@ func (c *Range) CopyAndUpdate(column, lower, upper string, updateLower, updateUp
 	return newChunk
 }
 
-// Notice: chunk may contain not only one bucket, which can be expressed as a range [3, 5],
-//
-// And `lastBucketID` means the `5` and `firstBucketID` means the `3`.
-func InitChunks(chunks []*Range, t ChunkType, firstBucketID, lastBucketID int, index int, collation, limits string, chunkCnt int) {
-	if chunks == nil {
-		return
-	}
+// InitChunks fills the ChunkID, Type, Where and Args fields for a batch of ranges.
+func InitChunks(
+	chunks []*Range,
+	t ChunkType,
+	firstBucketID, lastBucketID int,
+	index, chunkCnt int,
+	collation, limits string) {
 	for _, chunk := range chunks {
 		conditions, args := chunk.ToString(collation)
 		chunk.Where = fmt.Sprintf("((%s) AND (%s))", conditions, limits)
 		chunk.Args = args
+		chunk.Type = t
 		chunk.Index = &ChunkID{
 			BucketIndexLeft:  firstBucketID,
 			BucketIndexRight: lastBucketID,
 			ChunkIndex:       index,
 			ChunkCnt:         chunkCnt,
 		}
-		chunk.Type = t
 		index++
 	}
 }
diff --git a/sync_diff_inspector/splitter/bucket.go b/sync_diff_inspector/splitter/bucket.go
index eab1888a6..2bedb6d48 100644
--- a/sync_diff_inspector/splitter/bucket.go
+++ b/sync_diff_inspector/splitter/bucket.go
@@ -42,7 +42,7 @@ type BucketIterator struct {
 	chunkSize int64
 	chunks    []*chunk.Range
-	nextChunk uint
+	nextChunk int
 
 	chunksCh chan []*chunk.Range
 	errCh    chan error
@@ -94,7 +94,7 @@ func (s *BucketIterator) GetIndexID() int64 {
 func (s *BucketIterator) Next() (*chunk.Range, error) {
 	var ok bool
-	if uint(len(s.chunks)) <= s.nextChunk {
+	if len(s.chunks) <= s.nextChunk {
 		select {
 		case err := <-s.errCh:
 			return nil, errors.Trace(err)
@@ -228,7 +228,7 @@ func (s *BucketIterator) splitChunkForBucket(ctx context.Context, firstBucketID,
 		}
 		return
 	}
-	chunk.InitChunks(chunks, chunk.Bucket, firstBucketID, lastBucketID, beginIndex, s.table.Collation, s.table.Range, bucketChunkCnt)
+	chunk.InitChunks(chunks, chunk.Bucket, firstBucketID, lastBucketID, beginIndex, bucketChunkCnt, s.table.Collation, s.table.Range)
 	progress.UpdateTotal(s.progressID, len(chunks), false)
 	s.chunksCh <- chunks
 })
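A note on the ChunkID fields above: the old comment's "compare ChunkIndex and ChunkCnt" check survives unchanged, since ChunkIndex still runs from 0 to ChunkCnt-1 within one bucket range. A minimal, self-contained sketch of that comparison; the helper and main are illustrative, not patch code:

package main

import "fmt"

// ChunkID mirrors the two fields of chunk.ChunkID that matter here.
type ChunkID struct {
	ChunkIndex int
	ChunkCnt   int
}

// isLastInBucketRange reports whether id is the final chunk of its bucket
// range; ChunkIndex runs from 0 to ChunkCnt-1.
func isLastInBucketRange(id ChunkID) bool {
	return id.ChunkIndex == id.ChunkCnt-1
}

func main() {
	fmt.Println(isLastInBucketRange(ChunkID{ChunkIndex: 2, ChunkCnt: 3})) // true
}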
"github.com/pingcap/tidb-tools/sync_diff_inspector/progress" "github.com/pingcap/tidb-tools/sync_diff_inspector/source/common" "github.com/pingcap/tidb-tools/sync_diff_inspector/utils" + lightningcommon "github.com/pingcap/tidb/pkg/lightning/common" "github.com/pingcap/tidb/pkg/meta/model" + "github.com/pingcap/tidb/pkg/util/mathutil" "go.uber.org/zap" + "golang.org/x/sync/errgroup" ) type RandomIterator struct { table *common.TableDiff chunkSize int64 - chunks []*chunk.Range - nextChunk uint - dbConn *sql.DB + chunksCh chan *chunk.Range + firstErr lightningcommon.OnceError + eg *errgroup.Group + egCtx context.Context + cancel context.CancelFunc + + dbConn *sql.DB + progressID string + + splitColumns []*model.ColumnInfo + splitRange *chunk.Range + totalChunkCnt int + beginIndex int + + coarseChunks []*chunk.Range + coarseSplitCounts []int } +const randomSplitCoarseChunks = 256 + func NewRandomIterator(ctx context.Context, progressID string, table *common.TableDiff, dbConn *sql.DB) (*RandomIterator, error) { return NewRandomIteratorWithCheckpoint(ctx, progressID, table, dbConn, nil) } @@ -105,13 +123,14 @@ NEXTINDEX: if startRange != nil { c := startRange.GetChunk() if c.IsLastChunkForTable() { - return &RandomIterator{ + iter := &RandomIterator{ table: table, chunkSize: 0, - chunks: nil, - nextChunk: 0, dbConn: dbConn, - }, nil + chunksCh: make(chan *chunk.Range), + } + close(iter.chunksCh) + return iter, nil } // The sequences in `chunk.Range.Bounds` should be equivalent. for _, bound := range c.Bounds { @@ -121,7 +140,6 @@ NEXTINDEX: // Recover the chunkIndex. Let it be next to the checkpoint node. beginIndex = c.Index.ChunkIndex + 1 bucketChunkCnt = c.Index.ChunkCnt - // For chunk splitted by random splitter, the checkpoint chunk records the tableCnt. chunkCnt = bucketChunkCnt - beginIndex } else { cnt, err := dbutil.GetRowCount(ctx, dbConn, table.Schema, table.Table, table.Range, nil) @@ -130,75 +148,99 @@ NEXTINDEX: } chunkSize = table.ChunkSize - // We can use config file to fix chunkSize, - // otherwise chunkSize is 0. if chunkSize <= 0 { if len(table.Info.Indices) != 0 { chunkSize = utils.CalculateChunkSize(cnt) } else { - // no index - // will use table scan - // so we use one chunk - // plus 1 to avoid chunkSize is 0 - // while chunkCnt = (2cnt)/(cnt+1) <= 1 + // If table has no index, we use one chunk. chunkSize = cnt + 1 } } + log.Info("get chunk size for table", zap.Int64("chunk size", chunkSize), zap.String("db", table.Schema), zap.String("table", table.Table)) - // When cnt is 0, chunkCnt should be also 0. - // When cnt is in [1, chunkSize], chunkCnt should be 1. 
@@ -105,13 +123,14 @@ NEXTINDEX:
 	if startRange != nil {
 		c := startRange.GetChunk()
 		if c.IsLastChunkForTable() {
-			return &RandomIterator{
+			iter := &RandomIterator{
 				table:     table,
 				chunkSize: 0,
-				chunks:    nil,
-				nextChunk: 0,
 				dbConn:    dbConn,
-			}, nil
+				chunksCh:  make(chan *chunk.Range),
+			}
+			close(iter.chunksCh)
+			return iter, nil
 		}
 		// The sequences in `chunk.Range.Bounds` should be equivalent.
 		for _, bound := range c.Bounds {
@@ -121,7 +140,6 @@ NEXTINDEX:
 		// Recover the chunkIndex. Let it be next to the checkpoint node.
 		beginIndex = c.Index.ChunkIndex + 1
 		bucketChunkCnt = c.Index.ChunkCnt
-		// For chunk splitted by random splitter, the checkpoint chunk records the tableCnt.
 		chunkCnt = bucketChunkCnt - beginIndex
 	} else {
 		cnt, err := dbutil.GetRowCount(ctx, dbConn, table.Schema, table.Table, table.Range, nil)
@@ -130,75 +148,99 @@ NEXTINDEX:
 		if err != nil {
 			return nil, errors.Trace(err)
 		}
 		chunkSize = table.ChunkSize
-		// We can use config file to fix chunkSize,
-		// otherwise chunkSize is 0.
 		if chunkSize <= 0 {
 			if len(table.Info.Indices) != 0 {
 				chunkSize = utils.CalculateChunkSize(cnt)
 			} else {
-				// no index
-				// will use table scan
-				// so we use one chunk
-				// plus 1 to avoid chunkSize is 0
-				// while chunkCnt = (2cnt)/(cnt+1) <= 1
+				// If the table has no index, a full table scan is used, so we
+				// compare it as a single chunk; the +1 avoids a zero chunkSize
+				// when cnt is 0, keeping chunkCnt = 2cnt/(cnt+1) <= 1.
 				chunkSize = cnt + 1
 			}
 		}
+		log.Info("get chunk size for table", zap.Int64("chunk size", chunkSize), zap.String("db", table.Schema), zap.String("table", table.Table))
-		// When cnt is 0, chunkCnt should be also 0.
-		// When cnt is in [1, chunkSize], chunkCnt should be 1.
 		chunkCnt = int((cnt + chunkSize - 1) / chunkSize)
-		log.Info("split range by random", zap.Int64("row count", cnt), zap.Int("split chunk num", chunkCnt))
+		log.Info("split range by random",
+			zap.Int64("row count", cnt),
+			zap.Int("split chunk num", chunkCnt))
 		bucketChunkCnt = chunkCnt
 	}
 
-	chunks, err := splitRangeByRandom(ctx, dbConn, chunkRange, chunkCnt, table.Schema, table.Table, fields, table.Range, table.Collation)
-	if err != nil {
-		return nil, errors.Trace(err)
+	rctx, cancel := context.WithCancel(ctx)
+	eg, egctx := errgroup.WithContext(rctx)
+
+	iter := &RandomIterator{
+		table:         table,
+		chunkSize:     chunkSize,
+		dbConn:        dbConn,
+		egCtx:         egctx,
+		cancel:        cancel,
+		eg:            eg,
+		progressID:    progressID,
+		splitColumns:  fields,
+		splitRange:    chunkRange,
+		totalChunkCnt: bucketChunkCnt,
+		beginIndex:    beginIndex,
+		chunksCh:      make(chan *chunk.Range, DefaultChannelBuffer),
 	}
-	chunk.InitChunks(chunks, chunk.Random, 0, 0, beginIndex, table.Collation, table.Range, bucketChunkCnt)
-	failpoint.Inject("ignore-last-n-chunk-in-bucket", func(v failpoint.Value) {
-		log.Info("failpoint ignore-last-n-chunk-in-bucket injected (random splitter)", zap.Int("n", v.(int)))
-		if len(chunks) <= 1+v.(int) {
-			failpoint.Return(nil, nil)
+	// First split the chunkRange into coarse chunks.
+	if chunkCnt <= randomSplitCoarseChunks {
+		iter.coarseChunks = []*chunk.Range{chunkRange}
+		iter.coarseSplitCounts = []int{max(chunkCnt, 1)}
+	} else {
+		coarseChunks, err := splitRangeByRandom(
+			rctx, dbConn, chunkRange, randomSplitCoarseChunks,
+			table.Schema, table.Table, fields, table.Range, table.Collation)
+		if err != nil {
+			return nil, errors.Trace(err)
 		}
-		chunks = chunks[:(len(chunks) - v.(int))]
+		iter.coarseChunks = coarseChunks
+		iter.coarseSplitCounts = mathutil.Divide2Batches(chunkCnt, len(coarseChunks))
+	}
+
+	// Then start producing fine chunks from the coarse chunks in the background.
+	iter.eg.Go(func() error {
+		return iter.produceChunks()
 	})
-	progress.StartTable(progressID, len(chunks), true)
-	return &RandomIterator{
-		table:     table,
-		chunkSize: chunkSize,
-		chunks:    chunks,
-		nextChunk: 0,
-		dbConn:    dbConn,
-	}, nil
+	progress.StartTable(progressID, 0, false)
+	return iter, nil
 }
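On the coarse split above: mathutil.Divide2Batches decides how many fine chunks each coarse chunk should yield, so the per-coarse-chunk counts sum to chunkCnt. Below is a local stand-in for illustration, assuming the counts are spread as evenly as possible (differing by at most one); it is not the tidb implementation:

package main

import "fmt"

// divideEvenly is an assumed equivalent of mathutil.Divide2Batches as used
// above: it splits total into n counts that sum to total and differ by at
// most one.
func divideEvenly(total, n int) []int {
	counts := make([]int, n)
	for i := range counts {
		counts[i] = total / n
		if i < total%n {
			counts[i]++ // spread the remainder over the first batches
		}
	}
	return counts
}

func main() {
	// E.g. 1000 fine chunks over 256 coarse chunks: 232 coarse chunks get
	// 4 fine chunks each, the remaining 24 get 3 (232*4 + 24*3 = 1000).
	counts := divideEvenly(1000, 256)
	fmt.Println(counts[0], counts[len(counts)-1]) // 4 3
}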
 
 func (s *RandomIterator) Next() (*chunk.Range, error) {
-	if uint(len(s.chunks)) <= s.nextChunk {
-		return nil, nil
-	}
-	c := s.chunks[s.nextChunk]
-	s.nextChunk = s.nextChunk + 1
-	failpoint.Inject("print-chunk-info", func() {
-		lowerBounds := make([]string, len(c.Bounds))
-		upperBounds := make([]string, len(c.Bounds))
-		for i, bound := range c.Bounds {
-			lowerBounds[i] = bound.Lower
-			upperBounds[i] = bound.Upper
+	select {
+	case <-s.egCtx.Done():
+		return nil, s.firstErr.Get()
+	case c, ok := <-s.chunksCh:
+		err := s.firstErr.Get()
+		if !ok || err != nil {
+			return nil, err
 		}
-		log.Info("failpoint print-chunk-info injected (random splitter)", zap.Strings("lowerBounds", lowerBounds), zap.Strings("upperBounds", upperBounds), zap.String("indexCode", c.Index.ToString()))
-	})
-	return c, nil
+
+		failpoint.Inject("print-chunk-info", func() {
+			lowerBounds := make([]string, len(c.Bounds))
+			upperBounds := make([]string, len(c.Bounds))
+			for i, bound := range c.Bounds {
+				lowerBounds[i] = bound.Lower
+				upperBounds[i] = bound.Upper
+			}
+			log.Info("failpoint print-chunk-info injected (random splitter)",
+				zap.Strings("lowerBounds", lowerBounds),
+				zap.Strings("upperBounds", upperBounds),
+				zap.String("indexCode", c.Index.ToString()))
+		})
+		return c, nil
+	}
 }
 
 func (s *RandomIterator) Close() {
-
+	if s.cancel != nil {
+		s.cancel()
+		_ = s.eg.Wait()
+	}
 }
 
 // GetSplitFields returns fields to split chunks, order by pk, uk, index, columns.
@@ -262,7 +304,7 @@ func splitRangeByRandom(ctx context.Context, db *sql.DB, chunk *chunk.Range, cou
 		return nil, errors.Trace(err)
 	}
 	log.Debug("get split values by random", zap.Stringer("chunk", chunk), zap.Int("random values num", len(randomValues)))
-	for i := 0; i <= len(randomValues); i++ {
+	for i := range len(randomValues) + 1 {
 
 		newChunk := chunk.Copy()
 		for j, column := range columns {
@@ -280,6 +322,46 @@ func splitRangeByRandom(ctx context.Context, db *sql.DB, chunk *chunk.Range, cou
 		}
 		chunks = append(chunks, newChunk)
 	}
-	log.Debug("split range by random", zap.Stringer("origin chunk", chunk), zap.Int("split num", len(chunks)))
+	log.Debug("split range by random",
+		zap.Stringer("origin chunk", chunk),
+		zap.Int("split num", len(chunks)))
 	return chunks, nil
 }
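On the loop above: N random split values cut a range into N+1 sub-ranges, and assuming the unchanged loop body (between the two hunks) still treats i == len(randomValues) as the final, upper-unbounded range, the loop has to iterate len(randomValues)+1 times. A simplified illustration with string bounds; the real code updates per-column bounds on a chunk.Range copy:

package main

import "fmt"

// cut shows why N split values yield N+1 ranges.
func cut(values []string) []string {
	ranges := make([]string, 0, len(values)+1)
	for i := range len(values) + 1 {
		lower, upper := "-inf", "+inf"
		if i > 0 {
			lower = values[i-1]
		}
		if i < len(values) {
			upper = values[i]
		}
		ranges = append(ranges, fmt.Sprintf("(%s, %s]", lower, upper))
	}
	return ranges
}

func main() {
	fmt.Println(cut([]string{"10", "20"})) // [(-inf, 10] (10, 20] (20, +inf]]
}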
+
+func (s *RandomIterator) produceChunks() error {
+	defer close(s.chunksCh)
+	for i := range s.coarseChunks {
+		fineChunks, err := splitRangeByRandom(
+			s.egCtx, s.dbConn, s.coarseChunks[i], s.coarseSplitCounts[i],
+			s.table.Schema, s.table.Table,
+			s.splitColumns, s.table.Range, s.table.Collation)
+		if err != nil {
+			s.firstErr.Set(err)
+			return err
+		}
+
+		isLastCoarseChunk := i == len(s.coarseChunks)-1
+		if isLastCoarseChunk {
+			failpoint.Inject("ignore-last-n-chunk-in-bucket", func(v failpoint.Value) {
+				log.Info("failpoint ignore-last-n-chunk-in-bucket injected (random splitter)", zap.Int("n", v.(int)))
+				if len(fineChunks) <= 1+v.(int) {
+					fineChunks = nil
+					return
+				}
+				fineChunks = fineChunks[:(len(fineChunks) - v.(int))]
+			})
+		}
+
+		chunk.InitChunks(fineChunks, chunk.Random, 0, 0, s.beginIndex, s.totalChunkCnt, s.table.Collation, s.table.Range)
+		s.beginIndex += len(fineChunks)
+		progress.UpdateTotal(s.progressID, len(fineChunks), isLastCoarseChunk)
+		for _, fineChunk := range fineChunks {
+			select {
+			case <-s.egCtx.Done():
+				return nil
+			case s.chunksCh <- fineChunk:
+			}
+		}
+	}
+	return nil
+}
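For context, this is the intended consumption pattern of the new iterator, sketched as a hypothetical driver; "process" is a placeholder and the real caller lives elsewhere in sync_diff_inspector:

// driveRandomIterator is illustrative, not part of the patch.
func driveRandomIterator(ctx context.Context, progressID string, table *common.TableDiff, dbConn *sql.DB, process func(*chunk.Range)) error {
	iter, err := NewRandomIterator(ctx, progressID, table, dbConn)
	if err != nil {
		return err
	}
	defer iter.Close() // cancels the producer and waits for it to exit
	for {
		c, err := iter.Next() // blocks until the background splitter emits a chunk
		if err != nil {
			return err // the first split error, latched in firstErr
		}
		if c == nil {
			return nil // chunksCh was closed: all fine chunks consumed
		}
		process(c)
	}
}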