lightning: split and scatter regions in batches (#33625)
close pingcap/tidb#33618
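
Before this change, ImportEngine handed all unfinished ranges to a single SplitAndScatterRegionByRanges call, so a large import could fire one huge burst of split/scatter requests at PD and TiKV. The new SplitAndScatterRegionInBatches wrapper caps each call at maxBatchSplitRanges (4096) ranges. The batching itself is the ordinary slice-chunking idiom; here is a minimal standalone sketch of that pattern, with illustrative names (processInBatches, fn) rather than lightning's API:

package main

import "fmt"

// processInBatches walks items in steps of batchCnt and hands each sub-slice
// to fn, stopping at the first error. This mirrors the chunking shape of
// SplitAndScatterRegionInBatches in the diff below.
func processInBatches(items []string, batchCnt int, fn func([]string) error) error {
	for i := 0; i < len(items); i += batchCnt {
		batch := items[i:]
		if len(batch) > batchCnt {
			batch = batch[:batchCnt]
		}
		if err := fn(batch); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	ranges := []string{"r0", "r1", "r2", "r3", "r4", "r5", "r6", "r7", "r8", "r9"}
	// With batchCnt = 4 this prints batches of 4, 4 and 2 items.
	_ = processInBatches(ranges, 4, func(batch []string) error {
		fmt.Println(batch)
		return nil
	})
}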
@@ -91,6 +91,8 @@ const (
 	// lower the max-key-count to avoid tikv trigger region auto split
 	regionMaxKeyCount = 1_280_000
 	defaultRegionSplitSize = 96 * units.MiB
+	// The max ranges count in a batch to split and scatter.
+	maxBatchSplitRanges = 4096
 
 	propRangeIndex = "tikv.range_index"
 
@@ -1364,7 +1366,7 @@ func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regi
 	needSplit := len(unfinishedRanges) > 1 || lfTotalSize > regionSplitSize || lfLength > regionSplitKeys
 	// split region by given ranges
 	for i := 0; i < maxRetryTimes; i++ {
-		err = local.SplitAndScatterRegionByRanges(ctx, unfinishedRanges, lf.tableInfo, needSplit, regionSplitSize)
+		err = local.SplitAndScatterRegionInBatches(ctx, unfinishedRanges, lf.tableInfo, needSplit, regionSplitSize, maxBatchSplitRanges)
 		if err == nil || common.IsContextCanceledError(err) {
			break
		}
@@ -59,10 +59,32 @@ var (
 	splitRetryTimes = 8
 )
 
-// TODO remove this file and use br internal functions
-// This File include region split & scatter operation just like br.
+// SplitAndScatterRegionInBatches splits&scatter regions in batches.
+// Too many split&scatter requests may put a lot of pressure on TiKV and PD.
+func (local *local) SplitAndScatterRegionInBatches(
+	ctx context.Context,
+	ranges []Range,
+	tableInfo *checkpoints.TidbTableInfo,
+	needSplit bool,
+	regionSplitSize int64,
+	batchCnt int,
+) error {
+	for i := 0; i < len(ranges); i += batchCnt {
+		batch := ranges[i:]
+		if len(batch) > batchCnt {
+			batch = batch[:batchCnt]
+		}
+		if err := local.SplitAndScatterRegionByRanges(ctx, batch, tableInfo, needSplit, regionSplitSize); err != nil {
+			return errors.Trace(err)
+		}
+	}
+	return nil
+}
+
+// SplitAndScatterRegionByRanges include region split & scatter operation just like br.
 // we can simply call br function, but we need to change some function signature of br
 // When the ranges total size is small, we can skip the split to avoid generate empty regions.
+// TODO: remove this file and use br internal functions
 func (local *local) SplitAndScatterRegionByRanges(
 	ctx context.Context,
 	ranges []Range,
@@ -424,16 +446,17 @@ func (local *local) waitForSplit(ctx context.Context, regionID uint64) {
 }
 
 func (local *local) waitForScatterRegion(ctx context.Context, regionInfo *split.RegionInfo) {
-	regionID := regionInfo.Region.GetId()
 	for i := 0; i < split.ScatterWaitMaxRetryTimes; i++ {
-		ok, err := local.isScatterRegionFinished(ctx, regionID)
-		if err != nil {
-			log.L().Warn("scatter region failed: do not have the region",
-				logutil.Region(regionInfo.Region))
+		ok, err := local.checkScatterRegionFinishedOrReScatter(ctx, regionInfo)
+		if ok {
 			return
 		}
-		if ok {
-			break
+		if err != nil {
+			if !utils.IsRetryableError(err) {
+				log.L().Warn("wait for scatter region encountered non-retryable error", logutil.Region(regionInfo.Region), zap.Error(err))
+				return
+			}
+			log.L().Warn("wait for scatter region encountered error, will retry again", logutil.Region(regionInfo.Region), zap.Error(err))
 		}
 		select {
 		case <-time.After(time.Second):
@@ -443,8 +466,8 @@ func (local *local) waitForScatterRegion(ctx context.Context, regionInfo *split.
 	}
 }
 
-func (local *local) isScatterRegionFinished(ctx context.Context, regionID uint64) (bool, error) {
-	resp, err := local.splitCli.GetOperator(ctx, regionID)
+func (local *local) checkScatterRegionFinishedOrReScatter(ctx context.Context, regionInfo *split.RegionInfo) (bool, error) {
+	resp, err := local.splitCli.GetOperator(ctx, regionInfo.Region.GetId())
 	if err != nil {
 		return false, err
 	}
@@ -462,9 +485,20 @@ func (local *local) isScatterRegionFinished(ctx context.Context, regionID uint64
 		return false, errors.Errorf("get operator error: %s", respErr.GetType())
 	}
 	// If the current operator of the region is not 'scatter-region', we could assume
-	// that 'scatter-operator' has finished or timeout
-	ok := string(resp.GetDesc()) != "scatter-region" || resp.GetStatus() != pdpb.OperatorStatus_RUNNING
-	return ok, nil
+	// that 'scatter-operator' has finished.
+	if string(resp.GetDesc()) != "scatter-region" {
+		return true, nil
+	}
+	switch resp.GetStatus() {
+	case pdpb.OperatorStatus_RUNNING:
+		return false, nil
+	case pdpb.OperatorStatus_SUCCESS:
+		return true, nil
+	default:
+		log.L().Warn("scatter-region operator status is abnormal, will scatter region again",
+			logutil.Region(regionInfo.Region), zap.Stringer("status", resp.GetStatus()))
+		return false, local.splitCli.ScatterRegion(ctx, regionInfo)
+	}
 }
 
 func getSplitKeysByRanges(ranges []Range, regions []*split.RegionInfo) map[uint64][][]byte {
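
The rewritten wait loop above no longer returns on the first GetOperator error: non-retryable errors end the wait, retryable ones are logged and retried, and an abnormal operator status (anything other than RUNNING or SUCCESS) triggers another ScatterRegion call. Below is a minimal sketch of that decision, lifted out of the diff into a standalone helper, assuming the kvproto pdpb package; the helper name and return shape are illustrative, not lightning's code:

package main

import (
	"fmt"

	"github.com/pingcap/kvproto/pkg/pdpb"
)

// scatterState reports whether scattering can be treated as finished and
// whether the caller should issue ScatterRegion again, based on the operator
// currently attached to the region.
func scatterState(desc string, status pdpb.OperatorStatus) (finished, reScatter bool) {
	if desc != "scatter-region" {
		// Some other operator owns the region; assume scattering already finished.
		return true, false
	}
	switch status {
	case pdpb.OperatorStatus_RUNNING:
		return false, false // still in progress, keep waiting
	case pdpb.OperatorStatus_SUCCESS:
		return true, false
	default:
		return false, true // abnormal status, scatter the region again
	}
}

func main() {
	fmt.Println(scatterState("scatter-region", pdpb.OperatorStatus_RUNNING)) // false false
	fmt.Println(scatterState("merge-region", pdpb.OperatorStatus_RUNNING))   // true false
}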
@@ -17,6 +17,7 @@ package local
 import (
 	"bytes"
 	"context"
+	"fmt"
 	"math"
 	"math/rand"
 	"sort"
@@ -302,7 +303,8 @@ func cloneRegion(region *restore.RegionInfo) *restore.RegionInfo {
 	return &restore.RegionInfo{Region: r, Leader: l}
 }
 
-// region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, )
+// For keys ["", "aay", "bba", "bbh", "cca", ""], the key ranges of
+// regions are [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, ).
 func initTestClient(keys [][]byte, hook clientHook) *testClient {
 	peers := make([]*metapb.Peer, 1)
 	peers[0] = &metapb.Peer{
@@ -574,6 +576,44 @@ func TestBatchSplitByRangesNoValidKeys(t *testing.T) {
 	doTestBatchSplitRegionByRanges(context.Background(), t, &splitRegionNoValidKeyHook{returnErrTimes: math.MaxInt32}, "no valid key", defaultHook{})
 }
 
+func TestSplitAndScatterRegionInBatches(t *testing.T) {
+	splitHook := defaultHook{}
+	deferFunc := splitHook.setup(t)
+	defer deferFunc()
+
+	keys := [][]byte{[]byte(""), []byte("a"), []byte("b"), []byte("")}
+	client := initTestClient(keys, nil)
+	local := &local{
+		splitCli: client,
+		g: glue.NewExternalTiDBGlue(nil, mysql.ModeNone),
+	}
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	var ranges []Range
+	for i := 0; i < 20; i++ {
+		ranges = append(ranges, Range{
+			start: []byte(fmt.Sprintf("a%02d", i)),
+			end: []byte(fmt.Sprintf("a%02d", i+1)),
+		})
+	}
+
+	err := local.SplitAndScatterRegionInBatches(ctx, ranges, nil, true, 1000, 4)
+	require.NoError(t, err)
+
+	rangeStart := codec.EncodeBytes([]byte{}, []byte("a"))
+	rangeEnd := codec.EncodeBytes([]byte{}, []byte("b"))
+	regions, err := restore.PaginateScanRegion(ctx, client, rangeStart, rangeEnd, 5)
+	require.NoError(t, err)
+	result := [][]byte{[]byte("a"), []byte("a00"), []byte("a01"), []byte("a02"), []byte("a03"), []byte("a04"),
+		[]byte("a05"), []byte("a06"), []byte("a07"), []byte("a08"), []byte("a09"), []byte("a10"), []byte("a11"),
+		[]byte("a12"), []byte("a13"), []byte("a14"), []byte("a15"), []byte("a16"), []byte("a17"), []byte("a18"),
+		[]byte("a19"), []byte("a20"), []byte("b"),
+	}
+	checkRegionRanges(t, regions, result)
+}
+
 type reportAfterSplitHook struct {
 	noopHook
 	ch chan<- struct{}