store/tikv: resolve locks in a batch (#5750)

This commit is contained in:
XuWentao
2018-02-26 16:33:29 +08:00
committed by zhangjinpeng1987
parent cd63332ebf
commit aade9a45f2
5 changed files with 194 additions and 1 deletions

View File

@ -184,6 +184,10 @@ func (s *testMockTiKVSuite) mustResolveLock(c *C, startTS, commitTS uint64) {
c.Assert(s.store.ResolveLock(nil, nil, startTS, commitTS), IsNil)
}
func (s *testMockTiKVSuite) mustBatchResolveLock(c *C, txnInfos map[uint64]uint64) {
c.Assert(s.store.BatchResolveLock(nil, nil, txnInfos), IsNil)
}
func (s *testMockTiKVSuite) TestGet(c *C) {
s.mustGetNone(c, "x", 10)
s.mustPutOK(c, "x", "x", 5, 10)
@ -392,6 +396,34 @@ func (s *testMockTiKVSuite) TestResolveLock(c *C) {
s.mustScanLock(c, 30, nil)
}
func (s *testMockTiKVSuite) TestBatchResolveLock(c *C) {
s.mustPrewriteOK(c, putMutations("p1", "v11", "s1", "v11"), "p1", 11)
s.mustPrewriteOK(c, putMutations("p2", "v12", "s2", "v12"), "p2", 12)
s.mustPrewriteOK(c, putMutations("p3", "v13"), "p3", 13)
s.mustPrewriteOK(c, putMutations("p4", "v14", "s3", "v14", "s4", "v14"), "p4", 14)
s.mustPrewriteOK(c, putMutations("p5", "v15", "s5", "v15"), "p5", 15)
txnInfos := map[uint64]uint64{
11: 0,
12: 22,
13: 0,
14: 24,
}
s.mustBatchResolveLock(c, txnInfos)
s.mustGetNone(c, "p1", 20)
s.mustGetNone(c, "p3", 30)
s.mustGetOK(c, "p2", 30, "v12")
s.mustGetOK(c, "s4", 30, "v14")
s.mustScanLock(c, 30, []*kvrpcpb.LockInfo{
lock("p5", "p5", 15),
lock("s5", "p5", 15),
})
txnInfos = map[uint64]uint64{
15: 0,
}
s.mustBatchResolveLock(c, txnInfos)
s.mustScanLock(c, 30, nil)
}
func (s *testMockTiKVSuite) TestRollbackAndWriteConflict(c *C) {
s.mustPutOK(c, "test", "test", 1, 3)

View File

@ -426,6 +426,7 @@ type MVCCStore interface {
Cleanup(key []byte, startTS uint64) error
ScanLock(startKey, endKey []byte, maxTS uint64) ([]*kvrpcpb.LockInfo, error)
ResolveLock(startKey, endKey []byte, startTS, commitTS uint64) error
BatchResolveLock(startKey, endKey []byte, txnInfos map[uint64]uint64) error
}
// RawKV is a key-value storage. MVCCStore can be implemented upon it with timestamp encoded into key.
@ -707,6 +708,41 @@ func (s *MvccStore) ResolveLock(startKey, endKey []byte, startTS, commitTS uint6
return nil
}
// BatchResolveLock resolves all orphan locks belong to a transaction.
func (s *MvccStore) BatchResolveLock(startKey, endKey []byte, txnInfos map[uint64]uint64) error {
s.Lock()
defer s.Unlock()
var ents []*mvccEntry
var err error
iterator := func(item btree.Item) bool {
ent := item.(*mvccEntry)
if !regionContains(startKey, endKey, ent.key) {
return false
}
if ent.lock != nil {
if commitTS, ok := txnInfos[ent.lock.startTS]; ok {
if commitTS > 0 {
err = ent.Commit(ent.lock.startTS, commitTS)
} else {
err = ent.Rollback(ent.lock.startTS)
}
if err != nil {
return false
}
ents = append(ents, ent)
}
}
return true
}
s.tree.AscendGreaterOrEqual(newEntry(startKey), iterator)
if err != nil {
return errors.Trace(err)
}
s.submit(ents...)
return nil
}
// RawGet queries value with the key.
func (s *MvccStore) RawGet(key []byte) []byte {
s.RLock()

View File

@ -823,6 +823,47 @@ func (mvcc *MVCCLevelDB) ResolveLock(startKey, endKey []byte, startTS, commitTS
return mvcc.db.Write(batch, nil)
}
// BatchResolveLock implements the MVCCStore interface.
func (mvcc *MVCCLevelDB) BatchResolveLock(startKey, endKey []byte, txnInfos map[uint64]uint64) error {
mvcc.mu.Lock()
defer mvcc.mu.Unlock()
iter, currKey, err := newScanIterator(mvcc.db, startKey, endKey)
defer iter.Release()
if err != nil {
return errors.Trace(err)
}
batch := &leveldb.Batch{}
for iter.Valid() {
dec := lockDecoder{expectKey: currKey}
ok, err := dec.Decode(iter)
if err != nil {
return errors.Trace(err)
}
if ok {
if commitTS, ok := txnInfos[dec.lock.startTS]; ok {
if commitTS > 0 {
err = commitLock(batch, dec.lock, currKey, dec.lock.startTS, commitTS)
} else {
err = rollbackLock(batch, dec.lock, currKey, dec.lock.startTS)
}
if err != nil {
return errors.Trace(err)
}
}
}
skip := skipDecoder{currKey: currKey}
_, err = skip.Decode(iter)
if err != nil {
return errors.Trace(err)
}
currKey = skip.currKey
}
return mvcc.db.Write(batch, nil)
}
// Close calls leveldb's Close to free resources.
func (mvcc *MVCCLevelDB) Close() error {
return mvcc.db.Close()

View File

@ -504,7 +504,8 @@ func (w *GCWorker) resolveLocks(ctx context.Context, safePoint uint64) error {
for i := range locksInfo {
locks[i] = tikv.NewLock(locksInfo[i])
}
ok, err1 := w.store.GetLockResolver().ResolveLocks(bo, locks)
ok, err1 := w.store.GetLockResolver().BatchResolveLocks(bo, locks, loc.Region)
if err1 != nil {
return errors.Trace(err1)
}

View File

@ -17,6 +17,7 @@ import (
"container/list"
"fmt"
"sync"
"time"
"github.com/juju/errors"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
@ -148,6 +149,88 @@ func (lr *LockResolver) getResolved(txnID uint64) (TxnStatus, bool) {
return s, ok
}
// BatchResolveLocks resolve locks in a batch
func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc RegionVerID) (bool, error) {
if len(locks) == 0 {
return true, nil
}
metrics.TiKVLockResolverCounter.WithLabelValues("batch_resolve").Inc()
var expiredLocks []*Lock
for _, l := range locks {
if lr.store.GetOracle().IsExpired(l.TxnID, l.TTL) {
metrics.TiKVLockResolverCounter.WithLabelValues("expired").Inc()
expiredLocks = append(expiredLocks, l)
} else {
metrics.TiKVLockResolverCounter.WithLabelValues("not_expired").Inc()
}
}
if len(expiredLocks) != len(locks) {
log.Errorf("BatchResolveLocks: get %d Locks, but only %d are expired, maybe safe point is wrong!", len(locks), len(expiredLocks))
return false, nil
}
startTime := time.Now()
txnInfos := make(map[uint64]uint64)
for _, l := range expiredLocks {
if _, ok := txnInfos[l.TxnID]; ok {
continue
}
status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary)
if err != nil {
return false, errors.Trace(err)
}
txnInfos[l.TxnID] = uint64(status)
}
log.Infof("BatchResolveLocks: it took %v to lookup %v txn status", time.Since(startTime), len(txnInfos))
var listTxnInfos []*kvrpcpb.TxnInfo
for txnID, status := range txnInfos {
listTxnInfos = append(listTxnInfos, &kvrpcpb.TxnInfo{
Txn: txnID,
Status: status,
})
}
req := &tikvrpc.Request{
Type: tikvrpc.CmdResolveLock,
ResolveLock: &kvrpcpb.ResolveLockRequest{
TxnInfos: listTxnInfos,
},
}
startTime = time.Now()
resp, err := lr.store.SendReq(bo, req, loc, readTimeoutShort)
if err != nil {
return false, errors.Trace(err)
}
regionErr, err := resp.GetRegionError()
if err != nil {
return false, errors.Trace(err)
}
if regionErr != nil {
err = bo.Backoff(BoRegionMiss, errors.New(regionErr.String()))
if err != nil {
return false, errors.Trace(err)
}
return false, nil
}
cmdResp := resp.ResolveLock
if cmdResp == nil {
return false, errors.Trace(ErrBodyMissing)
}
if keyErr := cmdResp.GetError(); keyErr != nil {
return false, errors.Errorf("unexpected resolve err: %s", keyErr)
}
log.Infof("BatchResolveLocks: it took %v to resolve %v locks in a batch.", time.Since(startTime), len(expiredLocks))
return true, nil
}
// ResolveLocks tries to resolve Locks. The resolving process is in 3 steps:
// 1) Use the `lockTTL` to pick up all expired locks. Only locks that are too
// old are considered orphan locks and will be handled later. If all locks