diff --git a/store/tikv/backoff.go b/store/tikv/backoff.go index 1b48dfb115..1caff933fd 100644 --- a/store/tikv/backoff.go +++ b/store/tikv/backoff.go @@ -19,6 +19,7 @@ import ( "time" "github.com/juju/errors" + "github.com/ngaut/log" ) const ( @@ -32,17 +33,13 @@ const ( DecorrJitter ) -// NewBackoff creates a backoff func which implements exponential backoff with +// NewBackoffFn creates a backoff func which implements exponential backoff with // optional jitters. // See http://www.awsarchitectureblog.com/2015/03/backoff.html -func NewBackoff(retry, base, cap, jitter int) func() error { +func NewBackoffFn(base, cap, jitter int) func() int { attempts := 0 - totalSleep := 0 lastSleep := base - return func() error { - if attempts >= retry { - return errors.Errorf("still fail after %d retries, total sleep %dms", attempts, totalSleep) - } + return func() int { var sleep int switch jitter { case NoJitter: @@ -59,12 +56,95 @@ func NewBackoff(retry, base, cap, jitter int) func() error { time.Sleep(time.Duration(sleep) * time.Millisecond) attempts++ - totalSleep += sleep lastSleep = sleep - return nil + return lastSleep } } func expo(base, cap, n int) int { return int(math.Min(float64(cap), float64(base)*math.Pow(2.0, float64(n)))) } + +type backoffType int + +const ( + boTiKVRPC backoffType = iota + boTxnLock + boPDRPC + boRegionMiss +) + +func (t backoffType) createFn() func() int { + switch t { + case boTiKVRPC: + return NewBackoffFn(100, 2000, EqualJitter) + case boTxnLock: + return NewBackoffFn(300, 3000, EqualJitter) + case boPDRPC: + return NewBackoffFn(500, 3000, EqualJitter) + case boRegionMiss: + return NewBackoffFn(100, 500, NoJitter) + } + return nil +} + +// Maximum total sleep time(in ms) for kv/cop commands. 
+const ( + copBuildTaskMaxBackoff = 5000 + tsoMaxBackoff = 5000 + scannerNextMaxBackoff = 5000 + batchGetMaxBackoff = 10000 + copNextMaxBackoff = 10000 + getMaxBackoff = 10000 + prewriteMaxBackoff = 10000 + commitMaxBackoff = 10000 + cleanupMaxBackoff = 10000 +) + +// Backoffer is a utility for retrying queries. +type Backoffer struct { + fn map[backoffType]func() int + maxSleep int + totalSleep int + errors []error +} + +// NewBackoffer creates a Backoffer with maximum sleep time(in ms). +func NewBackoffer(maxSleep int) *Backoffer { + return &Backoffer{ + maxSleep: maxSleep, + } +} + +// Backoff sleeps a while base on the backoffType and records the error message. +// It returns a retryable error if total sleep time exceeds maxSleep. +func (b *Backoffer) Backoff(typ backoffType, err error) error { + // Lazy initialize. + if b.fn == nil { + b.fn = make(map[backoffType]func() int) + } + f, ok := b.fn[typ] + if !ok { + f = typ.createFn() + b.fn[typ] = f + } + + b.totalSleep += f() + + log.Warnf("%v, retry later(totalSleep %dms, maxSleep %dms)", err, b.totalSleep, b.maxSleep) + b.errors = append(b.errors, err) + if b.totalSleep >= b.maxSleep { + e := errors.Errorf("backoffer.maxSleep %dms is exceeded, errors: %v", b.maxSleep, b.errors) + return errors.Annotate(e, txnRetryableMark) + } + return nil +} + +// Fork creates a new Backoffer which keeps current Backoffer's sleep time and errors. +func (b *Backoffer) Fork() *Backoffer { + return &Backoffer{ + maxSleep: b.maxSleep, + totalSleep: b.totalSleep, + errors: b.errors, + } +} diff --git a/store/tikv/client.go b/store/tikv/client.go index 6bf02ac68f..b05ff8e4e2 100644 --- a/store/tikv/client.go +++ b/store/tikv/client.go @@ -126,14 +126,3 @@ func (c *rpcClient) Close() error { c.p.Close() return nil } - -// rpcBackoff is for RPC (with TiKV) retry. -// It is expected to sleep for about 10s(+/-3s) in total before abort. 
-func rpcBackoff() func() error { - const ( - maxRetry = 10 - sleepBase = 100 - sleepCap = 2000 - ) - return NewBackoff(maxRetry, sleepBase, sleepCap, EqualJitter) -} diff --git a/store/tikv/coprocessor.go b/store/tikv/coprocessor.go index d3df632696..baad42962a 100644 --- a/store/tikv/coprocessor.go +++ b/store/tikv/coprocessor.go @@ -76,7 +76,8 @@ func supportExpr(exprType tipb.ExprType) bool { // Send builds the request and gets the coprocessor iterator response. func (c *CopClient) Send(req *kv.Request) kv.Response { - tasks, err := buildCopTasks(c.store.regionCache, req.KeyRanges, req.Desc) + bo := NewBackoffer(copBuildTaskMaxBackoff) + tasks, err := buildCopTasks(bo, c.store.regionCache, req.KeyRanges, req.Desc) if err != nil { return copErrorResponse{err} } @@ -133,11 +134,11 @@ func (t *copTask) pbRanges() []*coprocessor.KeyRange { return ranges } -func buildCopTasks(cache *RegionCache, ranges []kv.KeyRange, desc bool) ([]*copTask, error) { +func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges []kv.KeyRange, desc bool) ([]*copTask, error) { var tasks []*copTask for _, r := range ranges { var err error - if tasks, err = appendTask(tasks, cache, r); err != nil { + if tasks, err = appendTask(tasks, bo, cache, r); err != nil { return nil, errors.Trace(err) } } @@ -155,14 +156,14 @@ func reverseTasks(tasks []*copTask) { } } -func appendTask(tasks []*copTask, cache *RegionCache, r kv.KeyRange) ([]*copTask, error) { +func appendTask(tasks []*copTask, bo *Backoffer, cache *RegionCache, r kv.KeyRange) ([]*copTask, error) { var last *copTask if len(tasks) > 0 { last = tasks[len(tasks)-1] } // Ensure `r` (or part of `r`) is inside `last`, create a task if need. 
 	if last == nil || !last.region.Contains(r.StartKey) {
-		region, err := cache.GetRegion(r.StartKey)
+		region, err := cache.GetRegion(bo, r.StartKey)
 		if err != nil {
 			return nil, errors.Trace(err)
 		}
@@ -187,7 +188,7 @@ func appendTask(tasks []*copTask, cache *RegionCache, r kv.KeyRange) ([]*copTask
 			StartKey: last.region.EndKey(),
 			EndKey:   r.EndKey,
 		}
-		return appendTask(tasks, cache, remain)
+		return appendTask(tasks, bo, cache, remain)
 	}
 	return tasks, nil
 }
@@ -207,6 +208,7 @@ type copIterator struct {
 
 // Pick the next new copTask and send request to tikv-server.
 func (it *copIterator) work() {
+	bo := NewBackoffer(copNextMaxBackoff)
 	for {
 		it.mu.Lock()
 		if it.finished {
@@ -227,7 +229,7 @@ func (it *copIterator) work() {
 		}
 		task.status = taskRunning
 		it.mu.Unlock()
-		resp, err := it.handleTask(task)
+		resp, err := it.handleTask(bo, task)
 		if err != nil {
 			it.errChan <- err
 			break
@@ -303,9 +305,8 @@ func (it *copIterator) Next() (io.ReadCloser, error) {
 }
 
 // Handle single copTask.
-func (it *copIterator) handleTask(task *copTask) (*coprocessor.Response, error) {
-	var backoffErr error
-	for backoff := rpcBackoff(); backoffErr == nil; backoffErr = backoff() {
+func (it *copIterator) handleTask(bo *Backoffer, task *copTask) (*coprocessor.Response, error) {
+	for {
 		req := &coprocessor.Request{
 			Context: task.region.GetContext(),
 			Tp:      proto.Int64(it.req.Tp),
@@ -315,9 +316,13 @@ func (it *copIterator) handleTask(task *copTask) (*coprocessor.Response, error)
 		resp, err := it.store.client.SendCopReq(task.region.GetAddress(), req)
 		if err != nil {
 			it.store.regionCache.NextPeer(task.region.VerID())
-			err1 := it.rebuildCurrentTask(task)
-			if err1 != nil {
-				return nil, errors.Trace(err1)
+			if err1 := bo.Backoff(boTiKVRPC, err); err1 != nil {
+				return nil, errors.Trace(err1)
+			}
+			// Use err1 here so `err` still holds the RPC error for the log below.
+			err1 := it.rebuildCurrentTask(bo, task)
+			if err1 != nil {
+				return nil, errors.Trace(err1)
 			}
 			log.Warnf("send coprocessor request error: %v, try next peer later", err)
 			continue
@@ -328,7 +333,11 @@ func (it 
*copIterator) handleTask(task *copTask) (*coprocessor.Response, error) } else { it.store.regionCache.DropRegion(task.region.VerID()) } - err = it.rebuildCurrentTask(task) + err = bo.Backoff(boRegionMiss, err) + if err != nil { + return nil, errors.Trace(err) + } + err = it.rebuildCurrentTask(bo, task) if err != nil { return nil, errors.Trace(err) } @@ -337,8 +346,12 @@ func (it *copIterator) handleTask(task *copTask) (*coprocessor.Response, error) } if e := resp.GetLocked(); e != nil { lock := newLock(it.store, e.GetPrimaryLock(), e.GetLockVersion(), e.GetKey(), e.GetLockVersion()) - _, lockErr := lock.cleanup() + _, lockErr := lock.cleanup(bo) if lockErr == nil || terror.ErrorEqual(lockErr, errInnerRetryable) { + err = bo.Backoff(boTxnLock, lockErr) + if err != nil { + return nil, errors.Trace(err) + } continue } log.Warnf("cleanup lock error: %v", lockErr) @@ -351,12 +364,11 @@ func (it *copIterator) handleTask(task *copTask) (*coprocessor.Response, error) } return resp, nil } - return nil, errors.Trace(backoffErr) } // Rebuild current task. It may be split into multiple tasks (in region split scenario). 
-func (it *copIterator) rebuildCurrentTask(task *copTask) error { - newTasks, err := buildCopTasks(it.store.regionCache, task.ranges, it.req.Desc) +func (it *copIterator) rebuildCurrentTask(bo *Backoffer, task *copTask) error { + newTasks, err := buildCopTasks(bo, it.store.regionCache, task.ranges, it.req.Desc) if err != nil { return errors.Trace(err) } diff --git a/store/tikv/coprocessor_test.go b/store/tikv/coprocessor_test.go index 2265caf12e..c96e6a7162 100644 --- a/store/tikv/coprocessor_test.go +++ b/store/tikv/coprocessor_test.go @@ -30,28 +30,30 @@ func (s *testCoprocessorSuite) TestBuildTasks(c *C) { _, regionIDs, _ := mocktikv.BootstrapWithMultiRegions(cluster, []byte("g"), []byte("n"), []byte("t")) cache := NewRegionCache(mocktikv.NewPDClient(cluster)) - tasks, err := buildCopTasks(cache, s.buildKeyRanges("a", "c"), false) + bo := NewBackoffer(3000) + + tasks, err := buildCopTasks(bo, cache, s.buildKeyRanges("a", "c"), false) c.Assert(err, IsNil) c.Assert(tasks, HasLen, 1) s.taskEqual(c, tasks[0], regionIDs[0], "a", "c") - tasks, err = buildCopTasks(cache, s.buildKeyRanges("g", "n"), false) + tasks, err = buildCopTasks(bo, cache, s.buildKeyRanges("g", "n"), false) c.Assert(err, IsNil) c.Assert(tasks, HasLen, 1) s.taskEqual(c, tasks[0], regionIDs[1], "g", "n") - tasks, err = buildCopTasks(cache, s.buildKeyRanges("m", "n"), false) + tasks, err = buildCopTasks(bo, cache, s.buildKeyRanges("m", "n"), false) c.Assert(err, IsNil) c.Assert(tasks, HasLen, 1) s.taskEqual(c, tasks[0], regionIDs[1], "m", "n") - tasks, err = buildCopTasks(cache, s.buildKeyRanges("a", "k"), false) + tasks, err = buildCopTasks(bo, cache, s.buildKeyRanges("a", "k"), false) c.Assert(err, IsNil) c.Assert(tasks, HasLen, 2) s.taskEqual(c, tasks[0], regionIDs[0], "a", "g") s.taskEqual(c, tasks[1], regionIDs[1], "g", "k") - tasks, err = buildCopTasks(cache, s.buildKeyRanges("a", "x"), false) + tasks, err = buildCopTasks(bo, cache, s.buildKeyRanges("a", "x"), false) c.Assert(err, IsNil) 
c.Assert(tasks, HasLen, 4) s.taskEqual(c, tasks[0], regionIDs[0], "a", "g") @@ -59,23 +61,23 @@ func (s *testCoprocessorSuite) TestBuildTasks(c *C) { s.taskEqual(c, tasks[2], regionIDs[2], "n", "t") s.taskEqual(c, tasks[3], regionIDs[3], "t", "x") - tasks, err = buildCopTasks(cache, s.buildKeyRanges("a", "b", "b", "c"), false) + tasks, err = buildCopTasks(bo, cache, s.buildKeyRanges("a", "b", "b", "c"), false) c.Assert(err, IsNil) c.Assert(tasks, HasLen, 1) s.taskEqual(c, tasks[0], regionIDs[0], "a", "b", "b", "c") - tasks, err = buildCopTasks(cache, s.buildKeyRanges("a", "b", "e", "f"), false) + tasks, err = buildCopTasks(bo, cache, s.buildKeyRanges("a", "b", "e", "f"), false) c.Assert(err, IsNil) c.Assert(tasks, HasLen, 1) s.taskEqual(c, tasks[0], regionIDs[0], "a", "b", "e", "f") - tasks, err = buildCopTasks(cache, s.buildKeyRanges("g", "n", "o", "p"), false) + tasks, err = buildCopTasks(bo, cache, s.buildKeyRanges("g", "n", "o", "p"), false) c.Assert(err, IsNil) c.Assert(tasks, HasLen, 2) s.taskEqual(c, tasks[0], regionIDs[1], "g", "n") s.taskEqual(c, tasks[1], regionIDs[2], "o", "p") - tasks, err = buildCopTasks(cache, s.buildKeyRanges("h", "k", "m", "p"), false) + tasks, err = buildCopTasks(bo, cache, s.buildKeyRanges("h", "k", "m", "p"), false) c.Assert(err, IsNil) c.Assert(tasks, HasLen, 2) s.taskEqual(c, tasks[0], regionIDs[1], "h", "k", "m", "n") @@ -88,8 +90,9 @@ func (s *testCoprocessorSuite) TestRebuild(c *C) { cluster := mocktikv.NewCluster() storeID, regionIDs, peerIDs := mocktikv.BootstrapWithMultiRegions(cluster, []byte("m")) cache := NewRegionCache(mocktikv.NewPDClient(cluster)) + bo := NewBackoffer(3000) - tasks, err := buildCopTasks(cache, s.buildKeyRanges("a", "z"), false) + tasks, err := buildCopTasks(bo, cache, s.buildKeyRanges("a", "z"), false) c.Assert(err, IsNil) c.Assert(tasks, HasLen, 2) s.taskEqual(c, tasks[0], regionIDs[0], "a", "m") @@ -102,7 +105,7 @@ func (s *testCoprocessorSuite) TestRebuild(c *C) { cluster.Split(regionIDs[1], 
regionIDs[2], []byte("q"), []uint64{peerIDs[2]}, storeID) cache.DropRegion(tasks[1].region.VerID()) - tasks, err = buildCopTasks(cache, s.buildKeyRanges("a", "z"), true) + tasks, err = buildCopTasks(bo, cache, s.buildKeyRanges("a", "z"), true) c.Assert(err, IsNil) iter := &copIterator{ store: &tikvStore{ @@ -113,14 +116,14 @@ func (s *testCoprocessorSuite) TestRebuild(c *C) { }, tasks: tasks, } - err = iter.rebuildCurrentTask(iter.tasks[0]) + err = iter.rebuildCurrentTask(bo, iter.tasks[0]) c.Assert(err, IsNil) c.Assert(iter.tasks, HasLen, 3) s.taskEqual(c, iter.tasks[2], regionIDs[0], "a", "m") s.taskEqual(c, iter.tasks[1], regionIDs[1], "m", "q") s.taskEqual(c, iter.tasks[0], regionIDs[2], "q", "z") - tasks, err = buildCopTasks(cache, s.buildKeyRanges("a", "z"), true) + tasks, err = buildCopTasks(bo, cache, s.buildKeyRanges("a", "z"), true) iter = &copIterator{ store: &tikvStore{ regionCache: cache, @@ -130,7 +133,7 @@ func (s *testCoprocessorSuite) TestRebuild(c *C) { }, tasks: tasks, } - err = iter.rebuildCurrentTask(iter.tasks[2]) + err = iter.rebuildCurrentTask(bo, iter.tasks[2]) c.Assert(err, IsNil) c.Assert(iter.tasks, HasLen, 3) s.taskEqual(c, iter.tasks[2], regionIDs[0], "a", "m") diff --git a/store/tikv/kv.go b/store/tikv/kv.go index 4e66e5679a..ae07476032 100644 --- a/store/tikv/kv.go +++ b/store/tikv/kv.go @@ -124,46 +124,45 @@ func (s *tikvStore) UUID() string { } func (s *tikvStore) CurrentVersion() (kv.Version, error) { - startTS, err := s.getTimestampWithRetry() + bo := NewBackoffer(tsoMaxBackoff) + startTS, err := s.getTimestampWithRetry(bo) if err != nil { return kv.NewVersion(0), errors.Trace(err) } - return kv.NewVersion(startTS), nil } -func (s *tikvStore) getTimestampWithRetry() (uint64, error) { - var backoffErr error - for backoff := pdBackoff(); backoffErr == nil; backoffErr = backoff() { +func (s *tikvStore) getTimestampWithRetry(bo *Backoffer) (uint64, error) { + for { startTS, err := s.oracle.GetTimestamp() - if err != nil { - 
log.Warnf("get timestamp failed: %v, retry later", err) - continue + if err == nil { + return startTS, nil + } + err = bo.Backoff(boPDRPC, errors.Errorf("get timestamp failed: %v", err)) + if err != nil { + return 0, errors.Trace(err) } - return startTS, nil } - return 0, errors.Annotate(backoffErr, txnRetryableMark) } -func (s *tikvStore) checkTimestampExpiredWithRetry(ts uint64, TTL uint64) (bool, error) { - var backoffErr error - for backoff := pdBackoff(); backoffErr == nil; backoffErr = backoff() { +func (s *tikvStore) checkTimestampExpiredWithRetry(bo *Backoffer, ts uint64, TTL uint64) (bool, error) { + for { expired, err := s.oracle.IsExpired(ts, TTL) - if err != nil { - log.Warnf("check expired failed: %v, retry later", err) - continue + if err == nil { + return expired, nil + } + err = bo.Backoff(boPDRPC, errors.Errorf("check expired failed: %v", err)) + if err != nil { + return false, errors.Trace(err) } - return expired, nil } - return false, errors.Annotate(backoffErr, txnRetryableMark) } // sendKVReq sends req to tikv server. It will retry internally to find the right // region leader if i) fails to establish a connection to server or ii) server // returns `NotLeader`. 
-func (s *tikvStore) SendKVReq(req *pb.Request, regionID RegionVerID) (*pb.Response, error) { - var backoffErr error - for backoff := rpcBackoff(); backoffErr == nil; backoffErr = backoff() { +func (s *tikvStore) SendKVReq(bo *Backoffer, req *pb.Request, regionID RegionVerID) (*pb.Response, error) { + for { region := s.regionCache.GetRegionByVerID(regionID) if region == nil { // If the region is not found in cache, it must be out @@ -177,8 +176,11 @@ func (s *tikvStore) SendKVReq(req *pb.Request, regionID RegionVerID) (*pb.Respon req.Context = region.GetContext() resp, err := s.client.SendKVReq(region.GetAddress(), req) if err != nil { - log.Warnf("send tikv request error: %v, ctx: %s, try next peer later", err, req.Context) s.regionCache.NextPeer(region.VerID()) + err = bo.Backoff(boTiKVRPC, errors.Errorf("send tikv request error: %v, ctx: %s, try next peer later", err, req.Context)) + if err != nil { + return nil, errors.Trace(err) + } continue } if regionErr := resp.GetRegionError(); regionErr != nil { @@ -186,6 +188,12 @@ func (s *tikvStore) SendKVReq(req *pb.Request, regionID RegionVerID) (*pb.Respon if notLeader := regionErr.GetNotLeader(); notLeader != nil { log.Warnf("tikv reports `NotLeader`: %s, ctx: %s, retry later", notLeader, req.Context) s.regionCache.UpdateLeader(region.VerID(), notLeader.GetLeader().GetId()) + if notLeader.GetLeader() == nil { + err = bo.Backoff(boRegionMiss, errors.Errorf("not leader: %v, ctx: %s", notLeader, req.Context)) + if err != nil { + return nil, errors.Trace(err) + } + } continue } // For other errors, we only drop cache here. 
@@ -199,7 +207,6 @@ func (s *tikvStore) SendKVReq(req *pb.Request, regionID RegionVerID) (*pb.Respon } return resp, nil } - return nil, errors.Trace(backoffErr) } func parsePath(path string) (etcdAddrs []string, clusterID uint64, err error) { diff --git a/store/tikv/lock.go b/store/tikv/lock.go index 817239e46c..ccf4c20be0 100644 --- a/store/tikv/lock.go +++ b/store/tikv/lock.go @@ -36,24 +36,14 @@ func newLock(store *tikvStore, pLock []byte, lockVer uint64, key []byte, ver uin } } -// txnLockBackoff is for transaction lock retry. -func txnLockBackoff() func() error { - const ( - maxRetry = 6 - sleepBase = 300 - sleepCap = 3000 - ) - return NewBackoff(maxRetry, sleepBase, sleepCap, EqualJitter) -} - // locks after 3000ms is considered unusual (the client created the lock might // be dead). Other client may cleanup this kind of lock. // For locks created recently, we will do backoff and retry. var lockTTL uint64 = 3000 // cleanup cleanup the lock -func (l *txnLock) cleanup() ([]byte, error) { - expired, err := l.store.checkTimestampExpiredWithRetry(l.pl.version, lockTTL) +func (l *txnLock) cleanup(bo *Backoffer) ([]byte, error) { + expired, err := l.store.checkTimestampExpiredWithRetry(bo, l.pl.version, lockTTL) if err != nil { return nil, errors.Trace(err) } @@ -67,17 +57,20 @@ func (l *txnLock) cleanup() ([]byte, error) { StartVersion: proto.Uint64(l.pl.version), }, } - var backoffErr error - for backoff := regionMissBackoff(); backoffErr == nil; backoffErr = backoff() { - region, err := l.store.regionCache.GetRegion(l.pl.key) + for { + region, err := l.store.regionCache.GetRegion(bo, l.pl.key) if err != nil { return nil, errors.Trace(err) } - resp, err := l.store.SendKVReq(req, region.VerID()) + resp, err := l.store.SendKVReq(bo, req, region.VerID()) if err != nil { return nil, errors.Trace(err) } if regionErr := resp.GetRegionError(); regionErr != nil { + err = bo.Backoff(boRegionMiss, errors.New(regionErr.String())) + if err != nil { + return nil, 
errors.Trace(err) + } continue } cmdCleanupResp := resp.GetCmdCleanupResp() @@ -89,16 +82,15 @@ func (l *txnLock) cleanup() ([]byte, error) { } if cmdCleanupResp.CommitVersion == nil { // cleanup successfully - return l.rollbackThenGet() + return l.rollbackThenGet(bo) } // already committed - return l.commitThenGet(cmdCleanupResp.GetCommitVersion()) + return l.commitThenGet(bo, cmdCleanupResp.GetCommitVersion()) } - return nil, errors.Annotate(backoffErr, txnRetryableMark) } // If key == nil then only rollback but value is nil -func (l *txnLock) rollbackThenGet() ([]byte, error) { +func (l *txnLock) rollbackThenGet(bo *Backoffer) ([]byte, error) { req := &pb.Request{ Type: pb.MessageType_CmdRollbackThenGet.Enum(), CmdRbGetReq: &pb.CmdRollbackThenGetRequest{ @@ -106,17 +98,20 @@ func (l *txnLock) rollbackThenGet() ([]byte, error) { LockVersion: proto.Uint64(l.pl.version), }, } - var backoffErr error - for backoff := regionMissBackoff(); backoffErr == nil; backoffErr = backoff() { - region, err := l.store.regionCache.GetRegion(l.key) + for { + region, err := l.store.regionCache.GetRegion(bo, l.key) if err != nil { return nil, errors.Trace(err) } - resp, err := l.store.SendKVReq(req, region.VerID()) + resp, err := l.store.SendKVReq(bo, req, region.VerID()) if err != nil { return nil, errors.Trace(err) } if regionErr := resp.GetRegionError(); regionErr != nil { + err = bo.Backoff(boRegionMiss, errors.New(regionErr.String())) + if err != nil { + return nil, errors.Trace(err) + } continue } cmdRbGResp := resp.GetCmdRbGetResp() @@ -128,11 +123,10 @@ func (l *txnLock) rollbackThenGet() ([]byte, error) { } return cmdRbGResp.GetValue(), nil } - return nil, errors.Annotate(backoffErr, txnRetryableMark) } // If key == nil then only commit but value is nil -func (l *txnLock) commitThenGet(commitVersion uint64) ([]byte, error) { +func (l *txnLock) commitThenGet(bo *Backoffer, commitVersion uint64) ([]byte, error) { req := &pb.Request{ Type: 
pb.MessageType_CmdCommitThenGet.Enum(), CmdCommitGetReq: &pb.CmdCommitThenGetRequest{ @@ -142,17 +136,20 @@ func (l *txnLock) commitThenGet(commitVersion uint64) ([]byte, error) { GetVersion: proto.Uint64(l.ver), }, } - var backoffErr error - for backoff := regionMissBackoff(); backoffErr == nil; backoffErr = backoff() { - region, err := l.store.regionCache.GetRegion(l.key) + for { + region, err := l.store.regionCache.GetRegion(bo, l.key) if err != nil { return nil, errors.Trace(err) } - resp, err := l.store.SendKVReq(req, region.VerID()) + resp, err := l.store.SendKVReq(bo, req, region.VerID()) if err != nil { return nil, errors.Trace(err) } if regionErr := resp.GetRegionError(); regionErr != nil { + err = bo.Backoff(boRegionMiss, errors.New(regionErr.String())) + if err != nil { + return nil, errors.Trace(err) + } continue } cmdCommitGetResp := resp.GetCmdCommitGetResp() @@ -164,7 +161,6 @@ func (l *txnLock) commitThenGet(commitVersion uint64) ([]byte, error) { } return cmdCommitGetResp.GetValue(), nil } - return nil, errors.Annotate(backoffErr, txnRetryableMark) } type pLock struct { diff --git a/store/tikv/lock_test.go b/store/tikv/lock_test.go index bf73ee25df..f109de9056 100644 --- a/store/tikv/lock_test.go +++ b/store/tikv/lock_test.go @@ -47,13 +47,13 @@ func (s *testLockSuite) lockKey(c *C, key, value, primaryKey, primaryValue []byt c.Assert(err, IsNil) committer.keys = [][]byte{primaryKey, key} - err = committer.prewriteKeys(committer.keys) + err = committer.prewriteKeys(NewBackoffer(prewriteMaxBackoff), committer.keys) c.Assert(err, IsNil) if commitPrimary { committer.commitTS, err = s.store.oracle.GetTimestamp() c.Assert(err, IsNil) - err = committer.commitKeys([][]byte{primaryKey}) + err = committer.commitKeys(NewBackoffer(commitMaxBackoff), [][]byte{primaryKey}) c.Assert(err, IsNil) } } diff --git a/store/tikv/region_cache.go b/store/tikv/region_cache.go index 3b6d4bfa14..72c7633a05 100644 --- a/store/tikv/region_cache.go +++ 
b/store/tikv/region_cache.go @@ -55,14 +55,14 @@ func (c *RegionCache) GetRegionByVerID(id RegionVerID) *Region { } // GetRegion find in cache, or get new region. -func (c *RegionCache) GetRegion(key []byte) (*Region, error) { +func (c *RegionCache) GetRegion(bo *Backoffer, key []byte) (*Region, error) { c.mu.RLock() r := c.getRegionFromCache(key) c.mu.RUnlock() if r != nil { return r, nil } - r, err := c.loadRegion(key) + r, err := c.loadRegion(bo, key) if err != nil { return nil, errors.Trace(err) } @@ -74,7 +74,7 @@ func (c *RegionCache) GetRegion(key []byte) (*Region, error) { // GroupKeysByRegion separates keys into groups by their belonging Regions. // Specially it also returns the first key's region which may be used as the // 'PrimaryLockKey' and should be committed ahead of others. -func (c *RegionCache) GroupKeysByRegion(keys [][]byte) (map[RegionVerID][][]byte, RegionVerID, error) { +func (c *RegionCache) GroupKeysByRegion(bo *Backoffer, keys [][]byte) (map[RegionVerID][][]byte, RegionVerID, error) { groups := make(map[RegionVerID][][]byte) var first RegionVerID var lastRegion *Region @@ -84,7 +84,7 @@ func (c *RegionCache) GroupKeysByRegion(keys [][]byte) (map[RegionVerID][][]byte region = lastRegion } else { var err error - region, err = c.GetRegion(k) + region, err = c.GetRegion(bo, k) if err != nil { return nil, first, errors.Trace(err) } @@ -196,17 +196,23 @@ func (c *RegionCache) dropRegionFromCache(verID RegionVerID) { } // loadRegion loads region from pd client, and picks the first peer as leader. 
-func (c *RegionCache) loadRegion(key []byte) (*Region, error) { - var region *Region +func (c *RegionCache) loadRegion(bo *Backoffer, key []byte) (*Region, error) { var backoffErr error - for backoff := pdBackoff(); backoffErr == nil; backoffErr = backoff() { + for { + if backoffErr != nil { + err := bo.Backoff(boPDRPC, backoffErr) + if err != nil { + return nil, errors.Trace(err) + } + } + meta, leader, err := c.pdClient.GetRegion(key) if err != nil { - log.Warnf("loadRegion from PD failed, key: %q, err: %v", key, err) + backoffErr = errors.Errorf("loadRegion from PD failed, key: %q, err: %v", key, err) continue } if meta == nil { - log.Warnf("region not found for key %q", key) + backoffErr = errors.Errorf("region not found for key %q", key) continue } if len(meta.Peers) == 0 { @@ -224,21 +230,17 @@ func (c *RegionCache) loadRegion(key []byte) (*Region, error) { peer := meta.Peers[0] store, err := c.pdClient.GetStore(peer.GetStoreId()) if err != nil { - log.Warnf("loadStore from PD failed, key %q, storeID: %d, err: %v", key, peer.GetStoreId(), err) + backoffErr = errors.Errorf("loadStore from PD failed, key %q, storeID: %d, err: %v", key, peer.GetStoreId(), err) continue } - region = &Region{ + region := &Region{ meta: meta, peer: peer, addr: store.GetAddress(), curPeerIdx: 0, } - break + return region, nil } - if backoffErr != nil { - return nil, errors.Annotate(backoffErr, txnRetryableMark) - } - return region, nil } // llrbItem is llrbTree's Item that uses []byte for compare. @@ -342,23 +344,3 @@ func (r *Region) NextPeer() (*metapb.Peer, error) { } return r.meta.Peers[nextPeerIdx], nil } - -// regionMissBackoff is for region cache miss retry. -func regionMissBackoff() func() error { - const ( - maxRetry = 2 - sleepBase = 1 - sleepCap = 1 - ) - return NewBackoff(maxRetry, sleepBase, sleepCap, NoJitter) -} - -// pdBackoff is for PD RPC retry. 
-func pdBackoff() func() error { - const ( - maxRetry = 10 - sleepBase = 500 - sleepCap = 3000 - ) - return NewBackoff(maxRetry, sleepBase, sleepCap, EqualJitter) -} diff --git a/store/tikv/region_cache_test.go b/store/tikv/region_cache_test.go index 53990a69ee..80205666cb 100644 --- a/store/tikv/region_cache_test.go +++ b/store/tikv/region_cache_test.go @@ -29,6 +29,7 @@ type testRegionCacheSuite struct { peer2 uint64 region1 uint64 cache *RegionCache + bo *Backoffer } var _ = Suite(&testRegionCacheSuite{}) @@ -42,6 +43,7 @@ func (s *testRegionCacheSuite) SetUpTest(c *C) { s.peer1 = peerIDs[0] s.peer2 = peerIDs[1] s.cache = NewRegionCache(mocktikv.NewPDClient(s.cluster)) + s.bo = NewBackoffer(5000) } func (s *testRegionCacheSuite) storeAddr(id uint64) string { @@ -57,7 +59,7 @@ func (s *testRegionCacheSuite) checkCache(c *C, len int) { } func (s *testRegionCacheSuite) TestSimple(c *C) { - r, err := s.cache.GetRegion([]byte("a")) + r, err := s.cache.GetRegion(s.bo, []byte("a")) c.Assert(err, IsNil) c.Assert(r, NotNil) c.Assert(r.GetID(), Equals, s.region1) @@ -66,15 +68,12 @@ func (s *testRegionCacheSuite) TestSimple(c *C) { } func (s *testRegionCacheSuite) TestDropStore(c *C) { - // This test is disabled since it costs too much time due to the retry - // mechanism. - // TODO: Find a way to change retry timeout in test then uncomment it. 
- // - // s.cluster.RemoveStore(s.store1) - // r, err := s.cache.GetRegion([]byte("a")) - // c.Assert(err, NotNil) - // c.Assert(r, IsNil) - // s.checkCache(c, 0) + bo := NewBackoffer(100) + s.cluster.RemoveStore(s.store1) + r, err := s.cache.GetRegion(bo, []byte("a")) + c.Assert(err, NotNil) + c.Assert(r, IsNil) + s.checkCache(c, 0) } func (s *testRegionCacheSuite) TestDropStoreRetry(c *C) { @@ -85,19 +84,19 @@ func (s *testRegionCacheSuite) TestDropStoreRetry(c *C) { s.cluster.AddStore(s.store1, s.storeAddr(s.store1)) close(done) }() - r, err := s.cache.GetRegion([]byte("a")) + r, err := s.cache.GetRegion(s.bo, []byte("a")) c.Assert(err, IsNil) c.Assert(r.GetID(), Equals, s.region1) <-done } func (s *testRegionCacheSuite) TestUpdateLeader(c *C) { - r, err := s.cache.GetRegion([]byte("a")) + r, err := s.cache.GetRegion(s.bo, []byte("a")) c.Assert(err, IsNil) // tikv-server reports `NotLeader` s.cache.UpdateLeader(r.VerID(), s.peer2) - r, err = s.cache.GetRegion([]byte("a")) + r, err = s.cache.GetRegion(s.bo, []byte("a")) c.Assert(err, IsNil) c.Assert(r, NotNil) c.Assert(r.GetID(), Equals, s.region1) @@ -106,7 +105,7 @@ func (s *testRegionCacheSuite) TestUpdateLeader(c *C) { } func (s *testRegionCacheSuite) TestUpdateLeader2(c *C) { - r, err := s.cache.GetRegion([]byte("a")) + r, err := s.cache.GetRegion(s.bo, []byte("a")) c.Assert(err, IsNil) // new store3 becomes leader store3 := s.cluster.AllocID() @@ -117,7 +116,7 @@ func (s *testRegionCacheSuite) TestUpdateLeader2(c *C) { s.cache.UpdateLeader(r.VerID(), peer3) // Store3 does not exist in cache, causes a reload from PD. - r, err = s.cache.GetRegion([]byte("a")) + r, err = s.cache.GetRegion(s.bo, []byte("a")) c.Assert(err, IsNil) c.Assert(r, NotNil) c.Assert(r.GetID(), Equals, s.region1) @@ -128,7 +127,7 @@ func (s *testRegionCacheSuite) TestUpdateLeader2(c *C) { s.cluster.ChangeLeader(s.region1, peer3) // tikv-server reports `NotLeader` again. 
s.cache.UpdateLeader(r.VerID(), peer3) - r, err = s.cache.GetRegion([]byte("a")) + r, err = s.cache.GetRegion(s.bo, []byte("a")) c.Assert(err, IsNil) c.Assert(r, NotNil) c.Assert(r.GetID(), Equals, s.region1) @@ -137,7 +136,7 @@ func (s *testRegionCacheSuite) TestUpdateLeader2(c *C) { } func (s *testRegionCacheSuite) TestUpdateLeader3(c *C) { - r, err := s.cache.GetRegion([]byte("a")) + r, err := s.cache.GetRegion(s.bo, []byte("a")) c.Assert(err, IsNil) // store2 becomes leader s.cluster.ChangeLeader(s.region1, s.peer2) @@ -153,7 +152,7 @@ func (s *testRegionCacheSuite) TestUpdateLeader3(c *C) { s.cache.UpdateLeader(r.VerID(), s.peer2) // Store2 does not exist any more, causes a reload from PD. - r, err = s.cache.GetRegion([]byte("a")) + r, err = s.cache.GetRegion(s.bo, []byte("a")) c.Assert(err, IsNil) c.Assert(r, NotNil) c.Assert(r.GetID(), Equals, s.region1) @@ -163,7 +162,7 @@ func (s *testRegionCacheSuite) TestUpdateLeader3(c *C) { } func (s *testRegionCacheSuite) TestSplit(c *C) { - r, err := s.cache.GetRegion([]byte("x")) + r, err := s.cache.GetRegion(s.bo, []byte("x")) c.Assert(err, IsNil) c.Assert(r.GetID(), Equals, s.region1) c.Assert(r.GetAddress(), Equals, s.storeAddr(s.store1)) @@ -177,7 +176,7 @@ func (s *testRegionCacheSuite) TestSplit(c *C) { s.cache.DropRegion(r.VerID()) s.checkCache(c, 0) - r, err = s.cache.GetRegion([]byte("x")) + r, err = s.cache.GetRegion(s.bo, []byte("x")) c.Assert(err, IsNil) c.Assert(r.GetID(), Equals, region2) c.Assert(r.GetAddress(), Equals, s.storeAddr(s.store1)) @@ -190,7 +189,7 @@ func (s *testRegionCacheSuite) TestMerge(c *C) { newPeers := s.cluster.AllocIDs(2) s.cluster.Split(s.region1, region2, []byte("m"), newPeers, newPeers[0]) - r, err := s.cache.GetRegion([]byte("x")) + r, err := s.cache.GetRegion(s.bo, []byte("x")) c.Assert(err, IsNil) c.Assert(r.GetID(), Equals, region2) @@ -201,20 +200,20 @@ func (s *testRegionCacheSuite) TestMerge(c *C) { s.cache.DropRegion(r.VerID()) s.checkCache(c, 0) - r, err = 
s.cache.GetRegion([]byte("x")) + r, err = s.cache.GetRegion(s.bo, []byte("x")) c.Assert(err, IsNil) c.Assert(r.GetID(), Equals, s.region1) s.checkCache(c, 1) } func (s *testRegionCacheSuite) TestReconnect(c *C) { - r, err := s.cache.GetRegion([]byte("a")) + r, err := s.cache.GetRegion(s.bo, []byte("a")) c.Assert(err, IsNil) // connect tikv-server failed, cause drop cache s.cache.DropRegion(r.VerID()) - r, err = s.cache.GetRegion([]byte("a")) + r, err = s.cache.GetRegion(s.bo, []byte("a")) c.Assert(err, IsNil) c.Assert(r, NotNil) c.Assert(r.GetID(), Equals, s.region1) @@ -223,17 +222,17 @@ func (s *testRegionCacheSuite) TestReconnect(c *C) { } func (s *testRegionCacheSuite) TestNextPeer(c *C) { - region, err := s.cache.GetRegion([]byte("a")) + region, err := s.cache.GetRegion(s.bo, []byte("a")) c.Assert(err, IsNil) c.Assert(region.curPeerIdx, Equals, 0) s.cache.NextPeer(region.VerID()) - region, err = s.cache.GetRegion([]byte("a")) + region, err = s.cache.GetRegion(s.bo, []byte("a")) c.Assert(err, IsNil) c.Assert(region.curPeerIdx, Equals, 1) s.cache.NextPeer(region.VerID()) - region, err = s.cache.GetRegion([]byte("a")) + region, err = s.cache.GetRegion(s.bo, []byte("a")) c.Assert(err, IsNil) // Out of range of Peers, so get Region again and pick Stores[0] as leader. c.Assert(region.curPeerIdx, Equals, 0) diff --git a/store/tikv/scan.go b/store/tikv/scan.go index 6c3c439426..a6acc09666 100644 --- a/store/tikv/scan.go +++ b/store/tikv/scan.go @@ -74,6 +74,7 @@ func (s *Scanner) Value() []byte { // Next return next element. 
func (s *Scanner) Next() error { + bo := NewBackoffer(scannerNextMaxBackoff) if !s.valid { return errors.New("scanner iterator is invalid") } @@ -84,7 +85,7 @@ func (s *Scanner) Next() error { s.Close() return kv.ErrNotExist } - err := s.getData() + err := s.getData(bo) if err != nil { s.Close() return errors.Trace(err) @@ -93,7 +94,7 @@ func (s *Scanner) Next() error { continue } } - if err := s.resolveCurrentLock(); err != nil { + if err := s.resolveCurrentLock(bo); err != nil { s.Close() return errors.Trace(err) } @@ -114,16 +115,19 @@ func (s *Scanner) startTS() uint64 { return s.snapshot.version.Ver } -func (s *Scanner) resolveCurrentLock() error { +func (s *Scanner) resolveCurrentLock(bo *Backoffer) error { current := s.cache[s.idx] if current.GetError() == nil { return nil } - var backoffErr error - for backoff := txnLockBackoff(); backoffErr == nil; backoffErr = backoff() { - val, err := s.snapshot.handleKeyError(current.GetError()) + for { + val, err := s.snapshot.handleKeyError(bo, current.GetError()) if err != nil { if terror.ErrorEqual(err, errInnerRetryable) { + err = bo.Backoff(boTxnLock, err) + if err != nil { + return errors.Trace(err) + } continue } return errors.Trace(err) @@ -132,15 +136,12 @@ func (s *Scanner) resolveCurrentLock() error { current.Value = val return nil } - return errors.Annotate(backoffErr, txnRetryableMark) } -func (s *Scanner) getData() error { +func (s *Scanner) getData(bo *Backoffer) error { log.Debugf("txn getData nextStartKey[%q], txn %d", s.nextStartKey, s.startTS()) - - var backoffErr error - for backoff := regionMissBackoff(); backoffErr == nil; backoffErr = backoff() { - region, err := s.snapshot.store.regionCache.GetRegion(s.nextStartKey) + for { + region, err := s.snapshot.store.regionCache.GetRegion(bo, s.nextStartKey) if err != nil { return errors.Trace(err) } @@ -152,12 +153,16 @@ func (s *Scanner) getData() error { Version: proto.Uint64(s.startTS()), }, } - resp, err := s.snapshot.store.SendKVReq(req, 
region.VerID()) + resp, err := s.snapshot.store.SendKVReq(bo, req, region.VerID()) if err != nil { return errors.Trace(err) } if regionErr := resp.GetRegionError(); regionErr != nil { log.Warnf("scanner getData failed: %s", regionErr) + err = bo.Backoff(boRegionMiss, errors.New(regionErr.String())) + if err != nil { + return errors.Trace(err) + } continue } cmdScanResp := resp.GetCmdScanResp() @@ -196,5 +201,4 @@ func (s *Scanner) getData() error { s.nextStartKey = kv.Key(lastKey).Next() return nil } - return errors.Annotate(backoffErr, txnRetryableMark) } diff --git a/store/tikv/snapshot.go b/store/tikv/snapshot.go index 2c5a4d89bb..cf128c2fbb 100644 --- a/store/tikv/snapshot.go +++ b/store/tikv/snapshot.go @@ -53,11 +53,12 @@ func newTiKVSnapshot(store *tikvStore, ver kv.Version) *tikvSnapshot { func (s *tikvSnapshot) BatchGet(keys []kv.Key) (map[string][]byte, error) { // We want [][]byte instead of []kv.Key, use some magic to save memory. bytesKeys := *(*[][]byte)(unsafe.Pointer(&keys)) + bo := NewBackoffer(batchGetMaxBackoff) // Create a map to collect key-values from region servers. 
var mu sync.Mutex m := make(map[string][]byte) - err := s.batchGetKeysByRegions(bytesKeys, func(k, v []byte) { + err := s.batchGetKeysByRegions(bo, bytesKeys, func(k, v []byte) { if len(v) == 0 { return } @@ -71,8 +72,8 @@ func (s *tikvSnapshot) BatchGet(keys []kv.Key) (map[string][]byte, error) { return m, nil } -func (s *tikvSnapshot) batchGetKeysByRegions(keys [][]byte, collectF func(k, v []byte)) error { - groups, _, err := s.store.regionCache.GroupKeysByRegion(keys) +func (s *tikvSnapshot) batchGetKeysByRegions(bo *Backoffer, keys [][]byte, collectF func(k, v []byte)) error { + groups, _, err := s.store.regionCache.GroupKeysByRegion(bo, keys) if err != nil { return errors.Trace(err) } @@ -86,12 +87,12 @@ func (s *tikvSnapshot) batchGetKeysByRegions(keys [][]byte, collectF func(k, v [ return nil } if len(batches) == 1 { - return errors.Trace(s.batchGetSingleRegion(batches[0], collectF)) + return errors.Trace(s.batchGetSingleRegion(bo, batches[0], collectF)) } ch := make(chan error) for _, batch := range batches { go func(batch batchKeys) { - ch <- s.batchGetSingleRegion(batch, collectF) + ch <- s.batchGetSingleRegion(bo.Fork(), batch, collectF) }(batch) } for i := 0; i < len(batches); i++ { @@ -103,10 +104,9 @@ func (s *tikvSnapshot) batchGetKeysByRegions(keys [][]byte, collectF func(k, v [ return errors.Trace(err) } -func (s *tikvSnapshot) batchGetSingleRegion(batch batchKeys, collectF func(k, v []byte)) error { +func (s *tikvSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, collectF func(k, v []byte)) error { pending := batch.keys - var backoffErr error - for backoff := txnLockBackoff(); backoffErr == nil; backoffErr = backoff() { + for { req := &pb.Request{ Type: pb.MessageType_CmdBatchGet.Enum(), CmdBatchGetReq: &pb.CmdBatchGetRequest{ @@ -114,12 +114,16 @@ func (s *tikvSnapshot) batchGetSingleRegion(batch batchKeys, collectF func(k, v Version: proto.Uint64(s.version.Ver), }, } - resp, err := s.store.SendKVReq(req, batch.region) + resp, err := 
s.store.SendKVReq(bo, req, batch.region) if err != nil { return errors.Trace(err) } if regionErr := resp.GetRegionError(); regionErr != nil { - err = s.batchGetKeysByRegions(pending, collectF) + err = bo.Backoff(boRegionMiss, errors.New(regionErr.String())) + if err != nil { + return errors.Trace(err) + } + err = s.batchGetKeysByRegions(bo, pending, collectF) return errors.Trace(err) } batchGetResp := resp.GetCmdBatchGetResp() @@ -135,7 +139,8 @@ func (s *tikvSnapshot) batchGetSingleRegion(batch batchKeys, collectF func(k, v } // This could be slow if we meet many expired locks. // TODO: Find a way to do quick unlock. - val, err := s.handleKeyError(keyErr) + var val []byte + val, err = s.handleKeyError(bo, keyErr) if err != nil { if terror.ErrorNotEqual(err, errInnerRetryable) { return errors.Trace(err) @@ -147,15 +152,20 @@ func (s *tikvSnapshot) batchGetSingleRegion(batch batchKeys, collectF func(k, v } if len(lockedKeys) > 0 { pending = lockedKeys + err = bo.Backoff(boTxnLock, errors.Errorf("batchGet lockedKeys: %d", len(lockedKeys))) + if err != nil { + return errors.Trace(err) + } continue } return nil } - return errors.Annotate(backoffErr, txnRetryableMark) } // Get gets the value for key k from snapshot. 
func (s *tikvSnapshot) Get(k kv.Key) ([]byte, error) { + bo := NewBackoffer(getMaxBackoff) + req := &pb.Request{ Type: pb.MessageType_CmdGet.Enum(), CmdGetReq: &pb.CmdGetRequest{ @@ -163,23 +173,20 @@ func (s *tikvSnapshot) Get(k kv.Key) ([]byte, error) { Version: proto.Uint64(s.version.Ver), }, } - - var ( - backoffErr error - regionBackoff = regionMissBackoff() - txnBackoff = txnLockBackoff() - ) - for backoffErr == nil { - region, err := s.store.regionCache.GetRegion(k) + for { + region, err := s.store.regionCache.GetRegion(bo, k) if err != nil { return nil, errors.Trace(err) } - resp, err := s.store.SendKVReq(req, region.VerID()) + resp, err := s.store.SendKVReq(bo, req, region.VerID()) if err != nil { return nil, errors.Trace(err) } if regionErr := resp.GetRegionError(); regionErr != nil { - backoffErr = regionBackoff() + err = bo.Backoff(boRegionMiss, errors.New(regionErr.String())) + if err != nil { + return nil, errors.Trace(err) + } continue } cmdGetResp := resp.GetCmdGetResp() @@ -188,10 +195,13 @@ func (s *tikvSnapshot) Get(k kv.Key) ([]byte, error) { } val := cmdGetResp.GetValue() if keyErr := cmdGetResp.GetError(); keyErr != nil { - val, err = s.handleKeyError(keyErr) + val, err = s.handleKeyError(bo, keyErr) if err != nil { if terror.ErrorEqual(err, errInnerRetryable) { - backoffErr = txnBackoff() + err = bo.Backoff(boTxnLock, err) + if err != nil { + return nil, errors.Trace(err) + } continue } return nil, errors.Trace(err) @@ -202,7 +212,6 @@ func (s *tikvSnapshot) Get(k kv.Key) ([]byte, error) { } return val, nil } - return nil, errors.Annotate(backoffErr, txnRetryableMark) } // Seek return a list of key-value pair after `k`. @@ -234,13 +243,13 @@ func extractLockInfoFromKeyErr(keyErr *pb.KeyError) (*pb.LockInfo, error) { } // handleKeyError tries to resolve locks then retry to get value. 
-func (s *tikvSnapshot) handleKeyError(keyErr *pb.KeyError) ([]byte, error) { +func (s *tikvSnapshot) handleKeyError(bo *Backoffer, keyErr *pb.KeyError) ([]byte, error) { lockInfo, err := extractLockInfoFromKeyErr(keyErr) if err != nil { return nil, errors.Trace(err) } lock := newLock(s.store, lockInfo.GetPrimaryLock(), lockInfo.GetLockVersion(), lockInfo.GetKey(), s.version.Ver) - val, err := lock.cleanup() + val, err := lock.cleanup(bo) if err != nil { return nil, errors.Trace(err) } diff --git a/store/tikv/split_test.go b/store/tikv/split_test.go index 938239f735..342ffe7a74 100644 --- a/store/tikv/split_test.go +++ b/store/tikv/split_test.go @@ -22,6 +22,7 @@ import ( type testSplitSuite struct { cluster *mocktikv.Cluster store *tikvStore + bo *Backoffer } var _ = Suite(&testSplitSuite{}) @@ -32,6 +33,7 @@ func (s *testSplitSuite) SetUpTest(c *C) { mvccStore := mocktikv.NewMvccStore() client := mocktikv.NewRPCClient(s.cluster, mvccStore) s.store = newTikvStore("mock-tikv-store", mocktikv.NewPDClient(s.cluster), client) + s.bo = NewBackoffer(5000) } func (s *testSplitSuite) begin(c *C) *tikvTxn { @@ -46,14 +48,14 @@ func (s *testSplitSuite) split(c *C, regionID uint64, key []byte) { } func (s *testSplitSuite) TestSplitBatchGet(c *C) { - firstRegion, err := s.store.regionCache.GetRegion([]byte("a")) + firstRegion, err := s.store.regionCache.GetRegion(s.bo, []byte("a")) c.Assert(err, IsNil) txn := s.begin(c) snapshot := newTiKVSnapshot(s.store, kv.Version{Ver: txn.StartTS()}) keys := [][]byte{{'a'}, {'b'}, {'c'}} - _, region, err := s.store.regionCache.GroupKeysByRegion(keys) + _, region, err := s.store.regionCache.GroupKeysByRegion(s.bo, keys) c.Assert(err, IsNil) batch := batchKeys{ region: region, @@ -64,6 +66,6 @@ func (s *testSplitSuite) TestSplitBatchGet(c *C) { s.store.regionCache.DropRegion(firstRegion.VerID()) // mock-tikv will panic if it meets a not-in-region key. 
- err = snapshot.batchGetSingleRegion(batch, func([]byte, []byte) {}) + err = snapshot.batchGetSingleRegion(s.bo, batch, func([]byte, []byte) {}) c.Assert(err, IsNil) } diff --git a/store/tikv/store_test.go b/store/tikv/store_test.go index 698931220c..22ecf324cf 100644 --- a/store/tikv/store_test.go +++ b/store/tikv/store_test.go @@ -42,9 +42,9 @@ func (s *testStoreSuite) TestOracle(c *C) { o := newMockOracle(s.store.oracle) s.store.oracle = o - t1, err := s.store.getTimestampWithRetry() + t1, err := s.store.getTimestampWithRetry(NewBackoffer(100)) c.Assert(err, IsNil) - t2, err := s.store.getTimestampWithRetry() + t2, err := s.store.getTimestampWithRetry(NewBackoffer(100)) c.Assert(err, IsNil) c.Assert(t1, Less, t2) @@ -54,20 +54,20 @@ func (s *testStoreSuite) TestOracle(c *C) { o.disable() go func() { - time.Sleep(time.Second) + time.Sleep(time.Millisecond * 100) o.enable() wg.Done() }() go func() { - t3, err := s.store.getTimestampWithRetry() + t3, err := s.store.getTimestampWithRetry(NewBackoffer(tsoMaxBackoff)) c.Assert(err, IsNil) c.Assert(t2, Less, t3) wg.Done() }() go func() { - expired, err := s.store.checkTimestampExpiredWithRetry(t2, 500) + expired, err := s.store.checkTimestampExpiredWithRetry(NewBackoffer(tsoMaxBackoff), t2, 50) c.Assert(err, IsNil) c.Assert(expired, IsTrue) wg.Done() diff --git a/store/tikv/txn.go b/store/tikv/txn.go index 463f36a612..fb10cf63ba 100644 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -37,7 +37,8 @@ type tikvTxn struct { } func newTiKVTxn(store *tikvStore) (*tikvTxn, error) { - startTS, err := store.getTimestampWithRetry() + bo := NewBackoffer(tsoMaxBackoff) + startTS, err := store.getTimestampWithRetry(bo) if err != nil { return nil, errors.Trace(err) } diff --git a/store/tikv/txn_committer.go b/store/tikv/txn_committer.go index 274902a6f7..6bd614d25b 100644 --- a/store/tikv/txn_committer.go +++ b/store/tikv/txn_committer.go @@ -89,11 +89,11 @@ func (c *txnCommitter) primary() []byte { // iterKeys groups keys into 
batches, then applies `f` to them. If the flag // asyncNonPrimary is set, it will return as soon as the primary batch is // processed. -func (c *txnCommitter) iterKeys(keys [][]byte, f func(batchKeys) error, sizeFn func([]byte) int, asyncNonPrimary bool) error { +func (c *txnCommitter) iterKeys(bo *Backoffer, keys [][]byte, f func(*Backoffer, batchKeys) error, sizeFn func([]byte) int, asyncNonPrimary bool) error { if len(keys) == 0 { return nil } - groups, firstRegion, err := c.store.regionCache.GroupKeysByRegion(keys) + groups, firstRegion, err := c.store.regionCache.GroupKeysByRegion(bo, keys) if err != nil { return errors.Trace(err) } @@ -110,7 +110,7 @@ func (c *txnCommitter) iterKeys(keys [][]byte, f func(batchKeys) error, sizeFn f } if firstIsPrimary { - err = c.doBatches(batches[:1], f) + err = c.doBatches(bo, batches[:1], f) if err != nil { return errors.Trace(err) } @@ -118,21 +118,21 @@ func (c *txnCommitter) iterKeys(keys [][]byte, f func(batchKeys) error, sizeFn f } if asyncNonPrimary { go func() { - c.doBatches(batches, f) + c.doBatches(bo, batches, f) }() return nil } - err = c.doBatches(batches, f) + err = c.doBatches(bo, batches, f) return errors.Trace(err) } // doBatches applies f to batches parallelly. 
-func (c *txnCommitter) doBatches(batches []batchKeys, f func(batchKeys) error) error { +func (c *txnCommitter) doBatches(bo *Backoffer, batches []batchKeys, f func(*Backoffer, batchKeys) error) error { if len(batches) == 0 { return nil } if len(batches) == 1 { - e := f(batches[0]) + e := f(bo, batches[0]) if e != nil { log.Warnf("txnCommitter doBatches failed: %v, tid: %d", e, c.startTS) } @@ -143,7 +143,7 @@ func (c *txnCommitter) doBatches(batches []batchKeys, f func(batchKeys) error) e ch := make(chan error) for _, batch := range batches { go func(batch batchKeys) { - ch <- f(batch) + ch <- f(bo.Fork(), batch) }(batch) } var err error @@ -168,7 +168,7 @@ func (c *txnCommitter) keySize(key []byte) int { return len(key) } -func (c *txnCommitter) prewriteSingleRegion(batch batchKeys) error { +func (c *txnCommitter) prewriteSingleRegion(bo *Backoffer, batch batchKeys) error { mutations := make([]*pb.Mutation, len(batch.keys)) for i, k := range batch.keys { mutations[i] = c.mutations[string(k)] @@ -182,18 +182,17 @@ func (c *txnCommitter) prewriteSingleRegion(batch batchKeys) error { }, } - var backoffErr error - for backoff := txnLockBackoff(); backoffErr == nil; backoffErr = backoff() { - resp, err := c.store.SendKVReq(req, batch.region) + for { + resp, err := c.store.SendKVReq(bo, req, batch.region) if err != nil { return errors.Trace(err) } if regionErr := resp.GetRegionError(); regionErr != nil { - // re-split keys and prewrite again. - // TODO: The recursive maybe not able to exit if TiKV & - // PD are implemented incorrectly. A possible fix is - // introducing a 'max backoff time'. 
- err = c.prewriteKeys(batch.keys) + err = bo.Backoff(boRegionMiss, errors.New(regionErr.String())) + if err != nil { + return errors.Trace(err) + } + err = c.prewriteKeys(bo, batch.keys) return errors.Trace(err) } prewriteResp := resp.GetCmdPrewriteResp() @@ -209,22 +208,26 @@ func (c *txnCommitter) prewriteSingleRegion(batch batchKeys) error { return nil } for _, keyErr := range keyErrs { - lockInfo, err := extractLockInfoFromKeyErr(keyErr) + var lockInfo *pb.LockInfo + lockInfo, err = extractLockInfoFromKeyErr(keyErr) if err != nil { // It could be `Retryable` or `Abort`. return errors.Trace(err) } lock := newLock(c.store, lockInfo.GetPrimaryLock(), lockInfo.GetLockVersion(), lockInfo.GetKey(), c.startTS) - _, err = lock.cleanup() + _, err = lock.cleanup(bo) if err != nil && terror.ErrorNotEqual(err, errInnerRetryable) { return errors.Trace(err) } } + err = bo.Backoff(boTxnLock, errors.New("prewrite encounter locks")) + if err != nil { + return errors.Trace(err) + } } - return errors.Annotate(backoffErr, txnRetryableMark) } -func (c *txnCommitter) commitSingleRegion(batch batchKeys) error { +func (c *txnCommitter) commitSingleRegion(bo *Backoffer, batch batchKeys) error { req := &pb.Request{ Type: pb.MessageType_CmdCommit.Enum(), CmdCommitReq: &pb.CmdCommitRequest{ @@ -234,13 +237,17 @@ func (c *txnCommitter) commitSingleRegion(batch batchKeys) error { }, } - resp, err := c.store.SendKVReq(req, batch.region) + resp, err := c.store.SendKVReq(bo, req, batch.region) if err != nil { return errors.Trace(err) } if regionErr := resp.GetRegionError(); regionErr != nil { + err = bo.Backoff(boRegionMiss, errors.New(regionErr.String())) + if err != nil { + return errors.Trace(err) + } // re-split keys and commit again. 
- err = c.commitKeys(batch.keys) + err = c.commitKeys(bo, batch.keys) return errors.Trace(err) } commitResp := resp.GetCmdCommitResp() @@ -270,7 +277,7 @@ func (c *txnCommitter) commitSingleRegion(batch batchKeys) error { return nil } -func (c *txnCommitter) cleanupSingleRegion(batch batchKeys) error { +func (c *txnCommitter) cleanupSingleRegion(bo *Backoffer, batch batchKeys) error { req := &pb.Request{ Type: pb.MessageType_CmdBatchRollback.Enum(), CmdBatchRollbackReq: &pb.CmdBatchRollbackRequest{ @@ -278,12 +285,16 @@ func (c *txnCommitter) cleanupSingleRegion(batch batchKeys) error { StartVersion: proto.Uint64(c.startTS), }, } - resp, err := c.store.SendKVReq(req, batch.region) + resp, err := c.store.SendKVReq(bo, req, batch.region) if err != nil { return errors.Trace(err) } if regionErr := resp.GetRegionError(); regionErr != nil { - err = c.cleanupKeys(batch.keys) + err = bo.Backoff(boRegionMiss, errors.New(regionErr.String())) + if err != nil { + return errors.Trace(err) + } + err = c.cleanupKeys(bo, batch.keys) return errors.Trace(err) } if keyErr := resp.GetCmdBatchRollbackResp().GetError(); keyErr != nil { @@ -294,16 +305,16 @@ func (c *txnCommitter) cleanupSingleRegion(batch batchKeys) error { return nil } -func (c *txnCommitter) prewriteKeys(keys [][]byte) error { - return c.iterKeys(keys, c.prewriteSingleRegion, c.keyValueSize, false) +func (c *txnCommitter) prewriteKeys(bo *Backoffer, keys [][]byte) error { + return c.iterKeys(bo, keys, c.prewriteSingleRegion, c.keyValueSize, false) } -func (c *txnCommitter) commitKeys(keys [][]byte) error { - return c.iterKeys(keys, c.commitSingleRegion, c.keySize, true) +func (c *txnCommitter) commitKeys(bo *Backoffer, keys [][]byte) error { + return c.iterKeys(bo, keys, c.commitSingleRegion, c.keySize, true) } -func (c *txnCommitter) cleanupKeys(keys [][]byte) error { - return c.iterKeys(keys, c.cleanupSingleRegion, c.keySize, false) +func (c *txnCommitter) cleanupKeys(bo *Backoffer, keys [][]byte) error { + return 
c.iterKeys(bo, keys, c.cleanupSingleRegion, c.keySize, false) } func (c *txnCommitter) Commit() error { @@ -311,26 +322,29 @@ func (c *txnCommitter) Commit() error { // Always clean up all written keys if the txn does not commit. if !c.committed { go func() { - c.cleanupKeys(c.writtenKeys) + c.mu.RLock() + writtenKeys := c.writtenKeys + c.mu.RUnlock() + c.cleanupKeys(NewBackoffer(cleanupMaxBackoff), writtenKeys) log.Infof("txn clean up done, tid: %d", c.startTS) }() } }() - err := c.prewriteKeys(c.keys) + err := c.prewriteKeys(NewBackoffer(prewriteMaxBackoff), c.keys) if err != nil { log.Warnf("txn commit failed on prewrite: %v, tid: %d", err, c.startTS) return errors.Trace(err) } - commitTS, err := c.store.getTimestampWithRetry() + commitTS, err := c.store.getTimestampWithRetry(NewBackoffer(tsoMaxBackoff)) if err != nil { log.Warnf("txn get commitTS failed: %v, tid: %d", err, c.startTS) return errors.Trace(err) } c.commitTS = commitTS - err = c.commitKeys(c.keys) + err = c.commitKeys(NewBackoffer(commitMaxBackoff), c.keys) if err != nil { if !c.committed { log.Warnf("txn commit failed on commit: %v, tid: %d", err, c.startTS)