From e66daf3e04d01dbe36644eebd6c7412884e1901d Mon Sep 17 00:00:00 2001 From: amyangfei Date: Wed, 12 Feb 2020 12:36:47 +0800 Subject: [PATCH] store/tikv: support batch load regions with key range (#14716) --- store/tikv/region_cache.go | 52 +++++++++++++++++++------------ store/tikv/region_cache_test.go | 54 +++++++++++++++++++-------------- 2 files changed, 65 insertions(+), 41 deletions(-) diff --git a/store/tikv/region_cache.go b/store/tikv/region_cache.go index 1403373fe9..e89042e6b5 100644 --- a/store/tikv/region_cache.go +++ b/store/tikv/region_cache.go @@ -38,6 +38,7 @@ import ( const ( btreeDegree = 32 invalidatedLastAccessTime = -1 + defaultRegionsPerBatch = 128 ) // RegionCacheTTLSec is the max idle time for regions in the region cache. @@ -590,36 +591,38 @@ func (c *RegionCache) ListRegionIDsInKeyRange(bo *Backoffer, startKey, endKey [] return regionIDs, nil } -// LoadRegionsInKeyRange lists ids of regions in [start_key,end_key]. +// LoadRegionsInKeyRange lists regions in [start_key,end_key]. func (c *RegionCache) LoadRegionsInKeyRange(bo *Backoffer, startKey, endKey []byte) (regions []*Region, err error) { + var batchRegions []*Region for { - curRegion, err := c.loadRegion(bo, startKey, false) + batchRegions, err = c.BatchLoadRegionsWithKeyRange(bo, startKey, endKey, defaultRegionsPerBatch) if err != nil { return nil, errors.Trace(err) } - c.mu.Lock() - c.insertRegionToCache(curRegion) - c.mu.Unlock() - - regions = append(regions, curRegion) - if curRegion.Contains(endKey) { + if len(batchRegions) == 0 { + // should never happen break } - startKey = curRegion.EndKey() + regions = append(regions, batchRegions...) + endRegion := batchRegions[len(batchRegions)-1] + if endRegion.Contains(endKey) { + break + } + startKey = endRegion.EndKey() } - return regions, nil + return } -// BatchLoadRegionsFromKey loads at most given numbers of regions to the RegionCache, from the given startKey. Returns -// the endKey of the last loaded region. If some of the regions has no leader, their entries in RegionCache will not be -// updated. -func (c *RegionCache) BatchLoadRegionsFromKey(bo *Backoffer, startKey []byte, count int) ([]byte, error) { - regions, err := c.scanRegions(bo, startKey, count) +// BatchLoadRegionsWithKeyRange loads at most given numbers of regions to the RegionCache, +// within the given key range from the startKey to endKey. Returns the loaded regions. +func (c *RegionCache) BatchLoadRegionsWithKeyRange(bo *Backoffer, startKey []byte, endKey []byte, count int) (regions []*Region, err error) { + regions, err = c.scanRegions(bo, startKey, endKey, count) if err != nil { - return nil, errors.Trace(err) + return } if len(regions) == 0 { - return nil, errors.New("PD returned no region") + err = errors.New("PD returned no region") + return } c.mu.Lock() @@ -629,6 +632,17 @@ func (c *RegionCache) BatchLoadRegionsFromKey(bo *Backoffer, startKey []byte, co c.insertRegionToCache(region) } + return +} + +// BatchLoadRegionsFromKey loads at most given numbers of regions to the RegionCache, from the given startKey. Returns +// the endKey of the last loaded region. If some of the regions has no leader, their entries in RegionCache will not be +// updated. +func (c *RegionCache) BatchLoadRegionsFromKey(bo *Backoffer, startKey []byte, count int) ([]byte, error) { + regions, err := c.BatchLoadRegionsWithKeyRange(bo, startKey, nil, count) + if err != nil { + return nil, errors.Trace(err) + } return regions[len(regions)-1].EndKey(), nil } @@ -813,7 +827,7 @@ func (c *RegionCache) loadRegionByID(bo *Backoffer, regionID uint64) (*Region, e // scanRegions scans at most `limit` regions from PD, starts from the region containing `startKey` and in key order. // Regions with no leader will not be returned. -func (c *RegionCache) scanRegions(bo *Backoffer, startKey []byte, limit int) ([]*Region, error) { +func (c *RegionCache) scanRegions(bo *Backoffer, startKey, endKey []byte, limit int) ([]*Region, error) { if limit == 0 { return nil, nil } @@ -826,7 +840,7 @@ func (c *RegionCache) scanRegions(bo *Backoffer, startKey []byte, limit int) ([] return nil, errors.Trace(err) } } - metas, leaders, err := c.pdClient.ScanRegions(bo.ctx, startKey, nil, limit) + metas, leaders, err := c.pdClient.ScanRegions(bo.ctx, startKey, endKey, limit) if err != nil { tikvRegionCacheCounterWithScanRegionsError.Inc() backoffErr = errors.Errorf( diff --git a/store/tikv/region_cache_test.go b/store/tikv/region_cache_test.go index 2ae8e51985..b39b541385 100644 --- a/store/tikv/region_cache_test.go +++ b/store/tikv/region_cache_test.go @@ -821,7 +821,7 @@ func (s *testRegionCacheSuite) TestScanRegions(c *C) { s.cluster.Split(regions[i], regions[i+1], []byte{'a' + byte(i)}, peers[i+1], peers[i+1][0]) } - scannedRegions, err := s.cache.scanRegions(s.bo, []byte(""), 100) + scannedRegions, err := s.cache.scanRegions(s.bo, []byte(""), nil, 100) c.Assert(err, IsNil) c.Assert(len(scannedRegions), Equals, 5) for i := 0; i < 5; i++ { @@ -832,7 +832,7 @@ func (s *testRegionCacheSuite) TestScanRegions(c *C) { c.Assert(p.Id, Equals, peers[i][0]) } - scannedRegions, err = s.cache.scanRegions(s.bo, []byte("a"), 3) + scannedRegions, err = s.cache.scanRegions(s.bo, []byte("a"), nil, 3) c.Assert(err, IsNil) c.Assert(len(scannedRegions), Equals, 3) for i := 1; i < 4; i++ { @@ -843,7 +843,7 @@ func (s *testRegionCacheSuite) TestScanRegions(c *C) { c.Assert(p.Id, Equals, peers[i][0]) } - scannedRegions, err = s.cache.scanRegions(s.bo, []byte("a1"), 1) + scannedRegions, err = s.cache.scanRegions(s.bo, []byte("a1"), nil, 1) c.Assert(err, IsNil) c.Assert(len(scannedRegions), Equals, 1) @@ -855,7 +855,7 @@ func (s *testRegionCacheSuite) TestScanRegions(c *C) { // Test region with no leader s.cluster.GiveUpLeader(regions[1]) s.cluster.GiveUpLeader(regions[3]) - scannedRegions, err = s.cache.scanRegions(s.bo, []byte(""), 5) + scannedRegions, err = s.cache.scanRegions(s.bo, []byte(""), nil, 5) c.Assert(err, IsNil) for i := 0; i < 3; i++ { r := scannedRegions[i] @@ -880,25 +880,35 @@ func (s *testRegionCacheSuite) TestBatchLoadRegions(c *C) { s.cluster.Split(regions[i], regions[i+1], []byte{'a' + byte(i)}, peers[i+1], peers[i+1][0]) } - key, err := s.cache.BatchLoadRegionsFromKey(s.bo, []byte(""), 1) - c.Assert(err, IsNil) - c.Assert(key, DeepEquals, []byte("a")) + testCases := []struct { + startKey []byte + endKey []byte + limit int + expectKey []byte + expectRegions []uint64 + }{ + {[]byte(""), []byte("a"), 1, []byte("a"), []uint64{regions[0]}}, + {[]byte("a"), []byte("b1"), 2, []byte("c"), []uint64{regions[1], regions[2]}}, + {[]byte("a1"), []byte("d"), 2, []byte("c"), []uint64{regions[1], regions[2]}}, + {[]byte("c"), []byte("c1"), 2, nil, []uint64{regions[3]}}, + {[]byte("d"), nil, 2, nil, []uint64{regions[4]}}, + } - key, err = s.cache.BatchLoadRegionsFromKey(s.bo, []byte("a"), 2) - c.Assert(err, IsNil) - c.Assert(key, DeepEquals, []byte("c")) - - key, err = s.cache.BatchLoadRegionsFromKey(s.bo, []byte("a1"), 2) - c.Assert(err, IsNil) - c.Assert(key, DeepEquals, []byte("c")) - - key, err = s.cache.BatchLoadRegionsFromKey(s.bo, []byte("c"), 2) - c.Assert(err, IsNil) - c.Assert(len(key), Equals, 0) - - key, err = s.cache.BatchLoadRegionsFromKey(s.bo, []byte("d"), 2) - c.Assert(err, IsNil) - c.Assert(len(key), Equals, 0) + for _, tc := range testCases { + key, err := s.cache.BatchLoadRegionsFromKey(s.bo, tc.startKey, tc.limit) + c.Assert(err, IsNil) + if tc.expectKey != nil { + c.Assert(key, DeepEquals, tc.expectKey) + } else { + c.Assert(key, HasLen, 0) + } + loadRegions, err := s.cache.BatchLoadRegionsWithKeyRange(s.bo, tc.startKey, tc.endKey, tc.limit) + c.Assert(err, IsNil) + c.Assert(loadRegions, HasLen, len(tc.expectRegions)) + for i := range loadRegions { + c.Assert(loadRegions[i].GetID(), Equals, tc.expectRegions[i]) + } + } s.checkCache(c, len(regions)) }