From 3dd656d7dae2b6f01174a7981f6878338b3104f5 Mon Sep 17 00:00:00 2001 From: Jiaqiang Huang Date: Wed, 15 Oct 2025 18:18:47 +0800 Subject: [PATCH] importer: parallel init data source files (#63621) close pingcap/tidb#62633 --- pkg/executor/importer/import.go | 34 +++++++++++++++++++---------- pkg/lightning/mydump/BUILD.bazel | 1 + pkg/lightning/mydump/loader_test.go | 9 +++++++- 3 files changed, 31 insertions(+), 13 deletions(-) diff --git a/pkg/executor/importer/import.go b/pkg/executor/importer/import.go index ac82aa4e1c..8c1bac7400 100644 --- a/pkg/executor/importer/import.go +++ b/pkg/executor/importer/import.go @@ -1267,7 +1267,7 @@ func (e *LoadDataController) InitDataFiles(ctx context.Context) error { }() size, err3 := fileReader.Seek(0, io.SeekEnd) if err3 != nil { - return exeerrors.ErrLoadDataCantRead.GenWithStackByArgs(errors.GetErrStackMsg(err2), "failed to read file size by seek") + return exeerrors.ErrLoadDataCantRead.GenWithStackByArgs(errors.GetErrStackMsg(err3), "failed to read file size by seek") } e.detectAndUpdateFormat(fileNameKey) sourceType = e.getSourceType() @@ -1296,26 +1296,29 @@ func (e *LoadDataController) InitDataFiles(ctx context.Context) error { allFiles := make([]mydump.RawFile, 0, 16) if err := s.WalkDir(ctx, &storage.WalkOption{ObjPrefix: commonPrefix, SkipSubDir: true}, func(remotePath string, size int64) error { - // we have checked in LoadDataExec.Next - //nolint: errcheck - match, _ := filepath.Match(escapedPath, remotePath) - if !match { - return nil - } - // pick arbitrary one file to detect the format. - e.detectAndUpdateFormat(remotePath) - sourceType = e.getSourceType() allFiles = append(allFiles, mydump.RawFile{Path: remotePath, Size: size}) - totalSize += size return nil }); err != nil { return exeerrors.ErrLoadDataCantRead.GenWithStackByArgs(errors.GetErrStackMsg(err), "failed to walk dir") } var err error - if dataFiles, err = mydump.ParallelProcess(ctx, allFiles, e.ThreadCnt*2, + var processedFiles []*mydump.SourceFileMeta + var once sync.Once + if processedFiles, err = mydump.ParallelProcess(ctx, allFiles, e.ThreadCnt*2, func(ctx context.Context, f mydump.RawFile) (*mydump.SourceFileMeta, error) { + // we have checked in LoadDataExec.Next + //nolint: errcheck + match, _ := filepath.Match(escapedPath, f.Path) + if !match { + return nil, nil + } path, size := f.Path, f.Size + // pick arbitrary one file to detect the format. + once.Do(func() { + e.detectAndUpdateFormat(path) + sourceType = e.getSourceType() + }) compressTp := mydump.ParseCompressionOnFileExtension(path) fileMeta := mydump.SourceFileMeta{ Path: path, @@ -1328,6 +1331,13 @@ func (e *LoadDataController) InitDataFiles(ctx context.Context) error { }); err != nil { return err } + // filter unmatch files + for _, f := range processedFiles { + if f != nil { + dataFiles = append(dataFiles, f) + totalSize += f.FileSize + } + } } if e.InImportInto && isAutoDetectingFormat && e.Format != DataFormatCSV { if err2 = e.checkNonCSVFormatOptions(); err2 != nil { diff --git a/pkg/lightning/mydump/BUILD.bazel b/pkg/lightning/mydump/BUILD.bazel index a71422f556..fb2390d875 100644 --- a/pkg/lightning/mydump/BUILD.bazel +++ b/pkg/lightning/mydump/BUILD.bazel @@ -104,6 +104,7 @@ go_test( "@com_github_xitongsys_parquet_go//parquet", "@com_github_xitongsys_parquet_go//writer", "@com_github_xitongsys_parquet_go_source//local", + "@org_uber_go_atomic//:atomic", "@org_uber_go_goleak//:goleak", "@org_uber_go_mock//gomock", "@org_uber_go_zap//:zap", diff --git a/pkg/lightning/mydump/loader_test.go b/pkg/lightning/mydump/loader_test.go index 6f8fec0258..76f64418c5 100644 --- a/pkg/lightning/mydump/loader_test.go +++ b/pkg/lightning/mydump/loader_test.go @@ -39,6 +39,7 @@ import ( "github.com/stretchr/testify/require" "github.com/xitongsys/parquet-go/parquet" "github.com/xitongsys/parquet-go/writer" + "go.uber.org/atomic" ) type testMydumpLoaderSuite struct { @@ -1201,7 +1202,9 @@ func TestSetupOptions(t *testing.T) { } func TestParallelProcess(t *testing.T) { + var totalSize atomic.Int64 hdl := func(ctx context.Context, f md.RawFile) (string, error) { + totalSize.Add(f.Size) return strings.ToLower(f.Path), nil } @@ -1216,16 +1219,20 @@ func TestParallelProcess(t *testing.T) { oneTest := func(length int, concurrency int) { original := make([]md.RawFile, length) + totalSize = *atomic.NewInt64(0) for i := range length { - original[i] = md.RawFile{Path: randomString()} + original[i] = md.RawFile{Path: randomString(), Size: int64(rand.Intn(1000))} } res, err := md.ParallelProcess(context.Background(), original, concurrency, hdl) require.NoError(t, err) + oneTotalSize := int64(0) for i, s := range original { require.Equal(t, strings.ToLower(s.Path), res[i]) + oneTotalSize += s.Size } + require.Equal(t, oneTotalSize, totalSize.Load()) } oneTest(10, 0)