From 5846fc3ee0a3cd8928b130bd3c05ab7c0fa96cfd Mon Sep 17 00:00:00 2001
From: disksing
Date: Wed, 8 Jun 2016 15:19:08 +0800
Subject: [PATCH] store/tikv: parallel commit. (#1290)

* store/tikv: parallel commit.

* store/tikv: commit && cleanup asynchronously.
---
 store/tikv/region_cache.go       |  30 +++++-
 store/tikv/txn_committer.go      | 177 +++++++++++++++++++------------
 store/tikv/txn_committer_test.go | 118 +++++++++++++++++++++
 3 files changed, 255 insertions(+), 70 deletions(-)
 create mode 100644 store/tikv/txn_committer_test.go

diff --git a/store/tikv/region_cache.go b/store/tikv/region_cache.go
index 4cc225c511..ca72640829 100644
--- a/store/tikv/region_cache.go
+++ b/store/tikv/region_cache.go
@@ -71,7 +71,35 @@ func (c *RegionCache) GetRegion(key []byte) (*Region, error) {
 	return c.insertRegionToCache(r), nil
 }
 
-// DropRegion remove some region cache.
+// GroupKeysByRegion separates keys into groups by the Region they belong to.
+// Additionally, it returns the first key's region, which may be used as the
+// 'PrimaryLockKey' and should be committed ahead of the others.
+func (c *RegionCache) GroupKeysByRegion(keys [][]byte) (map[RegionVerID][][]byte, RegionVerID, error) {
+	groups := make(map[RegionVerID][][]byte)
+	var first RegionVerID
+	var lastRegion *Region
+	for i, k := range keys {
+		var region *Region
+		if lastRegion != nil && lastRegion.Contains(k) {
+			region = lastRegion
+		} else {
+			var err error
+			region, err = c.GetRegion(k)
+			if err != nil {
+				return nil, first, errors.Trace(err)
+			}
+			lastRegion = region
+		}
+		id := region.VerID()
+		if i == 0 {
+			first = id
+		}
+		groups[id] = append(groups[id], k)
+	}
+	return groups, first, nil
+}
+
+// DropRegion removes a cached Region.
 func (c *RegionCache) DropRegion(id RegionVerID) {
 	c.mu.Lock()
 	defer c.mu.Unlock()
diff --git a/store/tikv/txn_committer.go b/store/tikv/txn_committer.go
index ac10180cdb..8e17247e58 100644
--- a/store/tikv/txn_committer.go
+++ b/store/tikv/txn_committer.go
@@ -15,6 +15,7 @@ package tikv
 
 import (
 	"bytes"
+	"sync"
 
 	"github.com/golang/protobuf/proto"
 	"github.com/juju/errors"
@@ -29,8 +30,9 @@ type txnCommitter struct {
 	startTS   uint64
 	keys      [][]byte
 	mutations map[string]*pb.Mutation
-	writtenKeys [][]byte
 	commitTS  uint64
+	mu          sync.RWMutex
+	writtenKeys [][]byte
 	committed bool
 }
 
@@ -82,42 +84,72 @@ func (c *txnCommitter) primary() []byte {
 	return c.keys[0]
 }
 
-func (c *txnCommitter) iterKeysByRegion(keys [][]byte, f func(RegionVerID, [][]byte) error) error {
-	groups := make(map[RegionVerID][][]byte)
-	var primaryRegionID RegionVerID
-	var lastRegion *Region
-	for _, k := range keys {
-		var region *Region
-		if lastRegion != nil && lastRegion.Contains(k) {
-			region = lastRegion
-		} else {
-			var err error
-			region, err = c.store.regionCache.GetRegion(k)
-			if err != nil {
-				return errors.Trace(err)
-			}
-			lastRegion = region
-		}
-		id := region.VerID()
-		if bytes.Compare(k, c.primary()) == 0 {
-			primaryRegionID = id
-		}
-		groups[id] = append(groups[id], k)
+// iterKeys groups keys into batches, then applies `f` to them. If the flag
+// asyncNonPrimary is set, it returns as soon as the primary batch is
+// processed.
+func (c *txnCommitter) iterKeys(keys [][]byte, f func(batchKeys) error, sizeFn func([]byte) int, asyncNonPrimary bool) error {
+	if len(keys) == 0 {
+		return nil
 	}
+	groups, firstRegion, err := c.store.regionCache.GroupKeysByRegion(keys)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	firstIsPrimary := bytes.Equal(keys[0], c.primary())
+
+	var batches []batchKeys
 	// Make sure the group that contains the primary key goes first.
-	if primaryRegionID.id != 0 {
-		if err := f(primaryRegionID, groups[primaryRegionID]); err != nil {
-			return errors.Trace(err)
-		}
-		delete(groups, primaryRegionID)
+	if firstIsPrimary {
+		batches = appendBatchBySize(batches, firstRegion, groups[firstRegion], sizeFn)
+		delete(groups, firstRegion)
 	}
 	for id, g := range groups {
-		if err := f(id, g); err != nil {
+		batches = appendBatchBySize(batches, id, g, sizeFn)
+	}
+
+	if firstIsPrimary {
+		err = c.doBatches(batches[:1], f)
+		if err != nil {
 			return errors.Trace(err)
 		}
+		batches = batches[1:]
 	}
-	return nil
+	if asyncNonPrimary {
+		go c.doBatches(batches, f)
+		return nil
+	}
+	err = c.doBatches(batches, f)
+	return errors.Trace(err)
+}
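A toy model may help readers unfamiliar with the region cache. The sketch below mimics the grouping loop of GroupKeysByRegion: mostly-sorted keys tend to land in the same Region, so the last matched region is checked before doing a fresh lookup, and the first key's region is reported separately. The `region` type here is a simplified stand-in, not the real tikv type.

```go
package main

import "fmt"

// region is a simplified stand-in for a TiKV Region: a half-open key
// range [start, end). An empty end means "unbounded above".
type region struct{ start, end string }

func (r region) contains(k string) bool {
	return k >= r.start && (r.end == "" || k < r.end)
}

// groupKeysByRegion mirrors the lookup pattern in the patch: try the last
// matched region first, fall back to a full lookup otherwise. It assumes
// every key falls inside some region.
func groupKeysByRegion(regions []region, keys []string) (map[region][]string, region) {
	groups := make(map[region][]string)
	var first region
	var last *region
	for i, k := range keys {
		if last == nil || !last.contains(k) {
			for j := range regions {
				if regions[j].contains(k) {
					last = &regions[j]
					break
				}
			}
		}
		if i == 0 {
			first = *last // the first key's region holds the primary lock
		}
		groups[*last] = append(groups[*last], k)
	}
	return groups, first
}

func main() {
	regions := []region{{"", "b"}, {"b", "c"}, {"c", ""}}
	groups, first := groupKeysByRegion(regions, []string{"a1", "a2", "b1", "c9"})
	fmt.Println(first, groups)
}
```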
+
+// doBatches applies f to batches in parallel.
+func (c *txnCommitter) doBatches(batches []batchKeys, f func(batchKeys) error) error {
+	if len(batches) == 0 {
+		return nil
+	}
+	if len(batches) == 1 {
+		e := f(batches[0])
+		if e != nil {
+			log.Warnf("txnCommitter doBatches failed: %v, tid: %d", e, c.startTS)
+		}
+		return errors.Trace(e)
+	}
+
+	// TODO: For prewrite, stop sending other requests after receiving the first error.
+	ch := make(chan error)
+	for _, batch := range batches {
+		go func(batch batchKeys) {
+			ch <- f(batch)
+		}(batch)
+	}
+	var err error
+	for i := 0; i < len(batches); i++ {
+		if e := <-ch; e != nil {
+			log.Warnf("txnCommitter doBatches failed: %v, tid: %d", e, c.startTS)
+			err = e
+		}
+	}
+	return errors.Trace(err)
+}
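The fan-out/fan-in shape of doBatches is worth noting: one goroutine per batch, and exactly one receive per batch, so no goroutine is left blocked on the channel even when some batches fail. Every error is logged, and the last one seen is returned. A self-contained sketch of the same pattern, with ints standing in for batches:

```go
package main

import (
	"errors"
	"fmt"
)

// doBatches mirrors txnCommitter.doBatches: apply f to every batch in its
// own goroutine, then drain one result per batch. The last error wins.
func doBatches(batches []int, f func(int) error) error {
	ch := make(chan error)
	for _, b := range batches {
		go func(b int) { ch <- f(b) }(b)
	}
	var err error
	for range batches {
		if e := <-ch; e != nil {
			fmt.Println("batch failed:", e) // the real code logs via log.Warnf
			err = e
		}
	}
	return err
}

func main() {
	err := doBatches([]int{1, 2, 3}, func(b int) error {
		if b == 2 {
			return errors.New("batch 2: region error")
		}
		return nil
	})
	fmt.Println("final error:", err)
}
```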
 
 func (c *txnCommitter) keyValueSize(key []byte) int {
@@ -132,9 +164,9 @@ func (c *txnCommitter) keySize(key []byte) int {
 	return len(key)
 }
 
-func (c *txnCommitter) prewriteSingleRegion(regionID RegionVerID, keys [][]byte) error {
-	mutations := make([]*pb.Mutation, len(keys))
-	for i, k := range keys {
+func (c *txnCommitter) prewriteSingleRegion(batch batchKeys) error {
+	mutations := make([]*pb.Mutation, len(batch.keys))
+	for i, k := range batch.keys {
 		mutations[i] = c.mutations[string(k)]
 	}
 	req := &pb.Request{
@@ -148,7 +180,7 @@ func (c *txnCommitter) prewriteSingleRegion(regionID RegionVerID, keys [][]byte) error {
 
 	var backoffErr error
 	for backoff := txnLockBackoff(); backoffErr == nil; backoffErr = backoff() {
-		resp, err := c.store.SendKVReq(req, regionID)
+		resp, err := c.store.SendKVReq(req, batch.region)
 		if err != nil {
 			return errors.Trace(err)
 		}
@@ -157,7 +189,7 @@ func (c *txnCommitter) prewriteSingleRegion(regionID RegionVerID, keys [][]byte) error {
 			// TODO: The recursion may not be able to exit if TiKV &
 			// PD are implemented incorrectly. A possible fix is
 			// introducing a 'max backoff time'.
-			err = c.prewriteKeys(keys)
+			err = c.prewriteKeys(batch.keys)
 			return errors.Trace(err)
 		}
 		prewriteResp := resp.GetCmdPrewriteResp()
@@ -167,7 +199,9 @@ func (c *txnCommitter) prewriteSingleRegion(regionID RegionVerID, keys [][]byte) error {
 		keyErrs := prewriteResp.GetErrors()
 		if len(keyErrs) == 0 {
 			// We need to clean up all written keys if the transaction aborts.
-			c.writtenKeys = append(c.writtenKeys, keys...)
+			c.mu.Lock()
+			defer c.mu.Unlock()
+			c.writtenKeys = append(c.writtenKeys, batch.keys...)
 			return nil
 		}
 		for _, keyErr := range keyErrs {
@@ -186,23 +220,23 @@ func (c *txnCommitter) prewriteSingleRegion(regionID RegionVerID, keys [][]byte) error {
 	return errors.Annotate(backoffErr, txnRetryableMark)
 }
 
-func (c *txnCommitter) commitSingleRegion(regionID RegionVerID, keys [][]byte) error {
+func (c *txnCommitter) commitSingleRegion(batch batchKeys) error {
 	req := &pb.Request{
 		Type: pb.MessageType_CmdCommit.Enum(),
 		CmdCommitReq: &pb.CmdCommitRequest{
 			StartVersion:  proto.Uint64(c.startTS),
-			Keys:          keys,
+			Keys:          batch.keys,
 			CommitVersion: proto.Uint64(c.commitTS),
 		},
 	}
 
-	resp, err := c.store.SendKVReq(req, regionID)
+	resp, err := c.store.SendKVReq(req, batch.region)
 	if err != nil {
 		return errors.Trace(err)
 	}
 	if regionErr := resp.GetRegionError(); regionErr != nil {
 		// Re-split keys and commit again.
-		err = c.commitKeys(keys)
+		err = c.commitKeys(batch.keys)
 		return errors.Trace(err)
 	}
 	commitResp := resp.GetCmdCommitResp()
@@ -210,6 +244,8 @@ func (c *txnCommitter) commitSingleRegion(regionID RegionVerID, keys [][]byte) error {
 		return errors.Trace(errBodyMissing)
 	}
 	if keyErr := commitResp.GetError(); keyErr != nil {
+		c.mu.RLock()
+		defer c.mu.RUnlock()
 		err = errors.Errorf("commit failed: %v", keyErr.String())
 		if c.committed {
 			// No secondary key can be rolled back after its primary key is committed.
@@ -222,33 +258,31 @@ func (c *txnCommitter) commitSingleRegion(regionID RegionVerID, keys [][]byte) error {
 		return errors.Annotate(err, txnRetryableMark)
 	}
 
+	c.mu.Lock()
+	defer c.mu.Unlock()
 	// The group that contains the primary key is always the first.
 	// We mark the transaction's status committed when we receive the first success response.
 	c.committed = true
 	return nil
 }
 
-func (c *txnCommitter) cleanupSingleRegion(regionID RegionVerID, keys [][]byte) error {
+func (c *txnCommitter) cleanupSingleRegion(batch batchKeys) error {
 	req := &pb.Request{
 		Type: pb.MessageType_CmdBatchRollback.Enum(),
 		CmdBatchRollbackReq: &pb.CmdBatchRollbackRequest{
-			Keys:         keys,
+			Keys:         batch.keys,
 			StartVersion: proto.Uint64(c.startTS),
 		},
 	}
 
-	resp, err := c.store.SendKVReq(req, regionID)
+	resp, err := c.store.SendKVReq(req, batch.region)
 	if err != nil {
 		return errors.Trace(err)
 	}
 	if regionErr := resp.GetRegionError(); regionErr != nil {
-		err = c.cleanupKeys(keys)
+		err = c.cleanupKeys(batch.keys)
 		return errors.Trace(err)
 	}
-	rollbackResp := resp.GetCmdBatchRollbackResp()
-	if rollbackResp == nil {
-		return errors.Trace(errBodyMissing)
-	}
-	if keyErr := rollbackResp.GetError(); keyErr != nil {
+	if keyErr := resp.GetCmdBatchRollbackResp().GetError(); keyErr != nil {
 		err = errors.Errorf("cleanup failed: %s", keyErr)
 		log.Errorf("txn failed cleanup key: %v, tid: %d", err, c.startTS)
 		return errors.Trace(err)
@@ -257,22 +291,22 @@ func (c *txnCommitter) cleanupSingleRegion(regionID RegionVerID, keys [][]byte) error {
 }
 
 func (c *txnCommitter) prewriteKeys(keys [][]byte) error {
-	return c.iterKeysByRegion(keys, batchIterFn(c.prewriteSingleRegion, c.keyValueSize))
+	return c.iterKeys(keys, c.prewriteSingleRegion, c.keyValueSize, false)
 }
 
 func (c *txnCommitter) commitKeys(keys [][]byte) error {
-	return c.iterKeysByRegion(keys, batchIterFn(c.commitSingleRegion, c.keySize))
+	return c.iterKeys(keys, c.commitSingleRegion, c.keySize, true)
 }
 
 func (c *txnCommitter) cleanupKeys(keys [][]byte) error {
-	return c.iterKeysByRegion(keys, batchIterFn(c.cleanupSingleRegion, c.keySize))
+	return c.iterKeys(keys, c.cleanupSingleRegion, c.keySize, false)
 }
 
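Because batches now run on several goroutines at once, writtenKeys and committed have become shared state, which is why the struct gained a sync.RWMutex: prewrite appends to writtenKeys under the write lock, and readers of committed take the read lock. A minimal illustration of the same discipline, using a hypothetical tracker type rather than the real committer:

```go
package main

import (
	"fmt"
	"sync"
)

// tracker mirrors the concurrency-sensitive fields of txnCommitter:
// concurrent prewrite goroutines append to writtenKeys under mu, and
// readers take the read lock before inspecting committed.
type tracker struct {
	mu          sync.RWMutex
	writtenKeys [][]byte
	committed   bool
}

func (t *tracker) record(keys ...[]byte) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.writtenKeys = append(t.writtenKeys, keys...)
}

func (t *tracker) isCommitted() bool {
	t.mu.RLock()
	defer t.mu.RUnlock()
	return t.committed
}

func main() {
	var t tracker
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			t.record([]byte{byte('a' + i)})
		}(i)
	}
	wg.Wait()
	fmt.Println(len(t.writtenKeys), t.isCommitted()) // 4 false
}
```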
 func (c *txnCommitter) Commit() error {
 	err := c.prewriteKeys(c.keys)
 	if err != nil {
 		log.Warnf("txn commit failed on prewrite: %v, tid: %d", err, c.startTS)
-		c.cleanupKeys(c.writtenKeys)
+		go c.cleanupKeys(c.writtenKeys)
 		return errors.Trace(err)
 	}
@@ -285,7 +319,7 @@ func (c *txnCommitter) Commit() error {
 	err = c.commitKeys(c.keys)
 	if err != nil {
 		if !c.committed {
-			c.cleanupKeys(c.writtenKeys)
+			go c.cleanupKeys(c.writtenKeys)
 			return errors.Trace(err)
 		}
 		log.Warnf("txn commit succeeded with error: %v, tid: %d", err, c.startTS)
@@ -297,20 +331,25 @@ func (c *txnCommitter) Commit() error {
 // txnCommitBatchSize limits the total key+value size of a single batch to 512KB.
 const txnCommitBatchSize = 512 * 1024
 
-// batchIterfn wraps an iteration function and returns a new one that iterates
-// keys by batch size.
-func batchIterFn(f func(RegionVerID, [][]byte) error, sizeFn func([]byte) int) func(RegionVerID, [][]byte) error {
-	return func(id RegionVerID, keys [][]byte) error {
-		var start, end int
-		for start = 0; start < len(keys); start = end {
-			var size int
-			for end = start; end < len(keys) && size < txnCommitBatchSize; end++ {
-				size += sizeFn(keys[end])
-			}
-			if err := f(id, keys[start:end]); err != nil {
-				return errors.Trace(err)
-			}
-		}
-		return nil
-	}
+// batchKeys is a batch of keys in the same region.
+type batchKeys struct {
+	region RegionVerID
+	keys   [][]byte
+}
+
+// appendBatchBySize appends keys to []batchKeys. It may split the keys to make
+// sure each batch's size does not exceed the limit.
+func appendBatchBySize(b []batchKeys, region RegionVerID, keys [][]byte, sizeFn func([]byte) int) []batchKeys {
+	var start, end int
+	for start = 0; start < len(keys); start = end {
+		var size int
+		for end = start; end < len(keys) && size < txnCommitBatchSize; end++ {
+			size += sizeFn(keys[end])
+		}
+		b = append(b, batchKeys{
+			region: region,
+			keys:   keys[start:end],
+		})
+	}
+	return b
+}
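appendBatchBySize keeps the greedy packing loop from the old batchIterFn: a batch is closed as soon as its accumulated size reaches the limit, so a batch may overshoot the limit by at most one key. A toy run of the same loop with strings and a small limit; `batchLimit` is a stand-in for txnCommitBatchSize:

```go
package main

import "fmt"

const batchLimit = 16 // stand-in for txnCommitBatchSize (512KB in the patch)

// splitBySize mirrors appendBatchBySize: pack keys greedily until the
// accumulated size reaches the limit, then start a new batch. The size
// check happens before adding a key, so a batch can overshoot by one key.
func splitBySize(keys []string, sizeFn func(string) int) [][]string {
	var batches [][]string
	for start, end := 0, 0; start < len(keys); start = end {
		size := 0
		for end = start; end < len(keys) && size < batchLimit; end++ {
			size += sizeFn(keys[end])
		}
		batches = append(batches, keys[start:end])
	}
	return batches
}

func main() {
	keys := []string{"alpha", "bravo", "charlie", "delta", "echo"}
	// Prewrite batches by key+value size; commit and cleanup by key size only.
	fmt.Println(splitBySize(keys, func(k string) int { return len(k) }))
	// Output: [[alpha bravo charlie] [delta echo]]
}
```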
"github.com/pingcap/check" + "github.com/pingcap/tidb/store/tikv/mock-tikv" +) + +type testCommitterSuite struct { + cluster *mocktikv.Cluster + store *tikvStore +} + +var _ = Suite(&testCommitterSuite{}) + +func (s *testCommitterSuite) SetUpTest(c *C) { + s.cluster = mocktikv.NewCluster() + mocktikv.BootstrapWithMultiRegions(s.cluster, []byte("a"), []byte("b"), []byte("c")) + mvccStore := mocktikv.NewMvccStore() + clientFactory := mockClientFactory(s.cluster, mvccStore) + s.store = newTikvStore("mock-tikv-store", mocktikv.NewPDClient(s.cluster), clientFactory) +} + +func (s *testCommitterSuite) begin(c *C) *tikvTxn { + txn, err := s.store.Begin() + c.Assert(err, IsNil) + return txn.(*tikvTxn) +} + +func (s *testCommitterSuite) checkValues(c *C, m map[string]string) { + txn := s.begin(c) + for k, v := range m { + val, err := txn.Get([]byte(k)) + c.Assert(err, IsNil) + c.Assert(string(val), Equals, v) + } +} + +func (s *testCommitterSuite) mustCommit(c *C, m map[string]string) { + txn := s.begin(c) + for k, v := range m { + err := txn.Set([]byte(k), []byte(v)) + c.Assert(err, IsNil) + } + err := txn.Commit() + c.Assert(err, IsNil) + + s.checkValues(c, m) +} + +func randKV(keyLen, valLen int) (string, string) { + const letters = "abc" + k, v := make([]byte, keyLen), make([]byte, valLen) + for i := range k { + k[i] = letters[rand.Intn(len(letters))] + } + for i := range v { + v[i] = letters[rand.Intn(len(letters))] + } + return string(k), string(v) +} + +func (s *testCommitterSuite) TestCommitMultipleRegions(c *C) { + m := make(map[string]string) + for i := 0; i < 1000; i++ { + k, v := randKV(10, 10) + m[k] = v + } + s.mustCommit(c, m) + + // Test big values. + m = make(map[string]string) + for i := 0; i < 500; i++ { + k, v := randKV(11, txnCommitBatchSize/7) + m[k] = v + } + s.mustCommit(c, m) +} + +func (s *testCommitterSuite) TestCommitRollback(c *C) { + s.mustCommit(c, map[string]string{ + "a": "a", + "b": "b", + "c": "c", + }) + + txn := s.begin(c) + txn.Set([]byte("a"), []byte("a1")) + txn.Set([]byte("b"), []byte("b1")) + txn.Set([]byte("c"), []byte("c1")) + + s.mustCommit(c, map[string]string{ + "c": "c2", + }) + + err := txn.Commit() + c.Assert(err, NotNil) + + s.checkValues(c, map[string]string{ + "a": "a", + "b": "b", + "c": "c2", + }) +}