ddl: skip getting actual end key for each range in ingest mode (#54143)
close pingcap/tidb#45847, close pingcap/tidb#54147
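In short: with this change, ingest-mode (ReorgTypeLitMerge) add-index backfill and the temp-index merge path keep the end key produced by region splitting as-is, and only the transactional (txn-merge) backfill still probes the last written record key so a task's range does not run past the data that actually exists. Below is a minimal, illustrative sketch of that decision using simplified stand-in types; it is not the merged code (see getActualEndKey in the diff that follows).

package main

import "fmt"

// Simplified stand-ins for the real TiDB types; the names below are
// illustrative only and do not appear in the commit.
type reorgType int

const (
	reorgTypeTxnMerge reorgType = iota
	reorgTypeLitMerge // "ingest mode"
)

type workerType int

const (
	addIndexWorker workerType = iota
	addIndexMergeTmpWorker
)

// pickEndKey mirrors the shape of the new getActualEndKey logic: only the
// transactional add-index path pays for a reverse scan to find the last
// written key; the other paths keep the region boundary returned by PD.
func pickEndKey(wt workerType, rt reorgType, rangeEnd, lastWrittenKey []byte) []byte {
	if wt == addIndexMergeTmpWorker {
		return rangeEnd // temp index data is bounded, the whole range is fine
	}
	if wt == addIndexWorker && rt == reorgTypeLitMerge {
		return rangeEnd // ingest mode reads rows via coprocessor, no shrink needed
	}
	return lastWrittenKey // txn-merge: shrink to the actual written key
}

func main() {
	fmt.Printf("%s\n", pickEndKey(addIndexWorker, reorgTypeLitMerge, []byte("region-end"), []byte("last-row")))
	fmt.Printf("%s\n", pickEndKey(addIndexWorker, reorgTypeTxnMerge, []byte("region-end"), []byte("last-row")))
}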
@@ -419,6 +419,38 @@ func (w *backfillWorker) run(d *ddlCtx, bf backfiller, job *model.Job) {
 	}
 }
 
+func splitAndValidateTableRanges(
+	ctx context.Context,
+	t table.PhysicalTable,
+	store kv.Storage,
+	startKey, endKey kv.Key,
+	limit int,
+) ([]kv.KeyRange, error) {
+	ranges, err := splitTableRanges(ctx, t, store, startKey, endKey, limit)
+	if err != nil {
+		return nil, err
+	}
+	return validateTableRanges(ranges, startKey, endKey)
+}
+
+func validateTableRanges(ranges []kv.KeyRange, start, end kv.Key) ([]kv.KeyRange, error) {
+	for i, r := range ranges {
+		if len(r.StartKey) == 0 {
+			if i != 0 {
+				return nil, errors.Errorf("get empty start key in the middle of ranges")
+			}
+			ranges[i].StartKey = start
+		}
+		if len(r.EndKey) == 0 {
+			if i != len(ranges)-1 {
+				return nil, errors.Errorf("get empty end key in the middle of ranges")
+			}
+			ranges[i].EndKey = end
+		}
+	}
+	return ranges, nil
+}
+
 // splitTableRanges uses PD region's key ranges to split the backfilling table key range space,
 // to speed up backfilling data in table with disperse handle.
 // The `t` should be a non-partitioned table or a partition.
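As a reading aid, a hypothetical package-internal test for the validation added above (not part of this commit; it assumes the ddl package's existing github.com/pingcap/tidb/pkg/kv and github.com/stretchr/testify/require imports). splitTableRanges can return an empty start key on the first range and an empty end key on the last one, and validateTableRanges is expected to fill those from the requested bounds while rejecting an empty key anywhere in the middle.

func TestValidateTableRangesFillsBoundaryKeys(t *testing.T) {
	start, end := kv.Key("a"), kv.Key("z")
	k1, k2 := kv.Key("g"), kv.Key("p")
	ranges := []kv.KeyRange{
		{StartKey: nil, EndKey: k1}, // an empty start key is only legal on the first range
		{StartKey: k1, EndKey: k2},
		{StartKey: k2, EndKey: nil}, // an empty end key is only legal on the last range
	}
	got, err := validateTableRanges(ranges, start, end)
	require.NoError(t, err)
	require.Equal(t, start, got[0].StartKey)
	require.Equal(t, end, got[2].EndKey)
}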
@@ -460,40 +492,21 @@ func splitTableRanges(
 	return ranges, nil
 }
 
-func getBatchTasks(t table.Table, reorgInfo *reorgInfo, kvRanges []kv.KeyRange,
-	taskIDAlloc *taskIDAllocator) []*reorgBackfillTask {
+func getBatchTasks(
+	t table.Table,
+	reorgInfo *reorgInfo,
+	kvRanges []kv.KeyRange,
+	taskIDAlloc *taskIDAllocator,
+	bfWorkerTp backfillerType,
+) []*reorgBackfillTask {
 	batchTasks := make([]*reorgBackfillTask, 0, len(kvRanges))
-	var prefix kv.Key
-	if reorgInfo.mergingTmpIdx {
-		prefix = t.IndexPrefix()
-	} else {
-		prefix = t.RecordPrefix()
-	}
-	// Build reorg tasks.
-	job := reorgInfo.Job
 	//nolint:forcetypeassert
 	phyTbl := t.(table.PhysicalTable)
-	jobCtx := reorgInfo.NewJobContext()
-	for _, keyRange := range kvRanges {
+	for _, r := range kvRanges {
 		taskID := taskIDAlloc.alloc()
-		startKey := keyRange.StartKey
-		if len(startKey) == 0 {
-			startKey = prefix
-		}
-		endKey := keyRange.EndKey
-		if len(endKey) == 0 {
-			endKey = prefix.PrefixNext()
-		}
-		endK, err := GetRangeEndKey(jobCtx, reorgInfo.d.store, job.Priority, prefix, startKey, endKey)
-		if err != nil {
-			logutil.DDLLogger().Info("get backfill range task, get reverse key failed", zap.Error(err))
-		} else {
-			logutil.DDLLogger().Info("get backfill range task, change end key",
-				zap.Int("id", taskID), zap.Int64("pTbl", phyTbl.GetPhysicalID()),
-				zap.String("end key", hex.EncodeToString(endKey)), zap.String("current end key", hex.EncodeToString(endK)))
-			endKey = endK
-		}
-
+		startKey := r.StartKey
+		endKey := r.EndKey
+		endKey = getActualEndKey(t, reorgInfo, bfWorkerTp, startKey, endKey, taskID)
 		task := &reorgBackfillTask{
 			id: taskID,
 			jobID: reorgInfo.Job.ID,
@@ -507,6 +520,45 @@ func getBatchTasks(t table.Table, reorgInfo *reorgInfo, kvRanges []kv.KeyRange,
 	return batchTasks
 }
 
+func getActualEndKey(
+	t table.Table,
+	reorgInfo *reorgInfo,
+	bfTp backfillerType,
+	rangeStart, rangeEnd kv.Key,
+	taskID int,
+) kv.Key {
+	job := reorgInfo.Job
+	//nolint:forcetypeassert
+	phyTbl := t.(table.PhysicalTable)
+
+	if bfTp == typeAddIndexMergeTmpWorker {
+		// Temp Index data does not grow infinitely, we can return the whole range
+		// and IndexMergeTmpWorker should still be finished in a bounded time.
+		return rangeEnd
+	}
+	if bfTp == typeAddIndexWorker && job.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge {
+		// Ingest worker uses coprocessor to read table data. It is fast enough,
+		// we don't need to get the actual end key of this range.
+		return rangeEnd
+	}
+
+	// Otherwise to avoid the future data written to key range of [backfillChunkEndKey, rangeEnd) and
+	// backfill worker can't catch up, we shrink the end key to the actual written key for now.
+	jobCtx := reorgInfo.NewJobContext()
+
+	actualEndKey, err := GetRangeEndKey(jobCtx, reorgInfo.d.store, job.Priority, t.RecordPrefix(), rangeStart, rangeEnd)
+	if err != nil {
+		logutil.DDLLogger().Info("get backfill range task, get reverse key failed", zap.Error(err))
+		return rangeEnd
+	}
+	logutil.DDLLogger().Info("get backfill range task, change end key",
+		zap.Int("id", taskID),
+		zap.Int64("pTbl", phyTbl.GetPhysicalID()),
+		zap.String("end key", hex.EncodeToString(rangeEnd)),
+		zap.String("current end key", hex.EncodeToString(actualEndKey)))
+	return actualEndKey
+}
+
 // sendTasks sends tasks to workers, and returns remaining kvRanges that is not handled.
 func sendTasks(
 	scheduler backfillScheduler,
@@ -514,8 +566,9 @@ func sendTasks(
 	kvRanges []kv.KeyRange,
 	reorgInfo *reorgInfo,
 	taskIDAlloc *taskIDAllocator,
+	bfWorkerTp backfillerType,
 ) error {
-	batchTasks := getBatchTasks(t, reorgInfo, kvRanges, taskIDAlloc)
+	batchTasks := getBatchTasks(t, reorgInfo, kvRanges, taskIDAlloc, bfWorkerTp)
 	for _, task := range batchTasks {
 		if err := scheduler.sendTask(task); err != nil {
 			return errors.Trace(err)
@@ -686,7 +739,7 @@ func (dc *ddlCtx) writePhysicalTableRecord(
 	start, end := startKey, endKey
 	taskIDAlloc := newTaskIDAllocator()
 	for {
-		kvRanges, err2 := splitTableRanges(egCtx, t, reorgInfo.d.store, start, end, backfillTaskChanSize)
+		kvRanges, err2 := splitAndValidateTableRanges(egCtx, t, reorgInfo.d.store, start, end, backfillTaskChanSize)
 		if err2 != nil {
 			return errors.Trace(err2)
 		}
@@ -700,7 +753,7 @@ func (dc *ddlCtx) writePhysicalTableRecord(
 			zap.String("startKey", hex.EncodeToString(start)),
 			zap.String("endKey", hex.EncodeToString(end)))
 
-		err2 = sendTasks(scheduler, t, kvRanges, reorgInfo, taskIDAlloc)
+		err2 = sendTasks(scheduler, t, kvRanges, reorgInfo, taskIDAlloc, bfWorkerType)
 		if err2 != nil {
 			return errors.Trace(err2)
 		}
@@ -308,7 +308,7 @@ func (src *TableScanTaskSource) generateTasks() error {
 	startKey := src.startKey
 	endKey := src.endKey
 	for {
-		kvRanges, err := splitTableRanges(
+		kvRanges, err := splitAndValidateTableRanges(
 			src.ctx,
 			src.tbl,
 			src.store,