diff --git a/store/mockstore/unistore/mock.go b/store/mockstore/unistore/mock.go index 81108b9251..477deb3d6a 100644 --- a/store/mockstore/unistore/mock.go +++ b/store/mockstore/unistore/mock.go @@ -63,6 +63,7 @@ func New(path string) (*RPCClient, pd.Client, *Cluster, error) { cluster: cluster, path: path, persistent: persistent, + rawHandler: newRawHandler(), } pdClient := newPDClient(pd) diff --git a/store/mockstore/unistore/raw_handler.go b/store/mockstore/unistore/raw_handler.go new file mode 100644 index 0000000000..a4fb701409 --- /dev/null +++ b/store/mockstore/unistore/raw_handler.go @@ -0,0 +1,131 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package unistore + +import ( + "bytes" + "context" + "sync" + + "github.com/ngaut/unistore/lockstore" + "github.com/pingcap/kvproto/pkg/kvrpcpb" +) + +type rawHandler struct { + mu sync.RWMutex + store *lockstore.MemStore +} + +func newRawHandler() *rawHandler { + return &rawHandler{ + store: lockstore.NewMemStore(4096), + } +} + +func (h *rawHandler) RawGet(_ context.Context, req *kvrpcpb.RawGetRequest) (*kvrpcpb.RawGetResponse, error) { + h.mu.RLock() + defer h.mu.RUnlock() + val := h.store.Get(req.Key, nil) + return &kvrpcpb.RawGetResponse{ + Value: val, + NotFound: len(val) == 0, + }, nil +} + +func (h *rawHandler) RawBatchGet(_ context.Context, req *kvrpcpb.RawBatchGetRequest) (*kvrpcpb.RawBatchGetResponse, error) { + h.mu.RLock() + defer h.mu.RUnlock() + pairs := make([]*kvrpcpb.KvPair, len(req.Keys)) + for i, key := range req.Keys { + pairs[i] = &kvrpcpb.KvPair{ + Key: key, + Value: h.store.Get(key, nil), + } + } + return &kvrpcpb.RawBatchGetResponse{Pairs: pairs}, nil +} + +func (h *rawHandler) RawPut(_ context.Context, req *kvrpcpb.RawPutRequest) (*kvrpcpb.RawPutResponse, error) { + h.mu.Lock() + defer h.mu.Unlock() + h.store.Put(req.Key, req.Value) + return &kvrpcpb.RawPutResponse{}, nil +} + +func (h *rawHandler) RawBatchPut(_ context.Context, req *kvrpcpb.RawBatchPutRequest) (*kvrpcpb.RawBatchPutResponse, error) { + h.mu.Lock() + defer h.mu.Unlock() + for _, pair := range req.Pairs { + h.store.Put(pair.Key, pair.Value) + } + return &kvrpcpb.RawBatchPutResponse{}, nil +} + +func (h *rawHandler) RawDelete(_ context.Context, req *kvrpcpb.RawDeleteRequest) (*kvrpcpb.RawDeleteResponse, error) { + h.mu.Lock() + defer h.mu.Unlock() + h.store.Delete(req.Key) + return &kvrpcpb.RawDeleteResponse{}, nil +} + +func (h *rawHandler) RawBatchDelete(_ context.Context, req *kvrpcpb.RawBatchDeleteRequest) (*kvrpcpb.RawBatchDeleteResponse, error) { + h.mu.Lock() + defer h.mu.Unlock() + for _, key := range req.Keys { + h.store.Delete(key) + } + return &kvrpcpb.RawBatchDeleteResponse{}, nil +} + +func (h *rawHandler) RawDeleteRange(_ context.Context, req *kvrpcpb.RawDeleteRangeRequest) (*kvrpcpb.RawDeleteRangeResponse, error) { + h.mu.Lock() + defer h.mu.Unlock() + it := h.store.NewIterator() + var keys [][]byte + for it.Seek(req.StartKey); it.Valid(); it.Next() { + if bytes.Compare(it.Key(), req.EndKey) >= 0 { + break + } + keys = append(keys, safeCopy(it.Key())) + } + for _, key := range keys { + h.store.Delete(key) + } + return &kvrpcpb.RawDeleteRangeResponse{}, nil +} + +func (h *rawHandler) RawScan(_ context.Context, req *kvrpcpb.RawScanRequest) (*kvrpcpb.RawScanResponse, error) { + h.mu.RLock() + defer h.mu.RUnlock() + it := h.store.NewIterator() + var pairs []*kvrpcpb.KvPair + for it.Seek(req.StartKey); it.Valid(); it.Next() { + if len(pairs) >= int(req.Limit) { + break + } + if bytes.Compare(it.Key(), req.EndKey) >= 0 { + break + } + pair := &kvrpcpb.KvPair{ + Key: safeCopy(it.Key()), + Value: safeCopy(it.Value()), + } + pairs = append(pairs, pair) + } + return &kvrpcpb.RawScanResponse{Kvs: pairs}, nil +} + +func safeCopy(val []byte) []byte { + return append([]byte{}, val...) +} diff --git a/store/mockstore/unistore/raw_handler_test.go b/store/mockstore/unistore/raw_handler_test.go new file mode 100644 index 0000000000..9bd1ba83dc --- /dev/null +++ b/store/mockstore/unistore/raw_handler_test.go @@ -0,0 +1,84 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package unistore + +import ( + "fmt" + "testing" + + . "github.com/pingcap/check" + "github.com/pingcap/kvproto/pkg/kvrpcpb" +) + +func TestT(t *testing.T) { + TestingT(t) +} + +type testSuite struct{} + +func (ts testSuite) SetUpSuite(c *C) {} + +func (ts testSuite) TearDownSuite(c *C) {} + +var _ = Suite(testSuite{}) + +func (ts testSuite) TestRawHandler(c *C) { + h := newRawHandler() + keys := make([][]byte, 10) + vals := make([][]byte, 10) + for i := 0; i < 10; i++ { + keys[i] = []byte(fmt.Sprintf("key%d", i)) + vals[i] = []byte(fmt.Sprintf("val%d", i)) + } + putResp, _ := h.RawPut(nil, &kvrpcpb.RawPutRequest{Key: keys[0], Value: vals[0]}) + c.Assert(putResp, NotNil) + getResp, _ := h.RawGet(nil, &kvrpcpb.RawGetRequest{Key: keys[0]}) + c.Assert(getResp, NotNil) + c.Assert(getResp.Value, BytesEquals, vals[0]) + delResp, _ := h.RawDelete(nil, &kvrpcpb.RawDeleteRequest{Key: keys[0]}) + c.Assert(delResp, NotNil) + + batchPutReq := &kvrpcpb.RawBatchPutRequest{Pairs: []*kvrpcpb.KvPair{ + {Key: keys[1], Value: vals[1]}, + {Key: keys[3], Value: vals[3]}, + {Key: keys[5], Value: vals[5]}, + }} + batchPutResp, _ := h.RawBatchPut(nil, batchPutReq) + c.Assert(batchPutResp, NotNil) + batchGetResp, _ := h.RawBatchGet(nil, &kvrpcpb.RawBatchGetRequest{Keys: [][]byte{keys[1], keys[3], keys[5]}}) + c.Assert(batchGetResp, NotNil) + c.Assert(batchGetResp.Pairs, DeepEquals, batchPutReq.Pairs) + batchDelResp, _ := h.RawBatchDelete(nil, &kvrpcpb.RawBatchDeleteRequest{Keys: [][]byte{keys[1], keys[3], keys[5]}}) + c.Assert(batchDelResp, NotNil) + + batchPutReq.Pairs = []*kvrpcpb.KvPair{ + {Key: keys[6], Value: vals[6]}, + {Key: keys[7], Value: vals[7]}, + {Key: keys[8], Value: vals[8]}, + } + batchPutResp, _ = h.RawBatchPut(nil, batchPutReq) + c.Assert(batchPutResp, NotNil) + + scanReq := &kvrpcpb.RawScanRequest{StartKey: keys[0], EndKey: keys[9], Limit: 2} + scanResp, _ := h.RawScan(nil, scanReq) + c.Assert(batchPutResp, NotNil) + c.Assert(scanResp.Kvs, HasLen, 2) + c.Assert(batchPutReq.Pairs[:2], DeepEquals, scanResp.Kvs) + + delRangeResp, _ := h.RawDeleteRange(nil, &kvrpcpb.RawDeleteRangeRequest{StartKey: keys[0], EndKey: keys[9]}) + c.Assert(delRangeResp, NotNil) + + scanResp, _ = h.RawScan(nil, scanReq) + c.Assert(scanResp.Kvs, HasLen, 0) +} diff --git a/store/mockstore/unistore/rpc.go b/store/mockstore/unistore/rpc.go index 17b6206ec8..98f8c9083e 100644 --- a/store/mockstore/unistore/rpc.go +++ b/store/mockstore/unistore/rpc.go @@ -48,6 +48,7 @@ type RPCClient struct { usSvr *us.Server cluster *Cluster path string + rawHandler *rawHandler persistent bool closed int32 @@ -149,21 +150,21 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R case tikvrpc.CmdDeleteRange: resp.Resp, err = c.usSvr.KvDeleteRange(ctx, req.DeleteRange()) case tikvrpc.CmdRawGet: - resp.Resp, err = c.usSvr.RawGet(ctx, req.RawGet()) + resp.Resp, err = c.rawHandler.RawGet(ctx, req.RawGet()) case tikvrpc.CmdRawBatchGet: - resp.Resp, err = c.usSvr.RawBatchGet(ctx, req.RawBatchGet()) + resp.Resp, err = c.rawHandler.RawBatchGet(ctx, req.RawBatchGet()) case tikvrpc.CmdRawPut: - resp.Resp, err = c.usSvr.RawPut(ctx, req.RawPut()) + resp.Resp, err = c.rawHandler.RawPut(ctx, req.RawPut()) case tikvrpc.CmdRawBatchPut: - resp.Resp, err = c.usSvr.RawBatchPut(ctx, req.RawBatchPut()) + resp.Resp, err = c.rawHandler.RawBatchPut(ctx, req.RawBatchPut()) case tikvrpc.CmdRawDelete: - resp.Resp, err = c.usSvr.RawDelete(ctx, req.RawDelete()) + resp.Resp, err = c.rawHandler.RawDelete(ctx, req.RawDelete()) case tikvrpc.CmdRawBatchDelete: - resp.Resp, err = c.usSvr.RawBatchDelete(ctx, req.RawBatchDelete()) + resp.Resp, err = c.rawHandler.RawBatchDelete(ctx, req.RawBatchDelete()) case tikvrpc.CmdRawDeleteRange: - resp.Resp, err = c.usSvr.RawDeleteRange(ctx, req.RawDeleteRange()) + resp.Resp, err = c.rawHandler.RawDeleteRange(ctx, req.RawDeleteRange()) case tikvrpc.CmdRawScan: - resp.Resp, err = c.usSvr.RawScan(ctx, req.RawScan()) + resp.Resp, err = c.rawHandler.RawScan(ctx, req.RawScan()) case tikvrpc.CmdCop: resp.Resp, err = c.usSvr.Coprocessor(ctx, req.Cop()) case tikvrpc.CmdCopStream: