// Copyright 2022 PingCAP, Inc. Licensed under Apache-2.0.

package split

import (
	"bytes"
	"context"
	"crypto/tls"
	"slices"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/docker/go-units"
	"github.com/pingcap/errors"
	"github.com/pingcap/failpoint"
	"github.com/pingcap/kvproto/pkg/errorpb"
	"github.com/pingcap/kvproto/pkg/kvrpcpb"
	"github.com/pingcap/kvproto/pkg/metapb"
	"github.com/pingcap/kvproto/pkg/pdpb"
	"github.com/pingcap/kvproto/pkg/tikvpb"
	"github.com/pingcap/log"
	"github.com/pingcap/tidb/br/pkg/conn/util"
	berrors "github.com/pingcap/tidb/br/pkg/errors"
	"github.com/pingcap/tidb/br/pkg/logutil"
	"github.com/pingcap/tidb/br/pkg/utils"
	"github.com/pingcap/tidb/pkg/kv"
	"github.com/pingcap/tidb/pkg/lightning/common"
	"github.com/pingcap/tidb/pkg/lightning/config"
	tidbutil "github.com/pingcap/tidb/pkg/util"
	"github.com/pingcap/tidb/pkg/util/codec"
	"github.com/pingcap/tidb/pkg/util/intest"
	tidblogutil "github.com/pingcap/tidb/pkg/util/logutil"
	pd "github.com/tikv/pd/client"
	pdhttp "github.com/tikv/pd/client/http"
	"github.com/tikv/pd/client/opt"
	"go.uber.org/multierr"
	"go.uber.org/zap"
	"golang.org/x/sync/errgroup"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/status"
)

const (
	splitRegionMaxRetryTime = 4
)

var (
	// the max total key size in a split region batch.
	// our threshold should be smaller than TiKV's raft max entry size (default is 8 MB).
	maxBatchSplitSize = 6 * units.MiB
)

// SplitClient is an external client used by RegionSplitter.
type SplitClient interface {
	// GetStore gets a store by a store id.
	GetStore(ctx context.Context, storeID uint64) (*metapb.Store, error)
	// GetRegion gets a region which includes a specified key.
	GetRegion(ctx context.Context, key []byte) (*RegionInfo, error)
	// GetRegionByID gets a region by a region id.
	GetRegionByID(ctx context.Context, regionID uint64) (*RegionInfo, error)
	// SplitKeysAndScatter splits the related regions of the keys and scatters the
	// new regions. It returns the new regions that need to be called with
	// WaitRegionsScattered.
	SplitKeysAndScatter(ctx context.Context, sortedSplitKeys [][]byte) ([]*RegionInfo, error)

	// SplitWaitAndScatter splits a region from a batch of keys, waits until the
	// split is finished, and scatters the new regions. It returns the original
	// region, the new regions and an error. The input keys should not be encoded.
	//
	// The split step is retried a few times. If it meets an error, the error is
	// returned directly.
	//
	// The split waiting step has a backoff retry logic: if the split has made
	// progress, it will not increase the retry counter. Otherwise, it will retry
	// for about 1h. If the retry times out, it will log a warning and continue.
	//
	// The scatter step is retried a few times. If it meets an error, it will log
	// a warning and continue.
	// TODO(lance6716): remove this function in interface after BR uses SplitKeysAndScatter.
	SplitWaitAndScatter(ctx context.Context, region *RegionInfo, keys [][]byte) ([]*RegionInfo, error)
	// GetOperator gets the status of the operator of the specified region.
	GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error)
	// ScanRegions gets a list of regions, starting from the region that contains key.
	// Limit limits the maximum number of regions returned.
	ScanRegions(ctx context.Context, key, endKey []byte, limit int, opts ...opt.GetRegionOption) ([]*RegionInfo, error)
	// GetPlacementRule loads a placement rule from PD.
	GetPlacementRule(ctx context.Context, groupID, ruleID string) (*pdhttp.Rule, error)
	// SetPlacementRule inserts or updates a placement rule to PD.
	SetPlacementRule(ctx context.Context, rule *pdhttp.Rule) error
	// DeletePlacementRule removes a placement rule from PD.
	DeletePlacementRule(ctx context.Context, groupID, ruleID string) error
	// SetStoresLabel adds or updates the specified label of the stores. If
	// labelValue is empty, it clears the label.
	SetStoresLabel(ctx context.Context, stores []uint64, labelKey, labelValue string) error
	// WaitRegionsScattered waits for an already started scatter region action to
	// finish. Internally it will backoff and retry at the maximum interval of 2
	// seconds. If the scatter makes progress during the retry, it will not decrease
	// the retry counter. If there's always no progress, it will retry for about 1h.
	// Caller can set the context timeout to control the max waiting time.
	//
	// The first return value is always the number of regions that are not finished
	// scattering no matter what the error is.
	WaitRegionsScattered(ctx context.Context, regionInfos []*RegionInfo) (notFinished int, err error)
}

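// A typical end-to-end flow, as a minimal sketch (pdCli, pdHTTPCli, tlsConf,
// sortedKeys and the batch/concurrency values are placeholders, not
// recommendations):
//
//	cli := NewClient(pdCli, pdHTTPCli, tlsConf, 128, 4)
//	newRegions, err := cli.SplitKeysAndScatter(ctx, sortedKeys)
//	if err != nil {
//		// handle the split error
//	}
//	// wait for the already-started scatter to settle; bound it with the context.
//	notFinished, err := cli.WaitRegionsScattered(ctx, newRegions)
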
// pdClient is a wrapper of pd client, can be used by RegionSplitter.
type pdClient struct {
	mu         sync.Mutex
	client     pd.Client
	httpCli    pdhttp.Client
	tlsConf    *tls.Config
	storeCache map[uint64]*metapb.Store

	// FIXME: if the config changes during the lifetime of pdClient, this may
	// mislead the scatter decision.
	needScatterVal  bool
	needScatterInit sync.Once

	isRawKv          bool
	onSplit          func(key [][]byte)
	splitConcurrency int
	splitBatchKeyCnt int
}

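// ClientOptionalParameter sets an optional parameter of pdClient at
// construction time; see NewClient.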
type ClientOptionalParameter func(*pdClient)

// WithRawKV sets the client to use raw kv mode.
func WithRawKV() ClientOptionalParameter {
	return func(c *pdClient) {
		c.isRawKv = true
	}
}

// WithOnSplit sets a callback function to be called after each split.
func WithOnSplit(onSplit func(key [][]byte)) ClientOptionalParameter {
	return func(c *pdClient) {
		c.onSplit = onSplit
	}
}

// NewClient creates a SplitClient.
//
// splitBatchKeyCnt controls how many keys are sent to TiKV in a batch in split
// region API. splitConcurrency controls how many regions are split concurrently.
func NewClient(
	client pd.Client,
	httpCli pdhttp.Client,
	tlsConf *tls.Config,
	splitBatchKeyCnt int,
	splitConcurrency int,
	opts ...ClientOptionalParameter,
) SplitClient {
	cli := &pdClient{
		client:           client,
		httpCli:          httpCli,
		tlsConf:          tlsConf,
		storeCache:       make(map[uint64]*metapb.Store),
		splitBatchKeyCnt: splitBatchKeyCnt,
		splitConcurrency: splitConcurrency,
	}
	for _, opt := range opts {
		opt(cli)
	}
	return cli
}

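// Optional parameters are applied in order at construction time. A minimal
// sketch (the callback body is a placeholder):
//
//	cli := NewClient(pdCli, pdHTTPCli, tlsConf, 128, 4,
//		WithRawKV(),
//		WithOnSplit(func(keys [][]byte) {
//			// e.g. record progress of the split keys
//		}),
//	)
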
func (c *pdClient) needScatter(ctx context.Context) bool {
	c.needScatterInit.Do(func() {
		var err error
		c.needScatterVal, err = c.checkNeedScatter(ctx)
		if err != nil {
			log.Warn(
				"failed to check whether need to scatter, use permissive strategy: always scatter",
				logutil.ShortError(err))
			c.needScatterVal = true
		}
		if !c.needScatterVal {
			log.Info("skipping scatter because the replica number isn't less than store count.")
		}
	})
	return c.needScatterVal
}

func (c *pdClient) scatterRegions(ctx context.Context, newRegions []*RegionInfo) error {
	log.Info("scatter regions", zap.Int("regions", len(newRegions)))
	// the retry is for the temporary network errors during sending request.
	err := utils.WithRetry(ctx, func() error {
		failedRegionsID, err := c.tryScatterRegions(ctx, newRegions)
		// If err is "unsupported", we need to fall back to the old method.
		// ErrPDRegionsNotFullyScatter means the regions are not fully scattered:
		// new versions of PD return the failed region IDs from the scatter regions
		// API, while old versions of PD only return the FinishedPercentage,
		// so we need to retry the regions one by one.
		if isUnsupportedError(err) || berrors.ErrPDRegionsNotFullyScatter.Equal(err) {
			log.Warn("failed to batch scatter regions, rollback to sequentially scatter", logutil.ShortError(err))
			c.scatterRegionsSequentially(
				ctx, newRegions,
				// backoff about 1h in total, or we give up scattering this region.
				utils.NewBackoffRetryAllErrorStrategy(1800, 100*time.Millisecond, 2*time.Second))
			return nil
		}
		// If there are failed regions, retry them.
		if len(failedRegionsID) > 0 {
			failedRegions := make([]*RegionInfo, 0, len(failedRegionsID))
			for _, region := range newRegions {
				if _, exists := failedRegionsID[region.Region.Id]; exists {
					failedRegions = append(failedRegions, region)
				}
			}
			newRegions = failedRegions
			return errors.Annotatef(berrors.ErrPDNotFullyScatter,
				"pd returns error during batch scattering: %d regions failed to scatter", len(failedRegionsID))
		}
		return err
	}, utils.NewBackoffRetryAllErrorStrategy(1800, 500*time.Millisecond, 2*time.Second))
	if err != nil && berrors.ErrPDNotFullyScatter.Equal(err) {
		log.Warn("some regions haven't been scattered", zap.Error(err))
		return nil
	}
	return err
}

func (c *pdClient) tryScatterRegions(ctx context.Context, regionInfo []*RegionInfo) (map[uint64]struct{}, error) {
	regionsID := make([]uint64, 0, len(regionInfo))
	for _, v := range regionInfo {
		regionsID = append(regionsID, v.Region.Id)
		log.Debug("scattering regions", logutil.Key("start", v.Region.StartKey),
			logutil.Key("end", v.Region.EndKey),
			zap.Uint64("id", v.Region.Id))
	}
	resp, err := c.client.ScatterRegions(ctx, regionsID, opt.WithSkipStoreLimit())
	if err != nil {
		return nil, err
	}
	if pbErr := resp.GetHeader().GetError(); pbErr.GetType() != pdpb.ErrorType_OK {
		return nil, errors.Annotatef(berrors.ErrPDInvalidResponse,
			"pd returns error during batch scattering: %s", pbErr)
	}

	if len(resp.FailedRegionsId) > 0 {
		failedRegionsID := make(map[uint64]struct{})
		for _, id := range resp.FailedRegionsId {
			failedRegionsID[id] = struct{}{}
		}
		return failedRegionsID, nil
	}

	if finished := resp.GetFinishedPercentage(); finished < 100 {
		return nil, errors.Annotatef(berrors.ErrPDRegionsNotFullyScatter, "scatter finished percentage %d less than 100", finished)
	}

	return nil, nil
}

func (c *pdClient) GetStore(ctx context.Context, storeID uint64) (*metapb.Store, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	store, ok := c.storeCache[storeID]
	if ok {
		return store, nil
	}
	store, err := c.client.GetStore(ctx, storeID)
	if err != nil {
		return nil, errors.Trace(err)
	}
	c.storeCache[storeID] = store
	return store, nil
}

func (c *pdClient) GetRegion(ctx context.Context, key []byte) (*RegionInfo, error) {
	region, err := c.client.GetRegion(ctx, key, opt.WithAllowFollowerHandle())
	if err != nil {
		return nil, errors.Trace(err)
	}
	if region == nil {
		return nil, nil
	}
	return &RegionInfo{
		Region: region.Meta,
		Leader: region.Leader,
	}, nil
}

func (c *pdClient) GetRegionByID(ctx context.Context, regionID uint64) (*RegionInfo, error) {
	region, err := c.client.GetRegionByID(ctx, regionID, opt.WithAllowFollowerHandle())
	if err != nil {
		return nil, errors.Trace(err)
	}
	if region == nil {
		return nil, nil
	}
	return &RegionInfo{
		Region:       region.Meta,
		Leader:       region.Leader,
		PendingPeers: region.PendingPeers,
		DownPeers:    region.DownPeers,
	}, nil
}

func splitRegionWithFailpoint(
	ctx context.Context,
	regionInfo *RegionInfo,
	peer *metapb.Peer,
	client tikvpb.TikvClient,
	keys [][]byte,
	isRawKv bool,
) (*kvrpcpb.SplitRegionResponse, error) {
	failpoint.Inject("not-leader-error", func(injectNewLeader failpoint.Value) {
		log.Debug("failpoint not-leader-error injected.")
		resp := &kvrpcpb.SplitRegionResponse{
			RegionError: &errorpb.Error{
				NotLeader: &errorpb.NotLeader{
					RegionId: regionInfo.Region.Id,
				},
			},
		}
		if injectNewLeader.(bool) {
			resp.RegionError.NotLeader.Leader = regionInfo.Leader
		}
		failpoint.Return(resp, nil)
	})
	failpoint.Inject("somewhat-retryable-error", func() {
		log.Debug("failpoint somewhat-retryable-error injected.")
		failpoint.Return(&kvrpcpb.SplitRegionResponse{
			RegionError: &errorpb.Error{
				ServerIsBusy: &errorpb.ServerIsBusy{},
			},
		}, nil)
	})
	return client.SplitRegion(ctx, &kvrpcpb.SplitRegionRequest{
		Context: &kvrpcpb.Context{
			RegionId:    regionInfo.Region.Id,
			RegionEpoch: regionInfo.Region.RegionEpoch,
			Peer:        peer,
		},
		SplitKeys: keys,
		IsRawKv:   isRawKv,
	})
}

func (c *pdClient) sendSplitRegionRequest(
	ctx context.Context, regionInfo *RegionInfo, keys [][]byte,
) (*kvrpcpb.SplitRegionResponse, error) {
	var splitErrors error
	for i := range splitRegionMaxRetryTime {
		retry, result, err := sendSplitRegionRequest(ctx, c, regionInfo, keys, &splitErrors, i)
		if retry {
			continue
		}
		if err != nil {
			return nil, multierr.Append(splitErrors, err)
		}
		if result != nil {
			return result, nil
		}
		return nil, errors.Trace(splitErrors)
	}
	return nil, errors.Trace(splitErrors)
}

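// sendSplitRegionRequest sends one SplitRegion RPC to the store of the region
// leader (or the first peer when no leader is known). A sketch of its return
// contract, as observed from the call site above: retry=true means a retryable
// region error was recorded in splitErrors and the caller should try again; a
// non-nil response means the split succeeded; a non-nil error is fatal.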
func sendSplitRegionRequest(
	ctx context.Context,
	c *pdClient,
	regionInfo *RegionInfo,
	keys [][]byte,
	splitErrors *error,
	retry int,
) (bool, *kvrpcpb.SplitRegionResponse, error) {
	if intest.InTest {
		mockCli, ok := c.client.(*MockPDClientForSplit)
		if ok {
			return mockCli.SplitRegion(regionInfo, keys, c.isRawKv)
		}
	}
	var peer *metapb.Peer
	// scanRegions may return empty Leader in https://github.com/tikv/pd/blob/v4.0.8/server/grpc_service.go#L524
	// so we also need to check Leader.Id != 0
	if regionInfo.Leader != nil && regionInfo.Leader.Id != 0 {
		peer = regionInfo.Leader
	} else {
		if len(regionInfo.Region.Peers) == 0 {
			return false, nil,
				errors.Annotatef(berrors.ErrRestoreNoPeer, "region[%d] doesn't have any peer",
					regionInfo.Region.GetId())
		}
		peer = regionInfo.Region.Peers[0]
	}
	storeID := peer.GetStoreId()
	store, err := c.GetStore(ctx, storeID)
	if err != nil {
		return false, nil, err
	}
	opt := grpc.WithTransportCredentials(insecure.NewCredentials())
	if c.tlsConf != nil {
		opt = grpc.WithTransportCredentials(credentials.NewTLS(c.tlsConf))
	}
	conn, err := grpc.Dial(store.GetAddress(), opt,
		config.DefaultGrpcKeepaliveParams)
	if err != nil {
		return false, nil, err
	}
	defer conn.Close()
	client := tikvpb.NewTikvClient(conn)
	resp, err := splitRegionWithFailpoint(ctx, regionInfo, peer, client, keys, c.isRawKv)
	if err != nil {
		return false, nil, err
	}
	if resp.RegionError != nil {
		log.Warn("fail to split region",
			logutil.Region(regionInfo.Region),
			logutil.Keys(keys),
			zap.Stringer("regionErr", resp.RegionError))
		*splitErrors = multierr.Append(*splitErrors,
			errors.Annotatef(berrors.ErrRestoreSplitFailed, "split region failed: err=%v", resp.RegionError))
		if nl := resp.RegionError.NotLeader; nl != nil {
			if leader := nl.GetLeader(); leader != nil {
				regionInfo.Leader = leader
			} else {
				newRegionInfo, findLeaderErr := c.GetRegionByID(ctx, nl.RegionId)
				if findLeaderErr != nil {
					return false, nil, findLeaderErr
				}
				if !CheckRegionEpoch(newRegionInfo, regionInfo) {
					return false, nil, berrors.ErrKVEpochNotMatch
				}
				log.Info("find new leader", zap.Uint64("new leader", newRegionInfo.Leader.Id))
				regionInfo = newRegionInfo
			}
			log.Info("split region meet not leader error, retrying",
				zap.Int("retry times", retry),
				zap.Uint64("regionID", regionInfo.Region.Id),
				zap.Any("new leader", regionInfo.Leader),
			)
			return true, nil, nil
		}
		// TODO: we don't handle RegionNotMatch and RegionNotFound here,
		// because I think we don't have enough information to retry.
		// But maybe we can handle them here by some information the error itself provides.
		if resp.RegionError.ServerIsBusy != nil ||
			resp.RegionError.StaleCommand != nil {
			log.Warn("an error occurs on split region",
				zap.Int("retry times", retry),
				zap.Uint64("regionID", regionInfo.Region.Id),
				zap.String("error", resp.RegionError.Message),
				zap.Any("error verbose", resp.RegionError),
			)
			return true, nil, nil
		}
		return false, nil, nil
	}
	return false, resp, nil
}

// batchSplitRegionsWithOrigin calls the batch split region API and groups the
// returned regions into two groups: the region with the same ID as the origin,
// and the other regions. The former does not need to be scattered while the
// latter needs to be scattered.
//
// Depending on the TiKV configuration right-derive-when-split, the origin region
// can be the first or the last returned region.
func (c *pdClient) batchSplitRegionsWithOrigin(
	ctx context.Context, regionInfo *RegionInfo, keys [][]byte,
) (*RegionInfo, []*RegionInfo, error) {
	resp, err := c.sendSplitRegionRequest(ctx, regionInfo, keys)
	if err != nil {
		return nil, nil, errors.Trace(err)
	}

	regions := resp.GetRegions()
	newRegionInfos := make([]*RegionInfo, 0, len(regions))
	var originRegion *RegionInfo
	for _, region := range regions {
		var leader *metapb.Peer

		// Assume the leaders will be at the same store.
		if regionInfo.Leader != nil {
			for _, p := range region.GetPeers() {
				if p.GetStoreId() == regionInfo.Leader.GetStoreId() {
					leader = p
					break
				}
			}
		}
		// original region
		if region.GetId() == regionInfo.Region.GetId() {
			originRegion = &RegionInfo{
				Region: region,
				Leader: leader,
			}
			continue
		}
		newRegionInfos = append(newRegionInfos, &RegionInfo{
			Region: region,
			Leader: leader,
		})
	}
	return originRegion, newRegionInfos, nil
}

func (c *pdClient) waitRegionsSplit(ctx context.Context, newRegions []*RegionInfo) error {
	backoffer := NewBackoffMayNotCountBackoffer()
	needRecheck := make([]*RegionInfo, 0, len(newRegions))
	return utils.WithRetryReturnLastErr(ctx, func() error {
		needRecheck = needRecheck[:0]

		for _, r := range newRegions {
			regionID := r.Region.GetId()

			ok, err := c.hasHealthyRegion(ctx, regionID)
			if !ok || err != nil {
				if err != nil {
					tidblogutil.Logger(ctx).Warn(
						"wait for split failed",
						zap.Uint64("regionID", regionID),
						zap.Error(err),
					)
				}
				needRecheck = append(needRecheck, r)
			}
		}

		if len(needRecheck) == 0 {
			return nil
		}

		backoffErr := ErrBackoff
		// if made progress in this round, don't increase the retryCnt
		if len(needRecheck) < len(newRegions) {
			backoffErr = ErrBackoffAndDontCount
		}
		newRegions = slices.Clone(needRecheck)

		return errors.Annotatef(
			backoffErr,
			"WaitRegionsSplit not finished, needRecheck: %d, the first unfinished region: %s",
			len(needRecheck), needRecheck[0].Region.String(),
		)
	}, backoffer)
}

func (c *pdClient) hasHealthyRegion(ctx context.Context, regionID uint64) (bool, error) {
	regionInfo, err := c.GetRegionByID(ctx, regionID)
	if err != nil {
		return false, errors.Trace(err)
	}
	// the region hasn't become ready yet.
	if regionInfo == nil {
		return false, nil
	}

	// check whether the region is healthy and report.
	// TODO: the log may be too verbose. we should use Prometheus metrics once it's ready for BR.
	for _, peer := range regionInfo.PendingPeers {
		log.Debug("unhealthy region detected", logutil.Peer(peer), zap.String("type", "pending"))
	}
	for _, peer := range regionInfo.DownPeers {
		log.Debug("unhealthy region detected", logutil.Peer(peer), zap.String("type", "down"))
	}
	// we ignore down peers because they are (normally) hard to fix in a reasonable time.
	// (otherwise, once a peer is down, we may get stuck waiting for the region to become ready.)
	return len(regionInfo.PendingPeers) == 0, nil
}

func (c *pdClient) SplitKeysAndScatter(ctx context.Context, sortedSplitKeys [][]byte) ([]*RegionInfo, error) {
	if len(sortedSplitKeys) == 0 {
		return nil, nil
	}
	// we need to find the regions that contain the split keys. However, the scan
	// region API accepts a key range [start, end) where end key is exclusive, and if
	// sortedSplitKeys length is 1, scan region may return empty result. So we
	// increase the end key a bit. If the end key is on the region boundaries, it
	// will be skipped by getSplitKeysOfRegions.
	scanStart := codec.EncodeBytesExt(nil, sortedSplitKeys[0], c.isRawKv)
	lastKey := kv.Key(sortedSplitKeys[len(sortedSplitKeys)-1])
	if len(lastKey) > 0 {
		lastKey = lastKey.Next()
	}
	scanEnd := codec.EncodeBytesExt(nil, lastKey, c.isRawKv)

	// mu protects ret, retrySplitKeys, lastSplitErr
	mu := sync.Mutex{}
	ret := make([]*RegionInfo, 0, len(sortedSplitKeys)+1)
	retrySplitKeys := make([][]byte, 0, len(sortedSplitKeys))
	var lastSplitErr error

	err := utils.WithRetryReturnLastErr(ctx, func() error {
		ret = ret[:0]

		if len(retrySplitKeys) > 0 {
			scanStart = codec.EncodeBytesExt(nil, retrySplitKeys[0], c.isRawKv)
			lastKey2 := kv.Key(retrySplitKeys[len(retrySplitKeys)-1])
			scanEnd = codec.EncodeBytesExt(nil, lastKey2.Next(), c.isRawKv)
		}
		regions, err := PaginateScanRegion(ctx, c, scanStart, scanEnd, ScanRegionPaginationLimit)
		if err != nil {
			return err
		}
		log.Info("paginate scan regions",
			zap.Int("count", len(regions)),
			logutil.Key("start", scanStart),
			logutil.Key("end", scanEnd))

		allSplitKeys := sortedSplitKeys
		if len(retrySplitKeys) > 0 {
			allSplitKeys = retrySplitKeys
			retrySplitKeys = retrySplitKeys[:0]
		}
		splitKeyMap := getSplitKeysOfRegions(allSplitKeys, regions, c.isRawKv)
		workerPool := tidbutil.NewWorkerPool(uint(c.splitConcurrency), "split keys")
		eg, eCtx := errgroup.WithContext(ctx)
		for region, splitKeys := range splitKeyMap {
			workerPool.ApplyOnErrorGroup(eg, func() error {
				// TODO(lance6716): add error handling to retry from scan or retry from split
				newRegions, err2 := c.SplitWaitAndScatter(eCtx, region, splitKeys)
				if err2 != nil {
					if common.IsContextCanceledError(err2) {
						return err2
					}
					log.Warn("split and scatter region meet error, will retry",
						zap.Uint64("region_id", region.Region.Id),
						zap.Error(err2))
					mu.Lock()
					retrySplitKeys = append(retrySplitKeys, splitKeys...)
					lastSplitErr = err2
					mu.Unlock()
					return nil
				}

				if len(newRegions) != len(splitKeys) {
					log.Warn("split key count and new region count mismatch",
						zap.Int("new region count", len(newRegions)),
						zap.Int("split key count", len(splitKeys)))
				}
				mu.Lock()
				ret = append(ret, newRegions...)
				mu.Unlock()
				return nil
			})
		}
		if err2 := eg.Wait(); err2 != nil {
			return err2
		}
		if len(retrySplitKeys) == 0 {
			return nil
		}
		slices.SortFunc(retrySplitKeys, bytes.Compare)
		return lastSplitErr
	}, utils.NewBackoffRetryAllExceptStrategy(SplitRetryTimes, SplitRetryInterval, SplitMaxRetryInterval, isNonRetryErrForSplit))
	return ret, errors.Trace(err)
}

func isNonRetryErrForSplit(err error) bool {
	return berrors.ErrInvalidRange.Equal(err)
}

func (c *pdClient) SplitWaitAndScatter(ctx context.Context, region *RegionInfo, keys [][]byte) ([]*RegionInfo, error) {
	failpoint.Inject("failToSplit", func(_ failpoint.Value) {
		failpoint.Return(nil, errors.New("retryable error"))
	})
	if len(keys) == 0 {
		return []*RegionInfo{region}, nil
	}

	var (
		start, end = 0, 0
		batchSize  = 0
		newRegions = make([]*RegionInfo, 0, len(keys))
	)

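	// A sketch of the batching invariant below (no new logic): keys[start:end)
	// is the in-flight batch; it is flushed when adding keys[end] would exceed
	// maxBatchSplitSize bytes, when it already holds c.splitBatchKeyCnt keys, or
	// when all keys have been consumed.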
	for end <= len(keys) {
		if end == len(keys) ||
			batchSize+len(keys[end]) > maxBatchSplitSize ||
			end-start >= c.splitBatchKeyCnt {
			// split, wait and scatter for this batch
			originRegion, newRegionsOfBatch, err := c.batchSplitRegionsWithOrigin(ctx, region, keys[start:end])
			if err != nil {
				return nil, errors.Trace(err)
			}
			err = c.waitRegionsSplit(ctx, newRegionsOfBatch)
			if err != nil {
				tidblogutil.Logger(ctx).Warn(
					"wait regions split failed, will continue anyway",
					zap.Error(err),
				)
			}
			if err = ctx.Err(); err != nil {
				return nil, errors.Trace(err)
			}
			err = c.scatterRegions(ctx, newRegionsOfBatch)
			if err != nil {
				tidblogutil.Logger(ctx).Warn(
					"scatter regions failed, will continue anyway",
					zap.Error(err),
				)
			}
			if c.onSplit != nil {
				c.onSplit(keys[start:end])
			}

			// the region with the max start key is the region that needs to be
			// further split. Depending on whether the origin region is the first
			// or the last region, we need to compare the origin region with the
			// last one of the new regions.
			lastNewRegion := newRegionsOfBatch[len(newRegionsOfBatch)-1]
			if bytes.Compare(originRegion.Region.StartKey, lastNewRegion.Region.StartKey) < 0 {
				region = lastNewRegion
			} else {
				region = originRegion
			}
			newRegions = append(newRegions, newRegionsOfBatch...)
			batchSize = 0
			start = end
		}

		if end < len(keys) {
			batchSize += len(keys[end])
		}
		end++
	}

	return newRegions, errors.Trace(ctx.Err())
}

func (c *pdClient) getStoreCount(ctx context.Context) (int, error) {
	stores, err := util.GetAllTiKVStores(ctx, c.client, util.SkipTiFlash)
	if err != nil {
		return 0, err
	}
	return len(stores), err
}

func (c *pdClient) getMaxReplica(ctx context.Context) (int, error) {
	resp, err := c.httpCli.GetReplicateConfig(ctx)
	if err != nil {
		return 0, errors.Trace(err)
	}
	key := "max-replicas"
	val, ok := resp[key]
	if !ok {
		return 0, errors.Errorf("key %s not found in response %v", key, resp)
	}
	return int(val.(float64)), nil
}

func (c *pdClient) checkNeedScatter(ctx context.Context) (bool, error) {
	storeCount, err := c.getStoreCount(ctx)
	if err != nil {
		return false, err
	}
	maxReplica, err := c.getMaxReplica(ctx)
	if err != nil {
		return false, err
	}
	log.Info("checking whether need to scatter", zap.Int("store", storeCount), zap.Int("max-replica", maxReplica))
	// Skipping scatter may lead to unbalanced leaders.
	// Currently, we skip scatter only when:
	// 1. max-replica > store-count (Probably a misconfigured or playground cluster.)
	// 2. store-count == 1 (No meaning for scattering.)
	// We could also omit scatter when `max-replica == store-count` and create a
	// BalanceLeader operator here instead; however, there isn't evidence that
	// transferring leaders is much faster than scattering empty regions.
	return storeCount >= maxReplica && storeCount > 1, nil
}

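// For example (a sketch of the decision above): 3 stores with max-replicas = 3
// scatters as usual, while a single-store playground or a cluster configured
// with max-replicas greater than the store count skips scattering.
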
func (c *pdClient) scatterRegion(ctx context.Context, regionInfo *RegionInfo) error {
	if !c.needScatter(ctx) {
		return nil
	}
	return c.client.ScatterRegion(ctx, regionInfo.Region.GetId())
}

func (c *pdClient) GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error) {
	return c.client.GetOperator(ctx, regionID)
}

func (c *pdClient) ScanRegions(ctx context.Context, key, endKey []byte, limit int, opts ...opt.GetRegionOption) ([]*RegionInfo, error) {
	failpoint.Inject("no-leader-error", func(_ failpoint.Value) {
		logutil.CL(ctx).Debug("failpoint no-leader-error injected.")
		failpoint.Return(nil, status.Error(codes.Unavailable, "not leader"))
	})

	//nolint:staticcheck
	regions, err := c.client.ScanRegions(ctx, key, endKey, limit, opts...)
	if err != nil {
		return nil, errors.Trace(err)
	}
	regionInfos := make([]*RegionInfo, 0, len(regions))
	for _, region := range regions {
		regionInfos = append(regionInfos, &RegionInfo{
			Region: region.Meta,
			Leader: region.Leader,
		})
	}
	return regionInfos, nil
}

func (c *pdClient) GetPlacementRule(ctx context.Context, groupID, ruleID string) (*pdhttp.Rule, error) {
	resp, err := c.httpCli.GetPlacementRule(ctx, groupID, ruleID)
	return resp, errors.Trace(err)
}

func (c *pdClient) SetPlacementRule(ctx context.Context, rule *pdhttp.Rule) error {
	return c.httpCli.SetPlacementRule(ctx, rule)
}

func (c *pdClient) DeletePlacementRule(ctx context.Context, groupID, ruleID string) error {
	return c.httpCli.DeletePlacementRule(ctx, groupID, ruleID)
}

func (c *pdClient) SetStoresLabel(
	ctx context.Context, stores []uint64, labelKey, labelValue string,
) error {
	m := map[string]string{labelKey: labelValue}
	for _, id := range stores {
		err := c.httpCli.SetStoreLabels(ctx, int64(id), m)
		if err != nil {
			return errors.Trace(err)
		}
	}
	return nil
}

func (c *pdClient) scatterRegionsSequentially(ctx context.Context, newRegions []*RegionInfo, backoffStrategy utils.BackoffStrategy) {
	newRegionSet := make(map[uint64]*RegionInfo, len(newRegions))
	for _, newRegion := range newRegions {
		newRegionSet[newRegion.Region.Id] = newRegion
	}

	if err := utils.WithRetry(ctx, func() error {
		log.Info("trying to scatter regions...", zap.Int("remain", len(newRegionSet)))
		var errs error
		for _, region := range newRegionSet {
			err := c.scatterRegion(ctx, region)
			if err == nil {
				// deleting during iteration is safe according to the Go language spec.
				delete(newRegionSet, region.Region.Id)
			} else if !PdErrorCanRetry(err) {
				log.Warn("scatter meet error cannot be retried, skipping",
					logutil.ShortError(err),
					logutil.Region(region.Region),
				)
				delete(newRegionSet, region.Region.Id)
			}
			errs = multierr.Append(errs, err)
		}
		return errs
	}, backoffStrategy); err != nil {
		log.Warn("Some regions haven't been scattered because of errors.",
			zap.Int("count", len(newRegionSet)),
			// if all regions failed to scatter, the short error might also be verbose...
			logutil.ShortError(err),
			logutil.AbbreviatedArray("failed-regions", newRegionSet, func(i any) []string {
				m := i.(map[uint64]*RegionInfo)
				result := make([]string, 0, len(m))
				for id := range m {
					result = append(result, strconv.Itoa(int(id)))
				}
				return result
			}),
		)
	}
}

func (c *pdClient) isScatterRegionFinished(
	ctx context.Context,
	regionID uint64,
) (scatterDone bool, needRescatter bool, scatterErr error) {
	resp, err := c.GetOperator(ctx, regionID)
	if err != nil {
		if common.IsRetryableError(err) {
			// retry in the next cycle
			return false, false, nil
		}
		return false, false, errors.Trace(err)
	}
	return isScatterRegionFinished(resp)
}

func (c *pdClient) WaitRegionsScattered(ctx context.Context, regions []*RegionInfo) (int, error) {
	var (
		backoffer     = NewBackoffMayNotCountBackoffer()
		retryCnt      = -1
		needRescatter = make([]*RegionInfo, 0, len(regions))
		needRecheck   = make([]*RegionInfo, 0, len(regions))
	)

	err := utils.WithRetryReturnLastErr(ctx, func() error {
		retryCnt++
		loggedInThisRound := false
		needRecheck = needRecheck[:0]
		needRescatter = needRescatter[:0]

		for i, region := range regions {
			regionID := region.Region.GetId()

			if retryCnt > 10 && !loggedInThisRound {
				loggedInThisRound = true
				resp, err := c.GetOperator(ctx, regionID)
				tidblogutil.Logger(ctx).Info(
					"retried many times to wait for scattering regions, checking operator",
					zap.Int("retryCnt", retryCnt),
					zap.Uint64("firstRegionID", regionID),
					zap.Stringer("response", resp),
					zap.Error(err),
				)
			}

			ok, rescatter, err := c.isScatterRegionFinished(ctx, regionID)
			if err != nil {
				if !common.IsRetryableError(err) {
					tidblogutil.Logger(ctx).Warn(
						"wait for scatter region encountered non-retryable error",
						logutil.Region(region.Region),
						zap.Error(err),
					)
					needRecheck = append(needRecheck, regions[i:]...)
					return err
				}
				// if meet retryable error, recheck this region in the next round
				tidblogutil.Logger(ctx).Warn(
					"wait for scatter region encountered error, will retry again",
					logutil.Region(region.Region),
					zap.Error(err),
				)
				needRecheck = append(needRecheck, region)
				continue
			}

			if ok {
				continue
			}
			// not finished scattering, check again in the next round
			needRecheck = append(needRecheck, region)

			if rescatter {
				needRescatter = append(needRescatter, region)
			}
		}

		if len(needRecheck) == 0 {
			return nil
		}

		backoffErr := ErrBackoff
		// if made progress in this round, don't increase the retryCnt
		if len(needRecheck) < len(regions) {
			backoffErr = ErrBackoffAndDontCount
		}

		regions = slices.Clone(needRecheck)

		if len(needRescatter) > 0 {
			scatterErr := c.scatterRegions(ctx, needRescatter)
			if scatterErr != nil {
				if !common.IsRetryableError(scatterErr) {
					return scatterErr
				}

				return errors.Annotate(backoffErr, scatterErr.Error())
			}
		}
		return errors.Annotatef(
			backoffErr,
			"scatter region not finished, retryCnt: %d, needRecheck: %d, needRescatter: %d, the first unfinished region: %s",
			retryCnt, len(needRecheck), len(needRescatter), needRecheck[0].Region.String(),
		)
	}, backoffer)

	return len(needRecheck), err
}

// isScatterRegionFinished checks whether the scatter region operator is
// finished.
func isScatterRegionFinished(resp *pdpb.GetOperatorResponse) (
	scatterDone bool,
	needRescatter bool,
	scatterErr error,
) {
	// Heartbeat may not be sent to PD
	if respErr := resp.GetHeader().GetError(); respErr != nil {
		if respErr.GetType() == pdpb.ErrorType_REGION_NOT_FOUND {
			return true, false, nil
		}
		return false, false, errors.Annotatef(
			berrors.ErrPDInvalidResponse,
			"get operator error: %s, error message: %s",
			respErr.GetType(),
			respErr.GetMessage(),
		)
	}
	// the current operator is not 'scatter-region', so the scatter operator has finished
	if string(resp.GetDesc()) != "scatter-region" {
		return true, false, nil
	}
	switch resp.GetStatus() {
	case pdpb.OperatorStatus_SUCCESS:
		return true, false, nil
	case pdpb.OperatorStatus_RUNNING:
		return false, false, nil
	default:
		return false, true, nil
	}
}

// CheckRegionEpoch checks whether two region infos have the same region ID and epoch.
func CheckRegionEpoch(_new, _old *RegionInfo) bool {
	return _new.Region.GetId() == _old.Region.GetId() &&
		_new.Region.GetRegionEpoch().GetVersion() == _old.Region.GetRegionEpoch().GetVersion() &&
		_new.Region.GetRegionEpoch().GetConfVer() == _old.Region.GetRegionEpoch().GetConfVer()
}

// ExponentialBackoffer trivially retries on any error it meets.
// It's useful when the caller has already handled the errors but
// only wants a more semantic backoff implementation.
type ExponentialBackoffer struct {
	Attempts    int
	BaseBackoff time.Duration
}

func (b *ExponentialBackoffer) exponentialBackoff() time.Duration {
	bo := b.BaseBackoff
	b.Attempts--
	if b.Attempts == 0 {
		return 0
	}
	b.BaseBackoff *= 2
	return bo
}

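// For example (a sketch of the behavior above): with Attempts = 3 and
// BaseBackoff = 100ms, successive backoffs are 100ms, 200ms, and then 0,
// which tells the caller to give up.
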
// PdErrorCanRetry returns whether the PD error can be retried.
func PdErrorCanRetry(err error) bool {
	// There are several kinds of reasons that PD would reject a `scatter` request:
	// (1) region %d has no leader
	// (2) region %d is hot
	// (3) region %d is not fully replicated
	// (4) operator canceled because cannot add an operator to the execute queue [PD:store-limit]
	// (5) failed to create scatter region operator [PD:schedule:ErrCreateOperator]
	//
	// (2) shouldn't happen in a recently split region.
	// The others might happen, and should be retried.
	grpcErr := status.Convert(err)
	if grpcErr == nil {
		return false
	}
	return strings.Contains(grpcErr.Message(), "is not fully replicated") ||
		strings.Contains(grpcErr.Message(), "has no leader") ||
		strings.Contains(grpcErr.Message(), "cannot add an operator to the execute queue") ||
		strings.Contains(grpcErr.Message(), "failed to create scatter region operator")
}

// NextBackoff returns a duration to wait before retrying again.
func (b *ExponentialBackoffer) NextBackoff(error) time.Duration {
	// trivially exponential back off, because we have handled the error at the upper level.
	return b.exponentialBackoff()
}

// Attempt returns the remaining attempt count.
func (b *ExponentialBackoffer) Attempt() int {
	return b.Attempts
}

// isUnsupportedError checks whether we should fall back to the ScatterRegion API when meeting the error.
func isUnsupportedError(err error) bool {
	s, ok := status.FromError(errors.Cause(err))
	if !ok {
		// Not a gRPC error. Something else went wrong.
		return false
	}
	// In two conditions, we fall back to ScatterRegion:
	// (1) If the RPC endpoint returns UNIMPLEMENTED. (This is just for making test cases not be so magic.)
	// (2) If the Message is "region 0 not found":
	//     In fact, PD reuses the gRPC endpoint `ScatterRegion` for the batch version of scattering.
	//     When the request contains the field `regionIDs`, it uses the batch version;
	//     otherwise, it uses the old version and scatters the region with `regionID` in the request.
	//     When facing 4.x, BR (which uses v5.x PD clients and calls `ScatterRegions`!) would set `regionIDs`,
	//     which would be ignored by protocol buffers, and leave the `regionID` be zero.
	//     Then the older version of PD would try to search the region with ID 0.
	//     (Then it consistently fails, and returns "region 0 not found".)
	return s.Code() == codes.Unimplemented ||
		strings.Contains(s.Message(), "region 0 not found")
}