// Source file: tidb/pkg/ddl/backfilling_txn_executor.go
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package ddl
import (
"context"
"math"
"sync"
"time"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/ddl/copr"
sess "github.com/pingcap/tidb/pkg/ddl/session"
distsqlctx "github.com/pingcap/tidb/pkg/distsql/context"
"github.com/pingcap/tidb/pkg/errctx"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/resourcegroup"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
"github.com/pingcap/tidb/pkg/sessionctx/vardef"
"github.com/pingcap/tidb/pkg/table"
contextutil "github.com/pingcap/tidb/pkg/util/context"
"github.com/pingcap/tidb/pkg/util/execdetails"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/memory"
"github.com/pingcap/tidb/pkg/util/ppcpuusage"
decoder "github.com/pingcap/tidb/pkg/util/rowDecoder"
"github.com/pingcap/tidb/pkg/util/sqlkiller"
"github.com/pingcap/tidb/pkg/util/tiflash"
tikvstore "github.com/tikv/client-go/v2/kv"
)
// backfillExecutor is used to manage the lifetime of backfill workers.
type backfillExecutor interface {
	// setupWorkers starts the initial pool of backfill workers.
	setupWorkers() error
	// close shuts the executor down and waits for workers to exit;
	// force requests that running workers be stopped immediately.
	close(force bool)
	// sendTask dispatches one reorg backfill task to the worker pool.
	sendTask(*reorgBackfillTask) error
	// resultChan returns the channel on which workers publish task results.
	resultChan() <-chan *backfillResult
	// currentWorkerSize reports the number of live workers.
	currentWorkerSize() int
	// adjustWorkerSize grows or shrinks the worker pool to the expected size.
	adjustWorkerSize() error
}
// Compile-time check that txnBackfillExecutor satisfies backfillExecutor.
var (
	_ backfillExecutor = &txnBackfillExecutor{}
)
const maxBackfillWorkerSize = 16
// txnBackfillExecutor manages a pool of transactional backfill workers that
// consume reorg tasks from taskCh and report outcomes on resultCh.
type txnBackfillExecutor struct {
	ctx       context.Context
	reorgInfo *reorgInfo
	sessPool  *sess.Pool
	// tp selects which kind of backfiller each worker runs (add index,
	// merge temp index, update column, clean up index, reorg partition).
	tp  backfillerType
	tbl table.PhysicalTable
	// decodeColMap tells workers how to decode columns of backfilled rows.
	decodeColMap map[int64]decoder.Column
	jobCtx       *ReorgContext
	// workers holds the currently running workers; resized by adjustWorkerSize.
	workers []*backfillWorker
	// wg tracks worker goroutines so close can wait for them to finish.
	wg sync.WaitGroup
	// taskCh delivers tasks to workers; resultCh carries their results back.
	taskCh   chan *reorgBackfillTask
	resultCh chan *backfillResult
	// closed guards against closing the channels twice.
	closed bool
}
// newTxnBackfillExecutor creates a txnBackfillExecutor for the given reorg
// job over one physical table. It prepares the column decoder map and the
// task/result channels but does not start any workers yet (see setupWorkers).
func newTxnBackfillExecutor(ctx context.Context, info *reorgInfo, sessPool *sess.Pool,
	tp backfillerType, tbl table.PhysicalTable,
	jobCtx *ReorgContext) (backfillExecutor, error) {
	// Build the decoder map workers need to decode backfilled rows.
	colMap, err := makeupDecodeColMap(info.dbInfo.Name, tbl)
	if err != nil {
		return nil, err
	}
	concurrency := info.ReorgMeta.GetConcurrency()
	exec := &txnBackfillExecutor{
		ctx:          ctx,
		reorgInfo:    info,
		sessPool:     sessPool,
		tp:           tp,
		tbl:          tbl,
		decodeColMap: colMap,
		jobCtx:       jobCtx,
		workers:      make([]*backfillWorker, 0, concurrency),
		taskCh:       make(chan *reorgBackfillTask, backfillTaskChanSize),
		resultCh:     make(chan *backfillResult, backfillTaskChanSize),
	}
	return exec, nil
}
// setupWorkers starts the initial worker pool by growing it to the expected size.
func (b *txnBackfillExecutor) setupWorkers() error {
	return b.adjustWorkerSize()
}
// sendTask hands one reorg backfill task to the worker pool. It blocks until
// there is room in the task channel, and returns the context's error instead
// if the executor context is canceled first.
func (b *txnBackfillExecutor) sendTask(task *reorgBackfillTask) error {
	select {
	case b.taskCh <- task:
		return nil
	case <-b.ctx.Done():
		return b.ctx.Err()
	}
}
// resultChan exposes the receive side of the result channel so the caller
// can collect per-task backfill results.
func (b *txnBackfillExecutor) resultChan() <-chan *backfillResult {
	return b.resultCh
}
// NewReorgCopContext creates a CopContext for reorg, deriving the expression
// context and coprocessor pushdown flags from the reorg meta's SQL mode,
// time zone, and related settings.
func NewReorgCopContext(
	reorgMeta *model.DDLReorgMeta,
	tblInfo *model.TableInfo,
	allIdxInfo []*model.IndexInfo,
	requestSource string,
) (copr.CopContext, error) {
	exprCtx, err := newReorgExprCtxWithReorgMeta(reorgMeta, contextutil.NewStaticWarnHandler(0))
	if err != nil {
		return nil, err
	}
	// Pushdown flags follow the reorg session's type-conversion flags and
	// error-handling levels.
	evalCtx := exprCtx.GetEvalCtx()
	flags := stmtctx.PushDownFlagsWithTypeFlagsAndErrLevels(
		evalCtx.TypeCtx().Flags(), evalCtx.ErrCtx().LevelMap())
	return copr.NewCopContext(exprCtx, flags, tblInfo, allIdxInfo, requestSource)
}
// newDefaultReorgDistSQLCtx builds a DistSQLContext with default settings
// (UTC location, default statement error levels, default resource group)
// for reorg distsql requests.
func newDefaultReorgDistSQLCtx(kvClient kv.Client, warnHandler contextutil.WarnAppender) *distsqlctx.DistSQLContext {
	intest.AssertNotNil(kvClient)
	intest.AssertNotNil(warnHandler)
	// Zero values are usable here; the context below keeps pointers to them.
	var sqlKiller sqlkiller.SQLKiller
	var execDetails execdetails.SyncExecDetails
	var cpuUsages ppcpuusage.SQLCPUUsages
	return &distsqlctx.DistSQLContext{
		WarnHandler:            warnHandler,
		Client:                 kvClient,
		EnableChunkRPC:         true,
		EnabledRateLimitAction: vardef.DefTiDBEnableRateLimitAction,
		KVVars:                 tikvstore.NewVariables(&sqlKiller.Signal),
		// Session-level memory tracker with no byte limit (-1).
		SessionMemTracker: memory.NewTracker(memory.LabelForSession, -1),
		Location:          time.UTC,
		SQLKiller:         &sqlKiller,
		CPUUsage:          &cpuUsages,
		ErrCtx:            errctx.NewContextWithLevels(stmtctx.DefaultStmtErrLevels, warnHandler),
		// TiFlash settings mirror the system-variable defaults.
		TiFlashReplicaRead:                   tiflash.GetTiFlashReplicaReadByStr(vardef.DefTiFlashReplicaRead),
		TiFlashMaxThreads:                    vardef.DefTiFlashMaxThreads,
		TiFlashMaxBytesBeforeExternalJoin:    vardef.DefTiFlashMaxBytesBeforeExternalJoin,
		TiFlashMaxBytesBeforeExternalGroupBy: vardef.DefTiFlashMaxBytesBeforeExternalGroupBy,
		TiFlashMaxBytesBeforeExternalSort:    vardef.DefTiFlashMaxBytesBeforeExternalSort,
		TiFlashMaxQueryMemoryPerNode:         vardef.DefTiFlashMemQuotaQueryPerNode,
		TiFlashQuerySpillRatio:               vardef.DefTiFlashQuerySpillRatio,
		TiFlashHashJoinVersion:               vardef.DefTiFlashHashJoinVersion,
		ResourceGroupName:                    resourcegroup.DefaultResourceGroupName,
		ExecDetails:                          &execDetails,
		RuntimeStatsColl:                     execdetails.NewRuntimeStatsColl(nil),
	}
}
// newReorgDistSQLCtxWithReorgMeta builds a DistSQLContext for a reorg job,
// starting from the defaults and overriding location, error levels, and
// resource group from the job's reorg meta.
func newReorgDistSQLCtxWithReorgMeta(kvClient kv.Client, reorgMeta *model.DDLReorgMeta, warnHandler contextutil.WarnAppender) (*distsqlctx.DistSQLContext, error) {
	tz, err := reorgTimeZoneWithTzLoc(reorgMeta.Location)
	if err != nil {
		return nil, errors.Trace(err)
	}
	dsCtx := newDefaultReorgDistSQLCtx(kvClient, warnHandler)
	dsCtx.Location = tz
	// Error levels follow the SQL mode recorded for this reorg job.
	dsCtx.ErrCtx = errctx.NewContextWithLevels(reorgErrLevelsWithSQLMode(reorgMeta.SQLMode), dsCtx.WarnHandler)
	dsCtx.ResourceGroupName = reorgMeta.ResourceGroupName
	return dsCtx, nil
}
// initSessCtx initializes the session context. Be careful to the timezone.
// It configures time zone, row encoder version, SQL mode, statement error
// levels, type flags, and resource group from the reorg meta.
func initSessCtx(sessCtx sessionctx.Context, reorgMeta *model.DDLReorgMeta) error {
	// Correct the initial timezone: start from a private copy of UTC as a
	// safe default before applying the job's recorded time zone below.
	tz := *time.UTC
	sessCtx.GetSessionVars().TimeZone = &tz
	sessCtx.GetSessionVars().StmtCtx.SetTimeZone(&tz)
	// Set the row encode format version.
	rowFormat := vardef.GetDDLReorgRowFormat()
	sessCtx.GetSessionVars().RowEncoder.Enable = rowFormat != vardef.DefTiDBRowFormatV1
	// Simulate the sql mode environment in the worker sessionCtx.
	sqlMode := reorgMeta.SQLMode
	sessCtx.GetSessionVars().SQLMode = sqlMode
	loc, err := reorgTimeZoneWithTzLoc(reorgMeta.Location)
	if err != nil {
		return errors.Trace(err)
	}
	// Override the UTC default with the time zone recorded for this job.
	sessCtx.GetSessionVars().TimeZone = loc
	sessCtx.GetSessionVars().StmtCtx.SetTimeZone(loc)
	// Error levels and type-conversion flags are derived from the SQL mode.
	errLevels := reorgErrLevelsWithSQLMode(sqlMode)
	sessCtx.GetSessionVars().StmtCtx.SetErrLevels(errLevels)
	typeFlags := reorgTypeFlagsWithSQLMode(sqlMode)
	sessCtx.GetSessionVars().StmtCtx.SetTypeFlags(typeFlags)
	sessCtx.GetSessionVars().StmtCtx.ResourceGroupName = reorgMeta.ResourceGroupName
	return nil
}
// restoreSessCtx snapshots the session variables that backfill mutates
// (row encoder, SQL mode, time zone, type flags, error levels, resource
// group) and returns a closure that writes the snapshot back into a
// session context.
func restoreSessCtx(sessCtx sessionctx.Context) func(sessCtx sessionctx.Context) {
	vars := sessCtx.GetSessionVars() //nolint:forbidigo
	savedRowEncoder := vars.RowEncoder.Enable
	savedSQLMode := vars.SQLMode
	var savedTZ *time.Location
	if vars.TimeZone != nil {
		// Snapshot the location value itself, not the pointer, because the
		// pointed-to location may be changed later.
		tzCopy := *vars.TimeZone
		savedTZ = &tzCopy
	}
	savedTypeFlags := vars.StmtCtx.TypeFlags()
	savedErrLevels := vars.StmtCtx.ErrLevels()
	savedResGroup := vars.StmtCtx.ResourceGroupName
	return func(usedSessCtx sessionctx.Context) {
		target := usedSessCtx.GetSessionVars() //nolint:forbidigo
		target.RowEncoder.Enable = savedRowEncoder
		target.SQLMode = savedSQLMode
		target.TimeZone = savedTZ
		target.StmtCtx.SetTypeFlags(savedTypeFlags)
		target.StmtCtx.SetErrLevels(savedErrLevels)
		target.StmtCtx.ResourceGroupName = savedResGroup
	}
}
// expectedWorkerSize returns the number of workers this executor should run:
// the concurrency requested in the reorg meta, capped at maxBackfillWorkerSize.
//
// The named result parameter was removed: it was never used (neither for a
// naked return nor in a defer), so it was pure noise.
func (b *txnBackfillExecutor) expectedWorkerSize() int {
	workerCnt := b.reorgInfo.ReorgMeta.GetConcurrency()
	return min(workerCnt, maxBackfillWorkerSize)
}
// currentWorkerSize reports how many backfill workers are currently in the pool.
func (b *txnBackfillExecutor) currentWorkerSize() int {
	return len(b.workers)
}
// adjustWorkerSize brings the worker pool to the expected size: it creates and
// starts new workers while the pool is too small, and closes surplus workers
// when the pool is too large.
func (b *txnBackfillExecutor) adjustWorkerSize() error {
	reorgInfo := b.reorgInfo
	job := reorgInfo.Job
	jc := b.jobCtx
	workerCnt := b.expectedWorkerSize()
	// Increase the worker.
	for i := len(b.workers); i < workerCnt; i++ {
		var (
			runner *backfillWorker
			worker backfiller
		)
		// Build the concrete backfiller matching this executor's type.
		switch b.tp {
		case typeAddIndexWorker:
			backfillCtx, err := newBackfillCtx(i, reorgInfo, job.SchemaName, b.tbl, jc, metrics.LblAddIdxRate, false)
			if err != nil {
				return err
			}
			idxWorker, err := newAddIndexTxnWorker(b.decodeColMap, b.tbl, backfillCtx,
				job, reorgInfo.elements, reorgInfo.currElement)
			if err != nil {
				return err
			}
			runner = newBackfillWorker(b.ctx, idxWorker)
			worker = idxWorker
		case typeAddIndexMergeTmpWorker:
			backfillCtx, err := newBackfillCtx(i, reorgInfo, job.SchemaName, b.tbl, jc, metrics.LblMergeTmpIdxRate, false)
			if err != nil {
				return err
			}
			tmpIdxWorker, err := newMergeTempIndexWorker(backfillCtx, b.tbl, reorgInfo.elements)
			if err != nil {
				return err
			}
			runner = newBackfillWorker(b.ctx, tmpIdxWorker)
			worker = tmpIdxWorker
		case typeUpdateColumnWorker:
			updateWorker, err := newUpdateColumnWorker(i, b.tbl, b.decodeColMap, reorgInfo, jc)
			if err != nil {
				return err
			}
			runner = newBackfillWorker(b.ctx, updateWorker)
			worker = updateWorker
		case typeCleanUpIndexWorker:
			idxWorker, err := newCleanUpIndexWorker(i, b.tbl, b.decodeColMap, reorgInfo, jc)
			if err != nil {
				return err
			}
			runner = newBackfillWorker(b.ctx, idxWorker)
			worker = idxWorker
		case typeReorgPartitionWorker:
			partWorker, err := newReorgPartitionWorker(i, b.tbl, b.decodeColMap, reorgInfo, jc)
			if err != nil {
				return err
			}
			runner = newBackfillWorker(b.ctx, partWorker)
			worker = partWorker
		default:
			return errors.New("unknown backfill type")
		}
		// Wire the new worker into the shared task/result channels and the
		// executor's wait group, then start its run loop.
		runner.taskCh = b.taskCh
		runner.resultCh = b.resultCh
		runner.wg = &b.wg
		b.workers = append(b.workers, runner)
		b.wg.Add(1)
		go runner.run(reorgInfo.jobCtx.oldDDLCtx, worker, job)
	}
	// Decrease the worker.
	if len(b.workers) > workerCnt {
		workers := b.workers[workerCnt:]
		b.workers = b.workers[:workerCnt]
		closeBackfillWorkers(workers)
	}
	return nil
}
// close shuts down the executor: it stops accepting new tasks and waits for
// all workers to exit before closing the result channel. When force is true,
// workers are closed immediately instead of draining remaining tasks.
func (b *txnBackfillExecutor) close(force bool) {
	// Idempotent: a second close is a no-op.
	if b.closed {
		return
	}
	b.closed = true
	// Closing taskCh signals workers that no more tasks will arrive.
	close(b.taskCh)
	if force {
		closeBackfillWorkers(b.workers)
	}
	b.wg.Wait()
	// Only closed after all workers have stopped sending results.
	close(b.resultCh)
}
// expectedIngestWorkerCnt splits the configured concurrency into reader and
// writer worker counts for ingest-based backfill. Without global sort, the
// reader count scales up with the average row size while the writer count
// stays at the configured concurrency.
func expectedIngestWorkerCnt(concurrency, avgRowSize int, useGlobalSort bool) (readerCnt, writerCnt int) {
	// Testing showed that in a global sort environment the reader ratio does
	// not significantly impact the performance of adding indexes, so it is
	// disabled there for better memory management.
	if useGlobalSort {
		return concurrency, concurrency
	}
	if avgRowSize == 0 {
		// No statistics available; fall back to an even split with caps.
		readerCnt = max(min(concurrency/2, maxBackfillWorkerSize), 1)
		writerCnt = min(concurrency/2+2, maxBackfillWorkerSize)
		return readerCnt, writerCnt
	}
	// Pick the reader ratio from the first row-size bucket that fits; the
	// final MaxUint64 bucket guarantees a match.
	thresholds := []uint64{200, 500, 1000, 3000, math.MaxUint64}
	ratios := []float64{0.5, 1, 2, 4, 8}
	for i, limit := range thresholds {
		if uint64(avgRowSize) <= limit {
			readerCnt = max(int(float64(concurrency)*ratios[i]), 1)
			writerCnt = max(concurrency, 1)
			break
		}
	}
	return readerCnt, writerCnt
}
// taskIDAllocator hands out monotonically increasing task IDs, starting at 0.
type taskIDAllocator struct {
	id int
}

// newTaskIDAllocator returns an allocator whose first allocated ID is 0.
func newTaskIDAllocator() *taskIDAllocator {
	return &taskIDAllocator{}
}

// alloc returns the next unused task ID and advances the counter.
func (a *taskIDAllocator) alloc() int {
	id := a.id
	a.id++
	return id
}