From 2fbf4cf0322c502d8e1db484bbf64ecbbffb8f4e Mon Sep 17 00:00:00 2001 From: tangenta Date: Tue, 5 Sep 2023 17:05:11 +0800 Subject: [PATCH] ddl: integrate add index operator to dist framework (#46495) ref pingcap/tidb#46258 --- ddl/backfilling_operator.go | 64 +++++- ddl/backfilling_scheduler.go | 14 +- ddl/ddl.go | 10 +- ddl/dist_owner.go | 6 +- ddl/index.go | 37 +++- ddl/reorg.go | 54 +----- ddl/stage_ingest_index.go | 3 +- ddl/stage_read_index.go | 183 ++++-------------- ddl/stage_scheduler.go | 6 +- disttask/framework/framework_ha_test.go | 4 +- disttask/framework/framework_rollback_test.go | 4 +- disttask/framework/framework_test.go | 6 +- disttask/framework/mock/scheduler_mock.go | 2 +- disttask/framework/scheduler/BUILD.bazel | 1 + disttask/framework/scheduler/interface.go | 4 +- disttask/framework/scheduler/register.go | 12 +- disttask/framework/scheduler/register_test.go | 2 +- disttask/framework/scheduler/scheduler.go | 41 +++- .../framework/scheduler/scheduler_test.go | 8 +- disttask/framework/scheduler/summary.go | 93 +++++++++ disttask/framework/storage/table_test.go | 11 ++ disttask/framework/storage/task_table.go | 23 +++ disttask/importinto/scheduler.go | 18 +- .../addindextest/operator_test.go | 17 +- 24 files changed, 364 insertions(+), 259 deletions(-) create mode 100644 disttask/framework/scheduler/summary.go diff --git a/ddl/backfilling_operator.go b/ddl/backfilling_operator.go index e0a4059462..d1053037e8 100644 --- a/ddl/backfilling_operator.go +++ b/ddl/backfilling_operator.go @@ -21,6 +21,7 @@ import ( "sync/atomic" "time" + "github.com/pingcap/tidb/br/pkg/lightning/common" "github.com/pingcap/tidb/ddl/ingest" "github.com/pingcap/tidb/ddl/internal/session" "github.com/pingcap/tidb/disttask/operator" @@ -35,6 +36,7 @@ import ( "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/logutil" + "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" "golang.org/x/sync/errgroup" ) @@ -66,11 +68,14 @@ func NewAddIndexIngestPipeline( ctx context.Context, store kv.Storage, sessPool opSessPool, + backendCtx ingest.BackendCtx, engine ingest.Engine, sessCtx sessionctx.Context, tbl table.PhysicalTable, idxInfo *model.IndexInfo, startKey, endKey kv.Key, + totalRowCount *atomic.Int64, + metricCounter prometheus.Counter, ) (*operator.AsyncPipeline, error) { index := tables.NewIndex(tbl.GetPhysicalID(), tbl.Meta(), idxInfo) copCtx, err := NewCopContext(tbl.Meta(), idxInfo, sessCtx) @@ -87,7 +92,7 @@ func NewAddIndexIngestPipeline( srcOp := NewTableScanTaskSource(ctx, store, tbl, startKey, endKey) scanOp := NewTableScanOperator(ctx, sessPool, copCtx, srcChkPool, readerCnt) ingestOp := NewIndexIngestOperator(ctx, copCtx, sessPool, tbl, index, engine, srcChkPool, writerCnt) - sinkOp := newIndexWriteResultSink(ctx) + sinkOp := newIndexWriteResultSink(ctx, backendCtx, tbl, index, totalRowCount, metricCounter) operator.Compose[TableScanTask](srcOp, scanOp) operator.Compose[IndexRecordChunk](scanOp, ingestOp) @@ -288,7 +293,9 @@ func (w *tableScanWorker) HandleTask(task TableScanTask, sender func(IndexRecord } func (w *tableScanWorker) Close() { - w.sessPool.Put(w.se.Context) + if w.se != nil { + w.sessPool.Put(w.se.Context) + } } func (w *tableScanWorker) scanRecords(task TableScanTask, sender func(IndexRecordChunk)) { @@ -447,7 +454,10 @@ func (w *indexIngestWorker) HandleTask(rs IndexRecordChunk, send func(IndexWrite send(result) } -func (*indexIngestWorker) Close() { +func (w *indexIngestWorker) Close() { + if w.se != nil { 
+ w.sessPool.Put(w.se.Context) + } } // WriteLocal will write index records to lightning engine. @@ -464,7 +474,13 @@ func (w *indexIngestWorker) WriteLocal(rs *IndexRecordChunk) (count int, nextKey } type indexWriteResultSink struct { - ctx context.Context + ctx context.Context + backendCtx ingest.BackendCtx + tbl table.PhysicalTable + index table.Index + + rowCount *atomic.Int64 + metricCounter prometheus.Counter errGroup errgroup.Group source operator.DataChannel[IndexWriteResult] @@ -472,10 +488,20 @@ type indexWriteResultSink struct { func newIndexWriteResultSink( ctx context.Context, + backendCtx ingest.BackendCtx, + tbl table.PhysicalTable, + index table.Index, + rowCount *atomic.Int64, + metricCounter prometheus.Counter, ) *indexWriteResultSink { return &indexWriteResultSink{ - ctx: ctx, - errGroup: errgroup.Group{}, + ctx: ctx, + backendCtx: backendCtx, + tbl: tbl, + index: index, + rowCount: rowCount, + metricCounter: metricCounter, + errGroup: errgroup.Group{}, } } @@ -489,14 +515,17 @@ func (s *indexWriteResultSink) Open() error { } func (s *indexWriteResultSink) collectResult() error { - // TODO(tangenta): use results to update reorg info and metrics. for { select { case <-s.ctx.Done(): return nil case result, ok := <-s.source.Channel(): if !ok { - return nil + return s.flush() + } + s.rowCount.Add(int64(result.Added)) + if s.metricCounter != nil { + s.metricCounter.Add(float64(result.Added)) } if result.Err != nil { return result.Err @@ -505,6 +534,25 @@ func (s *indexWriteResultSink) collectResult() error { } } +func (s *indexWriteResultSink) flush() error { + flushMode := ingest.FlushModeForceLocalAndCheckDiskQuota + if s.tbl.GetPartitionedTable() != nil { + flushMode = ingest.FlushModeForceGlobal + } + idxInfo := s.index.Meta() + _, _, err := s.backendCtx.Flush(idxInfo.ID, flushMode) + if err != nil { + if common.ErrFoundDuplicateKeys.Equal(err) { + err = convertToKeyExistsErr(err, idxInfo, s.tbl.Meta()) + return err + } + logutil.BgLogger().Error("flush error", + zap.String("category", "ddl"), zap.Error(err)) + return err + } + return nil +} + func (s *indexWriteResultSink) Close() error { return s.errGroup.Wait() } diff --git a/ddl/backfilling_scheduler.go b/ddl/backfilling_scheduler.go index 9dac7c81e3..6b375574b2 100644 --- a/ddl/backfilling_scheduler.go +++ b/ddl/backfilling_scheduler.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tidb/ddl/ingest" sess "github.com/pingcap/tidb/ddl/internal/session" + "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" @@ -127,9 +128,9 @@ func (b *txnBackfillScheduler) receiveResult() (*backfillResult, bool) { return ret, ok } -func newSessCtx(reorgInfo *reorgInfo) (sessionctx.Context, error) { - sessCtx := newContext(reorgInfo.d.store) - if err := initSessCtx(sessCtx, reorgInfo.ReorgMeta.SQLMode, reorgInfo.ReorgMeta.Location); err != nil { +func newSessCtx(store kv.Storage, sqlMode mysql.SQLMode, tzLocation *model.TimeZoneLocation) (sessionctx.Context, error) { + sessCtx := newContext(store) + if err := initSessCtx(sessCtx, sqlMode, tzLocation); err != nil { return nil, errors.Trace(err) } return sessCtx, nil @@ -182,7 +183,7 @@ func (b *txnBackfillScheduler) adjustWorkerSize() error { workerCnt := b.expectedWorkerSize() // Increase the worker. 
for i := len(b.workers); i < workerCnt; i++ { - sessCtx, err := newSessCtx(b.reorgInfo) + sessCtx, err := newSessCtx(reorgInfo.d.store, reorgInfo.ReorgMeta.SQLMode, reorgInfo.ReorgMeta.Location) if err != nil { return err } @@ -382,7 +383,7 @@ func (b *ingestBackfillScheduler) adjustWorkerSize() error { func (b *ingestBackfillScheduler) createWorker() workerpool.Worker[IndexRecordChunk, workerpool.None] { reorgInfo := b.reorgInfo job := reorgInfo.Job - sessCtx, err := newSessCtx(reorgInfo) + sessCtx, err := newSessCtx(reorgInfo.d.store, reorgInfo.ReorgMeta.SQLMode, reorgInfo.ReorgMeta.Location) if err != nil { b.poolErr <- err return nil @@ -423,7 +424,8 @@ func (b *ingestBackfillScheduler) createCopReqSenderPool() (*copReqSenderPool, e zap.Int64("table ID", b.tbl.Meta().ID), zap.Int64("index ID", b.reorgInfo.currElement.ID)) return nil, errors.New("cannot find index info") } - sessCtx, err := newSessCtx(b.reorgInfo) + ri := b.reorgInfo + sessCtx, err := newSessCtx(ri.d.store, ri.ReorgMeta.SQLMode, ri.ReorgMeta.Location) if err != nil { logutil.Logger(b.ctx).Warn("cannot init cop request sender", zap.Error(err)) return nil, err diff --git a/ddl/ddl.go b/ddl/ddl.go index 7e9a568951..65ed1f7693 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -682,13 +682,13 @@ func newDDL(ctx context.Context, options ...Option) *ddl { scheduler.RegisterTaskType("backfill") scheduler.RegisterSchedulerConstructor("backfill", proto.StepOne, - func(ctx context.Context, _ int64, taskMeta []byte, step int64) (scheduler.Scheduler, error) { - return NewBackfillSchedulerHandle(ctx, taskMeta, d, step == proto.StepTwo) - }) + func(ctx context.Context, task *proto.Task, summary *scheduler.Summary) (scheduler.Scheduler, error) { + return NewBackfillSchedulerHandle(ctx, task.Meta, d, task.Step == proto.StepTwo, summary) + }, scheduler.WithSummary) scheduler.RegisterSchedulerConstructor("backfill", proto.StepTwo, - func(ctx context.Context, _ int64, taskMeta []byte, step int64) (scheduler.Scheduler, error) { - return NewBackfillSchedulerHandle(ctx, taskMeta, d, step == proto.StepTwo) + func(ctx context.Context, task *proto.Task, summary *scheduler.Summary) (scheduler.Scheduler, error) { + return NewBackfillSchedulerHandle(ctx, task.Meta, d, task.Step == proto.StepTwo, nil) }) backFillDsp, err := NewBackfillingDispatcherExt(d) diff --git a/ddl/dist_owner.go b/ddl/dist_owner.go index aced50d4bf..8e0e80514f 100644 --- a/ddl/dist_owner.go +++ b/ddl/dist_owner.go @@ -19,7 +19,11 @@ import ( ) // CheckBackfillJobFinishInterval is export for test. -var CheckBackfillJobFinishInterval = 300 * time.Millisecond +var ( + CheckBackfillJobFinishInterval = 300 * time.Millisecond + // UpdateBackfillJobRowCountInterval is the interval of updating the job row count. 
+ UpdateBackfillJobRowCountInterval = 3 * time.Second +) const ( distPhysicalTableConcurrency = 16 diff --git a/ddl/index.go b/ddl/index.go index c2369fb290..4912924e4d 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -25,7 +25,6 @@ import ( "path/filepath" "slices" "strings" - "sync" "sync/atomic" "time" @@ -37,6 +36,8 @@ import ( "github.com/pingcap/tidb/ddl/ingest" sess "github.com/pingcap/tidb/ddl/internal/session" "github.com/pingcap/tidb/disttask/framework/handle" + "github.com/pingcap/tidb/disttask/framework/proto" + "github.com/pingcap/tidb/disttask/framework/storage" "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta" @@ -1617,7 +1618,6 @@ type addIndexIngestWorker struct { writer ingest.Writer copReqSenderPool *copReqSenderPool checkpointMgr *ingest.CheckpointManager - flushLock *sync.RWMutex resultCh chan *backfillResult jobID int64 @@ -1901,13 +1901,16 @@ func (w *worker) executeDistGlobalTask(reorgInfo *reorgInfo) error { }) g.Go(func() error { - ticker := time.NewTicker(CheckBackfillJobFinishInterval) - defer ticker.Stop() + checkFinishTk := time.NewTicker(CheckBackfillJobFinishInterval) + defer checkFinishTk.Stop() + updateRowCntTk := time.NewTicker(UpdateBackfillJobRowCountInterval) + defer updateRowCntTk.Stop() for { select { case <-done: + w.updateJobRowCount(taskKey, reorgInfo.Job.ID) return nil - case <-ticker.C: + case <-checkFinishTk.C: if err = w.isReorgRunnable(reorgInfo.Job.ID, true); err != nil { if !dbterror.ErrCancelledDDLJob.Equal(err) { return errors.Trace(err) @@ -1919,10 +1922,32 @@ func (w *worker) executeDistGlobalTask(reorgInfo *reorgInfo) error { continue } } + case <-updateRowCntTk.C: + w.updateJobRowCount(taskKey, reorgInfo.Job.ID) } } }) - return g.Wait() + err = g.Wait() + return err +} + +func (w *worker) updateJobRowCount(taskKey string, jobID int64) { + taskMgr, err := storage.GetTaskManager() + if err != nil { + logutil.BgLogger().Warn("cannot get task manager", zap.String("category", "ddl"), zap.String("task_key", taskKey), zap.Error(err)) + return + } + gTask, err := taskMgr.GetGlobalTaskByKey(taskKey) + if err != nil || gTask == nil { + logutil.BgLogger().Warn("cannot get global task", zap.String("category", "ddl"), zap.String("task_key", taskKey), zap.Error(err)) + return + } + rowCount, err := taskMgr.GetSubtaskRowCount(gTask.ID, proto.StepOne) + if err != nil { + logutil.BgLogger().Warn("cannot get subtask row count", zap.String("category", "ddl"), zap.String("task_key", taskKey), zap.Error(err)) + return + } + w.getReorgCtx(jobID).setRowCount(rowCount) } func getNextPartitionInfo(reorg *reorgInfo, t table.PartitionedTable, currPhysicalTableID int64) (int64, kv.Key, kv.Key, error) { diff --git a/ddl/reorg.go b/ddl/reorg.go index 4d82071f4a..2c83a8d6df 100644 --- a/ddl/reorg.go +++ b/ddl/reorg.go @@ -15,7 +15,6 @@ package ddl import ( - "context" "encoding/hex" "fmt" "strconv" @@ -50,13 +49,10 @@ import ( "github.com/pingcap/tidb/util/ranger" "github.com/pingcap/tidb/util/sqlexec" "github.com/pingcap/tipb/go-tipb" - clientv3 "go.etcd.io/etcd/client/v3" atomicutil "go.uber.org/atomic" "go.uber.org/zap" ) -const rowCountEtcdPath = "distAddIndex" - // reorgCtx is for reorganization. type reorgCtx struct { // doneCh is used to notify. 
@@ -138,48 +134,6 @@ func (rc *reorgCtx) getRowCount() int64 { return row } -func getAndSetJobRowCnt(ctx context.Context, reorgInfo *reorgInfo, rc *reorgCtx, job *model.Job, client *clientv3.Client) int64 { - rowCount := int64(0) - if reorgInfo.Job.ReorgMeta.IsDistReorg && !reorgInfo.mergingTmpIdx { - path := fmt.Sprintf("%s/%d", rowCountEtcdPath, job.ID) - resp, err := client.Get(ctx, path, clientv3.WithPrefix()) - if err != nil { - logutil.BgLogger().Warn("get row count from ETCD failed", zap.String("category", "ddl"), zap.Error(err)) - return 0 - } - if len(resp.Kvs) == 0 { - return 0 - } - for _, kv := range resp.Kvs { - cnt, err := strconv.Atoi(string(kv.Value)) - if err != nil { - logutil.BgLogger().Error("parse row count from ETCD failed", zap.String("category", "ddl"), zap.Error(err)) - continue - } - rowCount += int64(cnt) - } - job.SetRowCount(rowCount) - } else { - rowCount = rc.getRowCount() - job.SetRowCount(rowCount) - } - return rowCount -} - -func deleteETCDRowCntStatIfNecessary(ctx context.Context, reorgInfo *reorgInfo, job *model.Job, client *clientv3.Client) { - if reorgInfo.Job.ReorgMeta.IsDistReorg && !reorgInfo.mergingTmpIdx { - path := fmt.Sprintf("%s/%d", rowCountEtcdPath, job.ID) - const retryCnt = 3 - for i := 0; i < retryCnt; i++ { - _, err := client.Delete(ctx, path, clientv3.WithPrefix()) - if err == nil { - return - } - logutil.BgLogger().Warn("delete row count from ETCD failed", zap.String("category", "ddl"), zap.Error(err)) - } - } -} - // runReorgJob is used as a portal to do the reorganization work. // eg: // 1: add index @@ -271,15 +225,14 @@ func (w *worker) runReorgJob(reorgInfo *reorgInfo, tblInfo *model.TableInfo, d.removeReorgCtx(job.ID) return dbterror.ErrCancelledDDLJob } - rowCount := getAndSetJobRowCnt(w.ctx, reorgInfo, rc, job, d.etcdCli) + rowCount := rc.getRowCount() + job.SetRowCount(rowCount) if err != nil { logutil.BgLogger().Warn("run reorg job done", zap.String("category", "ddl"), zap.Int64("handled rows", rowCount), zap.Error(err)) } else { logutil.BgLogger().Info("run reorg job done", zap.String("category", "ddl"), zap.Int64("handled rows", rowCount)) } - deleteETCDRowCntStatIfNecessary(w.ctx, reorgInfo, job, d.etcdCli) - // Update a job's warnings. w.mergeWarningsIntoJob(job) @@ -298,7 +251,8 @@ func (w *worker) runReorgJob(reorgInfo *reorgInfo, tblInfo *model.TableInfo, // We return dbterror.ErrWaitReorgTimeout here too, so that outer loop will break. return dbterror.ErrWaitReorgTimeout case <-time.After(waitTimeout): - rowCount := getAndSetJobRowCnt(w.ctx, reorgInfo, rc, job, d.etcdCli) + rowCount := rc.getRowCount() + job.SetRowCount(rowCount) updateBackfillProgress(w, reorgInfo, tblInfo, rowCount) // Update a job's warnings. 
diff --git a/ddl/stage_ingest_index.go b/ddl/stage_ingest_index.go index 0e41e6f51d..61c4c2295f 100644 --- a/ddl/stage_ingest_index.go +++ b/ddl/stage_ingest_index.go @@ -54,6 +54,7 @@ func (i *ingestIndexStage) InitSubtaskExecEnv(_ context.Context) error { if err != nil { if common.ErrFoundDuplicateKeys.Equal(err) { err = convertToKeyExistsErr(err, i.index, i.ptbl.Meta()) + return err } logutil.BgLogger().Error("flush error", zap.String("category", "ddl"), zap.Error(err)) return err @@ -61,7 +62,7 @@ func (i *ingestIndexStage) InitSubtaskExecEnv(_ context.Context) error { return err } -func (*ingestIndexStage) SplitSubtask(_ context.Context, _ []byte) ([]proto.MinimalTask, error) { +func (*ingestIndexStage) SplitSubtask(_ context.Context, _ *proto.Subtask) ([]proto.MinimalTask, error) { logutil.BgLogger().Info("ingest index stage split subtask", zap.String("category", "ddl")) return nil, nil } diff --git a/ddl/stage_read_index.go b/ddl/stage_read_index.go index 657d273717..210911b5db 100644 --- a/ddl/stage_read_index.go +++ b/ddl/stage_read_index.go @@ -16,21 +16,16 @@ package ddl import ( "context" - "encoding/hex" "encoding/json" - "fmt" - "strconv" - "time" + "sync/atomic" "github.com/pingcap/errors" "github.com/pingcap/failpoint" - "github.com/pingcap/tidb/br/pkg/lightning/common" "github.com/pingcap/tidb/ddl/ingest" - ddlutil "github.com/pingcap/tidb/ddl/util" "github.com/pingcap/tidb/disttask/framework/proto" - "github.com/pingcap/tidb/domain/infosync" + "github.com/pingcap/tidb/disttask/framework/scheduler" "github.com/pingcap/tidb/kv" - "github.com/pingcap/tidb/meta" + "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/util/logutil" @@ -44,10 +39,8 @@ type readIndexToLocalStage struct { ptbl table.PhysicalTable jc *JobContext - bc ingest.BackendCtx - - done chan struct{} - totalRowCnt int64 + bc ingest.BackendCtx + summary *scheduler.Summary } func newReadIndexToLocalStage( @@ -57,53 +50,32 @@ func newReadIndexToLocalStage( ptbl table.PhysicalTable, jc *JobContext, bc ingest.BackendCtx, + summary *scheduler.Summary, ) *readIndexToLocalStage { return &readIndexToLocalStage{ - d: d, - job: job, - index: index, - ptbl: ptbl, - jc: jc, - bc: bc, - done: make(chan struct{}), - totalRowCnt: 0, + d: d, + job: job, + index: index, + ptbl: ptbl, + jc: jc, + bc: bc, + summary: summary, } } -func (r *readIndexToLocalStage) InitSubtaskExecEnv(ctx context.Context) error { +func (*readIndexToLocalStage) InitSubtaskExecEnv(_ context.Context) error { logutil.BgLogger().Info("read index stage init subtask exec env", zap.String("category", "ddl")) - d := r.d - - ser, err := infosync.GetServerInfo() - if err != nil { - return err - } - path := fmt.Sprintf("distAddIndex/%d/%s:%d", r.job.ID, ser.IP, ser.Port) - response, err := d.etcdCli.Get(ctx, path) - if err != nil { - return err - } - if len(response.Kvs) > 0 { - cnt, err := strconv.Atoi(string(response.Kvs[0].Value)) - if err != nil { - return err - } - r.totalRowCnt = int64(cnt) - } - - r.done = make(chan struct{}) - go r.UpdateStatLoop() return nil } -func (r *readIndexToLocalStage) SplitSubtask(ctx context.Context, subtask []byte) ([]proto.MinimalTask, error) { +func (r *readIndexToLocalStage) SplitSubtask(ctx context.Context, subtask *proto.Subtask) ([]proto.MinimalTask, error) { logutil.BgLogger().Info("read index stage run subtask", zap.String("category", "ddl")) d := r.d sm := &BackfillSubTaskMeta{} - err := json.Unmarshal(subtask, sm) + err := 
json.Unmarshal(subtask.Meta, sm) if err != nil { logutil.BgLogger().Error("unmarshal error", zap.String("category", "ddl"), @@ -113,7 +85,6 @@ func (r *readIndexToLocalStage) SplitSubtask(ctx context.Context, subtask []byte var startKey, endKey kv.Key var tbl table.PhysicalTable - var isPartition bool currentVer, err1 := getValidCurrentVersion(d.store) if err1 != nil { @@ -130,78 +101,42 @@ func (r *readIndexToLocalStage) SplitSubtask(ctx context.Context, subtask []byte return nil, err } tbl = parTbl.GetPartition(pid) - isPartition = true } else { startKey, endKey = sm.StartKey, sm.EndKey tbl = r.ptbl } - mockReorgInfo := &reorgInfo{Job: r.job, d: d.ddlCtx} - elements := make([]*meta.Element, 0) - elements = append(elements, &meta.Element{ID: r.index.ID, TypeKey: meta.IndexElementKey}) - mockReorgInfo.elements = elements - mockReorgInfo.currElement = mockReorgInfo.elements[0] - - ingestScheduler := newIngestBackfillScheduler(ctx, mockReorgInfo, d.sessPool, tbl, true) - defer ingestScheduler.close(true) - - consumer := newResultConsumer(d.ddlCtx, mockReorgInfo, nil, true) - consumer.run(ingestScheduler, startKey, &r.totalRowCnt) - - err = ingestScheduler.setupWorkers() + ei, err := r.bc.Register(r.job.ID, r.index.ID, r.job.SchemaName, r.job.TableName) if err != nil { - logutil.BgLogger().Error("setup workers error", - zap.String("category", "ddl"), - zap.Error(err)) + logutil.Logger(ctx).Warn("cannot register new engine", zap.Error(err), + zap.Int64("job ID", r.job.ID), zap.Int64("index ID", r.index.ID)) return nil, err } - taskIDAlloc := newTaskIDAllocator() - for { - kvRanges, err := splitTableRanges(r.ptbl, d.store, startKey, endKey, backfillTaskChanSize) - if err != nil { - return nil, err - } - if len(kvRanges) == 0 { - break - } - - logutil.BgLogger().Info("start backfill workers to reorg record", - zap.String("category", "ddl"), - zap.Int("workerCnt", ingestScheduler.currentWorkerSize()), - zap.Int("regionCnt", len(kvRanges)), - zap.String("startKey", hex.EncodeToString(startKey)), - zap.String("endKey", hex.EncodeToString(endKey))) - - sendTasks(ingestScheduler, consumer, tbl, kvRanges, mockReorgInfo, taskIDAlloc) - if consumer.shouldAbort() { - break - } - rangeEndKey := kvRanges[len(kvRanges)-1].EndKey - startKey = rangeEndKey.Next() - if startKey.Cmp(endKey) >= 0 { - break - } - } - ingestScheduler.close(false) - - if err := consumer.getResult(); err != nil { - return nil, err - } - - flushMode := ingest.FlushModeForceLocalAndCheckDiskQuota - if isPartition { - flushMode = ingest.FlushModeForceGlobal - } - _, _, err = r.bc.Flush(r.index.ID, flushMode) + sessCtx, err := newSessCtx(d.store, r.job.ReorgMeta.SQLMode, r.job.ReorgMeta.Location) if err != nil { - if common.ErrFoundDuplicateKeys.Equal(err) { - err = convertToKeyExistsErr(err, r.index, r.ptbl.Meta()) - } - logutil.BgLogger().Error("flush error", - zap.String("category", "ddl"), zap.Error(err)) return nil, err } + + totalRowCount := &atomic.Int64{} + counter := metrics.BackfillTotalCounter.WithLabelValues( + metrics.GenerateReorgLabel("add_idx_rate", r.job.SchemaName, tbl.Meta().Name.O)) + + pipe, err := NewAddIndexIngestPipeline( + ctx, d.store, d.sessPool, r.bc, ei, sessCtx, tbl, r.index, startKey, endKey, totalRowCount, counter) + if err != nil { + return nil, err + } + err = pipe.Execute() + if err != nil { + return nil, err + } + err = pipe.Close() + if err != nil { + return nil, err + } + + r.summary.UpdateRowCount(subtask.ID, totalRowCount.Load()) return nil, nil } @@ -211,11 +146,6 @@ func (r 
*readIndexToLocalStage) CleanupSubtaskExecEnv(_ context.Context) error { if _, ok := r.ptbl.(table.PartitionedTable); ok { ingest.LitBackCtxMgr.Unregister(r.job.ID) } - close(r.done) - if !r.d.OwnerManager().IsOwner() { - // For owner, reorg ctx will be removed after the reorg job is done. - r.d.removeReorgCtx(r.job.ID) - } return nil } @@ -236,38 +166,5 @@ func (r *readIndexToLocalStage) Rollback(_ context.Context) error { logutil.BgLogger().Info("read index stage rollback backfill add index task", zap.String("category", "ddl"), zap.Int64("jobID", r.job.ID)) ingest.LitBackCtxMgr.Unregister(r.job.ID) - if !r.d.OwnerManager().IsOwner() { - // For owner, reorg ctx will be removed after the reorg job is done. - r.d.removeReorgCtx(r.job.ID) - } return nil } - -// UpdateStatLoop updates the row count of adding index. -func (r *readIndexToLocalStage) UpdateStatLoop() { - tk := time.Tick(time.Second * 5) - ser, err := infosync.GetServerInfo() - if err != nil { - logutil.BgLogger().Warn("get server info failed", - zap.String("category", "ddl"), zap.Error(err)) - return - } - path := fmt.Sprintf("%s/%d/%s:%d", rowCountEtcdPath, r.job.ID, ser.IP, ser.Port) - writeToEtcd := func() { - err := ddlutil.PutKVToEtcd(context.TODO(), r.d.etcdCli, 3, path, strconv.Itoa(int(r.totalRowCnt))) - if err != nil { - logutil.BgLogger().Warn("update row count for distributed add index failed", - zap.String("category", "ddl"), - zap.Error(err)) - } - } - for { - select { - case <-r.done: - writeToEtcd() - return - case <-tk: - writeToEtcd() - } - } -} diff --git a/ddl/stage_scheduler.go b/ddl/stage_scheduler.go index c58f553c7e..ec871deff4 100644 --- a/ddl/stage_scheduler.go +++ b/ddl/stage_scheduler.go @@ -39,10 +39,12 @@ type BackfillSubTaskMeta struct { PhysicalTableID int64 `json:"physical_table_id"` StartKey []byte `json:"start_key"` EndKey []byte `json:"end_key"` + RowCount int64 `json:"row_count"` } // NewBackfillSchedulerHandle creates a new backfill scheduler. 
-func NewBackfillSchedulerHandle(ctx context.Context, taskMeta []byte, d *ddl, stepForImport bool) (scheduler.Scheduler, error) { +func NewBackfillSchedulerHandle(ctx context.Context, taskMeta []byte, d *ddl, + stepForImport bool, summary *scheduler.Summary) (scheduler.Scheduler, error) { bgm := &BackfillGlobalMeta{} err := json.Unmarshal(taskMeta, bgm) if err != nil { @@ -70,7 +72,7 @@ func NewBackfillSchedulerHandle(ctx context.Context, taskMeta []byte, d *ddl, st jc := d.jobContext(jobMeta.ID) d.setDDLLabelForTopSQL(jobMeta.ID, jobMeta.Query) d.setDDLSourceForDiagnosis(jobMeta.ID, jobMeta.Type) - return newReadIndexToLocalStage(d, &bgm.Job, indexInfo, tbl.(table.PhysicalTable), jc, bc), nil + return newReadIndexToLocalStage(d, &bgm.Job, indexInfo, tbl.(table.PhysicalTable), jc, bc, summary), nil } return newIngestIndexStage(jobMeta.ID, indexInfo, tbl.(table.PhysicalTable), bc), nil } diff --git a/disttask/framework/framework_ha_test.go b/disttask/framework/framework_ha_test.go index d6bc28f2f4..e0d32de3f6 100644 --- a/disttask/framework/framework_ha_test.go +++ b/disttask/framework/framework_ha_test.go @@ -87,10 +87,10 @@ func RegisterHATaskMeta(m *sync.Map) { }) scheduler.ClearSchedulers() scheduler.RegisterTaskType(proto.TaskTypeExample) - scheduler.RegisterSchedulerConstructor(proto.TaskTypeExample, proto.StepOne, func(_ context.Context, _ int64, _ []byte, _ int64) (scheduler.Scheduler, error) { + scheduler.RegisterSchedulerConstructor(proto.TaskTypeExample, proto.StepOne, func(_ context.Context, _ *proto.Task, _ *scheduler.Summary) (scheduler.Scheduler, error) { return &testScheduler{}, nil }) - scheduler.RegisterSchedulerConstructor(proto.TaskTypeExample, proto.StepTwo, func(_ context.Context, _ int64, _ []byte, _ int64) (scheduler.Scheduler, error) { + scheduler.RegisterSchedulerConstructor(proto.TaskTypeExample, proto.StepTwo, func(_ context.Context, _ *proto.Task, _ *scheduler.Summary) (scheduler.Scheduler, error) { return &testScheduler{}, nil }) scheduler.RegisterSubtaskExectorConstructor(proto.TaskTypeExample, proto.StepOne, func(_ proto.MinimalTask, _ int64) (scheduler.SubtaskExecutor, error) { diff --git a/disttask/framework/framework_rollback_test.go b/disttask/framework/framework_rollback_test.go index d7ad0fa5e4..5b773f692f 100644 --- a/disttask/framework/framework_rollback_test.go +++ b/disttask/framework/framework_rollback_test.go @@ -84,7 +84,7 @@ func (t *rollbackScheduler) Rollback(_ context.Context) error { return nil } -func (t *rollbackScheduler) SplitSubtask(_ context.Context, _ []byte) ([]proto.MinimalTask, error) { +func (t *rollbackScheduler) SplitSubtask(_ context.Context, _ *proto.Subtask) ([]proto.MinimalTask, error) { return []proto.MinimalTask{ testRollbackMiniTask{}, testRollbackMiniTask{}, @@ -115,7 +115,7 @@ func RegisterRollbackTaskMeta(m *sync.Map) { }) scheduler.ClearSchedulers() scheduler.RegisterTaskType(proto.TaskTypeExample) - scheduler.RegisterSchedulerConstructor(proto.TaskTypeExample, proto.StepOne, func(_ context.Context, _ int64, _ []byte, _ int64) (scheduler.Scheduler, error) { + scheduler.RegisterSchedulerConstructor(proto.TaskTypeExample, proto.StepOne, func(_ context.Context, _ *proto.Task, _ *scheduler.Summary) (scheduler.Scheduler, error) { return &rollbackScheduler{m: m}, nil }) scheduler.RegisterSubtaskExectorConstructor(proto.TaskTypeExample, proto.StepOne, func(_ proto.MinimalTask, _ int64) (scheduler.SubtaskExecutor, error) { diff --git a/disttask/framework/framework_test.go b/disttask/framework/framework_test.go index 
e3fa7646bb..357cb295e7 100644 --- a/disttask/framework/framework_test.go +++ b/disttask/framework/framework_test.go @@ -97,7 +97,7 @@ func (t *testScheduler) CleanupSubtaskExecEnv(_ context.Context) error { return func (t *testScheduler) Rollback(_ context.Context) error { return nil } -func (t *testScheduler) SplitSubtask(_ context.Context, _ []byte) ([]proto.MinimalTask, error) { +func (t *testScheduler) SplitSubtask(_ context.Context, _ *proto.Subtask) ([]proto.MinimalTask, error) { return []proto.MinimalTask{ testMiniTask{}, testMiniTask{}, @@ -138,10 +138,10 @@ func RegisterTaskMeta(m *sync.Map, dispatcherHandle dispatcher.Extension) { }) scheduler.ClearSchedulers() scheduler.RegisterTaskType(proto.TaskTypeExample) - scheduler.RegisterSchedulerConstructor(proto.TaskTypeExample, proto.StepOne, func(_ context.Context, _ int64, _ []byte, _ int64) (scheduler.Scheduler, error) { + scheduler.RegisterSchedulerConstructor(proto.TaskTypeExample, proto.StepOne, func(_ context.Context, _ *proto.Task, _ *scheduler.Summary) (scheduler.Scheduler, error) { return &testScheduler{}, nil }) - scheduler.RegisterSchedulerConstructor(proto.TaskTypeExample, proto.StepTwo, func(_ context.Context, _ int64, _ []byte, _ int64) (scheduler.Scheduler, error) { + scheduler.RegisterSchedulerConstructor(proto.TaskTypeExample, proto.StepTwo, func(_ context.Context, _ *proto.Task, _ *scheduler.Summary) (scheduler.Scheduler, error) { return &testScheduler{}, nil }) scheduler.RegisterSubtaskExectorConstructor(proto.TaskTypeExample, proto.StepOne, func(_ proto.MinimalTask, _ int64) (scheduler.SubtaskExecutor, error) { diff --git a/disttask/framework/mock/scheduler_mock.go b/disttask/framework/mock/scheduler_mock.go index 43f2212ffb..ab9f805f36 100644 --- a/disttask/framework/mock/scheduler_mock.go +++ b/disttask/framework/mock/scheduler_mock.go @@ -361,7 +361,7 @@ func (mr *MockSchedulerMockRecorder) Rollback(arg0 interface{}) *gomock.Call { } // SplitSubtask mocks base method. -func (m *MockScheduler) SplitSubtask(arg0 context.Context, arg1 []byte) ([]proto.MinimalTask, error) { +func (m *MockScheduler) SplitSubtask(arg0 context.Context, arg1 *proto.Subtask) ([]proto.MinimalTask, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "SplitSubtask", arg0, arg1) ret0, _ := ret[0].([]proto.MinimalTask) diff --git a/disttask/framework/scheduler/BUILD.bazel b/disttask/framework/scheduler/BUILD.bazel index 273ee4e320..e6c6153b76 100644 --- a/disttask/framework/scheduler/BUILD.bazel +++ b/disttask/framework/scheduler/BUILD.bazel @@ -7,6 +7,7 @@ go_library( "manager.go", "register.go", "scheduler.go", + "summary.go", ], importpath = "github.com/pingcap/tidb/disttask/framework/scheduler", visibility = ["//visibility:public"], diff --git a/disttask/framework/scheduler/interface.go b/disttask/framework/scheduler/interface.go index d1063a4b88..2a4f79108b 100644 --- a/disttask/framework/scheduler/interface.go +++ b/disttask/framework/scheduler/interface.go @@ -53,7 +53,7 @@ type Scheduler interface { // InitSubtaskExecEnv is used to initialize the environment for the subtask executor. InitSubtaskExecEnv(context.Context) error // SplitSubtask is used to split the subtask into multiple minimal tasks. - SplitSubtask(ctx context.Context, subtask []byte) ([]proto.MinimalTask, error) + SplitSubtask(ctx context.Context, subtask *proto.Subtask) ([]proto.MinimalTask, error) // CleanupSubtaskExecEnv is used to clean up the environment for the subtask executor. 
CleanupSubtaskExecEnv(context.Context) error // OnSubtaskFinished is used to handle the subtask when it is finished. @@ -84,7 +84,7 @@ func (*EmptyScheduler) InitSubtaskExecEnv(context.Context) error { } // SplitSubtask implements the Scheduler interface. -func (*EmptyScheduler) SplitSubtask(context.Context, []byte) ([]proto.MinimalTask, error) { +func (*EmptyScheduler) SplitSubtask(context.Context, *proto.Subtask) ([]proto.MinimalTask, error) { return nil, nil } diff --git a/disttask/framework/scheduler/register.go b/disttask/framework/scheduler/register.go index 968e4f3f71..63442e8a85 100644 --- a/disttask/framework/scheduler/register.go +++ b/disttask/framework/scheduler/register.go @@ -37,10 +37,15 @@ func WithPoolSize(poolSize int32) TaskTypeOption { } type schedulerRegisterOptions struct { + Summary *Summary } // Constructor is the constructor of Scheduler. -type Constructor func(context context.Context, taskID int64, taskMeta []byte, step int64) (Scheduler, error) +type Constructor func( + context context.Context, + task *proto.Task, + summary *Summary, +) (Scheduler, error) // RegisterOption is the register option of Scheduler. type RegisterOption func(opts *schedulerRegisterOptions) @@ -112,3 +117,8 @@ func ClearSchedulers() { subtaskExecutorConstructors = make(map[string]SubtaskExecutorConstructor) subtaskExecutorOptions = make(map[string]subtaskExecutorRegisterOptions) } + +// WithSummary is the option of Scheduler to set the summary. +var WithSummary RegisterOption = func(opts *schedulerRegisterOptions) { + opts.Summary = NewSummary() +} diff --git a/disttask/framework/scheduler/register_test.go b/disttask/framework/scheduler/register_test.go index a7d8dd809c..d740ba224d 100644 --- a/disttask/framework/scheduler/register_test.go +++ b/disttask/framework/scheduler/register_test.go @@ -24,7 +24,7 @@ import ( func mockSchedulerOptionFunc(op *schedulerRegisterOptions) {} -func mockSchedulerConstructor(_ context.Context, _ int64, task []byte, step int64) (Scheduler, error) { +func mockSchedulerConstructor(_ context.Context, _ *proto.Task, _ *Summary) (Scheduler, error) { return nil, nil } diff --git a/disttask/framework/scheduler/scheduler.go b/disttask/framework/scheduler/scheduler.go index 8a50ad1aab..796c93ab18 100644 --- a/disttask/framework/scheduler/scheduler.go +++ b/disttask/framework/scheduler/scheduler.go @@ -116,7 +116,15 @@ func (s *InternalSchedulerImpl) run(ctx context.Context, task *proto.Task) error s.registerCancelFunc(runCancel) s.resetError() logutil.Logger(s.logCtx).Info("scheduler run a step", zap.Any("step", task.Step), zap.Any("concurrency", task.Concurrency)) - scheduler, err := createScheduler(ctx, task) + + summary, cleanup, err := runSummaryCollectLoop(ctx, task, s.taskTable) + if err != nil { + s.onError(err) + return s.getError() + } + defer cleanup() + + scheduler, err := createScheduler(ctx, task, summary) if err != nil { s.onError(err) return s.getError() @@ -185,7 +193,7 @@ func (s *InternalSchedulerImpl) run(ctx context.Context, task *proto.Task) error } func (s *InternalSchedulerImpl) runSubtask(ctx context.Context, scheduler Scheduler, subtask *proto.Subtask, minimalTaskCh chan func()) { - minimalTasks, err := scheduler.SplitSubtask(ctx, subtask.Meta) + minimalTasks, err := scheduler.SplitSubtask(ctx, subtask) if err != nil { s.onError(err) if errors.Cause(err) == context.Canceled { @@ -349,7 +357,7 @@ func (s *InternalSchedulerImpl) Rollback(ctx context.Context, task *proto.Task) } } - scheduler, err := createScheduler(ctx, task) + scheduler, err 
:= createScheduler(ctx, task, nil) if err != nil { s.onError(err) return s.getError() @@ -378,13 +386,36 @@ func (s *InternalSchedulerImpl) Rollback(ctx context.Context, task *proto.Task) return s.getError() } -func createScheduler(ctx context.Context, task *proto.Task) (Scheduler, error) { +func createScheduler(ctx context.Context, task *proto.Task, summary *Summary) (Scheduler, error) { key := getKey(task.Type, task.Step) constructor, ok := schedulerConstructors[key] if !ok { return nil, errors.Errorf("constructor of scheduler for key %s not found", key) } - return constructor(ctx, task.ID, task.Meta, task.Step) + return constructor(ctx, task, summary) +} + +func runSummaryCollectLoop( + ctx context.Context, + task *proto.Task, + taskTable TaskTable, +) (summary *Summary, cleanup func(), err error) { + taskMgr, ok := taskTable.(*storage.TaskManager) + if !ok { + return nil, func() {}, nil + } + key := getKey(task.Type, task.Step) + opt, ok := schedulerOptions[key] + if !ok { + return nil, func() {}, errors.Errorf("scheduler option for key %s not found", key) + } + if opt.Summary != nil { + go opt.Summary.UpdateRowCountLoop(ctx, taskMgr) + return opt.Summary, func() { + opt.Summary.PersistRowCount(ctx, taskMgr) + }, nil + } + return nil, func() {}, nil } func createSubtaskExecutor(minimalTask proto.MinimalTask, tp string, step int64) (SubtaskExecutor, error) { diff --git a/disttask/framework/scheduler/scheduler_test.go b/disttask/framework/scheduler/scheduler_test.go index 587946251b..5664c40c6e 100644 --- a/disttask/framework/scheduler/scheduler_test.go +++ b/disttask/framework/scheduler/scheduler_test.go @@ -86,7 +86,7 @@ func TestSchedulerRun(t *testing.T) { err := scheduler.Run(runCtx, &proto.Task{Step: proto.StepOne, Type: tp}) require.EqualError(t, err, schedulerRegisterErr.Error()) - RegisterSchedulerConstructor(tp, proto.StepOne, func(_ context.Context, _ int64, task []byte, step int64) (Scheduler, error) { + RegisterSchedulerConstructor(tp, proto.StepOne, func(_ context.Context, _ *proto.Task, _ *Summary) (Scheduler, error) { return mockScheduler, nil }) @@ -172,7 +172,7 @@ func TestSchedulerRun(t *testing.T) { require.NoError(t, err) // 9. 
run subtask one by one - RegisterSchedulerConstructor(tp, proto.StepOne, func(_ context.Context, _ int64, task []byte, step int64) (Scheduler, error) { + RegisterSchedulerConstructor(tp, proto.StepOne, func(_ context.Context, _ *proto.Task, _ *Summary) (Scheduler, error) { return mockScheduler, nil }) mockScheduler.EXPECT().InitSubtaskExecEnv(gomock.Any()).Return(nil) @@ -283,7 +283,7 @@ func TestSchedulerRollback(t *testing.T) { err := scheduler.Rollback(runCtx, &proto.Task{Step: proto.StepOne, ID: 1, Type: tp}) require.EqualError(t, err, schedulerRegisterErr.Error()) - RegisterSchedulerConstructor(tp, proto.StepOne, func(_ context.Context, _ int64, task []byte, step int64) (Scheduler, error) { + RegisterSchedulerConstructor(tp, proto.StepOne, func(_ context.Context, _ *proto.Task, _ *Summary) (Scheduler, error) { return mockScheduler, nil }) @@ -360,7 +360,7 @@ func TestScheduler(t *testing.T) { mockScheduler := mock.NewMockScheduler(ctrl) mockSubtaskExecutor := mock.NewMockSubtaskExecutor(ctrl) - RegisterSchedulerConstructor(tp, proto.StepOne, func(_ context.Context, _ int64, task []byte, step int64) (Scheduler, error) { + RegisterSchedulerConstructor(tp, proto.StepOne, func(_ context.Context, _ *proto.Task, _ *Summary) (Scheduler, error) { return mockScheduler, nil }) RegisterSubtaskExectorConstructor(tp, proto.StepOne, func(minimalTask proto.MinimalTask, step int64) (SubtaskExecutor, error) { diff --git a/disttask/framework/scheduler/summary.go b/disttask/framework/scheduler/summary.go new file mode 100644 index 0000000000..28104becdf --- /dev/null +++ b/disttask/framework/scheduler/summary.go @@ -0,0 +1,93 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package scheduler + +import ( + "context" + "sync" + "time" + + "github.com/pingcap/tidb/disttask/framework/storage" + "github.com/pingcap/tidb/util/logutil" + "go.uber.org/zap" +) + +// Summary is used to collect the summary of subtasks execution. +type Summary struct { + mu struct { + sync.Mutex + RowCount map[int64]int64 // subtask ID -> row count + } +} + +// NewSummary creates a new Summary. +func NewSummary() *Summary { + return &Summary{ + mu: struct { + sync.Mutex + RowCount map[int64]int64 + }{ + RowCount: map[int64]int64{}, + }, + } +} + +// UpdateRowCount updates the row count of the subtask. +func (s *Summary) UpdateRowCount(subtaskID int64, rowCount int64) { + s.mu.Lock() + defer s.mu.Unlock() + s.mu.RowCount[subtaskID] = rowCount +} + +// UpdateRowCountLoop updates the row count of the subtask periodically. +func (s *Summary) UpdateRowCountLoop(ctx context.Context, taskMgr *storage.TaskManager) { + ticker := time.NewTicker(3 * time.Second) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + s.PersistRowCount(ctx, taskMgr) + } + } +} + +// PersistRowCount persists the row count of the subtask to the storage. 
+func (s *Summary) PersistRowCount(ctx context.Context, taskMgr *storage.TaskManager) { + var copiedRowCount map[int64]int64 + s.mu.Lock() + if len(s.mu.RowCount) == 0 { + s.mu.Unlock() + return + } + copiedRowCount = make(map[int64]int64, len(s.mu.RowCount)) + for subtaskID, rowCount := range s.mu.RowCount { + copiedRowCount[subtaskID] = rowCount + } + s.mu.Unlock() + + for subtaskID, rowCount := range copiedRowCount { + err := taskMgr.UpdateSubtaskRowCount(subtaskID, rowCount) + if err != nil { + logutil.Logger(ctx).Warn("update subtask row count failed", zap.Error(err)) + } + } + s.mu.Lock() + for subtaskID := range copiedRowCount { + delete(s.mu.RowCount, subtaskID) + } + s.mu.Unlock() +} diff --git a/disttask/framework/storage/table_test.go b/disttask/framework/storage/table_test.go index a72ffb0a82..3e06bc56b7 100644 --- a/disttask/framework/storage/table_test.go +++ b/disttask/framework/storage/table_test.go @@ -224,12 +224,23 @@ func TestSubTaskTable(t *testing.T) { subtasks, err := sm.GetSucceedSubtasksByStep(2, proto.StepInit) require.NoError(t, err) require.Len(t, subtasks, 0) + err = sm.FinishSubtask(2, []byte{}) require.NoError(t, err) + subtasks, err = sm.GetSucceedSubtasksByStep(2, proto.StepInit) require.NoError(t, err) require.Len(t, subtasks, 1) + rowCount, err := sm.GetSubtaskRowCount(2, proto.StepInit) + require.NoError(t, err) + require.Equal(t, int64(0), rowCount) + err = sm.UpdateSubtaskRowCount(2, 100) + require.NoError(t, err) + rowCount, err = sm.GetSubtaskRowCount(2, proto.StepInit) + require.NoError(t, err) + require.Equal(t, int64(100), rowCount) + // test UpdateErrorToSubtask do update start/update time err = sm.AddNewSubTask(3, proto.StepInit, "for_test", []byte("test"), proto.TaskTypeExample, false) require.NoError(t, err) diff --git a/disttask/framework/storage/task_table.go b/disttask/framework/storage/task_table.go index 9238db6d97..1f0a7af736 100644 --- a/disttask/framework/storage/task_table.go +++ b/disttask/framework/storage/task_table.go @@ -388,6 +388,29 @@ func (stm *TaskManager) GetSucceedSubtasksByStep(taskID int64, step int64) ([]*p return subtasks, nil } +// GetSubtaskRowCount gets the subtask row count. +func (stm *TaskManager) GetSubtaskRowCount(taskID int64, step int64) (int64, error) { + rs, err := stm.executeSQLWithNewSession(stm.ctx, `select + cast(sum(json_extract(summary, '$.row_count')) as signed) as row_count + from mysql.tidb_background_subtask where task_key = %? and step = %?`, + taskID, step) + if err != nil { + return 0, err + } + if len(rs) == 0 { + return 0, nil + } + return rs[0].GetInt64(0), nil +} + +// UpdateSubtaskRowCount updates the subtask row count. +func (stm *TaskManager) UpdateSubtaskRowCount(subtaskID int64, rowCount int64) error { + _, err := stm.executeSQLWithNewSession(stm.ctx, `update mysql.tidb_background_subtask + set summary = json_set(summary, '$.row_count', %?) where id = %?`, + rowCount, subtaskID) + return err +} + // GetSubtaskInStatesCnt gets the subtask count in the states. 
func (stm *TaskManager) GetSubtaskInStatesCnt(taskID int64, states ...interface{}) (int64, error) { args := []interface{}{taskID} diff --git a/disttask/importinto/scheduler.go b/disttask/importinto/scheduler.go index cd4a277448..6d21a2d464 100644 --- a/disttask/importinto/scheduler.go +++ b/disttask/importinto/scheduler.go @@ -90,7 +90,8 @@ func (s *importStepScheduler) InitSubtaskExecEnv(ctx context.Context) error { return nil } -func (s *importStepScheduler) SplitSubtask(ctx context.Context, bs []byte) ([]proto.MinimalTask, error) { +func (s *importStepScheduler) SplitSubtask(ctx context.Context, subtask *proto.Subtask) ([]proto.MinimalTask, error) { + bs := subtask.Meta var subtaskMeta ImportStepMeta err := json.Unmarshal(bs, &subtaskMeta) if err != nil { @@ -208,11 +209,12 @@ type postStepScheduler struct { var _ scheduler.Scheduler = &postStepScheduler{} -func (p *postStepScheduler) SplitSubtask(_ context.Context, metaBytes []byte) ([]proto.MinimalTask, error) { +func (p *postStepScheduler) SplitSubtask(_ context.Context, subtask *proto.Subtask) ([]proto.MinimalTask, error) { mTask := &postProcessStepMinimalTask{ taskMeta: p.taskMeta, logger: p.logger, } + metaBytes := subtask.Meta if err := json.Unmarshal(metaBytes, &mTask.meta); err != nil { return nil, err } @@ -235,28 +237,28 @@ func init() { } scheduler.RegisterTaskType(proto.ImportInto, scheduler.WithPoolSize(int32(runtime.GOMAXPROCS(0)))) scheduler.RegisterSchedulerConstructor(proto.ImportInto, StepImport, - func(ctx context.Context, taskID int64, bs []byte, step int64) (scheduler.Scheduler, error) { + func(_ context.Context, task *proto.Task, _ *scheduler.Summary) (scheduler.Scheduler, error) { // TODO(tangenta): use context for lifetime control. - taskMeta, logger, err := prepareFn(taskID, bs, step) + taskMeta, logger, err := prepareFn(task.ID, task.Meta, task.Step) if err != nil { return nil, err } return &importStepScheduler{ - taskID: taskID, + taskID: task.ID, taskMeta: taskMeta, logger: logger, }, nil }, ) scheduler.RegisterSchedulerConstructor(proto.ImportInto, StepPostProcess, - func(ctx context.Context, taskID int64, bs []byte, step int64) (scheduler.Scheduler, error) { + func(_ context.Context, task *proto.Task, _ *scheduler.Summary) (scheduler.Scheduler, error) { // TODO(tangenta): use context for lifetime control. 
- taskMeta, logger, err := prepareFn(taskID, bs, step) + taskMeta, logger, err := prepareFn(task.ID, task.Meta, task.Step) if err != nil { return nil, err } return &postStepScheduler{ - taskID: taskID, + taskID: task.ID, taskMeta: taskMeta, logger: logger, }, nil diff --git a/tests/realtikvtest/addindextest/operator_test.go b/tests/realtikvtest/addindextest/operator_test.go index 28fb9a6923..48302306c5 100644 --- a/tests/realtikvtest/addindextest/operator_test.go +++ b/tests/realtikvtest/addindextest/operator_test.go @@ -17,6 +17,7 @@ package addindextest import ( "context" "fmt" + "sync/atomic" "testing" "github.com/ngaut/pools" @@ -192,31 +193,31 @@ func TestBackfillOperatorPipeline(t *testing.T) { sessPool := newSessPoolForTest(t, store) ctx := context.Background() - var keys, values [][]byte - onWrite := func(key, val []byte) { - keys = append(keys, key) - values = append(values, val) - } + mockBackendCtx := &ingest.MockBackendCtx{} mockEngine := ingest.NewMockEngineInfo(nil) - mockEngine.SetHook(onWrite) + mockEngine.SetHook(func(key, val []byte) {}) + + totalRowCount := &atomic.Int64{} pipeline, err := ddl.NewAddIndexIngestPipeline( ctx, store, sessPool, + mockBackendCtx, mockEngine, tk.Session(), tbl.(table.PhysicalTable), idxInfo, startKey, endKey, + totalRowCount, + nil, ) require.NoError(t, err) err = pipeline.Execute() require.NoError(t, err) err = pipeline.Close() require.NoError(t, err) - require.Len(t, keys, 10) - require.Len(t, values, 10) + require.Equal(t, int64(10), totalRowCount.Load()) } type sessPoolForTest struct {
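
-- 
The hunks above change the dist-framework scheduler Constructor from taking (ctx, taskID, taskMeta, step) to taking (ctx, *proto.Task, *scheduler.Summary), and add the WithSummary register option so a scheduler can report per-subtask row counts, which the framework's summary collect loop persists into mysql.tidb_background_subtask under '$.row_count'. Below is a minimal usage sketch, not part of the patch itself: exampleScheduler is a hypothetical type, and it assumes scheduler.EmptyScheduler provides no-op implementations of the Scheduler methods it does not override.

package example

import (
	"context"

	"github.com/pingcap/tidb/disttask/framework/proto"
	"github.com/pingcap/tidb/disttask/framework/scheduler"
)

// exampleScheduler reports per-subtask row counts through the framework Summary.
type exampleScheduler struct {
	scheduler.EmptyScheduler // assumed to supply no-op InitSubtaskExecEnv, CleanupSubtaskExecEnv, etc.
	summary *scheduler.Summary
}

// SplitSubtask now receives the whole *proto.Subtask, so the subtask ID is available
// for progress reporting in addition to the raw meta bytes.
func (s *exampleScheduler) SplitSubtask(_ context.Context, subtask *proto.Subtask) ([]proto.MinimalTask, error) {
	var rowCnt int64
	// ... decode subtask.Meta and do the work, accumulating rowCnt ...
	s.summary.UpdateRowCount(subtask.ID, rowCnt)
	return nil, nil
}

func registerExample() {
	scheduler.RegisterTaskType("example")
	scheduler.RegisterSchedulerConstructor("example", proto.StepOne,
		func(_ context.Context, _ *proto.Task, summary *scheduler.Summary) (scheduler.Scheduler, error) {
			return &exampleScheduler{summary: summary}, nil
		},
		// Without WithSummary the constructor receives a nil summary and the
		// framework skips the row-count collect loop for this step, as the
		// backfill StepTwo registration in ddl.go does.
		scheduler.WithSummary)
}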