store/tikv: add retry for tso and adjust configs. (#1302)
* store/tikv: add retry for tso and adjust configs.
This commit is contained in:
@ -165,7 +165,7 @@ func (s *tikvStore) UUID() string {
|
||||
}
|
||||
|
||||
func (s *tikvStore) CurrentVersion() (kv.Version, error) {
|
||||
startTS, err := s.oracle.GetTimestamp()
|
||||
startTS, err := s.getTimestampWithRetry()
|
||||
if err != nil {
|
||||
return kv.NewVersion(0), errors.Trace(err)
|
||||
}
|
||||
@ -173,6 +173,32 @@ func (s *tikvStore) CurrentVersion() (kv.Version, error) {
|
||||
return kv.NewVersion(startTS), nil
|
||||
}
|
||||
|
||||
func (s *tikvStore) getTimestampWithRetry() (uint64, error) {
|
||||
var backoffErr error
|
||||
for backoff := pdBackoff(); backoffErr == nil; backoffErr = backoff() {
|
||||
startTS, err := s.oracle.GetTimestamp()
|
||||
if err != nil {
|
||||
log.Warnf("get timestamp failed: %v, retry later", err)
|
||||
continue
|
||||
}
|
||||
return startTS, nil
|
||||
}
|
||||
return 0, errors.Annotate(backoffErr, txnRetryableMark)
|
||||
}
|
||||
|
||||
func (s *tikvStore) checkTimestampExpiredWithRetry(ts uint64, TTL uint64) (bool, error) {
|
||||
var backoffErr error
|
||||
for backoff := pdBackoff(); backoffErr == nil; backoffErr = backoff() {
|
||||
expired, err := s.oracle.IsExpired(ts, TTL)
|
||||
if err != nil {
|
||||
log.Warnf("check expired failed: %v, retry later", err)
|
||||
continue
|
||||
}
|
||||
return expired, nil
|
||||
}
|
||||
return false, errors.Annotate(backoffErr, txnRetryableMark)
|
||||
}
|
||||
|
||||
// sendKVReq sends req to tikv server. It will retry internally to find the right
|
||||
// region leader if i) fails to establish a connection to server or ii) server
|
||||
// returns `NotLeader`.
|
||||
|
||||
@ -53,7 +53,7 @@ const lockTTL = 3000
|
||||
|
||||
// cleanup cleanup the lock
|
||||
func (l *txnLock) cleanup() ([]byte, error) {
|
||||
expired, err := l.store.oracle.IsExpired(l.pl.version, lockTTL)
|
||||
expired, err := l.store.checkTimestampExpiredWithRetry(l.pl.version, lockTTL)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
|
||||
@ -347,9 +347,9 @@ func regionMissBackoff() func() error {
|
||||
// pdBackoff is for PD RPC retry.
|
||||
func pdBackoff() func() error {
|
||||
const (
|
||||
maxRetry = 5
|
||||
maxRetry = 10
|
||||
sleepBase = 500
|
||||
sleepCap = 1000
|
||||
sleepCap = 3000
|
||||
)
|
||||
return NewBackoff(maxRetry, sleepBase, sleepCap, EqualJitter)
|
||||
}
|
||||
|
||||
119
store/tikv/store_test.go
Normal file
119
store/tikv/store_test.go
Normal file
@ -0,0 +1,119 @@
|
||||
// Copyright 2016 PingCAP, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package tikv
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/juju/errors"
|
||||
. "github.com/pingcap/check"
|
||||
"github.com/pingcap/tidb/store/tikv/mock-tikv"
|
||||
"github.com/pingcap/tidb/store/tikv/oracle"
|
||||
)
|
||||
|
||||
type testStoreSuite struct {
|
||||
cluster *mocktikv.Cluster
|
||||
store *tikvStore
|
||||
}
|
||||
|
||||
var _ = Suite(&testStoreSuite{})
|
||||
|
||||
func (s *testStoreSuite) SetUpTest(c *C) {
|
||||
s.cluster = mocktikv.NewCluster()
|
||||
mocktikv.BootstrapWithSingleStore(s.cluster)
|
||||
mvccStore := mocktikv.NewMvccStore()
|
||||
clientFactory := mockClientFactory(s.cluster, mvccStore)
|
||||
s.store = newTikvStore("mock-tikv-store", mocktikv.NewPDClient(s.cluster), clientFactory)
|
||||
}
|
||||
|
||||
func (s *testStoreSuite) TestOracle(c *C) {
|
||||
o := newMockOracle(s.store.oracle)
|
||||
s.store.oracle = o
|
||||
|
||||
t1, err := s.store.getTimestampWithRetry()
|
||||
c.Assert(err, IsNil)
|
||||
t2, err := s.store.getTimestampWithRetry()
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(t1, Less, t2)
|
||||
|
||||
// Check retry.
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(3)
|
||||
|
||||
o.disable()
|
||||
go func() {
|
||||
time.Sleep(time.Second)
|
||||
o.enable()
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
go func() {
|
||||
t3, err := s.store.getTimestampWithRetry()
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(t2, Less, t3)
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
go func() {
|
||||
expired, err := s.store.checkTimestampExpiredWithRetry(t2, 500)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(expired, IsTrue)
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
type mockOracle struct {
|
||||
oracle.Oracle
|
||||
mu sync.RWMutex
|
||||
stop bool
|
||||
}
|
||||
|
||||
func newMockOracle(oracle oracle.Oracle) *mockOracle {
|
||||
return &mockOracle{Oracle: oracle}
|
||||
}
|
||||
|
||||
func (o *mockOracle) enable() {
|
||||
o.mu.Lock()
|
||||
defer o.mu.Unlock()
|
||||
o.stop = false
|
||||
}
|
||||
|
||||
func (o *mockOracle) disable() {
|
||||
o.mu.Lock()
|
||||
defer o.mu.Unlock()
|
||||
o.stop = true
|
||||
}
|
||||
|
||||
func (o *mockOracle) GetTimestamp() (uint64, error) {
|
||||
o.mu.RLock()
|
||||
defer o.mu.RUnlock()
|
||||
|
||||
if o.stop {
|
||||
return 0, errors.New("stopped")
|
||||
}
|
||||
return o.Oracle.GetTimestamp()
|
||||
}
|
||||
|
||||
func (o *mockOracle) IsExpired(lockTimestamp uint64, TTL uint64) (bool, error) {
|
||||
o.mu.RLock()
|
||||
defer o.mu.RUnlock()
|
||||
|
||||
if o.stop {
|
||||
return false, errors.New("stopped")
|
||||
}
|
||||
return o.Oracle.IsExpired(lockTimestamp, TTL)
|
||||
}
|
||||
@ -40,7 +40,7 @@ type tikvTxn struct {
|
||||
}
|
||||
|
||||
func newTiKVTxn(store *tikvStore) (*tikvTxn, error) {
|
||||
startTS, err := store.oracle.GetTimestamp()
|
||||
startTS, err := store.getTimestampWithRetry()
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
|
||||
@ -310,7 +310,7 @@ func (c *txnCommitter) Commit() error {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
commitTS, err := c.store.oracle.GetTimestamp()
|
||||
commitTS, err := c.store.getTimestampWithRetry()
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user