disttask: mock to start multiple scheduler/taskexecutor (#50801)

ref pingcap/tidb#49008
Author: D3Hunter
Date: 2024-01-30 17:48:23 +08:00
Committed by: GitHub
Parent: 4e416993cc
Commit: be57d2fd87
21 changed files with 493 additions and 236 deletions
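
The change replaces the old InitTestContext/distContext plumbing with a testutil.TestDXFContext that starts one task executor manager per mock node plus a scheduler manager on the elected owner node. As a rough sketch (hypothetical test name; helpers, fields, and imports as they appear in the diffs below), a migrated framework test now looks like this:

func TestBasicTaskSucceeds(t *testing.T) {
	// Start 2 mock TiDB nodes; each runs a task executor manager and the
	// elected owner also runs a scheduler manager. Cleanup is registered via
	// t.Cleanup inside NewTestDXFContext, so there is no explicit Close call.
	c := testutil.NewTestDXFContext(t, 2)
	testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetMockBasicSchedulerExt(c.MockCtrl), c.TestContext, nil)
	submitTaskAndCheckSuccessForBasic(c.Ctx, t, "key1", c.TestContext)
}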

View File

@ -349,7 +349,7 @@ func generateGlobalSortIngestPlan(
// Skip global sort for empty table.
return nil, nil
}
instanceIDs, err := scheduler.GenerateTaskExecutorNodes(ctx)
instanceIDs, err := scheduler.GetLiveExecIDs(ctx)
if err != nil {
return nil, err
}

View File

@ -23,19 +23,15 @@ import (
)
func TestRetryErrOnNextSubtasksBatch(t *testing.T) {
ctx, ctrl, testContext, distContext := testutil.InitTestContext(t, 2)
defer ctrl.Finish()
testutil.RegisterTaskMeta(t, ctrl, testutil.GetPlanErrSchedulerExt(ctrl, testContext), testContext, nil)
submitTaskAndCheckSuccessForBasic(ctx, t, "key1", testContext)
distContext.Close()
c := testutil.NewTestDXFContext(t, 2)
testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetPlanErrSchedulerExt(c.MockCtrl, c.TestContext), c.TestContext, nil)
submitTaskAndCheckSuccessForBasic(c.Ctx, t, "key1", c.TestContext)
}
func TestPlanNotRetryableOnNextSubtasksBatchErr(t *testing.T) {
ctx, ctrl, testContext, distContext := testutil.InitTestContext(t, 2)
defer ctrl.Finish()
c := testutil.NewTestDXFContext(t, 2)
testutil.RegisterTaskMeta(t, ctrl, testutil.GetPlanNotRetryableErrSchedulerExt(ctrl), testContext, nil)
task := testutil.SubmitAndWaitTask(ctx, t, "key1")
testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetPlanNotRetryableErrSchedulerExt(c.MockCtrl), c.TestContext, nil)
task := testutil.SubmitAndWaitTask(c.Ctx, t, "key1")
require.Equal(t, proto.TaskStateFailed, task.State)
distContext.Close()
}

View File

@ -41,44 +41,42 @@ func CheckSubtasksState(ctx context.Context, t *testing.T, taskID int64, state p
}
func TestFrameworkPauseAndResume(t *testing.T) {
ctx, ctrl, testContext, distContext := testutil.InitTestContext(t, 3)
defer ctrl.Finish()
c := testutil.NewTestDXFContext(t, 3)
testutil.RegisterTaskMeta(t, ctrl, testutil.GetMockBasicSchedulerExt(ctrl), testContext, nil)
testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetMockBasicSchedulerExt(c.MockCtrl), c.TestContext, nil)
// 1. schedule and pause one running task.
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/pauseTaskAfterRefreshTask", "2*return(true)"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/syncAfterResume", "return()"))
task1 := testutil.SubmitAndWaitTask(ctx, t, "key1")
task1 := testutil.SubmitAndWaitTask(c.Ctx, t, "key1")
require.Equal(t, proto.TaskStatePaused, task1.State)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/pauseTaskAfterRefreshTask"))
// 4 subtask scheduled.
require.NoError(t, handle.ResumeTask(ctx, "key1"))
require.NoError(t, handle.ResumeTask(c.Ctx, "key1"))
<-scheduler.TestSyncChan
testutil.WaitTaskDoneOrPaused(ctx, t, task1.Key)
CheckSubtasksState(ctx, t, 1, proto.SubtaskStateSucceed, 4)
testutil.WaitTaskDoneOrPaused(c.Ctx, t, task1.Key)
CheckSubtasksState(c.Ctx, t, 1, proto.SubtaskStateSucceed, 4)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/syncAfterResume"))
mgr, err := storage.GetTaskManager()
require.NoError(t, err)
errs, err := mgr.GetSubtaskErrors(ctx, 1)
errs, err := mgr.GetSubtaskErrors(c.Ctx, 1)
require.NoError(t, err)
require.Empty(t, errs)
// 2. pause pending task.
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/pausePendingTask", "2*return(true)"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/syncAfterResume", "1*return()"))
task2 := testutil.SubmitAndWaitTask(ctx, t, "key2")
task2 := testutil.SubmitAndWaitTask(c.Ctx, t, "key2")
require.Equal(t, proto.TaskStatePaused, task2.State)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/pausePendingTask"))
// 4 subtask scheduled.
require.NoError(t, handle.ResumeTask(ctx, "key2"))
require.NoError(t, handle.ResumeTask(c.Ctx, "key2"))
<-scheduler.TestSyncChan
testutil.WaitTaskDoneOrPaused(ctx, t, task2.Key)
CheckSubtasksState(ctx, t, 1, proto.SubtaskStateSucceed, 4)
testutil.WaitTaskDoneOrPaused(c.Ctx, t, task2.Key)
CheckSubtasksState(c.Ctx, t, 1, proto.SubtaskStateSucceed, 4)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/syncAfterResume"))
errs, err = mgr.GetSubtaskErrors(ctx, 1)
errs, err = mgr.GetSubtaskErrors(c.Ctx, 1)
require.NoError(t, err)
require.Empty(t, errs)
distContext.Close()
}

View File

@ -39,16 +39,15 @@ func checkSubtaskOnNodes(ctx context.Context, t *testing.T, taskID int64, expect
}
func TestRoleBasic(t *testing.T) {
ctx, ctrl, testContext, distContext := testutil.InitTestContext(t, 3)
defer ctrl.Finish()
c := testutil.NewTestDXFContext(t, 3)
testutil.RegisterTaskMeta(t, ctrl, testutil.GetMockBasicSchedulerExt(ctrl), testContext, nil)
tk := testkit.NewTestKit(t, distContext.Store)
testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetMockBasicSchedulerExt(c.MockCtrl), c.TestContext, nil)
tk := testkit.NewTestKit(t, c.Store)
// 1. all "" role.
submitTaskAndCheckSuccessForBasic(ctx, t, "😁", testContext)
submitTaskAndCheckSuccessForBasic(c.Ctx, t, "😁", c.TestContext)
checkSubtaskOnNodes(ctx, t, 1, []string{":4000", ":4001", ":4002"})
checkSubtaskOnNodes(c.Ctx, t, 1, []string{":4000", ":4001", ":4002"})
tk.MustQuery(`select role from mysql.dist_framework_meta where host=":4000"`).Check(testkit.Rows(""))
tk.MustQuery(`select role from mysql.dist_framework_meta where host=":4001"`).Check(testkit.Rows(""))
tk.MustQuery(`select role from mysql.dist_framework_meta where host=":4002"`).Check(testkit.Rows(""))
@ -60,29 +59,27 @@ func TestRoleBasic(t *testing.T) {
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/syncRefresh", "1*return()"))
<-scheduler.TestRefreshedChan
submitTaskAndCheckSuccessForBasic(ctx, t, "😊", testContext)
submitTaskAndCheckSuccessForBasic(c.Ctx, t, "😊", c.TestContext)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/syncRefresh"))
tk.MustQuery(`select role from mysql.dist_framework_meta where host=":4000"`).Check(testkit.Rows("background"))
tk.MustQuery(`select role from mysql.dist_framework_meta where host=":4001"`).Check(testkit.Rows(""))
tk.MustQuery(`select role from mysql.dist_framework_meta where host=":4002"`).Check(testkit.Rows(""))
checkSubtaskOnNodes(ctx, t, 2, []string{":4000"})
checkSubtaskOnNodes(c.Ctx, t, 2, []string{":4000"})
// 3. 2 "background" role.
tk.MustExec("update mysql.dist_framework_meta set role = \"background\" where host = \":4001\"")
time.Sleep(5 * time.Second)
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/syncRefresh", "1*return()"))
<-scheduler.TestRefreshedChan
submitTaskAndCheckSuccessForBasic(ctx, t, "😆", testContext)
submitTaskAndCheckSuccessForBasic(c.Ctx, t, "😆", c.TestContext)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/syncRefresh"))
checkSubtaskOnNodes(ctx, t, 3, []string{":4000", ":4001"})
checkSubtaskOnNodes(c.Ctx, t, 3, []string{":4000", ":4001"})
tk.MustQuery(`select role from mysql.dist_framework_meta where host=":4000"`).Check(testkit.Rows("background"))
tk.MustQuery(`select role from mysql.dist_framework_meta where host=":4001"`).Check(testkit.Rows("background"))
tk.MustQuery(`select role from mysql.dist_framework_meta where host=":4002"`).Check(testkit.Rows(""))
distContext.Close()
}
func TestSetRole(t *testing.T) {

View File

@ -24,15 +24,13 @@ import (
)
func TestFrameworkRollback(t *testing.T) {
ctx, ctrl, testContext, distContext := testutil.InitTestContext(t, 2)
defer ctrl.Finish()
testutil.RegisterRollbackTaskMeta(t, ctrl, testutil.GetMockRollbackSchedulerExt(ctrl), testContext)
c := testutil.NewTestDXFContext(t, 2)
testutil.RegisterRollbackTaskMeta(t, c.MockCtrl, testutil.GetMockRollbackSchedulerExt(c.MockCtrl), c.TestContext)
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/cancelTaskAfterRefreshTask", "2*return(true)"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/cancelTaskAfterRefreshTask"))
}()
task := testutil.SubmitAndWaitTask(ctx, t, "key1")
task := testutil.SubmitAndWaitTask(c.Ctx, t, "key1")
require.Equal(t, proto.TaskStateReverted, task.State)
distContext.Close()
}

View File

@ -50,82 +50,65 @@ func submitTaskAndCheckSuccess(ctx context.Context, t *testing.T, taskKey string
}
func TestRandomOwnerChangeWithMultipleTasks(t *testing.T) {
nodeCnt := 5
ctx, ctrl, testContext, distContext := testutil.InitTestContext(t, nodeCnt)
defer ctrl.Finish()
seed := time.Now().UnixNano()
t.Logf("seed: %d", seed)
random := rand.New(rand.NewSource(seed))
c := testutil.NewTestDXFContext(t, 5)
testutil.RegisterTaskMeta(t, ctrl, testutil.GetMockBasicSchedulerExt(ctrl), testContext, nil)
testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetMockBasicSchedulerExt(c.MockCtrl), c.TestContext, nil)
var wg util.WaitGroupWrapper
for i := 0; i < 10; i++ {
taskKey := fmt.Sprintf("key%d", i)
wg.Run(func() {
submitTaskAndCheckSuccessForBasic(ctx, t, taskKey, testContext)
submitTaskAndCheckSuccessForBasic(c.Ctx, t, taskKey, c.TestContext)
})
}
wg.Run(func() {
seed := time.Now().UnixNano()
t.Logf("seed in change owner loop: %d", seed)
random := rand.New(rand.NewSource(seed))
for i := 0; i < 3; i++ {
time.Sleep(time.Duration(random.Intn(2000)) * time.Millisecond)
idx := int(random.Int31n(int32(nodeCnt)))
distContext.SetOwner(idx)
require.Eventually(t, func() bool {
return distContext.GetDomain(idx).DDL().OwnerManager().IsOwner()
}, 10*time.Second, 100*time.Millisecond)
c.ChangeOwner()
time.Sleep(time.Duration(random.Int63n(int64(3 * time.Second))))
}
})
wg.Wait()
distContext.Close()
}
func TestFrameworkScaleInAndOut(t *testing.T) {
ctx, ctrl, testContext, distContext := testutil.InitTestContext(t, 5)
defer ctrl.Finish()
c := testutil.NewTestDXFContext(t, 5)
seed := time.Now().UnixNano()
t.Logf("seed: %d", seed)
random := rand.New(rand.NewSource(seed))
testutil.RegisterTaskMeta(t, ctrl, testutil.GetMockBasicSchedulerExt(ctrl), testContext, nil)
testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetMockBasicSchedulerExt(c.MockCtrl), c.TestContext, nil)
var wg util.WaitGroupWrapper
for i := 0; i < 12; i++ {
taskKey := fmt.Sprintf("key%d", i)
wg.Run(func() {
submitTaskAndCheckSuccessForBasic(ctx, t, taskKey, testContext)
submitTaskAndCheckSuccessForBasic(c.Ctx, t, taskKey, c.TestContext)
})
}
wg.Run(func() {
for i := 0; i < 3; i++ {
if random.Intn(2) == 0 {
distContext.AddDomain()
c.ScaleOut(1)
} else {
// TODO: it's not real scale-in, delete domain doesn't stop task executor
// closing domain will reset storage.TaskManager which will cause
// test fail.
distContext.DeleteDomain(int(random.Int31n(int32(distContext.GetDomainCnt()-1))) + 1)
c.ScaleIn(1)
}
time.Sleep(time.Duration(random.Intn(2000)) * time.Millisecond)
idx := int(random.Int31n(int32(distContext.GetDomainCnt())))
distContext.SetOwner(idx)
// TODO we don't wait owner ready, it's not stable, will try refactor
// how we start multiple schedulers/task-executors later.
time.Sleep(time.Duration(random.Int63n(int64(3 * time.Second))))
}
})
wg.Wait()
distContext.Close()
}
func TestFrameworkWithQuery(t *testing.T) {
ctx, ctrl, testContext, distContext := testutil.InitTestContext(t, 2)
defer ctrl.Finish()
c := testutil.NewTestDXFContext(t, 2)
testutil.RegisterTaskMeta(t, ctrl, testutil.GetMockBasicSchedulerExt(ctrl), testContext, nil)
testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetMockBasicSchedulerExt(c.MockCtrl), c.TestContext, nil)
var wg util.WaitGroupWrapper
wg.Run(func() {
submitTaskAndCheckSuccessForBasic(ctx, t, "key1", testContext)
submitTaskAndCheckSuccessForBasic(c.Ctx, t, "key1", c.TestContext)
})
tk := testkit.NewTestKit(t, distContext.Store)
tk := testkit.NewTestKit(t, c.Store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
@ -138,63 +121,53 @@ func TestFrameworkWithQuery(t *testing.T) {
require.NoError(t, rs.Close())
wg.Wait()
distContext.Close()
}
func TestFrameworkCancelTask(t *testing.T) {
ctx, ctrl, testContext, distContext := testutil.InitTestContext(t, 2)
defer ctrl.Finish()
c := testutil.NewTestDXFContext(t, 2)
testutil.RegisterTaskMeta(t, ctrl, testutil.GetMockBasicSchedulerExt(ctrl), testContext, nil)
testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetMockBasicSchedulerExt(c.MockCtrl), c.TestContext, nil)
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/MockExecutorRunCancel", "1*return(1)"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/MockExecutorRunCancel"))
}()
task := testutil.SubmitAndWaitTask(ctx, t, "key1")
task := testutil.SubmitAndWaitTask(c.Ctx, t, "key1")
require.Equal(t, proto.TaskStateReverted, task.State)
distContext.Close()
}
func TestFrameworkSubTaskFailed(t *testing.T) {
ctx, ctrl, testContext, distContext := testutil.InitTestContext(t, 1)
defer ctrl.Finish()
c := testutil.NewTestDXFContext(t, 1)
testutil.RegisterTaskMeta(t, ctrl, testutil.GetMockBasicSchedulerExt(ctrl), testContext, nil)
testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetMockBasicSchedulerExt(c.MockCtrl), c.TestContext, nil)
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/MockExecutorRunErr", "1*return(true)"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/MockExecutorRunErr"))
}()
task := testutil.SubmitAndWaitTask(ctx, t, "key1")
task := testutil.SubmitAndWaitTask(c.Ctx, t, "key1")
require.Equal(t, proto.TaskStateReverted, task.State)
distContext.Close()
}
func TestFrameworkSubTaskInitEnvFailed(t *testing.T) {
ctx, ctrl, testContext, distContext := testutil.InitTestContext(t, 1)
defer ctrl.Finish()
testutil.RegisterTaskMeta(t, ctrl, testutil.GetMockBasicSchedulerExt(ctrl), testContext, nil)
c := testutil.NewTestDXFContext(t, 1)
testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetMockBasicSchedulerExt(c.MockCtrl), c.TestContext, nil)
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/mockExecSubtaskInitEnvErr", "return()"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/mockExecSubtaskInitEnvErr"))
}()
task := testutil.SubmitAndWaitTask(ctx, t, "key1")
task := testutil.SubmitAndWaitTask(c.Ctx, t, "key1")
require.Equal(t, proto.TaskStateReverted, task.State)
distContext.Close()
}
func TestOwnerChangeWhenSchedule(t *testing.T) {
ctx, ctrl, testContext, distContext := testutil.InitTestContext(t, 3)
defer ctrl.Finish()
testutil.RegisterTaskMeta(t, ctrl, testutil.GetMockBasicSchedulerExt(ctrl), testContext, nil)
c := testutil.NewTestDXFContext(t, 3)
testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetMockBasicSchedulerExt(c.MockCtrl), c.TestContext, nil)
scheduler.MockOwnerChange = func() {
distContext.SetOwner(0)
c.AsyncChangeOwner()
}
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/mockOwnerChange", "1*return(true)"))
submitTaskAndCheckSuccessForBasic(ctx, t, "😊", testContext)
submitTaskAndCheckSuccessForBasic(c.Ctx, t, "😊", c.TestContext)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/mockOwnerChange"))
distContext.Close()
}
func TestTaskExecutorDownBasic(t *testing.T) {
@ -235,26 +208,24 @@ func TestTaskExecutorDownManyNodes(t *testing.T) {
}
func TestGC(t *testing.T) {
ctx, ctrl, testContext, distContext := testutil.InitTestContext(t, 3)
defer ctrl.Finish()
testutil.RegisterTaskMeta(t, ctrl, testutil.GetMockBasicSchedulerExt(ctrl), testContext, nil)
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/storage/subtaskHistoryKeepSeconds", "return(1)"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/historySubtaskTableGcInterval", "return(1)"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/storage/subtaskHistoryKeepSeconds"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/historySubtaskTableGcInterval"))
}()
c := testutil.NewTestDXFContext(t, 3)
submitTaskAndCheckSuccessForBasic(ctx, t, "😊", testContext)
testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetMockBasicSchedulerExt(c.MockCtrl), c.TestContext, nil)
submitTaskAndCheckSuccessForBasic(c.Ctx, t, "😊", c.TestContext)
mgr, err := storage.GetTaskManager()
require.NoError(t, err)
var historySubTasksCnt int
require.Eventually(t, func() bool {
historySubTasksCnt, err = testutil.GetSubtasksFromHistory(ctx, mgr)
historySubTasksCnt, err = testutil.GetSubtasksFromHistory(c.Ctx, mgr)
if err != nil {
return false
}
@ -264,40 +235,34 @@ func TestGC(t *testing.T) {
scheduler.WaitTaskFinished <- struct{}{}
require.Eventually(t, func() bool {
historySubTasksCnt, err := testutil.GetSubtasksFromHistory(ctx, mgr)
historySubTasksCnt, err := testutil.GetSubtasksFromHistory(c.Ctx, mgr)
if err != nil {
return false
}
return historySubTasksCnt == 0
}, 10*time.Second, 500*time.Millisecond)
distContext.Close()
}
func TestFrameworkSubtaskFinishedCancel(t *testing.T) {
ctx, ctrl, testContext, distContext := testutil.InitTestContext(t, 3)
defer ctrl.Finish()
c := testutil.NewTestDXFContext(t, 3)
testutil.RegisterTaskMeta(t, ctrl, testutil.GetMockBasicSchedulerExt(ctrl), testContext, nil)
testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetMockBasicSchedulerExt(c.MockCtrl), c.TestContext, nil)
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/MockSubtaskFinishedCancel", "1*return(true)"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/MockSubtaskFinishedCancel"))
}()
task := testutil.SubmitAndWaitTask(ctx, t, "key1")
task := testutil.SubmitAndWaitTask(c.Ctx, t, "key1")
require.Equal(t, proto.TaskStateReverted, task.State)
distContext.Close()
}
func TestFrameworkRunSubtaskCancel(t *testing.T) {
ctx, ctrl, testContext, distContext := testutil.InitTestContext(t, 3)
defer ctrl.Finish()
c := testutil.NewTestDXFContext(t, 3)
testutil.RegisterTaskMeta(t, ctrl, testutil.GetMockBasicSchedulerExt(ctrl), testContext, nil)
testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetMockBasicSchedulerExt(c.MockCtrl), c.TestContext, nil)
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/MockRunSubtaskCancel", "1*return(true)"))
task := testutil.SubmitAndWaitTask(ctx, t, "key1")
task := testutil.SubmitAndWaitTask(c.Ctx, t, "key1")
require.Equal(t, proto.TaskStateReverted, task.State)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/MockRunSubtaskCancel"))
distContext.Close()
}
func TestFrameworkCleanUpRoutine(t *testing.T) {
@ -306,49 +271,45 @@ func TestFrameworkCleanUpRoutine(t *testing.T) {
scheduler.DefaultCleanUpInterval = bak
}()
scheduler.DefaultCleanUpInterval = 500 * time.Millisecond
ctx, ctrl, testContext, distContext := testutil.InitTestContext(t, 3)
defer ctrl.Finish()
testutil.RegisterTaskMeta(t, ctrl, testutil.GetMockBasicSchedulerExt(ctrl), testContext, nil)
c := testutil.NewTestDXFContext(t, 3)
testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetMockBasicSchedulerExt(c.MockCtrl), c.TestContext, nil)
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/WaitCleanUpFinished", "return()"))
// normal
submitTaskAndCheckSuccessForBasic(ctx, t, "key1", testContext)
submitTaskAndCheckSuccessForBasic(c.Ctx, t, "key1", c.TestContext)
<-scheduler.WaitCleanUpFinished
mgr, err := storage.GetTaskManager()
require.NoError(t, err)
tasks, err := mgr.GetTaskByKeyWithHistory(ctx, "key1")
tasks, err := mgr.GetTaskByKeyWithHistory(c.Ctx, "key1")
require.NoError(t, err)
require.NotEmpty(t, tasks)
subtasks, err := testutil.GetSubtasksFromHistory(ctx, mgr)
subtasks, err := testutil.GetSubtasksFromHistory(c.Ctx, mgr)
require.NoError(t, err)
require.NotEmpty(t, subtasks)
// transfer err
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/mockTransferErr", "1*return()"))
submitTaskAndCheckSuccessForBasic(ctx, t, "key2", testContext)
submitTaskAndCheckSuccessForBasic(c.Ctx, t, "key2", c.TestContext)
<-scheduler.WaitCleanUpFinished
mgr, err = storage.GetTaskManager()
require.NoError(t, err)
tasks, err = mgr.GetTaskByKeyWithHistory(ctx, "key1")
tasks, err = mgr.GetTaskByKeyWithHistory(c.Ctx, "key1")
require.NoError(t, err)
require.NotEmpty(t, tasks)
subtasks, err = testutil.GetSubtasksFromHistory(ctx, mgr)
subtasks, err = testutil.GetSubtasksFromHistory(c.Ctx, mgr)
require.NoError(t, err)
require.NotEmpty(t, subtasks)
distContext.Close()
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/mockTransferErr"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/WaitCleanUpFinished"))
}
func TestTaskCancelledBeforeUpdateTask(t *testing.T) {
ctx, ctrl, testContext, distContext := testutil.InitTestContext(t, 1)
defer ctrl.Finish()
c := testutil.NewTestDXFContext(t, 1)
testutil.RegisterTaskMeta(t, ctrl, testutil.GetMockBasicSchedulerExt(ctrl), testContext, nil)
testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetMockBasicSchedulerExt(c.MockCtrl), c.TestContext, nil)
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/cancelBeforeUpdateTask", "1*return(true)"))
task := testutil.SubmitAndWaitTask(ctx, t, "key1")
task := testutil.SubmitAndWaitTask(c.Ctx, t, "key1")
require.Equal(t, proto.TaskStateReverted, task.State)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/cancelBeforeUpdateTask"))
distContext.Close()
}

View File

@ -19,9 +19,10 @@ import (
"time"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/log"
llog "github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/intest"
"go.uber.org/zap"
)
@ -40,15 +41,20 @@ var (
// the task and try next one.
type balancer struct {
Param
logger *zap.Logger
// a helper temporary map to record the used slots of each node during balance
// to avoid passing it around.
currUsedSlots map[string]int
}
func newBalancer(param Param) *balancer {
logger := log.L()
if intest.InTest {
logger = log.L().With(zap.String("server-id", param.serverID))
}
return &balancer{
Param: param,
logger: logger,
currUsedSlots: make(map[string]int),
}
}
@ -77,8 +83,8 @@ func (b *balancer) balance(ctx context.Context, sm *Manager) {
schedulers := sm.getSchedulers()
for _, sch := range schedulers {
if err := b.balanceSubtasks(ctx, sch, managedNodes); err != nil {
logutil.Logger(ctx).Warn("failed to balance subtasks",
zap.Int64("task-id", sch.GetTask().ID), log.ShortError(err))
b.logger.Warn("failed to balance subtasks",
zap.Int64("task-id", sch.GetTask().ID), llog.ShortError(err))
return
}
}
@ -200,7 +206,7 @@ func (b *balancer) doBalanceSubtasks(ctx context.Context, taskID int64, eligible
if err = b.taskMgr.UpdateSubtasksExecIDs(ctx, subtasksNeedSchedule); err != nil {
return err
}
logutil.BgLogger().Info("balance subtasks", zap.Stringers("subtasks", subtasksNeedSchedule))
b.logger.Info("balance subtasks", zap.Stringers("subtasks", subtasksNeedSchedule))
return nil
}

View File

@ -242,7 +242,7 @@ func TestBalanceOneTask(t *testing.T) {
slotMgr.updateCapacity(16)
b := newBalancer(Param{
taskMgr: mockTaskMgr,
nodeMgr: newNodeManager(),
nodeMgr: newNodeManager(""),
slotMgr: slotMgr,
})
b.currUsedSlots = c.initUsedSlots
@ -263,7 +263,7 @@ func TestBalanceOneTask(t *testing.T) {
slotMgr.updateCapacity(16)
b := newBalancer(Param{
taskMgr: mockTaskMgr,
nodeMgr: newNodeManager(),
nodeMgr: newNodeManager(""),
slotMgr: slotMgr,
})
require.ErrorContains(t, b.balanceSubtasks(ctx, mockScheduler, []string{"tidb1"}), "mock error")
@ -286,7 +286,7 @@ func TestBalanceOneTask(t *testing.T) {
slotMgr.updateCapacity(16)
b := newBalancer(Param{
taskMgr: mockTaskMgr,
nodeMgr: newNodeManager(),
nodeMgr: newNodeManager(""),
slotMgr: slotMgr,
})
require.ErrorContains(t, b.balanceSubtasks(ctx, mockScheduler, []string{"tidb1"}), "mock error")

View File

@ -133,9 +133,10 @@ type Extension interface {
// Param is used to pass parameters when creating scheduler.
type Param struct {
taskMgr TaskManager
nodeMgr *NodeManager
slotMgr *SlotManager
taskMgr TaskManager
nodeMgr *NodeManager
slotMgr *SlotManager
serverID string
}
// schedulerFactoryFn is used to create a scheduler.

View File

@ -41,7 +41,7 @@ func (s *BaseScheduler) Switch2NextStep() (err error) {
}
func NewNodeManager() *NodeManager {
return newNodeManager()
return newNodeManager("")
}
func TestMain(m *testing.M) {

View File

@ -20,9 +20,9 @@ import (
"time"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/lightning/log"
disttaskutil "github.com/pingcap/tidb/pkg/util/disttask"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/log"
llog "github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/pkg/util/intest"
"go.uber.org/zap"
)
@ -34,6 +34,7 @@ var (
// NodeManager maintains live TiDB nodes in the cluster, and maintains the nodes
// managed by the framework.
type NodeManager struct {
logger *zap.Logger
// prevLiveNodes is used to record the live nodes in last checking.
prevLiveNodes map[string]struct{}
// managedNodes is the cached nodes managed by the framework.
@ -41,8 +42,13 @@ type NodeManager struct {
managedNodes atomic.Pointer[[]string]
}
func newNodeManager() *NodeManager {
func newNodeManager(serverID string) *NodeManager {
logger := log.L()
if intest.InTest {
logger = log.L().With(zap.String("server-id", serverID))
}
nm := &NodeManager{
logger: logger,
prevLiveNodes: make(map[string]struct{}),
}
managedNodes := make([]string, 0, 10)
@ -67,15 +73,14 @@ func (nm *NodeManager) maintainLiveNodesLoop(ctx context.Context, taskMgr TaskMa
// see recoverMetaLoop in task executor for when node is inserted into dist_framework_meta.
func (nm *NodeManager) maintainLiveNodes(ctx context.Context, taskMgr TaskManager) {
// Safe to discard errors since this function can be called at regular intervals.
serverInfos, err := GenerateTaskExecutorNodes(ctx)
liveExecIDs, err := GetLiveExecIDs(ctx)
if err != nil {
logutil.BgLogger().Warn("generate task executor nodes met error", log.ShortError(err))
nm.logger.Warn("generate task executor nodes met error", llog.ShortError(err))
return
}
nodeChanged := len(serverInfos) != len(nm.prevLiveNodes)
currLiveNodes := make(map[string]struct{}, len(serverInfos))
for _, info := range serverInfos {
execID := disttaskutil.GenerateExecID(info)
nodeChanged := len(liveExecIDs) != len(nm.prevLiveNodes)
currLiveNodes := make(map[string]struct{}, len(liveExecIDs))
for _, execID := range liveExecIDs {
if _, ok := nm.prevLiveNodes[execID]; !ok {
nodeChanged = true
}
@ -87,7 +92,7 @@ func (nm *NodeManager) maintainLiveNodes(ctx context.Context, taskMgr TaskManage
oldNodes, err := taskMgr.GetAllNodes(ctx)
if err != nil {
logutil.BgLogger().Warn("get all nodes met error", log.ShortError(err))
nm.logger.Warn("get all nodes met error", llog.ShortError(err))
return
}
@ -101,11 +106,11 @@ func (nm *NodeManager) maintainLiveNodes(ctx context.Context, taskMgr TaskManage
nm.prevLiveNodes = currLiveNodes
return
}
logutil.BgLogger().Info("delete dead nodes from dist_framework_meta",
nm.logger.Info("delete dead nodes from dist_framework_meta",
zap.Int("dead-nodes", len(deadNodes)))
err = taskMgr.DeleteDeadNodes(ctx, deadNodes)
if err != nil {
logutil.BgLogger().Warn("delete dead nodes from dist_framework_meta failed", log.ShortError(err))
nm.logger.Warn("delete dead nodes from dist_framework_meta failed", llog.ShortError(err))
return
}
nm.prevLiveNodes = currLiveNodes
@ -131,7 +136,7 @@ var TestRefreshedChan = make(chan struct{})
func (nm *NodeManager) refreshManagedNodes(ctx context.Context, taskMgr TaskManager, slotMgr *SlotManager) {
newNodes, err := taskMgr.GetManagedNodes(ctx)
if err != nil {
logutil.BgLogger().Warn("get managed nodes met error", log.ShortError(err))
nm.logger.Warn("get managed nodes met error", llog.ShortError(err))
return
}
nodeIDs := make([]string, 0, len(newNodes))

View File

@ -22,7 +22,6 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/disttask/framework/mock"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/domain/infosync"
"github.com/pingcap/tidb/pkg/util/cpu"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
@ -38,11 +37,9 @@ func TestMaintainLiveNodes(t *testing.T) {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/mockTaskExecutorNodes"))
})
MockServerInfo = []*infosync.ServerInfo{
{Port: 4000},
}
MockServerInfo.Store(&[]string{":4000"})
nodeMgr := newNodeManager()
nodeMgr := newNodeManager("")
ctx := context.Background()
mockTaskMgr.EXPECT().GetAllNodes(gomock.Any()).Return(nil, errors.New("mock error"))
nodeMgr.maintainLiveNodes(ctx, mockTaskMgr)
@ -59,10 +56,7 @@ func TestMaintainLiveNodes(t *testing.T) {
require.True(t, ctrl.Satisfied())
// scale out 1 node
MockServerInfo = []*infosync.ServerInfo{
{Port: 4000},
{Port: 4001},
}
MockServerInfo.Store(&[]string{":4000", ":4001"})
// fail on clean
mockTaskMgr.EXPECT().GetAllNodes(gomock.Any()).Return([]proto.ManagedNode{{ID: ":4000"}, {ID: ":4001"}, {ID: ":4002"}}, nil)
@ -82,9 +76,7 @@ func TestMaintainLiveNodes(t *testing.T) {
require.True(t, ctrl.Satisfied())
// scale in 1 node
MockServerInfo = []*infosync.ServerInfo{
{Port: 4000},
}
MockServerInfo.Store(&[]string{":4000"})
mockTaskMgr.EXPECT().GetAllNodes(gomock.Any()).Return([]proto.ManagedNode{{ID: ":4000"}, {ID: ":4001"}, {ID: ":4002"}}, nil)
mockTaskMgr.EXPECT().DeleteDeadNodes(gomock.Any(), gomock.Any()).Return(nil)
@ -102,7 +94,7 @@ func TestMaintainManagedNodes(t *testing.T) {
defer ctrl.Finish()
ctx := context.Background()
mockTaskMgr := mock.NewMockTaskManager(ctrl)
nodeMgr := newNodeManager()
nodeMgr := newNodeManager("")
slotMgr := newSlotManager()
mockTaskMgr.EXPECT().GetManagedNodes(gomock.Any()).Return(nil, errors.New("mock error"))

View File

@ -32,6 +32,7 @@ import (
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/util/backoff"
disttaskutil "github.com/pingcap/tidb/pkg/util/disttask"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/logutil"
"go.uber.org/zap"
@ -99,12 +100,15 @@ var MockOwnerChange func()
// NewBaseScheduler creates a new BaseScheduler.
func NewBaseScheduler(ctx context.Context, task *proto.Task, param Param) *BaseScheduler {
logger := log.L().With(zap.Int64("task-id", task.ID), zap.Stringer("task-type", task.Type))
if intest.InTest {
logger = logger.With(zap.String("server-id", param.serverID))
}
s := &BaseScheduler{
ctx: ctx,
Param: param,
logger: log.L().With(zap.Int64("task-id", task.ID),
zap.Stringer("task-type", task.Type)),
rand: rand.New(rand.NewSource(time.Now().UnixNano())),
ctx: ctx,
Param: param,
logger: logger,
rand: rand.New(rand.NewSource(time.Now().UnixNano())),
}
s.task.Store(task)
return s
@ -491,13 +495,25 @@ func (s *BaseScheduler) handlePlanErr(err error) error {
}
// MockServerInfo exported for scheduler_test.go
var MockServerInfo []*infosync.ServerInfo
var MockServerInfo atomic.Pointer[[]string]
// GenerateTaskExecutorNodes generate a eligible TiDB nodes.
func GenerateTaskExecutorNodes(ctx context.Context) (serverNodes []*infosync.ServerInfo, err error) {
// GetLiveExecIDs returns all live executor node IDs.
func GetLiveExecIDs(ctx context.Context) ([]string, error) {
failpoint.Inject("mockTaskExecutorNodes", func() {
failpoint.Return(MockServerInfo, nil)
failpoint.Return(*MockServerInfo.Load(), nil)
})
serverInfos, err := generateTaskExecutorNodes(ctx)
if err != nil {
return nil, err
}
execIDs := make([]string, 0, len(serverInfos))
for _, info := range serverInfos {
execIDs = append(execIDs, disttaskutil.GenerateExecID(info))
}
return execIDs, nil
}
func generateTaskExecutorNodes(ctx context.Context) (serverNodes []*infosync.ServerInfo, err error) {
var serverInfos map[string]*infosync.ServerInfo
_, etcd := ctx.Value("etcd").(bool)
if intest.InTest && !etcd {
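
A note on how tests exercise the new GetLiveExecIDs: with the mockTaskExecutorNodes failpoint enabled, the function returns whatever slice was last stored in the MockServerInfo atomic pointer, which is how TestDXFContext.updateLiveExecIDs and the node-manager tests fake the live node list. A minimal sketch (written from an external test package, so identifiers are package-qualified; assumes the failpoint stays enabled for the duration):

require.NoError(t, failpoint.Enable(
	"github.com/pingcap/tidb/pkg/disttask/framework/scheduler/mockTaskExecutorNodes", "return()"))
defer func() {
	require.NoError(t, failpoint.Disable(
		"github.com/pingcap/tidb/pkg/disttask/framework/scheduler/mockTaskExecutorNodes"))
}()
// Pretend two executors are alive; GetLiveExecIDs will return this slice.
scheduler.MockServerInfo.Store(&[]string{":4000", ":4001"})
execIDs, err := scheduler.GetLiveExecIDs(ctx)
require.NoError(t, err)
require.Equal(t, []string{":4000", ":4001"}, execIDs)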

View File

@ -21,11 +21,12 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/tidb/pkg/disttask/framework/handle"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/metrics"
tidbutil "github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/syncutil"
"go.uber.org/zap"
)
@ -108,6 +109,7 @@ type Manager struct {
initialized bool
// serverID, it's value is ip:port now.
serverID string
logger *zap.Logger
finishCh chan struct{}
@ -121,20 +123,30 @@ type Manager struct {
// NewManager creates a scheduler struct.
func NewManager(ctx context.Context, taskMgr TaskManager, serverID string) *Manager {
logger := log.L()
if intest.InTest {
logger = log.L().With(zap.String("server-id", serverID))
}
subCtx, cancel := context.WithCancel(ctx)
slotMgr := newSlotManager()
nodeMgr := newNodeManager(serverID)
schedulerManager := &Manager{
ctx: subCtx,
cancel: cancel,
taskMgr: taskMgr,
serverID: serverID,
slotMgr: newSlotManager(),
nodeMgr: newNodeManager(),
slotMgr: slotMgr,
nodeMgr: nodeMgr,
balancer: newBalancer(Param{
taskMgr: taskMgr,
nodeMgr: nodeMgr,
slotMgr: slotMgr,
serverID: serverID,
}),
logger: logger,
finishCh: make(chan struct{}, proto.MaxConcurrentTask),
}
schedulerManager.ctx, schedulerManager.cancel = context.WithCancel(ctx)
schedulerManager.mu.schedulerMap = make(map[int64]Scheduler)
schedulerManager.finishCh = make(chan struct{}, proto.MaxConcurrentTask)
schedulerManager.balancer = newBalancer(Param{
taskMgr: taskMgr,
nodeMgr: schedulerManager.nodeMgr,
slotMgr: schedulerManager.slotMgr,
})
return schedulerManager
}
@ -179,13 +191,13 @@ func (sm *Manager) Initialized() bool {
// scheduleTaskLoop schedules the tasks.
func (sm *Manager) scheduleTaskLoop() {
logutil.BgLogger().Info("schedule task loop start")
sm.logger.Info("schedule task loop start")
ticker := time.NewTicker(checkTaskRunningInterval)
defer ticker.Stop()
for {
select {
case <-sm.ctx.Done():
logutil.BgLogger().Info("schedule task loop exits", zap.Error(sm.ctx.Err()), zap.Int64("interval", int64(checkTaskRunningInterval)/1000000))
sm.logger.Info("schedule task loop exits")
return
case <-ticker.C:
case <-handle.TaskChangedCh:
@ -193,14 +205,14 @@ func (sm *Manager) scheduleTaskLoop() {
taskCnt := sm.getSchedulerCount()
if taskCnt >= proto.MaxConcurrentTask {
logutil.BgLogger().Info("scheduled tasks reached limit",
sm.logger.Debug("scheduled tasks reached limit",
zap.Int("current", taskCnt), zap.Int("max", proto.MaxConcurrentTask))
continue
}
tasks, err := sm.taskMgr.GetTopUnfinishedTasks(sm.ctx)
if err != nil {
logutil.BgLogger().Warn("get unfinished tasks failed", zap.Error(err))
sm.logger.Warn("get unfinished tasks failed", zap.Error(err))
continue
}
@ -214,7 +226,7 @@ func (sm *Manager) scheduleTaskLoop() {
// this should not happen normally, unless user modify system table
// directly.
if getSchedulerFactory(task.Type) == nil {
logutil.BgLogger().Warn("unknown task type", zap.Int64("task-id", task.ID),
sm.logger.Warn("unknown task type", zap.Int64("task-id", task.ID),
zap.Stringer("task-type", task.Type))
sm.failTask(task.ID, task.State, errors.New("unknown task type"))
continue
@ -226,7 +238,7 @@ func (sm *Manager) scheduleTaskLoop() {
}
if err = sm.slotMgr.update(sm.ctx, sm.nodeMgr, sm.taskMgr); err != nil {
logutil.BgLogger().Warn("update used slot failed", zap.Error(err))
sm.logger.Warn("update used slot failed", zap.Error(err))
continue
}
for _, task := range schedulableTasks {
@ -248,7 +260,7 @@ func (sm *Manager) scheduleTaskLoop() {
func (sm *Manager) failTask(id int64, currState proto.TaskState, err error) {
if err2 := sm.taskMgr.FailTask(sm.ctx, id, currState, err); err2 != nil {
logutil.BgLogger().Warn("failed to update task state to failed",
sm.logger.Warn("failed to update task state to failed",
zap.Int64("task-id", id), zap.Error(err2))
}
}
@ -263,20 +275,20 @@ func (sm *Manager) gcSubtaskHistoryTableLoop() {
<-WaitTaskFinished
})
logutil.BgLogger().Info("subtask table gc loop start")
sm.logger.Info("subtask table gc loop start")
ticker := time.NewTicker(historySubtaskTableGcInterval)
defer ticker.Stop()
for {
select {
case <-sm.ctx.Done():
logutil.BgLogger().Info("subtask history table gc loop exits", zap.Error(sm.ctx.Err()))
sm.logger.Info("subtask history table gc loop exits")
return
case <-ticker.C:
err := sm.taskMgr.GCSubtasks(sm.ctx)
if err != nil {
logutil.BgLogger().Warn("subtask history table gc failed", zap.Error(err))
sm.logger.Warn("subtask history table gc failed", zap.Error(err))
} else {
logutil.BgLogger().Info("subtask history table gc success")
sm.logger.Info("subtask history table gc success")
}
}
}
@ -285,46 +297,47 @@ func (sm *Manager) gcSubtaskHistoryTableLoop() {
func (sm *Manager) startScheduler(basicTask *proto.Task, reservedExecID string) {
task, err := sm.taskMgr.GetTaskByID(sm.ctx, basicTask.ID)
if err != nil {
logutil.BgLogger().Error("get task failed", zap.Int64("task-id", basicTask.ID), zap.Error(err))
sm.logger.Error("get task failed", zap.Int64("task-id", basicTask.ID), zap.Error(err))
return
}
schedulerFactory := getSchedulerFactory(task.Type)
scheduler := schedulerFactory(sm.ctx, task, Param{
taskMgr: sm.taskMgr,
nodeMgr: sm.nodeMgr,
slotMgr: sm.slotMgr,
taskMgr: sm.taskMgr,
nodeMgr: sm.nodeMgr,
slotMgr: sm.slotMgr,
serverID: sm.serverID,
})
if err = scheduler.Init(); err != nil {
logutil.BgLogger().Error("init scheduler failed", zap.Error(err))
sm.logger.Error("init scheduler failed", zap.Error(err))
sm.failTask(task.ID, task.State, err)
return
}
sm.addScheduler(task.ID, scheduler)
sm.slotMgr.reserve(basicTask, reservedExecID)
// Using the pool with block, so it wouldn't return an error.
sm.logger.Info("task scheduler started", zap.Int64("task-id", task.ID))
sm.schedulerWG.RunWithLog(func() {
defer func() {
scheduler.Close()
sm.delScheduler(task.ID)
sm.slotMgr.unReserve(basicTask, reservedExecID)
handle.NotifyTaskChange()
sm.logger.Info("task scheduler exist", zap.Int64("task-id", task.ID))
}()
metrics.UpdateMetricsForRunTask(task)
scheduler.ScheduleTask()
logutil.BgLogger().Info("task finished", zap.Int64("task-id", task.ID))
sm.finishCh <- struct{}{}
})
}
func (sm *Manager) cleanupTaskLoop() {
logutil.BgLogger().Info("cleanup loop start")
sm.logger.Info("cleanup loop start")
ticker := time.NewTicker(DefaultCleanUpInterval)
defer ticker.Stop()
for {
select {
case <-sm.ctx.Done():
logutil.BgLogger().Info("cleanup loop exits", zap.Error(sm.ctx.Err()))
sm.logger.Info("cleanup loop exits")
return
case <-sm.finishCh:
sm.doCleanupTask()
@ -349,29 +362,29 @@ func (sm *Manager) doCleanupTask() {
proto.TaskStateSucceed,
)
if err != nil {
logutil.BgLogger().Warn("cleanup routine failed", zap.Error(err))
sm.logger.Warn("get task in states failed", zap.Error(err))
return
}
if len(tasks) == 0 {
return
}
logutil.BgLogger().Info("cleanup routine start")
sm.logger.Info("cleanup routine start")
err = sm.cleanupFinishedTasks(tasks)
if err != nil {
logutil.BgLogger().Warn("cleanup routine failed", zap.Error(err))
sm.logger.Warn("cleanup routine failed", zap.Error(err))
return
}
failpoint.Inject("WaitCleanUpFinished", func() {
WaitCleanUpFinished <- struct{}{}
})
logutil.BgLogger().Info("cleanup routine success")
sm.logger.Info("cleanup routine success")
}
func (sm *Manager) cleanupFinishedTasks(tasks []*proto.Task) error {
cleanedTasks := make([]*proto.Task, 0)
var firstErr error
for _, task := range tasks {
logutil.BgLogger().Info("cleanup task", zap.Int64("task-id", task.ID))
sm.logger.Info("cleanup task", zap.Int64("task-id", task.ID))
cleanupFactory := getSchedulerCleanUpFactory(task.Type)
if cleanupFactory != nil {
cleanup := cleanupFactory()
@ -387,7 +400,7 @@ func (sm *Manager) cleanupFinishedTasks(tasks []*proto.Task) error {
}
}
if firstErr != nil {
logutil.BgLogger().Warn("cleanup routine failed", zap.Error(errors.Trace(firstErr)))
sm.logger.Warn("cleanup routine failed", zap.Error(errors.Trace(firstErr)))
}
failpoint.Inject("mockTransferErr", func() {
@ -400,8 +413,9 @@ func (sm *Manager) cleanupFinishedTasks(tasks []*proto.Task) error {
// MockScheduler mock one scheduler for one task, only used for tests.
func (sm *Manager) MockScheduler(task *proto.Task) *BaseScheduler {
return NewBaseScheduler(sm.ctx, task, Param{
taskMgr: sm.taskMgr,
nodeMgr: sm.nodeMgr,
slotMgr: sm.slotMgr,
taskMgr: sm.taskMgr,
nodeMgr: sm.nodeMgr,
slotMgr: sm.slotMgr,
serverID: sm.serverID,
})
}

View File

@ -181,7 +181,7 @@ func TestSlotManagerUpdate(t *testing.T) {
defer ctrl.Finish()
ctx := context.Background()
nodeMgr := newNodeManager()
nodeMgr := newNodeManager("")
taskMgr := mock.NewMockTaskManager(ctrl)
nodeMgr.managedNodes.Store(&[]string{"tidb-1", "tidb-2", "tidb-3"})
taskMgr.EXPECT().GetUsedSlotsOnNodes(gomock.Any()).Return(map[string]int{

View File

@ -26,7 +26,7 @@ go_library(
"//pkg/util/backoff",
"//pkg/util/cpu",
"//pkg/util/gctuner",
"//pkg/util/logutil",
"//pkg/util/intest",
"//pkg/util/memory",
"@com_github_docker_go_units//:go-units",
"@com_github_pingcap_errors//:errors",

View File

@ -22,13 +22,14 @@ import (
"github.com/docker/go-units"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/storage"
"github.com/pingcap/tidb/pkg/metrics"
tidbutil "github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/cpu"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/memory"
"go.uber.org/zap"
)
@ -70,6 +71,10 @@ type Manager struct {
// NewManager creates a new task executor Manager.
func NewManager(ctx context.Context, id string, taskTable TaskTable) (*Manager, error) {
logger := log.L()
if intest.InTest {
logger = logger.With(zap.String("server-id", id))
}
totalMem, err := memory.MemTotal()
if err != nil {
// should not happen normally, as in main function of tidb-server, we assert
@ -80,12 +85,12 @@ func NewManager(ctx context.Context, id string, taskTable TaskTable) (*Manager,
if totalCPU <= 0 || totalMem <= 0 {
return nil, errors.Errorf("invalid cpu or memory, cpu: %d, memory: %d", totalCPU, totalMem)
}
logutil.BgLogger().Info("build manager", zap.Int("total-cpu", totalCPU),
logger.Info("build task executor manager", zap.Int("total-cpu", totalCPU),
zap.String("total-mem", units.BytesSize(float64(totalMem))))
m := &Manager{
id: id,
taskTable: taskTable,
logger: logutil.BgLogger(),
logger: logger,
slotManager: &slotManager{
taskID2Index: make(map[int64]int),
executorTasks: make([]*proto.Task, 0),
@ -145,7 +150,7 @@ func (m *Manager) recoverMeta() (err error) {
// Start starts the Manager.
func (m *Manager) Start() error {
m.logger.Debug("manager start")
m.logger.Info("task executor manager start")
m.wg.Run(m.handleTasksLoop)
m.wg.Run(m.recoverMetaLoop)
return nil

View File

@ -35,7 +35,7 @@ import (
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/backoff"
"github.com/pingcap/tidb/pkg/util/gctuner"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/memory"
"go.uber.org/zap"
)
@ -84,14 +84,17 @@ type BaseTaskExecutor struct {
// NewBaseTaskExecutor creates a new BaseTaskExecutor.
func NewBaseTaskExecutor(ctx context.Context, id string, task *proto.Task, taskTable TaskTable) *BaseTaskExecutor {
logger := log.L().With(zap.Int64("task-id", task.ID), zap.String("task-type", string(task.Type)))
if intest.InTest {
logger = logger.With(zap.String("server-id", id))
}
subCtx, cancelFunc := context.WithCancel(ctx)
taskExecutorImpl := &BaseTaskExecutor{
id: id,
taskTable: taskTable,
ctx: subCtx,
cancel: cancelFunc,
logger: log.L().With(zap.Int64("task-id", task.ID),
zap.String("task-type", string(task.Type))),
logger: logger,
}
taskExecutorImpl.task.Store(task)
return taskExecutorImpl
@ -453,11 +456,11 @@ func (e *BaseTaskExecutor) runSubtask(ctx context.Context, stepExecutor execute.
if taskID, ok := val.(int); ok {
mgr, err := storage.GetTaskManager()
if err != nil {
logutil.BgLogger().Error("get task manager failed", zap.Error(err))
e.logger.Error("get task manager failed", zap.Error(err))
} else {
err = mgr.CancelTask(ctx, int64(taskID))
if err != nil {
logutil.BgLogger().Error("cancel task failed", zap.Error(err))
e.logger.Error("cancel task failed", zap.Error(err))
}
}
}

View File

@ -25,6 +25,7 @@ go_library(
"//pkg/sessionctx",
"//pkg/store/mockstore",
"//pkg/testkit",
"//pkg/util",
"//pkg/util/logutil",
"//pkg/util/sqlexec",
"@com_github_ngaut_pools//:pools",

View File

@ -17,17 +17,280 @@ package testutil
import (
"context"
"fmt"
"math/rand"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/ngaut/pools"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/scheduler"
"github.com/pingcap/tidb/pkg/disttask/framework/storage"
"github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/testkit"
tidbutil "github.com/pingcap/tidb/pkg/util"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/util"
"go.uber.org/mock/gomock"
)
type tidbNode struct {
id string
owner bool
exeMgr *taskexecutor.Manager
schMgr *scheduler.Manager
}
// TestDXFContext is the context for testing DXF.
type TestDXFContext struct {
T *testing.T
Store kv.Storage
Ctx context.Context
TaskMgr *storage.TaskManager
MockCtrl *gomock.Controller
TestContext *TestContext
idAllocator atomic.Int32
// in real clusters, when nodes scale in and out, a new node might reuse the
// same IP or host name (for example on K8S), so we use this pool to simulate that case.
nodeIDPool chan string
rand *rand.Rand
wg tidbutil.WaitGroupWrapper
mu struct {
sync.RWMutex
// to test network partition, we allow multiple owners
ownerIndices map[string]int
nodeIndices map[string]int
nodes []*tidbNode
}
}
// NewTestDXFContext creates a new TestDXFContext.
func NewTestDXFContext(t *testing.T, nodeNum int) *TestDXFContext {
// all nodes are homogeneous, each with 16 CPUs
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/util/cpu/mockNumCpu", "return(16)"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/domain/MockDisableDistTask", "return(true)"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/mockTaskExecutorNodes", "return()"))
t.Cleanup(func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/util/cpu/mockNumCpu"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/domain/MockDisableDistTask"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/mockTaskExecutorNodes"))
})
store := testkit.CreateMockStore(t)
pool := pools.NewResourcePool(func() (pools.Resource, error) {
return testkit.NewSession(t, store), nil
}, 10, 10, time.Second)
t.Cleanup(func() {
pool.Close()
})
taskManager := storage.NewTaskManager(pool)
storage.SetTaskManager(taskManager)
ctx := context.Background()
ctx = util.WithInternalSourceType(ctx, kv.InternalDistTask)
seed := time.Now().UnixNano()
t.Log("dxf context seed:", seed)
ctrl := gomock.NewController(t)
c := &TestDXFContext{
T: t,
Store: store,
Ctx: ctx,
TaskMgr: taskManager,
MockCtrl: ctrl,
TestContext: &TestContext{
subtasksHasRun: make(map[string]map[int64]struct{}),
},
nodeIDPool: make(chan string, 100),
rand: rand.New(rand.NewSource(seed)),
}
c.mu.ownerIndices = make(map[string]int)
c.mu.nodeIndices = make(map[string]int, nodeNum)
c.init(nodeNum)
t.Cleanup(func() {
ctrl.Finish()
c.close()
})
return c
}
// init initializes the context with nodeNum tidb nodes.
// The last node is the owner.
func (c *TestDXFContext) init(nodeNum int) {
for i := 0; i < nodeNum; i++ {
c.ScaleOutBy(c.getNodeID(), i == nodeNum-1)
}
}
func (c *TestDXFContext) getNodeID() string {
select {
case id := <-c.nodeIDPool:
return id
default:
return fmt.Sprintf(":%d", 4000-1+c.idAllocator.Add(1))
}
}
func (c *TestDXFContext) recycleNodeID(id string) {
select {
case c.nodeIDPool <- id:
default:
}
}
// ScaleOut scales out nodeNum tidb nodes, and elects an owner if needed.
func (c *TestDXFContext) ScaleOut(nodeNum int) {
for i := 0; i < nodeNum; i++ {
c.ScaleOutBy(c.getNodeID(), false)
}
c.electIfNeeded()
}
// ScaleOutBy scales out a tidb node with the given id, and sets it as owner if required.
func (c *TestDXFContext) ScaleOutBy(id string, owner bool) {
c.T.Logf("scale out node of id = %s, owner = %t", id, owner)
c.updateLiveExecIDs(id)
exeMgr, err := taskexecutor.NewManager(c.Ctx, id, c.TaskMgr)
require.NoError(c.T, err)
require.NoError(c.T, exeMgr.InitMeta())
require.NoError(c.T, exeMgr.Start())
var schMgr *scheduler.Manager
if owner {
schMgr = scheduler.NewManager(c.Ctx, c.TaskMgr, id)
schMgr.Start()
}
node := &tidbNode{
id: id,
owner: owner,
exeMgr: exeMgr,
schMgr: schMgr,
}
c.mu.Lock()
defer c.mu.Unlock()
c.mu.nodes = append(c.mu.nodes, node)
c.mu.nodeIndices[id] = len(c.mu.nodes) - 1
if owner {
c.mu.ownerIndices[id] = len(c.mu.nodes) - 1
}
}
func (c *TestDXFContext) updateLiveExecIDs(newID string) {
c.mu.Lock()
defer c.mu.Unlock()
execIDs := make([]string, 0, len(c.mu.nodes)+1)
for _, n := range c.mu.nodes {
execIDs = append(execIDs, n.id)
}
if len(newID) > 0 {
execIDs = append(execIDs, newID)
}
scheduler.MockServerInfo.Store(&execIDs)
}
// ScaleIn scales in the last added tidb nodes, and elects a new owner if required.
func (c *TestDXFContext) ScaleIn(nodeNum int) {
for i := 0; i < nodeNum; i++ {
c.mu.Lock()
if len(c.mu.nodes) == 0 {
c.mu.Unlock()
return
}
node := c.mu.nodes[len(c.mu.nodes)-1]
c.mu.Unlock()
c.ScaleInBy(node.id)
}
}
// ScaleInBy scales in the tidb node with the given id, and elects a new owner if required.
func (c *TestDXFContext) ScaleInBy(id string) {
c.mu.Lock()
idx, ok := c.mu.nodeIndices[id]
if !ok {
c.mu.Unlock()
return
}
node := c.mu.nodes[idx]
c.mu.nodes = append(c.mu.nodes[:idx], c.mu.nodes[idx+1:]...)
delete(c.mu.nodeIndices, id)
if node.owner {
delete(c.mu.ownerIndices, id)
}
c.recycleNodeID(id)
c.mu.Unlock()
c.updateLiveExecIDs("")
c.T.Logf("scale in node of id = %s, owner = %t", node.id, node.owner)
node.exeMgr.Stop()
if node.owner {
node.schMgr.Stop()
}
c.electIfNeeded()
}
// AsyncChangeOwner resigns all current owners and changes the owner of the cluster to a random node asynchronously.
func (c *TestDXFContext) AsyncChangeOwner() {
c.wg.RunWithLog(c.ChangeOwner)
}
// ChangeOwner resigns all current owners and changes the owner of the cluster to a random node.
func (c *TestDXFContext) ChangeOwner() {
c.mu.Lock()
if len(c.mu.nodes) == 0 {
c.mu.Unlock()
return
}
for _, idx := range c.mu.ownerIndices {
c.mu.nodes[idx].schMgr.Stop()
c.mu.nodes[idx].schMgr = nil
c.mu.nodes[idx].owner = false
}
c.mu.ownerIndices = make(map[string]int)
c.mu.Unlock()
c.electIfNeeded()
}
func (c *TestDXFContext) electIfNeeded() {
c.mu.Lock()
if len(c.mu.nodes) == 0 || len(c.mu.ownerIndices) > 0 {
c.mu.Unlock()
return
}
newOwnerIdx := int(rand.Int31n(int32(len(c.mu.nodes))))
ownerNode := c.mu.nodes[newOwnerIdx]
c.mu.ownerIndices[ownerNode.id] = newOwnerIdx
ownerNode.schMgr = scheduler.NewManager(c.Ctx, c.TaskMgr, ownerNode.id)
ownerNode.schMgr.Start()
ownerNode.owner = true
c.mu.Unlock()
c.T.Logf("new owner elected, id = %s, newOwnerIdx = %d", ownerNode.id, newOwnerIdx)
}
func (c *TestDXFContext) close() {
c.wg.Wait()
c.mu.Lock()
defer c.mu.Unlock()
for _, node := range c.mu.nodes {
node.exeMgr.Stop()
if node.owner {
node.schMgr.Stop()
}
}
c.mu.nodes = nil
c.mu.ownerIndices = nil
c.mu.nodeIndices = nil
}
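
Putting the helper together, scale and owner-change churn is now driven through the context instead of by adding or deleting domains. The following is condensed from TestFrameworkScaleInAndOut and TestRandomOwnerChangeWithMultipleTasks earlier in this diff (a sketch of the pattern, not a complete test):

c := testutil.NewTestDXFContext(t, 5)
testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetMockBasicSchedulerExt(c.MockCtrl), c.TestContext, nil)
var wg util.WaitGroupWrapper
wg.Run(func() {
	submitTaskAndCheckSuccessForBasic(c.Ctx, t, "key1", c.TestContext)
})
wg.Run(func() {
	random := rand.New(rand.NewSource(time.Now().UnixNano()))
	for i := 0; i < 3; i++ {
		if random.Intn(2) == 0 {
			c.ScaleOut(1) // add a node, reusing a recycled ID if one is available
		} else {
			c.ScaleIn(1) // stop the most recently added node, re-electing an owner if needed
		}
		c.ChangeOwner() // resign all owners and elect a random remaining node
		time.Sleep(time.Duration(random.Intn(2000)) * time.Millisecond)
	}
})
wg.Wait()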
// TestContext defines shared variables for disttask tests.
type TestContext struct {
sync.RWMutex

View File

@ -103,7 +103,7 @@ func NewTestKitWithSession(t testing.TB, store kv.Storage, se sessiontypes.Sessi
// RefreshSession set a new session for the testkit
func (tk *TestKit) RefreshSession() {
tk.session = newSession(tk.t, tk.store)
tk.session = NewSession(tk.t, tk.store)
// enforce sysvar cache loading, ref loadCommonGlobalVariableIfNeeded
tk.MustExec("select 3")
}
@ -423,7 +423,8 @@ func (tk *TestKit) MustExecToErr(sql string, args ...any) {
tk.require.Error(err)
}
func newSession(t testing.TB, store kv.Storage) sessiontypes.Session {
// NewSession creates a new session environment for test.
func NewSession(t testing.TB, store kv.Storage) sessiontypes.Session {
se, err := session.CreateSession4Test(store)
require.NoError(t, err)
se.SetConnectionID(testKitIDGenerator.Inc())