Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
104 commits
Select commit Hold shift + click to select a range
e0104ba
Update
joechenrh Nov 29, 2024
364c134
Use custom arrow-go package
joechenrh Nov 29, 2024
f3222f0
Update go deps
joechenrh Nov 29, 2024
0bab0fd
Update reader
joechenrh Dec 4, 2024
3f155f6
Fix
joechenrh Dec 4, 2024
ab23cd2
Update code
joechenrh Dec 9, 2024
36b295c
Refine
joechenrh Dec 10, 2024
14284da
Refine
joechenrh Dec 10, 2024
010baa4
Fix
joechenrh Dec 11, 2024
0e20195
Add todo
joechenrh Dec 13, 2024
8216f06
test
joechenrh Dec 16, 2024
837c7d1
Update parser and memory usage estimation
joechenrh Dec 16, 2024
dff3f67
Add memory limiter
joechenrh Dec 16, 2024
03d68b2
Merge branch 'master' into change-parquet-lib
joechenrh Dec 16, 2024
b393670
Update go mod
joechenrh Dec 16, 2024
25433c1
Fix bazel
joechenrh Dec 16, 2024
7ff086c
Update go.mod
joechenrh Dec 18, 2024
1648888
Merge branch 'master' into change-parquet-lib
joechenrh Dec 18, 2024
bd3b5a8
Add pprof for test
joechenrh Dec 19, 2024
494baad
[Test Only] output heap usage
joechenrh Dec 24, 2024
15f4aa0
Fix read from S3
joechenrh Dec 26, 2024
1dc2e94
Update
joechenrh Dec 27, 2024
8692b54
Update go mod
joechenrh Dec 31, 2024
97d72f1
Fix
joechenrh Dec 31, 2024
0a4090e
Add new configuration
joechenrh Jan 2, 2025
3d00736
Adjust gc percent
joechenrh Jan 2, 2025
a078229
Fix memory controller
joechenrh Jan 3, 2025
0d6d2d3
Fix GC percentage
joechenrh Jan 3, 2025
134faba
Add more log
joechenrh Jan 3, 2025
8caf76c
Fix build
joechenrh Jan 3, 2025
c19cfeb
Fix error
joechenrh Jan 3, 2025
8374728
Add read mode
joechenrh Jan 7, 2025
c386049
Update allocator
joechenrh Jan 10, 2025
b2fbdc3
Merge branch 'master' into change-parquet-lib
joechenrh Jan 10, 2025
e8af41a
Add comment
joechenrh Jan 10, 2025
32cba87
Update code
joechenrh Jan 17, 2025
14c0e53
Fix bazel
joechenrh Jan 17, 2025
0b76e09
Fix bazel
joechenrh Jan 17, 2025
155ed91
Fix
joechenrh Jan 17, 2025
f98e272
Fix allocator
joechenrh Jan 21, 2025
e188b2e
Remove some test codes
joechenrh Feb 5, 2025
00f39eb
Merge branch 'master' into change-parquet-lib2
joechenrh Feb 5, 2025
9206b62
Fix test
joechenrh Feb 5, 2025
bcbbf41
Fix test
joechenrh Feb 5, 2025
3a374ce
Fix test
joechenrh Feb 5, 2025
b006b58
Merge branch 'master' into change-parquet-lib2
joechenrh Feb 5, 2025
7eb8b3e
Update bazel
joechenrh Feb 6, 2025
abe850f
Update memory allocation
joechenrh Feb 8, 2025
15973ef
Update memory control
joechenrh Feb 10, 2025
fe8cd9c
Fix IMPORT INTO parquet
joechenrh Feb 12, 2025
6e42ba4
Update memory control
joechenrh Feb 13, 2025
b62cc26
Update code for IMPORT INTO
joechenrh Feb 14, 2025
f51fac3
[test] Add base64 try fix OOM for global sort
joechenrh Feb 15, 2025
4c49aa5
update bazel
joechenrh Feb 18, 2025
d3285e5
Merge branch 'master' into change-parquet-lib2
joechenrh Feb 18, 2025
97fa5f9
update bazel
joechenrh Feb 18, 2025
291d7a9
update build
joechenrh Feb 18, 2025
650f9b8
update build
joechenrh Feb 18, 2025
b6681a1
update build
joechenrh Feb 18, 2025
016d1e5
fix test
joechenrh Feb 18, 2025
5a02500
fix test
joechenrh Feb 18, 2025
9f74818
update allocator and comments
joechenrh Feb 19, 2025
30e4d32
revert for test build
joechenrh Feb 20, 2025
9b324ee
revert for test build
joechenrh Feb 20, 2025
1cf6466
update allocator and GC
joechenrh Feb 20, 2025
577812c
fix build
joechenrh Feb 20, 2025
1b8bf90
update
joechenrh Feb 20, 2025
1fffb00
remove some unused code
joechenrh Feb 21, 2025
fdf4578
clean up
joechenrh Feb 26, 2025
99d32e7
add prefetch size
joechenrh Mar 2, 2025
1957b72
Merge branch 'master' into change-parquet-lib2
joechenrh Mar 2, 2025
c75afde
fix build
joechenrh Mar 2, 2025
db06ab0
[test] add base64 for csv
joechenrh Feb 26, 2025
ff2b0ea
fix after merge
joechenrh Mar 3, 2025
f186077
fix
joechenrh Mar 3, 2025
296f9d7
fix test
joechenrh Mar 3, 2025
af7007b
Update code
joechenrh Mar 6, 2025
df7ee45
Update code
joechenrh Mar 6, 2025
1e8f837
Fix merge sort
joechenrh Mar 7, 2025
6fb09b8
[test] skip preprocess for test
joechenrh Mar 7, 2025
4be7553
Adjust encode concurrency based on memory usage
joechenrh Mar 7, 2025
af1b47f
add test
joechenrh Mar 13, 2025
0850431
Revert "add test"
joechenrh May 26, 2025
3781ec7
Revert "[test] skip preprocess for test"
joechenrh May 26, 2025
ab49f52
Merge branch 'master' into change-parquet-lib2
joechenrh May 26, 2025
3b5fd47
update DEPS.bzl
joechenrh May 26, 2025
4136e3f
totally remove xitongsys/parquet-go
joechenrh May 27, 2025
95c1ebf
fix CI
joechenrh May 27, 2025
113817d
update DEPS.bzl
joechenrh May 27, 2025
6ec630a
fix test
joechenrh May 27, 2025
d3de7b7
Merge branch 'master' into change-parquet-lib2
joechenrh Jul 29, 2025
e8e313e
fix after merge
joechenrh Jul 29, 2025
f4fde50
Update go mod and remove non-streaming mode
joechenrh Jul 30, 2025
1a22eff
update timestamp
joechenrh Sep 1, 2025
fad328d
minor update
joechenrh Sep 2, 2025
892ff96
update allocator
joechenrh Sep 4, 2025
7ee9076
update go mod
joechenrh Sep 5, 2025
b0b24c1
Test
joechenrh Sep 5, 2025
0a86cea
Merge branch 'master' into change-parquet-lib2
joechenrh Sep 6, 2025
8b3378b
fix after merge
joechenrh Sep 6, 2025
6c447f9
Revert "Test"
joechenrh Sep 6, 2025
c589910
fix after merge
joechenrh Sep 6, 2025
dddd4af
update code
joechenrh Sep 8, 2025
51110d1
Merge branch 'master' into change-parquet-lib2
joechenrh Sep 8, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,250 changes: 943 additions & 1,307 deletions DEPS.bzl

Large diffs are not rendered by default.

2 changes: 0 additions & 2 deletions br/pkg/storage/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -594,8 +594,6 @@ type gcsObjectReader struct {

prefetchSize int
// reader context used to implement `io.Seeker`
// currently, lightning depends on package `xitongsys/parquet-go` to read parquet file and it needs `io.Seeker`
// See: https://github.com/xitongsys/parquet-go/blob/207a3cee75900b2b95213627409b7bac0f190bb3/source/source.go#L9-L10
ctx context.Context
}

Expand Down
2 changes: 0 additions & 2 deletions br/pkg/storage/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -1018,8 +1018,6 @@ type s3ObjectReader struct {
pos int64
rangeInfo RangeInfo
// reader context used to implement `io.Seeker`
// currently, lightning depends on package `xitongsys/parquet-go` to read parquet file and it needs `io.Seeker`
// See: https://github.com/xitongsys/parquet-go/blob/207a3cee75900b2b95213627409b7bac0f190bb3/source/source.go#L9-L10
ctx context.Context
prefetchSize int
}
Expand Down
18 changes: 8 additions & 10 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
github.com/Masterminds/semver v1.5.0
github.com/YangKeao/go-mysql-driver v0.0.0-20240627104025-dd5589458cfa
github.com/aliyun/alibaba-cloud-sdk-go v1.61.1581
github.com/apache/arrow-go/v18 v18.0.0
github.com/apache/skywalking-eyes v0.4.0
github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2
github.com/ashanbrown/forbidigo/v2 v2.1.0
Expand Down Expand Up @@ -118,8 +119,6 @@ require (
github.com/uber/jaeger-client-go v2.22.1+incompatible
github.com/vbauerster/mpb/v7 v7.5.3
github.com/wangjohn/quickselect v0.0.0-20161129230411-ed8402a42d5f
github.com/xitongsys/parquet-go v1.6.3-0.20240520233950-75e935fc3e17
github.com/xitongsys/parquet-go-source v0.0.0-20200817004010-026bad9b25d0
github.com/zyedidia/generic v1.2.1
go.etcd.io/etcd/api/v3 v3.5.15
go.etcd.io/etcd/client/pkg/v3 v3.5.15
Expand Down Expand Up @@ -156,24 +155,22 @@ require (
require (
codeberg.org/chavacava/garif v0.2.0 // indirect
filippo.io/edwards25519 v1.1.0 // indirect
github.com/andybalholm/brotli v1.0.5 // indirect
github.com/apache/arrow/go/v12 v12.0.1 // indirect
github.com/andybalholm/brotli v1.1.1 // indirect
github.com/cockroachdb/errors v1.11.3 // indirect
github.com/cockroachdb/fifo v0.0.0-20240606204812-0bbfbd93a7ce // indirect
github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 // indirect
github.com/getsentry/sentry-go v0.27.0 // indirect
github.com/goccy/go-reflect v1.2.0 // indirect
github.com/google/flatbuffers v2.0.8+incompatible // indirect
github.com/google/flatbuffers v24.3.25+incompatible // indirect
github.com/google/go-cmp v0.7.0 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
github.com/klauspost/asmfmt v1.3.2 // indirect
github.com/klauspost/cpuid/v2 v2.2.7 // indirect
github.com/klauspost/cpuid/v2 v2.2.9 // indirect
github.com/ldez/grignotin v0.9.0 // indirect
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/pierrec/lz4/v4 v4.1.15 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/qri-io/jsonpointer v0.1.1 // indirect
github.com/segmentio/fasthash v1.0.3 // indirect
github.com/tidwall/gjson v1.14.4 // indirect
Expand All @@ -197,7 +194,7 @@ require (
github.com/Masterminds/sprig/v3 v3.2.2 // indirect
github.com/VividCortex/ewma v1.2.0 // indirect
github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d // indirect
github.com/apache/thrift v0.16.0 // indirect
github.com/apache/thrift v0.21.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bmatcuk/doublestar/v2 v2.0.4 // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
Expand All @@ -220,7 +217,7 @@ require (
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-ole/go-ole v1.3.0 // indirect
github.com/goccy/go-json v0.10.2 // indirect
github.com/goccy/go-json v0.10.4 // indirect
github.com/golang-jwt/jwt/v4 v4.5.2 // indirect
github.com/golang-jwt/jwt/v5 v5.2.2 // indirect
github.com/golang/glog v1.2.4 // indirect
Expand Down Expand Up @@ -330,6 +327,7 @@ require (
)

replace (
github.com/apache/arrow-go/v18 => github.com/joechenrh/arrow-go/v18 v18.0.0-20250905011811-90682f7df921
github.com/go-ldap/ldap/v3 => github.com/YangKeao/ldap/v3 v3.4.5-0.20230421065457-369a3bab1117
github.com/pingcap/tidb/pkg/parser => ./pkg/parser

Expand Down
217 changes: 26 additions & 191 deletions go.sum

Large diffs are not rendered by default.

2 changes: 0 additions & 2 deletions lightning/pkg/importer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,6 @@ go_test(
"@com_github_tikv_pd_client//:client",
"@com_github_tikv_pd_client//http",
"@com_github_tikv_pd_client//pkg/caller",
"@com_github_xitongsys_parquet_go//writer",
"@com_github_xitongsys_parquet_go_source//buffer",
"@io_etcd_go_etcd_client_v3//:client",
"@io_etcd_go_etcd_tests_v3//integration",
"@org_uber_go_mock//gomock",
Expand Down
2 changes: 1 addition & 1 deletion lightning/pkg/importer/chunk_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func openParser(
case mydump.SourceTypeSQL:
parser = mydump.NewChunkParser(ctx, cfg.TiDB.SQLMode, reader, blockBufSize, ioWorkers)
case mydump.SourceTypeParquet:
parser, err = mydump.NewParquetParser(ctx, store, reader, chunk.FileMeta.Path)
parser, err = mydump.NewParquetParser(ctx, store, reader, chunk.FileMeta.Path, chunk.FileMeta.ParquetMeta)
if err != nil {
return nil, err
}
Expand Down
10 changes: 8 additions & 2 deletions lightning/pkg/importer/get_pre_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,10 @@ func (p *PreImportInfoGetterImpl) ReadFirstNRowsByFileMeta(ctx context.Context,
case mydump.SourceTypeSQL:
parser = mydump.NewChunkParser(ctx, p.cfg.TiDB.SQLMode, reader, blockBufSize, p.ioWorkers)
case mydump.SourceTypeParquet:
parser, err = mydump.NewParquetParser(ctx, p.srcStorage, reader, dataFileMeta.Path)
parser, err = mydump.NewParquetParser(
ctx, p.srcStorage, reader,
dataFileMeta.Path, mydump.GetDefaultParquetMeta(),
)
if err != nil {
return nil, nil, errors.Trace(err)
}
Expand Down Expand Up @@ -661,7 +664,10 @@ func (p *PreImportInfoGetterImpl) sampleDataFromTable(
case mydump.SourceTypeSQL:
parser = mydump.NewChunkParser(ctx, p.cfg.TiDB.SQLMode, reader, blockBufSize, p.ioWorkers)
case mydump.SourceTypeParquet:
parser, err = mydump.NewParquetParser(ctx, p.srcStorage, reader, sampleFile.Path)
parser, err = mydump.NewParquetParser(
ctx, p.srcStorage, reader,
sampleFile.Path, mydump.GetDefaultParquetMeta(),
)
if err != nil {
return 0.0, false, errors.Trace(err)
}
Expand Down
41 changes: 18 additions & 23 deletions lightning/pkg/importer/get_pre_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ import (
"context"
"database/sql"
"fmt"
"slices"
"strings"
"testing"

"github.com/DATA-DOG/go-sqlmock"
mysql_sql_driver "github.com/go-sql-driver/mysql"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/lightning/pkg/importer/mock"
ropts "github.com/pingcap/tidb/lightning/pkg/importer/opts"
"github.com/pingcap/tidb/pkg/errno"
Expand All @@ -36,8 +36,6 @@ import (
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/types"
"github.com/stretchr/testify/require"
pqt_buf_src "github.com/xitongsys/parquet-go-source/buffer"
pqtwriter "github.com/xitongsys/parquet-go/writer"
)

type colDef struct {
Expand Down Expand Up @@ -253,26 +251,24 @@ func TestGetPreInfoGetAllTableStructures(t *testing.T) {
}
}

func generateParquetData(t *testing.T) []byte {
type parquetStruct struct {
ID int64 `parquet:"name=id, type=INT64"`
Name string `parquet:"name=name, type=BYTE_ARRAY"`
}
pf, err := pqt_buf_src.NewBufferFile(make([]byte, 0))
func readParquetData(t *testing.T) []byte {
s, err := storage.ParseBackend("./testdata", nil)
require.NoError(t, err)
pw, err := pqtwriter.NewParquetWriter(pf, new(parquetStruct), 4)

store, err := storage.NewWithDefaultOpt(context.Background(), s)
require.NoError(t, err)
for i := range 10 {
require.NoError(t, pw.Write(parquetStruct{
ID: int64(i + 1),
Name: fmt.Sprintf("name_%d", i+1),
}))
}
require.NoError(t, pw.WriteStop())
require.NoError(t, pf.Close())
bf, ok := pf.(pqt_buf_src.BufferFile)
require.True(t, ok)
return slices.Clone(bf.Bytes())
defer store.Close()

reader, err := store.Open(context.Background(), "test.parquet", nil)
require.NoError(t, err)
defer reader.Close()

bs := make([]byte, 1024)
l, err := reader.Read(bs)
bs = bs[:l]
require.NoError(t, err)

return bs
}

func TestGetPreInfoReadFirstRow(t *testing.T) {
Expand All @@ -282,7 +278,6 @@ func TestGetPreInfoReadFirstRow(t *testing.T) {
111,"aaa"
222,"bbb"
`)
pqtData := generateParquetData(t)
const testSQLData01 string = `INSERT INTO db01.tbl01 (ival, sval) VALUES (333, 'ccc');
INSERT INTO db01.tbl01 (ival, sval) VALUES (444, 'ddd');`
testDataInfos := []struct {
Expand Down Expand Up @@ -349,7 +344,7 @@ INSERT INTO db01.tbl01 (ival, sval) VALUES (444, 'ddd');`
},
{
FileName: "/db01/tbl01/data.005.parquet",
Data: pqtData,
Data: readParquetData(t),
FirstN: 3,
ExpectFirstRowDatums: [][]types.Datum{
{
Expand Down
44 changes: 0 additions & 44 deletions lightning/pkg/importer/table_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
dmysql "github.com/go-sql-driver/mysql"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/version"
"github.com/pingcap/tidb/lightning/pkg/web"
"github.com/pingcap/tidb/pkg/errno"
Expand Down Expand Up @@ -788,13 +787,6 @@ ChunkLoop:
break
}

if chunk.FileMeta.Type == mydump.SourceTypeParquet {
// TODO: use the compressed size of the chunk to conduct memory control
if _, err = getChunkCompressedSizeForParquet(ctx, chunk, rc.store); err != nil {
return nil, errors.Trace(err)
}
}

restoreWorker := rc.regionWorkers.Apply()
wg.Add(1)
go func(w *worker.Worker, cr *chunkProcessor) {
Expand Down Expand Up @@ -1201,42 +1193,6 @@ func (tr *TableImporter) postProcess(
return true, nil
}

// getChunkCompressedSizeForParquet opens the parquet file referenced by
// chunk.FileMeta, reads its footer, and returns the largest row-group size,
// where a row group's size is the sum of the total compressed sizes of its
// column chunks. It returns 0 together with a traced error when the file
// cannot be opened, the parser cannot be created, or the footer cannot be read.
func getChunkCompressedSizeForParquet(
	ctx context.Context,
	chunk *checkpoints.ChunkCheckpoint,
	store storage.ExternalStorage,
) (int64, error) {
	decompressCfg := storage.DecompressConfig{ZStdDecodeConcurrency: 1}
	reader, err := mydump.OpenReader(ctx, &chunk.FileMeta, store, decompressCfg)
	if err != nil {
		return 0, errors.Trace(err)
	}

	parser, err := mydump.NewParquetParser(ctx, store, reader, chunk.FileMeta.Path)
	if err != nil {
		// Parser construction failed, so the reader is closed explicitly here.
		_ = reader.Close()
		return 0, errors.Trace(err)
	}
	//nolint: errcheck
	defer parser.Close()

	if err = parser.Reader.ReadFooter(); err != nil {
		return 0, errors.Trace(err)
	}

	// Track the biggest per-row-group compressed size seen in the footer.
	var largest int64
	for _, group := range parser.Reader.Footer.GetRowGroups() {
		var groupTotal int64
		for _, column := range group.GetColumns() {
			groupTotal += column.MetaData.GetTotalCompressedSize()
		}
		largest = max(largest, groupTotal)
	}
	return largest, nil
}

func updateStatsMeta(ctx context.Context, db *sql.DB, tableID int64, count int) {
s := common.SQLWithRetry{
DB: db,
Expand Down
36 changes: 0 additions & 36 deletions lightning/pkg/importer/table_import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2398,39 +2398,3 @@ func TestGetDDLStatus(t *testing.T) {
require.Equal(t, model.JobStateRunning, status.state)
require.Equal(t, int64(123)+int64(456), status.rowCount)
}

// TestGetChunkCompressedSizeForParquet verifies that
// getChunkCompressedSizeForParquet reports a compressed size of 192 for the
// 000000_0.parquet fixture stored under ./testdata/.
func TestGetChunkCompressedSizeForParquet(t *testing.T) {
	dir := "./testdata/"
	fileName := "000000_0.parquet"
	store, err := storage.NewLocalStorage(dir)
	require.NoError(t, err)

	dataFiles := []mydump.FileInfo{{
		TableName: filter.Table{Schema: "db", Name: "table"},
		FileMeta: mydump.SourceFileMeta{
			Path:        fileName,
			Type:        mydump.SourceTypeParquet,
			Compression: mydump.CompressionNone,
			SortKey:     "99",
			FileSize:    192,
		},
	}}

	chunk := checkpoints.ChunkCheckpoint{
		Key:      checkpoints.ChunkCheckpointKey{Path: dataFiles[0].FileMeta.Path, Offset: 0},
		FileMeta: dataFiles[0].FileMeta,
		Chunk: mydump.Chunk{
			Offset:       0,
			EndOffset:    192,
			PrevRowIDMax: 0,
			RowIDMax:     100,
		},
	}
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	compressedSize, err := getChunkCompressedSizeForParquet(ctx, &chunk, store)
	require.NoError(t, err)
	// testify expects (expected, actual); the original order mislabelled the
	// two values in failure output.
	require.Equal(t, int64(192), compressedSize)
}
Binary file added lightning/pkg/importer/testdata/test.parquet
Binary file not shown.
21 changes: 15 additions & 6 deletions pkg/disttask/importinto/encode_and_sort_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb/pkg/disttask/operator"
"github.com/pingcap/tidb/pkg/executor/importer"
"github.com/pingcap/tidb/pkg/lightning/backend/external"
"github.com/pingcap/tidb/pkg/lightning/mydump"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/resourcemanager/pool/workerpool"
"github.com/pingcap/tidb/pkg/resourcemanager/util"
Expand All @@ -52,6 +53,7 @@ type encodeAndSortOperator struct {
ctx context.Context
cancel context.CancelFunc
collector execute.Collector
pool *mydump.Pool

taskID, subtaskID int64
tableImporter *importer.TableImporter
Expand All @@ -71,12 +73,14 @@ func newEncodeAndSortOperator(
collector execute.Collector,
subtaskID int64,
concurrency int,
memPool *mydump.Pool,
) *encodeAndSortOperator {
subCtx, cancel := context.WithCancel(ctx)
op := &encodeAndSortOperator{
ctx: subCtx,
cancel: cancel,
collector: collector,
pool: memPool,
taskID: executor.taskID,
subtaskID: subtaskID,
tableImporter: executor.tableImporter,
Expand Down Expand Up @@ -197,7 +201,7 @@ func (w *chunkWorker) HandleTask(task *importStepMinimalTask, _ func(workerpool.
// we don't use the input send function, it makes workflow more complex
// we send result to errCh and handle it here.
executor := newImportMinimalTaskExecutor(task)
if err := executor.Run(w.ctx, w.dataWriter, w.indexWriter, w.op.collector); err != nil {
if err := executor.Run(w.ctx, w.dataWriter, w.indexWriter, w.op.collector, w.op.pool); err != nil {
w.op.onError(err)
}
}
Expand Down Expand Up @@ -228,12 +232,17 @@ func subtaskPrefix(taskID, subtaskID int64) string {
return path.Join(strconv.Itoa(int(taskID)), strconv.Itoa(int(subtaskID)))
}

func getWriterMemorySizeLimit(resource *proto.StepResource, plan *importer.Plan) (
func getWriterMemorySizeLimit(resource *proto.StepResource, plan *importer.Plan, encodeStep bool) (
dataKVMemSizePerCon, perIndexKVMemSizePerCon uint64) {
indexKVGroupCnt := getNumOfIndexGenKV(plan.DesiredTableInfo)
memPerCon := resource.Mem.Capacity() / int64(plan.ThreadCnt)
// we use half of the total available memory for data writer, and the other half
// for encoding and other stuffs, it's an experience value, might not optimal.

// We use a portion of the total available memory for the data writer; the size of
// that portion depends on the data format. The rest is left for encoding and other
// work. It's an empirical value and might not be optimal.
memForWriter := mydump.GetMemoryForWriter(
encodeStep, plan.ParquetFileMemoryUsage,
plan.ThreadCnt, int(resource.Mem.Capacity()))

// Then we divide that memory into indexKVGroupCnt + 3 shares: the data KV writer
// takes 3 shares, and each index KV writer takes 1 share.
// suppose we have memPerCon = 2G
Expand All @@ -243,7 +252,7 @@ func getWriterMemorySizeLimit(resource *proto.StepResource, plan *importer.Plan)
// | 1 | 768/256 MiB |
// | 5 | 384/128 MiB |
// | 13 | 192/64 MiB |
memPerShare := float64(memPerCon) / 2 / float64(indexKVGroupCnt+3)
memPerShare := float64(memForWriter) / float64(indexKVGroupCnt+3)
return uint64(memPerShare * 3), uint64(memPerShare)
}

Expand Down
Loading
Loading