From 85b8104e21fcdb3d4eef519cd91d304d054aa90f Mon Sep 17 00:00:00 2001 From: Zak Zhao <57036248+joccau@users.noreply.github.com> Date: Thu, 22 Sep 2022 11:15:02 +0800 Subject: [PATCH] ddl, br: improve compatibility for PiTR and ingest ddl jobs (#38029) ref pingcap/tidb#38045 --- br/pkg/task/stream.go | 4 ++++ br/pkg/utils/db.go | 5 +++++ ddl/index.go | 29 ++++++++++++++++++++++++++++- telemetry/data_feature_usage.go | 2 +- 4 files changed, 38 insertions(+), 2 deletions(-) diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index 88a9b0ab49..727cd8143e 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -45,6 +45,7 @@ import ( advancercfg "github.com/pingcap/tidb/br/pkg/streamhelper/config" "github.com/pingcap/tidb/br/pkg/summary" "github.com/pingcap/tidb/br/pkg/utils" + "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/util/mathutil" @@ -496,6 +497,9 @@ func RunStreamStart( return errors.New("Unable to create task about log-backup. " + "please set TiKV config `log-backup.enable` to true and restart TiKVs.") } + if !ddl.IngestJobsNotExisted(se.GetSessionCtx()) { + return errors.Annotate(berrors.ErrUnknown, "Unable to create log backup task. Please wait until the DDL jobs(add index with ingest method) are finished.") + } if err = streamMgr.adjustAndCheckStartTS(ctx); err != nil { return errors.Trace(err) diff --git a/br/pkg/utils/db.go b/br/pkg/utils/db.go index b464ff748d..be2bd87a6c 100644 --- a/br/pkg/utils/db.go +++ b/br/pkg/utils/db.go @@ -112,3 +112,8 @@ func LogBackupTaskCountDec() { func CheckLogBackupTaskExist() bool { return logBackupTaskCount > 0 } + +// IsLogBackupInUse checks the log backup task existed. +func IsLogBackupInUse(ctx sessionctx.Context) bool { + return CheckLogBackupEnabled(ctx) && CheckLogBackupTaskExist() +} diff --git a/ddl/index.go b/ddl/index.go index 33eb19d4d0..914e5e8ff4 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -18,6 +18,7 @@ import ( "bytes" "context" "encoding/hex" + "fmt" "strings" "sync/atomic" "time" @@ -718,7 +719,33 @@ func canUseIngest(w *worker) bool { failpoint.Return(true) }) // Ingest way is not compatible with PiTR. - return !utils.CheckLogBackupEnabled(ctx) + return !utils.IsLogBackupInUse(ctx) +} + +// IngestJobsNotExisted checks the ddl about `add index` with ingest method not existed. +func IngestJobsNotExisted(ctx sessionctx.Context) bool { + sess := session{ctx} + template := "select job_meta from mysql.tidb_ddl_job where reorg and (type = %d or type = %d) and processing;" + sql := fmt.Sprintf(template, model.ActionAddIndex, model.ActionAddPrimaryKey) + rows, err := sess.execute(context.Background(), sql, "check-pitr") + if err != nil { + logutil.BgLogger().Warn("cannot check ingest job", zap.Error(err)) + return false + } + for _, row := range rows { + jobBinary := row.GetBytes(0) + runJob := model.Job{} + err := runJob.Decode(jobBinary) + if err != nil { + logutil.BgLogger().Warn("cannot check ingest job", zap.Error(err)) + return false + } + // Check whether this add index job is using lightning to do the backfill work. + if runJob.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge { + return false + } + } + return true } // tryFallbackToTxnMerge changes the reorg type to txn-merge if the lightning backfill meets something wrong. diff --git a/telemetry/data_feature_usage.go b/telemetry/data_feature_usage.go index 60b9d23f21..f50b5ab02c 100644 --- a/telemetry/data_feature_usage.go +++ b/telemetry/data_feature_usage.go @@ -367,7 +367,7 @@ func getGlobalKillUsageInfo() bool { } func getLogBackupUsageInfo(ctx sessionctx.Context) bool { - return utils.CheckLogBackupEnabled(ctx) && utils.CheckLogBackupTaskExist() + return utils.IsLogBackupInUse(ctx) } func getCostModelVer2UsageInfo(ctx sessionctx.Context) bool {