ttl: don't fetch ttl scan task after it finished (#40919)
close pingcap/tidb#40918
This commit is contained in:
2
ttl/cache/task.go
vendored
2
ttl/cache/task.go
vendored
@ -59,7 +59,7 @@ func SelectFromTTLTaskWithID(jobID string, scanID int64) (string, []interface{})
|
||||
|
||||
// PeekWaitingTTLTask returns an SQL statement to get `limit` waiting ttl task
|
||||
func PeekWaitingTTLTask(limit int, hbExpire time.Time) (string, []interface{}) {
|
||||
return selectFromTTLTask + " WHERE status = 'waiting' OR owner_hb_time < %? ORDER BY created_time ASC LIMIT %?", []interface{}{hbExpire.Format("2006-01-02 15:04:05"), limit}
|
||||
return selectFromTTLTask + " WHERE status = 'waiting' OR (owner_hb_time < %? AND status = 'running') ORDER BY created_time ASC LIMIT %?", []interface{}{hbExpire.Format("2006-01-02 15:04:05"), limit}
|
||||
}
|
||||
|
||||
// InsertIntoTTLTask returns an SQL statement to insert a ttl task into mysql.tidb_ttl_task
|
||||
|
||||
@ -186,6 +186,17 @@ func TestTaskScheduleExpireHeartBeat(t *testing.T) {
|
||||
m2.SetScanWorkers4Test([]ttlworker.Worker{scanWorker2})
|
||||
m2.RescheduleTasks(sessionFactory(), now.Add(time.Hour))
|
||||
tk.MustQuery("select status,owner_id from mysql.tidb_ttl_task").Check(testkit.Rows("running task-manager-2"))
|
||||
|
||||
// another task manager shouldn't fetch this task if it has finished
|
||||
task := m2.GetRunningTasks()[0]
|
||||
task.SetResult(nil)
|
||||
m2.CheckFinishedTask(sessionFactory(), now)
|
||||
scanWorker3 := ttlworker.NewMockScanWorker(t)
|
||||
scanWorker3.Start()
|
||||
m3 := ttlworker.NewTaskManager(context.Background(), nil, isc, "task-manager-3")
|
||||
m3.SetScanWorkers4Test([]ttlworker.Worker{scanWorker3})
|
||||
m3.RescheduleTasks(sessionFactory(), now.Add(time.Hour))
|
||||
tk.MustQuery("select status,owner_id from mysql.tidb_ttl_task").Check(testkit.Rows("finished task-manager-2"))
|
||||
}
|
||||
|
||||
func TestTaskMetrics(t *testing.T) {
|
||||
|
||||
@ -54,6 +54,24 @@ func (m *taskManager) ReportMetrics() {
|
||||
m.reportMetrics()
|
||||
}
|
||||
|
||||
// CheckFinishedTask is an exported version of checkFinishedTask
|
||||
func (m *taskManager) CheckFinishedTask(se session.Session, now time.Time) {
|
||||
m.checkFinishedTask(se, now)
|
||||
}
|
||||
|
||||
// ReportTaskFinished is an exported version of reportTaskFinished
|
||||
func (m *taskManager) GetRunningTasks() []*runningScanTask {
|
||||
return m.runningTasks
|
||||
}
|
||||
|
||||
// ReportTaskFinished is an exported version of reportTaskFinished
|
||||
func (t *runningScanTask) SetResult(err error) {
|
||||
t.result = &ttlScanTaskExecResult{
|
||||
task: t.ttlScanTask,
|
||||
err: err,
|
||||
}
|
||||
}
|
||||
|
||||
func TestResizeWorkers(t *testing.T) {
|
||||
tbl := newMockTTLTbl(t, "t1")
|
||||
|
||||
|
||||
Reference in New Issue
Block a user