From 8d1a00e351ad1e255e4e79b02cd09385b66ededf Mon Sep 17 00:00:00 2001 From: tangenta Date: Thu, 28 Aug 2025 18:38:58 +0800 Subject: [PATCH] importinto: update thread count after initialized data files (#63200) ref pingcap/tidb#61702 --- pkg/ddl/reorg_util.go | 2 +- pkg/disttask/importinto/scheduler.go | 2 +- pkg/executor/importer/import.go | 30 ++++++++---- .../importintotest/import_into_test.go | 48 +++++++++++++++++++ .../realtikvtest/importintotest/util_test.go | 1 + 5 files changed, 71 insertions(+), 12 deletions(-) diff --git a/pkg/ddl/reorg_util.go b/pkg/ddl/reorg_util.go index 51c30666a5..6080020de4 100644 --- a/pkg/ddl/reorg_util.go +++ b/pkg/ddl/reorg_util.go @@ -87,7 +87,7 @@ func initJobReorgMetaFromVariables(ctx context.Context, job *model.Job, tbl tabl } if setReorgParam { - if kerneltype.IsNextGen() { + if kerneltype.IsNextGen() && setDistTaskParam { autoConc := scheduler.CalcConcurrencyByDataSize(tableSizeInBytes, cpuNum) m.SetConcurrency(autoConc) } else { diff --git a/pkg/disttask/importinto/scheduler.go b/pkg/disttask/importinto/scheduler.go index f9fb8b7d29..90729a4a95 100644 --- a/pkg/disttask/importinto/scheduler.go +++ b/pkg/disttask/importinto/scheduler.go @@ -354,7 +354,7 @@ func (sch *importScheduler) OnNextSubtasksBatch( return nil, err } previousSubtaskMetas[step] = metas - logger.Info("move to post-process step ", zap.Any("result", taskMeta.Summary)) + logger.Info("move to post-process step", zap.Any("result", taskMeta.Summary)) case proto.StepDone: return nil, nil default: diff --git a/pkg/executor/importer/import.go b/pkg/executor/importer/import.go index 68b2dfa6c2..f6b1b10b93 100644 --- a/pkg/executor/importer/import.go +++ b/pkg/executor/importer/import.go @@ -28,6 +28,7 @@ import ( "sync" "unicode/utf8" + "github.com/docker/go-units" "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/log" @@ -614,13 +615,9 @@ func (e *LoadDataController) checkFieldParams() error { func (p *Plan) initDefaultOptions(ctx context.Context, targetNodeCPUCnt int, store tidbkv.Storage) { var threadCnt int - if kerneltype.IsNextGen() { - threadCnt = scheduler.CalcConcurrencyByDataSize(p.TotalFileSize, targetNodeCPUCnt) - } else { - threadCnt = int(math.Max(1, float64(targetNodeCPUCnt)*0.5)) - if p.DataSourceType == DataSourceTypeQuery { - threadCnt = 2 - } + threadCnt = int(math.Max(1, float64(targetNodeCPUCnt)*0.5)) + if p.DataSourceType == DataSourceTypeQuery { + threadCnt = 2 } p.Checksum = config.OpLevelRequired p.ThreadCnt = threadCnt @@ -871,9 +868,7 @@ func (p *Plan) initOptions(ctx context.Context, seCtx sessionctx.Context, option p.ManualRecovery = true } - if kerneltype.IsNextGen() { - p.MaxNodeCnt = scheduler.CalcMaxNodeCountByDataSize(p.TotalFileSize, targetNodeCPUCnt) - } else { + if kerneltype.IsClassic() { if sv, ok := seCtx.GetSessionVars().GetSystemVar(vardef.TiDBMaxDistTaskNodes); ok { p.MaxNodeCnt = variable.TidbOptInt(sv, 0) if p.MaxNodeCnt == -1 { // -1 means calculate automatically @@ -1313,6 +1308,21 @@ func (e *LoadDataController) InitDataFiles(ctx context.Context) error { e.dataFiles = dataFiles e.TotalFileSize = totalSize + + if kerneltype.IsNextGen() { + targetNodeCPUCnt, err := handle.GetCPUCountOfNode(ctx) + if err != nil { + return err + } + failpoint.InjectCall("mockImportDataSize", &totalSize) + e.ThreadCnt = scheduler.CalcConcurrencyByDataSize(totalSize, targetNodeCPUCnt) + e.MaxNodeCnt = scheduler.CalcMaxNodeCountByDataSize(totalSize, targetNodeCPUCnt) + e.logger.Info("set import thread count for nextgen kernel", + zap.Int("thread count", e.ThreadCnt), + zap.Int("max node count", e.MaxNodeCnt), + zap.Int("target node cpu count", targetNodeCPUCnt), + zap.String("total file size", units.BytesSize(float64(totalSize)))) + } return nil } diff --git a/tests/realtikvtest/importintotest/import_into_test.go b/tests/realtikvtest/importintotest/import_into_test.go index 43220f248d..f87f11933f 100644 --- a/tests/realtikvtest/importintotest/import_into_test.go +++ b/tests/realtikvtest/importintotest/import_into_test.go @@ -26,6 +26,7 @@ import ( "sync" "time" + "github.com/docker/go-units" "github.com/fsouza/fake-gcs-server/fakestorage" "github.com/ngaut/pools" "github.com/pingcap/failpoint" @@ -1342,6 +1343,53 @@ func (s *mockGCSSuite) TestImportIntoWithFK() { s.tk.MustQuery("SELECT * FROM import_into.child;").Check(testkit.Rows("1 1", "2 2")) } +func (s *mockGCSSuite) TestImportIntoWithMockDataSize() { + if kerneltype.IsClassic() { + s.T().Skip("max node and thread auto calculation is not supported in classic") + } + content := []byte(`1,1 + 2,2`) + s.server.CreateObject(fakestorage.Object{ + ObjectAttrs: fakestorage.ObjectAttrs{ + BucketName: "mock-datasize-test", + Name: "t.csv", + }, + Content: content, + }) + s.prepareAndUseDB("import_into") + s.tk.MustExec("create table t (a int, b int);") + testCases := []struct { + tblName string + size int64 + threadCnt int + maxNodeCnt int + }{ + {tblName: "t1", size: 15 * units.GiB, threadCnt: 1, maxNodeCnt: 1}, + {tblName: "t2", size: 100 * units.GiB, threadCnt: 4, maxNodeCnt: 1}, + {tblName: "t3", size: 150 * units.GiB, threadCnt: 6, maxNodeCnt: 1}, + {tblName: "t4", size: 40 * units.TiB, threadCnt: 16, maxNodeCnt: 16}, + } + for _, tc := range testCases { + s.tk.MustExec("create table import_into." + tc.tblName + " (a int, b int);") + testfailpoint.EnableCall(s.T(), "github.com/pingcap/tidb/pkg/executor/importer/mockImportDataSize", func(totalSize *int64) { + *totalSize = tc.size + }) + sql := fmt.Sprintf(`IMPORT INTO import_into.%s FROM 'gs://mock-datasize-test/t.csv?endpoint=%s'`, tc.tblName, gcsEndpoint) + s.tk.MustQuery(sql) + s.tk.MustQuery("SELECT * FROM import_into." + tc.tblName + ";").Check(testkit.Rows("1 1", "2 2")) + s.tk.MustQuery(` + with + all_subtasks as (table mysql.tidb_background_subtask union table mysql.tidb_background_subtask_history order by end_time desc limit 1) + select concurrency from all_subtasks + `).Check(testkit.Rows(fmt.Sprintf("%d", tc.threadCnt))) + s.tk.MustQuery(` + with + global_tasks as (table mysql.tidb_global_task union table mysql.tidb_global_task_history order by end_time desc limit 1) + select max_node_count from global_tasks + `).Check(testkit.Rows(fmt.Sprintf("%d", tc.maxNodeCnt))) + } +} + func (s *mockGCSSuite) TestTableMode() { if kerneltype.IsNextGen() { s.T().Skip("switching table mode is not supported in nextgen") diff --git a/tests/realtikvtest/importintotest/util_test.go b/tests/realtikvtest/importintotest/util_test.go index 20b0e72f2a..c6c456252c 100644 --- a/tests/realtikvtest/importintotest/util_test.go +++ b/tests/realtikvtest/importintotest/util_test.go @@ -64,6 +64,7 @@ func (s *mockGCSSuite) SetupSuite() { s.server, err = fakestorage.NewServerWithOptions(opt) s.Require().NoError(err) testfailpoint.Enable(s.T(), "github.com/pingcap/tidb/pkg/domain/deltaUpdateDuration", `return`) + testfailpoint.Enable(s.T(), "github.com/pingcap/tidb/pkg/util/cpu/mockNumCpu", "return(16)") s.store = realtikvtest.CreateMockStoreAndSetup(s.T()) s.tk = testkit.NewTestKit(s.T(), s.store) }