From b9c3bc4e43de75ac94ef13a7839cbfe095e1f9dc Mon Sep 17 00:00:00 2001 From: atmzhou Date: Mon, 25 Sep 2017 12:59:04 +0800 Subject: [PATCH] store-tikv: drop invalid cached region (#4506) --- store/tikv/region_cache.go | 107 ++++++++++++++++++++------------ store/tikv/region_cache_test.go | 5 +- 2 files changed, 71 insertions(+), 41 deletions(-) diff --git a/store/tikv/region_cache.go b/store/tikv/region_cache.go index a168df77c8..0b01921334 100644 --- a/store/tikv/region_cache.go +++ b/store/tikv/region_cache.go @@ -16,6 +16,8 @@ package tikv import ( "bytes" "sync" + "sync/atomic" + "time" log "github.com/Sirupsen/logrus" "github.com/juju/errors" @@ -26,12 +28,23 @@ import ( goctx "golang.org/x/net/context" ) +const ( + rcDefaultRegionCacheTTL = time.Minute * 10 +) + +// CachedRegion encapsulates {Region, TTL} +type CachedRegion struct { + region *Region + lastAccess int64 +} + // RegionCache caches Regions loaded from PD. type RegionCache struct { pdClient pd.Client - mu struct { + + mu struct { sync.RWMutex - regions map[RegionVerID]*Region + regions map[RegionVerID]*CachedRegion sorted *llrb.LLRB } storeMu struct { @@ -45,7 +58,7 @@ func NewRegionCache(pdClient pd.Client) *RegionCache { c := &RegionCache{ pdClient: pdClient, } - c.mu.regions = make(map[RegionVerID]*Region) + c.mu.regions = make(map[RegionVerID]*CachedRegion) c.mu.sorted = llrb.New() c.storeMu.stores = make(map[uint64]*Store) return c @@ -66,17 +79,36 @@ func (c *RPCContext) GetStoreID() uint64 { return 0 } +func (c *CachedRegion) isValid() bool { + lastAccess := atomic.LoadInt64(&c.lastAccess) + lastAccessTime := time.Unix(lastAccess, 0) + return time.Since(lastAccessTime) < rcDefaultRegionCacheTTL +} + +// GetCachedRegion returns a valid region +func (c *RegionCache) GetCachedRegion(id RegionVerID) *Region { + c.mu.RLock() + cachedregion, ok := c.mu.regions[id] + c.mu.RUnlock() + if !ok { + return nil + } + if cachedregion.isValid() { + atomic.StoreInt64(&cachedregion.lastAccess, time.Now().Unix()) + return cachedregion.region + } + c.DropRegion(id) + return nil +} + // GetRPCContext returns RPCContext for a region. If it returns nil, the region // must be out of date and already dropped from cache. func (c *RegionCache) GetRPCContext(bo *Backoffer, id RegionVerID) (*RPCContext, error) { - c.mu.RLock() - region, ok := c.mu.regions[id] - if !ok { - c.mu.RUnlock() + region := c.GetCachedRegion(id) + if region == nil { return nil, nil } kvCtx := region.GetContext() - c.mu.RUnlock() addr, err := c.GetStoreAddr(bo, kvCtx.GetPeer().GetStoreId()) if err != nil { @@ -109,17 +141,15 @@ func (l *KeyLocation) Contains(key []byte) bool { // LocateKey searches for the region and range that the key is located. func (c *RegionCache) LocateKey(bo *Backoffer, key []byte) (*KeyLocation, error) { - c.mu.RLock() - if r := c.getRegionFromCache(key); r != nil { + r := c.getRegionFromCache(key) + if r != nil { loc := &KeyLocation{ Region: r.VerID(), StartKey: r.StartKey(), EndKey: r.EndKey(), } - c.mu.RUnlock() return loc, nil } - c.mu.RUnlock() r, err := c.loadRegion(bo, key) if err != nil { @@ -127,8 +157,8 @@ func (c *RegionCache) LocateKey(bo *Backoffer, key []byte) (*KeyLocation, error) } c.mu.Lock() - defer c.mu.Unlock() - r = c.insertRegionToCache(r) + c.insertRegionToCache(r) + c.mu.Unlock() return &KeyLocation{ Region: r.VerID(), StartKey: r.StartKey(), @@ -138,17 +168,15 @@ func (c *RegionCache) LocateKey(bo *Backoffer, key []byte) (*KeyLocation, error) // LocateRegionByID searches for the region with ID func (c *RegionCache) LocateRegionByID(bo *Backoffer, regionID uint64) (*KeyLocation, error) { - c.mu.RLock() - if r := c.getRegionByIDFromCache(regionID); r != nil { + r := c.getRegionByIDFromCache(regionID) + if r != nil { loc := &KeyLocation{ Region: r.VerID(), StartKey: r.StartKey(), EndKey: r.EndKey(), } - c.mu.RUnlock() return loc, nil } - c.mu.RUnlock() r, err := c.loadRegionByID(bo, regionID) if err != nil { @@ -156,8 +184,8 @@ func (c *RegionCache) LocateRegionByID(bo *Backoffer, regionID uint64) (*KeyLoca } c.mu.Lock() - defer c.mu.Unlock() - r = c.insertRegionToCache(r) + c.insertRegionToCache(r) + c.mu.Unlock() return &KeyLocation{ Region: r.VerID(), StartKey: r.StartKey(), @@ -209,58 +237,57 @@ func (c *RegionCache) ListRegionIDsInKeyRange(bo *Backoffer, startKey, endKey [] func (c *RegionCache) DropRegion(id RegionVerID) { c.mu.Lock() defer c.mu.Unlock() - c.dropRegionFromCache(id) } // UpdateLeader update some region cache with newer leader info. func (c *RegionCache) UpdateLeader(regionID RegionVerID, leaderStoreID uint64) { - c.mu.Lock() - defer c.mu.Unlock() - - r, ok := c.mu.regions[regionID] - if !ok { + r := c.GetCachedRegion(regionID) + if r == nil { log.Debugf("regionCache: cannot find region when updating leader %d,%d", regionID, leaderStoreID) return } if !r.SwitchPeer(leaderStoreID) { log.Debugf("regionCache: cannot find peer when updating leader %d,%d", regionID, leaderStoreID) - c.dropRegionFromCache(r.VerID()) + c.DropRegion(r.VerID()) } } func (c *RegionCache) getRegionFromCache(key []byte) *Region { + c.mu.RLock() var r *Region c.mu.sorted.DescendLessOrEqual(newRBSearchItem(key), func(item llrb.Item) bool { r = item.(*llrbItem).region return false }) + c.mu.RUnlock() if r != nil && r.Contains(key) { - return r + return c.GetCachedRegion(r.VerID()) } return nil } -// insertRegionToCache tries to insert the Region to cache. If there is an old -// Region with the same VerID, it will return the old one instead. +// insertRegionToCache tries to insert the Region to cache. func (c *RegionCache) insertRegionToCache(r *Region) *Region { - if old, ok := c.mu.regions[r.VerID()]; ok { - return old - } old := c.mu.sorted.ReplaceOrInsert(newRBItem(r)) if old != nil { delete(c.mu.regions, old.(*llrbItem).region.VerID()) } - c.mu.regions[r.VerID()] = r + c.mu.regions[r.VerID()] = &CachedRegion{ + region: r, + lastAccess: time.Now().Unix(), + } return r } // getRegionByIDFromCache tries to get region by regionID from cache func (c *RegionCache) getRegionByIDFromCache(regionID uint64) *Region { + c.mu.RLock() + defer c.mu.RUnlock() for v, r := range c.mu.regions { if v.id == regionID { - return r + return r.region } } return nil @@ -271,8 +298,8 @@ func (c *RegionCache) dropRegionFromCache(verID RegionVerID) { if !ok { return } - c.mu.sorted.Delete(newRBItem(r)) - delete(c.mu.regions, r.VerID()) + c.mu.sorted.Delete(newRBItem(r.region)) + delete(c.mu.regions, verID) } // loadRegion loads region from pd client, and picks the first peer as leader. @@ -403,13 +430,13 @@ func (c *RegionCache) OnRequestFail(ctx *RPCContext, err error) { // Switch region's leader peer to next one. regionID := ctx.Region c.mu.Lock() - if region, ok := c.mu.regions[regionID]; ok { + if cachedregion, ok := c.mu.regions[regionID]; ok { + region := cachedregion.region if !region.OnRequestFail(ctx.KVCtx.GetPeer().GetStoreId()) { c.dropRegionFromCache(regionID) } } c.mu.Unlock() - // Store's meta may be out of date. storeID := ctx.KVCtx.GetPeer().GetStoreId() c.storeMu.Lock() @@ -420,7 +447,7 @@ func (c *RegionCache) OnRequestFail(ctx *RPCContext, err error) { c.mu.Lock() for id, r := range c.mu.regions { - if r.peer.GetStoreId() == storeID { + if r.region.peer.GetStoreId() == storeID { c.dropRegionFromCache(id) } } diff --git a/store/tikv/region_cache_test.go b/store/tikv/region_cache_test.go index 54d8fe7caf..b438e06f9d 100644 --- a/store/tikv/region_cache_test.go +++ b/store/tikv/region_cache_test.go @@ -57,7 +57,7 @@ func (s *testRegionCacheSuite) checkCache(c *C, len int) { c.Assert(s.cache.mu.regions, HasLen, len) c.Assert(s.cache.mu.sorted.Len(), Equals, len) for _, r := range s.cache.mu.regions { - c.Assert(r, DeepEquals, s.cache.getRegionFromCache(r.StartKey())) + c.Assert(r.region, DeepEquals, s.cache.getRegionFromCache(r.region.StartKey())) } } @@ -84,6 +84,9 @@ func (s *testRegionCacheSuite) TestSimple(c *C) { c.Assert(r.GetID(), Equals, s.region1) c.Assert(s.getAddr(c, []byte("a")), Equals, s.storeAddr(s.store1)) s.checkCache(c, 1) + s.cache.mu.regions[r.VerID()].lastAccess = 0 + r = s.cache.getRegionFromCache([]byte("a")) + c.Assert(r, IsNil) } func (s *testRegionCacheSuite) TestDropStore(c *C) {