store/tikv: update tests with mocktikv (#1216)

* store/tikv: update RegionCache test with mocktikv

* store/tikv: update coprocessor tests with mocktikv

* store/tikv: update scan test with mocktikv
disksing
2016-05-10 17:26:13 +08:00
parent f6e7a331d0
commit 91a2eb1135
8 changed files with 211 additions and 517 deletions
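The change follows a single pattern: instead of scripting a hand-rolled mockPDClient with setStore/setRegion calls, each test builds a mocktikv.Cluster, bootstraps it with a helper, and hands a mocktikv-backed PD client to the RegionCache. A minimal sketch of the new setup, using only identifiers that appear in this diff (the surrounding test body is illustrative):

cluster := mocktikv.NewCluster()
// One store, four regions: (nil,"g"), ["g","n"), ["n","t"), ["t",nil).
_, regionIDs := mocktikv.BootstrapWithMultiRegions(cluster, []byte("g"), []byte("n"), []byte("t"))
cache := NewRegionCache(mocktikv.NewPDClient(cluster))
// Region IDs are now allocated by the cluster, so assertions reference
// regionIDs[i] instead of hard-coded uint64 literals.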

View File

@@ -16,6 +16,7 @@ package tikv
import (
. "github.com/pingcap/check"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv/mock-tikv"
)
type testCoprocessorSuite struct{}
@@ -23,90 +24,82 @@ type testCoprocessorSuite struct{}
var _ = Suite(&testCoprocessorSuite{})
func (s *testCoprocessorSuite) TestBuildTasks(c *C) {
pd := newMockPDClient()
cache := NewRegionCache(pd)
// nil --- 'g' --- 'n' --- 't' --- nil
// <- 1 -> <- 2 -> <- 3 -> <- 4 ->
pd.setStore(100, "addr100")
pd.setRegion(1, nil, []byte("g"), []uint64{100})
pd.setRegion(2, []byte("g"), []byte("n"), []uint64{100})
pd.setRegion(3, []byte("n"), []byte("t"), []uint64{100})
pd.setRegion(4, []byte("t"), nil, []uint64{100})
// <- 0 -> <- 1 -> <- 2 -> <- 3 ->
cluster := mocktikv.NewCluster()
_, regionIDs := mocktikv.BootstrapWithMultiRegions(cluster, []byte("g"), []byte("n"), []byte("t"))
cache := NewRegionCache(mocktikv.NewPDClient(cluster))
tasks, err := buildCopTasks(cache, s.buildKeyRanges("a", "c"), false)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 1)
s.taskEqual(c, tasks[0], 1, "a", "c")
s.taskEqual(c, tasks[0], regionIDs[0], "a", "c")
tasks, err = buildCopTasks(cache, s.buildKeyRanges("g", "n"), false)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 1)
s.taskEqual(c, tasks[0], 2, "g", "n")
s.taskEqual(c, tasks[0], regionIDs[1], "g", "n")
tasks, err = buildCopTasks(cache, s.buildKeyRanges("m", "n"), false)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 1)
s.taskEqual(c, tasks[0], 2, "m", "n")
s.taskEqual(c, tasks[0], regionIDs[1], "m", "n")
tasks, err = buildCopTasks(cache, s.buildKeyRanges("a", "k"), false)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 2)
s.taskEqual(c, tasks[0], 1, "a", "g")
s.taskEqual(c, tasks[1], 2, "g", "k")
s.taskEqual(c, tasks[0], regionIDs[0], "a", "g")
s.taskEqual(c, tasks[1], regionIDs[1], "g", "k")
tasks, err = buildCopTasks(cache, s.buildKeyRanges("a", "x"), false)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 4)
s.taskEqual(c, tasks[0], 1, "a", "g")
s.taskEqual(c, tasks[1], 2, "g", "n")
s.taskEqual(c, tasks[2], 3, "n", "t")
s.taskEqual(c, tasks[3], 4, "t", "x")
s.taskEqual(c, tasks[0], regionIDs[0], "a", "g")
s.taskEqual(c, tasks[1], regionIDs[1], "g", "n")
s.taskEqual(c, tasks[2], regionIDs[2], "n", "t")
s.taskEqual(c, tasks[3], regionIDs[3], "t", "x")
tasks, err = buildCopTasks(cache, s.buildKeyRanges("a", "b", "b", "c"), false)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 1)
s.taskEqual(c, tasks[0], 1, "a", "b", "b", "c")
s.taskEqual(c, tasks[0], regionIDs[0], "a", "b", "b", "c")
tasks, err = buildCopTasks(cache, s.buildKeyRanges("a", "b", "e", "f"), false)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 1)
s.taskEqual(c, tasks[0], 1, "a", "b", "e", "f")
s.taskEqual(c, tasks[0], regionIDs[0], "a", "b", "e", "f")
tasks, err = buildCopTasks(cache, s.buildKeyRanges("g", "n", "o", "p"), false)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 2)
s.taskEqual(c, tasks[0], 2, "g", "n")
s.taskEqual(c, tasks[1], 3, "o", "p")
s.taskEqual(c, tasks[0], regionIDs[1], "g", "n")
s.taskEqual(c, tasks[1], regionIDs[2], "o", "p")
tasks, err = buildCopTasks(cache, s.buildKeyRanges("h", "k", "m", "p"), false)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 2)
s.taskEqual(c, tasks[0], 2, "h", "k", "m", "n")
s.taskEqual(c, tasks[1], 3, "n", "p")
s.taskEqual(c, tasks[0], regionIDs[1], "h", "k", "m", "n")
s.taskEqual(c, tasks[1], regionIDs[2], "n", "p")
}
func (s *testCoprocessorSuite) TestRebuild(c *C) {
pd := newMockPDClient()
cache := NewRegionCache(pd)
// nil --- 'm' --- nil
// <- 1 -> <- 2 ->
pd.setStore(100, "addr100")
pd.setRegion(1, nil, []byte("m"), []uint64{100})
pd.setRegion(2, []byte("m"), nil, []uint64{100})
// <- 0 -> <- 1 ->
cluster := mocktikv.NewCluster()
storeID, regionIDs := mocktikv.BootstrapWithMultiRegions(cluster, []byte("m"))
cache := NewRegionCache(mocktikv.NewPDClient(cluster))
tasks, err := buildCopTasks(cache, s.buildKeyRanges("a", "z"), false)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 2)
s.taskEqual(c, tasks[0], 1, "a", "m")
s.taskEqual(c, tasks[1], 2, "m", "z")
s.taskEqual(c, tasks[0], regionIDs[0], "a", "m")
s.taskEqual(c, tasks[1], regionIDs[1], "m", "z")
// nil -- 'm' -- 'q' -- nil
// <- 1 -> <--2-> <-3-->
pd.setRegion(2, []byte("m"), []byte("q"), []uint64{100})
pd.setRegion(3, []byte("q"), nil, []uint64{100})
cache.DropRegion(2)
// <- 0 -> <--1-> <-2-->
regionIDs = append(regionIDs, cluster.AllocID())
cluster.Split(regionIDs[1], regionIDs[2], []byte("q"), storeID)
cache.DropRegion(regionIDs[1])
tasks, err = buildCopTasks(cache, s.buildKeyRanges("a", "z"), true)
c.Assert(err, IsNil)
@@ -122,9 +115,9 @@ func (s *testCoprocessorSuite) TestRebuild(c *C) {
err = iter.rebuildCurrentTask(iter.tasks[0])
c.Assert(err, IsNil)
c.Assert(iter.tasks, HasLen, 3)
s.taskEqual(c, iter.tasks[2], 1, "a", "m")
s.taskEqual(c, iter.tasks[1], 2, "m", "q")
s.taskEqual(c, iter.tasks[0], 3, "q", "z")
s.taskEqual(c, iter.tasks[2], regionIDs[0], "a", "m")
s.taskEqual(c, iter.tasks[1], regionIDs[1], "m", "q")
s.taskEqual(c, iter.tasks[0], regionIDs[2], "q", "z")
tasks, err = buildCopTasks(cache, s.buildKeyRanges("a", "z"), true)
iter = &copIterator{
@@ -139,9 +132,9 @@ func (s *testCoprocessorSuite) TestRebuild(c *C) {
err = iter.rebuildCurrentTask(iter.tasks[2])
c.Assert(err, IsNil)
c.Assert(iter.tasks, HasLen, 3)
s.taskEqual(c, iter.tasks[2], 1, "a", "m")
s.taskEqual(c, iter.tasks[1], 2, "m", "q")
s.taskEqual(c, iter.tasks[0], 3, "q", "z")
s.taskEqual(c, iter.tasks[2], regionIDs[0], "a", "m")
s.taskEqual(c, iter.tasks[1], regionIDs[1], "m", "q")
s.taskEqual(c, iter.tasks[0], regionIDs[2], "q", "z")
}
func (s *testCoprocessorSuite) buildKeyRanges(keys ...string) []kv.KeyRange {

View File

@@ -1,184 +0,0 @@
// Copyright 2016 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package tikv
import (
"fmt"
"log"
"time"
"github.com/gogo/protobuf/proto"
"github.com/juju/errors"
. "github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/coprocessor"
pb "github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/store/tikv/oracle/oracles"
)
type testKvSuite struct {
pd *mockPDClient
cache *RegionCache
store *tikvStore
client *stubClient
}
var _ = Suite(&testKvSuite{})
func (s *testKvSuite) SetUpTest(c *C) {
s.pd = newMockPDClient()
s.cache = NewRegionCache(s.pd)
s.client = newStubClient(c)
s.pd.setStore(1, "addr1")
s.pd.setStore(2, "addr2")
s.pd.setRegion(3, nil, nil, []uint64{1, 2})
s.store = newMockStore(s.cache)
s.store.clients["addr1"] = s.client
s.store.clients["addr2"] = s.client
}
func (s *testKvSuite) beginTxn(c *C) *tikvTxn {
txn, err := s.store.Begin()
c.Assert(err, IsNil)
return txn.(*tikvTxn)
}
func (s *testKvSuite) TestSimpleGetOk(c *C) {
expectReq := &pb.Request{}
expectResp := makeGetResp("value")
s.client.push(expectReq, expectResp)
region, err := s.store.getRegion(nil)
c.Assert(err, IsNil)
resp, err := s.store.SendKVReq(expectReq, region)
c.Assert(expectResp, Equals, resp)
}
func (s *testKvSuite) TestGetOk(c *C) {
txn := s.beginTxn(c)
key, val := "key", "value"
region, err := s.cache.GetRegion([]byte(key))
c.Assert(err, IsNil)
c.Assert(region, NotNil)
expectReq := makeGetReq(region, key, txn.StartTS())
expectResp := makeGetResp(val)
s.client.push(expectReq, expectResp)
value, err := txn.Get([]byte(key))
c.Assert(err, IsNil)
c.Assert(value, BytesEquals, []byte(val))
}
// stubClient simulates sending Requests to TiKV and receiving Responses.
type stubClient struct {
// q is a message queue: test cases push the <req, resp> pairs they expect,
// and each call to SendKVReq/SendCopReq pops one pair.
// Multiple pairs may be pushed before a single call, e.g. Commit(), or
// Get() when the key is locked.
q *queueMsg
c *C
}
func newStubClient(c *C) *stubClient {
return &stubClient{
q: &queueMsg{
data: make([]*msgPair, 0),
idx: 0,
},
c: c,
}
}
func (cli *stubClient) push(req *pb.Request, resp *pb.Response) {
cli.q.Push(&msgPair{req: req, resp: resp})
}
func (cli *stubClient) SendKVReq(req *pb.Request) (*pb.Response, error) {
msg, err := cli.q.Pop()
if err != nil {
return nil, errors.Trace(err)
}
if !cli.c.Check(msg.req, DeepEquals, req) {
// HELP: cli.c.Assert can't fatal here.
log.Fatalf("requests don't match.\nexp[%s]\ngot[%s]", msg.req, req)
}
return msg.resp, nil
}
// TODO: implement this.
func (cli *stubClient) SendCopReq(req *coprocessor.Request) (*coprocessor.Response, error) {
return nil, nil
}
// Close clears the message queue.
func (cli *stubClient) Close() error {
cli.q = nil
return nil
}
type msgPair struct {
req *pb.Request
resp *pb.Response
}
// queueMsg is a simple queue that holds <Request, Response> pairs.
type queueMsg struct {
data []*msgPair
idx int
}
func (q *queueMsg) Push(x *msgPair) {
q.data = append(q.data, x)
}
func (q *queueMsg) Pop() (*msgPair, error) {
if q.Empty() {
return nil, errors.New("queue is empty")
}
ret := q.data[q.idx]
q.idx++
return ret, nil
}
func (q *queueMsg) Empty() bool {
return q.idx >= len(q.data)
}
func newMockStore(cache *RegionCache) *tikvStore {
return &tikvStore{
uuid: fmt.Sprintf("kv_%d", time.Now().Unix()),
oracle: oracles.NewLocalOracle(),
clients: make(map[string]Client),
regionCache: cache,
}
}
func makeGetReq(region *Region, key string, ver uint64) *pb.Request {
return &pb.Request{
Type: pb.MessageType_CmdGet.Enum(),
CmdGetReq: &pb.CmdGetRequest{
Key: []byte(key),
Version: proto.Uint64(ver),
},
Context: region.GetContext(),
}
}
func makeGetResp(value string) *pb.Response {
return &pb.Response{
Type: pb.MessageType_CmdGet.Enum(),
CmdGetResp: &pb.CmdGetResponse{
Value: []byte(value),
},
}
}

View File

@@ -187,6 +187,15 @@ func (c *Cluster) Split(regionID, newRegionID uint64, key []byte, leaderStoreID
c.regions[newRegionID] = newRegion
}
// Merge merges two Regions; their key ranges must be adjacent.
func (c *Cluster) Merge(regionID1, regionID2 uint64) {
c.mu.Lock()
defer c.mu.Unlock()
c.regions[regionID1].merge(c.regions[regionID2].meta.GetEndKey())
delete(c.regions, regionID2)
}
// Region is the Region meta data.
type Region struct {
meta *metapb.Region
@@ -233,6 +242,11 @@ func (r *Region) split(newRegionID uint64, key []byte, leaderStoreID uint64) *Re
return region
}
func (r *Region) merge(endKey []byte) {
r.meta.EndKey = endKey
r.incVersion()
}
func (r *Region) updateKeyRange(start, end []byte) {
r.meta.StartKey = start
r.meta.EndKey = end
@@ -242,13 +256,13 @@ func (r *Region) updateKeyRange(start, end []byte) {
func (r *Region) incConfVer() {
r.meta.RegionEpoch = &metapb.RegionEpoch{
ConfVer: proto.Uint64(r.meta.GetRegionEpoch().GetConfVer() + 1),
Version: r.meta.GetRegionEpoch().Version,
Version: proto.Uint64(r.meta.GetRegionEpoch().GetVersion()),
}
}
func (r *Region) incVersion() {
r.meta.RegionEpoch = &metapb.RegionEpoch{
ConfVer: r.meta.GetRegionEpoch().ConfVer,
ConfVer: proto.Uint64(r.meta.GetRegionEpoch().GetConfVer()),
Version: proto.Uint64(r.meta.GetRegionEpoch().GetVersion() + 1),
}
}
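Two details above are easy to miss. Merge is the inverse of Split: it extends regionID1's end key to regionID2's end key, drops regionID2, and bumps the Version via incVersion. And both epoch helpers now copy ConfVer/Version into fresh proto.Uint64 allocations instead of reusing the old epoch's pointers, so bumping one epoch can no longer mutate an epoch captured earlier. A hedged sketch of the Split/Merge round trip (region1 and storeID are assumed to come from a bootstrap helper, as in the tests elsewhere in this diff):

region2 := cluster.AllocID()
cluster.Split(region1, region2, []byte("m"), storeID) // region1=[..,"m"), region2=["m",..)
cluster.Merge(region1, region2) // region1 spans both ranges again; region2 is gone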

View File

@@ -16,9 +16,33 @@ package mocktikv
import "fmt"
// BootstrapWithSingleStore initializes a Cluster with 1 Region and 1 Store.
func BootstrapWithSingleStore(cluster *Cluster) {
func BootstrapWithSingleStore(cluster *Cluster) (storeID, regionID uint64) {
ids := cluster.AllocIDs(2)
storeID, regionID := ids[0], ids[1]
storeID, regionID = ids[0], ids[1]
cluster.AddStore(storeID, fmt.Sprintf("store%d", storeID))
cluster.Bootstrap(regionID, []uint64{storeID}, storeID)
return
}
// BootstrapWithMultiStores initializes a Cluster with 1 Region and n Stores.
func BootstrapWithMultiStores(cluster *Cluster, n int) (storeIDs []uint64, regionID uint64, leaderStore uint64) {
ids := cluster.AllocIDs(n + 1)
regionID, leaderStore, storeIDs = ids[0], ids[1], ids[1:]
for _, storeID := range storeIDs {
cluster.AddStore(storeID, fmt.Sprintf("store%d", storeID))
}
cluster.Bootstrap(regionID, storeIDs, leaderStore)
return
}
// BootstrapWithMultiRegions initializes a Cluster with multiple Regions and 1
// Store. The number of Regions will be len(splitKeys) + 1.
func BootstrapWithMultiRegions(cluster *Cluster, splitKeys ...[]byte) (storeID uint64, regionIDs []uint64) {
var firstRegionID uint64
storeID, firstRegionID = BootstrapWithSingleStore(cluster)
regionIDs = append([]uint64{firstRegionID}, cluster.AllocIDs(len(splitKeys))...)
for i, k := range splitKeys {
cluster.Split(regionIDs[i], regionIDs[i+1], k, storeID)
}
return
}
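For reference, a hedged usage sketch of the new helper; the key layout mirrors the diagram in the coprocessor test earlier in this diff:

cluster := mocktikv.NewCluster()
storeID, regionIDs := mocktikv.BootstrapWithMultiRegions(cluster, []byte("g"), []byte("n"), []byte("t"))
// Resulting layout, all regions on the single store storeID:
//   regionIDs[0]=(nil,"g")  regionIDs[1]=["g","n")  regionIDs[2]=["n","t")  regionIDs[3]=["t",nil)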

View File

@@ -17,6 +17,7 @@ import (
"sync"
"time"
"github.com/juju/errors"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/pd/pd-client"
)
@@ -55,11 +56,19 @@ func (c *pdClient) GetTS() (int64, int64, error) {
}
func (c *pdClient) GetRegion(key []byte) (*metapb.Region, error) {
return c.cluster.GetRegionByKey(key), nil
region := c.cluster.GetRegionByKey(key)
if region == nil {
return nil, errors.New("not found")
}
return region, nil
}
func (c *pdClient) GetStore(storeID uint64) (*metapb.Store, error) {
return c.cluster.GetStore(storeID), nil
store := c.cluster.GetStore(storeID)
if store == nil {
return nil, errors.New("not found")
}
return store, nil
}
func (c *pdClient) Close() {
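Surfacing a miss as an explicit error, rather than returning a nil region or store, is what lets tests later in this diff (e.g. TestDropStore) assert the failure directly. A hypothetical call site, using the juju/errors package already imported here (pdCli is an assumed pd.Client built via mocktikv.NewPDClient):

store, err := pdCli.GetStore(storeID)
if err != nil {
// The miss propagates immediately instead of panicking later on a nil store.
return errors.Trace(err)
}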

View File

@@ -14,67 +14,46 @@
package tikv
import (
"bytes"
"fmt"
"github.com/golang/protobuf/proto"
"github.com/juju/errors"
. "github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/store/tikv/mock-tikv"
)
type testRegionCacheSuite struct {
pd *mockPDClient
cache *RegionCache
cluster *mocktikv.Cluster
store1 uint64
store2 uint64
region1 uint64
cache *RegionCache
}
var _ = Suite(&testRegionCacheSuite{})
func (s *testRegionCacheSuite) SetUpTest(c *C) {
s.pd = newMockPDClient()
s.cache = NewRegionCache(s.pd)
s.cluster = mocktikv.NewCluster()
storeIDs, regionID, _ := mocktikv.BootstrapWithMultiStores(s.cluster, 2)
s.region1 = regionID
s.store1 = storeIDs[0]
s.store2 = storeIDs[1]
s.cache = NewRegionCache(mocktikv.NewPDClient(s.cluster))
}
s.pd.setStore(1, "addr1")
s.pd.setStore(2, "addr2")
s.pd.setRegion(3, nil, nil, []uint64{1, 2})
func (s *testRegionCacheSuite) storeAddr(id uint64) string {
return fmt.Sprintf("store%d", id)
}
func (s *testRegionCacheSuite) TestSimple(c *C) {
r, err := s.cache.GetRegion([]byte("a"))
c.Assert(err, IsNil)
c.Assert(r, NotNil)
c.Assert(r.GetID(), Equals, uint64(3))
c.Assert(r.GetAddress(), Equals, "addr1")
c.Assert(r.GetID(), Equals, s.region1)
c.Assert(r.GetAddress(), Equals, s.storeAddr(s.store1))
c.Assert(s.cache.regions, HasLen, 1)
}
func (s *testRegionCacheSuite) TestDropRegion(c *C) {
s.cache.GetRegion([]byte("a"))
// remove from pd
s.pd.removeRegion(3)
// read from cache
r, err := s.cache.GetRegion([]byte("a"))
c.Assert(err, IsNil)
c.Assert(r, NotNil)
c.Assert(r.GetID(), Equals, uint64(3))
c.Assert(r.GetAddress(), Equals, "addr1")
c.Assert(s.cache.regions, HasLen, 1)
// drop from cache
s.cache.DropRegion(3)
r, err = s.cache.GetRegion([]byte("a"))
c.Assert(r, IsNil)
c.Assert(err, NotNil)
c.Assert(s.cache.regions, HasLen, 0)
}
func (s *testRegionCacheSuite) TestDropRegion2(c *C) {
// remove store 1 from PD, should cause an error in `loadRegion()`
// Why 1? Because GetRegion will pick 1 "randomly".
s.pd.removeStore(1)
func (s *testRegionCacheSuite) TestDropStore(c *C) {
s.cluster.RemoveStore(s.store1)
r, err := s.cache.GetRegion([]byte("a"))
c.Assert(err, NotNil)
c.Assert(r, IsNil)
@@ -84,59 +63,84 @@ func (s *testRegionCacheSuite) TestDropRegion2(c *C) {
func (s *testRegionCacheSuite) TestUpdateLeader(c *C) {
s.cache.GetRegion([]byte("a"))
// tikv-server reports `NotLeader`
s.cache.UpdateLeader(3, 2)
s.cache.UpdateLeader(s.region1, s.store2)
r, err := s.cache.GetRegion([]byte("a"))
c.Assert(err, IsNil)
c.Assert(r, NotNil)
c.Assert(r.GetID(), Equals, uint64(3))
c.Assert(r.GetID(), Equals, s.region1)
c.Assert(r.curStoreIdx, Equals, 1)
c.Assert(r.GetAddress(), Equals, "addr2")
c.Assert(r.GetAddress(), Equals, s.storeAddr(s.store2))
}
func (s *testRegionCacheSuite) TestUpdateLeader2(c *C) {
s.cache.GetRegion([]byte("a"))
// store4 becomes leader
s.pd.setStore(4, "addr4")
s.pd.setRegion(3, nil, nil, []uint64{4, 1, 2})
// new store3 becomes leader
store3 := s.cluster.AllocID()
s.cluster.AddStore(store3, s.storeAddr(store3))
s.cluster.AddPeer(s.region1, store3)
s.cluster.ChangeLeader(s.region1, store3)
// tikv-server reports `NotLeader`
s.cache.UpdateLeader(3, 4)
s.cache.UpdateLeader(s.region1, store3)
// Store3 does not exist in the cache, which causes a reload from PD.
r, err := s.cache.GetRegion([]byte("a"))
c.Assert(err, IsNil)
c.Assert(r, NotNil)
c.Assert(r.GetID(), Equals, uint64(3))
c.Assert(r.GetID(), Equals, s.region1)
c.Assert(r.curStoreIdx, Equals, 0)
c.Assert(r.GetAddress(), Equals, "addr4")
c.Assert(r.GetAddress(), Equals, s.storeAddr(s.store1))
// tikv-server reports `NotLeader` again.
s.cache.UpdateLeader(s.region1, store3)
r, err = s.cache.GetRegion([]byte("a"))
c.Assert(err, IsNil)
c.Assert(r, NotNil)
c.Assert(r.GetID(), Equals, s.region1)
c.Assert(r.curStoreIdx, Equals, 2)
c.Assert(r.GetAddress(), Equals, s.storeAddr(store3))
}
func (s *testRegionCacheSuite) TestUpdateLeader3(c *C) {
s.cache.GetRegion([]byte("a"))
// store2 becomes leader
s.pd.setRegion(3, nil, nil, []uint64{2, 1})
// store2 gone, store4 becomes leader
s.pd.removeStore(2)
s.pd.setStore(4, "addr4")
s.cluster.ChangeLeader(s.region1, s.store2)
// store2 gone, store3 becomes leader
s.cluster.RemoveStore(s.store2)
store3 := s.cluster.AllocID()
s.cluster.AddStore(store3, s.storeAddr(store3))
s.cluster.AddPeer(s.region1, store3)
s.cluster.ChangeLeader(s.region1, store3)
// tikv-server reports `NotLeader` (store2 is the leader)
s.cache.UpdateLeader(3, 2)
s.cache.UpdateLeader(s.region1, s.store2)
s.pd.setRegion(3, nil, nil, []uint64{4, 1})
// Store2 no longer exists, which causes a reload from PD.
r, err := s.cache.GetRegion([]byte("a"))
c.Assert(err, IsNil)
c.Assert(r, NotNil)
c.Assert(r.GetID(), Equals, uint64(3))
c.Assert(r.GetAddress(), Equals, "addr4")
c.Assert(r.GetID(), Equals, s.region1)
c.Assert(r.curStoreIdx, Equals, 0)
c.Assert(r.GetAddress(), Equals, s.storeAddr(s.store1))
// tikv-server reports `NotLeader` again.
s.cache.UpdateLeader(s.region1, store3)
r, err = s.cache.GetRegion([]byte("a"))
c.Assert(err, IsNil)
c.Assert(r, NotNil)
c.Assert(r.GetID(), Equals, s.region1)
c.Assert(r.curStoreIdx, Equals, 2)
c.Assert(r.GetAddress(), Equals, s.storeAddr(store3))
}
func (s *testRegionCacheSuite) TestSplit(c *C) {
r, err := s.cache.GetRegion([]byte("x"))
c.Assert(err, IsNil)
c.Assert(r.GetID(), Equals, uint64(3))
c.Assert(r.GetAddress(), Equals, "addr1")
c.Assert(r.GetID(), Equals, s.region1)
c.Assert(r.GetAddress(), Equals, s.storeAddr(s.store1))
// split to ['' - 'm' - 'z']
s.pd.setRegion(3, nil, []byte("m"), []uint64{1})
s.pd.setRegion(4, []byte("m"), nil, []uint64{2})
region2 := s.cluster.AllocID()
s.cluster.Split(s.region1, region2, []byte("m"), s.store1)
// tikv-server reports `NotInRegion`
s.cache.DropRegion(r.GetID())
@@ -144,24 +148,22 @@ func (s *testRegionCacheSuite) TestSplit(c *C) {
r, err = s.cache.GetRegion([]byte("x"))
c.Assert(err, IsNil)
c.Assert(r.GetID(), Equals, uint64(4))
c.Assert(r.GetAddress(), Equals, "addr2")
c.Assert(r.GetID(), Equals, region2)
c.Assert(r.GetAddress(), Equals, s.storeAddr(s.store1))
c.Assert(s.cache.regions, HasLen, 1)
}
func (s *testRegionCacheSuite) TestMerge(c *C) {
// ['' - 'm' - 'z']
s.pd.setRegion(3, nil, []byte("m"), []uint64{1})
s.pd.setRegion(4, []byte("m"), nil, []uint64{2})
region2 := s.cluster.AllocID()
s.cluster.Split(s.region1, region2, []byte("m"), s.store2)
r, err := s.cache.GetRegion([]byte("x"))
c.Assert(err, IsNil)
c.Assert(r.GetID(), Equals, uint64(4))
c.Assert(r.GetAddress(), Equals, "addr2")
c.Assert(r.GetID(), Equals, region2)
// merge to single region
s.pd.removeRegion(4)
s.pd.setRegion(3, nil, nil, []uint64{1, 2})
s.cluster.Merge(s.region1, region2)
// tikv-server reports `NotInRegion`
s.cache.DropRegion(r.GetID())
@@ -169,8 +171,7 @@ func (s *testRegionCacheSuite) TestMerge(c *C) {
r, err = s.cache.GetRegion([]byte("x"))
c.Assert(err, IsNil)
c.Assert(r.GetID(), Equals, uint64(3))
c.Assert(r.GetAddress(), Equals, "addr1")
c.Assert(r.GetID(), Equals, s.region1)
c.Assert(s.cache.regions, HasLen, 1)
}
@@ -178,13 +179,13 @@ func (s *testRegionCacheSuite) TestReconnect(c *C) {
s.cache.GetRegion([]byte("a"))
// connecting to tikv-server failed, so the cached region is dropped
s.cache.DropRegion(3)
s.cache.DropRegion(s.region1)
r, err := s.cache.GetRegion([]byte("a"))
c.Assert(err, IsNil)
c.Assert(r, NotNil)
c.Assert(r.GetID(), Equals, uint64(3))
c.Assert(r.GetAddress(), Equals, "addr1")
c.Assert(r.GetID(), Equals, s.region1)
c.Assert(r.GetAddress(), Equals, s.storeAddr(s.store1))
c.Assert(s.cache.regions, HasLen, 1)
}
@@ -193,90 +194,14 @@ func (s *testRegionCacheSuite) TestNextStore(c *C) {
c.Assert(err, IsNil)
c.Assert(region.curStoreIdx, Equals, 0)
s.cache.NextStore(3)
s.cache.NextStore(s.region1)
region, err = s.cache.GetRegion([]byte("a"))
c.Assert(err, IsNil)
c.Assert(region.curStoreIdx, Equals, 1)
s.cache.NextStore(3)
s.cache.NextStore(s.region1)
region, err = s.cache.GetRegion([]byte("a"))
c.Assert(err, IsNil)
// Out of range of Stores, so get Region again and pick Stores[0] as leader.
c.Assert(region.curStoreIdx, Equals, 0)
s.pd.removeRegion(3)
// regionCache still has more Stores, so pick next store.
s.cache.NextStore(3)
region, err = s.cache.GetRegion([]byte("a"))
c.Assert(err, IsNil)
c.Assert(region.curStoreIdx, Equals, 1)
// region 3 is removed, so the Region can't be fetched from pd.
s.cache.NextStore(3)
region, err = s.cache.GetRegion([]byte("a"))
c.Assert(err, NotNil)
c.Assert(region, IsNil)
}
type mockPDClient struct {
ts int64
stores map[uint64]*metapb.Store
regions map[uint64]*metapb.Region
}
func newMockPDClient() *mockPDClient {
return &mockPDClient{
stores: make(map[uint64]*metapb.Store),
regions: make(map[uint64]*metapb.Region),
}
}
func (c *mockPDClient) setStore(id uint64, addr string) {
c.stores[id] = &metapb.Store{
Id: proto.Uint64(id),
Address: proto.String(addr),
}
}
func (c *mockPDClient) removeStore(id uint64) {
delete(c.stores, id)
}
func (c *mockPDClient) setRegion(id uint64, startKey, endKey []byte, stores []uint64) {
c.regions[id] = &metapb.Region{
Id: proto.Uint64(id),
StartKey: startKey,
EndKey: endKey,
StoreIds: stores,
}
}
func (c *mockPDClient) removeRegion(id uint64) {
delete(c.regions, id)
}
func (c *mockPDClient) GetTS() (int64, int64, error) {
c.ts++
return c.ts, 0, nil
}
func (c *mockPDClient) GetRegion(key []byte) (*metapb.Region, error) {
for _, r := range c.regions {
if bytes.Compare(r.GetStartKey(), key) <= 0 &&
(bytes.Compare(key, r.GetEndKey()) < 0 || len(r.GetEndKey()) == 0) {
return r, nil
}
}
return nil, errors.Errorf("region not found for key %q", key)
}
func (c *mockPDClient) GetStore(storeID uint64) (*metapb.Store, error) {
s, ok := c.stores[storeID]
if !ok {
return nil, errors.Errorf("store %d not found", storeID)
}
return s, nil
}
func (c *mockPDClient) Close() {}

View File

@@ -14,103 +14,39 @@
package tikv
import (
"github.com/gogo/protobuf/proto"
. "github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/errorpb"
pb "github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv/mock-tikv"
)
type testScanMockSuite struct {
pd *mockPDClient
cache *RegionCache
store *tikvStore
client *stubClient
}
var _ = Suite(&testScanMockSuite{})
func (s *testScanMockSuite) SetUpTest(c *C) {
s.pd = newMockPDClient()
s.cache = NewRegionCache(s.pd)
s.client = newStubClient(c)
func (s *testScanMockSuite) TestScanMultipleRegions(c *C) {
store, cluster := createMockStoreCluster()
mocktikv.BootstrapWithMultiRegions(cluster, []byte("a"), []byte("m"))
s.pd.setStore(1, "addr1")
s.pd.setStore(2, "addr2")
// ["", "a"), ["a", "m"), ["m", nil)
s.pd.setRegion(3, nil, []byte("a"), []uint64{1, 2})
s.pd.setRegion(4, []byte("a"), []byte("m"), []uint64{1, 2})
s.pd.setRegion(5, []byte("m"), nil, []uint64{1, 2})
s.store = newMockStore(s.cache)
s.store.clients["addr1"] = s.client
s.store.clients["addr2"] = s.client
}
func (s *testScanMockSuite) beginTxn(c *C) (*tikvTxn, *tikvSnapshot) {
transaction, err := s.store.Begin()
txn, err := store.Begin()
c.Assert(err, IsNil)
txn := transaction.(*tikvTxn)
kvSnapshot, err := txn.store.GetSnapshot(kv.Version{Ver: txn.StartTS()})
for ch := byte('a'); ch <= byte('z'); ch++ {
err = txn.Set([]byte{ch}, []byte{ch})
c.Assert(err, IsNil)
}
err = txn.Commit()
c.Assert(err, IsNil)
c.Assert(kvSnapshot, NotNil)
snapshot := kvSnapshot.(*tikvSnapshot)
return txn, snapshot
}
func (s *testScanMockSuite) getRegion(c *C, startKey []byte) *requestRegion {
region, err := s.store.getRegion(startKey)
txn, err = store.Begin()
c.Assert(err, IsNil)
c.Assert(region, NotNil)
return region
}
func (s *testScanMockSuite) TestStaleRegionEpoch(c *C) {
const batchSize = 10
txn, snapshot := s.beginTxn(c)
region := s.getRegion(c, []byte("a"))
expectReq := makeScanReq(region, "a", batchSize, txn.StartTS())
expectResp := makeScanStaleEpochResp()
s.client.push(expectReq, expectResp)
_, err := newScanner(region, []byte("a"), txn.StartTS(), *snapshot, batchSize)
c.Assert(err, NotNil)
}
func (s *testScanMockSuite) TestScanMultiRegions(c *C) {
const batchSize = 10
startKey := []byte("a")
txn, snapshot := s.beginTxn(c)
region4 := s.getRegion(c, startKey)
expectReq := makeScanReq(region4, "a", batchSize, txn.StartTS())
// Assume kv holds keys r4["a","l"] + r5["m","q"].
// Make the first request & response; the response returns batchSize keys,
// that is, "a"-"j".
res := makeRangeKvs('a', 'a'+batchSize-1)
expectResp := makeScanOkResp(res)
s.client.push(expectReq, expectResp)
// Make the second request & response; the response returns the remaining keys
// of region4, that is, "j"-"l" (3 in total; "j" will be skipped).
res2 := makeRangeKvs('a'+batchSize-1, 'l')
expectReq2 := makeScanReq(region4, res2[0].key, batchSize, txn.StartTS())
expectResp2 := makeScanOkResp(res2)
s.client.push(expectReq2, expectResp2)
// Make the third request & response; the response returns all keys of region5,
// that is, "m"-"q" (5 in total).
region5 := s.getRegion(c, []byte("m"))
res3 := makeRangeKvs('m', 'q')
expectReq3 := makeScanReq(region5, res3[0].key, batchSize, txn.StartTS())
expectResp3 := makeScanOkResp(res3)
s.client.push(expectReq3, expectResp3)
// Make the last request & response; the response is empty, so the scanner will Close().
expectReq4 := makeScanReq(region5, res3[len(res3)-1].key, batchSize, txn.StartTS())
expectResp4 := makeScanOkResp([]*KvPair{})
s.client.push(expectReq4, expectResp4)
scanner, err := newScanner(region4, startKey, txn.StartTS(), *snapshot, batchSize)
snapshot := newTiKVSnapshot(store, kv.Version{Ver: txn.StartTS()})
region, err := store.getRegion(nil)
c.Assert(err, IsNil)
for k := 'a'; k <= 'q'; k++ {
c.Assert([]byte(string(k)), BytesEquals, []byte(scanner.Key()))
if k < 'q' {
scanner, err := newScanner(region, []byte("a"), txn.StartTS(), *snapshot, 10)
c.Assert(err, IsNil)
for ch := byte('a'); ch <= byte('z'); ch++ {
c.Assert([]byte{ch}, BytesEquals, []byte(scanner.Key()))
if ch < byte('z') {
c.Assert(scanner.Next(), IsNil)
}
}
@@ -118,55 +54,26 @@ func (s *testScanMockSuite) TestScanMultiRegions(c *C) {
c.Assert(scanner.Valid(), IsFalse)
}
func makeScanReq(region *requestRegion, key string, limit uint32, ver uint64) *pb.Request {
return &pb.Request{
Type: pb.MessageType_CmdScan.Enum(),
CmdScanReq: &pb.CmdScanRequest{
StartKey: []byte(key),
Limit: proto.Uint32(limit),
Version: proto.Uint64(ver),
},
Context: region.GetContext(),
}
}
func (s *testScanMockSuite) TestStaleRegionEpoch(c *C) {
store, cluster := createMockStoreCluster()
storeID, regionID := mocktikv.BootstrapWithSingleStore(cluster)
func makeScanOkResp(res []*KvPair) *pb.Response {
var pairs []*pb.KvPair
for _, kv := range res {
pairs = append(pairs, &pb.KvPair{
Key: []byte(kv.key),
Value: []byte(kv.value),
})
txn, err := store.Begin()
c.Assert(err, IsNil)
for ch := byte('a'); ch <= byte('z'); ch++ {
err = txn.Set([]byte{ch}, []byte{ch})
c.Assert(err, IsNil)
}
return &pb.Response{
Type: pb.MessageType_CmdScan.Enum(),
CmdScanResp: &pb.CmdScanResponse{
Pairs: pairs,
},
}
}
err = txn.Commit()
c.Assert(err, IsNil)
func makeScanStaleEpochResp() *pb.Response {
return &pb.Response{
Type: pb.MessageType_CmdScan.Enum(),
RegionError: &errorpb.Error{
Message: proto.String("stale epoch"),
StaleEpoch: &errorpb.StaleEpoch{},
},
}
}
func makeRangeKvs(startKey, endKey rune) []*KvPair {
var res []*KvPair
for key := startKey; key <= endKey; key++ {
// Just set value = key for easy testing.
res = append(res, &KvPair{key: string(key), value: string(key)})
}
return res
txn, err = store.Begin()
c.Assert(err, IsNil)
snapshot := newTiKVSnapshot(store, kv.Version{Ver: txn.StartTS()})
region, err := store.getRegion(nil)
c.Assert(err, IsNil)
}
type KvPair struct {
key string
value string
cluster.Split(regionID, cluster.AllocID(), []byte("m"), storeID)
_, err = newScanner(region, []byte("a"), txn.StartTS(), *snapshot, 10)
c.Assert(err, NotNil)
}

View File

@@ -41,11 +41,17 @@ func newTestStore(c *C) *tikvStore {
c.Assert(err, IsNil)
return store.(*tikvStore)
}
cluster := mocktikv.NewCluster()
store, cluster := createMockStoreCluster()
mocktikv.BootstrapWithSingleStore(cluster)
return store
}
func createMockStoreCluster() (*tikvStore, *mocktikv.Cluster) {
cluster := mocktikv.NewCluster()
mvccStore := mocktikv.NewMvccStore()
clientFactory := mockClientFactory(cluster, mvccStore)
return newTikvStore("mock-tikv-store", mocktikv.NewPDClient(cluster), clientFactory)
store := newTikvStore("mock-tikv-store", mocktikv.NewPDClient(cluster), clientFactory)
return store, cluster
}
func mockClientFactory(cluster *mocktikv.Cluster, mvccStore *mocktikv.MvccStore) ClientFactory {