importer: parallel init data source files (#63621)

close pingcap/tidb#62633
This commit is contained in:
Jiaqiang Huang
2025-10-15 18:18:47 +08:00
committed by GitHub
parent aa22bdccd3
commit 3dd656d7da
3 changed files with 31 additions and 13 deletions

View File

@ -1267,7 +1267,7 @@ func (e *LoadDataController) InitDataFiles(ctx context.Context) error {
}()
size, err3 := fileReader.Seek(0, io.SeekEnd)
if err3 != nil {
return exeerrors.ErrLoadDataCantRead.GenWithStackByArgs(errors.GetErrStackMsg(err2), "failed to read file size by seek")
return exeerrors.ErrLoadDataCantRead.GenWithStackByArgs(errors.GetErrStackMsg(err3), "failed to read file size by seek")
}
e.detectAndUpdateFormat(fileNameKey)
sourceType = e.getSourceType()
@ -1296,26 +1296,29 @@ func (e *LoadDataController) InitDataFiles(ctx context.Context) error {
allFiles := make([]mydump.RawFile, 0, 16)
if err := s.WalkDir(ctx, &storage.WalkOption{ObjPrefix: commonPrefix, SkipSubDir: true},
func(remotePath string, size int64) error {
// we have checked in LoadDataExec.Next
//nolint: errcheck
match, _ := filepath.Match(escapedPath, remotePath)
if !match {
return nil
}
// pick arbitrary one file to detect the format.
e.detectAndUpdateFormat(remotePath)
sourceType = e.getSourceType()
allFiles = append(allFiles, mydump.RawFile{Path: remotePath, Size: size})
totalSize += size
return nil
}); err != nil {
return exeerrors.ErrLoadDataCantRead.GenWithStackByArgs(errors.GetErrStackMsg(err), "failed to walk dir")
}
var err error
if dataFiles, err = mydump.ParallelProcess(ctx, allFiles, e.ThreadCnt*2,
var processedFiles []*mydump.SourceFileMeta
var once sync.Once
if processedFiles, err = mydump.ParallelProcess(ctx, allFiles, e.ThreadCnt*2,
func(ctx context.Context, f mydump.RawFile) (*mydump.SourceFileMeta, error) {
// we have checked in LoadDataExec.Next
//nolint: errcheck
match, _ := filepath.Match(escapedPath, f.Path)
if !match {
return nil, nil
}
path, size := f.Path, f.Size
// pick arbitrary one file to detect the format.
once.Do(func() {
e.detectAndUpdateFormat(path)
sourceType = e.getSourceType()
})
compressTp := mydump.ParseCompressionOnFileExtension(path)
fileMeta := mydump.SourceFileMeta{
Path: path,
@ -1328,6 +1331,13 @@ func (e *LoadDataController) InitDataFiles(ctx context.Context) error {
}); err != nil {
return err
}
// filter unmatch files
for _, f := range processedFiles {
if f != nil {
dataFiles = append(dataFiles, f)
totalSize += f.FileSize
}
}
}
if e.InImportInto && isAutoDetectingFormat && e.Format != DataFormatCSV {
if err2 = e.checkNonCSVFormatOptions(); err2 != nil {

View File

@ -104,6 +104,7 @@ go_test(
"@com_github_xitongsys_parquet_go//parquet",
"@com_github_xitongsys_parquet_go//writer",
"@com_github_xitongsys_parquet_go_source//local",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_goleak//:goleak",
"@org_uber_go_mock//gomock",
"@org_uber_go_zap//:zap",

View File

@ -39,6 +39,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/xitongsys/parquet-go/parquet"
"github.com/xitongsys/parquet-go/writer"
"go.uber.org/atomic"
)
type testMydumpLoaderSuite struct {
@ -1201,7 +1202,9 @@ func TestSetupOptions(t *testing.T) {
}
func TestParallelProcess(t *testing.T) {
var totalSize atomic.Int64
hdl := func(ctx context.Context, f md.RawFile) (string, error) {
totalSize.Add(f.Size)
return strings.ToLower(f.Path), nil
}
@ -1216,16 +1219,20 @@ func TestParallelProcess(t *testing.T) {
oneTest := func(length int, concurrency int) {
original := make([]md.RawFile, length)
totalSize = *atomic.NewInt64(0)
for i := range length {
original[i] = md.RawFile{Path: randomString()}
original[i] = md.RawFile{Path: randomString(), Size: int64(rand.Intn(1000))}
}
res, err := md.ParallelProcess(context.Background(), original, concurrency, hdl)
require.NoError(t, err)
oneTotalSize := int64(0)
for i, s := range original {
require.Equal(t, strings.ToLower(s.Path), res[i])
oneTotalSize += s.Size
}
require.Equal(t, oneTotalSize, totalSize.Load())
}
oneTest(10, 0)