store/tikv: support batch load regions with key range (#14716)
This commit is contained in:
@ -38,6 +38,7 @@ import (
|
||||
const (
|
||||
btreeDegree = 32
|
||||
invalidatedLastAccessTime = -1
|
||||
defaultRegionsPerBatch = 128
|
||||
)
|
||||
|
||||
// RegionCacheTTLSec is the max idle time for regions in the region cache.
|
||||
@ -590,36 +591,38 @@ func (c *RegionCache) ListRegionIDsInKeyRange(bo *Backoffer, startKey, endKey []
|
||||
return regionIDs, nil
|
||||
}
|
||||
|
||||
// LoadRegionsInKeyRange lists ids of regions in [start_key,end_key].
|
||||
// LoadRegionsInKeyRange lists regions in [start_key,end_key].
|
||||
func (c *RegionCache) LoadRegionsInKeyRange(bo *Backoffer, startKey, endKey []byte) (regions []*Region, err error) {
|
||||
var batchRegions []*Region
|
||||
for {
|
||||
curRegion, err := c.loadRegion(bo, startKey, false)
|
||||
batchRegions, err = c.BatchLoadRegionsWithKeyRange(bo, startKey, endKey, defaultRegionsPerBatch)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
c.mu.Lock()
|
||||
c.insertRegionToCache(curRegion)
|
||||
c.mu.Unlock()
|
||||
|
||||
regions = append(regions, curRegion)
|
||||
if curRegion.Contains(endKey) {
|
||||
if len(batchRegions) == 0 {
|
||||
// should never happen
|
||||
break
|
||||
}
|
||||
startKey = curRegion.EndKey()
|
||||
regions = append(regions, batchRegions...)
|
||||
endRegion := batchRegions[len(batchRegions)-1]
|
||||
if endRegion.Contains(endKey) {
|
||||
break
|
||||
}
|
||||
startKey = endRegion.EndKey()
|
||||
}
|
||||
return regions, nil
|
||||
return
|
||||
}
|
||||
|
||||
// BatchLoadRegionsFromKey loads at most given numbers of regions to the RegionCache, from the given startKey. Returns
|
||||
// the endKey of the last loaded region. If some of the regions has no leader, their entries in RegionCache will not be
|
||||
// updated.
|
||||
func (c *RegionCache) BatchLoadRegionsFromKey(bo *Backoffer, startKey []byte, count int) ([]byte, error) {
|
||||
regions, err := c.scanRegions(bo, startKey, count)
|
||||
// BatchLoadRegionsWithKeyRange loads at most given numbers of regions to the RegionCache,
|
||||
// within the given key range from the startKey to endKey. Returns the loaded regions.
|
||||
func (c *RegionCache) BatchLoadRegionsWithKeyRange(bo *Backoffer, startKey []byte, endKey []byte, count int) (regions []*Region, err error) {
|
||||
regions, err = c.scanRegions(bo, startKey, endKey, count)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
return
|
||||
}
|
||||
if len(regions) == 0 {
|
||||
return nil, errors.New("PD returned no region")
|
||||
err = errors.New("PD returned no region")
|
||||
return
|
||||
}
|
||||
|
||||
c.mu.Lock()
|
||||
@ -629,6 +632,17 @@ func (c *RegionCache) BatchLoadRegionsFromKey(bo *Backoffer, startKey []byte, co
|
||||
c.insertRegionToCache(region)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// BatchLoadRegionsFromKey loads at most given numbers of regions to the RegionCache, from the given startKey. Returns
|
||||
// the endKey of the last loaded region. If some of the regions has no leader, their entries in RegionCache will not be
|
||||
// updated.
|
||||
func (c *RegionCache) BatchLoadRegionsFromKey(bo *Backoffer, startKey []byte, count int) ([]byte, error) {
|
||||
regions, err := c.BatchLoadRegionsWithKeyRange(bo, startKey, nil, count)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
return regions[len(regions)-1].EndKey(), nil
|
||||
}
|
||||
|
||||
@ -813,7 +827,7 @@ func (c *RegionCache) loadRegionByID(bo *Backoffer, regionID uint64) (*Region, e
|
||||
|
||||
// scanRegions scans at most `limit` regions from PD, starts from the region containing `startKey` and in key order.
|
||||
// Regions with no leader will not be returned.
|
||||
func (c *RegionCache) scanRegions(bo *Backoffer, startKey []byte, limit int) ([]*Region, error) {
|
||||
func (c *RegionCache) scanRegions(bo *Backoffer, startKey, endKey []byte, limit int) ([]*Region, error) {
|
||||
if limit == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
@ -826,7 +840,7 @@ func (c *RegionCache) scanRegions(bo *Backoffer, startKey []byte, limit int) ([]
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
}
|
||||
metas, leaders, err := c.pdClient.ScanRegions(bo.ctx, startKey, nil, limit)
|
||||
metas, leaders, err := c.pdClient.ScanRegions(bo.ctx, startKey, endKey, limit)
|
||||
if err != nil {
|
||||
tikvRegionCacheCounterWithScanRegionsError.Inc()
|
||||
backoffErr = errors.Errorf(
|
||||
|
||||
@ -821,7 +821,7 @@ func (s *testRegionCacheSuite) TestScanRegions(c *C) {
|
||||
s.cluster.Split(regions[i], regions[i+1], []byte{'a' + byte(i)}, peers[i+1], peers[i+1][0])
|
||||
}
|
||||
|
||||
scannedRegions, err := s.cache.scanRegions(s.bo, []byte(""), 100)
|
||||
scannedRegions, err := s.cache.scanRegions(s.bo, []byte(""), nil, 100)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(len(scannedRegions), Equals, 5)
|
||||
for i := 0; i < 5; i++ {
|
||||
@ -832,7 +832,7 @@ func (s *testRegionCacheSuite) TestScanRegions(c *C) {
|
||||
c.Assert(p.Id, Equals, peers[i][0])
|
||||
}
|
||||
|
||||
scannedRegions, err = s.cache.scanRegions(s.bo, []byte("a"), 3)
|
||||
scannedRegions, err = s.cache.scanRegions(s.bo, []byte("a"), nil, 3)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(len(scannedRegions), Equals, 3)
|
||||
for i := 1; i < 4; i++ {
|
||||
@ -843,7 +843,7 @@ func (s *testRegionCacheSuite) TestScanRegions(c *C) {
|
||||
c.Assert(p.Id, Equals, peers[i][0])
|
||||
}
|
||||
|
||||
scannedRegions, err = s.cache.scanRegions(s.bo, []byte("a1"), 1)
|
||||
scannedRegions, err = s.cache.scanRegions(s.bo, []byte("a1"), nil, 1)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(len(scannedRegions), Equals, 1)
|
||||
|
||||
@ -855,7 +855,7 @@ func (s *testRegionCacheSuite) TestScanRegions(c *C) {
|
||||
// Test region with no leader
|
||||
s.cluster.GiveUpLeader(regions[1])
|
||||
s.cluster.GiveUpLeader(regions[3])
|
||||
scannedRegions, err = s.cache.scanRegions(s.bo, []byte(""), 5)
|
||||
scannedRegions, err = s.cache.scanRegions(s.bo, []byte(""), nil, 5)
|
||||
c.Assert(err, IsNil)
|
||||
for i := 0; i < 3; i++ {
|
||||
r := scannedRegions[i]
|
||||
@ -880,25 +880,35 @@ func (s *testRegionCacheSuite) TestBatchLoadRegions(c *C) {
|
||||
s.cluster.Split(regions[i], regions[i+1], []byte{'a' + byte(i)}, peers[i+1], peers[i+1][0])
|
||||
}
|
||||
|
||||
key, err := s.cache.BatchLoadRegionsFromKey(s.bo, []byte(""), 1)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(key, DeepEquals, []byte("a"))
|
||||
testCases := []struct {
|
||||
startKey []byte
|
||||
endKey []byte
|
||||
limit int
|
||||
expectKey []byte
|
||||
expectRegions []uint64
|
||||
}{
|
||||
{[]byte(""), []byte("a"), 1, []byte("a"), []uint64{regions[0]}},
|
||||
{[]byte("a"), []byte("b1"), 2, []byte("c"), []uint64{regions[1], regions[2]}},
|
||||
{[]byte("a1"), []byte("d"), 2, []byte("c"), []uint64{regions[1], regions[2]}},
|
||||
{[]byte("c"), []byte("c1"), 2, nil, []uint64{regions[3]}},
|
||||
{[]byte("d"), nil, 2, nil, []uint64{regions[4]}},
|
||||
}
|
||||
|
||||
key, err = s.cache.BatchLoadRegionsFromKey(s.bo, []byte("a"), 2)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(key, DeepEquals, []byte("c"))
|
||||
|
||||
key, err = s.cache.BatchLoadRegionsFromKey(s.bo, []byte("a1"), 2)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(key, DeepEquals, []byte("c"))
|
||||
|
||||
key, err = s.cache.BatchLoadRegionsFromKey(s.bo, []byte("c"), 2)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(len(key), Equals, 0)
|
||||
|
||||
key, err = s.cache.BatchLoadRegionsFromKey(s.bo, []byte("d"), 2)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(len(key), Equals, 0)
|
||||
for _, tc := range testCases {
|
||||
key, err := s.cache.BatchLoadRegionsFromKey(s.bo, tc.startKey, tc.limit)
|
||||
c.Assert(err, IsNil)
|
||||
if tc.expectKey != nil {
|
||||
c.Assert(key, DeepEquals, tc.expectKey)
|
||||
} else {
|
||||
c.Assert(key, HasLen, 0)
|
||||
}
|
||||
loadRegions, err := s.cache.BatchLoadRegionsWithKeyRange(s.bo, tc.startKey, tc.endKey, tc.limit)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(loadRegions, HasLen, len(tc.expectRegions))
|
||||
for i := range loadRegions {
|
||||
c.Assert(loadRegions[i].GetID(), Equals, tc.expectRegions[i])
|
||||
}
|
||||
}
|
||||
|
||||
s.checkCache(c, len(regions))
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user