diff --git a/store/tikv/coprocessor_test.go b/store/tikv/coprocessor_test.go index 1f932d1ef9..53f1cf0967 100644 --- a/store/tikv/coprocessor_test.go +++ b/store/tikv/coprocessor_test.go @@ -16,6 +16,7 @@ package tikv import ( . "github.com/pingcap/check" "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/store/tikv/mock-tikv" ) type testCoprocessorSuite struct{} @@ -23,90 +24,82 @@ type testCoprocessorSuite struct{} var _ = Suite(&testCoprocessorSuite{}) func (s *testCoprocessorSuite) TestBuildTasks(c *C) { - pd := newMockPDClient() - cache := NewRegionCache(pd) - // nil --- 'g' --- 'n' --- 't' --- nil - // <- 1 -> <- 2 -> <- 3 -> <- 4 -> - pd.setStore(100, "addr100") - pd.setRegion(1, nil, []byte("g"), []uint64{100}) - pd.setRegion(2, []byte("g"), []byte("n"), []uint64{100}) - pd.setRegion(3, []byte("n"), []byte("t"), []uint64{100}) - pd.setRegion(4, []byte("t"), nil, []uint64{100}) + // <- 0 -> <- 1 -> <- 2 -> <- 3 -> + cluster := mocktikv.NewCluster() + _, regionIDs := mocktikv.BootstrapWithMultiRegions(cluster, []byte("g"), []byte("n"), []byte("t")) + cache := NewRegionCache(mocktikv.NewPDClient(cluster)) tasks, err := buildCopTasks(cache, s.buildKeyRanges("a", "c"), false) c.Assert(err, IsNil) c.Assert(tasks, HasLen, 1) - s.taskEqual(c, tasks[0], 1, "a", "c") + s.taskEqual(c, tasks[0], regionIDs[0], "a", "c") tasks, err = buildCopTasks(cache, s.buildKeyRanges("g", "n"), false) c.Assert(err, IsNil) c.Assert(tasks, HasLen, 1) - s.taskEqual(c, tasks[0], 2, "g", "n") + s.taskEqual(c, tasks[0], regionIDs[1], "g", "n") tasks, err = buildCopTasks(cache, s.buildKeyRanges("m", "n"), false) c.Assert(err, IsNil) c.Assert(tasks, HasLen, 1) - s.taskEqual(c, tasks[0], 2, "m", "n") + s.taskEqual(c, tasks[0], regionIDs[1], "m", "n") tasks, err = buildCopTasks(cache, s.buildKeyRanges("a", "k"), false) c.Assert(err, IsNil) c.Assert(tasks, HasLen, 2) - s.taskEqual(c, tasks[0], 1, "a", "g") - s.taskEqual(c, tasks[1], 2, "g", "k") + s.taskEqual(c, tasks[0], regionIDs[0], "a", "g") + s.taskEqual(c, tasks[1], regionIDs[1], "g", "k") tasks, err = buildCopTasks(cache, s.buildKeyRanges("a", "x"), false) c.Assert(err, IsNil) c.Assert(tasks, HasLen, 4) - s.taskEqual(c, tasks[0], 1, "a", "g") - s.taskEqual(c, tasks[1], 2, "g", "n") - s.taskEqual(c, tasks[2], 3, "n", "t") - s.taskEqual(c, tasks[3], 4, "t", "x") + s.taskEqual(c, tasks[0], regionIDs[0], "a", "g") + s.taskEqual(c, tasks[1], regionIDs[1], "g", "n") + s.taskEqual(c, tasks[2], regionIDs[2], "n", "t") + s.taskEqual(c, tasks[3], regionIDs[3], "t", "x") tasks, err = buildCopTasks(cache, s.buildKeyRanges("a", "b", "b", "c"), false) c.Assert(err, IsNil) c.Assert(tasks, HasLen, 1) - s.taskEqual(c, tasks[0], 1, "a", "b", "b", "c") + s.taskEqual(c, tasks[0], regionIDs[0], "a", "b", "b", "c") tasks, err = buildCopTasks(cache, s.buildKeyRanges("a", "b", "e", "f"), false) c.Assert(err, IsNil) c.Assert(tasks, HasLen, 1) - s.taskEqual(c, tasks[0], 1, "a", "b", "e", "f") + s.taskEqual(c, tasks[0], regionIDs[0], "a", "b", "e", "f") tasks, err = buildCopTasks(cache, s.buildKeyRanges("g", "n", "o", "p"), false) c.Assert(err, IsNil) c.Assert(tasks, HasLen, 2) - s.taskEqual(c, tasks[0], 2, "g", "n") - s.taskEqual(c, tasks[1], 3, "o", "p") + s.taskEqual(c, tasks[0], regionIDs[1], "g", "n") + s.taskEqual(c, tasks[1], regionIDs[2], "o", "p") tasks, err = buildCopTasks(cache, s.buildKeyRanges("h", "k", "m", "p"), false) c.Assert(err, IsNil) c.Assert(tasks, HasLen, 2) - s.taskEqual(c, tasks[0], 2, "h", "k", "m", "n") - s.taskEqual(c, tasks[1], 3, "n", "p") + s.taskEqual(c, tasks[0], regionIDs[1], "h", "k", "m", "n") + s.taskEqual(c, tasks[1], regionIDs[2], "n", "p") } func (s *testCoprocessorSuite) TestRebuild(c *C) { - pd := newMockPDClient() - cache := NewRegionCache(pd) - // nil --- 'm' --- nil - // <- 1 -> <- 2 -> - pd.setStore(100, "addr100") - pd.setRegion(1, nil, []byte("m"), []uint64{100}) - pd.setRegion(2, []byte("m"), nil, []uint64{100}) + // <- 0 -> <- 1 -> + cluster := mocktikv.NewCluster() + storeID, regionIDs := mocktikv.BootstrapWithMultiRegions(cluster, []byte("m")) + cache := NewRegionCache(mocktikv.NewPDClient(cluster)) tasks, err := buildCopTasks(cache, s.buildKeyRanges("a", "z"), false) c.Assert(err, IsNil) c.Assert(tasks, HasLen, 2) - s.taskEqual(c, tasks[0], 1, "a", "m") - s.taskEqual(c, tasks[1], 2, "m", "z") + s.taskEqual(c, tasks[0], regionIDs[0], "a", "m") + s.taskEqual(c, tasks[1], regionIDs[1], "m", "z") // nil -- 'm' -- 'q' -- nil - // <- 1 -> <--2-> <-3--> - pd.setRegion(2, []byte("m"), []byte("q"), []uint64{100}) - pd.setRegion(3, []byte("q"), nil, []uint64{100}) - cache.DropRegion(2) + // <- 0 -> <--1-> <-2--> + regionIDs = append(regionIDs, cluster.AllocID()) + cluster.Split(regionIDs[1], regionIDs[2], []byte("q"), storeID) + cache.DropRegion(regionIDs[1]) tasks, err = buildCopTasks(cache, s.buildKeyRanges("a", "z"), true) c.Assert(err, IsNil) @@ -122,9 +115,9 @@ func (s *testCoprocessorSuite) TestRebuild(c *C) { err = iter.rebuildCurrentTask(iter.tasks[0]) c.Assert(err, IsNil) c.Assert(iter.tasks, HasLen, 3) - s.taskEqual(c, iter.tasks[2], 1, "a", "m") - s.taskEqual(c, iter.tasks[1], 2, "m", "q") - s.taskEqual(c, iter.tasks[0], 3, "q", "z") + s.taskEqual(c, iter.tasks[2], regionIDs[0], "a", "m") + s.taskEqual(c, iter.tasks[1], regionIDs[1], "m", "q") + s.taskEqual(c, iter.tasks[0], regionIDs[2], "q", "z") tasks, err = buildCopTasks(cache, s.buildKeyRanges("a", "z"), true) iter = &copIterator{ @@ -139,9 +132,9 @@ func (s *testCoprocessorSuite) TestRebuild(c *C) { err = iter.rebuildCurrentTask(iter.tasks[2]) c.Assert(err, IsNil) c.Assert(iter.tasks, HasLen, 3) - s.taskEqual(c, iter.tasks[2], 1, "a", "m") - s.taskEqual(c, iter.tasks[1], 2, "m", "q") - s.taskEqual(c, iter.tasks[0], 3, "q", "z") + s.taskEqual(c, iter.tasks[2], regionIDs[0], "a", "m") + s.taskEqual(c, iter.tasks[1], regionIDs[1], "m", "q") + s.taskEqual(c, iter.tasks[0], regionIDs[2], "q", "z") } func (s *testCoprocessorSuite) buildKeyRanges(keys ...string) []kv.KeyRange { diff --git a/store/tikv/kv_test.go b/store/tikv/kv_test.go deleted file mode 100644 index ae2d98cc64..0000000000 --- a/store/tikv/kv_test.go +++ /dev/null @@ -1,184 +0,0 @@ -// Copyright 2016 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package tikv - -import ( - "fmt" - "log" - "time" - - "github.com/gogo/protobuf/proto" - "github.com/juju/errors" - . "github.com/pingcap/check" - "github.com/pingcap/kvproto/pkg/coprocessor" - pb "github.com/pingcap/kvproto/pkg/kvrpcpb" - "github.com/pingcap/tidb/store/tikv/oracle/oracles" -) - -type testKvSuite struct { - pd *mockPDClient - cache *RegionCache - store *tikvStore - client *stubClient -} - -var _ = Suite(&testKvSuite{}) - -func (s *testKvSuite) SetUpTest(c *C) { - s.pd = newMockPDClient() - s.cache = NewRegionCache(s.pd) - s.client = newStubClient(c) - - s.pd.setStore(1, "addr1") - s.pd.setStore(2, "addr2") - s.pd.setRegion(3, nil, nil, []uint64{1, 2}) - - s.store = newMockStore(s.cache) - s.store.clients["addr1"] = s.client - s.store.clients["addr2"] = s.client -} - -func (s *testKvSuite) beginTxn(c *C) *tikvTxn { - txn, err := s.store.Begin() - c.Assert(err, IsNil) - return txn.(*tikvTxn) -} - -func (s *testKvSuite) TestSimpleGetOk(c *C) { - expectReq := &pb.Request{} - expectResp := makeGetResp("value") - s.client.push(expectReq, expectResp) - region, err := s.store.getRegion(nil) - c.Assert(err, IsNil) - resp, err := s.store.SendKVReq(expectReq, region) - c.Assert(expectResp, Equals, resp) -} - -func (s *testKvSuite) TestGetOk(c *C) { - txn := s.beginTxn(c) - key, val := "key", "value" - region, err := s.cache.GetRegion([]byte(key)) - c.Assert(err, IsNil) - c.Assert(region, NotNil) - expectReq := makeGetReq(region, key, txn.StartTS()) - expectResp := makeGetResp(val) - s.client.push(expectReq, expectResp) - value, err := txn.Get([]byte(key)) - c.Assert(err, IsNil) - c.Assert(value, BytesEquals, []byte(val)) -} - -// stubClient is used for simulating sending Request to TiKV and receiving Response. -type stubClient struct { - // q is a message queue, it will be pushed by test cases as they expected, - // and popped when calling SendKVReq/SendCopReq once. - // It maybe pushed multiple before calling some function. - // e.g.: Commit(), Get() but key is locked. - q *queueMsg - c *C -} - -func newStubClient(c *C) *stubClient { - return &stubClient{ - q: &queueMsg{ - data: make([]*msgPair, 0), - idx: 0, - }, - c: c, - } -} - -func (cli *stubClient) push(req *pb.Request, resp *pb.Response) { - cli.q.Push(&msgPair{req: req, resp: resp}) -} - -func (cli *stubClient) SendKVReq(req *pb.Request) (*pb.Response, error) { - msg, err := cli.q.Pop() - if err != nil { - return nil, errors.Trace(err) - } - if !cli.c.Check(msg.req, DeepEquals, req) { - // HELP: cli.c.Assert can't fatal here. - log.Fatalf("requests don't mismatch.\nexp[%s]\ngot[%s]", msg.req, req) - } - return msg.resp, nil -} - -// TODO: implements this. -func (cli *stubClient) SendCopReq(req *coprocessor.Request) (*coprocessor.Response, error) { - return nil, nil -} - -// Close clear message queue. -func (cli *stubClient) Close() error { - cli.q = nil - return nil -} - -type msgPair struct { - req *pb.Request - resp *pb.Response -} - -// queueMsg is a simple queue that contains pair. -type queueMsg struct { - data []*msgPair - idx int -} - -func (q *queueMsg) Push(x *msgPair) { - q.data = append(q.data, x) -} - -func (q *queueMsg) Pop() (*msgPair, error) { - if q.Empty() { - return nil, errors.New("queue is empty") - } - ret := q.data[q.idx] - q.idx++ - return ret, nil -} - -func (q *queueMsg) Empty() bool { - return q.idx >= len(q.data) -} - -func newMockStore(cache *RegionCache) *tikvStore { - return &tikvStore{ - uuid: fmt.Sprintf("kv_%d", time.Now().Unix()), - oracle: oracles.NewLocalOracle(), - clients: make(map[string]Client), - regionCache: cache, - } -} - -func makeGetReq(region *Region, key string, ver uint64) *pb.Request { - return &pb.Request{ - Type: pb.MessageType_CmdGet.Enum(), - CmdGetReq: &pb.CmdGetRequest{ - Key: []byte(key), - Version: proto.Uint64(ver), - }, - Context: region.GetContext(), - } -} - -func makeGetResp(value string) *pb.Response { - return &pb.Response{ - Type: pb.MessageType_CmdGet.Enum(), - CmdGetResp: &pb.CmdGetResponse{ - Value: []byte(value), - }, - } -} diff --git a/store/tikv/mock-tikv/cluster.go b/store/tikv/mock-tikv/cluster.go index 7c535bcc13..5f7b68ef4b 100644 --- a/store/tikv/mock-tikv/cluster.go +++ b/store/tikv/mock-tikv/cluster.go @@ -187,6 +187,15 @@ func (c *Cluster) Split(regionID, newRegionID uint64, key []byte, leaderStoreID c.regions[newRegionID] = newRegion } +// Merge merges 2 Regions, their key ranges should be adjacent. +func (c *Cluster) Merge(regionID1, regionID2 uint64) { + c.mu.Lock() + defer c.mu.Unlock() + + c.regions[regionID1].merge(c.regions[regionID2].meta.GetEndKey()) + delete(c.regions, regionID2) +} + // Region is the Region meta data. type Region struct { meta *metapb.Region @@ -233,6 +242,11 @@ func (r *Region) split(newRegionID uint64, key []byte, leaderStoreID uint64) *Re return region } +func (r *Region) merge(endKey []byte) { + r.meta.EndKey = endKey + r.incVersion() +} + func (r *Region) updateKeyRange(start, end []byte) { r.meta.StartKey = start r.meta.EndKey = end @@ -242,13 +256,13 @@ func (r *Region) updateKeyRange(start, end []byte) { func (r *Region) incConfVer() { r.meta.RegionEpoch = &metapb.RegionEpoch{ ConfVer: proto.Uint64(r.meta.GetRegionEpoch().GetConfVer() + 1), - Version: r.meta.GetRegionEpoch().Version, + Version: proto.Uint64(r.meta.GetRegionEpoch().GetVersion()), } } func (r *Region) incVersion() { r.meta.RegionEpoch = &metapb.RegionEpoch{ - ConfVer: r.meta.GetRegionEpoch().ConfVer, + ConfVer: proto.Uint64(r.meta.GetRegionEpoch().GetConfVer()), Version: proto.Uint64(r.meta.GetRegionEpoch().GetVersion() + 1), } } diff --git a/store/tikv/mock-tikv/cluster_manipulate.go b/store/tikv/mock-tikv/cluster_manipulate.go index e061c9a22c..aad243cae7 100644 --- a/store/tikv/mock-tikv/cluster_manipulate.go +++ b/store/tikv/mock-tikv/cluster_manipulate.go @@ -16,9 +16,33 @@ package mocktikv import "fmt" // BootstrapWithSingleStore initializes a Cluster with 1 Region and 1 Store. -func BootstrapWithSingleStore(cluster *Cluster) { +func BootstrapWithSingleStore(cluster *Cluster) (storeID, regionID uint64) { ids := cluster.AllocIDs(2) - storeID, regionID := ids[0], ids[1] + storeID, regionID = ids[0], ids[1] cluster.AddStore(storeID, fmt.Sprintf("store%d", storeID)) cluster.Bootstrap(regionID, []uint64{storeID}, storeID) + return +} + +// BootstrapWithMultiStores initializes a Cluster with 1 Region and n Stores. +func BootstrapWithMultiStores(cluster *Cluster, n int) (storeIDs []uint64, regionID uint64, leaderStore uint64) { + ids := cluster.AllocIDs(n + 1) + regionID, leaderStore, storeIDs = ids[0], ids[1], ids[1:] + for _, storeID := range storeIDs { + cluster.AddStore(storeID, fmt.Sprintf("store%d", storeID)) + } + cluster.Bootstrap(regionID, storeIDs, leaderStore) + return +} + +// BootstrapWithMultiRegions initializes a Cluster with multiple Regions and 1 +// Store. The number of Regions will be len(splitKeys) + 1. +func BootstrapWithMultiRegions(cluster *Cluster, splitKeys ...[]byte) (storeID uint64, regionIDs []uint64) { + var firstRegionID uint64 + storeID, firstRegionID = BootstrapWithSingleStore(cluster) + regionIDs = append([]uint64{firstRegionID}, cluster.AllocIDs(len(splitKeys))...) + for i, k := range splitKeys { + cluster.Split(regionIDs[i], regionIDs[i+1], k, storeID) + } + return } diff --git a/store/tikv/mock-tikv/pd.go b/store/tikv/mock-tikv/pd.go index 5cadca393a..ad9211ce86 100644 --- a/store/tikv/mock-tikv/pd.go +++ b/store/tikv/mock-tikv/pd.go @@ -17,6 +17,7 @@ import ( "sync" "time" + "github.com/juju/errors" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/pd/pd-client" ) @@ -55,11 +56,19 @@ func (c *pdClient) GetTS() (int64, int64, error) { } func (c *pdClient) GetRegion(key []byte) (*metapb.Region, error) { - return c.cluster.GetRegionByKey(key), nil + region := c.cluster.GetRegionByKey(key) + if region == nil { + return nil, errors.New("not found") + } + return region, nil } func (c *pdClient) GetStore(storeID uint64) (*metapb.Store, error) { - return c.cluster.GetStore(storeID), nil + store := c.cluster.GetStore(storeID) + if store == nil { + return nil, errors.New("not found") + } + return store, nil } func (c *pdClient) Close() { diff --git a/store/tikv/region_cache_test.go b/store/tikv/region_cache_test.go index 87bc49fe6b..97acda0ecc 100644 --- a/store/tikv/region_cache_test.go +++ b/store/tikv/region_cache_test.go @@ -14,67 +14,46 @@ package tikv import ( - "bytes" + "fmt" - "github.com/golang/protobuf/proto" - "github.com/juju/errors" . "github.com/pingcap/check" - "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/tidb/store/tikv/mock-tikv" ) type testRegionCacheSuite struct { - pd *mockPDClient - cache *RegionCache + cluster *mocktikv.Cluster + store1 uint64 + store2 uint64 + region1 uint64 + cache *RegionCache } var _ = Suite(&testRegionCacheSuite{}) func (s *testRegionCacheSuite) SetUpTest(c *C) { - s.pd = newMockPDClient() - s.cache = NewRegionCache(s.pd) + s.cluster = mocktikv.NewCluster() + storeIDs, regionID, _ := mocktikv.BootstrapWithMultiStores(s.cluster, 2) + s.region1 = regionID + s.store1 = storeIDs[0] + s.store2 = storeIDs[1] + s.cache = NewRegionCache(mocktikv.NewPDClient(s.cluster)) +} - s.pd.setStore(1, "addr1") - s.pd.setStore(2, "addr2") - s.pd.setRegion(3, nil, nil, []uint64{1, 2}) +func (s *testRegionCacheSuite) storeAddr(id uint64) string { + return fmt.Sprintf("store%d", id) } func (s *testRegionCacheSuite) TestSimple(c *C) { r, err := s.cache.GetRegion([]byte("a")) c.Assert(err, IsNil) c.Assert(r, NotNil) - c.Assert(r.GetID(), Equals, uint64(3)) - c.Assert(r.GetAddress(), Equals, "addr1") + c.Assert(r.GetID(), Equals, s.region1) + c.Assert(r.GetAddress(), Equals, s.storeAddr(s.store1)) c.Assert(s.cache.regions, HasLen, 1) } -func (s *testRegionCacheSuite) TestDropRegion(c *C) { - s.cache.GetRegion([]byte("a")) - - // remove from pd - s.pd.removeRegion(3) - - // read from cache - r, err := s.cache.GetRegion([]byte("a")) - c.Assert(err, IsNil) - c.Assert(r, NotNil) - c.Assert(r.GetID(), Equals, uint64(3)) - c.Assert(r.GetAddress(), Equals, "addr1") - c.Assert(s.cache.regions, HasLen, 1) - - // drop from cache - s.cache.DropRegion(3) - - r, err = s.cache.GetRegion([]byte("a")) - c.Assert(r, IsNil) - c.Assert(err, NotNil) - c.Assert(s.cache.regions, HasLen, 0) -} - -func (s *testRegionCacheSuite) TestDropRegion2(c *C) { - // remove store 1 from PD, should cause an error in `loadRegion()` - // Why 1? Because GetRegion will pick 1 "randomly". - s.pd.removeStore(1) - +func (s *testRegionCacheSuite) TestDropStore(c *C) { + s.cluster.RemoveStore(s.store1) r, err := s.cache.GetRegion([]byte("a")) c.Assert(err, NotNil) c.Assert(r, IsNil) @@ -84,59 +63,84 @@ func (s *testRegionCacheSuite) TestDropRegion2(c *C) { func (s *testRegionCacheSuite) TestUpdateLeader(c *C) { s.cache.GetRegion([]byte("a")) // tikv-server reports `NotLeader` - s.cache.UpdateLeader(3, 2) + s.cache.UpdateLeader(s.region1, s.store2) r, err := s.cache.GetRegion([]byte("a")) c.Assert(err, IsNil) c.Assert(r, NotNil) - c.Assert(r.GetID(), Equals, uint64(3)) + c.Assert(r.GetID(), Equals, s.region1) c.Assert(r.curStoreIdx, Equals, 1) - c.Assert(r.GetAddress(), Equals, "addr2") + c.Assert(r.GetAddress(), Equals, s.storeAddr(s.store2)) } func (s *testRegionCacheSuite) TestUpdateLeader2(c *C) { s.cache.GetRegion([]byte("a")) - // store4 becomes leader - s.pd.setStore(4, "addr4") - s.pd.setRegion(3, nil, nil, []uint64{4, 1, 2}) + // new store3 becomes leader + store3 := s.cluster.AllocID() + s.cluster.AddStore(store3, s.storeAddr(store3)) + s.cluster.AddPeer(s.region1, store3) + s.cluster.ChangeLeader(s.region1, store3) // tikv-server reports `NotLeader` - s.cache.UpdateLeader(3, 4) + s.cache.UpdateLeader(s.region1, store3) + // Store3 does not exist in cache, causes a reload from PD. r, err := s.cache.GetRegion([]byte("a")) c.Assert(err, IsNil) c.Assert(r, NotNil) - c.Assert(r.GetID(), Equals, uint64(3)) + c.Assert(r.GetID(), Equals, s.region1) c.Assert(r.curStoreIdx, Equals, 0) - c.Assert(r.GetAddress(), Equals, "addr4") + c.Assert(r.GetAddress(), Equals, s.storeAddr(s.store1)) + + // tikv-server reports `NotLeader` again. + s.cache.UpdateLeader(s.region1, store3) + r, err = s.cache.GetRegion([]byte("a")) + c.Assert(err, IsNil) + c.Assert(r, NotNil) + c.Assert(r.GetID(), Equals, s.region1) + c.Assert(r.curStoreIdx, Equals, 2) + c.Assert(r.GetAddress(), Equals, s.storeAddr(store3)) } func (s *testRegionCacheSuite) TestUpdateLeader3(c *C) { s.cache.GetRegion([]byte("a")) // store2 becomes leader - s.pd.setRegion(3, nil, nil, []uint64{2, 1}) - // store2 gone, store4 becomes leader - s.pd.removeStore(2) - s.pd.setStore(4, "addr4") + s.cluster.ChangeLeader(s.region1, s.store2) + // store2 gone, store3 becomes leader + s.cluster.RemoveStore(s.store2) + store3 := s.cluster.AllocID() + s.cluster.AddStore(store3, s.storeAddr(store3)) + s.cluster.AddPeer(s.region1, store3) + s.cluster.ChangeLeader(s.region1, store3) // tikv-server reports `NotLeader`(store2 is the leader) - s.cache.UpdateLeader(3, 2) + s.cache.UpdateLeader(s.region1, s.store2) - s.pd.setRegion(3, nil, nil, []uint64{4, 1}) + // Store2 does not exist any more, causes a reload from PD. r, err := s.cache.GetRegion([]byte("a")) c.Assert(err, IsNil) c.Assert(r, NotNil) - c.Assert(r.GetID(), Equals, uint64(3)) - c.Assert(r.GetAddress(), Equals, "addr4") + c.Assert(r.GetID(), Equals, s.region1) + c.Assert(r.curStoreIdx, Equals, 0) + c.Assert(r.GetAddress(), Equals, s.storeAddr(s.store1)) + + // tikv-server reports `NotLeader` again. + s.cache.UpdateLeader(s.region1, store3) + r, err = s.cache.GetRegion([]byte("a")) + c.Assert(err, IsNil) + c.Assert(r, NotNil) + c.Assert(r.GetID(), Equals, s.region1) + c.Assert(r.curStoreIdx, Equals, 2) + c.Assert(r.GetAddress(), Equals, s.storeAddr(store3)) } func (s *testRegionCacheSuite) TestSplit(c *C) { r, err := s.cache.GetRegion([]byte("x")) c.Assert(err, IsNil) - c.Assert(r.GetID(), Equals, uint64(3)) - c.Assert(r.GetAddress(), Equals, "addr1") + c.Assert(r.GetID(), Equals, s.region1) + c.Assert(r.GetAddress(), Equals, s.storeAddr(s.store1)) // split to ['' - 'm' - 'z'] - s.pd.setRegion(3, nil, []byte("m"), []uint64{1}) - s.pd.setRegion(4, []byte("m"), nil, []uint64{2}) + region2 := s.cluster.AllocID() + s.cluster.Split(s.region1, region2, []byte("m"), s.store1) // tikv-server reports `NotInRegion` s.cache.DropRegion(r.GetID()) @@ -144,24 +148,22 @@ func (s *testRegionCacheSuite) TestSplit(c *C) { r, err = s.cache.GetRegion([]byte("x")) c.Assert(err, IsNil) - c.Assert(r.GetID(), Equals, uint64(4)) - c.Assert(r.GetAddress(), Equals, "addr2") + c.Assert(r.GetID(), Equals, region2) + c.Assert(r.GetAddress(), Equals, s.storeAddr(s.store1)) c.Assert(s.cache.regions, HasLen, 1) } func (s *testRegionCacheSuite) TestMerge(c *C) { // ['' - 'm' - 'z'] - s.pd.setRegion(3, nil, []byte("m"), []uint64{1}) - s.pd.setRegion(4, []byte("m"), nil, []uint64{2}) + region2 := s.cluster.AllocID() + s.cluster.Split(s.region1, region2, []byte("m"), s.store2) r, err := s.cache.GetRegion([]byte("x")) c.Assert(err, IsNil) - c.Assert(r.GetID(), Equals, uint64(4)) - c.Assert(r.GetAddress(), Equals, "addr2") + c.Assert(r.GetID(), Equals, region2) // merge to single region - s.pd.removeRegion(4) - s.pd.setRegion(3, nil, nil, []uint64{1, 2}) + s.cluster.Merge(s.region1, region2) // tikv-server reports `NotInRegion` s.cache.DropRegion(r.GetID()) @@ -169,8 +171,7 @@ func (s *testRegionCacheSuite) TestMerge(c *C) { r, err = s.cache.GetRegion([]byte("x")) c.Assert(err, IsNil) - c.Assert(r.GetID(), Equals, uint64(3)) - c.Assert(r.GetAddress(), Equals, "addr1") + c.Assert(r.GetID(), Equals, s.region1) c.Assert(s.cache.regions, HasLen, 1) } @@ -178,13 +179,13 @@ func (s *testRegionCacheSuite) TestReconnect(c *C) { s.cache.GetRegion([]byte("a")) // connect tikv-server failed, cause drop cache - s.cache.DropRegion(3) + s.cache.DropRegion(s.region1) r, err := s.cache.GetRegion([]byte("a")) c.Assert(err, IsNil) c.Assert(r, NotNil) - c.Assert(r.GetID(), Equals, uint64(3)) - c.Assert(r.GetAddress(), Equals, "addr1") + c.Assert(r.GetID(), Equals, s.region1) + c.Assert(r.GetAddress(), Equals, s.storeAddr(s.store1)) c.Assert(s.cache.regions, HasLen, 1) } @@ -193,90 +194,14 @@ func (s *testRegionCacheSuite) TestNextStore(c *C) { c.Assert(err, IsNil) c.Assert(region.curStoreIdx, Equals, 0) - s.cache.NextStore(3) + s.cache.NextStore(s.region1) region, err = s.cache.GetRegion([]byte("a")) c.Assert(err, IsNil) c.Assert(region.curStoreIdx, Equals, 1) - s.cache.NextStore(3) + s.cache.NextStore(s.region1) region, err = s.cache.GetRegion([]byte("a")) c.Assert(err, IsNil) // Out of range of Stores, so get Region again and pick Stores[0] as leader. c.Assert(region.curStoreIdx, Equals, 0) - - s.pd.removeRegion(3) - // regionCache still has more Stores, so pick next store. - s.cache.NextStore(3) - region, err = s.cache.GetRegion([]byte("a")) - c.Assert(err, IsNil) - c.Assert(region.curStoreIdx, Equals, 1) - - // region 3 is removed so can't get Region from pd. - s.cache.NextStore(3) - region, err = s.cache.GetRegion([]byte("a")) - c.Assert(err, NotNil) - c.Assert(region, IsNil) - } - -type mockPDClient struct { - ts int64 - stores map[uint64]*metapb.Store - regions map[uint64]*metapb.Region -} - -func newMockPDClient() *mockPDClient { - return &mockPDClient{ - stores: make(map[uint64]*metapb.Store), - regions: make(map[uint64]*metapb.Region), - } -} - -func (c *mockPDClient) setStore(id uint64, addr string) { - c.stores[id] = &metapb.Store{ - Id: proto.Uint64(id), - Address: proto.String(addr), - } -} - -func (c *mockPDClient) removeStore(id uint64) { - delete(c.stores, id) -} - -func (c *mockPDClient) setRegion(id uint64, startKey, endKey []byte, stores []uint64) { - c.regions[id] = &metapb.Region{ - Id: proto.Uint64(id), - StartKey: startKey, - EndKey: endKey, - StoreIds: stores, - } -} - -func (c *mockPDClient) removeRegion(id uint64) { - delete(c.regions, id) -} - -func (c *mockPDClient) GetTS() (int64, int64, error) { - c.ts++ - return c.ts, 0, nil -} - -func (c *mockPDClient) GetRegion(key []byte) (*metapb.Region, error) { - for _, r := range c.regions { - if bytes.Compare(r.GetStartKey(), key) <= 0 && - (bytes.Compare(key, r.GetEndKey()) < 0 || len(r.GetEndKey()) == 0) { - return r, nil - } - } - return nil, errors.Errorf("region not found for key %q", key) -} - -func (c *mockPDClient) GetStore(storeID uint64) (*metapb.Store, error) { - s, ok := c.stores[storeID] - if !ok { - return nil, errors.Errorf("store %d not found", storeID) - } - return s, nil -} - -func (c *mockPDClient) Close() {} diff --git a/store/tikv/scan_mock_test.go b/store/tikv/scan_mock_test.go index 9fccaf5ff1..44c4d395c4 100644 --- a/store/tikv/scan_mock_test.go +++ b/store/tikv/scan_mock_test.go @@ -14,103 +14,39 @@ package tikv import ( - "github.com/gogo/protobuf/proto" . "github.com/pingcap/check" - "github.com/pingcap/kvproto/pkg/errorpb" - pb "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/store/tikv/mock-tikv" ) type testScanMockSuite struct { - pd *mockPDClient - cache *RegionCache - store *tikvStore - client *stubClient } var _ = Suite(&testScanMockSuite{}) -func (s *testScanMockSuite) SetUpTest(c *C) { - s.pd = newMockPDClient() - s.cache = NewRegionCache(s.pd) - s.client = newStubClient(c) +func (s *testScanMockSuite) TestScanMultipleRegions(c *C) { + store, cluster := createMockStoreCluster() + mocktikv.BootstrapWithMultiRegions(cluster, []byte("a"), []byte("m")) - s.pd.setStore(1, "addr1") - s.pd.setStore(2, "addr2") - // ["", "a"), ["a", "m"), ["m", nil) - s.pd.setRegion(3, nil, []byte("a"), []uint64{1, 2}) - s.pd.setRegion(4, []byte("a"), []byte("m"), []uint64{1, 2}) - s.pd.setRegion(5, []byte("m"), nil, []uint64{1, 2}) - - s.store = newMockStore(s.cache) - s.store.clients["addr1"] = s.client - s.store.clients["addr2"] = s.client -} - -func (s *testScanMockSuite) beginTxn(c *C) (*tikvTxn, *tikvSnapshot) { - transaction, err := s.store.Begin() + txn, err := store.Begin() c.Assert(err, IsNil) - txn := transaction.(*tikvTxn) - kvSnapshot, err := txn.store.GetSnapshot(kv.Version{Ver: txn.StartTS()}) + for ch := byte('a'); ch <= byte('z'); ch++ { + err = txn.Set([]byte{ch}, []byte{ch}) + c.Assert(err, IsNil) + } + err = txn.Commit() c.Assert(err, IsNil) - c.Assert(kvSnapshot, NotNil) - snapshot := kvSnapshot.(*tikvSnapshot) - return txn, snapshot -} -func (s *testScanMockSuite) getRegion(c *C, startKey []byte) *requestRegion { - region, err := s.store.getRegion(startKey) + txn, err = store.Begin() c.Assert(err, IsNil) - c.Assert(region, NotNil) - return region -} - -func (s *testScanMockSuite) TestStaleRegionEpoch(c *C) { - const batchSize = 10 - txn, snapshot := s.beginTxn(c) - region := s.getRegion(c, []byte("a")) - expectReq := makeScanReq(region, "a", batchSize, txn.StartTS()) - expectResp := makeScanStaleEpochResp() - s.client.push(expectReq, expectResp) - _, err := newScanner(region, []byte("a"), txn.StartTS(), *snapshot, batchSize) - c.Assert(err, NotNil) -} - -func (s *testScanMockSuite) TestScanMultiRegions(c *C) { - const batchSize = 10 - startKey := []byte("a") - txn, snapshot := s.beginTxn(c) - region4 := s.getRegion(c, startKey) - expectReq := makeScanReq(region4, "a", batchSize, txn.StartTS()) - // Assume kv has key r4["a","l"] + r5["m","q"]. - // Making first request & response, response returns keys whose length is batchSize. - // That is "a"-"j". - res := makeRangeKvs('a', 'a'+batchSize-1) - expectResp := makeScanOkResp(res) - s.client.push(expectReq, expectResp) - // Making second request & response, response returns the rest keys of region4. - // That is "j"-"l" (total 3, "j" will be skipped). - res2 := makeRangeKvs('a'+batchSize-1, 'l') - expectReq2 := makeScanReq(region4, res2[0].key, batchSize, txn.StartTS()) - expectResp2 := makeScanOkResp(res2) - s.client.push(expectReq2, expectResp2) - // Making third request & response, response returns all keys of region5. - // That is "m"-"q" (total 5). - region5 := s.getRegion(c, []byte("m")) - res3 := makeRangeKvs('m', 'q') - expectReq3 := makeScanReq(region5, res3[0].key, batchSize, txn.StartTS()) - expectResp3 := makeScanOkResp(res3) - s.client.push(expectReq3, expectResp3) - // Making last request & response, response return empty and scanner will Close(). - expectReq4 := makeScanReq(region5, res3[len(res3)-1].key, batchSize, txn.StartTS()) - expectResp4 := makeScanOkResp([]*KvPair{}) - s.client.push(expectReq4, expectResp4) - - scanner, err := newScanner(region4, startKey, txn.StartTS(), *snapshot, batchSize) + snapshot := newTiKVSnapshot(store, kv.Version{Ver: txn.StartTS()}) + region, err := store.getRegion(nil) c.Assert(err, IsNil) - for k := 'a'; k <= 'q'; k++ { - c.Assert([]byte(string(k)), BytesEquals, []byte(scanner.Key())) - if k < 'q' { + scanner, err := newScanner(region, []byte("a"), txn.StartTS(), *snapshot, 10) + c.Assert(err, IsNil) + for ch := byte('a'); ch <= byte('z'); ch++ { + c.Assert([]byte{ch}, BytesEquals, []byte(scanner.Key())) + if ch < byte('z') { c.Assert(scanner.Next(), IsNil) } } @@ -118,55 +54,26 @@ func (s *testScanMockSuite) TestScanMultiRegions(c *C) { c.Assert(scanner.Valid(), IsFalse) } -func makeScanReq(region *requestRegion, key string, limit uint32, ver uint64) *pb.Request { - return &pb.Request{ - Type: pb.MessageType_CmdScan.Enum(), - CmdScanReq: &pb.CmdScanRequest{ - StartKey: []byte(key), - Limit: proto.Uint32(limit), - Version: proto.Uint64(ver), - }, - Context: region.GetContext(), - } -} +func (s *testScanMockSuite) TestStaleRegionEpoch(c *C) { + store, cluster := createMockStoreCluster() + storeID, regionID := mocktikv.BootstrapWithSingleStore(cluster) -func makeScanOkResp(res []*KvPair) *pb.Response { - var pairs []*pb.KvPair - for _, kv := range res { - pairs = append(pairs, &pb.KvPair{ - Key: []byte(kv.key), - Value: []byte(kv.value), - }) + txn, err := store.Begin() + c.Assert(err, IsNil) + for ch := byte('a'); ch <= byte('z'); ch++ { + err = txn.Set([]byte{ch}, []byte{ch}) + c.Assert(err, IsNil) } - return &pb.Response{ - Type: pb.MessageType_CmdScan.Enum(), - CmdScanResp: &pb.CmdScanResponse{ - Pairs: pairs, - }, - } -} + err = txn.Commit() + c.Assert(err, IsNil) -func makeScanStaleEpochResp() *pb.Response { - return &pb.Response{ - Type: pb.MessageType_CmdScan.Enum(), - RegionError: &errorpb.Error{ - Message: proto.String("stale epoch"), - StaleEpoch: &errorpb.StaleEpoch{}, - }, - } -} - -func makeRangeKvs(startKey, endKey rune) []*KvPair { - var res []*KvPair - for key := startKey; key <= endKey; key++ { - // Just set value = key for easy testing. - res = append(res, &KvPair{key: string(key), value: string(key)}) - } - return res + txn, err = store.Begin() + c.Assert(err, IsNil) + snapshot := newTiKVSnapshot(store, kv.Version{Ver: txn.StartTS()}) + region, err := store.getRegion(nil) + c.Assert(err, IsNil) -} - -type KvPair struct { - key string - value string + cluster.Split(regionID, cluster.AllocID(), []byte("m"), storeID) + _, err = newScanner(region, []byte("a"), txn.StartTS(), *snapshot, 10) + c.Assert(err, NotNil) } diff --git a/store/tikv/ticlient_test.go b/store/tikv/ticlient_test.go index 758be0d5e2..e1bce2c302 100644 --- a/store/tikv/ticlient_test.go +++ b/store/tikv/ticlient_test.go @@ -41,11 +41,17 @@ func newTestStore(c *C) *tikvStore { c.Assert(err, IsNil) return store.(*tikvStore) } - cluster := mocktikv.NewCluster() + store, cluster := createMockStoreCluster() mocktikv.BootstrapWithSingleStore(cluster) + return store +} + +func createMockStoreCluster() (*tikvStore, *mocktikv.Cluster) { + cluster := mocktikv.NewCluster() mvccStore := mocktikv.NewMvccStore() clientFactory := mockClientFactory(cluster, mvccStore) - return newTikvStore("mock-tikv-store", mocktikv.NewPDClient(cluster), clientFactory) + store := newTikvStore("mock-tikv-store", mocktikv.NewPDClient(cluster), clientFactory) + return store, cluster } func mockClientFactory(cluster *mocktikv.Cluster, mvccStore *mocktikv.MvccStore) ClientFactory {