store-tikv: drop invalid cached region (#4506)
@@ -16,6 +16,8 @@ package tikv
 import (
 	"bytes"
 	"sync"
+	"sync/atomic"
+	"time"
 
 	log "github.com/Sirupsen/logrus"
 	"github.com/juju/errors"
@@ -26,12 +28,23 @@ import (
 	goctx "golang.org/x/net/context"
 )
 
+const (
+	rcDefaultRegionCacheTTL = time.Minute * 10
+)
+
+// CachedRegion encapsulates {Region, TTL}
+type CachedRegion struct {
+	region     *Region
+	lastAccess int64
+}
+
 // RegionCache caches Regions loaded from PD.
 type RegionCache struct {
 	pdClient pd.Client
+
 	mu struct {
 		sync.RWMutex
-		regions map[RegionVerID]*Region
+		regions map[RegionVerID]*CachedRegion
 		sorted  *llrb.LLRB
 	}
 	storeMu struct {
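Note: the new `CachedRegion` wrapper gives every cache entry a last-access timestamp that is checked lazily against a 10-minute TTL. A minimal standalone sketch of that pattern (illustrative names, not the TiDB types):

```go
// Lazy-TTL sketch: an entry records its last access as Unix seconds and is
// treated as expired once the TTL has elapsed. Illustrative only.
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

const ttl = 10 * time.Minute

type entry struct {
	value      string
	lastAccess int64 // Unix seconds, accessed atomically
}

// valid reports whether the entry was touched within the TTL window.
func (e *entry) valid() bool {
	last := atomic.LoadInt64(&e.lastAccess)
	return time.Since(time.Unix(last, 0)) < ttl
}

func main() {
	e := &entry{value: "region-meta", lastAccess: time.Now().Unix()}
	fmt.Println(e.valid()) // true: just created

	atomic.StoreInt64(&e.lastAccess, 0) // force expiry, as the new test does
	fmt.Println(e.valid())              // false: stale, would be dropped
}
```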
@@ -45,7 +58,7 @@ func NewRegionCache(pdClient pd.Client) *RegionCache {
 	c := &RegionCache{
 		pdClient: pdClient,
 	}
-	c.mu.regions = make(map[RegionVerID]*Region)
+	c.mu.regions = make(map[RegionVerID]*CachedRegion)
 	c.mu.sorted = llrb.New()
 	c.storeMu.stores = make(map[uint64]*Store)
 	return c
@@ -66,17 +79,36 @@ func (c *RPCContext) GetStoreID() uint64 {
 	return 0
 }
 
+func (c *CachedRegion) isValid() bool {
+	lastAccess := atomic.LoadInt64(&c.lastAccess)
+	lastAccessTime := time.Unix(lastAccess, 0)
+	return time.Since(lastAccessTime) < rcDefaultRegionCacheTTL
+}
+
+// GetCachedRegion returns a valid region
+func (c *RegionCache) GetCachedRegion(id RegionVerID) *Region {
+	c.mu.RLock()
+	cachedregion, ok := c.mu.regions[id]
+	c.mu.RUnlock()
+	if !ok {
+		return nil
+	}
+	if cachedregion.isValid() {
+		atomic.StoreInt64(&cachedregion.lastAccess, time.Now().Unix())
+		return cachedregion.region
+	}
+	c.DropRegion(id)
+	return nil
+}
+
 // GetRPCContext returns RPCContext for a region. If it returns nil, the region
 // must be out of date and already dropped from cache.
 func (c *RegionCache) GetRPCContext(bo *Backoffer, id RegionVerID) (*RPCContext, error) {
-	c.mu.RLock()
-	region, ok := c.mu.regions[id]
-	if !ok {
-		c.mu.RUnlock()
+	region := c.GetCachedRegion(id)
+	if region == nil {
 		return nil, nil
 	}
 	kvCtx := region.GetContext()
-	c.mu.RUnlock()
 
 	addr, err := c.GetStoreAddr(bo, kvCtx.GetPeer().GetStoreId())
 	if err != nil {
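Note: `GetCachedRegion` reads under the read lock, releases it, and only then either refreshes `lastAccess` atomically or drops the expired entry. The release-before-drop order matters because `DropRegion` takes the write lock and `sync.RWMutex` is not reentrant. A hedged sketch of that locking shape (names are illustrative):

```go
// Why the read lock must be released before dropping: taking mu.Lock()
// while the same goroutine still holds mu.RLock() would deadlock.
package main

import "sync"

type cache struct {
	mu    sync.RWMutex
	items map[int]string
}

func (c *cache) get(id int) (string, bool) {
	c.mu.RLock()
	v, ok := c.items[id]
	c.mu.RUnlock() // release before drop() below takes the write lock
	if !ok {
		return "", false
	}
	if v == "" { // stand-in for "entry expired"
		c.drop(id)
		return "", false
	}
	return v, true
}

func (c *cache) drop(id int) {
	c.mu.Lock()
	defer c.mu.Unlock()
	delete(c.items, id)
}

func main() {
	c := &cache{items: map[int]string{1: "fresh", 2: ""}}
	c.get(1) // hit
	c.get(2) // expired: dropped without deadlocking
}
```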
@@ -109,17 +141,15 @@ func (l *KeyLocation) Contains(key []byte) bool {
 
 // LocateKey searches for the region and range that the key is located.
 func (c *RegionCache) LocateKey(bo *Backoffer, key []byte) (*KeyLocation, error) {
-	c.mu.RLock()
-	if r := c.getRegionFromCache(key); r != nil {
+	r := c.getRegionFromCache(key)
+	if r != nil {
 		loc := &KeyLocation{
 			Region:   r.VerID(),
 			StartKey: r.StartKey(),
 			EndKey:   r.EndKey(),
 		}
-		c.mu.RUnlock()
 		return loc, nil
 	}
-	c.mu.RUnlock()
 
 	r, err := c.loadRegion(bo, key)
 	if err != nil {
@@ -127,8 +157,8 @@ func (c *RegionCache) LocateKey(bo *Backoffer, key []byte) (*KeyLocation, error)
 	}
 
 	c.mu.Lock()
-	defer c.mu.Unlock()
-	r = c.insertRegionToCache(r)
+	c.insertRegionToCache(r)
+	c.mu.Unlock()
 	return &KeyLocation{
 		Region:   r.VerID(),
 		StartKey: r.StartKey(),
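Note: replacing `defer c.mu.Unlock()` with an explicit unlock narrows the critical section: the `KeyLocation` result is now assembled after the write lock is released rather than held until the function returns. A tiny sketch of the pattern under assumed names:

```go
// Explicit Unlock right after the cache write keeps result construction
// outside the critical section; a defer would hold the lock until return.
package main

import (
	"fmt"
	"sync"
)

type loc struct{ start, end string }

var (
	mu    sync.Mutex
	cache = map[int]loc{}
)

func locate(id int) loc {
	l := loc{start: "a", end: "z"}
	mu.Lock()
	cache[id] = l
	mu.Unlock() // release before assembling the return value
	return loc{start: l.start, end: l.end}
}

func main() { fmt.Println(locate(1)) }
```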
@@ -138,17 +168,15 @@ func (c *RegionCache) LocateKey(bo *Backoffer, key []byte) (*KeyLocation, error)
 
 // LocateRegionByID searches for the region with ID
 func (c *RegionCache) LocateRegionByID(bo *Backoffer, regionID uint64) (*KeyLocation, error) {
-	c.mu.RLock()
-	if r := c.getRegionByIDFromCache(regionID); r != nil {
+	r := c.getRegionByIDFromCache(regionID)
+	if r != nil {
 		loc := &KeyLocation{
 			Region:   r.VerID(),
 			StartKey: r.StartKey(),
 			EndKey:   r.EndKey(),
 		}
-		c.mu.RUnlock()
 		return loc, nil
 	}
-	c.mu.RUnlock()
 
 	r, err := c.loadRegionByID(bo, regionID)
 	if err != nil {
@@ -156,8 +184,8 @@ func (c *RegionCache) LocateRegionByID(bo *Backoffer, regionID uint64) (*KeyLoca
 	}
 
 	c.mu.Lock()
-	defer c.mu.Unlock()
-	r = c.insertRegionToCache(r)
+	c.insertRegionToCache(r)
+	c.mu.Unlock()
 	return &KeyLocation{
 		Region:   r.VerID(),
 		StartKey: r.StartKey(),
@@ -209,58 +237,57 @@ func (c *RegionCache) ListRegionIDsInKeyRange(bo *Backoffer, startKey, endKey []
 func (c *RegionCache) DropRegion(id RegionVerID) {
 	c.mu.Lock()
 	defer c.mu.Unlock()
 
 	c.dropRegionFromCache(id)
 }
 
 // UpdateLeader update some region cache with newer leader info.
 func (c *RegionCache) UpdateLeader(regionID RegionVerID, leaderStoreID uint64) {
-	c.mu.Lock()
-	defer c.mu.Unlock()
-
-	r, ok := c.mu.regions[regionID]
-	if !ok {
+	r := c.GetCachedRegion(regionID)
+	if r == nil {
 		log.Debugf("regionCache: cannot find region when updating leader %d,%d", regionID, leaderStoreID)
 		return
 	}
 
 	if !r.SwitchPeer(leaderStoreID) {
 		log.Debugf("regionCache: cannot find peer when updating leader %d,%d", regionID, leaderStoreID)
-		c.dropRegionFromCache(r.VerID())
+		c.DropRegion(r.VerID())
 	}
 }
 
 func (c *RegionCache) getRegionFromCache(key []byte) *Region {
+	c.mu.RLock()
 	var r *Region
 	c.mu.sorted.DescendLessOrEqual(newRBSearchItem(key), func(item llrb.Item) bool {
 		r = item.(*llrbItem).region
 		return false
 	})
+	c.mu.RUnlock()
 	if r != nil && r.Contains(key) {
-		return r
+		return c.GetCachedRegion(r.VerID())
 	}
 	return nil
 }
 
-// insertRegionToCache tries to insert the Region to cache. If there is an old
-// Region with the same VerID, it will return the old one instead.
+// insertRegionToCache tries to insert the Region to cache.
 func (c *RegionCache) insertRegionToCache(r *Region) *Region {
-	if old, ok := c.mu.regions[r.VerID()]; ok {
-		return old
-	}
 	old := c.mu.sorted.ReplaceOrInsert(newRBItem(r))
 	if old != nil {
		delete(c.mu.regions, old.(*llrbItem).region.VerID())
 	}
-	c.mu.regions[r.VerID()] = r
+	c.mu.regions[r.VerID()] = &CachedRegion{
+		region:     r,
+		lastAccess: time.Now().Unix(),
+	}
 	return r
 }
 
 // getRegionByIDFromCache tries to get region by regionID from cache
 func (c *RegionCache) getRegionByIDFromCache(regionID uint64) *Region {
+	c.mu.RLock()
+	defer c.mu.RUnlock()
 	for v, r := range c.mu.regions {
 		if v.id == regionID {
-			return r
+			return r.region
 		}
 	}
 	return nil
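Note: `insertRegionToCache` now always overwrites the map entry with a freshly stamped `CachedRegion` instead of returning a pre-existing one, and `getRegionFromCache` funnels B-tree hits through `GetCachedRegion`, so range lookups also respect the TTL. A rough sketch of the overwrite-with-stamp idea (illustrative types, not the TiDB code):

```go
// Every insert wraps the value with a fresh lastAccess stamp and replaces
// any stale entry for the same key rather than keeping the old one.
package main

import (
	"fmt"
	"time"
)

type cached struct {
	value      string
	lastAccess int64
}

func insert(m map[int]*cached, id int, v string) {
	m[id] = &cached{value: v, lastAccess: time.Now().Unix()} // always overwrite
}

func main() {
	m := map[int]*cached{}
	insert(m, 1, "old")
	insert(m, 1, "new") // replaces, refreshing the TTL stamp
	fmt.Println(m[1].value)
}
```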
@@ -271,8 +298,8 @@ func (c *RegionCache) dropRegionFromCache(verID RegionVerID) {
 	if !ok {
 		return
 	}
-	c.mu.sorted.Delete(newRBItem(r))
-	delete(c.mu.regions, r.VerID())
+	c.mu.sorted.Delete(newRBItem(r.region))
+	delete(c.mu.regions, verID)
 }
 
 // loadRegion loads region from pd client, and picks the first peer as leader.
@@ -403,13 +430,13 @@ func (c *RegionCache) OnRequestFail(ctx *RPCContext, err error) {
 	// Switch region's leader peer to next one.
 	regionID := ctx.Region
 	c.mu.Lock()
-	if region, ok := c.mu.regions[regionID]; ok {
+	if cachedregion, ok := c.mu.regions[regionID]; ok {
+		region := cachedregion.region
 		if !region.OnRequestFail(ctx.KVCtx.GetPeer().GetStoreId()) {
 			c.dropRegionFromCache(regionID)
 		}
 	}
 	c.mu.Unlock()
 
 	// Store's meta may be out of date.
 	storeID := ctx.KVCtx.GetPeer().GetStoreId()
 	c.storeMu.Lock()
@@ -420,7 +447,7 @@ func (c *RegionCache) OnRequestFail(ctx *RPCContext, err error) {
 
 	c.mu.Lock()
 	for id, r := range c.mu.regions {
-		if r.peer.GetStoreId() == storeID {
+		if r.region.peer.GetStoreId() == storeID {
 			c.dropRegionFromCache(id)
 		}
 	}
@@ -57,7 +57,7 @@ func (s *testRegionCacheSuite) checkCache(c *C, len int) {
 	c.Assert(s.cache.mu.regions, HasLen, len)
 	c.Assert(s.cache.mu.sorted.Len(), Equals, len)
 	for _, r := range s.cache.mu.regions {
-		c.Assert(r, DeepEquals, s.cache.getRegionFromCache(r.StartKey()))
+		c.Assert(r.region, DeepEquals, s.cache.getRegionFromCache(r.region.StartKey()))
 	}
 }
 
@@ -84,6 +84,9 @@ func (s *testRegionCacheSuite) TestSimple(c *C) {
 	c.Assert(r.GetID(), Equals, s.region1)
 	c.Assert(s.getAddr(c, []byte("a")), Equals, s.storeAddr(s.store1))
 	s.checkCache(c, 1)
+	s.cache.mu.regions[r.VerID()].lastAccess = 0
+	r = s.cache.getRegionFromCache([]byte("a"))
+	c.Assert(r, IsNil)
 }
 
 func (s *testRegionCacheSuite) TestDropStore(c *C) {
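Note: the new assertions force expiry by zeroing `lastAccess`; the Unix epoch is far older than the 10-minute TTL, so the next lookup must miss and drop the entry. Roughly the same check as a plain go-test sketch (the real suite uses gocheck; names here are illustrative):

```go
// Expiry check sketch: a fresh entry is valid, a zeroed timestamp is not.
package cache

import (
	"sync/atomic"
	"testing"
	"time"
)

const ttl = 10 * time.Minute

type entry struct{ lastAccess int64 }

func (e *entry) valid() bool {
	return time.Since(time.Unix(atomic.LoadInt64(&e.lastAccess), 0)) < ttl
}

func TestExpiry(t *testing.T) {
	e := &entry{lastAccess: time.Now().Unix()}
	if !e.valid() {
		t.Fatal("fresh entry should be valid")
	}
	e.lastAccess = 0 // epoch: far older than the TTL
	if e.valid() {
		t.Fatal("zeroed lastAccess must read as expired")
	}
}
```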