From fa72152ae3d0ab34dd15fc800aaf104695131d39 Mon Sep 17 00:00:00 2001
From: Jianjun Liao <36503113+Leavrth@users.noreply.github.com>
Date: Wed, 14 Jan 2026 00:29:17 +0800
Subject: [PATCH] br: fix region scan checker (#64530)

close pingcap/tidb#64514
---
 br/pkg/restore/split/BUILD.bazel     |   5 +-
 br/pkg/restore/split/client_test.go  |   2 +-
 br/pkg/restore/split/split.go        |  62 ++++++---
 br/pkg/restore/split/split_test.go   | 194 ++++++++++++++++++++++++++-
 br/tests/br_pitr_table_filter/run.sh |   4 +-
 5 files changed, 248 insertions(+), 19 deletions(-)

diff --git a/br/pkg/restore/split/BUILD.bazel b/br/pkg/restore/split/BUILD.bazel
index 5e96c352d4..f3c8342938 100644
--- a/br/pkg/restore/split/BUILD.bazel
+++ b/br/pkg/restore/split/BUILD.bazel
@@ -65,7 +65,7 @@ go_test(
     ],
     embed = [":split"],
     flaky = True,
-    shard_count = 29,
+    shard_count = 31,
     deps = [
         "//br/pkg/errors",
         "//br/pkg/restore/utils",
@@ -84,6 +84,9 @@ go_test(
         "@com_github_pingcap_kvproto//pkg/metapb",
         "@com_github_pingcap_kvproto//pkg/pdpb",
         "@com_github_stretchr_testify//require",
+        "@com_github_tikv_pd_client//:client",
+        "@com_github_tikv_pd_client//clients/router",
+        "@com_github_tikv_pd_client//opt",
         "@org_golang_google_grpc//codes",
         "@org_golang_google_grpc//status",
     ],
diff --git a/br/pkg/restore/split/client_test.go b/br/pkg/restore/split/client_test.go
index e9e681bf02..c364822433 100644
--- a/br/pkg/restore/split/client_test.go
+++ b/br/pkg/restore/split/client_test.go
@@ -230,7 +230,7 @@ func TestScanRegionEmptyResult(t *testing.T) {
 	mockPDClient := NewMockPDClientForSplit()
 	keys := [][]byte{[]byte(""), []byte("")}
 	mockPDClient.SetRegions(keys)
-	mockPDClient.scanRegions.errors = []error{nil, nil, nil, nil}
+	mockPDClient.scanRegions.errors = []error{nil, nil, nil, nil, nil, nil, nil, nil}
 	mockClient := &pdClient{
 		client:           mockPDClient,
 		splitBatchKeyCnt: 100,
diff --git a/br/pkg/restore/split/split.go b/br/pkg/restore/split/split.go
index e9591d7f39..4d823a49fb 100644
--- a/br/pkg/restore/split/split.go
+++ b/br/pkg/restore/split/split.go
@@ -152,7 +152,10 @@ func (rs *RegionSplitter) WaitForScatterRegionsTimeout(ctx context.Context, regi
 	return leftRegions
 }

-func checkRegionConsistency(startKey, endKey []byte, regions []*RegionInfo) error {
+// checkRegionConsistency checks the readiness and continuity of regions.
+// If the argument `limitted` is true, the regions are treated as a limited scan
+// result, so the last region's `EndKey` is not compared with `endKey`.
+func checkRegionConsistency(startKey, endKey []byte, regions []*RegionInfo, limitted bool) error {
 	// current pd can't guarantee the consistency of returned regions
 	if len(regions) == 0 {
 		return errors.Annotatef(berrors.ErrPDBatchScanRegion, "scan region return empty result, startKey: %s, endKey: %s",
@@ -165,7 +168,7 @@ func checkRegionConsistency(startKey, endKey []byte, regions []*RegionInfo) erro
 			regions[0].Region.Id,
 			redact.Key(regions[0].Region.StartKey), redact.Key(startKey),
 			regions[0].Region.RegionEpoch.String())
-	} else if len(regions[len(regions)-1].Region.EndKey) != 0 &&
+	} else if !limitted && len(regions[len(regions)-1].Region.EndKey) != 0 &&
 		bytes.Compare(regions[len(regions)-1].Region.EndKey, endKey) < 0 {
 		return errors.Annotatef(berrors.ErrPDBatchScanRegion,
 			"last region %d's endKey(%s) < endKey(%s), region epoch: %s",
@@ -179,11 +182,19 @@ func checkRegionConsistency(startKey, endKey []byte, regions []*RegionInfo) erro
 		return errors.Annotatef(berrors.ErrPDBatchScanRegion,
 			"region %d's leader is nil", cur.Region.Id)
 	}
+	if cur.Leader.StoreId == 0 {
+		return errors.Annotatef(berrors.ErrPDBatchScanRegion,
+			"region %d's leader's store id is 0", cur.Region.Id)
+	}
 	for _, r := range regions[1:] {
 		if r.Leader == nil {
 			return errors.Annotatef(berrors.ErrPDBatchScanRegion,
 				"region %d's leader is nil", r.Region.Id)
 		}
+		if r.Leader.StoreId == 0 {
+			return errors.Annotatef(berrors.ErrPDBatchScanRegion,
+				"region %d's leader's store id is 0", r.Region.Id)
+		}
 		if !bytes.Equal(cur.Region.EndKey, r.Region.StartKey) {
 			return errors.Annotatef(berrors.ErrPDBatchScanRegion,
 				"region %d's endKey not equal to next region %d's startKey, endKey: %s, startKey: %s, region epoch: %s %s",
@@ -197,6 +208,34 @@ func checkRegionConsistency(startKey, endKey []byte, regions []*RegionInfo) erro
 	return nil
 }

+func scanRegionsLimitWithRetry(
+	ctx context.Context, client SplitClient, startKey, endKey []byte, limit int, mustLeader bool,
+) ([]*RegionInfo, bool, error) {
+	var (
+		batch []*RegionInfo
+		err   error
+	)
+	_ = utils.WithRetry(ctx, func() error {
+		defer func() { mustLeader = mustLeader || err != nil }()
+		if mustLeader {
+			batch, err = client.ScanRegions(ctx, startKey, endKey, limit)
+		} else {
+			batch, err = client.ScanRegions(ctx, startKey, endKey, limit, opt.WithAllowFollowerHandle())
+		}
+		if err != nil {
+			return err
+		}
+		if err = checkRegionConsistency(startKey, endKey, batch, true); err != nil {
+			log.Warn("failed to scan region, retrying",
+				logutil.ShortError(err),
+				zap.Int("regionLength", len(batch)))
+			return err
+		}
+		return nil
+	}, NewWaitRegionOnlineBackoffer())
+	return batch, mustLeader, err
+}
+
 // PaginateScanRegion scan regions with a limit pagination and return all regions
 // at once. The returned regions are continuous and cover the key range. If not,
 // or meet errors, it will retry internally.
@@ -210,24 +249,17 @@ func PaginateScanRegion(
 	var (
 		lastRegions []*RegionInfo
-		lastErr     error
+		err         error
+		mustLeader  = false
 		backoffer   = NewWaitRegionOnlineBackoffer()
 	)
 	_ = utils.WithRetry(ctx, func() error {
-		var err error
-		defer func() {
-			lastErr = err
-		}()
+		defer func() { mustLeader = true }()
 		regions := make([]*RegionInfo, 0, 16)
 		scanStartKey := startKey
 		for {
 			var batch []*RegionInfo
-			if lastErr != nil {
-				batch, err = client.ScanRegions(ctx, scanStartKey, endKey, limit)
-			} else {
-				batch, err = client.ScanRegions(ctx, scanStartKey, endKey, limit, opt.WithAllowFollowerHandle())
-			}
-
+			batch, mustLeader, err = scanRegionsLimitWithRetry(ctx, client, scanStartKey, endKey, limit, mustLeader)
 			if err != nil {
 				err = errors.Annotatef(berrors.ErrPDBatchScanRegion.Wrap(err),
 					"scan regions from start-key:%s, err: %s",
 					redact.Key(scanStartKey), err.Error())
@@ -252,7 +284,7 @@
 		}

 		lastRegions = regions
-		if err = checkRegionConsistency(startKey, endKey, regions); err != nil {
+		if err = checkRegionConsistency(startKey, endKey, regions, false); err != nil {
 			log.Warn("failed to scan region, retrying",
 				logutil.ShortError(err),
 				zap.Int("regionLength", len(regions)))
@@ -261,7 +293,7 @@
 		return nil
 	}, backoffer)

-	return lastRegions, lastErr
+	return lastRegions, err
 }

 // checkPartRegionConsistency only checks the continuity of regions and the first region consistency.
diff --git a/br/pkg/restore/split/split_test.go b/br/pkg/restore/split/split_test.go
index 9e8a186195..ea7e512c32 100644
--- a/br/pkg/restore/split/split_test.go
+++ b/br/pkg/restore/split/split_test.go
@@ -26,6 +26,9 @@ import (
 	"github.com/pingcap/tidb/pkg/tablecodec"
 	"github.com/pingcap/tidb/pkg/util/codec"
 	"github.com/stretchr/testify/require"
+	pd "github.com/tikv/pd/client"
+	"github.com/tikv/pd/client/clients/router"
+	"github.com/tikv/pd/client/opt"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 )
@@ -592,6 +595,165 @@ func TestPaginateScanRegion(t *testing.T) {
 	checkRegionsBoundaries(t, got, [][]byte{{1}, {2}, {3}, {4}, {5}})
 }

+type mockPDErrorClient struct {
+	pd.Client
+	t         *testing.T
+	testCases []scanRegionTestCase
+}
+
+type scanRegionTestCase struct {
+	allowFollowerHandle bool
+	caseError           error
+	caseRegion          []*router.Region
+}
+
+func (s *mockPDErrorClient) ScanRegions(_ context.Context, key, endKey []byte, limit int, opts ...opt.GetRegionOption) ([]*router.Region, error) {
+	testCase := s.testCases[0]
+	s.testCases = s.testCases[1:]
+	op := &opt.GetRegionOp{}
+	for _, opt := range opts {
+		opt(op)
+	}
+	require.Equal(s.t, testCase.allowFollowerHandle, op.AllowFollowerHandle)
+	return testCase.caseRegion, testCase.caseError
+}
+
+func newCaseRegion(kids []uint64) []*router.Region {
+	regions := make([]*router.Region, 0, len(kids))
+	for _, kid := range kids {
+		regions = append(regions, &router.Region{
+			Meta: &metapb.Region{
+				Id:       kid,
+				StartKey: fmt.Appendf(nil, "%03d", kid),
+				EndKey:   fmt.Appendf(nil, "%03d", kid+1),
+			},
+			Leader: &metapb.Peer{
+				Id:      kid,
+				StoreId: kid,
+			},
+		})
+	}
+	return regions
+}
+
+func rk(kid int) []byte {
+	return fmt.Appendf(nil, "%03d5", kid)
+}
+
+func TestScanRegionsLimitWithRetry(t *testing.T) {
+	ctx := context.Background()
+	mockPDClient := &mockPDErrorClient{t: t}
+	mockClient := &pdClient{client: mockPDClient}
+
+	backup := WaitRegionOnlineAttemptTimes
+	WaitRegionOnlineAttemptTimes = 3
+	t.Cleanup(func() {
+		WaitRegionOnlineAttemptTimes = backup
+	})
+
+	caseError := errors.Annotate(berrors.ErrPDBatchScanRegion, "case error")
+	// Case 1: mustLeader is false, and the first try fails
+	mockPDClient.testCases = []scanRegionTestCase{
+		{allowFollowerHandle: true, caseError: caseError},
+		{allowFollowerHandle: false, caseRegion: newCaseRegion([]uint64{1, 3})},
+		{allowFollowerHandle: false, caseRegion: newCaseRegion([]uint64{1, 2, 3})},
+	}
+	_, mustLeader, err := scanRegionsLimitWithRetry(ctx, mockClient, rk(1), rk(2), 128, false)
+	require.True(t, mustLeader)
+	require.NoError(t, err)
+	require.Len(t, mockPDClient.testCases, 0)
+
+	// Case 2: mustLeader is false, and the first try succeeds
+	mockPDClient.testCases = []scanRegionTestCase{
+		{allowFollowerHandle: true, caseRegion: newCaseRegion([]uint64{1, 2, 3})},
+	}
+	_, mustLeader, err = scanRegionsLimitWithRetry(ctx, mockClient, rk(1), rk(2), 128, false)
+	require.False(t, mustLeader)
+	require.NoError(t, err)
+	require.Len(t, mockPDClient.testCases, 0)
+
+	// Case 3: mustLeader is true, and the first try fails
+	mockPDClient.testCases = []scanRegionTestCase{
+		{allowFollowerHandle: false, caseError: caseError},
+		{allowFollowerHandle: false, caseRegion: newCaseRegion([]uint64{1, 3})},
+		{allowFollowerHandle: false, caseRegion: newCaseRegion([]uint64{1, 2, 3})},
+	}
+	_, mustLeader, err = scanRegionsLimitWithRetry(ctx, mockClient, rk(1), rk(2), 128, true)
+	require.True(t, mustLeader)
+	require.NoError(t, err)
+	require.Len(t, mockPDClient.testCases, 0)
+
+	// Case 4: mustLeader is true, and the first try succeeds
+	mockPDClient.testCases = []scanRegionTestCase{
+		{allowFollowerHandle: false, caseRegion: newCaseRegion([]uint64{1, 2, 3})},
+	}
+	_, mustLeader, err = scanRegionsLimitWithRetry(ctx, mockClient, rk(1), rk(2), 128, true)
+	require.True(t, mustLeader)
+	require.NoError(t, err)
+	require.Len(t, mockPDClient.testCases, 0)
+}
+
+func checkRegions(t *testing.T, startKid, endKid uint64, regions []*RegionInfo) {
+	require.Len(t, regions, int(endKid-startKid+1))
+	i := 0
+	for kid := startKid; kid <= endKid; kid += 1 {
+		require.Equal(t, kid, regions[i].Leader.Id)
+		require.Equal(t, kid, regions[i].Leader.StoreId)
+		require.Equal(t, kid, regions[i].Region.Id)
+		require.Equal(t, fmt.Appendf(nil, "%03d", kid), regions[i].Region.StartKey)
+		require.Equal(t, fmt.Appendf(nil, "%03d", kid+1), regions[i].Region.EndKey)
+		i += 1
+	}
+}
+
+func TestPaginateScanRegion2(t *testing.T) {
+	ctx := context.Background()
+	mockPDClient := &mockPDErrorClient{t: t}
+	mockClient := &pdClient{client: mockPDClient}
+
+	backup := WaitRegionOnlineAttemptTimes
+	WaitRegionOnlineAttemptTimes = 3
+	t.Cleanup(func() {
+		WaitRegionOnlineAttemptTimes = backup
+	})
+
+	caseError := errors.Annotate(berrors.ErrPDBatchScanRegion, "case error")
+	// Case 1: mustLeader is false, and the first try fails
+	mockPDClient.testCases = []scanRegionTestCase{
+		{allowFollowerHandle: true, caseError: caseError},
+		{allowFollowerHandle: false, caseRegion: newCaseRegion([]uint64{1, 3})},
+		{allowFollowerHandle: false, caseRegion: newCaseRegion([]uint64{1, 2, 3})},
+		{allowFollowerHandle: false, caseRegion: newCaseRegion([]uint64{4, 5})},
+	}
+	regions, err := PaginateScanRegion(ctx, mockClient, rk(1), rk(5), 3)
+	require.NoError(t, err)
+	checkRegions(t, 1, 5, regions)
+
+	// Case 2: mustLeader is false, and the first try succeeds
+	mockPDClient.testCases = []scanRegionTestCase{
+		{allowFollowerHandle: true, caseRegion: newCaseRegion([]uint64{1, 2, 3})},
+		{allowFollowerHandle: true, caseError: caseError},
+		{allowFollowerHandle: false, caseError: caseError},
+		{allowFollowerHandle: false, caseRegion: newCaseRegion([]uint64{4, 5, 6})},
+		{allowFollowerHandle: false, caseRegion: newCaseRegion([]uint64{7, 8, 9})},
+		{allowFollowerHandle: false, caseRegion: newCaseRegion([]uint64{10})},
+	}
+	regions, err = PaginateScanRegion(ctx, mockClient, rk(1), rk(10), 3)
+	require.NoError(t, err)
+	checkRegions(t, 1, 10, regions)
+
+	// Case 3: mustLeader is false, and the first paginate round fails
+	mockPDClient.testCases = []scanRegionTestCase{
+		{allowFollowerHandle: true, caseRegion: newCaseRegion([]uint64{1, 2, 3})},
+		{allowFollowerHandle: true, caseRegion: newCaseRegion([]uint64{4, 5})},
+		{allowFollowerHandle: false, caseRegion: newCaseRegion([]uint64{1, 2, 3})},
+		{allowFollowerHandle: false, caseRegion: newCaseRegion([]uint64{4, 5, 6})},
+	}
+	regions, err = PaginateScanRegion(ctx, mockClient, rk(1), rk(6), 3)
+	require.NoError(t, err)
+	checkRegions(t, 1, 6, regions)
+}
+
 func TestRegionConsistency(t *testing.T) {
 	cases := []struct {
 		startKey []byte
@@ -685,9 +847,39 @@
 				},
 			},
 		},
+		{
+			codec.EncodeBytes([]byte{}, []byte("c")),
+			codec.EncodeBytes([]byte{}, []byte("e")),
+			"region 6's leader's store id is 0(.*?)",
+			[]*RegionInfo{
+				{
+					Leader: &metapb.Peer{
+						Id:      6,
+						StoreId: 0,
+					},
+					Region: &metapb.Region{
+						Id:          6,
+						StartKey:    codec.EncodeBytes([]byte{}, []byte("c")),
+						EndKey:      codec.EncodeBytes([]byte{}, []byte("d")),
+						RegionEpoch: nil,
+					},
+				},
+				{
+					Leader: &metapb.Peer{
+						Id:      6,
+						StoreId: 0,
+					},
+					Region: &metapb.Region{
+						Id:       8,
+						StartKey: codec.EncodeBytes([]byte{}, []byte("d")),
+						EndKey:   codec.EncodeBytes([]byte{}, []byte("e")),
+					},
+				},
+			},
+		},
 	}
 	for _, ca := range cases {
-		err := checkRegionConsistency(ca.startKey, ca.endKey, ca.regions)
+		err := checkRegionConsistency(ca.startKey, ca.endKey, ca.regions, false)
 		require.Error(t, err)
 		require.Regexp(t, ca.err, err.Error())
 	}
diff --git a/br/tests/br_pitr_table_filter/run.sh b/br/tests/br_pitr_table_filter/run.sh
index 6507957100..850c76496d 100755
--- a/br/tests/br_pitr_table_filter/run.sh
+++ b/br/tests/br_pitr_table_filter/run.sh
@@ -1345,6 +1345,8 @@ test_partition_exchange
 test_system_tables
 test_sequential_restore
 test_log_compaction
-test_pitr_chaining
+# TODO: fix this test once chaining PITR restore is supported
+# Currently, the restore ID of the next log restore doesn't match the restore ID of the previous log restore
+# test_pitr_chaining

 echo "br pitr table filter all tests passed"
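Note on the mechanism (illustrative sketch, not taken from the patch): the core shape this change introduces is scanRegionsLimitWithRetry's fallback from follower reads to leader reads. The first attempt may be served by a follower via opt.WithAllowFollowerHandle; any failure, including a batch that fails checkRegionConsistency, forces this and all remaining attempts onto the leader, and the resulting mustLeader flag is handed back to PaginateScanRegion so later pages also go to the leader. The Go sketch below shows only that retry shape with simplified, hypothetical names (scanFn, scanWithLeaderFallback); it does not use the real BR or PD client APIs.

package main

import (
	"errors"
	"fmt"
)

// scanFn stands in for a single limited region scan; followerOK plays the role
// of opt.WithAllowFollowerHandle in the real client (hypothetical signature).
type scanFn func(start, end string, limit int, followerOK bool) ([]string, error)

// scanWithLeaderFallback retries a limited scan. After the first failure it
// switches to leader-only reads and reports whether the caller's later rounds
// must also use the leader, mirroring the shape of scanRegionsLimitWithRetry.
func scanWithLeaderFallback(scan scanFn, start, end string, limit int, mustLeader bool, attempts int) ([]string, bool, error) {
	var (
		batch []string
		err   error
	)
	for i := 0; i < attempts; i++ {
		batch, err = scan(start, end, limit, !mustLeader)
		if err == nil {
			return batch, mustLeader, nil
		}
		// Any failure makes the remaining attempts, and the caller's next
		// pagination rounds, use leader reads only.
		mustLeader = true
	}
	return nil, mustLeader, err
}

func main() {
	calls := 0
	scan := func(start, end string, limit int, followerOK bool) ([]string, error) {
		calls++
		if followerOK {
			// Simulate a stale or inconsistent follower response on the first call.
			return nil, errors.New("inconsistent follower scan")
		}
		return []string{"region-1", "region-2"}, nil
	}
	regions, mustLeader, err := scanWithLeaderFallback(scan, "a", "z", 128, false, 3)
	fmt.Println(regions, mustLeader, err, calls) // [region-1 region-2] true <nil> 2
}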