diff --git a/store/tikv/region_cache.go b/store/tikv/region_cache.go index 55f16969da..71e0f2a876 100644 --- a/store/tikv/region_cache.go +++ b/store/tikv/region_cache.go @@ -145,26 +145,37 @@ func (c *RegionCache) getRegionFromCache(key []byte) *Region { // loadRegion get region from pd client, and pick the random peer as leader. func (c *RegionCache) loadRegion(key []byte) (*Region, error) { - meta, err := c.pdClient.GetRegion(key) - if err != nil { - // We assume PD will recover soon. - return nil, errors.Annotate(err, txnRetryableMark) + var region *Region + var backoffErr error + for backoff := pdBackoff(); backoffErr == nil; backoffErr = backoff() { + meta, err := c.pdClient.GetRegion(key) + if err != nil { + log.Warnf("loadRegion from PD failed, key: %q, err: %v", key, err) + continue + } + if meta == nil { + log.Warnf("region not found for key %q", key) + continue + } + if len(meta.Peers) == 0 { + return nil, errors.New("receive Region with no peer") + } + peer := meta.Peers[0] + store, err := c.pdClient.GetStore(peer.GetStoreId()) + if err != nil { + log.Warnf("loadStore from PD failed, key %q, storeID: %d, err: %v", key, peer.GetStoreId(), err) + continue + } + region = &Region{ + meta: meta, + peer: peer, + addr: store.GetAddress(), + curPeerIdx: 0, + } + break } - if len(meta.Peers) == 0 { - return nil, errors.New("receive Region with no peer") - } - curPeerIdx := 0 - peer := meta.Peers[curPeerIdx] - store, err := c.pdClient.GetStore(peer.GetStoreId()) - if err != nil { - // We assume PD will recover soon. - return nil, errors.Annotate(err, txnRetryableMark) - } - region := &Region{ - meta: meta, - peer: peer, - addr: store.GetAddress(), - curPeerIdx: curPeerIdx, + if backoffErr != nil { + return nil, errors.Annotate(backoffErr, txnRetryableMark) } c.mu.Lock() @@ -255,3 +266,13 @@ func regionMissBackoff() func() error { ) return NewBackoff(maxRetry, sleepBase, sleepCap, NoJitter) } + +// pdBackoff is for PD RPC retry. +func pdBackoff() func() error { + const ( + maxRetry = 5 + sleepBase = 500 + sleepCap = 1000 + ) + return NewBackoff(maxRetry, sleepBase, sleepCap, EqualJitter) +} diff --git a/store/tikv/region_cache_test.go b/store/tikv/region_cache_test.go index 460c68b024..b91c9dc612 100644 --- a/store/tikv/region_cache_test.go +++ b/store/tikv/region_cache_test.go @@ -15,6 +15,7 @@ package tikv import ( "fmt" + "time" . "github.com/pingcap/check" "github.com/pingcap/tidb/store/tikv/mock-tikv" @@ -64,6 +65,20 @@ func (s *testRegionCacheSuite) TestDropStore(c *C) { c.Assert(s.cache.regions, HasLen, 0) } +func (s *testRegionCacheSuite) TestDropStoreRetry(c *C) { + s.cluster.RemoveStore(s.store1) + done := make(chan struct{}) + go func() { + time.Sleep(time.Millisecond * 10) + s.cluster.AddStore(s.store1, s.storeAddr(s.store1)) + close(done) + }() + r, err := s.cache.GetRegion([]byte("a")) + c.Assert(err, IsNil) + c.Assert(r.GetID(), Equals, s.region1) + <-done +} + func (s *testRegionCacheSuite) TestUpdateLeader(c *C) { r, err := s.cache.GetRegion([]byte("a")) c.Assert(err, IsNil) diff --git a/store/tikv/txn_committer.go b/store/tikv/txn_committer.go index 357983d9e3..1149336b3d 100644 --- a/store/tikv/txn_committer.go +++ b/store/tikv/txn_committer.go @@ -206,11 +206,11 @@ func (c *txnCommitter) commitSingleRegion(regionID RegionVerID, keys [][]byte) e if c.committed { // No secondary key could be rolled back after it's primary key is committed. // There must be a serious bug somewhere. - log.Errorf("txn failed commit key after primary key committed: %v", err) + log.Errorf("txn failed commit key after primary key committed: %v, tid: %d", err, c.startTS) return errors.Trace(err) } // The transaction maybe rolled back by concurrent transactions. - log.Warnf("txn failed commit primary key: %v, retry later", err) + log.Warnf("txn failed commit primary key: %v, retry later, tid: %d", err, c.startTS) return errors.Annotate(err, txnRetryableMark) } @@ -242,7 +242,7 @@ func (c *txnCommitter) cleanupSingleRegion(regionID RegionVerID, keys [][]byte) } if keyErr := rollbackResp.GetError(); keyErr != nil { err = errors.Errorf("cleanup failed: %s", keyErr) - log.Errorf("txn failed cleanup key: %v", err) + log.Errorf("txn failed cleanup key: %v, tid: %d", err, c.startTS) return errors.Trace(err) } return nil @@ -263,7 +263,7 @@ func (c *txnCommitter) cleanupKeys(keys [][]byte) error { func (c *txnCommitter) Commit() error { err := c.prewriteKeys(c.keys) if err != nil { - log.Warnf("txn commit failed on prewrite: %v", err) + log.Warnf("txn commit failed on prewrite: %v, tid: %d", err, c.startTS) c.cleanupKeys(c.writtenKeys) return errors.Trace(err) } @@ -280,7 +280,7 @@ func (c *txnCommitter) Commit() error { c.cleanupKeys(c.writtenKeys) return errors.Trace(err) } - log.Warnf("txn commit succeed with error: %v", err) + log.Warnf("txn commit succeed with error: %v, tid: %d", err, c.startTS) } return nil }