ttl: Add CommandClient to trigger TTL job manually (#40346)

close pingcap/tidb#40345
王超
2023-01-10 11:48:22 +08:00
committed by GitHub
parent f5362f9cc4
commit 9826913de0
8 changed files with 827 additions and 38 deletions

domain/domain.go

@ -2483,7 +2483,7 @@ func (do *Domain) StartTTLJobManager() {
logutil.BgLogger().Info("ttlJobManager exited.")
}()
ttlJobManager := ttlworker.NewJobManager(do.ddl.GetID(), do.sysSessionPool, do.store)
ttlJobManager := ttlworker.NewJobManager(do.ddl.GetID(), do.sysSessionPool, do.store, do.etcdClient)
do.ttlJobManager = ttlJobManager
ttlJobManager.Start()

ttl/client/BUILD.bazel Normal file

@ -0,0 +1,26 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "client",
srcs = ["command.go"],
importpath = "github.com/pingcap/tidb/ttl/client",
visibility = ["//visibility:public"],
deps = [
"//util/logutil",
"@com_github_google_uuid//:uuid",
"@com_github_pingcap_errors//:errors",
"@io_etcd_go_etcd_client_v3//:client",
"@org_uber_go_zap//:zap",
],
)
go_test(
name = "client_test",
srcs = ["command_test.go"],
embed = [":client"],
deps = [
"@com_github_pingcap_errors//:errors",
"@com_github_stretchr_testify//require",
"@io_etcd_go_etcd_tests_v3//integration",
],
)

ttl/client/command.go Normal file

@ -0,0 +1,419 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package client
import (
"context"
"encoding/json"
"strings"
"sync"
"time"
"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/util/logutil"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)
const (
ttlCmdKeyLeaseSeconds int64 = 60
ttlCmdKeyRequestPrefix = "/tidb/ttl/cmd/req/"
ttlCmdKeyResponsePrefix = "/tidb/ttl/cmd/resp/"
ttlCmdTypeTriggerTTLJob = "trigger_ttl_job"
)
// CmdRequest is the request for a TTL command
type CmdRequest struct {
RequestID string `json:"request_id"`
CmdType string `json:"cmd_type"`
Data json.RawMessage `json:"data"`
}
// GetTriggerTTLJobRequest returns the `TriggerNewTTLJobRequest` object if the command type is 'trigger_ttl_job';
// otherwise, (nil, false) is returned
func (r *CmdRequest) GetTriggerTTLJobRequest() (*TriggerNewTTLJobRequest, bool) {
if r.CmdType != ttlCmdTypeTriggerTTLJob {
return nil, false
}
var req TriggerNewTTLJobRequest
if err := json.Unmarshal(r.Data, &req); err != nil {
return nil, false
}
return &req, true
}
type cmdResponse struct {
RequestID string `json:"request_id"`
ErrorMessage string `json:"error_message"`
Data json.RawMessage `json:"data"`
}
// TriggerNewTTLJobRequest is the command detail to trigger a TTL job
type TriggerNewTTLJobRequest struct {
DBName string `json:"db_name"`
TableName string `json:"table_name"`
}
// TriggerNewTTLJobTableResult is the table detail of `TriggerNewTTLJobResponse`
type TriggerNewTTLJobTableResult struct {
TableID int64 `json:"table_id"`
DBName string `json:"db_name"`
TableName string `json:"table_name"`
PartitionName string `json:"partition_name,omitempty"`
JobID string `json:"job_id"`
ErrorMessage string `json:"error_message"`
}
// TriggerNewTTLJobResponse is the response detail for trigger_ttl_job command
type TriggerNewTTLJobResponse struct {
TableResult []*TriggerNewTTLJobTableResult `json:"table_result"`
}
// CommandClient is an interface used to send and respond to commands for TTL jobs
type CommandClient interface {
// Command sends a command and waits for the response. The first return value is the requestID; it is always non-empty.
Command(ctx context.Context, cmdType string, obj interface{}, response interface{}) (string, error)
// WatchCommand watches the commands that are sent
WatchCommand(ctx context.Context) <-chan *CmdRequest
// TakeCommand takes a command to ensure that only one instance handles it.
// If the first return value is true, you have taken the command successfully and should call `ResponseCommand`
// after processing the command. Otherwise, you should not process this command because it does not belong to you.
TakeCommand(ctx context.Context, reqID string) (bool, error)
// ResponseCommand responds with the result of the command. `TakeCommand` must be called before `ResponseCommand`.
// obj is the response object for the sender; if obj is an error, the sender will receive an error too.
ResponseCommand(ctx context.Context, reqID string, obj interface{}) error
}
// TriggerNewTTLJob triggers a new TTL job
func TriggerNewTTLJob(ctx context.Context, cli CommandClient, dbName, tableName string) (*TriggerNewTTLJobResponse, error) {
var resp TriggerNewTTLJobResponse
_, err := cli.Command(ctx, ttlCmdTypeTriggerTTLJob, &TriggerNewTTLJobRequest{
DBName: dbName,
TableName: tableName,
}, &resp)
if err != nil {
return nil, err
}
return &resp, nil
}
type etcdClient struct {
etcdCli *clientv3.Client
}
// NewEtcdCommandClient creates a client with etcd
func NewEtcdCommandClient(etcdCli *clientv3.Client) CommandClient {
return &etcdClient{
etcdCli: etcdCli,
}
}
func (c *etcdClient) sendCmd(ctx context.Context, cmdType string, obj interface{}) (string, error) {
reqID := uuid.New().String()
data, err := json.Marshal(obj)
if err != nil {
return reqID, err
}
requestJSON, err := json.Marshal(&CmdRequest{
RequestID: reqID,
CmdType: cmdType,
Data: data,
})
if err != nil {
return reqID, err
}
lease, err := c.etcdCli.Grant(ctx, ttlCmdKeyLeaseSeconds)
if err != nil {
return reqID, err
}
if _, err = c.etcdCli.Put(ctx, ttlCmdKeyRequestPrefix+reqID, string(requestJSON), clientv3.WithLease(lease.ID)); err != nil {
return reqID, err
}
return reqID, nil
}
func (c *etcdClient) waitCmdResponse(ctx context.Context, reqID string, obj interface{}) error {
ctx, cancel := context.WithTimeout(ctx, time.Second*time.Duration(ttlCmdKeyLeaseSeconds))
defer cancel()
key := ttlCmdKeyResponsePrefix + reqID
ch := c.etcdCli.Watch(ctx, key)
ticker := time.NewTimer(time.Second)
defer ticker.Stop()
var respData []byte
loop:
for {
select {
case <-ticker.C:
response, err := c.etcdCli.Get(ctx, key)
if err != nil {
return err
}
if len(response.Kvs) > 0 {
respData = response.Kvs[0].Value
break loop
}
case resp := <-ch:
for _, event := range resp.Events {
if event.Type == clientv3.EventTypePut {
respData = event.Kv.Value
break loop
}
}
}
}
var cmdResp cmdResponse
if err := json.Unmarshal(respData, &cmdResp); err != nil {
return err
}
if cmdResp.ErrorMessage != "" {
return errors.New(cmdResp.ErrorMessage)
}
return json.Unmarshal(cmdResp.Data, obj)
}
func (c *etcdClient) Command(ctx context.Context, cmdType string, request interface{}, response interface{}) (string, error) {
requestID, err := c.sendCmd(ctx, cmdType, request)
if err != nil {
return requestID, err
}
return requestID, c.waitCmdResponse(ctx, requestID, &response)
}
func (c *etcdClient) TakeCommand(ctx context.Context, reqID string) (bool, error) {
resp, err := c.etcdCli.Delete(ctx, ttlCmdKeyRequestPrefix+reqID)
if err != nil {
return false, err
}
return resp.Deleted > 0, nil
}
func (c *etcdClient) ResponseCommand(ctx context.Context, reqID string, obj interface{}) error {
resp := &cmdResponse{
RequestID: reqID,
}
if err, ok := obj.(error); ok {
resp.ErrorMessage = err.Error()
} else {
data, err := json.Marshal(obj)
if err != nil {
return err
}
resp.Data = data
}
respJSON, err := json.Marshal(resp)
if err != nil {
return err
}
lease, err := c.etcdCli.Grant(ctx, ttlCmdKeyLeaseSeconds)
if err != nil {
return err
}
_, err = c.etcdCli.Put(ctx, ttlCmdKeyResponsePrefix+reqID, string(respJSON), clientv3.WithLease(lease.ID))
return err
}
func (c *etcdClient) WatchCommand(ctx context.Context) <-chan *CmdRequest {
ch := make(chan *CmdRequest)
go func() {
ctx, cancel := context.WithCancel(ctx)
defer func() {
cancel()
close(ch)
}()
etcdCh := c.etcdCli.Watch(ctx, ttlCmdKeyRequestPrefix, clientv3.WithPrefix())
for resp := range etcdCh {
for _, event := range resp.Events {
if event.Type != clientv3.EventTypePut {
continue
}
var request CmdRequest
if err := json.Unmarshal(event.Kv.Value, &request); err != nil {
logutil.BgLogger().Error(
"failed to parse ttl cmd payload",
zap.Error(err),
zap.ByteString("key", event.Kv.Key),
zap.ByteString("value", event.Kv.Value),
)
}
select {
case ch <- &request:
case <-ctx.Done():
return
}
}
}
}()
return ch
}
type mockClient struct {
sync.Mutex
store map[string]interface{}
watchers []chan *CmdRequest
}
// NewMockCommandClient creates a mock client
func NewMockCommandClient() CommandClient {
return &mockClient{
store: make(map[string]interface{}),
watchers: make([]chan *CmdRequest, 0, 1),
}
}
func (c *mockClient) Command(ctx context.Context, cmdType string, request interface{}, response interface{}) (string, error) {
ctx, cancel := context.WithTimeout(ctx, time.Second*time.Duration(ttlCmdKeyLeaseSeconds))
defer cancel()
reqID, err := c.sendCmd(ctx, cmdType, request)
if err != nil {
return reqID, err
}
responseKey := ttlCmdKeyResponsePrefix + reqID
for ctx.Err() == nil {
c.Lock()
val, ok := c.store[responseKey]
c.Unlock()
if !ok {
continue
}
res, ok := val.(*cmdResponse)
if !ok {
return reqID, errors.New("response cannot be casted to *cmdResponse")
}
if res.ErrorMessage != "" {
return reqID, errors.New(res.ErrorMessage)
}
if err = json.Unmarshal(res.Data, response); err != nil {
return reqID, err
}
return reqID, nil
}
return reqID, ctx.Err()
}
func (c *mockClient) sendCmd(ctx context.Context, cmdType string, request interface{}) (string, error) {
reqID := uuid.New().String()
data, err := json.Marshal(request)
if err != nil {
return reqID, err
}
req := &CmdRequest{
RequestID: reqID,
CmdType: cmdType,
Data: data,
}
c.Lock()
defer c.Unlock()
key := ttlCmdKeyRequestPrefix + reqID
c.store[key] = req
for _, ch := range c.watchers {
select {
case <-ctx.Done():
return reqID, ctx.Err()
case ch <- req:
default:
return reqID, errors.New("watcher channel is blocked")
}
}
return reqID, nil
}
func (c *mockClient) TakeCommand(_ context.Context, reqID string) (bool, error) {
c.Lock()
defer c.Unlock()
key := ttlCmdKeyRequestPrefix + reqID
if _, ok := c.store[key]; ok {
delete(c.store, key)
return true, nil
}
return false, nil
}
func (c *mockClient) ResponseCommand(_ context.Context, reqID string, obj interface{}) error {
c.Lock()
defer c.Unlock()
resp := &cmdResponse{
RequestID: reqID,
}
if respErr, ok := obj.(error); ok {
resp.ErrorMessage = respErr.Error()
} else {
jsonData, err := json.Marshal(obj)
if err != nil {
return err
}
resp.Data = jsonData
}
c.store[ttlCmdKeyResponsePrefix+reqID] = resp
return nil
}
func (c *mockClient) WatchCommand(ctx context.Context) <-chan *CmdRequest {
c.Lock()
defer c.Unlock()
ch := make(chan *CmdRequest, 16+len(c.store))
c.watchers = append(c.watchers, ch)
for key, val := range c.store {
if strings.HasPrefix(key, ttlCmdKeyRequestPrefix) {
if req, ok := val.(*CmdRequest); ok {
ch <- req
}
}
}
go func() {
<-ctx.Done()
c.Lock()
defer c.Unlock()
for i, chItem := range c.watchers {
if chItem == ch {
c.watchers = append(c.watchers[:i], c.watchers[i+1:]...)
break
}
}
close(ch)
}()
return ch
}
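The interface above defines a small request/response protocol: a sender publishes a CmdRequest under /tidb/ttl/cmd/req/<request-id> with a 60-second lease and waits for a matching key under /tidb/ttl/cmd/resp/<request-id>, while a handler watches the request prefix, claims a request with TakeCommand, and answers it with ResponseCommand. The sketch below is not part of this commit; it exercises both halves against the in-memory mock client, and the "fake-job-id" value is a placeholder. The etcd-backed client returned by NewEtcdCommandClient follows the same interface.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/pingcap/tidb/ttl/client"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	cli := client.NewMockCommandClient()

	// Handler side: watch for requests, take ownership, then respond.
	go func() {
		for cmd := range cli.WatchCommand(ctx) {
			req, ok := cmd.GetTriggerTTLJobRequest()
			if !ok {
				continue
			}
			if taken, err := cli.TakeCommand(ctx, cmd.RequestID); err != nil || !taken {
				continue
			}
			_ = cli.ResponseCommand(ctx, cmd.RequestID, &client.TriggerNewTTLJobResponse{
				TableResult: []*client.TriggerNewTTLJobTableResult{
					{DBName: req.DBName, TableName: req.TableName, JobID: "fake-job-id"},
				},
			})
		}
	}()

	// Sender side: blocks until the handler responds or the context times out.
	resp, err := client.TriggerNewTTLJob(ctx, cli, "test", "t")
	if err != nil {
		panic(err)
	}
	fmt.Println(resp.TableResult[0].JobID) // "fake-job-id"
}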

ttl/client/command_test.go Normal file

@ -0,0 +1,139 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package client
import (
"context"
"encoding/json"
"testing"
"time"
"github.com/pingcap/errors"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/tests/v3/integration"
)
type mockCmdRequest struct {
V1 string `json:"v_1"`
V2 int `json:"v_2"`
}
type mockCmdResponse struct {
V3 string `json:"v_3"`
V4 int `json:"v_4"`
}
func TestCommandClient(t *testing.T) {
integration.BeforeTestExternal(t)
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)
etcd := cluster.RandClient()
etcdCli := NewEtcdCommandClient(etcd)
mockCli := NewMockCommandClient()
ctx, cancel := context.WithTimeout(context.TODO(), time.Minute)
defer cancel()
resCh := make(chan *mockCmdResponse)
defer close(resCh)
for _, cli := range []CommandClient{etcdCli, mockCli} {
var sendRequestID, recvRequestID string
// send command
go func() {
var err error
var res mockCmdResponse
defer func() {
resCh <- &res
}()
req := &mockCmdRequest{V1: "1", V2: 2}
sendRequestID, err = cli.Command(ctx, "type1", req, &res)
require.NoError(t, err)
require.NotEmpty(t, sendRequestID)
}()
// check the received command and send response
watcher := cli.WatchCommand(ctx)
select {
case cmd, ok := <-watcher:
require.True(t, ok)
require.NotNil(t, cmd)
require.Equal(t, "type1", cmd.CmdType)
recvRequestID = cmd.RequestID
var gotReq mockCmdRequest
require.NoError(t, json.Unmarshal(cmd.Data, &gotReq))
require.Equal(t, "1", gotReq.V1)
require.Equal(t, 2, gotReq.V2)
ok, err := cli.TakeCommand(ctx, recvRequestID)
require.NoError(t, err)
require.True(t, ok)
require.NoError(t, cli.ResponseCommand(ctx, cmd.RequestID, &mockCmdResponse{V3: "3", V4: 4}))
case <-ctx.Done():
require.FailNow(t, ctx.Err().Error())
}
// check received response
select {
case res := <-resCh:
require.NotNil(t, res)
require.Equal(t, recvRequestID, sendRequestID)
require.Equal(t, "3", res.V3)
require.Equal(t, 4, res.V4)
case <-ctx.Done():
require.FailNow(t, ctx.Err().Error())
}
// Taking the command again should return false, nil
ok, err := cli.TakeCommand(ctx, recvRequestID)
require.NoError(t, err)
require.False(t, ok)
// send command and expect an error
go func() {
var err error
var res mockCmdResponse
defer func() {
resCh <- &res
}()
req := &mockCmdRequest{V1: "1", V2: 2}
sendRequestID, err = cli.Command(ctx, "type1", req, &res)
require.NotEmpty(t, sendRequestID)
require.EqualError(t, err, "mockErr")
}()
// respond with an error
watcher = cli.WatchCommand(ctx)
select {
case cmd, ok := <-watcher:
require.True(t, ok)
require.NotNil(t, cmd)
_, err = cli.TakeCommand(ctx, cmd.RequestID)
require.NoError(t, err)
require.NoError(t, cli.ResponseCommand(ctx, cmd.RequestID, errors.New("mockErr")))
case <-ctx.Done():
require.FailNow(t, ctx.Err().Error())
}
// wait for the send goroutine to exit
select {
case <-resCh:
case <-ctx.Done():
require.FailNow(t, ctx.Err().Error())
}
}
}

ttl/ttlworker/BUILD.bazel

@ -19,6 +19,7 @@ go_library(
"//sessionctx",
"//sessionctx/variable",
"//ttl/cache",
"//ttl/client",
"//ttl/metrics",
"//ttl/session",
"//ttl/sqlbuilder",
@ -32,6 +33,7 @@ go_library(
"@com_github_ngaut_pools//:pools",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@io_etcd_go_etcd_client_v3//:client",
"@org_golang_x_time//rate",
"@org_uber_go_multierr//:multierr",
"@org_uber_go_zap//:zap",
@ -64,6 +66,7 @@ go_test(
"//statistics/handle",
"//testkit",
"//ttl/cache",
"//ttl/client",
"//ttl/session",
"//types",
"//util/chunk",

ttl/ttlworker/job_manager.go

@ -16,16 +16,21 @@ package ttlworker
import (
"context"
"encoding/json"
"strings"
"time"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/ttl/cache"
"github.com/pingcap/tidb/ttl/client"
"github.com/pingcap/tidb/ttl/metrics"
"github.com/pingcap/tidb/ttl/session"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/timeutil"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/multierr"
"go.uber.org/zap"
)
@ -68,7 +73,8 @@ type JobManager struct {
// id is the ddl id of this instance
id string
store kv.Storage
store kv.Storage
cmdCli client.CommandClient
// the workers are shared between the loop goroutine and other sessions (e.g. manually resize workers through
// setting variables)
@ -91,7 +97,7 @@ type JobManager struct {
}
// NewJobManager creates a new ttl job manager
func NewJobManager(id string, sessPool sessionPool, store kv.Storage) (manager *JobManager) {
func NewJobManager(id string, sessPool sessionPool, store kv.Storage, etcdCli *clientv3.Client) (manager *JobManager) {
manager = &JobManager{}
manager.id = id
manager.store = store
@ -105,6 +111,12 @@ func NewJobManager(id string, sessPool sessionPool, store kv.Storage) (manager *
manager.infoSchemaCache = cache.NewInfoSchemaCache(getUpdateInfoSchemaCacheInterval())
manager.tableStatusCache = cache.NewTableStatusCache(getUpdateTTLTableStatusCacheInterval())
if etcdCli != nil {
manager.cmdCli = client.NewEtcdCommandClient(etcdCli)
} else {
manager.cmdCli = client.NewMockCommandClient()
}
return
}
@ -127,6 +139,8 @@ func (m *JobManager) jobLoop() error {
infoSchemaCacheUpdateTicker := time.Tick(m.infoSchemaCache.GetInterval())
tableStatusCacheUpdateTicker := time.Tick(m.tableStatusCache.GetInterval())
resizeWorkersTicker := time.Tick(getResizeWorkersInterval())
cmdWatcher := m.cmdCli.WatchCommand(m.ctx)
m.resizeWorkersWithSysVar()
for {
m.reportMetrics()
now := se.Now()
@ -153,30 +167,147 @@ func (m *JobManager) jobLoop() error {
cancel()
case <-updateScanTaskStateTicker:
if m.updateTaskState() {
m.checkFinishedJob(se, now)
m.rescheduleJobs(se, now)
}
case <-m.notifyStateCh:
if m.updateTaskState() {
m.checkFinishedJob(se, now)
m.rescheduleJobs(se, now)
}
case <-jobCheckTicker:
m.checkFinishedJob(se, now)
m.checkNotOwnJob()
case <-resizeWorkersTicker:
err := m.resizeScanWorkers(int(variable.TTLScanWorkerCount.Load()))
if err != nil {
logutil.Logger(m.ctx).Warn("fail to resize scan workers", zap.Error(err))
}
err = m.resizeDelWorkers(int(variable.TTLDeleteWorkerCount.Load()))
if err != nil {
logutil.Logger(m.ctx).Warn("fail to resize delete workers", zap.Error(err))
}
m.resizeWorkersWithSysVar()
case <-scheduleTicker:
m.rescheduleJobs(se, now)
case cmd, ok := <-cmdWatcher:
if !ok {
if m.ctx.Err() != nil {
return nil
}
logutil.BgLogger().Warn("The TTL cmd watcher is closed unexpectedly, re-watch it again")
cmdWatcher = m.cmdCli.WatchCommand(m.ctx)
continue
}
if triggerJobCmd, ok := cmd.GetTriggerTTLJobRequest(); ok {
m.triggerTTLJob(cmd.RequestID, triggerJobCmd, se)
m.rescheduleJobs(se, now)
}
}
}
}
func (m *JobManager) resizeWorkersWithSysVar() {
err := m.resizeScanWorkers(int(variable.TTLScanWorkerCount.Load()))
if err != nil {
logutil.Logger(m.ctx).Warn("fail to resize scan workers", zap.Error(err))
}
err = m.resizeDelWorkers(int(variable.TTLDeleteWorkerCount.Load()))
if err != nil {
logutil.Logger(m.ctx).Warn("fail to resize delete workers", zap.Error(err))
}
}
func (m *JobManager) triggerTTLJob(requestID string, cmd *client.TriggerNewTTLJobRequest, se session.Session) {
if len(m.runningJobs) > 0 {
// sleep 2 seconds so that a TiDB instance without any running job has a higher priority to take the new job.
time.Sleep(2 * time.Second)
}
ok, err := m.cmdCli.TakeCommand(m.ctx, requestID)
if err != nil {
logutil.BgLogger().Error("failed to take TTL trigger job command",
zap.String("requestID", requestID),
zap.String("database", cmd.DBName),
zap.String("table", cmd.TableName))
return
}
if !ok {
return
}
logutil.BgLogger().Info("Get a command to trigger a new TTL job",
zap.String("requestID", requestID),
zap.String("database", cmd.DBName),
zap.String("table", cmd.TableName))
responseErr := func(err error) {
terror.Log(m.cmdCli.ResponseCommand(m.ctx, requestID, err))
}
if err = m.infoSchemaCache.Update(se); err != nil {
responseErr(err)
return
}
if err = m.tableStatusCache.Update(m.ctx, se); err != nil {
responseErr(err)
return
}
var tables []*cache.PhysicalTable
for _, tbl := range m.infoSchemaCache.Tables {
if tbl.Schema.L == strings.ToLower(cmd.DBName) && tbl.Name.L == strings.ToLower(cmd.TableName) {
tables = append(tables, tbl)
}
}
if len(tables) == 0 {
responseErr(errors.Errorf("table %s.%s not exists", cmd.DBName, cmd.TableName))
return
}
now := time.Now()
tableResults := make([]*client.TriggerNewTTLJobTableResult, 0, len(tables))
allError := true
var firstError error
for _, ttlTbl := range tables {
tblResult := &client.TriggerNewTTLJobTableResult{
TableID: ttlTbl.ID,
DBName: cmd.DBName,
TableName: cmd.TableName,
PartitionName: ttlTbl.Partition.O,
}
job, err := m.lockNewJob(m.ctx, se, ttlTbl, now, true)
if err != nil {
firstError = err
tblResult.ErrorMessage = err.Error()
tableResults = append(tableResults, tblResult)
continue
}
allError = false
if job != nil {
m.appendJob(job)
tblResult.JobID = job.id
tableResults = append(tableResults, tblResult)
}
}
if allError {
responseErr(firstError)
return
}
terror.Log(m.cmdCli.ResponseCommand(m.ctx, requestID, &client.TriggerNewTTLJobResponse{
TableResult: tableResults,
}))
tableResultsJSON, _ := json.Marshal(tableResults)
logutil.BgLogger().Info("Done to trigger a new TTL job",
zap.String("requestID", requestID),
zap.String("database", cmd.DBName),
zap.String("table", cmd.TableName),
zap.ByteString("tableResults", tableResultsJSON),
)
}
func (m *JobManager) reportMetrics() {
var runningJobs, cancellingJobs float64
for _, job := range m.runningJobs {
@ -395,7 +526,7 @@ func (m *JobManager) rescheduleJobs(se session.Session, now time.Time) {
table := newJobTables[0]
newJobTables = newJobTables[1:]
logutil.Logger(m.ctx).Info("try lock new job", zap.Int64("tableID", table.ID))
job, err = m.lockNewJob(m.ctx, se, table, now)
job, err = m.lockNewJob(m.ctx, se, table, now, false)
if job != nil {
logutil.Logger(m.ctx).Info("append new running job", zap.String("jobID", job.id), zap.Int64("tableID", job.tbl.ID))
m.appendJob(job)
@ -487,7 +618,7 @@ tblLoop:
}
status := m.tableStatusCache.Tables[table.ID]
ok := m.couldTrySchedule(status, table, now)
ok := m.couldTrySchedule(status, table, now, false)
if ok {
tables = append(tables, table)
}
@ -497,7 +628,7 @@ tblLoop:
}
// couldTrySchedule returns whether we should try to schedule a TTL job for the table
func (m *JobManager) couldTrySchedule(tableStatus *cache.TableStatus, table *cache.PhysicalTable, now time.Time) bool {
func (m *JobManager) couldTrySchedule(tableStatus *cache.TableStatus, table *cache.PhysicalTable, now time.Time, ignoreScheduleInterval bool) bool {
if tableStatus == nil {
// if the table status hasn't been created, return true
return true
@ -518,7 +649,7 @@ func (m *JobManager) couldTrySchedule(tableStatus *cache.TableStatus, table *cac
return false
}
if tableStatus.LastJobStartTime.IsZero() {
if ignoreScheduleInterval || tableStatus.LastJobStartTime.IsZero() {
return true
}
@ -530,7 +661,7 @@ func (m *JobManager) couldTrySchedule(tableStatus *cache.TableStatus, table *cac
// lockNewJob tries to lock a new job in the ttl_table_status table. If it locks successfully, it will create a new
// localJob and return it.
// It could return nil, nil if the table query doesn't return an error but the job has been locked by another instance.
func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *cache.PhysicalTable, now time.Time) (*ttlJob, error) {
func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *cache.PhysicalTable, now time.Time, ignoreScheduleInterval bool) (*ttlJob, error) {
var expireTime time.Time
err := se.RunInTxn(ctx, func() error {
@ -559,7 +690,7 @@ func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *
if err != nil {
return err
}
if !m.couldTrySchedule(tableStatus, m.infoSchemaCache.Tables[tableStatus.TableID], now) {
if !m.couldTrySchedule(tableStatus, m.infoSchemaCache.Tables[tableStatus.TableID], now, ignoreScheduleInterval) {
return errors.New("couldn't schedule ttl job")
}
@ -693,3 +824,8 @@ func (m *JobManager) CancelJob(ctx context.Context, jobID string) error {
return errors.Errorf("cannot find the job with id: %s", jobID)
}
// GetCommandCli returns the command client
func (m *JobManager) GetCommandCli() client.CommandClient {
return m.cmdCli
}
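With GetCommandCli exposed, other components (the integration test below, or a future statement handler) can trigger a job through the running manager and inspect the per-table results. A hypothetical helper, sketched under the assumption that m is a started *ttlworker.JobManager, could look like this; triggerJobForTable and the example package name are illustrative only.

package example

import (
	"context"

	"github.com/pingcap/errors"
	"github.com/pingcap/tidb/ttl/client"
	"github.com/pingcap/tidb/ttl/ttlworker"
	"github.com/pingcap/tidb/util/logutil"
	"go.uber.org/zap"
)

// triggerJobForTable sends the trigger_ttl_job command through the manager's
// command client and surfaces any per-table error reported by the handler.
func triggerJobForTable(ctx context.Context, m *ttlworker.JobManager, db, tbl string) error {
	resp, err := client.TriggerNewTTLJob(ctx, m.GetCommandCli(), db, tbl)
	if err != nil {
		return err
	}
	for _, r := range resp.TableResult {
		if r.ErrorMessage != "" {
			return errors.New(r.ErrorMessage)
		}
		logutil.BgLogger().Info("triggered TTL job",
			zap.Int64("tableID", r.TableID), zap.String("jobID", r.JobID))
	}
	return nil
}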

ttl/ttlworker/job_manager_integration_test.go

@ -32,6 +32,7 @@ import (
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/ttl/cache"
"github.com/pingcap/tidb/ttl/client"
"github.com/pingcap/tidb/ttl/session"
"github.com/pingcap/tidb/ttl/ttlworker"
"github.com/pingcap/tidb/util/logutil"
@ -62,11 +63,11 @@ func TestParallelLockNewJob(t *testing.T) {
testTable := &cache.PhysicalTable{ID: 2, TableInfo: &model.TableInfo{ID: 1, TTLInfo: &model.TTLInfo{IntervalExprStr: "1", IntervalTimeUnit: int(ast.TimeUnitDay), JobInterval: duration.Duration{Hour: 1}}}}
// simply lock a new job
m := ttlworker.NewJobManager("test-id", nil, store)
m := ttlworker.NewJobManager("test-id", nil, store, nil)
m.InfoSchemaCache().Tables[testTable.ID] = testTable
se := sessionFactory()
job, err := m.LockNewJob(context.Background(), se, testTable, time.Now())
job, err := m.LockNewJob(context.Background(), se, testTable, time.Now(), false)
require.NoError(t, err)
job.Finish(se, time.Now())
@ -85,11 +86,11 @@ func TestParallelLockNewJob(t *testing.T) {
jobManagerID := fmt.Sprintf("test-ttl-manager-%d", j)
wg.Add(1)
go func() {
m := ttlworker.NewJobManager(jobManagerID, nil, store)
m := ttlworker.NewJobManager(jobManagerID, nil, store, nil)
m.InfoSchemaCache().Tables[testTable.ID] = testTable
se := sessionFactory()
job, err := m.LockNewJob(context.Background(), se, testTable, now)
job, err := m.LockNewJob(context.Background(), se, testTable, now, false)
if err == nil {
successCounter.Add(1)
successJob = job
@ -117,10 +118,10 @@ func TestFinishJob(t *testing.T) {
tk.MustExec("insert into mysql.tidb_ttl_table_status(table_id) values (2)")
// finish with error
m := ttlworker.NewJobManager("test-id", nil, store)
m := ttlworker.NewJobManager("test-id", nil, store, nil)
m.InfoSchemaCache().Tables[testTable.ID] = testTable
se := sessionFactory()
job, err := m.LockNewJob(context.Background(), se, testTable, time.Now())
job, err := m.LockNewJob(context.Background(), se, testTable, time.Now(), false)
require.NoError(t, err)
job.SetScanErr(errors.New(`"'an error message contains both single and double quote'"`))
job.Finish(se, time.Now())
@ -187,6 +188,72 @@ func TestTTLAutoAnalyze(t *testing.T) {
require.True(t, h.HandleAutoAnalyze(is))
}
func TestTriggerTTLJob(t *testing.T) {
ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Minute)
defer cancel()
store, do := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t(id int primary key, t timestamp) TTL=`t` + INTERVAL 1 DAY")
tbl, err := do.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
tblID := tbl.Meta().ID
require.NoError(t, err)
// make sure the table has run a job once to make the test stable
cli := do.TTLJobManager().GetCommandCli()
_, _ = client.TriggerNewTTLJob(ctx, cli, "test", "t")
r := tk.MustQuery("select last_job_id, current_job_id from mysql.tidb_ttl_table_status where table_id=?", tblID)
require.Equal(t, 1, len(r.Rows()))
waitTTLJobFinished(t, tk, tblID)
now := time.Now()
nowDateStr := now.Format("2006-01-02 15:04:05.999999")
expire := now.Add(-time.Hour * 25)
expreDateStr := expire.Format("2006-01-02 15:04:05.999999")
tk.MustExec("insert into t values(1, ?)", expreDateStr)
tk.MustExec("insert into t values(2, ?)", nowDateStr)
tk.MustExec("insert into t values(3, ?)", expreDateStr)
tk.MustExec("insert into t values(4, ?)", nowDateStr)
res, err := client.TriggerNewTTLJob(ctx, cli, "test", "t")
require.NoError(t, err)
require.Equal(t, 1, len(res.TableResult))
tableResult := res.TableResult[0]
require.Equal(t, tblID, tableResult.TableID)
require.NotEmpty(t, tableResult.JobID)
require.Equal(t, "test", tableResult.DBName)
require.Equal(t, "t", tableResult.TableName)
require.Equal(t, "", tableResult.ErrorMessage)
require.Equal(t, "", tableResult.PartitionName)
waitTTLJobFinished(t, tk, tblID)
tk.MustQuery("select id from t order by id asc").Check(testkit.Rows("2", "4"))
}
func waitTTLJobFinished(t *testing.T, tk *testkit.TestKit, tableID int64) {
start := time.Now()
for time.Since(start) < time.Minute {
time.Sleep(time.Second)
r := tk.MustQuery("select last_job_id, current_job_id from mysql.tidb_ttl_table_status where table_id=?", tableID)
rows := r.Rows()
if len(rows) == 0 {
continue
}
if rows[0][0] == "<nil>" {
continue
}
if rows[0][1] != "<nil>" {
continue
}
return
}
require.FailNow(t, "timeout")
}
func TestTTLJobDisable(t *testing.T) {
failpoint.Enable("github.com/pingcap/tidb/ttl/ttlworker/update-info-schema-cache-interval", fmt.Sprintf("return(%d)", time.Second))
defer failpoint.Disable("github.com/pingcap/tidb/ttl/ttlworker/update-info-schema-cache-interval")

ttl/ttlworker/job_manager_test.go

@ -145,8 +145,8 @@ func (m *JobManager) SetScanWorkers4Test(workers []worker) {
type TTLJob = ttlJob
// LockNewJob is an exported version of lockNewJob for test
func (m *JobManager) LockNewJob(ctx context.Context, se session.Session, table *cache.PhysicalTable, now time.Time) (*TTLJob, error) {
return m.lockNewJob(ctx, se, table, now)
func (m *JobManager) LockNewJob(ctx context.Context, se session.Session, table *cache.PhysicalTable, now time.Time, ignoreScheduleInterval bool) (*TTLJob, error) {
return m.lockNewJob(ctx, se, table, now, ignoreScheduleInterval)
}
// RunningJobs returns the running jobs inside ttl job manager
@ -178,7 +178,7 @@ func newMockTTLJob(tbl *cache.PhysicalTable, status cache.JobStatus) *ttlJob {
func TestReadyForNewJobTables(t *testing.T) {
tbl := newMockTTLTbl(t, "t1")
m := NewJobManager("test-id", nil, nil)
m := NewJobManager("test-id", nil, nil, nil)
m.sessPool = newMockSessionPool(t, tbl)
se := newMockSession(t, tbl)
@ -312,9 +312,8 @@ func TestLockNewTable(t *testing.T) {
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
m := NewJobManager("test-id", newMockSessionPool(t), nil)
m := NewJobManager("test-id", newMockSessionPool(t), nil, nil)
m.infoSchemaCache.Tables[c.table.ID] = c.table
sqlCounter := 0
se := newMockSession(t)
se.executeSQL = func(ctx context.Context, sql string, args ...interface{}) (rows []chunk.Row, err error) {
@ -329,7 +328,7 @@ func TestLockNewTable(t *testing.T) {
}
se.evalExpire = now
job, err := m.lockNewJob(context.Background(), se, c.table, now)
job, err := m.lockNewJob(context.Background(), se, c.table, now, false)
if c.hasJob {
assert.NotNil(t, job)
} else {
@ -352,7 +351,7 @@ func TestResizeWorkers(t *testing.T) {
scanWorker1.Start()
scanWorker2 := newMockScanWorker(t)
m := NewJobManager("test-id", nil, nil)
m := NewJobManager("test-id", nil, nil, nil)
m.sessPool = newMockSessionPool(t, tbl)
m.SetScanWorkers4Test([]worker{
scanWorker1,
@ -371,7 +370,7 @@ func TestResizeWorkers(t *testing.T) {
scanWorker2 = newMockScanWorker(t)
scanWorker2.Start()
m = NewJobManager("test-id", nil, nil)
m = NewJobManager("test-id", nil, nil, nil)
m.sessPool = newMockSessionPool(t, tbl)
m.SetScanWorkers4Test([]worker{
scanWorker1,
@ -387,7 +386,7 @@ func TestResizeWorkers(t *testing.T) {
scanWorker2 = newMockScanWorker(t)
scanWorker2.Start()
m = NewJobManager("test-id", nil, nil)
m = NewJobManager("test-id", nil, nil, nil)
m.sessPool = newMockSessionPool(t, tbl)
m.SetScanWorkers4Test([]worker{
scanWorker1,
@ -406,7 +405,7 @@ func TestLocalJobs(t *testing.T) {
tbl1.ID = 1
tbl2 := newMockTTLTbl(t, "t2")
tbl2.ID = 2
m := NewJobManager("test-id", nil, nil)
m := NewJobManager("test-id", nil, nil, nil)
m.sessPool = newMockSessionPool(t, tbl1, tbl2)
m.runningJobs = []*ttlJob{{tbl: tbl1, id: "1", ctx: context.Background()}, {tbl: tbl2, id: "2", ctx: context.Background()}}
@ -433,7 +432,7 @@ func TestRescheduleJobs(t *testing.T) {
scanWorker2.Start()
scanWorker2.setOneRowResult(tbl, 2022)
m := NewJobManager("test-id", nil, nil)
m := NewJobManager("test-id", nil, nil, nil)
m.sessPool = newMockSessionPool(t, tbl)
m.SetScanWorkers4Test([]worker{
scanWorker1,
@ -487,7 +486,7 @@ func TestRescheduleJobsOutOfWindow(t *testing.T) {
scanWorker2.Start()
scanWorker2.setOneRowResult(tbl, 2022)
m := NewJobManager("test-id", nil, nil)
m := NewJobManager("test-id", nil, nil, nil)
m.sessPool = newMockSessionPool(t, tbl)
m.SetScanWorkers4Test([]worker{
scanWorker1,
@ -533,7 +532,7 @@ func TestCheckFinishedJob(t *testing.T) {
se := newMockSession(t, tbl)
// cancelled job will be regarded as finished
m := NewJobManager("test-id", nil, nil)
m := NewJobManager("test-id", nil, nil, nil)
m.sessPool = newMockSessionPool(t, tbl)
m.runningJobs = []*ttlJob{newMockTTLJob(tbl, cache.JobStatusCancelled)}
m.checkFinishedJob(se, se.Now())
@ -543,7 +542,7 @@ func TestCheckFinishedJob(t *testing.T) {
finishedStatistics := &ttlStatistics{}
finishedStatistics.TotalRows.Store(1)
finishedStatistics.SuccessRows.Store(1)
m = NewJobManager("test-id", nil, nil)
m = NewJobManager("test-id", nil, nil, nil)
m.sessPool = newMockSessionPool(t, tbl)
m.runningJobs = []*ttlJob{newMockTTLJob(tbl, cache.JobStatusRunning)}
m.runningJobs[0].statistics = finishedStatistics
@ -572,7 +571,7 @@ func TestCheckFinishedJob(t *testing.T) {
// check timeout job
now = se.Now()
createTime := now.Add(-20 * time.Hour)
m = NewJobManager("test-id", nil, nil)
m = NewJobManager("test-id", nil, nil, nil)
m.sessPool = newMockSessionPool(t, tbl)
m.runningJobs = []*ttlJob{
{