diff --git a/pkg/ddl/BUILD.bazel b/pkg/ddl/BUILD.bazel index 8d1601aca3..cfef206007 100644 --- a/pkg/ddl/BUILD.bazel +++ b/pkg/ddl/BUILD.bazel @@ -125,8 +125,6 @@ go_library( "//pkg/statistics", "//pkg/statistics/handle", "//pkg/statistics/handle/util", - "//pkg/store/copr", - "//pkg/store/driver/backoff", "//pkg/store/driver/txn", "//pkg/store/helper", "//pkg/table", diff --git a/pkg/ddl/backfilling.go b/pkg/ddl/backfilling.go index 5483417afe..ba1afe446a 100644 --- a/pkg/ddl/backfilling.go +++ b/pkg/ddl/backfilling.go @@ -15,6 +15,7 @@ package ddl import ( + "bytes" "context" "encoding/hex" "fmt" @@ -37,8 +38,6 @@ import ( "github.com/pingcap/tidb/pkg/parser/terror" "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/sessionctx/variable" - "github.com/pingcap/tidb/pkg/store/copr" - "github.com/pingcap/tidb/pkg/store/driver/backoff" "github.com/pingcap/tidb/pkg/table" "github.com/pingcap/tidb/pkg/tablecodec" "github.com/pingcap/tidb/pkg/util" @@ -427,79 +426,93 @@ func (w *backfillWorker) run(d *ddlCtx, bf backfiller, job *model.Job) { } } -func splitAndValidateTableRanges( +// loadTableRanges load table key ranges from PD between given start key and end key. +// It returns up to `limit` ranges. +func loadTableRanges( ctx context.Context, t table.PhysicalTable, store kv.Storage, startKey, endKey kv.Key, limit int, ) ([]kv.KeyRange, error) { - ranges, err := splitTableRanges(ctx, t, store, startKey, endKey, limit) - if err != nil { - return nil, err - } - return validateTableRanges(ranges, startKey, endKey) -} - -func validateTableRanges(ranges []kv.KeyRange, start, end kv.Key) ([]kv.KeyRange, error) { - for i, r := range ranges { - if len(r.StartKey) == 0 { - if i != 0 { - return nil, errors.Errorf("get empty start key in the middle of ranges") - } - r.StartKey = start - } - if len(r.EndKey) == 0 { - if i != len(ranges)-1 { - return nil, errors.Errorf("get empty end key in the middle of ranges") - } - r.EndKey = end - } - } - return ranges, nil -} - -// splitTableRanges uses PD region's key ranges to split the backfilling table key range space, -// to speed up backfilling data in table with disperse handle. -// The `t` should be a non-partitioned table or a partition. -func splitTableRanges( - ctx context.Context, - t table.PhysicalTable, - store kv.Storage, - startKey, endKey kv.Key, - limit int, -) ([]kv.KeyRange, error) { - logutil.DDLLogger().Info("split table range from PD", - zap.Int64("physicalTableID", t.GetPhysicalID()), - zap.String("start key", hex.EncodeToString(startKey)), - zap.String("end key", hex.EncodeToString(endKey))) if len(startKey) == 0 && len(endKey) == 0 { - logutil.DDLLogger().Info("split table range from PD, get noop table range", + logutil.DDLLogger().Info("load noop table range", zap.Int64("physicalTableID", t.GetPhysicalID())) return []kv.KeyRange{}, nil } - kvRange := kv.KeyRange{StartKey: startKey, EndKey: endKey} s, ok := store.(tikv.Storage) if !ok { // Only support split ranges in tikv.Storage now. - return []kv.KeyRange{kvRange}, nil + logutil.DDLLogger().Info("load table ranges failed, unsupported storage", + zap.String("storage", fmt.Sprintf("%T", store)), + zap.Int64("physicalTableID", t.GetPhysicalID())) + return []kv.KeyRange{{StartKey: startKey, EndKey: endKey}}, nil } + rc := s.GetRegionCache() maxSleep := 10000 // ms - bo := backoff.NewBackofferWithVars(ctx, maxSleep, nil) - rc := copr.NewRegionCache(s.GetRegionCache()) - ranges, err := rc.SplitRegionRanges(bo, []kv.KeyRange{kvRange}, limit) + bo := tikv.NewBackofferWithVars(ctx, maxSleep, nil) + var ranges []kv.KeyRange + err := util.RunWithRetry(util.DefaultMaxRetries, util.RetryInterval, func() (bool, error) { + logutil.DDLLogger().Info("load table ranges from PD", + zap.Int64("physicalTableID", t.GetPhysicalID()), + zap.String("start key", hex.EncodeToString(startKey)), + zap.String("end key", hex.EncodeToString(endKey))) + rs, err := rc.BatchLoadRegionsWithKeyRange(bo, startKey, endKey, limit) + if err != nil { + return false, errors.Trace(err) + } + ranges = make([]kv.KeyRange, 0, len(rs)) + for _, r := range rs { + ranges = append(ranges, kv.KeyRange{StartKey: r.StartKey(), EndKey: r.EndKey()}) + } + err = validateAndFillRanges(ranges, startKey, endKey) + if err != nil { + return true, errors.Trace(err) + } + return false, nil + }) if err != nil { return nil, errors.Trace(err) } - if len(ranges) == 0 { - errMsg := fmt.Sprintf("cannot find region in range [%s, %s]", startKey.String(), endKey.String()) - return nil, errors.Trace(dbterror.ErrInvalidSplitRegionRanges.GenWithStackByArgs(errMsg)) - } + logutil.DDLLogger().Info("load table ranges from PD done", + zap.Int64("physicalTableID", t.GetPhysicalID()), + zap.String("range start", hex.EncodeToString(ranges[0].StartKey)), + zap.String("range end", hex.EncodeToString(ranges[len(ranges)-1].EndKey)), + zap.Int("range count", len(ranges))) return ranges, nil } +func validateAndFillRanges(ranges []kv.KeyRange, startKey, endKey []byte) error { + if len(ranges) == 0 { + errMsg := fmt.Sprintf("cannot find region in range [%s, %s]", + hex.EncodeToString(startKey), hex.EncodeToString(endKey)) + return dbterror.ErrInvalidSplitRegionRanges.GenWithStackByArgs(errMsg) + } + for i, r := range ranges { + if i == 0 { + s := r.StartKey + if len(s) == 0 || bytes.Compare(s, startKey) < 0 { + ranges[i].StartKey = startKey + } + } + if i == len(ranges)-1 { + e := r.EndKey + if len(e) == 0 || bytes.Compare(e, endKey) > 0 { + ranges[i].EndKey = endKey + } + } + if len(ranges[i].StartKey) == 0 || len(ranges[i].EndKey) == 0 { + return errors.Errorf("get empty start/end key in the middle of ranges") + } + if i > 0 && !bytes.Equal(ranges[i-1].EndKey, ranges[i].StartKey) { + return errors.Errorf("ranges are not continuous") + } + } + return nil +} + func getBatchTasks( t table.Table, reorgInfo *reorgInfo, @@ -885,7 +898,7 @@ func (dc *ddlCtx) writePhysicalTableRecord( start, end := startKey, endKey taskIDAlloc := newTaskIDAllocator() for { - kvRanges, err2 := splitAndValidateTableRanges(egCtx, t, reorgInfo.d.store, start, end, backfillTaskChanSize) + kvRanges, err2 := loadTableRanges(egCtx, t, dc.store, start, end, backfillTaskChanSize) if err2 != nil { return errors.Trace(err2) } diff --git a/pkg/ddl/backfilling_operators.go b/pkg/ddl/backfilling_operators.go index e3e54ca225..33583abde4 100644 --- a/pkg/ddl/backfilling_operators.go +++ b/pkg/ddl/backfilling_operators.go @@ -368,7 +368,7 @@ func (src *TableScanTaskSource) generateTasks() error { startKey := src.adjustStartKey(src.startKey, src.endKey) for { - kvRanges, err := splitAndValidateTableRanges( + kvRanges, err := loadTableRanges( src.ctx, src.tbl, src.store, diff --git a/pkg/ddl/backfilling_test.go b/pkg/ddl/backfilling_test.go index 5ad61cedfe..a44be213dd 100644 --- a/pkg/ddl/backfilling_test.go +++ b/pkg/ddl/backfilling_test.go @@ -190,3 +190,76 @@ func TestReorgDistSQLCtx(t *testing.T) { require.Equal(t, ctx1, ctx2) } + +func TestValidateAndFillRanges(t *testing.T) { + mkRange := func(start, end string) kv.KeyRange { + return kv.KeyRange{StartKey: []byte(start), EndKey: []byte(end)} + } + ranges := []kv.KeyRange{ + mkRange("b", "c"), + mkRange("c", "d"), + mkRange("d", "e"), + } + err := validateAndFillRanges(ranges, []byte("a"), []byte("e")) + require.NoError(t, err) + require.EqualValues(t, []kv.KeyRange{ + mkRange("b", "c"), + mkRange("c", "d"), + mkRange("d", "e"), + }, ranges) + + // adjust first and last range. + ranges = []kv.KeyRange{ + mkRange("a", "c"), + mkRange("c", "e"), + mkRange("e", "g"), + } + err = validateAndFillRanges(ranges, []byte("b"), []byte("f")) + require.NoError(t, err) + require.EqualValues(t, []kv.KeyRange{ + mkRange("b", "c"), + mkRange("c", "e"), + mkRange("e", "f"), + }, ranges) + + // first range startKey and last range endKey are empty. + ranges = []kv.KeyRange{ + mkRange("", "c"), + mkRange("c", "e"), + mkRange("e", ""), + } + err = validateAndFillRanges(ranges, []byte("b"), []byte("f")) + require.NoError(t, err) + require.EqualValues(t, []kv.KeyRange{ + mkRange("b", "c"), + mkRange("c", "e"), + mkRange("e", "f"), + }, ranges) + ranges = []kv.KeyRange{ + mkRange("", "c"), + mkRange("c", ""), + } + err = validateAndFillRanges(ranges, []byte("b"), []byte("f")) + require.NoError(t, err) + require.EqualValues(t, []kv.KeyRange{ + mkRange("b", "c"), + mkRange("c", "f"), + }, ranges) + + // invalid range. + ranges = []kv.KeyRange{ + mkRange("b", "c"), + mkRange("c", ""), + mkRange("e", "f"), + } + err = validateAndFillRanges(ranges, []byte("b"), []byte("f")) + require.Error(t, err) + + ranges = []kv.KeyRange{ + mkRange("b", "c"), + mkRange("c", "d"), + mkRange("e", "f"), + } + err = validateAndFillRanges(ranges, []byte("b"), []byte("f")) + require.Error(t, err) +}