// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ddl

import (
	"context"
	"math"
	"sync"
	"time"

	"github.com/pingcap/errors"
	"github.com/pingcap/tidb/pkg/ddl/copr"
	sess "github.com/pingcap/tidb/pkg/ddl/session"
	distsqlctx "github.com/pingcap/tidb/pkg/distsql/context"
	"github.com/pingcap/tidb/pkg/errctx"
	"github.com/pingcap/tidb/pkg/kv"
	"github.com/pingcap/tidb/pkg/meta/model"
	"github.com/pingcap/tidb/pkg/metrics"
	"github.com/pingcap/tidb/pkg/resourcegroup"
	"github.com/pingcap/tidb/pkg/sessionctx"
	"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
	"github.com/pingcap/tidb/pkg/sessionctx/vardef"
	"github.com/pingcap/tidb/pkg/table"
	contextutil "github.com/pingcap/tidb/pkg/util/context"
	"github.com/pingcap/tidb/pkg/util/execdetails"
	"github.com/pingcap/tidb/pkg/util/intest"
	"github.com/pingcap/tidb/pkg/util/memory"
	"github.com/pingcap/tidb/pkg/util/ppcpuusage"
	decoder "github.com/pingcap/tidb/pkg/util/rowDecoder"
	"github.com/pingcap/tidb/pkg/util/sqlkiller"
	"github.com/pingcap/tidb/pkg/util/tiflash"
	tikvstore "github.com/tikv/client-go/v2/kv"
)

// backfillExecutor is used to manage the lifetime of backfill workers.
type backfillExecutor interface {
	setupWorkers() error
	close(force bool)

	sendTask(*reorgBackfillTask) error
	resultChan() <-chan *backfillResult

	currentWorkerSize() int
	adjustWorkerSize() error
}

var (
	_ backfillExecutor = &txnBackfillExecutor{}
)

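// maxBackfillWorkerSize caps the number of workers a txnBackfillExecutor may
// spawn, regardless of the concurrency requested in the reorg metadata.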
const maxBackfillWorkerSize = 16

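// txnBackfillExecutor manages a pool of backfill workers that write through
// transactions. It feeds tasks to the workers over taskCh and collects their
// results from resultCh.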
type txnBackfillExecutor struct {
	ctx          context.Context
	reorgInfo    *reorgInfo
	sessPool     *sess.Pool
	tp           backfillerType
	tbl          table.PhysicalTable
	decodeColMap map[int64]decoder.Column
	jobCtx       *ReorgContext

	workers []*backfillWorker
	wg      sync.WaitGroup

	taskCh   chan *reorgBackfillTask
	resultCh chan *backfillResult
	closed   bool
}

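// newTxnBackfillExecutor builds a txnBackfillExecutor for the given reorg job.
// A minimal lifecycle sketch (error handling elided; the reorgInfo, session
// pool, and job context are assumed to come from the surrounding reorg code):
//
//	exec, err := newTxnBackfillExecutor(ctx, info, sessPool, typeAddIndexWorker, tbl, jobCtx)
//	if err != nil { /* handle */ }
//	if err := exec.setupWorkers(); err != nil { /* handle */ }
//	// ... send tasks via exec.sendTask and drain exec.resultChan() ...
//	exec.close(false)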
func newTxnBackfillExecutor(ctx context.Context, info *reorgInfo, sessPool *sess.Pool,
	tp backfillerType, tbl table.PhysicalTable,
	jobCtx *ReorgContext) (backfillExecutor, error) {
	decColMap, err := makeupDecodeColMap(info.dbInfo.Name, tbl)
	if err != nil {
		return nil, err
	}
	workerCnt := info.ReorgMeta.GetConcurrency()
	return &txnBackfillExecutor{
		ctx:          ctx,
		reorgInfo:    info,
		sessPool:     sessPool,
		tp:           tp,
		tbl:          tbl,
		decodeColMap: decColMap,
		jobCtx:       jobCtx,
		workers:      make([]*backfillWorker, 0, workerCnt),
		taskCh:       make(chan *reorgBackfillTask, backfillTaskChanSize),
		resultCh:     make(chan *backfillResult, backfillTaskChanSize),
	}, nil
}

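// setupWorkers spawns the initial worker pool by delegating to adjustWorkerSize.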
func (b *txnBackfillExecutor) setupWorkers() error {
	return b.adjustWorkerSize()
}

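// sendTask hands a backfill task to the workers, or aborts early if the
// executor's context has been canceled.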
func (b *txnBackfillExecutor) sendTask(task *reorgBackfillTask) error {
	select {
	case <-b.ctx.Done():
		return b.ctx.Err()
	case b.taskCh <- task:
		return nil
	}
}

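// resultChan exposes the channel on which workers report per-task results.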
func (b *txnBackfillExecutor) resultChan() <-chan *backfillResult {
	return b.resultCh
}

// NewReorgCopContext creates a CopContext for reorg
func NewReorgCopContext(
	reorgMeta *model.DDLReorgMeta,
	tblInfo *model.TableInfo,
	allIdxInfo []*model.IndexInfo,
	requestSource string,
) (copr.CopContext, error) {
	warnHandler := contextutil.NewStaticWarnHandler(0)

	exprCtx, err := newReorgExprCtxWithReorgMeta(reorgMeta, warnHandler)
	if err != nil {
		return nil, err
	}

	evalCtx := exprCtx.GetEvalCtx()
	tc, ec := evalCtx.TypeCtx(), evalCtx.ErrCtx()
	pushDownFlags := stmtctx.PushDownFlagsWithTypeFlagsAndErrLevels(tc.Flags(), ec.LevelMap())

	return copr.NewCopContext(
		exprCtx,
		pushDownFlags,
		tblInfo,
		allIdxInfo,
		requestSource,
	)
}

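// newDefaultReorgDistSQLCtx creates a DistSQLContext pre-populated with
// default values suitable for reorg tasks.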
func newDefaultReorgDistSQLCtx(kvClient kv.Client, warnHandler contextutil.WarnAppender) *distsqlctx.DistSQLContext {
	intest.AssertNotNil(kvClient)
	intest.AssertNotNil(warnHandler)
	var sqlKiller sqlkiller.SQLKiller
	var execDetails execdetails.SyncExecDetails
	var cpuUsages ppcpuusage.SQLCPUUsages
	return &distsqlctx.DistSQLContext{
		WarnHandler:                          warnHandler,
		Client:                               kvClient,
		EnableChunkRPC:                       true,
		EnabledRateLimitAction:               vardef.DefTiDBEnableRateLimitAction,
		KVVars:                               tikvstore.NewVariables(&sqlKiller.Signal),
		SessionMemTracker:                    memory.NewTracker(memory.LabelForSession, -1),
		Location:                             time.UTC,
		SQLKiller:                            &sqlKiller,
		CPUUsage:                             &cpuUsages,
		ErrCtx:                               errctx.NewContextWithLevels(stmtctx.DefaultStmtErrLevels, warnHandler),
		TiFlashReplicaRead:                   tiflash.GetTiFlashReplicaReadByStr(vardef.DefTiFlashReplicaRead),
		TiFlashMaxThreads:                    vardef.DefTiFlashMaxThreads,
		TiFlashMaxBytesBeforeExternalJoin:    vardef.DefTiFlashMaxBytesBeforeExternalJoin,
		TiFlashMaxBytesBeforeExternalGroupBy: vardef.DefTiFlashMaxBytesBeforeExternalGroupBy,
		TiFlashMaxBytesBeforeExternalSort:    vardef.DefTiFlashMaxBytesBeforeExternalSort,
		TiFlashMaxQueryMemoryPerNode:         vardef.DefTiFlashMemQuotaQueryPerNode,
		TiFlashQuerySpillRatio:               vardef.DefTiFlashQuerySpillRatio,
		TiFlashHashJoinVersion:               vardef.DefTiFlashHashJoinVersion,
		ResourceGroupName:                    resourcegroup.DefaultResourceGroupName,
		ExecDetails:                          &execDetails,
		RuntimeStatsColl:                     execdetails.NewRuntimeStatsColl(nil),
	}
}

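// newReorgDistSQLCtxWithReorgMeta is like newDefaultReorgDistSQLCtx, but
// overrides the location, error levels, and resource group name with values
// from the reorg metadata.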
func newReorgDistSQLCtxWithReorgMeta(kvClient kv.Client, reorgMeta *model.DDLReorgMeta, warnHandler contextutil.WarnAppender) (*distsqlctx.DistSQLContext, error) {
	loc, err := reorgTimeZoneWithTzLoc(reorgMeta.Location)
	if err != nil {
		return nil, errors.Trace(err)
	}
	ctx := newDefaultReorgDistSQLCtx(kvClient, warnHandler)
	ctx.Location = loc
	ctx.ErrCtx = errctx.NewContextWithLevels(reorgErrLevelsWithSQLMode(reorgMeta.SQLMode), ctx.WarnHandler)
	ctx.ResourceGroupName = reorgMeta.ResourceGroupName
	return ctx, nil
}

// initSessCtx initializes the session context. Be careful with the time zone.
func initSessCtx(sessCtx sessionctx.Context, reorgMeta *model.DDLReorgMeta) error {
	// Correct the initial time zone.
	tz := *time.UTC
	sessCtx.GetSessionVars().TimeZone = &tz
	sessCtx.GetSessionVars().StmtCtx.SetTimeZone(&tz)

	// Set the row encoding format version.
	rowFormat := vardef.GetDDLReorgRowFormat()
	sessCtx.GetSessionVars().RowEncoder.Enable = rowFormat != vardef.DefTiDBRowFormatV1
	// Simulate the SQL mode environment in the worker sessionCtx.
	sqlMode := reorgMeta.SQLMode
	sessCtx.GetSessionVars().SQLMode = sqlMode
	loc, err := reorgTimeZoneWithTzLoc(reorgMeta.Location)
	if err != nil {
		return errors.Trace(err)
	}
	sessCtx.GetSessionVars().TimeZone = loc
	sessCtx.GetSessionVars().StmtCtx.SetTimeZone(loc)

	errLevels := reorgErrLevelsWithSQLMode(sqlMode)
	sessCtx.GetSessionVars().StmtCtx.SetErrLevels(errLevels)

	typeFlags := reorgTypeFlagsWithSQLMode(sqlMode)
	sessCtx.GetSessionVars().StmtCtx.SetTypeFlags(typeFlags)
	sessCtx.GetSessionVars().StmtCtx.ResourceGroupName = reorgMeta.ResourceGroupName
	return nil
}

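// restoreSessCtx snapshots the session variables that initSessCtx mutates and
// returns a closure that restores them on the session it is given.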
func restoreSessCtx(sessCtx sessionctx.Context) func(sessCtx sessionctx.Context) {
	sv := sessCtx.GetSessionVars() //nolint:forbidigo
	rowEncoder := sv.RowEncoder.Enable
	sqlMode := sv.SQLMode
	var timezone *time.Location
	if sv.TimeZone != nil {
		// Copy the time zone by value rather than by pointer, because the
		// original may be modified later.
		tz := *sv.TimeZone
		timezone = &tz
	}
	typeFlags := sv.StmtCtx.TypeFlags()
	errLevels := sv.StmtCtx.ErrLevels()
	resGroupName := sv.StmtCtx.ResourceGroupName
	return func(usedSessCtx sessionctx.Context) {
		uv := usedSessCtx.GetSessionVars() //nolint:forbidigo
		uv.RowEncoder.Enable = rowEncoder
		uv.SQLMode = sqlMode
		uv.TimeZone = timezone
		uv.StmtCtx.SetTypeFlags(typeFlags)
		uv.StmtCtx.SetErrLevels(errLevels)
		uv.StmtCtx.ResourceGroupName = resGroupName
	}
}

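// expectedWorkerSize returns the requested concurrency, capped at
// maxBackfillWorkerSize.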
func (b *txnBackfillExecutor) expectedWorkerSize() int {
	workerCnt := b.reorgInfo.ReorgMeta.GetConcurrency()
	return min(workerCnt, maxBackfillWorkerSize)
}

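// currentWorkerSize returns the number of workers currently running.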
func (b *txnBackfillExecutor) currentWorkerSize() int {
	return len(b.workers)
}

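// adjustWorkerSize grows or shrinks the worker pool to match the expected
// size, constructing a type-specific backfiller for each new worker.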
func (b *txnBackfillExecutor) adjustWorkerSize() error {
	reorgInfo := b.reorgInfo
	job := reorgInfo.Job
	jc := b.jobCtx
	workerCnt := b.expectedWorkerSize()
	// Increase the number of workers.
	for i := len(b.workers); i < workerCnt; i++ {
		var (
			runner *backfillWorker
			worker backfiller
		)
		switch b.tp {
		case typeAddIndexWorker:
			backfillCtx, err := newBackfillCtx(i, reorgInfo, job.SchemaName, b.tbl, jc, metrics.LblAddIdxRate, false)
			if err != nil {
				return err
			}

			idxWorker, err := newAddIndexTxnWorker(b.decodeColMap, b.tbl, backfillCtx,
				job, reorgInfo.elements, reorgInfo.currElement)
			if err != nil {
				return err
			}
			runner = newBackfillWorker(b.ctx, idxWorker)
			worker = idxWorker
		case typeAddIndexMergeTmpWorker:
			backfillCtx, err := newBackfillCtx(i, reorgInfo, job.SchemaName, b.tbl, jc, metrics.LblMergeTmpIdxRate, false)
			if err != nil {
				return err
			}
			tmpIdxWorker, err := newMergeTempIndexWorker(backfillCtx, b.tbl, reorgInfo.elements)
			if err != nil {
				return err
			}
			runner = newBackfillWorker(b.ctx, tmpIdxWorker)
			worker = tmpIdxWorker
		case typeUpdateColumnWorker:
			updateWorker, err := newUpdateColumnWorker(i, b.tbl, b.decodeColMap, reorgInfo, jc)
			if err != nil {
				return err
			}
			runner = newBackfillWorker(b.ctx, updateWorker)
			worker = updateWorker
		case typeCleanUpIndexWorker:
			idxWorker, err := newCleanUpIndexWorker(i, b.tbl, b.decodeColMap, reorgInfo, jc)
			if err != nil {
				return err
			}
			runner = newBackfillWorker(b.ctx, idxWorker)
			worker = idxWorker
		case typeReorgPartitionWorker:
			partWorker, err := newReorgPartitionWorker(i, b.tbl, b.decodeColMap, reorgInfo, jc)
			if err != nil {
				return err
			}
			runner = newBackfillWorker(b.ctx, partWorker)
			worker = partWorker
		default:
			return errors.New("unknown backfill type")
		}
		runner.taskCh = b.taskCh
		runner.resultCh = b.resultCh
		runner.wg = &b.wg
		b.workers = append(b.workers, runner)
		b.wg.Add(1)
		go runner.run(reorgInfo.jobCtx.oldDDLCtx, worker, job)
	}
	// Decrease the number of workers.
	if len(b.workers) > workerCnt {
		workers := b.workers[workerCnt:]
		b.workers = b.workers[:workerCnt]
		closeBackfillWorkers(workers)
	}
	return nil
}

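// close shuts the executor down: it closes the task channel, optionally
// force-closes the workers, waits for them to exit, and finally closes the
// result channel. Subsequent calls are no-ops.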
func (b *txnBackfillExecutor) close(force bool) {
	if b.closed {
		return
	}
	b.closed = true
	close(b.taskCh)
	if force {
		closeBackfillWorkers(b.workers)
	}
	b.wg.Wait()
	close(b.resultCh)
}

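// expectedIngestWorkerCnt splits the configured concurrency between reader and
// writer workers for ingest-mode backfill, giving wider rows proportionally
// more readers. For example, concurrency=4 with avgRowSize=800 falls in the
// (500, 1000] bucket, whose ratio of 2 yields 8 readers and 4 writers.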
func expectedIngestWorkerCnt(concurrency, avgRowSize int, useGlobalSort bool) (readerCnt, writerCnt int) {
	workerCnt := concurrency
	// Testing showed that in a global sort environment, the reader ratio does
	// not significantly impact the performance of adding indexes. We disable
	// the ratio adjustment under global sort for better memory management.
	if useGlobalSort {
		return workerCnt, workerCnt
	}
	if avgRowSize == 0 {
		// Statistics are unavailable; use the default concurrency.
		readerCnt = min(workerCnt/2, maxBackfillWorkerSize)
		readerCnt = max(readerCnt, 1)
		writerCnt = min(workerCnt/2+2, maxBackfillWorkerSize)
		return readerCnt, writerCnt
	}

	readerRatio := []float64{0.5, 1, 2, 4, 8}
	rowSize := []uint64{200, 500, 1000, 3000, math.MaxUint64}
	for i, s := range rowSize {
		if uint64(avgRowSize) <= s {
			readerCnt = max(int(float64(workerCnt)*readerRatio[i]), 1)
			writerCnt = max(workerCnt, 1)
			break
		}
	}
	return readerCnt, writerCnt
}

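// taskIDAllocator hands out monotonically increasing task IDs, starting at 0.
// It is not safe for concurrent use.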
type taskIDAllocator struct {
	id int
}

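// newTaskIDAllocator returns an allocator whose first alloc call returns 0.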
func newTaskIDAllocator() *taskIDAllocator {
	return &taskIDAllocator{}
}

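// alloc returns the next task ID and advances the internal counter.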
func (a *taskIDAllocator) alloc() int {
	ret := a.id
	a.id++
	return ret
}