br, lightning: unify IsScatterRegionFinished (#51536)
ref pingcap/tidb#51533
This commit is contained in:
@ -72,7 +72,6 @@ go_library(
|
||||
"@com_github_pingcap_kvproto//pkg/import_sstpb",
|
||||
"@com_github_pingcap_kvproto//pkg/kvrpcpb",
|
||||
"@com_github_pingcap_kvproto//pkg/metapb",
|
||||
"@com_github_pingcap_kvproto//pkg/pdpb",
|
||||
"@com_github_pingcap_tipb//go-tipb",
|
||||
"@com_github_tikv_client_go_v2//kv",
|
||||
"@com_github_tikv_client_go_v2//oracle",
|
||||
|
||||
@ -29,7 +29,6 @@ import (
|
||||
"github.com/pingcap/failpoint"
|
||||
sst "github.com/pingcap/kvproto/pkg/import_sstpb"
|
||||
"github.com/pingcap/kvproto/pkg/metapb"
|
||||
"github.com/pingcap/kvproto/pkg/pdpb"
|
||||
berrors "github.com/pingcap/tidb/br/pkg/errors"
|
||||
"github.com/pingcap/tidb/br/pkg/lightning/checkpoints"
|
||||
"github.com/pingcap/tidb/br/pkg/lightning/common"
|
||||
@ -465,35 +464,14 @@ func (local *Backend) waitForScatterRegions(ctx context.Context, regions []*spli
|
||||
}
|
||||
|
||||
func (local *Backend) checkRegionScatteredOrReScatter(ctx context.Context, regionInfo *split.RegionInfo) (bool, error) {
|
||||
resp, err := local.splitCli.GetOperator(ctx, regionInfo.Region.GetId())
|
||||
ok, rescatter, err := local.splitCli.IsScatterRegionFinished(ctx, regionInfo.Region.GetId())
|
||||
if err != nil {
|
||||
return false, err
|
||||
return false, errors.Trace(err)
|
||||
}
|
||||
// Heartbeat may not be sent to PD
|
||||
if respErr := resp.GetHeader().GetError(); respErr != nil {
|
||||
// TODO: why this is OK?
|
||||
if respErr.GetType() == pdpb.ErrorType_REGION_NOT_FOUND {
|
||||
return true, nil
|
||||
}
|
||||
return false, errors.Errorf(
|
||||
"failed to get region operator, error type: %s, error message: %s",
|
||||
respErr.GetType().String(), respErr.GetMessage())
|
||||
}
|
||||
// If the current operator of the region is not 'scatter-region', we could assume
|
||||
// that 'scatter-operator' has finished.
|
||||
if string(resp.GetDesc()) != "scatter-region" {
|
||||
return true, nil
|
||||
}
|
||||
switch resp.GetStatus() {
|
||||
case pdpb.OperatorStatus_RUNNING:
|
||||
return false, nil
|
||||
case pdpb.OperatorStatus_SUCCESS:
|
||||
return true, nil
|
||||
default:
|
||||
log.FromContext(ctx).Debug("scatter-region operator status is abnormal, will scatter region again",
|
||||
logutil.Region(regionInfo.Region), zap.Stringer("status", resp.GetStatus()))
|
||||
return false, local.ScatterRegion(ctx, regionInfo)
|
||||
if !rescatter {
|
||||
return ok, nil
|
||||
}
|
||||
return false, local.ScatterRegion(ctx, regionInfo)
|
||||
}
|
||||
|
||||
func getSplitKeysByRanges(ranges []common.Range, regions []*split.RegionInfo, logger log.Logger) map[uint64][][]byte {
|
||||
|
||||
@ -38,7 +38,6 @@ import (
|
||||
"github.com/pingcap/tidb/pkg/types"
|
||||
"github.com/pingcap/tidb/pkg/util/codec"
|
||||
"github.com/stretchr/testify/require"
|
||||
pdhttp "github.com/tikv/pd/client/http"
|
||||
"go.uber.org/atomic"
|
||||
)
|
||||
|
||||
@ -48,6 +47,7 @@ func init() {
|
||||
}
|
||||
|
||||
type testSplitClient struct {
|
||||
split.SplitClient
|
||||
mu sync.RWMutex
|
||||
stores map[uint64]*metapb.Store
|
||||
regions map[uint64]*split.RegionInfo
|
||||
@ -244,12 +244,6 @@ func (c *testSplitClient) ScatterRegion(ctx context.Context, regionInfo *split.R
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *testSplitClient) GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error) {
|
||||
return &pdpb.GetOperatorResponse{
|
||||
Header: new(pdpb.ResponseHeader),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (c *testSplitClient) ScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]*split.RegionInfo, error) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
@ -278,20 +272,11 @@ func (c *testSplitClient) ScanRegions(ctx context.Context, key, endKey []byte, l
|
||||
return regions, err
|
||||
}
|
||||
|
||||
func (c *testSplitClient) GetPlacementRule(ctx context.Context, groupID, ruleID string) (r *pdhttp.Rule, err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (c *testSplitClient) SetPlacementRule(ctx context.Context, rule *pdhttp.Rule) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *testSplitClient) DeletePlacementRule(ctx context.Context, groupID, ruleID string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *testSplitClient) SetStoresLabel(ctx context.Context, stores []uint64, labelKey, labelValue string) error {
|
||||
return nil
|
||||
func (c *testSplitClient) IsScatterRegionFinished(
|
||||
ctx context.Context,
|
||||
regionID uint64,
|
||||
) (scatterDone bool, needRescatter bool, scatterErr error) {
|
||||
return true, false, nil
|
||||
}
|
||||
|
||||
func cloneRegion(region *split.RegionInfo) *split.RegionInfo {
|
||||
@ -975,6 +960,11 @@ func (c *scatterRegionCli) GetOperator(_ context.Context, regionID uint64) (*pdp
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
func (c *scatterRegionCli) IsScatterRegionFinished(ctx context.Context, regionID uint64) (scatterDone bool, needRescatter bool, scatterErr error) {
|
||||
resp, _ := c.GetOperator(ctx, regionID)
|
||||
return split.IsScatterRegionFinished(resp)
|
||||
}
|
||||
|
||||
func TestWaitForScatterRegions(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
@ -1053,7 +1043,7 @@ func TestWaitForScatterRegions(t *testing.T) {
|
||||
}
|
||||
local = &Backend{splitCli: cli}
|
||||
cnt, err = local.waitForScatterRegions(ctx, regions)
|
||||
require.ErrorContains(t, err, "failed to get region operator, error type: DATA_COMPACTED")
|
||||
require.ErrorContains(t, err, "get operator error: DATA_COMPACTED")
|
||||
require.Equal(t, 2, cnt)
|
||||
checkRespDrained(cli)
|
||||
checkNoRetry(cli)
|
||||
|
||||
@ -31,7 +31,6 @@ go_library(
|
||||
"//br/pkg/conn/util",
|
||||
"//br/pkg/errors",
|
||||
"//br/pkg/glue",
|
||||
"//br/pkg/lightning/common",
|
||||
"//br/pkg/logutil",
|
||||
"//br/pkg/metautil",
|
||||
"//br/pkg/pdutil",
|
||||
@ -81,7 +80,6 @@ go_library(
|
||||
"@com_github_pingcap_kvproto//pkg/import_sstpb",
|
||||
"@com_github_pingcap_kvproto//pkg/kvrpcpb",
|
||||
"@com_github_pingcap_kvproto//pkg/metapb",
|
||||
"@com_github_pingcap_kvproto//pkg/pdpb",
|
||||
"@com_github_pingcap_kvproto//pkg/recoverdatapb",
|
||||
"@com_github_pingcap_log//:log",
|
||||
"@com_github_tikv_client_go_v2//config",
|
||||
@ -182,7 +180,6 @@ go_test(
|
||||
"@com_github_tikv_client_go_v2//rawkv",
|
||||
"@com_github_tikv_client_go_v2//testutils",
|
||||
"@com_github_tikv_pd_client//:client",
|
||||
"@com_github_tikv_pd_client//http",
|
||||
"@org_golang_google_grpc//codes",
|
||||
"@org_golang_google_grpc//keepalive",
|
||||
"@org_golang_google_grpc//status",
|
||||
|
||||
@ -15,10 +15,8 @@ import (
|
||||
"github.com/pingcap/errors"
|
||||
backuppb "github.com/pingcap/kvproto/pkg/brpb"
|
||||
sst "github.com/pingcap/kvproto/pkg/import_sstpb"
|
||||
"github.com/pingcap/kvproto/pkg/pdpb"
|
||||
"github.com/pingcap/log"
|
||||
berrors "github.com/pingcap/tidb/br/pkg/errors"
|
||||
"github.com/pingcap/tidb/br/pkg/lightning/common"
|
||||
"github.com/pingcap/tidb/br/pkg/logutil"
|
||||
"github.com/pingcap/tidb/br/pkg/restore/split"
|
||||
"github.com/pingcap/tidb/br/pkg/rtree"
|
||||
@ -33,10 +31,6 @@ import (
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
type retryTimeKey struct{}
|
||||
|
||||
var retryTimes = new(retryTimeKey)
|
||||
|
||||
type Granularity string
|
||||
|
||||
const (
|
||||
@ -424,53 +418,6 @@ func (rs *RegionSplitter) hasHealthyRegion(ctx context.Context, regionID uint64)
|
||||
return len(regionInfo.PendingPeers) == 0, nil
|
||||
}
|
||||
|
||||
// isScatterRegionFinished check the latest successful operator and return the follow status:
|
||||
//
|
||||
// return (finished, needRescatter, error)
|
||||
//
|
||||
// if the latest operator is not `scatter-operator`, or its status is SUCCESS, it's likely that the
|
||||
// scatter region operator is finished.
|
||||
//
|
||||
// if the latest operator is `scatter-operator` and its status is TIMEOUT or CANCEL, the needRescatter
|
||||
// is true and the function caller needs to scatter this region again.
|
||||
func (rs *RegionSplitter) isScatterRegionFinished(ctx context.Context, regionID uint64) (scatterDone bool, needRescatter bool, scatterErr error) {
|
||||
resp, err := rs.client.GetOperator(ctx, regionID)
|
||||
if err != nil {
|
||||
if common.IsRetryableError(err) {
|
||||
// retry in the next cycle
|
||||
return false, false, nil
|
||||
}
|
||||
return false, false, errors.Trace(err)
|
||||
}
|
||||
// Heartbeat may not be sent to PD
|
||||
if respErr := resp.GetHeader().GetError(); respErr != nil {
|
||||
if respErr.GetType() == pdpb.ErrorType_REGION_NOT_FOUND {
|
||||
return true, false, nil
|
||||
}
|
||||
return false, false, errors.Annotatef(berrors.ErrPDInvalidResponse, "get operator error: %s", respErr.GetType())
|
||||
}
|
||||
defer func() {
|
||||
if !scatterDone {
|
||||
retryTimes := ctx.Value(retryTimes).(int)
|
||||
if retryTimes > 10 {
|
||||
log.Info("get operator", zap.Uint64("regionID", regionID), zap.Stringer("resp", resp))
|
||||
}
|
||||
}
|
||||
}()
|
||||
// that 'scatter-operator' has finished
|
||||
if string(resp.GetDesc()) != "scatter-region" {
|
||||
return true, false, nil
|
||||
}
|
||||
switch resp.GetStatus() {
|
||||
case pdpb.OperatorStatus_SUCCESS:
|
||||
return true, false, nil
|
||||
case pdpb.OperatorStatus_RUNNING:
|
||||
return false, false, nil
|
||||
default:
|
||||
return false, true, nil
|
||||
}
|
||||
}
|
||||
|
||||
func (rs *RegionSplitter) WaitForScatterRegionsTimeout(ctx context.Context, regionInfos []*split.RegionInfo, timeout time.Duration) int {
|
||||
var (
|
||||
startTime = time.Now()
|
||||
@ -481,10 +428,19 @@ func (rs *RegionSplitter) WaitForScatterRegionsTimeout(ctx context.Context, regi
|
||||
reScatterRegions = make([]*split.RegionInfo, 0, len(regionInfos))
|
||||
)
|
||||
for {
|
||||
ctx1 := context.WithValue(ctx, retryTimes, retryCnt)
|
||||
loggedLongRetry := false
|
||||
reScatterRegions = reScatterRegions[:0]
|
||||
for regionID, regionInfo := range leftRegions {
|
||||
ok, rescatter, err := rs.isScatterRegionFinished(ctx1, regionID)
|
||||
if retryCnt > 10 && !loggedLongRetry {
|
||||
loggedLongRetry = true
|
||||
resp, err := rs.client.GetOperator(ctx, regionID)
|
||||
log.Info("retried many times to wait for scattering regions, checking operator",
|
||||
zap.Int("retryCnt", retryCnt),
|
||||
zap.Uint64("anyRegionID", regionID),
|
||||
zap.Stringer("resp", resp),
|
||||
zap.Error(err))
|
||||
}
|
||||
ok, rescatter, err := rs.client.IsScatterRegionFinished(ctx, regionID)
|
||||
if err != nil {
|
||||
log.Warn("scatter region failed: do not have the region",
|
||||
logutil.Region(regionInfo.Region), zap.Error(err))
|
||||
@ -506,7 +462,7 @@ func (rs *RegionSplitter) WaitForScatterRegionsTimeout(ctx context.Context, regi
|
||||
}
|
||||
|
||||
if len(reScatterRegions) > 0 {
|
||||
rs.ScatterRegions(ctx1, reScatterRegions)
|
||||
rs.ScatterRegions(ctx, reScatterRegions)
|
||||
}
|
||||
|
||||
if time.Since(startTime) > timeout {
|
||||
|
||||
@ -13,6 +13,7 @@ go_library(
|
||||
deps = [
|
||||
"//br/pkg/conn/util",
|
||||
"//br/pkg/errors",
|
||||
"//br/pkg/lightning/common",
|
||||
"//br/pkg/lightning/config",
|
||||
"//br/pkg/logutil",
|
||||
"//br/pkg/redact",
|
||||
|
||||
@ -20,6 +20,7 @@ import (
|
||||
"github.com/pingcap/log"
|
||||
"github.com/pingcap/tidb/br/pkg/conn/util"
|
||||
berrors "github.com/pingcap/tidb/br/pkg/errors"
|
||||
"github.com/pingcap/tidb/br/pkg/lightning/common"
|
||||
"github.com/pingcap/tidb/br/pkg/lightning/config"
|
||||
"github.com/pingcap/tidb/br/pkg/logutil"
|
||||
pd "github.com/tikv/pd/client"
|
||||
@ -73,6 +74,18 @@ type SplitClient interface {
|
||||
// SetStoresLabel add or update specified label of stores. If labelValue
|
||||
// is empty, it clears the label.
|
||||
SetStoresLabel(ctx context.Context, stores []uint64, labelKey, labelValue string) error
|
||||
// IsScatterRegionFinished check the latest successful operator and return the
|
||||
// follow status:
|
||||
//
|
||||
// return (finished, needRescatter, error)
|
||||
//
|
||||
// if the latest operator is not `scatter-operator`, or its status is SUCCESS,
|
||||
// it's likely that the scatter region operator is finished.
|
||||
//
|
||||
// if the latest operator is `scatter-operator` and its status is TIMEOUT or
|
||||
// CANCEL, the needRescatter is true and the function caller needs to scatter
|
||||
// this region again.
|
||||
IsScatterRegionFinished(ctx context.Context, regionID uint64) (scatterDone bool, needRescatter bool, scatterErr error)
|
||||
}
|
||||
|
||||
// pdClient is a wrapper of pd client, can be used by RegionSplitter.
|
||||
@ -551,12 +564,53 @@ func (c *pdClient) SetStoresLabel(
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *pdClient) getPDAPIAddr() string {
|
||||
addr := c.client.GetLeaderAddr()
|
||||
if addr != "" && !strings.HasPrefix(addr, "http") {
|
||||
addr = "http://" + addr
|
||||
func (c *pdClient) IsScatterRegionFinished(
|
||||
ctx context.Context,
|
||||
regionID uint64,
|
||||
) (scatterDone bool, needRescatter bool, scatterErr error) {
|
||||
resp, err := c.GetOperator(ctx, regionID)
|
||||
if err != nil {
|
||||
if common.IsRetryableError(err) {
|
||||
// retry in the next cycle
|
||||
return false, false, nil
|
||||
}
|
||||
return false, false, errors.Trace(err)
|
||||
}
|
||||
return IsScatterRegionFinished(resp)
|
||||
}
|
||||
|
||||
// IsScatterRegionFinished checks whether the scatter region operator is
|
||||
// finished. TODO(lance6716): hide this function after scatter logic is unified
|
||||
// for BR and lightning.
|
||||
func IsScatterRegionFinished(resp *pdpb.GetOperatorResponse) (
|
||||
scatterDone bool,
|
||||
needRescatter bool,
|
||||
scatterErr error,
|
||||
) {
|
||||
// Heartbeat may not be sent to PD
|
||||
if respErr := resp.GetHeader().GetError(); respErr != nil {
|
||||
if respErr.GetType() == pdpb.ErrorType_REGION_NOT_FOUND {
|
||||
return true, false, nil
|
||||
}
|
||||
return false, false, errors.Annotatef(
|
||||
berrors.ErrPDInvalidResponse,
|
||||
"get operator error: %s, error message: %s",
|
||||
respErr.GetType(),
|
||||
respErr.GetMessage(),
|
||||
)
|
||||
}
|
||||
// that 'scatter-operator' has finished
|
||||
if string(resp.GetDesc()) != "scatter-region" {
|
||||
return true, false, nil
|
||||
}
|
||||
switch resp.GetStatus() {
|
||||
case pdpb.OperatorStatus_SUCCESS:
|
||||
return true, false, nil
|
||||
case pdpb.OperatorStatus_RUNNING:
|
||||
return false, false, nil
|
||||
default:
|
||||
return false, true, nil
|
||||
}
|
||||
return strings.TrimRight(addr, "/")
|
||||
}
|
||||
|
||||
// CheckRegionEpoch check region epoch.
|
||||
|
||||
@ -29,13 +29,14 @@ import (
|
||||
"github.com/pingcap/tidb/pkg/tablecodec"
|
||||
"github.com/pingcap/tidb/pkg/util/codec"
|
||||
"github.com/stretchr/testify/require"
|
||||
pdhttp "github.com/tikv/pd/client/http"
|
||||
"go.uber.org/multierr"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
type TestClient struct {
|
||||
split.SplitClient
|
||||
|
||||
mu sync.RWMutex
|
||||
stores map[uint64]*metapb.Store
|
||||
regions map[uint64]*split.RegionInfo
|
||||
@ -245,20 +246,12 @@ func (c *TestClient) ScanRegions(ctx context.Context, key, endKey []byte, limit
|
||||
return regions, nil
|
||||
}
|
||||
|
||||
func (c *TestClient) GetPlacementRule(ctx context.Context, groupID, ruleID string) (r *pdhttp.Rule, err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (c *TestClient) SetPlacementRule(ctx context.Context, rule *pdhttp.Rule) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *TestClient) DeletePlacementRule(ctx context.Context, groupID, ruleID string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *TestClient) SetStoresLabel(ctx context.Context, stores []uint64, labelKey, labelValue string) error {
|
||||
return nil
|
||||
func (c *TestClient) IsScatterRegionFinished(
|
||||
ctx context.Context,
|
||||
regionID uint64,
|
||||
) (scatterDone bool, needRescatter bool, scatterErr error) {
|
||||
resp, _ := c.GetOperator(ctx, regionID)
|
||||
return split.IsScatterRegionFinished(resp)
|
||||
}
|
||||
|
||||
type assertRetryLessThanBackoffer struct {
|
||||
@ -976,6 +969,7 @@ func TestSplitPoint2(t *testing.T) {
|
||||
}
|
||||
|
||||
type fakeSplitClient struct {
|
||||
split.SplitClient
|
||||
regions []*split.RegionInfo
|
||||
}
|
||||
|
||||
@ -994,33 +988,6 @@ func (f *fakeSplitClient) AppendRegion(startKey, endKey []byte) {
|
||||
})
|
||||
}
|
||||
|
||||
func (*fakeSplitClient) GetStore(ctx context.Context, storeID uint64) (*metapb.Store, error) {
|
||||
return nil, nil
|
||||
}
|
||||
func (*fakeSplitClient) GetRegion(ctx context.Context, key []byte) (*split.RegionInfo, error) {
|
||||
return nil, nil
|
||||
}
|
||||
func (*fakeSplitClient) GetRegionByID(ctx context.Context, regionID uint64) (*split.RegionInfo, error) {
|
||||
return nil, nil
|
||||
}
|
||||
func (*fakeSplitClient) SplitRegion(ctx context.Context, regionInfo *split.RegionInfo, key []byte) (*split.RegionInfo, error) {
|
||||
return nil, nil
|
||||
}
|
||||
func (*fakeSplitClient) BatchSplitRegions(ctx context.Context, regionInfo *split.RegionInfo, keys [][]byte) ([]*split.RegionInfo, error) {
|
||||
return nil, nil
|
||||
}
|
||||
func (*fakeSplitClient) BatchSplitRegionsWithOrigin(ctx context.Context, regionInfo *split.RegionInfo, keys [][]byte) (*split.RegionInfo, []*split.RegionInfo, error) {
|
||||
return nil, nil, nil
|
||||
}
|
||||
func (*fakeSplitClient) ScatterRegion(ctx context.Context, regionInfo *split.RegionInfo) error {
|
||||
return nil
|
||||
}
|
||||
func (*fakeSplitClient) ScatterRegions(ctx context.Context, regionInfo []*split.RegionInfo) error {
|
||||
return nil
|
||||
}
|
||||
func (*fakeSplitClient) GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error) {
|
||||
return nil, nil
|
||||
}
|
||||
func (f *fakeSplitClient) ScanRegions(ctx context.Context, startKey, endKey []byte, limit int) ([]*split.RegionInfo, error) {
|
||||
result := make([]*split.RegionInfo, 0)
|
||||
count := 0
|
||||
@ -1035,16 +1002,6 @@ func (f *fakeSplitClient) ScanRegions(ctx context.Context, startKey, endKey []by
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
func (*fakeSplitClient) GetPlacementRule(ctx context.Context, groupID, ruleID string) (*pdhttp.Rule, error) {
|
||||
return nil, nil
|
||||
}
|
||||
func (*fakeSplitClient) SetPlacementRule(ctx context.Context, rule *pdhttp.Rule) error { return nil }
|
||||
func (*fakeSplitClient) DeletePlacementRule(ctx context.Context, groupID, ruleID string) error {
|
||||
return nil
|
||||
}
|
||||
func (*fakeSplitClient) SetStoresLabel(ctx context.Context, stores []uint64, labelKey, labelValue string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestGetRewriteTableID(t *testing.T) {
|
||||
var tableID int64 = 76
|
||||
|
||||
Reference in New Issue
Block a user