From e2d8d29a3450dc1944730e205fe397331aba228f Mon Sep 17 00:00:00 2001 From: Yilin Chen Date: Wed, 9 Sep 2020 19:07:20 +0800 Subject: [PATCH] tikv: support async commit locks in BatchResolveLocks (#19860) Signed-off-by: Nick Cameron Signed-off-by: Yilin Chen --- store/mockstore/unistore/rpc.go | 2 + store/tikv/lock_resolver.go | 88 ++++++++++++++++++++------------- store/tikv/lock_test.go | 55 +++++++++++++++++---- 3 files changed, 103 insertions(+), 42 deletions(-) diff --git a/store/mockstore/unistore/rpc.go b/store/mockstore/unistore/rpc.go index 98f8c9083e..3d4b7b465b 100644 --- a/store/mockstore/unistore/rpc.go +++ b/store/mockstore/unistore/rpc.go @@ -135,6 +135,8 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R resp.Resp, err = c.usSvr.KvCleanup(ctx, req.Cleanup()) case tikvrpc.CmdCheckTxnStatus: resp.Resp, err = c.usSvr.KvCheckTxnStatus(ctx, req.CheckTxnStatus()) + case tikvrpc.CmdCheckSecondaryLocks: + resp.Resp, err = c.usSvr.KvCheckSecondaryLocks(ctx, req.CheckSecondaryLocks()) case tikvrpc.CmdTxnHeartBeat: resp.Resp, err = c.usSvr.KvTxnHeartBeat(ctx, req.TxnHeartBeat()) case tikvrpc.CmdBatchGet: diff --git a/store/tikv/lock_resolver.go b/store/tikv/lock_resolver.go index 9be52ff6ad..27a36d03e3 100644 --- a/store/tikv/lock_resolver.go +++ b/store/tikv/lock_resolver.go @@ -237,6 +237,17 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi return false, err } + // If the transaction uses async commit, CheckTxnStatus will reject rolling back the primary lock. + // Then we need to check the secondary locks to determine the final status of the transaction. 
+ if status.primaryLock != nil && status.primaryLock.UseAsyncCommit { + resolveData, err := lr.checkAllSecondaries(bo, l, &status) + if err != nil { + return false, err + } + txnInfos[l.TxnID] = resolveData.commitTs + continue + } + if status.ttl > 0 { logutil.BgLogger().Error("BatchResolveLocks fail to clean locks, this result is not expected!") return false, errors.New("TiDB ask TiKV to rollback locks but it doesn't, the protocol maybe wrong") @@ -718,11 +729,52 @@ func (lr *LockResolver) checkSecondaries(bo *Backoffer, txnID uint64, curKeys [] func (lr *LockResolver) resolveLockAsync(bo *Backoffer, l *Lock, status TxnStatus) error { tikvLockResolverCountWithResolveAsync.Inc() - regions, _, err := lr.store.GetRegionCache().GroupKeysByRegion(bo, status.primaryLock.Secondaries, nil) + resolveData, err := lr.checkAllSecondaries(bo, l, &status) + if err != nil { + return err + } + + status.commitTS = resolveData.commitTs + + resolveData.keys = append(resolveData.keys, l.Primary) + keysByRegion, _, err := lr.store.GetRegionCache().GroupKeysByRegion(bo, resolveData.keys, nil) if err != nil { return errors.Trace(err) } + errChan := make(chan error, len(keysByRegion)) + // Resolve every lock in the transaction. 
+ for region, locks := range keysByRegion { + curLocks := locks + curRegion := region + go func() { + errChan <- lr.resolveRegionLocks(bo, l, curRegion, curLocks, status) + }() + } + + var errs []string + for range keysByRegion { + err1 := <-errChan + if err1 != nil { + errs = append(errs, err1.Error()) + } + } + + if len(errs) > 0 { + return errors.Errorf("async commit recovery (sending ResolveLock) finished with errors: %v", errs) + } + + return nil +} + +// checkAllSecondaries checks the secondary locks of an async commit transaction to find out the final +// status of the transaction +func (lr *LockResolver) checkAllSecondaries(bo *Backoffer, l *Lock, status *TxnStatus) (*asyncResolveData, error) { + regions, _, err := lr.store.GetRegionCache().GroupKeysByRegion(bo, status.primaryLock.Secondaries, nil) + if err != nil { + return nil, errors.Trace(err) + } + shared := asyncResolveData{ mutex: sync.Mutex{}, commitTs: status.primaryLock.MinCommitTs, @@ -750,42 +802,12 @@ func (lr *LockResolver) resolveLockAsync(bo *Backoffer, l *Lock, status TxnStatu } if len(errs) > 0 { - return errors.Errorf("async commit recovery (sending CheckSecondaryLocks) finished with errors: %v", errs) - } - - shared.keys = append(shared.keys, l.Primary) - keysByRegion, _, err := lr.store.GetRegionCache().GroupKeysByRegion(bo, shared.keys, nil) - if err != nil { - return errors.Trace(err) + return nil, errors.Errorf("async commit recovery (sending CheckSecondaryLocks) finished with errors: %v", errs) } // TODO(nrc, cfzjywxk) schema lease check - status.commitTS = shared.commitTs - - errChan = make(chan error, len(keysByRegion)) - // Resolve every lock in the transaction. 
- for region, locks := range keysByRegion { - curLocks := locks - curRegion := region - go func() { - errChan <- lr.resolveRegionLocks(bo, l, curRegion, curLocks, status) - }() - } - - errs = nil - for range keysByRegion { - err1 := <-errChan - if err1 != nil { - errs = append(errs, err1.Error()) - } - } - - if len(errs) > 0 { - return errors.Errorf("async commit recovery (sending ResolveLock) finished with errors: %v", errs) - } - - return nil + return &shared, nil } // resolveRegionLocks is essentially the same as resolveLock, but we resolve all keys in the same region at the same time. diff --git a/store/tikv/lock_test.go b/store/tikv/lock_test.go index 51067a4558..c992b2d12d 100644 --- a/store/tikv/lock_test.go +++ b/store/tikv/lock_test.go @@ -14,6 +14,7 @@ package tikv import ( + "bytes" "context" "fmt" "math" @@ -463,25 +464,61 @@ func (s *testLockSuite) TestLockTTL(c *C) { } func (s *testLockSuite) TestBatchResolveLocks(c *C) { + // The first transaction is a normal transaction with a long TTL txn, err := s.store.Begin() c.Assert(err, IsNil) - txn.Set(kv.Key("key"), []byte("value")) - s.prewriteTxnWithTTL(c, txn.(*tikvTxn), 1000) - l := s.mustGetLock(c, []byte("key")) - msBeforeLockExpired := s.store.GetOracle().UntilExpired(l.TxnID, l.TTL) + txn.Set(kv.Key("k1"), []byte("v1")) + txn.Set(kv.Key("k2"), []byte("v2")) + s.prewriteTxnWithTTL(c, txn.(*tikvTxn), 20000) + + // The second transaction is an async commit transaction + txn, err = s.store.Begin() + c.Assert(err, IsNil) + txn.Set(kv.Key("k3"), []byte("v3")) + txn.Set(kv.Key("k4"), []byte("v4")) + tikvTxn := txn.(*tikvTxn) + committer, err := newTwoPhaseCommitterWithInit(tikvTxn, 0) + c.Assert(err, IsNil) + committer.setAsyncCommit(true) + committer.lockTTL = 20000 + err = committer.prewriteMutations(NewBackofferWithVars(context.Background(), PrewriteMaxBackoff, nil), committer.mutations) + c.Assert(err, IsNil) + + var locks []*Lock + for _, key := range []string{"k1", "k2", "k3", "k4"} { + l := 
s.mustGetLock(c, []byte(key)) + locks = append(locks, l) + } + + // Locks may not have expired yet + msBeforeLockExpired := s.store.GetOracle().UntilExpired(locks[0].TxnID, locks[0].TTL) + c.Assert(msBeforeLockExpired, Greater, int64(0)) + msBeforeLockExpired = s.store.GetOracle().UntilExpired(locks[3].TxnID, locks[3].TTL) c.Assert(msBeforeLockExpired, Greater, int64(0)) lr := newLockResolver(s.store) bo := NewBackofferWithVars(context.Background(), GcResolveLockMaxBackoff, nil) - loc, err := lr.store.GetRegionCache().LocateKey(bo, l.Primary) + loc, err := lr.store.GetRegionCache().LocateKey(bo, locks[0].Primary) c.Assert(err, IsNil) // Check BatchResolveLocks resolve the lock even the ttl is not expired. - succ, err := lr.BatchResolveLocks(bo, []*Lock{l}, loc.Region) - c.Assert(succ, IsTrue) + success, err := lr.BatchResolveLocks(bo, locks, loc.Region) + c.Assert(success, IsTrue) c.Assert(err, IsNil) - err = txn.Commit(context.Background()) - c.Assert(err, NotNil) + txn, err = s.store.Begin() + c.Assert(err, IsNil) + // transaction 1 is rolled back + _, err = txn.Get(context.Background(), kv.Key("k1")) + c.Assert(err, Equals, kv.ErrNotExist) + _, err = txn.Get(context.Background(), kv.Key("k2")) + c.Assert(err, Equals, kv.ErrNotExist) + // transaction 2 is committed + v, err := txn.Get(context.Background(), kv.Key("k3")) + c.Assert(err, IsNil) + c.Assert(bytes.Equal(v, []byte("v3")), IsTrue) + v, err = txn.Get(context.Background(), kv.Key("k4")) + c.Assert(err, IsNil) + c.Assert(bytes.Equal(v, []byte("v4")), IsTrue) } func (s *testLockSuite) TestNewLockZeroTTL(c *C) {