diff --git a/domain/domain.go b/domain/domain.go index 06fd9ff4a6..5f6b0ce3a0 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -2483,7 +2483,7 @@ func (do *Domain) StartTTLJobManager() { logutil.BgLogger().Info("ttlJobManager exited.") }() - ttlJobManager := ttlworker.NewJobManager(do.ddl.GetID(), do.sysSessionPool, do.store) + ttlJobManager := ttlworker.NewJobManager(do.ddl.GetID(), do.sysSessionPool, do.store, do.etcdClient) do.ttlJobManager = ttlJobManager ttlJobManager.Start() diff --git a/ttl/client/BUILD.bazel b/ttl/client/BUILD.bazel new file mode 100644 index 0000000000..6f2c7acaae --- /dev/null +++ b/ttl/client/BUILD.bazel @@ -0,0 +1,26 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "client", + srcs = ["command.go"], + importpath = "github.com/pingcap/tidb/ttl/client", + visibility = ["//visibility:public"], + deps = [ + "//util/logutil", + "@com_github_google_uuid//:uuid", + "@com_github_pingcap_errors//:errors", + "@io_etcd_go_etcd_client_v3//:client", + "@org_uber_go_zap//:zap", + ], +) + +go_test( + name = "client_test", + srcs = ["command_test.go"], + embed = [":client"], + deps = [ + "@com_github_pingcap_errors//:errors", + "@com_github_stretchr_testify//require", + "@io_etcd_go_etcd_tests_v3//integration", + ], +) diff --git a/ttl/client/command.go b/ttl/client/command.go new file mode 100644 index 0000000000..bad2d75635 --- /dev/null +++ b/ttl/client/command.go @@ -0,0 +1,419 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
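+
+// Package client implements the command channel that TTL job managers use to
+// receive "trigger TTL job" requests, backed by etcd when it is available and
+// by an in-memory mock otherwise.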
+package client
+
+import (
+	"context"
+	"encoding/json"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/google/uuid"
+	"github.com/pingcap/errors"
+	"github.com/pingcap/tidb/util/logutil"
+	clientv3 "go.etcd.io/etcd/client/v3"
+	"go.uber.org/zap"
+)
+
+const (
+	ttlCmdKeyLeaseSeconds   int64 = 60
+	ttlCmdKeyRequestPrefix        = "/tidb/ttl/cmd/req/"
+	ttlCmdKeyResponsePrefix       = "/tidb/ttl/cmd/resp/"
+	ttlCmdTypeTriggerTTLJob       = "trigger_ttl_job"
+)
+
+// CmdRequest is the request for a TTL command
+type CmdRequest struct {
+	RequestID string          `json:"request_id"`
+	CmdType   string          `json:"cmd_type"`
+	Data      json.RawMessage `json:"data"`
+}
+
+// GetTriggerTTLJobRequest returns the `TriggerNewTTLJobRequest` object if the command type is 'trigger_ttl_job';
+// otherwise, (nil, false) is returned
+func (r *CmdRequest) GetTriggerTTLJobRequest() (*TriggerNewTTLJobRequest, bool) {
+	if r.CmdType != ttlCmdTypeTriggerTTLJob {
+		return nil, false
+	}
+
+	var req TriggerNewTTLJobRequest
+	if err := json.Unmarshal(r.Data, &req); err != nil {
+		return nil, false
+	}
+	return &req, true
+}
+
+type cmdResponse struct {
+	RequestID    string          `json:"request_id"`
+	ErrorMessage string          `json:"error_message"`
+	Data         json.RawMessage `json:"data"`
+}
+
+// TriggerNewTTLJobRequest is the command detail to trigger a TTL job
+type TriggerNewTTLJobRequest struct {
+	DBName    string `json:"db_name"`
+	TableName string `json:"table_name"`
+}
+
+// TriggerNewTTLJobTableResult is the table detail of `TriggerNewTTLJobResponse`
+type TriggerNewTTLJobTableResult struct {
+	TableID       int64  `json:"table_id"`
+	DBName        string `json:"db_name"`
+	TableName     string `json:"table_name"`
+	PartitionName string `json:"partition_name,omitempty"`
+	JobID         string `json:"job_id"`
+	ErrorMessage  string `json:"error_message"`
+}
+
+// TriggerNewTTLJobResponse is the response detail for the trigger_ttl_job command
+type TriggerNewTTLJobResponse struct {
+	TableResult []*TriggerNewTTLJobTableResult `json:"table_result"`
+}
+
+// CommandClient is an interface used to send and respond to commands for TTL jobs
+type CommandClient interface {
+	// Command sends a command and waits for the response. The first return value is the requestID; it is always non-empty.
+	Command(ctx context.Context, cmdType string, obj interface{}, response interface{}) (string, error)
+	// WatchCommand watches the commands that are sent
+	WatchCommand(ctx context.Context) <-chan *CmdRequest
+	// TakeCommand takes a command to ensure that only one instance handles it.
+	// If the first return value is true, you have taken the command successfully and should call `ResponseCommand`
+	// after processing it. Otherwise, you should not process this command because it does not belong to you.
+	TakeCommand(ctx context.Context, reqID string) (bool, error)
+	// ResponseCommand responds with the result of the command. `TakeCommand` must be called before `ResponseCommand`.
+	// obj is the response object for the sender; if obj is an error, the sender will receive an error too.
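+	// If obj is not an error, it is marshaled to JSON and delivered to the sender as the response data.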
+ ResponseCommand(ctx context.Context, reqID string, obj interface{}) error +} + +// TriggerNewTTLJob triggers a new TTL job +func TriggerNewTTLJob(ctx context.Context, cli CommandClient, dbName, tableName string) (*TriggerNewTTLJobResponse, error) { + var resp TriggerNewTTLJobResponse + _, err := cli.Command(ctx, ttlCmdTypeTriggerTTLJob, &TriggerNewTTLJobRequest{ + DBName: dbName, + TableName: tableName, + }, &resp) + + if err != nil { + return nil, err + } + return &resp, nil +} + +type etcdClient struct { + etcdCli *clientv3.Client +} + +// NewEtcdCommandClient creates a client with etcd +func NewEtcdCommandClient(etcdCli *clientv3.Client) CommandClient { + return &etcdClient{ + etcdCli: etcdCli, + } +} + +func (c *etcdClient) sendCmd(ctx context.Context, cmdType string, obj interface{}) (string, error) { + reqID := uuid.New().String() + data, err := json.Marshal(obj) + if err != nil { + return reqID, err + } + + requestJSON, err := json.Marshal(&CmdRequest{ + RequestID: reqID, + CmdType: cmdType, + Data: data, + }) + if err != nil { + return reqID, err + } + + lease, err := c.etcdCli.Grant(ctx, ttlCmdKeyLeaseSeconds) + if err != nil { + return reqID, err + } + + if _, err = c.etcdCli.Put(ctx, ttlCmdKeyRequestPrefix+reqID, string(requestJSON), clientv3.WithLease(lease.ID)); err != nil { + return reqID, err + } + + return reqID, nil +} + +func (c *etcdClient) waitCmdResponse(ctx context.Context, reqID string, obj interface{}) error { + ctx, cancel := context.WithTimeout(ctx, time.Second*time.Duration(ttlCmdKeyLeaseSeconds)) + defer cancel() + + key := ttlCmdKeyResponsePrefix + reqID + ch := c.etcdCli.Watch(ctx, key) + ticker := time.NewTimer(time.Second) + defer ticker.Stop() + + var respData []byte +loop: + for { + select { + case <-ticker.C: + response, err := c.etcdCli.Get(ctx, key) + if err != nil { + return err + } + + if len(response.Kvs) > 0 { + respData = response.Kvs[0].Value + break loop + } + case resp := <-ch: + for _, event := range resp.Events { + if event.Type == clientv3.EventTypePut { + respData = event.Kv.Value + break loop + } + } + } + } + + var cmdResp cmdResponse + if err := json.Unmarshal(respData, &cmdResp); err != nil { + return err + } + + if cmdResp.ErrorMessage != "" { + return errors.New(cmdResp.ErrorMessage) + } + + return json.Unmarshal(cmdResp.Data, obj) +} + +func (c *etcdClient) Command(ctx context.Context, cmdType string, request interface{}, response interface{}) (string, error) { + requestID, err := c.sendCmd(ctx, cmdType, request) + if err != nil { + return requestID, err + } + return requestID, c.waitCmdResponse(ctx, requestID, &response) +} + +func (c *etcdClient) TakeCommand(ctx context.Context, reqID string) (bool, error) { + resp, err := c.etcdCli.Delete(ctx, ttlCmdKeyRequestPrefix+reqID) + if err != nil { + return false, err + } + return resp.Deleted > 0, nil +} + +func (c *etcdClient) ResponseCommand(ctx context.Context, reqID string, obj interface{}) error { + resp := &cmdResponse{ + RequestID: reqID, + } + + if err, ok := obj.(error); ok { + resp.ErrorMessage = err.Error() + } else { + data, err := json.Marshal(obj) + if err != nil { + return err + } + resp.Data = data + } + + respJSON, err := json.Marshal(resp) + if err != nil { + return err + } + + lease, err := c.etcdCli.Grant(ctx, ttlCmdKeyLeaseSeconds) + if err != nil { + return err + } + + _, err = c.etcdCli.Put(ctx, ttlCmdKeyResponsePrefix+reqID, string(respJSON), clientv3.WithLease(lease.ID)) + return err +} + +func (c *etcdClient) WatchCommand(ctx context.Context) <-chan *CmdRequest 
{ + ch := make(chan *CmdRequest) + go func() { + ctx, cancel := context.WithCancel(ctx) + defer func() { + cancel() + close(ch) + }() + + etcdCh := c.etcdCli.Watch(ctx, ttlCmdKeyRequestPrefix, clientv3.WithPrefix()) + for resp := range etcdCh { + for _, event := range resp.Events { + if event.Type != clientv3.EventTypePut { + continue + } + + var request CmdRequest + if err := json.Unmarshal(event.Kv.Value, &request); err != nil { + logutil.BgLogger().Error( + "failed to parse ttl cmd payload", + zap.Error(err), + zap.ByteString("key", event.Kv.Key), + zap.ByteString("value", event.Kv.Value), + ) + } + + select { + case ch <- &request: + case <-ctx.Done(): + return + } + } + } + }() + + return ch +} + +type mockClient struct { + sync.Mutex + store map[string]interface{} + watchers []chan *CmdRequest +} + +// NewMockCommandClient creates a mock client +func NewMockCommandClient() CommandClient { + return &mockClient{ + store: make(map[string]interface{}), + watchers: make([]chan *CmdRequest, 0, 1), + } +} + +func (c *mockClient) Command(ctx context.Context, cmdType string, request interface{}, response interface{}) (string, error) { + ctx, cancel := context.WithTimeout(ctx, time.Second*time.Duration(ttlCmdKeyLeaseSeconds)) + defer cancel() + + reqID, err := c.sendCmd(ctx, cmdType, request) + if err != nil { + return reqID, err + } + + responseKey := ttlCmdKeyResponsePrefix + reqID + for ctx.Err() == nil { + c.Lock() + val, ok := c.store[responseKey] + c.Unlock() + + if !ok { + continue + } + + res, ok := val.(*cmdResponse) + if !ok { + return reqID, errors.New("response cannot be casted to *cmdResponse") + } + + if res.ErrorMessage != "" { + return reqID, errors.New(res.ErrorMessage) + } + + if err = json.Unmarshal(res.Data, response); err != nil { + return reqID, err + } + return reqID, nil + } + return reqID, ctx.Err() +} + +func (c *mockClient) sendCmd(ctx context.Context, cmdType string, request interface{}) (string, error) { + reqID := uuid.New().String() + data, err := json.Marshal(request) + if err != nil { + return reqID, err + } + + req := &CmdRequest{ + RequestID: reqID, + CmdType: cmdType, + Data: data, + } + + c.Lock() + defer c.Unlock() + key := ttlCmdKeyRequestPrefix + reqID + c.store[key] = req + for _, ch := range c.watchers { + select { + case <-ctx.Done(): + return reqID, ctx.Err() + case ch <- req: + default: + return reqID, errors.New("watcher channel is blocked") + } + } + return reqID, nil +} + +func (c *mockClient) TakeCommand(_ context.Context, reqID string) (bool, error) { + c.Lock() + defer c.Unlock() + key := ttlCmdKeyRequestPrefix + reqID + if _, ok := c.store[key]; ok { + delete(c.store, key) + return true, nil + } + return false, nil +} + +func (c *mockClient) ResponseCommand(_ context.Context, reqID string, obj interface{}) error { + c.Lock() + defer c.Unlock() + + resp := &cmdResponse{ + RequestID: reqID, + } + + if respErr, ok := obj.(error); ok { + resp.ErrorMessage = respErr.Error() + } else { + jsonData, err := json.Marshal(obj) + if err != nil { + return err + } + resp.Data = jsonData + } + + c.store[ttlCmdKeyResponsePrefix+reqID] = resp + return nil +} + +func (c *mockClient) WatchCommand(ctx context.Context) <-chan *CmdRequest { + c.Lock() + defer c.Unlock() + ch := make(chan *CmdRequest, 16+len(c.store)) + c.watchers = append(c.watchers, ch) + for key, val := range c.store { + if strings.HasPrefix(key, ttlCmdKeyRequestPrefix) { + if req, ok := val.(*CmdRequest); ok { + ch <- req + } + } + } + go func() { + <-ctx.Done() + c.Lock() + defer c.Unlock() + 
for i, chItem := range c.watchers { + if chItem == ch { + c.watchers = append(c.watchers[:i], c.watchers[i+1:]...) + break + } + } + close(ch) + }() + return ch +} diff --git a/ttl/client/command_test.go b/ttl/client/command_test.go new file mode 100644 index 0000000000..830137f329 --- /dev/null +++ b/ttl/client/command_test.go @@ -0,0 +1,139 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package client + +import ( + "context" + "encoding/json" + "testing" + "time" + + "github.com/pingcap/errors" + "github.com/stretchr/testify/require" + "go.etcd.io/etcd/tests/v3/integration" +) + +type mockCmdRequest struct { + V1 string `json:"v_1"` + V2 int `json:"v_2"` +} + +type mockCmdResponse struct { + V3 string `json:"v_3"` + V4 int `json:"v_4"` +} + +func TestCommandClient(t *testing.T) { + integration.BeforeTestExternal(t) + + cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer cluster.Terminate(t) + etcd := cluster.RandClient() + + etcdCli := NewEtcdCommandClient(etcd) + mockCli := NewMockCommandClient() + + ctx, cancel := context.WithTimeout(context.TODO(), time.Minute) + defer cancel() + + resCh := make(chan *mockCmdResponse) + defer close(resCh) + + for _, cli := range []CommandClient{etcdCli, mockCli} { + var sendRequestID, recvRequestID string + + // send command + go func() { + var err error + var res mockCmdResponse + defer func() { + resCh <- &res + }() + req := &mockCmdRequest{V1: "1", V2: 2} + sendRequestID, err = cli.Command(ctx, "type1", req, &res) + require.NoError(t, err) + require.NotEmpty(t, sendRequestID) + }() + + // check the received command and send response + watcher := cli.WatchCommand(ctx) + select { + case cmd, ok := <-watcher: + require.True(t, ok) + require.NotNil(t, cmd) + require.Equal(t, "type1", cmd.CmdType) + recvRequestID = cmd.RequestID + var gotReq mockCmdRequest + require.NoError(t, json.Unmarshal(cmd.Data, &gotReq)) + require.Equal(t, "1", gotReq.V1) + require.Equal(t, 2, gotReq.V2) + ok, err := cli.TakeCommand(ctx, recvRequestID) + require.NoError(t, err) + require.True(t, ok) + require.NoError(t, cli.ResponseCommand(ctx, cmd.RequestID, &mockCmdResponse{V3: "3", V4: 4})) + case <-ctx.Done(): + require.FailNow(t, ctx.Err().Error()) + } + + // check received response + select { + case res := <-resCh: + require.NotNil(t, res) + require.Equal(t, recvRequestID, sendRequestID) + require.Equal(t, "3", res.V3) + require.Equal(t, 4, res.V4) + case <-ctx.Done(): + require.FailNow(t, ctx.Err().Error()) + } + + // Take command again should return false, nil + ok, err := cli.TakeCommand(ctx, recvRequestID) + require.NoError(t, err) + require.False(t, ok) + + // send command and expect an error + go func() { + var err error + var res mockCmdResponse + defer func() { + resCh <- &res + }() + req := &mockCmdRequest{V1: "1", V2: 2} + sendRequestID, err = cli.Command(ctx, "type1", req, &res) + require.NotEmpty(t, sendRequestID) + require.EqualError(t, err, "mockErr") + }() + + // response an error 
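+		// take the request and respond with an error; the Command call in the goroutine above should fail with "mockErr"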
+ watcher = cli.WatchCommand(ctx) + select { + case cmd, ok := <-watcher: + require.True(t, ok) + require.NotNil(t, cmd) + _, err = cli.TakeCommand(ctx, cmd.RequestID) + require.NoError(t, err) + require.NoError(t, cli.ResponseCommand(ctx, cmd.RequestID, errors.New("mockErr"))) + case <-ctx.Done(): + require.FailNow(t, ctx.Err().Error()) + } + + // wait send goroutine exit + select { + case <-resCh: + case <-ctx.Done(): + require.FailNow(t, ctx.Err().Error()) + } + } +} diff --git a/ttl/ttlworker/BUILD.bazel b/ttl/ttlworker/BUILD.bazel index be7e20d5a8..61feb7c82b 100644 --- a/ttl/ttlworker/BUILD.bazel +++ b/ttl/ttlworker/BUILD.bazel @@ -19,6 +19,7 @@ go_library( "//sessionctx", "//sessionctx/variable", "//ttl/cache", + "//ttl/client", "//ttl/metrics", "//ttl/session", "//ttl/sqlbuilder", @@ -32,6 +33,7 @@ go_library( "@com_github_ngaut_pools//:pools", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", + "@io_etcd_go_etcd_client_v3//:client", "@org_golang_x_time//rate", "@org_uber_go_multierr//:multierr", "@org_uber_go_zap//:zap", @@ -64,6 +66,7 @@ go_test( "//statistics/handle", "//testkit", "//ttl/cache", + "//ttl/client", "//ttl/session", "//types", "//util/chunk", diff --git a/ttl/ttlworker/job_manager.go b/ttl/ttlworker/job_manager.go index f0d88a6e56..f0ee23af19 100644 --- a/ttl/ttlworker/job_manager.go +++ b/ttl/ttlworker/job_manager.go @@ -16,16 +16,21 @@ package ttlworker import ( "context" + "encoding/json" + "strings" "time" "github.com/pingcap/errors" "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/parser/terror" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/ttl/cache" + "github.com/pingcap/tidb/ttl/client" "github.com/pingcap/tidb/ttl/metrics" "github.com/pingcap/tidb/ttl/session" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/timeutil" + clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/multierr" "go.uber.org/zap" ) @@ -68,7 +73,8 @@ type JobManager struct { // id is the ddl id of this instance id string - store kv.Storage + store kv.Storage + cmdCli client.CommandClient // the workers are shared between the loop goroutine and other sessions (e.g. 
manually resize workers through // setting variables) @@ -91,7 +97,7 @@ type JobManager struct { } // NewJobManager creates a new ttl job manager -func NewJobManager(id string, sessPool sessionPool, store kv.Storage) (manager *JobManager) { +func NewJobManager(id string, sessPool sessionPool, store kv.Storage, etcdCli *clientv3.Client) (manager *JobManager) { manager = &JobManager{} manager.id = id manager.store = store @@ -105,6 +111,12 @@ func NewJobManager(id string, sessPool sessionPool, store kv.Storage) (manager * manager.infoSchemaCache = cache.NewInfoSchemaCache(getUpdateInfoSchemaCacheInterval()) manager.tableStatusCache = cache.NewTableStatusCache(getUpdateTTLTableStatusCacheInterval()) + if etcdCli != nil { + manager.cmdCli = client.NewEtcdCommandClient(etcdCli) + } else { + manager.cmdCli = client.NewMockCommandClient() + } + return } @@ -127,6 +139,8 @@ func (m *JobManager) jobLoop() error { infoSchemaCacheUpdateTicker := time.Tick(m.infoSchemaCache.GetInterval()) tableStatusCacheUpdateTicker := time.Tick(m.tableStatusCache.GetInterval()) resizeWorkersTicker := time.Tick(getResizeWorkersInterval()) + cmdWatcher := m.cmdCli.WatchCommand(m.ctx) + m.resizeWorkersWithSysVar() for { m.reportMetrics() now := se.Now() @@ -153,30 +167,147 @@ func (m *JobManager) jobLoop() error { cancel() case <-updateScanTaskStateTicker: if m.updateTaskState() { + m.checkFinishedJob(se, now) m.rescheduleJobs(se, now) } case <-m.notifyStateCh: if m.updateTaskState() { + m.checkFinishedJob(se, now) m.rescheduleJobs(se, now) } case <-jobCheckTicker: m.checkFinishedJob(se, now) m.checkNotOwnJob() case <-resizeWorkersTicker: - err := m.resizeScanWorkers(int(variable.TTLScanWorkerCount.Load())) - if err != nil { - logutil.Logger(m.ctx).Warn("fail to resize scan workers", zap.Error(err)) - } - err = m.resizeDelWorkers(int(variable.TTLDeleteWorkerCount.Load())) - if err != nil { - logutil.Logger(m.ctx).Warn("fail to resize delete workers", zap.Error(err)) - } + m.resizeWorkersWithSysVar() case <-scheduleTicker: m.rescheduleJobs(se, now) + case cmd, ok := <-cmdWatcher: + if !ok { + if m.ctx.Err() != nil { + return nil + } + + logutil.BgLogger().Warn("The TTL cmd watcher is closed unexpectedly, re-watch it again") + cmdWatcher = m.cmdCli.WatchCommand(m.ctx) + continue + } + + if triggerJobCmd, ok := cmd.GetTriggerTTLJobRequest(); ok { + m.triggerTTLJob(cmd.RequestID, triggerJobCmd, se) + m.rescheduleJobs(se, now) + } } } } +func (m *JobManager) resizeWorkersWithSysVar() { + err := m.resizeScanWorkers(int(variable.TTLScanWorkerCount.Load())) + if err != nil { + logutil.Logger(m.ctx).Warn("fail to resize scan workers", zap.Error(err)) + } + err = m.resizeDelWorkers(int(variable.TTLDeleteWorkerCount.Load())) + if err != nil { + logutil.Logger(m.ctx).Warn("fail to resize delete workers", zap.Error(err)) + } +} + +func (m *JobManager) triggerTTLJob(requestID string, cmd *client.TriggerNewTTLJobRequest, se session.Session) { + if len(m.runningJobs) > 0 { + // sleep 2 seconds to make sure the TiDB without any job running in it to have a higher priority to take a new job. 
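+		// this wait is only a soft priority hint; TakeCommand below still guarantees that exactly one job manager handles the request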
+ time.Sleep(2 * time.Second) + } + + ok, err := m.cmdCli.TakeCommand(m.ctx, requestID) + if err != nil { + logutil.BgLogger().Error("failed to take TTL trigger job command", + zap.String("requestID", requestID), + zap.String("database", cmd.DBName), + zap.String("table", cmd.TableName)) + return + } + + if !ok { + return + } + + logutil.BgLogger().Info("Get a command to trigger a new TTL job", + zap.String("requestID", requestID), + zap.String("database", cmd.DBName), + zap.String("table", cmd.TableName)) + + responseErr := func(err error) { + terror.Log(m.cmdCli.ResponseCommand(m.ctx, requestID, err)) + } + + if err = m.infoSchemaCache.Update(se); err != nil { + responseErr(err) + return + } + + if err = m.tableStatusCache.Update(m.ctx, se); err != nil { + responseErr(err) + return + } + + var tables []*cache.PhysicalTable + for _, tbl := range m.infoSchemaCache.Tables { + if tbl.Schema.L == strings.ToLower(cmd.DBName) && tbl.Name.L == strings.ToLower(cmd.TableName) { + tables = append(tables, tbl) + } + } + + if len(tables) == 0 { + responseErr(errors.Errorf("table %s.%s not exists", cmd.DBName, cmd.TableName)) + return + } + + now := time.Now() + tableResults := make([]*client.TriggerNewTTLJobTableResult, 0, len(tables)) + allError := true + var firstError error + for _, ttlTbl := range tables { + tblResult := &client.TriggerNewTTLJobTableResult{ + TableID: ttlTbl.ID, + DBName: cmd.DBName, + TableName: cmd.TableName, + PartitionName: ttlTbl.Partition.O, + } + + job, err := m.lockNewJob(m.ctx, se, ttlTbl, now, true) + if err != nil { + firstError = err + tblResult.ErrorMessage = err.Error() + tableResults = append(tableResults, tblResult) + continue + } + + allError = false + if job != nil { + m.appendJob(job) + tblResult.JobID = job.id + tableResults = append(tableResults, tblResult) + } + } + + if allError { + responseErr(firstError) + return + } + + terror.Log(m.cmdCli.ResponseCommand(m.ctx, requestID, &client.TriggerNewTTLJobResponse{ + TableResult: tableResults, + })) + + tableResultsJSON, _ := json.Marshal(tableResults) + logutil.BgLogger().Info("Done to trigger a new TTL job", + zap.String("requestID", requestID), + zap.String("database", cmd.DBName), + zap.String("table", cmd.TableName), + zap.ByteString("tableResults", tableResultsJSON), + ) +} + func (m *JobManager) reportMetrics() { var runningJobs, cancellingJobs float64 for _, job := range m.runningJobs { @@ -395,7 +526,7 @@ func (m *JobManager) rescheduleJobs(se session.Session, now time.Time) { table := newJobTables[0] newJobTables = newJobTables[1:] logutil.Logger(m.ctx).Info("try lock new job", zap.Int64("tableID", table.ID)) - job, err = m.lockNewJob(m.ctx, se, table, now) + job, err = m.lockNewJob(m.ctx, se, table, now, false) if job != nil { logutil.Logger(m.ctx).Info("append new running job", zap.String("jobID", job.id), zap.Int64("tableID", job.tbl.ID)) m.appendJob(job) @@ -487,7 +618,7 @@ tblLoop: } status := m.tableStatusCache.Tables[table.ID] - ok := m.couldTrySchedule(status, table, now) + ok := m.couldTrySchedule(status, table, now, false) if ok { tables = append(tables, table) } @@ -497,7 +628,7 @@ tblLoop: } // couldTrySchedule returns whether a table should be tried to run TTL -func (m *JobManager) couldTrySchedule(tableStatus *cache.TableStatus, table *cache.PhysicalTable, now time.Time) bool { +func (m *JobManager) couldTrySchedule(tableStatus *cache.TableStatus, table *cache.PhysicalTable, now time.Time, ignoreScheduleInterval bool) bool { if tableStatus == nil { // if the table status hasn't been 
created, return true return true @@ -518,7 +649,7 @@ func (m *JobManager) couldTrySchedule(tableStatus *cache.TableStatus, table *cac return false } - if tableStatus.LastJobStartTime.IsZero() { + if ignoreScheduleInterval || tableStatus.LastJobStartTime.IsZero() { return true } @@ -530,7 +661,7 @@ func (m *JobManager) couldTrySchedule(tableStatus *cache.TableStatus, table *cac // occupyNewJob tries to occupy a new job in the ttl_table_status table. If it locks successfully, it will create a new // localJob and return it. // It could be nil, nil, if the table query doesn't return error but the job has been locked by other instances. -func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *cache.PhysicalTable, now time.Time) (*ttlJob, error) { +func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *cache.PhysicalTable, now time.Time, ignoreScheduleInterval bool) (*ttlJob, error) { var expireTime time.Time err := se.RunInTxn(ctx, func() error { @@ -559,7 +690,7 @@ func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table * if err != nil { return err } - if !m.couldTrySchedule(tableStatus, m.infoSchemaCache.Tables[tableStatus.TableID], now) { + if !m.couldTrySchedule(tableStatus, m.infoSchemaCache.Tables[tableStatus.TableID], now, ignoreScheduleInterval) { return errors.New("couldn't schedule ttl job") } @@ -693,3 +824,8 @@ func (m *JobManager) CancelJob(ctx context.Context, jobID string) error { return errors.Errorf("cannot find the job with id: %s", jobID) } + +// GetCommandCli returns the command client +func (m *JobManager) GetCommandCli() client.CommandClient { + return m.cmdCli +} diff --git a/ttl/ttlworker/job_manager_integration_test.go b/ttl/ttlworker/job_manager_integration_test.go index 6d8aab6806..0bb2c6bc9a 100644 --- a/ttl/ttlworker/job_manager_integration_test.go +++ b/ttl/ttlworker/job_manager_integration_test.go @@ -32,6 +32,7 @@ import ( "github.com/pingcap/tidb/statistics/handle" "github.com/pingcap/tidb/testkit" "github.com/pingcap/tidb/ttl/cache" + "github.com/pingcap/tidb/ttl/client" "github.com/pingcap/tidb/ttl/session" "github.com/pingcap/tidb/ttl/ttlworker" "github.com/pingcap/tidb/util/logutil" @@ -62,11 +63,11 @@ func TestParallelLockNewJob(t *testing.T) { testTable := &cache.PhysicalTable{ID: 2, TableInfo: &model.TableInfo{ID: 1, TTLInfo: &model.TTLInfo{IntervalExprStr: "1", IntervalTimeUnit: int(ast.TimeUnitDay), JobInterval: duration.Duration{Hour: 1}}}} // simply lock a new job - m := ttlworker.NewJobManager("test-id", nil, store) + m := ttlworker.NewJobManager("test-id", nil, store, nil) m.InfoSchemaCache().Tables[testTable.ID] = testTable se := sessionFactory() - job, err := m.LockNewJob(context.Background(), se, testTable, time.Now()) + job, err := m.LockNewJob(context.Background(), se, testTable, time.Now(), false) require.NoError(t, err) job.Finish(se, time.Now()) @@ -85,11 +86,11 @@ func TestParallelLockNewJob(t *testing.T) { jobManagerID := fmt.Sprintf("test-ttl-manager-%d", j) wg.Add(1) go func() { - m := ttlworker.NewJobManager(jobManagerID, nil, store) + m := ttlworker.NewJobManager(jobManagerID, nil, store, nil) m.InfoSchemaCache().Tables[testTable.ID] = testTable se := sessionFactory() - job, err := m.LockNewJob(context.Background(), se, testTable, now) + job, err := m.LockNewJob(context.Background(), se, testTable, now, false) if err == nil { successCounter.Add(1) successJob = job @@ -117,10 +118,10 @@ func TestFinishJob(t *testing.T) { tk.MustExec("insert into 
mysql.tidb_ttl_table_status(table_id) values (2)") // finish with error - m := ttlworker.NewJobManager("test-id", nil, store) + m := ttlworker.NewJobManager("test-id", nil, store, nil) m.InfoSchemaCache().Tables[testTable.ID] = testTable se := sessionFactory() - job, err := m.LockNewJob(context.Background(), se, testTable, time.Now()) + job, err := m.LockNewJob(context.Background(), se, testTable, time.Now(), false) require.NoError(t, err) job.SetScanErr(errors.New(`"'an error message contains both single and double quote'"`)) job.Finish(se, time.Now()) @@ -187,6 +188,72 @@ func TestTTLAutoAnalyze(t *testing.T) { require.True(t, h.HandleAutoAnalyze(is)) } +func TestTriggerTTLJob(t *testing.T) { + ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Minute) + defer cancel() + + store, do := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t(id int primary key, t timestamp) TTL=`t` + INTERVAL 1 DAY") + tbl, err := do.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t")) + tblID := tbl.Meta().ID + require.NoError(t, err) + + // make sure the table had run a job one time to make the test stable + cli := do.TTLJobManager().GetCommandCli() + _, _ = client.TriggerNewTTLJob(ctx, cli, "test", "t") + r := tk.MustQuery("select last_job_id, current_job_id from mysql.tidb_ttl_table_status where table_id=?", tblID) + require.Equal(t, 1, len(r.Rows())) + waitTTLJobFinished(t, tk, tblID) + + now := time.Now() + nowDateStr := now.Format("2006-01-02 15:04:05.999999") + expire := now.Add(-time.Hour * 25) + expreDateStr := expire.Format("2006-01-02 15:04:05.999999") + tk.MustExec("insert into t values(1, ?)", expreDateStr) + tk.MustExec("insert into t values(2, ?)", nowDateStr) + tk.MustExec("insert into t values(3, ?)", expreDateStr) + tk.MustExec("insert into t values(4, ?)", nowDateStr) + + res, err := client.TriggerNewTTLJob(ctx, cli, "test", "t") + require.NoError(t, err) + require.Equal(t, 1, len(res.TableResult)) + tableResult := res.TableResult[0] + require.Equal(t, tblID, tableResult.TableID) + require.NotEmpty(t, tableResult.JobID) + require.Equal(t, "test", tableResult.DBName) + require.Equal(t, "t", tableResult.TableName) + require.Equal(t, "", tableResult.ErrorMessage) + require.Equal(t, "", tableResult.PartitionName) + + waitTTLJobFinished(t, tk, tblID) + tk.MustQuery("select id from t order by id asc").Check(testkit.Rows("2", "4")) +} + +func waitTTLJobFinished(t *testing.T, tk *testkit.TestKit, tableID int64) { + start := time.Now() + for time.Since(start) < time.Minute { + time.Sleep(time.Second) + r := tk.MustQuery("select last_job_id, current_job_id from mysql.tidb_ttl_table_status where table_id=?", tableID) + rows := r.Rows() + if len(rows) == 0 { + continue + } + + if rows[0][0] == "" { + continue + } + + if rows[0][1] != "" { + continue + } + + return + } + require.FailNow(t, "timeout") +} + func TestTTLJobDisable(t *testing.T) { failpoint.Enable("github.com/pingcap/tidb/ttl/ttlworker/update-info-schema-cache-interval", fmt.Sprintf("return(%d)", time.Second)) defer failpoint.Disable("github.com/pingcap/tidb/ttl/ttlworker/update-info-schema-cache-interval") diff --git a/ttl/ttlworker/job_manager_test.go b/ttl/ttlworker/job_manager_test.go index 437aec11cf..d8b7420e76 100644 --- a/ttl/ttlworker/job_manager_test.go +++ b/ttl/ttlworker/job_manager_test.go @@ -145,8 +145,8 @@ func (m *JobManager) SetScanWorkers4Test(workers []worker) { type TTLJob = ttlJob // LockNewJob is an exported version 
of lockNewJob for test -func (m *JobManager) LockNewJob(ctx context.Context, se session.Session, table *cache.PhysicalTable, now time.Time) (*TTLJob, error) { - return m.lockNewJob(ctx, se, table, now) +func (m *JobManager) LockNewJob(ctx context.Context, se session.Session, table *cache.PhysicalTable, now time.Time, ignoreScheduleInterval bool) (*TTLJob, error) { + return m.lockNewJob(ctx, se, table, now, ignoreScheduleInterval) } // RunningJobs returns the running jobs inside ttl job manager @@ -178,7 +178,7 @@ func newMockTTLJob(tbl *cache.PhysicalTable, status cache.JobStatus) *ttlJob { func TestReadyForNewJobTables(t *testing.T) { tbl := newMockTTLTbl(t, "t1") - m := NewJobManager("test-id", nil, nil) + m := NewJobManager("test-id", nil, nil, nil) m.sessPool = newMockSessionPool(t, tbl) se := newMockSession(t, tbl) @@ -312,9 +312,8 @@ func TestLockNewTable(t *testing.T) { for _, c := range cases { t.Run(c.name, func(t *testing.T) { - m := NewJobManager("test-id", newMockSessionPool(t), nil) + m := NewJobManager("test-id", newMockSessionPool(t), nil, nil) m.infoSchemaCache.Tables[c.table.ID] = c.table - sqlCounter := 0 se := newMockSession(t) se.executeSQL = func(ctx context.Context, sql string, args ...interface{}) (rows []chunk.Row, err error) { @@ -329,7 +328,7 @@ func TestLockNewTable(t *testing.T) { } se.evalExpire = now - job, err := m.lockNewJob(context.Background(), se, c.table, now) + job, err := m.lockNewJob(context.Background(), se, c.table, now, false) if c.hasJob { assert.NotNil(t, job) } else { @@ -352,7 +351,7 @@ func TestResizeWorkers(t *testing.T) { scanWorker1.Start() scanWorker2 := newMockScanWorker(t) - m := NewJobManager("test-id", nil, nil) + m := NewJobManager("test-id", nil, nil, nil) m.sessPool = newMockSessionPool(t, tbl) m.SetScanWorkers4Test([]worker{ scanWorker1, @@ -371,7 +370,7 @@ func TestResizeWorkers(t *testing.T) { scanWorker2 = newMockScanWorker(t) scanWorker2.Start() - m = NewJobManager("test-id", nil, nil) + m = NewJobManager("test-id", nil, nil, nil) m.sessPool = newMockSessionPool(t, tbl) m.SetScanWorkers4Test([]worker{ scanWorker1, @@ -387,7 +386,7 @@ func TestResizeWorkers(t *testing.T) { scanWorker2 = newMockScanWorker(t) scanWorker2.Start() - m = NewJobManager("test-id", nil, nil) + m = NewJobManager("test-id", nil, nil, nil) m.sessPool = newMockSessionPool(t, tbl) m.SetScanWorkers4Test([]worker{ scanWorker1, @@ -406,7 +405,7 @@ func TestLocalJobs(t *testing.T) { tbl1.ID = 1 tbl2 := newMockTTLTbl(t, "t2") tbl2.ID = 2 - m := NewJobManager("test-id", nil, nil) + m := NewJobManager("test-id", nil, nil, nil) m.sessPool = newMockSessionPool(t, tbl1, tbl2) m.runningJobs = []*ttlJob{{tbl: tbl1, id: "1", ctx: context.Background()}, {tbl: tbl2, id: "2", ctx: context.Background()}} @@ -433,7 +432,7 @@ func TestRescheduleJobs(t *testing.T) { scanWorker2.Start() scanWorker2.setOneRowResult(tbl, 2022) - m := NewJobManager("test-id", nil, nil) + m := NewJobManager("test-id", nil, nil, nil) m.sessPool = newMockSessionPool(t, tbl) m.SetScanWorkers4Test([]worker{ scanWorker1, @@ -487,7 +486,7 @@ func TestRescheduleJobsOutOfWindow(t *testing.T) { scanWorker2.Start() scanWorker2.setOneRowResult(tbl, 2022) - m := NewJobManager("test-id", nil, nil) + m := NewJobManager("test-id", nil, nil, nil) m.sessPool = newMockSessionPool(t, tbl) m.SetScanWorkers4Test([]worker{ scanWorker1, @@ -533,7 +532,7 @@ func TestCheckFinishedJob(t *testing.T) { se := newMockSession(t, tbl) // cancelled job will be regarded as finished - m := NewJobManager("test-id", nil, nil) + m := 
NewJobManager("test-id", nil, nil, nil) m.sessPool = newMockSessionPool(t, tbl) m.runningJobs = []*ttlJob{newMockTTLJob(tbl, cache.JobStatusCancelled)} m.checkFinishedJob(se, se.Now()) @@ -543,7 +542,7 @@ func TestCheckFinishedJob(t *testing.T) { finishedStatistics := &ttlStatistics{} finishedStatistics.TotalRows.Store(1) finishedStatistics.SuccessRows.Store(1) - m = NewJobManager("test-id", nil, nil) + m = NewJobManager("test-id", nil, nil, nil) m.sessPool = newMockSessionPool(t, tbl) m.runningJobs = []*ttlJob{newMockTTLJob(tbl, cache.JobStatusRunning)} m.runningJobs[0].statistics = finishedStatistics @@ -572,7 +571,7 @@ func TestCheckFinishedJob(t *testing.T) { // check timeout job now = se.Now() createTime := now.Add(-20 * time.Hour) - m = NewJobManager("test-id", nil, nil) + m = NewJobManager("test-id", nil, nil, nil) m.sessPool = newMockSessionPool(t, tbl) m.runningJobs = []*ttlJob{ {