ddl, br: improve compatibility for PiTR and ingest ddl jobs (#38029)
ref pingcap/tidb#38045
This commit is contained in:
@ -45,6 +45,7 @@ import (
|
||||
advancercfg "github.com/pingcap/tidb/br/pkg/streamhelper/config"
|
||||
"github.com/pingcap/tidb/br/pkg/summary"
|
||||
"github.com/pingcap/tidb/br/pkg/utils"
|
||||
"github.com/pingcap/tidb/ddl"
|
||||
"github.com/pingcap/tidb/kv"
|
||||
"github.com/pingcap/tidb/parser/model"
|
||||
"github.com/pingcap/tidb/util/mathutil"
|
||||
@ -496,6 +497,9 @@ func RunStreamStart(
|
||||
return errors.New("Unable to create task about log-backup. " +
|
||||
"please set TiKV config `log-backup.enable` to true and restart TiKVs.")
|
||||
}
|
||||
if !ddl.IngestJobsNotExisted(se.GetSessionCtx()) {
|
||||
return errors.Annotate(berrors.ErrUnknown, "Unable to create log backup task. Please wait until the DDL jobs(add index with ingest method) are finished.")
|
||||
}
|
||||
|
||||
if err = streamMgr.adjustAndCheckStartTS(ctx); err != nil {
|
||||
return errors.Trace(err)
|
||||
|
||||
@ -112,3 +112,8 @@ func LogBackupTaskCountDec() {
|
||||
func CheckLogBackupTaskExist() bool {
|
||||
return logBackupTaskCount > 0
|
||||
}
|
||||
|
||||
// IsLogBackupInUse checks the log backup task existed.
|
||||
func IsLogBackupInUse(ctx sessionctx.Context) bool {
|
||||
return CheckLogBackupEnabled(ctx) && CheckLogBackupTaskExist()
|
||||
}
|
||||
|
||||
29
ddl/index.go
29
ddl/index.go
@ -18,6 +18,7 @@ import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
@ -718,7 +719,33 @@ func canUseIngest(w *worker) bool {
|
||||
failpoint.Return(true)
|
||||
})
|
||||
// Ingest way is not compatible with PiTR.
|
||||
return !utils.CheckLogBackupEnabled(ctx)
|
||||
return !utils.IsLogBackupInUse(ctx)
|
||||
}
|
||||
|
||||
// IngestJobsNotExisted checks the ddl about `add index` with ingest method not existed.
|
||||
func IngestJobsNotExisted(ctx sessionctx.Context) bool {
|
||||
sess := session{ctx}
|
||||
template := "select job_meta from mysql.tidb_ddl_job where reorg and (type = %d or type = %d) and processing;"
|
||||
sql := fmt.Sprintf(template, model.ActionAddIndex, model.ActionAddPrimaryKey)
|
||||
rows, err := sess.execute(context.Background(), sql, "check-pitr")
|
||||
if err != nil {
|
||||
logutil.BgLogger().Warn("cannot check ingest job", zap.Error(err))
|
||||
return false
|
||||
}
|
||||
for _, row := range rows {
|
||||
jobBinary := row.GetBytes(0)
|
||||
runJob := model.Job{}
|
||||
err := runJob.Decode(jobBinary)
|
||||
if err != nil {
|
||||
logutil.BgLogger().Warn("cannot check ingest job", zap.Error(err))
|
||||
return false
|
||||
}
|
||||
// Check whether this add index job is using lightning to do the backfill work.
|
||||
if runJob.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// tryFallbackToTxnMerge changes the reorg type to txn-merge if the lightning backfill meets something wrong.
|
||||
|
||||
@ -367,7 +367,7 @@ func getGlobalKillUsageInfo() bool {
|
||||
}
|
||||
|
||||
func getLogBackupUsageInfo(ctx sessionctx.Context) bool {
|
||||
return utils.CheckLogBackupEnabled(ctx) && utils.CheckLogBackupTaskExist()
|
||||
return utils.IsLogBackupInUse(ctx)
|
||||
}
|
||||
|
||||
func getCostModelVer2UsageInfo(ctx sessionctx.Context) bool {
|
||||
|
||||
Reference in New Issue
Block a user