From aebe108c52a46735f67d7c3d12451bb93a92e1a2 Mon Sep 17 00:00:00 2001 From: disksing Date: Wed, 8 Jun 2016 17:34:35 +0800 Subject: [PATCH] store/tikv: support parallel && retry for batchGet. (#1304) * store/tikv: support parallel && retry for batchGet. --- store/tikv/snapshot.go | 224 ++++++++++++++++-------------------- store/tikv/snapshot_test.go | 49 -------- store/tikv/split_test.go | 15 ++- store/tikv/txn_committer.go | 8 +- 4 files changed, 113 insertions(+), 183 deletions(-) diff --git a/store/tikv/snapshot.go b/store/tikv/snapshot.go index c980d5ef26..3b62ce12f3 100644 --- a/store/tikv/snapshot.go +++ b/store/tikv/snapshot.go @@ -14,6 +14,9 @@ package tikv import ( + "sync" + "unsafe" + "github.com/golang/protobuf/proto" "github.com/juju/errors" "github.com/ngaut/log" @@ -28,8 +31,7 @@ var ( const ( scanBatchSize = 100 - maxGetCount = 3 - batchGetSize = 100 + batchGetSize = 5120 ) // tikvSnapshot implements MvccSnapshot interface. @@ -46,116 +48,110 @@ func newTiKVSnapshot(store *tikvStore, ver kv.Version) *tikvSnapshot { } } -// makeBatchGetReqs splits each key into corresponding region. -func (s *tikvSnapshot) makeBatchGetReqs(keys []kv.Key) (map[RegionVerID]*batchGetRegion, error) { - startTS := s.version.Ver - multiBatchGet := map[RegionVerID]*batchGetRegion{} - for _, k := range keys { - region, err := s.store.regionCache.GetRegion(k) - if err != nil { - return nil, errors.Trace(err) - } - regionID := region.VerID() - singleBatchGet, ok := multiBatchGet[regionID] - if !ok { - singleBatchGet = &batchGetRegion{ - CmdBatchGetRequest: &pb.CmdBatchGetRequest{ - Version: proto.Uint64(startTS), - }, - region: regionID, - } - multiBatchGet[regionID] = singleBatchGet - } - cmdBatchGetReq := singleBatchGet.CmdBatchGetRequest - cmdBatchGetReq.Keys = append(cmdBatchGetReq.Keys, k) - } - return multiBatchGet, nil -} - -// doBatchGet sends BatchGet RPC request. If any key is locked, use tikvSnapshot.Get() to retry. -func (s *tikvSnapshot) doBatchGet(singleBatchGet *batchGetRegion) (map[string][]byte, error) { - cmdBatchGetReq := singleBatchGet.CmdBatchGetRequest - keys := cmdBatchGetReq.GetKeys() - if len(keys) == 0 { - return nil, nil - } - req := &pb.Request{ - Type: pb.MessageType_CmdBatchGet.Enum(), - CmdBatchGetReq: cmdBatchGetReq, - } - resp, err := s.store.SendKVReq(req, singleBatchGet.region) - if err != nil { - return nil, errors.Trace(err) - } - if regionErr := resp.GetRegionError(); regionErr != nil { - //TODO: retry internally - return nil, errors.Annotate(errors.New(regionErr.String()), txnRetryableMark) - } - cmdBatchGetResp := resp.GetCmdBatchGetResp() - if cmdBatchGetResp == nil { - return nil, errors.Trace(errBodyMissing) - } - pairs := cmdBatchGetResp.GetPairs() - m := make(map[string][]byte, len(pairs)) - for _, pair := range pairs { - keyErr := pair.GetError() - if keyErr == nil { - if val := pair.GetValue(); len(val) > 0 { - m[string(pair.GetKey())] = val - } - continue - } - lockInfo, err := extractLockInfoFromKeyErr(keyErr) - if err != nil { - return nil, errors.Trace(err) - } - val, err := s.Get(lockInfo.GetKey()) - if err != nil { - if terror.ErrorEqual(err, kv.ErrNotExist) { - continue - } - return nil, errors.Trace(err) - } - m[string(lockInfo.GetKey())] = val - } - return m, nil -} - // BatchGet gets all the keys' value from kv-server and returns a map contains key/value pairs. // The map will not contain nonexistent keys. func (s *tikvSnapshot) BatchGet(keys []kv.Key) (map[string][]byte, error) { - m := make(map[string][]byte, len(keys)) + // We want [][]byte instead of []kv.Key, use some magic to save memory. + bytesKeys := *(*[][]byte)(unsafe.Pointer(&keys)) - multiBatchGet, err := s.makeBatchGetReqs(keys) + // Create a map to collect key-values from region servers. + var mu sync.Mutex + m := make(map[string][]byte) + err := s.batchGetKeysByRegions(bytesKeys, func(k, v []byte) { + if len(v) == 0 { + return + } + mu.Lock() + m[string(k)] = v + mu.Unlock() + }) if err != nil { return nil, errors.Trace(err) } - for _, singleBatchGet := range multiBatchGet { - keys := singleBatchGet.GetKeys() - for startIdx := 0; startIdx < len(keys); startIdx += batchGetSize { - endIdx := startIdx + batchGetSize - if endIdx > len(keys) { - endIdx = len(keys) - } - newSingleBatchGet := &batchGetRegion{ - CmdBatchGetRequest: &pb.CmdBatchGetRequest{ - Keys: keys[startIdx:endIdx], - Version: proto.Uint64(singleBatchGet.GetVersion()), - }, - region: singleBatchGet.region, - } - res, err := s.doBatchGet(newSingleBatchGet) - if err != nil { - return nil, errors.Trace(err) - } - m, err = mergeResult(m, res) - if err != nil { - return nil, errors.Trace(err) - } - } + return m, nil +} + +func (s *tikvSnapshot) batchGetKeysByRegions(keys [][]byte, collectF func(k, v []byte)) error { + groups, _, err := s.store.regionCache.GroupKeysByRegion(keys) + if err != nil { + return errors.Trace(err) } - return m, nil + var batches []batchKeys + for id, g := range groups { + batches = appendBatchBySize(batches, id, g, func([]byte) int { return 1 }, batchGetSize) + } + + if len(batches) == 0 { + return nil + } + if len(batches) == 1 { + return errors.Trace(s.batchGetSingleRegion(batches[0], collectF)) + } + ch := make(chan error) + for _, batch := range batches { + go func(batch batchKeys) { + ch <- s.batchGetSingleRegion(batch, collectF) + }(batch) + } + for i := 0; i < len(batches); i++ { + if e := <-ch; e != nil { + log.Warnf("snapshot batchGet failed: %v, tid: %d", e, s.version.Ver) + err = e + } + } + return errors.Trace(err) +} + +func (s *tikvSnapshot) batchGetSingleRegion(batch batchKeys, collectF func(k, v []byte)) error { + pending := batch.keys + var backoffErr error + for backoff := txnLockBackoff(); backoffErr == nil; backoffErr = backoff() { + req := &pb.Request{ + Type: pb.MessageType_CmdBatchGet.Enum(), + CmdBatchGetReq: &pb.CmdBatchGetRequest{ + Keys: pending, + Version: proto.Uint64(s.version.Ver), + }, + } + resp, err := s.store.SendKVReq(req, batch.region) + if err != nil { + return errors.Trace(err) + } + if regionErr := resp.GetRegionError(); regionErr != nil { + err = s.batchGetKeysByRegions(pending, collectF) + return errors.Trace(err) + } + batchGetResp := resp.GetCmdBatchGetResp() + if batchGetResp == nil { + return errors.Trace(errBodyMissing) + } + var lockedKeys [][]byte + for _, pair := range batchGetResp.Pairs { + keyErr := pair.GetError() + if keyErr == nil { + collectF(pair.GetKey(), pair.GetValue()) + continue + } + // This could be slow if we meet many expired locks. + // TODO: Find a way to do quick unlock. + val, err := s.handleKeyError(keyErr) + if err != nil { + if terror.ErrorNotEqual(err, errInnerRetryable) { + return errors.Trace(err) + } + lockedKeys = append(lockedKeys, pair.GetKey()) + continue + } + collectF(pair.GetKey(), val) + } + if len(lockedKeys) > 0 { + pending = lockedKeys + continue + } + return nil + } + return errors.Annotate(backoffErr, txnRetryableMark) } // Get gets the value for key k from snapshot. @@ -254,25 +250,3 @@ func (s *tikvSnapshot) handleKeyError(keyErr *pb.KeyError) ([]byte, error) { } return val, nil } - -// mergeResult Merge d2 into d1. If d1 and d2 are overlap, it returns error. -func mergeResult(d1, d2 map[string][]byte) (map[string][]byte, error) { - if d1 == nil { - d1 = make(map[string][]byte) - } - for k2, v2 := range d2 { - if v1, ok := d1[k2]; ok { - // Because compare []byte takes too much time, - // if conflict return error directly even their values are same. - return nil, errors.Errorf("add dict conflict key[%s] v1[%q] v2[%q]", - k2, v1, v2) - } - d1[k2] = v2 - } - return d1, nil -} - -type batchGetRegion struct { - *pb.CmdBatchGetRequest - region RegionVerID -} diff --git a/store/tikv/snapshot_test.go b/store/tikv/snapshot_test.go index 5cdc87dba5..a968ba0c27 100644 --- a/store/tikv/snapshot_test.go +++ b/store/tikv/snapshot_test.go @@ -160,55 +160,6 @@ func (s *testSnapshotSuite) TestBatchGetNotExist(c *C) { } } -func (s *testSnapshotSuite) TestMergeResult(c *C) { - d1 := makeDict([]string{"1", "2"}) - d2 := makeDict([]string{"a", "foo"}) - d1, err := mergeResult(d1, d2) - c.Assert(err, IsNil) - r1 := makeDict([]string{"a", "foo", "1", "2"}) - equalByteDict(c, d1, r1) -} - -func (s *testSnapshotSuite) TestMergeResultNil(c *C) { - var d1 map[string][]byte - d2 := makeDict([]string{"a", "foo"}) - d1, err := mergeResult(d1, d2) - c.Assert(err, IsNil) - r1 := makeDict([]string{"a", "foo"}) - equalByteDict(c, d1, r1) - - var d3 map[string][]byte - var d4 map[string][]byte - d3, err = mergeResult(d3, d4) - c.Assert(err, IsNil) - var r2 map[string][]byte - equalByteDict(c, d3, r2) -} - -func (s *testSnapshotSuite) TestMergeResultConflict(c *C) { - d1 := makeDict([]string{"1", "2"}) - d2 := makeDict([]string{"a", "foo", "1"}) - _, err := mergeResult(d1, d2) - c.Assert(err, NotNil) -} - -func makeDict(keys []string) map[string][]byte { - d := make(map[string][]byte) - for _, k := range keys { - d[k] = []byte(k) - } - return d -} - -func equalByteDict(c *C, lhs, rhs map[string][]byte) { - c.Assert(lhs, HasLen, len(rhs)) - for k, v1 := range lhs { - v2, ok := rhs[k] - c.Assert(ok, IsTrue) - c.Assert(v1, BytesEquals, v2) - } -} - func makeKeys(rowNum int, prefix string) []kv.Key { keys := make([]kv.Key, 0, rowNum) for i := 0; i < rowNum; i++ { diff --git a/store/tikv/split_test.go b/store/tikv/split_test.go index 42f955592c..d945f6318d 100644 --- a/store/tikv/split_test.go +++ b/store/tikv/split_test.go @@ -51,14 +51,19 @@ func (s *testSplitSuite) TestSplitBatchGet(c *C) { txn := s.begin(c) snapshot := newTiKVSnapshot(s.store, kv.Version{Ver: txn.StartTS()}) - multiGets, err := snapshot.makeBatchGetReqs([]kv.Key{kv.Key("a"), kv.Key("b"), kv.Key("c")}) + + keys := [][]byte{{'a'}, {'b'}, {'c'}} + _, region, err := s.store.regionCache.GroupKeysByRegion(keys) c.Assert(err, IsNil) + batch := batchKeys{ + region: region, + keys: keys, + } s.split(c, firstRegion.GetID(), []byte("b")) s.store.regionCache.DropRegion(firstRegion.VerID()) - for _, g := range multiGets { - // mock-tikv will panic if it meets a not-in-region key. - snapshot.doBatchGet(g) - } + // mock-tikv will panic if it meets a not-in-region key. + err = snapshot.batchGetSingleRegion(batch, func([]byte, []byte) {}) + c.Assert(err, IsNil) } diff --git a/store/tikv/txn_committer.go b/store/tikv/txn_committer.go index 3e034a2760..dd016f7776 100644 --- a/store/tikv/txn_committer.go +++ b/store/tikv/txn_committer.go @@ -100,11 +100,11 @@ func (c *txnCommitter) iterKeys(keys [][]byte, f func(batchKeys) error, sizeFn f var batches []batchKeys // Make sure the group that contains primary key goes first. if firstIsPrimary { - batches = appendBatchBySize(batches, firstRegion, groups[firstRegion], sizeFn) + batches = appendBatchBySize(batches, firstRegion, groups[firstRegion], sizeFn, txnCommitBatchSize) delete(groups, firstRegion) } for id, g := range groups { - batches = appendBatchBySize(batches, id, g, sizeFn) + batches = appendBatchBySize(batches, id, g, sizeFn, txnCommitBatchSize) } if firstIsPrimary { @@ -339,11 +339,11 @@ type batchKeys struct { // appendBatchBySize appends keys to []batchKeys. It may split the keys to make // sure each batch's size does not exceed the limit. -func appendBatchBySize(b []batchKeys, region RegionVerID, keys [][]byte, sizeFn func([]byte) int) []batchKeys { +func appendBatchBySize(b []batchKeys, region RegionVerID, keys [][]byte, sizeFn func([]byte) int, limit int) []batchKeys { var start, end int for start = 0; start < len(keys); start = end { var size int - for end = start; end < len(keys) && size < txnCommitBatchSize; end++ { + for end = start; end < len(keys) && size < limit; end++ { size += sizeFn(keys[end]) } b = append(b, batchKeys{