@@ -65,7 +65,7 @@ go_test(
],
embed = [":split"],
flaky = True,
shard_count = 29,
shard_count = 31,
deps = [
"//br/pkg/errors",
"//br/pkg/restore/utils",
@@ -84,6 +84,9 @@ go_test(
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_pingcap_kvproto//pkg/pdpb",
"@com_github_stretchr_testify//require",
"@com_github_tikv_pd_client//:client",
"@com_github_tikv_pd_client//clients/router",
"@com_github_tikv_pd_client//opt",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//status",
],

@@ -230,7 +230,7 @@ func TestScanRegionEmptyResult(t *testing.T) {
mockPDClient := NewMockPDClientForSplit()
keys := [][]byte{[]byte(""), []byte("")}
mockPDClient.SetRegions(keys)
mockPDClient.scanRegions.errors = []error{nil, nil, nil, nil}
mockPDClient.scanRegions.errors = []error{nil, nil, nil, nil, nil, nil, nil, nil}
mockClient := &pdClient{
client: mockPDClient,
splitBatchKeyCnt: 100,

@@ -152,7 +152,10 @@ func (rs *RegionSplitter) WaitForScatterRegionsTimeout(ctx context.Context, regi
return leftRegions
}

func checkRegionConsistency(startKey, endKey []byte, regions []*RegionInfo) error {
// checkRegionConsistency checks the readiness and continuity of regions.
// if the argument `limitted` is true, regions are regarded as a limited scan result.
// so it will not compare `endKey` with the last region's `EndKey`.
func checkRegionConsistency(startKey, endKey []byte, regions []*RegionInfo, limitted bool) error {
// current pd can't guarantee the consistency of returned regions
if len(regions) == 0 {
return errors.Annotatef(berrors.ErrPDBatchScanRegion, "scan region return empty result, startKey: %s, endKey: %s",
@@ -165,7 +168,7 @@ func checkRegionConsistency(startKey, endKey []byte, regions []*RegionInfo) erro
regions[0].Region.Id,
redact.Key(regions[0].Region.StartKey), redact.Key(startKey),
regions[0].Region.RegionEpoch.String())
} else if len(regions[len(regions)-1].Region.EndKey) != 0 &&
} else if !limitted && len(regions[len(regions)-1].Region.EndKey) != 0 &&
bytes.Compare(regions[len(regions)-1].Region.EndKey, endKey) < 0 {
return errors.Annotatef(berrors.ErrPDBatchScanRegion,
"last region %d's endKey(%s) < endKey(%s), region epoch: %s",
@@ -179,11 +182,19 @@ func checkRegionConsistency(startKey, endKey []byte, regions []*RegionInfo) erro
return errors.Annotatef(berrors.ErrPDBatchScanRegion,
"region %d's leader is nil", cur.Region.Id)
}
if cur.Leader.StoreId == 0 {
return errors.Annotatef(berrors.ErrPDBatchScanRegion,
"region %d's leader's store id is 0", cur.Region.Id)
}
for _, r := range regions[1:] {
if r.Leader == nil {
return errors.Annotatef(berrors.ErrPDBatchScanRegion,
"region %d's leader is nil", r.Region.Id)
}
if r.Leader.StoreId == 0 {
return errors.Annotatef(berrors.ErrPDBatchScanRegion,
"region %d's leader's store id is 0", r.Region.Id)
}
if !bytes.Equal(cur.Region.EndKey, r.Region.StartKey) {
return errors.Annotatef(berrors.ErrPDBatchScanRegion,
"region %d's endKey not equal to next region %d's startKey, endKey: %s, startKey: %s, region epoch: %s %s",
@@ -197,6 +208,34 @@ func checkRegionConsistency(startKey, endKey []byte, regions []*RegionInfo) erro
return nil
}

func scanRegionsLimitWithRetry(
ctx context.Context, client SplitClient, startKey, endKey []byte, limit int, mustLeader bool,
) ([]*RegionInfo, bool, error) {
var (
batch []*RegionInfo
err error
)
_ = utils.WithRetry(ctx, func() error {
defer func() { mustLeader = mustLeader || err != nil }()
if mustLeader {
batch, err = client.ScanRegions(ctx, startKey, endKey, limit)
} else {
batch, err = client.ScanRegions(ctx, startKey, endKey, limit, opt.WithAllowFollowerHandle())
}
if err != nil {
return err
}
if err = checkRegionConsistency(startKey, endKey, batch, true); err != nil {
log.Warn("failed to scan region, retrying",
logutil.ShortError(err),
zap.Int("regionLength", len(batch)))
return err
}
return nil
}, NewWaitRegionOnlineBackoffer())
return batch, mustLeader, err
}

// PaginateScanRegion scan regions with a limit pagination and return all regions
// at once. The returned regions are continuous and cover the key range. If not,
// or meet errors, it will retry internally.
@@ -210,24 +249,17 @@ func PaginateScanRegion(

var (
lastRegions []*RegionInfo
lastErr error
err error
mustLeader = false
backoffer = NewWaitRegionOnlineBackoffer()
)
_ = utils.WithRetry(ctx, func() error {
var err error
defer func() {
lastErr = err
}()
defer func() { mustLeader = true }()
regions := make([]*RegionInfo, 0, 16)
scanStartKey := startKey
for {
var batch []*RegionInfo
if lastErr != nil {
batch, err = client.ScanRegions(ctx, scanStartKey, endKey, limit)
} else {
batch, err = client.ScanRegions(ctx, scanStartKey, endKey, limit, opt.WithAllowFollowerHandle())
}

batch, mustLeader, err = scanRegionsLimitWithRetry(ctx, client, scanStartKey, endKey, limit, mustLeader)
if err != nil {
err = errors.Annotatef(berrors.ErrPDBatchScanRegion.Wrap(err), "scan regions from start-key:%s, err: %s",
redact.Key(scanStartKey), err.Error())
@@ -252,7 +284,7 @@ func PaginateScanRegion(
}
lastRegions = regions

if err = checkRegionConsistency(startKey, endKey, regions); err != nil {
if err = checkRegionConsistency(startKey, endKey, regions, false); err != nil {
log.Warn("failed to scan region, retrying",
logutil.ShortError(err),
zap.Int("regionLength", len(regions)))
@@ -261,7 +293,7 @@ func PaginateScanRegion(
return nil
}, backoffer)

return lastRegions, lastErr
return lastRegions, err
}

// checkPartRegionConsistency only checks the continuity of regions and the first region consistency.

@@ -26,6 +26,9 @@ import (
"github.com/pingcap/tidb/pkg/tablecodec"
|
||||
"github.com/pingcap/tidb/pkg/util/codec"
|
||||
"github.com/stretchr/testify/require"
|
||||
pd "github.com/tikv/pd/client"
|
||||
"github.com/tikv/pd/client/clients/router"
|
||||
"github.com/tikv/pd/client/opt"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
@ -592,6 +595,165 @@ func TestPaginateScanRegion(t *testing.T) {
|
||||
checkRegionsBoundaries(t, got, [][]byte{{1}, {2}, {3}, {4}, {5}})
|
||||
}
|
||||
|
||||
type mockPDErrorClient struct {
|
||||
pd.Client
t *testing.T
testCases []scanRegionTestCase
}

type scanRegionTestCase struct {
allowFollowerHandle bool
caseError error
caseRegion []*router.Region
}

func (s *mockPDErrorClient) ScanRegions(_ context.Context, key, endKey []byte, limit int, opts ...opt.GetRegionOption) ([]*router.Region, error) {
testCase := s.testCases[0]
s.testCases = s.testCases[1:]
op := &opt.GetRegionOp{}
for _, opt := range opts {
opt(op)
}
require.Equal(s.t, testCase.allowFollowerHandle, op.AllowFollowerHandle)
return testCase.caseRegion, testCase.caseError
}

func newCaseRegion(kids []uint64) []*router.Region {
regions := make([]*router.Region, 0, len(kids))
for _, kid := range kids {
regions = append(regions, &router.Region{
Meta: &metapb.Region{
Id: kid,
StartKey: fmt.Appendf(nil, "%03d", kid),
EndKey: fmt.Appendf(nil, "%03d", kid+1),
},
Leader: &metapb.Peer{
Id: kid,
StoreId: kid,
},
})
}
return regions
}

func rk(kid int) []byte {
return fmt.Appendf(nil, "%03d5", kid)
}

func TestScanRegionsLimitWithRetry(t *testing.T) {
ctx := context.Background()
mockPDClient := &mockPDErrorClient{t: t}
mockClient := &pdClient{client: mockPDClient}

backup := WaitRegionOnlineAttemptTimes
WaitRegionOnlineAttemptTimes = 3
t.Cleanup(func() {
WaitRegionOnlineAttemptTimes = backup
})

caseError := errors.Annotate(berrors.ErrPDBatchScanRegion, "case error")
// Case 1: must leader is false, and the first try is failed
mockPDClient.testCases = []scanRegionTestCase{
{allowFollowerHandle: true, caseError: caseError},
{allowFollowerHandle: false, caseRegion: newCaseRegion([]uint64{1, 3})},
{allowFollowerHandle: false, caseRegion: newCaseRegion([]uint64{1, 2, 3})},
}
_, mustLeader, err := scanRegionsLimitWithRetry(ctx, mockClient, rk(1), rk(2), 128, false)
require.True(t, mustLeader)
require.NoError(t, err)
require.Len(t, mockPDClient.testCases, 0)

// Case 2: must leader is false, and the first try is successful
mockPDClient.testCases = []scanRegionTestCase{
{allowFollowerHandle: true, caseRegion: newCaseRegion([]uint64{1, 2, 3})},
}
_, mustLeader, err = scanRegionsLimitWithRetry(ctx, mockClient, rk(1), rk(2), 128, false)
require.False(t, mustLeader)
require.NoError(t, err)
require.Len(t, mockPDClient.testCases, 0)

// Case 3: must leader is true, and the first try is failed
mockPDClient.testCases = []scanRegionTestCase{
{allowFollowerHandle: false, caseError: caseError},
{allowFollowerHandle: false, caseRegion: newCaseRegion([]uint64{1, 3})},
{allowFollowerHandle: false, caseRegion: newCaseRegion([]uint64{1, 2, 3})},
}
_, mustLeader, err = scanRegionsLimitWithRetry(ctx, mockClient, rk(1), rk(2), 128, true)
require.True(t, mustLeader)
require.NoError(t, err)
require.Len(t, mockPDClient.testCases, 0)

// Case 4: must leader is true, and the first try is successful
mockPDClient.testCases = []scanRegionTestCase{
{allowFollowerHandle: false, caseRegion: newCaseRegion([]uint64{1, 2, 3})},
}
_, mustLeader, err = scanRegionsLimitWithRetry(ctx, mockClient, rk(1), rk(2), 128, true)
require.True(t, mustLeader)
require.NoError(t, err)
require.Len(t, mockPDClient.testCases, 0)
}

func checkRegions(t *testing.T, startKid, endKid uint64, regions []*RegionInfo) {
require.Len(t, regions, int(endKid-startKid+1))
i := 0
for kid := startKid; kid <= endKid; kid += 1 {
require.Equal(t, kid, regions[i].Leader.Id)
require.Equal(t, kid, regions[i].Leader.StoreId)
require.Equal(t, kid, regions[i].Region.Id)
require.Equal(t, fmt.Appendf(nil, "%03d", kid), regions[i].Region.StartKey)
require.Equal(t, fmt.Appendf(nil, "%03d", kid+1), regions[i].Region.EndKey)
i += 1
}
}

func TestPaginateScanRegion2(t *testing.T) {
ctx := context.Background()
mockPDClient := &mockPDErrorClient{t: t}
mockClient := &pdClient{client: mockPDClient}

backup := WaitRegionOnlineAttemptTimes
WaitRegionOnlineAttemptTimes = 3
t.Cleanup(func() {
WaitRegionOnlineAttemptTimes = backup
})

caseError := errors.Annotate(berrors.ErrPDBatchScanRegion, "case error")
// Case 1: must leader is false, and the first try is failed
mockPDClient.testCases = []scanRegionTestCase{
{allowFollowerHandle: true, caseError: caseError},
{allowFollowerHandle: false, caseRegion: newCaseRegion([]uint64{1, 3})},
{allowFollowerHandle: false, caseRegion: newCaseRegion([]uint64{1, 2, 3})},
{allowFollowerHandle: false, caseRegion: newCaseRegion([]uint64{4, 5})},
}
regions, err := PaginateScanRegion(ctx, mockClient, rk(1), rk(5), 3)
require.NoError(t, err)
checkRegions(t, 1, 5, regions)

// Case 2: must leader is false, and the first try is successful
mockPDClient.testCases = []scanRegionTestCase{
{allowFollowerHandle: true, caseRegion: newCaseRegion([]uint64{1, 2, 3})},
{allowFollowerHandle: true, caseError: caseError},
{allowFollowerHandle: false, caseError: caseError},
{allowFollowerHandle: false, caseRegion: newCaseRegion([]uint64{4, 5, 6})},
{allowFollowerHandle: false, caseRegion: newCaseRegion([]uint64{7, 8, 9})},
{allowFollowerHandle: false, caseRegion: newCaseRegion([]uint64{10})},
}
regions, err = PaginateScanRegion(ctx, mockClient, rk(1), rk(10), 3)
require.NoError(t, err)
checkRegions(t, 1, 10, regions)

// Case 3: must leader is false, and the first paginate try is failed
mockPDClient.testCases = []scanRegionTestCase{
{allowFollowerHandle: true, caseRegion: newCaseRegion([]uint64{1, 2, 3})},
{allowFollowerHandle: true, caseRegion: newCaseRegion([]uint64{4, 5})},
{allowFollowerHandle: false, caseRegion: newCaseRegion([]uint64{1, 2, 3})},
{allowFollowerHandle: false, caseRegion: newCaseRegion([]uint64{4, 5, 6})},
}
regions, err = PaginateScanRegion(ctx, mockClient, rk(1), rk(6), 3)
require.NoError(t, err)
checkRegions(t, 1, 6, regions)
}

func TestRegionConsistency(t *testing.T) {
cases := []struct {
startKey []byte
@@ -685,9 +847,39 @@ func TestRegionConsistency(t *testing.T) {
},
},
},
{
codec.EncodeBytes([]byte{}, []byte("c")),
codec.EncodeBytes([]byte{}, []byte("e")),
"region 6's leader's store id is 0(.*?)",
[]*RegionInfo{
{
Leader: &metapb.Peer{
Id: 6,
StoreId: 0,
},
Region: &metapb.Region{
Id: 6,
StartKey: codec.EncodeBytes([]byte{}, []byte("c")),
EndKey: codec.EncodeBytes([]byte{}, []byte("d")),
RegionEpoch: nil,
},
},
{
Leader: &metapb.Peer{
Id: 6,
StoreId: 0,
},
Region: &metapb.Region{
Id: 8,
StartKey: codec.EncodeBytes([]byte{}, []byte("d")),
EndKey: codec.EncodeBytes([]byte{}, []byte("e")),
},
},
},
},
}
for _, ca := range cases {
err := checkRegionConsistency(ca.startKey, ca.endKey, ca.regions)
err := checkRegionConsistency(ca.startKey, ca.endKey, ca.regions, false)
require.Error(t, err)
require.Regexp(t, ca.err, err.Error())
}

@@ -1345,6 +1345,8 @@ test_partition_exchange
test_system_tables
test_sequential_restore
test_log_compaction
test_pitr_chaining
# TODO: fix this test once chaining pitr restore is supported
# Currently, the restore ID of next log restore doesn't match the restore ID of previous log restore
# test_pitr_chaining

echo "br pitr table filter all tests passed"