// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package ddl
import (
"bytes"
"context"
"encoding/hex"
"fmt"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/ddl/ingest"
"github.com/pingcap/tidb/pkg/ddl/logutil"
sess "github.com/pingcap/tidb/pkg/ddl/session"
"github.com/pingcap/tidb/pkg/dxf/framework/taskexecutor/execute"
"github.com/pingcap/tidb/pkg/dxf/operator"
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/expression/exprctx"
"github.com/pingcap/tidb/pkg/expression/exprstatic"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/lightning/backend/local"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/resourcemanager/pool/workerpool"
"github.com/pingcap/tidb/pkg/sessionctx/vardef"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/pingcap/tidb/pkg/util"
contextutil "github.com/pingcap/tidb/pkg/util/context"
"github.com/pingcap/tidb/pkg/util/dbterror"
decoder "github.com/pingcap/tidb/pkg/util/rowDecoder"
"github.com/pingcap/tidb/pkg/util/topsql"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/client-go/v2/tikv"
kvutil "github.com/tikv/client-go/v2/util"
"go.uber.org/zap"
)
type backfillerType byte
const (
typeAddIndexWorker backfillerType = iota
typeUpdateColumnWorker
typeCleanUpIndexWorker
typeAddIndexMergeTmpWorker
typeReorgPartitionWorker
typeCount
)
// BackupFillerTypeCount returns the count of backfiller types used by DDL jobs that need backfilling.
func BackupFillerTypeCount() int {
return int(typeCount)
}
func (bT backfillerType) String() string {
switch bT {
case typeAddIndexWorker:
return "add index"
case typeUpdateColumnWorker:
return "update column"
case typeCleanUpIndexWorker:
return "clean up index"
case typeAddIndexMergeTmpWorker:
return "merge temporary index"
case typeReorgPartitionWorker:
return "reorganize partition"
default:
return "unknown"
}
}
// By now, the DDL jobs that need backfilling include:
// 1: add-index
// 2: modify-column-type
// 3: clean-up global index
// 4: reorganize partition
//
// They all have a write-reorganization state to backfill data into the existing rows.
// Backfilling is time-consuming, so to accelerate this process, TiDB runs several sub
// workers on the DDL owner node.
//
// DDL owner thread (also see comments before runReorgJob func)
// ^
// | (reorgCtx.doneCh)
// |
// worker master
// ^ (waitTaskResults)
// |
// |
// v (sendRangeTask)
// +--------------------+---------+---------+------------------+--------------+
// | | | | |
// backfillworker1 backfillworker2 backfillworker3 backfillworker4 ...
//
// The worker master is responsible for scaling the backfill workers according to the
// system variable "tidb_ddl_reorg_worker_cnt". Essentially, a reorg job backfills data
// based on the [start, end] key range of the table. This is not done all at once;
// it takes several rounds.
//
// [start1---end1 start2---end2 start3---end3 start4---end4 ... ... ]
// | | | | | | | |
// +-------+ +-------+ +-------+ +-------+ ... ...
// | | | |
// bfworker1 bfworker2 bfworker3 bfworker4 ... ...
// | | | | | |
// +---------------- (round1)----------------+ +--(round2)--+
//
// The main range [start, end] is split into small ranges.
// Each small range corresponds to a region, and it is delivered to a backfill worker.
// Each worker is assigned at most one range per round; the remaining ranges are
// cached until all the backfill workers have finished their previous ranges.
//
// [ region start --------------------- region end ]
// |
// v
// [ batch ] [ batch ] [ batch ] [ batch ] ...
// | | | |
// v v v v
// (a kv txn) -> -> ->
//
// For a single range, a backfill worker doesn't backfill all the data in one kv transaction.
// Instead, the range is divided into batches, and each kv transaction completes the
// backfilling of one batch.
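//
// A minimal sketch of the dispatch loop described above (illustrative only; the
// real logic lives in writePhysicalTableRecord below):
//
//	for {
//		ranges, _ := loadTableRanges(...) // split [start, end) by PD regions
//		if len(ranges) == 0 {
//			break
//		}
//		sendTasks(exec, ...) // one region range -> one backfill task
//		start = ranges[len(ranges)-1].EndKey
//	}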
// backfillTaskContext is the context of a batch that adds indexes or updates column values.
// After a batch finishes, the result in backfillTaskContext is merged into backfillResult.
type backfillTaskContext struct {
nextKey kv.Key
done bool
addedCount int
scanCount int
warnings map[errors.ErrorID]*terror.Error
warningsCount map[errors.ErrorID]int64
finishTS uint64
}
type backfillCtx struct {
id int
*ddlCtx
warnings contextutil.WarnHandlerExt
loc *time.Location
exprCtx exprctx.BuildContext
tblCtx table.MutateContext
schemaName string
table table.Table
batchCnt int
jobContext *ReorgContext
metricCounter prometheus.Counter
conflictCounter prometheus.Counter
}
func newBackfillCtx(id int, rInfo *reorgInfo, schemaName string, tbl table.Table, jobCtx *ReorgContext, label string, isUpdateColumn bool) (*backfillCtx, error) {
warnHandler := contextutil.NewStaticWarnHandler(0)
exprCtx, err := newReorgExprCtxWithReorgMeta(rInfo.ReorgMeta, warnHandler)
if err != nil {
return nil, errors.Trace(err)
}
if isUpdateColumn {
// The below case should be compatible with mysql behavior:
// > create table t (a int);
// > insert into t values (0);
// > alter table t modify column a date;
// The alter DDL should return an error in strict mode and succeed in non-strict mode.
// See: https://github.com/pingcap/tidb/pull/25728 for more details.
hasStrictMode := rInfo.ReorgMeta.SQLMode.HasStrictMode()
tc := exprCtx.GetStaticEvalCtx().TypeCtx()
evalCtx := exprCtx.GetStaticEvalCtx().Apply(exprstatic.WithTypeFlags(
tc.Flags().WithIgnoreZeroDateErr(!hasStrictMode),
))
exprCtx = exprCtx.Apply(exprstatic.WithEvalCtx(evalCtx))
}
tblCtx := newReorgTableMutateContext(exprCtx)
colOrIdxName := ""
switch rInfo.Job.Type {
case model.ActionAddIndex, model.ActionAddPrimaryKey:
args, err := model.GetModifyIndexArgs(rInfo.Job)
if err != nil {
logutil.DDLLogger().Error("Fail to get ModifyIndexArgs", zap.String("label", label), zap.String("schemaName", schemaName), zap.String("tableName", tbl.Meta().Name.String()))
} else {
colOrIdxName = getIdxNamesFromArgs(args)
}
case model.ActionModifyColumn:
oldCol, _ := getOldAndNewColumnsForUpdateColumn(tbl, rInfo.currElement.ID)
if oldCol != nil {
colOrIdxName = oldCol.Name.String()
}
}
batchCnt := rInfo.ReorgMeta.GetBatchSize()
return &backfillCtx{
id: id,
ddlCtx: rInfo.jobCtx.oldDDLCtx,
warnings: warnHandler,
exprCtx: exprCtx,
tblCtx: tblCtx,
loc: exprCtx.GetEvalCtx().Location(),
schemaName: schemaName,
table: tbl,
batchCnt: batchCnt,
jobContext: jobCtx,
metricCounter: metrics.GetBackfillTotalByLabel(
label, schemaName, tbl.Meta().Name.String(), colOrIdxName),
conflictCounter: metrics.GetBackfillTotalByLabel(
fmt.Sprintf("%s-conflict", label), schemaName, tbl.Meta().Name.String(), colOrIdxName),
}, nil
}
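// getIdxNamesFromArgs joins the names of the indexes in args with "+", e.g.
// (hypothetical index names) "idx_a+idx_b". The result is used as a label
// dimension for the backfill metrics above.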
func getIdxNamesFromArgs(args *model.ModifyIndexArgs) string {
var sb strings.Builder
for i, idx := range args.IndexArgs {
if i > 0 {
sb.WriteString("+")
}
sb.WriteString(idx.IndexName.O)
}
return sb.String()
}
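// updateTxnEntrySizeLimitIfNeeded applies the entry size limit from the
// tidb_txn_entry_size_limit system variable to the transaction when it is set
// to a non-zero value, keeping the total txn size limit unchanged.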
func updateTxnEntrySizeLimitIfNeeded(txn kv.Transaction) {
if entrySizeLimit := vardef.TxnEntrySizeLimit.Load(); entrySizeLimit > 0 {
txn.SetOption(kv.SizeLimits, kv.TxnSizeLimits{
Entry: entrySizeLimit,
Total: kv.TxnTotalSizeLimit.Load(),
})
}
}
type backfiller interface {
BackfillData(ctx context.Context, handleRange reorgBackfillTask) (taskCtx backfillTaskContext, err error)
AddMetricInfo(float64)
GetCtx() *backfillCtx
String() string
}
type backfillResult struct {
taskID int
addedCount int
scanCount int
totalCount int
nextKey kv.Key
err error
}
type reorgBackfillTask struct {
physicalTable table.PhysicalTable
// TODO: Remove the following fields after removing the run function.
id int
startKey kv.Key
endKey kv.Key
jobID int64
priority int
}
func (r *reorgBackfillTask) getJobID() int64 {
return r.jobID
}
func (r *reorgBackfillTask) String() string {
pID := r.physicalTable.GetPhysicalID()
start := hex.EncodeToString(r.startKey)
end := hex.EncodeToString(r.endKey)
jobID := r.getJobID()
return fmt.Sprintf("taskID: %d, physicalTableID: %d, range: [%s, %s), jobID: %d", r.id, pID, start, end, jobID)
}
// mergeBackfillCtxToResult merges the partial result in taskCtx into result.
func mergeBackfillCtxToResult(taskCtx *backfillTaskContext, result *backfillResult) {
result.nextKey = taskCtx.nextKey
result.addedCount += taskCtx.addedCount
result.scanCount += taskCtx.scanCount
}
type backfillWorker struct {
backfiller
taskCh chan *reorgBackfillTask
resultCh chan *backfillResult
ctx context.Context
cancel func()
wg *sync.WaitGroup
}
func newBackfillWorker(ctx context.Context, bf backfiller) *backfillWorker {
bfCtx, cancel := context.WithCancel(ctx)
return &backfillWorker{
backfiller: bf,
taskCh: make(chan *reorgBackfillTask, 1),
resultCh: make(chan *backfillResult, 1),
ctx: bfCtx,
cancel: cancel,
}
}
func (w *backfillWorker) String() string {
return fmt.Sprintf("backfill-worker %d, tp %s", w.GetCtx().id, w.backfiller.String())
}
func (w *backfillWorker) Close() {
if w.cancel != nil {
w.cancel()
w.cancel = nil
}
}
func closeBackfillWorkers(workers []*backfillWorker) {
for _, worker := range workers {
worker.Close()
}
}
// ResultCounterForTest is used for tests.
var ResultCounterForTest *atomic.Int32
// handleBackfillTask backfills the key range [task.startKey, task.endKey) of the table using the given backfiller.
func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask, bf backfiller) *backfillResult {
handleRange := *task
result := &backfillResult{
taskID: task.id,
err: nil,
addedCount: 0,
nextKey: handleRange.startKey,
}
lastLogCount := 0
lastLogTime := time.Now()
startTime := lastLogTime
jobID := task.getJobID()
rc := d.getReorgCtx(jobID)
for {
// Give the job a chance to be canceled or paused. If we don't check it here,
// we will never cancel the job once there is a panic in bf.BackfillData.
// Because a backfill task may run for a long time,
// we should check whether this ddl job is still runnable.
err := d.isReorgRunnable(d.ctx, false)
if err != nil {
result.err = err
return result
}
taskCtx, err := bf.BackfillData(w.ctx, handleRange)
if err != nil {
result.err = err
return result
}
bf.AddMetricInfo(float64(taskCtx.addedCount))
mergeBackfillCtxToResult(&taskCtx, result)
// Although `handleRange` covers the data in one region, the backfill worker still splits it into
// many small batch-sized slices and reorganizes them in separate kv txns.
// If a task fails, it may contain some committed small kv txns that have already finished their
// small-range reorganization.
// In the next round of reorganization, the target handle range may overlap with those previously
// committed small ranges. This will cause the `redo` action in reorganization.
// So for the added count and warnings collection, it is recommended to collect the statistics in
// every successfully committed small range rather than fetching them from the total result.
rc.increaseRowCount(int64(taskCtx.addedCount))
rc.mergeWarnings(taskCtx.warnings, taskCtx.warningsCount)
if num := result.scanCount - lastLogCount; num >= 90000 {
lastLogCount = result.scanCount
logutil.DDLLogger().Info("backfill worker back fill index", zap.Stringer("worker", w),
zap.Int("addedCount", result.addedCount), zap.Int("scanCount", result.scanCount),
zap.String("next key", hex.EncodeToString(taskCtx.nextKey)),
zap.Float64("speed(rows/s)", float64(num)/time.Since(lastLogTime).Seconds()))
lastLogTime = time.Now()
}
handleRange.startKey = taskCtx.nextKey
if taskCtx.done {
break
}
}
failpoint.InjectCall("afterHandleBackfillTask", task.jobID)
logutil.DDLLogger().Info("backfill worker finish task",
zap.Stringer("worker", w), zap.Stringer("task", task),
zap.Int("added count", result.addedCount),
zap.Int("scan count", result.scanCount),
zap.String("next key", hex.EncodeToString(result.nextKey)),
zap.Stringer("take time", time.Since(startTime)))
if ResultCounterForTest != nil && result.err == nil {
ResultCounterForTest.Add(1)
}
return result
}
func (w *backfillWorker) sendResult(result *backfillResult) {
select {
case <-w.ctx.Done():
case w.resultCh <- result:
}
}
func (w *backfillWorker) run(d *ddlCtx, bf backfiller, job *model.Job) {
logger := logutil.DDLLogger().With(zap.Stringer("worker", w), zap.Int64("jobID", job.ID))
var (
curTaskID int
task *reorgBackfillTask
ok bool
)
defer w.wg.Done()
defer util.Recover(metrics.LabelDDL, "backfillWorker.run", func() {
w.sendResult(&backfillResult{taskID: curTaskID, err: dbterror.ErrReorgPanic})
}, false)
for {
select {
case <-w.ctx.Done():
logger.Info("backfill worker exit on context done")
return
case task, ok = <-w.taskCh:
}
if !ok {
logger.Info("backfill worker exit")
return
}
curTaskID = task.id
d.setDDLLabelForTopSQL(job.ID, job.Query)
logger.Debug("backfill worker got task", zap.Int("workerID", w.GetCtx().id), zap.Stringer("task", task))
failpoint.Inject("mockBackfillRunErr", func() {
if w.GetCtx().id == 0 {
result := &backfillResult{taskID: task.id, addedCount: 0, nextKey: nil, err: errors.Errorf("mock backfill error")}
w.sendResult(result)
failpoint.Continue()
}
})
failpoint.Inject("mockHighLoadForAddIndex", func() {
sqlPrefixes := []string{"alter"}
topsql.MockHighCPULoad(job.Query, sqlPrefixes, 5)
})
failpoint.Inject("mockBackfillSlow", func() {
time.Sleep(100 * time.Millisecond)
})
// Change the batch size dynamically.
currentBatchCnt := w.GetCtx().batchCnt
targetBatchSize := job.ReorgMeta.GetBatchSize()
if targetBatchSize != currentBatchCnt {
w.GetCtx().batchCnt = targetBatchSize
logger.Info("adjust ddl job config success",
zap.Int64("jobID", job.ID),
zap.Int("current batch size", w.GetCtx().batchCnt))
}
result := w.handleBackfillTask(d, task, bf)
w.sendResult(result)
if result.err != nil {
logger.Info("backfill worker exit on error",
zap.Error(result.err))
return
}
}
}
// loadTableRanges loads table key ranges from PD between the given start key and end key.
// It returns at most `limit` ranges.
func loadTableRanges(
ctx context.Context,
pid int64,
store kv.Storage,
startKey, endKey kv.Key,
splitKeys []kv.Key,
limit int,
) ([]kv.KeyRange, error) {
if len(startKey) == 0 && len(endKey) == 0 {
logutil.DDLLogger().Info("load empty range",
zap.Int64("physicalTableID", pid))
return []kv.KeyRange{}, nil
}
s, ok := store.(tikv.Storage)
if !ok {
// Splitting ranges is only supported for tikv.Storage now.
logutil.DDLLogger().Info("load table ranges failed, unsupported storage",
zap.String("storage", fmt.Sprintf("%T", store)),
zap.Int64("physicalTableID", pid))
return []kv.KeyRange{{StartKey: startKey, EndKey: endKey}}, nil
}
failpoint.Inject("setLimitForLoadTableRanges", func(val failpoint.Value) {
if v, ok := val.(int); ok {
limit = v
}
})
rc := s.GetRegionCache()
maxSleep := 10000 // ms
bo := tikv.NewBackofferWithVars(ctx, maxSleep, nil)
var ranges []kv.KeyRange
maxRetryTimes := util.DefaultMaxRetries
failpoint.Inject("loadTableRangesNoRetry", func() {
maxRetryTimes = 1
})
err := util.RunWithRetry(maxRetryTimes, util.RetryInterval, func() (bool, error) {
logutil.DDLLogger().Info("load table ranges from PD",
zap.Int64("physicalTableID", pid),
zap.String("start key", hex.EncodeToString(startKey)),
zap.String("end key", hex.EncodeToString(endKey)))
rs, err := rc.BatchLoadRegionsWithKeyRange(bo, startKey, endKey, limit)
if err != nil {
return false, errors.Trace(err)
}
var mockErr bool
failpoint.InjectCall("beforeLoadRangeFromPD", &mockErr)
if mockErr {
return false, kv.ErrTxnRetryable
}
ranges = make([]kv.KeyRange, 0, len(rs))
for _, r := range rs {
ranges = append(ranges, kv.KeyRange{StartKey: r.StartKey(), EndKey: r.EndKey()})
}
err = validateAndFillRanges(ranges, startKey, endKey)
if err != nil {
return true, errors.Trace(err)
}
return false, nil
})
if err != nil {
return nil, errors.Trace(err)
}
ranges = splitRangesByKeys(ranges, splitKeys)
logutil.DDLLogger().Info("load table ranges from PD done",
zap.Int64("physicalTableID", pid),
zap.String("range start", hex.EncodeToString(ranges[0].StartKey)),
zap.String("range end", hex.EncodeToString(ranges[len(ranges)-1].EndKey)),
zap.Int("range count", len(ranges)))
failpoint.InjectCall("afterLoadTableRanges", len(ranges))
return ranges, nil
}
// splitRangesByKeys splits the ranges into smaller ranges at the given split keys.
// The split keys must be ordered.
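// For example (hypothetical keys), splitting the single range ["a", "z") by the
// split keys ["m", "t"] yields three ranges:
//
//	in := []kv.KeyRange{{StartKey: kv.Key("a"), EndKey: kv.Key("z")}}
//	out := splitRangesByKeys(in, []kv.Key{kv.Key("m"), kv.Key("t")})
//	// out: ["a","m") ["m","t") ["t","z")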
func splitRangesByKeys(ranges []kv.KeyRange, splitKeys []kv.Key) []kv.KeyRange {
if len(splitKeys) == 0 {
return ranges
}
ret := make([]kv.KeyRange, 0, len(ranges)+len(splitKeys))
for _, r := range ranges {
start := r.StartKey
finishOneRange := false
for !finishOneRange {
if len(splitKeys) == 0 {
break
}
split := splitKeys[0]
switch {
case split.Cmp(start) <= 0:
splitKeys = splitKeys[1:]
case split.Cmp(r.EndKey) < 0:
splitKeys = splitKeys[1:]
ret = append(ret, kv.KeyRange{StartKey: start, EndKey: split})
start = split
default:
finishOneRange = true
}
}
ret = append(ret, kv.KeyRange{StartKey: start, EndKey: r.EndKey})
}
return ret
}
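// validateAndFillRanges checks that the loaded ranges are continuous and stay
// within [startKey, endKey], clamping the first start key and the last end key
// to the requested boundaries when they fall outside of them.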
func validateAndFillRanges(ranges []kv.KeyRange, startKey, endKey []byte) error {
failpoint.Inject("validateAndFillRangesErr", func() {
failpoint.Return(dbterror.ErrInvalidSplitRegionRanges.GenWithStackByArgs("mock"))
})
if len(ranges) == 0 {
errMsg := fmt.Sprintf("cannot find region in range [%s, %s]",
hex.EncodeToString(startKey), hex.EncodeToString(endKey))
return dbterror.ErrInvalidSplitRegionRanges.GenWithStackByArgs(errMsg)
}
for i, r := range ranges {
if i == 0 {
s := r.StartKey
if len(s) == 0 || bytes.Compare(s, startKey) < 0 {
ranges[i].StartKey = startKey
} else if bytes.Compare(s, startKey) > 0 {
errMsg := fmt.Sprintf("get empty range at the beginning of ranges, expected %s, but got %s",
hex.EncodeToString(startKey), hex.EncodeToString(s))
return dbterror.ErrInvalidSplitRegionRanges.GenWithStackByArgs(errMsg)
}
}
if i == len(ranges)-1 {
e := r.EndKey
if len(e) == 0 || bytes.Compare(e, endKey) > 0 {
ranges[i].EndKey = endKey
}
// We don't need to check the end key because a limit may be set before scanning regions.
}
if len(ranges[i].StartKey) == 0 || len(ranges[i].EndKey) == 0 {
return dbterror.ErrInvalidSplitRegionRanges.GenWithStackByArgs("get empty start/end key in the middle of ranges")
}
if i > 0 && !bytes.Equal(ranges[i-1].EndKey, ranges[i].StartKey) {
errMsg := fmt.Sprintf("ranges are not continuous, last end key %s, next start key %s",
hex.EncodeToString(ranges[i-1].EndKey), hex.EncodeToString(ranges[i].StartKey))
return dbterror.ErrInvalidSplitRegionRanges.GenWithStackByArgs(errMsg)
}
}
return nil
}
func getBatchTasks(
t table.Table,
reorgInfo *reorgInfo,
kvRanges []kv.KeyRange,
taskIDAlloc *taskIDAllocator,
bfWorkerTp backfillerType,
) []*reorgBackfillTask {
batchTasks := make([]*reorgBackfillTask, 0, len(kvRanges))
//nolint:forcetypeassert
phyTbl := t.(table.PhysicalTable)
for _, r := range kvRanges {
taskID := taskIDAlloc.alloc()
startKey := r.StartKey
endKey := r.EndKey
endKey = getActualEndKey(t, reorgInfo, bfWorkerTp, startKey, endKey, taskID)
task := &reorgBackfillTask{
id: taskID,
jobID: reorgInfo.Job.ID,
physicalTable: phyTbl,
priority: reorgInfo.Priority,
startKey: startKey,
endKey: endKey,
}
batchTasks = append(batchTasks, task)
}
return batchTasks
}
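// getActualEndKey decides the end key to use for one backfill task. For
// merge-temp-index workers and ingest-mode add-index workers the region end
// key is kept; otherwise the end key is shrunk to the last actually written
// record key, so the backfill worker does not have to chase future writes.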
func getActualEndKey(
t table.Table,
reorgInfo *reorgInfo,
bfTp backfillerType,
rangeStart, rangeEnd kv.Key,
taskID int,
) kv.Key {
job := reorgInfo.Job
//nolint:forcetypeassert
phyTbl := t.(table.PhysicalTable)
if bfTp == typeAddIndexMergeTmpWorker {
// Temp index data does not grow indefinitely, so we can return the whole range,
// and IndexMergeTmpWorker should still finish in a bounded time.
return rangeEnd
}
if bfTp == typeAddIndexWorker && job.ReorgMeta.ReorgTp == model.ReorgTypeIngest {
// Ingest workers use coprocessor to read table data. It is fast enough that
// we don't need to get the actual end key of this range.
return rangeEnd
}
// Otherwise, to avoid future data being written into the key range [backfillChunkEndKey, rangeEnd)
// that the backfill worker can't catch up with, we shrink the end key to the actually written key for now.
jobCtx := reorgInfo.NewJobContext()
actualEndKey, err := GetRangeEndKey(jobCtx, reorgInfo.jobCtx.store, job.Priority, t.RecordPrefix(), rangeStart, rangeEnd)
if err != nil {
logutil.DDLLogger().Info("get backfill range task, get reverse key failed", zap.Error(err))
return rangeEnd
}
logutil.DDLLogger().Info("get backfill range task, change end key",
zap.Int("id", taskID),
zap.Int64("pTbl", phyTbl.GetPhysicalID()),
zap.String("end key", hex.EncodeToString(rangeEnd)),
zap.String("current end key", hex.EncodeToString(actualEndKey)))
return actualEndKey
}
// sendTasks builds backfill tasks from the given kvRanges and sends them to the backfill executor.
func sendTasks(
exec backfillExecutor,
t table.PhysicalTable,
kvRanges []kv.KeyRange,
reorgInfo *reorgInfo,
taskIDAlloc *taskIDAllocator,
bfWorkerTp backfillerType,
) error {
batchTasks := getBatchTasks(t, reorgInfo, kvRanges, taskIDAlloc, bfWorkerTp)
for _, task := range batchTasks {
if err := exec.sendTask(task); err != nil {
return errors.Trace(err)
}
}
return nil
}
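// makeupDecodeColMap builds the decode column map for the writable columns of
// the given table, which the row decoder uses during backfilling.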
func makeupDecodeColMap(dbName ast.CIStr, t table.Table) (map[int64]decoder.Column, error) {
writableColInfos := make([]*model.ColumnInfo, 0, len(t.WritableCols()))
for _, col := range t.WritableCols() {
writableColInfos = append(writableColInfos, col.ColumnInfo)
}
exprCols, _, err := expression.ColumnInfos2ColumnsAndNames(newReorgExprCtx(), dbName, t.Meta().Name, writableColInfos, t.Meta())
if err != nil {
return nil, err
}
mockSchema := expression.NewSchema(exprCols...)
decodeColMap := decoder.BuildFullDecodeColMap(t.WritableCols(), mockSchema)
return decodeColMap, nil
}
const backfillTaskChanSize = 128
func (dc *ddlCtx) addIndexWithLocalIngest(
ctx context.Context,
sessPool *sess.Pool,
t table.PhysicalTable,
reorgInfo *reorgInfo,
) error {
if err := dc.isReorgRunnable(ctx, false); err != nil {
return errors.Trace(err)
}
job := reorgInfo.Job
wctx := NewLocalWorkerCtx(ctx, job.ID)
defer wctx.Cancel()
idxCnt := len(reorgInfo.elements)
indexIDs := make([]int64, 0, idxCnt)
indexInfos := make([]*model.IndexInfo, 0, idxCnt)
var indexNames strings.Builder
uniques := make([]bool, 0, idxCnt)
hasUnique := false
for _, e := range reorgInfo.elements {
indexIDs = append(indexIDs, e.ID)
indexInfo := model.FindIndexInfoByID(t.Meta().Indices, e.ID)
if indexInfo == nil {
logutil.DDLIngestLogger().Warn("index info not found",
zap.Int64("jobID", job.ID),
zap.Int64("tableID", t.Meta().ID),
zap.Int64("indexID", e.ID))
return errors.Errorf("index info not found: %d", e.ID)
}
indexInfos = append(indexInfos, indexInfo)
if indexNames.Len() > 0 {
indexNames.WriteString("+")
}
indexNames.WriteString(indexInfo.Name.O)
uniques = append(uniques, indexInfo.Unique)
hasUnique = hasUnique || indexInfo.Unique
}
var (
cfg *local.BackendConfig
bd *local.Backend
err error
)
if config.GetGlobalConfig().Store == config.StoreTypeTiKV {
cfg, bd, err = ingest.CreateLocalBackend(ctx, dc.store, job, hasUnique, false, 0)
if err != nil {
return errors.Trace(err)
}
defer bd.Close()
}
bcCtx, err := ingest.NewBackendCtxBuilder(ctx, dc.store, job).
WithCheckpointManagerParam(sessPool, reorgInfo.PhysicalTableID).
Build(cfg, bd)
if err != nil {
return errors.Trace(err)
}
defer bcCtx.Close()
reorgCtx := dc.getReorgCtx(job.ID)
rowCntListener := &localRowCntCollector{
prevPhysicalRowCnt: reorgCtx.getRowCount(),
reorgCtx: reorgCtx,
counter: metrics.GetBackfillTotalByLabel(metrics.LblAddIdxRate, job.SchemaName, job.TableName, indexNames.String()),
}
sctx, err := sessPool.Get()
if err != nil {
return errors.Trace(err)
}
defer sessPool.Put(sctx)
avgRowSize := estimateTableRowSize(ctx, dc.store, sctx.GetRestrictedSQLExecutor(), t)
engines, err := bcCtx.Register(indexIDs, uniques, t)
if err != nil {
logutil.DDLIngestLogger().Error("cannot register new engine",
zap.Int64("jobID", job.ID),
zap.Error(err),
zap.Int64s("index IDs", indexIDs))
return errors.Trace(err)
}
importConc := job.ReorgMeta.GetConcurrency()
pipe, err := NewAddIndexIngestPipeline(
wctx,
dc.store,
sessPool,
bcCtx,
engines,
job.ID,
t,
indexInfos,
reorgInfo.StartKey,
reorgInfo.EndKey,
job.ReorgMeta,
avgRowSize,
importConc,
rowCntListener,
)
if err != nil {
return err
}
err = executeAndClosePipeline(wctx, pipe, reorgInfo, bcCtx, avgRowSize)
if err != nil {
err1 := bcCtx.FinishAndUnregisterEngines(ingest.OptCloseEngines)
if err1 != nil {
logutil.DDLIngestLogger().Error("unregister engine failed",
zap.Int64("jobID", job.ID),
zap.Error(err1),
zap.Int64s("index IDs", indexIDs))
}
return err
}
return bcCtx.FinishAndUnregisterEngines(ingest.OptCleanData | ingest.OptCheckDup)
}
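// adjustWorkerCntAndMaxWriteSpeed periodically reloads the reorg config from
// the system table and applies any changes: it tunes the reader/writer worker
// pool sizes of the ingest pipeline and updates the write speed limit of the
// local backend.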
func adjustWorkerCntAndMaxWriteSpeed(ctx context.Context, pipe *operator.AsyncPipeline, bcCtx ingest.BackendCtx, avgRowSize int, reorgInfo *reorgInfo) {
reader, writer := pipe.GetReaderAndWriter()
if reader == nil || writer == nil {
logutil.DDLIngestLogger().Error("failed to get local ingest mode reader or writer", zap.Int64("jobID", reorgInfo.ID))
return
}
ticker := time.NewTicker(UpdateDDLJobReorgCfgInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
failpoint.InjectCall("onUpdateJobParam")
reorgInfo.UpdateConfigFromSysTbl(ctx)
maxWriteSpeed := reorgInfo.ReorgMeta.GetMaxWriteSpeed()
if maxWriteSpeed != bcCtx.GetLocalBackend().GetWriteSpeedLimit() {
bcCtx.GetLocalBackend().UpdateWriteSpeedLimit(maxWriteSpeed)
logutil.DDLIngestLogger().Info("adjust ddl job config success",
zap.Int64("jobID", reorgInfo.ID),
zap.Int("max write speed", bcCtx.GetLocalBackend().GetWriteSpeedLimit()))
}
concurrency := reorgInfo.ReorgMeta.GetConcurrency()
targetReaderCnt, targetWriterCnt := expectedIngestWorkerCnt(concurrency, avgRowSize, reorgInfo.ReorgMeta.UseCloudStorage)
currentReaderCnt, currentWriterCnt := reader.GetWorkerPoolSize(), writer.GetWorkerPoolSize()
if int32(targetReaderCnt) != currentReaderCnt || int32(targetWriterCnt) != currentWriterCnt {
reader.TuneWorkerPoolSize(int32(targetReaderCnt), false)
writer.TuneWorkerPoolSize(int32(targetWriterCnt), false)
logutil.DDLIngestLogger().Info("adjust ddl job config success",
zap.Int64("jobID", reorgInfo.ID),
zap.Int32("table scan operator count", reader.GetWorkerPoolSize()),
zap.Int32("index ingest operator count", writer.GetWorkerPoolSize()))
}
failpoint.InjectCall("checkReorgConcurrency", reorgInfo.Job)
}
}
}
func executeAndClosePipeline(ctx *workerpool.Context, pipe *operator.AsyncPipeline, reorgInfo *reorgInfo, bcCtx ingest.BackendCtx, avgRowSize int) error {
err := pipe.Execute()
if err != nil {
return err
}
// Adjust worker pool size and max write speed dynamically.
var wg util.WaitGroupWrapper
adjustCtx, cancel := context.WithCancel(ctx)
if reorgInfo != nil {
wg.RunWithLog(func() {
adjustWorkerCntAndMaxWriteSpeed(adjustCtx, pipe, bcCtx, avgRowSize, reorgInfo)
})
}
err = pipe.Close()
failpoint.InjectCall("afterPipeLineClose", pipe)
cancel()
wg.Wait() // wait for adjustWorkerCntAndMaxWriteSpeed to exit
if opErr := ctx.OperatorErr(); opErr != nil {
return opErr
}
return err
}
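// localRowCntCollector reports the row count backfilled in the current
// physical table to the reorg context and the backfill metrics counter.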
type localRowCntCollector struct {
execute.NoopCollector
reorgCtx *reorgCtx
counter prometheus.Counter
// prevPhysicalRowCnt records the row count from previous physical tables (partitions).
prevPhysicalRowCnt int64
// curPhysicalRowCnt records the row count of current physical table.
curPhysicalRowCnt struct {
cnt int64
mu sync.Mutex
}
}
func (s *localRowCntCollector) Processed(_, rowCnt int64) {
s.curPhysicalRowCnt.mu.Lock()
s.curPhysicalRowCnt.cnt += rowCnt
s.reorgCtx.setRowCount(s.prevPhysicalRowCnt + s.curPhysicalRowCnt.cnt)
s.curPhysicalRowCnt.mu.Unlock()
s.counter.Add(float64(rowCnt))
}
func (s *localRowCntCollector) SetTotal(total int) {
s.reorgCtx.setRowCount(s.prevPhysicalRowCnt + int64(total))
}
// UpdateDDLJobReorgCfgInterval is the interval to check and update reorg configuration.
var UpdateDDLJobReorgCfgInterval = 2 * time.Second
// writePhysicalTableRecord handles the "add index" or "modify/change column" reorganization state for a non-partitioned table or a partition.
// For a partitioned table, it should be handled partition by partition.
//
// How to "add index" or "update column value" in the reorganization state?
// Concurrently process @@tidb_ddl_reorg_worker_cnt tasks. Each task deals with a handle range of the index/row records.
// The handle ranges are split from PD regions; each worker deals with one region's table key range at a time,
// and concurrent processing starts only after the handle ranges have been acquired.
// The operation flow is as follows:
// 1. Open a number of backfill worker goroutines.
// 2. Split the table key range into task ranges based on PD regions.
// 3. Send the tasks to the running workers through the workers' task channels. Each task deals with one region key range.
// 4. Wait until all running tasks are finished, then continue with step 3 until all task ranges are done.
//
// Each batch within a task is completed in a single transaction.
// Finally, update the total number of processed rows and store the completed (next) handle value.
func (dc *ddlCtx) writePhysicalTableRecord(
ctx context.Context,
sessPool *sess.Pool,
t table.PhysicalTable,
bfWorkerType backfillerType,
reorgInfo *reorgInfo,
) (err error) {
startKey, endKey := reorgInfo.StartKey, reorgInfo.EndKey
if err := dc.isReorgRunnable(ctx, false); err != nil {
return errors.Trace(err)
}
defer func() {
if err != nil && ctx.Err() != nil {
err = context.Cause(ctx)
}
}()
failpoint.Inject("MockCaseWhenParseFailure", func(val failpoint.Value) {
//nolint:forcetypeassert
if val.(bool) {
failpoint.Return(errors.New("job.ErrCount:" + strconv.Itoa(int(reorgInfo.Job.ErrorCount)) + ", mock unknown type: ast.whenClause."))
}
})
if bfWorkerType == typeAddIndexWorker && reorgInfo.ReorgMeta.ReorgTp == model.ReorgTypeIngest {
return dc.addIndexWithLocalIngest(ctx, sessPool, t, reorgInfo)
}
jc := reorgInfo.NewJobContext()
eg, egCtx := util.NewErrorGroupWithRecoverWithCtx(ctx)
exec, err := newTxnBackfillExecutor(egCtx, reorgInfo, sessPool, bfWorkerType, t, jc)
if err != nil {
return errors.Trace(err)
}
defer exec.close(true)
err = exec.setupWorkers()
if err != nil {
return errors.Trace(err)
}
var splitKeys []kv.Key
if reorgInfo.mergingTmpIdx {
splitKeys = getSplitKeysForTempIndexRanges(t.GetPhysicalID(), reorgInfo.elements)
}
// process result goroutine
eg.Go(func() error {
totalAddedCount := reorgInfo.Job.GetRowCount()
keeper := newDoneTaskKeeper(startKey)
cnt := 0
for {
select {
case <-egCtx.Done():
return egCtx.Err()
case result, ok := <-exec.resultChan():
if !ok {
logutil.DDLLogger().Info("backfill workers successfully processed",
zap.Stringer("element", reorgInfo.currElement),
zap.Int64("total added count", totalAddedCount),
zap.String("start key", hex.EncodeToString(startKey)))
return nil
}
cnt++
if result.err != nil {
logutil.DDLLogger().Warn("backfill worker failed",
zap.Int64("job ID", reorgInfo.ID),
zap.Int64("total added count", totalAddedCount),
zap.String("start key", hex.EncodeToString(startKey)),
zap.String("result next key", hex.EncodeToString(result.nextKey)),
zap.Error(result.err))
return result.err
}
if result.totalCount > 0 {
totalAddedCount = int64(result.totalCount)
} else {
totalAddedCount += int64(result.addedCount)
}
keeper.updateNextKey(result.taskID, result.nextKey)
if cnt%(exec.currentWorkerSize()*4) == 0 {
err2 := reorgInfo.UpdateReorgMeta(keeper.nextKey, sessPool)
if err2 != nil {
logutil.DDLLogger().Warn("update reorg meta failed",
zap.Int64("job ID", reorgInfo.ID),
zap.Error(err2))
}
failpoint.InjectCall("afterUpdateReorgMeta")
}
}
}
})
// generate task goroutine
doneCh := make(chan struct{})
eg.Go(func() error {
// We will modify the start key in this goroutine, so copy it to avoid a data race.
start, end := startKey, endKey
taskIDAlloc := newTaskIDAllocator()
for {
kvRanges, err2 := loadTableRanges(egCtx, t.GetPhysicalID(), dc.store, start, end, splitKeys, backfillTaskChanSize)
if err2 != nil {
return errors.Trace(err2)
}
if len(kvRanges) == 0 {
break
}
logutil.DDLLogger().Info("start backfill workers to reorg record",
zap.Stringer("type", bfWorkerType),
zap.Int("workerCnt", exec.currentWorkerSize()),
zap.Int("regionCnt", len(kvRanges)),
zap.String("startKey", hex.EncodeToString(start)),
zap.String("endKey", hex.EncodeToString(end)))
err2 = sendTasks(exec, t, kvRanges, reorgInfo, taskIDAlloc, bfWorkerType)
if err2 != nil {
return errors.Trace(err2)
}
start = kvRanges[len(kvRanges)-1].EndKey
if start.Cmp(end) >= 0 {
break
}
}
exec.close(false)
close(doneCh)
return nil
})
// update the worker cnt goroutine
eg.Go(func() error {
ticker := time.NewTicker(UpdateDDLJobReorgCfgInterval)
defer ticker.Stop()
outer:
for {
select {
case <-egCtx.Done():
return egCtx.Err()
case <-doneCh:
break outer
case <-ticker.C:
reorgInfo.UpdateConfigFromSysTbl(ctx)
currentWorkerCnt := exec.currentWorkerSize()
targetWorkerCnt := reorgInfo.ReorgMeta.GetConcurrency()
if currentWorkerCnt != targetWorkerCnt {
err := exec.adjustWorkerSize()
if err != nil {
logutil.DDLLogger().Error("adjust ddl job config failed",
zap.Error(err))
} else {
logutil.DDLLogger().Info("adjust ddl job config success",
zap.Int("current worker count", exec.currentWorkerSize()))
}
}
failpoint.InjectCall("checkReorgWorkerCnt", reorgInfo.Job)
}
}
return nil
})
return eg.Wait()
}
// recordIterFunc is used for low-level record iteration.
type recordIterFunc func(h kv.Handle, rowKey kv.Key, rawRecord []byte) (more bool, err error)
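// iterateSnapshotKeys iterates over the keys with keyPrefix in the snapshot at
// the given version, bounded by [startKey, endKey], and invokes fn for each
// visited key. For record keys, the row handle is decoded and passed to fn as well.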
func iterateSnapshotKeys(ctx *ReorgContext, store kv.Storage, priority int, keyPrefix kv.Key, version uint64,
startKey kv.Key, endKey kv.Key, fn recordIterFunc) error {
isRecord := tablecodec.IsRecordKey(keyPrefix.Next())
var firstKey kv.Key
if startKey == nil {
firstKey = keyPrefix
} else {
firstKey = startKey
}
var upperBound kv.Key
if endKey == nil {
upperBound = keyPrefix.PrefixNext()
} else {
upperBound = endKey.PrefixNext()
}
ver := kv.Version{Ver: version}
snap := store.GetSnapshot(ver)
snap.SetOption(kv.Priority, priority)
snap.SetOption(kv.RequestSourceInternal, true)
snap.SetOption(kv.RequestSourceType, ctx.ddlJobSourceType())
snap.SetOption(kv.ExplicitRequestSourceType, kvutil.ExplicitTypeDDL)
if tagger := ctx.getResourceGroupTaggerForTopSQL(); tagger != nil {
snap.SetOption(kv.ResourceGroupTagger, tagger)
}
snap.SetOption(kv.ResourceGroupName, ctx.resourceGroupName)
it, err := snap.Iter(firstKey, upperBound)
if err != nil {
return errors.Trace(err)
}
defer it.Close()
for it.Valid() {
if !it.Key().HasPrefix(keyPrefix) {
break
}
var handle kv.Handle
if isRecord {
handle, err = tablecodec.DecodeRowKey(it.Key())
if err != nil {
return errors.Trace(err)
}
}
more, err := fn(handle, it.Key(), it.Value())
if !more || err != nil {
return errors.Trace(err)
}
err = kv.NextUntil(it, util.RowKeyPrefixFilter(it.Key()))
if err != nil {
if kv.ErrNotExist.Equal(err) {
break
}
return errors.Trace(err)
}
}
return nil
}
// GetRangeEndKey gets the actual end key for the range of [startKey, endKey).
func GetRangeEndKey(ctx *ReorgContext, store kv.Storage, priority int, keyPrefix kv.Key, startKey, endKey kv.Key) (kv.Key, error) {
snap := store.GetSnapshot(kv.MaxVersion)
snap.SetOption(kv.Priority, priority)
if tagger := ctx.getResourceGroupTaggerForTopSQL(); tagger != nil {
snap.SetOption(kv.ResourceGroupTagger, tagger)
}
snap.SetOption(kv.ResourceGroupName, ctx.resourceGroupName)
snap.SetOption(kv.RequestSourceInternal, true)
snap.SetOption(kv.RequestSourceType, ctx.ddlJobSourceType())
snap.SetOption(kv.ExplicitRequestSourceType, kvutil.ExplicitTypeDDL)
it, err := snap.IterReverse(endKey, nil)
if err != nil {
return nil, errors.Trace(err)
}
defer it.Close()
if !it.Valid() || !it.Key().HasPrefix(keyPrefix) {
return startKey.Next(), nil
}
if it.Key().Cmp(startKey) < 0 {
return startKey.Next(), nil
}
return it.Key().Next(), nil
}
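// mergeWarningsAndWarningsCount merges the partial warnings and their counts
// into the totals, keyed by error ID, and returns the merged maps.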
func mergeWarningsAndWarningsCount(partWarnings, totalWarnings map[errors.ErrorID]*terror.Error, partWarningsCount, totalWarningsCount map[errors.ErrorID]int64) (map[errors.ErrorID]*terror.Error, map[errors.ErrorID]int64) {
for _, warn := range partWarnings {
if _, ok := totalWarningsCount[warn.ID()]; ok {
totalWarningsCount[warn.ID()] += partWarningsCount[warn.ID()]
} else {
totalWarningsCount[warn.ID()] = partWarningsCount[warn.ID()]
totalWarnings[warn.ID()] = warn
}
}
return totalWarnings, totalWarningsCount
}
func logSlowOperations(elapsed time.Duration, slowMsg string, threshold uint32) {
if threshold == 0 {
threshold = atomic.LoadUint32(&vardef.DDLSlowOprThreshold)
}
if elapsed >= time.Duration(threshold)*time.Millisecond {
logutil.DDLLogger().Info("slow operations",
zap.Duration("takeTimes", elapsed),
zap.String("msg", slowMsg))
}
}
// doneTaskKeeper keeps track of the done tasks and updates the latest next key.
type doneTaskKeeper struct {
doneTaskNextKey map[int]kv.Key
current int
nextKey kv.Key
}
func newDoneTaskKeeper(start kv.Key) *doneTaskKeeper {
return &doneTaskKeeper{
doneTaskNextKey: make(map[int]kv.Key),
current: 0,
nextKey: start,
}
}
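// updateNextKey records the next key of a finished task. nextKey only advances
// in task-ID order, so out-of-order completions are cached until the missing
// task IDs arrive. A minimal sketch (hypothetical keys):
//
//	k := newDoneTaskKeeper(kv.Key("a"))
//	k.updateNextKey(1, kv.Key("c")) // task 0 not done yet, so task 1 is cached
//	k.updateNextKey(0, kv.Key("b")) // advances through task 0 and the cached task 1
//	// k.nextKey is now "c"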
func (n *doneTaskKeeper) updateNextKey(doneTaskID int, next kv.Key) {
if doneTaskID == n.current {
n.current++
n.nextKey = next
for {
nKey, ok := n.doneTaskNextKey[n.current]
if !ok {
break
}
delete(n.doneTaskNextKey, n.current)
n.current++
n.nextKey = nKey
}
return
}
n.doneTaskNextKey[doneTaskID] = next
}