tikv: fix notLeader in region cache (#10572)

Author: lysu
Date: 2019-05-23 20:23:06 +08:00
Committed by: Jack Yu
Parent: 9071ab9131
Commit: eb38947c81
4 changed files with 286 additions and 390 deletions

store/tikv/region_cache.go

@ -23,7 +23,6 @@ import (
"unsafe"
"github.com/google/btree"
"github.com/grpc-ecosystem/go-grpc-middleware/util/backoffutils"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/pd/client"
@ -36,7 +35,6 @@ const (
btreeDegree = 32
rcDefaultRegionCacheTTLSec = 600
invalidatedLastAccessTime = -1
reloadRegionThreshold = 5
)
var (
@ -65,17 +63,15 @@ type Region struct {
// RegionStore represents region stores info.
// It is stored as an unsafe.Pointer and loaded as a single atomic unit.
type RegionStore struct {
workStoreIdx     int32    // points to the current work peer in meta.Peers and work store in stores (same idx)
stores           []*Store // stores in this region
attemptAfterLoad uint8    // counts peer-switch attempts since the region info was loaded
workStoreIdx int32    // points to the current work peer in meta.Peers and work store in stores (same idx)
stores       []*Store // stores in this region
}
// clone clones region store struct.
func (r *RegionStore) clone() *RegionStore {
return &RegionStore{
workStoreIdx: r.workStoreIdx,
stores: r.stores,
attemptAfterLoad: r.attemptAfterLoad,
workStoreIdx: r.workStoreIdx,
stores: r.stores,
}
}
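The clone-then-swap shape above is easiest to see in isolation. Below is a minimal, self-contained sketch (illustrative names, not the real TiDB types) of the copy-on-write pattern: readers atomically load an immutable *RegionStore snapshot, while writers clone it, mutate the clone, and compare-and-swap it back, retrying if another writer won the race.

package main

import (
	"fmt"
	"sync/atomic"
	"unsafe"
)

type regionStore struct {
	workStoreIdx int32
	stores       []string
}

type region struct {
	store unsafe.Pointer // *regionStore, swapped atomically
}

func (r *region) getStore() *regionStore {
	return (*regionStore)(atomic.LoadPointer(&r.store))
}

func (r *region) compareAndSwapStore(old, next *regionStore) bool {
	return atomic.CompareAndSwapPointer(&r.store, unsafe.Pointer(old), unsafe.Pointer(next))
}

func main() {
	r := &region{}
	atomic.StorePointer(&r.store, unsafe.Pointer(&regionStore{stores: []string{"s1", "s2", "s3"}}))
	for {
		old := r.getStore()
		clone := &regionStore{
			workStoreIdx: (old.workStoreIdx + 1) % int32(len(old.stores)),
			stores:       old.stores, // slice header copied; backing array shared
		}
		if r.compareAndSwapStore(old, clone) {
			break // swapped in; a loser of the race retries on a fresh snapshot
		}
	}
	fmt.Println(r.getStore().workStoreIdx) // 1
}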
@ -210,8 +206,8 @@ func (c *RegionCache) checkAndResolve(needCheckStores []*Store) {
c.storeMu.RLock()
for _, store := range c.storeMu.stores {
state := store.getState()
if state.resolveState == needCheck {
state := store.getResolveState()
if state == needCheck {
needCheckStores = append(needCheckStores, store)
}
}
@ -224,11 +220,12 @@ func (c *RegionCache) checkAndResolve(needCheckStores []*Store) {
// RPCContext contains data that is needed to send RPC to a region.
type RPCContext struct {
Region RegionVerID
Meta *metapb.Region
Peer *metapb.Peer
Store *Store
Addr string
Region RegionVerID
Meta *metapb.Region
Peer *metapb.Peer
PeerIdx int
Store *Store
Addr string
}
// GetStoreID returns StoreID.
@ -240,8 +237,8 @@ func (c *RPCContext) GetStoreID() uint64 {
}
func (c *RPCContext) String() string {
return fmt.Sprintf("region ID: %d, meta: %s, peer: %s, addr: %s",
c.Region.GetID(), c.Meta, c.Peer, c.Addr)
return fmt.Sprintf("region ID: %d, meta: %s, peer: %s, addr: %s, idx: %d",
c.Region.GetID(), c.Meta, c.Peer, c.Addr, c.PeerIdx)
}
// GetRPCContext returns RPCContext for a region. If it returns nil, the region
@ -258,7 +255,9 @@ func (c *RegionCache) GetRPCContext(bo *Backoffer, id RegionVerID) (*RPCContext,
return nil, nil
}
store, peer, addr, err := c.routeStoreInRegion(bo, cachedRegion, ts)
regionStore := cachedRegion.getStore()
store, peer, storeIdx := cachedRegion.WorkStorePeer(regionStore)
addr, err := c.getStoreAddr(bo, cachedRegion, store, storeIdx)
if err != nil {
return nil, err
}
@ -269,11 +268,12 @@ func (c *RegionCache) GetRPCContext(bo *Backoffer, id RegionVerID) (*RPCContext,
}
return &RPCContext{
Region: id,
Meta: cachedRegion.meta,
Peer: peer,
Store: store,
Addr: addr,
Region: id,
Meta: cachedRegion.meta,
Peer: peer,
PeerIdx: storeIdx,
Store: store,
Addr: addr,
}, nil
}
@ -358,20 +358,43 @@ func (c *RegionCache) findRegionByKey(bo *Backoffer, key []byte, isEndKey bool)
return r, nil
}
// OnSendFail handles the cache-side logic when a send request fails.
func (c *RegionCache) OnSendFail(bo *Backoffer, ctx *RPCContext, scheduleReload bool) {
r := c.getCachedRegionWithRLock(ctx.Region)
if r != nil {
c.switchNextPeer(r, ctx.PeerIdx)
if scheduleReload {
r.scheduleReload()
}
}
}
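OnSendFail and LocateRegionByID (below) cooperate through a small flag protocol: a failure schedules a reload, and the next locate consumes the flag and refreshes the region from PD instead of trusting the cache. This is a hedged sketch with assumed names, not the commit's exact fields:

package main

import (
	"fmt"
	"sync/atomic"
)

const (
	updated  int32 = iota // region info is considered fresh
	needSync              // region info should be reloaded from PD
)

type region struct{ syncFlag int32 }

// scheduleReload is called on send failure; it is a no-op if already scheduled.
func (r *region) scheduleReload() {
	atomic.CompareAndSwapInt32(&r.syncFlag, updated, needSync)
}

// needReload is called on locate; it consumes the flag exactly once.
func (r *region) needReload() bool {
	return atomic.CompareAndSwapInt32(&r.syncFlag, needSync, updated)
}

func main() {
	r := &region{}
	fmt.Println(r.needReload()) // false: cache trusted
	r.scheduleReload()          // a send failure schedules a reload
	fmt.Println(r.needReload()) // true: the next locate reloads once
	fmt.Println(r.needReload()) // false: the flag was consumed
}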
// LocateRegionByID searches for the region with ID.
func (c *RegionCache) LocateRegionByID(bo *Backoffer, regionID uint64) (*KeyLocation, error) {
c.mu.RLock()
r := c.getRegionByIDFromCache(regionID)
c.mu.RUnlock()
if r != nil {
if r.needReload() {
lr, err := c.loadRegionByID(bo, regionID)
if err != nil {
// ignore error and use old region info.
logutil.Logger(bo.ctx).Error("load region failure",
zap.Uint64("regionID", regionID), zap.Error(err))
} else {
r = lr
c.mu.Lock()
c.insertRegionToCache(r)
c.mu.Unlock()
}
}
loc := &KeyLocation{
Region: r.VerID(),
StartKey: r.StartKey(),
EndKey: r.EndKey(),
}
c.mu.RUnlock()
return loc, nil
}
c.mu.RUnlock()
r, err := c.loadRegionByID(bo, regionID)
if err != nil {
@ -439,7 +462,7 @@ func (c *RegionCache) InvalidateCachedRegion(id RegionVerID) {
}
// UpdateLeader updates the region cache with newer leader info.
func (c *RegionCache) UpdateLeader(regionID RegionVerID, leaderStoreID uint64) {
func (c *RegionCache) UpdateLeader(regionID RegionVerID, leaderStoreID uint64, currentPeerIdx int) {
r := c.getCachedRegionWithRLock(regionID)
if r == nil {
logutil.Logger(context.Background()).Debug("regionCache: cannot find region when updating leader",
@ -447,7 +470,13 @@ func (c *RegionCache) UpdateLeader(regionID RegionVerID, leaderStoreID uint64) {
zap.Uint64("leaderStoreID", leaderStoreID))
return
}
if !c.switchWorkStore(r, leaderStoreID) {
if leaderStoreID == 0 {
c.switchNextPeer(r, currentPeerIdx)
return
}
if !c.switchToPeer(r, leaderStoreID) {
logutil.Logger(context.Background()).Debug("regionCache: cannot find peer when updating leader",
zap.Uint64("regionID", regionID.GetID()),
zap.Uint64("leaderStoreID", leaderStoreID))
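The new leaderStoreID == 0 branch is the heart of the NotLeader fix: a hibernated region can answer NotLeader with no leader hint, and the old code had nowhere to go. A simplified sketch of the decision (plain ints standing in for the cache's atomics; function and variable names are illustrative):

package main

import "fmt"

// updateLeader advances workIdx given a NotLeader report from currentPeerIdx.
func updateLeader(workIdx *int, peerStores []uint64, leaderStoreID uint64, currentPeerIdx int) {
	if leaderStoreID == 0 {
		// no hint (e.g. hibernated region): round-robin past the failed peer
		*workIdx = (currentPeerIdx + 1) % len(peerStores)
		return
	}
	for i, sid := range peerStores {
		if sid == leaderStoreID {
			*workIdx = i // hint found: jump straight to the reported leader
			return
		}
	}
	// unknown store: the real code invalidates the region and reloads from PD
}

func main() {
	idx := 0
	stores := []uint64{10, 20, 30}
	updateLeader(&idx, stores, 0, idx) // NotLeader with no leader hint
	fmt.Println(idx)                   // 1
	updateLeader(&idx, stores, 30, idx)
	fmt.Println(idx) // 2
}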
@ -486,9 +515,6 @@ func (c *RegionCache) searchCachedRegion(key []byte, isEndKey bool) *Region {
})
c.mu.RUnlock()
if r != nil && (!isEndKey && r.Contains(key) || isEndKey && r.ContainsByEnd(key)) {
if !c.hasAvailableStore(r, ts) {
return nil
}
return r
}
return nil
@ -549,7 +575,7 @@ func (c *RegionCache) loadRegion(bo *Backoffer, key []byte, isEndKey bool) (*Reg
region := &Region{meta: meta}
region.init(c)
if leader != nil {
c.switchWorkStore(region, leader.StoreId)
c.switchToPeer(region, leader.StoreId)
}
return region, nil
}
@ -585,7 +611,7 @@ func (c *RegionCache) loadRegionByID(bo *Backoffer, regionID uint64) (*Region, e
region := &Region{meta: meta}
region.init(c)
if leader != nil {
c.switchWorkStore(region, leader.GetStoreId())
c.switchToPeer(region, leader.GetStoreId())
}
return region, nil
}
@ -598,66 +624,9 @@ func (c *RegionCache) getCachedRegionWithRLock(regionID RegionVerID) (r *Region)
return
}
// routeStoreInRegion ensures the region has a workable store and returns it.
func (c *RegionCache) routeStoreInRegion(bo *Backoffer, region *Region, ts int64) (workStore *Store, workPeer *metapb.Peer, workAddr string, err error) {
retry:
regionStore := region.getStore()
cachedStore, cachedPeer, cachedIdx := region.WorkStorePeer(regionStore)
// Most of the time, requests are routed directly to a stable leader;
// return it if the store is a stable leader and no other node needs to be tried.
state := cachedStore.getState()
if cachedStore != nil && state.failedAttempt == 0 && state.lastFailedTime == 0 {
workStore = cachedStore
workAddr, err = c.getStoreAddr(bo, region, workStore, cachedIdx, state)
workPeer = cachedPeer
return
}
// try to find and switch to another peer round-robin when the old leader meets an error.
newIdx := -1
storeNum := len(regionStore.stores)
i := (cachedIdx + 1) % storeNum
start := i
for {
store := regionStore.stores[i]
state = store.getState()
if state.Available(ts) {
newIdx = i
break
}
i = (i + 1) % storeNum
if i == start {
break
}
}
if newIdx < 0 {
return
}
newRegionStore := regionStore.clone()
newRegionStore.workStoreIdx = int32(newIdx)
newRegionStore.attemptAfterLoad++
attemptOverThreshold := newRegionStore.attemptAfterLoad == reloadRegionThreshold
if attemptOverThreshold {
newRegionStore.attemptAfterLoad = 0
}
if !region.compareAndSwapStore(regionStore, newRegionStore) {
goto retry
}
// reload the region info after more than reloadRegionThreshold switch attempts
if attemptOverThreshold {
region.scheduleReload()
}
workStore = newRegionStore.stores[newIdx]
workAddr, err = c.getStoreAddr(bo, region, workStore, newIdx, state)
workPeer = region.meta.Peers[newIdx]
return
}
func (c *RegionCache) getStoreAddr(bo *Backoffer, region *Region, store *Store, storeIdx int, state storeState) (addr string, err error) {
switch state.resolveState {
func (c *RegionCache) getStoreAddr(bo *Backoffer, region *Region, store *Store, storeIdx int) (addr string, err error) {
state := store.getResolveState()
switch state {
case resolved, needCheck:
addr = store.addr
return
@ -695,19 +664,6 @@ func (c *RegionCache) changeToActiveStore(region *Region, store *Store, storeIdx
return
}
// hasAvailableStore checks whether the region has an available store.
// Unlike `routeStoreInRegion`, it only checks and never changes the work store or peer.
func (c *RegionCache) hasAvailableStore(region *Region, ts int64) bool {
regionStore := region.getStore()
for _, store := range regionStore.stores {
state := store.getState()
if state.Available(ts) {
return true
}
}
return false
}
func (c *RegionCache) getStoreByStoreID(storeID uint64) (store *Store) {
var ok bool
c.storeMu.Lock()
@ -722,18 +678,6 @@ func (c *RegionCache) getStoreByStoreID(storeID uint64) (store *Store) {
return
}
// OnSendRequestFail is used for clearing cache when a tikv server does not respond.
func (c *RegionCache) OnSendRequestFail(ctx *RPCContext, err error) {
failedStoreID := ctx.Store.storeID
c.storeMu.RLock()
store, exists := c.storeMu.stores[failedStoreID]
c.storeMu.RUnlock()
if !exists {
return
}
store.markAccess(c.notifyCheckCh, false)
}
// OnRegionEpochNotMatch removes the old region and inserts new regions into the cache.
func (c *RegionCache) OnRegionEpochNotMatch(bo *Backoffer, ctx *RPCContext, currentRegions []*metapb.Region) error {
// Find whether the region epoch in `ctx` is ahead of TiKV's. If so, backoff.
@ -759,7 +703,7 @@ func (c *RegionCache) OnRegionEpochNotMatch(bo *Backoffer, ctx *RPCContext, curr
}
region := &Region{meta: meta}
region.init(c)
c.switchWorkStore(region, ctx.Store.storeID)
c.switchToPeer(region, ctx.Store.storeID)
c.insertRegionToCache(region)
if ctx.Region == region.VerID() {
needInvalidateOld = false
@ -847,22 +791,40 @@ func (r *Region) EndKey() []byte {
return r.meta.EndKey
}
// switchWorkStore switches the current store to the one on the specified store. It returns
// switchToPeer switches the current peer to the one on the specified store. It returns
// false if no peer matches the storeID.
func (c *RegionCache) switchWorkStore(r *Region, targetStoreID uint64) (switchToTarget bool) {
func (c *RegionCache) switchToPeer(r *Region, targetStoreID uint64) (found bool) {
leaderIdx, found := c.getPeerStoreIndex(r, targetStoreID)
c.switchWorkIdx(r, leaderIdx)
return
}
func (c *RegionCache) switchNextPeer(r *Region, currentPeerIdx int) {
regionStore := r.getStore()
if int(regionStore.workStoreIdx) != currentPeerIdx {
return
}
nextIdx := (currentPeerIdx + 1) % len(regionStore.stores)
newRegionStore := regionStore.clone()
newRegionStore.workStoreIdx = int32(nextIdx)
r.compareAndSwapStore(regionStore, newRegionStore)
}
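A toy illustration (hypothetical single-threaded stand-in) of why switchNextPeer first checks that the caller's peer index still matches workStoreIdx: two callers that both saw peer 0 fail should advance the region once, not twice past a possibly healthy peer.

package main

import "fmt"

type regionStore struct{ workStoreIdx, peers int }

func switchNextPeer(rs *regionStore, currentPeerIdx int) {
	if rs.workStoreIdx != currentPeerIdx {
		return // a concurrent caller already switched; this report is stale
	}
	rs.workStoreIdx = (currentPeerIdx + 1) % rs.peers
}

func main() {
	rs := &regionStore{workStoreIdx: 0, peers: 3}
	switchNextPeer(rs, 0)        // first failure report: 0 -> 1
	switchNextPeer(rs, 0)        // duplicate report for peer 0: ignored
	fmt.Println(rs.workStoreIdx) // 1
}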
func (c *RegionCache) getPeerStoreIndex(r *Region, id uint64) (idx int, found bool) {
if len(r.meta.Peers) == 0 {
return
}
leaderIdx := 0
for i, p := range r.meta.Peers {
if p.GetStoreId() == targetStoreID {
leaderIdx = i
switchToTarget = true
break
if p.GetStoreId() == id {
idx = i
found = true
return
}
}
return
}
func (c *RegionCache) switchWorkIdx(r *Region, leaderIdx int) {
retry:
// switch to new leader.
oldRegionStore := r.getStore()
@ -900,15 +862,7 @@ type Store struct {
resolveMutex sync.Mutex // protect pd from concurrent init requests
}
// storeState contains store's access info.
type storeState struct {
lastFailedTime uint32
failedAttempt uint16
resolveState resolveState
_Align int8
}
type resolveState uint8
type resolveState uint64
const (
unresolved resolveState = iota
@ -920,9 +874,9 @@ const (
// initResolve resolves the addr for a store that has never been resolved.
func (s *Store) initResolve(bo *Backoffer, c *RegionCache) (addr string, err error) {
s.resolveMutex.Lock()
state := s.getState()
state := s.getResolveState()
defer s.resolveMutex.Unlock()
if state.resolveState != unresolved {
if state != unresolved {
addr = s.addr
return
}
@ -951,14 +905,12 @@ func (s *Store) initResolve(bo *Backoffer, c *RegionCache) (addr string, err err
addr = store.GetAddress()
s.addr = addr
retry:
state = s.getState()
if state.resolveState != unresolved {
state = s.getResolveState()
if state != unresolved {
addr = s.addr
return
}
newState := state
newState.resolveState = resolved
if !s.compareAndSwapState(state, newState) {
if !s.compareAndSwapState(state, resolved) {
goto retry
}
return
@ -985,8 +937,7 @@ func (s *Store) reResolve(c *RegionCache) {
addr = store.GetAddress()
if s.addr != addr {
var state storeState
state.resolveState = resolved
state := resolved
newStore := &Store{storeID: s.storeID, addr: addr}
newStore.state = *(*uint64)(unsafe.Pointer(&state))
c.storeMu.Lock()
@ -994,120 +945,48 @@ func (s *Store) reResolve(c *RegionCache) {
c.storeMu.Unlock()
retryMarkDel:
// all regions still reference the old store object, so mark it deleted rather than mutating it
oldState := s.getState()
if oldState.resolveState == deleted {
oldState := s.getResolveState()
if oldState == deleted {
return
}
newState := oldState
newState.resolveState = deleted
newState := deleted
if !s.compareAndSwapState(oldState, newState) {
goto retryMarkDel
}
return
}
retryMarkResolved:
oldState := s.getState()
if oldState.resolveState != needCheck {
oldState := s.getResolveState()
if oldState != needCheck {
return
}
newState := oldState
newState.resolveState = resolved
newState := resolved
if !s.compareAndSwapState(oldState, newState) {
goto retryMarkResolved
}
return
}
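After this change the whole store state is a single uint64 holding only the resolve state, so every transition is one CAS with a goto-style retry. A compact sketch of the state machine (illustrative, not the exact TiDB code):

package main

import (
	"fmt"
	"sync/atomic"
)

type resolveState uint64

const (
	unresolved resolveState = iota // addr never fetched from PD
	resolved                       // addr known and trusted
	needCheck                      // addr should be re-verified asynchronously
	deleted                        // store was tombstoned / replaced
)

type store struct{ state uint64 }

func (s *store) getResolveState() resolveState {
	return resolveState(atomic.LoadUint64(&s.state))
}

func (s *store) compareAndSwapState(old, next resolveState) bool {
	return atomic.CompareAndSwapUint64(&s.state, uint64(old), uint64(next))
}

// markNeedCheck flips resolved -> needCheck, retrying on contention.
func (s *store) markNeedCheck() {
	for {
		old := s.getResolveState()
		if old != resolved {
			return
		}
		if s.compareAndSwapState(old, needCheck) {
			return
		}
	}
}

func main() {
	s := &store{state: uint64(resolved)}
	s.markNeedCheck()
	fmt.Println(s.getResolveState() == needCheck) // true
}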
const (
// maxExponentAttempt caps how many times the blackout time doubles.
maxExponentAttempt = 10
// startBlackoutAfterAttempt is the number of consecutive failed attempts after which the store is blacked out.
startBlackoutAfterAttempt = 20
)
func (s *Store) getState() storeState {
var state storeState
func (s *Store) getResolveState() resolveState {
var state resolveState
if s == nil {
return state
}
x := atomic.LoadUint64(&s.state)
*(*uint64)(unsafe.Pointer(&state)) = x
return state
return resolveState(atomic.LoadUint64(&s.state))
}
func (s *Store) compareAndSwapState(oldState, newState storeState) bool {
oldValue, newValue := *(*uint64)(unsafe.Pointer(&oldState)), *(*uint64)(unsafe.Pointer(&newState))
return atomic.CompareAndSwapUint64(&s.state, oldValue, newValue)
}
func (s *Store) storeState(newState storeState) {
newValue := *(*uint64)(unsafe.Pointer(&newState))
atomic.StoreUint64(&s.state, newValue)
}
// Available returns whether the store is available at the given timestamp.
func (state storeState) Available(ts int64) bool {
if state.failedAttempt == 0 || state.lastFailedTime == 0 {
// return quickly on continued success.
return true
}
// the first `startBlackoutAfterAttempt` failures can retry immediately.
if state.failedAttempt < startBlackoutAfterAttempt {
return true
}
// consecutive failures beyond `startBlackoutAfterAttempt` trigger the store blackout logic:
// check the blackout time window to determine whether the store is reachable.
if state.failedAttempt > startBlackoutAfterAttempt+maxExponentAttempt {
state.failedAttempt = startBlackoutAfterAttempt + maxExponentAttempt
}
blackoutDeadline := int64(state.lastFailedTime) + 1*int64(backoffutils.ExponentBase2(uint(state.failedAttempt-startBlackoutAfterAttempt+1)))
return blackoutDeadline <= ts
}
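A worked example of the blackout window the removed Available logic computed, assuming backoffutils.ExponentBase2(n) returns 2^(n-1): below 20 consecutive failures the store retries immediately; from the 20th onward the blackout doubles each attempt, capped after maxExponentAttempt doublings.

package main

import "fmt"

const (
	startBlackoutAfterAttempt = 20
	maxExponentAttempt        = 10
)

func blackoutSeconds(failedAttempt uint16) int64 {
	if failedAttempt < startBlackoutAfterAttempt {
		return 0 // still within the free-retry budget
	}
	if failedAttempt > startBlackoutAfterAttempt+maxExponentAttempt {
		failedAttempt = startBlackoutAfterAttempt + maxExponentAttempt
	}
	n := uint(failedAttempt - startBlackoutAfterAttempt + 1)
	return int64(1) << (n - 1) // ExponentBase2(n) under the stated assumption
}

func main() {
	fmt.Println(blackoutSeconds(19)) // 0: retry immediately
	fmt.Println(blackoutSeconds(20)) // 1 second
	fmt.Println(blackoutSeconds(25)) // 32 seconds
	fmt.Println(blackoutSeconds(99)) // 1024 seconds: capped
}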
// markAccess marks the processing result.
func (s *Store) markAccess(notifyCheckCh chan struct{}, success bool) {
retry:
var triggerCheck bool
oldState := s.getState()
if (oldState.failedAttempt == 0 && success) || (!success && oldState.failedAttempt >= (startBlackoutAfterAttempt+maxExponentAttempt)) {
// return quickly on continued success, and stop marking once the attempt count hits the max bound.
return
}
state := oldState
if !success {
if state.lastFailedTime == 0 {
state.lastFailedTime = uint32(time.Now().Unix())
}
state.failedAttempt = state.failedAttempt + 1
if state.resolveState == resolved {
state.resolveState = needCheck
triggerCheck = true
}
} else {
state.lastFailedTime = 0
state.failedAttempt = 0
}
if !s.compareAndSwapState(oldState, state) {
goto retry
}
if triggerCheck {
select {
case notifyCheckCh <- struct{}{}:
default:
}
}
func (s *Store) compareAndSwapState(oldState, newState resolveState) bool {
return atomic.CompareAndSwapUint64(&s.state, uint64(oldState), uint64(newState))
}
// markNeedCheck marks a resolved store for async re-resolution to check for a store addr change.
func (s *Store) markNeedCheck(notifyCheckCh chan struct{}) {
retry:
oldState := s.getState()
if oldState.resolveState != resolved {
oldState := s.getResolveState()
if oldState != resolved {
return
}
state := oldState
state.resolveState = needCheck
if !s.compareAndSwapState(oldState, state) {
if !s.compareAndSwapState(oldState, needCheck) {
goto retry
}
select {

store/tikv/region_cache_test.go

@ -15,7 +15,6 @@ package tikv
import (
"context"
"errors"
"fmt"
"testing"
"time"
@ -63,58 +62,32 @@ func (s *testRegionCacheSuite) storeAddr(id uint64) string {
func (s *testRegionCacheSuite) checkCache(c *C, len int) {
ts := time.Now().Unix()
c.Assert(workableRegions(s.cache, s.cache.mu.regions, ts), Equals, len)
c.Assert(workableRegionsInBtree(s.cache, s.cache.mu.sorted, ts), Equals, len)
for _, r := range s.cache.mu.regions {
if r.checkRegionCacheTTL(ts) {
bo := NewBackoffer(context.Background(), 100)
if store, _, _, _ := s.cache.routeStoreInRegion(bo, r, ts); store != nil {
c.Assert(r, DeepEquals, s.cache.searchCachedRegion(r.StartKey(), false))
}
}
}
c.Assert(validRegions(s.cache.mu.regions, ts), Equals, len)
c.Assert(validRegionsInBtree(s.cache.mu.sorted, ts), Equals, len)
}
func workableRegions(cache *RegionCache, regions map[RegionVerID]*Region, ts int64) (len int) {
func validRegions(regions map[RegionVerID]*Region, ts int64) (len int) {
for _, region := range regions {
if !region.checkRegionCacheTTL(ts) {
continue
}
bo := NewBackoffer(context.Background(), 100)
store, _, _, _ := cache.routeStoreInRegion(bo, region, ts)
if store != nil {
len++
}
len++
}
return
}
func workableRegionsInBtree(cache *RegionCache, t *btree.BTree, ts int64) (len int) {
func validRegionsInBtree(t *btree.BTree, ts int64) (len int) {
t.Descend(func(item btree.Item) bool {
r := item.(*btreeItem).cachedRegion
if !r.checkRegionCacheTTL(ts) {
return true
}
bo := NewBackoffer(context.Background(), 100)
store, _, _, _ := cache.routeStoreInRegion(bo, r, ts)
if store != nil {
len++
}
len++
return true
})
return
}
func reachableStore(stores map[uint64]*Store, ts int64) (cnt int) {
for _, store := range stores {
state := store.getState()
if state.Available(ts) {
cnt++
}
}
return
}
func (s *testRegionCacheSuite) getRegion(c *C, key []byte) *Region {
_, err := s.cache.LocateKey(s.bo, key)
c.Assert(err, IsNil)
@ -182,7 +155,7 @@ func (s *testRegionCacheSuite) TestUpdateLeader(c *C) {
loc, err := s.cache.LocateKey(s.bo, []byte("a"))
c.Assert(err, IsNil)
// tikv-server reports `NotLeader`
s.cache.UpdateLeader(loc.Region, s.store2)
s.cache.UpdateLeader(loc.Region, s.store2, 0)
r := s.getRegion(c, []byte("a"))
c.Assert(r, NotNil)
@ -204,7 +177,7 @@ func (s *testRegionCacheSuite) TestUpdateLeader2(c *C) {
s.cluster.AddStore(store3, s.storeAddr(store3))
s.cluster.AddPeer(s.region1, store3, peer3)
// tikv-server reports `NotLeader`
s.cache.UpdateLeader(loc.Region, store3)
s.cache.UpdateLeader(loc.Region, store3, 0)
// Store3 does not exist in cache, causes a reload from PD.
r := s.getRegion(c, []byte("a"))
@ -215,7 +188,7 @@ func (s *testRegionCacheSuite) TestUpdateLeader2(c *C) {
// tikv-server notifies new leader to pd-server.
s.cluster.ChangeLeader(s.region1, peer3)
// tikv-server reports `NotLeader` again.
s.cache.UpdateLeader(r.VerID(), store3)
s.cache.UpdateLeader(r.VerID(), store3, 0)
r = s.getRegion(c, []byte("a"))
c.Assert(r, NotNil)
c.Assert(r.GetID(), Equals, s.region1)
@ -236,7 +209,7 @@ func (s *testRegionCacheSuite) TestUpdateLeader3(c *C) {
// tikv-server notifies new leader to pd-server.
s.cluster.ChangeLeader(s.region1, peer3)
// tikv-server reports `NotLeader`(store2 is the leader)
s.cache.UpdateLeader(loc.Region, s.store2)
s.cache.UpdateLeader(loc.Region, s.store2, 0)
// Store2 does not exist any more, causes a reload from PD.
r := s.getRegion(c, []byte("a"))
@ -250,6 +223,104 @@ func (s *testRegionCacheSuite) TestUpdateLeader3(c *C) {
c.Assert(s.getAddr(c, []byte("a")), Equals, s.storeAddr(store3))
}
func (s *testRegionCacheSuite) TestSendFailedButLeaderNotChange(c *C) {
// 3 nodes, and No.1 is the leader.
store3 := s.cluster.AllocID()
peer3 := s.cluster.AllocID()
s.cluster.AddStore(store3, s.storeAddr(store3))
s.cluster.AddPeer(s.region1, store3, peer3)
s.cluster.ChangeLeader(s.region1, s.peer1)
loc, err := s.cache.LocateKey(s.bo, []byte("a"))
c.Assert(err, IsNil)
ctx, err := s.cache.GetRPCContext(s.bo, loc.Region)
c.Assert(err, IsNil)
c.Assert(ctx.Peer.Id, Equals, s.peer1)
c.Assert(len(ctx.Meta.Peers), Equals, 3)
// the send fails, so the work peer switches to 2
s.cache.OnSendFail(s.bo, ctx, false)
ctx, err = s.cache.GetRPCContext(s.bo, loc.Region)
c.Assert(err, IsNil)
c.Assert(ctx.Peer.Id, Equals, s.peer2)
// accessing 1 returns NotLeader pointing at 2, so the leader goes back to 2 again
s.cache.UpdateLeader(loc.Region, s.store2, ctx.PeerIdx)
ctx, err = s.cache.GetRPCContext(s.bo, loc.Region)
c.Assert(err, IsNil)
c.Assert(ctx.Peer.Id, Equals, s.peer2)
}
func (s *testRegionCacheSuite) TestSendFailedInHibernateRegion(c *C) {
// 3 nodes, and No.1 is the leader.
store3 := s.cluster.AllocID()
peer3 := s.cluster.AllocID()
s.cluster.AddStore(store3, s.storeAddr(store3))
s.cluster.AddPeer(s.region1, store3, peer3)
s.cluster.ChangeLeader(s.region1, s.peer1)
loc, err := s.cache.LocateKey(s.bo, []byte("a"))
c.Assert(err, IsNil)
ctx, err := s.cache.GetRPCContext(s.bo, loc.Region)
c.Assert(err, IsNil)
c.Assert(ctx.Peer.Id, Equals, s.peer1)
c.Assert(len(ctx.Meta.Peers), Equals, 3)
// the send fails, so the work peer switches to 2
s.cache.OnSendFail(s.bo, ctx, false)
ctx, err = s.cache.GetRPCContext(s.bo, loc.Region)
c.Assert(err, IsNil)
c.Assert(ctx.Peer.Id, Equals, s.peer2)
// access 2: it's hibernating and returns leader 0, so switch to 3
s.cache.UpdateLeader(loc.Region, 0, ctx.PeerIdx)
ctx, err = s.cache.GetRPCContext(s.bo, loc.Region)
c.Assert(err, IsNil)
c.Assert(ctx.Peer.Id, Equals, peer3)
// once more: the peer wraps back around to 1
ctx, err = s.cache.GetRPCContext(s.bo, loc.Region)
c.Assert(err, IsNil)
s.cache.UpdateLeader(loc.Region, 0, ctx.PeerIdx)
ctx, err = s.cache.GetRPCContext(s.bo, loc.Region)
c.Assert(err, IsNil)
c.Assert(ctx.Peer.Id, Equals, s.peer1)
}
func (s *testRegionCacheSuite) TestSendFailedInMultipleNode(c *C) {
// 3 nodes, and No.1 is the leader.
store3 := s.cluster.AllocID()
peer3 := s.cluster.AllocID()
s.cluster.AddStore(store3, s.storeAddr(store3))
s.cluster.AddPeer(s.region1, store3, peer3)
s.cluster.ChangeLeader(s.region1, s.peer1)
loc, err := s.cache.LocateKey(s.bo, []byte("a"))
c.Assert(err, IsNil)
ctx, err := s.cache.GetRPCContext(s.bo, loc.Region)
c.Assert(err, IsNil)
c.Assert(ctx.Peer.Id, Equals, s.peer1)
c.Assert(len(ctx.Meta.Peers), Equals, 3)
// the send fails, so the work peer switches to 2
s.cache.OnSendFail(s.bo, ctx, false)
ctx, err = s.cache.GetRPCContext(s.bo, loc.Region)
c.Assert(err, IsNil)
c.Assert(ctx.Peer.Id, Equals, s.peer2)
// sending to 2 fails, so the work peer switches to 3
s.cache.OnSendFail(s.bo, ctx, false)
ctx, err = s.cache.GetRPCContext(s.bo, loc.Region)
c.Assert(err, IsNil)
c.Assert(ctx.Peer.Id, Equals, peer3)
// 3 is accessible and reports store 1 as the leader, so switch to 1
s.cache.UpdateLeader(loc.Region, s.store1, ctx.PeerIdx)
ctx, err = s.cache.GetRPCContext(s.bo, loc.Region)
c.Assert(err, IsNil)
c.Assert(ctx.Peer.Id, Equals, s.peer1)
}
func (s *testRegionCacheSuite) TestSplit(c *C) {
r := s.getRegion(c, []byte("x"))
c.Assert(r.GetID(), Equals, s.region1)
@ -311,86 +382,6 @@ func (s *testRegionCacheSuite) TestReconnect(c *C) {
s.checkCache(c, 1)
}
func (s *testRegionCacheSuite) TestSendFailBlackout(c *C) {
ts := time.Now().Unix()
region := s.getRegion(c, []byte("a"))
// init with 1 region 2 stores
c.Assert(workableRegions(s.cache, s.cache.mu.regions, ts), Equals, 1)
c.Assert(reachableStore(s.cache.storeMu.stores, ts), Equals, 2)
// each store has 20 chances to retry, so after 38 failures there is still a chance to access each store
for i := 0; i < 38; i++ {
ctx, _ := s.cache.GetRPCContext(s.bo, region.VerID())
if ctx == nil {
fmt.Println()
}
s.cache.OnSendRequestFail(ctx, errors.New("test error"))
}
ts = time.Now().Unix()
c.Assert(workableRegions(s.cache, s.cache.mu.regions, ts), Equals, 1)
c.Assert(reachableStore(s.cache.storeMu.stores, ts), Equals, 2)
// the 21st failed attempt blacks out the store for 1 second
for i := 0; i < 2; i++ {
// the first failed request increments the 1st store's failedAttempt by 1
ctx, _ := s.cache.GetRPCContext(s.bo, region.VerID())
s.cache.OnSendRequestFail(ctx, errors.New("test error"))
}
c.Assert(workableRegions(s.cache, s.cache.mu.regions, ts), Equals, 0)
c.Assert(reachableStore(s.cache.storeMu.stores, ts), Equals, 0)
// after the 1-second blackout, both stores can be accessed again.
time.Sleep(1 * time.Second)
ts = time.Now().Unix()
s.getRegion(c, []byte("a"))
c.Assert(workableRegions(s.cache, s.cache.mu.regions, ts), Equals, 1)
c.Assert(reachableStore(s.cache.storeMu.stores, ts), Equals, 2)
}
func (s *testRegionCacheSuite) TestSendFailBlackTwoRegion(c *C) {
ts := time.Now().Unix()
// key range: ['' - 'm' - 'z']
region2 := s.cluster.AllocID()
newPeers := s.cluster.AllocIDs(2)
s.cluster.Split(s.region1, region2, []byte("m"), newPeers, newPeers[0])
// Check the two regions.
loc1, err := s.cache.LocateKey(s.bo, []byte("a"))
c.Assert(err, IsNil)
c.Assert(loc1.Region.id, Equals, s.region1)
loc2, err := s.cache.LocateKey(s.bo, []byte("x"))
c.Assert(err, IsNil)
c.Assert(loc2.Region.id, Equals, region2)
c.Assert(reachableStore(s.cache.storeMu.stores, ts), Equals, 2)
s.checkCache(c, 2)
// requests fail in 2 regions backed by the same 2 stores.
for i := 0; i < startBlackoutAfterAttempt; i++ {
ctx, _ := s.cache.GetRPCContext(s.bo, loc1.Region)
s.cache.OnSendRequestFail(ctx, errors.New("test error"))
}
for i := 0; i < startBlackoutAfterAttempt; i++ {
ctx, _ := s.cache.GetRPCContext(s.bo, loc2.Region)
s.cache.OnSendRequestFail(ctx, errors.New("test error"))
}
// both regions are invalidated and both stores are unavailable.
c.Assert(reachableStore(s.cache.storeMu.stores, ts), Equals, 0)
c.Assert(workableRegions(s.cache, s.cache.mu.regions, ts), Equals, 0)
// after sleeping 1 second, the regions recover
time.Sleep(1 * time.Second)
ts = time.Now().Unix()
c.Assert(reachableStore(s.cache.storeMu.stores, ts), Equals, 2)
s.getRegion(c, []byte("a"))
s.getRegion(c, []byte("x"))
c.Assert(workableRegions(s.cache, s.cache.mu.regions, ts), Equals, 2)
c.Assert(workableRegions(s.cache, s.cache.mu.regions, ts), Equals, 2)
}
func (s *testRegionCacheSuite) TestRegionEpochAheadOfTiKV(c *C) {
// Create a separated region cache to do this test.
pdCli := &codecPDClient{mocktikv.NewPDClient(s.cluster)}
@ -414,31 +405,6 @@ func (s *testRegionCacheSuite) TestRegionEpochAheadOfTiKV(c *C) {
c.Assert(len(bo.errors), Equals, 2)
}
func (s *testRegionCacheSuite) TestDropStoreOnSendRequestFail(c *C) {
ts := time.Now().Unix()
regionCnt, storeCount := 8, 3
cluster := createClusterWithStoresAndRegions(regionCnt, storeCount)
cache := NewRegionCache(mocktikv.NewPDClient(cluster))
defer cache.Close()
loadRegionsToCache(cache, regionCnt)
c.Assert(workableRegions(cache, cache.mu.regions, ts), Equals, regionCnt)
bo := NewBackoffer(context.Background(), 1)
loc, err := cache.LocateKey(bo, []byte{})
c.Assert(err, IsNil)
// failing on one region makes all stores unavailable.
for j := 0; j < 20; j++ {
for i := 0; i < storeCount; i++ {
rpcCtx, err := cache.GetRPCContext(bo, loc.Region)
c.Assert(err, IsNil)
cache.OnSendRequestFail(rpcCtx, errors.New("test error"))
}
}
c.Assert(workableRegions(cache, cache.mu.regions, ts), Equals, 0)
}
const regionSplitKeyFormat = "t%08d"
func createClusterWithStoresAndRegions(regionCnt, storeCount int) *mocktikv.Cluster {
@ -563,16 +529,20 @@ func BenchmarkOnRequestFail(b *testing.B) {
region := cache.getRegionByIDFromCache(loc.Region.id)
b.ResetTimer()
regionStore := region.getStore()
store, peer, _ := region.WorkStorePeer(regionStore)
store, peer, idx := region.WorkStorePeer(regionStore)
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
rpcCtx := &RPCContext{
Region: loc.Region,
Meta: region.meta,
Peer: peer,
Store: store,
Region: loc.Region,
Meta: region.meta,
PeerIdx: idx,
Peer: peer,
Store: store,
}
r := cache.getCachedRegionWithRLock(rpcCtx.Region)
if r != nil {
cache.switchNextPeer(r, rpcCtx.PeerIdx)
}
cache.OnSendRequestFail(rpcCtx, nil)
}
})
if len(cache.mu.regions) != regionCnt*2/3 {

store/tikv/region_request.go

@ -52,10 +52,11 @@ var ShuttingDown uint32
// errors: since the region range has changed, the request may need to be split, so we
// simply return the error to the caller.
type RegionRequestSender struct {
regionCache *RegionCache
client Client
storeAddr string
rpcError error
regionCache *RegionCache
client Client
storeAddr string
rpcError error
failStoreIDs map[uint64]struct{}
}
// NewRegionRequestSender creates a new sender.
@ -149,15 +150,9 @@ func (s *RegionRequestSender) sendReqToRegion(bo *Backoffer, ctx *RPCContext, re
}
return nil, true, nil
}
s.onSendSuccess(ctx)
return
}
func (s *RegionRequestSender) onSendSuccess(ctx *RPCContext) {
store := s.regionCache.getStoreByStoreID(ctx.Store.storeID)
store.markAccess(s.regionCache.notifyCheckCh, true)
}
func (s *RegionRequestSender) onSendFail(bo *Backoffer, ctx *RPCContext, err error) error {
// If it failed because the context is cancelled by ourself, don't retry.
if errors.Cause(err) == context.Canceled {
@ -177,7 +172,7 @@ func (s *RegionRequestSender) onSendFail(bo *Backoffer, ctx *RPCContext, err err
}
}
s.regionCache.OnSendRequestFail(ctx, err)
s.regionCache.OnSendFail(bo, ctx, s.needReloadRegion(ctx))
// Retry on send request failure when it's not canceled.
// When a store is not available, the leader of related region should be elected quickly.
@ -187,6 +182,19 @@ func (s *RegionRequestSender) onSendFail(bo *Backoffer, ctx *RPCContext, err err
return errors.Trace(err)
}
// needReloadRegion checks whether sending to all peers has failed; if so, the region needs a reload.
func (s *RegionRequestSender) needReloadRegion(ctx *RPCContext) (need bool) {
if s.failStoreIDs == nil {
s.failStoreIDs = make(map[uint64]struct{})
}
s.failStoreIDs[ctx.Store.storeID] = struct{}{}
need = len(s.failStoreIDs) == len(ctx.Meta.Peers)
if need {
s.failStoreIDs = nil
}
return
}
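A minimal sketch (field names taken from the diff, the rest assumed) of this sender-side bookkeeping: each failed store ID is recorded, and only when every peer's store has failed does the sender ask the cache to reload the region from PD.

package main

import "fmt"

type sender struct{ failStoreIDs map[uint64]struct{} }

func (s *sender) needReloadRegion(storeID uint64, peerCount int) bool {
	if s.failStoreIDs == nil {
		s.failStoreIDs = make(map[uint64]struct{})
	}
	s.failStoreIDs[storeID] = struct{}{}
	need := len(s.failStoreIDs) == peerCount
	if need {
		s.failStoreIDs = nil // reset after scheduling a reload
	}
	return need
}

func main() {
	s := &sender{}
	fmt.Println(s.needReloadRegion(1, 3)) // false: 1 of 3 stores failed
	fmt.Println(s.needReloadRegion(2, 3)) // false: 2 of 3
	fmt.Println(s.needReloadRegion(3, 3)) // true: all peers failed, reload
}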
func regionErrorToLabel(e *errorpb.Error) string {
if e.GetNotLeader() != nil {
return "not_leader"
@ -213,7 +221,7 @@ func (s *RegionRequestSender) onRegionError(bo *Backoffer, ctx *RPCContext, regi
logutil.Logger(context.Background()).Debug("tikv reports `NotLeader` retry later",
zap.String("notLeader", notLeader.String()),
zap.String("ctx", ctx.String()))
s.regionCache.UpdateLeader(ctx.Region, notLeader.GetLeader().GetStoreId())
s.regionCache.UpdateLeader(ctx.Region, notLeader.GetLeader().GetStoreId(), ctx.PeerIdx)
var boType backoffType
if notLeader.GetLeader() != nil {

store/tikv/region_request_test.go

@ -15,6 +15,7 @@ package tikv
import (
"context"
"fmt"
"net"
"sync"
"time"
@ -91,6 +92,44 @@ func (s *testRegionRequestSuite) TestOnSendFailedWithStoreRestart(c *C) {
c.Assert(resp.RawPut, NotNil)
}
func (s *testRegionRequestSuite) TestOnSendFailedWithCloseKnownStoreThenUseNewOne(c *C) {
req := &tikvrpc.Request{
Type: tikvrpc.CmdRawPut,
RawPut: &kvrpcpb.RawPutRequest{
Key: []byte("key"),
Value: []byte("value"),
},
}
region, err := s.cache.LocateRegionByID(s.bo, s.region)
c.Assert(err, IsNil)
c.Assert(region, NotNil)
resp, err := s.regionRequestSender.SendReq(s.bo, req, region.Region, time.Second)
c.Assert(err, IsNil)
c.Assert(resp.RawPut, NotNil)
// add a new peer on a store that is unknown to the cache
store2 := s.cluster.AllocID()
peer2 := s.cluster.AllocID()
s.cluster.AddStore(store2, fmt.Sprintf("store%d", store2))
s.cluster.AddPeer(region.Region.id, store2, peer2)
// stop the known store
s.cluster.StopStore(s.store)
// send to failed store
resp, err = s.regionRequestSender.SendReq(NewBackoffer(context.Background(), 100), req, region.Region, time.Second)
c.Assert(err, NotNil)
// retry sending using the old region info
region, err = s.cache.LocateRegionByID(s.bo, s.region)
c.Assert(region, NotNil)
c.Assert(err, IsNil)
// retry again, reload region info and send to new store.
resp, err = s.regionRequestSender.SendReq(NewBackoffer(context.Background(), 100), req, region.Region, time.Second)
c.Assert(err, NotNil)
}
func (s *testRegionRequestSuite) TestSendReqCtx(c *C) {
req := &tikvrpc.Request{
Type: tikvrpc.CmdRawPut,