ttl: reduce some warnings logs when locking TTL tasks (#58306)

close pingcap/tidb#58305
Author: 王超
Date: 2024-12-17 21:21:03 +08:00
Committed by: GitHub
Parent: db2776a252
Commit: d92dce025a
4 changed files with 42 additions and 34 deletions


@@ -33,6 +33,7 @@ import (
 	"github.com/pingcap/tidb/pkg/util/chunk"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
+	"github.com/tikv/client-go/v2/testutils"
 	"github.com/tikv/client-go/v2/tikv"
 	"github.com/tikv/client-go/v2/tikvrpc"
 )
@@ -673,10 +674,18 @@ func TestLocalJobs(t *testing.T) {
 }
 
 func TestSplitCnt(t *testing.T) {
+	mockClient, _, pdClient, err := testutils.NewMockTiKV("", nil)
+	require.NoError(t, err)
+	defer func() {
+		pdClient.Close()
+		err = mockClient.Close()
+		require.NoError(t, err)
+	}()
+
 	require.Equal(t, 64, getScanSplitCnt(nil))
 	require.Equal(t, 64, getScanSplitCnt(&mockKVStore{}))
-	s := &mockTiKVStore{regionCache: tikv.NewRegionCache(nil)}
+	s := &mockTiKVStore{regionCache: tikv.NewRegionCache(pdClient)}
 	for i := uint64(1); i <= 128; i++ {
 		s.GetRegionCache().SetRegionCacheStore(i, "", "", tikvrpc.TiKV, 1, nil)
 		if i <= 64 {
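
Note: the setup block added above is what allows the test to hand tikv.NewRegionCache a real (mock) PD client instead of nil. A self-contained sketch of the same pattern, under a hypothetical test name, assuming (as the commit title suggests) that a nil PD client is what previously made the region cache emit warning logs:

package ttlworker_test

import (
	"testing"

	"github.com/stretchr/testify/require"
	"github.com/tikv/client-go/v2/testutils"
	"github.com/tikv/client-go/v2/tikv"
)

// TestRegionCacheWithMockPD is a hypothetical name; the body mirrors the
// setup this hunk introduces.
func TestRegionCacheWithMockPD(t *testing.T) {
	// NewMockTiKV returns a mock KV client, a mock cluster (unused here),
	// and a PD client the region cache can poll.
	mockClient, _, pdClient, err := testutils.NewMockTiKV("", nil)
	require.NoError(t, err)
	defer func() {
		pdClient.Close()
		require.NoError(t, mockClient.Close())
	}()

	// With a live PD client behind it, the cache's background refresh has
	// a working endpoint, which presumably silences the warnings that a
	// nil client triggered.
	rc := tikv.NewRegionCache(pdClient)
	defer rc.Close()
}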


@@ -309,6 +309,15 @@ func (m *taskManager) rescheduleTasks(se session.Session, now time.Time) {
 		return
 	}
 
+	if len(tasks) == 0 {
+		return
+	}
+
+	err = m.infoSchemaCache.Update(se)
+	if err != nil {
+		logutil.Logger(m.ctx).Warn("fail to update infoSchemaCache", zap.Error(err))
+		return
+	}
 loop:
 	for _, t := range tasks {
 		logger := logutil.Logger(m.ctx).With(
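
This hunk is the behavioral core of the commit: the infoSchemaCache refresh moves from the callers (see the test changes below, which delete their isc.Update warm-ups) into rescheduleTasks itself, and it is skipped outright when there is nothing to lock. A paraphrased sketch of the resulting flow; the call that produces tasks sits above the visible hunk, so peekWaitingScanTasks is a placeholder name:

// Sketch only: condensed from this hunk, not the committed code.
func (m *taskManager) rescheduleTasksSketch(se session.Session, now time.Time) {
	tasks, err := m.peekWaitingScanTasks(se, now) // placeholder for the real fetch
	if err != nil {
		logutil.Logger(m.ctx).Warn("fail to fetch waiting TTL tasks", zap.Error(err))
		return
	}
	// New: idle rounds return before touching the cache, so they can no
	// longer produce cache-related warnings while locking TTL tasks.
	if len(tasks) == 0 {
		return
	}
	// New: refresh once per round, right before locking, instead of
	// relying on every caller to have warmed the cache.
	if err = m.infoSchemaCache.Update(se); err != nil {
		logutil.Logger(m.ctx).Warn("fail to update infoSchemaCache", zap.Error(err))
		return
	}
	for _, t := range tasks {
		_ = t // lock and dispatch each task (unchanged, omitted)
	}
}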


@@ -130,8 +130,6 @@ func TestParallelSchedule(t *testing.T) {
 		sql := fmt.Sprintf("insert into mysql.tidb_ttl_task(job_id,table_id,scan_id,expire_time,created_time) values ('test-job', %d, %d, NOW(), NOW())", table.Meta().ID, i)
 		tk.MustExec(sql)
 	}
-	isc := cache.NewInfoSchemaCache(time.Second)
-	require.NoError(t, isc.Update(sessionFactory()))
 	scheduleWg := sync.WaitGroup{}
 	finishTasks := make([]func(), 0, 4)
 	for i := 0; i < 4; i++ {
@@ -143,7 +141,7 @@ func TestParallelSchedule(t *testing.T) {
 		}
 		managerID := fmt.Sprintf("task-manager-%d", i)
-		m := ttlworker.NewTaskManager(context.Background(), nil, isc, managerID, store)
+		m := ttlworker.NewTaskManager(context.Background(), nil, cache.NewInfoSchemaCache(time.Second), managerID, store)
 		m.SetScanWorkers4Test(workers)
 		scheduleWg.Add(1)
 		go func() {
@@ -187,14 +185,10 @@ func TestTaskScheduleExpireHeartBeat(t *testing.T) {
 	sql := fmt.Sprintf("insert into mysql.tidb_ttl_task(job_id,table_id,scan_id,expire_time,created_time) values ('test-job', %d, %d, NOW(), NOW())", table.Meta().ID, 1)
 	tk.MustExec(sql)
 
-	// update the infoschema cache
-	isc := cache.NewInfoSchemaCache(time.Second)
-	require.NoError(t, isc.Update(sessionFactory()))
-
 	// schedule in a task manager
 	scanWorker := ttlworker.NewMockScanWorker(t)
 	scanWorker.Start()
-	m := ttlworker.NewTaskManager(context.Background(), nil, isc, "task-manager-1", store)
+	m := ttlworker.NewTaskManager(context.Background(), nil, cache.NewInfoSchemaCache(time.Second), "task-manager-1", store)
 	m.SetScanWorkers4Test([]ttlworker.Worker{scanWorker})
 	se := sessionFactory()
 	now := se.Now()
@@ -204,7 +198,7 @@ func TestTaskScheduleExpireHeartBeat(t *testing.T) {
 	// another task manager should fetch this task after heartbeat expire
 	scanWorker2 := ttlworker.NewMockScanWorker(t)
 	scanWorker2.Start()
-	m2 := ttlworker.NewTaskManager(context.Background(), nil, isc, "task-manager-2", store)
+	m2 := ttlworker.NewTaskManager(context.Background(), nil, cache.NewInfoSchemaCache(time.Second), "task-manager-2", store)
 	m2.SetScanWorkers4Test([]ttlworker.Worker{scanWorker2})
 	m2.RescheduleTasks(sessionFactory(), now.Add(time.Hour))
 	tk.MustQuery("select status,owner_id from mysql.tidb_ttl_task").Check(testkit.Rows("running task-manager-2"))
@@ -215,7 +209,7 @@ func TestTaskScheduleExpireHeartBeat(t *testing.T) {
 	m2.CheckFinishedTask(sessionFactory(), now)
 	scanWorker3 := ttlworker.NewMockScanWorker(t)
 	scanWorker3.Start()
-	m3 := ttlworker.NewTaskManager(context.Background(), nil, isc, "task-manager-3", store)
+	m3 := ttlworker.NewTaskManager(context.Background(), nil, cache.NewInfoSchemaCache(time.Second), "task-manager-3", store)
 	m3.SetScanWorkers4Test([]ttlworker.Worker{scanWorker3})
 	m3.RescheduleTasks(sessionFactory(), now.Add(time.Hour))
 	tk.MustQuery("select status,owner_id from mysql.tidb_ttl_task").Check(testkit.Rows("finished task-manager-2"))
@@ -235,14 +229,10 @@ func TestTaskMetrics(t *testing.T) {
 	sql := fmt.Sprintf("insert into mysql.tidb_ttl_task(job_id,table_id,scan_id,expire_time,created_time) values ('test-job', %d, %d, NOW(), NOW())", table.Meta().ID, 1)
 	tk.MustExec(sql)
 
-	// update the infoschema cache
-	isc := cache.NewInfoSchemaCache(time.Second)
-	require.NoError(t, isc.Update(sessionFactory()))
-
 	// schedule in a task manager
 	scanWorker := ttlworker.NewMockScanWorker(t)
 	scanWorker.Start()
-	m := ttlworker.NewTaskManager(context.Background(), nil, isc, "task-manager-1", store)
+	m := ttlworker.NewTaskManager(context.Background(), nil, cache.NewInfoSchemaCache(time.Minute), "task-manager-1", store)
 	m.SetScanWorkers4Test([]ttlworker.Worker{scanWorker})
 	se := sessionFactory()
 	now := se.Now()
@@ -268,13 +258,11 @@ func TestRescheduleWithError(t *testing.T) {
 	se := sessionFactory()
 	now := se.Now()
 
-	isc := cache.NewInfoSchemaCache(time.Second)
-	require.NoError(t, isc.Update(se))
 	// schedule in a task manager
 	scanWorker := ttlworker.NewMockScanWorker(t)
 	scanWorker.Start()
-	m := ttlworker.NewTaskManager(context.Background(), nil, isc, "task-manager-1", store)
+	m := ttlworker.NewTaskManager(context.Background(), nil, cache.NewInfoSchemaCache(time.Minute), "task-manager-1", store)
 	m.SetScanWorkers4Test([]ttlworker.Worker{scanWorker})
 	notify := make(chan struct{})
 	go func() {
@@ -307,8 +295,7 @@ func TestTTLRunningTasksLimitation(t *testing.T) {
 		sql := fmt.Sprintf("insert into mysql.tidb_ttl_task(job_id,table_id,scan_id,expire_time,created_time) values ('test-job', %d, %d, NOW(), NOW())", table.Meta().ID, i)
 		tk.MustExec(sql)
 	}
-	isc := cache.NewInfoSchemaCache(time.Second)
-	require.NoError(t, isc.Update(sessionFactory()))
+
 	scheduleWg := sync.WaitGroup{}
 	for i := 0; i < 16; i++ {
 		workers := []ttlworker.Worker{}
@@ -319,7 +306,7 @@ func TestTTLRunningTasksLimitation(t *testing.T) {
 		}
 		ctx := logutil.WithKeyValue(context.Background(), "ttl-worker-test", fmt.Sprintf("task-manager-%d", i))
-		m := ttlworker.NewTaskManager(ctx, nil, isc, fmt.Sprintf("task-manager-%d", i), store)
+		m := ttlworker.NewTaskManager(ctx, nil, cache.NewInfoSchemaCache(time.Minute), fmt.Sprintf("task-manager-%d", i), store)
 		m.SetScanWorkers4Test(workers)
 		scheduleWg.Add(1)
 		go func() {
@@ -384,9 +371,7 @@ func TestShrinkScanWorkerAndResignOwner(t *testing.T) {
 	se := sessionFactory()
 	now := se.Now()
 
-	isc := cache.NewInfoSchemaCache(time.Minute)
-	require.NoError(t, isc.Update(se))
-	m := ttlworker.NewTaskManager(context.Background(), pool, isc, "scan-manager-1", store)
+	m := ttlworker.NewTaskManager(context.Background(), pool, cache.NewInfoSchemaCache(time.Minute), "scan-manager-1", store)
 
 	startBlockNotifyCh := make(chan struct{})
 	blockCancelCh := make(chan struct{})
@@ -522,7 +507,7 @@ func TestShrinkScanWorkerAndResignOwner(t *testing.T) {
 	))
 
 	// A resigned task can be obtained by other task managers
-	m2 := ttlworker.NewTaskManager(context.Background(), pool, isc, "scan-manager-2", store)
+	m2 := ttlworker.NewTaskManager(context.Background(), pool, cache.NewInfoSchemaCache(time.Minute), "scan-manager-2", store)
 	worker2 := ttlworker.NewMockScanWorker(t)
 	worker2.Start()
 	defer func() {
@@ -562,8 +547,6 @@ func TestTaskCancelledAfterHeartbeatTimeout(t *testing.T) {
 		sql := fmt.Sprintf("insert into mysql.tidb_ttl_task(job_id,table_id,scan_id,expire_time,created_time) values ('test-job', %d, %d, NOW(), NOW())", table.Meta().ID, i)
 		tk.MustExec(sql)
 	}
-	isc := cache.NewInfoSchemaCache(time.Second)
-	require.NoError(t, isc.Update(se))
 
 	workers := []ttlworker.Worker{}
 	for j := 0; j < 8; j++ {
@@ -573,10 +556,10 @@ func TestTaskCancelledAfterHeartbeatTimeout(t *testing.T) {
 	}
 
 	now := se.Now()
-	m1 := ttlworker.NewTaskManager(context.Background(), pool, isc, "task-manager-1", store)
+	m1 := ttlworker.NewTaskManager(context.Background(), pool, cache.NewInfoSchemaCache(time.Minute), "task-manager-1", store)
 	m1.SetScanWorkers4Test(workers[0:4])
 	m1.RescheduleTasks(se, now)
-	m2 := ttlworker.NewTaskManager(context.Background(), pool, isc, "task-manager-2", store)
+	m2 := ttlworker.NewTaskManager(context.Background(), pool, cache.NewInfoSchemaCache(time.Minute), "task-manager-2", store)
 	m2.SetScanWorkers4Test(workers[4:])
 
 	// All tasks should be scheduled to m1 and running
@@ -665,9 +648,7 @@ func TestHeartBeatErrorNotBlockOthers(t *testing.T) {
 	se := sessionFactory()
 	now := se.Now()
 
-	isc := cache.NewInfoSchemaCache(time.Minute)
-	require.NoError(t, isc.Update(se))
-	m := ttlworker.NewTaskManager(context.Background(), pool, isc, "task-manager-1", store)
+	m := ttlworker.NewTaskManager(context.Background(), pool, cache.NewInfoSchemaCache(time.Minute), "task-manager-1", store)
 	workers := []ttlworker.Worker{}
 	for j := 0; j < 4; j++ {
 		scanWorker := ttlworker.NewMockScanWorker(t)
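
Every hunk in this file makes the same mechanical change: the shared, pre-warmed isc variable disappears, and each TaskManager receives its own fresh cache, because rescheduleTasks now performs the Update itself. Condensed before/after, assembled from the hunks above:

// Before: tests warmed a shared cache up front.
isc := cache.NewInfoSchemaCache(time.Second)
require.NoError(t, isc.Update(sessionFactory()))
m := ttlworker.NewTaskManager(context.Background(), nil, isc, "task-manager-1", store)

// After: each manager owns an empty cache; the first RescheduleTasks call
// with pending tasks fills it, so no per-test warm-up is needed.
m = ttlworker.NewTaskManager(context.Background(), nil,
	cache.NewInfoSchemaCache(time.Minute), "task-manager-1", store)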


@@ -27,6 +27,7 @@ import (
 	"github.com/pingcap/tidb/pkg/util/logutil"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
+	"github.com/tikv/client-go/v2/testutils"
 	"github.com/tikv/client-go/v2/tikv"
 	"github.com/tikv/client-go/v2/tikvrpc"
 )
@@ -283,6 +284,14 @@ func (s *mockTiKVStore) GetRegionCache() *tikv.RegionCache {
 }
 
 func TestGetMaxRunningTasksLimit(t *testing.T) {
+	mockClient, _, pdClient, err := testutils.NewMockTiKV("", nil)
+	require.NoError(t, err)
+	defer func() {
+		pdClient.Close()
+		err = mockClient.Close()
+		require.NoError(t, err)
+	}()
+
 	variable.TTLRunningTasks.Store(1)
 	require.Equal(t, 1, getMaxRunningTasksLimit(&mockTiKVStore{}))
@@ -294,7 +303,7 @@ func TestGetMaxRunningTasksLimit(t *testing.T) {
 	require.Equal(t, variable.MaxConfigurableConcurrency, getMaxRunningTasksLimit(&mockKVStore{}))
 	require.Equal(t, variable.MaxConfigurableConcurrency, getMaxRunningTasksLimit(&mockTiKVStore{}))
 
-	s := &mockTiKVStore{regionCache: tikv.NewRegionCache(nil)}
+	s := &mockTiKVStore{regionCache: tikv.NewRegionCache(pdClient)}
 	s.GetRegionCache().SetRegionCacheStore(1, "", "", tikvrpc.TiKV, 1, nil)
 	s.GetRegionCache().SetRegionCacheStore(2, "", "", tikvrpc.TiKV, 1, nil)
 	s.GetRegionCache().SetRegionCacheStore(3, "", "", tikvrpc.TiFlash, 1, nil)
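
As in the first test file, the mock PD client created at the top of this test is what makes tikv.NewRegionCache(pdClient) viable here. A compact sketch of the store-registration pattern the assertions rely on (hypothetical test name; imports as in the sketch after the first file; every call appears in the diff itself). That only tikvrpc.TiKV stores feed the limit is inferred from the mixed store types above, not stated by the diff:

func TestRegisterMockStoresSketch(t *testing.T) {
	mockClient, _, pdClient, err := testutils.NewMockTiKV("", nil)
	require.NoError(t, err)
	defer func() {
		pdClient.Close()
		require.NoError(t, mockClient.Close())
	}()

	rc := tikv.NewRegionCache(pdClient)
	defer rc.Close()

	// Two TiKV stores and one TiFlash store: getMaxRunningTasksLimit
	// appears to scale with the TiKV store count only.
	rc.SetRegionCacheStore(1, "", "", tikvrpc.TiKV, 1, nil)
	rc.SetRegionCacheStore(2, "", "", tikvrpc.TiKV, 1, nil)
	rc.SetRegionCacheStore(3, "", "", tikvrpc.TiFlash, 1, nil)
}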