store/tikv: fix TestZeroMinCommitTS (#16811)
This commit is contained in:
@ -695,6 +695,12 @@ func (s *testMVCCLevelDB) TestCheckTxnStatus(c *C) {
|
||||
c.Assert(commitTS, Equals, uint64(0))
|
||||
c.Assert(action, Equals, kvrpcpb.Action_MinCommitTSPushed)
|
||||
|
||||
// MaxUint64 as callerStartTS shouldn't update minCommitTS but return Action_MinCommitTSPushed.
|
||||
ttl, commitTS, action, err = s.store.CheckTxnStatus([]byte("pk"), startTS, math.MaxUint64, 666, false)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(ttl, Equals, uint64(666))
|
||||
c.Assert(commitTS, Equals, uint64(0))
|
||||
c.Assert(action, Equals, kvrpcpb.Action_MinCommitTSPushed)
|
||||
s.mustCommitOK(c, [][]byte{[]byte("pk")}, startTS, startTS+101)
|
||||
|
||||
ttl, commitTS, _, err = s.store.CheckTxnStatus([]byte("pk"), startTS, 0, 666, false)
|
||||
|
||||
@ -1162,9 +1162,15 @@ func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS
|
||||
return 0, 0, kvrpcpb.Action_TTLExpireRollback, nil
|
||||
}
|
||||
|
||||
// If this is a large transaction and the lock is active, push forward the minCommitTS.
|
||||
// lock.minCommitTS == 0 may be a secondary lock, or not a large transaction (old version TiDB).
|
||||
if lock.minCommitTS > 0 {
|
||||
// If the caller_start_ts is MaxUint64, it's a point get in the autocommit transaction.
|
||||
// Even though the MinCommitTs is not pushed, the point get can ingore the lock
|
||||
// next time because it's not committed. So we pretend it has been pushed.
|
||||
if callerStartTS == math.MaxUint64 {
|
||||
action = kvrpcpb.Action_MinCommitTSPushed
|
||||
|
||||
// If this is a large transaction and the lock is active, push forward the minCommitTS.
|
||||
// lock.minCommitTS == 0 may be a secondary lock, or not a large transaction (old version TiDB).
|
||||
} else if lock.minCommitTS > 0 {
|
||||
action = kvrpcpb.Action_MinCommitTSPushed
|
||||
// We *must* guarantee the invariance lock.minCommitTS >= callerStartTS + 1
|
||||
if lock.minCommitTS < callerStartTS+1 {
|
||||
|
||||
@ -511,7 +511,7 @@ func (s *testLockSuite) TestZeroMinCommitTS(c *C) {
|
||||
|
||||
expire, pushed, err = newLockResolver(s.store).ResolveLocks(bo, math.MaxUint64, []*Lock{lock})
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(pushed, HasLen, 0)
|
||||
c.Assert(pushed, HasLen, 1)
|
||||
c.Assert(expire, Greater, int64(0))
|
||||
|
||||
// Clean up this test.
|
||||
|
||||
@ -240,3 +240,37 @@ func (s *testSnapshotSuite) TestSkipLargeTxnLock(c *C) {
|
||||
c.Assert(status.IsCommitted(), IsTrue)
|
||||
c.Assert(status.CommitTS(), Greater, txn1.StartTS())
|
||||
}
|
||||
|
||||
func (s *testSnapshotSuite) TestPointGetSkipTxnLock(c *C) {
|
||||
x := kv.Key("x_key_TestPointGetSkipTxnLock")
|
||||
y := kv.Key("y_key_TestPointGetSkipTxnLock")
|
||||
txn := s.beginTxn(c)
|
||||
c.Assert(txn.Set(x, []byte("x")), IsNil)
|
||||
c.Assert(txn.Set(y, []byte("y")), IsNil)
|
||||
ctx := context.Background()
|
||||
bo := NewBackoffer(ctx, PrewriteMaxBackoff)
|
||||
committer, err := newTwoPhaseCommitterWithInit(txn, 0)
|
||||
c.Assert(err, IsNil)
|
||||
committer.lockTTL = 3000
|
||||
c.Assert(committer.prewriteMutations(bo, committer.mutations), IsNil)
|
||||
|
||||
snapshot := newTiKVSnapshot(s.store, kv.MaxVersion, 0)
|
||||
start := time.Now()
|
||||
c.Assert(committer.primary(), BytesEquals, []byte(x))
|
||||
// Point get secondary key. Shouldn't be blocked by the lock and read old data.
|
||||
_, err = snapshot.Get(ctx, y)
|
||||
c.Assert(kv.IsErrNotFound(errors.Trace(err)), IsTrue)
|
||||
c.Assert(time.Since(start), Less, 500*time.Millisecond)
|
||||
|
||||
// Commit the primary key
|
||||
committer.commitTS = txn.StartTS() + 1
|
||||
committer.commitMutations(bo, committer.mutationsOfKeys([][]byte{committer.primary()}))
|
||||
|
||||
snapshot = newTiKVSnapshot(s.store, kv.MaxVersion, 0)
|
||||
start = time.Now()
|
||||
// Point get secondary key. Should read committed data.
|
||||
value, err := snapshot.Get(ctx, y)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(value, BytesEquals, []byte("y"))
|
||||
c.Assert(time.Since(start), Less, 500*time.Millisecond)
|
||||
}
|
||||
|
||||
@ -228,8 +228,10 @@ func (c *checkRequestClient) SendRequest(ctx context.Context, addr string, req *
|
||||
resp, err := c.Client.SendRequest(ctx, addr, req, timeout)
|
||||
if c.priority != req.Priority {
|
||||
if resp.Resp != nil {
|
||||
(resp.Resp.(*pb.GetResponse)).Error = &pb.KeyError{
|
||||
Abort: "request check error",
|
||||
if getResp, ok := resp.Resp.(*pb.GetResponse); ok {
|
||||
getResp.Error = &pb.KeyError{
|
||||
Abort: "request check error",
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user