507 lines
17 KiB
Go
507 lines
17 KiB
Go
// Copyright 2022 PingCAP, Inc. Licensed under Apache-2.0.
|
|
|
|
package split
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/hex"
|
|
goerrors "errors"
|
|
"time"
|
|
|
|
"github.com/pingcap/errors"
|
|
"github.com/pingcap/failpoint"
|
|
"github.com/pingcap/log"
|
|
berrors "github.com/pingcap/tidb/br/pkg/errors"
|
|
"github.com/pingcap/tidb/br/pkg/logutil"
|
|
"github.com/pingcap/tidb/br/pkg/utils"
|
|
"github.com/pingcap/tidb/pkg/lightning/config"
|
|
"github.com/pingcap/tidb/pkg/util/codec"
|
|
"github.com/pingcap/tidb/pkg/util/redact"
|
|
"github.com/tikv/pd/client/opt"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
var (
|
|
WaitRegionOnlineAttemptTimes = config.DefaultRegionCheckBackoffLimit
|
|
SplitRetryTimes = 150
|
|
)
|
|
|
|
// Constants for split retry machinery.
|
|
const (
|
|
SplitRetryInterval = 50 * time.Millisecond
|
|
SplitMaxRetryInterval = 4 * time.Second
|
|
|
|
// it takes 30 minutes to scatter regions when each TiKV has 400k regions
|
|
ScatterWaitUpperInterval = 30 * time.Minute
|
|
|
|
ScanRegionPaginationLimit = 128
|
|
)
|
|
|
|
// RegionSplitter is a executor of region split by rules.
|
|
type RegionSplitter struct {
|
|
client SplitClient
|
|
}
|
|
|
|
// NewRegionSplitter returns a new RegionSplitter.
|
|
func NewRegionSplitter(client SplitClient) *RegionSplitter {
|
|
return &RegionSplitter{
|
|
client: client,
|
|
}
|
|
}
|
|
|
|
// ExecuteSortedKeysOnRegion expose the function `SplitWaitAndScatter` of split client.
|
|
func (rs *RegionSplitter) ExecuteSortedKeysOnRegion(ctx context.Context, region *RegionInfo, keys [][]byte) ([]*RegionInfo, error) {
|
|
return rs.client.SplitWaitAndScatter(ctx, region, keys)
|
|
}
|
|
|
|
// ExecuteSortedKeys executes regions split and make sure new splitted regions are balance.
|
|
// It will split regions by the rewrite rules,
|
|
// then it will split regions by the end key of each range.
|
|
// tableRules includes the prefix of a table, since some ranges may have
|
|
// a prefix with record sequence or index sequence.
|
|
// note: all ranges and rewrite rules must have raw key.
|
|
func (rs *RegionSplitter) ExecuteSortedKeys(
|
|
ctx context.Context,
|
|
sortedSplitKeys [][]byte,
|
|
) error {
|
|
if len(sortedSplitKeys) == 0 {
|
|
log.Info("skip split regions, no split keys")
|
|
return nil
|
|
}
|
|
|
|
log.Info("execute split sorted keys", zap.Int("keys count", len(sortedSplitKeys)))
|
|
return rs.executeSplitByRanges(ctx, sortedSplitKeys)
|
|
}
|
|
|
|
func (rs *RegionSplitter) executeSplitByRanges(
|
|
ctx context.Context,
|
|
sortedKeys [][]byte,
|
|
) error {
|
|
startTime := time.Now()
|
|
// Choose the rough region split keys,
|
|
// each splited region contains 128 regions to be splitted.
|
|
const regionIndexStep = 128
|
|
|
|
roughSortedSplitKeys := make([][]byte, 0, len(sortedKeys)/regionIndexStep+1)
|
|
for curRegionIndex := regionIndexStep; curRegionIndex < len(sortedKeys); curRegionIndex += regionIndexStep {
|
|
roughSortedSplitKeys = append(roughSortedSplitKeys, sortedKeys[curRegionIndex])
|
|
}
|
|
if len(roughSortedSplitKeys) > 0 {
|
|
if err := rs.executeSplitByKeys(ctx, roughSortedSplitKeys); err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
}
|
|
log.Info("finish spliting regions roughly", zap.Duration("take", time.Since(startTime)))
|
|
|
|
// Then send split requests to each TiKV.
|
|
if err := rs.executeSplitByKeys(ctx, sortedKeys); err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
|
|
log.Info("finish spliting and scattering regions", zap.Duration("take", time.Since(startTime)))
|
|
return nil
|
|
}
|
|
|
|
// executeSplitByKeys will split regions by **sorted** keys with following steps.
|
|
// 1. locate regions with correspond keys.
|
|
// 2. split these regions with correspond keys.
|
|
// 3. make sure new split regions are balanced.
|
|
func (rs *RegionSplitter) executeSplitByKeys(
|
|
ctx context.Context,
|
|
sortedKeys [][]byte,
|
|
) error {
|
|
startTime := time.Now()
|
|
scatterRegions, err := rs.client.SplitKeysAndScatter(ctx, sortedKeys)
|
|
if err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
if len(scatterRegions) > 0 {
|
|
log.Info("finish splitting and scattering regions. and starts to wait", zap.Int("regions", len(scatterRegions)),
|
|
zap.Duration("take", time.Since(startTime)))
|
|
rs.waitRegionsScattered(ctx, scatterRegions, ScatterWaitUpperInterval)
|
|
} else {
|
|
log.Info("finish splitting regions.", zap.Duration("take", time.Since(startTime)))
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// waitRegionsScattered try to wait mutilple regions scatterd in 3 minutes.
|
|
// this could timeout, but if many regions scatterd the restore could continue
|
|
// so we don't wait long time here.
|
|
func (rs *RegionSplitter) waitRegionsScattered(ctx context.Context, scatterRegions []*RegionInfo, timeout time.Duration) {
|
|
log.Info("start to wait for scattering regions", zap.Int("regions", len(scatterRegions)))
|
|
startTime := time.Now()
|
|
leftCnt := rs.WaitForScatterRegionsTimeout(ctx, scatterRegions, timeout)
|
|
if leftCnt == 0 {
|
|
log.Info("waiting for scattering regions done",
|
|
zap.Int("regions", len(scatterRegions)),
|
|
zap.Duration("take", time.Since(startTime)))
|
|
} else {
|
|
log.Warn("waiting for scattering regions timeout",
|
|
zap.Int("not scattered Count", leftCnt),
|
|
zap.Int("regions", len(scatterRegions)),
|
|
zap.Duration("take", time.Since(startTime)))
|
|
}
|
|
}
|
|
|
|
func (rs *RegionSplitter) WaitForScatterRegionsTimeout(ctx context.Context, regionInfos []*RegionInfo, timeout time.Duration) int {
|
|
ctx2, cancel := context.WithTimeout(ctx, timeout)
|
|
defer cancel()
|
|
leftRegions, _ := rs.client.WaitRegionsScattered(ctx2, regionInfos)
|
|
return leftRegions
|
|
}
|
|
|
|
// checkRegionConsistency checks the readiness and continuity of regions.
|
|
// if the argument `limitted` is true, regions are regarded as limitted scanned result.
|
|
// so it will not compare `endKey` with the last region's `EndKey`.
|
|
func checkRegionConsistency(startKey, endKey []byte, regions []*RegionInfo, limitted bool) error {
|
|
// current pd can't guarantee the consistency of returned regions
|
|
if len(regions) == 0 {
|
|
return errors.Annotatef(berrors.ErrPDBatchScanRegion, "scan region return empty result, startKey: %s, endKey: %s",
|
|
redact.Key(startKey), redact.Key(endKey))
|
|
}
|
|
|
|
if bytes.Compare(regions[0].Region.StartKey, startKey) > 0 {
|
|
return errors.Annotatef(berrors.ErrPDBatchScanRegion,
|
|
"first region %d's startKey(%s) > startKey(%s), region epoch: %s",
|
|
regions[0].Region.Id,
|
|
redact.Key(regions[0].Region.StartKey), redact.Key(startKey),
|
|
regions[0].Region.RegionEpoch.String())
|
|
} else if !limitted && len(regions[len(regions)-1].Region.EndKey) != 0 &&
|
|
bytes.Compare(regions[len(regions)-1].Region.EndKey, endKey) < 0 {
|
|
return errors.Annotatef(berrors.ErrPDBatchScanRegion,
|
|
"last region %d's endKey(%s) < endKey(%s), region epoch: %s",
|
|
regions[len(regions)-1].Region.Id,
|
|
redact.Key(regions[len(regions)-1].Region.EndKey), redact.Key(endKey),
|
|
regions[len(regions)-1].Region.RegionEpoch.String())
|
|
}
|
|
|
|
cur := regions[0]
|
|
if cur.Leader == nil {
|
|
return errors.Annotatef(berrors.ErrPDBatchScanRegion,
|
|
"region %d's leader is nil", cur.Region.Id)
|
|
}
|
|
if cur.Leader.StoreId == 0 {
|
|
return errors.Annotatef(berrors.ErrPDBatchScanRegion,
|
|
"region %d's leader's store id is 0", cur.Region.Id)
|
|
}
|
|
for _, r := range regions[1:] {
|
|
if r.Leader == nil {
|
|
return errors.Annotatef(berrors.ErrPDBatchScanRegion,
|
|
"region %d's leader is nil", r.Region.Id)
|
|
}
|
|
if r.Leader.StoreId == 0 {
|
|
return errors.Annotatef(berrors.ErrPDBatchScanRegion,
|
|
"region %d's leader's store id is 0", r.Region.Id)
|
|
}
|
|
if !bytes.Equal(cur.Region.EndKey, r.Region.StartKey) {
|
|
return errors.Annotatef(berrors.ErrPDBatchScanRegion,
|
|
"region %d's endKey not equal to next region %d's startKey, endKey: %s, startKey: %s, region epoch: %s %s",
|
|
cur.Region.Id, r.Region.Id,
|
|
redact.Key(cur.Region.EndKey), redact.Key(r.Region.StartKey),
|
|
cur.Region.RegionEpoch.String(), r.Region.RegionEpoch.String())
|
|
}
|
|
cur = r
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func scanRegionsLimitWithRetry(
|
|
ctx context.Context, client SplitClient, startKey, endKey []byte, limit int, mustLeader bool,
|
|
) ([]*RegionInfo, bool, error) {
|
|
var (
|
|
batch []*RegionInfo
|
|
err error
|
|
)
|
|
_ = utils.WithRetry(ctx, func() error {
|
|
defer func() { mustLeader = mustLeader || err != nil }()
|
|
if mustLeader {
|
|
batch, err = client.ScanRegions(ctx, startKey, endKey, limit)
|
|
} else {
|
|
batch, err = client.ScanRegions(ctx, startKey, endKey, limit, opt.WithAllowFollowerHandle())
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err = checkRegionConsistency(startKey, endKey, batch, true); err != nil {
|
|
log.Warn("failed to scan region, retrying",
|
|
logutil.ShortError(err),
|
|
zap.Int("regionLength", len(batch)))
|
|
return err
|
|
}
|
|
return nil
|
|
}, NewWaitRegionOnlineBackoffer())
|
|
return batch, mustLeader, err
|
|
}
|
|
|
|
// PaginateScanRegion scan regions with a limit pagination and return all regions
|
|
// at once. The returned regions are continuous and cover the key range. If not,
|
|
// or meet errors, it will retry internally.
|
|
func PaginateScanRegion(
|
|
ctx context.Context, client SplitClient, startKey, endKey []byte, limit int,
|
|
) ([]*RegionInfo, error) {
|
|
if len(endKey) != 0 && bytes.Compare(startKey, endKey) > 0 {
|
|
return nil, errors.Annotatef(berrors.ErrInvalidRange, "startKey > endKey, startKey: %s, endkey: %s",
|
|
hex.EncodeToString(startKey), hex.EncodeToString(endKey))
|
|
}
|
|
|
|
var (
|
|
lastRegions []*RegionInfo
|
|
err error
|
|
mustLeader = false
|
|
backoffer = NewWaitRegionOnlineBackoffer()
|
|
)
|
|
_ = utils.WithRetry(ctx, func() error {
|
|
defer func() { mustLeader = true }()
|
|
regions := make([]*RegionInfo, 0, 16)
|
|
scanStartKey := startKey
|
|
for {
|
|
var batch []*RegionInfo
|
|
batch, mustLeader, err = scanRegionsLimitWithRetry(ctx, client, scanStartKey, endKey, limit, mustLeader)
|
|
if err != nil {
|
|
err = errors.Annotatef(berrors.ErrPDBatchScanRegion.Wrap(err), "scan regions from start-key:%s, err: %s",
|
|
redact.Key(scanStartKey), err.Error())
|
|
return err
|
|
}
|
|
regions = append(regions, batch...)
|
|
if len(batch) < limit {
|
|
// No more region
|
|
break
|
|
}
|
|
scanStartKey = batch[len(batch)-1].Region.GetEndKey()
|
|
if len(scanStartKey) == 0 ||
|
|
(len(endKey) > 0 && bytes.Compare(scanStartKey, endKey) >= 0) {
|
|
// All key space have scanned
|
|
break
|
|
}
|
|
}
|
|
// if the number of regions changed, we can infer TiKV side really
|
|
// made some progress so don't increase the retry times.
|
|
if len(regions) != len(lastRegions) {
|
|
backoffer.Stat.ReduceRetry()
|
|
}
|
|
lastRegions = regions
|
|
|
|
if err = checkRegionConsistency(startKey, endKey, regions, false); err != nil {
|
|
log.Warn("failed to scan region, retrying",
|
|
logutil.ShortError(err),
|
|
zap.Int("regionLength", len(regions)))
|
|
return err
|
|
}
|
|
return nil
|
|
}, backoffer)
|
|
|
|
return lastRegions, err
|
|
}
|
|
|
|
// checkPartRegionConsistency only checks the continuity of regions and the first region consistency.
|
|
func checkPartRegionConsistency(startKey, endKey []byte, regions []*RegionInfo) error {
|
|
// current pd can't guarantee the consistency of returned regions
|
|
if len(regions) == 0 {
|
|
return errors.Annotatef(berrors.ErrPDBatchScanRegion,
|
|
"scan region return empty result, startKey: %s, endKey: %s",
|
|
redact.Key(startKey), redact.Key(endKey))
|
|
}
|
|
|
|
if bytes.Compare(regions[0].Region.StartKey, startKey) > 0 {
|
|
return errors.Annotatef(berrors.ErrPDBatchScanRegion,
|
|
"first region's startKey > startKey, startKey: %s, regionStartKey: %s",
|
|
redact.Key(startKey), redact.Key(regions[0].Region.StartKey))
|
|
}
|
|
|
|
cur := regions[0]
|
|
for _, r := range regions[1:] {
|
|
if !bytes.Equal(cur.Region.EndKey, r.Region.StartKey) {
|
|
return errors.Annotatef(berrors.ErrPDBatchScanRegion,
|
|
"region endKey not equal to next region startKey, endKey: %s, startKey: %s",
|
|
redact.Key(cur.Region.EndKey), redact.Key(r.Region.StartKey))
|
|
}
|
|
cur = r
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func ScanRegionsWithRetry(
|
|
ctx context.Context, client SplitClient, startKey, endKey []byte, limit int,
|
|
) ([]*RegionInfo, error) {
|
|
if len(endKey) != 0 && bytes.Compare(startKey, endKey) > 0 {
|
|
return nil, errors.Annotatef(berrors.ErrInvalidRange, "startKey > endKey, startKey: %s, endkey: %s",
|
|
hex.EncodeToString(startKey), hex.EncodeToString(endKey))
|
|
}
|
|
|
|
var regions []*RegionInfo
|
|
var err error
|
|
// we don't need to return multierr. since there only 3 times retry.
|
|
// in most case 3 times retry have the same error. so we just return the last error.
|
|
// actually we'd better remove all multierr in br/lightning.
|
|
// because it's not easy to check multierr equals normal error.
|
|
// see https://github.com/pingcap/tidb/issues/33419.
|
|
_ = utils.WithRetry(ctx, func() error {
|
|
if err != nil {
|
|
regions, err = client.ScanRegions(ctx, startKey, endKey, limit)
|
|
} else {
|
|
regions, err = client.ScanRegions(ctx, startKey, endKey, limit, opt.WithAllowFollowerHandle())
|
|
}
|
|
if err != nil {
|
|
err = errors.Annotatef(berrors.ErrPDBatchScanRegion, "scan regions from start-key:%s, err: %s",
|
|
redact.Key(startKey), err.Error())
|
|
return err
|
|
}
|
|
|
|
if err = checkPartRegionConsistency(startKey, endKey, regions); err != nil {
|
|
log.Warn("failed to scan region, retrying", logutil.ShortError(err))
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}, NewWaitRegionOnlineBackoffer())
|
|
|
|
return regions, err
|
|
}
|
|
|
|
// TODO: merge with backoff.go
|
|
type WaitRegionOnlineBackoffer struct {
|
|
Stat utils.RetryState
|
|
}
|
|
|
|
// NewWaitRegionOnlineBackoffer create a backoff to wait region online.
|
|
func NewWaitRegionOnlineBackoffer() *WaitRegionOnlineBackoffer {
|
|
return &WaitRegionOnlineBackoffer{
|
|
Stat: utils.InitialRetryState(
|
|
WaitRegionOnlineAttemptTimes,
|
|
time.Millisecond*10,
|
|
time.Second*2,
|
|
),
|
|
}
|
|
}
|
|
|
|
// NextBackoff returns a duration to wait before retrying again
|
|
func (b *WaitRegionOnlineBackoffer) NextBackoff(err error) time.Duration {
|
|
// TODO(lance6716): why we only backoff when the error is ErrPDBatchScanRegion?
|
|
var perr *errors.Error
|
|
if goerrors.As(err, &perr) && berrors.ErrPDBatchScanRegion.ID() == perr.ID() {
|
|
// it needs more time to wait splitting the regions that contains data in PITR.
|
|
// 2s * 150
|
|
delayTime := b.Stat.ExponentialBackoff()
|
|
failpoint.Inject("hint-scan-region-backoff", func(val failpoint.Value) {
|
|
if val.(bool) {
|
|
delayTime = time.Microsecond
|
|
}
|
|
})
|
|
return delayTime
|
|
}
|
|
b.Stat.GiveUp()
|
|
return 0
|
|
}
|
|
|
|
// RemainingAttempts returns the remain attempt times
|
|
func (b *WaitRegionOnlineBackoffer) RemainingAttempts() int {
|
|
return b.Stat.RemainingAttempts()
|
|
}
|
|
|
|
// BackoffMayNotCountBackoffer is a backoffer but it may not increase the retry
|
|
// counter. It should be used with ErrBackoff or ErrBackoffAndDontCount.
|
|
// TODO: merge with backoff.go
|
|
type BackoffMayNotCountBackoffer struct {
|
|
state utils.RetryState
|
|
}
|
|
|
|
var (
|
|
ErrBackoff = errors.New("found backoff error")
|
|
ErrBackoffAndDontCount = errors.New("found backoff error but don't count")
|
|
)
|
|
|
|
// NewBackoffMayNotCountBackoffer creates a new backoffer that may backoff or retry.
|
|
//
|
|
// TODO: currently it has the same usage as NewWaitRegionOnlineBackoffer so we
|
|
// don't expose its inner settings.
|
|
func NewBackoffMayNotCountBackoffer() *BackoffMayNotCountBackoffer {
|
|
return &BackoffMayNotCountBackoffer{
|
|
state: utils.InitialRetryState(
|
|
WaitRegionOnlineAttemptTimes,
|
|
time.Millisecond*10,
|
|
time.Second*2,
|
|
),
|
|
}
|
|
}
|
|
|
|
// NextBackoff implements utils.BackoffStrategy. For BackoffMayNotCountBackoffer, only
|
|
// ErrBackoff and ErrBackoffAndDontCount is meaningful.
|
|
func (b *BackoffMayNotCountBackoffer) NextBackoff(err error) time.Duration {
|
|
if errors.ErrorEqual(err, ErrBackoff) {
|
|
return b.state.ExponentialBackoff()
|
|
}
|
|
if errors.ErrorEqual(err, ErrBackoffAndDontCount) {
|
|
delay := b.state.ExponentialBackoff()
|
|
b.state.ReduceRetry()
|
|
return delay
|
|
}
|
|
b.state.GiveUp()
|
|
return 0
|
|
}
|
|
|
|
// RemainingAttempts implements utils.BackoffStrategy.
|
|
func (b *BackoffMayNotCountBackoffer) RemainingAttempts() int {
|
|
return b.state.RemainingAttempts()
|
|
}
|
|
|
|
// getSplitKeysOfRegions checks every input key is necessary to split region on
|
|
// it. Returns a map from region to split keys belongs to it.
|
|
//
|
|
// The key will be skipped if it's the region boundary.
|
|
//
|
|
// prerequisite:
|
|
// - sortedKeys are sorted in ascending order.
|
|
// - sortedRegions are continuous and sorted in ascending order by start key.
|
|
// - sortedRegions can cover all keys in sortedKeys.
|
|
// PaginateScanRegion should satisfy the above prerequisites.
|
|
func getSplitKeysOfRegions(
|
|
sortedKeys [][]byte,
|
|
sortedRegions []*RegionInfo,
|
|
isRawKV bool,
|
|
) map[*RegionInfo][][]byte {
|
|
splitKeyMap := make(map[*RegionInfo][][]byte, len(sortedRegions))
|
|
curKeyIndex := 0
|
|
splitKey := codec.EncodeBytesExt(nil, sortedKeys[curKeyIndex], isRawKV)
|
|
|
|
for _, region := range sortedRegions {
|
|
for {
|
|
if len(sortedKeys[curKeyIndex]) == 0 {
|
|
// should not happen?
|
|
goto nextKey
|
|
}
|
|
// If splitKey is the boundary of the region, don't need to split on it.
|
|
if bytes.Equal(splitKey, region.Region.GetStartKey()) {
|
|
goto nextKey
|
|
}
|
|
// If splitKey is not in this region, we should move to the next region.
|
|
if !region.ContainsInterior(splitKey) {
|
|
break
|
|
}
|
|
|
|
splitKeyMap[region] = append(splitKeyMap[region], sortedKeys[curKeyIndex])
|
|
|
|
nextKey:
|
|
curKeyIndex++
|
|
if curKeyIndex >= len(sortedKeys) {
|
|
return splitKeyMap
|
|
}
|
|
splitKey = codec.EncodeBytesExt(nil, sortedKeys[curKeyIndex], isRawKV)
|
|
}
|
|
}
|
|
lastKey := sortedKeys[len(sortedKeys)-1]
|
|
endOfLastRegion := sortedRegions[len(sortedRegions)-1].Region.GetEndKey()
|
|
if !bytes.Equal(lastKey, endOfLastRegion) {
|
|
log.Error("in getSplitKeysOfRegions, regions don't cover all keys",
|
|
zap.String("firstKey", hex.EncodeToString(sortedKeys[0])),
|
|
zap.String("lastKey", hex.EncodeToString(lastKey)),
|
|
zap.String("firstRegionStartKey", hex.EncodeToString(sortedRegions[0].Region.GetStartKey())),
|
|
zap.String("lastRegionEndKey", hex.EncodeToString(endOfLastRegion)),
|
|
)
|
|
}
|
|
return splitKeyMap
|
|
}
|