// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
|
|
package ttlworker_test
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"strings"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
"github.com/pingcap/errors"
|
|
"github.com/pingcap/tidb/pkg/domain"
|
|
"github.com/pingcap/tidb/pkg/meta/metadef"
|
|
"github.com/pingcap/tidb/pkg/metrics"
|
|
"github.com/pingcap/tidb/pkg/parser/ast"
|
|
"github.com/pingcap/tidb/pkg/testkit"
|
|
timerapi "github.com/pingcap/tidb/pkg/timer/api"
|
|
"github.com/pingcap/tidb/pkg/timer/tablestore"
|
|
"github.com/pingcap/tidb/pkg/ttl/cache"
|
|
"github.com/pingcap/tidb/pkg/ttl/ttlworker"
|
|
mockutil "github.com/pingcap/tidb/pkg/util/mock"
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
// createTimerTableSQL returns a SQL to create timer table
|
|
func createTimerTableSQL(dbName, tableName string) string {
|
|
return strings.Replace(metadef.CreateTiDBTimersTable, "mysql.tidb_timers", fmt.Sprintf("`%s`.`%s`", dbName, tableName), 1)
|
|
}
|
|
|
|
func TestTTLManualTriggerOneTimer(t *testing.T) {
|
|
store, do := testkit.CreateMockStoreAndDomain(t)
|
|
|
|
tk := testkit.NewTestKit(t, store)
|
|
tk.MustExec("use test")
|
|
tk.MustExec(createTimerTableSQL("test", "test_timers"))
|
|
timerStore := tablestore.NewTableTimerStore(1, do.AdvancedSysSessionPool(), "test", "test_timers", nil)
|
|
defer timerStore.Close()
|
|
var zeroWatermark time.Time
|
|
cli := timerapi.NewDefaultTimerClient(timerStore)
|
|
pool := wrapPoolForTest(do.AdvancedSysSessionPool())
|
|
defer pool.AssertNoSessionInUse(t)
|
|
sync := ttlworker.NewTTLTimerSyncer(pool, cli)
|
|
|
|
tk.MustExec("set @@global.tidb_ttl_job_enable=0")
|
|
tk.MustExec("create table tp1(a int, t timestamp) TTL=`t`+interval 1 HOUR ttl_job_interval='3h' partition by range(a) (" +
|
|
"partition p0 values less than (10)," +
|
|
"partition p1 values less than (100)," +
|
|
"partition p2 values less than (1000)" +
|
|
")")
|
|
|
|
key, physical := getPhysicalTableInfo(t, do, "test", "tp1", "p0")
|
|
_, err := cli.GetTimerByKey(context.TODO(), key)
|
|
require.True(t, errors.ErrorEqual(err, timerapi.ErrTimerNotExist))
|
|
|
|
startTrigger := func(ctx context.Context, expectErr string) (func() (string, bool, error), timerapi.ManualRequest) {
|
|
timer, err := cli.GetTimerByKey(context.TODO(), key)
|
|
if !errors.ErrorEqual(timerapi.ErrTimerNotExist, err) {
|
|
require.NoError(t, err)
|
|
require.False(t, timer.IsManualRequesting())
|
|
}
|
|
|
|
_, physical = getPhysicalTableInfo(t, do, "test", "tp1", "p0")
|
|
check, err := sync.ManualTriggerTTLTimer(ctx, physical)
|
|
timer = checkTimerWithTableMeta(t, do, cli, "test", "tp1", "p0", zeroWatermark)
|
|
if expectErr != "" {
|
|
require.EqualError(t, err, expectErr)
|
|
require.Nil(t, check)
|
|
require.False(t, timer.IsManualRequesting())
|
|
return nil, timer.ManualRequest
|
|
}
|
|
|
|
require.NoError(t, err)
|
|
require.True(t, timer.IsManualRequesting())
|
|
return check, timer.ManualRequest
|
|
}
|
|
|
|
testCheckFunc := func(check func() (string, bool, error), expectJobID string, expectErr string) {
|
|
jobID, ok, err := check()
|
|
if expectErr != "" {
|
|
require.Empty(t, expectJobID)
|
|
require.Empty(t, jobID)
|
|
require.False(t, ok)
|
|
require.EqualError(t, err, expectErr)
|
|
} else {
|
|
require.NoError(t, err)
|
|
require.Equal(t, ok, jobID != "")
|
|
require.Equal(t, expectJobID, jobID)
|
|
}
|
|
}
|
|
|
|
createJobHistory := func(jobID string) {
|
|
tk.MustExec(fmt.Sprintf(`INSERT INTO mysql.tidb_ttl_job_history (
|
|
job_id,
|
|
table_id,
|
|
parent_table_id,
|
|
table_schema,
|
|
table_name,
|
|
partition_name,
|
|
create_time,
|
|
finish_time,
|
|
ttl_expire,
|
|
expired_rows,
|
|
deleted_rows,
|
|
error_delete_rows,
|
|
status
|
|
)
|
|
VALUES
|
|
(
|
|
'%s', %d, %d, 'test', '%s', '',
|
|
from_unixtime(%d),
|
|
from_unixtime(%d),
|
|
from_unixtime(%d),
|
|
100, 100, 0, 'running'
|
|
)`,
|
|
jobID, physical.TableInfo.ID, physical.ID, physical.TableInfo.Name.O,
|
|
time.Now().Unix(),
|
|
time.Now().Unix()+int64(time.Minute.Seconds()),
|
|
time.Now().Unix()-int64(time.Hour.Seconds()),
|
|
))
|
|
}
|
|
|
|
// start trigger -> not finished -> finished
|
|
check, manual := startTrigger(context.TODO(), "")
|
|
testCheckFunc(check, "", "")
|
|
timer, err := cli.GetTimerByKey(context.TODO(), key)
|
|
require.NoError(t, err)
|
|
manual.ManualEventID = "event123"
|
|
manual.ManualProcessed = true
|
|
require.NoError(t, timerStore.Update(context.TODO(), timer.ID, &timerapi.TimerUpdate{
|
|
ManualRequest: timerapi.NewOptionalVal(manual),
|
|
}))
|
|
testCheckFunc(check, "", "")
|
|
createJobHistory("event123")
|
|
testCheckFunc(check, "event123", "")
|
|
|
|
// start trigger -> trigger done but no event id
|
|
check, manual = startTrigger(context.TODO(), "")
|
|
manual.ManualProcessed = true
|
|
require.NoError(t, timerStore.Update(context.TODO(), timer.ID, &timerapi.TimerUpdate{
|
|
ManualRequest: timerapi.NewOptionalVal(manual),
|
|
}))
|
|
testCheckFunc(check, "", "manual request failed to trigger, request cancelled")
|
|
|
|
// start trigger -> manual requestID not match
|
|
check, manual = startTrigger(context.TODO(), "")
|
|
manual.ManualRequestID = "anotherreqid"
|
|
require.NoError(t, timerStore.Update(context.TODO(), timer.ID, &timerapi.TimerUpdate{
|
|
ManualRequest: timerapi.NewOptionalVal(manual),
|
|
}))
|
|
testCheckFunc(check, "", "manual request failed to trigger, request not found")
|
|
manual.ManualRequestID = "anotherreqid"
|
|
manual.ManualProcessed = true
|
|
require.NoError(t, timerStore.Update(context.TODO(), timer.ID, &timerapi.TimerUpdate{
|
|
ManualRequest: timerapi.NewOptionalVal(manual),
|
|
}))
|
|
testCheckFunc(check, "", "manual request failed to trigger, request not found")
|
|
require.NoError(t, timerStore.Update(context.TODO(), timer.ID, &timerapi.TimerUpdate{
|
|
ManualRequest: timerapi.NewOptionalVal(timerapi.ManualRequest{}),
|
|
}))
|
|
testCheckFunc(check, "", "manual request failed to trigger, request not found")
|
|
|
|
// start trigger -> trigger not done but timeout
|
|
check, manual = startTrigger(context.TODO(), "")
|
|
manual.ManualRequestTime = time.Now().Add(-time.Minute)
|
|
manual.ManualTimeout = 50 * time.Second
|
|
require.NoError(t, timerStore.Update(context.TODO(), timer.ID, &timerapi.TimerUpdate{
|
|
ManualRequest: timerapi.NewOptionalVal(manual),
|
|
}))
|
|
testCheckFunc(check, "", "manual request timeout")
|
|
|
|
// disable ttl
|
|
require.NoError(t, timerStore.Update(context.TODO(), timer.ID, &timerapi.TimerUpdate{
|
|
ManualRequest: timerapi.NewOptionalVal(timerapi.ManualRequest{}),
|
|
}))
|
|
tk.MustExec("alter table tp1 ttl_enable='OFF'")
|
|
_, physical = getPhysicalTableInfo(t, do, "test", "tp1", "p0")
|
|
startTrigger(context.TODO(), "manual trigger is not allowed when timer is disabled")
|
|
tk.MustExec("alter table tp1 ttl_enable='ON'")
|
|
|
|
// start trigger -> timer deleted
|
|
check, _ = startTrigger(context.TODO(), "")
|
|
_, err = cli.DeleteTimer(context.TODO(), timer.ID)
|
|
require.NoError(t, err)
|
|
testCheckFunc(check, "", "timer not exist")
|
|
|
|
// ctx timeout
|
|
ctx, cancel := context.WithCancel(context.TODO())
|
|
check, _ = startTrigger(ctx, "")
|
|
cancel()
|
|
testCheckFunc(check, "", ctx.Err().Error())
|
|
}
|
|
|
|
func TestTTLTimerSync(t *testing.T) {
|
|
origFullRefreshTimerCounter := metrics.TTLFullRefreshTimersCounter
|
|
origSyncTimerCounter := metrics.TTLSyncTimerCounter
|
|
defer func() {
|
|
metrics.TTLFullRefreshTimersCounter = origFullRefreshTimerCounter
|
|
metrics.TTLSyncTimerCounter = origSyncTimerCounter
|
|
}()
|
|
|
|
store, do := testkit.CreateMockStoreAndDomain(t)
|
|
waitAndStopTTLManager(t, do)
|
|
|
|
fullRefreshTimersCounter := &mockutil.MetricsCounter{}
|
|
metrics.TTLFullRefreshTimersCounter = fullRefreshTimersCounter
|
|
syncTimerCounter := &mockutil.MetricsCounter{}
|
|
metrics.TTLSyncTimerCounter = syncTimerCounter
|
|
|
|
tk := testkit.NewTestKit(t, store)
|
|
tk.MustExec("use test")
|
|
tk.MustExec(createTimerTableSQL("test", "test_timers"))
|
|
timerStore := tablestore.NewTableTimerStore(1, do.AdvancedSysSessionPool(), "test", "test_timers", nil)
|
|
defer timerStore.Close()
|
|
|
|
tk.MustExec("set @@global.tidb_ttl_job_enable=0")
|
|
tk.MustExec("create table t0(t timestamp)")
|
|
tk.MustExec("create table t1(t timestamp) TTL=`t`+interval 1 HOUR")
|
|
tk.MustExec("create table t2(t timestamp) TTL=`t`+interval 1 HOUR ttl_job_interval='1m' ttl_enable='OFF'")
|
|
tk.MustExec("create table t3(t timestamp, t2 timestamp) TTL=`t`+interval 1 HOUR ttl_job_interval='1h'")
|
|
tk.MustExec("create table tp1(a int, t timestamp) TTL=`t`+interval 1 HOUR ttl_job_interval='3h' partition by range(a) (" +
|
|
"partition p0 values less than (10)," +
|
|
"partition p1 values less than (100)," +
|
|
"partition p2 values less than (1000)" +
|
|
")")
|
|
var zeroTime time.Time
|
|
wm1 := time.Unix(3600*24*12, 0)
|
|
wm2 := time.Unix(3600*24*24, 0)
|
|
insertTTLTableStatusWatermark(t, do, tk, "test", "t1", "", zeroTime, false)
|
|
insertTTLTableStatusWatermark(t, do, tk, "test", "t2", "", wm1, false)
|
|
insertTTLTableStatusWatermark(t, do, tk, "test", "tp1", "p0", zeroTime, false)
|
|
insertTTLTableStatusWatermark(t, do, tk, "test", "tp1", "p1", wm2, true)
|
|
|
|
cli := timerapi.NewDefaultTimerClient(timerStore)
|
|
pool := wrapPoolForTest(do.AdvancedSysSessionPool())
|
|
defer pool.AssertNoSessionInUse(t)
|
|
sync := ttlworker.NewTTLTimerSyncer(pool, cli)
|
|
|
|
lastSyncTime, lastSyncVer := sync.GetLastSyncInfo()
|
|
require.True(t, lastSyncTime.IsZero())
|
|
require.Zero(t, lastSyncVer)
|
|
|
|
// first sync
|
|
now := time.Now()
|
|
require.Equal(t, float64(0), fullRefreshTimersCounter.Val())
|
|
require.Equal(t, float64(0), syncTimerCounter.Val())
|
|
sync.SyncTimers(context.TODO(), do.InfoSchema())
|
|
require.Equal(t, float64(1), fullRefreshTimersCounter.Val())
|
|
require.Equal(t, float64(6), syncTimerCounter.Val())
|
|
syncCnt := syncTimerCounter.Val()
|
|
lastSyncTime, lastSyncVer = sync.GetLastSyncInfo()
|
|
require.Equal(t, do.InfoSchema().SchemaMetaVersion(), lastSyncVer)
|
|
require.GreaterOrEqual(t, lastSyncTime.Unix(), now.Unix())
|
|
checkTimerCnt(t, cli, 6)
|
|
timer1 := checkTimerWithTableMeta(t, do, cli, "test", "t1", "", zeroTime)
|
|
timer2 := checkTimerWithTableMeta(t, do, cli, "test", "t2", "", wm1)
|
|
timer3 := checkTimerWithTableMeta(t, do, cli, "test", "t3", "", zeroTime)
|
|
timerP10 := checkTimerWithTableMeta(t, do, cli, "test", "tp1", "p0", zeroTime)
|
|
timerP11 := checkTimerWithTableMeta(t, do, cli, "test", "tp1", "p1", wm2)
|
|
timerP12 := checkTimerWithTableMeta(t, do, cli, "test", "tp1", "p2", zeroTime)
|
|
|
|
// create table/partition
|
|
tk.MustExec("create table t4(t timestamp) TTL=`t`+interval 1 HOUR ttl_job_interval='2h'")
|
|
tk.MustExec("create table t5(t timestamp)")
|
|
tk.MustExec("alter table tp1 add partition (partition p3 values less than(10000))")
|
|
tk.MustExec("create table tp2(a int, t timestamp) TTL=`t`+interval 1 HOUR ttl_job_interval='6h' partition by range(a) (" +
|
|
"partition p0 values less than (10)," +
|
|
"partition p1 values less than (100)" +
|
|
")")
|
|
now = time.Now()
|
|
sync.SyncTimers(context.TODO(), do.InfoSchema())
|
|
require.Equal(t, syncCnt+4, syncTimerCounter.Val())
|
|
syncCnt = syncTimerCounter.Val()
|
|
lastSyncTime, lastSyncVer = sync.GetLastSyncInfo()
|
|
require.Equal(t, do.InfoSchema().SchemaMetaVersion(), lastSyncVer)
|
|
require.GreaterOrEqual(t, lastSyncTime.Unix(), now.Unix())
|
|
checkTimerCnt(t, cli, 10)
|
|
timer4 := checkTimerWithTableMeta(t, do, cli, "test", "t4", "", zeroTime)
|
|
checkTimerWithTableMeta(t, do, cli, "test", "tp1", "p3", zeroTime)
|
|
timerP20 := checkTimerWithTableMeta(t, do, cli, "test", "tp2", "p0", zeroTime)
|
|
timerP21 := checkTimerWithTableMeta(t, do, cli, "test", "tp2", "p1", zeroTime)
|
|
checkTimersNotChange(t, cli, timer1, timer2, timer3, timerP10, timerP11, timerP12)
|
|
|
|
// update table
|
|
tk.MustExec("alter table t1 ttl_enable='OFF'")
|
|
tk.MustExec("alter table t2 ttl_job_interval='6m'")
|
|
tk.MustExec("alter table t3 TTL=`t2`+interval 2 HOUR")
|
|
tk.MustExec("alter table t5 TTL=`t`+interval 10 HOUR ttl_enable='OFF'")
|
|
tk.MustExec("alter table tp1 ttl_job_interval='3m'")
|
|
sync.SyncTimers(context.TODO(), do.InfoSchema())
|
|
require.Equal(t, syncCnt+7, syncTimerCounter.Val())
|
|
syncCnt = syncTimerCounter.Val()
|
|
checkTimerCnt(t, cli, 11)
|
|
checkTimerWithTableMeta(t, do, cli, "test", "t1", "", zeroTime)
|
|
timer2 = checkTimerWithTableMeta(t, do, cli, "test", "t2", "", wm1)
|
|
timer5 := checkTimerWithTableMeta(t, do, cli, "test", "t5", "", zeroTime)
|
|
timerP10 = checkTimerWithTableMeta(t, do, cli, "test", "tp1", "p0", zeroTime)
|
|
timerP11 = checkTimerWithTableMeta(t, do, cli, "test", "tp1", "p1", wm2)
|
|
timerP12 = checkTimerWithTableMeta(t, do, cli, "test", "tp1", "p2", zeroTime)
|
|
timerP13 := checkTimerWithTableMeta(t, do, cli, "test", "tp1", "p3", zeroTime)
|
|
checkTimersNotChange(t, cli, timer3, timer4, timerP20, timerP21)
|
|
|
|
// rename table
|
|
tk.MustExec("rename table t1 to t1a")
|
|
sync.SyncTimers(context.TODO(), do.InfoSchema())
|
|
require.Equal(t, syncCnt+1, syncTimerCounter.Val())
|
|
syncCnt = syncTimerCounter.Val()
|
|
checkTimerCnt(t, cli, 11)
|
|
timer1 = checkTimerWithTableMeta(t, do, cli, "test", "t1a", "", zeroTime)
|
|
checkTimersNotChange(t, cli, timer1, timer2, timer3, timer4, timer5, timerP10, timerP11, timerP12, timerP13, timerP20, timerP21)
|
|
|
|
// truncate table/partition
|
|
oldTimer2 := timer2
|
|
oldTimerP11 := timerP11
|
|
oldTimerP20 := timerP20
|
|
oldTimerP21 := timerP21
|
|
tk.MustExec("truncate table t2")
|
|
tk.MustExec("alter table tp1 truncate partition p1")
|
|
tk.MustExec("truncate table tp2")
|
|
sync.SyncTimers(context.TODO(), do.InfoSchema())
|
|
require.Equal(t, syncCnt+7, syncTimerCounter.Val())
|
|
syncCnt = syncTimerCounter.Val()
|
|
checkTimerCnt(t, cli, 15)
|
|
timer2 = checkTimerWithTableMeta(t, do, cli, "test", "t2", "", zeroTime)
|
|
require.NotEqual(t, oldTimer2.ID, timer2.ID)
|
|
timerP11 = checkTimerWithTableMeta(t, do, cli, "test", "tp1", "p1", zeroTime)
|
|
require.NotEqual(t, oldTimerP11.ID, timerP11.ID)
|
|
timerP20 = checkTimerWithTableMeta(t, do, cli, "test", "tp2", "p0", zeroTime)
|
|
require.NotEqual(t, oldTimerP20.ID, timerP20.ID)
|
|
timerP21 = checkTimerWithTableMeta(t, do, cli, "test", "tp2", "p1", zeroTime)
|
|
require.NotEqual(t, oldTimerP21.ID, timerP21.ID)
|
|
oldTimer2 = checkTimerOnlyDisabled(t, cli, oldTimer2)
|
|
oldTimerP11 = checkTimerOnlyDisabled(t, cli, oldTimerP11)
|
|
oldTimerP20 = checkTimerOnlyDisabled(t, cli, oldTimerP20)
|
|
oldTimerP21 = checkTimerOnlyDisabled(t, cli, oldTimerP21)
|
|
checkTimersNotChange(t, cli, timer1, timer3, timer4, timer5, timerP10, timerP12, timerP13)
|
|
|
|
// drop table/partition
|
|
tk.MustExec("drop table t1a")
|
|
tk.MustExec("alter table tp1 drop partition p3")
|
|
tk.MustExec("drop table tp2")
|
|
sync.SyncTimers(context.TODO(), do.InfoSchema())
|
|
require.Equal(t, syncCnt+3, syncTimerCounter.Val())
|
|
syncCnt = syncTimerCounter.Val()
|
|
checkTimerCnt(t, cli, 15)
|
|
checkTimerOnlyDisabled(t, cli, timer1)
|
|
checkTimerOnlyDisabled(t, cli, timerP13)
|
|
checkTimerOnlyDisabled(t, cli, timerP20)
|
|
checkTimerOnlyDisabled(t, cli, timerP21)
|
|
checkTimersNotChange(t, cli, oldTimer2, oldTimerP11, oldTimerP20, oldTimerP21)
|
|
checkTimersNotChange(t, cli, timer2, timer3, timer4, timer5, timerP10, timerP11, timerP12)
|
|
|
|
// clear deleted tables
|
|
sync.SetDelayDeleteInterval(time.Millisecond)
|
|
time.Sleep(time.Second)
|
|
sync.SyncTimers(context.TODO(), do.InfoSchema())
|
|
require.Equal(t, syncCnt+8, syncTimerCounter.Val())
|
|
checkTimerCnt(t, cli, 7)
|
|
checkTimersNotChange(t, cli, timer2, timer3, timer4, timer5, timerP10, timerP11, timerP12)
|
|
|
|
// https://github.com/pingcap/tidb/issues/57112
|
|
// table created and deleted but still in delay interval
|
|
timer3Key := timer3.Key
|
|
_, ok := sync.GetCachedTimerRecord(timer3Key)
|
|
require.True(t, ok)
|
|
sync.SetDelayDeleteInterval(time.Hour)
|
|
tk.MustExec("drop table t3")
|
|
tk.MustExec("delete from test.test_timers where ID = " + timer3.ID)
|
|
sync.SyncTimers(context.TODO(), do.InfoSchema())
|
|
require.Equal(t, syncCnt+9, syncTimerCounter.Val())
|
|
syncCnt = syncTimerCounter.Val()
|
|
// timer3 should be deleted from cache because the timer is deleted from storage.
|
|
_, ok = sync.GetCachedTimerRecord(timer3Key)
|
|
require.False(t, ok)
|
|
checkTimerCnt(t, cli, 6)
|
|
|
|
// reset timers
|
|
sync.Reset()
|
|
lastSyncTime, lastSyncVer = sync.GetLastSyncInfo()
|
|
require.True(t, lastSyncTime.IsZero())
|
|
require.Zero(t, lastSyncVer)
|
|
|
|
// sync after reset
|
|
now = time.Now()
|
|
require.Equal(t, float64(1), fullRefreshTimersCounter.Val())
|
|
sync.SyncTimers(context.TODO(), do.InfoSchema())
|
|
require.Equal(t, float64(2), fullRefreshTimersCounter.Val())
|
|
require.Equal(t, syncCnt, syncTimerCounter.Val())
|
|
lastSyncTime, lastSyncVer = sync.GetLastSyncInfo()
|
|
require.Equal(t, do.InfoSchema().SchemaMetaVersion(), lastSyncVer)
|
|
require.GreaterOrEqual(t, lastSyncTime.Unix(), now.Unix())
|
|
checkTimerCnt(t, cli, 6)
|
|
checkTimersNotChange(t, cli, timer2, timer4, timer5, timerP10, timerP11, timerP12)
|
|
}
|
|
|
|
func insertTTLTableStatusWatermark(t *testing.T, do *domain.Domain, tk *testkit.TestKit, db, table, partition string, watermark time.Time, jobRunning bool) {
|
|
_, physical := getPhysicalTableInfo(t, do, db, table, partition)
|
|
if watermark.IsZero() {
|
|
tk.MustExec("insert into mysql.tidb_ttl_table_status (table_id, parent_table_id) values (?, ?)", physical.ID, physical.TableInfo.ID)
|
|
return
|
|
}
|
|
|
|
if jobRunning {
|
|
tk.MustExec(
|
|
"insert into mysql.tidb_ttl_table_status (table_id, parent_table_id, last_job_id, last_job_start_time, last_job_finish_time, last_job_ttl_expire, current_job_id, current_job_start_time) values(?, ?, ?, FROM_UNIXTIME(?), FROM_UNIXTIME(?), FROM_UNIXTIME(?), ?, FROM_UNIXTIME(?))",
|
|
physical.ID, physical.TableInfo.ID, uuid.NewString(), watermark.Add(-10*time.Minute).Unix(), watermark.Add(-time.Minute).Unix(), watermark.Add(-20*time.Minute).Unix(),
|
|
uuid.NewString(),
|
|
watermark.Unix(),
|
|
)
|
|
} else {
|
|
tk.MustExec(
|
|
"insert into mysql.tidb_ttl_table_status (table_id, parent_table_id, last_job_id, last_job_start_time, last_job_finish_time, last_job_ttl_expire) values(?, ?, ?, FROM_UNIXTIME(?), FROM_UNIXTIME(?), FROM_UNIXTIME(?))",
|
|
physical.ID, physical.TableInfo.ID, uuid.NewString(), watermark.Unix(), watermark.Add(time.Minute).Unix(), watermark.Add(-time.Minute).Unix(),
|
|
)
|
|
}
|
|
}
|
|
|
|
func checkTimerCnt(t *testing.T, cli timerapi.TimerClient, cnt int) {
|
|
timers, err := cli.GetTimers(context.TODO())
|
|
require.NoError(t, err)
|
|
require.Equal(t, cnt, len(timers))
|
|
}
|
|
|
|
func checkTimerOnlyDisabled(t *testing.T, cli timerapi.TimerClient, timer *timerapi.TimerRecord) *timerapi.TimerRecord {
|
|
tm, err := cli.GetTimerByID(context.TODO(), timer.ID)
|
|
require.NoError(t, err)
|
|
if !timer.Enable {
|
|
require.Equal(t, *timer, *tm)
|
|
return timer
|
|
}
|
|
|
|
require.False(t, tm.Enable)
|
|
require.Greater(t, tm.Version, timer.Version)
|
|
tm2 := timer.Clone()
|
|
tm2.Enable = tm.Enable
|
|
tm2.Version = tm.Version
|
|
require.Equal(t, *tm, *tm2)
|
|
return tm
|
|
}
|
|
|
|
func checkTimersNotChange(t *testing.T, cli timerapi.TimerClient, timers ...*timerapi.TimerRecord) {
|
|
for i, timer := range timers {
|
|
tm, err := cli.GetTimerByID(context.TODO(), timer.ID)
|
|
require.NoError(t, err)
|
|
require.Equal(t, *timer, *tm, fmt.Sprintf("index: %d", i))
|
|
}
|
|
}
|
|
|
|
func getPhysicalTableInfo(t *testing.T, do *domain.Domain, db, table, partition string) (string, *cache.PhysicalTable) {
|
|
is := do.InfoSchema()
|
|
tbl, err := is.TableByName(context.Background(), ast.NewCIStr(db), ast.NewCIStr(table))
|
|
require.NoError(t, err)
|
|
tblInfo := tbl.Meta()
|
|
physical, err := cache.NewPhysicalTable(ast.NewCIStr(db), tblInfo, ast.NewCIStr(partition))
|
|
require.NoError(t, err)
|
|
return fmt.Sprintf("/tidb/ttl/physical_table/%d/%d", tblInfo.ID, physical.ID), physical
|
|
}
|
|
|
|
func checkTimerWithTableMeta(t *testing.T, do *domain.Domain, cli timerapi.TimerClient, db, table, partition string, watermark time.Time) *timerapi.TimerRecord {
|
|
is := do.InfoSchema()
|
|
dbInfo, ok := is.SchemaByName(ast.NewCIStr(db))
|
|
require.True(t, ok)
|
|
|
|
key, physical := getPhysicalTableInfo(t, do, db, table, partition)
|
|
timer, err := cli.GetTimerByKey(context.TODO(), key)
|
|
require.NoError(t, err)
|
|
|
|
require.Equal(t, physical.TTLInfo.Enable, timer.Enable)
|
|
require.Equal(t, timerapi.SchedEventInterval, timer.SchedPolicyType)
|
|
require.Equal(t, physical.TTLInfo.JobInterval, timer.SchedPolicyExpr)
|
|
if partition == "" {
|
|
require.Equal(t, []string{
|
|
fmt.Sprintf("db=%s", dbInfo.Name.O),
|
|
fmt.Sprintf("table=%s", physical.Name.O),
|
|
}, timer.Tags)
|
|
} else {
|
|
require.Equal(t, []string{
|
|
fmt.Sprintf("db=%s", dbInfo.Name.O),
|
|
fmt.Sprintf("table=%s", physical.Name.O),
|
|
fmt.Sprintf("partition=%s", physical.Partition.O),
|
|
}, timer.Tags)
|
|
}
|
|
|
|
require.NotNil(t, timer.Data)
|
|
var timerData ttlworker.TTLTimerData
|
|
require.NoError(t, json.Unmarshal(timer.Data, &timerData))
|
|
require.Equal(t, physical.TableInfo.ID, timerData.TableID)
|
|
require.Equal(t, physical.ID, timerData.PhysicalID)
|
|
require.Equal(t, watermark.Unix(), timer.Watermark.Unix())
|
|
return timer
|
|
}
|