diff --git a/br/pkg/lightning/backend/local/BUILD.bazel b/br/pkg/lightning/backend/local/BUILD.bazel index f032a6a629..db1b9c6936 100644 --- a/br/pkg/lightning/backend/local/BUILD.bazel +++ b/br/pkg/lightning/backend/local/BUILD.bazel @@ -72,7 +72,6 @@ go_library( "@com_github_pingcap_kvproto//pkg/import_sstpb", "@com_github_pingcap_kvproto//pkg/kvrpcpb", "@com_github_pingcap_kvproto//pkg/metapb", - "@com_github_pingcap_kvproto//pkg/pdpb", "@com_github_pingcap_tipb//go-tipb", "@com_github_tikv_client_go_v2//kv", "@com_github_tikv_client_go_v2//oracle", diff --git a/br/pkg/lightning/backend/local/localhelper.go b/br/pkg/lightning/backend/local/localhelper.go index 452bb9844f..595554fff2 100644 --- a/br/pkg/lightning/backend/local/localhelper.go +++ b/br/pkg/lightning/backend/local/localhelper.go @@ -29,7 +29,6 @@ import ( "github.com/pingcap/failpoint" sst "github.com/pingcap/kvproto/pkg/import_sstpb" "github.com/pingcap/kvproto/pkg/metapb" - "github.com/pingcap/kvproto/pkg/pdpb" berrors "github.com/pingcap/tidb/br/pkg/errors" "github.com/pingcap/tidb/br/pkg/lightning/checkpoints" "github.com/pingcap/tidb/br/pkg/lightning/common" @@ -465,35 +464,14 @@ func (local *Backend) waitForScatterRegions(ctx context.Context, regions []*spli } func (local *Backend) checkRegionScatteredOrReScatter(ctx context.Context, regionInfo *split.RegionInfo) (bool, error) { - resp, err := local.splitCli.GetOperator(ctx, regionInfo.Region.GetId()) + ok, rescatter, err := local.splitCli.IsScatterRegionFinished(ctx, regionInfo.Region.GetId()) if err != nil { - return false, err + return false, errors.Trace(err) } - // Heartbeat may not be sent to PD - if respErr := resp.GetHeader().GetError(); respErr != nil { - // TODO: why this is OK? 
- if respErr.GetType() == pdpb.ErrorType_REGION_NOT_FOUND { - return true, nil - } - return false, errors.Errorf( - "failed to get region operator, error type: %s, error message: %s", - respErr.GetType().String(), respErr.GetMessage()) - } - // If the current operator of the region is not 'scatter-region', we could assume - // that 'scatter-operator' has finished. - if string(resp.GetDesc()) != "scatter-region" { - return true, nil - } - switch resp.GetStatus() { - case pdpb.OperatorStatus_RUNNING: - return false, nil - case pdpb.OperatorStatus_SUCCESS: - return true, nil - default: - log.FromContext(ctx).Debug("scatter-region operator status is abnormal, will scatter region again", - logutil.Region(regionInfo.Region), zap.Stringer("status", resp.GetStatus())) - return false, local.ScatterRegion(ctx, regionInfo) + if !rescatter { + return ok, nil } + return false, local.ScatterRegion(ctx, regionInfo) } func getSplitKeysByRanges(ranges []common.Range, regions []*split.RegionInfo, logger log.Logger) map[uint64][][]byte { diff --git a/br/pkg/lightning/backend/local/localhelper_test.go b/br/pkg/lightning/backend/local/localhelper_test.go index b36bff1161..6f1e4fd9b5 100644 --- a/br/pkg/lightning/backend/local/localhelper_test.go +++ b/br/pkg/lightning/backend/local/localhelper_test.go @@ -38,7 +38,6 @@ import ( "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tidb/pkg/util/codec" "github.com/stretchr/testify/require" - pdhttp "github.com/tikv/pd/client/http" "go.uber.org/atomic" ) @@ -48,6 +47,7 @@ func init() { } type testSplitClient struct { + split.SplitClient mu sync.RWMutex stores map[uint64]*metapb.Store regions map[uint64]*split.RegionInfo @@ -244,12 +244,6 @@ func (c *testSplitClient) ScatterRegion(ctx context.Context, regionInfo *split.R return nil } -func (c *testSplitClient) GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error) { - return &pdpb.GetOperatorResponse{ - Header: new(pdpb.ResponseHeader), - }, nil -} - func 
(c *testSplitClient) ScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]*split.RegionInfo, error) { c.mu.Lock() defer c.mu.Unlock() @@ -278,20 +272,11 @@ func (c *testSplitClient) ScanRegions(ctx context.Context, key, endKey []byte, l return regions, err } -func (c *testSplitClient) GetPlacementRule(ctx context.Context, groupID, ruleID string) (r *pdhttp.Rule, err error) { - return -} - -func (c *testSplitClient) SetPlacementRule(ctx context.Context, rule *pdhttp.Rule) error { - return nil -} - -func (c *testSplitClient) DeletePlacementRule(ctx context.Context, groupID, ruleID string) error { - return nil -} - -func (c *testSplitClient) SetStoresLabel(ctx context.Context, stores []uint64, labelKey, labelValue string) error { - return nil +func (c *testSplitClient) IsScatterRegionFinished( + ctx context.Context, + regionID uint64, +) (scatterDone bool, needRescatter bool, scatterErr error) { + return true, false, nil } func cloneRegion(region *split.RegionInfo) *split.RegionInfo { @@ -975,6 +960,11 @@ func (c *scatterRegionCli) GetOperator(_ context.Context, regionID uint64) (*pdp return ret, nil } +func (c *scatterRegionCli) IsScatterRegionFinished(ctx context.Context, regionID uint64) (scatterDone bool, needRescatter bool, scatterErr error) { + resp, _ := c.GetOperator(ctx, regionID) + return split.IsScatterRegionFinished(resp) +} + func TestWaitForScatterRegions(t *testing.T) { ctx := context.Background() @@ -1053,7 +1043,7 @@ func TestWaitForScatterRegions(t *testing.T) { } local = &Backend{splitCli: cli} cnt, err = local.waitForScatterRegions(ctx, regions) - require.ErrorContains(t, err, "failed to get region operator, error type: DATA_COMPACTED") + require.ErrorContains(t, err, "get operator error: DATA_COMPACTED") require.Equal(t, 2, cnt) checkRespDrained(cli) checkNoRetry(cli) diff --git a/br/pkg/restore/BUILD.bazel b/br/pkg/restore/BUILD.bazel index 87bb9cb7a9..aa61bb6413 100644 --- a/br/pkg/restore/BUILD.bazel +++ 
b/br/pkg/restore/BUILD.bazel @@ -31,7 +31,6 @@ go_library( "//br/pkg/conn/util", "//br/pkg/errors", "//br/pkg/glue", - "//br/pkg/lightning/common", "//br/pkg/logutil", "//br/pkg/metautil", "//br/pkg/pdutil", @@ -81,7 +80,6 @@ go_library( "@com_github_pingcap_kvproto//pkg/import_sstpb", "@com_github_pingcap_kvproto//pkg/kvrpcpb", "@com_github_pingcap_kvproto//pkg/metapb", - "@com_github_pingcap_kvproto//pkg/pdpb", "@com_github_pingcap_kvproto//pkg/recoverdatapb", "@com_github_pingcap_log//:log", "@com_github_tikv_client_go_v2//config", @@ -182,7 +180,6 @@ go_test( "@com_github_tikv_client_go_v2//rawkv", "@com_github_tikv_client_go_v2//testutils", "@com_github_tikv_pd_client//:client", - "@com_github_tikv_pd_client//http", "@org_golang_google_grpc//codes", "@org_golang_google_grpc//keepalive", "@org_golang_google_grpc//status", diff --git a/br/pkg/restore/split.go b/br/pkg/restore/split.go index 0c801a19b8..0790999e8b 100644 --- a/br/pkg/restore/split.go +++ b/br/pkg/restore/split.go @@ -15,10 +15,8 @@ import ( "github.com/pingcap/errors" backuppb "github.com/pingcap/kvproto/pkg/brpb" sst "github.com/pingcap/kvproto/pkg/import_sstpb" - "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" berrors "github.com/pingcap/tidb/br/pkg/errors" - "github.com/pingcap/tidb/br/pkg/lightning/common" "github.com/pingcap/tidb/br/pkg/logutil" "github.com/pingcap/tidb/br/pkg/restore/split" "github.com/pingcap/tidb/br/pkg/rtree" @@ -33,10 +31,6 @@ import ( "google.golang.org/grpc/status" ) -type retryTimeKey struct{} - -var retryTimes = new(retryTimeKey) - type Granularity string const ( @@ -424,53 +418,6 @@ func (rs *RegionSplitter) hasHealthyRegion(ctx context.Context, regionID uint64) return len(regionInfo.PendingPeers) == 0, nil } -// isScatterRegionFinished check the latest successful operator and return the follow status: -// -// return (finished, needRescatter, error) -// -// if the latest operator is not `scatter-operator`, or its status is SUCCESS, it's likely that 
the -// scatter region operator is finished. -// -// if the latest operator is `scatter-operator` and its status is TIMEOUT or CANCEL, the needRescatter -// is true and the function caller needs to scatter this region again. -func (rs *RegionSplitter) isScatterRegionFinished(ctx context.Context, regionID uint64) (scatterDone bool, needRescatter bool, scatterErr error) { - resp, err := rs.client.GetOperator(ctx, regionID) - if err != nil { - if common.IsRetryableError(err) { - // retry in the next cycle - return false, false, nil - } - return false, false, errors.Trace(err) - } - // Heartbeat may not be sent to PD - if respErr := resp.GetHeader().GetError(); respErr != nil { - if respErr.GetType() == pdpb.ErrorType_REGION_NOT_FOUND { - return true, false, nil - } - return false, false, errors.Annotatef(berrors.ErrPDInvalidResponse, "get operator error: %s", respErr.GetType()) - } - defer func() { - if !scatterDone { - retryTimes := ctx.Value(retryTimes).(int) - if retryTimes > 10 { - log.Info("get operator", zap.Uint64("regionID", regionID), zap.Stringer("resp", resp)) - } - } - }() - // that 'scatter-operator' has finished - if string(resp.GetDesc()) != "scatter-region" { - return true, false, nil - } - switch resp.GetStatus() { - case pdpb.OperatorStatus_SUCCESS: - return true, false, nil - case pdpb.OperatorStatus_RUNNING: - return false, false, nil - default: - return false, true, nil - } -} - func (rs *RegionSplitter) WaitForScatterRegionsTimeout(ctx context.Context, regionInfos []*split.RegionInfo, timeout time.Duration) int { var ( startTime = time.Now() @@ -481,10 +428,19 @@ func (rs *RegionSplitter) WaitForScatterRegionsTimeout(ctx context.Context, regi reScatterRegions = make([]*split.RegionInfo, 0, len(regionInfos)) ) for { - ctx1 := context.WithValue(ctx, retryTimes, retryCnt) + loggedLongRetry := false reScatterRegions = reScatterRegions[:0] for regionID, regionInfo := range leftRegions { - ok, rescatter, err := rs.isScatterRegionFinished(ctx1, 
regionID) + if retryCnt > 10 && !loggedLongRetry { + loggedLongRetry = true + resp, err := rs.client.GetOperator(ctx, regionID) + log.Info("retried many times to wait for scattering regions, checking operator", + zap.Int("retryCnt", retryCnt), + zap.Uint64("anyRegionID", regionID), + zap.Stringer("resp", resp), + zap.Error(err)) + } + ok, rescatter, err := rs.client.IsScatterRegionFinished(ctx, regionID) if err != nil { log.Warn("scatter region failed: do not have the region", logutil.Region(regionInfo.Region), zap.Error(err)) @@ -506,7 +462,7 @@ func (rs *RegionSplitter) WaitForScatterRegionsTimeout(ctx context.Context, regi } if len(reScatterRegions) > 0 { - rs.ScatterRegions(ctx1, reScatterRegions) + rs.ScatterRegions(ctx, reScatterRegions) } if time.Since(startTime) > timeout { diff --git a/br/pkg/restore/split/BUILD.bazel b/br/pkg/restore/split/BUILD.bazel index 2b685914c4..45b2d8907f 100644 --- a/br/pkg/restore/split/BUILD.bazel +++ b/br/pkg/restore/split/BUILD.bazel @@ -13,6 +13,7 @@ go_library( deps = [ "//br/pkg/conn/util", "//br/pkg/errors", + "//br/pkg/lightning/common", "//br/pkg/lightning/config", "//br/pkg/logutil", "//br/pkg/redact", diff --git a/br/pkg/restore/split/client.go b/br/pkg/restore/split/client.go index 3f6271813d..cad86e6602 100644 --- a/br/pkg/restore/split/client.go +++ b/br/pkg/restore/split/client.go @@ -20,6 +20,7 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tidb/br/pkg/conn/util" berrors "github.com/pingcap/tidb/br/pkg/errors" + "github.com/pingcap/tidb/br/pkg/lightning/common" "github.com/pingcap/tidb/br/pkg/lightning/config" "github.com/pingcap/tidb/br/pkg/logutil" pd "github.com/tikv/pd/client" @@ -73,6 +74,18 @@ type SplitClient interface { // SetStoresLabel add or update specified label of stores. If labelValue // is empty, it clears the label. 
SetStoresLabel(ctx context.Context, stores []uint64, labelKey, labelValue string) error + // IsScatterRegionFinished checks the latest successful operator and returns the + // following status: + // + // return (finished, needRescatter, error) + // + // if the latest operator is not `scatter-region`, or its status is SUCCESS, + // it's likely that the scatter region operator is finished. + // + // if the latest operator is `scatter-region` and its status is neither SUCCESS + // nor RUNNING (e.g. TIMEOUT or CANCEL), needRescatter is true and the caller + // needs to scatter this region again. + IsScatterRegionFinished(ctx context.Context, regionID uint64) (scatterDone bool, needRescatter bool, scatterErr error) } // pdClient is a wrapper of pd client, can be used by RegionSplitter. @@ -551,12 +564,53 @@ func (c *pdClient) SetStoresLabel( return nil } -func (c *pdClient) getPDAPIAddr() string { - addr := c.client.GetLeaderAddr() - if addr != "" && !strings.HasPrefix(addr, "http") { - addr = "http://" + addr +func (c *pdClient) IsScatterRegionFinished( + ctx context.Context, + regionID uint64, +) (scatterDone bool, needRescatter bool, scatterErr error) { + resp, err := c.GetOperator(ctx, regionID) + if err != nil { + if common.IsRetryableError(err) { + // retry in the next cycle + return false, false, nil + } + return false, false, errors.Trace(err) + } + return IsScatterRegionFinished(resp) +} + +// IsScatterRegionFinished checks whether the scatter region operator is +// finished. TODO(lance6716): hide this function after scatter logic is unified +// for BR and lightning. 
+func IsScatterRegionFinished(resp *pdpb.GetOperatorResponse) ( + scatterDone bool, + needRescatter bool, + scatterErr error, +) { + // Heartbeat may not be sent to PD + if respErr := resp.GetHeader().GetError(); respErr != nil { + if respErr.GetType() == pdpb.ErrorType_REGION_NOT_FOUND { + return true, false, nil + } + return false, false, errors.Annotatef( + berrors.ErrPDInvalidResponse, + "get operator error: %s, error message: %s", + respErr.GetType(), + respErr.GetMessage(), + ) + } + // If the current operator of the region is not 'scatter-region', we can assume that the scatter operator has finished. + if string(resp.GetDesc()) != "scatter-region" { + return true, false, nil + } + switch resp.GetStatus() { + case pdpb.OperatorStatus_SUCCESS: + return true, false, nil + case pdpb.OperatorStatus_RUNNING: + return false, false, nil + default: + return false, true, nil } - return strings.TrimRight(addr, "/") } // CheckRegionEpoch check region epoch. diff --git a/br/pkg/restore/split_test.go b/br/pkg/restore/split_test.go index 54f93d71ea..15a5f0a126 100644 --- a/br/pkg/restore/split_test.go +++ b/br/pkg/restore/split_test.go @@ -29,13 +29,14 @@ import ( "github.com/pingcap/tidb/pkg/tablecodec" "github.com/pingcap/tidb/pkg/util/codec" "github.com/stretchr/testify/require" - pdhttp "github.com/tikv/pd/client/http" "go.uber.org/multierr" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) type TestClient struct { + split.SplitClient + mu sync.RWMutex stores map[uint64]*metapb.Store regions map[uint64]*split.RegionInfo @@ -245,20 +246,12 @@ func (c *TestClient) ScanRegions(ctx context.Context, key, endKey []byte, limit return regions, nil } -func (c *TestClient) GetPlacementRule(ctx context.Context, groupID, ruleID string) (r *pdhttp.Rule, err error) { - return -} - -func (c *TestClient) SetPlacementRule(ctx context.Context, rule *pdhttp.Rule) error { - return nil -} - -func (c *TestClient) DeletePlacementRule(ctx context.Context, groupID, ruleID string) error { - return nil -} - -func (c *TestClient) SetStoresLabel(ctx 
context.Context, stores []uint64, labelKey, labelValue string) error { - return nil +func (c *TestClient) IsScatterRegionFinished( + ctx context.Context, + regionID uint64, +) (scatterDone bool, needRescatter bool, scatterErr error) { + resp, _ := c.GetOperator(ctx, regionID) + return split.IsScatterRegionFinished(resp) } type assertRetryLessThanBackoffer struct { @@ -976,6 +969,7 @@ func TestSplitPoint2(t *testing.T) { } type fakeSplitClient struct { + split.SplitClient regions []*split.RegionInfo } @@ -994,33 +988,6 @@ func (f *fakeSplitClient) AppendRegion(startKey, endKey []byte) { }) } -func (*fakeSplitClient) GetStore(ctx context.Context, storeID uint64) (*metapb.Store, error) { - return nil, nil -} -func (*fakeSplitClient) GetRegion(ctx context.Context, key []byte) (*split.RegionInfo, error) { - return nil, nil -} -func (*fakeSplitClient) GetRegionByID(ctx context.Context, regionID uint64) (*split.RegionInfo, error) { - return nil, nil -} -func (*fakeSplitClient) SplitRegion(ctx context.Context, regionInfo *split.RegionInfo, key []byte) (*split.RegionInfo, error) { - return nil, nil -} -func (*fakeSplitClient) BatchSplitRegions(ctx context.Context, regionInfo *split.RegionInfo, keys [][]byte) ([]*split.RegionInfo, error) { - return nil, nil -} -func (*fakeSplitClient) BatchSplitRegionsWithOrigin(ctx context.Context, regionInfo *split.RegionInfo, keys [][]byte) (*split.RegionInfo, []*split.RegionInfo, error) { - return nil, nil, nil -} -func (*fakeSplitClient) ScatterRegion(ctx context.Context, regionInfo *split.RegionInfo) error { - return nil -} -func (*fakeSplitClient) ScatterRegions(ctx context.Context, regionInfo []*split.RegionInfo) error { - return nil -} -func (*fakeSplitClient) GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error) { - return nil, nil -} func (f *fakeSplitClient) ScanRegions(ctx context.Context, startKey, endKey []byte, limit int) ([]*split.RegionInfo, error) { result := make([]*split.RegionInfo, 0) 
count := 0 @@ -1035,16 +1002,6 @@ func (f *fakeSplitClient) ScanRegions(ctx context.Context, startKey, endKey []by } return result, nil } -func (*fakeSplitClient) GetPlacementRule(ctx context.Context, groupID, ruleID string) (*pdhttp.Rule, error) { - return nil, nil -} -func (*fakeSplitClient) SetPlacementRule(ctx context.Context, rule *pdhttp.Rule) error { return nil } -func (*fakeSplitClient) DeletePlacementRule(ctx context.Context, groupID, ruleID string) error { - return nil -} -func (*fakeSplitClient) SetStoresLabel(ctx context.Context, stores []uint64, labelKey, labelValue string) error { - return nil -} func TestGetRewriteTableID(t *testing.T) { var tableID int64 = 76