diff --git a/pkg/ttl/ttlworker/job_manager_test.go b/pkg/ttl/ttlworker/job_manager_test.go index e7437703dc..c40b83b589 100644 --- a/pkg/ttl/ttlworker/job_manager_test.go +++ b/pkg/ttl/ttlworker/job_manager_test.go @@ -33,6 +33,7 @@ import ( "github.com/pingcap/tidb/pkg/util/chunk" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/testutils" "github.com/tikv/client-go/v2/tikv" "github.com/tikv/client-go/v2/tikvrpc" ) @@ -673,10 +674,18 @@ func TestLocalJobs(t *testing.T) { } func TestSplitCnt(t *testing.T) { + mockClient, _, pdClient, err := testutils.NewMockTiKV("", nil) + require.NoError(t, err) + defer func() { + pdClient.Close() + err = mockClient.Close() + require.NoError(t, err) + }() + require.Equal(t, 64, getScanSplitCnt(nil)) require.Equal(t, 64, getScanSplitCnt(&mockKVStore{})) - s := &mockTiKVStore{regionCache: tikv.NewRegionCache(nil)} + s := &mockTiKVStore{regionCache: tikv.NewRegionCache(pdClient)} for i := uint64(1); i <= 128; i++ { s.GetRegionCache().SetRegionCacheStore(i, "", "", tikvrpc.TiKV, 1, nil) if i <= 64 { diff --git a/pkg/ttl/ttlworker/task_manager.go b/pkg/ttl/ttlworker/task_manager.go index d491146487..d8cedb6695 100644 --- a/pkg/ttl/ttlworker/task_manager.go +++ b/pkg/ttl/ttlworker/task_manager.go @@ -309,6 +309,15 @@ func (m *taskManager) rescheduleTasks(se session.Session, now time.Time) { return } + if len(tasks) == 0 { + return + } + + err = m.infoSchemaCache.Update(se) + if err != nil { + logutil.Logger(m.ctx).Warn("fail to update infoSchemaCache", zap.Error(err)) + return + } loop: for _, t := range tasks { logger := logutil.Logger(m.ctx).With( diff --git a/pkg/ttl/ttlworker/task_manager_integration_test.go b/pkg/ttl/ttlworker/task_manager_integration_test.go index bdb5d03819..0e605e10bd 100644 --- a/pkg/ttl/ttlworker/task_manager_integration_test.go +++ b/pkg/ttl/ttlworker/task_manager_integration_test.go @@ -130,8 +130,6 @@ func TestParallelSchedule(t *testing.T) { sql := fmt.Sprintf("insert into mysql.tidb_ttl_task(job_id,table_id,scan_id,expire_time,created_time) values ('test-job', %d, %d, NOW(), NOW())", table.Meta().ID, i) tk.MustExec(sql) } - isc := cache.NewInfoSchemaCache(time.Second) - require.NoError(t, isc.Update(sessionFactory())) scheduleWg := sync.WaitGroup{} finishTasks := make([]func(), 0, 4) for i := 0; i < 4; i++ { @@ -143,7 +141,7 @@ func TestParallelSchedule(t *testing.T) { } managerID := fmt.Sprintf("task-manager-%d", i) - m := ttlworker.NewTaskManager(context.Background(), nil, isc, managerID, store) + m := ttlworker.NewTaskManager(context.Background(), nil, cache.NewInfoSchemaCache(time.Second), managerID, store) m.SetScanWorkers4Test(workers) scheduleWg.Add(1) go func() { @@ -187,14 +185,10 @@ func TestTaskScheduleExpireHeartBeat(t *testing.T) { sql := fmt.Sprintf("insert into mysql.tidb_ttl_task(job_id,table_id,scan_id,expire_time,created_time) values ('test-job', %d, %d, NOW(), NOW())", table.Meta().ID, 1) tk.MustExec(sql) - // update the infoschema cache - isc := cache.NewInfoSchemaCache(time.Second) - require.NoError(t, isc.Update(sessionFactory())) - // schedule in a task manager scanWorker := ttlworker.NewMockScanWorker(t) scanWorker.Start() - m := ttlworker.NewTaskManager(context.Background(), nil, isc, "task-manager-1", store) + m := ttlworker.NewTaskManager(context.Background(), nil, cache.NewInfoSchemaCache(time.Second), "task-manager-1", store) m.SetScanWorkers4Test([]ttlworker.Worker{scanWorker}) se := sessionFactory() now := se.Now() @@ -204,7 +198,7 @@ func TestTaskScheduleExpireHeartBeat(t *testing.T) { // another task manager should fetch this task after heartbeat expire scanWorker2 := ttlworker.NewMockScanWorker(t) scanWorker2.Start() - m2 := ttlworker.NewTaskManager(context.Background(), nil, isc, "task-manager-2", store) + m2 := ttlworker.NewTaskManager(context.Background(), nil, cache.NewInfoSchemaCache(time.Second), "task-manager-2", store) m2.SetScanWorkers4Test([]ttlworker.Worker{scanWorker2}) m2.RescheduleTasks(sessionFactory(), now.Add(time.Hour)) tk.MustQuery("select status,owner_id from mysql.tidb_ttl_task").Check(testkit.Rows("running task-manager-2")) @@ -215,7 +209,7 @@ func TestTaskScheduleExpireHeartBeat(t *testing.T) { m2.CheckFinishedTask(sessionFactory(), now) scanWorker3 := ttlworker.NewMockScanWorker(t) scanWorker3.Start() - m3 := ttlworker.NewTaskManager(context.Background(), nil, isc, "task-manager-3", store) + m3 := ttlworker.NewTaskManager(context.Background(), nil, cache.NewInfoSchemaCache(time.Second), "task-manager-3", store) m3.SetScanWorkers4Test([]ttlworker.Worker{scanWorker3}) m3.RescheduleTasks(sessionFactory(), now.Add(time.Hour)) tk.MustQuery("select status,owner_id from mysql.tidb_ttl_task").Check(testkit.Rows("finished task-manager-2")) @@ -235,14 +229,10 @@ func TestTaskMetrics(t *testing.T) { sql := fmt.Sprintf("insert into mysql.tidb_ttl_task(job_id,table_id,scan_id,expire_time,created_time) values ('test-job', %d, %d, NOW(), NOW())", table.Meta().ID, 1) tk.MustExec(sql) - // update the infoschema cache - isc := cache.NewInfoSchemaCache(time.Second) - require.NoError(t, isc.Update(sessionFactory())) - // schedule in a task manager scanWorker := ttlworker.NewMockScanWorker(t) scanWorker.Start() - m := ttlworker.NewTaskManager(context.Background(), nil, isc, "task-manager-1", store) + m := ttlworker.NewTaskManager(context.Background(), nil, cache.NewInfoSchemaCache(time.Minute), "task-manager-1", store) m.SetScanWorkers4Test([]ttlworker.Worker{scanWorker}) se := sessionFactory() now := se.Now() @@ -268,13 +258,11 @@ func TestRescheduleWithError(t *testing.T) { se := sessionFactory() now := se.Now() - isc := cache.NewInfoSchemaCache(time.Second) - require.NoError(t, isc.Update(se)) // schedule in a task manager scanWorker := ttlworker.NewMockScanWorker(t) scanWorker.Start() - m := ttlworker.NewTaskManager(context.Background(), nil, isc, "task-manager-1", store) + m := ttlworker.NewTaskManager(context.Background(), nil, cache.NewInfoSchemaCache(time.Minute), "task-manager-1", store) m.SetScanWorkers4Test([]ttlworker.Worker{scanWorker}) notify := make(chan struct{}) go func() { @@ -307,8 +295,7 @@ func TestTTLRunningTasksLimitation(t *testing.T) { sql := fmt.Sprintf("insert into mysql.tidb_ttl_task(job_id,table_id,scan_id,expire_time,created_time) values ('test-job', %d, %d, NOW(), NOW())", table.Meta().ID, i) tk.MustExec(sql) } - isc := cache.NewInfoSchemaCache(time.Second) - require.NoError(t, isc.Update(sessionFactory())) + scheduleWg := sync.WaitGroup{} for i := 0; i < 16; i++ { workers := []ttlworker.Worker{} @@ -319,7 +306,7 @@ func TestTTLRunningTasksLimitation(t *testing.T) { } ctx := logutil.WithKeyValue(context.Background(), "ttl-worker-test", fmt.Sprintf("task-manager-%d", i)) - m := ttlworker.NewTaskManager(ctx, nil, isc, fmt.Sprintf("task-manager-%d", i), store) + m := ttlworker.NewTaskManager(ctx, nil, cache.NewInfoSchemaCache(time.Minute), fmt.Sprintf("task-manager-%d", i), store) m.SetScanWorkers4Test(workers) scheduleWg.Add(1) go func() { @@ -384,9 +371,7 @@ func TestShrinkScanWorkerAndResignOwner(t *testing.T) { se := sessionFactory() now := se.Now() - isc := cache.NewInfoSchemaCache(time.Minute) - require.NoError(t, isc.Update(se)) - m := ttlworker.NewTaskManager(context.Background(), pool, isc, "scan-manager-1", store) + m := ttlworker.NewTaskManager(context.Background(), pool, cache.NewInfoSchemaCache(time.Minute), "scan-manager-1", store) startBlockNotifyCh := make(chan struct{}) blockCancelCh := make(chan struct{}) @@ -522,7 +507,7 @@ func TestShrinkScanWorkerAndResignOwner(t *testing.T) { )) // A resigned task can be obtained by other task managers - m2 := ttlworker.NewTaskManager(context.Background(), pool, isc, "scan-manager-2", store) + m2 := ttlworker.NewTaskManager(context.Background(), pool, cache.NewInfoSchemaCache(time.Minute), "scan-manager-2", store) worker2 := ttlworker.NewMockScanWorker(t) worker2.Start() defer func() { @@ -562,8 +547,6 @@ func TestTaskCancelledAfterHeartbeatTimeout(t *testing.T) { sql := fmt.Sprintf("insert into mysql.tidb_ttl_task(job_id,table_id,scan_id,expire_time,created_time) values ('test-job', %d, %d, NOW(), NOW())", table.Meta().ID, i) tk.MustExec(sql) } - isc := cache.NewInfoSchemaCache(time.Second) - require.NoError(t, isc.Update(se)) workers := []ttlworker.Worker{} for j := 0; j < 8; j++ { @@ -573,10 +556,10 @@ func TestTaskCancelledAfterHeartbeatTimeout(t *testing.T) { } now := se.Now() - m1 := ttlworker.NewTaskManager(context.Background(), pool, isc, "task-manager-1", store) + m1 := ttlworker.NewTaskManager(context.Background(), pool, cache.NewInfoSchemaCache(time.Minute), "task-manager-1", store) m1.SetScanWorkers4Test(workers[0:4]) m1.RescheduleTasks(se, now) - m2 := ttlworker.NewTaskManager(context.Background(), pool, isc, "task-manager-2", store) + m2 := ttlworker.NewTaskManager(context.Background(), pool, cache.NewInfoSchemaCache(time.Minute), "task-manager-2", store) m2.SetScanWorkers4Test(workers[4:]) // All tasks should be scheduled to m1 and running @@ -665,9 +648,7 @@ func TestHeartBeatErrorNotBlockOthers(t *testing.T) { se := sessionFactory() now := se.Now() - isc := cache.NewInfoSchemaCache(time.Minute) - require.NoError(t, isc.Update(se)) - m := ttlworker.NewTaskManager(context.Background(), pool, isc, "task-manager-1", store) + m := ttlworker.NewTaskManager(context.Background(), pool, cache.NewInfoSchemaCache(time.Minute), "task-manager-1", store) workers := []ttlworker.Worker{} for j := 0; j < 4; j++ { scanWorker := ttlworker.NewMockScanWorker(t) diff --git a/pkg/ttl/ttlworker/task_manager_test.go b/pkg/ttl/ttlworker/task_manager_test.go index 5372d8c298..45dad9491c 100644 --- a/pkg/ttl/ttlworker/task_manager_test.go +++ b/pkg/ttl/ttlworker/task_manager_test.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/tidb/pkg/util/logutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/testutils" "github.com/tikv/client-go/v2/tikv" "github.com/tikv/client-go/v2/tikvrpc" ) @@ -283,6 +284,14 @@ func (s *mockTiKVStore) GetRegionCache() *tikv.RegionCache { } func TestGetMaxRunningTasksLimit(t *testing.T) { + mockClient, _, pdClient, err := testutils.NewMockTiKV("", nil) + require.NoError(t, err) + defer func() { + pdClient.Close() + err = mockClient.Close() + require.NoError(t, err) + }() + variable.TTLRunningTasks.Store(1) require.Equal(t, 1, getMaxRunningTasksLimit(&mockTiKVStore{})) @@ -294,7 +303,7 @@ func TestGetMaxRunningTasksLimit(t *testing.T) { require.Equal(t, variable.MaxConfigurableConcurrency, getMaxRunningTasksLimit(&mockKVStore{})) require.Equal(t, variable.MaxConfigurableConcurrency, getMaxRunningTasksLimit(&mockTiKVStore{})) - s := &mockTiKVStore{regionCache: tikv.NewRegionCache(nil)} + s := &mockTiKVStore{regionCache: tikv.NewRegionCache(pdClient)} s.GetRegionCache().SetRegionCacheStore(1, "", "", tikvrpc.TiKV, 1, nil) s.GetRegionCache().SetRegionCacheStore(2, "", "", tikvrpc.TiKV, 1, nil) s.GetRegionCache().SetRegionCacheStore(3, "", "", tikvrpc.TiFlash, 1, nil)