diff --git a/ddl/BUILD.bazel b/ddl/BUILD.bazel
index 56c8ba19dc..41e881af19 100644
--- a/ddl/BUILD.bazel
+++ b/ddl/BUILD.bazel
@@ -86,6 +86,7 @@ go_library(
         "//parser/terror",
         "//parser/types",
         "//privilege",
+        "//resourcemanager/pool/workerpool",
         "//resourcemanager/pooltask",
         "//resourcemanager/util",
         "//sessionctx",
diff --git a/ddl/backfilling.go b/ddl/backfilling.go
index 00763703ca..b20017ca18 100644
--- a/ddl/backfilling.go
+++ b/ddl/backfilling.go
@@ -25,7 +25,6 @@ import (
 
     "github.com/pingcap/errors"
     "github.com/pingcap/failpoint"
-    "github.com/pingcap/tidb/ddl/ingest"
     ddlutil "github.com/pingcap/tidb/ddl/util"
     "github.com/pingcap/tidb/expression"
     "github.com/pingcap/tidb/kv"
@@ -386,11 +385,7 @@ func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask,
 
 func (w *backfillWorker) initPartitionIndexInfo(task *reorgBackfillTask) {
     if pt, ok := w.GetCtx().table.(table.PartitionedTable); ok {
-        switch w := w.backfiller.(type) {
-        case *addIndexTxnWorker:
-            indexInfo := model.FindIndexInfoByID(pt.Meta().Indices, task.bfJob.EleID)
-            w.index = tables.NewIndex(task.bfJob.PhysicalTableID, pt.Meta(), indexInfo)
-        case *addIndexIngestWorker:
+        if w, ok := w.backfiller.(*addIndexTxnWorker); ok {
             indexInfo := model.FindIndexInfoByID(pt.Meta().Indices, task.bfJob.EleID)
             w.index = tables.NewIndex(task.bfJob.PhysicalTableID, pt.Meta(), indexInfo)
         }
@@ -523,24 +518,28 @@ func splitTableRanges(t table.PhysicalTable, store kv.Storage, startKey, endKey
 }
 
 type resultConsumer struct {
-    dc       *ddlCtx
-    wg       *sync.WaitGroup
-    err      error
-    hasError *atomic.Bool
+    dc        *ddlCtx
+    wg        *sync.WaitGroup
+    err       error
+    hasError  *atomic.Bool
+    reorgInfo *reorgInfo   // reorgInfo is used to update the reorg handle.
+    sessPool  *sessionPool // sessPool is used to get the session to update the reorg handle.
 }
 
-func newResultConsumer(dc *ddlCtx) *resultConsumer {
+func newResultConsumer(dc *ddlCtx, reorgInfo *reorgInfo, sessPool *sessionPool) *resultConsumer {
     return &resultConsumer{
-        dc:       dc,
-        wg:       &sync.WaitGroup{},
-        hasError: &atomic.Bool{},
+        dc:        dc,
+        wg:        &sync.WaitGroup{},
+        hasError:  &atomic.Bool{},
+        reorgInfo: reorgInfo,
+        sessPool:  sessPool,
     }
 }
 
-func (s *resultConsumer) run(scheduler *backfillScheduler, start kv.Key, totalAddedCount *int64) {
+func (s *resultConsumer) run(scheduler backfillScheduler, start kv.Key, totalAddedCount *int64) {
     s.wg.Add(1)
     go func() {
-        reorgInfo := scheduler.reorgInfo
+        reorgInfo := s.reorgInfo
         err := consumeResults(scheduler, s, start, totalAddedCount)
         if err != nil {
             logutil.BgLogger().Warn("[ddl] backfill worker handle tasks failed",
@@ -567,11 +566,15 @@ func (s *resultConsumer) shouldAbort() bool {
     return s.hasError.Load()
 }
 
-func consumeResults(scheduler *backfillScheduler, consumer *resultConsumer, start kv.Key, totalAddedCount *int64) error {
+func consumeResults(scheduler backfillScheduler, consumer *resultConsumer, start kv.Key, totalAddedCount *int64) error {
     keeper := newDoneTaskKeeper(start)
     handledTaskCnt := 0
     var firstErr error
-    for result := range scheduler.resultCh {
+    for {
+        result, ok := scheduler.receiveResult()
+        if !ok {
+            return firstErr
+        }
         err := handleOneResult(result, scheduler, consumer, keeper, totalAddedCount, handledTaskCnt)
         handledTaskCnt++
         if err != nil && firstErr == nil {
@@ -579,33 +582,28 @@ func consumeResults(scheduler *backfillScheduler, consumer *resultConsumer, star
             firstErr = err
         }
     }
-    return firstErr
 }
 
-func handleOneResult(result *backfillResult, scheduler *backfillScheduler, consumer *resultConsumer,
+func handleOneResult(result *backfillResult, scheduler backfillScheduler, consumer *resultConsumer,
     keeper *doneTaskKeeper, totalAddedCount *int64, taskSeq int) error {
-    reorgInfo := scheduler.reorgInfo
+    reorgInfo := consumer.reorgInfo
     if result.err != nil {
         logutil.BgLogger().Warn("[ddl] backfill worker failed", zap.Int64("job ID", reorgInfo.ID),
             zap.String("result next key", hex.EncodeToString(result.nextKey)),
             zap.Error(result.err))
-        // Drain tasks to make it quit early.
-        for len(scheduler.taskCh) > 0 {
-            <-scheduler.taskCh
-        }
+        scheduler.drainTasks() // Make it quit early.
         return result.err
     }
     *totalAddedCount += int64(result.addedCount)
+    reorgCtx := consumer.dc.getReorgCtx(reorgInfo.Job.ID)
+    reorgCtx.setRowCount(*totalAddedCount)
     keeper.updateNextKey(result.taskID, result.nextKey)
-    if taskSeq%(scheduler.workerSize()*4) == 0 {
+    if taskSeq%(scheduler.currentWorkerSize()*4) == 0 {
         err := consumer.dc.isReorgRunnable(reorgInfo.ID, false)
         if err != nil {
             logutil.BgLogger().Warn("[ddl] backfill worker is not runnable", zap.Error(err))
-            // Drain tasks to make it quit early.
-            for len(scheduler.taskCh) > 0 {
-                <-scheduler.taskCh
-            }
+            scheduler.drainTasks() // Make it quit early.
             return err
         }
         failpoint.Inject("MockGetIndexRecordErr", func() {
@@ -614,7 +612,7 @@ func handleOneResult(result *backfillResult, scheduler *backfillScheduler, consu
                 time.Sleep(50 * time.Millisecond)
             }
         })
-        err = reorgInfo.UpdateReorgMeta(keeper.nextKey, scheduler.sessPool)
+        err = reorgInfo.UpdateReorgMeta(keeper.nextKey, consumer.sessPool)
         if err != nil {
             logutil.BgLogger().Warn("[ddl] update reorg meta failed",
                 zap.Int64("job ID", reorgInfo.ID), zap.Error(err))
@@ -680,9 +678,9 @@ func getBatchTasks(t table.Table, reorgInfo *reorgInfo, kvRanges []kv.KeyRange,
 }
 
 // sendTasks sends tasks to workers, and returns remaining kvRanges that is not handled.
-func sendTasks(scheduler *backfillScheduler, consumer *resultConsumer, t table.PhysicalTable,
-    kvRanges []kv.KeyRange) ([]kv.KeyRange, error) {
-    batchTasks := getBatchTasks(t, scheduler.reorgInfo, kvRanges, backfillTaskChanSize)
+func sendTasks(scheduler backfillScheduler, consumer *resultConsumer, t table.PhysicalTable,
+    kvRanges []kv.KeyRange, reorgInfo *reorgInfo) ([]kv.KeyRange, error) {
+    batchTasks := getBatchTasks(t, reorgInfo, kvRanges, backfillTaskChanSize)
     if len(batchTasks) == 0 {
         return nil, nil
     }
@@ -691,10 +689,7 @@ func sendTasks(scheduler *backfillScheduler, consumer *resultConsumer, t table.P
         if consumer.shouldAbort() {
             return nil, nil
         }
-        if scheduler.copReqSenderPool != nil {
-            scheduler.copReqSenderPool.sendTask(task)
-        }
-        scheduler.taskCh <- task
+        scheduler.sendTask(task)
     }
 
     if len(batchTasks) < len(kvRanges) {
@@ -710,7 +705,7 @@ var (
     // TestCheckWorkerNumCh use for test adjust backfill worker.
     TestCheckWorkerNumCh = make(chan *sync.WaitGroup)
     // TestCheckWorkerNumber use for test adjust backfill worker.
-    TestCheckWorkerNumber = int32(1)
+    TestCheckWorkerNumber = int32(variable.DefTiDBDDLReorgWorkerCount)
     // TestCheckReorgTimeout is used to mock timeout when reorg data.
     TestCheckReorgTimeout = int32(0)
 )
@@ -759,7 +754,7 @@ func setSessCtxLocation(sctx sessionctx.Context, tzLocation *model.TimeZoneLocat
     return nil
 }
 
-var backfillTaskChanSize = 1024
+var backfillTaskChanSize = 128
 
 // SetBackfillTaskChanSizeForTest is only used for test.
 func SetBackfillTaskChanSizeForTest(n int) {
@@ -786,11 +781,6 @@ func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sessionPool, t table.Physic
     totalAddedCount := job.GetRowCount()
     startKey, endKey := reorgInfo.StartKey, reorgInfo.EndKey
 
-    sessCtx := newContext(reorgInfo.d.store)
-    decodeColMap, err := makeupDecodeColMap(sessCtx, reorgInfo.dbInfo.Name, t)
-    if err != nil {
-        return errors.Trace(err)
-    }
 
     if err := dc.isReorgRunnable(reorgInfo.Job.ID, false); err != nil {
         return errors.Trace(err)
@@ -807,21 +797,21 @@ func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sessionPool, t table.Physic
     })
 
     jc := dc.jobContext(job.ID)
-    scheduler := newBackfillScheduler(dc.ctx, reorgInfo, sessPool, bfWorkerType, t, decodeColMap, jc)
+    sessCtx := newContext(reorgInfo.d.store)
+    scheduler, err := newBackfillScheduler(dc.ctx, reorgInfo, sessPool, bfWorkerType, t, sessCtx, jc)
+    if err != nil {
+        return errors.Trace(err)
+    }
     defer scheduler.close(true)
 
-    var ingestBeCtx *ingest.BackendContext
-    if bfWorkerType == typeAddIndexWorker && reorgInfo.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge {
-        if bc, ok := ingest.LitBackCtxMgr.Load(job.ID); ok {
-            ingestBeCtx = bc
-            defer bc.EngMgr.ResetWorkers(bc, job.ID, reorgInfo.currElement.ID)
-        } else {
-            return errors.New(ingest.LitErrGetBackendFail)
-        }
+    consumer := newResultConsumer(dc, reorgInfo, sessPool)
+    consumer.run(scheduler, startKey, &totalAddedCount)
+
+    err = scheduler.setupWorkers()
+    if err != nil {
+        return errors.Trace(err)
     }
-    consumer := newResultConsumer(dc)
-    consumer.run(scheduler, startKey, &totalAddedCount)
 
     for {
         kvRanges, err := splitTableRanges(t, reorgInfo.d.store, startKey, endKey, backfillTaskChanSize)
         if err != nil {
@@ -831,26 +821,14 @@ func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sessionPool, t table.Physic
             break
         }
 
-        scheduler.setMaxWorkerSize(len(kvRanges))
-        err = scheduler.adjustWorkerSize()
-        if err != nil {
-            return errors.Trace(err)
-        }
-
         logutil.BgLogger().Info("[ddl] start backfill workers to reorg record",
             zap.Stringer("type", bfWorkerType),
-            zap.Int("workerCnt", scheduler.workerSize()),
+            zap.Int("workerCnt", scheduler.currentWorkerSize()),
             zap.Int("regionCnt", len(kvRanges)),
             zap.String("startKey", hex.EncodeToString(startKey)),
             zap.String("endKey", hex.EncodeToString(endKey)))
 
-        if ingestBeCtx != nil {
-            err := ingestBeCtx.Flush(reorgInfo.currElement.ID)
-            if err != nil {
-                return errors.Trace(err)
-            }
-        }
-        remains, err := sendTasks(scheduler, consumer, t, kvRanges)
+        remains, err := sendTasks(scheduler, consumer, t, kvRanges, reorgInfo)
         if err != nil {
             return errors.Trace(err)
         }
diff --git a/ddl/backfilling_scheduler.go b/ddl/backfilling_scheduler.go
index f81046e4ae..65f7789859 100644
--- a/ddl/backfilling_scheduler.go
+++ b/ddl/backfilling_scheduler.go
@@ -20,11 +20,17 @@ import (
     "time"
 
     "github.com/pingcap/errors"
+    "github.com/pingcap/tidb/ddl/ingest"
+    "github.com/pingcap/tidb/metrics"
     "github.com/pingcap/tidb/parser/model"
     "github.com/pingcap/tidb/parser/mysql"
+    "github.com/pingcap/tidb/resourcemanager/pool/workerpool"
+    poolutil "github.com/pingcap/tidb/resourcemanager/util"
     "github.com/pingcap/tidb/sessionctx"
     "github.com/pingcap/tidb/sessionctx/variable"
     "github.com/pingcap/tidb/table"
+    "github.com/pingcap/tidb/util"
+    "github.com/pingcap/tidb/util/dbterror"
    "github.com/pingcap/tidb/util/logutil"
     "github.com/pingcap/tidb/util/mathutil"
     decoder "github.com/pingcap/tidb/util/rowDecoder"
@@ -32,7 +38,26 @@ import (
 )
 
 // backfillScheduler is used to manage the lifetime of backfill workers.
-type backfillScheduler struct {
+type backfillScheduler interface {
+    setupWorkers() error
+    close(force bool)
+
+    sendTask(task *reorgBackfillTask)
+    drainTasks()
+    receiveResult() (*backfillResult, bool)
+
+    currentWorkerSize() int
+    adjustWorkerSize() error
+}
+
+var (
+    _ backfillScheduler = &txnBackfillScheduler{}
+    _ backfillScheduler = &ingestBackfillScheduler{}
+)
+
+const maxBackfillWorkerSize = 16
+
+type txnBackfillScheduler struct {
     ctx       context.Context
     reorgInfo *reorgInfo
     sessPool  *sessionPool
@@ -43,19 +68,29 @@ type backfillScheduler struct {
     workers []*backfillWorker
     wg      sync.WaitGroup
 
-    maxSize  int
     taskCh   chan *reorgBackfillTask
     resultCh chan *backfillResult
     closed   bool
-
-    copReqSenderPool *copReqSenderPool // for add index in ingest way.
 }
 
 func newBackfillScheduler(ctx context.Context, info *reorgInfo, sessPool *sessionPool,
-    tp backfillerType, tbl table.PhysicalTable, decColMap map[int64]decoder.Column,
-    jobCtx *JobContext) *backfillScheduler {
-    return &backfillScheduler{
+    tp backfillerType, tbl table.PhysicalTable, sessCtx sessionctx.Context,
+    jobCtx *JobContext) (backfillScheduler, error) {
+    if tp == typeAddIndexWorker && info.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge {
+        return newIngestBackfillScheduler(ctx, info, tbl), nil
+    }
+    return newTxnBackfillScheduler(ctx, info, sessPool, tp, tbl, sessCtx, jobCtx)
+}
+
+func newTxnBackfillScheduler(ctx context.Context, info *reorgInfo, sessPool *sessionPool,
+    tp backfillerType, tbl table.PhysicalTable, sessCtx sessionctx.Context,
+    jobCtx *JobContext) (backfillScheduler, error) {
+    decColMap, err := makeupDecodeColMap(sessCtx, info.dbInfo.Name, tbl)
+    if err != nil {
+        return nil, err
+    }
+    return &txnBackfillScheduler{
         ctx:       ctx,
         reorgInfo: info,
         sessPool:  sessPool,
@@ -66,11 +101,29 @@ func newBackfillScheduler(ctx context.Context, info *reorgInfo, sessPool *sessio
         workers:   make([]*backfillWorker, 0, variable.GetDDLReorgWorkerCounter()),
         taskCh:    make(chan *reorgBackfillTask, backfillTaskChanSize),
         resultCh:  make(chan *backfillResult, backfillTaskChanSize),
+    }, nil
+}
+
+func (b *txnBackfillScheduler) setupWorkers() error {
+    return b.adjustWorkerSize()
+}
+
+func (b *txnBackfillScheduler) sendTask(task *reorgBackfillTask) {
+    b.taskCh <- task
+}
+
+func (b *txnBackfillScheduler) drainTasks() {
+    for len(b.taskCh) > 0 {
+        <-b.taskCh
     }
 }
 
-func (b *backfillScheduler) newSessCtx() (sessionctx.Context, error) {
-    reorgInfo := b.reorgInfo
+func (b *txnBackfillScheduler) receiveResult() (*backfillResult, bool) {
+    ret, ok := <-b.resultCh
+    return ret, ok
+}
+
+func newSessCtx(reorgInfo *reorgInfo) (sessionctx.Context, error) {
     sessCtx := newContext(reorgInfo.d.store)
     if err := initSessCtx(sessCtx, reorgInfo.ReorgMeta.SQLMode, reorgInfo.ReorgMeta.Location); err != nil {
         return nil, errors.Trace(err)
@@ -106,38 +159,26 @@ func initSessCtx(sessCtx sessionctx.Context, sqlMode mysql.SQLMode, tzLocation *
     return nil
 }
 
-func (b *backfillScheduler) setMaxWorkerSize(maxSize int) {
-    b.maxSize = maxSize
-}
-
-func (b *backfillScheduler) expectedWorkerSize() (readerSize int, writerSize int) {
+func (b *txnBackfillScheduler) expectedWorkerSize() (size int) {
     workerCnt := int(variable.GetDDLReorgWorkerCounter())
-    if b.tp == typeAddIndexWorker && b.reorgInfo.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge {
-        readerSize = mathutil.Min(workerCnt/2, b.maxSize)
-        readerSize = mathutil.Max(readerSize, 1)
-        writerSize = mathutil.Min(workerCnt/2+2, b.maxSize)
-        return readerSize, writerSize
-    }
-    workerCnt = mathutil.Min(workerCnt, b.maxSize)
-    return workerCnt, workerCnt
+    return mathutil.Min(workerCnt, maxBackfillWorkerSize)
 }
 
-func (b *backfillScheduler) workerSize() int {
+func (b *txnBackfillScheduler) currentWorkerSize() int {
     return len(b.workers)
 }
 
-func (b *backfillScheduler) adjustWorkerSize() error {
-    b.initCopReqSenderPool()
+func (b *txnBackfillScheduler) adjustWorkerSize() error {
     reorgInfo := b.reorgInfo
     job := reorgInfo.Job
     jc := b.jobCtx
     if err := loadDDLReorgVars(b.ctx, b.sessPool); err != nil {
         logutil.BgLogger().Error("[ddl] load DDL reorganization variable failed", zap.Error(err))
     }
-    readerCnt, writerCnt := b.expectedWorkerSize()
+    workerCnt := b.expectedWorkerSize()
     // Increase the worker.
-    for i := len(b.workers); i < writerCnt; i++ {
-        sessCtx, err := b.newSessCtx()
+    for i := len(b.workers); i < workerCnt; i++ {
+        sessCtx, err := newSessCtx(b.reorgInfo)
         if err != nil {
             return err
         }
@@ -148,27 +189,13 @@ func (b *backfillScheduler) adjustWorkerSize() error {
         switch b.tp {
         case typeAddIndexWorker:
             backfillCtx := newBackfillCtx(reorgInfo.d, i, sessCtx, job.SchemaName, b.tbl, jc, "add_idx_rate", false)
-            if reorgInfo.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge {
-                idxWorker, err := newAddIndexIngestWorker(b.tbl, backfillCtx,
-                    job.ID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey)
-                if err != nil {
-                    if canSkipError(b.reorgInfo.ID, len(b.workers), err) {
-                        continue
-                    }
-                    return err
-                }
-                idxWorker.copReqSenderPool = b.copReqSenderPool
-                runner = newBackfillWorker(jc.ddlJobCtx, idxWorker)
-                worker = idxWorker
-            } else {
-                idxWorker, err := newAddIndexTxnWorker(b.decodeColMap, b.tbl, backfillCtx,
-                    job.ID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey)
-                if err != nil {
-                    return err
-                }
-                runner = newBackfillWorker(jc.ddlJobCtx, idxWorker)
-                worker = idxWorker
+            idxWorker, err := newAddIndexTxnWorker(b.decodeColMap, b.tbl, backfillCtx,
+                job.ID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey)
+            if err != nil {
+                return err
             }
+            runner = newBackfillWorker(jc.ddlJobCtx, idxWorker)
+            worker = idxWorker
         case typeAddIndexMergeTmpWorker:
             backfillCtx := newBackfillCtx(reorgInfo.d, i, sessCtx, job.SchemaName, b.tbl, jc, "merge_tmp_idx_rate", false)
             tmpIdxWorker := newMergeTempIndexWorker(backfillCtx, b.tbl, reorgInfo.currElement.ID)
@@ -202,59 +229,18 @@ func (b *backfillScheduler) adjustWorkerSize() error {
         go runner.run(reorgInfo.d, worker, job)
     }
     // Decrease the worker.
-    if len(b.workers) > writerCnt {
-        workers := b.workers[writerCnt:]
-        b.workers = b.workers[:writerCnt]
+    if len(b.workers) > workerCnt {
+        workers := b.workers[workerCnt:]
+        b.workers = b.workers[:workerCnt]
         closeBackfillWorkers(workers)
     }
-    if b.copReqSenderPool != nil {
-        b.copReqSenderPool.adjustSize(readerCnt)
-    }
     return injectCheckBackfillWorkerNum(len(b.workers), b.tp == typeAddIndexMergeTmpWorker)
 }
 
-func (b *backfillScheduler) initCopReqSenderPool() {
-    if b.tp != typeAddIndexWorker || b.reorgInfo.ReorgMeta.ReorgTp != model.ReorgTypeLitMerge ||
-        b.copReqSenderPool != nil || len(b.workers) > 0 {
-        return
-    }
-    indexInfo := model.FindIndexInfoByID(b.tbl.Meta().Indices, b.reorgInfo.currElement.ID)
-    if indexInfo == nil {
-        logutil.BgLogger().Warn("[ddl-ingest] cannot init cop request sender",
-            zap.Int64("table ID", b.tbl.Meta().ID), zap.Int64("index ID", b.reorgInfo.currElement.ID))
-        return
-    }
-    sessCtx, err := b.newSessCtx()
-    if err != nil {
-        logutil.BgLogger().Warn("[ddl-ingest] cannot init cop request sender", zap.Error(err))
-        return
-    }
-    copCtx, err := newCopContext(b.tbl.Meta(), indexInfo, sessCtx)
-    if err != nil {
-        logutil.BgLogger().Warn("[ddl-ingest] cannot init cop request sender", zap.Error(err))
-        return
-    }
-    b.copReqSenderPool = newCopReqSenderPool(b.ctx, copCtx, sessCtx.GetStore())
-}
-
-func canSkipError(jobID int64, workerCnt int, err error) bool {
-    if workerCnt > 0 {
-        // The error can be skipped because the rest workers can handle the tasks.
-        return true
-    }
-    logutil.BgLogger().Warn("[ddl] create add index backfill worker failed",
-        zap.Int("current worker count", workerCnt),
-        zap.Int64("job ID", jobID), zap.Error(err))
-    return false
-}
-
-func (b *backfillScheduler) close(force bool) {
+func (b *txnBackfillScheduler) close(force bool) {
     if b.closed {
         return
     }
-    if b.copReqSenderPool != nil {
-        b.copReqSenderPool.close(force)
-    }
     close(b.taskCh)
     if force {
         closeBackfillWorkers(b.workers)
@@ -263,3 +249,220 @@ func (b *backfillScheduler) close(force bool) {
     close(b.resultCh)
     b.closed = true
 }
+
+type ingestBackfillScheduler struct {
+    ctx       context.Context
+    reorgInfo *reorgInfo
+    tbl       table.PhysicalTable
+
+    closed bool
+
+    taskCh   chan *reorgBackfillTask
+    resultCh chan *backfillResult
+
+    copReqSenderPool *copReqSenderPool
+
+    writerPool  *workerpool.WorkerPool[idxRecResult]
+    writerMaxID int
+    poolErr     chan error
+    backendCtx  *ingest.BackendContext
+}
+
+func newIngestBackfillScheduler(ctx context.Context, info *reorgInfo, tbl table.PhysicalTable) *ingestBackfillScheduler {
+    return &ingestBackfillScheduler{
+        ctx:       ctx,
+        reorgInfo: info,
+        tbl:       tbl,
+        taskCh:    make(chan *reorgBackfillTask, backfillTaskChanSize),
+        resultCh:  make(chan *backfillResult, backfillTaskChanSize),
+        poolErr:   make(chan error),
+    }
+}
+
+func (b *ingestBackfillScheduler) setupWorkers() error {
+    job := b.reorgInfo.Job
+    bc, ok := ingest.LitBackCtxMgr.Load(job.ID)
+    if !ok {
+        logutil.BgLogger().Error(ingest.LitErrGetBackendFail, zap.Int64("job ID", job.ID))
+        return errors.Trace(errors.New("cannot get lightning backend"))
+    }
+    b.backendCtx = bc
+    copReqSenderPool, err := b.createCopReqSenderPool()
+    if err != nil {
+        return errors.Trace(err)
+    }
+    b.copReqSenderPool = copReqSenderPool
+    readerCnt, writerCnt := b.expectedWorkerSize()
+    skipReg := workerpool.OptionSkipRegister[idxRecResult]{}
+    writerPool, err := workerpool.NewWorkerPool[idxRecResult]("ingest_writer",
+        poolutil.DDL, writerCnt, b.createWorker, skipReg)
+    if err != nil {
+        return errors.Trace(err)
+    }
+    b.writerPool = writerPool
+    b.copReqSenderPool.chunkSender = writerPool
+    b.copReqSenderPool.adjustSize(readerCnt)
+    return nil
+}
+
+func (b *ingestBackfillScheduler) close(force bool) {
+    if b.closed {
+        return
+    }
+    close(b.taskCh)
+    b.copReqSenderPool.close(force)
+    b.writerPool.ReleaseAndWait()
+    close(b.resultCh)
+    if !force {
+        jobID := b.reorgInfo.ID
+        indexID := b.reorgInfo.currElement.ID
+        if bc, ok := ingest.LitBackCtxMgr.Load(jobID); ok {
+            bc.EngMgr.ResetWorkers(bc, jobID, indexID)
+        }
+    }
+    b.closed = true
+}
+
+func (b *ingestBackfillScheduler) sendTask(task *reorgBackfillTask) {
+    b.taskCh <- task
+}
+
+func (b *ingestBackfillScheduler) drainTasks() {
+    for len(b.taskCh) > 0 {
+        <-b.taskCh
+    }
+}
+
+func (b *ingestBackfillScheduler) receiveResult() (*backfillResult, bool) {
+    select {
+    case err := <-b.poolErr:
+        return &backfillResult{err: err}, true
+    case rs, ok := <-b.resultCh:
+        return rs, ok
+    }
+}
+
+func (b *ingestBackfillScheduler) currentWorkerSize() int {
+    return int(b.writerPool.Cap())
+}
+
+func (b *ingestBackfillScheduler) adjustWorkerSize() error {
+    readerCnt, writer := b.expectedWorkerSize()
+    b.writerPool.Tune(int32(writer))
+    b.copReqSenderPool.adjustSize(readerCnt)
+    return nil
+}
+
+func (b *ingestBackfillScheduler) createWorker() workerpool.Worker[idxRecResult] {
+    reorgInfo := b.reorgInfo
+    job := reorgInfo.Job
+    sessCtx, err := newSessCtx(reorgInfo)
+    if err != nil {
+        b.poolErr <- err
+        return nil
+    }
+    bcCtx := b.backendCtx
+    ei, err := bcCtx.EngMgr.Register(bcCtx, job.ID, b.reorgInfo.currElement.ID, job.SchemaName, job.TableName)
+    if err != nil {
+        // Return an error only if it is the first worker.
+        if b.writerMaxID == 0 {
+            b.poolErr <- err
+            return nil
+        }
+        logutil.BgLogger().Warn("[ddl-ingest] cannot create new writer", zap.Error(err),
+            zap.Int64("job ID", reorgInfo.ID), zap.Int64("index ID", b.reorgInfo.currElement.ID))
+        return nil
+    }
+    worker, err := newAddIndexIngestWorker(b.tbl, reorgInfo.d, ei, b.resultCh, job.ID,
+        reorgInfo.SchemaName, b.reorgInfo.currElement.ID, b.writerMaxID, b.copReqSenderPool, sessCtx)
+    if err != nil {
+        // Return an error only if it is the first worker.
+        if b.writerMaxID == 0 {
+            b.poolErr <- err
+            return nil
+        }
+        logutil.BgLogger().Warn("[ddl-ingest] cannot create new writer", zap.Error(err),
+            zap.Int64("job ID", reorgInfo.ID), zap.Int64("index ID", b.reorgInfo.currElement.ID))
+        return nil
+    }
+    b.writerMaxID++
+    return worker
+}
+
+func (b *ingestBackfillScheduler) createCopReqSenderPool() (*copReqSenderPool, error) {
+    indexInfo := model.FindIndexInfoByID(b.tbl.Meta().Indices, b.reorgInfo.currElement.ID)
+    if indexInfo == nil {
+        logutil.BgLogger().Warn("[ddl-ingest] cannot init cop request sender",
+            zap.Int64("table ID", b.tbl.Meta().ID), zap.Int64("index ID", b.reorgInfo.currElement.ID))
+        return nil, errors.New("cannot find index info")
+    }
+    sessCtx, err := newSessCtx(b.reorgInfo)
+    if err != nil {
+        logutil.BgLogger().Warn("[ddl-ingest] cannot init cop request sender", zap.Error(err))
+        return nil, err
+    }
+    copCtx, err := newCopContext(b.tbl.Meta(), indexInfo, sessCtx)
+    if err != nil {
+        logutil.BgLogger().Warn("[ddl-ingest] cannot init cop request sender", zap.Error(err))
+        return nil, err
+    }
+    return newCopReqSenderPool(b.ctx, copCtx, sessCtx.GetStore(), b.taskCh), nil
+}
+
+func (b *ingestBackfillScheduler) expectedWorkerSize() (readerSize int, writerSize int) {
+    workerCnt := int(variable.GetDDLReorgWorkerCounter())
+    readerSize = mathutil.Min(workerCnt/2, maxBackfillWorkerSize)
+    readerSize = mathutil.Max(readerSize, 1)
+    writerSize = mathutil.Min(workerCnt/2+2, maxBackfillWorkerSize)
+    return readerSize, writerSize
+}
+
+func (w *addIndexIngestWorker) HandleTask(rs idxRecResult) {
+    defer util.Recover(metrics.LabelDDL, "ingestWorker.HandleTask", func() {
+        w.resultCh <- &backfillResult{taskID: rs.id, err: dbterror.ErrReorgPanic}
+    }, false)
+
+    result := &backfillResult{
+        taskID: rs.id,
+        err:    rs.err,
+    }
+    if result.err != nil {
+        logutil.BgLogger().Error("[ddl-ingest] finish a cop-request task with error",
+            zap.Int("id", rs.id), zap.Error(rs.err))
+        w.resultCh <- result
+        return
+    }
+    err := w.d.isReorgRunnable(w.jobID, false)
+    if err != nil {
+        result.err = err
+        w.resultCh <- result
+        return
+    }
+    count, nextKey, err := w.WriteLocal(&rs)
+    if err != nil {
+        result.err = err
+        w.resultCh <- result
+        return
+    }
+    if count == 0 {
+        logutil.BgLogger().Info("[ddl-ingest] finish a cop-request task", zap.Int("id", rs.id))
+        if bc, ok := ingest.LitBackCtxMgr.Load(w.jobID); ok {
+            err := bc.Flush(w.index.Meta().ID)
+            if err != nil {
+                result.err = err
+                w.resultCh <- result
+            }
+        }
+        return
+    }
+    result.scanCount = count
+    result.addedCount = count
+    result.nextKey = nextKey
+    w.metricCounter.Add(float64(count))
+    if ResultCounterForTest != nil && result.err == nil {
+        ResultCounterForTest.Add(1)
+    }
+    w.resultCh <- result
+}
+
+func (w *addIndexIngestWorker) Close() {}
diff --git a/ddl/dist_backfilling.go b/ddl/dist_backfilling.go
index 2350310cd6..46a7cc5e1f 100644
--- a/ddl/dist_backfilling.go
+++ b/ddl/dist_backfilling.go
@@ -157,10 +157,6 @@ func newBackfillWorkerContext(d *ddl, schemaName string, tbl table.Table, worker
         var bf backfiller
         bf, err = bfFunc(newBackfillCtx(d.ddlCtx, 0, se, schemaName, tbl, d.jobContext(jobID), "add_idx_rate", true))
         if err != nil {
-            if canSkipError(jobID, len(bwCtx.backfillWorkers), err) {
-                err = nil
-                continue
-            }
             logutil.BgLogger().Error("[ddl] new backfill worker context, do bfFunc failed", zap.Int64("jobID", jobID), zap.Error(err))
             return nil, errors.Trace(err)
         }
diff --git a/ddl/export_test.go b/ddl/export_test.go
index bce1904607..71ee731356 100644
--- a/ddl/export_test.go
+++ b/ddl/export_test.go
@@ -32,8 +32,16 @@ func SetBatchInsertDeleteRangeSize(i int) {
 
 var NewCopContext4Test = newCopContext
 
+type resultChanForTest struct {
+    ch chan idxRecResult
+}
+
+func (r *resultChanForTest) AddTask(rs idxRecResult) {
+    r.ch <- rs
+}
+
 func FetchChunk4Test(copCtx *copContext, tbl table.PhysicalTable, startKey, endKey kv.Key, store kv.Storage,
-    batchSize int) (*chunk.Chunk, error) {
+    batchSize int) *chunk.Chunk {
     variable.SetDDLReorgBatchSize(int32(batchSize))
     task := &reorgBackfillTask{
         id:            1,
@@ -41,12 +49,16 @@ func FetchChunk4Test(copCtx *copContext, tbl table.PhysicalTable, startKey, endK
         endKey:        endKey,
         physicalTable: tbl,
     }
-    pool := newCopReqSenderPool(context.Background(), copCtx, store)
+    taskCh := make(chan *reorgBackfillTask, 5)
+    resultCh := make(chan idxRecResult, 5)
+    pool := newCopReqSenderPool(context.Background(), copCtx, store, taskCh)
+    pool.chunkSender = &resultChanForTest{ch: resultCh}
     pool.adjustSize(1)
     pool.tasksCh <- task
-    copChunk, err := pool.fetchChunk()
+    rs := <-resultCh
+    close(taskCh)
     pool.close(false)
-    return copChunk, err
+    return rs.chunk
 }
 
 func ConvertRowToHandleAndIndexDatum(row chunk.Row, copCtx *copContext) (kv.Handle, []types.Datum, error) {
diff --git a/ddl/index.go b/ddl/index.go
index a289980733..a3ea894cd6 100644
--- a/ddl/index.go
+++ b/ddl/index.go
@@ -51,6 +51,7 @@ import (
     "github.com/pingcap/tidb/util/dbterror"
     "github.com/pingcap/tidb/util/logutil"
     decoder "github.com/pingcap/tidb/util/rowDecoder"
+    "github.com/prometheus/client_golang/prometheus"
     "github.com/tikv/client-go/v2/oracle"
     "github.com/tikv/client-go/v2/tikv"
     "go.uber.org/zap"
@@ -1665,158 +1666,86 @@ func (w *addIndexTxnWorker) batchCheckUniqueKey(txn kv.Transaction, idxRecords [
 }
 
 type addIndexIngestWorker struct {
-    *backfillCtx
+    d             *ddlCtx
+    metricCounter prometheus.Counter
+    sessCtx       sessionctx.Context
+    tbl           table.PhysicalTable
 
     index            table.Index
     writerCtx        *ingest.WriterContext
     copReqSenderPool *copReqSenderPool
-    done             bool
+
+    resultCh chan *backfillResult
+    jobID    int64
 }
 
-func newAddIndexIngestWorker(t table.PhysicalTable, bfCtx *backfillCtx, jobID, eleID int64, eleTypeKey []byte) (*addIndexIngestWorker, error) {
-    if !bytes.Equal(eleTypeKey, meta.IndexElementKey) {
-        logutil.BgLogger().Error("Element type for addIndexIngestWorker incorrect",
-            zap.Int64("job ID", jobID), zap.ByteString("element type", eleTypeKey), zap.Int64("element ID", eleID))
-        return nil, errors.Errorf("element type is not index, typeKey: %v", eleTypeKey)
-    }
-    indexInfo := model.FindIndexInfoByID(t.Meta().Indices, eleID)
+func newAddIndexIngestWorker(t table.PhysicalTable, d *ddlCtx, ei *ingest.EngineInfo,
+    resultCh chan *backfillResult, jobID int64, schemaName string, indexID int64, writerID int,
+    copReqSenderPool *copReqSenderPool, sessCtx sessionctx.Context) (*addIndexIngestWorker, error) {
+    indexInfo := model.FindIndexInfoByID(t.Meta().Indices, indexID)
     index := tables.NewIndex(t.GetPhysicalID(), t.Meta(), indexInfo)
-
-    var lwCtx *ingest.WriterContext
-    bc, ok := ingest.LitBackCtxMgr.Load(jobID)
-    if !ok {
-        return nil, errors.Trace(errors.New(ingest.LitErrGetBackendFail))
-    }
-    ei, err := bc.EngMgr.Register(bc, jobID, eleID, bfCtx.schemaName, t.Meta().Name.O)
-    if err != nil {
-        return nil, errors.Trace(err)
-    }
-    lwCtx, err = ei.NewWriterCtx(bfCtx.id, indexInfo.Unique)
+    lwCtx, err := ei.NewWriterCtx(writerID, indexInfo.Unique)
     if err != nil {
         return nil, err
     }
 
     return &addIndexIngestWorker{
-        backfillCtx: bfCtx,
-        index:       index,
-        writerCtx:   lwCtx,
+        d:                d,
+        sessCtx:          sessCtx,
+        metricCounter: metrics.BackfillTotalCounter.WithLabelValues(
+            metrics.GenerateReorgLabel("add_idx_rate", schemaName, t.Meta().Name.O)),
+        tbl:              t,
+        index:            index,
+        writerCtx:        lwCtx,
+        copReqSenderPool: copReqSenderPool,
+        resultCh:         resultCh,
+        jobID:            jobID,
     }, nil
 }
 
-func (w *addIndexIngestWorker) AddMetricInfo(count float64) {
-    if w.done {
-        return
+// WriteLocal will write index records to lightning engine.
+func (w *addIndexIngestWorker) WriteLocal(rs *idxRecResult) (count int, nextKey kv.Key, err error) {
+    oprStartTime := time.Now()
+    copCtx := w.copReqSenderPool.copCtx
+    vars := w.sessCtx.GetSessionVars()
+    cnt, lastHandle, err := writeChunkToLocal(w.writerCtx, w.index, copCtx, vars, rs.chunk)
+    if err != nil || cnt == 0 {
+        w.copReqSenderPool.recycleChunk(rs.chunk)
+        return 0, nil, err
     }
-    w.metricCounter.Add(count)
-}
-
-func (*addIndexIngestWorker) GetTasks() ([]*BackfillJob, error) {
-    return nil, nil
-}
-func (w *addIndexIngestWorker) UpdateTask(bfJob *BackfillJob) error {
-    s := newSession(w.backfillCtx.sessCtx)
-
-    return s.runInTxn(func(se *session) error {
-        jobs, err := GetBackfillJobs(se, BackgroundSubtaskTable, fmt.Sprintf("task_key = '%s'", bfJob.keyString()), "update_backfill_task")
-        if err != nil {
-            return err
-        }
-        if len(jobs) == 0 {
-            return dbterror.ErrDDLJobNotFound.FastGen("get zero backfill job")
-        }
-        if jobs[0].InstanceID != bfJob.InstanceID {
-            return dbterror.ErrDDLJobNotFound.FastGenByArgs(fmt.Sprintf("get a backfill job %v, want instance ID %s", jobs[0], bfJob.InstanceID))
-        }
-
-        currTime, err := GetOracleTimeWithStartTS(se)
-        if err != nil {
-            return err
-        }
-        bfJob.InstanceLease = GetLeaseGoTime(currTime, InstanceLease)
-        return updateBackfillJob(se, BackgroundSubtaskTable, bfJob, "update_backfill_task")
-    })
-}
-func (w *addIndexIngestWorker) FinishTask(bfJob *BackfillJob) error {
-    s := newSession(w.backfillCtx.sessCtx)
-    return s.runInTxn(func(se *session) error {
-        txn, err := se.txn()
-        if err != nil {
-            return errors.Trace(err)
-        }
-        bfJob.StateUpdateTS = txn.StartTS()
-        err = RemoveBackfillJob(se, false, bfJob)
-        if err != nil {
-            return err
-        }
-        return AddBackfillHistoryJob(se, []*BackfillJob{bfJob})
-    })
-}
-func (w *addIndexIngestWorker) GetCtx() *backfillCtx {
-    return w.backfillCtx
-}
-
-func (*addIndexIngestWorker) String() string {
-    return "ingest index"
-}
-
-// BackfillData will ingest index records through lightning engine.
-func (w *addIndexIngestWorker) BackfillData(handleRange reorgBackfillTask) (taskCtx backfillTaskContext, err error) {
-    taskCtx.nextKey = handleRange.startKey
-    var total int
-    for {
-        oprStartTime := time.Now()
-        copChunk, err := w.copReqSenderPool.fetchChunk()
-        if err != nil {
-            return taskCtx, err
-        }
-        if copChunk == nil {
-            // No more chunks.
-            break
-        }
-        copCtx := w.copReqSenderPool.copCtx
-        vars := w.sessCtx.GetSessionVars()
-        cnt, err := writeChunkToLocal(w.writerCtx, w.index, copCtx, vars, copChunk)
-        if err != nil {
-            w.copReqSenderPool.recycleChunk(copChunk)
-            return taskCtx, err
-        }
-        total += cnt
-        w.copReqSenderPool.recycleChunk(copChunk)
-        w.AddMetricInfo(float64(cnt))
-        logSlowOperations(time.Since(oprStartTime), "writeChunkToLocal", 3000)
-    }
-    taskCtx.scanCount = total
-    taskCtx.addedCount = total
-    taskCtx.nextKey = handleRange.endKey
-    taskCtx.done = true
-    w.done = true
-    return taskCtx, nil
+    w.copReqSenderPool.recycleChunk(rs.chunk)
+    w.metricCounter.Add(float64(cnt))
+    logSlowOperations(time.Since(oprStartTime), "writeChunkToLocal", 3000)
+    nextKey = tablecodec.EncodeRecordKey(w.tbl.RecordPrefix(), lastHandle)
+    return cnt, nextKey, nil
 }
 
 func writeChunkToLocal(writerCtx *ingest.WriterContext,
     index table.Index, copCtx *copContext, vars *variable.SessionVars,
-    copChunk *chunk.Chunk) (int, error) {
+    copChunk *chunk.Chunk) (int, kv.Handle, error) {
     sCtx, writeBufs := vars.StmtCtx, vars.GetWriteStmtBufs()
     iter := chunk.NewIterator4Chunk(copChunk)
     idxDataBuf := make([]types.Datum, len(copCtx.idxColOutputOffsets))
     handleDataBuf := make([]types.Datum, len(copCtx.handleOutputOffsets))
     count := 0
+    var lastHandle kv.Handle
     for row := iter.Begin(); row != iter.End(); row = iter.Next() {
         idxDataBuf, handleDataBuf = idxDataBuf[:0], handleDataBuf[:0]
         idxDataBuf = extractDatumByOffsets(row, copCtx.idxColOutputOffsets, copCtx.expColInfos, idxDataBuf)
         handleDataBuf := extractDatumByOffsets(row, copCtx.handleOutputOffsets, copCtx.expColInfos, handleDataBuf)
         handle, err := buildHandle(handleDataBuf, copCtx.tblInfo, copCtx.pkInfo, sCtx)
         if err != nil {
-            return 0, errors.Trace(err)
+            return 0, nil, errors.Trace(err)
         }
         rsData := getRestoreData(copCtx.tblInfo, copCtx.idxInfo, copCtx.pkInfo, handleDataBuf)
         err = writeOneKVToLocal(writerCtx, index, sCtx, writeBufs, idxDataBuf, rsData, handle)
         if err != nil {
-            return 0, errors.Trace(err)
+            return 0, nil, errors.Trace(err)
         }
         count++
+        lastHandle = handle
     }
-    return count, nil
+    return count, lastHandle, nil
 }
 
 func writeOneKVToLocal(writerCtx *ingest.WriterContext,
diff --git a/ddl/index_cop.go b/ddl/index_cop.go
index 33440eeb15..a946af916a 100644
--- a/ddl/index_cop.go
+++ b/ddl/index_cop.go
@@ -59,23 +59,14 @@ func copReadChunkPoolSize() int {
     return 10 * int(variable.GetDDLReorgWorkerCounter())
 }
 
-func (c *copReqSenderPool) fetchChunk() (*chunk.Chunk, error) {
-    rs, ok := <-c.resultsCh
-    if !ok {
-        return nil, nil
-    }
-    if rs.err != nil {
-        logutil.BgLogger().Error("[ddl-ingest] finish a cop-request task with error",
-            zap.Int("id", rs.id), zap.Error(rs.err))
-    } else if rs.done {
-        logutil.BgLogger().Info("[ddl-ingest] finish a cop-request task", zap.Int("id", rs.id))
-    }
-    return rs.chunk, rs.err
+// chunkSender is used to receive the result of coprocessor request.
+type chunkSender interface {
+    AddTask(idxRecResult)
 }
 
 type copReqSenderPool struct {
-    tasksCh   chan *reorgBackfillTask
-    resultsCh chan idxRecResult
+    tasksCh     chan *reorgBackfillTask
+    chunkSender chunkSender
 
     ctx    context.Context
     copCtx *copContext
@@ -100,7 +91,7 @@ func (c *copReqSender) run() {
     defer p.wg.Done()
     var curTaskID int
     defer util.Recover(metrics.LabelDDL, "copReqSender.run", func() {
-        p.resultsCh <- idxRecResult{id: curTaskID, err: dbterror.ErrReorgPanic}
+        p.chunkSender.AddTask(idxRecResult{id: curTaskID, err: dbterror.ErrReorgPanic})
     }, false)
     for {
         if util.HasCancelled(c.ctx) {
@@ -115,12 +106,12 @@ func (c *copReqSender) run() {
             zap.Int("id", task.id), zap.String("task", task.String()))
         ver, err := p.store.CurrentVersion(kv.GlobalTxnScope)
         if err != nil {
-            p.resultsCh <- idxRecResult{id: task.id, err: err}
+            p.chunkSender.AddTask(idxRecResult{id: task.id, err: err})
             return
         }
         rs, err := p.copCtx.buildTableScan(p.ctx, ver.Ver, task.startKey, task.excludedEndKey())
         if err != nil {
-            p.resultsCh <- idxRecResult{id: task.id, err: err}
+            p.chunkSender.AddTask(idxRecResult{id: task.id, err: err})
             return
         }
         failpoint.Inject("MockCopSenderPanic", func(val failpoint.Value) {
@@ -133,26 +124,26 @@ func (c *copReqSender) run() {
             srcChk := p.getChunk()
             done, err = p.copCtx.fetchTableScanResult(p.ctx, rs, srcChk)
             if err != nil {
-                p.resultsCh <- idxRecResult{id: task.id, err: err}
+                p.chunkSender.AddTask(idxRecResult{id: task.id, err: err})
                 p.recycleChunk(srcChk)
                 terror.Call(rs.Close)
                 return
             }
-            p.resultsCh <- idxRecResult{id: task.id, chunk: srcChk, done: done}
+            p.chunkSender.AddTask(idxRecResult{id: task.id, chunk: srcChk, done: done})
         }
         terror.Call(rs.Close)
     }
 }
 
-func newCopReqSenderPool(ctx context.Context, copCtx *copContext, store kv.Storage) *copReqSenderPool {
+func newCopReqSenderPool(ctx context.Context, copCtx *copContext, store kv.Storage,
+    taskCh chan *reorgBackfillTask) *copReqSenderPool {
     poolSize := copReadChunkPoolSize()
     srcChkPool := make(chan *chunk.Chunk, poolSize)
     for i := 0; i < poolSize; i++ {
         srcChkPool <- chunk.NewChunkWithCapacity(copCtx.fieldTps, copReadBatchSize())
     }
     return &copReqSenderPool{
-        tasksCh:   make(chan *reorgBackfillTask, backfillTaskChanSize),
-        resultsCh: make(chan idxRecResult, backfillTaskChanSize),
+        tasksCh: taskCh,
         ctx:     ctx,
         copCtx:  copCtx,
         store:   store,
@@ -162,10 +153,6 @@ func newCopReqSenderPool(ctx context.Context, copCtx *copContext, store kv.Stora
     }
 }
 
-func (c *copReqSenderPool) sendTask(task *reorgBackfillTask) {
-    c.tasksCh <- task
-}
-
 func (c *copReqSenderPool) adjustSize(n int) {
     // Add some senders.
     for i := len(c.senders); i < n; i++ {
@@ -192,7 +179,6 @@ func (c *copReqSenderPool) close(force bool) {
         return
     }
     logutil.BgLogger().Info("[ddl-ingest] close cop-request sender pool")
-    close(c.tasksCh)
     if force {
         for _, w := range c.senders {
             w.cancel()
@@ -200,7 +186,6 @@ func (c *copReqSenderPool) close(force bool) {
     }
     // Wait for all cop-req senders to exit.
     c.wg.Wait()
-    close(c.resultsCh)
     c.closed = true
 }
 
diff --git a/ddl/index_cop_test.go b/ddl/index_cop_test.go
index 17799c85af..239fcbcfd7 100644
--- a/ddl/index_cop_test.go
+++ b/ddl/index_cop_test.go
@@ -45,7 +45,7 @@ func TestAddIndexFetchRowsFromCoprocessor(t *testing.T) {
         endKey := startKey.PrefixNext()
         txn, err := store.Begin()
         require.NoError(t, err)
-        copChunk, err := ddl.FetchChunk4Test(copCtx, tbl.(table.PhysicalTable), startKey, endKey, store, 10)
+        copChunk := ddl.FetchChunk4Test(copCtx, tbl.(table.PhysicalTable), startKey, endKey, store, 10)
         require.NoError(t, err)
         require.NoError(t, txn.Rollback())
 
diff --git a/ddl/index_merge_tmp.go b/ddl/index_merge_tmp.go
index 817f3dcc6c..8da174d07d 100644
--- a/ddl/index_merge_tmp.go
+++ b/ddl/index_merge_tmp.go
@@ -139,7 +139,7 @@ func newMergeTempIndexWorker(bfCtx *backfillCtx, t table.PhysicalTable, eleID in
     }
 }
 
-// BackfillDataInTxn merge temp index data in txn.
+// BackfillData merge temp index data in txn.
 func (w *mergeIndexWorker) BackfillData(taskRange reorgBackfillTask) (taskCtx backfillTaskContext, errInTxn error) {
     oprStartTime := time.Now()
     ctx := kv.WithInternalSourceType(context.Background(), w.jobContext.ddlJobSourceType())
diff --git a/ddl/ingest/backend.go b/ddl/ingest/backend.go
index eacff499dc..beb3d529b3 100644
--- a/ddl/ingest/backend.go
+++ b/ddl/ingest/backend.go
@@ -89,7 +89,9 @@ func (bc *BackendContext) Flush(indexID int64) error {
         return err
     }
 
-    if bc.diskRoot.CurrentUsage() >= uint64(importThreshold*float64(bc.diskRoot.MaxQuota())) {
+    release := ei.AcquireFlushLock()
+    if release != nil && bc.diskRoot.CurrentUsage() >= uint64(importThreshold*float64(bc.diskRoot.MaxQuota())) {
+        defer release()
         // TODO: it should be changed according checkpoint solution.
         // Flush writer cached data into local disk for engine first.
         err := ei.Flush()
diff --git a/ddl/ingest/engine.go b/ddl/ingest/engine.go
index e154c42ffb..3d894f867f 100644
--- a/ddl/ingest/engine.go
+++ b/ddl/ingest/engine.go
@@ -30,9 +30,9 @@ import (
     "go.uber.org/zap"
 )
 
-// One engine for one index reorg task, each task will create several new writers under the
-// Opened Engine. Note engineInfo is not thread safe.
-type engineInfo struct {
+// EngineInfo is the engine for one index reorg task, each task will create several new writers under the
+// Opened Engine. Note EngineInfo is not thread safe.
+type EngineInfo struct {
     ctx     context.Context
     jobID   int64
     indexID int64
@@ -44,12 +44,13 @@ type engineInfo struct {
     memRoot  MemRoot
     diskRoot DiskRoot
     rowSeq   atomic.Int64
+    flushing atomic.Bool
 }
 
 // NewEngineInfo create a new EngineInfo struct.
 func NewEngineInfo(ctx context.Context, jobID, indexID int64, cfg *backend.EngineConfig,
-    en *backend.OpenedEngine, uuid uuid.UUID, wCnt int, memRoot MemRoot, diskRoot DiskRoot) *engineInfo {
-    return &engineInfo{
+    en *backend.OpenedEngine, uuid uuid.UUID, wCnt int, memRoot MemRoot, diskRoot DiskRoot) *EngineInfo {
+    return &EngineInfo{
         ctx:     ctx,
         jobID:   jobID,
         indexID: indexID,
@@ -64,7 +65,7 @@ func NewEngineInfo(ctx context.Context, jobID, indexID int64, cfg *backend.Engin
 }
 
 // Flush imports all the key-values in engine to the storage.
-func (ei *engineInfo) Flush() error {
+func (ei *EngineInfo) Flush() error {
     err := ei.openedEngine.Flush(ei.ctx)
     if err != nil {
         logutil.BgLogger().Error(LitErrFlushEngineErr, zap.Error(err),
@@ -74,7 +75,19 @@ func (ei *engineInfo) Flush() error {
     return nil
 }
 
-func (ei *engineInfo) Clean() {
+// AcquireFlushLock acquires the flush lock of the engine.
+func (ei *EngineInfo) AcquireFlushLock() (release func()) {
+    ok := ei.flushing.CompareAndSwap(false, true)
+    if !ok {
+        return nil
+    }
+    return func() {
+        ei.flushing.Store(false)
+    }
+}
+
+// Clean closes the engine and removes the local intermediate files.
+func (ei *EngineInfo) Clean() {
     if ei.openedEngine == nil {
         return
     }
@@ -98,7 +111,8 @@ func (ei *engineInfo) Clean() {
     }
 }
 
-func (ei *engineInfo) ImportAndClean() error {
+// ImportAndClean imports the engine data to TiKV and cleans up the local intermediate files.
+func (ei *EngineInfo) ImportAndClean() error {
     // Close engine and finish local tasks of lightning.
     logutil.BgLogger().Info(LitInfoCloseEngine, zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID))
     indexEngine := ei.openedEngine
@@ -151,7 +165,8 @@ type WriterContext struct {
     lWrite *backend.LocalEngineWriter
 }
 
-func (ei *engineInfo) NewWriterCtx(id int, unique bool) (*WriterContext, error) {
+// NewWriterCtx creates a new WriterContext.
+func (ei *EngineInfo) NewWriterCtx(id int, unique bool) (*WriterContext, error) {
     ei.memRoot.RefreshConsumption()
     ok := ei.memRoot.CheckConsume(StructSizeWriterCtx)
     if !ok {
@@ -179,7 +194,7 @@ func (ei *engineInfo) NewWriterCtx(id int, unique bool) (*WriterContext, error)
 // If local writer not exist, then create new one and store it into engine info writer cache.
 // note: operate ei.writeCache map is not thread safe please make sure there is sync mechanism to
 // make sure the safe.
-func (ei *engineInfo) newWriterContext(workerID int, unique bool) (*WriterContext, error) {
+func (ei *EngineInfo) newWriterContext(workerID int, unique bool) (*WriterContext, error) {
     lWrite, exist := ei.writerCache.Load(workerID)
     if !exist {
         var err error
@@ -198,7 +213,7 @@ func (ei *engineInfo) newWriterContext(workerID int, unique bool) (*WriterContex
     return wc, nil
 }
 
-func (ei *engineInfo) closeWriters() error {
+func (ei *EngineInfo) closeWriters() error {
     var firstErr error
     for _, wid := range ei.writerCache.Keys() {
         if w, ok := ei.writerCache.Load(wid); ok {
diff --git a/ddl/ingest/engine_mgr.go b/ddl/ingest/engine_mgr.go
index f9b006ec9e..9be0849549 100644
--- a/ddl/ingest/engine_mgr.go
+++ b/ddl/ingest/engine_mgr.go
@@ -25,19 +25,19 @@ import (
 )
 
 type engineManager struct {
-    generic.SyncMap[int64, *engineInfo]
+    generic.SyncMap[int64, *EngineInfo]
     MemRoot  MemRoot
     DiskRoot DiskRoot
 }
 
 func (m *engineManager) init(memRoot MemRoot, diskRoot DiskRoot) {
-    m.SyncMap = generic.NewSyncMap[int64, *engineInfo](10)
+    m.SyncMap = generic.NewSyncMap[int64, *EngineInfo](10)
     m.MemRoot = memRoot
     m.DiskRoot = diskRoot
 }
 
-// Register create a new engineInfo and register it to the engineManager.
-func (m *engineManager) Register(bc *BackendContext, jobID, indexID int64, schemaName, tableName string) (*engineInfo, error) {
+// Register create a new EngineInfo and register it to the engineManager.
+func (m *engineManager) Register(bc *BackendContext, jobID, indexID int64, schemaName, tableName string) (*EngineInfo, error) {
     // Calculate lightning concurrency degree and set memory usage
     // and pre-allocate memory usage for worker.
     m.MemRoot.RefreshConsumption()
@@ -87,7 +87,7 @@ func (m *engineManager) Register(bc *BackendContext, jobID, indexID int64, schem
     return en, nil
 }
 
-// Unregister delete the engineInfo from the engineManager.
+// Unregister delete the EngineInfo from the engineManager.
 func (m *engineManager) Unregister(jobID, indexID int64) {
     ei, exist := m.Load(indexID)
     if !exist {
@@ -101,7 +101,7 @@ func (m *engineManager) Unregister(jobID, indexID int64) {
     m.MemRoot.Release(StructSizeEngineInfo)
 }
 
-// ResetWorkers reset the writer count of the engineInfo because
+// ResetWorkers reset the writer count of the EngineInfo because
 // the goroutines of backfill workers have been terminated.
 func (m *engineManager) ResetWorkers(bc *BackendContext, jobID, indexID int64) {
     ei, exist := m.Load(indexID)
@@ -115,7 +115,7 @@ func (m *engineManager) ResetWorkers(bc *BackendContext, jobID, indexID int64) {
     ei.writerCount = 0
 }
 
-// UnregisterAll delete all engineInfo from the engineManager.
+// UnregisterAll delete all EngineInfo from the engineManager.
 func (m *engineManager) UnregisterAll(jobID int64) {
     for _, idxID := range m.Keys() {
         m.Unregister(jobID, idxID)
diff --git a/ddl/ingest/mem_root.go b/ddl/ingest/mem_root.go
index 522e5ddc1f..346b8e4881 100644
--- a/ddl/ingest/mem_root.go
+++ b/ddl/ingest/mem_root.go
@@ -45,7 +45,7 @@ var (
 func init() {
     StructSizeBackendCtx = int64(unsafe.Sizeof(BackendContext{}))
-    StructSizeEngineInfo = int64(unsafe.Sizeof(engineInfo{}))
+    StructSizeEngineInfo = int64(unsafe.Sizeof(EngineInfo{}))
     StructSizeWriterCtx = int64(unsafe.Sizeof(WriterContext{}))
 }
 
diff --git a/resourcemanager/pool/workerpool/workerpool.go b/resourcemanager/pool/workerpool/workerpool.go
index 95160bef93..479029033a 100644
--- a/resourcemanager/pool/workerpool/workerpool.go
+++ b/resourcemanager/pool/workerpool/workerpool.go
@@ -102,11 +102,11 @@ func (p *WorkerPool[T]) handleTaskWithRecover(w Worker[T], task T) {
 }
 
 func (p *WorkerPool[T]) runAWorker() {
+    w := p.createWorker()
+    if w == nil {
+        return // Fail to create worker, quit.
+    }
     p.wg.Run(func() {
-        w := p.createWorker()
-        if w == nil {
-            return // Fail to create worker, quit.
-        }
         for {
             select {
             case task := <-p.taskChan: