Files
tidb/pkg/ttl/ttlworker/task_manager_test.go
2025-10-08 14:11:03 +00:00

312 lines
9.5 KiB
Go

// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package ttlworker
import (
"context"
"testing"
"time"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/sessionctx/vardef"
"github.com/pingcap/tidb/pkg/ttl/cache"
"github.com/pingcap/tidb/pkg/ttl/session"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/testutils"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/tikvrpc"
)
// NewTaskManager is an exported alias of newTaskManager so that external test
// packages can construct a task manager.
var NewTaskManager = newTaskManager
// Worker is an exported alias of the internal worker type for use in tests.
type Worker = worker
// SetScanWorkers4Test directly replaces the manager's scan worker slice,
// bypassing the normal resize path. Test-only.
func (m *taskManager) SetScanWorkers4Test(workers []worker) {
	m.scanWorkers = workers
}
// LockScanTask is an exported version of lockScanTask for tests.
func (m *taskManager) LockScanTask(se session.Session, task *cache.TTLTask, now time.Time) (*runningScanTask, error) {
	return m.lockScanTask(se, task, now)
}
// ResizeWorkersWithSysVar is an exported version of resizeWorkersWithSysVar for tests.
func (m *taskManager) ResizeWorkersWithSysVar() {
	m.resizeWorkersWithSysVar()
}
// ResizeWorkersToZero shrinks both the scan and delete worker pools to zero,
// failing the test immediately if either resize returns an error.
func (m *taskManager) ResizeWorkersToZero(t *testing.T) {
	require.NoError(t, m.resizeScanWorkers(0))
	require.NoError(t, m.resizeDelWorkers(0))
}
// RescheduleTasks is an exported version of rescheduleTasks for tests.
func (m *taskManager) RescheduleTasks(se session.Session, now time.Time) {
	m.rescheduleTasks(se, now)
}
// ResizeScanWorkers is an exported version of resizeScanWorkers for tests.
func (m *taskManager) ResizeScanWorkers(count int) error {
	return m.resizeScanWorkers(count)
}
// ResizeDelWorkers is an exported version of resizeDelWorkers for tests.
// (The original comment referred to "resizeDeleteWorkers", which does not
// match the actual callee name.)
func (m *taskManager) ResizeDelWorkers(count int) error {
	return m.resizeDelWorkers(count)
}
// ReportMetrics is an exported version of reportMetrics for tests.
func (m *taskManager) ReportMetrics() {
	m.reportMetrics()
}
// CheckFinishedTask is an exported version of checkFinishedTask for tests.
func (m *taskManager) CheckFinishedTask(se session.Session, now time.Time) {
	m.checkFinishedTask(se, now)
}
// GetRunningTasks returns the manager's current running scan tasks. Test-only.
// (The original comment was a copy-paste of ReportTaskFinished's.)
func (m *taskManager) GetRunningTasks() []*runningScanTask {
	return m.runningTasks
}
// MeetTTLRunningTasks is an exported version of meetTTLRunningTask for tests.
func (m *taskManager) MeetTTLRunningTasks(count int, taskStatus cache.TaskStatus) bool {
	return m.meetTTLRunningTask(count, taskStatus)
}
// ReportTaskFinished is an exported version of reportTaskFinished for tests.
func (m *taskManager) ReportTaskFinished(se session.Session, now time.Time, task *runningScanTask) error {
	return m.reportTaskFinished(se, now, task)
}
// GetScanWorkers returns the scan workers of the task manager. Test-only.
func (m *taskManager) GetScanWorkers() []worker {
	return m.scanWorkers
}
// SetResult builds a result from the given error via ttlScanTask.result and
// stores it on the running task. Test-only.
func (t *runningScanTask) SetResult(err error) {
	t.result = t.ttlScanTask.result(err)
}
// SetCancel sets the cancel function of the task. Test-only.
func (t *runningScanTask) SetCancel(cancel func()) {
	t.cancel = cancel
}
// CheckInvalidTask is an exported version of checkInvalidTask for tests.
func (m *taskManager) CheckInvalidTask(se session.Session) {
	m.checkInvalidTask(se)
}
// UpdateHeartBeat is an exported version of updateHeartBeat for tests.
func (m *taskManager) UpdateHeartBeat(ctx context.Context, se session.Session, now time.Time) {
	m.updateHeartBeat(ctx, se, now)
}
// UpdateHeartBeatForTask sends a heartbeat for a single running task. It
// delegates to taskHeartbeatOrResignOwner with resign=false, i.e. it only
// heartbeats and never resigns ownership. Test-only.
func (m *taskManager) UpdateHeartBeatForTask(ctx context.Context, se session.Session, now time.Time, task *runningScanTask) error {
	return m.taskHeartbeatOrResignOwner(ctx, se, now, task, false)
}
// SetWaitWorkerStopTimeoutForTest overrides the package-level
// waitWorkerStopTimeout with the given value and returns a restore function
// that puts the previous value back. Intended usage:
//
//	defer SetWaitWorkerStopTimeoutForTest(time.Second)()
func SetWaitWorkerStopTimeoutForTest(timeout time.Duration) func() {
	previous := waitWorkerStopTimeout
	waitWorkerStopTimeout = timeout
	return func() {
		waitWorkerStopTimeout = previous
	}
}
// GetTerminateInfo reports whether the task has produced a terminate result,
// and if so, returns the terminate reason and the time it was recorded.
// When there is no result yet it returns false with zero values.
func (t *runningScanTask) GetTerminateInfo() (bool, TaskTerminateReason, time.Time) {
	if r := t.result; r != nil {
		return true, r.reason, r.time
	}
	return false, "", time.Time{}
}
// GetStatistics returns the ttlStatistics of the task. Test-only.
func (t *runningScanTask) GetStatistics() *ttlStatistics {
	return t.statistics
}
// ResetEndTimeForTest overwrites the result's recorded time with tm. The task
// must already have a non-nil result, otherwise the test fails.
func (t *runningScanTask) ResetEndTimeForTest(tb *testing.T, tm time.Time) {
	require.NotNil(tb, t.result)
	t.result.time = tm
}
// TestResizeWorkers covers three scenarios: growing the scan worker pool,
// shrinking it, and shrinking it while a dropped worker still holds the
// result of a running task.
func TestResizeWorkers(t *testing.T) {
	tbl := newMockTTLTbl(t, "t1")

	// Grow from one scan worker to two: both end up running.
	w1 := NewMockScanWorker(t)
	w1.Start()
	w2 := NewMockScanWorker(t)
	mgr := newTaskManager(context.Background(), nil, nil, "test-id", nil)
	mgr.sessPool = newMockSessionPool(t, tbl)
	mgr.SetScanWorkers4Test([]worker{w1})
	resized, _, err := mgr.resizeWorkers(mgr.scanWorkers, 2, func() worker {
		return w2
	})
	assert.NoError(t, err)
	assert.Len(t, resized, 2)
	w1.checkWorkerStatus(workerStatusRunning, true, nil)
	w2.checkWorkerStatus(workerStatusRunning, true, nil)

	// Shrink from two scan workers to one: the dropped worker must stop.
	w1 = NewMockScanWorker(t)
	w1.Start()
	w2 = NewMockScanWorker(t)
	w2.Start()
	mgr = newTaskManager(context.Background(), nil, nil, "test-id", nil)
	mgr.sessPool = newMockSessionPool(t, tbl)
	mgr.SetScanWorkers4Test([]worker{w1, w2})
	assert.NoError(t, mgr.resizeScanWorkers(1))
	w2.checkWorkerStatus(workerStatusStopped, false, nil)

	// Shrink while a task is registered as running: the stopped worker's
	// pending result must be attached to the matching running task.
	w1 = NewMockScanWorker(t)
	w1.Start()
	w2 = NewMockScanWorker(t)
	w2.Start()
	mgr = newTaskManager(context.Background(), nil, nil, "test-id", nil)
	mgr.sessPool = newMockSessionPool(t, tbl)
	mgr.SetScanWorkers4Test([]worker{w1, w2})
	mgr.runningTasks = append(mgr.runningTasks, &runningScanTask{
		ttlScanTask: &ttlScanTask{
			tbl: tbl,
			TTLTask: &cache.TTLTask{
				JobID:  "test-job-id",
				ScanID: 1,
			},
		},
	})
	pending := &ttlScanTask{tbl: tbl, TTLTask: &cache.TTLTask{
		JobID:  "test-job-id",
		ScanID: 1,
	}}
	w2.curTaskResult = pending.result(nil)
	assert.NoError(t, mgr.resizeScanWorkers(1))
	w2.checkWorkerStatus(workerStatusStopped, false, nil)
	assert.NotNil(t, mgr.runningTasks[0].result)
}
// TestTaskFinishedCondition enumerates the combinations of result presence,
// processed-row counts, and result age that decide whether a running scan
// task counts as finished.
func TestTaskFinishedCondition(t *testing.T) {
	tbl := newMockTTLTbl(t, "t1")
	scan := runningScanTask{
		ttlScanTask: &ttlScanTask{
			tbl: tbl,
			TTLTask: &cache.TTLTask{
				JobID:  "test-job-id",
				ScanID: 1,
			},
			statistics: &ttlStatistics{},
		},
	}
	lg := logutil.BgLogger()

	// Without a result the task is never finished, even when every row has
	// been processed.
	require.Nil(t, scan.result)
	require.False(t, scan.finished(lg))
	scan.statistics.TotalRows.Store(10)
	scan.statistics.SuccessRows.Store(10)
	require.False(t, scan.finished(lg))

	for _, caseErr := range []error{nil, errors.New("mockErr")} {
		// A fresh result with rows still outstanding is not finished.
		scan.statistics.SuccessRows.Store(0)
		scan.statistics.ErrorRows.Store(0)
		scan.result = scan.ttlScanTask.result(caseErr)
		require.InDelta(t, scan.result.time.Unix(), time.Now().Unix(), 5)
		require.False(t, scan.finished(lg))
		scan.statistics.SuccessRows.Store(8)
		scan.statistics.ErrorRows.Store(1)
		require.False(t, scan.finished(lg))

		// A result older than waitTaskProcessRowsTimeout finishes the task
		// even with rows outstanding.
		scan.result = scan.ttlScanTask.result(caseErr)
		scan.result.time = time.Now().Add(-waitTaskProcessRowsTimeout - time.Second)
		require.True(t, scan.finished(lg))

		// Success+error rows reaching (or exceeding) the total also finishes
		// the task.
		scan.statistics.SuccessRows.Store(8)
		scan.statistics.ErrorRows.Store(3)
		require.True(t, scan.finished(lg))
	}
}
// mockKVStore embeds kv.Storage so the struct satisfies the interface without
// implementing every method; calling an unimplemented method would panic on
// the nil embedded interface.
type mockKVStore struct {
	kv.Storage
}
// mockTiKVStore satisfies both kv.Storage (via mockKVStore) and tikv.Storage,
// overriding only GetRegionCache to return the injected regionCache.
type mockTiKVStore struct {
	mockKVStore
	tikv.Storage
	regionCache *tikv.RegionCache
}
// GetRegionCache returns the injected region cache (may be nil if unset).
func (s *mockTiKVStore) GetRegionCache() *tikv.RegionCache {
	return s.regionCache
}
// TestGetMaxRunningTasksLimit checks that the running-task limit follows the
// TTLRunningTasks variable when it is positive, and falls back to an
// automatic value when it is -1: MaxConfigurableConcurrency without a usable
// region cache, otherwise the number of TiKV stores.
func TestGetMaxRunningTasksLimit(t *testing.T) {
	mockClient, _, pdClient, err := testutils.NewMockTiKV("", nil)
	require.NoError(t, err)
	defer func() {
		pdClient.Close()
		err = mockClient.Close()
		require.NoError(t, err)
	}()

	// Explicit positive values are returned as-is.
	vardef.TTLRunningTasks.Store(1)
	require.Equal(t, 1, getMaxRunningTasksLimit(&mockTiKVStore{}))
	vardef.TTLRunningTasks.Store(2)
	require.Equal(t, 2, getMaxRunningTasksLimit(&mockTiKVStore{}))

	// -1 means "auto"; without a region cache the limit is the maximum
	// configurable concurrency regardless of store type.
	vardef.TTLRunningTasks.Store(-1)
	require.Equal(t, vardef.MaxConfigurableConcurrency, getMaxRunningTasksLimit(nil))
	require.Equal(t, vardef.MaxConfigurableConcurrency, getMaxRunningTasksLimit(&mockKVStore{}))
	require.Equal(t, vardef.MaxConfigurableConcurrency, getMaxRunningTasksLimit(&mockTiKVStore{}))

	// With a region cache only TiKV stores count: two TiKV stores plus one
	// TiFlash store yields a limit of 2.
	store := &mockTiKVStore{regionCache: tikv.NewRegionCache(pdClient)}
	rc := store.GetRegionCache()
	rc.SetRegionCacheStore(1, "", "", tikvrpc.TiKV, 1, nil)
	rc.SetRegionCacheStore(2, "", "", tikvrpc.TiKV, 1, nil)
	rc.SetRegionCacheStore(3, "", "", tikvrpc.TiFlash, 1, nil)
	require.Equal(t, 2, getMaxRunningTasksLimit(store))
}