diff --git a/pkg/ddl/backfilling.go b/pkg/ddl/backfilling.go
index 78d2ad1512..3f81ed4600 100644
--- a/pkg/ddl/backfilling.go
+++ b/pkg/ddl/backfilling.go
@@ -419,6 +419,38 @@ func (w *backfillWorker) run(d *ddlCtx, bf backfiller, job *model.Job) {
 	}
 }
 
+func splitAndValidateTableRanges(
+	ctx context.Context,
+	t table.PhysicalTable,
+	store kv.Storage,
+	startKey, endKey kv.Key,
+	limit int,
+) ([]kv.KeyRange, error) {
+	ranges, err := splitTableRanges(ctx, t, store, startKey, endKey, limit)
+	if err != nil {
+		return nil, err
+	}
+	return validateTableRanges(ranges, startKey, endKey)
+}
+
+func validateTableRanges(ranges []kv.KeyRange, start, end kv.Key) ([]kv.KeyRange, error) {
+	for i, r := range ranges {
+		if len(r.StartKey) == 0 {
+			if i != 0 {
+				return nil, errors.Errorf("get empty start key in the middle of ranges")
+			}
+			ranges[i].StartKey = start
+		}
+		if len(r.EndKey) == 0 {
+			if i != len(ranges)-1 {
+				return nil, errors.Errorf("get empty end key in the middle of ranges")
+			}
+			ranges[i].EndKey = end
+		}
+	}
+	return ranges, nil
+}
+
 // splitTableRanges uses PD region's key ranges to split the backfilling table key range space,
 // to speed up backfilling data in table with disperse handle.
 // The `t` should be a non-partitioned table or a partition.
@@ -460,40 +492,21 @@ func splitTableRanges(
 	return ranges, nil
 }
 
-func getBatchTasks(t table.Table, reorgInfo *reorgInfo, kvRanges []kv.KeyRange,
-	taskIDAlloc *taskIDAllocator) []*reorgBackfillTask {
+func getBatchTasks(
+	t table.Table,
+	reorgInfo *reorgInfo,
+	kvRanges []kv.KeyRange,
+	taskIDAlloc *taskIDAllocator,
+	bfWorkerTp backfillerType,
+) []*reorgBackfillTask {
 	batchTasks := make([]*reorgBackfillTask, 0, len(kvRanges))
-	var prefix kv.Key
-	if reorgInfo.mergingTmpIdx {
-		prefix = t.IndexPrefix()
-	} else {
-		prefix = t.RecordPrefix()
-	}
-	// Build reorg tasks.
-	job := reorgInfo.Job
 	//nolint:forcetypeassert
 	phyTbl := t.(table.PhysicalTable)
-	jobCtx := reorgInfo.NewJobContext()
-	for _, keyRange := range kvRanges {
+	for _, r := range kvRanges {
 		taskID := taskIDAlloc.alloc()
-		startKey := keyRange.StartKey
-		if len(startKey) == 0 {
-			startKey = prefix
-		}
-		endKey := keyRange.EndKey
-		if len(endKey) == 0 {
-			endKey = prefix.PrefixNext()
-		}
-		endK, err := GetRangeEndKey(jobCtx, reorgInfo.d.store, job.Priority, prefix, startKey, endKey)
-		if err != nil {
-			logutil.DDLLogger().Info("get backfill range task, get reverse key failed", zap.Error(err))
-		} else {
-			logutil.DDLLogger().Info("get backfill range task, change end key",
-				zap.Int("id", taskID), zap.Int64("pTbl", phyTbl.GetPhysicalID()),
-				zap.String("end key", hex.EncodeToString(endKey)), zap.String("current end key", hex.EncodeToString(endK)))
-			endKey = endK
-		}
-
+		startKey := r.StartKey
+		endKey := r.EndKey
+		endKey = getActualEndKey(t, reorgInfo, bfWorkerTp, startKey, endKey, taskID)
 		task := &reorgBackfillTask{
 			id:    taskID,
 			jobID: reorgInfo.Job.ID,
@@ -507,6 +520,45 @@ func getBatchTasks(t table.Table, reorgInfo *reorgInfo, kvRanges []kv.KeyRange,
 	return batchTasks
 }
 
+func getActualEndKey(
+	t table.Table,
+	reorgInfo *reorgInfo,
+	bfTp backfillerType,
+	rangeStart, rangeEnd kv.Key,
+	taskID int,
+) kv.Key {
+	job := reorgInfo.Job
+	//nolint:forcetypeassert
+	phyTbl := t.(table.PhysicalTable)
+
+	if bfTp == typeAddIndexMergeTmpWorker {
+		// Temp index data does not grow infinitely, so we can return the whole
+		// range; the IndexMergeTmpWorker will still finish in bounded time.
+		return rangeEnd
+	}
+	if bfTp == typeAddIndexWorker && job.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge {
+		// The ingest worker uses a coprocessor to read table data. It is fast
+		// enough that we don't need to compute the actual end key of this range.
+		return rangeEnd
+	}
+
+	// Otherwise, shrink the end key to the last key actually written, so that
+	// future writes to [backfillChunkEndKey, rangeEnd) cannot leave the backfill worker unable to catch up.
+	jobCtx := reorgInfo.NewJobContext()
+
+	actualEndKey, err := GetRangeEndKey(jobCtx, reorgInfo.d.store, job.Priority, t.RecordPrefix(), rangeStart, rangeEnd)
+	if err != nil {
+		logutil.DDLLogger().Info("get backfill range task, get reverse key failed", zap.Error(err))
+		return rangeEnd
+	}
+	logutil.DDLLogger().Info("get backfill range task, change end key",
+		zap.Int("id", taskID),
+		zap.Int64("pTbl", phyTbl.GetPhysicalID()),
+		zap.String("end key", hex.EncodeToString(rangeEnd)),
+		zap.String("current end key", hex.EncodeToString(actualEndKey)))
+	return actualEndKey
+}
+
 // sendTasks sends tasks to workers, and returns remaining kvRanges that is not handled.
 func sendTasks(
 	scheduler backfillScheduler,
@@ -514,8 +566,9 @@ func sendTasks(
 	kvRanges []kv.KeyRange,
 	reorgInfo *reorgInfo,
 	taskIDAlloc *taskIDAllocator,
+	bfWorkerTp backfillerType,
 ) error {
-	batchTasks := getBatchTasks(t, reorgInfo, kvRanges, taskIDAlloc)
+	batchTasks := getBatchTasks(t, reorgInfo, kvRanges, taskIDAlloc, bfWorkerTp)
 	for _, task := range batchTasks {
 		if err := scheduler.sendTask(task); err != nil {
 			return errors.Trace(err)
 		}
 	}
@@ -686,7 +739,7 @@ func (dc *ddlCtx) writePhysicalTableRecord(
 	start, end := startKey, endKey
 	taskIDAlloc := newTaskIDAllocator()
 	for {
-		kvRanges, err2 := splitTableRanges(egCtx, t, reorgInfo.d.store, start, end, backfillTaskChanSize)
+		kvRanges, err2 := splitAndValidateTableRanges(egCtx, t, reorgInfo.d.store, start, end, backfillTaskChanSize)
 		if err2 != nil {
 			return errors.Trace(err2)
 		}
@@ -700,7 +753,7 @@ func (dc *ddlCtx) writePhysicalTableRecord(
 			zap.String("startKey", hex.EncodeToString(start)),
 			zap.String("endKey", hex.EncodeToString(end)))
 
-		err2 = sendTasks(scheduler, t, kvRanges, reorgInfo, taskIDAlloc)
+		err2 = sendTasks(scheduler, t, kvRanges, reorgInfo, taskIDAlloc, bfWorkerType)
 		if err2 != nil {
 			return errors.Trace(err2)
 		}
diff --git a/pkg/ddl/backfilling_operators.go b/pkg/ddl/backfilling_operators.go
index 08e83a050d..20e4d7138a 100644
--- a/pkg/ddl/backfilling_operators.go
+++ b/pkg/ddl/backfilling_operators.go
@@ -308,7 +308,7 @@ func (src *TableScanTaskSource) generateTasks() error {
 	startKey := src.startKey
 	endKey := src.endKey
 	for {
-		kvRanges, err := splitTableRanges(
+		kvRanges, err := splitAndValidateTableRanges(
 			src.ctx,
 			src.tbl,
 			src.store,
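
For context, a self-contained sketch of the boundary-filling behavior that `validateTableRanges` implements: PD may return the first range without a start key and the last range without an end key, and those gaps are filled from the caller's requested range, while an empty key anywhere else is rejected as inconsistent. The `KeyRange` type and `fillBoundaryKeys` name below are local stand-ins for illustration, not TiDB's actual `kv` package.

```go
package main

import (
	"bytes"
	"fmt"
)

// KeyRange is a local stand-in for tidb's kv.KeyRange, for illustration only.
type KeyRange struct {
	StartKey, EndKey []byte
}

// fillBoundaryKeys mirrors the patched validateTableRanges: empty boundary
// keys on the first/last range are replaced with the requested start/end;
// an empty key in the middle of the list is an error. Note the assignment
// goes through ranges[i]: a range copy from `range` would not be persisted.
func fillBoundaryKeys(ranges []KeyRange, start, end []byte) ([]KeyRange, error) {
	for i := range ranges {
		if len(ranges[i].StartKey) == 0 {
			if i != 0 {
				return nil, fmt.Errorf("empty start key in the middle of ranges")
			}
			ranges[i].StartKey = start
		}
		if len(ranges[i].EndKey) == 0 {
			if i != len(ranges)-1 {
				return nil, fmt.Errorf("empty end key in the middle of ranges")
			}
			ranges[i].EndKey = end
		}
	}
	return ranges, nil
}

func main() {
	ranges := []KeyRange{
		{StartKey: nil, EndKey: []byte("k2")},          // PD omitted the start boundary
		{StartKey: []byte("k2"), EndKey: []byte("k5")}, // complete range, untouched
		{StartKey: []byte("k5"), EndKey: nil},          // PD omitted the end boundary
	}
	got, err := fillBoundaryKeys(ranges, []byte("k0"), []byte("k9"))
	if err != nil {
		panic(err)
	}
	fmt.Println(bytes.Equal(got[0].StartKey, []byte("k0"))) // true
	fmt.Println(bytes.Equal(got[2].EndKey, []byte("k9")))   // true
}
```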
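The shrink step in `getActualEndKey` exists because a region's key range can extend far past the last row that currently exists, and per the patch's own comment, a task whose end key sits beyond the written data could never catch up with rows inserted into [backfillChunkEndKey, rangeEnd) while the backfill runs. Below is a toy, in-memory analogue of what `GetRangeEndKey` achieves with a reverse scan on TiKV; `shrinkEndKey` and `written` are illustrative names, not TiDB APIs.

```go
package main

import (
	"fmt"
	"sort"
)

// shrinkEndKey finds the key just past the last key actually written inside
// [start, end), so a backfill task built now does not also have to cover rows
// inserted later. `written` stands for the table's sorted record keys.
func shrinkEndKey(written []string, start, end string) string {
	i := sort.SearchStrings(written, end) - 1 // index of the last written key < end
	if i < 0 || written[i] < start {
		return start // nothing written in the range yet
	}
	return written[i] + "\x00" // smallest key greater than the last written one
}

func main() {
	written := []string{"r1", "r2", "r7"}
	// The region's range may extend far beyond the data written so far.
	end := shrinkEndKey(written, "r0", "zzzz")
	fmt.Println(end, end < "zzzz") // "r7\x00" true: the end key shrank
}
```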