store/tikv: parallel commit. (#1290)

* store/tikv: parallel commit.

* store/tikv: commit && cleanup asynchronously.
disksing
2016-06-08 15:19:08 +08:00
parent c6637049ab
commit 5846fc3ee0
3 changed files with 255 additions and 70 deletions
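At a high level, this change makes the committer group keys by Region, send the batch containing the primary lock key first, and then send the remaining batches concurrently (commit and cleanup of secondaries are additionally dispatched asynchronously). Below is a minimal, hypothetical sketch of that dispatch pattern; the batch type and commitPrimaryFirst helper are illustrative stand-ins, not the committer's actual API.

package main

import (
	"fmt"
	"sync"
)

// batch is an illustrative stand-in for batchKeys: a group of keys that all
// live in the same Region.
type batch struct {
	region uint64
	keys   [][]byte
}

// commitPrimaryFirst sends the primary batch synchronously, then sends the
// remaining batches in parallel and returns the first error observed.
func commitPrimaryFirst(primary batch, secondaries []batch, send func(batch) error) error {
	// The primary batch must succeed before any secondary batch is sent.
	if err := send(primary); err != nil {
		return err
	}
	errCh := make(chan error, len(secondaries))
	var wg sync.WaitGroup
	for _, b := range secondaries {
		wg.Add(1)
		go func(b batch) {
			defer wg.Done()
			errCh <- send(b)
		}(b)
	}
	wg.Wait()
	close(errCh)
	for err := range errCh {
		if err != nil {
			return err
		}
	}
	return nil
}

func main() {
	primary := batch{region: 1, keys: [][]byte{[]byte("a")}}
	secondaries := []batch{{region: 2, keys: [][]byte{[]byte("b"), []byte("c")}}}
	err := commitPrimaryFirst(primary, secondaries, func(b batch) error {
		fmt.Printf("send batch: region %d, %d keys\n", b.region, len(b.keys))
		return nil
	})
	fmt.Println("commit error:", err)
}

In the actual diff below, commitKeys calls iterKeys with asyncNonPrimary set, so after the primary batch succeeds the secondary batches are dispatched from a background goroutine instead of being awaited.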

View File

@ -71,7 +71,35 @@ func (c *RegionCache) GetRegion(key []byte) (*Region, error) {
return c.insertRegionToCache(r), nil
}
// DropRegion remove some region cache.
// GroupKeysByRegion separates keys into groups by their belonging Regions.
// In addition, it returns the first key's Region, which may be used as the
// 'PrimaryLockKey' and should be committed ahead of the others.
func (c *RegionCache) GroupKeysByRegion(keys [][]byte) (map[RegionVerID][][]byte, RegionVerID, error) {
groups := make(map[RegionVerID][][]byte)
var first RegionVerID
var lastRegion *Region
for i, k := range keys {
var region *Region
if lastRegion != nil && lastRegion.Contains(k) {
region = lastRegion
} else {
var err error
region, err = c.GetRegion(k)
if err != nil {
return nil, first, errors.Trace(err)
}
lastRegion = region
}
id := region.VerID()
if i == 0 {
first = id
}
groups[id] = append(groups[id], k)
}
return groups, first, nil
}
// DropRegion removes a cached Region.
func (c *RegionCache) DropRegion(id RegionVerID) {
c.mu.Lock()
defer c.mu.Unlock()
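A hedged usage sketch of the new method follows. The splitKeys helper is hypothetical and only compiles inside package tikv, where RegionCache and RegionVerID are defined; it simply shows how the returned map and first-Region ID are meant to be consumed.

// splitKeys is a hypothetical caller of GroupKeysByRegion. It sends the group
// containing keys[0] (the candidate primary lock key) first, then visits every
// other Region's group.
func splitKeys(cache *RegionCache, keys [][]byte, send func(RegionVerID, [][]byte) error) error {
	if len(keys) == 0 {
		return nil
	}
	groups, firstRegion, err := cache.GroupKeysByRegion(keys)
	if err != nil {
		return err
	}
	// The group keyed by firstRegion contains keys[0], so it goes first.
	if err := send(firstRegion, groups[firstRegion]); err != nil {
		return err
	}
	for id, ks := range groups {
		if id == firstRegion {
			continue // already sent as the primary group
		}
		if err := send(id, ks); err != nil {
			return err
		}
	}
	return nil
}

This is essentially what the new iterKeys in the next file does, with size-based batching layered on top.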

View File

@ -15,6 +15,7 @@ package tikv
import (
"bytes"
"sync"
"github.com/golang/protobuf/proto"
"github.com/juju/errors"
@ -29,8 +30,9 @@ type txnCommitter struct {
startTS uint64
keys [][]byte
mutations map[string]*pb.Mutation
writtenKeys [][]byte
commitTS uint64
mu sync.RWMutex
writtenKeys [][]byte
committed bool
}
@ -82,42 +84,72 @@ func (c *txnCommitter) primary() []byte {
return c.keys[0]
}
func (c *txnCommitter) iterKeysByRegion(keys [][]byte, f func(RegionVerID, [][]byte) error) error {
groups := make(map[RegionVerID][][]byte)
var primaryRegionID RegionVerID
var lastRegion *Region
for _, k := range keys {
var region *Region
if lastRegion != nil && lastRegion.Contains(k) {
region = lastRegion
} else {
var err error
region, err = c.store.regionCache.GetRegion(k)
if err != nil {
return errors.Trace(err)
}
lastRegion = region
}
id := region.VerID()
if bytes.Compare(k, c.primary()) == 0 {
primaryRegionID = id
}
groups[id] = append(groups[id], k)
// iterKeys groups keys into batches, then applies `f` to them. If the flag
// asyncNonPrimary is set, it returns as soon as the primary batch has been
// processed and handles the remaining batches in a background goroutine.
func (c *txnCommitter) iterKeys(keys [][]byte, f func(batchKeys) error, sizeFn func([]byte) int, asyncNonPrimary bool) error {
if len(keys) == 0 {
return nil
}
groups, firstRegion, err := c.store.regionCache.GroupKeysByRegion(keys)
if err != nil {
return errors.Trace(err)
}
firstIsPrimary := bytes.Equal(keys[0], c.primary())
var batches []batchKeys
// Make sure the group that contains primary key goes first.
if primaryRegionID.id != 0 {
if err := f(primaryRegionID, groups[primaryRegionID]); err != nil {
return errors.Trace(err)
}
delete(groups, primaryRegionID)
if firstIsPrimary {
batches = appendBatchBySize(batches, firstRegion, groups[firstRegion], sizeFn)
delete(groups, firstRegion)
}
for id, g := range groups {
if err := f(id, g); err != nil {
batches = appendBatchBySize(batches, id, g, sizeFn)
}
if firstIsPrimary {
err = c.doBatches(batches[:1], f)
if err != nil {
return errors.Trace(err)
}
batches = batches[1:]
}
return nil
if asyncNonPrimary {
go c.doBatches(batches, f)
return nil
}
err = c.doBatches(batches, f)
return errors.Trace(err)
}
// doBatches applies f to the batches in parallel.
func (c *txnCommitter) doBatches(batches []batchKeys, f func(batchKeys) error) error {
if len(batches) == 0 {
return nil
}
if len(batches) == 1 {
e := f(batches[0])
if e != nil {
log.Warnf("txnCommitter doBatches failed: %v, tid: %d", e, c.startTS)
}
return errors.Trace(e)
}
// TODO: For prewrite, stop sending the remaining requests after receiving the first error.
ch := make(chan error)
for _, batch := range batches {
go func(batch batchKeys) {
ch <- f(batch)
}(batch)
}
var err error
for i := 0; i < len(batches); i++ {
if e := <-ch; e != nil {
log.Warnf("txnCommitter doBatches failed: %v, tid: %d", e, c.startTS)
err = e
}
}
return errors.Trace(err)
}
func (c *txnCommitter) keyValueSize(key []byte) int {
@ -132,9 +164,9 @@ func (c *txnCommitter) keySize(key []byte) int {
return len(key)
}
func (c *txnCommitter) prewriteSingleRegion(regionID RegionVerID, keys [][]byte) error {
mutations := make([]*pb.Mutation, len(keys))
for i, k := range keys {
func (c *txnCommitter) prewriteSingleRegion(batch batchKeys) error {
mutations := make([]*pb.Mutation, len(batch.keys))
for i, k := range batch.keys {
mutations[i] = c.mutations[string(k)]
}
req := &pb.Request{
@ -148,7 +180,7 @@ func (c *txnCommitter) prewriteSingleRegion(regionID RegionVerID, keys [][]byte)
var backoffErr error
for backoff := txnLockBackoff(); backoffErr == nil; backoffErr = backoff() {
resp, err := c.store.SendKVReq(req, regionID)
resp, err := c.store.SendKVReq(req, batch.region)
if err != nil {
return errors.Trace(err)
}
@ -157,7 +189,7 @@ func (c *txnCommitter) prewriteSingleRegion(regionID RegionVerID, keys [][]byte)
// TODO: The recursion may not terminate if TiKV &
// PD are implemented incorrectly. A possible fix is
// introducing a 'max backoff time'.
err = c.prewriteKeys(keys)
err = c.prewriteKeys(batch.keys)
return errors.Trace(err)
}
prewriteResp := resp.GetCmdPrewriteResp()
@ -167,7 +199,9 @@ func (c *txnCommitter) prewriteSingleRegion(regionID RegionVerID, keys [][]byte)
keyErrs := prewriteResp.GetErrors()
if len(keyErrs) == 0 {
// We need to cleanup all written keys if transaction aborts.
c.writtenKeys = append(c.writtenKeys, keys...)
c.mu.Lock()
defer c.mu.Unlock()
c.writtenKeys = append(c.writtenKeys, batch.keys...)
return nil
}
for _, keyErr := range keyErrs {
@ -186,23 +220,23 @@ func (c *txnCommitter) prewriteSingleRegion(regionID RegionVerID, keys [][]byte)
return errors.Annotate(backoffErr, txnRetryableMark)
}
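Regarding the 'max backoff time' TODO above: one hypothetical shape for such a fix is a backoff constructor, in the same for-loop style as txnLockBackoff, that gives up once a total time budget is exhausted. Nothing in the sketch below is part of this change.

package main

import (
	"errors"
	"fmt"
	"time"
)

// backoffWithLimit returns a backoff function that sleeps a growing amount on
// each call and reports an error once the accumulated sleep would exceed
// maxTotal. Purely illustrative.
func backoffWithLimit(base, maxTotal time.Duration) func() error {
	var total time.Duration
	sleep := base
	return func() error {
		if total+sleep > maxTotal {
			return errors.New("max backoff time exceeded")
		}
		time.Sleep(sleep)
		total += sleep
		sleep *= 2 // exponential growth between retries
		return nil
	}
}

func main() {
	attempts := 0
	var backoffErr error
	for backoff := backoffWithLimit(10*time.Millisecond, 100*time.Millisecond); backoffErr == nil; backoffErr = backoff() {
		attempts++
		// ... send the request here and break on success ...
	}
	fmt.Printf("gave up after %d attempts: %v\n", attempts, backoffErr)
}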
func (c *txnCommitter) commitSingleRegion(regionID RegionVerID, keys [][]byte) error {
func (c *txnCommitter) commitSingleRegion(batch batchKeys) error {
req := &pb.Request{
Type: pb.MessageType_CmdCommit.Enum(),
CmdCommitReq: &pb.CmdCommitRequest{
StartVersion: proto.Uint64(c.startTS),
Keys: keys,
Keys: batch.keys,
CommitVersion: proto.Uint64(c.commitTS),
},
}
resp, err := c.store.SendKVReq(req, regionID)
resp, err := c.store.SendKVReq(req, batch.region)
if err != nil {
return errors.Trace(err)
}
if regionErr := resp.GetRegionError(); regionErr != nil {
// re-split keys and commit again.
err = c.commitKeys(keys)
err = c.commitKeys(batch.keys)
return errors.Trace(err)
}
commitResp := resp.GetCmdCommitResp()
@ -210,6 +244,8 @@ func (c *txnCommitter) commitSingleRegion(regionID RegionVerID, keys [][]byte) e
return errors.Trace(errBodyMissing)
}
if keyErr := commitResp.GetError(); keyErr != nil {
c.mu.RLock()
defer c.mu.RUnlock()
err = errors.Errorf("commit failed: %v", keyErr.String())
if c.committed {
// No secondary key can be rolled back after its primary key has been committed.
@ -222,33 +258,31 @@ func (c *txnCommitter) commitSingleRegion(regionID RegionVerID, keys [][]byte) e
return errors.Annotate(err, txnRetryableMark)
}
c.mu.Lock()
defer c.mu.Unlock()
// The group that contains the primary key is always the first one.
// We mark the transaction as committed when we receive the first successful response.
c.committed = true
return nil
}
func (c *txnCommitter) cleanupSingleRegion(regionID RegionVerID, keys [][]byte) error {
func (c *txnCommitter) cleanupSingleRegion(batch batchKeys) error {
req := &pb.Request{
Type: pb.MessageType_CmdBatchRollback.Enum(),
CmdBatchRollbackReq: &pb.CmdBatchRollbackRequest{
Keys: keys,
Keys: batch.keys,
StartVersion: proto.Uint64(c.startTS),
},
}
resp, err := c.store.SendKVReq(req, regionID)
resp, err := c.store.SendKVReq(req, batch.region)
if err != nil {
return errors.Trace(err)
}
if regionErr := resp.GetRegionError(); regionErr != nil {
err = c.cleanupKeys(keys)
err = c.cleanupKeys(batch.keys)
return errors.Trace(err)
}
rollbackResp := resp.GetCmdBatchRollbackResp()
if rollbackResp == nil {
return errors.Trace(errBodyMissing)
}
if keyErr := rollbackResp.GetError(); keyErr != nil {
if keyErr := resp.GetCmdBatchRollbackResp().GetError(); keyErr != nil {
err = errors.Errorf("cleanup failed: %s", keyErr)
log.Errorf("txn failed cleanup key: %v, tid: %d", err, c.startTS)
return errors.Trace(err)
@ -257,22 +291,22 @@ func (c *txnCommitter) cleanupSingleRegion(regionID RegionVerID, keys [][]byte)
}
func (c *txnCommitter) prewriteKeys(keys [][]byte) error {
return c.iterKeysByRegion(keys, batchIterFn(c.prewriteSingleRegion, c.keyValueSize))
return c.iterKeys(keys, c.prewriteSingleRegion, c.keyValueSize, false)
}
func (c *txnCommitter) commitKeys(keys [][]byte) error {
return c.iterKeysByRegion(keys, batchIterFn(c.commitSingleRegion, c.keySize))
return c.iterKeys(keys, c.commitSingleRegion, c.keySize, true)
}
func (c *txnCommitter) cleanupKeys(keys [][]byte) error {
return c.iterKeysByRegion(keys, batchIterFn(c.cleanupSingleRegion, c.keySize))
return c.iterKeys(keys, c.cleanupSingleRegion, c.keySize, false)
}
func (c *txnCommitter) Commit() error {
err := c.prewriteKeys(c.keys)
if err != nil {
log.Warnf("txn commit failed on prewrite: %v, tid: %d", err, c.startTS)
c.cleanupKeys(c.writtenKeys)
go c.cleanupKeys(c.writtenKeys)
return errors.Trace(err)
}
@ -285,7 +319,7 @@ func (c *txnCommitter) Commit() error {
err = c.commitKeys(c.keys)
if err != nil {
if !c.committed {
c.cleanupKeys(c.writtenKeys)
go c.cleanupKeys(c.writtenKeys)
return errors.Trace(err)
}
log.Warnf("txn commit succeed with error: %v, tid: %d", err, c.startTS)
@ -297,20 +331,25 @@ func (c *txnCommitter) Commit() error {
// Keep each batch's total Key+Value size below 512KB.
const txnCommitBatchSize = 512 * 1024
// batchIterFn wraps an iteration function and returns a new one that iterates
// keys by batch size.
func batchIterFn(f func(RegionVerID, [][]byte) error, sizeFn func([]byte) int) func(RegionVerID, [][]byte) error {
return func(id RegionVerID, keys [][]byte) error {
var start, end int
for start = 0; start < len(keys); start = end {
var size int
for end = start; end < len(keys) && size < txnCommitBatchSize; end++ {
size += sizeFn(keys[end])
}
if err := f(id, keys[start:end]); err != nil {
return errors.Trace(err)
}
}
return nil
}
// batchKeys is a batch of keys in the same region.
type batchKeys struct {
region RegionVerID
keys [][]byte
}
// appendBatchBySize appends keys to []batchKeys. It may split the keys to make
// sure each batch's size does not exceed the limit.
func appendBatchBySize(b []batchKeys, region RegionVerID, keys [][]byte, sizeFn func([]byte) int) []batchKeys {
var start, end int
for start = 0; start < len(keys); start = end {
var size int
for end = start; end < len(keys) && size < txnCommitBatchSize; end++ {
size += sizeFn(keys[end])
}
b = append(b, batchKeys{
region: region,
keys: keys[start:end],
})
}
return b
}
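On the doBatches TODO about prewrite ('stop sending the remaining requests after receiving the first error'): a hedged, standalone sketch of one possible approach follows. It skips launching further batches once a failure has been recorded; it is best-effort and is not part of this change.

package main

import (
	"errors"
	"fmt"
	"sync"
)

// doBatchesFailFast runs send over the batches concurrently, but stops
// launching new batches once one has failed. Batches already in flight are
// still awaited. Illustrative only; the doBatches added in this commit always
// sends every batch.
func doBatchesFailFast(batches []string, send func(string) error) error {
	var (
		mu       sync.Mutex
		firstErr error
		wg       sync.WaitGroup
	)
	for _, b := range batches {
		mu.Lock()
		stop := firstErr != nil
		mu.Unlock()
		if stop {
			break // a previous batch failed; do not send the rest
		}
		wg.Add(1)
		go func(b string) {
			defer wg.Done()
			if err := send(b); err != nil {
				mu.Lock()
				if firstErr == nil {
					firstErr = err
				}
				mu.Unlock()
			}
		}(b)
	}
	wg.Wait()
	return firstErr
}

func main() {
	batches := []string{"batch-1", "batch-2", "batch-3", "batch-4"}
	err := doBatchesFailFast(batches, func(b string) error {
		if b == "batch-2" {
			return errors.New("prewrite failed on " + b)
		}
		fmt.Println("prewrote", b)
		return nil
	})
	fmt.Println("result:", err)
}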

View File

@ -0,0 +1,118 @@
// Copyright 2016 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package tikv
import (
"math/rand"
. "github.com/pingcap/check"
"github.com/pingcap/tidb/store/tikv/mock-tikv"
)
type testCommitterSuite struct {
cluster *mocktikv.Cluster
store *tikvStore
}
var _ = Suite(&testCommitterSuite{})
func (s *testCommitterSuite) SetUpTest(c *C) {
s.cluster = mocktikv.NewCluster()
mocktikv.BootstrapWithMultiRegions(s.cluster, []byte("a"), []byte("b"), []byte("c"))
mvccStore := mocktikv.NewMvccStore()
clientFactory := mockClientFactory(s.cluster, mvccStore)
s.store = newTikvStore("mock-tikv-store", mocktikv.NewPDClient(s.cluster), clientFactory)
}
func (s *testCommitterSuite) begin(c *C) *tikvTxn {
txn, err := s.store.Begin()
c.Assert(err, IsNil)
return txn.(*tikvTxn)
}
func (s *testCommitterSuite) checkValues(c *C, m map[string]string) {
txn := s.begin(c)
for k, v := range m {
val, err := txn.Get([]byte(k))
c.Assert(err, IsNil)
c.Assert(string(val), Equals, v)
}
}
func (s *testCommitterSuite) mustCommit(c *C, m map[string]string) {
txn := s.begin(c)
for k, v := range m {
err := txn.Set([]byte(k), []byte(v))
c.Assert(err, IsNil)
}
err := txn.Commit()
c.Assert(err, IsNil)
s.checkValues(c, m)
}
func randKV(keyLen, valLen int) (string, string) {
const letters = "abc"
k, v := make([]byte, keyLen), make([]byte, valLen)
for i := range k {
k[i] = letters[rand.Intn(len(letters))]
}
for i := range v {
v[i] = letters[rand.Intn(len(letters))]
}
return string(k), string(v)
}
func (s *testCommitterSuite) TestCommitMultipleRegions(c *C) {
m := make(map[string]string)
for i := 0; i < 1000; i++ {
k, v := randKV(10, 10)
m[k] = v
}
s.mustCommit(c, m)
// Test big values.
m = make(map[string]string)
for i := 0; i < 500; i++ {
k, v := randKV(11, txnCommitBatchSize/7)
m[k] = v
}
s.mustCommit(c, m)
}
func (s *testCommitterSuite) TestCommitRollback(c *C) {
s.mustCommit(c, map[string]string{
"a": "a",
"b": "b",
"c": "c",
})
txn := s.begin(c)
txn.Set([]byte("a"), []byte("a1"))
txn.Set([]byte("b"), []byte("b1"))
txn.Set([]byte("c"), []byte("c1"))
s.mustCommit(c, map[string]string{
"c": "c2",
})
err := txn.Commit()
c.Assert(err, NotNil)
s.checkValues(c, map[string]string{
"a": "a",
"b": "b",
"c": "c2",
})
}