ddl: move some dist-reorg code to dist_* files and add the generate prefix key function (#41577)
close pingcap/tidb#41576
This commit is contained in:
@ -40,7 +40,6 @@ import (
|
||||
"github.com/pingcap/tidb/table"
|
||||
"github.com/pingcap/tidb/table/tables"
|
||||
"github.com/pingcap/tidb/tablecodec"
|
||||
"github.com/pingcap/tidb/types"
|
||||
"github.com/pingcap/tidb/util"
|
||||
"github.com/pingcap/tidb/util/dbterror"
|
||||
"github.com/pingcap/tidb/util/logutil"
|
||||
@ -48,7 +47,6 @@ import (
|
||||
decoder "github.com/pingcap/tidb/util/rowDecoder"
|
||||
"github.com/pingcap/tidb/util/timeutil"
|
||||
"github.com/pingcap/tidb/util/topsql"
|
||||
"github.com/tikv/client-go/v2/oracle"
|
||||
"github.com/tikv/client-go/v2/tikv"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
@ -61,21 +59,8 @@ const (
|
||||
typeCleanUpIndexWorker backfillerType = 2
|
||||
typeAddIndexMergeTmpWorker backfillerType = 3
|
||||
typeReorgPartitionWorker backfillerType = 4
|
||||
|
||||
// InstanceLease is the instance lease.
|
||||
InstanceLease = 1 * time.Minute
|
||||
updateInstanceLease = 25 * time.Second
|
||||
genTaskBatch = 4096
|
||||
genPhysicalTableTaskBatch = 256
|
||||
minGenTaskBatch = 1024
|
||||
minGenPhysicalTableTaskBatch = 64
|
||||
minDistTaskCnt = 64
|
||||
retrySQLTimes = 10
|
||||
)
|
||||
|
||||
// RetrySQLInterval is exported for test.
|
||||
var RetrySQLInterval = 300 * time.Millisecond
|
||||
|
||||
func (bT backfillerType) String() string {
|
||||
switch bT {
|
||||
case typeAddIndexWorker:
|
||||
@ -93,57 +78,6 @@ func (bT backfillerType) String() string {
|
||||
}
|
||||
}
|
||||
|
||||
// BackfillJob is for a tidb_background_subtask table's record.
|
||||
type BackfillJob struct {
|
||||
ID int64
|
||||
JobID int64
|
||||
EleID int64
|
||||
EleKey []byte
|
||||
PhysicalTableID int64
|
||||
Tp backfillerType
|
||||
State model.JobState
|
||||
InstanceID string
|
||||
InstanceLease types.Time
|
||||
StartTS uint64
|
||||
StateUpdateTS uint64
|
||||
Meta *model.BackfillMeta
|
||||
}
|
||||
|
||||
// PrefixKeyString returns the BackfillJob's prefix key.
|
||||
func (bj *BackfillJob) PrefixKeyString() string {
|
||||
return fmt.Sprintf("%d_%s_%d_%%", bj.JobID, hex.EncodeToString(bj.EleKey), bj.EleID)
|
||||
}
|
||||
|
||||
// AbbrStr returns the BackfillJob's info without the Meta info.
|
||||
func (bj *BackfillJob) AbbrStr() string {
|
||||
return fmt.Sprintf("ID:%d, JobID:%d, EleID:%d, Type:%s, State:%s, InstanceID:%s, InstanceLease:%s",
|
||||
bj.ID, bj.JobID, bj.EleID, bj.Tp, bj.State, bj.InstanceID, bj.InstanceLease)
|
||||
}
|
||||
|
||||
// GetOracleTimeWithStartTS returns the current time with txn's startTS.
|
||||
func GetOracleTimeWithStartTS(se *session) (time.Time, error) {
|
||||
txn, err := se.Txn(true)
|
||||
if err != nil {
|
||||
return time.Time{}, err
|
||||
}
|
||||
return oracle.GetTimeFromTS(txn.StartTS()).UTC(), nil
|
||||
}
|
||||
|
||||
// GetOracleTime returns the current time from TS without txn.
|
||||
func GetOracleTime(store kv.Storage) (time.Time, error) {
|
||||
currentVer, err := store.CurrentVersion(kv.GlobalTxnScope)
|
||||
if err != nil {
|
||||
return time.Time{}, errors.Trace(err)
|
||||
}
|
||||
return oracle.GetTimeFromTS(currentVer.Ver).UTC(), nil
|
||||
}
|
||||
|
||||
// GetLeaseGoTime returns a types.Time by adding a lease.
|
||||
func GetLeaseGoTime(currTime time.Time, lease time.Duration) types.Time {
|
||||
leaseTime := currTime.Add(lease)
|
||||
return types.NewTime(types.FromGoTime(leaseTime.In(time.UTC)), mysql.TypeTimestamp, types.MaxFsp)
|
||||
}
|
||||
|
||||
// By now the DDL jobs that need backfilling include:
|
||||
// 1: add-index
|
||||
// 2: modify-column-type
|
||||
@ -1141,100 +1075,6 @@ func injectCheckBackfillWorkerNum(curWorkerSize int, isMergeWorker bool) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func addBatchBackfillJobs(sess *session, reorgInfo *reorgInfo, sJobCtx *splitJobContext, phyTblID int64, notDistTask bool,
|
||||
batchTasks []*reorgBackfillTask, bJobs []*BackfillJob) error {
|
||||
bJobs = bJobs[:0]
|
||||
instanceID := ""
|
||||
if notDistTask {
|
||||
instanceID = reorgInfo.d.uuid
|
||||
}
|
||||
|
||||
// TODO: Adjust the number of ranges(region) for each task.
|
||||
for _, task := range batchTasks {
|
||||
bm := &model.BackfillMeta{
|
||||
IsUnique: sJobCtx.isUnique,
|
||||
EndInclude: task.endInclude,
|
||||
ReorgTp: reorgInfo.Job.ReorgMeta.ReorgTp,
|
||||
SQLMode: reorgInfo.ReorgMeta.SQLMode,
|
||||
Location: reorgInfo.ReorgMeta.Location,
|
||||
JobMeta: &model.JobMeta{
|
||||
SchemaID: reorgInfo.Job.SchemaID,
|
||||
TableID: reorgInfo.Job.TableID,
|
||||
Type: reorgInfo.Job.Type,
|
||||
Query: reorgInfo.Job.Query,
|
||||
},
|
||||
StartKey: task.startKey,
|
||||
EndKey: task.endKey,
|
||||
}
|
||||
bj := &BackfillJob{
|
||||
ID: sJobCtx.currBackfillJobID.Add(1),
|
||||
JobID: reorgInfo.Job.ID,
|
||||
EleID: reorgInfo.currElement.ID,
|
||||
EleKey: reorgInfo.currElement.TypeKey,
|
||||
PhysicalTableID: phyTblID,
|
||||
Tp: sJobCtx.bfWorkerType,
|
||||
State: model.JobStateNone,
|
||||
InstanceID: instanceID,
|
||||
Meta: bm,
|
||||
}
|
||||
bj.Meta.CurrKey = task.startKey
|
||||
bJobs = append(bJobs, bj)
|
||||
}
|
||||
if err := AddBackfillJobs(sess, bJobs); err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (dc *ddlCtx) splitTableToBackfillJobs(sess *session, reorgInfo *reorgInfo, sJobCtx *splitJobContext, pTblMeta *BackfillJobRangeMeta) error {
|
||||
isFirstOps := !sJobCtx.isMultiPhyTbl
|
||||
batchSize := sJobCtx.batchSize
|
||||
startKey, endKey := pTblMeta.StartKey, pTblMeta.EndKey
|
||||
bJobs := make([]*BackfillJob, 0, batchSize)
|
||||
for {
|
||||
kvRanges, err := splitTableRanges(pTblMeta.PhyTbl, reorgInfo.d.store, startKey, endKey, batchSize)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
batchTasks := getBatchTasks(pTblMeta.PhyTbl, reorgInfo, kvRanges, batchSize)
|
||||
if len(batchTasks) == 0 {
|
||||
break
|
||||
}
|
||||
notNeedDistProcess := isFirstOps && (len(kvRanges) < minDistTaskCnt)
|
||||
if err = addBatchBackfillJobs(sess, reorgInfo, sJobCtx, pTblMeta.PhyTblID, notNeedDistProcess, batchTasks, bJobs); err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
isFirstOps = false
|
||||
|
||||
remains := kvRanges[len(batchTasks):]
|
||||
dc.asyncNotifyWorker(dc.backfillJobCh, addingBackfillJob, reorgInfo.Job.ID, "backfill_job")
|
||||
logutil.BgLogger().Info("[ddl] split backfill jobs to the backfill table",
|
||||
zap.Int64("physicalID", pTblMeta.PhyTblID),
|
||||
zap.Int("batchTasksCnt", len(batchTasks)),
|
||||
zap.Int("totalRegionCnt", len(kvRanges)),
|
||||
zap.Int("remainRegionCnt", len(remains)),
|
||||
zap.String("startHandle", hex.EncodeToString(startKey)),
|
||||
zap.String("endHandle", hex.EncodeToString(endKey)))
|
||||
|
||||
if len(remains) == 0 {
|
||||
break
|
||||
}
|
||||
|
||||
for {
|
||||
bJobCnt, err := checkBackfillJobCount(sess, reorgInfo.Job.ID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey, pTblMeta.PhyTblID)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
if bJobCnt < sJobCtx.minBatchSize {
|
||||
break
|
||||
}
|
||||
time.Sleep(RetrySQLInterval)
|
||||
}
|
||||
startKey = remains[0].StartKey
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// recordIterFunc is used for low-level record iteration.
|
||||
type recordIterFunc func(h kv.Handle, rowKey kv.Key, rawRecord []byte) (more bool, err error)
|
||||
|
||||
|
||||
@ -85,6 +85,11 @@ var NewSession = newSession
|
||||
// GetJobWithoutPartition is only used for test.
|
||||
const GetJobWithoutPartition = getJobWithoutPartition
|
||||
|
||||
// BackfillJobPrefixKeyString is only used for test.
|
||||
func BackfillJobPrefixKeyString(ddlJobID int64, eleKey kv.Key, eleID int64) string {
|
||||
return backfillJobPrefixKeyString(ddlJobID, eleKey, eleID)
|
||||
}
|
||||
|
||||
// GetDDLCtx returns ddlCtx for test.
|
||||
func GetDDLCtx(d DDL) *ddlCtx {
|
||||
return d.(*ddl).ddlCtx
|
||||
|
||||
@ -15,6 +15,8 @@
|
||||
package ddl
|
||||
|
||||
import (
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@ -22,19 +24,97 @@ import (
|
||||
"github.com/pingcap/tidb/ddl/ingest"
|
||||
"github.com/pingcap/tidb/kv"
|
||||
"github.com/pingcap/tidb/parser/model"
|
||||
"github.com/pingcap/tidb/parser/mysql"
|
||||
"github.com/pingcap/tidb/resourcemanager/pooltask"
|
||||
"github.com/pingcap/tidb/sessionctx"
|
||||
"github.com/pingcap/tidb/sessionctx/variable"
|
||||
"github.com/pingcap/tidb/table"
|
||||
"github.com/pingcap/tidb/types"
|
||||
"github.com/pingcap/tidb/util"
|
||||
"github.com/pingcap/tidb/util/dbterror"
|
||||
"github.com/pingcap/tidb/util/gpool"
|
||||
"github.com/pingcap/tidb/util/gpool/spmc"
|
||||
"github.com/pingcap/tidb/util/logutil"
|
||||
"github.com/tikv/client-go/v2/oracle"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
const getJobWithoutPartition = -1
|
||||
const (
	getJobWithoutPartition = -1
	// backfillJobPrefixKey is the fmt layout of a backfill job's prefix key:
	// an integer, a string, an integer, then a literal '%'.
	backfillJobPrefixKey = "%d_%s_%d_%%"

	// InstanceLease is the instance lease.
	InstanceLease                = time.Minute
	updateInstanceLease          = 25 * time.Second
	genTaskBatch                 = 4096
	genPhysicalTableTaskBatch    = 256
	minGenTaskBatch              = 1024
	minGenPhysicalTableTaskBatch = 64
	minDistTaskCnt               = 64
	retrySQLTimes                = 10
)
|
||||
|
||||
// RetrySQLInterval is exported for test.
|
||||
var RetrySQLInterval = 300 * time.Millisecond
|
||||
|
||||
func backfillJobPrefixKeyString(ddlJobID int64, eleKey kv.Key, eleID int64) string {
|
||||
return fmt.Sprintf(backfillJobPrefixKey, ddlJobID, hex.EncodeToString(eleKey), eleID)
|
||||
}
|
||||
|
||||
// BackfillJob is for a tidb_background_subtask table's record.
|
||||
type BackfillJob struct {
|
||||
ID int64
|
||||
JobID int64
|
||||
EleID int64
|
||||
EleKey []byte
|
||||
PhysicalTableID int64
|
||||
Tp backfillerType
|
||||
State model.JobState
|
||||
InstanceID string
|
||||
InstanceLease types.Time
|
||||
StartTS uint64
|
||||
StateUpdateTS uint64
|
||||
Meta *model.BackfillMeta
|
||||
}
|
||||
|
||||
// PrefixKeyString returns the BackfillJob's prefix key.
|
||||
func (bj *BackfillJob) PrefixKeyString() string {
|
||||
return backfillJobPrefixKeyString(bj.JobID, bj.EleKey, bj.EleID)
|
||||
}
|
||||
|
||||
func (bj *BackfillJob) keyString() string {
|
||||
return fmt.Sprintf("%d_%s_%d_%d", bj.JobID, hex.EncodeToString(bj.EleKey), bj.EleID, bj.ID)
|
||||
}
|
||||
|
||||
// AbbrStr returns the BackfillJob's info without the Meta info.
|
||||
func (bj *BackfillJob) AbbrStr() string {
|
||||
return fmt.Sprintf("ID:%d, JobID:%d, EleID:%d, Type:%s, State:%s, InstanceID:%s, InstanceLease:%s",
|
||||
bj.ID, bj.JobID, bj.EleID, bj.Tp, bj.State, bj.InstanceID, bj.InstanceLease)
|
||||
}
|
||||
|
||||
// GetOracleTimeWithStartTS returns the current time with txn's startTS.
|
||||
func GetOracleTimeWithStartTS(se *session) (time.Time, error) {
|
||||
txn, err := se.Txn(true)
|
||||
if err != nil {
|
||||
return time.Time{}, err
|
||||
}
|
||||
return oracle.GetTimeFromTS(txn.StartTS()).UTC(), nil
|
||||
}
|
||||
|
||||
// GetOracleTime returns the current time from TS without txn.
|
||||
func GetOracleTime(store kv.Storage) (time.Time, error) {
|
||||
currentVer, err := store.CurrentVersion(kv.GlobalTxnScope)
|
||||
if err != nil {
|
||||
return time.Time{}, errors.Trace(err)
|
||||
}
|
||||
return oracle.GetTimeFromTS(currentVer.Ver).UTC(), nil
|
||||
}
|
||||
|
||||
// GetLeaseGoTime returns a types.Time by adding a lease.
|
||||
func GetLeaseGoTime(currTime time.Time, lease time.Duration) types.Time {
|
||||
leaseTime := currTime.Add(lease)
|
||||
return types.NewTime(types.FromGoTime(leaseTime.In(time.UTC)), mysql.TypeTimestamp, types.MaxFsp)
|
||||
}
|
||||
|
||||
type backfillWorkerContext struct {
|
||||
currID int
|
||||
|
||||
@ -278,6 +278,100 @@ func (dc *ddlCtx) controlWriteTableRecord(sessPool *sessionPool, t table.Table,
|
||||
return checkReorgJobFinished(dc.ctx, sess, &dc.reorgCtx, ddlJobID, currEle)
|
||||
}
|
||||
|
||||
func addBatchBackfillJobs(sess *session, reorgInfo *reorgInfo, sJobCtx *splitJobContext, phyTblID int64, notDistTask bool,
|
||||
batchTasks []*reorgBackfillTask, bJobs []*BackfillJob) error {
|
||||
bJobs = bJobs[:0]
|
||||
instanceID := ""
|
||||
if notDistTask {
|
||||
instanceID = reorgInfo.d.uuid
|
||||
}
|
||||
|
||||
// TODO: Adjust the number of ranges(region) for each task.
|
||||
for _, task := range batchTasks {
|
||||
bm := &model.BackfillMeta{
|
||||
IsUnique: sJobCtx.isUnique,
|
||||
EndInclude: task.endInclude,
|
||||
ReorgTp: reorgInfo.Job.ReorgMeta.ReorgTp,
|
||||
SQLMode: reorgInfo.ReorgMeta.SQLMode,
|
||||
Location: reorgInfo.ReorgMeta.Location,
|
||||
JobMeta: &model.JobMeta{
|
||||
SchemaID: reorgInfo.Job.SchemaID,
|
||||
TableID: reorgInfo.Job.TableID,
|
||||
Type: reorgInfo.Job.Type,
|
||||
Query: reorgInfo.Job.Query,
|
||||
},
|
||||
StartKey: task.startKey,
|
||||
EndKey: task.endKey,
|
||||
}
|
||||
bj := &BackfillJob{
|
||||
ID: sJobCtx.currBackfillJobID.Add(1),
|
||||
JobID: reorgInfo.Job.ID,
|
||||
EleID: reorgInfo.currElement.ID,
|
||||
EleKey: reorgInfo.currElement.TypeKey,
|
||||
PhysicalTableID: phyTblID,
|
||||
Tp: sJobCtx.bfWorkerType,
|
||||
State: model.JobStateNone,
|
||||
InstanceID: instanceID,
|
||||
Meta: bm,
|
||||
}
|
||||
bj.Meta.CurrKey = task.startKey
|
||||
bJobs = append(bJobs, bj)
|
||||
}
|
||||
if err := AddBackfillJobs(sess, bJobs); err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (dc *ddlCtx) splitTableToBackfillJobs(sess *session, reorgInfo *reorgInfo, sJobCtx *splitJobContext, pTblMeta *BackfillJobRangeMeta) error {
|
||||
isFirstOps := !sJobCtx.isMultiPhyTbl
|
||||
batchSize := sJobCtx.batchSize
|
||||
startKey, endKey := pTblMeta.StartKey, pTblMeta.EndKey
|
||||
bJobs := make([]*BackfillJob, 0, batchSize)
|
||||
for {
|
||||
kvRanges, err := splitTableRanges(pTblMeta.PhyTbl, reorgInfo.d.store, startKey, endKey, batchSize)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
batchTasks := getBatchTasks(pTblMeta.PhyTbl, reorgInfo, kvRanges, batchSize)
|
||||
if len(batchTasks) == 0 {
|
||||
break
|
||||
}
|
||||
notNeedDistProcess := isFirstOps && (len(kvRanges) < minDistTaskCnt)
|
||||
if err = addBatchBackfillJobs(sess, reorgInfo, sJobCtx, pTblMeta.PhyTblID, notNeedDistProcess, batchTasks, bJobs); err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
isFirstOps = false
|
||||
|
||||
remains := kvRanges[len(batchTasks):]
|
||||
dc.asyncNotifyWorker(dc.backfillJobCh, addingBackfillJob, reorgInfo.Job.ID, "backfill_job")
|
||||
logutil.BgLogger().Info("[ddl] split backfill jobs to the backfill table",
|
||||
zap.Int64("physicalID", pTblMeta.PhyTblID),
|
||||
zap.Int("batchTasksCnt", len(batchTasks)),
|
||||
zap.Int("totalRegionCnt", len(kvRanges)),
|
||||
zap.Int("remainRegionCnt", len(remains)),
|
||||
zap.String("startHandle", hex.EncodeToString(startKey)),
|
||||
zap.String("endHandle", hex.EncodeToString(endKey)))
|
||||
|
||||
if len(remains) == 0 {
|
||||
break
|
||||
}
|
||||
|
||||
for {
|
||||
bJobCnt, err := checkBackfillJobCount(sess, reorgInfo.Job.ID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
if bJobCnt < sJobCtx.minBatchSize {
|
||||
break
|
||||
}
|
||||
time.Sleep(RetrySQLInterval)
|
||||
}
|
||||
startKey = remains[0].StartKey
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (dc *ddlCtx) splitPhysicalTableToBackfillJobs(sess *session, reorgInfo *reorgInfo, sJobCtx *splitJobContext) {
|
||||
defaultSQLMode := sess.GetSessionVars().SQLMode
|
||||
defer func() { sess.GetSessionVars().SQLMode = defaultSQLMode }()
|
||||
@ -323,12 +417,13 @@ func checkReorgJobFinished(ctx context.Context, sess *session, reorgCtxs *reorgC
|
||||
var backfillJobFinished bool
|
||||
ticker := time.NewTicker(CheckBackfillJobFinishInterval)
|
||||
defer ticker.Stop()
|
||||
bjPrefixKey := backfillJobPrefixKeyString(ddlJobID, currEle.TypeKey, currEle.ID)
|
||||
for {
|
||||
failpoint.Inject("MockCanceledErr", func() {
|
||||
getReorgCtx(reorgCtxs, ddlJobID).notifyReorgCancel()
|
||||
})
|
||||
if getReorgCtx(reorgCtxs, ddlJobID).isReorgCanceled() {
|
||||
err := cleanupBackfillJobs(sess, fmt.Sprintf("%d_%s_%d_%%", ddlJobID, hex.EncodeToString(currEle.TypeKey), currEle.ID))
|
||||
err := cleanupBackfillJobs(sess, bjPrefixKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -351,7 +446,7 @@ func checkReorgJobFinished(ctx context.Context, sess *session, reorgCtxs *reorgC
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
bfJobs, err := getBackfillJobWithRetry(sess, BackgroundSubtaskTable, ddlJobID, currEle.ID, currEle.TypeKey)
|
||||
bfJobs, err := getBackfillJobWithRetry(sess, BackgroundSubtaskTable, bjPrefixKey)
|
||||
if err != nil {
|
||||
logutil.BgLogger().Info("[ddl] getBackfillJobWithRetry failed", zap.Int64("job ID", ddlJobID), zap.Stringer("ele", currEle), zap.Error(err))
|
||||
return errors.Trace(err)
|
||||
@ -370,11 +465,11 @@ func checkReorgJobFinished(ctx context.Context, sess *session, reorgCtxs *reorgC
|
||||
}
|
||||
if isSynced {
|
||||
logutil.BgLogger().Info("[ddl] finish all backfill jobs and put them to history", zap.Int64("job ID", ddlJobID), zap.Stringer("ele", currEle))
|
||||
return GetBackfillErr(sess, ddlJobID, currEle.ID, currEle.TypeKey)
|
||||
return GetBackfillErr(sess, bjPrefixKey)
|
||||
}
|
||||
}
|
||||
case <-ctx.Done():
|
||||
err := cleanupBackfillJobs(sess, fmt.Sprintf("%d_%s_%d_%%", ddlJobID, hex.EncodeToString(currEle.TypeKey), currEle.ID))
|
||||
err := cleanupBackfillJobs(sess, bjPrefixKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -401,12 +496,11 @@ func checkJobIsFinished(sess *session, ddlJobID int64) (bool, error) {
|
||||
}
|
||||
|
||||
// GetBackfillErr gets the error in backfill job.
|
||||
func GetBackfillErr(sess *session, ddlJobID, currEleID int64, currEleKey []byte) error {
|
||||
func GetBackfillErr(sess *session, bjPrefixKey string) error {
|
||||
var err error
|
||||
var metas []*model.BackfillMeta
|
||||
for i := 0; i < retrySQLTimes; i++ {
|
||||
metas, err = GetBackfillMetas(sess, BackgroundSubtaskHistoryTable, fmt.Sprintf("task_key like \"%d_%s_%d_%%\"",
|
||||
ddlJobID, hex.EncodeToString(currEleKey), currEleID), "get_backfill_job_metas")
|
||||
metas, err = GetBackfillMetas(sess, BackgroundSubtaskHistoryTable, fmt.Sprintf("task_key like '%s'", bjPrefixKey), "get_backfill_job_metas")
|
||||
if err == nil {
|
||||
for _, m := range metas {
|
||||
if m.Error != nil {
|
||||
@ -456,15 +550,14 @@ func cleanupBackfillJobs(sess *session, prefixKey string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
func checkBackfillJobCount(sess *session, ddlJobID, currEleID int64, currEleKey []byte, pTblID int64) (backfillJobCnt int, err error) {
|
||||
func checkBackfillJobCount(sess *session, ddlJobID, currEleID int64, currEleKey []byte) (backfillJobCnt int, err error) {
|
||||
err = checkAndHandleInterruptedBackfillJobs(sess, ddlJobID, currEleID, currEleKey)
|
||||
if err != nil {
|
||||
return 0, errors.Trace(err)
|
||||
}
|
||||
|
||||
backfillJobCnt, err = GetBackfillJobCount(sess, BackgroundSubtaskTable,
|
||||
fmt.Sprintf("task_key like \"%d_%s_%d_%%\"",
|
||||
ddlJobID, hex.EncodeToString(currEleKey), currEleID), "check_backfill_job_count")
|
||||
fmt.Sprintf("task_key like '%s'", backfillJobPrefixKeyString(ddlJobID, currEleKey, currEleID)), "check_backfill_job_count")
|
||||
if err != nil {
|
||||
return 0, errors.Trace(err)
|
||||
}
|
||||
@ -472,12 +565,11 @@ func checkBackfillJobCount(sess *session, ddlJobID, currEleID int64, currEleKey
|
||||
return backfillJobCnt, nil
|
||||
}
|
||||
|
||||
func getBackfillJobWithRetry(sess *session, tableName string, ddlJobID, currEleID int64, currEleKey []byte) ([]*BackfillJob, error) {
|
||||
func getBackfillJobWithRetry(sess *session, tableName, bjPrefixKey string) ([]*BackfillJob, error) {
|
||||
var err error
|
||||
var bJobs []*BackfillJob
|
||||
for i := 0; i < retrySQLTimes; i++ {
|
||||
bJobs, err = GetBackfillJobs(sess, tableName, fmt.Sprintf("task_key like \"%d_%s_%d_%%\" limit 1",
|
||||
ddlJobID, hex.EncodeToString(currEleKey), currEleID), "check_backfill_job_state")
|
||||
bJobs, err = GetBackfillJobs(sess, tableName, fmt.Sprintf("task_key like '%s' limit 1", bjPrefixKey), "check_backfill_job_state")
|
||||
if err != nil {
|
||||
logutil.BgLogger().Warn("[ddl] GetBackfillJobs failed", zap.Error(err))
|
||||
time.Sleep(RetrySQLInterval)
|
||||
@ -490,7 +582,7 @@ func getBackfillJobWithRetry(sess *session, tableName string, ddlJobID, currEleI
|
||||
|
||||
// GetPhysicalTableMetas gets the max backfill metas per physical table in BackgroundSubtaskTable and BackgroundSubtaskHistoryTable.
|
||||
func GetPhysicalTableMetas(sess *session, ddlJobID, currEleID int64, currEleKey []byte) (map[int64]*BackfillJobRangeMeta, error) {
|
||||
condition := fmt.Sprintf("task_key like \"%d_%s_%d_%%\"", ddlJobID, hex.EncodeToString(currEleKey), currEleID)
|
||||
condition := fmt.Sprintf("task_key like '%s'", backfillJobPrefixKeyString(ddlJobID, currEleKey, currEleID))
|
||||
pTblMs, err := GetBackfillIDAndMetas(sess, BackgroundSubtaskTable, condition, "get_ptbl_metas")
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
|
||||
@ -1374,8 +1374,7 @@ func (w *baseIndexWorker) UpdateTask(bfJob *BackfillJob) error {
|
||||
s := newSession(w.backfillCtx.sessCtx)
|
||||
|
||||
return s.runInTxn(func(se *session) error {
|
||||
jobs, err := GetBackfillJobs(se, BackgroundSubtaskTable, fmt.Sprintf("task_key = '%d_%s_%d_%d'",
|
||||
bfJob.JobID, hex.EncodeToString(bfJob.EleKey), bfJob.EleID, bfJob.ID), "update_backfill_task")
|
||||
jobs, err := GetBackfillJobs(se, BackgroundSubtaskTable, fmt.Sprintf("task_key = '%s'", bfJob.keyString()), "update_backfill_task")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -634,8 +634,8 @@ func getJobsBySQL(sess *session, tbl, condition string) ([]*model.Job, error) {
|
||||
}
|
||||
|
||||
func syncBackfillHistoryJobs(sess *session, uuid string, backfillJob *BackfillJob) error {
|
||||
sql := fmt.Sprintf("update mysql.%s set state = '%s' where task_key like \"%d_%s_%d_%%\" and exec_id = '%s' limit 1;",
|
||||
BackgroundSubtaskHistoryTable, model.JobStateSynced.String(), backfillJob.JobID, hex.EncodeToString(backfillJob.EleKey), backfillJob.EleID, uuid)
|
||||
sql := fmt.Sprintf("update mysql.%s set state = '%s' where task_key like '%s' and exec_id = '%s' limit 1;",
|
||||
BackgroundSubtaskHistoryTable, model.JobStateSynced.String(), backfillJob.PrefixKeyString(), uuid)
|
||||
_, err := sess.execute(context.Background(), sql, "sync_backfill_history_job")
|
||||
return err
|
||||
}
|
||||
@ -654,8 +654,8 @@ func generateInsertBackfillJobSQL(tableName string, backfillJobs []*BackfillJob)
|
||||
if i != 0 {
|
||||
sqlBuilder.WriteString(", ")
|
||||
}
|
||||
sqlBuilder.WriteString(fmt.Sprintf("('%d_%s_%d_%d', %d, %d, '%s', '%s', '%s', %s, %d, %d, %s)",
|
||||
bj.JobID, hex.EncodeToString(bj.EleKey), bj.EleID, bj.ID, bj.PhysicalTableID, bj.Tp, bj.InstanceID, bj.InstanceLease, bj.State.String(), wrapKey2String(bj.Meta.CurrKey),
|
||||
sqlBuilder.WriteString(fmt.Sprintf("('%s', %d, %d, '%s', '%s', '%s', %s, %d, %d, %s)",
|
||||
bj.keyString(), bj.PhysicalTableID, bj.Tp, bj.InstanceID, bj.InstanceLease, bj.State.String(), wrapKey2String(bj.Meta.CurrKey),
|
||||
bj.StartTS, bj.StateUpdateTS, wrapKey2String(mateByte)))
|
||||
}
|
||||
return sqlBuilder.String(), nil
|
||||
@ -789,8 +789,8 @@ func GetAndMarkBackfillJobsForOneEle(s *session, batch int, jobID int64, uuid st
|
||||
|
||||
// GetInterruptedBackfillJobForOneEle gets an interrupted backfill job that contains only one element.
|
||||
func GetInterruptedBackfillJobForOneEle(sess *session, jobID, eleID int64, eleKey []byte) ([]*BackfillJob, error) {
|
||||
bJobs, err := GetBackfillJobs(sess, BackgroundSubtaskHistoryTable, fmt.Sprintf("task_key like \"%d_%s_%d_%%\" and state = \"%s\" limit 1",
|
||||
jobID, hex.EncodeToString(eleKey), eleID, model.JobStateCancelled.String()), "get_interrupt_backfill_job")
|
||||
bJobs, err := GetBackfillJobs(sess, BackgroundSubtaskHistoryTable, fmt.Sprintf("task_key like '%s' and state = \"%s\" limit 1",
|
||||
backfillJobPrefixKeyString(jobID, eleKey, eleID), model.JobStateCancelled.String()), "get_interrupt_backfill_job")
|
||||
if err != nil || len(bJobs) == 0 {
|
||||
return nil, err
|
||||
}
|
||||
@ -940,9 +940,9 @@ func GetBackfillJobs(sess *session, tblName, condition string, label string) ([]
|
||||
func RemoveBackfillJob(sess *session, isOneEle bool, backfillJob *BackfillJob) error {
|
||||
sql := "delete from mysql.tidb_background_subtask"
|
||||
if !isOneEle {
|
||||
sql += fmt.Sprintf(" where task_key like \"%d_%s_%d_%d\"", backfillJob.JobID, hex.EncodeToString(backfillJob.EleKey), backfillJob.EleID, backfillJob.ID)
|
||||
sql += fmt.Sprintf(" where task_key like '%s'", backfillJob.keyString())
|
||||
} else {
|
||||
sql += fmt.Sprintf(" where task_key like \"%d_%s_%d_%%\"", backfillJob.JobID, hex.EncodeToString(backfillJob.EleKey), backfillJob.EleID)
|
||||
sql += fmt.Sprintf(" where task_key like '%s'", backfillJob.PrefixKeyString())
|
||||
}
|
||||
_, err := sess.execute(context.Background(), sql, "remove_backfill_job")
|
||||
return err
|
||||
@ -953,9 +953,8 @@ func updateBackfillJob(sess *session, tableName string, backfillJob *BackfillJob
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
sql := fmt.Sprintf("update mysql.%s set exec_id = '%s', exec_expired = '%s', state = '%s', checkpoint = %s, meta = %s where task_key = '%d_%s_%d_%d'",
|
||||
tableName, backfillJob.InstanceID, backfillJob.InstanceLease, backfillJob.State.String(), wrapKey2String(backfillJob.Meta.CurrKey),
|
||||
wrapKey2String(mate), backfillJob.JobID, hex.EncodeToString(backfillJob.EleKey), backfillJob.EleID, backfillJob.ID)
|
||||
sql := fmt.Sprintf("update mysql.%s set exec_id = '%s', exec_expired = '%s', state = '%s', checkpoint = %s, meta = %s where task_key = '%s'",
|
||||
tableName, backfillJob.InstanceID, backfillJob.InstanceLease, backfillJob.State.String(), wrapKey2String(backfillJob.Meta.CurrKey), wrapKey2String(mate), backfillJob.keyString())
|
||||
_, err = sess.execute(context.Background(), sql, label)
|
||||
return err
|
||||
}
|
||||
|
||||
@ -16,7 +16,6 @@ package ddl_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
@ -27,6 +26,7 @@ import (
|
||||
"github.com/pingcap/failpoint"
|
||||
"github.com/pingcap/tidb/ddl"
|
||||
"github.com/pingcap/tidb/ddl/internal/callback"
|
||||
"github.com/pingcap/tidb/kv"
|
||||
"github.com/pingcap/tidb/meta"
|
||||
"github.com/pingcap/tidb/parser/model"
|
||||
"github.com/pingcap/tidb/parser/mysql"
|
||||
@ -231,8 +231,7 @@ func equalBackfillJob(t *testing.T, a, b *ddl.BackfillJob, lessTime types.Time)
|
||||
}
|
||||
|
||||
func getIdxConditionStr(jobID, eleID int64) string {
|
||||
return fmt.Sprintf("task_key like \"%d_%s_%d_%%\"",
|
||||
jobID, hex.EncodeToString(meta.IndexElementKey), eleID)
|
||||
return fmt.Sprintf("task_key like '%s'", ddl.BackfillJobPrefixKeyString(jobID, kv.Key(meta.IndexElementKey), eleID))
|
||||
}
|
||||
|
||||
func readInTxn(se sessionctx.Context, f func(sessionctx.Context)) (err error) {
|
||||
@ -597,11 +596,10 @@ func TestSimpleExecBackfillJobs(t *testing.T) {
|
||||
tk.MustQuery(fmt.Sprintf("select exec_id, exec_expired from mysql.tidb_background_subtask where task_key like \"%%%d\" and %s", bJobs1[1].ID, getIdxConditionStr(jobID1, eleID1))).
|
||||
Check(testkit.Rows(" 0000-00-00 00:00:00"))
|
||||
// test GetBackfillMetas
|
||||
bfErr := ddl.GetBackfillErr(se, jobID1, eleID1, eleKey)
|
||||
bfErr := ddl.GetBackfillErr(se, ddl.BackfillJobPrefixKeyString(jobID1, kv.Key(meta.IndexElementKey), eleID1))
|
||||
require.Error(t, bfErr, dbterror.ErrCancelledDDLJob)
|
||||
bfErr = ddl.GetBackfillErr(se, jobID2, eleID2, eleKey)
|
||||
bfErr = ddl.GetBackfillErr(se, ddl.BackfillJobPrefixKeyString(jobID2, kv.Key(meta.IndexElementKey), eleID2))
|
||||
require.NoError(t, bfErr)
|
||||
|
||||
bJobs1[0].State = model.JobStateNone
|
||||
bJobs1[0].ID = 5
|
||||
bJobs1[1].State = model.JobStateNone
|
||||
|
||||
Reference in New Issue
Block a user