store/tikv: update backoff. (#1506)
This commit is contained in:
@ -19,6 +19,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/juju/errors"
|
||||
"github.com/ngaut/log"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -32,17 +33,13 @@ const (
|
||||
DecorrJitter
|
||||
)
|
||||
|
||||
// NewBackoff creates a backoff func which implements exponential backoff with
|
||||
// NewBackoffFn creates a backoff func which implements exponential backoff with
|
||||
// optional jitters.
|
||||
// See http://www.awsarchitectureblog.com/2015/03/backoff.html
|
||||
func NewBackoff(retry, base, cap, jitter int) func() error {
|
||||
func NewBackoffFn(base, cap, jitter int) func() int {
|
||||
attempts := 0
|
||||
totalSleep := 0
|
||||
lastSleep := base
|
||||
return func() error {
|
||||
if attempts >= retry {
|
||||
return errors.Errorf("still fail after %d retries, total sleep %dms", attempts, totalSleep)
|
||||
}
|
||||
return func() int {
|
||||
var sleep int
|
||||
switch jitter {
|
||||
case NoJitter:
|
||||
@ -59,12 +56,95 @@ func NewBackoff(retry, base, cap, jitter int) func() error {
|
||||
time.Sleep(time.Duration(sleep) * time.Millisecond)
|
||||
|
||||
attempts++
|
||||
totalSleep += sleep
|
||||
lastSleep = sleep
|
||||
return nil
|
||||
return lastSleep
|
||||
}
|
||||
}
|
||||
|
||||
// expo returns the n-th step of an exponential sequence: base doubled n
// times, clamped so that the result never exceeds cap.
func expo(base, cap, n int) int {
	ceiling := float64(cap)
	grown := float64(base) * math.Pow(2.0, float64(n))
	return int(math.Min(ceiling, grown))
}
|
||||
|
||||
// backoffType selects which backoff parameters (base, cap, jitter) are used
// for a retry; see createFn for the concrete values of each type.
type backoffType int

const (
	// boTiKVRPC is used when sending a request to TiKV fails.
	boTiKVRPC backoffType = iota
	// boTxnLock is used when a request runs into a transaction lock.
	boTxnLock
	// boPDRPC is used when a request to PD fails.
	boPDRPC
	// boRegionMiss is used when TiKV reports a region error.
	boRegionMiss
)
|
||||
|
||||
func (t backoffType) createFn() func() int {
|
||||
switch t {
|
||||
case boTiKVRPC:
|
||||
return NewBackoffFn(100, 2000, EqualJitter)
|
||||
case boTxnLock:
|
||||
return NewBackoffFn(300, 3000, EqualJitter)
|
||||
case boPDRPC:
|
||||
return NewBackoffFn(500, 3000, EqualJitter)
|
||||
case boRegionMiss:
|
||||
return NewBackoffFn(100, 500, NoJitter)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Maximum total sleep time (in ms) for kv/cop commands.
const (
	// Commands with a 5 second budget.
	copBuildTaskMaxBackoff = 5000
	tsoMaxBackoff          = 5000
	scannerNextMaxBackoff  = 5000

	// Commands with a 10 second budget.
	batchGetMaxBackoff = 10000
	copNextMaxBackoff  = 10000
	getMaxBackoff      = 10000
	prewriteMaxBackoff = 10000
	commitMaxBackoff   = 10000
	cleanupMaxBackoff  = 10000
)
|
||||
|
||||
// Backoffer is a utility for retrying queries.
|
||||
type Backoffer struct {
|
||||
fn map[backoffType]func() int
|
||||
maxSleep int
|
||||
totalSleep int
|
||||
errors []error
|
||||
}
|
||||
|
||||
// NewBackoffer creates a Backoffer with maximum sleep time(in ms).
|
||||
func NewBackoffer(maxSleep int) *Backoffer {
|
||||
return &Backoffer{
|
||||
maxSleep: maxSleep,
|
||||
}
|
||||
}
|
||||
|
||||
// Backoff sleeps a while base on the backoffType and records the error message.
|
||||
// It returns a retryable error if total sleep time exceeds maxSleep.
|
||||
func (b *Backoffer) Backoff(typ backoffType, err error) error {
|
||||
// Lazy initialize.
|
||||
if b.fn == nil {
|
||||
b.fn = make(map[backoffType]func() int)
|
||||
}
|
||||
f, ok := b.fn[typ]
|
||||
if !ok {
|
||||
f = typ.createFn()
|
||||
b.fn[typ] = f
|
||||
}
|
||||
|
||||
b.totalSleep += f()
|
||||
|
||||
log.Warnf("%v, retry later(totalSleep %dms, maxSleep %dms)", err, b.totalSleep, b.maxSleep)
|
||||
b.errors = append(b.errors, err)
|
||||
if b.totalSleep >= b.maxSleep {
|
||||
e := errors.Errorf("backoffer.maxSleep %dms is exceeded, errors: %v", b.maxSleep, b.errors)
|
||||
return errors.Annotate(e, txnRetryableMark)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Fork creates a new Backoffer which keeps current Backoffer's sleep time and errors.
|
||||
func (b *Backoffer) Fork() *Backoffer {
|
||||
return &Backoffer{
|
||||
maxSleep: b.maxSleep,
|
||||
totalSleep: b.totalSleep,
|
||||
errors: b.errors,
|
||||
}
|
||||
}
|
||||
|
||||
@ -126,14 +126,3 @@ func (c *rpcClient) Close() error {
|
||||
c.p.Close()
|
||||
return nil
|
||||
}
|
||||
|
||||
// rpcBackoff is for RPC (with TiKV) retry.
|
||||
// It is expected to sleep for about 10s(+/-3s) in total before abort.
|
||||
func rpcBackoff() func() error {
|
||||
const (
|
||||
maxRetry = 10
|
||||
sleepBase = 100
|
||||
sleepCap = 2000
|
||||
)
|
||||
return NewBackoff(maxRetry, sleepBase, sleepCap, EqualJitter)
|
||||
}
|
||||
|
||||
@ -76,7 +76,8 @@ func supportExpr(exprType tipb.ExprType) bool {
|
||||
|
||||
// Send builds the request and gets the coprocessor iterator response.
|
||||
func (c *CopClient) Send(req *kv.Request) kv.Response {
|
||||
tasks, err := buildCopTasks(c.store.regionCache, req.KeyRanges, req.Desc)
|
||||
bo := NewBackoffer(copBuildTaskMaxBackoff)
|
||||
tasks, err := buildCopTasks(bo, c.store.regionCache, req.KeyRanges, req.Desc)
|
||||
if err != nil {
|
||||
return copErrorResponse{err}
|
||||
}
|
||||
@ -133,11 +134,11 @@ func (t *copTask) pbRanges() []*coprocessor.KeyRange {
|
||||
return ranges
|
||||
}
|
||||
|
||||
func buildCopTasks(cache *RegionCache, ranges []kv.KeyRange, desc bool) ([]*copTask, error) {
|
||||
func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges []kv.KeyRange, desc bool) ([]*copTask, error) {
|
||||
var tasks []*copTask
|
||||
for _, r := range ranges {
|
||||
var err error
|
||||
if tasks, err = appendTask(tasks, cache, r); err != nil {
|
||||
if tasks, err = appendTask(tasks, bo, cache, r); err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
}
|
||||
@ -155,14 +156,14 @@ func reverseTasks(tasks []*copTask) {
|
||||
}
|
||||
}
|
||||
|
||||
func appendTask(tasks []*copTask, cache *RegionCache, r kv.KeyRange) ([]*copTask, error) {
|
||||
func appendTask(tasks []*copTask, bo *Backoffer, cache *RegionCache, r kv.KeyRange) ([]*copTask, error) {
|
||||
var last *copTask
|
||||
if len(tasks) > 0 {
|
||||
last = tasks[len(tasks)-1]
|
||||
}
|
||||
// Ensure `r` (or part of `r`) is inside `last`, create a task if need.
|
||||
if last == nil || !last.region.Contains(r.StartKey) {
|
||||
region, err := cache.GetRegion(r.StartKey)
|
||||
region, err := cache.GetRegion(bo, r.StartKey)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
@ -187,7 +188,7 @@ func appendTask(tasks []*copTask, cache *RegionCache, r kv.KeyRange) ([]*copTask
|
||||
StartKey: last.region.EndKey(),
|
||||
EndKey: r.EndKey,
|
||||
}
|
||||
return appendTask(tasks, cache, remain)
|
||||
return appendTask(tasks, bo, cache, remain)
|
||||
}
|
||||
return tasks, nil
|
||||
}
|
||||
@ -207,6 +208,7 @@ type copIterator struct {
|
||||
|
||||
// Pick the next new copTask and send request to tikv-server.
|
||||
func (it *copIterator) work() {
|
||||
bo := NewBackoffer(copNextMaxBackoff)
|
||||
for {
|
||||
it.mu.Lock()
|
||||
if it.finished {
|
||||
@ -227,7 +229,7 @@ func (it *copIterator) work() {
|
||||
}
|
||||
task.status = taskRunning
|
||||
it.mu.Unlock()
|
||||
resp, err := it.handleTask(task)
|
||||
resp, err := it.handleTask(bo, task)
|
||||
if err != nil {
|
||||
it.errChan <- err
|
||||
break
|
||||
@ -303,9 +305,8 @@ func (it *copIterator) Next() (io.ReadCloser, error) {
|
||||
}
|
||||
|
||||
// Handle single copTask.
|
||||
func (it *copIterator) handleTask(task *copTask) (*coprocessor.Response, error) {
|
||||
var backoffErr error
|
||||
for backoff := rpcBackoff(); backoffErr == nil; backoffErr = backoff() {
|
||||
func (it *copIterator) handleTask(bo *Backoffer, task *copTask) (*coprocessor.Response, error) {
|
||||
for {
|
||||
req := &coprocessor.Request{
|
||||
Context: task.region.GetContext(),
|
||||
Tp: proto.Int64(it.req.Tp),
|
||||
@ -315,9 +316,13 @@ func (it *copIterator) handleTask(task *copTask) (*coprocessor.Response, error)
|
||||
resp, err := it.store.client.SendCopReq(task.region.GetAddress(), req)
|
||||
if err != nil {
|
||||
it.store.regionCache.NextPeer(task.region.VerID())
|
||||
err1 := it.rebuildCurrentTask(task)
|
||||
if err1 != nil {
|
||||
return nil, errors.Trace(err1)
|
||||
err = bo.Backoff(boTiKVRPC, err)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
err = it.rebuildCurrentTask(bo, task)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
log.Warnf("send coprocessor request error: %v, try next peer later", err)
|
||||
continue
|
||||
@ -328,7 +333,11 @@ func (it *copIterator) handleTask(task *copTask) (*coprocessor.Response, error)
|
||||
} else {
|
||||
it.store.regionCache.DropRegion(task.region.VerID())
|
||||
}
|
||||
err = it.rebuildCurrentTask(task)
|
||||
err = bo.Backoff(boRegionMiss, err)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
err = it.rebuildCurrentTask(bo, task)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
@ -337,8 +346,12 @@ func (it *copIterator) handleTask(task *copTask) (*coprocessor.Response, error)
|
||||
}
|
||||
if e := resp.GetLocked(); e != nil {
|
||||
lock := newLock(it.store, e.GetPrimaryLock(), e.GetLockVersion(), e.GetKey(), e.GetLockVersion())
|
||||
_, lockErr := lock.cleanup()
|
||||
_, lockErr := lock.cleanup(bo)
|
||||
if lockErr == nil || terror.ErrorEqual(lockErr, errInnerRetryable) {
|
||||
err = bo.Backoff(boTxnLock, lockErr)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
log.Warnf("cleanup lock error: %v", lockErr)
|
||||
@ -351,12 +364,11 @@ func (it *copIterator) handleTask(task *copTask) (*coprocessor.Response, error)
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
return nil, errors.Trace(backoffErr)
|
||||
}
|
||||
|
||||
// Rebuild current task. It may be split into multiple tasks (in region split scenario).
|
||||
func (it *copIterator) rebuildCurrentTask(task *copTask) error {
|
||||
newTasks, err := buildCopTasks(it.store.regionCache, task.ranges, it.req.Desc)
|
||||
func (it *copIterator) rebuildCurrentTask(bo *Backoffer, task *copTask) error {
|
||||
newTasks, err := buildCopTasks(bo, it.store.regionCache, task.ranges, it.req.Desc)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
@ -30,28 +30,30 @@ func (s *testCoprocessorSuite) TestBuildTasks(c *C) {
|
||||
_, regionIDs, _ := mocktikv.BootstrapWithMultiRegions(cluster, []byte("g"), []byte("n"), []byte("t"))
|
||||
cache := NewRegionCache(mocktikv.NewPDClient(cluster))
|
||||
|
||||
tasks, err := buildCopTasks(cache, s.buildKeyRanges("a", "c"), false)
|
||||
bo := NewBackoffer(3000)
|
||||
|
||||
tasks, err := buildCopTasks(bo, cache, s.buildKeyRanges("a", "c"), false)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(tasks, HasLen, 1)
|
||||
s.taskEqual(c, tasks[0], regionIDs[0], "a", "c")
|
||||
|
||||
tasks, err = buildCopTasks(cache, s.buildKeyRanges("g", "n"), false)
|
||||
tasks, err = buildCopTasks(bo, cache, s.buildKeyRanges("g", "n"), false)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(tasks, HasLen, 1)
|
||||
s.taskEqual(c, tasks[0], regionIDs[1], "g", "n")
|
||||
|
||||
tasks, err = buildCopTasks(cache, s.buildKeyRanges("m", "n"), false)
|
||||
tasks, err = buildCopTasks(bo, cache, s.buildKeyRanges("m", "n"), false)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(tasks, HasLen, 1)
|
||||
s.taskEqual(c, tasks[0], regionIDs[1], "m", "n")
|
||||
|
||||
tasks, err = buildCopTasks(cache, s.buildKeyRanges("a", "k"), false)
|
||||
tasks, err = buildCopTasks(bo, cache, s.buildKeyRanges("a", "k"), false)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(tasks, HasLen, 2)
|
||||
s.taskEqual(c, tasks[0], regionIDs[0], "a", "g")
|
||||
s.taskEqual(c, tasks[1], regionIDs[1], "g", "k")
|
||||
|
||||
tasks, err = buildCopTasks(cache, s.buildKeyRanges("a", "x"), false)
|
||||
tasks, err = buildCopTasks(bo, cache, s.buildKeyRanges("a", "x"), false)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(tasks, HasLen, 4)
|
||||
s.taskEqual(c, tasks[0], regionIDs[0], "a", "g")
|
||||
@ -59,23 +61,23 @@ func (s *testCoprocessorSuite) TestBuildTasks(c *C) {
|
||||
s.taskEqual(c, tasks[2], regionIDs[2], "n", "t")
|
||||
s.taskEqual(c, tasks[3], regionIDs[3], "t", "x")
|
||||
|
||||
tasks, err = buildCopTasks(cache, s.buildKeyRanges("a", "b", "b", "c"), false)
|
||||
tasks, err = buildCopTasks(bo, cache, s.buildKeyRanges("a", "b", "b", "c"), false)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(tasks, HasLen, 1)
|
||||
s.taskEqual(c, tasks[0], regionIDs[0], "a", "b", "b", "c")
|
||||
|
||||
tasks, err = buildCopTasks(cache, s.buildKeyRanges("a", "b", "e", "f"), false)
|
||||
tasks, err = buildCopTasks(bo, cache, s.buildKeyRanges("a", "b", "e", "f"), false)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(tasks, HasLen, 1)
|
||||
s.taskEqual(c, tasks[0], regionIDs[0], "a", "b", "e", "f")
|
||||
|
||||
tasks, err = buildCopTasks(cache, s.buildKeyRanges("g", "n", "o", "p"), false)
|
||||
tasks, err = buildCopTasks(bo, cache, s.buildKeyRanges("g", "n", "o", "p"), false)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(tasks, HasLen, 2)
|
||||
s.taskEqual(c, tasks[0], regionIDs[1], "g", "n")
|
||||
s.taskEqual(c, tasks[1], regionIDs[2], "o", "p")
|
||||
|
||||
tasks, err = buildCopTasks(cache, s.buildKeyRanges("h", "k", "m", "p"), false)
|
||||
tasks, err = buildCopTasks(bo, cache, s.buildKeyRanges("h", "k", "m", "p"), false)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(tasks, HasLen, 2)
|
||||
s.taskEqual(c, tasks[0], regionIDs[1], "h", "k", "m", "n")
|
||||
@ -88,8 +90,9 @@ func (s *testCoprocessorSuite) TestRebuild(c *C) {
|
||||
cluster := mocktikv.NewCluster()
|
||||
storeID, regionIDs, peerIDs := mocktikv.BootstrapWithMultiRegions(cluster, []byte("m"))
|
||||
cache := NewRegionCache(mocktikv.NewPDClient(cluster))
|
||||
bo := NewBackoffer(3000)
|
||||
|
||||
tasks, err := buildCopTasks(cache, s.buildKeyRanges("a", "z"), false)
|
||||
tasks, err := buildCopTasks(bo, cache, s.buildKeyRanges("a", "z"), false)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(tasks, HasLen, 2)
|
||||
s.taskEqual(c, tasks[0], regionIDs[0], "a", "m")
|
||||
@ -102,7 +105,7 @@ func (s *testCoprocessorSuite) TestRebuild(c *C) {
|
||||
cluster.Split(regionIDs[1], regionIDs[2], []byte("q"), []uint64{peerIDs[2]}, storeID)
|
||||
cache.DropRegion(tasks[1].region.VerID())
|
||||
|
||||
tasks, err = buildCopTasks(cache, s.buildKeyRanges("a", "z"), true)
|
||||
tasks, err = buildCopTasks(bo, cache, s.buildKeyRanges("a", "z"), true)
|
||||
c.Assert(err, IsNil)
|
||||
iter := &copIterator{
|
||||
store: &tikvStore{
|
||||
@ -113,14 +116,14 @@ func (s *testCoprocessorSuite) TestRebuild(c *C) {
|
||||
},
|
||||
tasks: tasks,
|
||||
}
|
||||
err = iter.rebuildCurrentTask(iter.tasks[0])
|
||||
err = iter.rebuildCurrentTask(bo, iter.tasks[0])
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(iter.tasks, HasLen, 3)
|
||||
s.taskEqual(c, iter.tasks[2], regionIDs[0], "a", "m")
|
||||
s.taskEqual(c, iter.tasks[1], regionIDs[1], "m", "q")
|
||||
s.taskEqual(c, iter.tasks[0], regionIDs[2], "q", "z")
|
||||
|
||||
tasks, err = buildCopTasks(cache, s.buildKeyRanges("a", "z"), true)
|
||||
tasks, err = buildCopTasks(bo, cache, s.buildKeyRanges("a", "z"), true)
|
||||
iter = &copIterator{
|
||||
store: &tikvStore{
|
||||
regionCache: cache,
|
||||
@ -130,7 +133,7 @@ func (s *testCoprocessorSuite) TestRebuild(c *C) {
|
||||
},
|
||||
tasks: tasks,
|
||||
}
|
||||
err = iter.rebuildCurrentTask(iter.tasks[2])
|
||||
err = iter.rebuildCurrentTask(bo, iter.tasks[2])
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(iter.tasks, HasLen, 3)
|
||||
s.taskEqual(c, iter.tasks[2], regionIDs[0], "a", "m")
|
||||
|
||||
@ -124,46 +124,45 @@ func (s *tikvStore) UUID() string {
|
||||
}
|
||||
|
||||
func (s *tikvStore) CurrentVersion() (kv.Version, error) {
|
||||
startTS, err := s.getTimestampWithRetry()
|
||||
bo := NewBackoffer(tsoMaxBackoff)
|
||||
startTS, err := s.getTimestampWithRetry(bo)
|
||||
if err != nil {
|
||||
return kv.NewVersion(0), errors.Trace(err)
|
||||
}
|
||||
|
||||
return kv.NewVersion(startTS), nil
|
||||
}
|
||||
|
||||
func (s *tikvStore) getTimestampWithRetry() (uint64, error) {
|
||||
var backoffErr error
|
||||
for backoff := pdBackoff(); backoffErr == nil; backoffErr = backoff() {
|
||||
func (s *tikvStore) getTimestampWithRetry(bo *Backoffer) (uint64, error) {
|
||||
for {
|
||||
startTS, err := s.oracle.GetTimestamp()
|
||||
if err != nil {
|
||||
log.Warnf("get timestamp failed: %v, retry later", err)
|
||||
continue
|
||||
if err == nil {
|
||||
return startTS, nil
|
||||
}
|
||||
err = bo.Backoff(boPDRPC, errors.Errorf("get timestamp failed: %v", err))
|
||||
if err != nil {
|
||||
return 0, errors.Trace(err)
|
||||
}
|
||||
return startTS, nil
|
||||
}
|
||||
return 0, errors.Annotate(backoffErr, txnRetryableMark)
|
||||
}
|
||||
|
||||
func (s *tikvStore) checkTimestampExpiredWithRetry(ts uint64, TTL uint64) (bool, error) {
|
||||
var backoffErr error
|
||||
for backoff := pdBackoff(); backoffErr == nil; backoffErr = backoff() {
|
||||
func (s *tikvStore) checkTimestampExpiredWithRetry(bo *Backoffer, ts uint64, TTL uint64) (bool, error) {
|
||||
for {
|
||||
expired, err := s.oracle.IsExpired(ts, TTL)
|
||||
if err != nil {
|
||||
log.Warnf("check expired failed: %v, retry later", err)
|
||||
continue
|
||||
if err == nil {
|
||||
return expired, nil
|
||||
}
|
||||
err = bo.Backoff(boPDRPC, errors.Errorf("check expired failed: %v", err))
|
||||
if err != nil {
|
||||
return false, errors.Trace(err)
|
||||
}
|
||||
return expired, nil
|
||||
}
|
||||
return false, errors.Annotate(backoffErr, txnRetryableMark)
|
||||
}
|
||||
|
||||
// sendKVReq sends req to tikv server. It will retry internally to find the right
|
||||
// region leader if i) fails to establish a connection to server or ii) server
|
||||
// returns `NotLeader`.
|
||||
func (s *tikvStore) SendKVReq(req *pb.Request, regionID RegionVerID) (*pb.Response, error) {
|
||||
var backoffErr error
|
||||
for backoff := rpcBackoff(); backoffErr == nil; backoffErr = backoff() {
|
||||
func (s *tikvStore) SendKVReq(bo *Backoffer, req *pb.Request, regionID RegionVerID) (*pb.Response, error) {
|
||||
for {
|
||||
region := s.regionCache.GetRegionByVerID(regionID)
|
||||
if region == nil {
|
||||
// If the region is not found in cache, it must be out
|
||||
@ -177,8 +176,11 @@ func (s *tikvStore) SendKVReq(req *pb.Request, regionID RegionVerID) (*pb.Respon
|
||||
req.Context = region.GetContext()
|
||||
resp, err := s.client.SendKVReq(region.GetAddress(), req)
|
||||
if err != nil {
|
||||
log.Warnf("send tikv request error: %v, ctx: %s, try next peer later", err, req.Context)
|
||||
s.regionCache.NextPeer(region.VerID())
|
||||
err = bo.Backoff(boTiKVRPC, errors.Errorf("send tikv request error: %v, ctx: %s, try next peer later", err, req.Context))
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
if regionErr := resp.GetRegionError(); regionErr != nil {
|
||||
@ -186,6 +188,12 @@ func (s *tikvStore) SendKVReq(req *pb.Request, regionID RegionVerID) (*pb.Respon
|
||||
if notLeader := regionErr.GetNotLeader(); notLeader != nil {
|
||||
log.Warnf("tikv reports `NotLeader`: %s, ctx: %s, retry later", notLeader, req.Context)
|
||||
s.regionCache.UpdateLeader(region.VerID(), notLeader.GetLeader().GetId())
|
||||
if notLeader.GetLeader() == nil {
|
||||
err = bo.Backoff(boRegionMiss, errors.Errorf("not leader: %v, ctx: %s", notLeader, req.Context))
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
}
|
||||
continue
|
||||
}
|
||||
// For other errors, we only drop cache here.
|
||||
@ -199,7 +207,6 @@ func (s *tikvStore) SendKVReq(req *pb.Request, regionID RegionVerID) (*pb.Respon
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
return nil, errors.Trace(backoffErr)
|
||||
}
|
||||
|
||||
func parsePath(path string) (etcdAddrs []string, clusterID uint64, err error) {
|
||||
|
||||
@ -36,24 +36,14 @@ func newLock(store *tikvStore, pLock []byte, lockVer uint64, key []byte, ver uin
|
||||
}
|
||||
}
|
||||
|
||||
// txnLockBackoff is for transaction lock retry.
|
||||
func txnLockBackoff() func() error {
|
||||
const (
|
||||
maxRetry = 6
|
||||
sleepBase = 300
|
||||
sleepCap = 3000
|
||||
)
|
||||
return NewBackoff(maxRetry, sleepBase, sleepCap, EqualJitter)
|
||||
}
|
||||
|
||||
// locks after 3000ms are considered unusual (the client created the lock might
|
||||
// be dead). Other client may cleanup this kind of lock.
|
||||
// For locks created recently, we will do backoff and retry.
|
||||
var lockTTL uint64 = 3000
|
||||
|
||||
// cleanup cleans up the lock.
|
||||
func (l *txnLock) cleanup() ([]byte, error) {
|
||||
expired, err := l.store.checkTimestampExpiredWithRetry(l.pl.version, lockTTL)
|
||||
func (l *txnLock) cleanup(bo *Backoffer) ([]byte, error) {
|
||||
expired, err := l.store.checkTimestampExpiredWithRetry(bo, l.pl.version, lockTTL)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
@ -67,17 +57,20 @@ func (l *txnLock) cleanup() ([]byte, error) {
|
||||
StartVersion: proto.Uint64(l.pl.version),
|
||||
},
|
||||
}
|
||||
var backoffErr error
|
||||
for backoff := regionMissBackoff(); backoffErr == nil; backoffErr = backoff() {
|
||||
region, err := l.store.regionCache.GetRegion(l.pl.key)
|
||||
for {
|
||||
region, err := l.store.regionCache.GetRegion(bo, l.pl.key)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
resp, err := l.store.SendKVReq(req, region.VerID())
|
||||
resp, err := l.store.SendKVReq(bo, req, region.VerID())
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
if regionErr := resp.GetRegionError(); regionErr != nil {
|
||||
err = bo.Backoff(boRegionMiss, errors.New(regionErr.String()))
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
cmdCleanupResp := resp.GetCmdCleanupResp()
|
||||
@ -89,16 +82,15 @@ func (l *txnLock) cleanup() ([]byte, error) {
|
||||
}
|
||||
if cmdCleanupResp.CommitVersion == nil {
|
||||
// cleanup successfully
|
||||
return l.rollbackThenGet()
|
||||
return l.rollbackThenGet(bo)
|
||||
}
|
||||
// already committed
|
||||
return l.commitThenGet(cmdCleanupResp.GetCommitVersion())
|
||||
return l.commitThenGet(bo, cmdCleanupResp.GetCommitVersion())
|
||||
}
|
||||
return nil, errors.Annotate(backoffErr, txnRetryableMark)
|
||||
}
|
||||
|
||||
// If key == nil then only rollback but value is nil
|
||||
func (l *txnLock) rollbackThenGet() ([]byte, error) {
|
||||
func (l *txnLock) rollbackThenGet(bo *Backoffer) ([]byte, error) {
|
||||
req := &pb.Request{
|
||||
Type: pb.MessageType_CmdRollbackThenGet.Enum(),
|
||||
CmdRbGetReq: &pb.CmdRollbackThenGetRequest{
|
||||
@ -106,17 +98,20 @@ func (l *txnLock) rollbackThenGet() ([]byte, error) {
|
||||
LockVersion: proto.Uint64(l.pl.version),
|
||||
},
|
||||
}
|
||||
var backoffErr error
|
||||
for backoff := regionMissBackoff(); backoffErr == nil; backoffErr = backoff() {
|
||||
region, err := l.store.regionCache.GetRegion(l.key)
|
||||
for {
|
||||
region, err := l.store.regionCache.GetRegion(bo, l.key)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
resp, err := l.store.SendKVReq(req, region.VerID())
|
||||
resp, err := l.store.SendKVReq(bo, req, region.VerID())
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
if regionErr := resp.GetRegionError(); regionErr != nil {
|
||||
err = bo.Backoff(boRegionMiss, errors.New(regionErr.String()))
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
cmdRbGResp := resp.GetCmdRbGetResp()
|
||||
@ -128,11 +123,10 @@ func (l *txnLock) rollbackThenGet() ([]byte, error) {
|
||||
}
|
||||
return cmdRbGResp.GetValue(), nil
|
||||
}
|
||||
return nil, errors.Annotate(backoffErr, txnRetryableMark)
|
||||
}
|
||||
|
||||
// If key == nil then only commit but value is nil
|
||||
func (l *txnLock) commitThenGet(commitVersion uint64) ([]byte, error) {
|
||||
func (l *txnLock) commitThenGet(bo *Backoffer, commitVersion uint64) ([]byte, error) {
|
||||
req := &pb.Request{
|
||||
Type: pb.MessageType_CmdCommitThenGet.Enum(),
|
||||
CmdCommitGetReq: &pb.CmdCommitThenGetRequest{
|
||||
@ -142,17 +136,20 @@ func (l *txnLock) commitThenGet(commitVersion uint64) ([]byte, error) {
|
||||
GetVersion: proto.Uint64(l.ver),
|
||||
},
|
||||
}
|
||||
var backoffErr error
|
||||
for backoff := regionMissBackoff(); backoffErr == nil; backoffErr = backoff() {
|
||||
region, err := l.store.regionCache.GetRegion(l.key)
|
||||
for {
|
||||
region, err := l.store.regionCache.GetRegion(bo, l.key)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
resp, err := l.store.SendKVReq(req, region.VerID())
|
||||
resp, err := l.store.SendKVReq(bo, req, region.VerID())
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
if regionErr := resp.GetRegionError(); regionErr != nil {
|
||||
err = bo.Backoff(boRegionMiss, errors.New(regionErr.String()))
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
cmdCommitGetResp := resp.GetCmdCommitGetResp()
|
||||
@ -164,7 +161,6 @@ func (l *txnLock) commitThenGet(commitVersion uint64) ([]byte, error) {
|
||||
}
|
||||
return cmdCommitGetResp.GetValue(), nil
|
||||
}
|
||||
return nil, errors.Annotate(backoffErr, txnRetryableMark)
|
||||
}
|
||||
|
||||
type pLock struct {
|
||||
|
||||
@ -47,13 +47,13 @@ func (s *testLockSuite) lockKey(c *C, key, value, primaryKey, primaryValue []byt
|
||||
c.Assert(err, IsNil)
|
||||
committer.keys = [][]byte{primaryKey, key}
|
||||
|
||||
err = committer.prewriteKeys(committer.keys)
|
||||
err = committer.prewriteKeys(NewBackoffer(prewriteMaxBackoff), committer.keys)
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
if commitPrimary {
|
||||
committer.commitTS, err = s.store.oracle.GetTimestamp()
|
||||
c.Assert(err, IsNil)
|
||||
err = committer.commitKeys([][]byte{primaryKey})
|
||||
err = committer.commitKeys(NewBackoffer(commitMaxBackoff), [][]byte{primaryKey})
|
||||
c.Assert(err, IsNil)
|
||||
}
|
||||
}
|
||||
|
||||
@ -55,14 +55,14 @@ func (c *RegionCache) GetRegionByVerID(id RegionVerID) *Region {
|
||||
}
|
||||
|
||||
// GetRegion finds the region in cache, or loads a new region.
|
||||
func (c *RegionCache) GetRegion(key []byte) (*Region, error) {
|
||||
func (c *RegionCache) GetRegion(bo *Backoffer, key []byte) (*Region, error) {
|
||||
c.mu.RLock()
|
||||
r := c.getRegionFromCache(key)
|
||||
c.mu.RUnlock()
|
||||
if r != nil {
|
||||
return r, nil
|
||||
}
|
||||
r, err := c.loadRegion(key)
|
||||
r, err := c.loadRegion(bo, key)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
@ -74,7 +74,7 @@ func (c *RegionCache) GetRegion(key []byte) (*Region, error) {
|
||||
// GroupKeysByRegion separates keys into groups by their belonging Regions.
|
||||
// Specially it also returns the first key's region which may be used as the
|
||||
// 'PrimaryLockKey' and should be committed ahead of others.
|
||||
func (c *RegionCache) GroupKeysByRegion(keys [][]byte) (map[RegionVerID][][]byte, RegionVerID, error) {
|
||||
func (c *RegionCache) GroupKeysByRegion(bo *Backoffer, keys [][]byte) (map[RegionVerID][][]byte, RegionVerID, error) {
|
||||
groups := make(map[RegionVerID][][]byte)
|
||||
var first RegionVerID
|
||||
var lastRegion *Region
|
||||
@ -84,7 +84,7 @@ func (c *RegionCache) GroupKeysByRegion(keys [][]byte) (map[RegionVerID][][]byte
|
||||
region = lastRegion
|
||||
} else {
|
||||
var err error
|
||||
region, err = c.GetRegion(k)
|
||||
region, err = c.GetRegion(bo, k)
|
||||
if err != nil {
|
||||
return nil, first, errors.Trace(err)
|
||||
}
|
||||
@ -196,17 +196,23 @@ func (c *RegionCache) dropRegionFromCache(verID RegionVerID) {
|
||||
}
|
||||
|
||||
// loadRegion loads region from pd client, and picks the first peer as leader.
|
||||
func (c *RegionCache) loadRegion(key []byte) (*Region, error) {
|
||||
var region *Region
|
||||
func (c *RegionCache) loadRegion(bo *Backoffer, key []byte) (*Region, error) {
|
||||
var backoffErr error
|
||||
for backoff := pdBackoff(); backoffErr == nil; backoffErr = backoff() {
|
||||
for {
|
||||
if backoffErr != nil {
|
||||
err := bo.Backoff(boPDRPC, backoffErr)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
}
|
||||
|
||||
meta, leader, err := c.pdClient.GetRegion(key)
|
||||
if err != nil {
|
||||
log.Warnf("loadRegion from PD failed, key: %q, err: %v", key, err)
|
||||
backoffErr = errors.Errorf("loadRegion from PD failed, key: %q, err: %v", key, err)
|
||||
continue
|
||||
}
|
||||
if meta == nil {
|
||||
log.Warnf("region not found for key %q", key)
|
||||
backoffErr = errors.Errorf("region not found for key %q", key)
|
||||
continue
|
||||
}
|
||||
if len(meta.Peers) == 0 {
|
||||
@ -224,21 +230,17 @@ func (c *RegionCache) loadRegion(key []byte) (*Region, error) {
|
||||
peer := meta.Peers[0]
|
||||
store, err := c.pdClient.GetStore(peer.GetStoreId())
|
||||
if err != nil {
|
||||
log.Warnf("loadStore from PD failed, key %q, storeID: %d, err: %v", key, peer.GetStoreId(), err)
|
||||
backoffErr = errors.Errorf("loadStore from PD failed, key %q, storeID: %d, err: %v", key, peer.GetStoreId(), err)
|
||||
continue
|
||||
}
|
||||
region = &Region{
|
||||
region := &Region{
|
||||
meta: meta,
|
||||
peer: peer,
|
||||
addr: store.GetAddress(),
|
||||
curPeerIdx: 0,
|
||||
}
|
||||
break
|
||||
return region, nil
|
||||
}
|
||||
if backoffErr != nil {
|
||||
return nil, errors.Annotate(backoffErr, txnRetryableMark)
|
||||
}
|
||||
return region, nil
|
||||
}
|
||||
|
||||
// llrbItem is llrbTree's Item that uses []byte for compare.
|
||||
@ -342,23 +344,3 @@ func (r *Region) NextPeer() (*metapb.Peer, error) {
|
||||
}
|
||||
return r.meta.Peers[nextPeerIdx], nil
|
||||
}
|
||||
|
||||
// regionMissBackoff is for region cache miss retry.
|
||||
func regionMissBackoff() func() error {
|
||||
const (
|
||||
maxRetry = 2
|
||||
sleepBase = 1
|
||||
sleepCap = 1
|
||||
)
|
||||
return NewBackoff(maxRetry, sleepBase, sleepCap, NoJitter)
|
||||
}
|
||||
|
||||
// pdBackoff is for PD RPC retry.
|
||||
func pdBackoff() func() error {
|
||||
const (
|
||||
maxRetry = 10
|
||||
sleepBase = 500
|
||||
sleepCap = 3000
|
||||
)
|
||||
return NewBackoff(maxRetry, sleepBase, sleepCap, EqualJitter)
|
||||
}
|
||||
|
||||
@ -29,6 +29,7 @@ type testRegionCacheSuite struct {
|
||||
peer2 uint64
|
||||
region1 uint64
|
||||
cache *RegionCache
|
||||
bo *Backoffer
|
||||
}
|
||||
|
||||
var _ = Suite(&testRegionCacheSuite{})
|
||||
@ -42,6 +43,7 @@ func (s *testRegionCacheSuite) SetUpTest(c *C) {
|
||||
s.peer1 = peerIDs[0]
|
||||
s.peer2 = peerIDs[1]
|
||||
s.cache = NewRegionCache(mocktikv.NewPDClient(s.cluster))
|
||||
s.bo = NewBackoffer(5000)
|
||||
}
|
||||
|
||||
func (s *testRegionCacheSuite) storeAddr(id uint64) string {
|
||||
@ -57,7 +59,7 @@ func (s *testRegionCacheSuite) checkCache(c *C, len int) {
|
||||
}
|
||||
|
||||
func (s *testRegionCacheSuite) TestSimple(c *C) {
|
||||
r, err := s.cache.GetRegion([]byte("a"))
|
||||
r, err := s.cache.GetRegion(s.bo, []byte("a"))
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(r, NotNil)
|
||||
c.Assert(r.GetID(), Equals, s.region1)
|
||||
@ -66,15 +68,12 @@ func (s *testRegionCacheSuite) TestSimple(c *C) {
|
||||
}
|
||||
|
||||
func (s *testRegionCacheSuite) TestDropStore(c *C) {
|
||||
// This test is disabled since it costs too much time due to the retry
|
||||
// mechanism.
|
||||
// TODO: Find a way to change retry timeout in test then uncomment it.
|
||||
//
|
||||
// s.cluster.RemoveStore(s.store1)
|
||||
// r, err := s.cache.GetRegion([]byte("a"))
|
||||
// c.Assert(err, NotNil)
|
||||
// c.Assert(r, IsNil)
|
||||
// s.checkCache(c, 0)
|
||||
bo := NewBackoffer(100)
|
||||
s.cluster.RemoveStore(s.store1)
|
||||
r, err := s.cache.GetRegion(bo, []byte("a"))
|
||||
c.Assert(err, NotNil)
|
||||
c.Assert(r, IsNil)
|
||||
s.checkCache(c, 0)
|
||||
}
|
||||
|
||||
func (s *testRegionCacheSuite) TestDropStoreRetry(c *C) {
|
||||
@ -85,19 +84,19 @@ func (s *testRegionCacheSuite) TestDropStoreRetry(c *C) {
|
||||
s.cluster.AddStore(s.store1, s.storeAddr(s.store1))
|
||||
close(done)
|
||||
}()
|
||||
r, err := s.cache.GetRegion([]byte("a"))
|
||||
r, err := s.cache.GetRegion(s.bo, []byte("a"))
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(r.GetID(), Equals, s.region1)
|
||||
<-done
|
||||
}
|
||||
|
||||
func (s *testRegionCacheSuite) TestUpdateLeader(c *C) {
|
||||
r, err := s.cache.GetRegion([]byte("a"))
|
||||
r, err := s.cache.GetRegion(s.bo, []byte("a"))
|
||||
c.Assert(err, IsNil)
|
||||
// tikv-server reports `NotLeader`
|
||||
s.cache.UpdateLeader(r.VerID(), s.peer2)
|
||||
|
||||
r, err = s.cache.GetRegion([]byte("a"))
|
||||
r, err = s.cache.GetRegion(s.bo, []byte("a"))
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(r, NotNil)
|
||||
c.Assert(r.GetID(), Equals, s.region1)
|
||||
@ -106,7 +105,7 @@ func (s *testRegionCacheSuite) TestUpdateLeader(c *C) {
|
||||
}
|
||||
|
||||
func (s *testRegionCacheSuite) TestUpdateLeader2(c *C) {
|
||||
r, err := s.cache.GetRegion([]byte("a"))
|
||||
r, err := s.cache.GetRegion(s.bo, []byte("a"))
|
||||
c.Assert(err, IsNil)
|
||||
// new store3 becomes leader
|
||||
store3 := s.cluster.AllocID()
|
||||
@ -117,7 +116,7 @@ func (s *testRegionCacheSuite) TestUpdateLeader2(c *C) {
|
||||
s.cache.UpdateLeader(r.VerID(), peer3)
|
||||
|
||||
// Store3 does not exist in cache, causes a reload from PD.
|
||||
r, err = s.cache.GetRegion([]byte("a"))
|
||||
r, err = s.cache.GetRegion(s.bo, []byte("a"))
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(r, NotNil)
|
||||
c.Assert(r.GetID(), Equals, s.region1)
|
||||
@ -128,7 +127,7 @@ func (s *testRegionCacheSuite) TestUpdateLeader2(c *C) {
|
||||
s.cluster.ChangeLeader(s.region1, peer3)
|
||||
// tikv-server reports `NotLeader` again.
|
||||
s.cache.UpdateLeader(r.VerID(), peer3)
|
||||
r, err = s.cache.GetRegion([]byte("a"))
|
||||
r, err = s.cache.GetRegion(s.bo, []byte("a"))
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(r, NotNil)
|
||||
c.Assert(r.GetID(), Equals, s.region1)
|
||||
@ -137,7 +136,7 @@ func (s *testRegionCacheSuite) TestUpdateLeader2(c *C) {
|
||||
}
|
||||
|
||||
func (s *testRegionCacheSuite) TestUpdateLeader3(c *C) {
|
||||
r, err := s.cache.GetRegion([]byte("a"))
|
||||
r, err := s.cache.GetRegion(s.bo, []byte("a"))
|
||||
c.Assert(err, IsNil)
|
||||
// store2 becomes leader
|
||||
s.cluster.ChangeLeader(s.region1, s.peer2)
|
||||
@ -153,7 +152,7 @@ func (s *testRegionCacheSuite) TestUpdateLeader3(c *C) {
|
||||
s.cache.UpdateLeader(r.VerID(), s.peer2)
|
||||
|
||||
// Store2 does not exist any more, causes a reload from PD.
|
||||
r, err = s.cache.GetRegion([]byte("a"))
|
||||
r, err = s.cache.GetRegion(s.bo, []byte("a"))
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(r, NotNil)
|
||||
c.Assert(r.GetID(), Equals, s.region1)
|
||||
@ -163,7 +162,7 @@ func (s *testRegionCacheSuite) TestUpdateLeader3(c *C) {
|
||||
}
|
||||
|
||||
func (s *testRegionCacheSuite) TestSplit(c *C) {
|
||||
r, err := s.cache.GetRegion([]byte("x"))
|
||||
r, err := s.cache.GetRegion(s.bo, []byte("x"))
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(r.GetID(), Equals, s.region1)
|
||||
c.Assert(r.GetAddress(), Equals, s.storeAddr(s.store1))
|
||||
@ -177,7 +176,7 @@ func (s *testRegionCacheSuite) TestSplit(c *C) {
|
||||
s.cache.DropRegion(r.VerID())
|
||||
s.checkCache(c, 0)
|
||||
|
||||
r, err = s.cache.GetRegion([]byte("x"))
|
||||
r, err = s.cache.GetRegion(s.bo, []byte("x"))
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(r.GetID(), Equals, region2)
|
||||
c.Assert(r.GetAddress(), Equals, s.storeAddr(s.store1))
|
||||
@ -190,7 +189,7 @@ func (s *testRegionCacheSuite) TestMerge(c *C) {
|
||||
newPeers := s.cluster.AllocIDs(2)
|
||||
s.cluster.Split(s.region1, region2, []byte("m"), newPeers, newPeers[0])
|
||||
|
||||
r, err := s.cache.GetRegion([]byte("x"))
|
||||
r, err := s.cache.GetRegion(s.bo, []byte("x"))
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(r.GetID(), Equals, region2)
|
||||
|
||||
@ -201,20 +200,20 @@ func (s *testRegionCacheSuite) TestMerge(c *C) {
|
||||
s.cache.DropRegion(r.VerID())
|
||||
s.checkCache(c, 0)
|
||||
|
||||
r, err = s.cache.GetRegion([]byte("x"))
|
||||
r, err = s.cache.GetRegion(s.bo, []byte("x"))
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(r.GetID(), Equals, s.region1)
|
||||
s.checkCache(c, 1)
|
||||
}
|
||||
|
||||
func (s *testRegionCacheSuite) TestReconnect(c *C) {
|
||||
r, err := s.cache.GetRegion([]byte("a"))
|
||||
r, err := s.cache.GetRegion(s.bo, []byte("a"))
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
// connecting to tikv-server failed, which causes the cache to be dropped
|
||||
s.cache.DropRegion(r.VerID())
|
||||
|
||||
r, err = s.cache.GetRegion([]byte("a"))
|
||||
r, err = s.cache.GetRegion(s.bo, []byte("a"))
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(r, NotNil)
|
||||
c.Assert(r.GetID(), Equals, s.region1)
|
||||
@ -223,17 +222,17 @@ func (s *testRegionCacheSuite) TestReconnect(c *C) {
|
||||
}
|
||||
|
||||
func (s *testRegionCacheSuite) TestNextPeer(c *C) {
|
||||
region, err := s.cache.GetRegion([]byte("a"))
|
||||
region, err := s.cache.GetRegion(s.bo, []byte("a"))
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(region.curPeerIdx, Equals, 0)
|
||||
|
||||
s.cache.NextPeer(region.VerID())
|
||||
region, err = s.cache.GetRegion([]byte("a"))
|
||||
region, err = s.cache.GetRegion(s.bo, []byte("a"))
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(region.curPeerIdx, Equals, 1)
|
||||
|
||||
s.cache.NextPeer(region.VerID())
|
||||
region, err = s.cache.GetRegion([]byte("a"))
|
||||
region, err = s.cache.GetRegion(s.bo, []byte("a"))
|
||||
c.Assert(err, IsNil)
|
||||
// Out of range of Peers, so get Region again and pick Stores[0] as leader.
|
||||
c.Assert(region.curPeerIdx, Equals, 0)
|
||||
|
||||
@ -74,6 +74,7 @@ func (s *Scanner) Value() []byte {
|
||||
|
||||
// Next returns the next element.
|
||||
func (s *Scanner) Next() error {
|
||||
bo := NewBackoffer(scannerNextMaxBackoff)
|
||||
if !s.valid {
|
||||
return errors.New("scanner iterator is invalid")
|
||||
}
|
||||
@ -84,7 +85,7 @@ func (s *Scanner) Next() error {
|
||||
s.Close()
|
||||
return kv.ErrNotExist
|
||||
}
|
||||
err := s.getData()
|
||||
err := s.getData(bo)
|
||||
if err != nil {
|
||||
s.Close()
|
||||
return errors.Trace(err)
|
||||
@ -93,7 +94,7 @@ func (s *Scanner) Next() error {
|
||||
continue
|
||||
}
|
||||
}
|
||||
if err := s.resolveCurrentLock(); err != nil {
|
||||
if err := s.resolveCurrentLock(bo); err != nil {
|
||||
s.Close()
|
||||
return errors.Trace(err)
|
||||
}
|
||||
@ -114,16 +115,19 @@ func (s *Scanner) startTS() uint64 {
|
||||
return s.snapshot.version.Ver
|
||||
}
|
||||
|
||||
func (s *Scanner) resolveCurrentLock() error {
|
||||
func (s *Scanner) resolveCurrentLock(bo *Backoffer) error {
|
||||
current := s.cache[s.idx]
|
||||
if current.GetError() == nil {
|
||||
return nil
|
||||
}
|
||||
var backoffErr error
|
||||
for backoff := txnLockBackoff(); backoffErr == nil; backoffErr = backoff() {
|
||||
val, err := s.snapshot.handleKeyError(current.GetError())
|
||||
for {
|
||||
val, err := s.snapshot.handleKeyError(bo, current.GetError())
|
||||
if err != nil {
|
||||
if terror.ErrorEqual(err, errInnerRetryable) {
|
||||
err = bo.Backoff(boTxnLock, err)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
return errors.Trace(err)
|
||||
@ -132,15 +136,12 @@ func (s *Scanner) resolveCurrentLock() error {
|
||||
current.Value = val
|
||||
return nil
|
||||
}
|
||||
return errors.Annotate(backoffErr, txnRetryableMark)
|
||||
}
|
||||
|
||||
func (s *Scanner) getData() error {
|
||||
func (s *Scanner) getData(bo *Backoffer) error {
|
||||
log.Debugf("txn getData nextStartKey[%q], txn %d", s.nextStartKey, s.startTS())
|
||||
|
||||
var backoffErr error
|
||||
for backoff := regionMissBackoff(); backoffErr == nil; backoffErr = backoff() {
|
||||
region, err := s.snapshot.store.regionCache.GetRegion(s.nextStartKey)
|
||||
for {
|
||||
region, err := s.snapshot.store.regionCache.GetRegion(bo, s.nextStartKey)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
@ -152,12 +153,16 @@ func (s *Scanner) getData() error {
|
||||
Version: proto.Uint64(s.startTS()),
|
||||
},
|
||||
}
|
||||
resp, err := s.snapshot.store.SendKVReq(req, region.VerID())
|
||||
resp, err := s.snapshot.store.SendKVReq(bo, req, region.VerID())
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
if regionErr := resp.GetRegionError(); regionErr != nil {
|
||||
log.Warnf("scanner getData failed: %s", regionErr)
|
||||
err = bo.Backoff(boRegionMiss, errors.New(regionErr.String()))
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
cmdScanResp := resp.GetCmdScanResp()
|
||||
@ -196,5 +201,4 @@ func (s *Scanner) getData() error {
|
||||
s.nextStartKey = kv.Key(lastKey).Next()
|
||||
return nil
|
||||
}
|
||||
return errors.Annotate(backoffErr, txnRetryableMark)
|
||||
}
|
||||
|
||||
@ -53,11 +53,12 @@ func newTiKVSnapshot(store *tikvStore, ver kv.Version) *tikvSnapshot {
|
||||
func (s *tikvSnapshot) BatchGet(keys []kv.Key) (map[string][]byte, error) {
|
||||
// We want [][]byte instead of []kv.Key, use some magic to save memory.
|
||||
bytesKeys := *(*[][]byte)(unsafe.Pointer(&keys))
|
||||
bo := NewBackoffer(batchGetMaxBackoff)
|
||||
|
||||
// Create a map to collect key-values from region servers.
|
||||
var mu sync.Mutex
|
||||
m := make(map[string][]byte)
|
||||
err := s.batchGetKeysByRegions(bytesKeys, func(k, v []byte) {
|
||||
err := s.batchGetKeysByRegions(bo, bytesKeys, func(k, v []byte) {
|
||||
if len(v) == 0 {
|
||||
return
|
||||
}
|
||||
@ -71,8 +72,8 @@ func (s *tikvSnapshot) BatchGet(keys []kv.Key) (map[string][]byte, error) {
|
||||
return m, nil
|
||||
}
|
||||
|
||||
func (s *tikvSnapshot) batchGetKeysByRegions(keys [][]byte, collectF func(k, v []byte)) error {
|
||||
groups, _, err := s.store.regionCache.GroupKeysByRegion(keys)
|
||||
func (s *tikvSnapshot) batchGetKeysByRegions(bo *Backoffer, keys [][]byte, collectF func(k, v []byte)) error {
|
||||
groups, _, err := s.store.regionCache.GroupKeysByRegion(bo, keys)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
@ -86,12 +87,12 @@ func (s *tikvSnapshot) batchGetKeysByRegions(keys [][]byte, collectF func(k, v [
|
||||
return nil
|
||||
}
|
||||
if len(batches) == 1 {
|
||||
return errors.Trace(s.batchGetSingleRegion(batches[0], collectF))
|
||||
return errors.Trace(s.batchGetSingleRegion(bo, batches[0], collectF))
|
||||
}
|
||||
ch := make(chan error)
|
||||
for _, batch := range batches {
|
||||
go func(batch batchKeys) {
|
||||
ch <- s.batchGetSingleRegion(batch, collectF)
|
||||
ch <- s.batchGetSingleRegion(bo.Fork(), batch, collectF)
|
||||
}(batch)
|
||||
}
|
||||
for i := 0; i < len(batches); i++ {
|
||||
@ -103,10 +104,9 @@ func (s *tikvSnapshot) batchGetKeysByRegions(keys [][]byte, collectF func(k, v [
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
func (s *tikvSnapshot) batchGetSingleRegion(batch batchKeys, collectF func(k, v []byte)) error {
|
||||
func (s *tikvSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, collectF func(k, v []byte)) error {
|
||||
pending := batch.keys
|
||||
var backoffErr error
|
||||
for backoff := txnLockBackoff(); backoffErr == nil; backoffErr = backoff() {
|
||||
for {
|
||||
req := &pb.Request{
|
||||
Type: pb.MessageType_CmdBatchGet.Enum(),
|
||||
CmdBatchGetReq: &pb.CmdBatchGetRequest{
|
||||
@ -114,12 +114,16 @@ func (s *tikvSnapshot) batchGetSingleRegion(batch batchKeys, collectF func(k, v
|
||||
Version: proto.Uint64(s.version.Ver),
|
||||
},
|
||||
}
|
||||
resp, err := s.store.SendKVReq(req, batch.region)
|
||||
resp, err := s.store.SendKVReq(bo, req, batch.region)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
if regionErr := resp.GetRegionError(); regionErr != nil {
|
||||
err = s.batchGetKeysByRegions(pending, collectF)
|
||||
err = bo.Backoff(boRegionMiss, errors.New(regionErr.String()))
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
err = s.batchGetKeysByRegions(bo, pending, collectF)
|
||||
return errors.Trace(err)
|
||||
}
|
||||
batchGetResp := resp.GetCmdBatchGetResp()
|
||||
@ -135,7 +139,8 @@ func (s *tikvSnapshot) batchGetSingleRegion(batch batchKeys, collectF func(k, v
|
||||
}
|
||||
// This could be slow if we meet many expired locks.
|
||||
// TODO: Find a way to do quick unlock.
|
||||
val, err := s.handleKeyError(keyErr)
|
||||
var val []byte
|
||||
val, err = s.handleKeyError(bo, keyErr)
|
||||
if err != nil {
|
||||
if terror.ErrorNotEqual(err, errInnerRetryable) {
|
||||
return errors.Trace(err)
|
||||
@ -147,15 +152,20 @@ func (s *tikvSnapshot) batchGetSingleRegion(batch batchKeys, collectF func(k, v
|
||||
}
|
||||
if len(lockedKeys) > 0 {
|
||||
pending = lockedKeys
|
||||
err = bo.Backoff(boTxnLock, errors.Errorf("batchGet lockedKeys: %d", len(lockedKeys)))
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
return nil
|
||||
}
|
||||
return errors.Annotate(backoffErr, txnRetryableMark)
|
||||
}
|
||||
|
||||
// Get gets the value for key k from snapshot.
|
||||
func (s *tikvSnapshot) Get(k kv.Key) ([]byte, error) {
|
||||
bo := NewBackoffer(getMaxBackoff)
|
||||
|
||||
req := &pb.Request{
|
||||
Type: pb.MessageType_CmdGet.Enum(),
|
||||
CmdGetReq: &pb.CmdGetRequest{
|
||||
@ -163,23 +173,20 @@ func (s *tikvSnapshot) Get(k kv.Key) ([]byte, error) {
|
||||
Version: proto.Uint64(s.version.Ver),
|
||||
},
|
||||
}
|
||||
|
||||
var (
|
||||
backoffErr error
|
||||
regionBackoff = regionMissBackoff()
|
||||
txnBackoff = txnLockBackoff()
|
||||
)
|
||||
for backoffErr == nil {
|
||||
region, err := s.store.regionCache.GetRegion(k)
|
||||
for {
|
||||
region, err := s.store.regionCache.GetRegion(bo, k)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
resp, err := s.store.SendKVReq(req, region.VerID())
|
||||
resp, err := s.store.SendKVReq(bo, req, region.VerID())
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
if regionErr := resp.GetRegionError(); regionErr != nil {
|
||||
backoffErr = regionBackoff()
|
||||
err = bo.Backoff(boRegionMiss, errors.New(regionErr.String()))
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
cmdGetResp := resp.GetCmdGetResp()
|
||||
@ -188,10 +195,13 @@ func (s *tikvSnapshot) Get(k kv.Key) ([]byte, error) {
|
||||
}
|
||||
val := cmdGetResp.GetValue()
|
||||
if keyErr := cmdGetResp.GetError(); keyErr != nil {
|
||||
val, err = s.handleKeyError(keyErr)
|
||||
val, err = s.handleKeyError(bo, keyErr)
|
||||
if err != nil {
|
||||
if terror.ErrorEqual(err, errInnerRetryable) {
|
||||
backoffErr = txnBackoff()
|
||||
err = bo.Backoff(boTxnLock, err)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
return nil, errors.Trace(err)
|
||||
@ -202,7 +212,6 @@ func (s *tikvSnapshot) Get(k kv.Key) ([]byte, error) {
|
||||
}
|
||||
return val, nil
|
||||
}
|
||||
return nil, errors.Annotate(backoffErr, txnRetryableMark)
|
||||
}
|
||||
|
||||
// Seek returns a list of key-value pairs after `k`.
|
||||
@ -234,13 +243,13 @@ func extractLockInfoFromKeyErr(keyErr *pb.KeyError) (*pb.LockInfo, error) {
|
||||
}
|
||||
|
||||
// handleKeyError tries to resolve locks and then retries getting the value.
|
||||
func (s *tikvSnapshot) handleKeyError(keyErr *pb.KeyError) ([]byte, error) {
|
||||
func (s *tikvSnapshot) handleKeyError(bo *Backoffer, keyErr *pb.KeyError) ([]byte, error) {
|
||||
lockInfo, err := extractLockInfoFromKeyErr(keyErr)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
lock := newLock(s.store, lockInfo.GetPrimaryLock(), lockInfo.GetLockVersion(), lockInfo.GetKey(), s.version.Ver)
|
||||
val, err := lock.cleanup()
|
||||
val, err := lock.cleanup(bo)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
|
||||
@ -22,6 +22,7 @@ import (
|
||||
type testSplitSuite struct {
|
||||
cluster *mocktikv.Cluster
|
||||
store *tikvStore
|
||||
bo *Backoffer
|
||||
}
|
||||
|
||||
var _ = Suite(&testSplitSuite{})
|
||||
@ -32,6 +33,7 @@ func (s *testSplitSuite) SetUpTest(c *C) {
|
||||
mvccStore := mocktikv.NewMvccStore()
|
||||
client := mocktikv.NewRPCClient(s.cluster, mvccStore)
|
||||
s.store = newTikvStore("mock-tikv-store", mocktikv.NewPDClient(s.cluster), client)
|
||||
s.bo = NewBackoffer(5000)
|
||||
}
|
||||
|
||||
func (s *testSplitSuite) begin(c *C) *tikvTxn {
|
||||
@ -46,14 +48,14 @@ func (s *testSplitSuite) split(c *C, regionID uint64, key []byte) {
|
||||
}
|
||||
|
||||
func (s *testSplitSuite) TestSplitBatchGet(c *C) {
|
||||
firstRegion, err := s.store.regionCache.GetRegion([]byte("a"))
|
||||
firstRegion, err := s.store.regionCache.GetRegion(s.bo, []byte("a"))
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
txn := s.begin(c)
|
||||
snapshot := newTiKVSnapshot(s.store, kv.Version{Ver: txn.StartTS()})
|
||||
|
||||
keys := [][]byte{{'a'}, {'b'}, {'c'}}
|
||||
_, region, err := s.store.regionCache.GroupKeysByRegion(keys)
|
||||
_, region, err := s.store.regionCache.GroupKeysByRegion(s.bo, keys)
|
||||
c.Assert(err, IsNil)
|
||||
batch := batchKeys{
|
||||
region: region,
|
||||
@ -64,6 +66,6 @@ func (s *testSplitSuite) TestSplitBatchGet(c *C) {
|
||||
s.store.regionCache.DropRegion(firstRegion.VerID())
|
||||
|
||||
// mock-tikv will panic if it meets a not-in-region key.
|
||||
err = snapshot.batchGetSingleRegion(batch, func([]byte, []byte) {})
|
||||
err = snapshot.batchGetSingleRegion(s.bo, batch, func([]byte, []byte) {})
|
||||
c.Assert(err, IsNil)
|
||||
}
|
||||
|
||||
@ -42,9 +42,9 @@ func (s *testStoreSuite) TestOracle(c *C) {
|
||||
o := newMockOracle(s.store.oracle)
|
||||
s.store.oracle = o
|
||||
|
||||
t1, err := s.store.getTimestampWithRetry()
|
||||
t1, err := s.store.getTimestampWithRetry(NewBackoffer(100))
|
||||
c.Assert(err, IsNil)
|
||||
t2, err := s.store.getTimestampWithRetry()
|
||||
t2, err := s.store.getTimestampWithRetry(NewBackoffer(100))
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(t1, Less, t2)
|
||||
|
||||
@ -54,20 +54,20 @@ func (s *testStoreSuite) TestOracle(c *C) {
|
||||
|
||||
o.disable()
|
||||
go func() {
|
||||
time.Sleep(time.Second)
|
||||
time.Sleep(time.Millisecond * 100)
|
||||
o.enable()
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
go func() {
|
||||
t3, err := s.store.getTimestampWithRetry()
|
||||
t3, err := s.store.getTimestampWithRetry(NewBackoffer(tsoMaxBackoff))
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(t2, Less, t3)
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
go func() {
|
||||
expired, err := s.store.checkTimestampExpiredWithRetry(t2, 500)
|
||||
expired, err := s.store.checkTimestampExpiredWithRetry(NewBackoffer(tsoMaxBackoff), t2, 50)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(expired, IsTrue)
|
||||
wg.Done()
|
||||
|
||||
@ -37,7 +37,8 @@ type tikvTxn struct {
|
||||
}
|
||||
|
||||
func newTiKVTxn(store *tikvStore) (*tikvTxn, error) {
|
||||
startTS, err := store.getTimestampWithRetry()
|
||||
bo := NewBackoffer(tsoMaxBackoff)
|
||||
startTS, err := store.getTimestampWithRetry(bo)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
|
||||
@ -89,11 +89,11 @@ func (c *txnCommitter) primary() []byte {
|
||||
// iterKeys groups keys into batches, then applies `f` to them. If the flag
|
||||
// asyncNonPrimary is set, it will return as soon as the primary batch is
|
||||
// processed.
|
||||
func (c *txnCommitter) iterKeys(keys [][]byte, f func(batchKeys) error, sizeFn func([]byte) int, asyncNonPrimary bool) error {
|
||||
func (c *txnCommitter) iterKeys(bo *Backoffer, keys [][]byte, f func(*Backoffer, batchKeys) error, sizeFn func([]byte) int, asyncNonPrimary bool) error {
|
||||
if len(keys) == 0 {
|
||||
return nil
|
||||
}
|
||||
groups, firstRegion, err := c.store.regionCache.GroupKeysByRegion(keys)
|
||||
groups, firstRegion, err := c.store.regionCache.GroupKeysByRegion(bo, keys)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
@ -110,7 +110,7 @@ func (c *txnCommitter) iterKeys(keys [][]byte, f func(batchKeys) error, sizeFn f
|
||||
}
|
||||
|
||||
if firstIsPrimary {
|
||||
err = c.doBatches(batches[:1], f)
|
||||
err = c.doBatches(bo, batches[:1], f)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
@ -118,21 +118,21 @@ func (c *txnCommitter) iterKeys(keys [][]byte, f func(batchKeys) error, sizeFn f
|
||||
}
|
||||
if asyncNonPrimary {
|
||||
go func() {
|
||||
c.doBatches(batches, f)
|
||||
c.doBatches(bo, batches, f)
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
err = c.doBatches(batches, f)
|
||||
err = c.doBatches(bo, batches, f)
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
// doBatches applies f to batches in parallel.
|
||||
func (c *txnCommitter) doBatches(batches []batchKeys, f func(batchKeys) error) error {
|
||||
func (c *txnCommitter) doBatches(bo *Backoffer, batches []batchKeys, f func(*Backoffer, batchKeys) error) error {
|
||||
if len(batches) == 0 {
|
||||
return nil
|
||||
}
|
||||
if len(batches) == 1 {
|
||||
e := f(batches[0])
|
||||
e := f(bo, batches[0])
|
||||
if e != nil {
|
||||
log.Warnf("txnCommitter doBatches failed: %v, tid: %d", e, c.startTS)
|
||||
}
|
||||
@ -143,7 +143,7 @@ func (c *txnCommitter) doBatches(batches []batchKeys, f func(batchKeys) error) e
|
||||
ch := make(chan error)
|
||||
for _, batch := range batches {
|
||||
go func(batch batchKeys) {
|
||||
ch <- f(batch)
|
||||
ch <- f(bo.Fork(), batch)
|
||||
}(batch)
|
||||
}
|
||||
var err error
|
||||
@ -168,7 +168,7 @@ func (c *txnCommitter) keySize(key []byte) int {
|
||||
return len(key)
|
||||
}
|
||||
|
||||
func (c *txnCommitter) prewriteSingleRegion(batch batchKeys) error {
|
||||
func (c *txnCommitter) prewriteSingleRegion(bo *Backoffer, batch batchKeys) error {
|
||||
mutations := make([]*pb.Mutation, len(batch.keys))
|
||||
for i, k := range batch.keys {
|
||||
mutations[i] = c.mutations[string(k)]
|
||||
@ -182,18 +182,17 @@ func (c *txnCommitter) prewriteSingleRegion(batch batchKeys) error {
|
||||
},
|
||||
}
|
||||
|
||||
var backoffErr error
|
||||
for backoff := txnLockBackoff(); backoffErr == nil; backoffErr = backoff() {
|
||||
resp, err := c.store.SendKVReq(req, batch.region)
|
||||
for {
|
||||
resp, err := c.store.SendKVReq(bo, req, batch.region)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
if regionErr := resp.GetRegionError(); regionErr != nil {
|
||||
// re-split keys and prewrite again.
|
||||
// TODO: The recursion may not be able to exit if TiKV &
|
||||
// PD are implemented incorrectly. A possible fix is
|
||||
// introducing a 'max backoff time'.
|
||||
err = c.prewriteKeys(batch.keys)
|
||||
err = bo.Backoff(boRegionMiss, errors.New(regionErr.String()))
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
err = c.prewriteKeys(bo, batch.keys)
|
||||
return errors.Trace(err)
|
||||
}
|
||||
prewriteResp := resp.GetCmdPrewriteResp()
|
||||
@ -209,22 +208,26 @@ func (c *txnCommitter) prewriteSingleRegion(batch batchKeys) error {
|
||||
return nil
|
||||
}
|
||||
for _, keyErr := range keyErrs {
|
||||
lockInfo, err := extractLockInfoFromKeyErr(keyErr)
|
||||
var lockInfo *pb.LockInfo
|
||||
lockInfo, err = extractLockInfoFromKeyErr(keyErr)
|
||||
if err != nil {
|
||||
// It could be `Retryable` or `Abort`.
|
||||
return errors.Trace(err)
|
||||
}
|
||||
lock := newLock(c.store, lockInfo.GetPrimaryLock(), lockInfo.GetLockVersion(), lockInfo.GetKey(), c.startTS)
|
||||
_, err = lock.cleanup()
|
||||
_, err = lock.cleanup(bo)
|
||||
if err != nil && terror.ErrorNotEqual(err, errInnerRetryable) {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
}
|
||||
err = bo.Backoff(boTxnLock, errors.New("prewrite encounter locks"))
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
}
|
||||
return errors.Annotate(backoffErr, txnRetryableMark)
|
||||
}
|
||||
|
||||
func (c *txnCommitter) commitSingleRegion(batch batchKeys) error {
|
||||
func (c *txnCommitter) commitSingleRegion(bo *Backoffer, batch batchKeys) error {
|
||||
req := &pb.Request{
|
||||
Type: pb.MessageType_CmdCommit.Enum(),
|
||||
CmdCommitReq: &pb.CmdCommitRequest{
|
||||
@ -234,13 +237,17 @@ func (c *txnCommitter) commitSingleRegion(batch batchKeys) error {
|
||||
},
|
||||
}
|
||||
|
||||
resp, err := c.store.SendKVReq(req, batch.region)
|
||||
resp, err := c.store.SendKVReq(bo, req, batch.region)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
if regionErr := resp.GetRegionError(); regionErr != nil {
|
||||
err = bo.Backoff(boRegionMiss, errors.New(regionErr.String()))
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
// re-split keys and commit again.
|
||||
err = c.commitKeys(batch.keys)
|
||||
err = c.commitKeys(bo, batch.keys)
|
||||
return errors.Trace(err)
|
||||
}
|
||||
commitResp := resp.GetCmdCommitResp()
|
||||
@ -270,7 +277,7 @@ func (c *txnCommitter) commitSingleRegion(batch batchKeys) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *txnCommitter) cleanupSingleRegion(batch batchKeys) error {
|
||||
func (c *txnCommitter) cleanupSingleRegion(bo *Backoffer, batch batchKeys) error {
|
||||
req := &pb.Request{
|
||||
Type: pb.MessageType_CmdBatchRollback.Enum(),
|
||||
CmdBatchRollbackReq: &pb.CmdBatchRollbackRequest{
|
||||
@ -278,12 +285,16 @@ func (c *txnCommitter) cleanupSingleRegion(batch batchKeys) error {
|
||||
StartVersion: proto.Uint64(c.startTS),
|
||||
},
|
||||
}
|
||||
resp, err := c.store.SendKVReq(req, batch.region)
|
||||
resp, err := c.store.SendKVReq(bo, req, batch.region)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
if regionErr := resp.GetRegionError(); regionErr != nil {
|
||||
err = c.cleanupKeys(batch.keys)
|
||||
err = bo.Backoff(boRegionMiss, errors.New(regionErr.String()))
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
err = c.cleanupKeys(bo, batch.keys)
|
||||
return errors.Trace(err)
|
||||
}
|
||||
if keyErr := resp.GetCmdBatchRollbackResp().GetError(); keyErr != nil {
|
||||
@ -294,16 +305,16 @@ func (c *txnCommitter) cleanupSingleRegion(batch batchKeys) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *txnCommitter) prewriteKeys(keys [][]byte) error {
|
||||
return c.iterKeys(keys, c.prewriteSingleRegion, c.keyValueSize, false)
|
||||
func (c *txnCommitter) prewriteKeys(bo *Backoffer, keys [][]byte) error {
|
||||
return c.iterKeys(bo, keys, c.prewriteSingleRegion, c.keyValueSize, false)
|
||||
}
|
||||
|
||||
func (c *txnCommitter) commitKeys(keys [][]byte) error {
|
||||
return c.iterKeys(keys, c.commitSingleRegion, c.keySize, true)
|
||||
func (c *txnCommitter) commitKeys(bo *Backoffer, keys [][]byte) error {
|
||||
return c.iterKeys(bo, keys, c.commitSingleRegion, c.keySize, true)
|
||||
}
|
||||
|
||||
func (c *txnCommitter) cleanupKeys(keys [][]byte) error {
|
||||
return c.iterKeys(keys, c.cleanupSingleRegion, c.keySize, false)
|
||||
func (c *txnCommitter) cleanupKeys(bo *Backoffer, keys [][]byte) error {
|
||||
return c.iterKeys(bo, keys, c.cleanupSingleRegion, c.keySize, false)
|
||||
}
|
||||
|
||||
func (c *txnCommitter) Commit() error {
|
||||
@ -311,26 +322,29 @@ func (c *txnCommitter) Commit() error {
|
||||
// Always clean up all written keys if the txn does not commit.
|
||||
if !c.committed {
|
||||
go func() {
|
||||
c.cleanupKeys(c.writtenKeys)
|
||||
c.mu.RLock()
|
||||
writtenKeys := c.writtenKeys
|
||||
c.mu.RUnlock()
|
||||
c.cleanupKeys(NewBackoffer(cleanupMaxBackoff), writtenKeys)
|
||||
log.Infof("txn clean up done, tid: %d", c.startTS)
|
||||
}()
|
||||
}
|
||||
}()
|
||||
|
||||
err := c.prewriteKeys(c.keys)
|
||||
err := c.prewriteKeys(NewBackoffer(prewriteMaxBackoff), c.keys)
|
||||
if err != nil {
|
||||
log.Warnf("txn commit failed on prewrite: %v, tid: %d", err, c.startTS)
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
commitTS, err := c.store.getTimestampWithRetry()
|
||||
commitTS, err := c.store.getTimestampWithRetry(NewBackoffer(tsoMaxBackoff))
|
||||
if err != nil {
|
||||
log.Warnf("txn get commitTS failed: %v, tid: %d", err, c.startTS)
|
||||
return errors.Trace(err)
|
||||
}
|
||||
c.commitTS = commitTS
|
||||
|
||||
err = c.commitKeys(c.keys)
|
||||
err = c.commitKeys(NewBackoffer(commitMaxBackoff), c.keys)
|
||||
if err != nil {
|
||||
if !c.committed {
|
||||
log.Warnf("txn commit failed on commit: %v, tid: %d", err, c.startTS)
|
||||
|
||||
Reference in New Issue
Block a user