diff --git a/go.mod b/go.mod index d855de8222..ac464b952e 100644 --- a/go.mod +++ b/go.mod @@ -36,7 +36,7 @@ require ( github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c github.com/pingcap/fn v0.0.0-20191016082858-07623b84a47d github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e - github.com/pingcap/kvproto v0.0.0-20191104103048-40f562012fb1 + github.com/pingcap/kvproto v0.0.0-20191118050206-47672e7eabc0 github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9 github.com/pingcap/parser v0.0.0-20191112053614-3b43b46331d5 github.com/pingcap/pd v1.1.0-beta.0.20190923032047-5c648dc365e0 diff --git a/go.sum b/go.sum index 51470197e9..6e55be6ac3 100644 --- a/go.sum +++ b/go.sum @@ -180,8 +180,13 @@ github.com/pingcap/fn v0.0.0-20191016082858-07623b84a47d/go.mod h1:fMRU1BA1y+r89 github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e h1:P73/4dPCL96rGrobssy1nVy2VaVpNCuLpCbr+FEaTA8= github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= github.com/pingcap/kvproto v0.0.0-20190822090350-11ea838aedf7/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY= -github.com/pingcap/kvproto v0.0.0-20191104103048-40f562012fb1 h1:J5oimSv+0emw5e/D1ZX/zh2WcMv0pOVT9QKruXfvJbg= -github.com/pingcap/kvproto v0.0.0-20191104103048-40f562012fb1/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= +github.com/pingcap/kvproto v0.0.0-20191113075618-7ce83b774d70 h1:l9VcGUPRHvmM7mkFHo4JqxZeCvioRuL1/4tFUQcs6jQ= +github.com/pingcap/kvproto v0.0.0-20191113075618-7ce83b774d70/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= +github.com/pingcap/kvproto v0.0.0-20191113115126-45e0702fff1e/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= +github.com/pingcap/kvproto v0.0.0-20191118030148-ec389ef1b41f h1:CJ1IdT7bPbIvyq2Of9VhC/fhEGh6+0ItdT1dPBv7x7I= +github.com/pingcap/kvproto v0.0.0-20191118030148-ec389ef1b41f/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= +github.com/pingcap/kvproto v0.0.0-20191118050206-47672e7eabc0 h1:CHOC95Ct4abJ9EdmWqzpUxV+bgjB4lOxd3AFxqgoyzQ= +github.com/pingcap/kvproto v0.0.0-20191118050206-47672e7eabc0/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9 h1:AJD9pZYm72vMgPcQDww9rkZ1DnWfl0pXV3BOWlkYIjA= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= diff --git a/store/mockstore/mocktikv/mock_tikv_test.go b/store/mockstore/mocktikv/mock_tikv_test.go index fa07665438..7b1904e531 100644 --- a/store/mockstore/mocktikv/mock_tikv_test.go +++ b/store/mockstore/mocktikv/mock_tikv_test.go @@ -661,40 +661,52 @@ func (s *testMVCCLevelDB) TestErrors(c *C) { } func (s *testMVCCLevelDB) TestCheckTxnStatus(c *C) { - s.mustPrewriteWithTTLOK(c, putMutations("pk", "val"), "pk", 5, 666) + startTS := uint64(5 << 18) + s.mustPrewriteWithTTLOK(c, putMutations("pk", "val"), "pk", startTS, 666) - ttl, commitTS, err := s.store.CheckTxnStatus([]byte("pk"), 5, 0, 666, false) + ttl, commitTS, action, err := s.store.CheckTxnStatus([]byte("pk"), startTS, startTS+100, 666, false) c.Assert(err, IsNil) c.Assert(ttl, Equals, uint64(666)) c.Assert(commitTS, Equals, uint64(0)) + c.Assert(action, Equals, kvrpcpb.Action_MinCommitTSPushed) - s.mustCommitOK(c, [][]byte{[]byte("pk")}, 5, 30) + s.mustCommitOK(c, [][]byte{[]byte("pk")}, startTS, startTS+101) - ttl, commitTS, err = s.store.CheckTxnStatus([]byte("pk"), 5, 0, 666, false) + ttl, commitTS, _, err = s.store.CheckTxnStatus([]byte("pk"), startTS, 0, 666, false) c.Assert(err, IsNil) c.Assert(ttl, Equals, uint64(0)) - c.Assert(commitTS, Equals, uint64(30)) + c.Assert(commitTS, Equals, uint64(startTS+101)) - s.mustPrewriteWithTTLOK(c, putMutations("pk1", "val"), "pk1", 5, 666) - s.mustRollbackOK(c, [][]byte{[]byte("pk1")}, 5) + s.mustPrewriteWithTTLOK(c, putMutations("pk1", "val"), "pk1", startTS, 666) + s.mustRollbackOK(c, [][]byte{[]byte("pk1")}, startTS) - ttl, commitTS, err = s.store.CheckTxnStatus([]byte("pk1"), 5, 0, 666, false) + ttl, commitTS, action, err = s.store.CheckTxnStatus([]byte("pk1"), startTS, 0, 666, false) c.Assert(err, IsNil) c.Assert(ttl, Equals, uint64(0)) c.Assert(commitTS, Equals, uint64(0)) + c.Assert(action, Equals, kvrpcpb.Action_NoAction) + + s.mustPrewriteWithTTLOK(c, putMutations("pk2", "val"), "pk2", startTS, 666) + currentTS := uint64(777 << 18) + ttl, commitTS, action, err = s.store.CheckTxnStatus([]byte("pk2"), startTS, 0, currentTS, false) + c.Assert(err, IsNil) + c.Assert(ttl, Equals, uint64(0)) + c.Assert(commitTS, Equals, uint64(0)) + c.Assert(action, Equals, kvrpcpb.Action_TTLExpireRollback) // Cover the TxnNotFound case. - _, _, err = s.store.CheckTxnStatus([]byte("txnNotFound"), 5, 0, 666, false) + _, _, _, err = s.store.CheckTxnStatus([]byte("txnNotFound"), 5, 0, 666, false) c.Assert(err, NotNil) notFound, ok := errors.Cause(err).(*ErrTxnNotFound) c.Assert(ok, IsTrue) c.Assert(notFound.StartTs, Equals, uint64(5)) c.Assert(string(notFound.PrimaryKey), Equals, "txnNotFound") - ttl, commitTS, err = s.store.CheckTxnStatus([]byte("txnNotFound"), 5, 0, 666, true) + ttl, commitTS, action, err = s.store.CheckTxnStatus([]byte("txnNotFound"), 5, 0, 666, true) c.Assert(err, IsNil) c.Assert(ttl, Equals, uint64(0)) c.Assert(commitTS, Equals, uint64(0)) + c.Assert(action, Equals, kvrpcpb.Action_LockNotExistRollback) // Check the rollback tombstone blocks this prewrite which comes with a smaller startTS. req := &kvrpcpb.PrewriteRequest{ @@ -710,7 +722,7 @@ func (s *testMVCCLevelDB) TestCheckTxnStatus(c *C) { func (s *testMVCCLevelDB) TestRejectCommitTS(c *C) { s.mustPrewriteOK(c, putMutations("x", "A"), "x", 5) // Push the minCommitTS - _, _, err := s.store.CheckTxnStatus([]byte("x"), 5, 100, 100, false) + _, _, _, err := s.store.CheckTxnStatus([]byte("x"), 5, 100, 100, false) c.Assert(err, IsNil) err = s.store.Commit([][]byte{[]byte("x")}, 5, 10) e, ok := errors.Cause(err).(*ErrCommitTSExpired) diff --git a/store/mockstore/mocktikv/mvcc.go b/store/mockstore/mocktikv/mvcc.go index d5067adbdc..759f7a7f12 100644 --- a/store/mockstore/mocktikv/mvcc.go +++ b/store/mockstore/mocktikv/mvcc.go @@ -268,7 +268,7 @@ type MVCCStore interface { BatchResolveLock(startKey, endKey []byte, txnInfos map[uint64]uint64) error GC(startKey, endKey []byte, safePoint uint64) error DeleteRange(startKey, endKey []byte) error - CheckTxnStatus(primaryKey []byte, lockTS uint64, startTS, currentTS uint64, rollbackIfNotFound bool) (ttl, commitTS uint64, err error) + CheckTxnStatus(primaryKey []byte, lockTS uint64, startTS, currentTS uint64, rollbackIfNotFound bool) (uint64, uint64, kvrpcpb.Action, error) Close() error } diff --git a/store/mockstore/mocktikv/mvcc_leveldb.go b/store/mockstore/mocktikv/mvcc_leveldb.go index 7f8349dd57..2e6dd599b9 100644 --- a/store/mockstore/mocktikv/mvcc_leveldb.go +++ b/store/mockstore/mocktikv/mvcc_leveldb.go @@ -1032,10 +1032,13 @@ func (mvcc *MVCCLevelDB) Cleanup(key []byte, startTS, currentTS uint64) error { // primaryKey + lockTS together could locate the primary lock. // callerStartTS is the start ts of reader transaction. // currentTS is the current ts, but it may be inaccurate. Just use it to check TTL. -func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS, currentTS uint64, rollbackIfNotExist bool) (uint64, uint64, error) { +func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS, currentTS uint64, + rollbackIfNotExist bool) (ttl uint64, commitTS uint64, action kvrpcpb.Action, err error) { mvcc.mu.Lock() defer mvcc.mu.Unlock() + action = kvrpcpb.Action_NoAction + startKey := mvccEncode(primaryKey, lockVer) iter := newIterator(mvcc.db, &util.Range{ Start: startKey, @@ -1046,9 +1049,11 @@ func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS dec := lockDecoder{ expectKey: primaryKey, } - ok, err := dec.Decode(iter) + var ok bool + ok, err = dec.Decode(iter) if err != nil { - return 0, 0, errors.Trace(err) + err = errors.Trace(err) + return } // If current transaction's lock exists. if ok && dec.lock.startTS == lockTS { @@ -1058,17 +1063,20 @@ func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS // If the lock has already outdated, clean up it. if uint64(oracle.ExtractPhysical(lock.startTS))+lock.ttl < uint64(oracle.ExtractPhysical(currentTS)) { if err = rollbackLock(batch, primaryKey, lockTS); err != nil { - return 0, 0, errors.Trace(err) + err = errors.Trace(err) + return } if err = mvcc.db.Write(batch, nil); err != nil { - return 0, 0, errors.Trace(err) + err = errors.Trace(err) + return } - return 0, 0, nil + return 0, 0, kvrpcpb.Action_TTLExpireRollback, nil } // If this is a large transaction and the lock is active, push forward the minCommitTS. - // lock.minCommitTS == 0 may be a secondary lock, or not a large transaction. + // lock.minCommitTS == 0 may be a secondary lock, or not a large transaction (old version TiDB). if lock.minCommitTS > 0 { + action = kvrpcpb.Action_MinCommitTSPushed // We *must* guarantee the invariance lock.minCommitTS >= callerStartTS + 1 if lock.minCommitTS < callerStartTS+1 { lock.minCommitTS = callerStartTS + 1 @@ -1081,33 +1089,36 @@ func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS } writeKey := mvccEncode(primaryKey, lockVer) - writeValue, err := lock.MarshalBinary() - if err != nil { - return 0, 0, errors.Trace(err) + writeValue, err1 := lock.MarshalBinary() + if err1 != nil { + err = errors.Trace(err1) + return } batch.Put(writeKey, writeValue) - if err = mvcc.db.Write(batch, nil); err != nil { - return 0, 0, errors.Trace(err) + if err1 = mvcc.db.Write(batch, nil); err1 != nil { + err = errors.Trace(err1) + return } } } - return lock.ttl, 0, nil + return lock.ttl, 0, action, nil } // If current transaction's lock does not exist. // If the commit info of the current transaction exists. - c, ok, err := getTxnCommitInfo(iter, primaryKey, lockTS) - if err != nil { - return 0, 0, errors.Trace(err) + c, ok, err1 := getTxnCommitInfo(iter, primaryKey, lockTS) + if err1 != nil { + err = errors.Trace(err1) + return } if ok { // If current transaction is already committed. if c.valueType != typeRollback { - return 0, c.commitTS, nil + return 0, c.commitTS, action, nil } // If current transaction is already rollback. - return 0, 0, nil + return 0, 0, kvrpcpb.Action_NoAction, nil } } @@ -1120,16 +1131,18 @@ func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS if rollbackIfNotExist { batch := &leveldb.Batch{} - if err := rollbackLock(batch, primaryKey, lockTS); err != nil { - return 0, 0, errors.Trace(err) + if err1 := rollbackLock(batch, primaryKey, lockTS); err1 != nil { + err = errors.Trace(err1) + return } - if err := mvcc.db.Write(batch, nil); err != nil { - return 0, 0, errors.Trace(err) + if err1 := mvcc.db.Write(batch, nil); err1 != nil { + err = errors.Trace(err1) + return } - return 0, 0, nil + return 0, 0, kvrpcpb.Action_LockNotExistRollback, nil } - return 0, 0, &ErrTxnNotFound{kvrpcpb.TxnNotFound{ + return 0, 0, action, &ErrTxnNotFound{kvrpcpb.TxnNotFound{ StartTs: lockTS, PrimaryKey: primaryKey, }} diff --git a/store/mockstore/mocktikv/rpc.go b/store/mockstore/mocktikv/rpc.go index c87fb76b23..f10fef665c 100644 --- a/store/mockstore/mocktikv/rpc.go +++ b/store/mockstore/mocktikv/rpc.go @@ -387,11 +387,11 @@ func (h *rpcHandler) handleKvCheckTxnStatus(req *kvrpcpb.CheckTxnStatusRequest) panic("KvCheckTxnStatus: key not in region") } var resp kvrpcpb.CheckTxnStatusResponse - ttl, commitTS, err := h.mvccStore.CheckTxnStatus(req.GetPrimaryKey(), req.GetLockTs(), req.GetCallerStartTs(), req.GetCurrentTs(), req.GetRollbackIfNotExist()) + ttl, commitTS, action, err := h.mvccStore.CheckTxnStatus(req.GetPrimaryKey(), req.GetLockTs(), req.GetCallerStartTs(), req.GetCurrentTs(), req.GetRollbackIfNotExist()) if err != nil { resp.Error = convertToKeyError(err) } else { - resp.LockTtl, resp.CommitVersion = ttl, commitTS + resp.LockTtl, resp.CommitVersion, resp.Action = ttl, commitTS, action } return &resp } diff --git a/store/tikv/lock_resolver.go b/store/tikv/lock_resolver.go index 27cfbd398b..dc09d0c5bc 100644 --- a/store/tikv/lock_resolver.go +++ b/store/tikv/lock_resolver.go @@ -109,6 +109,7 @@ func NewLockResolver(etcdAddrs []string, security config.Security) (*LockResolve type TxnStatus struct { ttl uint64 commitTS uint64 + action kvrpcpb.Action } // IsCommitted returns true if the txn's final status is Commit. @@ -397,7 +398,7 @@ func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock, callerStart } if l.LockType == kvrpcpb.Op_PessimisticLock { - return TxnStatus{l.TTL, 0}, nil + return TxnStatus{ttl: l.TTL}, nil } // Handle txnNotFound error. @@ -482,6 +483,7 @@ func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte logutil.BgLogger().Error("getTxnStatus error", zap.Error(err)) return status, err } + status.action = cmdResp.Action if cmdResp.LockTtl != 0 { status.ttl = cmdResp.LockTtl } else { diff --git a/store/tikv/lock_test.go b/store/tikv/lock_test.go index 245695272b..c998f860ad 100644 --- a/store/tikv/lock_test.go +++ b/store/tikv/lock_test.go @@ -201,7 +201,7 @@ func (s *testLockSuite) TestGetTxnStatus(c *C) { status, err = s.store.lockResolver.GetTxnStatus(startTS, startTS, []byte("a")) c.Assert(err, IsNil) c.Assert(status.IsCommitted(), IsFalse) - c.Assert(status.ttl, Greater, uint64(0)) + c.Assert(status.ttl, Greater, uint64(0), Commentf("action:%s", status.action)) } func (s *testLockSuite) TestCheckTxnStatusTTL(c *C) { @@ -234,6 +234,7 @@ func (s *testLockSuite) TestCheckTxnStatusTTL(c *C) { c.Assert(err, IsNil) c.Assert(status.ttl, Equals, uint64(0)) c.Assert(status.commitTS, Equals, uint64(0)) + c.Assert(status.action, Equals, kvrpcpb.Action_NoAction) // Check a committed txn. startTS, commitTS := s.putKV(c, []byte("a"), []byte("a")) @@ -279,6 +280,8 @@ func (s *testLockSuite) TestCheckTxnStatus(c *C) { oracle := s.store.GetOracle() currentTS, err := oracle.GetTimestamp(context.Background()) c.Assert(err, IsNil) + c.Assert(currentTS, Greater, txn.StartTS()) + bo := NewBackoffer(context.Background(), PrewriteMaxBackoff) resolver := newLockResolver(s.store) // Call getTxnStatus to check the lock status. @@ -287,6 +290,9 @@ func (s *testLockSuite) TestCheckTxnStatus(c *C) { c.Assert(status.IsCommitted(), IsFalse) c.Assert(status.ttl, Greater, uint64(0)) c.Assert(status.CommitTS(), Equals, uint64(0)) + // TODO: It should be Action_MinCommitTSPushed if minCommitTS is set in the Prewrite request. + // Update here to kvrpcpb.Action_MinCommitTSPushed in the next PR. + c.Assert(status.action, Equals, kvrpcpb.Action_NoAction) // Test the ResolveLocks API lock := s.mustGetLock(c, []byte("second")) @@ -303,10 +309,11 @@ func (s *testLockSuite) TestCheckTxnStatus(c *C) { // Then call getTxnStatus again and check the lock status. currentTS, err = oracle.GetTimestamp(context.Background()) c.Assert(err, IsNil) - status, err = resolver.getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS, true) + status, err = newLockResolver(s.store).getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, 0, true) c.Assert(err, IsNil) c.Assert(status.ttl, Equals, uint64(0)) c.Assert(status.commitTS, Equals, uint64(0)) + c.Assert(status.action, Equals, kvrpcpb.Action_NoAction) // Call getTxnStatus on a committed transaction. startTS, commitTS := s.putKV(c, []byte("a"), []byte("a")) @@ -366,7 +373,7 @@ func (s *testLockSuite) TestCheckTxnStatusNoWait(c *C) { c.Assert(err, IsNil) lock = &Lock{ Key: []byte("second"), - Primary: []byte("key"), + Primary: []byte("key_not_exist"), TxnID: startTS, TTL: 1000, } @@ -374,6 +381,7 @@ func (s *testLockSuite) TestCheckTxnStatusNoWait(c *C) { c.Assert(err, IsNil) c.Assert(status.ttl, Equals, uint64(0)) c.Assert(status.commitTS, Equals, uint64(0)) + c.Assert(status.action, Equals, kvrpcpb.Action_LockNotExistRollback) } func (s *testLockSuite) prewriteTxn(c *C, txn *tikvTxn) {