dxf: refactor failpoints in scheduler (#57566)

ref pingcap/tidb#57497
Author: D3Hunter
Date: 2024-11-21 11:46:45 +08:00
Committed by: GitHub
Parent: 91c14a45bb
Commit: e4e707d4a7
17 changed files with 156 additions and 145 deletions
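
The commit applies one pattern throughout the scheduler: value-driven failpoints that were enabled with expressions such as "2*return(true)" and carried test-only branching in production code are replaced with call-style hooks, and the behavior moves into callbacks that the tests register through testfailpoint.EnableCall. A minimal sketch of the two styles; the hook name (demoHook) and function (doWork) are illustrative, not part of this commit:

package scheduler

import "github.com/pingcap/failpoint"

// doWork shows the production side of the new pattern: it only declares a
// hook point and forwards whatever a test callback might need; it no longer
// contains test-specific branching.
func doWork(state string) {
	// Old style (what this commit removes): the decision lived here and was
	// toggled by a value expression such as "2*return(true)".
	//   failpoint.Inject("demoHook", func(val failpoint.Value) {
	//       if val.(bool) && state == "running" {
	//           // pause or cancel the task ...
	//       }
	//   })
	// New style: a no-op unless a test registers a callback for "demoHook".
	failpoint.InjectCall("demoHook", state)
}

On the test side the behavior arrives as a typed callback, e.g. testfailpoint.EnableCall(t, ".../demoHook", func(state string) { ... }), as the updated tests below do with beforeRefreshTask, afterRefreshTask, and syncRefresh.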

View File

@ -16,6 +16,7 @@ package integrationtests
import (
"context"
"sync/atomic"
"testing"
"github.com/pingcap/failpoint"
@ -50,10 +51,18 @@ func TestFrameworkPauseAndResume(t *testing.T) {
testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetMockBasicSchedulerExt(c.MockCtrl), c.TestContext, nil)
// 1. schedule and pause one running task.
testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/pauseTaskAfterRefreshTask", "2*return(true)")
var counter atomic.Int32
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/beforeRefreshTask", func(task *proto.Task) {
if counter.Add(1) <= 2 {
if task.State == proto.TaskStateRunning {
_, err := c.TaskMgr.PauseTask(c.Ctx, task.Key)
require.NoError(t, err)
}
}
})
task1 := testutil.SubmitAndWaitTask(c.Ctx, t, "key1", "", 1)
require.Equal(t, proto.TaskStatePaused, task1.State)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/pauseTaskAfterRefreshTask"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/beforeRefreshTask"))
// 4 subtasks scheduled.
require.NoError(t, handle.ResumeTask(c.Ctx, "key1"))
task1Base := testutil.WaitTaskDone(c.Ctx, t, task1.Key)
@ -67,10 +76,18 @@ func TestFrameworkPauseAndResume(t *testing.T) {
require.Empty(t, errs)
// 2. pause pending task.
testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/pausePendingTask", "2*return(true)")
counter.Store(0)
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/beforeRefreshTask", func(task *proto.Task) {
if counter.Add(1) <= 2 {
if task.State == proto.TaskStatePending {
_, err := mgr.PauseTask(c.Ctx, task.Key)
require.NoError(t, err)
}
}
})
task2 := testutil.SubmitAndWaitTask(c.Ctx, t, "key2", "", 1)
require.Equal(t, proto.TaskStatePaused, task2.State)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/pausePendingTask"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/beforeRefreshTask"))
// 4 subtasks scheduled.
require.NoError(t, handle.ResumeTask(c.Ctx, "key2"))
task2Base := testutil.WaitTaskDone(c.Ctx, t, task2.Key)

View File

@ -15,6 +15,7 @@
package integrationtests
import (
"sync/atomic"
"testing"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
@ -26,7 +27,16 @@ import (
func TestFrameworkRollback(t *testing.T) {
c := testutil.NewTestDXFContext(t, 2, 16, true)
testutil.RegisterRollbackTaskMeta(t, c.MockCtrl, testutil.GetMockRollbackSchedulerExt(c.MockCtrl), c.TestContext)
testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/cancelTaskAfterRefreshTask", "2*return(true)")
var counter atomic.Int32
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/afterRefreshTask",
func(task *proto.Task) {
if counter.Add(1) <= 2 {
if task.State == proto.TaskStateRunning {
require.NoError(t, c.TaskMgr.CancelTask(c.Ctx, task.ID))
}
}
},
)
task := testutil.SubmitAndWaitTask(c.Ctx, t, "key1", "", 1)
require.Equal(t, proto.TaskStateReverted, task.State)

View File

@ -19,10 +19,10 @@ import (
"fmt"
"slices"
"strconv"
"sync/atomic"
"testing"
"time"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/scheduler"
"github.com/pingcap/tidb/pkg/disttask/framework/storage"
@ -81,10 +81,17 @@ func TestScopeBasic(t *testing.T) {
tk.MustQuery("select @@global.tidb_service_scope").Check(testkit.Rows("background"))
tk.MustQuery("select @@tidb_service_scope").Check(testkit.Rows("background"))
testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/syncRefresh", "1*return()")
<-scheduler.TestRefreshedChan
var fpEnabled atomic.Bool
ch := make(chan struct{})
fpEnabled.Store(true)
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/syncRefresh", func() {
if fpEnabled.Load() {
ch <- struct{}{}
}
})
<-ch
taskID = submitTaskAndCheckSuccessForScope(c.Ctx, t, "😊", nodeCnt, "background", c.TestContext)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/syncRefresh"))
fpEnabled.Store(false)
tk.MustQuery(`select role from mysql.dist_framework_meta where host=":4000"`).Check(testkit.Rows("background"))
tk.MustQuery(`select role from mysql.dist_framework_meta where host=":4001"`).Check(testkit.Rows(""))
@ -94,10 +101,10 @@ func TestScopeBasic(t *testing.T) {
// 3. 2 "background" role.
tk.MustExec("update mysql.dist_framework_meta set role = \"background\" where host = \":4001\"")
time.Sleep(5 * time.Second)
testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/syncRefresh", "1*return()")
<-scheduler.TestRefreshedChan
fpEnabled.Store(true)
<-ch
taskID = submitTaskAndCheckSuccessForScope(c.Ctx, t, "😆", nodeCnt, "background", c.TestContext)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/syncRefresh"))
fpEnabled.Store(false)
checkSubtaskOnNodes(c.Ctx, t, taskID, []string{":4000", ":4001"})
tk.MustQuery(`select role from mysql.dist_framework_meta where host=":4000"`).Check(testkit.Rows("background"))
tk.MustQuery(`select role from mysql.dist_framework_meta where host=":4001"`).Check(testkit.Rows("background"))
@ -146,12 +153,19 @@ func runTargetScopeCase(t *testing.T, c *testutil.TestDXFContext, tk *testkit.Te
for i := 0; i < len(testCase.nodeScopes); i++ {
tk.MustExec(fmt.Sprintf("update mysql.dist_framework_meta set role = \"%s\" where host = \"%s\"", testCase.nodeScopes[i], c.GetNodeIDByIdx(i)))
}
testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/syncRefresh", "3*return()")
<-scheduler.TestRefreshedChan
<-scheduler.TestRefreshedChan
<-scheduler.TestRefreshedChan
var fpEnabled atomic.Bool
ch := make(chan struct{})
fpEnabled.Store(true)
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/syncRefresh", func() {
if fpEnabled.Load() {
ch <- struct{}{}
}
})
<-ch
<-ch
<-ch
taskID := submitTaskAndCheckSuccessForScope(c.Ctx, t, "task"+strconv.Itoa(idx), nodeCnt, testCase.scope, c.TestContext)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/syncRefresh"))
fpEnabled.Store(false)
expected := make([]string, 0)
for i, scope := range testCase.nodeScopes {
if scope == testCase.scope {

View File

@ -168,8 +168,14 @@ func TestOwnerChangeWhenSchedule(t *testing.T) {
}
func TestGC(t *testing.T) {
testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/disttask/framework/storage/subtaskHistoryKeepSeconds", "return(1)")
testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/historySubtaskTableGcInterval", "return(1)")
ch := make(chan struct{})
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/disttask/framework/storage/subtaskHistoryKeepSeconds", func(interval *int) {
*interval = 1
})
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/historySubtaskTableGcInterval", func(interval *time.Duration) {
*interval = 1 * time.Second
<-ch
})
c := testutil.NewTestDXFContext(t, 3, 16, true)
testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetMockBasicSchedulerExt(c.MockCtrl), c.TestContext, nil)
@ -188,7 +194,7 @@ func TestGC(t *testing.T) {
return historySubTasksCnt == 4
}, 10*time.Second, 500*time.Millisecond)
scheduler.WaitTaskFinished <- struct{}{}
ch <- struct{}{}
require.Eventually(t, func() bool {
historySubTasksCnt, err := testutil.GetSubtasksFromHistory(c.Ctx, mgr)
@ -225,11 +231,14 @@ func TestFrameworkCleanUpRoutine(t *testing.T) {
scheduler.DefaultCleanUpInterval = 500 * time.Millisecond
c := testutil.NewTestDXFContext(t, 3, 16, true)
testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetMockBasicSchedulerExt(c.MockCtrl), c.TestContext, nil)
testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/WaitCleanUpFinished", "return()")
ch := make(chan struct{}, 1)
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/WaitCleanUpFinished", func() {
ch <- struct{}{}
})
// normal
submitTaskAndCheckSuccessForBasic(c.Ctx, t, "key1", c.TestContext)
<-scheduler.WaitCleanUpFinished
<-ch
mgr, err := storage.GetTaskManager()
require.NoError(t, err)
tasks, err := mgr.GetTaskByKeyWithHistory(c.Ctx, "key1")
@ -242,7 +251,7 @@ func TestFrameworkCleanUpRoutine(t *testing.T) {
// transfer err
testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/mockTransferErr", "1*return()")
submitTaskAndCheckSuccessForBasic(c.Ctx, t, "key2", c.TestContext)
<-scheduler.WaitCleanUpFinished
<-ch
mgr, err = storage.GetTaskManager()
require.NoError(t, err)
tasks, err = mgr.GetTaskByKeyWithHistory(c.Ctx, "key1")
@ -257,7 +266,12 @@ func TestTaskCancelledBeforeUpdateTask(t *testing.T) {
c := testutil.NewTestDXFContext(t, 1, 16, true)
testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetMockBasicSchedulerExt(c.MockCtrl), c.TestContext, nil)
testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/cancelBeforeUpdateTask", "1*return(true)")
var once sync.Once
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/cancelBeforeUpdateTask", func(taskID int64) {
once.Do(func() {
require.NoError(t, c.TaskMgr.CancelTask(c.Ctx, taskID))
})
})
task := testutil.SubmitAndWaitTask(c.Ctx, t, "key1", "", 1)
require.Equal(t, proto.TaskStateReverted, task.State)
}
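
One detail worth noting in the tests above: the old count-limited expressions ("1*return(true)", "2*return(true)") have no direct counterpart for callback hooks, so the rewritten tests emulate them with sync.Once or an atomic counter. A minimal sketch of both, mirroring what the tests do; the hook names and the testfailpoint import path are assumptions for illustration:

package integrationtests

import (
	"sync"
	"sync/atomic"
	"testing"

	"github.com/pingcap/tidb/pkg/testkit/testfailpoint"
)

func TestCountLimitedHooks(t *testing.T) {
	// Fire exactly once, the callback analogue of "1*return(true)".
	var once sync.Once
	testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/demoHookA", func() {
		once.Do(func() {
			// test-only behavior goes here
		})
	})

	// Fire at most twice, the callback analogue of "2*return(true)".
	var counter atomic.Int32
	testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/demoHookB", func() {
		if counter.Add(1) <= 2 {
			// test-only behavior goes here
		}
	})
}

Only hooks that exist as failpoint.InjectCall sites in production code will actually invoke these callbacks; demoHookA and demoHookB are placeholders.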

View File

@ -130,9 +130,6 @@ func (nm *NodeManager) refreshNodesLoop(ctx context.Context, taskMgr TaskManager
}
}
// TestRefreshedChan is used to sync the test.
var TestRefreshedChan = make(chan struct{})
// refreshNodes maintains the nodes managed by the framework.
func (nm *NodeManager) refreshNodes(ctx context.Context, taskMgr TaskManager, slotMgr *SlotManager) {
newNodes, err := taskMgr.GetAllNodes(ctx)
@ -150,9 +147,7 @@ func (nm *NodeManager) refreshNodes(ctx context.Context, taskMgr TaskManager, sl
slotMgr.updateCapacity(cpuCount)
nm.nodes.Store(&newNodes)
failpoint.Inject("syncRefresh", func() {
TestRefreshedChan <- struct{}{}
})
failpoint.InjectCall("syncRefresh")
}
// GetNodes returns the nodes managed by the framework.

View File

@ -68,6 +68,7 @@ type Scheduler interface {
// Close closes the scheduler, should be called if Init returns nil.
Close()
// GetTask returns the task that the scheduler is managing.
// The task is read-only; it might be accessed by multiple goroutines.
GetTask() *proto.Task
Extension
}
@ -129,6 +130,12 @@ func (s *BaseScheduler) GetTask() *proto.Task {
return s.task.Load()
}
// getTaskClone returns a clone of the task.
func (s *BaseScheduler) getTaskClone() *proto.Task {
clone := *s.GetTask()
return &clone
}
// refreshTaskIfNeeded fetches the task state from the tidb_global_task table.
func (s *BaseScheduler) refreshTaskIfNeeded() error {
task := s.GetTask()
@ -168,6 +175,7 @@ func (s *BaseScheduler) scheduleTask() {
case <-ticker.C:
}
failpoint.InjectCall("beforeRefreshTask", s.GetTask())
err := s.refreshTaskIfNeeded()
if err != nil {
if errors.Cause(err) == storage.ErrTaskNotFound {
@ -179,41 +187,12 @@ func (s *BaseScheduler) scheduleTask() {
s.logger.Error("refresh task failed", zap.Error(err))
continue
}
task := *s.GetTask()
failpoint.InjectCall("afterRefreshTask", s.GetTask())
task := s.getTaskClone()
// TODO: refine failpoints below.
failpoint.Inject("exitScheduler", func() {
failpoint.Return()
})
failpoint.Inject("cancelTaskAfterRefreshTask", func(val failpoint.Value) {
if val.(bool) && task.State == proto.TaskStateRunning {
err := s.taskMgr.CancelTask(s.ctx, task.ID)
if err != nil {
s.logger.Error("cancel task failed", zap.Error(err))
}
}
})
failpoint.Inject("pausePendingTask", func(val failpoint.Value) {
if val.(bool) && task.State == proto.TaskStatePending {
_, err := s.taskMgr.PauseTask(s.ctx, task.Key)
if err != nil {
s.logger.Error("pause task failed", zap.Error(err))
}
task.State = proto.TaskStatePausing
s.task.Store(&task)
}
})
failpoint.Inject("pauseTaskAfterRefreshTask", func(val failpoint.Value) {
if val.(bool) && task.State == proto.TaskStateRunning {
_, err := s.taskMgr.PauseTask(s.ctx, task.Key)
if err != nil {
s.logger.Error("pause task failed", zap.Error(err))
}
task.State = proto.TaskStatePausing
s.task.Store(&task)
}
})
switch task.State {
case proto.TaskStateCancelling:
@ -280,7 +259,7 @@ func (s *BaseScheduler) onCancelling() error {
// handle task in pausing state, cancel all running subtasks.
func (s *BaseScheduler) onPausing() error {
task := *s.GetTask()
task := s.getTaskClone()
s.logger.Info("on pausing state", zap.Stringer("state", task.State),
zap.String("step", proto.Step2Str(task.Type, task.Step)))
cntByStates, err := s.taskMgr.GetSubtaskCntGroupByStates(s.ctx, task.ID, task.Step)
@ -299,7 +278,7 @@ func (s *BaseScheduler) onPausing() error {
return err
}
task.State = proto.TaskStatePaused
s.task.Store(&task)
s.task.Store(task)
return nil
}
@ -314,7 +293,7 @@ func (s *BaseScheduler) onPaused() error {
// handle task in resuming state.
func (s *BaseScheduler) onResuming() error {
task := *s.GetTask()
task := s.getTaskClone()
s.logger.Info("on resuming state", zap.Stringer("state", task.State),
zap.String("step", proto.Step2Str(task.Type, task.Step)))
cntByStates, err := s.taskMgr.GetSubtaskCntGroupByStates(s.ctx, task.ID, task.Step)
@ -329,7 +308,7 @@ func (s *BaseScheduler) onResuming() error {
return err
}
task.State = proto.TaskStateRunning
s.task.Store(&task)
s.task.Store(task)
return nil
}
@ -338,7 +317,7 @@ func (s *BaseScheduler) onResuming() error {
// handle task in reverting state, check all revert subtasks finishes.
func (s *BaseScheduler) onReverting() error {
task := *s.GetTask()
task := s.getTaskClone()
s.logger.Debug("on reverting state", zap.Stringer("state", task.State),
zap.String("step", proto.Step2Str(task.Type, task.Step)))
cntByStates, err := s.taskMgr.GetSubtaskCntGroupByStates(s.ctx, task.ID, task.Step)
@ -348,18 +327,18 @@ func (s *BaseScheduler) onReverting() error {
}
runnableSubtaskCnt := cntByStates[proto.SubtaskStatePending] + cntByStates[proto.SubtaskStateRunning]
if runnableSubtaskCnt == 0 {
if err = s.OnDone(s.ctx, s, &task); err != nil {
if err = s.OnDone(s.ctx, s, task); err != nil {
return errors.Trace(err)
}
if err = s.taskMgr.RevertedTask(s.ctx, task.ID); err != nil {
return errors.Trace(err)
}
task.State = proto.TaskStateReverted
s.task.Store(&task)
s.task.Store(task)
return nil
}
// Wait until all subtasks in this step finish.
s.OnTick(s.ctx, &task)
s.OnTick(s.ctx, task)
s.logger.Debug("on reverting state, this task keeps current state", zap.Stringer("state", task.State))
return nil
}
@ -412,14 +391,14 @@ func (s *BaseScheduler) onFinished() {
}
func (s *BaseScheduler) switch2NextStep() error {
task := *s.GetTask()
task := s.getTaskClone()
nextStep := s.GetNextStep(&task.TaskBase)
s.logger.Info("switch to next step",
zap.String("current-step", proto.Step2Str(task.Type, task.Step)),
zap.String("next-step", proto.Step2Str(task.Type, nextStep)))
if nextStep == proto.StepDone {
if err := s.OnDone(s.ctx, s, &task); err != nil {
if err := s.OnDone(s.ctx, s, task); err != nil {
return errors.Trace(err)
}
if err := s.taskMgr.SucceedTask(s.ctx, task.ID); err != nil {
@ -427,7 +406,7 @@ func (s *BaseScheduler) switch2NextStep() error {
}
task.Step = nextStep
task.State = proto.TaskStateSucceed
s.task.Store(&task)
s.task.Store(task)
return nil
}
@ -443,19 +422,19 @@ func (s *BaseScheduler) switch2NextStep() error {
return errors.New("no available TiDB node to dispatch subtasks")
}
metas, err := s.OnNextSubtasksBatch(s.ctx, s, &task, eligibleNodes, nextStep)
metas, err := s.OnNextSubtasksBatch(s.ctx, s, task, eligibleNodes, nextStep)
if err != nil {
s.logger.Warn("generate part of subtasks failed", zap.Error(err))
return s.handlePlanErr(err)
}
if err = s.scheduleSubTask(&task, nextStep, metas, eligibleNodes); err != nil {
if err = s.scheduleSubTask(task, nextStep, metas, eligibleNodes); err != nil {
return err
}
task.Step = nextStep
task.State = proto.TaskStateRunning
// and OnNextSubtasksBatch might change meta of task.
s.task.Store(&task)
s.task.Store(task)
return nil
}
@ -491,9 +470,7 @@ func (s *BaseScheduler) scheduleSubTask(
size += uint64(len(meta))
}
failpoint.Inject("cancelBeforeUpdateTask", func() {
_ = s.taskMgr.CancelTask(s.ctx, task.ID)
})
failpoint.InjectCall("cancelBeforeUpdateTask", task.ID)
// as other fields and generated key and index KV takes space too, we limit
// the size of subtasks to 80% of the transaction limit.
@ -521,7 +498,7 @@ func (s *BaseScheduler) scheduleSubTask(
}
func (s *BaseScheduler) handlePlanErr(err error) error {
task := *s.GetTask()
task := s.getTaskClone()
s.logger.Warn("generate plan failed", zap.Error(err), zap.Stringer("state", task.State))
if s.IsRetryableErr(err) {
return err
@ -530,13 +507,13 @@ func (s *BaseScheduler) handlePlanErr(err error) error {
}
func (s *BaseScheduler) revertTask(taskErr error) error {
task := *s.GetTask()
task := s.getTaskClone()
if err := s.taskMgr.RevertTask(s.ctx, task.ID, task.State, taskErr); err != nil {
return err
}
task.State = proto.TaskStateReverting
task.Error = taskErr
s.task.Store(&task)
s.task.Store(task)
return nil
}
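
For readers skimming the scheduler.go hunks above: the current task lives behind an atomic pointer, GetTask hands out that shared pointer for read-only use, and state transitions now go through getTaskClone so the shared value is never mutated in place and only complete copies are published via Store. A simplified sketch of that copy-on-write idea, with a stripped-down Task type standing in for proto.Task:

package scheduler

import "sync/atomic"

// Task is a stripped-down stand-in for proto.Task, for illustration only.
type Task struct {
	State string
	Step  int64
}

// BaseScheduler keeps the current task behind an atomic pointer so that
// concurrent readers never observe a partially updated value.
type BaseScheduler struct {
	task atomic.Pointer[Task]
}

// GetTask returns the shared task; callers must treat it as read-only.
func (s *BaseScheduler) GetTask() *Task { return s.task.Load() }

// getTaskClone returns a private shallow copy that is safe to mutate.
func (s *BaseScheduler) getTaskClone() *Task {
	clone := *s.GetTask()
	return &clone
}

// setState mutates the clone, then publishes it atomically; readers see
// either the old task or the new one, never an intermediate state.
func (s *BaseScheduler) setState(state string) {
	task := s.getTaskClone()
	task.State = state
	s.task.Store(task)
}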

View File

@ -42,9 +42,6 @@ var (
defaultCollectMetricsInterval = 5 * time.Second
)
// WaitTaskFinished is used to sync the test.
var WaitTaskFinished = make(chan struct{})
func (sm *Manager) getSchedulerCount() int {
sm.mu.RLock()
defer sm.mu.RUnlock()
@ -300,13 +297,7 @@ func (sm *Manager) failTask(id int64, currState proto.TaskState, err error) {
func (sm *Manager) gcSubtaskHistoryTableLoop() {
historySubtaskTableGcInterval := defaultHistorySubtaskTableGcInterval
failpoint.Inject("historySubtaskTableGcInterval", func(val failpoint.Value) {
if seconds, ok := val.(int); ok {
historySubtaskTableGcInterval = time.Second * time.Duration(seconds)
}
<-WaitTaskFinished
})
failpoint.InjectCall("historySubtaskTableGcInterval", &historySubtaskTableGcInterval)
sm.logger.Info("subtask table gc loop start")
ticker := time.NewTicker(historySubtaskTableGcInterval)
@ -385,9 +376,6 @@ func (sm *Manager) cleanupTaskLoop() {
}
}
// WaitCleanUpFinished is used to sync the test.
var WaitCleanUpFinished = make(chan struct{}, 1)
// doCleanupTask processes clean up routine defined by each type of tasks and cleanupMeta.
// For example:
//
@ -412,9 +400,7 @@ func (sm *Manager) doCleanupTask() {
sm.logger.Warn("cleanup routine failed", zap.Error(err))
return
}
failpoint.Inject("WaitCleanUpFinished", func() {
WaitCleanUpFinished <- struct{}{}
})
failpoint.InjectCall("WaitCleanUpFinished")
sm.logger.Info("cleanup routine success")
}
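
The GC hunks above also show the second flavor of the new hooks: configuration overrides. Instead of decoding a failpoint.Value in production code, the code hands a pointer to its local default to failpoint.InjectCall, and the test callback rewrites the value in place (see subtaskHistoryKeepSeconds and historySubtaskTableGcInterval). A minimal sketch under hypothetical names (gcIntervalHook, gcLoopInterval):

package scheduler

import (
	"time"

	"github.com/pingcap/failpoint"
)

// gcLoopInterval returns the interval the GC loop should use. A test can
// shrink it by registering a callback such as
// func(d *time.Duration) { *d = time.Second } for "gcIntervalHook".
func gcLoopInterval() time.Duration {
	interval := 24 * time.Hour // production default
	failpoint.InjectCall("gcIntervalHook", &interval)
	return interval
}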

View File

@ -89,7 +89,6 @@ func TestSchedulerCleanupTask(t *testing.T) {
require.True(t, ctrl.Satisfied())
// fail in transfer
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/WaitCleanUpFinished", "1*return()"))
mockErr := errors.New("transfer err")
taskMgr.EXPECT().GetTasksInStates(
mgr.ctx,
@ -108,8 +107,6 @@ func TestSchedulerCleanupTask(t *testing.T) {
taskMgr.EXPECT().TransferTasks2History(mgr.ctx, tasks).Return(nil)
mgr.doCleanupTask()
require.True(t, ctrl.Satisfied())
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/WaitCleanUpFinished"))
}
func TestManagerSchedulerNotAllocateSlots(t *testing.T) {

View File

@ -287,7 +287,7 @@ func TestSchedulerRefreshTask(t *testing.T) {
// state/step not changed, no need to refresh
taskMgr.EXPECT().GetTaskBaseByID(gomock.Any(), task.ID).Return(&task.TaskBase, nil)
require.NoError(t, scheduler.refreshTaskIfNeeded())
require.Equal(t, *scheduler.GetTask(), task)
require.Equal(t, *scheduler.getTaskClone(), task)
require.True(t, ctrl.Satisfied())
// get task by id failed
tmpTask := task
@ -295,7 +295,7 @@ func TestSchedulerRefreshTask(t *testing.T) {
taskMgr.EXPECT().GetTaskBaseByID(gomock.Any(), task.ID).Return(&tmpTask.TaskBase, nil)
taskMgr.EXPECT().GetTaskByID(gomock.Any(), task.ID).Return(nil, errors.New("get task by id err"))
require.ErrorContains(t, scheduler.refreshTaskIfNeeded(), "get task by id err")
require.Equal(t, *scheduler.GetTask(), task)
require.Equal(t, *scheduler.getTaskClone(), task)
require.True(t, ctrl.Satisfied())
// state changed
tmpTask = task
@ -303,7 +303,7 @@ func TestSchedulerRefreshTask(t *testing.T) {
taskMgr.EXPECT().GetTaskBaseByID(gomock.Any(), task.ID).Return(&tmpTask.TaskBase, nil)
taskMgr.EXPECT().GetTaskByID(gomock.Any(), task.ID).Return(&tmpTask, nil)
require.NoError(t, scheduler.refreshTaskIfNeeded())
require.Equal(t, *scheduler.GetTask(), tmpTask)
require.Equal(t, *scheduler.getTaskClone(), tmpTask)
require.True(t, ctrl.Satisfied())
// step changed
scheduler.task.Store(&schTask) // revert
@ -312,7 +312,7 @@ func TestSchedulerRefreshTask(t *testing.T) {
taskMgr.EXPECT().GetTaskBaseByID(gomock.Any(), task.ID).Return(&tmpTask.TaskBase, nil)
taskMgr.EXPECT().GetTaskByID(gomock.Any(), task.ID).Return(&tmpTask, nil)
require.NoError(t, scheduler.refreshTaskIfNeeded())
require.Equal(t, *scheduler.GetTask(), tmpTask)
require.Equal(t, *scheduler.getTaskClone(), tmpTask)
require.True(t, ctrl.Satisfied())
}
@ -354,7 +354,7 @@ func TestSchedulerMaintainTaskFields(t *testing.T) {
taskMgr.EXPECT().GetSubtaskCntGroupByStates(gomock.Any(), task.ID, gomock.Any()).Return(nil, nil)
taskMgr.EXPECT().PausedTask(gomock.Any(), task.ID).Return(fmt.Errorf("pause err"))
require.ErrorContains(t, scheduler.onPausing(), "pause err")
require.Equal(t, *scheduler.GetTask(), task)
require.Equal(t, *scheduler.getTaskClone(), task)
require.True(t, ctrl.Satisfied())
// pause task successfully
@ -363,7 +363,7 @@ func TestSchedulerMaintainTaskFields(t *testing.T) {
require.NoError(t, scheduler.onPausing())
tmpTask := task
tmpTask.State = proto.TaskStatePaused
require.Equal(t, *scheduler.GetTask(), tmpTask)
require.Equal(t, *scheduler.getTaskClone(), tmpTask)
require.True(t, ctrl.Satisfied())
})
@ -373,7 +373,7 @@ func TestSchedulerMaintainTaskFields(t *testing.T) {
taskMgr.EXPECT().GetSubtaskCntGroupByStates(gomock.Any(), task.ID, gomock.Any()).Return(nil, nil)
taskMgr.EXPECT().ResumedTask(gomock.Any(), task.ID).Return(fmt.Errorf("resume err"))
require.ErrorContains(t, scheduler.onResuming(), "resume err")
require.Equal(t, *scheduler.GetTask(), task)
require.Equal(t, *scheduler.getTaskClone(), task)
require.True(t, ctrl.Satisfied())
// resume task successfully
@ -382,7 +382,7 @@ func TestSchedulerMaintainTaskFields(t *testing.T) {
require.NoError(t, scheduler.onResuming())
tmpTask := task
tmpTask.State = proto.TaskStateRunning
require.Equal(t, *scheduler.GetTask(), tmpTask)
require.Equal(t, *scheduler.getTaskClone(), tmpTask)
require.True(t, ctrl.Satisfied())
})
@ -393,7 +393,7 @@ func TestSchedulerMaintainTaskFields(t *testing.T) {
schExt.EXPECT().OnDone(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil)
taskMgr.EXPECT().RevertedTask(gomock.Any(), task.ID).Return(fmt.Errorf("reverted err"))
require.ErrorContains(t, scheduler.onReverting(), "reverted err")
require.Equal(t, *scheduler.GetTask(), task)
require.Equal(t, *scheduler.getTaskClone(), task)
require.True(t, ctrl.Satisfied())
// revert task successfully
@ -403,7 +403,7 @@ func TestSchedulerMaintainTaskFields(t *testing.T) {
require.NoError(t, scheduler.onReverting())
tmpTask := task
tmpTask.State = proto.TaskStateReverted
require.Equal(t, *scheduler.GetTask(), tmpTask)
require.Equal(t, *scheduler.getTaskClone(), tmpTask)
require.True(t, ctrl.Satisfied())
})
@ -415,7 +415,7 @@ func TestSchedulerMaintainTaskFields(t *testing.T) {
Return(nil, errors.New("plan err"))
schExt.EXPECT().IsRetryableErr(gomock.Any()).Return(true)
require.ErrorContains(t, scheduler.switch2NextStep(), "plan err")
require.Equal(t, *scheduler.GetTask(), task)
require.Equal(t, *scheduler.getTaskClone(), task)
require.True(t, ctrl.Satisfied())
// non-retryable plan error, but failed to revert, task state unchanged
schExt.EXPECT().OnNextSubtasksBatch(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
@ -423,7 +423,7 @@ func TestSchedulerMaintainTaskFields(t *testing.T) {
schExt.EXPECT().IsRetryableErr(gomock.Any()).Return(false)
taskMgr.EXPECT().RevertTask(gomock.Any(), task.ID, gomock.Any(), gomock.Any()).Return(fmt.Errorf("revert err"))
require.ErrorContains(t, scheduler.switch2NextStep(), "revert err")
require.Equal(t, *scheduler.GetTask(), task)
require.Equal(t, *scheduler.getTaskClone(), task)
require.True(t, ctrl.Satisfied())
// non-retryable plan error, task state changed to reverting
schExt.EXPECT().OnNextSubtasksBatch(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
@ -434,7 +434,7 @@ func TestSchedulerMaintainTaskFields(t *testing.T) {
tmpTask := task
tmpTask.State = proto.TaskStateReverting
tmpTask.Error = fmt.Errorf("revert err")
require.Equal(t, *scheduler.GetTask(), tmpTask)
require.Equal(t, *scheduler.getTaskClone(), tmpTask)
require.True(t, ctrl.Satisfied())
// revert task back
@ -445,7 +445,7 @@ func TestSchedulerMaintainTaskFields(t *testing.T) {
Return(nil, nil)
taskMgr.EXPECT().GetUsedSlotsOnNodes(gomock.Any()).Return(nil, fmt.Errorf("update err"))
require.ErrorContains(t, scheduler.switch2NextStep(), "update err")
require.Equal(t, *scheduler.GetTask(), task)
require.Equal(t, *scheduler.getTaskClone(), task)
require.True(t, ctrl.Satisfied())
// switch to next step successfully
schExt.EXPECT().OnNextSubtasksBatch(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
@ -456,21 +456,21 @@ func TestSchedulerMaintainTaskFields(t *testing.T) {
tmpTask = task
tmpTask.State = proto.TaskStateRunning
tmpTask.Step = proto.StepOne
require.Equal(t, *scheduler.GetTask(), tmpTask)
require.Equal(t, *scheduler.getTaskClone(), tmpTask)
require.True(t, ctrl.Satisfied())
// task done, but update failed, task state unchanged
schExt.EXPECT().OnDone(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil)
taskMgr.EXPECT().SucceedTask(gomock.Any(), task.ID).Return(fmt.Errorf("update err"))
require.ErrorContains(t, scheduler.switch2NextStep(), "update err")
require.Equal(t, *scheduler.GetTask(), tmpTask)
require.Equal(t, *scheduler.getTaskClone(), tmpTask)
// task done successfully, task state changed
schExt.EXPECT().OnDone(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil)
taskMgr.EXPECT().SucceedTask(gomock.Any(), task.ID).Return(nil)
require.NoError(t, scheduler.switch2NextStep())
tmpTask.State = proto.TaskStateSucceed
tmpTask.Step = proto.StepDone
require.Equal(t, *scheduler.GetTask(), tmpTask)
require.Equal(t, *scheduler.getTaskClone(), tmpTask)
})
t.Run("test revertTask", func(t *testing.T) {
@ -478,7 +478,7 @@ func TestSchedulerMaintainTaskFields(t *testing.T) {
taskMgr.EXPECT().RevertTask(gomock.Any(), task.ID, gomock.Any(), gomock.Any()).Return(fmt.Errorf("revert err"))
require.ErrorContains(t, scheduler.revertTask(fmt.Errorf("task err")), "revert err")
require.Equal(t, *scheduler.GetTask(), task)
require.Equal(t, *scheduler.getTaskClone(), task)
require.True(t, ctrl.Satisfied())
taskMgr.EXPECT().RevertTask(gomock.Any(), task.ID, gomock.Any(), gomock.Any()).Return(nil)
@ -486,7 +486,7 @@ func TestSchedulerMaintainTaskFields(t *testing.T) {
tmpTask := task
tmpTask.State = proto.TaskStateReverting
tmpTask.Error = fmt.Errorf("task err")
require.Equal(t, *scheduler.GetTask(), tmpTask)
require.Equal(t, *scheduler.getTaskClone(), tmpTask)
require.True(t, ctrl.Satisfied())
})
}

View File

@ -83,11 +83,7 @@ func (mgr *TaskManager) TransferTasks2History(ctx context.Context, tasks []*prot
// GCSubtasks deletes the history subtask which is older than the given days.
func (mgr *TaskManager) GCSubtasks(ctx context.Context) error {
subtaskHistoryKeepSeconds := defaultSubtaskKeepDays * 24 * 60 * 60
failpoint.Inject("subtaskHistoryKeepSeconds", func(val failpoint.Value) {
if val, ok := val.(int); ok {
subtaskHistoryKeepSeconds = val
}
})
failpoint.InjectCall("subtaskHistoryKeepSeconds", &subtaskHistoryKeepSeconds)
_, err := mgr.ExecuteSQLWithNewSession(
ctx,
fmt.Sprintf("DELETE FROM mysql.tidb_background_subtask_history WHERE state_update_time < UNIX_TIMESTAMP() - %d ;", subtaskHistoryKeepSeconds),

View File

@ -824,7 +824,9 @@ func TestSubtaskHistoryTable(t *testing.T) {
require.Len(t, subTasks, 3)
// test GC history table.
testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/disttask/framework/storage/subtaskHistoryKeepSeconds", "return(1)")
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/disttask/framework/storage/subtaskHistoryKeepSeconds", func(interval *int) {
*interval = 1
})
time.Sleep(2 * time.Second)
subTask4 := testutil.CreateSubTask(t, sm, taskID2, proto.StepInit, tidb1, []byte(meta), proto.TaskTypeExample, 11)

View File

@ -11,7 +11,6 @@ go_test(
deps = [
"//br/pkg/storage",
"//pkg/config",
"//pkg/disttask/framework/scheduler",
"//pkg/kv",
"//pkg/lightning/backend/external",
"//pkg/meta/model",

View File

@ -26,7 +26,6 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/disttask/framework/scheduler"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/lightning/backend/external"
"github.com/pingcap/tidb/pkg/meta/model"
@ -85,7 +84,10 @@ func TestGlobalSortBasic(t *testing.T) {
store := realtikvtest.CreateMockStoreAndSetup(t)
tk := testkit.NewTestKit(t, store)
testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/WaitCleanUpFinished", "return()")
ch := make(chan struct{}, 1)
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/WaitCleanUpFinished", func() {
ch <- struct{}{}
})
tk.MustExec("drop database if exists addindexlit;")
tk.MustExec("create database addindexlit;")
tk.MustExec("use addindexlit;")
@ -117,18 +119,18 @@ func TestGlobalSortBasic(t *testing.T) {
tk.MustExec("alter table t add index idx(a);")
tk.MustExec("admin check table t;")
<-scheduler.WaitCleanUpFinished
<-ch
checkFileCleaned(t, jobID, cloudStorageURI)
testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/ddl/forceMergeSort", "return()")
tk.MustExec("alter table t add index idx1(a);")
tk.MustExec("admin check table t;")
<-scheduler.WaitCleanUpFinished
<-ch
checkFileCleaned(t, jobID, cloudStorageURI)
tk.MustExec("alter table t add unique index idx2(a);")
tk.MustExec("admin check table t;")
<-scheduler.WaitCleanUpFinished
<-ch
checkFileCleaned(t, jobID, cloudStorageURI)
}

View File

@ -22,7 +22,6 @@ go_test(
"//br/pkg/utils",
"//pkg/config",
"//pkg/disttask/framework/proto",
"//pkg/disttask/framework/scheduler",
"//pkg/disttask/framework/storage",
"//pkg/disttask/framework/testutil",
"//pkg/disttask/importinto",

View File

@ -35,7 +35,6 @@ import (
"github.com/pingcap/tidb/br/pkg/mock/mocklocal"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/scheduler"
"github.com/pingcap/tidb/pkg/disttask/framework/storage"
"github.com/pingcap/tidb/pkg/disttask/importinto"
"github.com/pingcap/tidb/pkg/domain/infosync"
@ -864,7 +863,10 @@ func (s *mockGCSSuite) TestImportMode() {
// NOTE: this case only runs when current instance is TiDB owner, if you run it locally,
// better start a cluster without TiDB instance.
testfailpoint.Enable(s.T(), "github.com/pingcap/tidb/pkg/parser/ast/forceRedactURL", "return(true)")
testfailpoint.Enable(s.T(), "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/WaitCleanUpFinished", "return()")
ch := make(chan struct{}, 1)
testfailpoint.EnableCall(s.T(), "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/WaitCleanUpFinished", func() {
ch <- struct{}{}
})
sql := fmt.Sprintf(`IMPORT INTO load_data.import_mode FROM 'gs://test-load/import_mode-*.tsv?access-key=aaaaaa&secret-access-key=bbbbbb&endpoint=%s'`, gcsEndpoint)
rows := s.tk.MustQuery(sql).Rows()
s.Len(rows, 1)
@ -872,7 +874,7 @@ func (s *mockGCSSuite) TestImportMode() {
s.NoError(err)
s.tk.MustQuery("SELECT * FROM load_data.import_mode;").Sort().Check(testkit.Rows("1 11 111"))
s.Greater(intoNormalTime, intoImportTime)
<-scheduler.WaitCleanUpFinished
<-ch
s.checkTaskMetaRedacted(int64(jobID))
// after import step, we should enter normal mode, i.e. we only call ToImportMode once
intoNormalTime, intoImportTime = time.Time{}, time.Time{}
@ -888,7 +890,7 @@ func (s *mockGCSSuite) TestImportMode() {
s.Greater(intoNormalTime, intoImportTime)
s.NoError(failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/importinto/clearLastSwitchTime"))
s.NoError(failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/importinto/waitBeforePostProcess"))
<-scheduler.WaitCleanUpFinished
<-ch
// test disable_tikv_import_mode, should not call ToImportMode and ToNormalMode
s.tk.MustExec("truncate table load_data.import_mode;")
@ -896,7 +898,7 @@ func (s *mockGCSSuite) TestImportMode() {
s.tk.MustQuery(sql)
s.tk.MustQuery("SELECT * FROM load_data.import_mode;").Sort().Check(testkit.Rows("1 11 111"))
s.tk.MustExec("truncate table load_data.import_mode;")
<-scheduler.WaitCleanUpFinished
<-ch
// test with multirocksdb
testfailpoint.Enable(s.T(), "github.com/pingcap/tidb/pkg/ddl/util/IsRaftKv2", "return(true)")
@ -905,7 +907,7 @@ func (s *mockGCSSuite) TestImportMode() {
s.tk.MustQuery(sql)
s.tk.MustQuery("SELECT * FROM load_data.import_mode;").Sort().Check(testkit.Rows("1 11 111"))
s.tk.MustExec("truncate table load_data.import_mode;")
<-scheduler.WaitCleanUpFinished
<-ch
s.NoError(failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/util/IsRaftKv2"))
@ -921,7 +923,7 @@ func (s *mockGCSSuite) TestImportMode() {
err = s.tk.QueryToErr(sql)
s.Error(err)
s.Greater(intoNormalTime, intoImportTime)
<-scheduler.WaitCleanUpFinished
<-ch
s.checkTaskMetaRedacted(importer.TestLastImportJobID.Load())
s.NoError(failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/WaitCleanUpFinished"))
}

View File

@ -13,7 +13,6 @@ go_test(
deps = [
"//pkg/config",
"//pkg/disttask/framework/proto",
"//pkg/disttask/framework/scheduler",
"//pkg/disttask/framework/storage",
"//pkg/disttask/framework/testutil",
"//pkg/disttask/importinto",

View File

@ -26,7 +26,6 @@ import (
"github.com/fsouza/fake-gcs-server/fakestorage"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/scheduler"
"github.com/pingcap/tidb/pkg/disttask/framework/storage"
"github.com/pingcap/tidb/pkg/disttask/importinto"
"github.com/pingcap/tidb/pkg/executor/importer"
@ -63,7 +62,10 @@ func (s *mockGCSSuite) TestGlobalSortBasic() {
s.tk.MustExec(`create table t (a bigint primary key, b varchar(100), c varchar(100), d int,
key(a), key(c,d), key(d));`)
testfailpoint.Enable(s.T(), "github.com/pingcap/tidb/pkg/parser/ast/forceRedactURL", "return(true)")
testfailpoint.Enable(s.T(), "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/WaitCleanUpFinished", "return()")
ch := make(chan struct{}, 1)
testfailpoint.EnableCall(s.T(), "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/WaitCleanUpFinished", func() {
ch <- struct{}{}
})
sortStorageURI := fmt.Sprintf("gs://sorted/import?endpoint=%s&access-key=aaaaaa&secret-access-key=bbbbbb", gcsEndpoint)
importSQL := fmt.Sprintf(`import into t FROM 'gs://gs-basic/t.*.csv?endpoint=%s'
@ -78,7 +80,7 @@ func (s *mockGCSSuite) TestGlobalSortBasic() {
))
// check all sorted data cleaned up
<-scheduler.WaitCleanUpFinished
<-ch
_, files, err := s.server.ListObjectsWithOptions("sorted", fakestorage.ListOptions{Prefix: "import"})
s.NoError(err)
@ -106,7 +108,7 @@ func (s *mockGCSSuite) TestGlobalSortBasic() {
"1 foo1 bar1 123", "2 foo2 bar2 456", "3 foo3 bar3 789",
"4 foo4 bar4 123", "5 foo5 bar5 223", "6 foo6 bar6 323",
))
<-scheduler.WaitCleanUpFinished
<-ch
// failed task, should clean up all sorted data too.
testfailpoint.Enable(s.T(), "github.com/pingcap/tidb/pkg/disttask/importinto/failWhenDispatchWriteIngestSubtask", "return(true)")
@ -121,7 +123,7 @@ func (s *mockGCSSuite) TestGlobalSortBasic() {
return task.State == proto.TaskStateReverted
}, 30*time.Second, 300*time.Millisecond)
// check all sorted data cleaned up
<-scheduler.WaitCleanUpFinished
<-ch
_, files, err = s.server.ListObjectsWithOptions("sorted", fakestorage.ListOptions{Prefix: "import"})
s.NoError(err)