200 lines
5.5 KiB
Go
200 lines
5.5 KiB
Go
// Copyright 2023 PingCAP, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package cache
|
|
|
|
import (
|
|
"encoding/json"
|
|
"time"
|
|
|
|
"github.com/pingcap/tidb/pkg/types"
|
|
"github.com/pingcap/tidb/pkg/util/chunk"
|
|
"github.com/pingcap/tidb/pkg/util/codec"
|
|
)
|
|
|
|
const selectFromTTLTask = `SELECT LOW_PRIORITY
|
|
job_id,
|
|
table_id,
|
|
scan_id,
|
|
scan_range_start,
|
|
scan_range_end,
|
|
expire_time,
|
|
owner_id,
|
|
owner_addr,
|
|
owner_hb_time,
|
|
status,
|
|
status_update_time,
|
|
state,
|
|
created_time FROM mysql.tidb_ttl_task`
|
|
const insertIntoTTLTask = `INSERT LOW_PRIORITY INTO mysql.tidb_ttl_task SET
|
|
job_id = %?,
|
|
table_id = %?,
|
|
scan_id = %?,
|
|
scan_range_start = %?,
|
|
scan_range_end = %?,
|
|
expire_time = %?,
|
|
created_time = %?`
|
|
|
|
// SelectFromTTLTaskWithJobID returns an SQL statement to get all tasks of the specified job in mysql.tidb_ttl_task
|
|
func SelectFromTTLTaskWithJobID(jobID string) (string, []any) {
|
|
return selectFromTTLTask + " WHERE job_id = %?", []any{jobID}
|
|
}
|
|
|
|
// SelectFromTTLTaskWithID returns an SQL statement to get all tasks of the specified job
|
|
// and scanID in mysql.tidb_ttl_task
|
|
func SelectFromTTLTaskWithID(jobID string, scanID int64) (string, []any) {
|
|
return selectFromTTLTask + " WHERE job_id = %? AND scan_id = %?", []any{jobID, scanID}
|
|
}
|
|
|
|
// PeekWaitingTTLTask returns an SQL statement to get `limit` waiting ttl task
|
|
func PeekWaitingTTLTask(hbExpire time.Time) (string, []any) {
|
|
return selectFromTTLTask +
|
|
" WHERE status = 'waiting' OR (owner_hb_time < %? AND status = 'running') ORDER BY created_time ASC",
|
|
[]any{hbExpire.Format(time.DateTime)}
|
|
}
|
|
|
|
// InsertIntoTTLTask returns an SQL statement to insert a ttl task into mysql.tidb_ttl_task
|
|
func InsertIntoTTLTask(loc *time.Location, jobID string, tableID int64, scanID int, scanRangeStart []types.Datum,
|
|
scanRangeEnd []types.Datum, expireTime time.Time, createdTime time.Time) (string, []any, error) {
|
|
rangeStart, err := codec.EncodeKey(loc, []byte{}, scanRangeStart...)
|
|
if err != nil {
|
|
return "", nil, err
|
|
}
|
|
rangeEnd, err := codec.EncodeKey(loc, []byte{}, scanRangeEnd...)
|
|
if err != nil {
|
|
return "", nil, err
|
|
}
|
|
return insertIntoTTLTask, []any{jobID, tableID, int64(scanID),
|
|
rangeStart, rangeEnd, expireTime, createdTime}, nil
|
|
}
|
|
|
|
// TaskStatus represents the current status of a task
|
|
type TaskStatus string
|
|
|
|
const (
|
|
// TaskStatusWaiting means the task hasn't started
|
|
TaskStatusWaiting TaskStatus = "waiting"
|
|
// TaskStatusRunning means this task is running
|
|
TaskStatusRunning TaskStatus = "running"
|
|
// TaskStatusFinished means this task has finished
|
|
TaskStatusFinished TaskStatus = "finished"
|
|
)
|
|
|
|
// TTLTask is a row recorded in mysql.tidb_ttl_task
|
|
type TTLTask struct {
|
|
JobID string
|
|
TableID int64
|
|
ScanID int64
|
|
ScanRangeStart []types.Datum
|
|
ScanRangeEnd []types.Datum
|
|
ExpireTime time.Time
|
|
OwnerID string
|
|
OwnerAddr string
|
|
OwnerHBTime time.Time
|
|
Status TaskStatus
|
|
StatusUpdateTime time.Time
|
|
State *TTLTaskState
|
|
CreatedTime time.Time
|
|
}
|
|
|
|
// TTLTaskState records the internal states of the ttl task
|
|
type TTLTaskState struct {
|
|
TotalRows uint64 `json:"total_rows"`
|
|
SuccessRows uint64 `json:"success_rows"`
|
|
ErrorRows uint64 `json:"error_rows"`
|
|
|
|
ScanTaskErr string `json:"scan_task_err"`
|
|
|
|
// When PreviousOwner != "", it means this task is resigned from another owner
|
|
PreviousOwner string `json:"prev_owner,omitempty"`
|
|
}
|
|
|
|
// RowToTTLTask converts a row into TTL task
|
|
func RowToTTLTask(timeZone *time.Location, row chunk.Row) (*TTLTask, error) {
|
|
var err error
|
|
|
|
task := &TTLTask{
|
|
JobID: row.GetString(0),
|
|
TableID: row.GetInt64(1),
|
|
ScanID: row.GetInt64(2),
|
|
}
|
|
if !row.IsNull(3) {
|
|
scanRangeStartBuf := row.GetBytes(3)
|
|
// it's still posibble to be empty even this column is not NULL
|
|
if len(scanRangeStartBuf) > 0 {
|
|
task.ScanRangeStart, err = codec.Decode(scanRangeStartBuf, len(scanRangeStartBuf))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
}
|
|
if !row.IsNull(4) {
|
|
scanRangeEndBuf := row.GetBytes(4)
|
|
// it's still posibble to be empty even this column is not NULL
|
|
if len(scanRangeEndBuf) > 0 {
|
|
task.ScanRangeEnd, err = codec.Decode(scanRangeEndBuf, len(scanRangeEndBuf))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
}
|
|
|
|
task.ExpireTime, err = row.GetTime(5).GoTime(timeZone)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if !row.IsNull(6) {
|
|
task.OwnerID = row.GetString(6)
|
|
}
|
|
if !row.IsNull(7) {
|
|
task.OwnerAddr = row.GetString(7)
|
|
}
|
|
if !row.IsNull(8) {
|
|
task.OwnerHBTime, err = row.GetTime(8).GoTime(timeZone)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
if !row.IsNull(9) {
|
|
status := row.GetString(9)
|
|
if len(status) == 0 {
|
|
status = "waiting"
|
|
}
|
|
task.Status = TaskStatus(status)
|
|
}
|
|
if !row.IsNull(10) {
|
|
task.StatusUpdateTime, err = row.GetTime(10).GoTime(timeZone)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
if !row.IsNull(11) {
|
|
stateStr := row.GetString(11)
|
|
state := &TTLTaskState{}
|
|
err = json.Unmarshal([]byte(stateStr), state)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
task.State = state
|
|
}
|
|
|
|
task.CreatedTime, err = row.GetTime(12).GoTime(timeZone)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return task, nil
|
|
}
|