tikv: add id to version map to accelerate get region by id (#24403)
This commit is contained in:
@ -275,9 +275,10 @@ type RegionCache struct {
|
||||
enableForwarding bool
|
||||
|
||||
mu struct {
|
||||
sync.RWMutex // mutex protect cached region
|
||||
regions map[RegionVerID]*Region // cached regions are organized as regionVerID to region ref mapping
|
||||
sorted *btree.BTree // cache regions are organized as sorted key to region ref mapping
|
||||
sync.RWMutex // mutex protect cached region
|
||||
regions map[RegionVerID]*Region // cached regions are organized as regionVerID to region ref mapping
|
||||
latestVersions map[uint64]RegionVerID // cache the map from regionID to its latest RegionVerID
|
||||
sorted *btree.BTree // cache regions are organized as sorted key to region ref mapping
|
||||
}
|
||||
storeMu struct {
|
||||
sync.RWMutex
|
||||
@ -299,6 +300,7 @@ func NewRegionCache(pdClient pd.Client) *RegionCache {
|
||||
pdClient: pdClient,
|
||||
}
|
||||
c.mu.regions = make(map[RegionVerID]*Region)
|
||||
c.mu.latestVersions = make(map[uint64]RegionVerID)
|
||||
c.mu.sorted = btree.New(btreeDegree)
|
||||
c.storeMu.stores = make(map[uint64]*Store)
|
||||
c.notifyCheckCh = make(chan struct{}, 1)
|
||||
@ -976,6 +978,15 @@ func (c *RegionCache) UpdateLeader(regionID RegionVerID, leaderStoreID uint64, c
|
||||
}
|
||||
}
|
||||
|
||||
// removeVersionFromCache removes a RegionVerID from cache, tries to cleanup
|
||||
// both c.mu.regions and c.mu.versions. Note this function is not thread-safe.
|
||||
func (c *RegionCache) removeVersionFromCache(oldVer RegionVerID, regionID uint64) {
|
||||
delete(c.mu.regions, oldVer)
|
||||
if ver, ok := c.mu.latestVersions[regionID]; ok && ver.Equals(oldVer) {
|
||||
delete(c.mu.latestVersions, regionID)
|
||||
}
|
||||
}
|
||||
|
||||
// insertRegionToCache tries to insert the Region to cache.
|
||||
// It should be protected by c.mu.Lock().
|
||||
func (c *RegionCache) insertRegionToCache(cachedRegion *Region) {
|
||||
@ -995,9 +1006,14 @@ func (c *RegionCache) insertRegionToCache(cachedRegion *Region) {
|
||||
// Don't refresh TiFlash work idx for region. Otherwise, it will always goto a invalid store which
|
||||
// is under transferring regions.
|
||||
store.workTiFlashIdx = atomic.LoadInt32(&oldRegionStore.workTiFlashIdx)
|
||||
delete(c.mu.regions, oldRegion.VerID())
|
||||
c.removeVersionFromCache(oldRegion.VerID(), cachedRegion.VerID().id)
|
||||
}
|
||||
c.mu.regions[cachedRegion.VerID()] = cachedRegion
|
||||
newVer := cachedRegion.VerID()
|
||||
latest, ok := c.mu.latestVersions[cachedRegion.VerID().id]
|
||||
if !ok || latest.GetVer() < newVer.GetVer() || latest.GetConfVer() < newVer.GetConfVer() {
|
||||
c.mu.latestVersions[cachedRegion.VerID().id] = newVer
|
||||
}
|
||||
}
|
||||
|
||||
// searchCachedRegion finds a region from cache by key. Like `getCachedRegion`,
|
||||
@ -1032,34 +1048,26 @@ func (c *RegionCache) searchCachedRegion(key []byte, isEndKey bool) *Region {
|
||||
// `getCachedRegion`, it should be called with c.mu.RLock(), and the returned
|
||||
// Region should not be used after c.mu is RUnlock().
|
||||
func (c *RegionCache) getRegionByIDFromCache(regionID uint64) *Region {
|
||||
var newestRegion *Region
|
||||
ts := time.Now().Unix()
|
||||
for v, r := range c.mu.regions {
|
||||
if v.id == regionID {
|
||||
lastAccess := atomic.LoadInt64(&r.lastAccess)
|
||||
if ts-lastAccess > RegionCacheTTLSec {
|
||||
continue
|
||||
}
|
||||
if newestRegion == nil {
|
||||
newestRegion = r
|
||||
continue
|
||||
}
|
||||
nv := newestRegion.VerID()
|
||||
cv := r.VerID()
|
||||
if nv.GetConfVer() < cv.GetConfVer() {
|
||||
newestRegion = r
|
||||
continue
|
||||
}
|
||||
if nv.GetVer() < cv.GetVer() {
|
||||
newestRegion = r
|
||||
continue
|
||||
}
|
||||
}
|
||||
ver, ok := c.mu.latestVersions[regionID]
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
if newestRegion != nil {
|
||||
atomic.CompareAndSwapInt64(&newestRegion.lastAccess, atomic.LoadInt64(&newestRegion.lastAccess), ts)
|
||||
latestRegion, ok := c.mu.regions[ver]
|
||||
if !ok {
|
||||
// should not happen
|
||||
logutil.BgLogger().Warn("region version not found",
|
||||
zap.Uint64("regionID", regionID), zap.Stringer("version", &ver))
|
||||
return nil
|
||||
}
|
||||
return newestRegion
|
||||
lastAccess := atomic.LoadInt64(&latestRegion.lastAccess)
|
||||
if ts-lastAccess > RegionCacheTTLSec {
|
||||
return nil
|
||||
}
|
||||
if latestRegion != nil {
|
||||
atomic.CompareAndSwapInt64(&latestRegion.lastAccess, atomic.LoadInt64(&latestRegion.lastAccess), ts)
|
||||
}
|
||||
return latestRegion
|
||||
}
|
||||
|
||||
// TODO: revise it by get store by closure.
|
||||
@ -1570,6 +1578,11 @@ func (r *RegionVerID) String() string {
|
||||
return fmt.Sprintf("{ region id: %v, ver: %v, confVer: %v }", r.id, r.ver, r.confVer)
|
||||
}
|
||||
|
||||
// Equals checks whether the RegionVerID equals to another one
|
||||
func (r *RegionVerID) Equals(another RegionVerID) bool {
|
||||
return r.id == another.id && r.confVer == another.confVer && r.ver == another.ver
|
||||
}
|
||||
|
||||
// VerID returns the Region's RegionVerID.
|
||||
func (r *Region) VerID() RegionVerID {
|
||||
return RegionVerID{
|
||||
|
||||
@ -69,9 +69,25 @@ func (s *testRegionCacheSuite) storeAddr(id uint64) string {
|
||||
func (s *testRegionCacheSuite) checkCache(c *C, len int) {
|
||||
ts := time.Now().Unix()
|
||||
c.Assert(validRegions(s.cache.mu.regions, ts), Equals, len)
|
||||
c.Assert(validRegionsSearchedByVersions(s.cache.mu.latestVersions, s.cache.mu.regions, ts), Equals, len)
|
||||
c.Assert(validRegionsInBtree(s.cache.mu.sorted, ts), Equals, len)
|
||||
}
|
||||
|
||||
func validRegionsSearchedByVersions(
|
||||
versions map[uint64]RegionVerID,
|
||||
regions map[RegionVerID]*Region,
|
||||
ts int64,
|
||||
) (count int) {
|
||||
for _, ver := range versions {
|
||||
region, ok := regions[ver]
|
||||
if !ok || !region.checkRegionCacheTTL(ts) {
|
||||
continue
|
||||
}
|
||||
count++
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func validRegions(regions map[RegionVerID]*Region, ts int64) (len int) {
|
||||
for _, region := range regions {
|
||||
if !region.checkRegionCacheTTL(ts) {
|
||||
|
||||
Reference in New Issue
Block a user