tikv: support async commit locks in BatchResolveLocks (#19860)
Signed-off-by: Nick Cameron <nrc@ncameron.org> Signed-off-by: Yilin Chen <sticnarf@gmail.com>
This commit is contained in:
@ -135,6 +135,8 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R
|
||||
resp.Resp, err = c.usSvr.KvCleanup(ctx, req.Cleanup())
|
||||
case tikvrpc.CmdCheckTxnStatus:
|
||||
resp.Resp, err = c.usSvr.KvCheckTxnStatus(ctx, req.CheckTxnStatus())
|
||||
case tikvrpc.CmdCheckSecondaryLocks:
|
||||
resp.Resp, err = c.usSvr.KvCheckSecondaryLocks(ctx, req.CheckSecondaryLocks())
|
||||
case tikvrpc.CmdTxnHeartBeat:
|
||||
resp.Resp, err = c.usSvr.KvTxnHeartBeat(ctx, req.TxnHeartBeat())
|
||||
case tikvrpc.CmdBatchGet:
|
||||
|
||||
@ -237,6 +237,17 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi
|
||||
return false, err
|
||||
}
|
||||
|
||||
// If the transaction uses async commit, CheckTxnStatus will reject rolling back the primary lock.
|
||||
// Then we need to check the secondary locks to determine the final status of the transaction.
|
||||
if status.primaryLock != nil && status.primaryLock.UseAsyncCommit {
|
||||
resolveData, err := lr.checkAllSecondaries(bo, l, &status)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
txnInfos[l.TxnID] = resolveData.commitTs
|
||||
continue
|
||||
}
|
||||
|
||||
if status.ttl > 0 {
|
||||
logutil.BgLogger().Error("BatchResolveLocks fail to clean locks, this result is not expected!")
|
||||
return false, errors.New("TiDB ask TiKV to rollback locks but it doesn't, the protocol maybe wrong")
|
||||
@ -718,11 +729,52 @@ func (lr *LockResolver) checkSecondaries(bo *Backoffer, txnID uint64, curKeys []
|
||||
func (lr *LockResolver) resolveLockAsync(bo *Backoffer, l *Lock, status TxnStatus) error {
|
||||
tikvLockResolverCountWithResolveAsync.Inc()
|
||||
|
||||
regions, _, err := lr.store.GetRegionCache().GroupKeysByRegion(bo, status.primaryLock.Secondaries, nil)
|
||||
resolveData, err := lr.checkAllSecondaries(bo, l, &status)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
status.commitTS = resolveData.commitTs
|
||||
|
||||
resolveData.keys = append(resolveData.keys, l.Primary)
|
||||
keysByRegion, _, err := lr.store.GetRegionCache().GroupKeysByRegion(bo, resolveData.keys, nil)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
errChan := make(chan error, len(keysByRegion))
|
||||
// Resolve every lock in the transaction.
|
||||
for region, locks := range keysByRegion {
|
||||
curLocks := locks
|
||||
curRegion := region
|
||||
go func() {
|
||||
errChan <- lr.resolveRegionLocks(bo, l, curRegion, curLocks, status)
|
||||
}()
|
||||
}
|
||||
|
||||
var errs []string
|
||||
for range keysByRegion {
|
||||
err1 := <-errChan
|
||||
if err1 != nil {
|
||||
errs = append(errs, err1.Error())
|
||||
}
|
||||
}
|
||||
|
||||
if len(errs) > 0 {
|
||||
return errors.Errorf("async commit recovery (sending ResolveLock) finished with errors: %v", errs)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// checkAllSecondaries checks the secondary locks of an async commit transaction to find out the final
|
||||
// status of the transaction
|
||||
func (lr *LockResolver) checkAllSecondaries(bo *Backoffer, l *Lock, status *TxnStatus) (*asyncResolveData, error) {
|
||||
regions, _, err := lr.store.GetRegionCache().GroupKeysByRegion(bo, status.primaryLock.Secondaries, nil)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
|
||||
shared := asyncResolveData{
|
||||
mutex: sync.Mutex{},
|
||||
commitTs: status.primaryLock.MinCommitTs,
|
||||
@ -750,42 +802,12 @@ func (lr *LockResolver) resolveLockAsync(bo *Backoffer, l *Lock, status TxnStatu
|
||||
}
|
||||
|
||||
if len(errs) > 0 {
|
||||
return errors.Errorf("async commit recovery (sending CheckSecondaryLocks) finished with errors: %v", errs)
|
||||
}
|
||||
|
||||
shared.keys = append(shared.keys, l.Primary)
|
||||
keysByRegion, _, err := lr.store.GetRegionCache().GroupKeysByRegion(bo, shared.keys, nil)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return nil, errors.Errorf("async commit recovery (sending CheckSecondaryLocks) finished with errors: %v", errs)
|
||||
}
|
||||
|
||||
// TODO(nrc, cfzjywxk) schema lease check
|
||||
|
||||
status.commitTS = shared.commitTs
|
||||
|
||||
errChan = make(chan error, len(keysByRegion))
|
||||
// Resolve every lock in the transaction.
|
||||
for region, locks := range keysByRegion {
|
||||
curLocks := locks
|
||||
curRegion := region
|
||||
go func() {
|
||||
errChan <- lr.resolveRegionLocks(bo, l, curRegion, curLocks, status)
|
||||
}()
|
||||
}
|
||||
|
||||
errs = nil
|
||||
for range keysByRegion {
|
||||
err1 := <-errChan
|
||||
if err1 != nil {
|
||||
errs = append(errs, err1.Error())
|
||||
}
|
||||
}
|
||||
|
||||
if len(errs) > 0 {
|
||||
return errors.Errorf("async commit recovery (sending ResolveLock) finished with errors: %v", errs)
|
||||
}
|
||||
|
||||
return nil
|
||||
return &shared, nil
|
||||
}
|
||||
|
||||
// resolveRegionLocks is essentially the same as resolveLock, but we resolve all keys in the same region at the same time.
|
||||
|
||||
@ -14,6 +14,7 @@
|
||||
package tikv
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
@ -463,25 +464,61 @@ func (s *testLockSuite) TestLockTTL(c *C) {
|
||||
}
|
||||
|
||||
func (s *testLockSuite) TestBatchResolveLocks(c *C) {
|
||||
// The first transaction is a normal transaction with a long TTL
|
||||
txn, err := s.store.Begin()
|
||||
c.Assert(err, IsNil)
|
||||
txn.Set(kv.Key("key"), []byte("value"))
|
||||
s.prewriteTxnWithTTL(c, txn.(*tikvTxn), 1000)
|
||||
l := s.mustGetLock(c, []byte("key"))
|
||||
msBeforeLockExpired := s.store.GetOracle().UntilExpired(l.TxnID, l.TTL)
|
||||
txn.Set(kv.Key("k1"), []byte("v1"))
|
||||
txn.Set(kv.Key("k2"), []byte("v2"))
|
||||
s.prewriteTxnWithTTL(c, txn.(*tikvTxn), 20000)
|
||||
|
||||
// The second transaction is an async commit transaction
|
||||
txn, err = s.store.Begin()
|
||||
c.Assert(err, IsNil)
|
||||
txn.Set(kv.Key("k3"), []byte("v3"))
|
||||
txn.Set(kv.Key("k4"), []byte("v4"))
|
||||
tikvTxn := txn.(*tikvTxn)
|
||||
committer, err := newTwoPhaseCommitterWithInit(tikvTxn, 0)
|
||||
c.Assert(err, IsNil)
|
||||
committer.setAsyncCommit(true)
|
||||
committer.lockTTL = 20000
|
||||
err = committer.prewriteMutations(NewBackofferWithVars(context.Background(), PrewriteMaxBackoff, nil), committer.mutations)
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
var locks []*Lock
|
||||
for _, key := range []string{"k1", "k2", "k3", "k4"} {
|
||||
l := s.mustGetLock(c, []byte(key))
|
||||
locks = append(locks, l)
|
||||
}
|
||||
|
||||
// Locks may not expired
|
||||
msBeforeLockExpired := s.store.GetOracle().UntilExpired(locks[0].TxnID, locks[1].TTL)
|
||||
c.Assert(msBeforeLockExpired, Greater, int64(0))
|
||||
msBeforeLockExpired = s.store.GetOracle().UntilExpired(locks[3].TxnID, locks[3].TTL)
|
||||
c.Assert(msBeforeLockExpired, Greater, int64(0))
|
||||
|
||||
lr := newLockResolver(s.store)
|
||||
bo := NewBackofferWithVars(context.Background(), GcResolveLockMaxBackoff, nil)
|
||||
loc, err := lr.store.GetRegionCache().LocateKey(bo, l.Primary)
|
||||
loc, err := lr.store.GetRegionCache().LocateKey(bo, locks[0].Primary)
|
||||
c.Assert(err, IsNil)
|
||||
// Check BatchResolveLocks resolve the lock even the ttl is not expired.
|
||||
succ, err := lr.BatchResolveLocks(bo, []*Lock{l}, loc.Region)
|
||||
c.Assert(succ, IsTrue)
|
||||
success, err := lr.BatchResolveLocks(bo, locks, loc.Region)
|
||||
c.Assert(success, IsTrue)
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
err = txn.Commit(context.Background())
|
||||
c.Assert(err, NotNil)
|
||||
txn, err = s.store.Begin()
|
||||
c.Assert(err, IsNil)
|
||||
// transaction 1 is rolled back
|
||||
_, err = txn.Get(context.Background(), kv.Key("k1"))
|
||||
c.Assert(err, Equals, kv.ErrNotExist)
|
||||
_, err = txn.Get(context.Background(), kv.Key("k2"))
|
||||
c.Assert(err, Equals, kv.ErrNotExist)
|
||||
// transaction 2 is committed
|
||||
v, err := txn.Get(context.Background(), kv.Key("k3"))
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(bytes.Equal(v, []byte("v3")), IsTrue)
|
||||
v, err = txn.Get(context.Background(), kv.Key("k4"))
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(bytes.Equal(v, []byte("v4")), IsTrue)
|
||||
}
|
||||
|
||||
func (s *testLockSuite) TestNewLockZeroTTL(c *C) {
|
||||
|
||||
Reference in New Issue
Block a user