ddl: reimplement the backfill scheduler (#42668)

This commit is contained in:
tangenta
2023-03-29 22:30:59 +08:00
committed by GitHub
parent 366075fa4e
commit ed596f9ced
14 changed files with 460 additions and 339 deletions

View File

@ -86,6 +86,7 @@ go_library(
"//parser/terror",
"//parser/types",
"//privilege",
"//resourcemanager/pool/workerpool",
"//resourcemanager/pooltask",
"//resourcemanager/util",
"//sessionctx",

View File

@ -25,7 +25,6 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/ddl/ingest"
ddlutil "github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
@ -386,11 +385,7 @@ func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask,
func (w *backfillWorker) initPartitionIndexInfo(task *reorgBackfillTask) {
if pt, ok := w.GetCtx().table.(table.PartitionedTable); ok {
switch w := w.backfiller.(type) {
case *addIndexTxnWorker:
indexInfo := model.FindIndexInfoByID(pt.Meta().Indices, task.bfJob.EleID)
w.index = tables.NewIndex(task.bfJob.PhysicalTableID, pt.Meta(), indexInfo)
case *addIndexIngestWorker:
if w, ok := w.backfiller.(*addIndexTxnWorker); ok {
indexInfo := model.FindIndexInfoByID(pt.Meta().Indices, task.bfJob.EleID)
w.index = tables.NewIndex(task.bfJob.PhysicalTableID, pt.Meta(), indexInfo)
}
@ -523,24 +518,28 @@ func splitTableRanges(t table.PhysicalTable, store kv.Storage, startKey, endKey
}
type resultConsumer struct {
dc *ddlCtx
wg *sync.WaitGroup
err error
hasError *atomic.Bool
dc *ddlCtx
wg *sync.WaitGroup
err error
hasError *atomic.Bool
reorgInfo *reorgInfo // reorgInfo is used to update the reorg handle.
sessPool *sessionPool // sessPool is used to get the session to update the reorg handle.
}
func newResultConsumer(dc *ddlCtx) *resultConsumer {
func newResultConsumer(dc *ddlCtx, reorgInfo *reorgInfo, sessPool *sessionPool) *resultConsumer {
return &resultConsumer{
dc: dc,
wg: &sync.WaitGroup{},
hasError: &atomic.Bool{},
dc: dc,
wg: &sync.WaitGroup{},
hasError: &atomic.Bool{},
reorgInfo: reorgInfo,
sessPool: sessPool,
}
}
func (s *resultConsumer) run(scheduler *backfillScheduler, start kv.Key, totalAddedCount *int64) {
func (s *resultConsumer) run(scheduler backfillScheduler, start kv.Key, totalAddedCount *int64) {
s.wg.Add(1)
go func() {
reorgInfo := scheduler.reorgInfo
reorgInfo := s.reorgInfo
err := consumeResults(scheduler, s, start, totalAddedCount)
if err != nil {
logutil.BgLogger().Warn("[ddl] backfill worker handle tasks failed",
@ -567,11 +566,15 @@ func (s *resultConsumer) shouldAbort() bool {
return s.hasError.Load()
}
func consumeResults(scheduler *backfillScheduler, consumer *resultConsumer, start kv.Key, totalAddedCount *int64) error {
func consumeResults(scheduler backfillScheduler, consumer *resultConsumer, start kv.Key, totalAddedCount *int64) error {
keeper := newDoneTaskKeeper(start)
handledTaskCnt := 0
var firstErr error
for result := range scheduler.resultCh {
for {
result, ok := scheduler.receiveResult()
if !ok {
return firstErr
}
err := handleOneResult(result, scheduler, consumer, keeper, totalAddedCount, handledTaskCnt)
handledTaskCnt++
if err != nil && firstErr == nil {
@ -579,33 +582,28 @@ func consumeResults(scheduler *backfillScheduler, consumer *resultConsumer, star
firstErr = err
}
}
return firstErr
}
func handleOneResult(result *backfillResult, scheduler *backfillScheduler, consumer *resultConsumer,
func handleOneResult(result *backfillResult, scheduler backfillScheduler, consumer *resultConsumer,
keeper *doneTaskKeeper, totalAddedCount *int64, taskSeq int) error {
reorgInfo := scheduler.reorgInfo
reorgInfo := consumer.reorgInfo
if result.err != nil {
logutil.BgLogger().Warn("[ddl] backfill worker failed",
zap.Int64("job ID", reorgInfo.ID),
zap.String("result next key", hex.EncodeToString(result.nextKey)),
zap.Error(result.err))
// Drain tasks to make it quit early.
for len(scheduler.taskCh) > 0 {
<-scheduler.taskCh
}
scheduler.drainTasks() // Make it quit early.
return result.err
}
*totalAddedCount += int64(result.addedCount)
reorgCtx := consumer.dc.getReorgCtx(reorgInfo.Job.ID)
reorgCtx.setRowCount(*totalAddedCount)
keeper.updateNextKey(result.taskID, result.nextKey)
if taskSeq%(scheduler.workerSize()*4) == 0 {
if taskSeq%(scheduler.currentWorkerSize()*4) == 0 {
err := consumer.dc.isReorgRunnable(reorgInfo.ID, false)
if err != nil {
logutil.BgLogger().Warn("[ddl] backfill worker is not runnable", zap.Error(err))
// Drain tasks to make it quit early.
for len(scheduler.taskCh) > 0 {
<-scheduler.taskCh
}
scheduler.drainTasks() // Make it quit early.
return err
}
failpoint.Inject("MockGetIndexRecordErr", func() {
@ -614,7 +612,7 @@ func handleOneResult(result *backfillResult, scheduler *backfillScheduler, consu
time.Sleep(50 * time.Millisecond)
}
})
err = reorgInfo.UpdateReorgMeta(keeper.nextKey, scheduler.sessPool)
err = reorgInfo.UpdateReorgMeta(keeper.nextKey, consumer.sessPool)
if err != nil {
logutil.BgLogger().Warn("[ddl] update reorg meta failed",
zap.Int64("job ID", reorgInfo.ID), zap.Error(err))
@ -680,9 +678,9 @@ func getBatchTasks(t table.Table, reorgInfo *reorgInfo, kvRanges []kv.KeyRange,
}
// sendTasks sends tasks to workers, and returns remaining kvRanges that is not handled.
func sendTasks(scheduler *backfillScheduler, consumer *resultConsumer, t table.PhysicalTable,
kvRanges []kv.KeyRange) ([]kv.KeyRange, error) {
batchTasks := getBatchTasks(t, scheduler.reorgInfo, kvRanges, backfillTaskChanSize)
func sendTasks(scheduler backfillScheduler, consumer *resultConsumer, t table.PhysicalTable,
kvRanges []kv.KeyRange, reorgInfo *reorgInfo) ([]kv.KeyRange, error) {
batchTasks := getBatchTasks(t, reorgInfo, kvRanges, backfillTaskChanSize)
if len(batchTasks) == 0 {
return nil, nil
}
@ -691,10 +689,7 @@ func sendTasks(scheduler *backfillScheduler, consumer *resultConsumer, t table.P
if consumer.shouldAbort() {
return nil, nil
}
if scheduler.copReqSenderPool != nil {
scheduler.copReqSenderPool.sendTask(task)
}
scheduler.taskCh <- task
scheduler.sendTask(task)
}
if len(batchTasks) < len(kvRanges) {
@ -710,7 +705,7 @@ var (
// TestCheckWorkerNumCh use for test adjust backfill worker.
TestCheckWorkerNumCh = make(chan *sync.WaitGroup)
// TestCheckWorkerNumber use for test adjust backfill worker.
TestCheckWorkerNumber = int32(1)
TestCheckWorkerNumber = int32(variable.DefTiDBDDLReorgWorkerCount)
// TestCheckReorgTimeout is used to mock timeout when reorg data.
TestCheckReorgTimeout = int32(0)
)
@ -759,7 +754,7 @@ func setSessCtxLocation(sctx sessionctx.Context, tzLocation *model.TimeZoneLocat
return nil
}
var backfillTaskChanSize = 1024
var backfillTaskChanSize = 128
// SetBackfillTaskChanSizeForTest is only used for test.
func SetBackfillTaskChanSizeForTest(n int) {
@ -786,11 +781,6 @@ func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sessionPool, t table.Physic
totalAddedCount := job.GetRowCount()
startKey, endKey := reorgInfo.StartKey, reorgInfo.EndKey
sessCtx := newContext(reorgInfo.d.store)
decodeColMap, err := makeupDecodeColMap(sessCtx, reorgInfo.dbInfo.Name, t)
if err != nil {
return errors.Trace(err)
}
if err := dc.isReorgRunnable(reorgInfo.Job.ID, false); err != nil {
return errors.Trace(err)
@ -807,21 +797,21 @@ func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sessionPool, t table.Physic
})
jc := dc.jobContext(job.ID)
scheduler := newBackfillScheduler(dc.ctx, reorgInfo, sessPool, bfWorkerType, t, decodeColMap, jc)
sessCtx := newContext(reorgInfo.d.store)
scheduler, err := newBackfillScheduler(dc.ctx, reorgInfo, sessPool, bfWorkerType, t, sessCtx, jc)
if err != nil {
return errors.Trace(err)
}
defer scheduler.close(true)
var ingestBeCtx *ingest.BackendContext
if bfWorkerType == typeAddIndexWorker && reorgInfo.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge {
if bc, ok := ingest.LitBackCtxMgr.Load(job.ID); ok {
ingestBeCtx = bc
defer bc.EngMgr.ResetWorkers(bc, job.ID, reorgInfo.currElement.ID)
} else {
return errors.New(ingest.LitErrGetBackendFail)
}
consumer := newResultConsumer(dc, reorgInfo, sessPool)
consumer.run(scheduler, startKey, &totalAddedCount)
err = scheduler.setupWorkers()
if err != nil {
return errors.Trace(err)
}
consumer := newResultConsumer(dc)
consumer.run(scheduler, startKey, &totalAddedCount)
for {
kvRanges, err := splitTableRanges(t, reorgInfo.d.store, startKey, endKey, backfillTaskChanSize)
if err != nil {
@ -831,26 +821,14 @@ func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sessionPool, t table.Physic
break
}
scheduler.setMaxWorkerSize(len(kvRanges))
err = scheduler.adjustWorkerSize()
if err != nil {
return errors.Trace(err)
}
logutil.BgLogger().Info("[ddl] start backfill workers to reorg record",
zap.Stringer("type", bfWorkerType),
zap.Int("workerCnt", scheduler.workerSize()),
zap.Int("workerCnt", scheduler.currentWorkerSize()),
zap.Int("regionCnt", len(kvRanges)),
zap.String("startKey", hex.EncodeToString(startKey)),
zap.String("endKey", hex.EncodeToString(endKey)))
if ingestBeCtx != nil {
err := ingestBeCtx.Flush(reorgInfo.currElement.ID)
if err != nil {
return errors.Trace(err)
}
}
remains, err := sendTasks(scheduler, consumer, t, kvRanges)
remains, err := sendTasks(scheduler, consumer, t, kvRanges, reorgInfo)
if err != nil {
return errors.Trace(err)
}

View File

@ -20,11 +20,17 @@ import (
"time"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/ddl/ingest"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/resourcemanager/pool/workerpool"
poolutil "github.com/pingcap/tidb/resourcemanager/util"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/dbterror"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/mathutil"
decoder "github.com/pingcap/tidb/util/rowDecoder"
@ -32,7 +38,26 @@ import (
)
// backfillScheduler is used to manage the lifetime of backfill workers.
type backfillScheduler struct {
// backfillScheduler manages the lifetime of a set of backfill workers:
// it creates them, feeds them tasks, collects their results, and resizes
// or tears down the pool. Two implementations exist: txnBackfillScheduler
// (transactional writes) and ingestBackfillScheduler (lightning ingest).
type backfillScheduler interface {
	// setupWorkers initializes the worker pool before any task is sent.
	setupWorkers() error
	// close releases the scheduler's resources; force indicates an
	// abnormal/early shutdown rather than a normal completion.
	close(force bool)

	// sendTask delivers one backfill task to the workers.
	sendTask(task *reorgBackfillTask)
	// drainTasks discards all pending tasks so consumers can quit early.
	drainTasks()
	// receiveResult blocks for the next worker result; ok is false when
	// no more results will arrive.
	receiveResult() (*backfillResult, bool)

	// currentWorkerSize reports the present number of workers.
	currentWorkerSize() int
	// adjustWorkerSize resizes the pool toward the expected worker count.
	adjustWorkerSize() error
}
// Compile-time checks that both scheduler implementations satisfy the
// backfillScheduler interface.
var (
	_ backfillScheduler = &txnBackfillScheduler{}
	_ backfillScheduler = &ingestBackfillScheduler{}
)

// maxBackfillWorkerSize caps the number of backfill workers regardless of
// the tidb_ddl_reorg_worker_cnt setting.
const maxBackfillWorkerSize = 16
type txnBackfillScheduler struct {
ctx context.Context
reorgInfo *reorgInfo
sessPool *sessionPool
@ -43,19 +68,29 @@ type backfillScheduler struct {
workers []*backfillWorker
wg sync.WaitGroup
maxSize int
taskCh chan *reorgBackfillTask
resultCh chan *backfillResult
closed bool
copReqSenderPool *copReqSenderPool // for add index in ingest way.
}
func newBackfillScheduler(ctx context.Context, info *reorgInfo, sessPool *sessionPool,
tp backfillerType, tbl table.PhysicalTable, decColMap map[int64]decoder.Column,
jobCtx *JobContext) *backfillScheduler {
return &backfillScheduler{
tp backfillerType, tbl table.PhysicalTable, sessCtx sessionctx.Context,
jobCtx *JobContext) (backfillScheduler, error) {
if tp == typeAddIndexWorker && info.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge {
return newIngestBackfillScheduler(ctx, info, tbl), nil
}
return newTxnBackfillScheduler(ctx, info, sessPool, tp, tbl, sessCtx, jobCtx)
}
func newTxnBackfillScheduler(ctx context.Context, info *reorgInfo, sessPool *sessionPool,
tp backfillerType, tbl table.PhysicalTable, sessCtx sessionctx.Context,
jobCtx *JobContext) (backfillScheduler, error) {
decColMap, err := makeupDecodeColMap(sessCtx, info.dbInfo.Name, tbl)
if err != nil {
return nil, err
}
return &txnBackfillScheduler{
ctx: ctx,
reorgInfo: info,
sessPool: sessPool,
@ -66,11 +101,29 @@ func newBackfillScheduler(ctx context.Context, info *reorgInfo, sessPool *sessio
workers: make([]*backfillWorker, 0, variable.GetDDLReorgWorkerCounter()),
taskCh: make(chan *reorgBackfillTask, backfillTaskChanSize),
resultCh: make(chan *backfillResult, backfillTaskChanSize),
}, nil
}
// setupWorkers starts the initial set of transactional backfill workers by
// growing the pool from zero to the expected size.
func (b *txnBackfillScheduler) setupWorkers() error {
	return b.adjustWorkerSize()
}
// sendTask queues one backfill task for the workers. It blocks when the
// buffered task channel is full.
func (b *txnBackfillScheduler) sendTask(task *reorgBackfillTask) {
	b.taskCh <- task
}
// drainTasks empties the buffered task channel so that workers and the
// result consumer can quit early (e.g. after an error or cancellation).
func (b *txnBackfillScheduler) drainTasks() {
	for len(b.taskCh) > 0 {
		<-b.taskCh
	}
}
func (b *backfillScheduler) newSessCtx() (sessionctx.Context, error) {
reorgInfo := b.reorgInfo
// receiveResult waits for the next worker result. ok is false once the
// result channel has been closed and drained.
func (b *txnBackfillScheduler) receiveResult() (*backfillResult, bool) {
	ret, ok := <-b.resultCh
	return ret, ok
}
func newSessCtx(reorgInfo *reorgInfo) (sessionctx.Context, error) {
sessCtx := newContext(reorgInfo.d.store)
if err := initSessCtx(sessCtx, reorgInfo.ReorgMeta.SQLMode, reorgInfo.ReorgMeta.Location); err != nil {
return nil, errors.Trace(err)
@ -106,38 +159,26 @@ func initSessCtx(sessCtx sessionctx.Context, sqlMode mysql.SQLMode, tzLocation *
return nil
}
func (b *backfillScheduler) setMaxWorkerSize(maxSize int) {
b.maxSize = maxSize
}
func (b *backfillScheduler) expectedWorkerSize() (readerSize int, writerSize int) {
func (b *txnBackfillScheduler) expectedWorkerSize() (size int) {
workerCnt := int(variable.GetDDLReorgWorkerCounter())
if b.tp == typeAddIndexWorker && b.reorgInfo.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge {
readerSize = mathutil.Min(workerCnt/2, b.maxSize)
readerSize = mathutil.Max(readerSize, 1)
writerSize = mathutil.Min(workerCnt/2+2, b.maxSize)
return readerSize, writerSize
}
workerCnt = mathutil.Min(workerCnt, b.maxSize)
return workerCnt, workerCnt
return mathutil.Min(workerCnt, maxBackfillWorkerSize)
}
func (b *backfillScheduler) workerSize() int {
// currentWorkerSize returns the number of live transactional backfill workers.
func (b *txnBackfillScheduler) currentWorkerSize() int {
	return len(b.workers)
}
func (b *backfillScheduler) adjustWorkerSize() error {
b.initCopReqSenderPool()
func (b *txnBackfillScheduler) adjustWorkerSize() error {
reorgInfo := b.reorgInfo
job := reorgInfo.Job
jc := b.jobCtx
if err := loadDDLReorgVars(b.ctx, b.sessPool); err != nil {
logutil.BgLogger().Error("[ddl] load DDL reorganization variable failed", zap.Error(err))
}
readerCnt, writerCnt := b.expectedWorkerSize()
workerCnt := b.expectedWorkerSize()
// Increase the worker.
for i := len(b.workers); i < writerCnt; i++ {
sessCtx, err := b.newSessCtx()
for i := len(b.workers); i < workerCnt; i++ {
sessCtx, err := newSessCtx(b.reorgInfo)
if err != nil {
return err
}
@ -148,27 +189,13 @@ func (b *backfillScheduler) adjustWorkerSize() error {
switch b.tp {
case typeAddIndexWorker:
backfillCtx := newBackfillCtx(reorgInfo.d, i, sessCtx, job.SchemaName, b.tbl, jc, "add_idx_rate", false)
if reorgInfo.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge {
idxWorker, err := newAddIndexIngestWorker(b.tbl, backfillCtx,
job.ID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey)
if err != nil {
if canSkipError(b.reorgInfo.ID, len(b.workers), err) {
continue
}
return err
}
idxWorker.copReqSenderPool = b.copReqSenderPool
runner = newBackfillWorker(jc.ddlJobCtx, idxWorker)
worker = idxWorker
} else {
idxWorker, err := newAddIndexTxnWorker(b.decodeColMap, b.tbl, backfillCtx,
job.ID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey)
if err != nil {
return err
}
runner = newBackfillWorker(jc.ddlJobCtx, idxWorker)
worker = idxWorker
idxWorker, err := newAddIndexTxnWorker(b.decodeColMap, b.tbl, backfillCtx,
job.ID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey)
if err != nil {
return err
}
runner = newBackfillWorker(jc.ddlJobCtx, idxWorker)
worker = idxWorker
case typeAddIndexMergeTmpWorker:
backfillCtx := newBackfillCtx(reorgInfo.d, i, sessCtx, job.SchemaName, b.tbl, jc, "merge_tmp_idx_rate", false)
tmpIdxWorker := newMergeTempIndexWorker(backfillCtx, b.tbl, reorgInfo.currElement.ID)
@ -202,59 +229,18 @@ func (b *backfillScheduler) adjustWorkerSize() error {
go runner.run(reorgInfo.d, worker, job)
}
// Decrease the worker.
if len(b.workers) > writerCnt {
workers := b.workers[writerCnt:]
b.workers = b.workers[:writerCnt]
if len(b.workers) > workerCnt {
workers := b.workers[workerCnt:]
b.workers = b.workers[:workerCnt]
closeBackfillWorkers(workers)
}
if b.copReqSenderPool != nil {
b.copReqSenderPool.adjustSize(readerCnt)
}
return injectCheckBackfillWorkerNum(len(b.workers), b.tp == typeAddIndexMergeTmpWorker)
}
func (b *backfillScheduler) initCopReqSenderPool() {
if b.tp != typeAddIndexWorker || b.reorgInfo.ReorgMeta.ReorgTp != model.ReorgTypeLitMerge ||
b.copReqSenderPool != nil || len(b.workers) > 0 {
return
}
indexInfo := model.FindIndexInfoByID(b.tbl.Meta().Indices, b.reorgInfo.currElement.ID)
if indexInfo == nil {
logutil.BgLogger().Warn("[ddl-ingest] cannot init cop request sender",
zap.Int64("table ID", b.tbl.Meta().ID), zap.Int64("index ID", b.reorgInfo.currElement.ID))
return
}
sessCtx, err := b.newSessCtx()
if err != nil {
logutil.BgLogger().Warn("[ddl-ingest] cannot init cop request sender", zap.Error(err))
return
}
copCtx, err := newCopContext(b.tbl.Meta(), indexInfo, sessCtx)
if err != nil {
logutil.BgLogger().Warn("[ddl-ingest] cannot init cop request sender", zap.Error(err))
return
}
b.copReqSenderPool = newCopReqSenderPool(b.ctx, copCtx, sessCtx.GetStore())
}
func canSkipError(jobID int64, workerCnt int, err error) bool {
if workerCnt > 0 {
// The error can be skipped because the rest workers can handle the tasks.
return true
}
logutil.BgLogger().Warn("[ddl] create add index backfill worker failed",
zap.Int("current worker count", workerCnt),
zap.Int64("job ID", jobID), zap.Error(err))
return false
}
func (b *backfillScheduler) close(force bool) {
func (b *txnBackfillScheduler) close(force bool) {
if b.closed {
return
}
if b.copReqSenderPool != nil {
b.copReqSenderPool.close(force)
}
close(b.taskCh)
if force {
closeBackfillWorkers(b.workers)
@ -263,3 +249,220 @@ func (b *backfillScheduler) close(force bool) {
close(b.resultCh)
b.closed = true
}
// ingestBackfillScheduler backfills an index through the lightning (ingest)
// path: a copReqSenderPool of readers fetches index records via coprocessor
// requests and feeds them to a pool of writer workers that write into the
// lightning backend.
type ingestBackfillScheduler struct {
	ctx       context.Context
	reorgInfo *reorgInfo
	tbl       table.PhysicalTable

	closed bool

	taskCh   chan *reorgBackfillTask
	resultCh chan *backfillResult

	copReqSenderPool *copReqSenderPool // readers: fetch chunks of index records

	writerPool  *workerpool.WorkerPool[idxRecResult]
	writerMaxID int // monotonically increasing ID handed to each created writer
	poolErr     chan error // surfaces worker-creation errors to receiveResult
	backendCtx  *ingest.BackendContext
}
// newIngestBackfillScheduler creates an ingest-path scheduler. The backend
// context and worker pools are set up later in setupWorkers.
func newIngestBackfillScheduler(ctx context.Context, info *reorgInfo, tbl table.PhysicalTable) *ingestBackfillScheduler {
	return &ingestBackfillScheduler{
		ctx:       ctx,
		reorgInfo: info,
		tbl:       tbl,
		taskCh:    make(chan *reorgBackfillTask, backfillTaskChanSize),
		resultCh:  make(chan *backfillResult, backfillTaskChanSize),
		poolErr:   make(chan error),
	}
}
// setupWorkers loads the lightning backend context for the job, builds the
// cop-request sender pool (readers) and the writer worker pool, and wires
// the readers' output into the writers.
func (b *ingestBackfillScheduler) setupWorkers() error {
	job := b.reorgInfo.Job
	bc, ok := ingest.LitBackCtxMgr.Load(job.ID)
	if !ok {
		logutil.BgLogger().Error(ingest.LitErrGetBackendFail, zap.Int64("job ID", job.ID))
		return errors.Trace(errors.New("cannot get lightning backend"))
	}
	b.backendCtx = bc
	copReqSenderPool, err := b.createCopReqSenderPool()
	if err != nil {
		return errors.Trace(err)
	}
	b.copReqSenderPool = copReqSenderPool
	readerCnt, writerCnt := b.expectedWorkerSize()
	skipReg := workerpool.OptionSkipRegister[idxRecResult]{}
	writerPool, err := workerpool.NewWorkerPool[idxRecResult]("ingest_writer",
		poolutil.DDL, writerCnt, b.createWorker, skipReg)
	if err != nil {
		return errors.Trace(err)
	}
	b.writerPool = writerPool
	// Fetched chunks flow straight from the readers into the writer pool.
	b.copReqSenderPool.chunkSender = writerPool
	b.copReqSenderPool.adjustSize(readerCnt)
	return nil
}
// close shuts the scheduler down: stop feeding tasks, close the readers,
// wait for all writers to finish, then close the result channel. On a
// normal (non-forced) shutdown the engine workers are reset so the backend
// can be reused. Idempotent via the closed flag.
func (b *ingestBackfillScheduler) close(force bool) {
	if b.closed {
		return
	}
	close(b.taskCh)
	b.copReqSenderPool.close(force)
	b.writerPool.ReleaseAndWait()
	close(b.resultCh)
	if !force {
		jobID := b.reorgInfo.ID
		indexID := b.reorgInfo.currElement.ID
		if bc, ok := ingest.LitBackCtxMgr.Load(jobID); ok {
			bc.EngMgr.ResetWorkers(bc, jobID, indexID)
		}
	}
	b.closed = true
}
// sendTask queues one backfill task for the readers. It blocks when the
// buffered task channel is full.
func (b *ingestBackfillScheduler) sendTask(task *reorgBackfillTask) {
	b.taskCh <- task
}
// drainTasks empties the buffered task channel so the pipeline can quit early.
func (b *ingestBackfillScheduler) drainTasks() {
	for len(b.taskCh) > 0 {
		<-b.taskCh
	}
}
// receiveResult waits for either a worker-creation error (poolErr) or the
// next writer result. A pool error is wrapped as a backfillResult so the
// consumer handles both paths uniformly; ok is false once resultCh closes.
func (b *ingestBackfillScheduler) receiveResult() (*backfillResult, bool) {
	select {
	case err := <-b.poolErr:
		return &backfillResult{err: err}, true
	case rs, ok := <-b.resultCh:
		return rs, ok
	}
}
// currentWorkerSize reports the writer pool's capacity as the worker count.
func (b *ingestBackfillScheduler) currentWorkerSize() int {
	return int(b.writerPool.Cap())
}
// adjustWorkerSize re-tunes both halves of the pipeline to the expected
// reader/writer split. Never fails for the ingest scheduler.
func (b *ingestBackfillScheduler) adjustWorkerSize() error {
	readerCnt, writer := b.expectedWorkerSize()
	b.writerPool.Tune(int32(writer))
	b.copReqSenderPool.adjustSize(readerCnt)
	return nil
}
// createWorker is the factory handed to the writer worker pool. It registers
// a lightning engine writer and builds an addIndexIngestWorker around it.
// Failures on the very first worker (writerMaxID == 0) are fatal and reported
// through poolErr; failures on later workers are only logged, because the
// already-created workers can still make progress. A nil return tells the
// pool no worker was created.
func (b *ingestBackfillScheduler) createWorker() workerpool.Worker[idxRecResult] {
	reorgInfo := b.reorgInfo
	job := reorgInfo.Job
	sessCtx, err := newSessCtx(reorgInfo)
	if err != nil {
		b.poolErr <- err
		return nil
	}
	bcCtx := b.backendCtx
	ei, err := bcCtx.EngMgr.Register(bcCtx, job.ID, b.reorgInfo.currElement.ID, job.SchemaName, job.TableName)
	if err != nil {
		// Return an error only if it is the first worker.
		if b.writerMaxID == 0 {
			b.poolErr <- err
			return nil
		}
		logutil.BgLogger().Warn("[ddl-ingest] cannot create new writer", zap.Error(err),
			zap.Int64("job ID", reorgInfo.ID), zap.Int64("index ID", b.reorgInfo.currElement.ID))
		return nil
	}
	worker, err := newAddIndexIngestWorker(b.tbl, reorgInfo.d, ei, b.resultCh, job.ID,
		reorgInfo.SchemaName, b.reorgInfo.currElement.ID, b.writerMaxID, b.copReqSenderPool, sessCtx)
	if err != nil {
		// Return an error only if it is the first worker.
		if b.writerMaxID == 0 {
			b.poolErr <- err
			return nil
		}
		logutil.BgLogger().Warn("[ddl-ingest] cannot create new writer", zap.Error(err),
			zap.Int64("job ID", reorgInfo.ID), zap.Int64("index ID", b.reorgInfo.currElement.ID))
		return nil
	}
	b.writerMaxID++
	return worker
}
// createCopReqSenderPool builds the reader-side pool that fetches index
// records via coprocessor requests. It looks up the index being added,
// creates a dedicated session context, and derives the cop request context
// from the table/index metadata.
func (b *ingestBackfillScheduler) createCopReqSenderPool() (*copReqSenderPool, error) {
	indexInfo := model.FindIndexInfoByID(b.tbl.Meta().Indices, b.reorgInfo.currElement.ID)
	if indexInfo == nil {
		logutil.BgLogger().Warn("[ddl-ingest] cannot init cop request sender",
			zap.Int64("table ID", b.tbl.Meta().ID), zap.Int64("index ID", b.reorgInfo.currElement.ID))
		return nil, errors.New("cannot find index info")
	}
	sessCtx, err := newSessCtx(b.reorgInfo)
	if err != nil {
		logutil.BgLogger().Warn("[ddl-ingest] cannot init cop request sender", zap.Error(err))
		return nil, err
	}
	copCtx, err := newCopContext(b.tbl.Meta(), indexInfo, sessCtx)
	if err != nil {
		logutil.BgLogger().Warn("[ddl-ingest] cannot init cop request sender", zap.Error(err))
		return nil, err
	}
	return newCopReqSenderPool(b.ctx, copCtx, sessCtx.GetStore(), b.taskCh), nil
}
// expectedWorkerSize splits the configured DDL reorg worker count between
// readers and writers: readers get half (at least 1) and writers get half
// plus two, both capped at maxBackfillWorkerSize.
func (b *ingestBackfillScheduler) expectedWorkerSize() (readerSize int, writerSize int) {
	workerCnt := int(variable.GetDDLReorgWorkerCounter())
	readerSize = mathutil.Min(workerCnt/2, maxBackfillWorkerSize)
	readerSize = mathutil.Max(readerSize, 1)
	writerSize = mathutil.Min(workerCnt/2+2, maxBackfillWorkerSize)
	return readerSize, writerSize
}
// HandleTask is the worker-pool entry point for one fetched chunk of index
// records: it writes the chunk into the local lightning engine and reports a
// backfillResult. A zero-row result marks the end of a cop-request task and
// triggers a backend flush for this index. Panics are converted into
// ErrReorgPanic results so the consumer always hears back.
func (w *addIndexIngestWorker) HandleTask(rs idxRecResult) {
	defer util.Recover(metrics.LabelDDL, "ingestWorker.HandleTask", func() {
		w.resultCh <- &backfillResult{taskID: rs.id, err: dbterror.ErrReorgPanic}
	}, false)
	result := &backfillResult{
		taskID: rs.id,
		err:    rs.err,
	}
	if result.err != nil {
		logutil.BgLogger().Error("[ddl-ingest] finish a cop-request task with error",
			zap.Int("id", rs.id), zap.Error(rs.err))
		w.resultCh <- result
		return
	}
	// Bail out early if the reorg job has been cancelled or paused.
	err := w.d.isReorgRunnable(w.jobID, false)
	if err != nil {
		result.err = err
		w.resultCh <- result
		return
	}
	count, nextKey, err := w.WriteLocal(&rs)
	if err != nil {
		result.err = err
		w.resultCh <- result
		return
	}
	if count == 0 {
		// Empty chunk signals the cop-request task is fully consumed;
		// flush this index's data through the lightning backend.
		logutil.BgLogger().Info("[ddl-ingest] finish a cop-request task", zap.Int("id", rs.id))
		if bc, ok := ingest.LitBackCtxMgr.Load(w.jobID); ok {
			err := bc.Flush(w.index.Meta().ID)
			if err != nil {
				result.err = err
				w.resultCh <- result
			}
		}
		return
	}
	result.scanCount = count
	result.addedCount = count
	result.nextKey = nextKey
	w.metricCounter.Add(float64(count))
	if ResultCounterForTest != nil && result.err == nil {
		ResultCounterForTest.Add(1)
	}
	w.resultCh <- result
}
func (w *addIndexIngestWorker) Close() {}

View File

@ -157,10 +157,6 @@ func newBackfillWorkerContext(d *ddl, schemaName string, tbl table.Table, worker
var bf backfiller
bf, err = bfFunc(newBackfillCtx(d.ddlCtx, 0, se, schemaName, tbl, d.jobContext(jobID), "add_idx_rate", true))
if err != nil {
if canSkipError(jobID, len(bwCtx.backfillWorkers), err) {
err = nil
continue
}
logutil.BgLogger().Error("[ddl] new backfill worker context, do bfFunc failed", zap.Int64("jobID", jobID), zap.Error(err))
return nil, errors.Trace(err)
}

View File

@ -32,8 +32,16 @@ func SetBatchInsertDeleteRangeSize(i int) {
var NewCopContext4Test = newCopContext
// resultChanForTest adapts a plain channel to the chunkSender interface so
// tests can observe fetched chunks directly.
type resultChanForTest struct {
	ch chan idxRecResult
}

// AddTask forwards the result to the underlying test channel.
func (r *resultChanForTest) AddTask(rs idxRecResult) {
	r.ch <- rs
}
func FetchChunk4Test(copCtx *copContext, tbl table.PhysicalTable, startKey, endKey kv.Key, store kv.Storage,
batchSize int) (*chunk.Chunk, error) {
batchSize int) *chunk.Chunk {
variable.SetDDLReorgBatchSize(int32(batchSize))
task := &reorgBackfillTask{
id: 1,
@ -41,12 +49,16 @@ func FetchChunk4Test(copCtx *copContext, tbl table.PhysicalTable, startKey, endK
endKey: endKey,
physicalTable: tbl,
}
pool := newCopReqSenderPool(context.Background(), copCtx, store)
taskCh := make(chan *reorgBackfillTask, 5)
resultCh := make(chan idxRecResult, 5)
pool := newCopReqSenderPool(context.Background(), copCtx, store, taskCh)
pool.chunkSender = &resultChanForTest{ch: resultCh}
pool.adjustSize(1)
pool.tasksCh <- task
copChunk, err := pool.fetchChunk()
rs := <-resultCh
close(taskCh)
pool.close(false)
return copChunk, err
return rs.chunk
}
func ConvertRowToHandleAndIndexDatum(row chunk.Row, copCtx *copContext) (kv.Handle, []types.Datum, error) {

View File

@ -51,6 +51,7 @@ import (
"github.com/pingcap/tidb/util/dbterror"
"github.com/pingcap/tidb/util/logutil"
decoder "github.com/pingcap/tidb/util/rowDecoder"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
"go.uber.org/zap"
@ -1665,158 +1666,86 @@ func (w *addIndexTxnWorker) batchCheckUniqueKey(txn kv.Transaction, idxRecords [
}
type addIndexIngestWorker struct {
*backfillCtx
d *ddlCtx
metricCounter prometheus.Counter
sessCtx sessionctx.Context
tbl table.PhysicalTable
index table.Index
writerCtx *ingest.WriterContext
copReqSenderPool *copReqSenderPool
done bool
resultCh chan *backfillResult
jobID int64
}
func newAddIndexIngestWorker(t table.PhysicalTable, bfCtx *backfillCtx, jobID, eleID int64, eleTypeKey []byte) (*addIndexIngestWorker, error) {
if !bytes.Equal(eleTypeKey, meta.IndexElementKey) {
logutil.BgLogger().Error("Element type for addIndexIngestWorker incorrect",
zap.Int64("job ID", jobID), zap.ByteString("element type", eleTypeKey), zap.Int64("element ID", eleID))
return nil, errors.Errorf("element type is not index, typeKey: %v", eleTypeKey)
}
indexInfo := model.FindIndexInfoByID(t.Meta().Indices, eleID)
func newAddIndexIngestWorker(t table.PhysicalTable, d *ddlCtx, ei *ingest.EngineInfo,
resultCh chan *backfillResult, jobID int64, schemaName string, indexID int64, writerID int,
copReqSenderPool *copReqSenderPool, sessCtx sessionctx.Context) (*addIndexIngestWorker, error) {
indexInfo := model.FindIndexInfoByID(t.Meta().Indices, indexID)
index := tables.NewIndex(t.GetPhysicalID(), t.Meta(), indexInfo)
var lwCtx *ingest.WriterContext
bc, ok := ingest.LitBackCtxMgr.Load(jobID)
if !ok {
return nil, errors.Trace(errors.New(ingest.LitErrGetBackendFail))
}
ei, err := bc.EngMgr.Register(bc, jobID, eleID, bfCtx.schemaName, t.Meta().Name.O)
if err != nil {
return nil, errors.Trace(err)
}
lwCtx, err = ei.NewWriterCtx(bfCtx.id, indexInfo.Unique)
lwCtx, err := ei.NewWriterCtx(writerID, indexInfo.Unique)
if err != nil {
return nil, err
}
return &addIndexIngestWorker{
backfillCtx: bfCtx,
index: index,
writerCtx: lwCtx,
d: d,
sessCtx: sessCtx,
metricCounter: metrics.BackfillTotalCounter.WithLabelValues(
metrics.GenerateReorgLabel("add_idx_rate", schemaName, t.Meta().Name.O)),
tbl: t,
index: index,
writerCtx: lwCtx,
copReqSenderPool: copReqSenderPool,
resultCh: resultCh,
jobID: jobID,
}, nil
}
func (w *addIndexIngestWorker) AddMetricInfo(count float64) {
if w.done {
return
// WriteLocal will write index records to lightning engine.
func (w *addIndexIngestWorker) WriteLocal(rs *idxRecResult) (count int, nextKey kv.Key, err error) {
oprStartTime := time.Now()
copCtx := w.copReqSenderPool.copCtx
vars := w.sessCtx.GetSessionVars()
cnt, lastHandle, err := writeChunkToLocal(w.writerCtx, w.index, copCtx, vars, rs.chunk)
if err != nil || cnt == 0 {
w.copReqSenderPool.recycleChunk(rs.chunk)
return 0, nil, err
}
w.metricCounter.Add(count)
}
func (*addIndexIngestWorker) GetTasks() ([]*BackfillJob, error) {
return nil, nil
}
func (w *addIndexIngestWorker) UpdateTask(bfJob *BackfillJob) error {
s := newSession(w.backfillCtx.sessCtx)
return s.runInTxn(func(se *session) error {
jobs, err := GetBackfillJobs(se, BackgroundSubtaskTable, fmt.Sprintf("task_key = '%s'", bfJob.keyString()), "update_backfill_task")
if err != nil {
return err
}
if len(jobs) == 0 {
return dbterror.ErrDDLJobNotFound.FastGen("get zero backfill job")
}
if jobs[0].InstanceID != bfJob.InstanceID {
return dbterror.ErrDDLJobNotFound.FastGenByArgs(fmt.Sprintf("get a backfill job %v, want instance ID %s", jobs[0], bfJob.InstanceID))
}
currTime, err := GetOracleTimeWithStartTS(se)
if err != nil {
return err
}
bfJob.InstanceLease = GetLeaseGoTime(currTime, InstanceLease)
return updateBackfillJob(se, BackgroundSubtaskTable, bfJob, "update_backfill_task")
})
}
func (w *addIndexIngestWorker) FinishTask(bfJob *BackfillJob) error {
s := newSession(w.backfillCtx.sessCtx)
return s.runInTxn(func(se *session) error {
txn, err := se.txn()
if err != nil {
return errors.Trace(err)
}
bfJob.StateUpdateTS = txn.StartTS()
err = RemoveBackfillJob(se, false, bfJob)
if err != nil {
return err
}
return AddBackfillHistoryJob(se, []*BackfillJob{bfJob})
})
}
func (w *addIndexIngestWorker) GetCtx() *backfillCtx {
return w.backfillCtx
}
func (*addIndexIngestWorker) String() string {
return "ingest index"
}
// BackfillData will ingest index records through lightning engine.
func (w *addIndexIngestWorker) BackfillData(handleRange reorgBackfillTask) (taskCtx backfillTaskContext, err error) {
taskCtx.nextKey = handleRange.startKey
var total int
for {
oprStartTime := time.Now()
copChunk, err := w.copReqSenderPool.fetchChunk()
if err != nil {
return taskCtx, err
}
if copChunk == nil {
// No more chunks.
break
}
copCtx := w.copReqSenderPool.copCtx
vars := w.sessCtx.GetSessionVars()
cnt, err := writeChunkToLocal(w.writerCtx, w.index, copCtx, vars, copChunk)
if err != nil {
w.copReqSenderPool.recycleChunk(copChunk)
return taskCtx, err
}
total += cnt
w.copReqSenderPool.recycleChunk(copChunk)
w.AddMetricInfo(float64(cnt))
logSlowOperations(time.Since(oprStartTime), "writeChunkToLocal", 3000)
}
taskCtx.scanCount = total
taskCtx.addedCount = total
taskCtx.nextKey = handleRange.endKey
taskCtx.done = true
w.done = true
return taskCtx, nil
w.copReqSenderPool.recycleChunk(rs.chunk)
w.metricCounter.Add(float64(cnt))
logSlowOperations(time.Since(oprStartTime), "writeChunkToLocal", 3000)
nextKey = tablecodec.EncodeRecordKey(w.tbl.RecordPrefix(), lastHandle)
return cnt, nextKey, nil
}
func writeChunkToLocal(writerCtx *ingest.WriterContext,
index table.Index, copCtx *copContext, vars *variable.SessionVars,
copChunk *chunk.Chunk) (int, error) {
copChunk *chunk.Chunk) (int, kv.Handle, error) {
sCtx, writeBufs := vars.StmtCtx, vars.GetWriteStmtBufs()
iter := chunk.NewIterator4Chunk(copChunk)
idxDataBuf := make([]types.Datum, len(copCtx.idxColOutputOffsets))
handleDataBuf := make([]types.Datum, len(copCtx.handleOutputOffsets))
count := 0
var lastHandle kv.Handle
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
idxDataBuf, handleDataBuf = idxDataBuf[:0], handleDataBuf[:0]
idxDataBuf = extractDatumByOffsets(row, copCtx.idxColOutputOffsets, copCtx.expColInfos, idxDataBuf)
handleDataBuf := extractDatumByOffsets(row, copCtx.handleOutputOffsets, copCtx.expColInfos, handleDataBuf)
handle, err := buildHandle(handleDataBuf, copCtx.tblInfo, copCtx.pkInfo, sCtx)
if err != nil {
return 0, errors.Trace(err)
return 0, nil, errors.Trace(err)
}
rsData := getRestoreData(copCtx.tblInfo, copCtx.idxInfo, copCtx.pkInfo, handleDataBuf)
err = writeOneKVToLocal(writerCtx, index, sCtx, writeBufs, idxDataBuf, rsData, handle)
if err != nil {
return 0, errors.Trace(err)
return 0, nil, errors.Trace(err)
}
count++
lastHandle = handle
}
return count, nil
return count, lastHandle, nil
}
func writeOneKVToLocal(writerCtx *ingest.WriterContext,

View File

@ -59,23 +59,14 @@ func copReadChunkPoolSize() int {
return 10 * int(variable.GetDDLReorgWorkerCounter())
}
func (c *copReqSenderPool) fetchChunk() (*chunk.Chunk, error) {
rs, ok := <-c.resultsCh
if !ok {
return nil, nil
}
if rs.err != nil {
logutil.BgLogger().Error("[ddl-ingest] finish a cop-request task with error",
zap.Int("id", rs.id), zap.Error(rs.err))
} else if rs.done {
logutil.BgLogger().Info("[ddl-ingest] finish a cop-request task", zap.Int("id", rs.id))
}
return rs.chunk, rs.err
// chunkSender consumes the results of coprocessor requests: each fetched
// chunk (or the error that aborted a task) is handed over via AddTask.
type chunkSender interface {
	AddTask(idxRecResult)
}
type copReqSenderPool struct {
tasksCh chan *reorgBackfillTask
resultsCh chan idxRecResult
tasksCh chan *reorgBackfillTask
chunkSender chunkSender
ctx context.Context
copCtx *copContext
@ -100,7 +91,7 @@ func (c *copReqSender) run() {
defer p.wg.Done()
var curTaskID int
defer util.Recover(metrics.LabelDDL, "copReqSender.run", func() {
p.resultsCh <- idxRecResult{id: curTaskID, err: dbterror.ErrReorgPanic}
p.chunkSender.AddTask(idxRecResult{id: curTaskID, err: dbterror.ErrReorgPanic})
}, false)
for {
if util.HasCancelled(c.ctx) {
@ -115,12 +106,12 @@ func (c *copReqSender) run() {
zap.Int("id", task.id), zap.String("task", task.String()))
ver, err := p.store.CurrentVersion(kv.GlobalTxnScope)
if err != nil {
p.resultsCh <- idxRecResult{id: task.id, err: err}
p.chunkSender.AddTask(idxRecResult{id: task.id, err: err})
return
}
rs, err := p.copCtx.buildTableScan(p.ctx, ver.Ver, task.startKey, task.excludedEndKey())
if err != nil {
p.resultsCh <- idxRecResult{id: task.id, err: err}
p.chunkSender.AddTask(idxRecResult{id: task.id, err: err})
return
}
failpoint.Inject("MockCopSenderPanic", func(val failpoint.Value) {
@ -133,26 +124,26 @@ func (c *copReqSender) run() {
srcChk := p.getChunk()
done, err = p.copCtx.fetchTableScanResult(p.ctx, rs, srcChk)
if err != nil {
p.resultsCh <- idxRecResult{id: task.id, err: err}
p.chunkSender.AddTask(idxRecResult{id: task.id, err: err})
p.recycleChunk(srcChk)
terror.Call(rs.Close)
return
}
p.resultsCh <- idxRecResult{id: task.id, chunk: srcChk, done: done}
p.chunkSender.AddTask(idxRecResult{id: task.id, chunk: srcChk, done: done})
}
terror.Call(rs.Close)
}
}
func newCopReqSenderPool(ctx context.Context, copCtx *copContext, store kv.Storage) *copReqSenderPool {
func newCopReqSenderPool(ctx context.Context, copCtx *copContext, store kv.Storage,
taskCh chan *reorgBackfillTask) *copReqSenderPool {
poolSize := copReadChunkPoolSize()
srcChkPool := make(chan *chunk.Chunk, poolSize)
for i := 0; i < poolSize; i++ {
srcChkPool <- chunk.NewChunkWithCapacity(copCtx.fieldTps, copReadBatchSize())
}
return &copReqSenderPool{
tasksCh: make(chan *reorgBackfillTask, backfillTaskChanSize),
resultsCh: make(chan idxRecResult, backfillTaskChanSize),
tasksCh: taskCh,
ctx: ctx,
copCtx: copCtx,
store: store,
@ -162,10 +153,6 @@ func newCopReqSenderPool(ctx context.Context, copCtx *copContext, store kv.Stora
}
}
// sendTask enqueues a backfill task for the cop-request senders to process.
// It blocks when the task channel is full.
func (c *copReqSenderPool) sendTask(task *reorgBackfillTask) {
	c.tasksCh <- task
}
func (c *copReqSenderPool) adjustSize(n int) {
// Add some senders.
for i := len(c.senders); i < n; i++ {
@ -192,7 +179,6 @@ func (c *copReqSenderPool) close(force bool) {
return
}
logutil.BgLogger().Info("[ddl-ingest] close cop-request sender pool")
close(c.tasksCh)
if force {
for _, w := range c.senders {
w.cancel()
@ -200,7 +186,6 @@ func (c *copReqSenderPool) close(force bool) {
}
// Wait for all cop-req senders to exit.
c.wg.Wait()
close(c.resultsCh)
c.closed = true
}

View File

@ -45,7 +45,7 @@ func TestAddIndexFetchRowsFromCoprocessor(t *testing.T) {
endKey := startKey.PrefixNext()
txn, err := store.Begin()
require.NoError(t, err)
copChunk, err := ddl.FetchChunk4Test(copCtx, tbl.(table.PhysicalTable), startKey, endKey, store, 10)
copChunk := ddl.FetchChunk4Test(copCtx, tbl.(table.PhysicalTable), startKey, endKey, store, 10)
require.NoError(t, err)
require.NoError(t, txn.Rollback())

View File

@ -139,7 +139,7 @@ func newMergeTempIndexWorker(bfCtx *backfillCtx, t table.PhysicalTable, eleID in
}
}
// BackfillData merges temp index data in txn.
func (w *mergeIndexWorker) BackfillData(taskRange reorgBackfillTask) (taskCtx backfillTaskContext, errInTxn error) {
oprStartTime := time.Now()
ctx := kv.WithInternalSourceType(context.Background(), w.jobContext.ddlJobSourceType())

View File

@ -89,7 +89,9 @@ func (bc *BackendContext) Flush(indexID int64) error {
return err
}
if bc.diskRoot.CurrentUsage() >= uint64(importThreshold*float64(bc.diskRoot.MaxQuota())) {
release := ei.AcquireFlushLock()
if release != nil && bc.diskRoot.CurrentUsage() >= uint64(importThreshold*float64(bc.diskRoot.MaxQuota())) {
defer release()
// TODO: it should be changed according checkpoint solution.
// Flush writer cached data into local disk for engine first.
err := ei.Flush()

View File

@ -30,9 +30,9 @@ import (
"go.uber.org/zap"
)
// One engine for one index reorg task, each task will create several new writers under the
// Opened Engine. Note engineInfo is not thread safe.
type engineInfo struct {
// EngineInfo is the engine for one index reorg task, each task will create several new writers under the
// Opened Engine. Note EngineInfo is not thread safe.
type EngineInfo struct {
ctx context.Context
jobID int64
indexID int64
@ -44,12 +44,13 @@ type engineInfo struct {
memRoot MemRoot
diskRoot DiskRoot
rowSeq atomic.Int64
flushing atomic.Bool
}
// NewEngineInfo create a new EngineInfo struct.
func NewEngineInfo(ctx context.Context, jobID, indexID int64, cfg *backend.EngineConfig,
en *backend.OpenedEngine, uuid uuid.UUID, wCnt int, memRoot MemRoot, diskRoot DiskRoot) *engineInfo {
return &engineInfo{
en *backend.OpenedEngine, uuid uuid.UUID, wCnt int, memRoot MemRoot, diskRoot DiskRoot) *EngineInfo {
return &EngineInfo{
ctx: ctx,
jobID: jobID,
indexID: indexID,
@ -64,7 +65,7 @@ func NewEngineInfo(ctx context.Context, jobID, indexID int64, cfg *backend.Engin
}
// Flush imports all the key-values in engine to the storage.
func (ei *engineInfo) Flush() error {
func (ei *EngineInfo) Flush() error {
err := ei.openedEngine.Flush(ei.ctx)
if err != nil {
logutil.BgLogger().Error(LitErrFlushEngineErr, zap.Error(err),
@ -74,7 +75,19 @@ func (ei *engineInfo) Flush() error {
return nil
}
func (ei *engineInfo) Clean() {
// AcquireFlushLock acquires the flush lock of the engine. It returns a
// release function when the lock was taken, or nil when another flush is
// already in progress and the caller should skip flushing.
func (ei *EngineInfo) AcquireFlushLock() (release func()) {
	if !ei.flushing.CompareAndSwap(false, true) {
		return nil
	}
	return func() { ei.flushing.Store(false) }
}
// Clean closes the engine and removes the local intermediate files.
func (ei *EngineInfo) Clean() {
if ei.openedEngine == nil {
return
}
@ -98,7 +111,8 @@ func (ei *engineInfo) Clean() {
}
}
func (ei *engineInfo) ImportAndClean() error {
// ImportAndClean imports the engine data to TiKV and cleans up the local intermediate files.
func (ei *EngineInfo) ImportAndClean() error {
// Close engine and finish local tasks of lightning.
logutil.BgLogger().Info(LitInfoCloseEngine, zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID))
indexEngine := ei.openedEngine
@ -151,7 +165,8 @@ type WriterContext struct {
lWrite *backend.LocalEngineWriter
}
func (ei *engineInfo) NewWriterCtx(id int, unique bool) (*WriterContext, error) {
// NewWriterCtx creates a new WriterContext.
func (ei *EngineInfo) NewWriterCtx(id int, unique bool) (*WriterContext, error) {
ei.memRoot.RefreshConsumption()
ok := ei.memRoot.CheckConsume(StructSizeWriterCtx)
if !ok {
@ -179,7 +194,7 @@ func (ei *engineInfo) NewWriterCtx(id int, unique bool) (*WriterContext, error)
// If local writer not exist, then create new one and store it into engine info writer cache.
// note: operate ei.writeCache map is not thread safe please make sure there is sync mechanism to
// make sure the safe.
func (ei *engineInfo) newWriterContext(workerID int, unique bool) (*WriterContext, error) {
func (ei *EngineInfo) newWriterContext(workerID int, unique bool) (*WriterContext, error) {
lWrite, exist := ei.writerCache.Load(workerID)
if !exist {
var err error
@ -198,7 +213,7 @@ func (ei *engineInfo) newWriterContext(workerID int, unique bool) (*WriterContex
return wc, nil
}
func (ei *engineInfo) closeWriters() error {
func (ei *EngineInfo) closeWriters() error {
var firstErr error
for _, wid := range ei.writerCache.Keys() {
if w, ok := ei.writerCache.Load(wid); ok {

View File

@ -25,19 +25,19 @@ import (
)
// engineManager maintains the mapping from index ID to its EngineInfo,
// together with the memory and disk quota trackers shared by all engines.
type engineManager struct {
	generic.SyncMap[int64, *EngineInfo]
	MemRoot  MemRoot
	DiskRoot DiskRoot
}
// init prepares the engine map and installs the shared memory/disk trackers.
func (m *engineManager) init(memRoot MemRoot, diskRoot DiskRoot) {
	m.SyncMap = generic.NewSyncMap[int64, *EngineInfo](10)
	m.MemRoot = memRoot
	m.DiskRoot = diskRoot
}
// Register create a new engineInfo and register it to the engineManager.
func (m *engineManager) Register(bc *BackendContext, jobID, indexID int64, schemaName, tableName string) (*engineInfo, error) {
// Register create a new EngineInfo and register it to the engineManager.
func (m *engineManager) Register(bc *BackendContext, jobID, indexID int64, schemaName, tableName string) (*EngineInfo, error) {
// Calculate lightning concurrency degree and set memory usage
// and pre-allocate memory usage for worker.
m.MemRoot.RefreshConsumption()
@ -87,7 +87,7 @@ func (m *engineManager) Register(bc *BackendContext, jobID, indexID int64, schem
return en, nil
}
// Unregister deletes the EngineInfo from the engineManager.
func (m *engineManager) Unregister(jobID, indexID int64) {
ei, exist := m.Load(indexID)
if !exist {
@ -101,7 +101,7 @@ func (m *engineManager) Unregister(jobID, indexID int64) {
m.MemRoot.Release(StructSizeEngineInfo)
}
// ResetWorkers resets the writer count of the EngineInfo because
// the goroutines of backfill workers have been terminated.
func (m *engineManager) ResetWorkers(bc *BackendContext, jobID, indexID int64) {
ei, exist := m.Load(indexID)
@ -115,7 +115,7 @@ func (m *engineManager) ResetWorkers(bc *BackendContext, jobID, indexID int64) {
ei.writerCount = 0
}
// UnregisterAll deletes all EngineInfo from the engineManager.
func (m *engineManager) UnregisterAll(jobID int64) {
for _, idxID := range m.Keys() {
m.Unregister(jobID, idxID)

View File

@ -45,7 +45,7 @@ var (
// init pre-computes the static struct sizes used for memory accounting.
func init() {
	StructSizeBackendCtx = int64(unsafe.Sizeof(BackendContext{}))
	StructSizeEngineInfo = int64(unsafe.Sizeof(EngineInfo{}))
	StructSizeWriterCtx = int64(unsafe.Sizeof(WriterContext{}))
}

View File

@ -102,11 +102,11 @@ func (p *WorkerPool[T]) handleTaskWithRecover(w Worker[T], task T) {
}
func (p *WorkerPool[T]) runAWorker() {
w := p.createWorker()
if w == nil {
return // Fail to create worker, quit.
}
p.wg.Run(func() {
w := p.createWorker()
if w == nil {
return // Fail to create worker, quit.
}
for {
select {
case task := <-p.taskChan: