coprocessor: fix wrong cop task range for tiflash. (#13292)
This commit is contained in:
committed by
pingcap-github-bot
parent
5737729493
commit
f3c8abb2e2
@ -30,12 +30,15 @@ import (
|
||||
"github.com/pingcap/failpoint"
|
||||
"github.com/pingcap/kvproto/pkg/coprocessor"
|
||||
"github.com/pingcap/kvproto/pkg/kvrpcpb"
|
||||
"github.com/pingcap/tidb/distsql"
|
||||
"github.com/pingcap/tidb/kv"
|
||||
"github.com/pingcap/tidb/metrics"
|
||||
"github.com/pingcap/tidb/store/tikv/tikvrpc"
|
||||
"github.com/pingcap/tidb/tablecodec"
|
||||
"github.com/pingcap/tidb/util/execdetails"
|
||||
"github.com/pingcap/tidb/util/logutil"
|
||||
"github.com/pingcap/tidb/util/memory"
|
||||
"github.com/pingcap/tidb/util/ranger"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
@ -218,6 +221,13 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *copRanges, req *kv
|
||||
if req.Streaming {
|
||||
cmdType = tikvrpc.CmdCopStream
|
||||
}
|
||||
var tableStart, tableEnd kv.Key
|
||||
if req.StoreType == kv.TiFlash {
|
||||
tableID := tablecodec.DecodeTableID(ranges.at(0).StartKey)
|
||||
fullRange := ranger.FullIntRange(false)
|
||||
keyRange := distsql.TableRangesToKVRanges(tableID, fullRange, nil)
|
||||
tableStart, tableEnd = keyRange[0].StartKey, keyRange[0].EndKey
|
||||
}
|
||||
|
||||
var tasks []*copTask
|
||||
appendTask := func(regionWithRangeInfo *KeyLocation, ranges *copRanges) {
|
||||
@ -239,7 +249,14 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *copRanges, req *kv
|
||||
i = nextI
|
||||
}
|
||||
} else if req.StoreType == kv.TiFlash {
|
||||
fullRange := kv.KeyRange{StartKey: regionWithRangeInfo.StartKey, EndKey: regionWithRangeInfo.EndKey}
|
||||
left, right := regionWithRangeInfo.StartKey, regionWithRangeInfo.EndKey
|
||||
if bytes.Compare(tableStart, left) >= 0 {
|
||||
left = tableStart
|
||||
}
|
||||
if bytes.Compare(tableEnd, right) <= 0 || len(right) == 0 {
|
||||
right = tableEnd
|
||||
}
|
||||
fullRange := kv.KeyRange{StartKey: left, EndKey: right}
|
||||
tasks = append(tasks, &copTask{
|
||||
region: regionWithRangeInfo.Region,
|
||||
// TiFlash only support full range scan for the region, ignore the real ranges
|
||||
|
||||
@ -50,7 +50,7 @@ func (s *testCoprocessorSuite) TestBuildTasks(c *C) {
|
||||
tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "c"), flashReq)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(tasks, HasLen, 1)
|
||||
s.taskEqual(c, tasks[0], regionIDs[0], "", "g")
|
||||
s.taskEqual(c, tasks[0], regionIDs[0], "t\x80\x00\x00\x00\x00\x00\x00\x00_r\x00\x00\x00\x00\x00\x00\x00\x00", "g")
|
||||
|
||||
tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n"), req)
|
||||
c.Assert(err, IsNil)
|
||||
@ -60,7 +60,7 @@ func (s *testCoprocessorSuite) TestBuildTasks(c *C) {
|
||||
tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n"), flashReq)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(tasks, HasLen, 1)
|
||||
s.taskEqual(c, tasks[0], regionIDs[1], "g", "n")
|
||||
s.taskEqual(c, tasks[0], regionIDs[1], "t\x80\x00\x00\x00\x00\x00\x00\x00_r\x00\x00\x00\x00\x00\x00\x00\x00", "n")
|
||||
|
||||
tasks, err = buildCopTasks(bo, cache, buildCopRanges("m", "n"), req)
|
||||
c.Assert(err, IsNil)
|
||||
@ -70,7 +70,7 @@ func (s *testCoprocessorSuite) TestBuildTasks(c *C) {
|
||||
tasks, err = buildCopTasks(bo, cache, buildCopRanges("m", "n"), flashReq)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(tasks, HasLen, 1)
|
||||
s.taskEqual(c, tasks[0], regionIDs[1], "g", "n")
|
||||
s.taskEqual(c, tasks[0], regionIDs[1], "t\x80\x00\x00\x00\x00\x00\x00\x00_r\x00\x00\x00\x00\x00\x00\x00\x00", "n")
|
||||
|
||||
tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "k"), req)
|
||||
c.Assert(err, IsNil)
|
||||
@ -81,8 +81,8 @@ func (s *testCoprocessorSuite) TestBuildTasks(c *C) {
|
||||
tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "k"), flashReq)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(tasks, HasLen, 2)
|
||||
s.taskEqual(c, tasks[0], regionIDs[0], "", "g")
|
||||
s.taskEqual(c, tasks[1], regionIDs[1], "g", "n")
|
||||
s.taskEqual(c, tasks[0], regionIDs[0], "t\x80\x00\x00\x00\x00\x00\x00\x00_r\x00\x00\x00\x00\x00\x00\x00\x00", "g")
|
||||
s.taskEqual(c, tasks[1], regionIDs[1], "t\x80\x00\x00\x00\x00\x00\x00\x00_r\x00\x00\x00\x00\x00\x00\x00\x00", "n")
|
||||
|
||||
tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "x"), req)
|
||||
c.Assert(err, IsNil)
|
||||
@ -95,10 +95,10 @@ func (s *testCoprocessorSuite) TestBuildTasks(c *C) {
|
||||
tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "x"), flashReq)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(tasks, HasLen, 4)
|
||||
s.taskEqual(c, tasks[0], regionIDs[0], "", "g")
|
||||
s.taskEqual(c, tasks[1], regionIDs[1], "g", "n")
|
||||
s.taskEqual(c, tasks[2], regionIDs[2], "n", "t")
|
||||
s.taskEqual(c, tasks[3], regionIDs[3], "t", "")
|
||||
s.taskEqual(c, tasks[0], regionIDs[0], "t\x80\x00\x00\x00\x00\x00\x00\x00_r\x00\x00\x00\x00\x00\x00\x00\x00", "g")
|
||||
s.taskEqual(c, tasks[1], regionIDs[1], "t\x80\x00\x00\x00\x00\x00\x00\x00_r\x00\x00\x00\x00\x00\x00\x00\x00", "n")
|
||||
s.taskEqual(c, tasks[2], regionIDs[2], "t\x80\x00\x00\x00\x00\x00\x00\x00_r\x00\x00\x00\x00\x00\x00\x00\x00", "t")
|
||||
s.taskEqual(c, tasks[3], regionIDs[3], "t\x80\x00\x00\x00\x00\x00\x00\x00_r\x00\x00\x00\x00\x00\x00\x00\x00", "t\x80\x00\x00\x00\x00\x00\x00\x00_r\xff\xff\xff\xff\xff\xff\xff\xff\x00")
|
||||
|
||||
tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "b", "c"), req)
|
||||
c.Assert(err, IsNil)
|
||||
@ -108,7 +108,7 @@ func (s *testCoprocessorSuite) TestBuildTasks(c *C) {
|
||||
tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "b", "c"), flashReq)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(tasks, HasLen, 1)
|
||||
s.taskEqual(c, tasks[0], regionIDs[0], "", "g")
|
||||
s.taskEqual(c, tasks[0], regionIDs[0], "t\x80\x00\x00\x00\x00\x00\x00\x00_r\x00\x00\x00\x00\x00\x00\x00\x00", "g")
|
||||
|
||||
tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "e", "f"), req)
|
||||
c.Assert(err, IsNil)
|
||||
@ -118,7 +118,7 @@ func (s *testCoprocessorSuite) TestBuildTasks(c *C) {
|
||||
tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "e", "f"), flashReq)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(tasks, HasLen, 1)
|
||||
s.taskEqual(c, tasks[0], regionIDs[0], "", "g")
|
||||
s.taskEqual(c, tasks[0], regionIDs[0], "t\x80\x00\x00\x00\x00\x00\x00\x00_r\x00\x00\x00\x00\x00\x00\x00\x00", "g")
|
||||
|
||||
tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n", "o", "p"), req)
|
||||
c.Assert(err, IsNil)
|
||||
@ -129,8 +129,8 @@ func (s *testCoprocessorSuite) TestBuildTasks(c *C) {
|
||||
tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n", "o", "p"), flashReq)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(tasks, HasLen, 2)
|
||||
s.taskEqual(c, tasks[0], regionIDs[1], "g", "n")
|
||||
s.taskEqual(c, tasks[1], regionIDs[2], "n", "t")
|
||||
s.taskEqual(c, tasks[0], regionIDs[1], "t\x80\x00\x00\x00\x00\x00\x00\x00_r\x00\x00\x00\x00\x00\x00\x00\x00", "n")
|
||||
s.taskEqual(c, tasks[1], regionIDs[2], "t\x80\x00\x00\x00\x00\x00\x00\x00_r\x00\x00\x00\x00\x00\x00\x00\x00", "t")
|
||||
|
||||
tasks, err = buildCopTasks(bo, cache, buildCopRanges("h", "k", "m", "p"), req)
|
||||
c.Assert(err, IsNil)
|
||||
@ -141,8 +141,8 @@ func (s *testCoprocessorSuite) TestBuildTasks(c *C) {
|
||||
tasks, err = buildCopTasks(bo, cache, buildCopRanges("h", "k", "m", "p"), flashReq)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(tasks, HasLen, 2)
|
||||
s.taskEqual(c, tasks[0], regionIDs[1], "g", "n")
|
||||
s.taskEqual(c, tasks[1], regionIDs[2], "n", "t")
|
||||
s.taskEqual(c, tasks[0], regionIDs[1], "t\x80\x00\x00\x00\x00\x00\x00\x00_r\x00\x00\x00\x00\x00\x00\x00\x00", "n")
|
||||
s.taskEqual(c, tasks[1], regionIDs[2], "t\x80\x00\x00\x00\x00\x00\x00\x00_r\x00\x00\x00\x00\x00\x00\x00\x00", "t")
|
||||
}
|
||||
|
||||
func (s *testCoprocessorSuite) TestSplitRegionRanges(c *C) {
|
||||
|
||||
Reference in New Issue
Block a user