store/tikv: support fallback from async commit (take 2) (#21531)
Signed-off-by: Yilin Chen <sticnarf@gmail.com>
go.mod (4 changed lines)
@@ -32,7 +32,7 @@ require (
 	github.com/kr/text v0.2.0 // indirect
 	github.com/ngaut/pools v0.0.0-20180318154953-b7bc8c42aac7
 	github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef
-	github.com/ngaut/unistore v0.0.0-20201113064408-907e3fcf8e7d
+	github.com/ngaut/unistore v0.0.0-20201208082126-4766545aa5b5
 	github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
 	github.com/opentracing/basictracer-go v1.0.0
 	github.com/opentracing/opentracing-go v1.1.0
@@ -45,7 +45,7 @@ require (
 	github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce
 	github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059
 	github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989
-	github.com/pingcap/kvproto v0.0.0-20201130052818-5dfa7b1325a3
+	github.com/pingcap/kvproto v0.0.0-20201208043834-923c9609272c
 	github.com/pingcap/log v0.0.0-20201112100606-8f1e84a3abc8
 	github.com/pingcap/parser v0.0.0-20201203085211-44f6be1df1c4
 	github.com/pingcap/sysutil v0.0.0-20201130064824-f0c8aa6a6966
go.sum (8 changed lines)
@@ -590,8 +590,8 @@ github.com/ngaut/pools v0.0.0-20180318154953-b7bc8c42aac7 h1:7KAv7KMGTTqSmYZtNdc
 github.com/ngaut/pools v0.0.0-20180318154953-b7bc8c42aac7/go.mod h1:iWMfgwqYW+e8n5lC/jjNEhwcjbRDpl5NT7n2h+4UNcI=
 github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef h1:K0Fn+DoFqNqktdZtdV3bPQ/0cuYh2H4rkg0tytX/07k=
 github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef/go.mod h1:7WjlapSfwQyo6LNmIvEWzsW1hbBQfpUO4JWnuQRmva8=
-github.com/ngaut/unistore v0.0.0-20201113064408-907e3fcf8e7d h1:hh0yCo0UtCuakNdkiRPaLHqzfgxacwUk6/pb9iJyJKU=
-github.com/ngaut/unistore v0.0.0-20201113064408-907e3fcf8e7d/go.mod h1:ZR3NH+HzqfiYetwdoAivApnIy8iefPZHTMLfrFNm8g4=
+github.com/ngaut/unistore v0.0.0-20201208082126-4766545aa5b5 h1:inEktZjWoqSSRB8P6Zkj8cgwnbaAiSObeisgr/36L8U=
+github.com/ngaut/unistore v0.0.0-20201208082126-4766545aa5b5/go.mod h1:ZR3NH+HzqfiYetwdoAivApnIy8iefPZHTMLfrFNm8g4=
 github.com/nicksnyder/go-i18n v1.10.0/go.mod h1:HrK7VCrbOvQoUAQ7Vpy7i87N7JZZZ7R2xBGjv0j365Q=
 github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
 github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
@@ -681,8 +681,8 @@ github.com/pingcap/kvproto v0.0.0-20200417092353-efbe03bcffbd/go.mod h1:IOdRDPLy
 github.com/pingcap/kvproto v0.0.0-20200420075417-e0c6e8842f22/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
 github.com/pingcap/kvproto v0.0.0-20200810113304-6157337686b1/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
 github.com/pingcap/kvproto v0.0.0-20201113092725-08f2872278eb/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
-github.com/pingcap/kvproto v0.0.0-20201130052818-5dfa7b1325a3 h1:cpYxg8ggZU3UhVVd4iafhzetjEl2xB1KVjuhEKOhmjU=
-github.com/pingcap/kvproto v0.0.0-20201130052818-5dfa7b1325a3/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
+github.com/pingcap/kvproto v0.0.0-20201208043834-923c9609272c h1:RbI6VpxZjaVVkeuxzEKCxw20+FWtXiIhgM+mvzhTc8I=
+github.com/pingcap/kvproto v0.0.0-20201208043834-923c9609272c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
 github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
 github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
 github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
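A note on the dependency bumps: the kvproto bump is what introduces the new ForceSyncCommit field used on the CheckTxnStatus request below, and the unistore bump appears to carry the matching mock-store support, so the unit tests can exercise the fallback without a real TiKV.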
@@ -2068,9 +2068,7 @@ func (s *testPessimisticSuite) TestAsyncCommitWithSchemaChange(c *C) {
 		tk2.MustExec("alter table tk add index k2(c2)")
 	}()
 	c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/beforePrewrite", "1*sleep(1200)"), IsNil)
-	_ = tk.ExecToErr("commit")
-	// TODO: wait for https://github.com/pingcap/tidb/pull/21531
-	// c.Assert(err, ErrorMatches, ".*commit TS \\d+ is too large")
+	tk.MustExec("commit")
 	c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/beforePrewrite"), IsNil)
 	tk3.MustExec("admin check table tk")
 }
@@ -2123,9 +2121,7 @@ func (s *testPessimisticSuite) Test1PCWithSchemaChange(c *C) {
 		tk2.MustExec("alter table tk add index k2(c2)")
 	}()
 	c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/beforePrewrite", "1*sleep(1000)"), IsNil)
-	_ = tk.ExecToErr("commit")
-	// TODO: Check the error after supporting falling back to 2PC in TiKV.
-	// c.Assert(err, IsNil)
+	tk.MustExec("commit")
 	c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/beforePrewrite"), IsNil)
 	tk3.MustExec("admin check table tk")
 }
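With the fallback in place, both schema-change tests can expect commit to succeed outright: a prewrite whose commit TS would exceed maxCommitTS now downgrades the transaction to ordinary 2PC instead of surfacing a "commit TS ... is too large" error, which is why the ExecToErr workaround and the TODOs referencing this very PR are deleted.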
@@ -691,7 +691,7 @@ func (s *testCommitterSuite) TestPessimisticTTL(c *C) {

 	lr := newLockResolver(s.store)
 	bo := NewBackofferWithVars(context.Background(), getMaxBackoff, nil)
-	status, err := lr.getTxnStatus(bo, txn.startTS, key2, 0, txn.startTS, true)
+	status, err := lr.getTxnStatus(bo, txn.startTS, key2, 0, txn.startTS, true, false)
 	c.Assert(err, IsNil)
 	c.Assert(status.ttl, GreaterEqual, lockInfo.LockTtl)
@@ -179,7 +179,7 @@ func (s *testAsyncCommitFailSuite) TestSecondaryListInPrimaryLock(c *C) {

 		primary := txn.committer.primary()
 		bo := NewBackofferWithVars(context.Background(), 5000, nil)
-		txnStatus, err := s.store.lockResolver.getTxnStatus(bo, txn.StartTS(), primary, 0, 0, false)
+		txnStatus, err := s.store.lockResolver.getTxnStatus(bo, txn.StartTS(), primary, 0, 0, false, false)
 		c.Assert(err, IsNil)
 		c.Assert(txnStatus.IsCommitted(), IsFalse)
 		c.Assert(txnStatus.action, Equals, kvrpcpb.Action_NoAction)
@@ -203,6 +203,8 @@ func (s *testAsyncCommitFailSuite) TestSecondaryListInPrimaryLock(c *C) {
 		c.Assert(gotSecondaries, DeepEquals, expectedSecondaries)

+		c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/asyncCommitDoNothing"), IsNil)
+		txn.committer.cleanup(context.Background())
 		txn.committer.cleanWg.Wait()
 	}

 	test([]string{"a"}, []string{"a1"})
@@ -208,7 +208,7 @@ func (s *testAsyncCommitSuite) TestCheckSecondaries(c *C) {
 	c.Assert(err, IsNil)
 	currentTS, err := s.store.oracle.GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
 	c.Assert(err, IsNil)
-	status, err = s.store.lockResolver.getTxnStatus(s.bo, lock.TxnID, []byte("z"), currentTS, currentTS, true)
+	status, err = s.store.lockResolver.getTxnStatus(s.bo, lock.TxnID, []byte("z"), currentTS, currentTS, true, false)
 	c.Assert(err, IsNil)
 	c.Assert(status.IsCommitted(), IsTrue)
 	c.Assert(status.CommitTS(), Equals, ts)
@@ -234,7 +234,7 @@ func (s *testAsyncCommitSuite) TestCheckSecondaries(c *C) {
 			atomic.StoreInt64(&gotCheckA, 1)

 			resp = kvrpcpb.CheckSecondaryLocksResponse{
-				Locks:    []*kvrpcpb.LockInfo{{Key: []byte("a"), PrimaryLock: []byte("z"), LockVersion: ts}},
+				Locks:    []*kvrpcpb.LockInfo{{Key: []byte("a"), PrimaryLock: []byte("z"), LockVersion: ts, UseAsyncCommit: true}},
 				CommitTs: commitTs,
 			}
 		} else if bytes.Equal(k, []byte("i")) {
@@ -233,7 +233,7 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi
 		tikvLockResolverCountWithExpired.Inc()

 		// Use currentTS = math.MaxUint64 means rollback the txn, no matter the lock is expired or not!
-		status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, math.MaxUint64, true)
+		status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, math.MaxUint64, true, false)
 		if err != nil {
 			return false, err
 		}
@@ -242,11 +242,18 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi
 		// Then we need to check the secondary locks to determine the final status of the transaction.
 		if status.primaryLock != nil && status.primaryLock.UseAsyncCommit {
 			resolveData, err := lr.checkAllSecondaries(bo, l, &status)
-			if err != nil {
+			if err == nil {
+				txnInfos[l.TxnID] = resolveData.commitTs
+				continue
+			}
+			if _, ok := errors.Cause(err).(*nonAsyncCommitLock); ok {
+				status, err = lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, math.MaxUint64, true, true)
+				if err != nil {
+					return false, err
+				}
+			} else {
 				return false, err
 			}
-			txnInfos[l.TxnID] = resolveData.commitTs
-			continue
 		}

 		if status.ttl > 0 {
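The new branch is driven entirely by the error's concrete type: checkAllSecondaries reports a *nonAsyncCommitLock (defined later in this diff) when any secondary turns out not to be an async-commit lock, and BatchResolveLocks reacts by re-checking the primary with forceSyncCommit set instead of failing. A minimal, self-contained sketch of that pattern, with stand-in names for everything except nonAsyncCommitLock and errors.Cause:

package main

import (
	"fmt"

	"github.com/pingcap/errors"
)

// nonAsyncCommitLock mirrors the marker error type added later in this diff.
type nonAsyncCommitLock struct{}

func (*nonAsyncCommitLock) Error() string { return "non-async-commit lock" }

// checkAllSecondaries is a stand-in for LockResolver.checkAllSecondaries: it
// fails with the marker error when a secondary lock is not async-commit.
func checkAllSecondaries() (commitTS uint64, err error) {
	return 0, errors.Trace(&nonAsyncCommitLock{})
}

func main() {
	if _, err := checkAllSecondaries(); err != nil {
		// errors.Cause unwraps the Trace annotation so the type assertion
		// can recognize the marker and trigger the forced-sync-commit retry.
		if _, ok := errors.Cause(err).(*nonAsyncCommitLock); ok {
			fmt.Println("fall back: re-run CheckTxnStatus with forceSyncCommit=true")
		}
	}
}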
@@ -344,12 +351,11 @@ func (lr *LockResolver) resolveLocks(bo *Backoffer, callerStartTS uint64, locks
 		pushed = make([]uint64, 0, len(locks))
 	}

-	for _, l := range locks {
-		status, err := lr.getTxnStatusFromLock(bo, l, callerStartTS)
+	var resolve func(*Lock, bool) error
+	resolve = func(l *Lock, forceSyncCommit bool) error {
+		status, err := lr.getTxnStatusFromLock(bo, l, callerStartTS, forceSyncCommit)
 		if err != nil {
 			msBeforeTxnExpired.update(0)
 			err = errors.Trace(err)
-			return msBeforeTxnExpired.value(), nil, err
+			return err
 		}

 		if status.ttl == 0 {
@@ -361,17 +367,18 @@ func (lr *LockResolver) resolveLocks(bo *Backoffer, callerStartTS uint64, locks
 				cleanTxns[l.TxnID] = cleanRegions
 			}

-			if status.primaryLock != nil && status.primaryLock.UseAsyncCommit && !exists {
+			if status.primaryLock != nil && !forceSyncCommit && status.primaryLock.UseAsyncCommit && !exists {
 				err = lr.resolveLockAsync(bo, l, status)
+				if _, ok := errors.Cause(err).(*nonAsyncCommitLock); ok {
+					err = resolve(l, true)
+				}
 			} else if l.LockType == kvrpcpb.Op_PessimisticLock {
 				err = lr.resolvePessimisticLock(bo, l, cleanRegions)
 			} else {
 				err = lr.resolveLock(bo, l, status, lite, cleanRegions)
 			}
 			if err != nil {
 				msBeforeTxnExpired.update(0)
 				err = errors.Trace(err)
-				return msBeforeTxnExpired.value(), nil, err
+				return err
 			}
 		} else {
 			tikvLockResolverCountWithNotExpired.Inc()
@@ -386,16 +393,26 @@ func (lr *LockResolver) resolveLocks(bo *Backoffer, callerStartTS uint64, locks
 				// This could avoids the deadlock scene of two large transaction.
 				if l.LockType != kvrpcpb.Op_PessimisticLock && l.TxnID > callerStartTS {
 					tikvLockResolverCountWithWriteConflict.Inc()
-					return msBeforeTxnExpired.value(), nil, kv.ErrWriteConflict.GenWithStackByArgs(callerStartTS, l.TxnID, status.commitTS, l.Key)
+					return kv.ErrWriteConflict.GenWithStackByArgs(callerStartTS, l.TxnID, status.commitTS, l.Key)
 				}
 			} else {
 				if status.action != kvrpcpb.Action_MinCommitTSPushed {
 					pushFail = true
-					continue
+					return nil
 				}
 				pushed = append(pushed, l.TxnID)
 			}
 		}
+		return nil
+	}
+
+	for _, l := range locks {
+		err := resolve(l, false)
+		if err != nil {
+			msBeforeTxnExpired.update(0)
+			err = errors.Trace(err)
+			return msBeforeTxnExpired.value(), nil, err
+		}
+	}
 	if pushFail {
 		// If any of the lock fails to push minCommitTS, don't return the pushed array.
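Structurally, the old loop body becomes a named closure so a single lock can be re-resolved once with forceSyncCommit=true when the async path reports a fallen-back transaction; return nil now plays the role the old continue did. A small runnable sketch of the self-referential closure pattern (the trigger condition is hypothetical):

package main

import "fmt"

// needsFallback stands in for "resolveLockAsync reported nonAsyncCommitLock".
func needsFallback(lock string) bool { return lock == "fell-back" }

func main() {
	// Declaring the variable before assigning the closure lets the body
	// refer to itself, which is what enables the one-shot forced retry.
	var resolve func(lock string, forceSyncCommit bool) error
	resolve = func(lock string, forceSyncCommit bool) error {
		if !forceSyncCommit && needsFallback(lock) {
			fmt.Println("async resolution failed, retrying as sync commit")
			return resolve(lock, true) // recurses at most once
		}
		fmt.Printf("resolved %q (forceSyncCommit=%v)\n", lock, forceSyncCommit)
		return nil
	}
	_ = resolve("fell-back", false)
}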
@@ -451,14 +468,14 @@ func (lr *LockResolver) GetTxnStatus(txnID uint64, callerStartTS uint64, primary
 	if err != nil {
 		return status, err
 	}
-	return lr.getTxnStatus(bo, txnID, primary, callerStartTS, currentTS, true)
+	return lr.getTxnStatus(bo, txnID, primary, callerStartTS, currentTS, true, false)
 }

-func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock, callerStartTS uint64) (TxnStatus, error) {
+func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock, callerStartTS uint64, forceSyncCommit bool) (TxnStatus, error) {
 	var currentTS uint64
 	var err error
 	var status TxnStatus
-	if l.UseAsyncCommit {
+	if l.UseAsyncCommit && !forceSyncCommit {
 		// Async commit doesn't need the current ts since it uses the minCommitTS.
 		currentTS = 0
 		// Set to 0 so as not to push forward min commit ts.
@@ -481,7 +498,7 @@ func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock, callerStart
 		time.Sleep(100 * time.Millisecond)
 	})
 	for {
-		status, err = lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, currentTS, rollbackIfNotExist)
+		status, err = lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, currentTS, rollbackIfNotExist, forceSyncCommit)
 		if err == nil {
 			return status, nil
 		}
@@ -533,7 +550,8 @@ func (e txnNotFoundErr) Error() string {

 // getTxnStatus sends the CheckTxnStatus request to the TiKV server.
 // When rollbackIfNotExist is false, the caller should be careful with the txnNotFoundErr error.
-func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte, callerStartTS, currentTS uint64, rollbackIfNotExist bool) (TxnStatus, error) {
+func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte,
+	callerStartTS, currentTS uint64, rollbackIfNotExist bool, forceSyncCommit bool) (TxnStatus, error) {
 	if s, ok := lr.getResolved(txnID); ok {
 		return s, nil
 	}
@@ -556,6 +574,7 @@ func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte
 		CallerStartTs:      callerStartTS,
 		CurrentTs:          currentTS,
 		RollbackIfNotExist: rollbackIfNotExist,
+		ForceSyncCommit:    forceSyncCommit,
 	})
 	for {
 		loc, err := lr.store.GetRegionCache().LocateKey(bo, primary)
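ForceSyncCommit rides along on the CheckTxnStatus request; TiKV is then expected to treat the primary lock as an ordinary 2PC lock even if it still carries async-commit metadata. For reference, a hedged sketch of the full request literal (PrimaryKey and LockTs are the kvproto field names as best I recall; the other fields appear verbatim in the hunk above):

	req := tikvrpc.NewRequest(tikvrpc.CmdCheckTxnStatus, &kvrpcpb.CheckTxnStatusRequest{
		PrimaryKey:         primary,             // assumed field name
		LockTs:             txnID,               // assumed field name
		CallerStartTs:      callerStartTS,
		CurrentTs:          currentTS,
		RollbackIfNotExist: rollbackIfNotExist,
		ForceSyncCommit:    forceSyncCommit,     // new in the kvproto bump above
	})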
@@ -628,6 +647,12 @@ type asyncResolveData struct {
 	missingLock bool
 }

+type nonAsyncCommitLock struct{}
+
+func (*nonAsyncCommitLock) Error() string {
+	return "CheckSecondaryLocks receives a non-async-commit lock"
+}
+
 // addKeys adds the keys from locks to data, keeping other fields up to date. startTS and commitTS are for the
 // transaction being resolved.
 //
@@ -671,7 +696,9 @@ func (data *asyncResolveData) addKeys(locks []*kvrpcpb.LockInfo, expected int, s
 			logutil.BgLogger().Error("addLocks error", zap.Error(err))
 			return err
 		}

+		if !lockInfo.UseAsyncCommit {
+			return &nonAsyncCommitLock{}
+		}
 		if !data.missingLock && lockInfo.MinCommitTs > data.commitTs {
 			data.commitTs = lockInfo.MinCommitTs
 		}
@@ -786,28 +813,24 @@ func (lr *LockResolver) checkAllSecondaries(bo *Backoffer, l *Lock, status *TxnS
 	}

 	errChan := make(chan error, len(regions))

+	checkBo, cancel := bo.Fork()
+	defer cancel()
 	for regionID, keys := range regions {
 		curRegionID := regionID
 		curKeys := keys

 		go func() {
-			errChan <- lr.checkSecondaries(bo, l.TxnID, curKeys, curRegionID, &shared)
+			errChan <- lr.checkSecondaries(checkBo, l.TxnID, curKeys, curRegionID, &shared)
 		}()
 	}

-	var errs []string
 	for range regions {
-		err1 := <-errChan
-		if err1 != nil {
-			errs = append(errs, err1.Error())
+		err := <-errChan
+		if err != nil {
+			return nil, err
 		}
 	}

-	if len(errs) > 0 {
-		return nil, errors.Errorf("async commit recovery (sending CheckSecondaryLocks) finished with errors: %v", errs)
-	}
-
 	return &shared, nil
 }
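Two behavioral changes hide in this hunk: errors are no longer collected into a slice (the first failure aborts the whole check, which matters now that a *nonAsyncCommitLock must propagate intact), and the goroutines run against a forked backoffer whose deferred cancel abandons in-flight region checks once the function returns. A generic, runnable sketch of that fail-fast fan-out shape, using a plain context in place of the Backoffer fork:

package main

import (
	"context"
	"errors"
	"fmt"
)

// checkRegion stands in for lr.checkSecondaries on one region.
func checkRegion(ctx context.Context, id int) error {
	select {
	case <-ctx.Done():
		return ctx.Err() // abandoned after a failure elsewhere
	default:
	}
	if id == 2 {
		return errors.New("region 2 failed")
	}
	return nil
}

// checkAll mirrors the new shape of checkAllSecondaries: fork a cancellable
// scope, fan out, and return on the first error instead of collecting all.
func checkAll(regions []int) error {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // plays the role of checkBo, cancel := bo.Fork(); defer cancel()
	errCh := make(chan error, len(regions)) // buffered: laggards never block
	for _, r := range regions {
		r := r // per-iteration copy, like curRegionID/curKeys above
		go func() { errCh <- checkRegion(ctx, r) }()
	}
	for range regions {
		if err := <-errCh; err != nil {
			return err
		}
	}
	return nil
}

func main() {
	fmt.Println(checkAll([]int{1, 2, 3}))
}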
@@ -288,7 +288,7 @@ func (s *testLockSuite) TestCheckTxnStatus(c *C) {
 	bo := NewBackofferWithVars(context.Background(), PrewriteMaxBackoff, nil)
 	resolver := newLockResolver(s.store)
 	// Call getTxnStatus to check the lock status.
-	status, err := resolver.getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS, true)
+	status, err := resolver.getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS, true, false)
 	c.Assert(err, IsNil)
 	c.Assert(status.IsCommitted(), IsFalse)
 	c.Assert(status.ttl, Greater, uint64(0))
@@ -310,7 +310,7 @@ func (s *testLockSuite) TestCheckTxnStatus(c *C) {
 	// Then call getTxnStatus again and check the lock status.
 	currentTS, err = o.GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
 	c.Assert(err, IsNil)
-	status, err = newLockResolver(s.store).getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, 0, true)
+	status, err = newLockResolver(s.store).getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, 0, true, false)
 	c.Assert(err, IsNil)
 	c.Assert(status.ttl, Equals, uint64(0))
 	c.Assert(status.commitTS, Equals, uint64(0))
@@ -318,7 +318,7 @@ func (s *testLockSuite) TestCheckTxnStatus(c *C) {

 	// Call getTxnStatus on a committed transaction.
 	startTS, commitTS := s.putKV(c, []byte("a"), []byte("a"))
-	status, err = newLockResolver(s.store).getTxnStatus(bo, startTS, []byte("a"), currentTS, currentTS, true)
+	status, err = newLockResolver(s.store).getTxnStatus(bo, startTS, []byte("a"), currentTS, currentTS, true, false)
 	c.Assert(err, IsNil)
 	c.Assert(status.ttl, Equals, uint64(0))
 	c.Assert(status.commitTS, Equals, commitTS)
@@ -346,7 +346,7 @@ func (s *testLockSuite) TestCheckTxnStatusNoWait(c *C) {
 	resolver := newLockResolver(s.store)

 	// Call getTxnStatus for the TxnNotFound case.
-	_, err = resolver.getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS, false)
+	_, err = resolver.getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS, false, false)
 	c.Assert(err, NotNil)
 	_, ok := errors.Cause(err).(txnNotFoundErr)
 	c.Assert(ok, IsTrue)
@@ -363,7 +363,7 @@ func (s *testLockSuite) TestCheckTxnStatusNoWait(c *C) {
 		TTL:   100000,
 	}
 	// Call getTxnStatusFromLock to cover the retry logic.
-	status, err := resolver.getTxnStatusFromLock(bo, lock, currentTS)
+	status, err := resolver.getTxnStatusFromLock(bo, lock, currentTS, false)
 	c.Assert(err, IsNil)
 	c.Assert(status.ttl, Greater, uint64(0))
 	c.Assert(<-errCh, IsNil)
@@ -378,7 +378,7 @@ func (s *testLockSuite) TestCheckTxnStatusNoWait(c *C) {
 		TxnID: startTS,
 		TTL:   1000,
 	}
-	status, err = resolver.getTxnStatusFromLock(bo, lock, currentTS)
+	status, err = resolver.getTxnStatusFromLock(bo, lock, currentTS, false)
 	c.Assert(err, IsNil)
 	c.Assert(status.ttl, Equals, uint64(0))
 	c.Assert(status.commitTS, Equals, uint64(0))
@@ -585,3 +585,89 @@ func (s *testLockSuite) TestDeduplicateKeys(c *C) {
 		c.Assert(out, Equals, "a b c")
 	}
 }
+
+func (s *testLockSuite) prepareTxnFallenBackFromAsyncCommit(c *C) {
+	txn, err := s.store.Begin()
+	c.Assert(err, IsNil)
+	err = txn.Set([]byte("fb1"), []byte("1"))
+	c.Assert(err, IsNil)
+	err = txn.Set([]byte("fb2"), []byte("2"))
+	c.Assert(err, IsNil)
+
+	committer, err := newTwoPhaseCommitterWithInit(txn.(*tikvTxn), 1)
+	c.Assert(err, IsNil)
+	c.Assert(committer.mutations.Len(), Equals, 2)
+	committer.lockTTL = 0
+	committer.setAsyncCommit(true)
+	committer.maxCommitTS = committer.startTS + (100 << 18) // 100ms
+
+	bo := NewBackoffer(context.Background(), PrewriteMaxBackoff)
+	err = committer.prewriteMutations(bo, committer.mutations.Slice(0, 1))
+	c.Assert(err, IsNil)
+	c.Assert(committer.isAsyncCommit(), IsTrue)
+
+	// Set an invalid maxCommitTS to produce MaxCommitTsTooLarge
+	committer.maxCommitTS = committer.startTS - 1
+	err = committer.prewriteMutations(bo, committer.mutations.Slice(1, 2))
+	c.Assert(err, IsNil)
+	c.Assert(committer.isAsyncCommit(), IsFalse) // Fallback due to MaxCommitTsTooLarge
+}
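The 100 << 18 above leans on the TSO timestamp layout: the low 18 bits hold a logical counter and the physical milliseconds sit above them, so adding 100 << 18 pushes maxCommitTS 100ms past startTS, while the later startTS - 1 makes any plausible commit TS exceed it. A quick self-contained check of the arithmetic (assuming only that layout):

package main

import "fmt"

const logicalBits = 18 // TSO layout: physical milliseconds << 18 | logical counter

func physicalMillis(ts uint64) uint64 { return ts >> logicalBits }

func main() {
	startTS := uint64(1607000000000) << logicalBits // some physical time in ms
	maxCommitTS := startTS + (100 << logicalBits)   // as in the test: +100ms
	fmt.Println(physicalMillis(maxCommitTS) - physicalMillis(startTS)) // prints 100
}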
+func (s *testLockSuite) TestCheckLocksFallenBackFromAsyncCommit(c *C) {
+	s.prepareTxnFallenBackFromAsyncCommit(c)
+
+	lock := s.mustGetLock(c, []byte("fb1"))
+	c.Assert(lock.UseAsyncCommit, IsTrue)
+	bo := NewBackoffer(context.Background(), getMaxBackoff)
+	lr := newLockResolver(s.store)
+	status, err := lr.getTxnStatusFromLock(bo, lock, 0, false)
+	c.Assert(err, IsNil)
+	c.Assert(NewLock(status.primaryLock), DeepEquals, lock)
+
+	_, err = lr.checkAllSecondaries(bo, lock, &status)
+	c.Assert(err.(*nonAsyncCommitLock), NotNil)
+
+	status, err = lr.getTxnStatusFromLock(bo, lock, 0, true)
+	c.Assert(err, IsNil)
+	c.Assert(status.action, Equals, kvrpcpb.Action_TTLExpireRollback)
+	c.Assert(status.TTL(), Equals, uint64(0))
+}
+func (s *testLockSuite) TestResolveTxnFallenBackFromAsyncCommit(c *C) {
+	s.prepareTxnFallenBackFromAsyncCommit(c)
+
+	lock := s.mustGetLock(c, []byte("fb1"))
+	c.Assert(lock.UseAsyncCommit, IsTrue)
+	bo := NewBackoffer(context.Background(), getMaxBackoff)
+	expire, pushed, err := newLockResolver(s.store).ResolveLocks(bo, 0, []*Lock{lock})
+	c.Assert(err, IsNil)
+	c.Assert(expire, Equals, int64(0))
+	c.Assert(len(pushed), Equals, 0)
+
+	t3, err := s.store.Begin()
+	c.Assert(err, IsNil)
+	_, err = t3.Get(context.Background(), []byte("fb1"))
+	errMsgMustContain(c, err, "key not exist")
+	_, err = t3.Get(context.Background(), []byte("fb2"))
+	errMsgMustContain(c, err, "key not exist")
+}
+func (s *testLockSuite) TestBatchResolveTxnFallenBackFromAsyncCommit(c *C) {
+	s.prepareTxnFallenBackFromAsyncCommit(c)
+
+	lock := s.mustGetLock(c, []byte("fb1"))
+	c.Assert(lock.UseAsyncCommit, IsTrue)
+	bo := NewBackoffer(context.Background(), getMaxBackoff)
+	loc, err := s.store.regionCache.LocateKey(bo, []byte("fb1"))
+	c.Assert(err, IsNil)
+	ok, err := newLockResolver(s.store).BatchResolveLocks(bo, []*Lock{lock}, loc.Region)
+	c.Assert(err, IsNil)
+	c.Assert(ok, IsTrue)
+
+	t3, err := s.store.Begin()
+	c.Assert(err, IsNil)
+	_, err = t3.Get(context.Background(), []byte("fb1"))
+	errMsgMustContain(c, err, "key not exist")
+	_, err = t3.Get(context.Background(), []byte("fb2"))
+	errMsgMustContain(c, err, "key not exist")
+}
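All three tests drive the same prepared scenario to a different resolver entry point. The transaction never committed (its second prewrite already fell back from async commit, and lockTTL = 0 makes its locks immediately expired), so the status check confirms the forced-sync path reports TTLExpireRollback, and the two resolve tests confirm a full rollback: whichever of ResolveLocks or BatchResolveLocks runs, both fb1 and fb2 must be absent afterwards.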
@@ -84,6 +84,12 @@ func (c *twoPhaseCommitter) buildPrewriteRequest(batch batchMutations, txnSize u
 		MaxCommitTs: c.maxCommitTS,
 	}

+	failpoint.Inject("invalidMaxCommitTS", func() {
+		if req.MaxCommitTs > 0 {
+			req.MaxCommitTs = minCommitTS - 1
+		}
+	})
+
 	if c.isAsyncCommit() {
 		if batch.isPrimary {
 			req.Secondaries = c.asyncSecondaries()
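The invalidMaxCommitTS failpoint gives tests a hook to corrupt MaxCommitTs on a live prewrite request, forcing TiKV down the newly supported fallback path. A hypothetical activation from a gocheck test, mirroring how the beforePrewrite failpoint is used earlier in this diff (the "return(true)" term is an assumption; any term that triggers the injected closure works):

	// Hypothetical test usage; the failpoint name follows TiDB's package-path convention.
	c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/invalidMaxCommitTS", "return(true)"), IsNil)
	defer func() {
		c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/invalidMaxCommitTS"), IsNil)
	}()
	// ... commit a transaction with async commit enabled; the prewrite will carry an
	// invalid MaxCommitTs and the committer should fall back to plain 2PC.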
@@ -158,10 +164,14 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *Backoff

 	if c.isOnePC() {
 		if prewriteResp.OnePcCommitTs == 0 {
+			if prewriteResp.MinCommitTs != 0 {
+				return errors.Trace(errors.New("MinCommitTs must be 0 when 1pc falls back to 2pc"))
+			}
 			logutil.Logger(bo.ctx).Warn("1pc failed and fallbacks to normal commit procedure",
 				zap.Uint64("startTS", c.startTS))
 			tikvOnePCTxnCounterFallback.Inc()
 			c.setOnePC(false)
+			c.setAsyncCommit(false)
 		} else {
 			// For 1PC, there's no racing to access to access `onePCCommmitTS` so it's safe
 			// not to lock the mutex.
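The added guard encodes an invariant of the fallback protocol, as the new error string states: when TiKV declines 1PC (OnePcCommitTs == 0) it must not report a pushed MinCommitTs, presumably because a non-zero value in a fallback response would be ambiguous to the client. Clearing the async-commit flag together with the 1PC flag then ensures the retried commit goes through the plain two-phase path.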
@@ -488,6 +488,10 @@ func (s *mockTikvGrpcServer) VerDeleteRange(context.Context, *kvrpcpb.VerDeleteR
 	return nil, errors.New("unreachable")
 }

+func (s *mockTikvGrpcServer) CheckLeader(context.Context, *kvrpcpb.CheckLeaderRequest) (*kvrpcpb.CheckLeaderResponse, error) {
+	return nil, errors.New("unreachable")
+}
+
 func (s *testRegionRequestToSingleStoreSuite) TestNoReloadRegionForGrpcWhenCtxCanceled(c *C) {
 	// prepare a mock tikv grpc server
 	addr := "localhost:56341"