diff --git a/ddl/backfilling.go b/ddl/backfilling.go index b20017ca18..251333a39e 100644 --- a/ddl/backfilling.go +++ b/ddl/backfilling.go @@ -628,8 +628,8 @@ func handleOneResult(result *backfillResult, scheduler backfillScheduler, consum return nil } -func getBatchTasks(t table.Table, reorgInfo *reorgInfo, kvRanges []kv.KeyRange, batch int) []*reorgBackfillTask { - batchTasks := make([]*reorgBackfillTask, 0, batch) +func getBatchTasks(t table.Table, reorgInfo *reorgInfo, kvRanges []kv.KeyRange) []*reorgBackfillTask { + batchTasks := make([]*reorgBackfillTask, 0, len(kvRanges)) var prefix kv.Key if reorgInfo.mergingTmpIdx { prefix = t.IndexPrefix() @@ -669,36 +669,19 @@ func getBatchTasks(t table.Table, reorgInfo *reorgInfo, kvRanges []kv.KeyRange, // If the boundaries overlap, we should ignore the preceding endKey. endInclude: endK.Cmp(keyRange.EndKey) != 0 || i == len(kvRanges)-1} batchTasks = append(batchTasks, task) - - if len(batchTasks) >= batch { - break - } } return batchTasks } // sendTasks sends tasks to workers, and returns remaining kvRanges that is not handled. -func sendTasks(scheduler backfillScheduler, consumer *resultConsumer, t table.PhysicalTable, - kvRanges []kv.KeyRange, reorgInfo *reorgInfo) ([]kv.KeyRange, error) { - batchTasks := getBatchTasks(t, reorgInfo, kvRanges, backfillTaskChanSize) - if len(batchTasks) == 0 { - return nil, nil - } - +func sendTasks(scheduler backfillScheduler, consumer *resultConsumer, t table.PhysicalTable, kvRanges []kv.KeyRange, reorgInfo *reorgInfo) { + batchTasks := getBatchTasks(t, reorgInfo, kvRanges) for _, task := range batchTasks { if consumer.shouldAbort() { - return nil, nil + return } scheduler.sendTask(task) } - - if len(batchTasks) < len(kvRanges) { - // There are kvRanges not handled. - remains := kvRanges[len(batchTasks):] - return remains, nil - } - - return nil, nil } var ( @@ -828,19 +811,12 @@ func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sessionPool, t table.Physic zap.String("startKey", hex.EncodeToString(startKey)), zap.String("endKey", hex.EncodeToString(endKey))) - remains, err := sendTasks(scheduler, consumer, t, kvRanges, reorgInfo) - if err != nil { - return errors.Trace(err) - } + sendTasks(scheduler, consumer, t, kvRanges, reorgInfo) if consumer.shouldAbort() { break } - if len(remains) > 0 { - startKey = remains[0].StartKey - } else { - rangeEndKey := kvRanges[len(kvRanges)-1].EndKey - startKey = rangeEndKey.Next() - } + rangeEndKey := kvRanges[len(kvRanges)-1].EndKey + startKey = rangeEndKey.Next() if startKey.Cmp(endKey) >= 0 { break } diff --git a/ddl/dist_owner.go b/ddl/dist_owner.go index 133721b115..0a7dfe211c 100644 --- a/ddl/dist_owner.go +++ b/ddl/dist_owner.go @@ -333,7 +333,7 @@ func (dc *ddlCtx) splitTableToBackfillJobs(sess *session, reorgInfo *reorgInfo, if err != nil { return errors.Trace(err) } - batchTasks := getBatchTasks(pTblMeta.PhyTbl, reorgInfo, kvRanges, batchSize) + batchTasks := getBatchTasks(pTblMeta.PhyTbl, reorgInfo, kvRanges) if len(batchTasks) == 0 { break }