diff --git a/pkg/disttask/framework/integrationtests/framework_pause_and_resume_test.go b/pkg/disttask/framework/integrationtests/framework_pause_and_resume_test.go
index c07a731332..c5bcc40e23 100644
--- a/pkg/disttask/framework/integrationtests/framework_pause_and_resume_test.go
+++ b/pkg/disttask/framework/integrationtests/framework_pause_and_resume_test.go
@@ -16,6 +16,7 @@ package integrationtests

 import (
     "context"
+    "sync/atomic"
     "testing"

     "github.com/pingcap/failpoint"
@@ -50,10 +51,18 @@ func TestFrameworkPauseAndResume(t *testing.T) {
     testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetMockBasicSchedulerExt(c.MockCtrl), c.TestContext, nil)

     // 1. schedule and pause one running task.
-    testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/pauseTaskAfterRefreshTask", "2*return(true)")
+    var counter atomic.Int32
+    testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/beforeRefreshTask", func(task *proto.Task) {
+        if counter.Add(1) <= 2 {
+            if task.State == proto.TaskStateRunning {
+                _, err := c.TaskMgr.PauseTask(c.Ctx, task.Key)
+                require.NoError(t, err)
+            }
+        }
+    })
     task1 := testutil.SubmitAndWaitTask(c.Ctx, t, "key1", "", 1)
     require.Equal(t, proto.TaskStatePaused, task1.State)
-    require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/pauseTaskAfterRefreshTask"))
+    require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/beforeRefreshTask"))
     // 4 subtask scheduled.
     require.NoError(t, handle.ResumeTask(c.Ctx, "key1"))
     task1Base := testutil.WaitTaskDone(c.Ctx, t, task1.Key)
@@ -67,10 +76,18 @@ func TestFrameworkPauseAndResume(t *testing.T) {
     require.Empty(t, errs)

     // 2. pause pending task.
-    testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/pausePendingTask", "2*return(true)")
+    counter.Store(0)
+    testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/beforeRefreshTask", func(task *proto.Task) {
+        if counter.Add(1) <= 2 {
+            if task.State == proto.TaskStatePending {
+                _, err := mgr.PauseTask(c.Ctx, task.Key)
+                require.NoError(t, err)
+            }
+        }
+    })
     task2 := testutil.SubmitAndWaitTask(c.Ctx, t, "key2", "", 1)
     require.Equal(t, proto.TaskStatePaused, task2.State)
-    require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/pausePendingTask"))
+    require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/beforeRefreshTask"))
     // 4 subtask scheduled.
     require.NoError(t, handle.ResumeTask(c.Ctx, "key2"))
     task2Base := testutil.WaitTaskDone(c.Ctx, t, task2.Key)
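The test conversions above (and the rollback and cancel tests that follow) all use one recipe: behavior that used to be encoded in a failpoint term string such as `2*return(true)` now lives in an ordinary Go callback, with the invocation budget tracked by an atomic counter. A minimal sketch of the recipe, assuming the `testfailpoint` helper package these tests use (import path assumed); the `limitToN` helper is hypothetical, factored out here only to show the idea:

```go
package integrationtests

import (
	"sync/atomic"
	"testing"

	"github.com/pingcap/tidb/pkg/disttask/framework/proto"
	"github.com/pingcap/tidb/pkg/testkit/testfailpoint" // import path assumed
)

// limitToN is a hypothetical helper: it forwards to fn for the first n
// invocations only, which is what the old "2*return(true)" term expressed
// declaratively in the failpoint string.
func limitToN(n int32, fn func(*proto.Task)) func(*proto.Task) {
	var hits atomic.Int32
	return func(task *proto.Task) {
		if hits.Add(1) <= n {
			fn(task)
		}
	}
}

func enablePauseTwice(t *testing.T, pause func(*proto.Task)) {
	// EnableCall registers the callback for the named failpoint; per the
	// diff it is paired with an explicit failpoint.Disable once the test
	// no longer wants the hook.
	testfailpoint.EnableCall(t,
		"github.com/pingcap/tidb/pkg/disttask/framework/scheduler/beforeRefreshTask",
		limitToN(2, pause))
}
```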
diff --git a/pkg/disttask/framework/integrationtests/framework_rollback_test.go b/pkg/disttask/framework/integrationtests/framework_rollback_test.go
index 56baaaf353..90d517ad10 100644
--- a/pkg/disttask/framework/integrationtests/framework_rollback_test.go
+++ b/pkg/disttask/framework/integrationtests/framework_rollback_test.go
@@ -15,6 +15,7 @@ package integrationtests

 import (
+    "sync/atomic"
     "testing"

     "github.com/pingcap/tidb/pkg/disttask/framework/proto"
@@ -26,7 +27,16 @@ import (
 func TestFrameworkRollback(t *testing.T) {
     c := testutil.NewTestDXFContext(t, 2, 16, true)
     testutil.RegisterRollbackTaskMeta(t, c.MockCtrl, testutil.GetMockRollbackSchedulerExt(c.MockCtrl), c.TestContext)
-    testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/cancelTaskAfterRefreshTask", "2*return(true)")
+    var counter atomic.Int32
+    testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/afterRefreshTask",
+        func(task *proto.Task) {
+            if counter.Add(1) <= 2 {
+                if task.State == proto.TaskStateRunning {
+                    require.NoError(t, c.TaskMgr.CancelTask(c.Ctx, task.ID))
+                }
+            }
+        },
+    )

     task := testutil.SubmitAndWaitTask(c.Ctx, t, "key1", "", 1)
     require.Equal(t, proto.TaskStateReverted, task.State)
tk.MustExec("update mysql.dist_framework_meta set role = \"background\" where host = \":4001\"") time.Sleep(5 * time.Second) - testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/syncRefresh", "1*return()") - <-scheduler.TestRefreshedChan + fpEnabled.Store(true) + <-ch taskID = submitTaskAndCheckSuccessForScope(c.Ctx, t, "😆", nodeCnt, "background", c.TestContext) - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/syncRefresh")) + fpEnabled.Store(false) checkSubtaskOnNodes(c.Ctx, t, taskID, []string{":4000", ":4001"}) tk.MustQuery(`select role from mysql.dist_framework_meta where host=":4000"`).Check(testkit.Rows("background")) tk.MustQuery(`select role from mysql.dist_framework_meta where host=":4001"`).Check(testkit.Rows("background")) @@ -146,12 +153,19 @@ func runTargetScopeCase(t *testing.T, c *testutil.TestDXFContext, tk *testkit.Te for i := 0; i < len(testCase.nodeScopes); i++ { tk.MustExec(fmt.Sprintf("update mysql.dist_framework_meta set role = \"%s\" where host = \"%s\"", testCase.nodeScopes[i], c.GetNodeIDByIdx(i))) } - testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/syncRefresh", "3*return()") - <-scheduler.TestRefreshedChan - <-scheduler.TestRefreshedChan - <-scheduler.TestRefreshedChan + var fpEnabled atomic.Bool + ch := make(chan struct{}) + fpEnabled.Store(true) + testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/syncRefresh", func() { + if fpEnabled.Load() { + ch <- struct{}{} + } + }) + <-ch + <-ch + <-ch taskID := submitTaskAndCheckSuccessForScope(c.Ctx, t, "task"+strconv.Itoa(idx), nodeCnt, testCase.scope, c.TestContext) - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/syncRefresh")) + fpEnabled.Store(false) expected := make([]string, 0) for i, scope := range testCase.nodeScopes { if scope == testCase.scope { diff --git a/pkg/disttask/framework/integrationtests/framework_test.go b/pkg/disttask/framework/integrationtests/framework_test.go index ef0b97463d..4b9efa1079 100644 --- a/pkg/disttask/framework/integrationtests/framework_test.go +++ b/pkg/disttask/framework/integrationtests/framework_test.go @@ -168,8 +168,14 @@ func TestOwnerChangeWhenSchedule(t *testing.T) { } func TestGC(t *testing.T) { - testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/disttask/framework/storage/subtaskHistoryKeepSeconds", "return(1)") - testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/historySubtaskTableGcInterval", "return(1)") + ch := make(chan struct{}) + testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/disttask/framework/storage/subtaskHistoryKeepSeconds", func(interval *int) { + *interval = 1 + }) + testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/historySubtaskTableGcInterval", func(interval *time.Duration) { + *interval = 1 * time.Second + <-ch + }) c := testutil.NewTestDXFContext(t, 3, 16, true) testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetMockBasicSchedulerExt(c.MockCtrl), c.TestContext, nil) @@ -188,7 +194,7 @@ func TestGC(t *testing.T) { return historySubTasksCnt == 4 }, 10*time.Second, 500*time.Millisecond) - scheduler.WaitTaskFinished <- struct{}{} + ch <- struct{}{} require.Eventually(t, func() bool { historySubTasksCnt, err := testutil.GetSubtasksFromHistory(c.Ctx, mgr) @@ -225,11 +231,14 @@ func TestFrameworkCleanUpRoutine(t *testing.T) { scheduler.DefaultCleanUpInterval = 500 * time.Millisecond c 
diff --git a/pkg/disttask/framework/integrationtests/framework_test.go b/pkg/disttask/framework/integrationtests/framework_test.go
index ef0b97463d..4b9efa1079 100644
--- a/pkg/disttask/framework/integrationtests/framework_test.go
+++ b/pkg/disttask/framework/integrationtests/framework_test.go
@@ -168,8 +168,14 @@ func TestOwnerChangeWhenSchedule(t *testing.T) {
 }

 func TestGC(t *testing.T) {
-    testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/disttask/framework/storage/subtaskHistoryKeepSeconds", "return(1)")
-    testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/historySubtaskTableGcInterval", "return(1)")
+    ch := make(chan struct{})
+    testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/disttask/framework/storage/subtaskHistoryKeepSeconds", func(interval *int) {
+        *interval = 1
+    })
+    testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/historySubtaskTableGcInterval", func(interval *time.Duration) {
+        *interval = 1 * time.Second
+        <-ch
+    })
     c := testutil.NewTestDXFContext(t, 3, 16, true)

     testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetMockBasicSchedulerExt(c.MockCtrl), c.TestContext, nil)
@@ -188,7 +194,7 @@ func TestGC(t *testing.T) {
         return historySubTasksCnt == 4
     }, 10*time.Second, 500*time.Millisecond)

-    scheduler.WaitTaskFinished <- struct{}{}
+    ch <- struct{}{}

     require.Eventually(t, func() bool {
         historySubTasksCnt, err := testutil.GetSubtasksFromHistory(c.Ctx, mgr)
@@ -225,11 +231,14 @@ func TestFrameworkCleanUpRoutine(t *testing.T) {
     scheduler.DefaultCleanUpInterval = 500 * time.Millisecond
     c := testutil.NewTestDXFContext(t, 3, 16, true)
     testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetMockBasicSchedulerExt(c.MockCtrl), c.TestContext, nil)
-    testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/WaitCleanUpFinished", "return()")
+    ch := make(chan struct{}, 1)
+    testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/WaitCleanUpFinished", func() {
+        ch <- struct{}{}
+    })

     // normal
     submitTaskAndCheckSuccessForBasic(c.Ctx, t, "key1", c.TestContext)
-    <-scheduler.WaitCleanUpFinished
+    <-ch
     mgr, err := storage.GetTaskManager()
     require.NoError(t, err)
     tasks, err := mgr.GetTaskByKeyWithHistory(c.Ctx, "key1")
@@ -242,7 +251,7 @@ func TestFrameworkCleanUpRoutine(t *testing.T) {
     // transfer err
     testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/mockTransferErr", "1*return()")
     submitTaskAndCheckSuccessForBasic(c.Ctx, t, "key2", c.TestContext)
-    <-scheduler.WaitCleanUpFinished
+    <-ch
     mgr, err = storage.GetTaskManager()
     require.NoError(t, err)
     tasks, err = mgr.GetTaskByKeyWithHistory(c.Ctx, "key1")
@@ -257,7 +266,12 @@ func TestTaskCancelledBeforeUpdateTask(t *testing.T) {
     c := testutil.NewTestDXFContext(t, 1, 16, true)

     testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetMockBasicSchedulerExt(c.MockCtrl), c.TestContext, nil)
-    testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/cancelBeforeUpdateTask", "1*return(true)")
+    var once sync.Once
+    testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/cancelBeforeUpdateTask", func(taskID int64) {
+        once.Do(func() {
+            require.NoError(t, c.TaskMgr.CancelTask(c.Ctx, taskID))
+        })
+    })
     task := testutil.SubmitAndWaitTask(c.Ctx, t, "key1", "", 1)
     require.Equal(t, proto.TaskStateReverted, task.State)
 }
diff --git a/pkg/disttask/framework/scheduler/nodes.go b/pkg/disttask/framework/scheduler/nodes.go
index 7a8dc8cb40..286f4f5f24 100644
--- a/pkg/disttask/framework/scheduler/nodes.go
+++ b/pkg/disttask/framework/scheduler/nodes.go
@@ -130,9 +130,6 @@ func (nm *NodeManager) refreshNodesLoop(ctx context.Context, taskMgr TaskManager
     }
 }

-// TestRefreshedChan is used to sync the test.
-var TestRefreshedChan = make(chan struct{})
-
 // refreshNodes maintains the nodes managed by the framework.
 func (nm *NodeManager) refreshNodes(ctx context.Context, taskMgr TaskManager, slotMgr *SlotManager) {
     newNodes, err := taskMgr.GetAllNodes(ctx)
@@ -150,9 +147,7 @@ func (nm *NodeManager) refreshNodes(ctx context.Context, taskMgr TaskManager, sl
     slotMgr.updateCapacity(cpuCount)
     nm.nodes.Store(&newNodes)

-    failpoint.Inject("syncRefresh", func() {
-        TestRefreshedChan <- struct{}{}
-    })
+    failpoint.InjectCall("syncRefresh")
 }

 // GetNodes returns the nodes managed by the framework.
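TestGC shows the second flavor of `InjectCall`: rather than observing an event, the hook mutates production state through a pointer argument. The production side (see history.go and scheduler_manager.go further down) passes the address of a local setting; a registered callback can then shrink intervals that would otherwise make the test wait for days. A sketch of both halves under a made-up failpoint name, `gcKeepSeconds`, to keep it self-contained; only the mechanism (`failpoint.InjectCall` / `testfailpoint.EnableCall`) comes from the diff:

```go
package example

import (
	"testing"

	"github.com/pingcap/failpoint"
	"github.com/pingcap/tidb/pkg/testkit/testfailpoint" // import path assumed
)

// Production side: declare the default, then give tests a chance to shrink
// it. When no callback is registered, InjectCall is a no-op and the default
// survives untouched.
func gcKeepSeconds() int {
	keepSeconds := 21 * 24 * 60 * 60 // hypothetical default: 21 days
	failpoint.InjectCall("gcKeepSeconds", &keepSeconds)
	return keepSeconds
}

// Test side: rewrite the setting through the pointer, exactly as TestGC does
// above with subtaskHistoryKeepSeconds. The failpoint path is hypothetical.
func shortenGC(t *testing.T) {
	testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/example/gcKeepSeconds",
		func(keepSeconds *int) { *keepSeconds = 1 })
}
```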
diff --git a/pkg/disttask/framework/scheduler/scheduler.go b/pkg/disttask/framework/scheduler/scheduler.go
index 9d2228b962..634d5568f0 100644
--- a/pkg/disttask/framework/scheduler/scheduler.go
+++ b/pkg/disttask/framework/scheduler/scheduler.go
@@ -68,6 +68,7 @@ type Scheduler interface {
     // Close closes the scheduler, should be called if Init returns nil.
     Close()
     // GetTask returns the task that the scheduler is managing.
+    // The task is read-only; it might be accessed by multiple goroutines.
     GetTask() *proto.Task
     Extension
 }
@@ -129,6 +130,12 @@ func (s *BaseScheduler) GetTask() *proto.Task {
     return s.task.Load()
 }

+// getTaskClone returns a clone of the task.
+func (s *BaseScheduler) getTaskClone() *proto.Task {
+    clone := *s.GetTask()
+    return &clone
+}
+
 // refreshTaskIfNeeded fetch task state from tidb_global_task table.
 func (s *BaseScheduler) refreshTaskIfNeeded() error {
     task := s.GetTask()
@@ -168,6 +175,7 @@ func (s *BaseScheduler) scheduleTask() {
         case <-ticker.C:
         }

+        failpoint.InjectCall("beforeRefreshTask", s.GetTask())
         err := s.refreshTaskIfNeeded()
         if err != nil {
             if errors.Cause(err) == storage.ErrTaskNotFound {
@@ -179,41 +187,12 @@ func (s *BaseScheduler) scheduleTask() {
             s.logger.Error("refresh task failed", zap.Error(err))
             continue
         }
-        task := *s.GetTask()
+        failpoint.InjectCall("afterRefreshTask", s.GetTask())
+        task := s.getTaskClone()
         // TODO: refine failpoints below.
         failpoint.Inject("exitScheduler", func() {
             failpoint.Return()
         })
-        failpoint.Inject("cancelTaskAfterRefreshTask", func(val failpoint.Value) {
-            if val.(bool) && task.State == proto.TaskStateRunning {
-                err := s.taskMgr.CancelTask(s.ctx, task.ID)
-                if err != nil {
-                    s.logger.Error("cancel task failed", zap.Error(err))
-                }
-            }
-        })
-
-        failpoint.Inject("pausePendingTask", func(val failpoint.Value) {
-            if val.(bool) && task.State == proto.TaskStatePending {
-                _, err := s.taskMgr.PauseTask(s.ctx, task.Key)
-                if err != nil {
-                    s.logger.Error("pause task failed", zap.Error(err))
-                }
-                task.State = proto.TaskStatePausing
-                s.task.Store(&task)
-            }
-        })
-
-        failpoint.Inject("pauseTaskAfterRefreshTask", func(val failpoint.Value) {
-            if val.(bool) && task.State == proto.TaskStateRunning {
-                _, err := s.taskMgr.PauseTask(s.ctx, task.Key)
-                if err != nil {
-                    s.logger.Error("pause task failed", zap.Error(err))
-                }
-                task.State = proto.TaskStatePausing
-                s.task.Store(&task)
-            }
-        })

         switch task.State {
         case proto.TaskStateCancelling:
@@ -280,7 +259,7 @@ func (s *BaseScheduler) onCancelling() error {

 // handle task in pausing state, cancel all running subtasks.
 func (s *BaseScheduler) onPausing() error {
-    task := *s.GetTask()
+    task := s.getTaskClone()
     s.logger.Info("on pausing state", zap.Stringer("state", task.State),
         zap.String("step", proto.Step2Str(task.Type, task.Step)))
     cntByStates, err := s.taskMgr.GetSubtaskCntGroupByStates(s.ctx, task.ID, task.Step)
@@ -299,7 +278,7 @@ func (s *BaseScheduler) onPausing() error {
         return err
     }
     task.State = proto.TaskStatePaused
-    s.task.Store(&task)
+    s.task.Store(task)
     return nil
 }
@@ -314,7 +293,7 @@ func (s *BaseScheduler) onPaused() error {

 // handle task in resuming state.
 func (s *BaseScheduler) onResuming() error {
-    task := *s.GetTask()
+    task := s.getTaskClone()
     s.logger.Info("on resuming state", zap.Stringer("state", task.State),
         zap.String("step", proto.Step2Str(task.Type, task.Step)))
     cntByStates, err := s.taskMgr.GetSubtaskCntGroupByStates(s.ctx, task.ID, task.Step)
@@ -329,7 +308,7 @@ func (s *BaseScheduler) onResuming() error {
         return err
     }
     task.State = proto.TaskStateRunning
-    s.task.Store(&task)
+    s.task.Store(task)
     return nil
 }
@@ -338,7 +317,7 @@ func (s *BaseScheduler) onResuming() error {

 // handle task in reverting state, check all revert subtasks finishes.
 func (s *BaseScheduler) onReverting() error {
-    task := *s.GetTask()
+    task := s.getTaskClone()
     s.logger.Debug("on reverting state", zap.Stringer("state", task.State),
         zap.String("step", proto.Step2Str(task.Type, task.Step)))
     cntByStates, err := s.taskMgr.GetSubtaskCntGroupByStates(s.ctx, task.ID, task.Step)
@@ -348,18 +327,18 @@ func (s *BaseScheduler) onReverting() error {
     }
     runnableSubtaskCnt := cntByStates[proto.SubtaskStatePending] + cntByStates[proto.SubtaskStateRunning]
     if runnableSubtaskCnt == 0 {
-        if err = s.OnDone(s.ctx, s, &task); err != nil {
+        if err = s.OnDone(s.ctx, s, task); err != nil {
             return errors.Trace(err)
         }
         if err = s.taskMgr.RevertedTask(s.ctx, task.ID); err != nil {
             return errors.Trace(err)
         }
         task.State = proto.TaskStateReverted
-        s.task.Store(&task)
+        s.task.Store(task)
         return nil
     }
     // Wait all subtasks in this step finishes.
-    s.OnTick(s.ctx, &task)
+    s.OnTick(s.ctx, task)
     s.logger.Debug("on reverting state, this task keeps current state", zap.Stringer("state", task.State))
     return nil
 }
@@ -412,14 +391,14 @@ func (s *BaseScheduler) onFinished() {
 }

 func (s *BaseScheduler) switch2NextStep() error {
-    task := *s.GetTask()
+    task := s.getTaskClone()
     nextStep := s.GetNextStep(&task.TaskBase)
     s.logger.Info("switch to next step",
         zap.String("current-step", proto.Step2Str(task.Type, task.Step)),
         zap.String("next-step", proto.Step2Str(task.Type, nextStep)))

     if nextStep == proto.StepDone {
-        if err := s.OnDone(s.ctx, s, &task); err != nil {
+        if err := s.OnDone(s.ctx, s, task); err != nil {
             return errors.Trace(err)
         }
         if err := s.taskMgr.SucceedTask(s.ctx, task.ID); err != nil {
@@ -427,7 +406,7 @@ func (s *BaseScheduler) switch2NextStep() error {
         }
         task.Step = nextStep
         task.State = proto.TaskStateSucceed
-        s.task.Store(&task)
+        s.task.Store(task)
         return nil
     }
@@ -443,19 +422,19 @@ func (s *BaseScheduler) switch2NextStep() error {
         return errors.New("no available TiDB node to dispatch subtasks")
     }

-    metas, err := s.OnNextSubtasksBatch(s.ctx, s, &task, eligibleNodes, nextStep)
+    metas, err := s.OnNextSubtasksBatch(s.ctx, s, task, eligibleNodes, nextStep)
     if err != nil {
         s.logger.Warn("generate part of subtasks failed", zap.Error(err))
         return s.handlePlanErr(err)
     }

-    if err = s.scheduleSubTask(&task, nextStep, metas, eligibleNodes); err != nil {
+    if err = s.scheduleSubTask(task, nextStep, metas, eligibleNodes); err != nil {
         return err
     }
     task.Step = nextStep
     task.State = proto.TaskStateRunning
     // and OnNextSubtasksBatch might change meta of task.
-    s.task.Store(&task)
+    s.task.Store(task)
     return nil
 }
@@ -491,9 +470,7 @@ func (s *BaseScheduler) scheduleSubTask(
         size += uint64(len(meta))
     }

-    failpoint.Inject("cancelBeforeUpdateTask", func() {
-        _ = s.taskMgr.CancelTask(s.ctx, task.ID)
-    })
+    failpoint.InjectCall("cancelBeforeUpdateTask", task.ID)

     // as other fields and generated key and index KV takes space too, we limit
     // the size of subtasks to 80% of the transaction limit.
@@ -521,7 +498,7 @@ func (s *BaseScheduler) scheduleSubTask(
 }

 func (s *BaseScheduler) handlePlanErr(err error) error {
-    task := *s.GetTask()
+    task := s.getTaskClone()
     s.logger.Warn("generate plan failed", zap.Error(err), zap.Stringer("state", task.State))
     if s.IsRetryableErr(err) {
         return err
@@ -530,13 +507,13 @@
 }

 func (s *BaseScheduler) revertTask(taskErr error) error {
-    task := *s.GetTask()
+    task := s.getTaskClone()
     if err := s.taskMgr.RevertTask(s.ctx, task.ID, task.State, taskErr); err != nil {
         return err
     }
     task.State = proto.TaskStateReverting
     task.Error = taskErr
-    s.task.Store(&task)
+    s.task.Store(task)
     return nil
 }
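The `getTaskClone` changes above are the core behavioral edit in scheduler.go: `s.task` is an atomically published pointer, `GetTask` hands out the shared snapshot (now documented as read-only), and every state handler that wants to mutate the task clones it first and republishes the clone. A stripped-down model of that copy-on-write discipline; the `Task` struct here is a stand-in for `proto.Task`, and the type names are illustrative:

```go
package scheduler

import "sync/atomic"

// Task stands in for proto.Task in this sketch.
type Task struct {
	State string
	Step  int
}

type baseScheduler struct {
	task atomic.Pointer[Task]
}

func newScheduler(initial *Task) *baseScheduler {
	s := &baseScheduler{}
	s.task.Store(initial)
	return s
}

// getTask returns the shared snapshot; callers must treat it as read-only
// because other goroutines may be reading the same pointer concurrently.
func (s *baseScheduler) getTask() *Task { return s.task.Load() }

// getTaskClone copies the snapshot so the caller may mutate freely.
func (s *baseScheduler) getTaskClone() *Task {
	clone := *s.getTask() // shallow copy of the task struct
	return &clone
}

func (s *baseScheduler) pause() {
	task := s.getTaskClone()
	task.State = "paused" // mutate the private copy...
	s.task.Store(task)    // ...then publish it atomically
}
```

Readers never observe a half-updated task: they either see the old snapshot or the fully mutated new one.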
diff --git a/pkg/disttask/framework/scheduler/scheduler_manager.go b/pkg/disttask/framework/scheduler/scheduler_manager.go
index 0b946111a8..6dce67e09f 100644
--- a/pkg/disttask/framework/scheduler/scheduler_manager.go
+++ b/pkg/disttask/framework/scheduler/scheduler_manager.go
@@ -42,9 +42,6 @@ var (
     defaultCollectMetricsInterval = 5 * time.Second
 )

-// WaitTaskFinished is used to sync the test.
-var WaitTaskFinished = make(chan struct{})
-
 func (sm *Manager) getSchedulerCount() int {
     sm.mu.RLock()
     defer sm.mu.RUnlock()
@@ -300,13 +297,7 @@ func (sm *Manager) failTask(id int64, currState proto.TaskState, err error) {

 func (sm *Manager) gcSubtaskHistoryTableLoop() {
     historySubtaskTableGcInterval := defaultHistorySubtaskTableGcInterval
-    failpoint.Inject("historySubtaskTableGcInterval", func(val failpoint.Value) {
-        if seconds, ok := val.(int); ok {
-            historySubtaskTableGcInterval = time.Second * time.Duration(seconds)
-        }
-
-        <-WaitTaskFinished
-    })
+    failpoint.InjectCall("historySubtaskTableGcInterval", &historySubtaskTableGcInterval)

     sm.logger.Info("subtask table gc loop start")
     ticker := time.NewTicker(historySubtaskTableGcInterval)
@@ -385,9 +376,6 @@ func (sm *Manager) cleanupTaskLoop() {
     }
 }

-// WaitCleanUpFinished is used to sync the test.
-var WaitCleanUpFinished = make(chan struct{}, 1)
-
 // doCleanupTask processes clean up routine defined by each type of tasks and cleanupMeta.
 // For example:
 //
@@ -412,9 +400,7 @@ func (sm *Manager) doCleanupTask() {
         sm.logger.Warn("cleanup routine failed", zap.Error(err))
         return
     }
-    failpoint.Inject("WaitCleanUpFinished", func() {
-        WaitCleanUpFinished <- struct{}{}
-    })
+    failpoint.InjectCall("WaitCleanUpFinished")
     sm.logger.Info("cleanup routine success")
 }
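With the package-level `WaitTaskFinished` gone, the "park the GC loop until the test is ready" trick moves from production into the test: in TestGC above, the `historySubtaskTableGcInterval` callback both shortens the interval and blocks on a test-owned channel. A sketch of that inversion; the failpoint path follows the diff, while the `gateGCLoop` wrapper and the `testfailpoint` import path are assumptions:

```go
package integrationtests

import (
	"testing"
	"time"

	"github.com/pingcap/tidb/pkg/testkit/testfailpoint" // import path assumed
)

// gateGCLoop (hypothetical helper) registers a callback that shortens the GC
// interval and then parks the GC goroutine inside the callback until the test
// sends on gate. Production code no longer knows about the channel at all.
func gateGCLoop(t *testing.T) chan<- struct{} {
	gate := make(chan struct{})
	testfailpoint.EnableCall(t,
		"github.com/pingcap/tidb/pkg/disttask/framework/scheduler/historySubtaskTableGcInterval",
		func(interval *time.Duration) {
			*interval = time.Second
			<-gate // hold the gc loop here until the test says go
		})
	return gate
}

// usage: gate := gateGCLoop(t); ...assert pre-GC state...; gate <- struct{}{}
```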
diff --git a/pkg/disttask/framework/scheduler/scheduler_manager_nokit_test.go b/pkg/disttask/framework/scheduler/scheduler_manager_nokit_test.go
index 9380d0bd6e..68305ff508 100644
--- a/pkg/disttask/framework/scheduler/scheduler_manager_nokit_test.go
+++ b/pkg/disttask/framework/scheduler/scheduler_manager_nokit_test.go
@@ -89,7 +89,6 @@ func TestSchedulerCleanupTask(t *testing.T) {
     require.True(t, ctrl.Satisfied())

     // fail in transfer
-    require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/WaitCleanUpFinished", "1*return()"))
     mockErr := errors.New("transfer err")
     taskMgr.EXPECT().GetTasksInStates(
         mgr.ctx,
@@ -108,8 +107,6 @@ func TestSchedulerCleanupTask(t *testing.T) {
     taskMgr.EXPECT().TransferTasks2History(mgr.ctx, tasks).Return(nil)
     mgr.doCleanupTask()
     require.True(t, ctrl.Satisfied())
-
-    require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/WaitCleanUpFinished"))
 }

 func TestManagerSchedulerNotAllocateSlots(t *testing.T) {
diff --git a/pkg/disttask/framework/scheduler/scheduler_nokit_test.go b/pkg/disttask/framework/scheduler/scheduler_nokit_test.go
index aa0a24d082..141331a1b5 100644
--- a/pkg/disttask/framework/scheduler/scheduler_nokit_test.go
+++ b/pkg/disttask/framework/scheduler/scheduler_nokit_test.go
@@ -287,7 +287,7 @@ func TestSchedulerRefreshTask(t *testing.T) {
     // state/step not changed, no need to refresh
     taskMgr.EXPECT().GetTaskBaseByID(gomock.Any(), task.ID).Return(&task.TaskBase, nil)
     require.NoError(t, scheduler.refreshTaskIfNeeded())
-    require.Equal(t, *scheduler.GetTask(), task)
+    require.Equal(t, *scheduler.getTaskClone(), task)
     require.True(t, ctrl.Satisfied())
     // get task by id failed
     tmpTask := task
@@ -295,7 +295,7 @@ func TestSchedulerRefreshTask(t *testing.T) {
     taskMgr.EXPECT().GetTaskBaseByID(gomock.Any(), task.ID).Return(&tmpTask.TaskBase, nil)
     taskMgr.EXPECT().GetTaskByID(gomock.Any(), task.ID).Return(nil, errors.New("get task by id err"))
     require.ErrorContains(t, scheduler.refreshTaskIfNeeded(), "get task by id err")
-    require.Equal(t, *scheduler.GetTask(), task)
+    require.Equal(t, *scheduler.getTaskClone(), task)
     require.True(t, ctrl.Satisfied())
     // state changed
     tmpTask = task
@@ -303,7 +303,7 @@ func TestSchedulerRefreshTask(t *testing.T) {
     taskMgr.EXPECT().GetTaskBaseByID(gomock.Any(), task.ID).Return(&tmpTask.TaskBase, nil)
     taskMgr.EXPECT().GetTaskByID(gomock.Any(), task.ID).Return(&tmpTask, nil)
     require.NoError(t, scheduler.refreshTaskIfNeeded())
-    require.Equal(t, *scheduler.GetTask(), tmpTask)
+    require.Equal(t, *scheduler.getTaskClone(), tmpTask)
     require.True(t, ctrl.Satisfied())
     // step changed
     scheduler.task.Store(&schTask) // revert
@@ -312,7 +312,7 @@ func TestSchedulerRefreshTask(t *testing.T) {
     taskMgr.EXPECT().GetTaskBaseByID(gomock.Any(), task.ID).Return(&tmpTask.TaskBase, nil)
     taskMgr.EXPECT().GetTaskByID(gomock.Any(), task.ID).Return(&tmpTask, nil)
     require.NoError(t, scheduler.refreshTaskIfNeeded())
-    require.Equal(t, *scheduler.GetTask(), tmpTask)
+    require.Equal(t, *scheduler.getTaskClone(), tmpTask)
     require.True(t, ctrl.Satisfied())
 }

@@ -354,7 +354,7 @@ func TestSchedulerMaintainTaskFields(t *testing.T) {
        taskMgr.EXPECT().GetSubtaskCntGroupByStates(gomock.Any(), task.ID, gomock.Any()).Return(nil, nil)
         taskMgr.EXPECT().PausedTask(gomock.Any(), task.ID).Return(fmt.Errorf("pause err"))
         require.ErrorContains(t, scheduler.onPausing(), "pause err")
-        require.Equal(t, *scheduler.GetTask(), task)
+        require.Equal(t, *scheduler.getTaskClone(), task)
         require.True(t, ctrl.Satisfied())

         // pause task successfully
@@ -363,7 +363,7 @@ func TestSchedulerMaintainTaskFields(t *testing.T) {
         require.NoError(t, scheduler.onPausing())
         tmpTask := task
         tmpTask.State = proto.TaskStatePaused
-        require.Equal(t, *scheduler.GetTask(), tmpTask)
+        require.Equal(t, *scheduler.getTaskClone(), tmpTask)
         require.True(t, ctrl.Satisfied())
     })

@@ -373,7 +373,7 @@ func TestSchedulerMaintainTaskFields(t *testing.T) {
         taskMgr.EXPECT().GetSubtaskCntGroupByStates(gomock.Any(), task.ID, gomock.Any()).Return(nil, nil)
         taskMgr.EXPECT().ResumedTask(gomock.Any(), task.ID).Return(fmt.Errorf("resume err"))
         require.ErrorContains(t, scheduler.onResuming(), "resume err")
-        require.Equal(t, *scheduler.GetTask(), task)
+        require.Equal(t, *scheduler.getTaskClone(), task)
         require.True(t, ctrl.Satisfied())

         // resume task successfully
@@ -382,7 +382,7 @@ func TestSchedulerMaintainTaskFields(t *testing.T) {
         require.NoError(t, scheduler.onResuming())
         tmpTask := task
         tmpTask.State = proto.TaskStateRunning
-        require.Equal(t, *scheduler.GetTask(), tmpTask)
+        require.Equal(t, *scheduler.getTaskClone(), tmpTask)
         require.True(t, ctrl.Satisfied())
     })

@@ -393,7 +393,7 @@ func TestSchedulerMaintainTaskFields(t *testing.T) {
         taskMgr.EXPECT().GetSubtaskCntGroupByStates(gomock.Any(), task.ID, gomock.Any()).Return(nil, nil)
         schExt.EXPECT().OnDone(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil)
         taskMgr.EXPECT().RevertedTask(gomock.Any(), task.ID).Return(fmt.Errorf("reverted err"))
         require.ErrorContains(t, scheduler.onReverting(), "reverted err")
-        require.Equal(t, *scheduler.GetTask(), task)
+        require.Equal(t, *scheduler.getTaskClone(), task)
         require.True(t, ctrl.Satisfied())

         // revert task successfully
@@ -403,7 +403,7 @@ func TestSchedulerMaintainTaskFields(t *testing.T) {
         require.NoError(t, scheduler.onReverting())
         tmpTask := task
         tmpTask.State = proto.TaskStateReverted
-        require.Equal(t, *scheduler.GetTask(), tmpTask)
+        require.Equal(t, *scheduler.getTaskClone(), tmpTask)
         require.True(t, ctrl.Satisfied())
     })

@@ -415,7 +415,7 @@ func TestSchedulerMaintainTaskFields(t *testing.T) {
             Return(nil, errors.New("plan err"))
         schExt.EXPECT().IsRetryableErr(gomock.Any()).Return(true)
         require.ErrorContains(t, scheduler.switch2NextStep(), "plan err")
-        require.Equal(t, *scheduler.GetTask(), task)
+        require.Equal(t, *scheduler.getTaskClone(), task)
         require.True(t, ctrl.Satisfied())
         // non-retryable plan error, but failed to revert, task state unchanged
         schExt.EXPECT().OnNextSubtasksBatch(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
@@ -423,7 +423,7 @@ func TestSchedulerMaintainTaskFields(t *testing.T) {
         schExt.EXPECT().IsRetryableErr(gomock.Any()).Return(false)
         taskMgr.EXPECT().RevertTask(gomock.Any(), task.ID, gomock.Any(), gomock.Any()).Return(fmt.Errorf("revert err"))
         require.ErrorContains(t, scheduler.switch2NextStep(), "revert err")
-        require.Equal(t, *scheduler.GetTask(), task)
+        require.Equal(t, *scheduler.getTaskClone(), task)
         require.True(t, ctrl.Satisfied())
         // non-retryable plan error, task state changed to reverting
         schExt.EXPECT().OnNextSubtasksBatch(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
@@ -434,7 +434,7 @@ func TestSchedulerMaintainTaskFields(t *testing.T) {
         tmpTask := task
         tmpTask.State = proto.TaskStateReverting
         tmpTask.Error = fmt.Errorf("revert err")
-        require.Equal(t, *scheduler.GetTask(), tmpTask)
+        require.Equal(t, *scheduler.getTaskClone(), tmpTask)
         require.True(t, ctrl.Satisfied())

         // revert task back
@@ -445,7 +445,7 @@ func TestSchedulerMaintainTaskFields(t *testing.T) {
             Return(nil, nil)
         taskMgr.EXPECT().GetUsedSlotsOnNodes(gomock.Any()).Return(nil, fmt.Errorf("update err"))
         require.ErrorContains(t, scheduler.switch2NextStep(), "update err")
-        require.Equal(t, *scheduler.GetTask(), task)
+        require.Equal(t, *scheduler.getTaskClone(), task)
         require.True(t, ctrl.Satisfied())
         // switch to next step successfully
         schExt.EXPECT().OnNextSubtasksBatch(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
@@ -456,21 +456,21 @@ func TestSchedulerMaintainTaskFields(t *testing.T) {
         tmpTask = task
         tmpTask.State = proto.TaskStateRunning
         tmpTask.Step = proto.StepOne
-        require.Equal(t, *scheduler.GetTask(), tmpTask)
+        require.Equal(t, *scheduler.getTaskClone(), tmpTask)
         require.True(t, ctrl.Satisfied())
         // task done, but update failed, task state unchanged
         schExt.EXPECT().OnDone(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil)
         taskMgr.EXPECT().SucceedTask(gomock.Any(), task.ID).Return(fmt.Errorf("update err"))
         require.ErrorContains(t, scheduler.switch2NextStep(), "update err")
-        require.Equal(t, *scheduler.GetTask(), tmpTask)
+        require.Equal(t, *scheduler.getTaskClone(), tmpTask)
         // task done successfully, task state changed
         schExt.EXPECT().OnDone(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil)
         taskMgr.EXPECT().SucceedTask(gomock.Any(), task.ID).Return(nil)
         require.NoError(t, scheduler.switch2NextStep())
         tmpTask.State = proto.TaskStateSucceed
         tmpTask.Step = proto.StepDone
-        require.Equal(t, *scheduler.GetTask(), tmpTask)
+        require.Equal(t, *scheduler.getTaskClone(), tmpTask)
     })

     t.Run("test revertTask", func(t *testing.T) {
@@ -478,7 +478,7 @@ func TestSchedulerMaintainTaskFields(t *testing.T) {
         taskMgr.EXPECT().RevertTask(gomock.Any(), task.ID, gomock.Any(), gomock.Any()).Return(fmt.Errorf("revert err"))
         require.ErrorContains(t, scheduler.revertTask(fmt.Errorf("task err")), "revert err")
-        require.Equal(t, *scheduler.GetTask(), task)
+        require.Equal(t, *scheduler.getTaskClone(), task)
         require.True(t, ctrl.Satisfied())

         taskMgr.EXPECT().RevertTask(gomock.Any(), task.ID, gomock.Any(), gomock.Any()).Return(nil)
@@ -486,7 +486,7 @@ func TestSchedulerMaintainTaskFields(t *testing.T) {
         tmpTask := task
         tmpTask.State = proto.TaskStateReverting
         tmpTask.Error = fmt.Errorf("task err")
-        require.Equal(t, *scheduler.GetTask(), tmpTask)
+        require.Equal(t, *scheduler.getTaskClone(), tmpTask)
         require.True(t, ctrl.Satisfied())
     })
 }
diff --git a/pkg/disttask/framework/storage/history.go b/pkg/disttask/framework/storage/history.go
index 9129a002da..47655d05ff 100644
--- a/pkg/disttask/framework/storage/history.go
+++ b/pkg/disttask/framework/storage/history.go
@@ -83,11 +83,7 @@ func (mgr *TaskManager) TransferTasks2History(ctx context.Context, tasks []*prot

 // GCSubtasks deletes the history subtask which is older than the given days.
 func (mgr *TaskManager) GCSubtasks(ctx context.Context) error {
     subtaskHistoryKeepSeconds := defaultSubtaskKeepDays * 24 * 60 * 60
-    failpoint.Inject("subtaskHistoryKeepSeconds", func(val failpoint.Value) {
-        if val, ok := val.(int); ok {
-            subtaskHistoryKeepSeconds = val
-        }
-    })
+    failpoint.InjectCall("subtaskHistoryKeepSeconds", &subtaskHistoryKeepSeconds)
     _, err := mgr.ExecuteSQLWithNewSession(
         ctx,
         fmt.Sprintf("DELETE FROM mysql.tidb_background_subtask_history WHERE state_update_time < UNIX_TIMESTAMP() - %d ;", subtaskHistoryKeepSeconds),
diff --git a/pkg/disttask/framework/storage/table_test.go b/pkg/disttask/framework/storage/table_test.go
index e047d53a30..1df5619e1c 100644
--- a/pkg/disttask/framework/storage/table_test.go
+++ b/pkg/disttask/framework/storage/table_test.go
@@ -824,7 +824,9 @@ func TestSubtaskHistoryTable(t *testing.T) {
     require.Len(t, subTasks, 3)

     // test GC history table.
-    testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/disttask/framework/storage/subtaskHistoryKeepSeconds", "return(1)")
+    testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/disttask/framework/storage/subtaskHistoryKeepSeconds", func(interval *int) {
+        *interval = 1
+    })
     time.Sleep(2 * time.Second)

     subTask4 := testutil.CreateSubTask(t, sm, taskID2, proto.StepInit, tidb1, []byte(meta), proto.TaskTypeExample, 11)
diff --git a/tests/realtikvtest/addindextest2/BUILD.bazel b/tests/realtikvtest/addindextest2/BUILD.bazel
index 3590e9da6f..fe7e8b99d4 100644
--- a/tests/realtikvtest/addindextest2/BUILD.bazel
+++ b/tests/realtikvtest/addindextest2/BUILD.bazel
@@ -11,7 +11,6 @@ go_test(
    deps = [
        "//br/pkg/storage",
        "//pkg/config",
-       "//pkg/disttask/framework/scheduler",
        "//pkg/kv",
        "//pkg/lightning/backend/external",
        "//pkg/meta/model",
diff --git a/tests/realtikvtest/addindextest2/global_sort_test.go b/tests/realtikvtest/addindextest2/global_sort_test.go
index c8ba7fa79d..0347079ff1 100644
--- a/tests/realtikvtest/addindextest2/global_sort_test.go
+++ b/tests/realtikvtest/addindextest2/global_sort_test.go
@@ -26,7 +26,6 @@ import (
     "github.com/pingcap/failpoint"
     "github.com/pingcap/tidb/br/pkg/storage"
     "github.com/pingcap/tidb/pkg/config"
-    "github.com/pingcap/tidb/pkg/disttask/framework/scheduler"
     "github.com/pingcap/tidb/pkg/kv"
     "github.com/pingcap/tidb/pkg/lightning/backend/external"
     "github.com/pingcap/tidb/pkg/meta/model"
@@ -85,7 +84,10 @@ func TestGlobalSortBasic(t *testing.T) {
     store := realtikvtest.CreateMockStoreAndSetup(t)
     tk := testkit.NewTestKit(t, store)

-    testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/WaitCleanUpFinished", "return()")
+    ch := make(chan struct{}, 1)
+    testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/WaitCleanUpFinished", func() {
+        ch <- struct{}{}
+    })
     tk.MustExec("drop database if exists addindexlit;")
     tk.MustExec("create database addindexlit;")
     tk.MustExec("use addindexlit;")
@@ -117,18 +119,18 @@ func TestGlobalSortBasic(t *testing.T) {
     tk.MustExec("alter table t add index idx(a);")
     tk.MustExec("admin check table t;")
-    <-scheduler.WaitCleanUpFinished
+    <-ch
     checkFileCleaned(t, jobID, cloudStorageURI)

     testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/ddl/forceMergeSort", "return()")
     tk.MustExec("alter table t add index idx1(a);")
     tk.MustExec("admin check table t;")
-    <-scheduler.WaitCleanUpFinished
+    <-ch
     checkFileCleaned(t, jobID, cloudStorageURI)

     tk.MustExec("alter table t add unique index idx2(a);")
     tk.MustExec("admin check table t;")
-    <-scheduler.WaitCleanUpFinished
+    <-ch
     checkFileCleaned(t, jobID, cloudStorageURI)
 }
diff --git a/tests/realtikvtest/importintotest/BUILD.bazel b/tests/realtikvtest/importintotest/BUILD.bazel
index 8f059b7eeb..f3fdf08016 100644
--- a/tests/realtikvtest/importintotest/BUILD.bazel
+++ b/tests/realtikvtest/importintotest/BUILD.bazel
@@ -22,7 +22,6 @@ go_test(
        "//br/pkg/utils",
        "//pkg/config",
        "//pkg/disttask/framework/proto",
-       "//pkg/disttask/framework/scheduler",
        "//pkg/disttask/framework/storage",
        "//pkg/disttask/framework/testutil",
        "//pkg/disttask/importinto",
diff --git a/tests/realtikvtest/importintotest/import_into_test.go b/tests/realtikvtest/importintotest/import_into_test.go
index 0b45fd819f..32e3920c4f 100644
--- a/tests/realtikvtest/importintotest/import_into_test.go
+++ b/tests/realtikvtest/importintotest/import_into_test.go
@@ -35,7 +35,6 @@ import (
     "github.com/pingcap/tidb/br/pkg/mock/mocklocal"
     "github.com/pingcap/tidb/br/pkg/utils"
     "github.com/pingcap/tidb/pkg/disttask/framework/proto"
-    "github.com/pingcap/tidb/pkg/disttask/framework/scheduler"
     "github.com/pingcap/tidb/pkg/disttask/framework/storage"
     "github.com/pingcap/tidb/pkg/disttask/importinto"
     "github.com/pingcap/tidb/pkg/domain/infosync"
@@ -864,7 +863,10 @@ func (s *mockGCSSuite) TestImportMode() {
     // NOTE: this case only runs when current instance is TiDB owner, if you run it locally,
     // better start a cluster without TiDB instance.
     testfailpoint.Enable(s.T(), "github.com/pingcap/tidb/pkg/parser/ast/forceRedactURL", "return(true)")
-    testfailpoint.Enable(s.T(), "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/WaitCleanUpFinished", "return()")
+    ch := make(chan struct{}, 1)
+    testfailpoint.EnableCall(s.T(), "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/WaitCleanUpFinished", func() {
+        ch <- struct{}{}
+    })
     sql := fmt.Sprintf(`IMPORT INTO load_data.import_mode FROM 'gs://test-load/import_mode-*.tsv?access-key=aaaaaa&secret-access-key=bbbbbb&endpoint=%s'`, gcsEndpoint)
     rows := s.tk.MustQuery(sql).Rows()
     s.Len(rows, 1)
@@ -872,7 +874,7 @@ func (s *mockGCSSuite) TestImportMode() {
     s.NoError(err)
     s.tk.MustQuery("SELECT * FROM load_data.import_mode;").Sort().Check(testkit.Rows("1 11 111"))
     s.Greater(intoNormalTime, intoImportTime)
-    <-scheduler.WaitCleanUpFinished
+    <-ch
     s.checkTaskMetaRedacted(int64(jobID))
     // after import step, we should enter normal mode, i.e. we only call ToImportMode once
     intoNormalTime, intoImportTime = time.Time{}, time.Time{}
@@ -888,7 +890,7 @@ func (s *mockGCSSuite) TestImportMode() {
     s.Greater(intoNormalTime, intoImportTime)
     s.NoError(failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/importinto/clearLastSwitchTime"))
     s.NoError(failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/importinto/waitBeforePostProcess"))
-    <-scheduler.WaitCleanUpFinished
+    <-ch

     // test disable_tikv_import_mode, should not call ToImportMode and ToNormalMode
     s.tk.MustExec("truncate table load_data.import_mode;")
@@ -896,7 +898,7 @@ func (s *mockGCSSuite) TestImportMode() {
     s.tk.MustQuery(sql)
     s.tk.MustQuery("SELECT * FROM load_data.import_mode;").Sort().Check(testkit.Rows("1 11 111"))
     s.tk.MustExec("truncate table load_data.import_mode;")
-    <-scheduler.WaitCleanUpFinished
+    <-ch

     // test with multirocksdb
     testfailpoint.Enable(s.T(), "github.com/pingcap/tidb/pkg/ddl/util/IsRaftKv2", "return(true)")
@@ -905,7 +907,7 @@ func (s *mockGCSSuite) TestImportMode() {
     s.tk.MustQuery(sql)
     s.tk.MustQuery("SELECT * FROM load_data.import_mode;").Sort().Check(testkit.Rows("1 11 111"))
     s.tk.MustExec("truncate table load_data.import_mode;")
-    <-scheduler.WaitCleanUpFinished
+    <-ch

     s.NoError(failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/util/IsRaftKv2"))

@@ -921,7 +923,7 @@ func (s *mockGCSSuite) TestImportMode() {
     err = s.tk.QueryToErr(sql)
     s.Error(err)
     s.Greater(intoNormalTime, intoImportTime)
-    <-scheduler.WaitCleanUpFinished
+    <-ch
     s.checkTaskMetaRedacted(importer.TestLastImportJobID.Load())
     s.NoError(failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/WaitCleanUpFinished"))
 }
diff --git a/tests/realtikvtest/importintotest4/BUILD.bazel b/tests/realtikvtest/importintotest4/BUILD.bazel
index fefb8666c8..6abaad334c 100644
--- a/tests/realtikvtest/importintotest4/BUILD.bazel
+++ b/tests/realtikvtest/importintotest4/BUILD.bazel
@@ -13,7 +13,6 @@ go_test(
    deps = [
        "//pkg/config",
        "//pkg/disttask/framework/proto",
-       "//pkg/disttask/framework/scheduler",
        "//pkg/disttask/framework/storage",
        "//pkg/disttask/framework/testutil",
        "//pkg/disttask/importinto",
diff --git a/tests/realtikvtest/importintotest4/global_sort_test.go b/tests/realtikvtest/importintotest4/global_sort_test.go
index acc0bec8ab..6734ff65ee 100644
--- a/tests/realtikvtest/importintotest4/global_sort_test.go
+++ b/tests/realtikvtest/importintotest4/global_sort_test.go
@@ -26,7 +26,6 @@ import (

     "github.com/fsouza/fake-gcs-server/fakestorage"
     "github.com/pingcap/tidb/pkg/disttask/framework/proto"
-    "github.com/pingcap/tidb/pkg/disttask/framework/scheduler"
     "github.com/pingcap/tidb/pkg/disttask/framework/storage"
     "github.com/pingcap/tidb/pkg/disttask/importinto"
     "github.com/pingcap/tidb/pkg/executor/importer"
@@ -63,7 +62,10 @@ func (s *mockGCSSuite) TestGlobalSortBasic() {
     s.tk.MustExec(`create table t (a bigint primary key, b varchar(100), c varchar(100), d int, key(a), key(c,d), key(d));`)
     testfailpoint.Enable(s.T(), "github.com/pingcap/tidb/pkg/parser/ast/forceRedactURL", "return(true)")
-    testfailpoint.Enable(s.T(), "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/WaitCleanUpFinished", "return()")
+    ch := make(chan struct{}, 1)
+    testfailpoint.EnableCall(s.T(), "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/WaitCleanUpFinished", func() {
+        ch <- struct{}{}
+    })
     sortStorageURI := fmt.Sprintf("gs://sorted/import?endpoint=%s&access-key=aaaaaa&secret-access-key=bbbbbb", gcsEndpoint)
     importSQL := fmt.Sprintf(`import into t FROM
        'gs://gs-basic/t.*.csv?endpoint=%s'
@@ -78,7 +80,7 @@ func (s *mockGCSSuite) TestGlobalSortBasic() {
     ))

     // check all sorted data cleaned up
-    <-scheduler.WaitCleanUpFinished
+    <-ch
     _, files, err := s.server.ListObjectsWithOptions("sorted",
         fakestorage.ListOptions{Prefix: "import"})
     s.NoError(err)
@@ -106,7 +108,7 @@ func (s *mockGCSSuite) TestGlobalSortBasic() {
         "1 foo1 bar1 123", "2 foo2 bar2 456", "3 foo3 bar3 789",
         "4 foo4 bar4 123", "5 foo5 bar5 223", "6 foo6 bar6 323",
     ))
-    <-scheduler.WaitCleanUpFinished
+    <-ch

     // failed task, should clean up all sorted data too.
     testfailpoint.Enable(s.T(), "github.com/pingcap/tidb/pkg/disttask/importinto/failWhenDispatchWriteIngestSubtask", "return(true)")
@@ -121,7 +123,7 @@ func (s *mockGCSSuite) TestGlobalSortBasic() {
         return task.State == proto.TaskStateReverted
     }, 30*time.Second, 300*time.Millisecond)
     // check all sorted data cleaned up
-    <-scheduler.WaitCleanUpFinished
+    <-ch
     _, files, err = s.server.ListObjectsWithOptions("sorted",
         fakestorage.ListOptions{Prefix: "import"})
     s.NoError(err)
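A closing note on the `WaitCleanUpFinished` conversions scattered through the realtikvtest files: every test builds its channel as `make(chan struct{}, 1)`, matching the capacity of the deleted package-level `WaitCleanUpFinished`. The one slot of buffer appears intended to let the scheduler's cleanup goroutine record a finished round without blocking when the test has not yet reached its `<-ch`, while the receive still gives the test its ordering guarantee. In sketch form (the `watchCleanup` wrapper and the `testfailpoint` import path are assumptions):

```go
package example

import (
	"testing"

	"github.com/pingcap/tidb/pkg/testkit/testfailpoint" // import path assumed
)

// watchCleanup mirrors the channel the tests above create: capacity 1, so
// the cleanup goroutine can deposit one "cleanup finished" signal without
// blocking even if the test has not reached <-ch yet.
func watchCleanup(t *testing.T) <-chan struct{} {
	ch := make(chan struct{}, 1)
	testfailpoint.EnableCall(t,
		"github.com/pingcap/tidb/pkg/disttask/framework/scheduler/WaitCleanUpFinished",
		func() { ch <- struct{}{} })
	return ch
}

// usage: done := watchCleanup(t); ...run a job ending in cleanup...; <-done
```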