From be57d2fd8708f5d462dba16aec320cfa6a070c19 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Tue, 30 Jan 2024 17:48:23 +0800 Subject: [PATCH] disttask: mock to start multiple scheduler/taskexecutor (#50801) ref pingcap/tidb#49008 --- pkg/ddl/backfilling_dist_scheduler.go | 2 +- .../framework/framework_err_handling_test.go | 16 +- .../framework_pause_and_resume_test.go | 26 +- pkg/disttask/framework/framework_role_test.go | 21 +- .../framework/framework_rollback_test.go | 8 +- pkg/disttask/framework/framework_test.go | 147 ++++------ pkg/disttask/framework/scheduler/balancer.go | 18 +- .../framework/scheduler/balancer_test.go | 6 +- pkg/disttask/framework/scheduler/interface.go | 7 +- pkg/disttask/framework/scheduler/main_test.go | 2 +- pkg/disttask/framework/scheduler/nodes.go | 33 ++- .../framework/scheduler/nodes_test.go | 18 +- pkg/disttask/framework/scheduler/scheduler.go | 34 ++- .../framework/scheduler/scheduler_manager.go | 92 +++--- .../framework/scheduler/slots_test.go | 2 +- .../framework/taskexecutor/BUILD.bazel | 2 +- .../framework/taskexecutor/manager.go | 13 +- .../framework/taskexecutor/task_executor.go | 13 +- pkg/disttask/framework/testutil/BUILD.bazel | 1 + pkg/disttask/framework/testutil/context.go | 263 ++++++++++++++++++ pkg/testkit/testkit.go | 5 +- 21 files changed, 493 insertions(+), 236 deletions(-) diff --git a/pkg/ddl/backfilling_dist_scheduler.go b/pkg/ddl/backfilling_dist_scheduler.go index fe114b9b48..ab64603382 100644 --- a/pkg/ddl/backfilling_dist_scheduler.go +++ b/pkg/ddl/backfilling_dist_scheduler.go @@ -349,7 +349,7 @@ func generateGlobalSortIngestPlan( // Skip global sort for empty table. return nil, nil } - instanceIDs, err := scheduler.GenerateTaskExecutorNodes(ctx) + instanceIDs, err := scheduler.GetLiveExecIDs(ctx) if err != nil { return nil, err } diff --git a/pkg/disttask/framework/framework_err_handling_test.go b/pkg/disttask/framework/framework_err_handling_test.go index fbd51ca1bf..1bb0ff4e88 100644 --- a/pkg/disttask/framework/framework_err_handling_test.go +++ b/pkg/disttask/framework/framework_err_handling_test.go @@ -23,19 +23,15 @@ import ( ) func TestRetryErrOnNextSubtasksBatch(t *testing.T) { - ctx, ctrl, testContext, distContext := testutil.InitTestContext(t, 2) - defer ctrl.Finish() - testutil.RegisterTaskMeta(t, ctrl, testutil.GetPlanErrSchedulerExt(ctrl, testContext), testContext, nil) - submitTaskAndCheckSuccessForBasic(ctx, t, "key1", testContext) - distContext.Close() + c := testutil.NewTestDXFContext(t, 2) + testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetPlanErrSchedulerExt(c.MockCtrl, c.TestContext), c.TestContext, nil) + submitTaskAndCheckSuccessForBasic(c.Ctx, t, "key1", c.TestContext) } func TestPlanNotRetryableOnNextSubtasksBatchErr(t *testing.T) { - ctx, ctrl, testContext, distContext := testutil.InitTestContext(t, 2) - defer ctrl.Finish() + c := testutil.NewTestDXFContext(t, 2) - testutil.RegisterTaskMeta(t, ctrl, testutil.GetPlanNotRetryableErrSchedulerExt(ctrl), testContext, nil) - task := testutil.SubmitAndWaitTask(ctx, t, "key1") + testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetPlanNotRetryableErrSchedulerExt(c.MockCtrl), c.TestContext, nil) + task := testutil.SubmitAndWaitTask(c.Ctx, t, "key1") require.Equal(t, proto.TaskStateFailed, task.State) - distContext.Close() } diff --git a/pkg/disttask/framework/framework_pause_and_resume_test.go b/pkg/disttask/framework/framework_pause_and_resume_test.go index 2c5020014d..636f36c9e3 100644 --- a/pkg/disttask/framework/framework_pause_and_resume_test.go +++ 
b/pkg/disttask/framework/framework_pause_and_resume_test.go @@ -41,44 +41,42 @@ func CheckSubtasksState(ctx context.Context, t *testing.T, taskID int64, state p } func TestFrameworkPauseAndResume(t *testing.T) { - ctx, ctrl, testContext, distContext := testutil.InitTestContext(t, 3) - defer ctrl.Finish() + c := testutil.NewTestDXFContext(t, 3) - testutil.RegisterTaskMeta(t, ctrl, testutil.GetMockBasicSchedulerExt(ctrl), testContext, nil) + testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetMockBasicSchedulerExt(c.MockCtrl), c.TestContext, nil) // 1. schedule and pause one running task. require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/pauseTaskAfterRefreshTask", "2*return(true)")) require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/syncAfterResume", "return()")) - task1 := testutil.SubmitAndWaitTask(ctx, t, "key1") + task1 := testutil.SubmitAndWaitTask(c.Ctx, t, "key1") require.Equal(t, proto.TaskStatePaused, task1.State) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/pauseTaskAfterRefreshTask")) // 4 subtask scheduled. - require.NoError(t, handle.ResumeTask(ctx, "key1")) + require.NoError(t, handle.ResumeTask(c.Ctx, "key1")) <-scheduler.TestSyncChan - testutil.WaitTaskDoneOrPaused(ctx, t, task1.Key) - CheckSubtasksState(ctx, t, 1, proto.SubtaskStateSucceed, 4) + testutil.WaitTaskDoneOrPaused(c.Ctx, t, task1.Key) + CheckSubtasksState(c.Ctx, t, 1, proto.SubtaskStateSucceed, 4) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/syncAfterResume")) mgr, err := storage.GetTaskManager() require.NoError(t, err) - errs, err := mgr.GetSubtaskErrors(ctx, 1) + errs, err := mgr.GetSubtaskErrors(c.Ctx, 1) require.NoError(t, err) require.Empty(t, errs) // 2. pause pending task. require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/pausePendingTask", "2*return(true)")) require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/syncAfterResume", "1*return()")) - task2 := testutil.SubmitAndWaitTask(ctx, t, "key2") + task2 := testutil.SubmitAndWaitTask(c.Ctx, t, "key2") require.Equal(t, proto.TaskStatePaused, task2.State) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/pausePendingTask")) // 4 subtask scheduled. 
- require.NoError(t, handle.ResumeTask(ctx, "key2")) + require.NoError(t, handle.ResumeTask(c.Ctx, "key2")) <-scheduler.TestSyncChan - testutil.WaitTaskDoneOrPaused(ctx, t, task2.Key) - CheckSubtasksState(ctx, t, 1, proto.SubtaskStateSucceed, 4) + testutil.WaitTaskDoneOrPaused(c.Ctx, t, task2.Key) + CheckSubtasksState(c.Ctx, t, 1, proto.SubtaskStateSucceed, 4) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/syncAfterResume")) - errs, err = mgr.GetSubtaskErrors(ctx, 1) + errs, err = mgr.GetSubtaskErrors(c.Ctx, 1) require.NoError(t, err) require.Empty(t, errs) - distContext.Close() } diff --git a/pkg/disttask/framework/framework_role_test.go b/pkg/disttask/framework/framework_role_test.go index 337189c02c..c071c2aff8 100644 --- a/pkg/disttask/framework/framework_role_test.go +++ b/pkg/disttask/framework/framework_role_test.go @@ -39,16 +39,15 @@ func checkSubtaskOnNodes(ctx context.Context, t *testing.T, taskID int64, expect } func TestRoleBasic(t *testing.T) { - ctx, ctrl, testContext, distContext := testutil.InitTestContext(t, 3) - defer ctrl.Finish() + c := testutil.NewTestDXFContext(t, 3) - testutil.RegisterTaskMeta(t, ctrl, testutil.GetMockBasicSchedulerExt(ctrl), testContext, nil) - tk := testkit.NewTestKit(t, distContext.Store) + testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetMockBasicSchedulerExt(c.MockCtrl), c.TestContext, nil) + tk := testkit.NewTestKit(t, c.Store) // 1. all "" role. - submitTaskAndCheckSuccessForBasic(ctx, t, "😁", testContext) + submitTaskAndCheckSuccessForBasic(c.Ctx, t, "😁", c.TestContext) - checkSubtaskOnNodes(ctx, t, 1, []string{":4000", ":4001", ":4002"}) + checkSubtaskOnNodes(c.Ctx, t, 1, []string{":4000", ":4001", ":4002"}) tk.MustQuery(`select role from mysql.dist_framework_meta where host=":4000"`).Check(testkit.Rows("")) tk.MustQuery(`select role from mysql.dist_framework_meta where host=":4001"`).Check(testkit.Rows("")) tk.MustQuery(`select role from mysql.dist_framework_meta where host=":4002"`).Check(testkit.Rows("")) @@ -60,29 +59,27 @@ func TestRoleBasic(t *testing.T) { require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/syncRefresh", "1*return()")) <-scheduler.TestRefreshedChan - submitTaskAndCheckSuccessForBasic(ctx, t, "😊", testContext) + submitTaskAndCheckSuccessForBasic(c.Ctx, t, "😊", c.TestContext) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/syncRefresh")) tk.MustQuery(`select role from mysql.dist_framework_meta where host=":4000"`).Check(testkit.Rows("background")) tk.MustQuery(`select role from mysql.dist_framework_meta where host=":4001"`).Check(testkit.Rows("")) tk.MustQuery(`select role from mysql.dist_framework_meta where host=":4002"`).Check(testkit.Rows("")) - checkSubtaskOnNodes(ctx, t, 2, []string{":4000"}) + checkSubtaskOnNodes(c.Ctx, t, 2, []string{":4000"}) // 3. 2 "background" role. 
tk.MustExec("update mysql.dist_framework_meta set role = \"background\" where host = \":4001\"") time.Sleep(5 * time.Second) require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/syncRefresh", "1*return()")) <-scheduler.TestRefreshedChan - submitTaskAndCheckSuccessForBasic(ctx, t, "😆", testContext) + submitTaskAndCheckSuccessForBasic(c.Ctx, t, "😆", c.TestContext) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/syncRefresh")) - checkSubtaskOnNodes(ctx, t, 3, []string{":4000", ":4001"}) + checkSubtaskOnNodes(c.Ctx, t, 3, []string{":4000", ":4001"}) tk.MustQuery(`select role from mysql.dist_framework_meta where host=":4000"`).Check(testkit.Rows("background")) tk.MustQuery(`select role from mysql.dist_framework_meta where host=":4001"`).Check(testkit.Rows("background")) tk.MustQuery(`select role from mysql.dist_framework_meta where host=":4002"`).Check(testkit.Rows("")) - - distContext.Close() } func TestSetRole(t *testing.T) { diff --git a/pkg/disttask/framework/framework_rollback_test.go b/pkg/disttask/framework/framework_rollback_test.go index 78e84dc1b2..52fa1bf439 100644 --- a/pkg/disttask/framework/framework_rollback_test.go +++ b/pkg/disttask/framework/framework_rollback_test.go @@ -24,15 +24,13 @@ import ( ) func TestFrameworkRollback(t *testing.T) { - ctx, ctrl, testContext, distContext := testutil.InitTestContext(t, 2) - defer ctrl.Finish() - testutil.RegisterRollbackTaskMeta(t, ctrl, testutil.GetMockRollbackSchedulerExt(ctrl), testContext) + c := testutil.NewTestDXFContext(t, 2) + testutil.RegisterRollbackTaskMeta(t, c.MockCtrl, testutil.GetMockRollbackSchedulerExt(c.MockCtrl), c.TestContext) require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/cancelTaskAfterRefreshTask", "2*return(true)")) defer func() { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/cancelTaskAfterRefreshTask")) }() - task := testutil.SubmitAndWaitTask(ctx, t, "key1") + task := testutil.SubmitAndWaitTask(c.Ctx, t, "key1") require.Equal(t, proto.TaskStateReverted, task.State) - distContext.Close() } diff --git a/pkg/disttask/framework/framework_test.go b/pkg/disttask/framework/framework_test.go index 9f7e646d58..d4da587624 100644 --- a/pkg/disttask/framework/framework_test.go +++ b/pkg/disttask/framework/framework_test.go @@ -50,82 +50,65 @@ func submitTaskAndCheckSuccess(ctx context.Context, t *testing.T, taskKey string } func TestRandomOwnerChangeWithMultipleTasks(t *testing.T) { - nodeCnt := 5 - ctx, ctrl, testContext, distContext := testutil.InitTestContext(t, nodeCnt) - defer ctrl.Finish() - seed := time.Now().UnixNano() - t.Logf("seed: %d", seed) - random := rand.New(rand.NewSource(seed)) + c := testutil.NewTestDXFContext(t, 5) - testutil.RegisterTaskMeta(t, ctrl, testutil.GetMockBasicSchedulerExt(ctrl), testContext, nil) + testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetMockBasicSchedulerExt(c.MockCtrl), c.TestContext, nil) var wg util.WaitGroupWrapper for i := 0; i < 10; i++ { taskKey := fmt.Sprintf("key%d", i) wg.Run(func() { - submitTaskAndCheckSuccessForBasic(ctx, t, taskKey, testContext) + submitTaskAndCheckSuccessForBasic(c.Ctx, t, taskKey, c.TestContext) }) } wg.Run(func() { + seed := time.Now().UnixNano() + t.Logf("seed in change owner loop: %d", seed) + random := rand.New(rand.NewSource(seed)) for i := 0; i < 3; i++ { - time.Sleep(time.Duration(random.Intn(2000)) * time.Millisecond) - idx := 
int(random.Int31n(int32(nodeCnt))) - distContext.SetOwner(idx) - require.Eventually(t, func() bool { - return distContext.GetDomain(idx).DDL().OwnerManager().IsOwner() - }, 10*time.Second, 100*time.Millisecond) + c.ChangeOwner() + time.Sleep(time.Duration(random.Int63n(int64(3 * time.Second)))) } }) wg.Wait() - distContext.Close() } func TestFrameworkScaleInAndOut(t *testing.T) { - ctx, ctrl, testContext, distContext := testutil.InitTestContext(t, 5) - defer ctrl.Finish() + c := testutil.NewTestDXFContext(t, 5) seed := time.Now().UnixNano() t.Logf("seed: %d", seed) random := rand.New(rand.NewSource(seed)) - testutil.RegisterTaskMeta(t, ctrl, testutil.GetMockBasicSchedulerExt(ctrl), testContext, nil) + testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetMockBasicSchedulerExt(c.MockCtrl), c.TestContext, nil) var wg util.WaitGroupWrapper for i := 0; i < 12; i++ { taskKey := fmt.Sprintf("key%d", i) wg.Run(func() { - submitTaskAndCheckSuccessForBasic(ctx, t, taskKey, testContext) + submitTaskAndCheckSuccessForBasic(c.Ctx, t, taskKey, c.TestContext) }) } wg.Run(func() { for i := 0; i < 3; i++ { if random.Intn(2) == 0 { - distContext.AddDomain() + c.ScaleOut(1) } else { - // TODO: it's not real scale-in, delete domain doesn't stop task executor - // closing domain will reset storage.TaskManager which will cause - // test fail. - distContext.DeleteDomain(int(random.Int31n(int32(distContext.GetDomainCnt()-1))) + 1) + c.ScaleIn(1) } - time.Sleep(time.Duration(random.Intn(2000)) * time.Millisecond) - idx := int(random.Int31n(int32(distContext.GetDomainCnt()))) - distContext.SetOwner(idx) - // TODO we don't wait owner ready, it's not stable, will try refactor - // how we start multiple schedulers/task-executors later. + time.Sleep(time.Duration(random.Int63n(int64(3 * time.Second)))) } }) wg.Wait() - distContext.Close() } func TestFrameworkWithQuery(t *testing.T) { - ctx, ctrl, testContext, distContext := testutil.InitTestContext(t, 2) - defer ctrl.Finish() + c := testutil.NewTestDXFContext(t, 2) - testutil.RegisterTaskMeta(t, ctrl, testutil.GetMockBasicSchedulerExt(ctrl), testContext, nil) + testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetMockBasicSchedulerExt(c.MockCtrl), c.TestContext, nil) var wg util.WaitGroupWrapper wg.Run(func() { - submitTaskAndCheckSuccessForBasic(ctx, t, "key1", testContext) + submitTaskAndCheckSuccessForBasic(c.Ctx, t, "key1", c.TestContext) }) - tk := testkit.NewTestKit(t, distContext.Store) + tk := testkit.NewTestKit(t, c.Store) tk.MustExec("use test") tk.MustExec("drop table if exists t") @@ -138,63 +121,53 @@ func TestFrameworkWithQuery(t *testing.T) { require.NoError(t, rs.Close()) wg.Wait() - distContext.Close() } func TestFrameworkCancelTask(t *testing.T) { - ctx, ctrl, testContext, distContext := testutil.InitTestContext(t, 2) - defer ctrl.Finish() + c := testutil.NewTestDXFContext(t, 2) - testutil.RegisterTaskMeta(t, ctrl, testutil.GetMockBasicSchedulerExt(ctrl), testContext, nil) + testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetMockBasicSchedulerExt(c.MockCtrl), c.TestContext, nil) require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/MockExecutorRunCancel", "1*return(1)")) defer func() { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/MockExecutorRunCancel")) }() - task := testutil.SubmitAndWaitTask(ctx, t, "key1") + task := testutil.SubmitAndWaitTask(c.Ctx, t, "key1") require.Equal(t, proto.TaskStateReverted, task.State) - distContext.Close() } func 
TestFrameworkSubTaskFailed(t *testing.T) { - ctx, ctrl, testContext, distContext := testutil.InitTestContext(t, 1) - defer ctrl.Finish() + c := testutil.NewTestDXFContext(t, 1) - testutil.RegisterTaskMeta(t, ctrl, testutil.GetMockBasicSchedulerExt(ctrl), testContext, nil) + testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetMockBasicSchedulerExt(c.MockCtrl), c.TestContext, nil) require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/MockExecutorRunErr", "1*return(true)")) defer func() { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/MockExecutorRunErr")) }() - task := testutil.SubmitAndWaitTask(ctx, t, "key1") + task := testutil.SubmitAndWaitTask(c.Ctx, t, "key1") require.Equal(t, proto.TaskStateReverted, task.State) - - distContext.Close() } func TestFrameworkSubTaskInitEnvFailed(t *testing.T) { - ctx, ctrl, testContext, distContext := testutil.InitTestContext(t, 1) - defer ctrl.Finish() - testutil.RegisterTaskMeta(t, ctrl, testutil.GetMockBasicSchedulerExt(ctrl), testContext, nil) + c := testutil.NewTestDXFContext(t, 1) + testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetMockBasicSchedulerExt(c.MockCtrl), c.TestContext, nil) require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/mockExecSubtaskInitEnvErr", "return()")) defer func() { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/mockExecSubtaskInitEnvErr")) }() - task := testutil.SubmitAndWaitTask(ctx, t, "key1") + task := testutil.SubmitAndWaitTask(c.Ctx, t, "key1") require.Equal(t, proto.TaskStateReverted, task.State) - distContext.Close() } func TestOwnerChangeWhenSchedule(t *testing.T) { - ctx, ctrl, testContext, distContext := testutil.InitTestContext(t, 3) - defer ctrl.Finish() - testutil.RegisterTaskMeta(t, ctrl, testutil.GetMockBasicSchedulerExt(ctrl), testContext, nil) + c := testutil.NewTestDXFContext(t, 3) + testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetMockBasicSchedulerExt(c.MockCtrl), c.TestContext, nil) scheduler.MockOwnerChange = func() { - distContext.SetOwner(0) + c.AsyncChangeOwner() } require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/mockOwnerChange", "1*return(true)")) - submitTaskAndCheckSuccessForBasic(ctx, t, "😊", testContext) + submitTaskAndCheckSuccessForBasic(c.Ctx, t, "😊", c.TestContext) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/mockOwnerChange")) - distContext.Close() } func TestTaskExecutorDownBasic(t *testing.T) { @@ -235,26 +208,24 @@ func TestTaskExecutorDownManyNodes(t *testing.T) { } func TestGC(t *testing.T) { - ctx, ctrl, testContext, distContext := testutil.InitTestContext(t, 3) - defer ctrl.Finish() - - testutil.RegisterTaskMeta(t, ctrl, testutil.GetMockBasicSchedulerExt(ctrl), testContext, nil) - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/storage/subtaskHistoryKeepSeconds", "return(1)")) require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/historySubtaskTableGcInterval", "return(1)")) defer func() { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/storage/subtaskHistoryKeepSeconds")) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/historySubtaskTableGcInterval")) }() + c := testutil.NewTestDXFContext(t, 3) - submitTaskAndCheckSuccessForBasic(ctx, 
t, "😊", testContext) + testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetMockBasicSchedulerExt(c.MockCtrl), c.TestContext, nil) + + submitTaskAndCheckSuccessForBasic(c.Ctx, t, "😊", c.TestContext) mgr, err := storage.GetTaskManager() require.NoError(t, err) var historySubTasksCnt int require.Eventually(t, func() bool { - historySubTasksCnt, err = testutil.GetSubtasksFromHistory(ctx, mgr) + historySubTasksCnt, err = testutil.GetSubtasksFromHistory(c.Ctx, mgr) if err != nil { return false } @@ -264,40 +235,34 @@ func TestGC(t *testing.T) { scheduler.WaitTaskFinished <- struct{}{} require.Eventually(t, func() bool { - historySubTasksCnt, err := testutil.GetSubtasksFromHistory(ctx, mgr) + historySubTasksCnt, err := testutil.GetSubtasksFromHistory(c.Ctx, mgr) if err != nil { return false } return historySubTasksCnt == 0 }, 10*time.Second, 500*time.Millisecond) - - distContext.Close() } func TestFrameworkSubtaskFinishedCancel(t *testing.T) { - ctx, ctrl, testContext, distContext := testutil.InitTestContext(t, 3) - defer ctrl.Finish() + c := testutil.NewTestDXFContext(t, 3) - testutil.RegisterTaskMeta(t, ctrl, testutil.GetMockBasicSchedulerExt(ctrl), testContext, nil) + testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetMockBasicSchedulerExt(c.MockCtrl), c.TestContext, nil) require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/MockSubtaskFinishedCancel", "1*return(true)")) defer func() { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/MockSubtaskFinishedCancel")) }() - task := testutil.SubmitAndWaitTask(ctx, t, "key1") + task := testutil.SubmitAndWaitTask(c.Ctx, t, "key1") require.Equal(t, proto.TaskStateReverted, task.State) - distContext.Close() } func TestFrameworkRunSubtaskCancel(t *testing.T) { - ctx, ctrl, testContext, distContext := testutil.InitTestContext(t, 3) - defer ctrl.Finish() + c := testutil.NewTestDXFContext(t, 3) - testutil.RegisterTaskMeta(t, ctrl, testutil.GetMockBasicSchedulerExt(ctrl), testContext, nil) + testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetMockBasicSchedulerExt(c.MockCtrl), c.TestContext, nil) require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/MockRunSubtaskCancel", "1*return(true)")) - task := testutil.SubmitAndWaitTask(ctx, t, "key1") + task := testutil.SubmitAndWaitTask(c.Ctx, t, "key1") require.Equal(t, proto.TaskStateReverted, task.State) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/MockRunSubtaskCancel")) - distContext.Close() } func TestFrameworkCleanUpRoutine(t *testing.T) { @@ -306,49 +271,45 @@ func TestFrameworkCleanUpRoutine(t *testing.T) { scheduler.DefaultCleanUpInterval = bak }() scheduler.DefaultCleanUpInterval = 500 * time.Millisecond - ctx, ctrl, testContext, distContext := testutil.InitTestContext(t, 3) - defer ctrl.Finish() - testutil.RegisterTaskMeta(t, ctrl, testutil.GetMockBasicSchedulerExt(ctrl), testContext, nil) + c := testutil.NewTestDXFContext(t, 3) + testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetMockBasicSchedulerExt(c.MockCtrl), c.TestContext, nil) require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/WaitCleanUpFinished", "return()")) // normal - submitTaskAndCheckSuccessForBasic(ctx, t, "key1", testContext) + submitTaskAndCheckSuccessForBasic(c.Ctx, t, "key1", c.TestContext) <-scheduler.WaitCleanUpFinished mgr, err := storage.GetTaskManager() require.NoError(t, err) - tasks, err := 
mgr.GetTaskByKeyWithHistory(ctx, "key1") + tasks, err := mgr.GetTaskByKeyWithHistory(c.Ctx, "key1") require.NoError(t, err) require.NotEmpty(t, tasks) - subtasks, err := testutil.GetSubtasksFromHistory(ctx, mgr) + subtasks, err := testutil.GetSubtasksFromHistory(c.Ctx, mgr) require.NoError(t, err) require.NotEmpty(t, subtasks) // transfer err require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/mockTransferErr", "1*return()")) - submitTaskAndCheckSuccessForBasic(ctx, t, "key2", testContext) + submitTaskAndCheckSuccessForBasic(c.Ctx, t, "key2", c.TestContext) <-scheduler.WaitCleanUpFinished mgr, err = storage.GetTaskManager() require.NoError(t, err) - tasks, err = mgr.GetTaskByKeyWithHistory(ctx, "key1") + tasks, err = mgr.GetTaskByKeyWithHistory(c.Ctx, "key1") require.NoError(t, err) require.NotEmpty(t, tasks) - subtasks, err = testutil.GetSubtasksFromHistory(ctx, mgr) + subtasks, err = testutil.GetSubtasksFromHistory(c.Ctx, mgr) require.NoError(t, err) require.NotEmpty(t, subtasks) - distContext.Close() require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/mockTransferErr")) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/WaitCleanUpFinished")) } func TestTaskCancelledBeforeUpdateTask(t *testing.T) { - ctx, ctrl, testContext, distContext := testutil.InitTestContext(t, 1) - defer ctrl.Finish() + c := testutil.NewTestDXFContext(t, 1) - testutil.RegisterTaskMeta(t, ctrl, testutil.GetMockBasicSchedulerExt(ctrl), testContext, nil) + testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetMockBasicSchedulerExt(c.MockCtrl), c.TestContext, nil) require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/cancelBeforeUpdateTask", "1*return(true)")) - task := testutil.SubmitAndWaitTask(ctx, t, "key1") + task := testutil.SubmitAndWaitTask(c.Ctx, t, "key1") require.Equal(t, proto.TaskStateReverted, task.State) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/cancelBeforeUpdateTask")) - distContext.Close() } diff --git a/pkg/disttask/framework/scheduler/balancer.go b/pkg/disttask/framework/scheduler/balancer.go index fcba934d14..d5309b17d3 100644 --- a/pkg/disttask/framework/scheduler/balancer.go +++ b/pkg/disttask/framework/scheduler/balancer.go @@ -19,9 +19,10 @@ import ( "time" "github.com/pingcap/errors" - "github.com/pingcap/tidb/br/pkg/lightning/log" + "github.com/pingcap/log" + llog "github.com/pingcap/tidb/br/pkg/lightning/log" "github.com/pingcap/tidb/pkg/disttask/framework/proto" - "github.com/pingcap/tidb/pkg/util/logutil" + "github.com/pingcap/tidb/pkg/util/intest" "go.uber.org/zap" ) @@ -40,15 +41,20 @@ var ( // the task and try next one. type balancer struct { Param - + logger *zap.Logger // a helper temporary map to record the used slots of each node during balance // to avoid passing it around. 
currUsedSlots map[string]int } func newBalancer(param Param) *balancer { + logger := log.L() + if intest.InTest { + logger = log.L().With(zap.String("server-id", param.serverID)) + } return &balancer{ Param: param, + logger: logger, currUsedSlots: make(map[string]int), } } @@ -77,8 +83,8 @@ func (b *balancer) balance(ctx context.Context, sm *Manager) { schedulers := sm.getSchedulers() for _, sch := range schedulers { if err := b.balanceSubtasks(ctx, sch, managedNodes); err != nil { - logutil.Logger(ctx).Warn("failed to balance subtasks", - zap.Int64("task-id", sch.GetTask().ID), log.ShortError(err)) + b.logger.Warn("failed to balance subtasks", + zap.Int64("task-id", sch.GetTask().ID), llog.ShortError(err)) return } } @@ -200,7 +206,7 @@ func (b *balancer) doBalanceSubtasks(ctx context.Context, taskID int64, eligible if err = b.taskMgr.UpdateSubtasksExecIDs(ctx, subtasksNeedSchedule); err != nil { return err } - logutil.BgLogger().Info("balance subtasks", zap.Stringers("subtasks", subtasksNeedSchedule)) + b.logger.Info("balance subtasks", zap.Stringers("subtasks", subtasksNeedSchedule)) return nil } diff --git a/pkg/disttask/framework/scheduler/balancer_test.go b/pkg/disttask/framework/scheduler/balancer_test.go index 49669ffaba..0a33537683 100644 --- a/pkg/disttask/framework/scheduler/balancer_test.go +++ b/pkg/disttask/framework/scheduler/balancer_test.go @@ -242,7 +242,7 @@ func TestBalanceOneTask(t *testing.T) { slotMgr.updateCapacity(16) b := newBalancer(Param{ taskMgr: mockTaskMgr, - nodeMgr: newNodeManager(), + nodeMgr: newNodeManager(""), slotMgr: slotMgr, }) b.currUsedSlots = c.initUsedSlots @@ -263,7 +263,7 @@ func TestBalanceOneTask(t *testing.T) { slotMgr.updateCapacity(16) b := newBalancer(Param{ taskMgr: mockTaskMgr, - nodeMgr: newNodeManager(), + nodeMgr: newNodeManager(""), slotMgr: slotMgr, }) require.ErrorContains(t, b.balanceSubtasks(ctx, mockScheduler, []string{"tidb1"}), "mock error") @@ -286,7 +286,7 @@ func TestBalanceOneTask(t *testing.T) { slotMgr.updateCapacity(16) b := newBalancer(Param{ taskMgr: mockTaskMgr, - nodeMgr: newNodeManager(), + nodeMgr: newNodeManager(""), slotMgr: slotMgr, }) require.ErrorContains(t, b.balanceSubtasks(ctx, mockScheduler, []string{"tidb1"}), "mock error") diff --git a/pkg/disttask/framework/scheduler/interface.go b/pkg/disttask/framework/scheduler/interface.go index 852b9e88e4..131dc6493b 100644 --- a/pkg/disttask/framework/scheduler/interface.go +++ b/pkg/disttask/framework/scheduler/interface.go @@ -133,9 +133,10 @@ type Extension interface { // Param is used to pass parameters when creating scheduler. type Param struct { - taskMgr TaskManager - nodeMgr *NodeManager - slotMgr *SlotManager + taskMgr TaskManager + nodeMgr *NodeManager + slotMgr *SlotManager + serverID string } // schedulerFactoryFn is used to create a scheduler. 
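Note on the hunks above and below: the new serverID field in Param, together with the balancer.go change, is the pattern this patch repeats in nodes.go, scheduler.go, scheduler_manager.go, taskexecutor/manager.go and task_executor.go — each component is told which "server" it belongs to and, when running under intest, tags its logger with that ID so logs from several in-process scheduler/executor nodes can be told apart. A minimal sketch of that pattern (illustrative only, componentLogger is not a function added by this patch):

    package scheduler

    import (
        "github.com/pingcap/log"
        "github.com/pingcap/tidb/pkg/util/intest"
        "go.uber.org/zap"
    )

    // componentLogger returns the process-wide logger, additionally tagged with
    // the server ID when running inside tests, so multiple mocked nodes in one
    // process produce distinguishable log lines.
    func componentLogger(serverID string) *zap.Logger {
        logger := log.L()
        if intest.InTest {
            logger = logger.With(zap.String("server-id", serverID))
        }
        return logger
    }
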
diff --git a/pkg/disttask/framework/scheduler/main_test.go b/pkg/disttask/framework/scheduler/main_test.go index b4d841a7b9..6c6caf2bb0 100644 --- a/pkg/disttask/framework/scheduler/main_test.go +++ b/pkg/disttask/framework/scheduler/main_test.go @@ -41,7 +41,7 @@ func (s *BaseScheduler) Switch2NextStep() (err error) { } func NewNodeManager() *NodeManager { - return newNodeManager() + return newNodeManager("") } func TestMain(m *testing.M) { diff --git a/pkg/disttask/framework/scheduler/nodes.go b/pkg/disttask/framework/scheduler/nodes.go index 7f9c2a8eb7..cda0bfd000 100644 --- a/pkg/disttask/framework/scheduler/nodes.go +++ b/pkg/disttask/framework/scheduler/nodes.go @@ -20,9 +20,9 @@ import ( "time" "github.com/pingcap/failpoint" - "github.com/pingcap/tidb/br/pkg/lightning/log" - disttaskutil "github.com/pingcap/tidb/pkg/util/disttask" - "github.com/pingcap/tidb/pkg/util/logutil" + "github.com/pingcap/log" + llog "github.com/pingcap/tidb/br/pkg/lightning/log" + "github.com/pingcap/tidb/pkg/util/intest" "go.uber.org/zap" ) @@ -34,6 +34,7 @@ var ( // NodeManager maintains live TiDB nodes in the cluster, and maintains the nodes // managed by the framework. type NodeManager struct { + logger *zap.Logger // prevLiveNodes is used to record the live nodes in last checking. prevLiveNodes map[string]struct{} // managedNodes is the cached nodes managed by the framework. @@ -41,8 +42,13 @@ type NodeManager struct { managedNodes atomic.Pointer[[]string] } -func newNodeManager() *NodeManager { +func newNodeManager(serverID string) *NodeManager { + logger := log.L() + if intest.InTest { + logger = log.L().With(zap.String("server-id", serverID)) + } nm := &NodeManager{ + logger: logger, prevLiveNodes: make(map[string]struct{}), } managedNodes := make([]string, 0, 10) @@ -67,15 +73,14 @@ func (nm *NodeManager) maintainLiveNodesLoop(ctx context.Context, taskMgr TaskMa // see recoverMetaLoop in task executor for when node is inserted into dist_framework_meta. func (nm *NodeManager) maintainLiveNodes(ctx context.Context, taskMgr TaskManager) { // Safe to discard errors since this function can be called at regular intervals. 
- serverInfos, err := GenerateTaskExecutorNodes(ctx) + liveExecIDs, err := GetLiveExecIDs(ctx) if err != nil { - logutil.BgLogger().Warn("generate task executor nodes met error", log.ShortError(err)) + nm.logger.Warn("generate task executor nodes met error", llog.ShortError(err)) return } - nodeChanged := len(serverInfos) != len(nm.prevLiveNodes) - currLiveNodes := make(map[string]struct{}, len(serverInfos)) - for _, info := range serverInfos { - execID := disttaskutil.GenerateExecID(info) + nodeChanged := len(liveExecIDs) != len(nm.prevLiveNodes) + currLiveNodes := make(map[string]struct{}, len(liveExecIDs)) + for _, execID := range liveExecIDs { if _, ok := nm.prevLiveNodes[execID]; !ok { nodeChanged = true } @@ -87,7 +92,7 @@ func (nm *NodeManager) maintainLiveNodes(ctx context.Context, taskMgr TaskManage oldNodes, err := taskMgr.GetAllNodes(ctx) if err != nil { - logutil.BgLogger().Warn("get all nodes met error", log.ShortError(err)) + nm.logger.Warn("get all nodes met error", llog.ShortError(err)) return } @@ -101,11 +106,11 @@ func (nm *NodeManager) maintainLiveNodes(ctx context.Context, taskMgr TaskManage nm.prevLiveNodes = currLiveNodes return } - logutil.BgLogger().Info("delete dead nodes from dist_framework_meta", + nm.logger.Info("delete dead nodes from dist_framework_meta", zap.Int("dead-nodes", len(deadNodes))) err = taskMgr.DeleteDeadNodes(ctx, deadNodes) if err != nil { - logutil.BgLogger().Warn("delete dead nodes from dist_framework_meta failed", log.ShortError(err)) + nm.logger.Warn("delete dead nodes from dist_framework_meta failed", llog.ShortError(err)) return } nm.prevLiveNodes = currLiveNodes @@ -131,7 +136,7 @@ var TestRefreshedChan = make(chan struct{}) func (nm *NodeManager) refreshManagedNodes(ctx context.Context, taskMgr TaskManager, slotMgr *SlotManager) { newNodes, err := taskMgr.GetManagedNodes(ctx) if err != nil { - logutil.BgLogger().Warn("get managed nodes met error", log.ShortError(err)) + nm.logger.Warn("get managed nodes met error", llog.ShortError(err)) return } nodeIDs := make([]string, 0, len(newNodes)) diff --git a/pkg/disttask/framework/scheduler/nodes_test.go b/pkg/disttask/framework/scheduler/nodes_test.go index 04f4a52db0..4979d3ffed 100644 --- a/pkg/disttask/framework/scheduler/nodes_test.go +++ b/pkg/disttask/framework/scheduler/nodes_test.go @@ -22,7 +22,6 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/tidb/pkg/disttask/framework/mock" "github.com/pingcap/tidb/pkg/disttask/framework/proto" - "github.com/pingcap/tidb/pkg/domain/infosync" "github.com/pingcap/tidb/pkg/util/cpu" "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" @@ -38,11 +37,9 @@ func TestMaintainLiveNodes(t *testing.T) { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/mockTaskExecutorNodes")) }) - MockServerInfo = []*infosync.ServerInfo{ - {Port: 4000}, - } + MockServerInfo.Store(&[]string{":4000"}) - nodeMgr := newNodeManager() + nodeMgr := newNodeManager("") ctx := context.Background() mockTaskMgr.EXPECT().GetAllNodes(gomock.Any()).Return(nil, errors.New("mock error")) nodeMgr.maintainLiveNodes(ctx, mockTaskMgr) @@ -59,10 +56,7 @@ func TestMaintainLiveNodes(t *testing.T) { require.True(t, ctrl.Satisfied()) // scale out 1 node - MockServerInfo = []*infosync.ServerInfo{ - {Port: 4000}, - {Port: 4001}, - } + MockServerInfo.Store(&[]string{":4000", ":4001"}) // fail on clean mockTaskMgr.EXPECT().GetAllNodes(gomock.Any()).Return([]proto.ManagedNode{{ID: ":4000"}, {ID: ":4001"}, {ID: ":4002"}}, nil) @@ -82,9 
+76,7 @@ func TestMaintainLiveNodes(t *testing.T) { require.True(t, ctrl.Satisfied()) // scale in 1 node - MockServerInfo = []*infosync.ServerInfo{ - {Port: 4000}, - } + MockServerInfo.Store(&[]string{":4000"}) mockTaskMgr.EXPECT().GetAllNodes(gomock.Any()).Return([]proto.ManagedNode{{ID: ":4000"}, {ID: ":4001"}, {ID: ":4002"}}, nil) mockTaskMgr.EXPECT().DeleteDeadNodes(gomock.Any(), gomock.Any()).Return(nil) @@ -102,7 +94,7 @@ func TestMaintainManagedNodes(t *testing.T) { defer ctrl.Finish() ctx := context.Background() mockTaskMgr := mock.NewMockTaskManager(ctrl) - nodeMgr := newNodeManager() + nodeMgr := newNodeManager("") slotMgr := newSlotManager() mockTaskMgr.EXPECT().GetManagedNodes(gomock.Any()).Return(nil, errors.New("mock error")) diff --git a/pkg/disttask/framework/scheduler/scheduler.go b/pkg/disttask/framework/scheduler/scheduler.go index 0c80c5fd75..d684508e66 100644 --- a/pkg/disttask/framework/scheduler/scheduler.go +++ b/pkg/disttask/framework/scheduler/scheduler.go @@ -32,6 +32,7 @@ import ( "github.com/pingcap/tidb/pkg/metrics" "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/util/backoff" + disttaskutil "github.com/pingcap/tidb/pkg/util/disttask" "github.com/pingcap/tidb/pkg/util/intest" "github.com/pingcap/tidb/pkg/util/logutil" "go.uber.org/zap" @@ -99,12 +100,15 @@ var MockOwnerChange func() // NewBaseScheduler creates a new BaseScheduler. func NewBaseScheduler(ctx context.Context, task *proto.Task, param Param) *BaseScheduler { + logger := log.L().With(zap.Int64("task-id", task.ID), zap.Stringer("task-type", task.Type)) + if intest.InTest { + logger = logger.With(zap.String("server-id", param.serverID)) + } s := &BaseScheduler{ - ctx: ctx, - Param: param, - logger: log.L().With(zap.Int64("task-id", task.ID), - zap.Stringer("task-type", task.Type)), - rand: rand.New(rand.NewSource(time.Now().UnixNano())), + ctx: ctx, + Param: param, + logger: logger, + rand: rand.New(rand.NewSource(time.Now().UnixNano())), } s.task.Store(task) return s @@ -491,13 +495,25 @@ func (s *BaseScheduler) handlePlanErr(err error) error { } // MockServerInfo exported for scheduler_test.go -var MockServerInfo []*infosync.ServerInfo +var MockServerInfo atomic.Pointer[[]string] -// GenerateTaskExecutorNodes generate a eligible TiDB nodes. -func GenerateTaskExecutorNodes(ctx context.Context) (serverNodes []*infosync.ServerInfo, err error) { +// GetLiveExecIDs returns all live executor node IDs. 
+func GetLiveExecIDs(ctx context.Context) ([]string, error) { failpoint.Inject("mockTaskExecutorNodes", func() { - failpoint.Return(MockServerInfo, nil) + failpoint.Return(*MockServerInfo.Load(), nil) }) + serverInfos, err := generateTaskExecutorNodes(ctx) + if err != nil { + return nil, err + } + execIDs := make([]string, 0, len(serverInfos)) + for _, info := range serverInfos { + execIDs = append(execIDs, disttaskutil.GenerateExecID(info)) + } + return execIDs, nil +} + +func generateTaskExecutorNodes(ctx context.Context) (serverNodes []*infosync.ServerInfo, err error) { var serverInfos map[string]*infosync.ServerInfo _, etcd := ctx.Value("etcd").(bool) if intest.InTest && !etcd { diff --git a/pkg/disttask/framework/scheduler/scheduler_manager.go b/pkg/disttask/framework/scheduler/scheduler_manager.go index 0d15bc19ec..4584670c87 100644 --- a/pkg/disttask/framework/scheduler/scheduler_manager.go +++ b/pkg/disttask/framework/scheduler/scheduler_manager.go @@ -21,11 +21,12 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" + "github.com/pingcap/log" "github.com/pingcap/tidb/pkg/disttask/framework/handle" "github.com/pingcap/tidb/pkg/disttask/framework/proto" "github.com/pingcap/tidb/pkg/metrics" tidbutil "github.com/pingcap/tidb/pkg/util" - "github.com/pingcap/tidb/pkg/util/logutil" + "github.com/pingcap/tidb/pkg/util/intest" "github.com/pingcap/tidb/pkg/util/syncutil" "go.uber.org/zap" ) @@ -108,6 +109,7 @@ type Manager struct { initialized bool // serverID, it's value is ip:port now. serverID string + logger *zap.Logger finishCh chan struct{} @@ -121,20 +123,30 @@ type Manager struct { // NewManager creates a scheduler struct. func NewManager(ctx context.Context, taskMgr TaskManager, serverID string) *Manager { + logger := log.L() + if intest.InTest { + logger = log.L().With(zap.String("server-id", serverID)) + } + subCtx, cancel := context.WithCancel(ctx) + slotMgr := newSlotManager() + nodeMgr := newNodeManager(serverID) schedulerManager := &Manager{ + ctx: subCtx, + cancel: cancel, taskMgr: taskMgr, serverID: serverID, - slotMgr: newSlotManager(), - nodeMgr: newNodeManager(), + slotMgr: slotMgr, + nodeMgr: nodeMgr, + balancer: newBalancer(Param{ + taskMgr: taskMgr, + nodeMgr: nodeMgr, + slotMgr: slotMgr, + serverID: serverID, + }), + logger: logger, + finishCh: make(chan struct{}, proto.MaxConcurrentTask), } - schedulerManager.ctx, schedulerManager.cancel = context.WithCancel(ctx) schedulerManager.mu.schedulerMap = make(map[int64]Scheduler) - schedulerManager.finishCh = make(chan struct{}, proto.MaxConcurrentTask) - schedulerManager.balancer = newBalancer(Param{ - taskMgr: taskMgr, - nodeMgr: schedulerManager.nodeMgr, - slotMgr: schedulerManager.slotMgr, - }) return schedulerManager } @@ -179,13 +191,13 @@ func (sm *Manager) Initialized() bool { // scheduleTaskLoop schedules the tasks. 
func (sm *Manager) scheduleTaskLoop() { - logutil.BgLogger().Info("schedule task loop start") + sm.logger.Info("schedule task loop start") ticker := time.NewTicker(checkTaskRunningInterval) defer ticker.Stop() for { select { case <-sm.ctx.Done(): - logutil.BgLogger().Info("schedule task loop exits", zap.Error(sm.ctx.Err()), zap.Int64("interval", int64(checkTaskRunningInterval)/1000000)) + sm.logger.Info("schedule task loop exits") return case <-ticker.C: case <-handle.TaskChangedCh: @@ -193,14 +205,14 @@ func (sm *Manager) scheduleTaskLoop() { taskCnt := sm.getSchedulerCount() if taskCnt >= proto.MaxConcurrentTask { - logutil.BgLogger().Info("scheduled tasks reached limit", + sm.logger.Debug("scheduled tasks reached limit", zap.Int("current", taskCnt), zap.Int("max", proto.MaxConcurrentTask)) continue } tasks, err := sm.taskMgr.GetTopUnfinishedTasks(sm.ctx) if err != nil { - logutil.BgLogger().Warn("get unfinished tasks failed", zap.Error(err)) + sm.logger.Warn("get unfinished tasks failed", zap.Error(err)) continue } @@ -214,7 +226,7 @@ func (sm *Manager) scheduleTaskLoop() { // this should not happen normally, unless user modify system table // directly. if getSchedulerFactory(task.Type) == nil { - logutil.BgLogger().Warn("unknown task type", zap.Int64("task-id", task.ID), + sm.logger.Warn("unknown task type", zap.Int64("task-id", task.ID), zap.Stringer("task-type", task.Type)) sm.failTask(task.ID, task.State, errors.New("unknown task type")) continue @@ -226,7 +238,7 @@ func (sm *Manager) scheduleTaskLoop() { } if err = sm.slotMgr.update(sm.ctx, sm.nodeMgr, sm.taskMgr); err != nil { - logutil.BgLogger().Warn("update used slot failed", zap.Error(err)) + sm.logger.Warn("update used slot failed", zap.Error(err)) continue } for _, task := range schedulableTasks { @@ -248,7 +260,7 @@ func (sm *Manager) scheduleTaskLoop() { func (sm *Manager) failTask(id int64, currState proto.TaskState, err error) { if err2 := sm.taskMgr.FailTask(sm.ctx, id, currState, err); err2 != nil { - logutil.BgLogger().Warn("failed to update task state to failed", + sm.logger.Warn("failed to update task state to failed", zap.Int64("task-id", id), zap.Error(err2)) } } @@ -263,20 +275,20 @@ func (sm *Manager) gcSubtaskHistoryTableLoop() { <-WaitTaskFinished }) - logutil.BgLogger().Info("subtask table gc loop start") + sm.logger.Info("subtask table gc loop start") ticker := time.NewTicker(historySubtaskTableGcInterval) defer ticker.Stop() for { select { case <-sm.ctx.Done(): - logutil.BgLogger().Info("subtask history table gc loop exits", zap.Error(sm.ctx.Err())) + sm.logger.Info("subtask history table gc loop exits") return case <-ticker.C: err := sm.taskMgr.GCSubtasks(sm.ctx) if err != nil { - logutil.BgLogger().Warn("subtask history table gc failed", zap.Error(err)) + sm.logger.Warn("subtask history table gc failed", zap.Error(err)) } else { - logutil.BgLogger().Info("subtask history table gc success") + sm.logger.Info("subtask history table gc success") } } } @@ -285,46 +297,47 @@ func (sm *Manager) gcSubtaskHistoryTableLoop() { func (sm *Manager) startScheduler(basicTask *proto.Task, reservedExecID string) { task, err := sm.taskMgr.GetTaskByID(sm.ctx, basicTask.ID) if err != nil { - logutil.BgLogger().Error("get task failed", zap.Int64("task-id", basicTask.ID), zap.Error(err)) + sm.logger.Error("get task failed", zap.Int64("task-id", basicTask.ID), zap.Error(err)) return } schedulerFactory := getSchedulerFactory(task.Type) scheduler := schedulerFactory(sm.ctx, task, Param{ - taskMgr: sm.taskMgr, - nodeMgr: 
sm.nodeMgr, - slotMgr: sm.slotMgr, + taskMgr: sm.taskMgr, + nodeMgr: sm.nodeMgr, + slotMgr: sm.slotMgr, + serverID: sm.serverID, }) if err = scheduler.Init(); err != nil { - logutil.BgLogger().Error("init scheduler failed", zap.Error(err)) + sm.logger.Error("init scheduler failed", zap.Error(err)) sm.failTask(task.ID, task.State, err) return } sm.addScheduler(task.ID, scheduler) sm.slotMgr.reserve(basicTask, reservedExecID) - // Using the pool with block, so it wouldn't return an error. + sm.logger.Info("task scheduler started", zap.Int64("task-id", task.ID)) sm.schedulerWG.RunWithLog(func() { defer func() { scheduler.Close() sm.delScheduler(task.ID) sm.slotMgr.unReserve(basicTask, reservedExecID) handle.NotifyTaskChange() + sm.logger.Info("task scheduler exist", zap.Int64("task-id", task.ID)) }() metrics.UpdateMetricsForRunTask(task) scheduler.ScheduleTask() - logutil.BgLogger().Info("task finished", zap.Int64("task-id", task.ID)) sm.finishCh <- struct{}{} }) } func (sm *Manager) cleanupTaskLoop() { - logutil.BgLogger().Info("cleanup loop start") + sm.logger.Info("cleanup loop start") ticker := time.NewTicker(DefaultCleanUpInterval) defer ticker.Stop() for { select { case <-sm.ctx.Done(): - logutil.BgLogger().Info("cleanup loop exits", zap.Error(sm.ctx.Err())) + sm.logger.Info("cleanup loop exits") return case <-sm.finishCh: sm.doCleanupTask() @@ -349,29 +362,29 @@ func (sm *Manager) doCleanupTask() { proto.TaskStateSucceed, ) if err != nil { - logutil.BgLogger().Warn("cleanup routine failed", zap.Error(err)) + sm.logger.Warn("get task in states failed", zap.Error(err)) return } if len(tasks) == 0 { return } - logutil.BgLogger().Info("cleanup routine start") + sm.logger.Info("cleanup routine start") err = sm.cleanupFinishedTasks(tasks) if err != nil { - logutil.BgLogger().Warn("cleanup routine failed", zap.Error(err)) + sm.logger.Warn("cleanup routine failed", zap.Error(err)) return } failpoint.Inject("WaitCleanUpFinished", func() { WaitCleanUpFinished <- struct{}{} }) - logutil.BgLogger().Info("cleanup routine success") + sm.logger.Info("cleanup routine success") } func (sm *Manager) cleanupFinishedTasks(tasks []*proto.Task) error { cleanedTasks := make([]*proto.Task, 0) var firstErr error for _, task := range tasks { - logutil.BgLogger().Info("cleanup task", zap.Int64("task-id", task.ID)) + sm.logger.Info("cleanup task", zap.Int64("task-id", task.ID)) cleanupFactory := getSchedulerCleanUpFactory(task.Type) if cleanupFactory != nil { cleanup := cleanupFactory() @@ -387,7 +400,7 @@ func (sm *Manager) cleanupFinishedTasks(tasks []*proto.Task) error { } } if firstErr != nil { - logutil.BgLogger().Warn("cleanup routine failed", zap.Error(errors.Trace(firstErr))) + sm.logger.Warn("cleanup routine failed", zap.Error(errors.Trace(firstErr))) } failpoint.Inject("mockTransferErr", func() { @@ -400,8 +413,9 @@ func (sm *Manager) cleanupFinishedTasks(tasks []*proto.Task) error { // MockScheduler mock one scheduler for one task, only used for tests. 
func (sm *Manager) MockScheduler(task *proto.Task) *BaseScheduler { return NewBaseScheduler(sm.ctx, task, Param{ - taskMgr: sm.taskMgr, - nodeMgr: sm.nodeMgr, - slotMgr: sm.slotMgr, + taskMgr: sm.taskMgr, + nodeMgr: sm.nodeMgr, + slotMgr: sm.slotMgr, + serverID: sm.serverID, }) } diff --git a/pkg/disttask/framework/scheduler/slots_test.go b/pkg/disttask/framework/scheduler/slots_test.go index 135a233b5c..816bf4f720 100644 --- a/pkg/disttask/framework/scheduler/slots_test.go +++ b/pkg/disttask/framework/scheduler/slots_test.go @@ -181,7 +181,7 @@ func TestSlotManagerUpdate(t *testing.T) { defer ctrl.Finish() ctx := context.Background() - nodeMgr := newNodeManager() + nodeMgr := newNodeManager("") taskMgr := mock.NewMockTaskManager(ctrl) nodeMgr.managedNodes.Store(&[]string{"tidb-1", "tidb-2", "tidb-3"}) taskMgr.EXPECT().GetUsedSlotsOnNodes(gomock.Any()).Return(map[string]int{ diff --git a/pkg/disttask/framework/taskexecutor/BUILD.bazel b/pkg/disttask/framework/taskexecutor/BUILD.bazel index 4e0c25b32d..518701371f 100644 --- a/pkg/disttask/framework/taskexecutor/BUILD.bazel +++ b/pkg/disttask/framework/taskexecutor/BUILD.bazel @@ -26,7 +26,7 @@ go_library( "//pkg/util/backoff", "//pkg/util/cpu", "//pkg/util/gctuner", - "//pkg/util/logutil", + "//pkg/util/intest", "//pkg/util/memory", "@com_github_docker_go_units//:go-units", "@com_github_pingcap_errors//:errors", diff --git a/pkg/disttask/framework/taskexecutor/manager.go b/pkg/disttask/framework/taskexecutor/manager.go index 99d2f9d74b..d7110bb196 100644 --- a/pkg/disttask/framework/taskexecutor/manager.go +++ b/pkg/disttask/framework/taskexecutor/manager.go @@ -22,13 +22,14 @@ import ( "github.com/docker/go-units" "github.com/pingcap/errors" + "github.com/pingcap/log" "github.com/pingcap/tidb/pkg/config" "github.com/pingcap/tidb/pkg/disttask/framework/proto" "github.com/pingcap/tidb/pkg/disttask/framework/storage" "github.com/pingcap/tidb/pkg/metrics" tidbutil "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/cpu" - "github.com/pingcap/tidb/pkg/util/logutil" + "github.com/pingcap/tidb/pkg/util/intest" "github.com/pingcap/tidb/pkg/util/memory" "go.uber.org/zap" ) @@ -70,6 +71,10 @@ type Manager struct { // NewManager creates a new task executor Manager. func NewManager(ctx context.Context, id string, taskTable TaskTable) (*Manager, error) { + logger := log.L() + if intest.InTest { + logger = logger.With(zap.String("server-id", id)) + } totalMem, err := memory.MemTotal() if err != nil { // should not happen normally, as in main function of tidb-server, we assert @@ -80,12 +85,12 @@ func NewManager(ctx context.Context, id string, taskTable TaskTable) (*Manager, if totalCPU <= 0 || totalMem <= 0 { return nil, errors.Errorf("invalid cpu or memory, cpu: %d, memory: %d", totalCPU, totalMem) } - logutil.BgLogger().Info("build manager", zap.Int("total-cpu", totalCPU), + logger.Info("build task executor manager", zap.Int("total-cpu", totalCPU), zap.String("total-mem", units.BytesSize(float64(totalMem)))) m := &Manager{ id: id, taskTable: taskTable, - logger: logutil.BgLogger(), + logger: logger, slotManager: &slotManager{ taskID2Index: make(map[int64]int), executorTasks: make([]*proto.Task, 0), @@ -145,7 +150,7 @@ func (m *Manager) recoverMeta() (err error) { // Start starts the Manager. 
func (m *Manager) Start() error { - m.logger.Debug("manager start") + m.logger.Info("task executor manager start") m.wg.Run(m.handleTasksLoop) m.wg.Run(m.recoverMetaLoop) return nil diff --git a/pkg/disttask/framework/taskexecutor/task_executor.go b/pkg/disttask/framework/taskexecutor/task_executor.go index 9b75577426..9682219d5b 100644 --- a/pkg/disttask/framework/taskexecutor/task_executor.go +++ b/pkg/disttask/framework/taskexecutor/task_executor.go @@ -35,7 +35,7 @@ import ( "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/backoff" "github.com/pingcap/tidb/pkg/util/gctuner" - "github.com/pingcap/tidb/pkg/util/logutil" + "github.com/pingcap/tidb/pkg/util/intest" "github.com/pingcap/tidb/pkg/util/memory" "go.uber.org/zap" ) @@ -84,14 +84,17 @@ type BaseTaskExecutor struct { // NewBaseTaskExecutor creates a new BaseTaskExecutor. func NewBaseTaskExecutor(ctx context.Context, id string, task *proto.Task, taskTable TaskTable) *BaseTaskExecutor { + logger := log.L().With(zap.Int64("task-id", task.ID), zap.String("task-type", string(task.Type))) + if intest.InTest { + logger = logger.With(zap.String("server-id", id)) + } subCtx, cancelFunc := context.WithCancel(ctx) taskExecutorImpl := &BaseTaskExecutor{ id: id, taskTable: taskTable, ctx: subCtx, cancel: cancelFunc, - logger: log.L().With(zap.Int64("task-id", task.ID), - zap.String("task-type", string(task.Type))), + logger: logger, } taskExecutorImpl.task.Store(task) return taskExecutorImpl @@ -453,11 +456,11 @@ func (e *BaseTaskExecutor) runSubtask(ctx context.Context, stepExecutor execute. if taskID, ok := val.(int); ok { mgr, err := storage.GetTaskManager() if err != nil { - logutil.BgLogger().Error("get task manager failed", zap.Error(err)) + e.logger.Error("get task manager failed", zap.Error(err)) } else { err = mgr.CancelTask(ctx, int64(taskID)) if err != nil { - logutil.BgLogger().Error("cancel task failed", zap.Error(err)) + e.logger.Error("cancel task failed", zap.Error(err)) } } } diff --git a/pkg/disttask/framework/testutil/BUILD.bazel b/pkg/disttask/framework/testutil/BUILD.bazel index f60efa966e..18c6a56d7a 100644 --- a/pkg/disttask/framework/testutil/BUILD.bazel +++ b/pkg/disttask/framework/testutil/BUILD.bazel @@ -25,6 +25,7 @@ go_library( "//pkg/sessionctx", "//pkg/store/mockstore", "//pkg/testkit", + "//pkg/util", "//pkg/util/logutil", "//pkg/util/sqlexec", "@com_github_ngaut_pools//:pools", diff --git a/pkg/disttask/framework/testutil/context.go b/pkg/disttask/framework/testutil/context.go index d02e5b41ff..22c8aa01eb 100644 --- a/pkg/disttask/framework/testutil/context.go +++ b/pkg/disttask/framework/testutil/context.go @@ -17,17 +17,280 @@ package testutil import ( "context" "fmt" + "math/rand" "sync" + "sync/atomic" "testing" + "time" + "github.com/ngaut/pools" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/pkg/disttask/framework/proto" + "github.com/pingcap/tidb/pkg/disttask/framework/scheduler" + "github.com/pingcap/tidb/pkg/disttask/framework/storage" + "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor" + "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/testkit" + tidbutil "github.com/pingcap/tidb/pkg/util" "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/util" "go.uber.org/mock/gomock" ) +type tidbNode struct { + id string + owner bool + exeMgr *taskexecutor.Manager + schMgr *scheduler.Manager +} + +// TestDXFContext is the context for testing DXF. 
+type TestDXFContext struct { + T *testing.T + Store kv.Storage + Ctx context.Context + TaskMgr *storage.TaskManager + MockCtrl *gomock.Controller + TestContext *TestContext + + idAllocator atomic.Int32 + // in real case, when node scale in/out, the node might use the same IP or host name + // such as using K8S, so we use this to simulate this case. + nodeIDPool chan string + rand *rand.Rand + wg tidbutil.WaitGroupWrapper + mu struct { + sync.RWMutex + // to test network partition, we allow multiple owners + ownerIndices map[string]int + nodeIndices map[string]int + nodes []*tidbNode + } +} + +// NewTestDXFContext creates a new TestDXFContext. +func NewTestDXFContext(t *testing.T, nodeNum int) *TestDXFContext { + // all nodes are isometric with 16 CPUs + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/util/cpu/mockNumCpu", "return(16)")) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/domain/MockDisableDistTask", "return(true)")) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/mockTaskExecutorNodes", "return()")) + t.Cleanup(func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/util/cpu/mockNumCpu")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/domain/MockDisableDistTask")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/mockTaskExecutorNodes")) + }) + store := testkit.CreateMockStore(t) + pool := pools.NewResourcePool(func() (pools.Resource, error) { + return testkit.NewSession(t, store), nil + }, 10, 10, time.Second) + t.Cleanup(func() { + pool.Close() + }) + taskManager := storage.NewTaskManager(pool) + storage.SetTaskManager(taskManager) + ctx := context.Background() + ctx = util.WithInternalSourceType(ctx, kv.InternalDistTask) + + seed := time.Now().UnixNano() + t.Log("dxf context seed:", seed) + ctrl := gomock.NewController(t) + c := &TestDXFContext{ + T: t, + Store: store, + Ctx: ctx, + TaskMgr: taskManager, + MockCtrl: ctrl, + TestContext: &TestContext{ + subtasksHasRun: make(map[string]map[int64]struct{}), + }, + nodeIDPool: make(chan string, 100), + rand: rand.New(rand.NewSource(seed)), + } + c.mu.ownerIndices = make(map[string]int) + c.mu.nodeIndices = make(map[string]int, nodeNum) + c.init(nodeNum) + + t.Cleanup(func() { + ctrl.Finish() + c.close() + }) + + return c +} + +// init initializes the context with nodeNum tidb nodes. +// The last node is the owner. +func (c *TestDXFContext) init(nodeNum int) { + for i := 0; i < nodeNum; i++ { + c.ScaleOutBy(c.getNodeID(), i == nodeNum-1) + } +} + +func (c *TestDXFContext) getNodeID() string { + select { + case id := <-c.nodeIDPool: + return id + default: + return fmt.Sprintf(":%d", 4000-1+c.idAllocator.Add(1)) + } +} + +func (c *TestDXFContext) recycleNodeID(id string) { + select { + case c.nodeIDPool <- id: + default: + } +} + +// ScaleOut scales out a tidb node, and elect owner if required. +func (c *TestDXFContext) ScaleOut(nodeNum int) { + for i := 0; i < nodeNum; i++ { + c.ScaleOutBy(c.getNodeID(), false) + } + c.electIfNeeded() +} + +// ScaleOutBy scales out a tidb node by id, and set it as owner if required. 
+func (c *TestDXFContext) ScaleOutBy(id string, owner bool) { + c.T.Logf("scale out node of id = %s, owner = %t", id, owner) + c.updateLiveExecIDs(id) + exeMgr, err := taskexecutor.NewManager(c.Ctx, id, c.TaskMgr) + require.NoError(c.T, err) + require.NoError(c.T, exeMgr.InitMeta()) + require.NoError(c.T, exeMgr.Start()) + var schMgr *scheduler.Manager + if owner { + schMgr = scheduler.NewManager(c.Ctx, c.TaskMgr, id) + schMgr.Start() + } + node := &tidbNode{ + id: id, + owner: owner, + exeMgr: exeMgr, + schMgr: schMgr, + } + + c.mu.Lock() + defer c.mu.Unlock() + c.mu.nodes = append(c.mu.nodes, node) + c.mu.nodeIndices[id] = len(c.mu.nodes) - 1 + if owner { + c.mu.ownerIndices[id] = len(c.mu.nodes) - 1 + } +} + +func (c *TestDXFContext) updateLiveExecIDs(newID string) { + c.mu.Lock() + defer c.mu.Unlock() + execIDs := make([]string, 0, len(c.mu.nodes)+1) + for _, n := range c.mu.nodes { + execIDs = append(execIDs, n.id) + } + if len(newID) > 0 { + execIDs = append(execIDs, newID) + } + scheduler.MockServerInfo.Store(&execIDs) +} + +// ScaleIn scales in some last added tidb nodes, elect new owner if required. +func (c *TestDXFContext) ScaleIn(nodeNum int) { + for i := 0; i < nodeNum; i++ { + c.mu.Lock() + if len(c.mu.nodes) == 0 { + c.mu.Unlock() + return + } + node := c.mu.nodes[len(c.mu.nodes)-1] + c.mu.Unlock() + + c.ScaleInBy(node.id) + } +} + +// ScaleInBy scales in a tidb node by id, elect new owner if required. +func (c *TestDXFContext) ScaleInBy(id string) { + c.mu.Lock() + idx, ok := c.mu.nodeIndices[id] + if !ok { + c.mu.Unlock() + return + } + node := c.mu.nodes[idx] + c.mu.nodes = append(c.mu.nodes[:idx], c.mu.nodes[idx+1:]...) + delete(c.mu.nodeIndices, id) + if node.owner { + delete(c.mu.ownerIndices, id) + } + c.recycleNodeID(id) + c.mu.Unlock() + + c.updateLiveExecIDs("") + + c.T.Logf("scale in node of id = %s, owner = %t", node.id, node.owner) + node.exeMgr.Stop() + if node.owner { + node.schMgr.Stop() + } + + c.electIfNeeded() +} + +// AsyncChangeOwner resigns all current owners and changes the owner of the cluster to random node asynchronously. +func (c *TestDXFContext) AsyncChangeOwner() { + c.wg.RunWithLog(c.ChangeOwner) +} + +// ChangeOwner resigns all current owners and changes the owner of the cluster to random node. +func (c *TestDXFContext) ChangeOwner() { + c.mu.Lock() + if len(c.mu.nodes) == 0 { + c.mu.Unlock() + return + } + for _, idx := range c.mu.ownerIndices { + c.mu.nodes[idx].schMgr.Stop() + c.mu.nodes[idx].schMgr = nil + c.mu.nodes[idx].owner = false + } + c.mu.ownerIndices = make(map[string]int) + c.mu.Unlock() + + c.electIfNeeded() +} + +func (c *TestDXFContext) electIfNeeded() { + c.mu.Lock() + if len(c.mu.nodes) == 0 || len(c.mu.ownerIndices) > 0 { + c.mu.Unlock() + return + } + + newOwnerIdx := int(rand.Int31n(int32(len(c.mu.nodes)))) + ownerNode := c.mu.nodes[newOwnerIdx] + c.mu.ownerIndices[ownerNode.id] = newOwnerIdx + ownerNode.schMgr = scheduler.NewManager(c.Ctx, c.TaskMgr, ownerNode.id) + ownerNode.schMgr.Start() + ownerNode.owner = true + c.mu.Unlock() + + c.T.Logf("new owner elected, id = %s, newOwnerIdx = %d", ownerNode.id, newOwnerIdx) +} + +func (c *TestDXFContext) close() { + c.wg.Wait() + c.mu.Lock() + defer c.mu.Unlock() + for _, node := range c.mu.nodes { + node.exeMgr.Stop() + if node.owner { + node.schMgr.Stop() + } + } + c.mu.nodes = nil + c.mu.ownerIndices = nil + c.mu.nodeIndices = nil +} + // TestContext defines shared variables for disttask tests. 
type TestContext struct { sync.RWMutex diff --git a/pkg/testkit/testkit.go b/pkg/testkit/testkit.go index d05e186cab..b3d2824e86 100644 --- a/pkg/testkit/testkit.go +++ b/pkg/testkit/testkit.go @@ -103,7 +103,7 @@ func NewTestKitWithSession(t testing.TB, store kv.Storage, se sessiontypes.Sessi // RefreshSession set a new session for the testkit func (tk *TestKit) RefreshSession() { - tk.session = newSession(tk.t, tk.store) + tk.session = NewSession(tk.t, tk.store) // enforce sysvar cache loading, ref loadCommonGlobalVariableIfNeeded tk.MustExec("select 3") } @@ -423,7 +423,8 @@ func (tk *TestKit) MustExecToErr(sql string, args ...any) { tk.require.Error(err) } -func newSession(t testing.TB, store kv.Storage) sessiontypes.Session { +// NewSession creates a new session environment for test. +func NewSession(t testing.TB, store kv.Storage) sessiontypes.Session { se, err := session.CreateSession4Test(store) require.NoError(t, err) se.SetConnectionID(testKitIDGenerator.Inc())
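
To illustrate how the converted tests use the new helper, here is a sketch of a framework test written against testutil.TestDXFContext; the helper, mock and proto names come from the diff above, while the test itself is a made-up example and not part of the patch:

    package framework_test

    import (
        "testing"

        "github.com/pingcap/tidb/pkg/disttask/framework/proto"
        "github.com/pingcap/tidb/pkg/disttask/framework/testutil"
        "github.com/stretchr/testify/require"
    )

    func TestExampleWithDXFContext(t *testing.T) {
        // Start 3 in-process "nodes": task executor managers plus one elected
        // scheduler owner, all backed by a single mock store.
        c := testutil.NewTestDXFContext(t, 3)
        testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetMockBasicSchedulerExt(c.MockCtrl), c.TestContext, nil)

        // Exercise cluster changes while the task is being scheduled.
        c.ScaleOut(1)
        c.ChangeOwner()

        task := testutil.SubmitAndWaitTask(c.Ctx, t, "example-key")
        require.Equal(t, proto.TaskStateSucceed, task.State)
        // Cleanup (gomock controller Finish, node shutdown) is registered via
        // t.Cleanup inside NewTestDXFContext, so no explicit Close call is needed.
    }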