From aade9a45f20f02a0e4aef3450bea18cbcd3b30ff Mon Sep 17 00:00:00 2001 From: XuWentao <30992981+wentaoxu@users.noreply.github.com> Date: Mon, 26 Feb 2018 16:33:29 +0800 Subject: [PATCH] store/tikv: resolve locks in a batch (#5750) --- store/mockstore/mocktikv/mock_tikv_test.go | 32 +++++++++ store/mockstore/mocktikv/mvcc.go | 36 ++++++++++ store/mockstore/mocktikv/mvcc_leveldb.go | 41 +++++++++++ store/tikv/gcworker/gc_worker.go | 3 +- store/tikv/lock_resolver.go | 83 ++++++++++++++++++++++ 5 files changed, 194 insertions(+), 1 deletion(-) diff --git a/store/mockstore/mocktikv/mock_tikv_test.go b/store/mockstore/mocktikv/mock_tikv_test.go index b92f63e2a2..8f6b780b99 100644 --- a/store/mockstore/mocktikv/mock_tikv_test.go +++ b/store/mockstore/mocktikv/mock_tikv_test.go @@ -184,6 +184,10 @@ func (s *testMockTiKVSuite) mustResolveLock(c *C, startTS, commitTS uint64) { c.Assert(s.store.ResolveLock(nil, nil, startTS, commitTS), IsNil) } +func (s *testMockTiKVSuite) mustBatchResolveLock(c *C, txnInfos map[uint64]uint64) { + c.Assert(s.store.BatchResolveLock(nil, nil, txnInfos), IsNil) +} + func (s *testMockTiKVSuite) TestGet(c *C) { s.mustGetNone(c, "x", 10) s.mustPutOK(c, "x", "x", 5, 10) @@ -392,6 +396,34 @@ func (s *testMockTiKVSuite) TestResolveLock(c *C) { s.mustScanLock(c, 30, nil) } +func (s *testMockTiKVSuite) TestBatchResolveLock(c *C) { + s.mustPrewriteOK(c, putMutations("p1", "v11", "s1", "v11"), "p1", 11) + s.mustPrewriteOK(c, putMutations("p2", "v12", "s2", "v12"), "p2", 12) + s.mustPrewriteOK(c, putMutations("p3", "v13"), "p3", 13) + s.mustPrewriteOK(c, putMutations("p4", "v14", "s3", "v14", "s4", "v14"), "p4", 14) + s.mustPrewriteOK(c, putMutations("p5", "v15", "s5", "v15"), "p5", 15) + txnInfos := map[uint64]uint64{ + 11: 0, + 12: 22, + 13: 0, + 14: 24, + } + s.mustBatchResolveLock(c, txnInfos) + s.mustGetNone(c, "p1", 20) + s.mustGetNone(c, "p3", 30) + s.mustGetOK(c, "p2", 30, "v12") + s.mustGetOK(c, "s4", 30, "v14") + s.mustScanLock(c, 30, []*kvrpcpb.LockInfo{ + lock("p5", "p5", 15), + lock("s5", "p5", 15), + }) + txnInfos = map[uint64]uint64{ + 15: 0, + } + s.mustBatchResolveLock(c, txnInfos) + s.mustScanLock(c, 30, nil) +} + func (s *testMockTiKVSuite) TestRollbackAndWriteConflict(c *C) { s.mustPutOK(c, "test", "test", 1, 3) diff --git a/store/mockstore/mocktikv/mvcc.go b/store/mockstore/mocktikv/mvcc.go index 834a0b54fc..18486dc148 100644 --- a/store/mockstore/mocktikv/mvcc.go +++ b/store/mockstore/mocktikv/mvcc.go @@ -426,6 +426,7 @@ type MVCCStore interface { Cleanup(key []byte, startTS uint64) error ScanLock(startKey, endKey []byte, maxTS uint64) ([]*kvrpcpb.LockInfo, error) ResolveLock(startKey, endKey []byte, startTS, commitTS uint64) error + BatchResolveLock(startKey, endKey []byte, txnInfos map[uint64]uint64) error } // RawKV is a key-value storage. MVCCStore can be implemented upon it with timestamp encoded into key. @@ -707,6 +708,41 @@ func (s *MvccStore) ResolveLock(startKey, endKey []byte, startTS, commitTS uint6 return nil } +// BatchResolveLock resolves all orphan locks belong to a transaction. +func (s *MvccStore) BatchResolveLock(startKey, endKey []byte, txnInfos map[uint64]uint64) error { + s.Lock() + defer s.Unlock() + + var ents []*mvccEntry + var err error + iterator := func(item btree.Item) bool { + ent := item.(*mvccEntry) + if !regionContains(startKey, endKey, ent.key) { + return false + } + if ent.lock != nil { + if commitTS, ok := txnInfos[ent.lock.startTS]; ok { + if commitTS > 0 { + err = ent.Commit(ent.lock.startTS, commitTS) + } else { + err = ent.Rollback(ent.lock.startTS) + } + if err != nil { + return false + } + ents = append(ents, ent) + } + } + return true + } + s.tree.AscendGreaterOrEqual(newEntry(startKey), iterator) + if err != nil { + return errors.Trace(err) + } + s.submit(ents...) + return nil +} + // RawGet queries value with the key. func (s *MvccStore) RawGet(key []byte) []byte { s.RLock() diff --git a/store/mockstore/mocktikv/mvcc_leveldb.go b/store/mockstore/mocktikv/mvcc_leveldb.go index 1b0bbd5451..7e253265c2 100644 --- a/store/mockstore/mocktikv/mvcc_leveldb.go +++ b/store/mockstore/mocktikv/mvcc_leveldb.go @@ -823,6 +823,47 @@ func (mvcc *MVCCLevelDB) ResolveLock(startKey, endKey []byte, startTS, commitTS return mvcc.db.Write(batch, nil) } +// BatchResolveLock implements the MVCCStore interface. +func (mvcc *MVCCLevelDB) BatchResolveLock(startKey, endKey []byte, txnInfos map[uint64]uint64) error { + mvcc.mu.Lock() + defer mvcc.mu.Unlock() + + iter, currKey, err := newScanIterator(mvcc.db, startKey, endKey) + defer iter.Release() + if err != nil { + return errors.Trace(err) + } + + batch := &leveldb.Batch{} + for iter.Valid() { + dec := lockDecoder{expectKey: currKey} + ok, err := dec.Decode(iter) + if err != nil { + return errors.Trace(err) + } + if ok { + if commitTS, ok := txnInfos[dec.lock.startTS]; ok { + if commitTS > 0 { + err = commitLock(batch, dec.lock, currKey, dec.lock.startTS, commitTS) + } else { + err = rollbackLock(batch, dec.lock, currKey, dec.lock.startTS) + } + if err != nil { + return errors.Trace(err) + } + } + } + + skip := skipDecoder{currKey: currKey} + _, err = skip.Decode(iter) + if err != nil { + return errors.Trace(err) + } + currKey = skip.currKey + } + return mvcc.db.Write(batch, nil) +} + // Close calls leveldb's Close to free resources. func (mvcc *MVCCLevelDB) Close() error { return mvcc.db.Close() diff --git a/store/tikv/gcworker/gc_worker.go b/store/tikv/gcworker/gc_worker.go index 6e66c2f482..1cc6f37188 100644 --- a/store/tikv/gcworker/gc_worker.go +++ b/store/tikv/gcworker/gc_worker.go @@ -504,7 +504,8 @@ func (w *GCWorker) resolveLocks(ctx context.Context, safePoint uint64) error { for i := range locksInfo { locks[i] = tikv.NewLock(locksInfo[i]) } - ok, err1 := w.store.GetLockResolver().ResolveLocks(bo, locks) + + ok, err1 := w.store.GetLockResolver().BatchResolveLocks(bo, locks, loc.Region) if err1 != nil { return errors.Trace(err1) } diff --git a/store/tikv/lock_resolver.go b/store/tikv/lock_resolver.go index 8c21f536a8..62710fd544 100644 --- a/store/tikv/lock_resolver.go +++ b/store/tikv/lock_resolver.go @@ -17,6 +17,7 @@ import ( "container/list" "fmt" "sync" + "time" "github.com/juju/errors" "github.com/pingcap/kvproto/pkg/kvrpcpb" @@ -148,6 +149,88 @@ func (lr *LockResolver) getResolved(txnID uint64) (TxnStatus, bool) { return s, ok } +// BatchResolveLocks resolve locks in a batch +func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc RegionVerID) (bool, error) { + if len(locks) == 0 { + return true, nil + } + + metrics.TiKVLockResolverCounter.WithLabelValues("batch_resolve").Inc() + + var expiredLocks []*Lock + for _, l := range locks { + if lr.store.GetOracle().IsExpired(l.TxnID, l.TTL) { + metrics.TiKVLockResolverCounter.WithLabelValues("expired").Inc() + expiredLocks = append(expiredLocks, l) + } else { + metrics.TiKVLockResolverCounter.WithLabelValues("not_expired").Inc() + } + } + if len(expiredLocks) != len(locks) { + log.Errorf("BatchResolveLocks: get %d Locks, but only %d are expired, maybe safe point is wrong!", len(locks), len(expiredLocks)) + return false, nil + } + + startTime := time.Now() + txnInfos := make(map[uint64]uint64) + for _, l := range expiredLocks { + if _, ok := txnInfos[l.TxnID]; ok { + continue + } + + status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary) + if err != nil { + return false, errors.Trace(err) + } + txnInfos[l.TxnID] = uint64(status) + } + log.Infof("BatchResolveLocks: it took %v to lookup %v txn status", time.Since(startTime), len(txnInfos)) + + var listTxnInfos []*kvrpcpb.TxnInfo + for txnID, status := range txnInfos { + listTxnInfos = append(listTxnInfos, &kvrpcpb.TxnInfo{ + Txn: txnID, + Status: status, + }) + } + + req := &tikvrpc.Request{ + Type: tikvrpc.CmdResolveLock, + ResolveLock: &kvrpcpb.ResolveLockRequest{ + TxnInfos: listTxnInfos, + }, + } + startTime = time.Now() + resp, err := lr.store.SendReq(bo, req, loc, readTimeoutShort) + if err != nil { + return false, errors.Trace(err) + } + + regionErr, err := resp.GetRegionError() + if err != nil { + return false, errors.Trace(err) + } + + if regionErr != nil { + err = bo.Backoff(BoRegionMiss, errors.New(regionErr.String())) + if err != nil { + return false, errors.Trace(err) + } + return false, nil + } + + cmdResp := resp.ResolveLock + if cmdResp == nil { + return false, errors.Trace(ErrBodyMissing) + } + if keyErr := cmdResp.GetError(); keyErr != nil { + return false, errors.Errorf("unexpected resolve err: %s", keyErr) + } + + log.Infof("BatchResolveLocks: it took %v to resolve %v locks in a batch.", time.Since(startTime), len(expiredLocks)) + return true, nil +} + // ResolveLocks tries to resolve Locks. The resolving process is in 3 steps: // 1) Use the `lockTTL` to pick up all expired locks. Only locks that are too // old are considered orphan locks and will be handled later. If all locks