diff --git a/store/tikv/region_cache.go b/store/tikv/region_cache.go
index c06394c6b1..a73684fdf4 100644
--- a/store/tikv/region_cache.go
+++ b/store/tikv/region_cache.go
@@ -275,9 +275,10 @@ type RegionCache struct {
 	enableForwarding bool
 
 	mu struct {
-		sync.RWMutex                    // mutex protect cached region
-		regions map[RegionVerID]*Region // cached regions are organized as regionVerID to region ref mapping
-		sorted  *btree.BTree            // cache regions are organized as sorted key to region ref mapping
+		sync.RWMutex                           // mutex protect cached region
+		regions        map[RegionVerID]*Region // cached regions are organized as regionVerID to region ref mapping
+		latestVersions map[uint64]RegionVerID  // cache the map from regionID to its latest RegionVerID
+		sorted         *btree.BTree            // cache regions are organized as sorted key to region ref mapping
 	}
 	storeMu struct {
 		sync.RWMutex
@@ -299,6 +300,7 @@ func NewRegionCache(pdClient pd.Client) *RegionCache {
 		pdClient: pdClient,
 	}
 	c.mu.regions = make(map[RegionVerID]*Region)
+	c.mu.latestVersions = make(map[uint64]RegionVerID)
 	c.mu.sorted = btree.New(btreeDegree)
 	c.storeMu.stores = make(map[uint64]*Store)
 	c.notifyCheckCh = make(chan struct{}, 1)
@@ -976,6 +978,15 @@ func (c *RegionCache) UpdateLeader(regionID RegionVerID, leaderStoreID uint64, c
 	}
 }
 
+// removeVersionFromCache removes a RegionVerID from cache, tries to clean up
+// both c.mu.regions and c.mu.latestVersions. Note this function is not thread-safe.
+func (c *RegionCache) removeVersionFromCache(oldVer RegionVerID, regionID uint64) {
+	delete(c.mu.regions, oldVer)
+	if ver, ok := c.mu.latestVersions[regionID]; ok && ver.Equals(oldVer) {
+		delete(c.mu.latestVersions, regionID)
+	}
+}
+
 // insertRegionToCache tries to insert the Region to cache.
 // It should be protected by c.mu.Lock().
 func (c *RegionCache) insertRegionToCache(cachedRegion *Region) {
@@ -995,9 +1006,14 @@ func (c *RegionCache) insertRegionToCache(cachedRegion *Region) {
 		// Don't refresh TiFlash work idx for region. Otherwise, it will always goto a invalid store which
 		// is under transferring regions.
 		store.workTiFlashIdx = atomic.LoadInt32(&oldRegionStore.workTiFlashIdx)
-		delete(c.mu.regions, oldRegion.VerID())
+		c.removeVersionFromCache(oldRegion.VerID(), cachedRegion.VerID().id)
 	}
 	c.mu.regions[cachedRegion.VerID()] = cachedRegion
+	newVer := cachedRegion.VerID()
+	latest, ok := c.mu.latestVersions[cachedRegion.VerID().id]
+	if !ok || latest.GetVer() < newVer.GetVer() || latest.GetConfVer() < newVer.GetConfVer() {
+		c.mu.latestVersions[cachedRegion.VerID().id] = newVer
+	}
 }
 
 // searchCachedRegion finds a region from cache by key. Like `getCachedRegion`,
@@ -1032,34 +1048,26 @@ func (c *RegionCache) searchCachedRegion(key []byte, isEndKey bool) *Region {
 // `getCachedRegion`, it should be called with c.mu.RLock(), and the returned
 // Region should not be used after c.mu is RUnlock().
 func (c *RegionCache) getRegionByIDFromCache(regionID uint64) *Region {
-	var newestRegion *Region
 	ts := time.Now().Unix()
-	for v, r := range c.mu.regions {
-		if v.id == regionID {
-			lastAccess := atomic.LoadInt64(&r.lastAccess)
-			if ts-lastAccess > RegionCacheTTLSec {
-				continue
-			}
-			if newestRegion == nil {
-				newestRegion = r
-				continue
-			}
-			nv := newestRegion.VerID()
-			cv := r.VerID()
-			if nv.GetConfVer() < cv.GetConfVer() {
-				newestRegion = r
-				continue
-			}
-			if nv.GetVer() < cv.GetVer() {
-				newestRegion = r
-				continue
-			}
-		}
+	ver, ok := c.mu.latestVersions[regionID]
+	if !ok {
+		return nil
 	}
-	if newestRegion != nil {
-		atomic.CompareAndSwapInt64(&newestRegion.lastAccess, atomic.LoadInt64(&newestRegion.lastAccess), ts)
+	latestRegion, ok := c.mu.regions[ver]
+	if !ok {
+		// should not happen
+		logutil.BgLogger().Warn("region version not found",
+			zap.Uint64("regionID", regionID), zap.Stringer("version", &ver))
+		return nil
 	}
-	return newestRegion
+	lastAccess := atomic.LoadInt64(&latestRegion.lastAccess)
+	if ts-lastAccess > RegionCacheTTLSec {
+		return nil
+	}
+	if latestRegion != nil {
+		atomic.CompareAndSwapInt64(&latestRegion.lastAccess, atomic.LoadInt64(&latestRegion.lastAccess), ts)
+	}
+	return latestRegion
 }
 
 // TODO: revise it by get store by closure.
@@ -1570,6 +1578,11 @@ func (r *RegionVerID) String() string {
 	return fmt.Sprintf("{ region id: %v, ver: %v, confVer: %v }", r.id, r.ver, r.confVer)
 }
 
+// Equals checks whether the RegionVerID equals another one
+func (r *RegionVerID) Equals(another RegionVerID) bool {
+	return r.id == another.id && r.confVer == another.confVer && r.ver == another.ver
+}
+
 // VerID returns the Region's RegionVerID.
 func (r *Region) VerID() RegionVerID {
 	return RegionVerID{
diff --git a/store/tikv/region_cache_test.go b/store/tikv/region_cache_test.go
index 592541eb5b..efb2ae9df7 100644
--- a/store/tikv/region_cache_test.go
+++ b/store/tikv/region_cache_test.go
@@ -69,9 +69,25 @@ func (s *testRegionCacheSuite) storeAddr(id uint64) string {
 func (s *testRegionCacheSuite) checkCache(c *C, len int) {
 	ts := time.Now().Unix()
 	c.Assert(validRegions(s.cache.mu.regions, ts), Equals, len)
+	c.Assert(validRegionsSearchedByVersions(s.cache.mu.latestVersions, s.cache.mu.regions, ts), Equals, len)
 	c.Assert(validRegionsInBtree(s.cache.mu.sorted, ts), Equals, len)
 }
 
+func validRegionsSearchedByVersions(
+	versions map[uint64]RegionVerID,
+	regions map[RegionVerID]*Region,
+	ts int64,
+) (count int) {
+	for _, ver := range versions {
+		region, ok := regions[ver]
+		if !ok || !region.checkRegionCacheTTL(ts) {
+			continue
+		}
+		count++
+	}
+	return
+}
+
 func validRegions(regions map[RegionVerID]*Region, ts int64) (len int) {
 	for _, region := range regions {
 		if !region.checkRegionCacheTTL(ts) {
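
For reviewers, here is a minimal, self-contained sketch of the indexing scheme this diff introduces: a `latestVersions` map keyed by region ID points at the newest `RegionVerID`, so looking a region up by ID becomes two map reads instead of a scan over every cached `(region, version)` entry. The names `regionIndex`, `insert`, and `getByID` are hypothetical simplifications for illustration, not the actual TiDB API; TTL tracking, atomics, locking, and the btree index are deliberately left out.

```go
package main

import "fmt"

// Simplified stand-ins for the types in region_cache.go; TTL tracking,
// atomics, and locking are omitted, so this is illustrative only.
type RegionVerID struct {
	id, confVer, ver uint64
}

type Region struct {
	verID RegionVerID
}

type regionIndex struct {
	regions        map[RegionVerID]*Region // every cached (region, version) pair
	latestVersions map[uint64]RegionVerID  // region ID -> newest known version
}

// insert mirrors the update rule in insertRegionToCache: latestVersions is
// bumped when the incoming region carries a newer version or conf version.
func (c *regionIndex) insert(r *Region) {
	newVer := r.verID
	c.regions[newVer] = r
	latest, ok := c.latestVersions[newVer.id]
	if !ok || latest.ver < newVer.ver || latest.confVer < newVer.confVer {
		c.latestVersions[newVer.id] = newVer
	}
}

// getByID is the O(1) lookup that replaces the former pass over all
// cached regions in getRegionByIDFromCache.
func (c *regionIndex) getByID(regionID uint64) *Region {
	ver, ok := c.latestVersions[regionID]
	if !ok {
		return nil
	}
	return c.regions[ver] // nil if the versioned entry was removed
}

func main() {
	idx := &regionIndex{
		regions:        make(map[RegionVerID]*Region),
		latestVersions: make(map[uint64]RegionVerID),
	}
	idx.insert(&Region{verID: RegionVerID{id: 1, confVer: 1, ver: 1}})
	idx.insert(&Region{verID: RegionVerID{id: 1, confVer: 1, ver: 2}}) // e.g. a split bumped ver
	fmt.Println(idx.getByID(1).verID) // prints {1 1 2}: the newest version wins
}
```

The update condition checks `ver` and `confVer` separately because the two counters advance independently (splits/merges bump the version, membership changes bump the conf version), and either one advancing means the cached mapping is stale.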