From bc41e47360e87a56771ada8767ccc4fd9fcf64cb Mon Sep 17 00:00:00 2001 From: Yilin Chen Date: Thu, 10 Dec 2020 12:16:21 +0800 Subject: [PATCH] store/tikv: support fallback from async commit (take 2) (#21531) Signed-off-by: Yilin Chen --- go.mod | 4 +- go.sum | 8 +-- session/pessimistic_test.go | 8 +-- store/tikv/2pc_test.go | 2 +- store/tikv/async_commit_fail_test.go | 4 +- store/tikv/async_commit_test.go | 4 +- store/tikv/lock_resolver.go | 85 +++++++++++++++--------- store/tikv/lock_test.go | 98 ++++++++++++++++++++++++++-- store/tikv/prewrite.go | 10 +++ store/tikv/region_request_test.go | 4 ++ 10 files changed, 174 insertions(+), 53 deletions(-) diff --git a/go.mod b/go.mod index 62417ba2f0..6b006e1dd6 100644 --- a/go.mod +++ b/go.mod @@ -32,7 +32,7 @@ require ( github.com/kr/text v0.2.0 // indirect github.com/ngaut/pools v0.0.0-20180318154953-b7bc8c42aac7 github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef - github.com/ngaut/unistore v0.0.0-20201113064408-907e3fcf8e7d + github.com/ngaut/unistore v0.0.0-20201208082126-4766545aa5b5 github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect github.com/opentracing/basictracer-go v1.0.0 github.com/opentracing/opentracing-go v1.1.0 @@ -45,7 +45,7 @@ require ( github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059 github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 - github.com/pingcap/kvproto v0.0.0-20201130052818-5dfa7b1325a3 + github.com/pingcap/kvproto v0.0.0-20201208043834-923c9609272c github.com/pingcap/log v0.0.0-20201112100606-8f1e84a3abc8 github.com/pingcap/parser v0.0.0-20201203085211-44f6be1df1c4 github.com/pingcap/sysutil v0.0.0-20201130064824-f0c8aa6a6966 diff --git a/go.sum b/go.sum index d9e46d4b94..f7203326d7 100644 --- a/go.sum +++ b/go.sum @@ -590,8 +590,8 @@ github.com/ngaut/pools v0.0.0-20180318154953-b7bc8c42aac7 h1:7KAv7KMGTTqSmYZtNdc github.com/ngaut/pools v0.0.0-20180318154953-b7bc8c42aac7/go.mod h1:iWMfgwqYW+e8n5lC/jjNEhwcjbRDpl5NT7n2h+4UNcI= github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef h1:K0Fn+DoFqNqktdZtdV3bPQ/0cuYh2H4rkg0tytX/07k= github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef/go.mod h1:7WjlapSfwQyo6LNmIvEWzsW1hbBQfpUO4JWnuQRmva8= -github.com/ngaut/unistore v0.0.0-20201113064408-907e3fcf8e7d h1:hh0yCo0UtCuakNdkiRPaLHqzfgxacwUk6/pb9iJyJKU= -github.com/ngaut/unistore v0.0.0-20201113064408-907e3fcf8e7d/go.mod h1:ZR3NH+HzqfiYetwdoAivApnIy8iefPZHTMLfrFNm8g4= +github.com/ngaut/unistore v0.0.0-20201208082126-4766545aa5b5 h1:inEktZjWoqSSRB8P6Zkj8cgwnbaAiSObeisgr/36L8U= +github.com/ngaut/unistore v0.0.0-20201208082126-4766545aa5b5/go.mod h1:ZR3NH+HzqfiYetwdoAivApnIy8iefPZHTMLfrFNm8g4= github.com/nicksnyder/go-i18n v1.10.0/go.mod h1:HrK7VCrbOvQoUAQ7Vpy7i87N7JZZZ7R2xBGjv0j365Q= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= @@ -681,8 +681,8 @@ github.com/pingcap/kvproto v0.0.0-20200417092353-efbe03bcffbd/go.mod h1:IOdRDPLy github.com/pingcap/kvproto v0.0.0-20200420075417-e0c6e8842f22/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/kvproto v0.0.0-20200810113304-6157337686b1/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/kvproto v0.0.0-20201113092725-08f2872278eb/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= -github.com/pingcap/kvproto v0.0.0-20201130052818-5dfa7b1325a3 h1:cpYxg8ggZU3UhVVd4iafhzetjEl2xB1KVjuhEKOhmjU= -github.com/pingcap/kvproto v0.0.0-20201130052818-5dfa7b1325a3/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= +github.com/pingcap/kvproto v0.0.0-20201208043834-923c9609272c h1:RbI6VpxZjaVVkeuxzEKCxw20+FWtXiIhgM+mvzhTc8I= +github.com/pingcap/kvproto v0.0.0-20201208043834-923c9609272c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= diff --git a/session/pessimistic_test.go b/session/pessimistic_test.go index db9c5136b8..ada32ad2e6 100644 --- a/session/pessimistic_test.go +++ b/session/pessimistic_test.go @@ -2068,9 +2068,7 @@ func (s *testPessimisticSuite) TestAsyncCommitWithSchemaChange(c *C) { tk2.MustExec("alter table tk add index k2(c2)") }() c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/beforePrewrite", "1*sleep(1200)"), IsNil) - _ = tk.ExecToErr("commit") - // TODO: wait for https://github.com/pingcap/tidb/pull/21531 - // c.Assert(err, ErrorMatches, ".*commit TS \\d+ is too large") + tk.MustExec("commit") c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/beforePrewrite"), IsNil) tk3.MustExec("admin check table tk") } @@ -2123,9 +2121,7 @@ func (s *testPessimisticSuite) Test1PCWithSchemaChange(c *C) { tk2.MustExec("alter table tk add index k2(c2)") }() c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/beforePrewrite", "1*sleep(1000)"), IsNil) - _ = tk.ExecToErr("commit") - // TODO: Check the error after supporting falling back to 2PC in TiKV. - // c.Assert(err, IsNil) + tk.MustExec("commit") c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/beforePrewrite"), IsNil) tk3.MustExec("admin check table tk") } diff --git a/store/tikv/2pc_test.go b/store/tikv/2pc_test.go index 866342dfa0..69ca6bcb63 100644 --- a/store/tikv/2pc_test.go +++ b/store/tikv/2pc_test.go @@ -691,7 +691,7 @@ func (s *testCommitterSuite) TestPessimisticTTL(c *C) { lr := newLockResolver(s.store) bo := NewBackofferWithVars(context.Background(), getMaxBackoff, nil) - status, err := lr.getTxnStatus(bo, txn.startTS, key2, 0, txn.startTS, true) + status, err := lr.getTxnStatus(bo, txn.startTS, key2, 0, txn.startTS, true, false) c.Assert(err, IsNil) c.Assert(status.ttl, GreaterEqual, lockInfo.LockTtl) diff --git a/store/tikv/async_commit_fail_test.go b/store/tikv/async_commit_fail_test.go index de778b1a6e..da2e4757ae 100644 --- a/store/tikv/async_commit_fail_test.go +++ b/store/tikv/async_commit_fail_test.go @@ -179,7 +179,7 @@ func (s *testAsyncCommitFailSuite) TestSecondaryListInPrimaryLock(c *C) { primary := txn.committer.primary() bo := NewBackofferWithVars(context.Background(), 5000, nil) - txnStatus, err := s.store.lockResolver.getTxnStatus(bo, txn.StartTS(), primary, 0, 0, false) + txnStatus, err := s.store.lockResolver.getTxnStatus(bo, txn.StartTS(), primary, 0, 0, false, false) c.Assert(err, IsNil) c.Assert(txnStatus.IsCommitted(), IsFalse) c.Assert(txnStatus.action, Equals, kvrpcpb.Action_NoAction) @@ -203,6 +203,8 @@ func (s *testAsyncCommitFailSuite) TestSecondaryListInPrimaryLock(c *C) { c.Assert(gotSecondaries, DeepEquals, expectedSecondaries) c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/asyncCommitDoNothing"), IsNil) + txn.committer.cleanup(context.Background()) + txn.committer.cleanWg.Wait() } test([]string{"a"}, []string{"a1"}) diff --git a/store/tikv/async_commit_test.go b/store/tikv/async_commit_test.go index eb2080d537..7d7607a2f5 100644 --- a/store/tikv/async_commit_test.go +++ b/store/tikv/async_commit_test.go @@ -208,7 +208,7 @@ func (s *testAsyncCommitSuite) TestCheckSecondaries(c *C) { c.Assert(err, IsNil) currentTS, err := s.store.oracle.GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) c.Assert(err, IsNil) - status, err = s.store.lockResolver.getTxnStatus(s.bo, lock.TxnID, []byte("z"), currentTS, currentTS, true) + status, err = s.store.lockResolver.getTxnStatus(s.bo, lock.TxnID, []byte("z"), currentTS, currentTS, true, false) c.Assert(err, IsNil) c.Assert(status.IsCommitted(), IsTrue) c.Assert(status.CommitTS(), Equals, ts) @@ -234,7 +234,7 @@ func (s *testAsyncCommitSuite) TestCheckSecondaries(c *C) { atomic.StoreInt64(&gotCheckA, 1) resp = kvrpcpb.CheckSecondaryLocksResponse{ - Locks: []*kvrpcpb.LockInfo{{Key: []byte("a"), PrimaryLock: []byte("z"), LockVersion: ts}}, + Locks: []*kvrpcpb.LockInfo{{Key: []byte("a"), PrimaryLock: []byte("z"), LockVersion: ts, UseAsyncCommit: true}}, CommitTs: commitTs, } } else if bytes.Equal(k, []byte("i")) { diff --git a/store/tikv/lock_resolver.go b/store/tikv/lock_resolver.go index c3584afa9c..33f69c09b0 100644 --- a/store/tikv/lock_resolver.go +++ b/store/tikv/lock_resolver.go @@ -233,7 +233,7 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi tikvLockResolverCountWithExpired.Inc() // Use currentTS = math.MaxUint64 means rollback the txn, no matter the lock is expired or not! - status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, math.MaxUint64, true) + status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, math.MaxUint64, true, false) if err != nil { return false, err } @@ -242,11 +242,18 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi // Then we need to check the secondary locks to determine the final status of the transaction. if status.primaryLock != nil && status.primaryLock.UseAsyncCommit { resolveData, err := lr.checkAllSecondaries(bo, l, &status) - if err != nil { + if err == nil { + txnInfos[l.TxnID] = resolveData.commitTs + continue + } + if _, ok := errors.Cause(err).(*nonAsyncCommitLock); ok { + status, err = lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, math.MaxUint64, true, true) + if err != nil { + return false, err + } + } else { return false, err } - txnInfos[l.TxnID] = resolveData.commitTs - continue } if status.ttl > 0 { @@ -344,12 +351,11 @@ func (lr *LockResolver) resolveLocks(bo *Backoffer, callerStartTS uint64, locks pushed = make([]uint64, 0, len(locks)) } - for _, l := range locks { - status, err := lr.getTxnStatusFromLock(bo, l, callerStartTS) + var resolve func(*Lock, bool) error + resolve = func(l *Lock, forceSyncCommit bool) error { + status, err := lr.getTxnStatusFromLock(bo, l, callerStartTS, forceSyncCommit) if err != nil { - msBeforeTxnExpired.update(0) - err = errors.Trace(err) - return msBeforeTxnExpired.value(), nil, err + return err } if status.ttl == 0 { @@ -361,17 +367,18 @@ func (lr *LockResolver) resolveLocks(bo *Backoffer, callerStartTS uint64, locks cleanTxns[l.TxnID] = cleanRegions } - if status.primaryLock != nil && status.primaryLock.UseAsyncCommit && !exists { + if status.primaryLock != nil && !forceSyncCommit && status.primaryLock.UseAsyncCommit && !exists { err = lr.resolveLockAsync(bo, l, status) + if _, ok := errors.Cause(err).(*nonAsyncCommitLock); ok { + err = resolve(l, true) + } } else if l.LockType == kvrpcpb.Op_PessimisticLock { err = lr.resolvePessimisticLock(bo, l, cleanRegions) } else { err = lr.resolveLock(bo, l, status, lite, cleanRegions) } if err != nil { - msBeforeTxnExpired.update(0) - err = errors.Trace(err) - return msBeforeTxnExpired.value(), nil, err + return err } } else { tikvLockResolverCountWithNotExpired.Inc() @@ -386,16 +393,26 @@ func (lr *LockResolver) resolveLocks(bo *Backoffer, callerStartTS uint64, locks // This could avoids the deadlock scene of two large transaction. if l.LockType != kvrpcpb.Op_PessimisticLock && l.TxnID > callerStartTS { tikvLockResolverCountWithWriteConflict.Inc() - return msBeforeTxnExpired.value(), nil, kv.ErrWriteConflict.GenWithStackByArgs(callerStartTS, l.TxnID, status.commitTS, l.Key) + return kv.ErrWriteConflict.GenWithStackByArgs(callerStartTS, l.TxnID, status.commitTS, l.Key) } } else { if status.action != kvrpcpb.Action_MinCommitTSPushed { pushFail = true - continue + return nil } pushed = append(pushed, l.TxnID) } } + return nil + } + + for _, l := range locks { + err := resolve(l, false) + if err != nil { + msBeforeTxnExpired.update(0) + err = errors.Trace(err) + return msBeforeTxnExpired.value(), nil, err + } } if pushFail { // If any of the lock fails to push minCommitTS, don't return the pushed array. @@ -451,14 +468,14 @@ func (lr *LockResolver) GetTxnStatus(txnID uint64, callerStartTS uint64, primary if err != nil { return status, err } - return lr.getTxnStatus(bo, txnID, primary, callerStartTS, currentTS, true) + return lr.getTxnStatus(bo, txnID, primary, callerStartTS, currentTS, true, false) } -func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock, callerStartTS uint64) (TxnStatus, error) { +func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock, callerStartTS uint64, forceSyncCommit bool) (TxnStatus, error) { var currentTS uint64 var err error var status TxnStatus - if l.UseAsyncCommit { + if l.UseAsyncCommit && !forceSyncCommit { // Async commit doesn't need the current ts since it uses the minCommitTS. currentTS = 0 // Set to 0 so as not to push forward min commit ts. @@ -481,7 +498,7 @@ func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock, callerStart time.Sleep(100 * time.Millisecond) }) for { - status, err = lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, currentTS, rollbackIfNotExist) + status, err = lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, currentTS, rollbackIfNotExist, forceSyncCommit) if err == nil { return status, nil } @@ -533,7 +550,8 @@ func (e txnNotFoundErr) Error() string { // getTxnStatus sends the CheckTxnStatus request to the TiKV server. // When rollbackIfNotExist is false, the caller should be careful with the txnNotFoundErr error. -func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte, callerStartTS, currentTS uint64, rollbackIfNotExist bool) (TxnStatus, error) { +func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte, + callerStartTS, currentTS uint64, rollbackIfNotExist bool, forceSyncCommit bool) (TxnStatus, error) { if s, ok := lr.getResolved(txnID); ok { return s, nil } @@ -556,6 +574,7 @@ func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte CallerStartTs: callerStartTS, CurrentTs: currentTS, RollbackIfNotExist: rollbackIfNotExist, + ForceSyncCommit: forceSyncCommit, }) for { loc, err := lr.store.GetRegionCache().LocateKey(bo, primary) @@ -628,6 +647,12 @@ type asyncResolveData struct { missingLock bool } +type nonAsyncCommitLock struct{} + +func (*nonAsyncCommitLock) Error() string { + return "CheckSecondaryLocks receives a non-async-commit lock" +} + // addKeys adds the keys from locks to data, keeping other fields up to date. startTS and commitTS are for the // transaction being resolved. // @@ -671,7 +696,9 @@ func (data *asyncResolveData) addKeys(locks []*kvrpcpb.LockInfo, expected int, s logutil.BgLogger().Error("addLocks error", zap.Error(err)) return err } - + if !lockInfo.UseAsyncCommit { + return &nonAsyncCommitLock{} + } if !data.missingLock && lockInfo.MinCommitTs > data.commitTs { data.commitTs = lockInfo.MinCommitTs } @@ -786,28 +813,24 @@ func (lr *LockResolver) checkAllSecondaries(bo *Backoffer, l *Lock, status *TxnS } errChan := make(chan error, len(regions)) - + checkBo, cancel := bo.Fork() + defer cancel() for regionID, keys := range regions { curRegionID := regionID curKeys := keys go func() { - errChan <- lr.checkSecondaries(bo, l.TxnID, curKeys, curRegionID, &shared) + errChan <- lr.checkSecondaries(checkBo, l.TxnID, curKeys, curRegionID, &shared) }() } - var errs []string for range regions { - err1 := <-errChan - if err1 != nil { - errs = append(errs, err1.Error()) + err := <-errChan + if err != nil { + return nil, err } } - if len(errs) > 0 { - return nil, errors.Errorf("async commit recovery (sending CheckSecondaryLocks) finished with errors: %v", errs) - } - return &shared, nil } diff --git a/store/tikv/lock_test.go b/store/tikv/lock_test.go index 4de9f101e3..2be3dfac5b 100644 --- a/store/tikv/lock_test.go +++ b/store/tikv/lock_test.go @@ -288,7 +288,7 @@ func (s *testLockSuite) TestCheckTxnStatus(c *C) { bo := NewBackofferWithVars(context.Background(), PrewriteMaxBackoff, nil) resolver := newLockResolver(s.store) // Call getTxnStatus to check the lock status. - status, err := resolver.getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS, true) + status, err := resolver.getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS, true, false) c.Assert(err, IsNil) c.Assert(status.IsCommitted(), IsFalse) c.Assert(status.ttl, Greater, uint64(0)) @@ -310,7 +310,7 @@ func (s *testLockSuite) TestCheckTxnStatus(c *C) { // Then call getTxnStatus again and check the lock status. currentTS, err = o.GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) c.Assert(err, IsNil) - status, err = newLockResolver(s.store).getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, 0, true) + status, err = newLockResolver(s.store).getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, 0, true, false) c.Assert(err, IsNil) c.Assert(status.ttl, Equals, uint64(0)) c.Assert(status.commitTS, Equals, uint64(0)) @@ -318,7 +318,7 @@ func (s *testLockSuite) TestCheckTxnStatus(c *C) { // Call getTxnStatus on a committed transaction. startTS, commitTS := s.putKV(c, []byte("a"), []byte("a")) - status, err = newLockResolver(s.store).getTxnStatus(bo, startTS, []byte("a"), currentTS, currentTS, true) + status, err = newLockResolver(s.store).getTxnStatus(bo, startTS, []byte("a"), currentTS, currentTS, true, false) c.Assert(err, IsNil) c.Assert(status.ttl, Equals, uint64(0)) c.Assert(status.commitTS, Equals, commitTS) @@ -346,7 +346,7 @@ func (s *testLockSuite) TestCheckTxnStatusNoWait(c *C) { resolver := newLockResolver(s.store) // Call getTxnStatus for the TxnNotFound case. - _, err = resolver.getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS, false) + _, err = resolver.getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS, false, false) c.Assert(err, NotNil) _, ok := errors.Cause(err).(txnNotFoundErr) c.Assert(ok, IsTrue) @@ -363,7 +363,7 @@ func (s *testLockSuite) TestCheckTxnStatusNoWait(c *C) { TTL: 100000, } // Call getTxnStatusFromLock to cover the retry logic. - status, err := resolver.getTxnStatusFromLock(bo, lock, currentTS) + status, err := resolver.getTxnStatusFromLock(bo, lock, currentTS, false) c.Assert(err, IsNil) c.Assert(status.ttl, Greater, uint64(0)) c.Assert(<-errCh, IsNil) @@ -378,7 +378,7 @@ func (s *testLockSuite) TestCheckTxnStatusNoWait(c *C) { TxnID: startTS, TTL: 1000, } - status, err = resolver.getTxnStatusFromLock(bo, lock, currentTS) + status, err = resolver.getTxnStatusFromLock(bo, lock, currentTS, false) c.Assert(err, IsNil) c.Assert(status.ttl, Equals, uint64(0)) c.Assert(status.commitTS, Equals, uint64(0)) @@ -585,3 +585,89 @@ func (s *testLockSuite) TestDeduplicateKeys(c *C) { c.Assert(out, Equals, "a b c") } } + +func (s *testLockSuite) prepareTxnFallenBackFromAsyncCommit(c *C) { + txn, err := s.store.Begin() + c.Assert(err, IsNil) + err = txn.Set([]byte("fb1"), []byte("1")) + c.Assert(err, IsNil) + err = txn.Set([]byte("fb2"), []byte("2")) + c.Assert(err, IsNil) + + committer, err := newTwoPhaseCommitterWithInit(txn.(*tikvTxn), 1) + c.Assert(err, IsNil) + c.Assert(committer.mutations.Len(), Equals, 2) + committer.lockTTL = 0 + committer.setAsyncCommit(true) + committer.maxCommitTS = committer.startTS + (100 << 18) // 100ms + + bo := NewBackoffer(context.Background(), PrewriteMaxBackoff) + err = committer.prewriteMutations(bo, committer.mutations.Slice(0, 1)) + c.Assert(err, IsNil) + c.Assert(committer.isAsyncCommit(), IsTrue) + + // Set an invalid maxCommitTS to produce MaxCommitTsTooLarge + committer.maxCommitTS = committer.startTS - 1 + err = committer.prewriteMutations(bo, committer.mutations.Slice(1, 2)) + c.Assert(err, IsNil) + c.Assert(committer.isAsyncCommit(), IsFalse) // Fallback due to MaxCommitTsTooLarge +} + +func (s *testLockSuite) TestCheckLocksFallenBackFromAsyncCommit(c *C) { + s.prepareTxnFallenBackFromAsyncCommit(c) + + lock := s.mustGetLock(c, []byte("fb1")) + c.Assert(lock.UseAsyncCommit, IsTrue) + bo := NewBackoffer(context.Background(), getMaxBackoff) + lr := newLockResolver(s.store) + status, err := lr.getTxnStatusFromLock(bo, lock, 0, false) + c.Assert(err, IsNil) + c.Assert(NewLock(status.primaryLock), DeepEquals, lock) + + _, err = lr.checkAllSecondaries(bo, lock, &status) + c.Assert(err.(*nonAsyncCommitLock), NotNil) + + status, err = lr.getTxnStatusFromLock(bo, lock, 0, true) + c.Assert(err, IsNil) + c.Assert(status.action, Equals, kvrpcpb.Action_TTLExpireRollback) + c.Assert(status.TTL(), Equals, uint64(0)) +} + +func (s *testLockSuite) TestResolveTxnFallenBackFromAsyncCommit(c *C) { + s.prepareTxnFallenBackFromAsyncCommit(c) + + lock := s.mustGetLock(c, []byte("fb1")) + c.Assert(lock.UseAsyncCommit, IsTrue) + bo := NewBackoffer(context.Background(), getMaxBackoff) + expire, pushed, err := newLockResolver(s.store).ResolveLocks(bo, 0, []*Lock{lock}) + c.Assert(err, IsNil) + c.Assert(expire, Equals, int64(0)) + c.Assert(len(pushed), Equals, 0) + + t3, err := s.store.Begin() + c.Assert(err, IsNil) + _, err = t3.Get(context.Background(), []byte("fb1")) + errMsgMustContain(c, err, "key not exist") + _, err = t3.Get(context.Background(), []byte("fb2")) + errMsgMustContain(c, err, "key not exist") +} + +func (s *testLockSuite) TestBatchResolveTxnFallenBackFromAsyncCommit(c *C) { + s.prepareTxnFallenBackFromAsyncCommit(c) + + lock := s.mustGetLock(c, []byte("fb1")) + c.Assert(lock.UseAsyncCommit, IsTrue) + bo := NewBackoffer(context.Background(), getMaxBackoff) + loc, err := s.store.regionCache.LocateKey(bo, []byte("fb1")) + c.Assert(err, IsNil) + ok, err := newLockResolver(s.store).BatchResolveLocks(bo, []*Lock{lock}, loc.Region) + c.Assert(err, IsNil) + c.Assert(ok, IsTrue) + + t3, err := s.store.Begin() + c.Assert(err, IsNil) + _, err = t3.Get(context.Background(), []byte("fb1")) + errMsgMustContain(c, err, "key not exist") + _, err = t3.Get(context.Background(), []byte("fb2")) + errMsgMustContain(c, err, "key not exist") +} diff --git a/store/tikv/prewrite.go b/store/tikv/prewrite.go index f204131180..bc1780f343 100644 --- a/store/tikv/prewrite.go +++ b/store/tikv/prewrite.go @@ -84,6 +84,12 @@ func (c *twoPhaseCommitter) buildPrewriteRequest(batch batchMutations, txnSize u MaxCommitTs: c.maxCommitTS, } + failpoint.Inject("invalidMaxCommitTS", func() { + if req.MaxCommitTs > 0 { + req.MaxCommitTs = minCommitTS - 1 + } + }) + if c.isAsyncCommit() { if batch.isPrimary { req.Secondaries = c.asyncSecondaries() @@ -158,10 +164,14 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *Backoff if c.isOnePC() { if prewriteResp.OnePcCommitTs == 0 { + if prewriteResp.MinCommitTs != 0 { + return errors.Trace(errors.New("MinCommitTs must be 0 when 1pc falls back to 2pc")) + } logutil.Logger(bo.ctx).Warn("1pc failed and fallbacks to normal commit procedure", zap.Uint64("startTS", c.startTS)) tikvOnePCTxnCounterFallback.Inc() c.setOnePC(false) + c.setAsyncCommit(false) } else { // For 1PC, there's no racing to access to access `onePCCommmitTS` so it's safe // not to lock the mutex. diff --git a/store/tikv/region_request_test.go b/store/tikv/region_request_test.go index c8bd231503..ce2c6e6bbb 100644 --- a/store/tikv/region_request_test.go +++ b/store/tikv/region_request_test.go @@ -488,6 +488,10 @@ func (s *mockTikvGrpcServer) VerDeleteRange(context.Context, *kvrpcpb.VerDeleteR return nil, errors.New("unreachable") } +func (s *mockTikvGrpcServer) CheckLeader(context.Context, *kvrpcpb.CheckLeaderRequest) (*kvrpcpb.CheckLeaderResponse, error) { + return nil, errors.New("unreachable") +} + func (s *testRegionRequestToSingleStoreSuite) TestNoReloadRegionForGrpcWhenCtxCanceled(c *C) { // prepare a mock tikv grpc server addr := "localhost:56341"