importinto: update thread count after initialized data files (#63200)

ref pingcap/tidb#61702
This commit is contained in:
tangenta
2025-08-28 18:38:58 +08:00
committed by GitHub
parent 6a7d97043a
commit 8d1a00e351
5 changed files with 71 additions and 12 deletions

View File

@ -87,7 +87,7 @@ func initJobReorgMetaFromVariables(ctx context.Context, job *model.Job, tbl tabl
}
if setReorgParam {
if kerneltype.IsNextGen() {
if kerneltype.IsNextGen() && setDistTaskParam {
autoConc := scheduler.CalcConcurrencyByDataSize(tableSizeInBytes, cpuNum)
m.SetConcurrency(autoConc)
} else {

View File

@ -354,7 +354,7 @@ func (sch *importScheduler) OnNextSubtasksBatch(
return nil, err
}
previousSubtaskMetas[step] = metas
logger.Info("move to post-process step ", zap.Any("result", taskMeta.Summary))
logger.Info("move to post-process step", zap.Any("result", taskMeta.Summary))
case proto.StepDone:
return nil, nil
default:

View File

@ -28,6 +28,7 @@ import (
"sync"
"unicode/utf8"
"github.com/docker/go-units"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
@ -614,13 +615,9 @@ func (e *LoadDataController) checkFieldParams() error {
func (p *Plan) initDefaultOptions(ctx context.Context, targetNodeCPUCnt int, store tidbkv.Storage) {
var threadCnt int
if kerneltype.IsNextGen() {
threadCnt = scheduler.CalcConcurrencyByDataSize(p.TotalFileSize, targetNodeCPUCnt)
} else {
threadCnt = int(math.Max(1, float64(targetNodeCPUCnt)*0.5))
if p.DataSourceType == DataSourceTypeQuery {
threadCnt = 2
}
threadCnt = int(math.Max(1, float64(targetNodeCPUCnt)*0.5))
if p.DataSourceType == DataSourceTypeQuery {
threadCnt = 2
}
p.Checksum = config.OpLevelRequired
p.ThreadCnt = threadCnt
@ -871,9 +868,7 @@ func (p *Plan) initOptions(ctx context.Context, seCtx sessionctx.Context, option
p.ManualRecovery = true
}
if kerneltype.IsNextGen() {
p.MaxNodeCnt = scheduler.CalcMaxNodeCountByDataSize(p.TotalFileSize, targetNodeCPUCnt)
} else {
if kerneltype.IsClassic() {
if sv, ok := seCtx.GetSessionVars().GetSystemVar(vardef.TiDBMaxDistTaskNodes); ok {
p.MaxNodeCnt = variable.TidbOptInt(sv, 0)
if p.MaxNodeCnt == -1 { // -1 means calculate automatically
@ -1313,6 +1308,21 @@ func (e *LoadDataController) InitDataFiles(ctx context.Context) error {
e.dataFiles = dataFiles
e.TotalFileSize = totalSize
if kerneltype.IsNextGen() {
targetNodeCPUCnt, err := handle.GetCPUCountOfNode(ctx)
if err != nil {
return err
}
failpoint.InjectCall("mockImportDataSize", &totalSize)
e.ThreadCnt = scheduler.CalcConcurrencyByDataSize(totalSize, targetNodeCPUCnt)
e.MaxNodeCnt = scheduler.CalcMaxNodeCountByDataSize(totalSize, targetNodeCPUCnt)
e.logger.Info("set import thread count for nextgen kernel",
zap.Int("thread count", e.ThreadCnt),
zap.Int("max node count", e.MaxNodeCnt),
zap.Int("target node cpu count", targetNodeCPUCnt),
zap.String("total file size", units.BytesSize(float64(totalSize))))
}
return nil
}

View File

@ -26,6 +26,7 @@ import (
"sync"
"time"
"github.com/docker/go-units"
"github.com/fsouza/fake-gcs-server/fakestorage"
"github.com/ngaut/pools"
"github.com/pingcap/failpoint"
@ -1342,6 +1343,53 @@ func (s *mockGCSSuite) TestImportIntoWithFK() {
s.tk.MustQuery("SELECT * FROM import_into.child;").Check(testkit.Rows("1 1", "2 2"))
}
// TestImportIntoWithMockDataSize verifies that, on the nextgen kernel, IMPORT INTO
// derives the subtask concurrency and the task's max node count from the (mocked)
// total data size rather than from fixed defaults.
func (s *mockGCSSuite) TestImportIntoWithMockDataSize() {
	if kerneltype.IsClassic() {
		s.T().Skip("max node and thread auto calculation is not supported in classic")
	}
	// A tiny real payload; the size the planner sees is overridden below via
	// the mockImportDataSize failpoint, so the file content only has to import cleanly.
	content := []byte(`1,1
2,2`)
	s.server.CreateObject(fakestorage.Object{
		ObjectAttrs: fakestorage.ObjectAttrs{
			BucketName: "mock-datasize-test",
			Name:       "t.csv",
		},
		Content: content,
	})
	s.prepareAndUseDB("import_into")
	// NOTE(review): table `t` is never imported into below — presumably leftover
	// setup; confirm against sibling tests before removing.
	s.tk.MustExec("create table t (a int, b int);")
	cases := []struct {
		tblName    string
		size       int64
		threadCnt  int
		maxNodeCnt int
	}{
		{tblName: "t1", size: 15 * units.GiB, threadCnt: 1, maxNodeCnt: 1},
		{tblName: "t2", size: 100 * units.GiB, threadCnt: 4, maxNodeCnt: 1},
		{tblName: "t3", size: 150 * units.GiB, threadCnt: 6, maxNodeCnt: 1},
		{tblName: "t4", size: 40 * units.TiB, threadCnt: 16, maxNodeCnt: 16},
	}
	for i := range cases {
		tt := cases[i]
		s.tk.MustExec("create table import_into." + tt.tblName + " (a int, b int);")
		// Make the importer observe tt.size instead of the real object size.
		testfailpoint.EnableCall(s.T(), "github.com/pingcap/tidb/pkg/executor/importer/mockImportDataSize", func(totalSize *int64) {
			*totalSize = tt.size
		})
		sql := fmt.Sprintf(`IMPORT INTO import_into.%s FROM 'gs://mock-datasize-test/t.csv?endpoint=%s'`, tt.tblName, gcsEndpoint)
		s.tk.MustQuery(sql)
		s.tk.MustQuery("SELECT * FROM import_into." + tt.tblName + ";").Check(testkit.Rows("1 1", "2 2"))
		// The most recent subtask (live or history) must carry the auto-computed concurrency.
		s.tk.MustQuery(`
with
all_subtasks as (table mysql.tidb_background_subtask union table mysql.tidb_background_subtask_history order by end_time desc limit 1)
select concurrency from all_subtasks
`).Check(testkit.Rows(fmt.Sprintf("%d", tt.threadCnt)))
		// Likewise the most recent global task must carry the auto-computed max node count.
		s.tk.MustQuery(`
with
global_tasks as (table mysql.tidb_global_task union table mysql.tidb_global_task_history order by end_time desc limit 1)
select max_node_count from global_tasks
`).Check(testkit.Rows(fmt.Sprintf("%d", tt.maxNodeCnt)))
	}
}
func (s *mockGCSSuite) TestTableMode() {
if kerneltype.IsNextGen() {
s.T().Skip("switching table mode is not supported in nextgen")

View File

@ -64,6 +64,7 @@ func (s *mockGCSSuite) SetupSuite() {
s.server, err = fakestorage.NewServerWithOptions(opt)
s.Require().NoError(err)
testfailpoint.Enable(s.T(), "github.com/pingcap/tidb/pkg/domain/deltaUpdateDuration", `return`)
testfailpoint.Enable(s.T(), "github.com/pingcap/tidb/pkg/util/cpu/mockNumCpu", "return(16)")
s.store = realtikvtest.CreateMockStoreAndSetup(s.T())
s.tk = testkit.NewTestKit(s.T(), s.store)
}