unistore: support RawKV API (#18872)

This commit is contained in:
Evan Zhou
2020-07-30 15:21:23 +08:00
committed by GitHub
parent 43c2d30706
commit bb87111d6c
4 changed files with 225 additions and 8 deletions

View File

@ -63,6 +63,7 @@ func New(path string) (*RPCClient, pd.Client, *Cluster, error) {
cluster: cluster,
path: path,
persistent: persistent,
rawHandler: newRawHandler(),
}
pdClient := newPDClient(pd)

View File

@ -0,0 +1,131 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package unistore
import (
"bytes"
"context"
"sync"
"github.com/ngaut/unistore/lockstore"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
)
type rawHandler struct {
mu sync.RWMutex
store *lockstore.MemStore
}
func newRawHandler() *rawHandler {
return &rawHandler{
store: lockstore.NewMemStore(4096),
}
}
func (h *rawHandler) RawGet(_ context.Context, req *kvrpcpb.RawGetRequest) (*kvrpcpb.RawGetResponse, error) {
h.mu.RLock()
defer h.mu.RUnlock()
val := h.store.Get(req.Key, nil)
return &kvrpcpb.RawGetResponse{
Value: val,
NotFound: len(val) == 0,
}, nil
}
func (h *rawHandler) RawBatchGet(_ context.Context, req *kvrpcpb.RawBatchGetRequest) (*kvrpcpb.RawBatchGetResponse, error) {
h.mu.RLock()
defer h.mu.RUnlock()
pairs := make([]*kvrpcpb.KvPair, len(req.Keys))
for i, key := range req.Keys {
pairs[i] = &kvrpcpb.KvPair{
Key: key,
Value: h.store.Get(key, nil),
}
}
return &kvrpcpb.RawBatchGetResponse{Pairs: pairs}, nil
}
func (h *rawHandler) RawPut(_ context.Context, req *kvrpcpb.RawPutRequest) (*kvrpcpb.RawPutResponse, error) {
h.mu.Lock()
defer h.mu.Unlock()
h.store.Put(req.Key, req.Value)
return &kvrpcpb.RawPutResponse{}, nil
}
func (h *rawHandler) RawBatchPut(_ context.Context, req *kvrpcpb.RawBatchPutRequest) (*kvrpcpb.RawBatchPutResponse, error) {
h.mu.Lock()
defer h.mu.Unlock()
for _, pair := range req.Pairs {
h.store.Put(pair.Key, pair.Value)
}
return &kvrpcpb.RawBatchPutResponse{}, nil
}
func (h *rawHandler) RawDelete(_ context.Context, req *kvrpcpb.RawDeleteRequest) (*kvrpcpb.RawDeleteResponse, error) {
h.mu.Lock()
defer h.mu.Unlock()
h.store.Delete(req.Key)
return &kvrpcpb.RawDeleteResponse{}, nil
}
func (h *rawHandler) RawBatchDelete(_ context.Context, req *kvrpcpb.RawBatchDeleteRequest) (*kvrpcpb.RawBatchDeleteResponse, error) {
h.mu.Lock()
defer h.mu.Unlock()
for _, key := range req.Keys {
h.store.Delete(key)
}
return &kvrpcpb.RawBatchDeleteResponse{}, nil
}
func (h *rawHandler) RawDeleteRange(_ context.Context, req *kvrpcpb.RawDeleteRangeRequest) (*kvrpcpb.RawDeleteRangeResponse, error) {
h.mu.Lock()
defer h.mu.Unlock()
it := h.store.NewIterator()
var keys [][]byte
for it.Seek(req.StartKey); it.Valid(); it.Next() {
if bytes.Compare(it.Key(), req.EndKey) >= 0 {
break
}
keys = append(keys, safeCopy(it.Key()))
}
for _, key := range keys {
h.store.Delete(key)
}
return &kvrpcpb.RawDeleteRangeResponse{}, nil
}
func (h *rawHandler) RawScan(_ context.Context, req *kvrpcpb.RawScanRequest) (*kvrpcpb.RawScanResponse, error) {
h.mu.RLock()
defer h.mu.RUnlock()
it := h.store.NewIterator()
var pairs []*kvrpcpb.KvPair
for it.Seek(req.StartKey); it.Valid(); it.Next() {
if len(pairs) >= int(req.Limit) {
break
}
if bytes.Compare(it.Key(), req.EndKey) >= 0 {
break
}
pair := &kvrpcpb.KvPair{
Key: safeCopy(it.Key()),
Value: safeCopy(it.Value()),
}
pairs = append(pairs, pair)
}
return &kvrpcpb.RawScanResponse{Kvs: pairs}, nil
}
func safeCopy(val []byte) []byte {
return append([]byte{}, val...)
}

View File

@ -0,0 +1,84 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package unistore
import (
"fmt"
"testing"
. "github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
)
func TestT(t *testing.T) {
TestingT(t)
}
type testSuite struct{}
func (ts testSuite) SetUpSuite(c *C) {}
func (ts testSuite) TearDownSuite(c *C) {}
var _ = Suite(testSuite{})
func (ts testSuite) TestRawHandler(c *C) {
h := newRawHandler()
keys := make([][]byte, 10)
vals := make([][]byte, 10)
for i := 0; i < 10; i++ {
keys[i] = []byte(fmt.Sprintf("key%d", i))
vals[i] = []byte(fmt.Sprintf("val%d", i))
}
putResp, _ := h.RawPut(nil, &kvrpcpb.RawPutRequest{Key: keys[0], Value: vals[0]})
c.Assert(putResp, NotNil)
getResp, _ := h.RawGet(nil, &kvrpcpb.RawGetRequest{Key: keys[0]})
c.Assert(getResp, NotNil)
c.Assert(getResp.Value, BytesEquals, vals[0])
delResp, _ := h.RawDelete(nil, &kvrpcpb.RawDeleteRequest{Key: keys[0]})
c.Assert(delResp, NotNil)
batchPutReq := &kvrpcpb.RawBatchPutRequest{Pairs: []*kvrpcpb.KvPair{
{Key: keys[1], Value: vals[1]},
{Key: keys[3], Value: vals[3]},
{Key: keys[5], Value: vals[5]},
}}
batchPutResp, _ := h.RawBatchPut(nil, batchPutReq)
c.Assert(batchPutResp, NotNil)
batchGetResp, _ := h.RawBatchGet(nil, &kvrpcpb.RawBatchGetRequest{Keys: [][]byte{keys[1], keys[3], keys[5]}})
c.Assert(batchGetResp, NotNil)
c.Assert(batchGetResp.Pairs, DeepEquals, batchPutReq.Pairs)
batchDelResp, _ := h.RawBatchDelete(nil, &kvrpcpb.RawBatchDeleteRequest{Keys: [][]byte{keys[1], keys[3], keys[5]}})
c.Assert(batchDelResp, NotNil)
batchPutReq.Pairs = []*kvrpcpb.KvPair{
{Key: keys[6], Value: vals[6]},
{Key: keys[7], Value: vals[7]},
{Key: keys[8], Value: vals[8]},
}
batchPutResp, _ = h.RawBatchPut(nil, batchPutReq)
c.Assert(batchPutResp, NotNil)
scanReq := &kvrpcpb.RawScanRequest{StartKey: keys[0], EndKey: keys[9], Limit: 2}
scanResp, _ := h.RawScan(nil, scanReq)
c.Assert(batchPutResp, NotNil)
c.Assert(scanResp.Kvs, HasLen, 2)
c.Assert(batchPutReq.Pairs[:2], DeepEquals, scanResp.Kvs)
delRangeResp, _ := h.RawDeleteRange(nil, &kvrpcpb.RawDeleteRangeRequest{StartKey: keys[0], EndKey: keys[9]})
c.Assert(delRangeResp, NotNil)
scanResp, _ = h.RawScan(nil, scanReq)
c.Assert(scanResp.Kvs, HasLen, 0)
}

View File

@ -48,6 +48,7 @@ type RPCClient struct {
usSvr *us.Server
cluster *Cluster
path string
rawHandler *rawHandler
persistent bool
closed int32
@ -149,21 +150,21 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R
case tikvrpc.CmdDeleteRange:
resp.Resp, err = c.usSvr.KvDeleteRange(ctx, req.DeleteRange())
case tikvrpc.CmdRawGet:
resp.Resp, err = c.usSvr.RawGet(ctx, req.RawGet())
resp.Resp, err = c.rawHandler.RawGet(ctx, req.RawGet())
case tikvrpc.CmdRawBatchGet:
resp.Resp, err = c.usSvr.RawBatchGet(ctx, req.RawBatchGet())
resp.Resp, err = c.rawHandler.RawBatchGet(ctx, req.RawBatchGet())
case tikvrpc.CmdRawPut:
resp.Resp, err = c.usSvr.RawPut(ctx, req.RawPut())
resp.Resp, err = c.rawHandler.RawPut(ctx, req.RawPut())
case tikvrpc.CmdRawBatchPut:
resp.Resp, err = c.usSvr.RawBatchPut(ctx, req.RawBatchPut())
resp.Resp, err = c.rawHandler.RawBatchPut(ctx, req.RawBatchPut())
case tikvrpc.CmdRawDelete:
resp.Resp, err = c.usSvr.RawDelete(ctx, req.RawDelete())
resp.Resp, err = c.rawHandler.RawDelete(ctx, req.RawDelete())
case tikvrpc.CmdRawBatchDelete:
resp.Resp, err = c.usSvr.RawBatchDelete(ctx, req.RawBatchDelete())
resp.Resp, err = c.rawHandler.RawBatchDelete(ctx, req.RawBatchDelete())
case tikvrpc.CmdRawDeleteRange:
resp.Resp, err = c.usSvr.RawDeleteRange(ctx, req.RawDeleteRange())
resp.Resp, err = c.rawHandler.RawDeleteRange(ctx, req.RawDeleteRange())
case tikvrpc.CmdRawScan:
resp.Resp, err = c.usSvr.RawScan(ctx, req.RawScan())
resp.Resp, err = c.rawHandler.RawScan(ctx, req.RawScan())
case tikvrpc.CmdCop:
resp.Resp, err = c.usSvr.Coprocessor(ctx, req.Cop())
case tikvrpc.CmdCopStream: