diff --git a/store/tikv/kv.go b/store/tikv/kv.go
index cac34b990d..2647e3d552 100644
--- a/store/tikv/kv.go
+++ b/store/tikv/kv.go
@@ -165,7 +165,7 @@ func (s *tikvStore) UUID() string {
 }
 
 func (s *tikvStore) CurrentVersion() (kv.Version, error) {
-	startTS, err := s.oracle.GetTimestamp()
+	startTS, err := s.getTimestampWithRetry()
 	if err != nil {
 		return kv.NewVersion(0), errors.Trace(err)
 	}
@@ -173,6 +173,41 @@ func (s *tikvStore) CurrentVersion() (kv.Version, error) {
 	return kv.NewVersion(startTS), nil
 }
 
+// getTimestampWithRetry fetches a timestamp from the oracle. A failed attempt
+// is logged and followed by a call to the pdBackoff function; when that call
+// returns an error, it is returned annotated with txnRetryableMark.
+func (s *tikvStore) getTimestampWithRetry() (uint64, error) {
+	backoff := pdBackoff()
+	for {
+		ts, err := s.oracle.GetTimestamp()
+		if err == nil {
+			return ts, nil
+		}
+		log.Warnf("get timestamp failed: %v, retry later", err)
+		if backoffErr := backoff(); backoffErr != nil {
+			return 0, errors.Annotate(backoffErr, txnRetryableMark)
+		}
+	}
+}
+
+// checkTimestampExpiredWithRetry forwards ts and TTL to the oracle's IsExpired
+// and returns its verdict. A failed attempt is logged and followed by a call to
+// the pdBackoff function; when that call returns an error, it is returned
+// annotated with txnRetryableMark.
+func (s *tikvStore) checkTimestampExpiredWithRetry(ts uint64, TTL uint64) (bool, error) {
+	backoff := pdBackoff()
+	for {
+		expired, err := s.oracle.IsExpired(ts, TTL)
+		if err == nil {
+			return expired, nil
+		}
+		log.Warnf("check expired failed: %v, retry later", err)
+		if backoffErr := backoff(); backoffErr != nil {
+			return false, errors.Annotate(backoffErr, txnRetryableMark)
+		}
+	}
+}
+
 // sendKVReq sends req to tikv server. It will retry internally to find the right
 // region leader if i) fails to establish a connection to server or ii) server
 // returns `NotLeader`.
diff --git a/store/tikv/lock.go b/store/tikv/lock.go
index e35f954377..0d7277448e 100644
--- a/store/tikv/lock.go
+++ b/store/tikv/lock.go
@@ -53,7 +53,7 @@ const lockTTL = 3000
 
 // cleanup cleanup the lock
 func (l *txnLock) cleanup() ([]byte, error) {
-	expired, err := l.store.oracle.IsExpired(l.pl.version, lockTTL)
+	expired, err := l.store.checkTimestampExpiredWithRetry(l.pl.version, lockTTL)
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
diff --git a/store/tikv/region_cache.go b/store/tikv/region_cache.go
index ca72640829..dda3073d86 100644
--- a/store/tikv/region_cache.go
+++ b/store/tikv/region_cache.go
@@ -347,9 +347,9 @@ func regionMissBackoff() func() error {
 // pdBackoff is for PD RPC retry.
 func pdBackoff() func() error {
 	const (
-		maxRetry  = 5
+		maxRetry  = 10
 		sleepBase = 500
-		sleepCap  = 1000
+		sleepCap  = 3000
 	)
 	return NewBackoff(maxRetry, sleepBase, sleepCap, EqualJitter)
 }
diff --git a/store/tikv/store_test.go b/store/tikv/store_test.go
new file mode 100644
index 0000000000..d52e9d7b0a
--- /dev/null
+++ b/store/tikv/store_test.go
@@ -0,0 +1,137 @@
+// Copyright 2016 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package tikv
+
+import (
+	"sync"
+	"time"
+
+	"github.com/juju/errors"
+	. "github.com/pingcap/check"
+	"github.com/pingcap/tidb/store/tikv/mock-tikv"
+	"github.com/pingcap/tidb/store/tikv/oracle"
+)
+
+// testStoreSuite runs tikvStore tests against a mock cluster.
+type testStoreSuite struct {
+	cluster *mocktikv.Cluster
+	store   *tikvStore
+}
+
+var _ = Suite(&testStoreSuite{})
+
+// SetUpTest builds a single-store mock cluster and a tikvStore on top of it.
+func (s *testStoreSuite) SetUpTest(c *C) {
+	s.cluster = mocktikv.NewCluster()
+	mocktikv.BootstrapWithSingleStore(s.cluster)
+	mvccStore := mocktikv.NewMvccStore()
+	clientFactory := mockClientFactory(s.cluster, mvccStore)
+	s.store = newTikvStore("mock-tikv-store", mocktikv.NewPDClient(s.cluster), clientFactory)
+}
+
+// TestOracle checks that the retry wrappers return increasing timestamps and
+// that they still succeed when the oracle fails for a while and then recovers.
+func (s *testStoreSuite) TestOracle(c *C) {
+	o := newMockOracle(s.store.oracle)
+	s.store.oracle = o
+
+	t1, err := s.store.getTimestampWithRetry()
+	c.Assert(err, IsNil)
+	t2, err := s.store.getTimestampWithRetry()
+	c.Assert(err, IsNil)
+	c.Assert(t1, Less, t2)
+
+	// Check retry. The goroutines only record results; every assertion runs on
+	// the test goroutine after wg.Wait(), because a failing c.Assert stops the
+	// goroutine that calls it and must never be able to skip wg.Done().
+	var (
+		wg         sync.WaitGroup
+		t3         uint64
+		tsErr      error
+		expired    bool
+		expiredErr error
+	)
+	wg.Add(3)
+
+	o.disable()
+	go func() {
+		defer wg.Done()
+		time.Sleep(time.Second)
+		o.enable()
+	}()
+
+	go func() {
+		defer wg.Done()
+		t3, tsErr = s.store.getTimestampWithRetry()
+	}()
+
+	go func() {
+		defer wg.Done()
+		expired, expiredErr = s.store.checkTimestampExpiredWithRetry(t2, 500)
+	}()
+
+	wg.Wait()
+	c.Assert(tsErr, IsNil)
+	c.Assert(t2, Less, t3)
+	c.Assert(expiredErr, IsNil)
+	c.Assert(expired, IsTrue)
+}
+
+// mockOracle wraps an oracle.Oracle and can be switched into a failing state.
+type mockOracle struct {
+	oracle.Oracle
+	mu   sync.RWMutex
+	stop bool
+}
+
+// newMockOracle returns an enabled mockOracle wrapping the given oracle.
+func newMockOracle(oracle oracle.Oracle) *mockOracle {
+	return &mockOracle{Oracle: oracle}
+}
+
+// enable makes GetTimestamp and IsExpired delegate to the wrapped oracle.
+func (o *mockOracle) enable() {
+	o.mu.Lock()
+	defer o.mu.Unlock()
+	o.stop = false
+}
+
+// disable makes GetTimestamp and IsExpired return a "stopped" error.
+func (o *mockOracle) disable() {
+	o.mu.Lock()
+	defer o.mu.Unlock()
+	o.stop = true
+}
+
+// GetTimestamp fails while the mock is disabled and otherwise delegates.
+func (o *mockOracle) GetTimestamp() (uint64, error) {
+	o.mu.RLock()
+	defer o.mu.RUnlock()
+
+	if o.stop {
+		return 0, errors.New("stopped")
+	}
+	return o.Oracle.GetTimestamp()
+}
+
+// IsExpired fails while the mock is disabled and otherwise delegates.
+func (o *mockOracle) IsExpired(lockTimestamp uint64, TTL uint64) (bool, error) {
+	o.mu.RLock()
+	defer o.mu.RUnlock()
+
+	if o.stop {
+		return false, errors.New("stopped")
+	}
+	return o.Oracle.IsExpired(lockTimestamp, TTL)
+}
diff --git a/store/tikv/txn.go b/store/tikv/txn.go
index cb6a6b0dc0..0b4b7bcff7 100644
--- a/store/tikv/txn.go
+++ b/store/tikv/txn.go
@@ -40,7 +40,7 @@ type tikvTxn struct {
 }
 
 func newTiKVTxn(store *tikvStore) (*tikvTxn, error) {
-	startTS, err := store.oracle.GetTimestamp()
+	startTS, err := store.getTimestampWithRetry()
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
diff --git a/store/tikv/txn_committer.go b/store/tikv/txn_committer.go
index 8e17247e58..3e034a2760 100644
--- a/store/tikv/txn_committer.go
+++ b/store/tikv/txn_committer.go
@@ -310,7 +310,7 @@ func (c *txnCommitter) Commit() error {
 		return errors.Trace(err)
 	}
 
-	commitTS, err := c.store.oracle.GetTimestamp()
+	commitTS, err := c.store.getTimestampWithRetry()
 	if err != nil {
 		return errors.Trace(err)
 	}