From 71fab020d02741b40eee32d0fdaee63ddc211b7d Mon Sep 17 00:00:00 2001 From: Lynn Date: Mon, 20 Feb 2023 19:47:06 +0800 Subject: [PATCH] ddl: move some dist-reorg code to dist_* files and add the generate prefix key function (#41577) close pingcap/tidb#41576 --- ddl/backfilling.go | 160 ---------------------------------------- ddl/ddl_test.go | 5 ++ ddl/dist_backfilling.go | 82 +++++++++++++++++++- ddl/dist_owner.go | 120 ++++++++++++++++++++++++++---- ddl/index.go | 3 +- ddl/job_table.go | 21 +++--- ddl/job_table_test.go | 10 +-- 7 files changed, 207 insertions(+), 194 deletions(-) diff --git a/ddl/backfilling.go b/ddl/backfilling.go index 25881476c7..68d4ac8a90 100644 --- a/ddl/backfilling.go +++ b/ddl/backfilling.go @@ -40,7 +40,6 @@ import ( "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/tablecodec" - "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/dbterror" "github.com/pingcap/tidb/util/logutil" @@ -48,7 +47,6 @@ import ( decoder "github.com/pingcap/tidb/util/rowDecoder" "github.com/pingcap/tidb/util/timeutil" "github.com/pingcap/tidb/util/topsql" - "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/tikv" "go.uber.org/zap" ) @@ -61,21 +59,8 @@ const ( typeCleanUpIndexWorker backfillerType = 2 typeAddIndexMergeTmpWorker backfillerType = 3 typeReorgPartitionWorker backfillerType = 4 - - // InstanceLease is the instance lease. - InstanceLease = 1 * time.Minute - updateInstanceLease = 25 * time.Second - genTaskBatch = 4096 - genPhysicalTableTaskBatch = 256 - minGenTaskBatch = 1024 - minGenPhysicalTableTaskBatch = 64 - minDistTaskCnt = 64 - retrySQLTimes = 10 ) -// RetrySQLInterval is export for test. -var RetrySQLInterval = 300 * time.Millisecond - func (bT backfillerType) String() string { switch bT { case typeAddIndexWorker: @@ -93,57 +78,6 @@ func (bT backfillerType) String() string { } } -// BackfillJob is for a tidb_background_subtask table's record. -type BackfillJob struct { - ID int64 - JobID int64 - EleID int64 - EleKey []byte - PhysicalTableID int64 - Tp backfillerType - State model.JobState - InstanceID string - InstanceLease types.Time - StartTS uint64 - StateUpdateTS uint64 - Meta *model.BackfillMeta -} - -// PrefixKeyString returns the BackfillJob's prefix key. -func (bj *BackfillJob) PrefixKeyString() string { - return fmt.Sprintf("%d_%s_%d_%%", bj.JobID, hex.EncodeToString(bj.EleKey), bj.EleID) -} - -// AbbrStr returns the BackfillJob's info without the Meta info. -func (bj *BackfillJob) AbbrStr() string { - return fmt.Sprintf("ID:%d, JobID:%d, EleID:%d, Type:%s, State:%s, InstanceID:%s, InstanceLease:%s", - bj.ID, bj.JobID, bj.EleID, bj.Tp, bj.State, bj.InstanceID, bj.InstanceLease) -} - -// GetOracleTimeWithStartTS returns the current time with txn's startTS. -func GetOracleTimeWithStartTS(se *session) (time.Time, error) { - txn, err := se.Txn(true) - if err != nil { - return time.Time{}, err - } - return oracle.GetTimeFromTS(txn.StartTS()).UTC(), nil -} - -// GetOracleTime returns the current time from TS without txn. -func GetOracleTime(store kv.Storage) (time.Time, error) { - currentVer, err := store.CurrentVersion(kv.GlobalTxnScope) - if err != nil { - return time.Time{}, errors.Trace(err) - } - return oracle.GetTimeFromTS(currentVer.Ver).UTC(), nil -} - -// GetLeaseGoTime returns a types.Time by adding a lease. -func GetLeaseGoTime(currTime time.Time, lease time.Duration) types.Time { - leaseTime := currTime.Add(lease) - return types.NewTime(types.FromGoTime(leaseTime.In(time.UTC)), mysql.TypeTimestamp, types.MaxFsp) -} - // By now the DDL jobs that need backfilling include: // 1: add-index // 2: modify-column-type @@ -1141,100 +1075,6 @@ func injectCheckBackfillWorkerNum(curWorkerSize int, isMergeWorker bool) error { return nil } -func addBatchBackfillJobs(sess *session, reorgInfo *reorgInfo, sJobCtx *splitJobContext, phyTblID int64, notDistTask bool, - batchTasks []*reorgBackfillTask, bJobs []*BackfillJob) error { - bJobs = bJobs[:0] - instanceID := "" - if notDistTask { - instanceID = reorgInfo.d.uuid - } - - // TODO: Adjust the number of ranges(region) for each task. - for _, task := range batchTasks { - bm := &model.BackfillMeta{ - IsUnique: sJobCtx.isUnique, - EndInclude: task.endInclude, - ReorgTp: reorgInfo.Job.ReorgMeta.ReorgTp, - SQLMode: reorgInfo.ReorgMeta.SQLMode, - Location: reorgInfo.ReorgMeta.Location, - JobMeta: &model.JobMeta{ - SchemaID: reorgInfo.Job.SchemaID, - TableID: reorgInfo.Job.TableID, - Type: reorgInfo.Job.Type, - Query: reorgInfo.Job.Query, - }, - StartKey: task.startKey, - EndKey: task.endKey, - } - bj := &BackfillJob{ - ID: sJobCtx.currBackfillJobID.Add(1), - JobID: reorgInfo.Job.ID, - EleID: reorgInfo.currElement.ID, - EleKey: reorgInfo.currElement.TypeKey, - PhysicalTableID: phyTblID, - Tp: sJobCtx.bfWorkerType, - State: model.JobStateNone, - InstanceID: instanceID, - Meta: bm, - } - bj.Meta.CurrKey = task.startKey - bJobs = append(bJobs, bj) - } - if err := AddBackfillJobs(sess, bJobs); err != nil { - return errors.Trace(err) - } - return nil -} - -func (dc *ddlCtx) splitTableToBackfillJobs(sess *session, reorgInfo *reorgInfo, sJobCtx *splitJobContext, pTblMeta *BackfillJobRangeMeta) error { - isFirstOps := !sJobCtx.isMultiPhyTbl - batchSize := sJobCtx.batchSize - startKey, endKey := pTblMeta.StartKey, pTblMeta.EndKey - bJobs := make([]*BackfillJob, 0, batchSize) - for { - kvRanges, err := splitTableRanges(pTblMeta.PhyTbl, reorgInfo.d.store, startKey, endKey, batchSize) - if err != nil { - return errors.Trace(err) - } - batchTasks := getBatchTasks(pTblMeta.PhyTbl, reorgInfo, kvRanges, batchSize) - if len(batchTasks) == 0 { - break - } - notNeedDistProcess := isFirstOps && (len(kvRanges) < minDistTaskCnt) - if err = addBatchBackfillJobs(sess, reorgInfo, sJobCtx, pTblMeta.PhyTblID, notNeedDistProcess, batchTasks, bJobs); err != nil { - return errors.Trace(err) - } - isFirstOps = false - - remains := kvRanges[len(batchTasks):] - dc.asyncNotifyWorker(dc.backfillJobCh, addingBackfillJob, reorgInfo.Job.ID, "backfill_job") - logutil.BgLogger().Info("[ddl] split backfill jobs to the backfill table", - zap.Int64("physicalID", pTblMeta.PhyTblID), - zap.Int("batchTasksCnt", len(batchTasks)), - zap.Int("totalRegionCnt", len(kvRanges)), - zap.Int("remainRegionCnt", len(remains)), - zap.String("startHandle", hex.EncodeToString(startKey)), - zap.String("endHandle", hex.EncodeToString(endKey))) - - if len(remains) == 0 { - break - } - - for { - bJobCnt, err := checkBackfillJobCount(sess, reorgInfo.Job.ID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey, pTblMeta.PhyTblID) - if err != nil { - return errors.Trace(err) - } - if bJobCnt < sJobCtx.minBatchSize { - break - } - time.Sleep(RetrySQLInterval) - } - startKey = remains[0].StartKey - } - return nil -} - // recordIterFunc is used for low-level record iteration. type recordIterFunc func(h kv.Handle, rowKey kv.Key, rawRecord []byte) (more bool, err error) diff --git a/ddl/ddl_test.go b/ddl/ddl_test.go index 9e360fb364..4fff76d15c 100644 --- a/ddl/ddl_test.go +++ b/ddl/ddl_test.go @@ -85,6 +85,11 @@ var NewSession = newSession // GetJobWithoutPartition is only used for test. const GetJobWithoutPartition = getJobWithoutPartition +// BackfillJobPrefixKeyString is only used for test. +func BackfillJobPrefixKeyString(ddlJobID int64, eleKey kv.Key, eleID int64) string { + return backfillJobPrefixKeyString(ddlJobID, eleKey, eleID) +} + // GetDDLCtx returns ddlCtx for test. func GetDDLCtx(d DDL) *ddlCtx { return d.(*ddl).ddlCtx diff --git a/ddl/dist_backfilling.go b/ddl/dist_backfilling.go index b030cb33f0..8762b83093 100644 --- a/ddl/dist_backfilling.go +++ b/ddl/dist_backfilling.go @@ -15,6 +15,8 @@ package ddl import ( + "encoding/hex" + "fmt" "sync" "time" @@ -22,19 +24,97 @@ import ( "github.com/pingcap/tidb/ddl/ingest" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/resourcemanager/pooltask" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/table" + "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/dbterror" "github.com/pingcap/tidb/util/gpool" "github.com/pingcap/tidb/util/gpool/spmc" "github.com/pingcap/tidb/util/logutil" + "github.com/tikv/client-go/v2/oracle" "go.uber.org/zap" ) -const getJobWithoutPartition = -1 +const ( + getJobWithoutPartition = -1 + backfillJobPrefixKey = "%d_%s_%d_%%" + + // InstanceLease is the instance lease. + InstanceLease = 1 * time.Minute + updateInstanceLease = 25 * time.Second + genTaskBatch = 4096 + genPhysicalTableTaskBatch = 256 + minGenTaskBatch = 1024 + minGenPhysicalTableTaskBatch = 64 + minDistTaskCnt = 64 + retrySQLTimes = 10 +) + +// RetrySQLInterval is export for test. +var RetrySQLInterval = 300 * time.Millisecond + +func backfillJobPrefixKeyString(ddlJobID int64, eleKey kv.Key, eleID int64) string { + return fmt.Sprintf(backfillJobPrefixKey, ddlJobID, hex.EncodeToString(eleKey), eleID) +} + +// BackfillJob is for a tidb_background_subtask table's record. +type BackfillJob struct { + ID int64 + JobID int64 + EleID int64 + EleKey []byte + PhysicalTableID int64 + Tp backfillerType + State model.JobState + InstanceID string + InstanceLease types.Time + StartTS uint64 + StateUpdateTS uint64 + Meta *model.BackfillMeta +} + +// PrefixKeyString returns the BackfillJob's prefix key. +func (bj *BackfillJob) PrefixKeyString() string { + return backfillJobPrefixKeyString(bj.JobID, bj.EleKey, bj.EleID) +} + +func (bj *BackfillJob) keyString() string { + return fmt.Sprintf("%d_%s_%d_%d", bj.JobID, hex.EncodeToString(bj.EleKey), bj.EleID, bj.ID) +} + +// AbbrStr returns the BackfillJob's info without the Meta info. +func (bj *BackfillJob) AbbrStr() string { + return fmt.Sprintf("ID:%d, JobID:%d, EleID:%d, Type:%s, State:%s, InstanceID:%s, InstanceLease:%s", + bj.ID, bj.JobID, bj.EleID, bj.Tp, bj.State, bj.InstanceID, bj.InstanceLease) +} + +// GetOracleTimeWithStartTS returns the current time with txn's startTS. +func GetOracleTimeWithStartTS(se *session) (time.Time, error) { + txn, err := se.Txn(true) + if err != nil { + return time.Time{}, err + } + return oracle.GetTimeFromTS(txn.StartTS()).UTC(), nil +} + +// GetOracleTime returns the current time from TS without txn. +func GetOracleTime(store kv.Storage) (time.Time, error) { + currentVer, err := store.CurrentVersion(kv.GlobalTxnScope) + if err != nil { + return time.Time{}, errors.Trace(err) + } + return oracle.GetTimeFromTS(currentVer.Ver).UTC(), nil +} + +// GetLeaseGoTime returns a types.Time by adding a lease. +func GetLeaseGoTime(currTime time.Time, lease time.Duration) types.Time { + leaseTime := currTime.Add(lease) + return types.NewTime(types.FromGoTime(leaseTime.In(time.UTC)), mysql.TypeTimestamp, types.MaxFsp) +} type backfillWorkerContext struct { currID int diff --git a/ddl/dist_owner.go b/ddl/dist_owner.go index 563b7fcd88..b11d6a9c7f 100644 --- a/ddl/dist_owner.go +++ b/ddl/dist_owner.go @@ -278,6 +278,100 @@ func (dc *ddlCtx) controlWriteTableRecord(sessPool *sessionPool, t table.Table, return checkReorgJobFinished(dc.ctx, sess, &dc.reorgCtx, ddlJobID, currEle) } +func addBatchBackfillJobs(sess *session, reorgInfo *reorgInfo, sJobCtx *splitJobContext, phyTblID int64, notDistTask bool, + batchTasks []*reorgBackfillTask, bJobs []*BackfillJob) error { + bJobs = bJobs[:0] + instanceID := "" + if notDistTask { + instanceID = reorgInfo.d.uuid + } + + // TODO: Adjust the number of ranges(region) for each task. + for _, task := range batchTasks { + bm := &model.BackfillMeta{ + IsUnique: sJobCtx.isUnique, + EndInclude: task.endInclude, + ReorgTp: reorgInfo.Job.ReorgMeta.ReorgTp, + SQLMode: reorgInfo.ReorgMeta.SQLMode, + Location: reorgInfo.ReorgMeta.Location, + JobMeta: &model.JobMeta{ + SchemaID: reorgInfo.Job.SchemaID, + TableID: reorgInfo.Job.TableID, + Type: reorgInfo.Job.Type, + Query: reorgInfo.Job.Query, + }, + StartKey: task.startKey, + EndKey: task.endKey, + } + bj := &BackfillJob{ + ID: sJobCtx.currBackfillJobID.Add(1), + JobID: reorgInfo.Job.ID, + EleID: reorgInfo.currElement.ID, + EleKey: reorgInfo.currElement.TypeKey, + PhysicalTableID: phyTblID, + Tp: sJobCtx.bfWorkerType, + State: model.JobStateNone, + InstanceID: instanceID, + Meta: bm, + } + bj.Meta.CurrKey = task.startKey + bJobs = append(bJobs, bj) + } + if err := AddBackfillJobs(sess, bJobs); err != nil { + return errors.Trace(err) + } + return nil +} + +func (dc *ddlCtx) splitTableToBackfillJobs(sess *session, reorgInfo *reorgInfo, sJobCtx *splitJobContext, pTblMeta *BackfillJobRangeMeta) error { + isFirstOps := !sJobCtx.isMultiPhyTbl + batchSize := sJobCtx.batchSize + startKey, endKey := pTblMeta.StartKey, pTblMeta.EndKey + bJobs := make([]*BackfillJob, 0, batchSize) + for { + kvRanges, err := splitTableRanges(pTblMeta.PhyTbl, reorgInfo.d.store, startKey, endKey, batchSize) + if err != nil { + return errors.Trace(err) + } + batchTasks := getBatchTasks(pTblMeta.PhyTbl, reorgInfo, kvRanges, batchSize) + if len(batchTasks) == 0 { + break + } + notNeedDistProcess := isFirstOps && (len(kvRanges) < minDistTaskCnt) + if err = addBatchBackfillJobs(sess, reorgInfo, sJobCtx, pTblMeta.PhyTblID, notNeedDistProcess, batchTasks, bJobs); err != nil { + return errors.Trace(err) + } + isFirstOps = false + + remains := kvRanges[len(batchTasks):] + dc.asyncNotifyWorker(dc.backfillJobCh, addingBackfillJob, reorgInfo.Job.ID, "backfill_job") + logutil.BgLogger().Info("[ddl] split backfill jobs to the backfill table", + zap.Int64("physicalID", pTblMeta.PhyTblID), + zap.Int("batchTasksCnt", len(batchTasks)), + zap.Int("totalRegionCnt", len(kvRanges)), + zap.Int("remainRegionCnt", len(remains)), + zap.String("startHandle", hex.EncodeToString(startKey)), + zap.String("endHandle", hex.EncodeToString(endKey))) + + if len(remains) == 0 { + break + } + + for { + bJobCnt, err := checkBackfillJobCount(sess, reorgInfo.Job.ID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey) + if err != nil { + return errors.Trace(err) + } + if bJobCnt < sJobCtx.minBatchSize { + break + } + time.Sleep(RetrySQLInterval) + } + startKey = remains[0].StartKey + } + return nil +} + func (dc *ddlCtx) splitPhysicalTableToBackfillJobs(sess *session, reorgInfo *reorgInfo, sJobCtx *splitJobContext) { defaultSQLMode := sess.GetSessionVars().SQLMode defer func() { sess.GetSessionVars().SQLMode = defaultSQLMode }() @@ -323,12 +417,13 @@ func checkReorgJobFinished(ctx context.Context, sess *session, reorgCtxs *reorgC var backfillJobFinished bool ticker := time.NewTicker(CheckBackfillJobFinishInterval) defer ticker.Stop() + bjPrefixKey := backfillJobPrefixKeyString(ddlJobID, currEle.TypeKey, currEle.ID) for { failpoint.Inject("MockCanceledErr", func() { getReorgCtx(reorgCtxs, ddlJobID).notifyReorgCancel() }) if getReorgCtx(reorgCtxs, ddlJobID).isReorgCanceled() { - err := cleanupBackfillJobs(sess, fmt.Sprintf("%d_%s_%d_%%", ddlJobID, hex.EncodeToString(currEle.TypeKey), currEle.ID)) + err := cleanupBackfillJobs(sess, bjPrefixKey) if err != nil { return err } @@ -351,7 +446,7 @@ func checkReorgJobFinished(ctx context.Context, sess *session, reorgCtxs *reorgC return errors.Trace(err) } - bfJobs, err := getBackfillJobWithRetry(sess, BackgroundSubtaskTable, ddlJobID, currEle.ID, currEle.TypeKey) + bfJobs, err := getBackfillJobWithRetry(sess, BackgroundSubtaskTable, bjPrefixKey) if err != nil { logutil.BgLogger().Info("[ddl] getBackfillJobWithRetry failed", zap.Int64("job ID", ddlJobID), zap.Stringer("ele", currEle), zap.Error(err)) return errors.Trace(err) @@ -370,11 +465,11 @@ func checkReorgJobFinished(ctx context.Context, sess *session, reorgCtxs *reorgC } if isSynced { logutil.BgLogger().Info("[ddl] finish all backfill jobs and put them to history", zap.Int64("job ID", ddlJobID), zap.Stringer("ele", currEle)) - return GetBackfillErr(sess, ddlJobID, currEle.ID, currEle.TypeKey) + return GetBackfillErr(sess, bjPrefixKey) } } case <-ctx.Done(): - err := cleanupBackfillJobs(sess, fmt.Sprintf("%d_%s_%d_%%", ddlJobID, hex.EncodeToString(currEle.TypeKey), currEle.ID)) + err := cleanupBackfillJobs(sess, bjPrefixKey) if err != nil { return err } @@ -401,12 +496,11 @@ func checkJobIsFinished(sess *session, ddlJobID int64) (bool, error) { } // GetBackfillErr gets the error in backfill job. -func GetBackfillErr(sess *session, ddlJobID, currEleID int64, currEleKey []byte) error { +func GetBackfillErr(sess *session, bjPrefixKey string) error { var err error var metas []*model.BackfillMeta for i := 0; i < retrySQLTimes; i++ { - metas, err = GetBackfillMetas(sess, BackgroundSubtaskHistoryTable, fmt.Sprintf("task_key like \"%d_%s_%d_%%\"", - ddlJobID, hex.EncodeToString(currEleKey), currEleID), "get_backfill_job_metas") + metas, err = GetBackfillMetas(sess, BackgroundSubtaskHistoryTable, fmt.Sprintf("task_key like '%s'", bjPrefixKey), "get_backfill_job_metas") if err == nil { for _, m := range metas { if m.Error != nil { @@ -456,15 +550,14 @@ func cleanupBackfillJobs(sess *session, prefixKey string) error { return err } -func checkBackfillJobCount(sess *session, ddlJobID, currEleID int64, currEleKey []byte, pTblID int64) (backfillJobCnt int, err error) { +func checkBackfillJobCount(sess *session, ddlJobID, currEleID int64, currEleKey []byte) (backfillJobCnt int, err error) { err = checkAndHandleInterruptedBackfillJobs(sess, ddlJobID, currEleID, currEleKey) if err != nil { return 0, errors.Trace(err) } backfillJobCnt, err = GetBackfillJobCount(sess, BackgroundSubtaskTable, - fmt.Sprintf("task_key like \"%d_%s_%d_%%\"", - ddlJobID, hex.EncodeToString(currEleKey), currEleID), "check_backfill_job_count") + fmt.Sprintf("task_key like '%s'", backfillJobPrefixKeyString(ddlJobID, currEleKey, currEleID)), "check_backfill_job_count") if err != nil { return 0, errors.Trace(err) } @@ -472,12 +565,11 @@ func checkBackfillJobCount(sess *session, ddlJobID, currEleID int64, currEleKey return backfillJobCnt, nil } -func getBackfillJobWithRetry(sess *session, tableName string, ddlJobID, currEleID int64, currEleKey []byte) ([]*BackfillJob, error) { +func getBackfillJobWithRetry(sess *session, tableName, bjPrefixKey string) ([]*BackfillJob, error) { var err error var bJobs []*BackfillJob for i := 0; i < retrySQLTimes; i++ { - bJobs, err = GetBackfillJobs(sess, tableName, fmt.Sprintf("task_key like \"%d_%s_%d_%%\" limit 1", - ddlJobID, hex.EncodeToString(currEleKey), currEleID), "check_backfill_job_state") + bJobs, err = GetBackfillJobs(sess, tableName, fmt.Sprintf("task_key like '%s' limit 1", bjPrefixKey), "check_backfill_job_state") if err != nil { logutil.BgLogger().Warn("[ddl] GetBackfillJobs failed", zap.Error(err)) time.Sleep(RetrySQLInterval) @@ -490,7 +582,7 @@ func getBackfillJobWithRetry(sess *session, tableName string, ddlJobID, currEleI // GetPhysicalTableMetas gets the max backfill metas per physical table in BackgroundSubtaskTable and BackgroundSubtaskHistoryTable. func GetPhysicalTableMetas(sess *session, ddlJobID, currEleID int64, currEleKey []byte) (map[int64]*BackfillJobRangeMeta, error) { - condition := fmt.Sprintf("task_key like \"%d_%s_%d_%%\"", ddlJobID, hex.EncodeToString(currEleKey), currEleID) + condition := fmt.Sprintf("task_key like '%s'", backfillJobPrefixKeyString(ddlJobID, currEleKey, currEleID)) pTblMs, err := GetBackfillIDAndMetas(sess, BackgroundSubtaskTable, condition, "get_ptbl_metas") if err != nil { return nil, errors.Trace(err) diff --git a/ddl/index.go b/ddl/index.go index 736fce54bc..8770ef9745 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -1374,8 +1374,7 @@ func (w *baseIndexWorker) UpdateTask(bfJob *BackfillJob) error { s := newSession(w.backfillCtx.sessCtx) return s.runInTxn(func(se *session) error { - jobs, err := GetBackfillJobs(se, BackgroundSubtaskTable, fmt.Sprintf("task_key = '%d_%s_%d_%d'", - bfJob.JobID, hex.EncodeToString(bfJob.EleKey), bfJob.EleID, bfJob.ID), "update_backfill_task") + jobs, err := GetBackfillJobs(se, BackgroundSubtaskTable, fmt.Sprintf("task_key = '%s'", bfJob.keyString()), "update_backfill_task") if err != nil { return err } diff --git a/ddl/job_table.go b/ddl/job_table.go index d1444450d1..6d5adecc09 100644 --- a/ddl/job_table.go +++ b/ddl/job_table.go @@ -634,8 +634,8 @@ func getJobsBySQL(sess *session, tbl, condition string) ([]*model.Job, error) { } func syncBackfillHistoryJobs(sess *session, uuid string, backfillJob *BackfillJob) error { - sql := fmt.Sprintf("update mysql.%s set state = '%s' where task_key like \"%d_%s_%d_%%\" and exec_id = '%s' limit 1;", - BackgroundSubtaskHistoryTable, model.JobStateSynced.String(), backfillJob.JobID, hex.EncodeToString(backfillJob.EleKey), backfillJob.EleID, uuid) + sql := fmt.Sprintf("update mysql.%s set state = '%s' where task_key like '%s' and exec_id = '%s' limit 1;", + BackgroundSubtaskHistoryTable, model.JobStateSynced.String(), backfillJob.PrefixKeyString(), uuid) _, err := sess.execute(context.Background(), sql, "sync_backfill_history_job") return err } @@ -654,8 +654,8 @@ func generateInsertBackfillJobSQL(tableName string, backfillJobs []*BackfillJob) if i != 0 { sqlBuilder.WriteString(", ") } - sqlBuilder.WriteString(fmt.Sprintf("('%d_%s_%d_%d', %d, %d, '%s', '%s', '%s', %s, %d, %d, %s)", - bj.JobID, hex.EncodeToString(bj.EleKey), bj.EleID, bj.ID, bj.PhysicalTableID, bj.Tp, bj.InstanceID, bj.InstanceLease, bj.State.String(), wrapKey2String(bj.Meta.CurrKey), + sqlBuilder.WriteString(fmt.Sprintf("('%s', %d, %d, '%s', '%s', '%s', %s, %d, %d, %s)", + bj.keyString(), bj.PhysicalTableID, bj.Tp, bj.InstanceID, bj.InstanceLease, bj.State.String(), wrapKey2String(bj.Meta.CurrKey), bj.StartTS, bj.StateUpdateTS, wrapKey2String(mateByte))) } return sqlBuilder.String(), nil @@ -789,8 +789,8 @@ func GetAndMarkBackfillJobsForOneEle(s *session, batch int, jobID int64, uuid st // GetInterruptedBackfillJobForOneEle gets an interrupted backfill job that contains only one element. func GetInterruptedBackfillJobForOneEle(sess *session, jobID, eleID int64, eleKey []byte) ([]*BackfillJob, error) { - bJobs, err := GetBackfillJobs(sess, BackgroundSubtaskHistoryTable, fmt.Sprintf("task_key like \"%d_%s_%d_%%\" and state = \"%s\" limit 1", - jobID, hex.EncodeToString(eleKey), eleID, model.JobStateCancelled.String()), "get_interrupt_backfill_job") + bJobs, err := GetBackfillJobs(sess, BackgroundSubtaskHistoryTable, fmt.Sprintf("task_key like '%s' and state = \"%s\" limit 1", + backfillJobPrefixKeyString(jobID, eleKey, eleID), model.JobStateCancelled.String()), "get_interrupt_backfill_job") if err != nil || len(bJobs) == 0 { return nil, err } @@ -940,9 +940,9 @@ func GetBackfillJobs(sess *session, tblName, condition string, label string) ([] func RemoveBackfillJob(sess *session, isOneEle bool, backfillJob *BackfillJob) error { sql := "delete from mysql.tidb_background_subtask" if !isOneEle { - sql += fmt.Sprintf(" where task_key like \"%d_%s_%d_%d\"", backfillJob.JobID, hex.EncodeToString(backfillJob.EleKey), backfillJob.EleID, backfillJob.ID) + sql += fmt.Sprintf(" where task_key like '%s'", backfillJob.keyString()) } else { - sql += fmt.Sprintf(" where task_key like \"%d_%s_%d_%%\"", backfillJob.JobID, hex.EncodeToString(backfillJob.EleKey), backfillJob.EleID) + sql += fmt.Sprintf(" where task_key like '%s'", backfillJob.PrefixKeyString()) } _, err := sess.execute(context.Background(), sql, "remove_backfill_job") return err @@ -953,9 +953,8 @@ func updateBackfillJob(sess *session, tableName string, backfillJob *BackfillJob if err != nil { return err } - sql := fmt.Sprintf("update mysql.%s set exec_id = '%s', exec_expired = '%s', state = '%s', checkpoint = %s, meta = %s where task_key = '%d_%s_%d_%d'", - tableName, backfillJob.InstanceID, backfillJob.InstanceLease, backfillJob.State.String(), wrapKey2String(backfillJob.Meta.CurrKey), - wrapKey2String(mate), backfillJob.JobID, hex.EncodeToString(backfillJob.EleKey), backfillJob.EleID, backfillJob.ID) + sql := fmt.Sprintf("update mysql.%s set exec_id = '%s', exec_expired = '%s', state = '%s', checkpoint = %s, meta = %s where task_key = '%s'", + tableName, backfillJob.InstanceID, backfillJob.InstanceLease, backfillJob.State.String(), wrapKey2String(backfillJob.Meta.CurrKey), wrapKey2String(mate), backfillJob.keyString()) _, err = sess.execute(context.Background(), sql, label) return err } diff --git a/ddl/job_table_test.go b/ddl/job_table_test.go index ed0c241768..3b670aac8f 100644 --- a/ddl/job_table_test.go +++ b/ddl/job_table_test.go @@ -16,7 +16,6 @@ package ddl_test import ( "context" - "encoding/hex" "fmt" "strconv" "strings" @@ -27,6 +26,7 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/ddl/internal/callback" + "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" @@ -231,8 +231,7 @@ func equalBackfillJob(t *testing.T, a, b *ddl.BackfillJob, lessTime types.Time) } func getIdxConditionStr(jobID, eleID int64) string { - return fmt.Sprintf("task_key like \"%d_%s_%d_%%\"", - jobID, hex.EncodeToString(meta.IndexElementKey), eleID) + return fmt.Sprintf("task_key like '%s'", ddl.BackfillJobPrefixKeyString(jobID, kv.Key(meta.IndexElementKey), eleID)) } func readInTxn(se sessionctx.Context, f func(sessionctx.Context)) (err error) { @@ -597,11 +596,10 @@ func TestSimpleExecBackfillJobs(t *testing.T) { tk.MustQuery(fmt.Sprintf("select exec_id, exec_expired from mysql.tidb_background_subtask where task_key like \"%%%d\" and %s", bJobs1[1].ID, getIdxConditionStr(jobID1, eleID1))). Check(testkit.Rows(" 0000-00-00 00:00:00")) // test GetBackfillMetas - bfErr := ddl.GetBackfillErr(se, jobID1, eleID1, eleKey) + bfErr := ddl.GetBackfillErr(se, ddl.BackfillJobPrefixKeyString(jobID1, kv.Key(meta.IndexElementKey), eleID1)) require.Error(t, bfErr, dbterror.ErrCancelledDDLJob) - bfErr = ddl.GetBackfillErr(se, jobID2, eleID2, eleKey) + bfErr = ddl.GetBackfillErr(se, ddl.BackfillJobPrefixKeyString(jobID2, kv.Key(meta.IndexElementKey), eleID2)) require.NoError(t, bfErr) - bJobs1[0].State = model.JobStateNone bJobs1[0].ID = 5 bJobs1[1].State = model.JobStateNone