tikv: fix panic in RegionStore.accessStore (#20204)
This commit is contained in:
@ -554,6 +554,16 @@ func (c *RegionCache) OnSendFail(bo *Backoffer, ctx *RPCContext, scheduleReload
|
||||
tikvRegionCacheCounterWithSendFail.Inc()
|
||||
r := c.getCachedRegionWithRLock(ctx.Region)
|
||||
if r != nil {
|
||||
peersNum := len(r.meta.Peers)
|
||||
if len(ctx.Meta.Peers) != peersNum {
|
||||
logutil.Logger(bo.ctx).Info("retry and refresh current ctx after send request fail and up/down stores length changed",
|
||||
zap.Stringer("current", ctx),
|
||||
zap.Bool("needReload", scheduleReload),
|
||||
zap.Reflect("oldPeers", ctx.Meta.Peers),
|
||||
zap.Reflect("newPeers", r.meta.Peers),
|
||||
zap.Error(err))
|
||||
return
|
||||
}
|
||||
rs := r.getStore()
|
||||
if err != nil {
|
||||
storeIdx, s := rs.accessStore(ctx.AccessMode, ctx.AccessIdx)
|
||||
@ -1211,7 +1221,7 @@ func (r *Region) GetMeta() *metapb.Region {
|
||||
// GetLeaderPeerID returns leader peer ID.
|
||||
func (r *Region) GetLeaderPeerID() uint64 {
|
||||
store := r.getStore()
|
||||
if int(store.workTiKVIdx) >= len(r.meta.Peers) {
|
||||
if int(store.workTiKVIdx) >= store.accessStoreNum(TiKvOnly) {
|
||||
return 0
|
||||
}
|
||||
storeIdx, _ := store.accessStore(TiKvOnly, store.workTiKVIdx)
|
||||
@ -1221,7 +1231,7 @@ func (r *Region) GetLeaderPeerID() uint64 {
|
||||
// GetLeaderStoreID returns the store ID of the leader region.
|
||||
func (r *Region) GetLeaderStoreID() uint64 {
|
||||
store := r.getStore()
|
||||
if int(store.workTiKVIdx) >= len(r.meta.Peers) {
|
||||
if int(store.workTiKVIdx) >= store.accessStoreNum(TiKvOnly) {
|
||||
return 0
|
||||
}
|
||||
storeIdx, _ := store.accessStore(TiKvOnly, store.workTiKVIdx)
|
||||
|
||||
@ -28,6 +28,7 @@ import (
|
||||
"github.com/pingcap/kvproto/pkg/metapb"
|
||||
"github.com/pingcap/tidb/kv"
|
||||
"github.com/pingcap/tidb/store/mockstore/mocktikv"
|
||||
pd "github.com/tikv/pd/client"
|
||||
)
|
||||
|
||||
type testRegionCacheSuite struct {
|
||||
@ -1079,6 +1080,42 @@ func (s *testRegionCacheSuite) TestMixedMeetEpochNotMatch(c *C) {
|
||||
c.Assert(followReqSeed, Equals, uint32(1))
|
||||
}
|
||||
|
||||
func (s *testRegionCacheSuite) TestPeersLenChange(c *C) {
|
||||
// 2 peers [peer1, peer2] and let peer2 become leader
|
||||
loc, err := s.cache.LocateKey(s.bo, []byte("a"))
|
||||
c.Assert(err, IsNil)
|
||||
s.cache.UpdateLeader(loc.Region, s.store2, 0)
|
||||
|
||||
// current leader is peer2 in [peer1, peer2]
|
||||
loc, err = s.cache.LocateKey(s.bo, []byte("a"))
|
||||
c.Assert(err, IsNil)
|
||||
ctx, err := s.cache.GetTiKVRPCContext(s.bo, loc.Region, kv.ReplicaReadLeader, 0)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(ctx.Peer.StoreId, Equals, s.store2)
|
||||
|
||||
// simulate peer1 became down in kv heartbeat and loaded before response back.
|
||||
cpMeta := &metapb.Region{
|
||||
Id: ctx.Meta.Id,
|
||||
StartKey: ctx.Meta.StartKey,
|
||||
EndKey: ctx.Meta.EndKey,
|
||||
RegionEpoch: ctx.Meta.RegionEpoch,
|
||||
Peers: make([]*metapb.Peer, len(ctx.Meta.Peers)),
|
||||
}
|
||||
copy(cpMeta.Peers, ctx.Meta.Peers)
|
||||
cpRegion := &pd.Region{
|
||||
Meta: cpMeta,
|
||||
DownPeers: []*metapb.Peer{{Id: s.peer1, StoreId: s.store1}},
|
||||
}
|
||||
filterUnavailablePeers(cpRegion)
|
||||
region := &Region{meta: cpRegion.Meta}
|
||||
err = region.init(s.cache)
|
||||
c.Assert(err, IsNil)
|
||||
s.cache.insertRegionToCache(region)
|
||||
|
||||
// OnSendFail should not panic
|
||||
s.cache.OnSendFail(NewNoopBackoff(context.Background()), ctx, false, errors.New("send fail"))
|
||||
}
|
||||
|
||||
func (s *testRegionRequestSuite) TestGetRegionByIDFromCache(c *C) {
|
||||
region, err := s.cache.LocateRegionByID(s.bo, s.region)
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
Reference in New Issue
Block a user