store/tikv: add pd retry, add tid in commit logs (#1255)
* store/tikv: add retry for PD RPC
* store/tikv: add tid in log
This commit is contained in:
@ -145,26 +145,37 @@ func (c *RegionCache) getRegionFromCache(key []byte) *Region {
|
||||
|
||||
// loadRegion gets the region from the PD client and picks the first peer as leader.
|
||||
func (c *RegionCache) loadRegion(key []byte) (*Region, error) {
|
||||
meta, err := c.pdClient.GetRegion(key)
|
||||
if err != nil {
|
||||
// We assume PD will recover soon.
|
||||
return nil, errors.Annotate(err, txnRetryableMark)
|
||||
var region *Region
|
||||
var backoffErr error
|
||||
for backoff := pdBackoff(); backoffErr == nil; backoffErr = backoff() {
|
||||
meta, err := c.pdClient.GetRegion(key)
|
||||
if err != nil {
|
||||
log.Warnf("loadRegion from PD failed, key: %q, err: %v", key, err)
|
||||
continue
|
||||
}
|
||||
if meta == nil {
|
||||
log.Warnf("region not found for key %q", key)
|
||||
continue
|
||||
}
|
||||
if len(meta.Peers) == 0 {
|
||||
return nil, errors.New("receive Region with no peer")
|
||||
}
|
||||
peer := meta.Peers[0]
|
||||
store, err := c.pdClient.GetStore(peer.GetStoreId())
|
||||
if err != nil {
|
||||
log.Warnf("loadStore from PD failed, key %q, storeID: %d, err: %v", key, peer.GetStoreId(), err)
|
||||
continue
|
||||
}
|
||||
region = &Region{
|
||||
meta: meta,
|
||||
peer: peer,
|
||||
addr: store.GetAddress(),
|
||||
curPeerIdx: 0,
|
||||
}
|
||||
break
|
||||
}
|
||||
if len(meta.Peers) == 0 {
|
||||
return nil, errors.New("receive Region with no peer")
|
||||
}
|
||||
curPeerIdx := 0
|
||||
peer := meta.Peers[curPeerIdx]
|
||||
store, err := c.pdClient.GetStore(peer.GetStoreId())
|
||||
if err != nil {
|
||||
// We assume PD will recover soon.
|
||||
return nil, errors.Annotate(err, txnRetryableMark)
|
||||
}
|
||||
region := &Region{
|
||||
meta: meta,
|
||||
peer: peer,
|
||||
addr: store.GetAddress(),
|
||||
curPeerIdx: curPeerIdx,
|
||||
if backoffErr != nil {
|
||||
return nil, errors.Annotate(backoffErr, txnRetryableMark)
|
||||
}
|
||||
|
||||
c.mu.Lock()
|
||||
@ -255,3 +266,13 @@ func regionMissBackoff() func() error {
|
||||
)
|
||||
return NewBackoff(maxRetry, sleepBase, sleepCap, NoJitter)
|
||||
}
|
||||
|
||||
// pdBackoff returns the backoff function used to pace retries of PD RPCs.
// It is built by NewBackoff with maxRetry 5, sleepBase 500, sleepCap 1000 and
// the EqualJitter strategy.
// NOTE(review): the sleep values are presumably milliseconds; confirm against
// NewBackoff.
|
||||
func pdBackoff() func() error {
|
||||
const (
|
||||
maxRetry = 5
|
||||
sleepBase = 500
|
||||
sleepCap = 1000
|
||||
)
|
||||
return NewBackoff(maxRetry, sleepBase, sleepCap, EqualJitter)
|
||||
}
|
||||
|
||||
@ -15,6 +15,7 @@ package tikv
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
. "github.com/pingcap/check"
|
||||
"github.com/pingcap/tidb/store/tikv/mock-tikv"
|
||||
@ -64,6 +65,20 @@ func (s *testRegionCacheSuite) TestDropStore(c *C) {
|
||||
c.Assert(s.cache.regions, HasLen, 0)
|
||||
}
|
||||
|
||||
// TestDropStoreRetry removes store1, re-adds it from a background goroutine
// after a 10ms sleep, and asserts that GetRegion for key "a" still returns
// region1 without error.
// NOTE(review): presumably this exercises the PD retry loop in loadRegion;
// confirm that GetRegion reaches it in this scenario.
func (s *testRegionCacheSuite) TestDropStoreRetry(c *C) {
|
||||
s.cluster.RemoveStore(s.store1)
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
time.Sleep(time.Millisecond * 10)
|
||||
s.cluster.AddStore(s.store1, s.storeAddr(s.store1))
|
||||
close(done)
|
||||
}()
|
||||
r, err := s.cache.GetRegion([]byte("a"))
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(r.GetID(), Equals, s.region1)
|
||||
// Wait for the background goroutine to finish before the test returns.
<-done
|
||||
}
|
||||
|
||||
func (s *testRegionCacheSuite) TestUpdateLeader(c *C) {
|
||||
r, err := s.cache.GetRegion([]byte("a"))
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
@ -206,11 +206,11 @@ func (c *txnCommitter) commitSingleRegion(regionID RegionVerID, keys [][]byte) e
|
||||
if c.committed {
|
||||
// No secondary key could be rolled back after its primary key is committed.
|
||||
// There must be a serious bug somewhere.
|
||||
log.Errorf("txn failed commit key after primary key committed: %v", err)
|
||||
log.Errorf("txn failed commit key after primary key committed: %v, tid: %d", err, c.startTS)
|
||||
return errors.Trace(err)
|
||||
}
|
||||
// The transaction may be rolled back by concurrent transactions.
|
||||
log.Warnf("txn failed commit primary key: %v, retry later", err)
|
||||
log.Warnf("txn failed commit primary key: %v, retry later, tid: %d", err, c.startTS)
|
||||
return errors.Annotate(err, txnRetryableMark)
|
||||
}
|
||||
|
||||
@ -242,7 +242,7 @@ func (c *txnCommitter) cleanupSingleRegion(regionID RegionVerID, keys [][]byte)
|
||||
}
|
||||
if keyErr := rollbackResp.GetError(); keyErr != nil {
|
||||
err = errors.Errorf("cleanup failed: %s", keyErr)
|
||||
log.Errorf("txn failed cleanup key: %v", err)
|
||||
log.Errorf("txn failed cleanup key: %v, tid: %d", err, c.startTS)
|
||||
return errors.Trace(err)
|
||||
}
|
||||
return nil
|
||||
@ -263,7 +263,7 @@ func (c *txnCommitter) cleanupKeys(keys [][]byte) error {
|
||||
func (c *txnCommitter) Commit() error {
|
||||
err := c.prewriteKeys(c.keys)
|
||||
if err != nil {
|
||||
log.Warnf("txn commit failed on prewrite: %v", err)
|
||||
log.Warnf("txn commit failed on prewrite: %v, tid: %d", err, c.startTS)
|
||||
c.cleanupKeys(c.writtenKeys)
|
||||
return errors.Trace(err)
|
||||
}
|
||||
@ -280,7 +280,7 @@ func (c *txnCommitter) Commit() error {
|
||||
c.cleanupKeys(c.writtenKeys)
|
||||
return errors.Trace(err)
|
||||
}
|
||||
log.Warnf("txn commit succeed with error: %v", err)
|
||||
log.Warnf("txn commit succeed with error: %v, tid: %d", err, c.startTS)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user