ddl: integrate add index operator to dist framework (#46495)

ref pingcap/tidb#46258
This commit is contained in:
tangenta
2023-09-05 17:05:11 +08:00
committed by GitHub
parent 17b348374c
commit 2fbf4cf032
24 changed files with 364 additions and 259 deletions

View File

@ -21,6 +21,7 @@ import (
"sync/atomic"
"time"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/ddl/ingest"
"github.com/pingcap/tidb/ddl/internal/session"
"github.com/pingcap/tidb/disttask/operator"
@ -35,6 +36,7 @@ import (
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/logutil"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
@ -66,11 +68,14 @@ func NewAddIndexIngestPipeline(
ctx context.Context,
store kv.Storage,
sessPool opSessPool,
backendCtx ingest.BackendCtx,
engine ingest.Engine,
sessCtx sessionctx.Context,
tbl table.PhysicalTable,
idxInfo *model.IndexInfo,
startKey, endKey kv.Key,
totalRowCount *atomic.Int64,
metricCounter prometheus.Counter,
) (*operator.AsyncPipeline, error) {
index := tables.NewIndex(tbl.GetPhysicalID(), tbl.Meta(), idxInfo)
copCtx, err := NewCopContext(tbl.Meta(), idxInfo, sessCtx)
@ -87,7 +92,7 @@ func NewAddIndexIngestPipeline(
srcOp := NewTableScanTaskSource(ctx, store, tbl, startKey, endKey)
scanOp := NewTableScanOperator(ctx, sessPool, copCtx, srcChkPool, readerCnt)
ingestOp := NewIndexIngestOperator(ctx, copCtx, sessPool, tbl, index, engine, srcChkPool, writerCnt)
sinkOp := newIndexWriteResultSink(ctx)
sinkOp := newIndexWriteResultSink(ctx, backendCtx, tbl, index, totalRowCount, metricCounter)
operator.Compose[TableScanTask](srcOp, scanOp)
operator.Compose[IndexRecordChunk](scanOp, ingestOp)
@ -288,7 +293,9 @@ func (w *tableScanWorker) HandleTask(task TableScanTask, sender func(IndexRecord
}
func (w *tableScanWorker) Close() {
w.sessPool.Put(w.se.Context)
if w.se != nil {
w.sessPool.Put(w.se.Context)
}
}
func (w *tableScanWorker) scanRecords(task TableScanTask, sender func(IndexRecordChunk)) {
@ -447,7 +454,10 @@ func (w *indexIngestWorker) HandleTask(rs IndexRecordChunk, send func(IndexWrite
send(result)
}
func (*indexIngestWorker) Close() {
func (w *indexIngestWorker) Close() {
if w.se != nil {
w.sessPool.Put(w.se.Context)
}
}
// WriteLocal will write index records to lightning engine.
@ -464,7 +474,13 @@ func (w *indexIngestWorker) WriteLocal(rs *IndexRecordChunk) (count int, nextKey
}
type indexWriteResultSink struct {
ctx context.Context
ctx context.Context
backendCtx ingest.BackendCtx
tbl table.PhysicalTable
index table.Index
rowCount *atomic.Int64
metricCounter prometheus.Counter
errGroup errgroup.Group
source operator.DataChannel[IndexWriteResult]
@ -472,10 +488,20 @@ type indexWriteResultSink struct {
func newIndexWriteResultSink(
ctx context.Context,
backendCtx ingest.BackendCtx,
tbl table.PhysicalTable,
index table.Index,
rowCount *atomic.Int64,
metricCounter prometheus.Counter,
) *indexWriteResultSink {
return &indexWriteResultSink{
ctx: ctx,
errGroup: errgroup.Group{},
ctx: ctx,
backendCtx: backendCtx,
tbl: tbl,
index: index,
rowCount: rowCount,
metricCounter: metricCounter,
errGroup: errgroup.Group{},
}
}
@ -489,14 +515,17 @@ func (s *indexWriteResultSink) Open() error {
}
func (s *indexWriteResultSink) collectResult() error {
// TODO(tangenta): use results to update reorg info and metrics.
for {
select {
case <-s.ctx.Done():
return nil
case result, ok := <-s.source.Channel():
if !ok {
return nil
return s.flush()
}
s.rowCount.Add(int64(result.Added))
if s.metricCounter != nil {
s.metricCounter.Add(float64(result.Added))
}
if result.Err != nil {
return result.Err
@ -505,6 +534,25 @@ func (s *indexWriteResultSink) collectResult() error {
}
}
func (s *indexWriteResultSink) flush() error {
flushMode := ingest.FlushModeForceLocalAndCheckDiskQuota
if s.tbl.GetPartitionedTable() != nil {
flushMode = ingest.FlushModeForceGlobal
}
idxInfo := s.index.Meta()
_, _, err := s.backendCtx.Flush(idxInfo.ID, flushMode)
if err != nil {
if common.ErrFoundDuplicateKeys.Equal(err) {
err = convertToKeyExistsErr(err, idxInfo, s.tbl.Meta())
return err
}
logutil.BgLogger().Error("flush error",
zap.String("category", "ddl"), zap.Error(err))
return err
}
return nil
}
func (s *indexWriteResultSink) Close() error {
return s.errGroup.Wait()
}

View File

@ -23,6 +23,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/tidb/ddl/ingest"
sess "github.com/pingcap/tidb/ddl/internal/session"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
@ -127,9 +128,9 @@ func (b *txnBackfillScheduler) receiveResult() (*backfillResult, bool) {
return ret, ok
}
func newSessCtx(reorgInfo *reorgInfo) (sessionctx.Context, error) {
sessCtx := newContext(reorgInfo.d.store)
if err := initSessCtx(sessCtx, reorgInfo.ReorgMeta.SQLMode, reorgInfo.ReorgMeta.Location); err != nil {
func newSessCtx(store kv.Storage, sqlMode mysql.SQLMode, tzLocation *model.TimeZoneLocation) (sessionctx.Context, error) {
sessCtx := newContext(store)
if err := initSessCtx(sessCtx, sqlMode, tzLocation); err != nil {
return nil, errors.Trace(err)
}
return sessCtx, nil
@ -182,7 +183,7 @@ func (b *txnBackfillScheduler) adjustWorkerSize() error {
workerCnt := b.expectedWorkerSize()
// Increase the worker.
for i := len(b.workers); i < workerCnt; i++ {
sessCtx, err := newSessCtx(b.reorgInfo)
sessCtx, err := newSessCtx(reorgInfo.d.store, reorgInfo.ReorgMeta.SQLMode, reorgInfo.ReorgMeta.Location)
if err != nil {
return err
}
@ -382,7 +383,7 @@ func (b *ingestBackfillScheduler) adjustWorkerSize() error {
func (b *ingestBackfillScheduler) createWorker() workerpool.Worker[IndexRecordChunk, workerpool.None] {
reorgInfo := b.reorgInfo
job := reorgInfo.Job
sessCtx, err := newSessCtx(reorgInfo)
sessCtx, err := newSessCtx(reorgInfo.d.store, reorgInfo.ReorgMeta.SQLMode, reorgInfo.ReorgMeta.Location)
if err != nil {
b.poolErr <- err
return nil
@ -423,7 +424,8 @@ func (b *ingestBackfillScheduler) createCopReqSenderPool() (*copReqSenderPool, e
zap.Int64("table ID", b.tbl.Meta().ID), zap.Int64("index ID", b.reorgInfo.currElement.ID))
return nil, errors.New("cannot find index info")
}
sessCtx, err := newSessCtx(b.reorgInfo)
ri := b.reorgInfo
sessCtx, err := newSessCtx(ri.d.store, ri.ReorgMeta.SQLMode, ri.ReorgMeta.Location)
if err != nil {
logutil.Logger(b.ctx).Warn("cannot init cop request sender", zap.Error(err))
return nil, err

View File

@ -682,13 +682,13 @@ func newDDL(ctx context.Context, options ...Option) *ddl {
scheduler.RegisterTaskType("backfill")
scheduler.RegisterSchedulerConstructor("backfill", proto.StepOne,
func(ctx context.Context, _ int64, taskMeta []byte, step int64) (scheduler.Scheduler, error) {
return NewBackfillSchedulerHandle(ctx, taskMeta, d, step == proto.StepTwo)
})
func(ctx context.Context, task *proto.Task, summary *scheduler.Summary) (scheduler.Scheduler, error) {
return NewBackfillSchedulerHandle(ctx, task.Meta, d, task.Step == proto.StepTwo, summary)
}, scheduler.WithSummary)
scheduler.RegisterSchedulerConstructor("backfill", proto.StepTwo,
func(ctx context.Context, _ int64, taskMeta []byte, step int64) (scheduler.Scheduler, error) {
return NewBackfillSchedulerHandle(ctx, taskMeta, d, step == proto.StepTwo)
func(ctx context.Context, task *proto.Task, summary *scheduler.Summary) (scheduler.Scheduler, error) {
return NewBackfillSchedulerHandle(ctx, task.Meta, d, task.Step == proto.StepTwo, nil)
})
backFillDsp, err := NewBackfillingDispatcherExt(d)

View File

@ -19,7 +19,11 @@ import (
)
// CheckBackfillJobFinishInterval is export for test.
var CheckBackfillJobFinishInterval = 300 * time.Millisecond
var (
CheckBackfillJobFinishInterval = 300 * time.Millisecond
// UpdateBackfillJobRowCountInterval is the interval of updating the job row count.
UpdateBackfillJobRowCountInterval = 3 * time.Second
)
const (
distPhysicalTableConcurrency = 16

View File

@ -25,7 +25,6 @@ import (
"path/filepath"
"slices"
"strings"
"sync"
"sync/atomic"
"time"
@ -37,6 +36,8 @@ import (
"github.com/pingcap/tidb/ddl/ingest"
sess "github.com/pingcap/tidb/ddl/internal/session"
"github.com/pingcap/tidb/disttask/framework/handle"
"github.com/pingcap/tidb/disttask/framework/proto"
"github.com/pingcap/tidb/disttask/framework/storage"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
@ -1617,7 +1618,6 @@ type addIndexIngestWorker struct {
writer ingest.Writer
copReqSenderPool *copReqSenderPool
checkpointMgr *ingest.CheckpointManager
flushLock *sync.RWMutex
resultCh chan *backfillResult
jobID int64
@ -1901,13 +1901,16 @@ func (w *worker) executeDistGlobalTask(reorgInfo *reorgInfo) error {
})
g.Go(func() error {
ticker := time.NewTicker(CheckBackfillJobFinishInterval)
defer ticker.Stop()
checkFinishTk := time.NewTicker(CheckBackfillJobFinishInterval)
defer checkFinishTk.Stop()
updateRowCntTk := time.NewTicker(UpdateBackfillJobRowCountInterval)
defer updateRowCntTk.Stop()
for {
select {
case <-done:
w.updateJobRowCount(taskKey, reorgInfo.Job.ID)
return nil
case <-ticker.C:
case <-checkFinishTk.C:
if err = w.isReorgRunnable(reorgInfo.Job.ID, true); err != nil {
if !dbterror.ErrCancelledDDLJob.Equal(err) {
return errors.Trace(err)
@ -1919,10 +1922,32 @@ func (w *worker) executeDistGlobalTask(reorgInfo *reorgInfo) error {
continue
}
}
case <-updateRowCntTk.C:
w.updateJobRowCount(taskKey, reorgInfo.Job.ID)
}
}
})
return g.Wait()
err = g.Wait()
return err
}
func (w *worker) updateJobRowCount(taskKey string, jobID int64) {
taskMgr, err := storage.GetTaskManager()
if err != nil {
logutil.BgLogger().Warn("cannot get task manager", zap.String("category", "ddl"), zap.String("task_key", taskKey), zap.Error(err))
return
}
gTask, err := taskMgr.GetGlobalTaskByKey(taskKey)
if err != nil || gTask == nil {
logutil.BgLogger().Warn("cannot get global task", zap.String("category", "ddl"), zap.String("task_key", taskKey), zap.Error(err))
return
}
rowCount, err := taskMgr.GetSubtaskRowCount(gTask.ID, proto.StepOne)
if err != nil {
logutil.BgLogger().Warn("cannot get subtask row count", zap.String("category", "ddl"), zap.String("task_key", taskKey), zap.Error(err))
return
}
w.getReorgCtx(jobID).setRowCount(rowCount)
}
func getNextPartitionInfo(reorg *reorgInfo, t table.PartitionedTable, currPhysicalTableID int64) (int64, kv.Key, kv.Key, error) {

View File

@ -15,7 +15,6 @@
package ddl
import (
"context"
"encoding/hex"
"fmt"
"strconv"
@ -50,13 +49,10 @@ import (
"github.com/pingcap/tidb/util/ranger"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/pingcap/tipb/go-tipb"
clientv3 "go.etcd.io/etcd/client/v3"
atomicutil "go.uber.org/atomic"
"go.uber.org/zap"
)
const rowCountEtcdPath = "distAddIndex"
// reorgCtx is for reorganization.
type reorgCtx struct {
// doneCh is used to notify.
@ -138,48 +134,6 @@ func (rc *reorgCtx) getRowCount() int64 {
return row
}
func getAndSetJobRowCnt(ctx context.Context, reorgInfo *reorgInfo, rc *reorgCtx, job *model.Job, client *clientv3.Client) int64 {
rowCount := int64(0)
if reorgInfo.Job.ReorgMeta.IsDistReorg && !reorgInfo.mergingTmpIdx {
path := fmt.Sprintf("%s/%d", rowCountEtcdPath, job.ID)
resp, err := client.Get(ctx, path, clientv3.WithPrefix())
if err != nil {
logutil.BgLogger().Warn("get row count from ETCD failed", zap.String("category", "ddl"), zap.Error(err))
return 0
}
if len(resp.Kvs) == 0 {
return 0
}
for _, kv := range resp.Kvs {
cnt, err := strconv.Atoi(string(kv.Value))
if err != nil {
logutil.BgLogger().Error("parse row count from ETCD failed", zap.String("category", "ddl"), zap.Error(err))
continue
}
rowCount += int64(cnt)
}
job.SetRowCount(rowCount)
} else {
rowCount = rc.getRowCount()
job.SetRowCount(rowCount)
}
return rowCount
}
func deleteETCDRowCntStatIfNecessary(ctx context.Context, reorgInfo *reorgInfo, job *model.Job, client *clientv3.Client) {
if reorgInfo.Job.ReorgMeta.IsDistReorg && !reorgInfo.mergingTmpIdx {
path := fmt.Sprintf("%s/%d", rowCountEtcdPath, job.ID)
const retryCnt = 3
for i := 0; i < retryCnt; i++ {
_, err := client.Delete(ctx, path, clientv3.WithPrefix())
if err == nil {
return
}
logutil.BgLogger().Warn("delete row count from ETCD failed", zap.String("category", "ddl"), zap.Error(err))
}
}
}
// runReorgJob is used as a portal to do the reorganization work.
// eg:
// 1: add index
@ -271,15 +225,14 @@ func (w *worker) runReorgJob(reorgInfo *reorgInfo, tblInfo *model.TableInfo,
d.removeReorgCtx(job.ID)
return dbterror.ErrCancelledDDLJob
}
rowCount := getAndSetJobRowCnt(w.ctx, reorgInfo, rc, job, d.etcdCli)
rowCount := rc.getRowCount()
job.SetRowCount(rowCount)
if err != nil {
logutil.BgLogger().Warn("run reorg job done", zap.String("category", "ddl"), zap.Int64("handled rows", rowCount), zap.Error(err))
} else {
logutil.BgLogger().Info("run reorg job done", zap.String("category", "ddl"), zap.Int64("handled rows", rowCount))
}
deleteETCDRowCntStatIfNecessary(w.ctx, reorgInfo, job, d.etcdCli)
// Update a job's warnings.
w.mergeWarningsIntoJob(job)
@ -298,7 +251,8 @@ func (w *worker) runReorgJob(reorgInfo *reorgInfo, tblInfo *model.TableInfo,
// We return dbterror.ErrWaitReorgTimeout here too, so that outer loop will break.
return dbterror.ErrWaitReorgTimeout
case <-time.After(waitTimeout):
rowCount := getAndSetJobRowCnt(w.ctx, reorgInfo, rc, job, d.etcdCli)
rowCount := rc.getRowCount()
job.SetRowCount(rowCount)
updateBackfillProgress(w, reorgInfo, tblInfo, rowCount)
// Update a job's warnings.

View File

@ -54,6 +54,7 @@ func (i *ingestIndexStage) InitSubtaskExecEnv(_ context.Context) error {
if err != nil {
if common.ErrFoundDuplicateKeys.Equal(err) {
err = convertToKeyExistsErr(err, i.index, i.ptbl.Meta())
return err
}
logutil.BgLogger().Error("flush error", zap.String("category", "ddl"), zap.Error(err))
return err
@ -61,7 +62,7 @@ func (i *ingestIndexStage) InitSubtaskExecEnv(_ context.Context) error {
return err
}
func (*ingestIndexStage) SplitSubtask(_ context.Context, _ []byte) ([]proto.MinimalTask, error) {
func (*ingestIndexStage) SplitSubtask(_ context.Context, _ *proto.Subtask) ([]proto.MinimalTask, error) {
logutil.BgLogger().Info("ingest index stage split subtask", zap.String("category", "ddl"))
return nil, nil
}

View File

@ -16,21 +16,16 @@ package ddl
import (
"context"
"encoding/hex"
"encoding/json"
"fmt"
"strconv"
"time"
"sync/atomic"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/ddl/ingest"
ddlutil "github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/disttask/framework/proto"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/disttask/framework/scheduler"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/util/logutil"
@ -44,10 +39,8 @@ type readIndexToLocalStage struct {
ptbl table.PhysicalTable
jc *JobContext
bc ingest.BackendCtx
done chan struct{}
totalRowCnt int64
bc ingest.BackendCtx
summary *scheduler.Summary
}
func newReadIndexToLocalStage(
@ -57,53 +50,32 @@ func newReadIndexToLocalStage(
ptbl table.PhysicalTable,
jc *JobContext,
bc ingest.BackendCtx,
summary *scheduler.Summary,
) *readIndexToLocalStage {
return &readIndexToLocalStage{
d: d,
job: job,
index: index,
ptbl: ptbl,
jc: jc,
bc: bc,
done: make(chan struct{}),
totalRowCnt: 0,
d: d,
job: job,
index: index,
ptbl: ptbl,
jc: jc,
bc: bc,
summary: summary,
}
}
func (r *readIndexToLocalStage) InitSubtaskExecEnv(ctx context.Context) error {
func (*readIndexToLocalStage) InitSubtaskExecEnv(_ context.Context) error {
logutil.BgLogger().Info("read index stage init subtask exec env",
zap.String("category", "ddl"))
d := r.d
ser, err := infosync.GetServerInfo()
if err != nil {
return err
}
path := fmt.Sprintf("distAddIndex/%d/%s:%d", r.job.ID, ser.IP, ser.Port)
response, err := d.etcdCli.Get(ctx, path)
if err != nil {
return err
}
if len(response.Kvs) > 0 {
cnt, err := strconv.Atoi(string(response.Kvs[0].Value))
if err != nil {
return err
}
r.totalRowCnt = int64(cnt)
}
r.done = make(chan struct{})
go r.UpdateStatLoop()
return nil
}
func (r *readIndexToLocalStage) SplitSubtask(ctx context.Context, subtask []byte) ([]proto.MinimalTask, error) {
func (r *readIndexToLocalStage) SplitSubtask(ctx context.Context, subtask *proto.Subtask) ([]proto.MinimalTask, error) {
logutil.BgLogger().Info("read index stage run subtask",
zap.String("category", "ddl"))
d := r.d
sm := &BackfillSubTaskMeta{}
err := json.Unmarshal(subtask, sm)
err := json.Unmarshal(subtask.Meta, sm)
if err != nil {
logutil.BgLogger().Error("unmarshal error",
zap.String("category", "ddl"),
@ -113,7 +85,6 @@ func (r *readIndexToLocalStage) SplitSubtask(ctx context.Context, subtask []byte
var startKey, endKey kv.Key
var tbl table.PhysicalTable
var isPartition bool
currentVer, err1 := getValidCurrentVersion(d.store)
if err1 != nil {
@ -130,78 +101,42 @@ func (r *readIndexToLocalStage) SplitSubtask(ctx context.Context, subtask []byte
return nil, err
}
tbl = parTbl.GetPartition(pid)
isPartition = true
} else {
startKey, endKey = sm.StartKey, sm.EndKey
tbl = r.ptbl
}
mockReorgInfo := &reorgInfo{Job: r.job, d: d.ddlCtx}
elements := make([]*meta.Element, 0)
elements = append(elements, &meta.Element{ID: r.index.ID, TypeKey: meta.IndexElementKey})
mockReorgInfo.elements = elements
mockReorgInfo.currElement = mockReorgInfo.elements[0]
ingestScheduler := newIngestBackfillScheduler(ctx, mockReorgInfo, d.sessPool, tbl, true)
defer ingestScheduler.close(true)
consumer := newResultConsumer(d.ddlCtx, mockReorgInfo, nil, true)
consumer.run(ingestScheduler, startKey, &r.totalRowCnt)
err = ingestScheduler.setupWorkers()
ei, err := r.bc.Register(r.job.ID, r.index.ID, r.job.SchemaName, r.job.TableName)
if err != nil {
logutil.BgLogger().Error("setup workers error",
zap.String("category", "ddl"),
zap.Error(err))
logutil.Logger(ctx).Warn("cannot register new engine", zap.Error(err),
zap.Int64("job ID", r.job.ID), zap.Int64("index ID", r.index.ID))
return nil, err
}
taskIDAlloc := newTaskIDAllocator()
for {
kvRanges, err := splitTableRanges(r.ptbl, d.store, startKey, endKey, backfillTaskChanSize)
if err != nil {
return nil, err
}
if len(kvRanges) == 0 {
break
}
logutil.BgLogger().Info("start backfill workers to reorg record",
zap.String("category", "ddl"),
zap.Int("workerCnt", ingestScheduler.currentWorkerSize()),
zap.Int("regionCnt", len(kvRanges)),
zap.String("startKey", hex.EncodeToString(startKey)),
zap.String("endKey", hex.EncodeToString(endKey)))
sendTasks(ingestScheduler, consumer, tbl, kvRanges, mockReorgInfo, taskIDAlloc)
if consumer.shouldAbort() {
break
}
rangeEndKey := kvRanges[len(kvRanges)-1].EndKey
startKey = rangeEndKey.Next()
if startKey.Cmp(endKey) >= 0 {
break
}
}
ingestScheduler.close(false)
if err := consumer.getResult(); err != nil {
return nil, err
}
flushMode := ingest.FlushModeForceLocalAndCheckDiskQuota
if isPartition {
flushMode = ingest.FlushModeForceGlobal
}
_, _, err = r.bc.Flush(r.index.ID, flushMode)
sessCtx, err := newSessCtx(d.store, r.job.ReorgMeta.SQLMode, r.job.ReorgMeta.Location)
if err != nil {
if common.ErrFoundDuplicateKeys.Equal(err) {
err = convertToKeyExistsErr(err, r.index, r.ptbl.Meta())
}
logutil.BgLogger().Error("flush error",
zap.String("category", "ddl"), zap.Error(err))
return nil, err
}
totalRowCount := &atomic.Int64{}
counter := metrics.BackfillTotalCounter.WithLabelValues(
metrics.GenerateReorgLabel("add_idx_rate", r.job.SchemaName, tbl.Meta().Name.O))
pipe, err := NewAddIndexIngestPipeline(
ctx, d.store, d.sessPool, r.bc, ei, sessCtx, tbl, r.index, startKey, endKey, totalRowCount, counter)
if err != nil {
return nil, err
}
err = pipe.Execute()
if err != nil {
return nil, err
}
err = pipe.Close()
if err != nil {
return nil, err
}
r.summary.UpdateRowCount(subtask.ID, totalRowCount.Load())
return nil, nil
}
@ -211,11 +146,6 @@ func (r *readIndexToLocalStage) CleanupSubtaskExecEnv(_ context.Context) error {
if _, ok := r.ptbl.(table.PartitionedTable); ok {
ingest.LitBackCtxMgr.Unregister(r.job.ID)
}
close(r.done)
if !r.d.OwnerManager().IsOwner() {
// For owner, reorg ctx will be removed after the reorg job is done.
r.d.removeReorgCtx(r.job.ID)
}
return nil
}
@ -236,38 +166,5 @@ func (r *readIndexToLocalStage) Rollback(_ context.Context) error {
logutil.BgLogger().Info("read index stage rollback backfill add index task",
zap.String("category", "ddl"), zap.Int64("jobID", r.job.ID))
ingest.LitBackCtxMgr.Unregister(r.job.ID)
if !r.d.OwnerManager().IsOwner() {
// For owner, reorg ctx will be removed after the reorg job is done.
r.d.removeReorgCtx(r.job.ID)
}
return nil
}
// UpdateStatLoop updates the row count of adding index.
func (r *readIndexToLocalStage) UpdateStatLoop() {
tk := time.Tick(time.Second * 5)
ser, err := infosync.GetServerInfo()
if err != nil {
logutil.BgLogger().Warn("get server info failed",
zap.String("category", "ddl"), zap.Error(err))
return
}
path := fmt.Sprintf("%s/%d/%s:%d", rowCountEtcdPath, r.job.ID, ser.IP, ser.Port)
writeToEtcd := func() {
err := ddlutil.PutKVToEtcd(context.TODO(), r.d.etcdCli, 3, path, strconv.Itoa(int(r.totalRowCnt)))
if err != nil {
logutil.BgLogger().Warn("update row count for distributed add index failed",
zap.String("category", "ddl"),
zap.Error(err))
}
}
for {
select {
case <-r.done:
writeToEtcd()
return
case <-tk:
writeToEtcd()
}
}
}

View File

@ -39,10 +39,12 @@ type BackfillSubTaskMeta struct {
PhysicalTableID int64 `json:"physical_table_id"`
StartKey []byte `json:"start_key"`
EndKey []byte `json:"end_key"`
RowCount int64 `json:"row_count"`
}
// NewBackfillSchedulerHandle creates a new backfill scheduler.
func NewBackfillSchedulerHandle(ctx context.Context, taskMeta []byte, d *ddl, stepForImport bool) (scheduler.Scheduler, error) {
func NewBackfillSchedulerHandle(ctx context.Context, taskMeta []byte, d *ddl,
stepForImport bool, summary *scheduler.Summary) (scheduler.Scheduler, error) {
bgm := &BackfillGlobalMeta{}
err := json.Unmarshal(taskMeta, bgm)
if err != nil {
@ -70,7 +72,7 @@ func NewBackfillSchedulerHandle(ctx context.Context, taskMeta []byte, d *ddl, st
jc := d.jobContext(jobMeta.ID)
d.setDDLLabelForTopSQL(jobMeta.ID, jobMeta.Query)
d.setDDLSourceForDiagnosis(jobMeta.ID, jobMeta.Type)
return newReadIndexToLocalStage(d, &bgm.Job, indexInfo, tbl.(table.PhysicalTable), jc, bc), nil
return newReadIndexToLocalStage(d, &bgm.Job, indexInfo, tbl.(table.PhysicalTable), jc, bc, summary), nil
}
return newIngestIndexStage(jobMeta.ID, indexInfo, tbl.(table.PhysicalTable), bc), nil
}

View File

@ -87,10 +87,10 @@ func RegisterHATaskMeta(m *sync.Map) {
})
scheduler.ClearSchedulers()
scheduler.RegisterTaskType(proto.TaskTypeExample)
scheduler.RegisterSchedulerConstructor(proto.TaskTypeExample, proto.StepOne, func(_ context.Context, _ int64, _ []byte, _ int64) (scheduler.Scheduler, error) {
scheduler.RegisterSchedulerConstructor(proto.TaskTypeExample, proto.StepOne, func(_ context.Context, _ *proto.Task, _ *scheduler.Summary) (scheduler.Scheduler, error) {
return &testScheduler{}, nil
})
scheduler.RegisterSchedulerConstructor(proto.TaskTypeExample, proto.StepTwo, func(_ context.Context, _ int64, _ []byte, _ int64) (scheduler.Scheduler, error) {
scheduler.RegisterSchedulerConstructor(proto.TaskTypeExample, proto.StepTwo, func(_ context.Context, _ *proto.Task, _ *scheduler.Summary) (scheduler.Scheduler, error) {
return &testScheduler{}, nil
})
scheduler.RegisterSubtaskExectorConstructor(proto.TaskTypeExample, proto.StepOne, func(_ proto.MinimalTask, _ int64) (scheduler.SubtaskExecutor, error) {

View File

@ -84,7 +84,7 @@ func (t *rollbackScheduler) Rollback(_ context.Context) error {
return nil
}
func (t *rollbackScheduler) SplitSubtask(_ context.Context, _ []byte) ([]proto.MinimalTask, error) {
func (t *rollbackScheduler) SplitSubtask(_ context.Context, _ *proto.Subtask) ([]proto.MinimalTask, error) {
return []proto.MinimalTask{
testRollbackMiniTask{},
testRollbackMiniTask{},
@ -115,7 +115,7 @@ func RegisterRollbackTaskMeta(m *sync.Map) {
})
scheduler.ClearSchedulers()
scheduler.RegisterTaskType(proto.TaskTypeExample)
scheduler.RegisterSchedulerConstructor(proto.TaskTypeExample, proto.StepOne, func(_ context.Context, _ int64, _ []byte, _ int64) (scheduler.Scheduler, error) {
scheduler.RegisterSchedulerConstructor(proto.TaskTypeExample, proto.StepOne, func(_ context.Context, _ *proto.Task, _ *scheduler.Summary) (scheduler.Scheduler, error) {
return &rollbackScheduler{m: m}, nil
})
scheduler.RegisterSubtaskExectorConstructor(proto.TaskTypeExample, proto.StepOne, func(_ proto.MinimalTask, _ int64) (scheduler.SubtaskExecutor, error) {

View File

@ -97,7 +97,7 @@ func (t *testScheduler) CleanupSubtaskExecEnv(_ context.Context) error { return
func (t *testScheduler) Rollback(_ context.Context) error { return nil }
func (t *testScheduler) SplitSubtask(_ context.Context, _ []byte) ([]proto.MinimalTask, error) {
func (t *testScheduler) SplitSubtask(_ context.Context, _ *proto.Subtask) ([]proto.MinimalTask, error) {
return []proto.MinimalTask{
testMiniTask{},
testMiniTask{},
@ -138,10 +138,10 @@ func RegisterTaskMeta(m *sync.Map, dispatcherHandle dispatcher.Extension) {
})
scheduler.ClearSchedulers()
scheduler.RegisterTaskType(proto.TaskTypeExample)
scheduler.RegisterSchedulerConstructor(proto.TaskTypeExample, proto.StepOne, func(_ context.Context, _ int64, _ []byte, _ int64) (scheduler.Scheduler, error) {
scheduler.RegisterSchedulerConstructor(proto.TaskTypeExample, proto.StepOne, func(_ context.Context, _ *proto.Task, _ *scheduler.Summary) (scheduler.Scheduler, error) {
return &testScheduler{}, nil
})
scheduler.RegisterSchedulerConstructor(proto.TaskTypeExample, proto.StepTwo, func(_ context.Context, _ int64, _ []byte, _ int64) (scheduler.Scheduler, error) {
scheduler.RegisterSchedulerConstructor(proto.TaskTypeExample, proto.StepTwo, func(_ context.Context, _ *proto.Task, _ *scheduler.Summary) (scheduler.Scheduler, error) {
return &testScheduler{}, nil
})
scheduler.RegisterSubtaskExectorConstructor(proto.TaskTypeExample, proto.StepOne, func(_ proto.MinimalTask, _ int64) (scheduler.SubtaskExecutor, error) {

View File

@ -361,7 +361,7 @@ func (mr *MockSchedulerMockRecorder) Rollback(arg0 interface{}) *gomock.Call {
}
// SplitSubtask mocks base method.
func (m *MockScheduler) SplitSubtask(arg0 context.Context, arg1 []byte) ([]proto.MinimalTask, error) {
func (m *MockScheduler) SplitSubtask(arg0 context.Context, arg1 *proto.Subtask) ([]proto.MinimalTask, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "SplitSubtask", arg0, arg1)
ret0, _ := ret[0].([]proto.MinimalTask)

View File

@ -7,6 +7,7 @@ go_library(
"manager.go",
"register.go",
"scheduler.go",
"summary.go",
],
importpath = "github.com/pingcap/tidb/disttask/framework/scheduler",
visibility = ["//visibility:public"],

View File

@ -53,7 +53,7 @@ type Scheduler interface {
// InitSubtaskExecEnv is used to initialize the environment for the subtask executor.
InitSubtaskExecEnv(context.Context) error
// SplitSubtask is used to split the subtask into multiple minimal tasks.
SplitSubtask(ctx context.Context, subtask []byte) ([]proto.MinimalTask, error)
SplitSubtask(ctx context.Context, subtask *proto.Subtask) ([]proto.MinimalTask, error)
// CleanupSubtaskExecEnv is used to clean up the environment for the subtask executor.
CleanupSubtaskExecEnv(context.Context) error
// OnSubtaskFinished is used to handle the subtask when it is finished.
@ -84,7 +84,7 @@ func (*EmptyScheduler) InitSubtaskExecEnv(context.Context) error {
}
// SplitSubtask implements the Scheduler interface.
func (*EmptyScheduler) SplitSubtask(context.Context, []byte) ([]proto.MinimalTask, error) {
func (*EmptyScheduler) SplitSubtask(context.Context, *proto.Subtask) ([]proto.MinimalTask, error) {
return nil, nil
}

View File

@ -37,10 +37,15 @@ func WithPoolSize(poolSize int32) TaskTypeOption {
}
type schedulerRegisterOptions struct {
Summary *Summary
}
// Constructor is the constructor of Scheduler.
type Constructor func(context context.Context, taskID int64, taskMeta []byte, step int64) (Scheduler, error)
type Constructor func(
context context.Context,
task *proto.Task,
summary *Summary,
) (Scheduler, error)
// RegisterOption is the register option of Scheduler.
type RegisterOption func(opts *schedulerRegisterOptions)
@ -112,3 +117,8 @@ func ClearSchedulers() {
subtaskExecutorConstructors = make(map[string]SubtaskExecutorConstructor)
subtaskExecutorOptions = make(map[string]subtaskExecutorRegisterOptions)
}
// WithSummary is the option of Scheduler to set the summary.
var WithSummary RegisterOption = func(opts *schedulerRegisterOptions) {
opts.Summary = NewSummary()
}

View File

@ -24,7 +24,7 @@ import (
func mockSchedulerOptionFunc(op *schedulerRegisterOptions) {}
func mockSchedulerConstructor(_ context.Context, _ int64, task []byte, step int64) (Scheduler, error) {
func mockSchedulerConstructor(_ context.Context, _ *proto.Task, _ *Summary) (Scheduler, error) {
return nil, nil
}

View File

@ -116,7 +116,15 @@ func (s *InternalSchedulerImpl) run(ctx context.Context, task *proto.Task) error
s.registerCancelFunc(runCancel)
s.resetError()
logutil.Logger(s.logCtx).Info("scheduler run a step", zap.Any("step", task.Step), zap.Any("concurrency", task.Concurrency))
scheduler, err := createScheduler(ctx, task)
summary, cleanup, err := runSummaryCollectLoop(ctx, task, s.taskTable)
if err != nil {
s.onError(err)
return s.getError()
}
defer cleanup()
scheduler, err := createScheduler(ctx, task, summary)
if err != nil {
s.onError(err)
return s.getError()
@ -185,7 +193,7 @@ func (s *InternalSchedulerImpl) run(ctx context.Context, task *proto.Task) error
}
func (s *InternalSchedulerImpl) runSubtask(ctx context.Context, scheduler Scheduler, subtask *proto.Subtask, minimalTaskCh chan func()) {
minimalTasks, err := scheduler.SplitSubtask(ctx, subtask.Meta)
minimalTasks, err := scheduler.SplitSubtask(ctx, subtask)
if err != nil {
s.onError(err)
if errors.Cause(err) == context.Canceled {
@ -349,7 +357,7 @@ func (s *InternalSchedulerImpl) Rollback(ctx context.Context, task *proto.Task)
}
}
scheduler, err := createScheduler(ctx, task)
scheduler, err := createScheduler(ctx, task, nil)
if err != nil {
s.onError(err)
return s.getError()
@ -378,13 +386,36 @@ func (s *InternalSchedulerImpl) Rollback(ctx context.Context, task *proto.Task)
return s.getError()
}
func createScheduler(ctx context.Context, task *proto.Task) (Scheduler, error) {
func createScheduler(ctx context.Context, task *proto.Task, summary *Summary) (Scheduler, error) {
key := getKey(task.Type, task.Step)
constructor, ok := schedulerConstructors[key]
if !ok {
return nil, errors.Errorf("constructor of scheduler for key %s not found", key)
}
return constructor(ctx, task.ID, task.Meta, task.Step)
return constructor(ctx, task, summary)
}
func runSummaryCollectLoop(
ctx context.Context,
task *proto.Task,
taskTable TaskTable,
) (summary *Summary, cleanup func(), err error) {
taskMgr, ok := taskTable.(*storage.TaskManager)
if !ok {
return nil, func() {}, nil
}
key := getKey(task.Type, task.Step)
opt, ok := schedulerOptions[key]
if !ok {
return nil, func() {}, errors.Errorf("scheduler option for key %s not found", key)
}
if opt.Summary != nil {
go opt.Summary.UpdateRowCountLoop(ctx, taskMgr)
return opt.Summary, func() {
opt.Summary.PersistRowCount(ctx, taskMgr)
}, nil
}
return nil, func() {}, nil
}
func createSubtaskExecutor(minimalTask proto.MinimalTask, tp string, step int64) (SubtaskExecutor, error) {

View File

@ -86,7 +86,7 @@ func TestSchedulerRun(t *testing.T) {
err := scheduler.Run(runCtx, &proto.Task{Step: proto.StepOne, Type: tp})
require.EqualError(t, err, schedulerRegisterErr.Error())
RegisterSchedulerConstructor(tp, proto.StepOne, func(_ context.Context, _ int64, task []byte, step int64) (Scheduler, error) {
RegisterSchedulerConstructor(tp, proto.StepOne, func(_ context.Context, _ *proto.Task, _ *Summary) (Scheduler, error) {
return mockScheduler, nil
})
@ -172,7 +172,7 @@ func TestSchedulerRun(t *testing.T) {
require.NoError(t, err)
// 9. run subtask one by one
RegisterSchedulerConstructor(tp, proto.StepOne, func(_ context.Context, _ int64, task []byte, step int64) (Scheduler, error) {
RegisterSchedulerConstructor(tp, proto.StepOne, func(_ context.Context, _ *proto.Task, _ *Summary) (Scheduler, error) {
return mockScheduler, nil
})
mockScheduler.EXPECT().InitSubtaskExecEnv(gomock.Any()).Return(nil)
@ -283,7 +283,7 @@ func TestSchedulerRollback(t *testing.T) {
err := scheduler.Rollback(runCtx, &proto.Task{Step: proto.StepOne, ID: 1, Type: tp})
require.EqualError(t, err, schedulerRegisterErr.Error())
RegisterSchedulerConstructor(tp, proto.StepOne, func(_ context.Context, _ int64, task []byte, step int64) (Scheduler, error) {
RegisterSchedulerConstructor(tp, proto.StepOne, func(_ context.Context, _ *proto.Task, _ *Summary) (Scheduler, error) {
return mockScheduler, nil
})
@ -360,7 +360,7 @@ func TestScheduler(t *testing.T) {
mockScheduler := mock.NewMockScheduler(ctrl)
mockSubtaskExecutor := mock.NewMockSubtaskExecutor(ctrl)
RegisterSchedulerConstructor(tp, proto.StepOne, func(_ context.Context, _ int64, task []byte, step int64) (Scheduler, error) {
RegisterSchedulerConstructor(tp, proto.StepOne, func(_ context.Context, _ *proto.Task, _ *Summary) (Scheduler, error) {
return mockScheduler, nil
})
RegisterSubtaskExectorConstructor(tp, proto.StepOne, func(minimalTask proto.MinimalTask, step int64) (SubtaskExecutor, error) {

View File

@ -0,0 +1,93 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package scheduler
import (
"context"
"sync"
"time"
"github.com/pingcap/tidb/disttask/framework/storage"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
)
// Summary is used to collect the summary of subtasks execution.
type Summary struct {
mu struct {
sync.Mutex
RowCount map[int64]int64 // subtask ID -> row count
}
}
// NewSummary creates a new Summary.
func NewSummary() *Summary {
return &Summary{
mu: struct {
sync.Mutex
RowCount map[int64]int64
}{
RowCount: map[int64]int64{},
},
}
}
// UpdateRowCount updates the row count of the subtask.
func (s *Summary) UpdateRowCount(subtaskID int64, rowCount int64) {
s.mu.Lock()
defer s.mu.Unlock()
s.mu.RowCount[subtaskID] = rowCount
}
// UpdateRowCountLoop updates the row count of the subtask periodically.
func (s *Summary) UpdateRowCountLoop(ctx context.Context, taskMgr *storage.TaskManager) {
ticker := time.NewTicker(3 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
s.PersistRowCount(ctx, taskMgr)
}
}
}
// PersistRowCount persists the row count of the subtask to the storage.
func (s *Summary) PersistRowCount(ctx context.Context, taskMgr *storage.TaskManager) {
var copiedRowCount map[int64]int64
s.mu.Lock()
if len(s.mu.RowCount) == 0 {
s.mu.Unlock()
return
}
copiedRowCount = make(map[int64]int64, len(s.mu.RowCount))
for subtaskID, rowCount := range s.mu.RowCount {
copiedRowCount[subtaskID] = rowCount
}
s.mu.Unlock()
for subtaskID, rowCount := range copiedRowCount {
err := taskMgr.UpdateSubtaskRowCount(subtaskID, rowCount)
if err != nil {
logutil.Logger(ctx).Warn("update subtask row count failed", zap.Error(err))
}
}
s.mu.Lock()
for subtaskID := range copiedRowCount {
delete(s.mu.RowCount, subtaskID)
}
s.mu.Unlock()
}

View File

@ -224,12 +224,23 @@ func TestSubTaskTable(t *testing.T) {
subtasks, err := sm.GetSucceedSubtasksByStep(2, proto.StepInit)
require.NoError(t, err)
require.Len(t, subtasks, 0)
err = sm.FinishSubtask(2, []byte{})
require.NoError(t, err)
subtasks, err = sm.GetSucceedSubtasksByStep(2, proto.StepInit)
require.NoError(t, err)
require.Len(t, subtasks, 1)
rowCount, err := sm.GetSubtaskRowCount(2, proto.StepInit)
require.NoError(t, err)
require.Equal(t, int64(0), rowCount)
err = sm.UpdateSubtaskRowCount(2, 100)
require.NoError(t, err)
rowCount, err = sm.GetSubtaskRowCount(2, proto.StepInit)
require.NoError(t, err)
require.Equal(t, int64(100), rowCount)
// test UpdateErrorToSubtask do update start/update time
err = sm.AddNewSubTask(3, proto.StepInit, "for_test", []byte("test"), proto.TaskTypeExample, false)
require.NoError(t, err)

View File

@ -388,6 +388,29 @@ func (stm *TaskManager) GetSucceedSubtasksByStep(taskID int64, step int64) ([]*p
return subtasks, nil
}
// GetSubtaskRowCount gets the subtask row count.
func (stm *TaskManager) GetSubtaskRowCount(taskID int64, step int64) (int64, error) {
rs, err := stm.executeSQLWithNewSession(stm.ctx, `select
cast(sum(json_extract(summary, '$.row_count')) as signed) as row_count
from mysql.tidb_background_subtask where task_key = %? and step = %?`,
taskID, step)
if err != nil {
return 0, err
}
if len(rs) == 0 {
return 0, nil
}
return rs[0].GetInt64(0), nil
}
// UpdateSubtaskRowCount updates the subtask row count.
func (stm *TaskManager) UpdateSubtaskRowCount(subtaskID int64, rowCount int64) error {
_, err := stm.executeSQLWithNewSession(stm.ctx, `update mysql.tidb_background_subtask
set summary = json_set(summary, '$.row_count', %?) where id = %?`,
rowCount, subtaskID)
return err
}
// GetSubtaskInStatesCnt gets the subtask count in the states.
func (stm *TaskManager) GetSubtaskInStatesCnt(taskID int64, states ...interface{}) (int64, error) {
args := []interface{}{taskID}

View File

@ -90,7 +90,8 @@ func (s *importStepScheduler) InitSubtaskExecEnv(ctx context.Context) error {
return nil
}
func (s *importStepScheduler) SplitSubtask(ctx context.Context, bs []byte) ([]proto.MinimalTask, error) {
func (s *importStepScheduler) SplitSubtask(ctx context.Context, subtask *proto.Subtask) ([]proto.MinimalTask, error) {
bs := subtask.Meta
var subtaskMeta ImportStepMeta
err := json.Unmarshal(bs, &subtaskMeta)
if err != nil {
@ -208,11 +209,12 @@ type postStepScheduler struct {
var _ scheduler.Scheduler = &postStepScheduler{}
func (p *postStepScheduler) SplitSubtask(_ context.Context, metaBytes []byte) ([]proto.MinimalTask, error) {
func (p *postStepScheduler) SplitSubtask(_ context.Context, subtask *proto.Subtask) ([]proto.MinimalTask, error) {
mTask := &postProcessStepMinimalTask{
taskMeta: p.taskMeta,
logger: p.logger,
}
metaBytes := subtask.Meta
if err := json.Unmarshal(metaBytes, &mTask.meta); err != nil {
return nil, err
}
@ -235,28 +237,28 @@ func init() {
}
scheduler.RegisterTaskType(proto.ImportInto, scheduler.WithPoolSize(int32(runtime.GOMAXPROCS(0))))
scheduler.RegisterSchedulerConstructor(proto.ImportInto, StepImport,
func(ctx context.Context, taskID int64, bs []byte, step int64) (scheduler.Scheduler, error) {
func(_ context.Context, task *proto.Task, _ *scheduler.Summary) (scheduler.Scheduler, error) {
// TODO(tangenta): use context for lifetime control.
taskMeta, logger, err := prepareFn(taskID, bs, step)
taskMeta, logger, err := prepareFn(task.ID, task.Meta, task.Step)
if err != nil {
return nil, err
}
return &importStepScheduler{
taskID: taskID,
taskID: task.ID,
taskMeta: taskMeta,
logger: logger,
}, nil
},
)
scheduler.RegisterSchedulerConstructor(proto.ImportInto, StepPostProcess,
func(ctx context.Context, taskID int64, bs []byte, step int64) (scheduler.Scheduler, error) {
func(_ context.Context, task *proto.Task, _ *scheduler.Summary) (scheduler.Scheduler, error) {
// TODO(tangenta): use context for lifetime control.
taskMeta, logger, err := prepareFn(taskID, bs, step)
taskMeta, logger, err := prepareFn(task.ID, task.Meta, task.Step)
if err != nil {
return nil, err
}
return &postStepScheduler{
taskID: taskID,
taskID: task.ID,
taskMeta: taskMeta,
logger: logger,
}, nil

View File

@ -17,6 +17,7 @@ package addindextest
import (
"context"
"fmt"
"sync/atomic"
"testing"
"github.com/ngaut/pools"
@ -192,31 +193,31 @@ func TestBackfillOperatorPipeline(t *testing.T) {
sessPool := newSessPoolForTest(t, store)
ctx := context.Background()
var keys, values [][]byte
onWrite := func(key, val []byte) {
keys = append(keys, key)
values = append(values, val)
}
mockBackendCtx := &ingest.MockBackendCtx{}
mockEngine := ingest.NewMockEngineInfo(nil)
mockEngine.SetHook(onWrite)
mockEngine.SetHook(func(key, val []byte) {})
totalRowCount := &atomic.Int64{}
pipeline, err := ddl.NewAddIndexIngestPipeline(
ctx, store,
sessPool,
mockBackendCtx,
mockEngine,
tk.Session(),
tbl.(table.PhysicalTable),
idxInfo,
startKey,
endKey,
totalRowCount,
nil,
)
require.NoError(t, err)
err = pipeline.Execute()
require.NoError(t, err)
err = pipeline.Close()
require.NoError(t, err)
require.Len(t, keys, 10)
require.Len(t, values, 10)
require.Equal(t, int64(10), totalRowCount.Load())
}
type sessPoolForTest struct {