store/tikv: support parallel && retry for batchGet. (#1304)

* store/tikv: support parallel && retry for batchGet.
disksing
2016-06-08 17:34:35 +08:00
parent 9ec42dbc4c
commit aebe108c52
4 changed files with 113 additions and 183 deletions
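Editor's note on the shape of the change: the diff below replaces the sequential per-region BatchGet with a scatter-gather. Keys are grouped by region (GroupKeysByRegion), split into batches of at most batchGetSize keys, fetched by one goroutine per batch, and merged into the result map under a mutex; locked keys are retried with backoff, and stale regions trigger regrouping. A minimal self-contained sketch of that fan-out/collect shape, where batch and fetch are hypothetical stand-ins for batchKeys and the per-region RPC:

package main

import (
	"errors"
	"fmt"
)

// batch stands in for a per-region key batch; fetch stands in for the
// BatchGet RPC sent to one region. Both are invented for illustration.
type batch struct{ id int }

func fetch(b batch) error {
	if b.id == 2 {
		return errors.New("region 2 unavailable")
	}
	return nil
}

func main() {
	batches := []batch{{0}, {1}, {2}}
	ch := make(chan error)
	// Fan out: one goroutine per region batch, as batchGetKeysByRegions does.
	for _, b := range batches {
		go func(b batch) { ch <- fetch(b) }(b)
	}
	// Gather: drain exactly one result per goroutine so none leaks;
	// keep the last error seen, matching the diff's error handling.
	var err error
	for i := 0; i < len(batches); i++ {
		if e := <-ch; e != nil {
			err = e
		}
	}
	fmt.Println(err) // region 2 unavailable
}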

View File

@@ -14,6 +14,9 @@
 package tikv
 
 import (
+    "sync"
+    "unsafe"
+
     "github.com/golang/protobuf/proto"
     "github.com/juju/errors"
     "github.com/ngaut/log"
@@ -28,8 +31,7 @@ var (
 )
 
 const (
     scanBatchSize = 100
-    maxGetCount   = 3
-    batchGetSize  = 100
+    batchGetSize  = 5120
 )
 
 // tikvSnapshot implements MvccSnapshot interface.
@@ -46,116 +48,110 @@ func newTiKVSnapshot(store *tikvStore, ver kv.Version) *tikvSnapshot {
     }
 }
 
-// makeBatchGetReqs splits each key into corresponding region.
-func (s *tikvSnapshot) makeBatchGetReqs(keys []kv.Key) (map[RegionVerID]*batchGetRegion, error) {
-    startTS := s.version.Ver
-    multiBatchGet := map[RegionVerID]*batchGetRegion{}
-    for _, k := range keys {
-        region, err := s.store.regionCache.GetRegion(k)
-        if err != nil {
-            return nil, errors.Trace(err)
-        }
-        regionID := region.VerID()
-        singleBatchGet, ok := multiBatchGet[regionID]
-        if !ok {
-            singleBatchGet = &batchGetRegion{
-                CmdBatchGetRequest: &pb.CmdBatchGetRequest{
-                    Version: proto.Uint64(startTS),
-                },
-                region: regionID,
-            }
-            multiBatchGet[regionID] = singleBatchGet
-        }
-        cmdBatchGetReq := singleBatchGet.CmdBatchGetRequest
-        cmdBatchGetReq.Keys = append(cmdBatchGetReq.Keys, k)
-    }
-    return multiBatchGet, nil
-}
-
-// doBatchGet sends BatchGet RPC request. If any key is locked, use tikvSnapshot.Get() to retry.
-func (s *tikvSnapshot) doBatchGet(singleBatchGet *batchGetRegion) (map[string][]byte, error) {
-    cmdBatchGetReq := singleBatchGet.CmdBatchGetRequest
-    keys := cmdBatchGetReq.GetKeys()
-    if len(keys) == 0 {
-        return nil, nil
-    }
-    req := &pb.Request{
-        Type:           pb.MessageType_CmdBatchGet.Enum(),
-        CmdBatchGetReq: cmdBatchGetReq,
-    }
-    resp, err := s.store.SendKVReq(req, singleBatchGet.region)
-    if err != nil {
-        return nil, errors.Trace(err)
-    }
-    if regionErr := resp.GetRegionError(); regionErr != nil {
-        //TODO: retry internally
-        return nil, errors.Annotate(errors.New(regionErr.String()), txnRetryableMark)
-    }
-    cmdBatchGetResp := resp.GetCmdBatchGetResp()
-    if cmdBatchGetResp == nil {
-        return nil, errors.Trace(errBodyMissing)
-    }
-    pairs := cmdBatchGetResp.GetPairs()
-    m := make(map[string][]byte, len(pairs))
-    for _, pair := range pairs {
-        keyErr := pair.GetError()
-        if keyErr == nil {
-            if val := pair.GetValue(); len(val) > 0 {
-                m[string(pair.GetKey())] = val
-            }
-            continue
-        }
-        lockInfo, err := extractLockInfoFromKeyErr(keyErr)
-        if err != nil {
-            return nil, errors.Trace(err)
-        }
-        val, err := s.Get(lockInfo.GetKey())
-        if err != nil {
-            if terror.ErrorEqual(err, kv.ErrNotExist) {
-                continue
-            }
-            return nil, errors.Trace(err)
-        }
-        m[string(lockInfo.GetKey())] = val
-    }
-    return m, nil
-}
-
 // BatchGet gets all the keys' value from kv-server and returns a map contains key/value pairs.
 // The map will not contain nonexistent keys.
 func (s *tikvSnapshot) BatchGet(keys []kv.Key) (map[string][]byte, error) {
-    m := make(map[string][]byte, len(keys))
-    multiBatchGet, err := s.makeBatchGetReqs(keys)
+    // We want [][]byte instead of []kv.Key, use some magic to save memory.
+    bytesKeys := *(*[][]byte)(unsafe.Pointer(&keys))
+    // Create a map to collect key-values from region servers.
+    var mu sync.Mutex
+    m := make(map[string][]byte)
+    err := s.batchGetKeysByRegions(bytesKeys, func(k, v []byte) {
+        if len(v) == 0 {
+            return
+        }
+        mu.Lock()
+        m[string(k)] = v
+        mu.Unlock()
+    })
     if err != nil {
         return nil, errors.Trace(err)
     }
-    for _, singleBatchGet := range multiBatchGet {
-        keys := singleBatchGet.GetKeys()
-        for startIdx := 0; startIdx < len(keys); startIdx += batchGetSize {
-            endIdx := startIdx + batchGetSize
-            if endIdx > len(keys) {
-                endIdx = len(keys)
-            }
-            newSingleBatchGet := &batchGetRegion{
-                CmdBatchGetRequest: &pb.CmdBatchGetRequest{
-                    Keys:    keys[startIdx:endIdx],
-                    Version: proto.Uint64(singleBatchGet.GetVersion()),
-                },
-                region: singleBatchGet.region,
-            }
-            res, err := s.doBatchGet(newSingleBatchGet)
-            if err != nil {
-                return nil, errors.Trace(err)
-            }
-            m, err = mergeResult(m, res)
-            if err != nil {
-                return nil, errors.Trace(err)
-            }
-        }
-    }
     return m, nil
 }
 
+func (s *tikvSnapshot) batchGetKeysByRegions(keys [][]byte, collectF func(k, v []byte)) error {
+    groups, _, err := s.store.regionCache.GroupKeysByRegion(keys)
+    if err != nil {
+        return errors.Trace(err)
+    }
+    var batches []batchKeys
+    for id, g := range groups {
+        batches = appendBatchBySize(batches, id, g, func([]byte) int { return 1 }, batchGetSize)
+    }
+    if len(batches) == 0 {
+        return nil
+    }
+    if len(batches) == 1 {
+        return errors.Trace(s.batchGetSingleRegion(batches[0], collectF))
+    }
+    ch := make(chan error)
+    for _, batch := range batches {
+        go func(batch batchKeys) {
+            ch <- s.batchGetSingleRegion(batch, collectF)
+        }(batch)
+    }
+    for i := 0; i < len(batches); i++ {
+        if e := <-ch; e != nil {
+            log.Warnf("snapshot batchGet failed: %v, tid: %d", e, s.version.Ver)
+            err = e
+        }
+    }
+    return errors.Trace(err)
+}
+
+func (s *tikvSnapshot) batchGetSingleRegion(batch batchKeys, collectF func(k, v []byte)) error {
+    pending := batch.keys
+    var backoffErr error
+    for backoff := txnLockBackoff(); backoffErr == nil; backoffErr = backoff() {
+        req := &pb.Request{
+            Type: pb.MessageType_CmdBatchGet.Enum(),
+            CmdBatchGetReq: &pb.CmdBatchGetRequest{
+                Keys:    pending,
+                Version: proto.Uint64(s.version.Ver),
+            },
+        }
+        resp, err := s.store.SendKVReq(req, batch.region)
+        if err != nil {
+            return errors.Trace(err)
+        }
+        if regionErr := resp.GetRegionError(); regionErr != nil {
+            err = s.batchGetKeysByRegions(pending, collectF)
+            return errors.Trace(err)
+        }
+        batchGetResp := resp.GetCmdBatchGetResp()
+        if batchGetResp == nil {
+            return errors.Trace(errBodyMissing)
+        }
+        var lockedKeys [][]byte
+        for _, pair := range batchGetResp.Pairs {
+            keyErr := pair.GetError()
+            if keyErr == nil {
+                collectF(pair.GetKey(), pair.GetValue())
+                continue
+            }
+            // This could be slow if we meet many expired locks.
+            // TODO: Find a way to do quick unlock.
+            val, err := s.handleKeyError(keyErr)
+            if err != nil {
+                if terror.ErrorNotEqual(err, errInnerRetryable) {
+                    return errors.Trace(err)
+                }
+                lockedKeys = append(lockedKeys, pair.GetKey())
+                continue
+            }
+            collectF(pair.GetKey(), val)
+        }
+        if len(lockedKeys) > 0 {
+            pending = lockedKeys
+            continue
+        }
+        return nil
+    }
+    return errors.Annotate(backoffErr, txnRetryableMark)
+}
+
 // Get gets the value for key k from snapshot.
@@ -254,25 +250,3 @@ func (s *tikvSnapshot) handleKeyError(keyErr *pb.KeyError) ([]byte, error) {
     }
     return val, nil
 }
-
-// mergeResult Merge d2 into d1. If d1 and d2 are overlap, it returns error.
-func mergeResult(d1, d2 map[string][]byte) (map[string][]byte, error) {
-    if d1 == nil {
-        d1 = make(map[string][]byte)
-    }
-    for k2, v2 := range d2 {
-        if v1, ok := d1[k2]; ok {
-            // Because compare []byte takes too much time,
-            // if conflict return error directly even their values are same.
-            return nil, errors.Errorf("add dict conflict key[%s] v1[%q] v2[%q]",
-                k2, v1, v2)
-        }
-        d1[k2] = v2
-    }
-    return d1, nil
-}
-
-type batchGetRegion struct {
-    *pb.CmdBatchGetRequest
-    region RegionVerID
-}
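A detail worth calling out in the new BatchGet: the unsafe.Pointer cast is sound because kv.Key is declared as a plain byte slice (type Key []byte), so []kv.Key and [][]byte have identical memory layouts and the cast reinterprets the slice header without copying any keys. A standalone illustration of the same trick, with Key mirroring TiDB's kv.Key:

package main

import (
	"fmt"
	"unsafe"
)

// Key mirrors TiDB's kv.Key, which is declared as `type Key []byte`.
type Key []byte

func main() {
	keys := []Key{Key("a"), Key("b"), Key("c")}
	// []Key and [][]byte share the same slice header and element layout,
	// so this cast copies nothing.
	bytesKeys := *(*[][]byte)(unsafe.Pointer(&keys))
	fmt.Printf("%q\n", bytesKeys) // ["a" "b" "c"]
}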

View File

@@ -160,55 +160,6 @@ func (s *testSnapshotSuite) TestBatchGetNotExist(c *C) {
     }
 }
 
-func (s *testSnapshotSuite) TestMergeResult(c *C) {
-    d1 := makeDict([]string{"1", "2"})
-    d2 := makeDict([]string{"a", "foo"})
-    d1, err := mergeResult(d1, d2)
-    c.Assert(err, IsNil)
-    r1 := makeDict([]string{"a", "foo", "1", "2"})
-    equalByteDict(c, d1, r1)
-}
-
-func (s *testSnapshotSuite) TestMergeResultNil(c *C) {
-    var d1 map[string][]byte
-    d2 := makeDict([]string{"a", "foo"})
-    d1, err := mergeResult(d1, d2)
-    c.Assert(err, IsNil)
-    r1 := makeDict([]string{"a", "foo"})
-    equalByteDict(c, d1, r1)
-
-    var d3 map[string][]byte
-    var d4 map[string][]byte
-    d3, err = mergeResult(d3, d4)
-    c.Assert(err, IsNil)
-    var r2 map[string][]byte
-    equalByteDict(c, d3, r2)
-}
-
-func (s *testSnapshotSuite) TestMergeResultConflict(c *C) {
-    d1 := makeDict([]string{"1", "2"})
-    d2 := makeDict([]string{"a", "foo", "1"})
-    _, err := mergeResult(d1, d2)
-    c.Assert(err, NotNil)
-}
-
-func makeDict(keys []string) map[string][]byte {
-    d := make(map[string][]byte)
-    for _, k := range keys {
-        d[k] = []byte(k)
-    }
-    return d
-}
-
-func equalByteDict(c *C, lhs, rhs map[string][]byte) {
-    c.Assert(lhs, HasLen, len(rhs))
-    for k, v1 := range lhs {
-        v2, ok := rhs[k]
-        c.Assert(ok, IsTrue)
-        c.Assert(v1, BytesEquals, v2)
-    }
-}
-
 func makeKeys(rowNum int, prefix string) []kv.Key {
     keys := make([]kv.Key, 0, rowNum)
     for i := 0; i < rowNum; i++ {
View File

@@ -51,14 +51,19 @@ func (s *testSplitSuite) TestSplitBatchGet(c *C) {
     txn := s.begin(c)
     snapshot := newTiKVSnapshot(s.store, kv.Version{Ver: txn.StartTS()})
-    multiGets, err := snapshot.makeBatchGetReqs([]kv.Key{kv.Key("a"), kv.Key("b"), kv.Key("c")})
+    keys := [][]byte{{'a'}, {'b'}, {'c'}}
+    _, region, err := s.store.regionCache.GroupKeysByRegion(keys)
     c.Assert(err, IsNil)
+    batch := batchKeys{
+        region: region,
+        keys:   keys,
+    }
 
     s.split(c, firstRegion.GetID(), []byte("b"))
     s.store.regionCache.DropRegion(firstRegion.VerID())
 
-    for _, g := range multiGets {
-        // mock-tikv will panic if it meets a not-in-region key.
-        snapshot.doBatchGet(g)
-    }
+    // mock-tikv will panic if it meets a not-in-region key.
+    err = snapshot.batchGetSingleRegion(batch, func([]byte, []byte) {})
+    c.Assert(err, IsNil)
 }
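This test builds one batchKeys covering 'a', 'b', 'c', splits the region at 'b', and drops the now-stale region from the cache; batchGetSingleRegion then gets a region error from the server and recovers by handing all pending keys back to batchGetKeysByRegions, which regroups them against fresh region information. A toy model of that recovery path; the region layout and every name here are invented for illustration:

package main

import "fmt"

// The true layout after the split: 'a' stays in region 1, 'b' and 'c'
// have moved to region 2.
var regions = map[byte]int{'a': 1, 'b': 2, 'c': 2}

// send models the BatchGet RPC: it fails if any key is not in the region.
func send(regionID int, keys []byte) bool {
	for _, k := range keys {
		if regions[k] != regionID {
			return false // region error: key no longer in this region
		}
	}
	return true
}

func batchGetKeysByRegions(keys []byte) {
	// Regroup with up-to-date region information, then retry each group.
	groups := map[int][]byte{}
	for _, k := range keys {
		groups[regions[k]] = append(groups[regions[k]], k)
	}
	for id, g := range groups {
		batchGetSingleRegion(id, g)
	}
}

func batchGetSingleRegion(regionID int, keys []byte) {
	if !send(regionID, keys) {
		// Stale region: push all pending keys back through regrouping,
		// mirroring the fallback in the diff.
		batchGetKeysByRegions(keys)
		return
	}
	fmt.Printf("region %d served %q\n", regionID, keys)
}

func main() {
	// The stale cached view claims all three keys live in region 1.
	batchGetSingleRegion(1, []byte("abc"))
}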

View File

@@ -100,11 +100,11 @@ func (c *txnCommitter) iterKeys(keys [][]byte, f func(batchKeys) error, sizeFn f
     var batches []batchKeys
     // Make sure the group that contains primary key goes first.
     if firstIsPrimary {
-        batches = appendBatchBySize(batches, firstRegion, groups[firstRegion], sizeFn)
+        batches = appendBatchBySize(batches, firstRegion, groups[firstRegion], sizeFn, txnCommitBatchSize)
         delete(groups, firstRegion)
     }
     for id, g := range groups {
-        batches = appendBatchBySize(batches, id, g, sizeFn)
+        batches = appendBatchBySize(batches, id, g, sizeFn, txnCommitBatchSize)
     }
 
     if firstIsPrimary {
@@ -339,11 +339,11 @@ type batchKeys struct {
 
 // appendBatchBySize appends keys to []batchKeys. It may split the keys to make
 // sure each batch's size does not exceed the limit.
-func appendBatchBySize(b []batchKeys, region RegionVerID, keys [][]byte, sizeFn func([]byte) int) []batchKeys {
+func appendBatchBySize(b []batchKeys, region RegionVerID, keys [][]byte, sizeFn func([]byte) int, limit int) []batchKeys {
     var start, end int
     for start = 0; start < len(keys); start = end {
         var size int
-        for end = start; end < len(keys) && size < txnCommitBatchSize; end++ {
+        for end = start; end < len(keys) && size < limit; end++ {
             size += sizeFn(keys[end])
         }
         b = append(b, batchKeys{
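With the limit now a parameter, the same splitter serves both callers: the committer passes txnCommitBatchSize with a real size function, while batchGet passes batchGetSize with func([]byte) int { return 1 }, so its limit counts keys rather than bytes. A self-contained sketch of the splitting behavior, with RegionVerID reduced to an integer so it runs standalone:

package main

import "fmt"

// RegionVerID is simplified to an integer here; the real type identifies
// a specific version of a region.
type RegionVerID int

type batchKeys struct {
	region RegionVerID
	keys   [][]byte
}

// appendBatchBySize is copied from the diff above.
func appendBatchBySize(b []batchKeys, region RegionVerID, keys [][]byte, sizeFn func([]byte) int, limit int) []batchKeys {
	var start, end int
	for start = 0; start < len(keys); start = end {
		var size int
		for end = start; end < len(keys) && size < limit; end++ {
			size += sizeFn(keys[end])
		}
		b = append(b, batchKeys{region: region, keys: keys[start:end]})
	}
	return b
}

func main() {
	keys := [][]byte{{'a'}, {'b'}, {'c'}, {'d'}, {'e'}, {'f'}, {'g'}}
	// Counting one unit per key with limit 3 splits 7 keys into 3+3+1,
	// which is how batchGet caps each region batch at batchGetSize keys.
	for _, b := range appendBatchBySize(nil, 1, keys, func([]byte) int { return 1 }, 3) {
		fmt.Printf("%q\n", b.keys)
	}
}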