From 5ede18f10eedfe2e3690d7728dec3ffa4b0af2d5 Mon Sep 17 00:00:00 2001 From: Lei Zhao Date: Fri, 24 Apr 2020 23:42:52 +0800 Subject: [PATCH] store/tikv: fix TestZeroMinCommitTS (#16811) --- store/mockstore/mocktikv/mock_tikv_test.go | 6 ++++ store/mockstore/mocktikv/mvcc_leveldb.go | 12 ++++++-- store/tikv/lock_test.go | 2 +- store/tikv/snapshot_test.go | 34 ++++++++++++++++++++++ store/tikv/store_test.go | 6 ++-- 5 files changed, 54 insertions(+), 6 deletions(-) diff --git a/store/mockstore/mocktikv/mock_tikv_test.go b/store/mockstore/mocktikv/mock_tikv_test.go index de06798c74..5444c8ddcc 100644 --- a/store/mockstore/mocktikv/mock_tikv_test.go +++ b/store/mockstore/mocktikv/mock_tikv_test.go @@ -695,6 +695,12 @@ func (s *testMVCCLevelDB) TestCheckTxnStatus(c *C) { c.Assert(commitTS, Equals, uint64(0)) c.Assert(action, Equals, kvrpcpb.Action_MinCommitTSPushed) + // MaxUint64 as callerStartTS shouldn't update minCommitTS but return Action_MinCommitTSPushed. + ttl, commitTS, action, err = s.store.CheckTxnStatus([]byte("pk"), startTS, math.MaxUint64, 666, false) + c.Assert(err, IsNil) + c.Assert(ttl, Equals, uint64(666)) + c.Assert(commitTS, Equals, uint64(0)) + c.Assert(action, Equals, kvrpcpb.Action_MinCommitTSPushed) s.mustCommitOK(c, [][]byte{[]byte("pk")}, startTS, startTS+101) ttl, commitTS, _, err = s.store.CheckTxnStatus([]byte("pk"), startTS, 0, 666, false) diff --git a/store/mockstore/mocktikv/mvcc_leveldb.go b/store/mockstore/mocktikv/mvcc_leveldb.go index ffe30f9906..e1f191009f 100644 --- a/store/mockstore/mocktikv/mvcc_leveldb.go +++ b/store/mockstore/mocktikv/mvcc_leveldb.go @@ -1162,9 +1162,15 @@ func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS return 0, 0, kvrpcpb.Action_TTLExpireRollback, nil } - // If this is a large transaction and the lock is active, push forward the minCommitTS. - // lock.minCommitTS == 0 may be a secondary lock, or not a large transaction (old version TiDB). - if lock.minCommitTS > 0 { + // If the caller_start_ts is MaxUint64, it's a point get in the autocommit transaction. + // Even though the MinCommitTs is not pushed, the point get can ingore the lock + // next time because it's not committed. So we pretend it has been pushed. + if callerStartTS == math.MaxUint64 { + action = kvrpcpb.Action_MinCommitTSPushed + + // If this is a large transaction and the lock is active, push forward the minCommitTS. + // lock.minCommitTS == 0 may be a secondary lock, or not a large transaction (old version TiDB). + } else if lock.minCommitTS > 0 { action = kvrpcpb.Action_MinCommitTSPushed // We *must* guarantee the invariance lock.minCommitTS >= callerStartTS + 1 if lock.minCommitTS < callerStartTS+1 { diff --git a/store/tikv/lock_test.go b/store/tikv/lock_test.go index 3fce04de2e..8160aa652a 100644 --- a/store/tikv/lock_test.go +++ b/store/tikv/lock_test.go @@ -511,7 +511,7 @@ func (s *testLockSuite) TestZeroMinCommitTS(c *C) { expire, pushed, err = newLockResolver(s.store).ResolveLocks(bo, math.MaxUint64, []*Lock{lock}) c.Assert(err, IsNil) - c.Assert(pushed, HasLen, 0) + c.Assert(pushed, HasLen, 1) c.Assert(expire, Greater, int64(0)) // Clean up this test. diff --git a/store/tikv/snapshot_test.go b/store/tikv/snapshot_test.go index 67514e84c1..b2273f86f3 100644 --- a/store/tikv/snapshot_test.go +++ b/store/tikv/snapshot_test.go @@ -240,3 +240,37 @@ func (s *testSnapshotSuite) TestSkipLargeTxnLock(c *C) { c.Assert(status.IsCommitted(), IsTrue) c.Assert(status.CommitTS(), Greater, txn1.StartTS()) } + +func (s *testSnapshotSuite) TestPointGetSkipTxnLock(c *C) { + x := kv.Key("x_key_TestPointGetSkipTxnLock") + y := kv.Key("y_key_TestPointGetSkipTxnLock") + txn := s.beginTxn(c) + c.Assert(txn.Set(x, []byte("x")), IsNil) + c.Assert(txn.Set(y, []byte("y")), IsNil) + ctx := context.Background() + bo := NewBackoffer(ctx, PrewriteMaxBackoff) + committer, err := newTwoPhaseCommitterWithInit(txn, 0) + c.Assert(err, IsNil) + committer.lockTTL = 3000 + c.Assert(committer.prewriteMutations(bo, committer.mutations), IsNil) + + snapshot := newTiKVSnapshot(s.store, kv.MaxVersion, 0) + start := time.Now() + c.Assert(committer.primary(), BytesEquals, []byte(x)) + // Point get secondary key. Shouldn't be blocked by the lock and read old data. + _, err = snapshot.Get(ctx, y) + c.Assert(kv.IsErrNotFound(errors.Trace(err)), IsTrue) + c.Assert(time.Since(start), Less, 500*time.Millisecond) + + // Commit the primary key + committer.commitTS = txn.StartTS() + 1 + committer.commitMutations(bo, committer.mutationsOfKeys([][]byte{committer.primary()})) + + snapshot = newTiKVSnapshot(s.store, kv.MaxVersion, 0) + start = time.Now() + // Point get secondary key. Should read committed data. + value, err := snapshot.Get(ctx, y) + c.Assert(err, IsNil) + c.Assert(value, BytesEquals, []byte("y")) + c.Assert(time.Since(start), Less, 500*time.Millisecond) +} diff --git a/store/tikv/store_test.go b/store/tikv/store_test.go index 8c8f0d58a6..30a0745ebd 100644 --- a/store/tikv/store_test.go +++ b/store/tikv/store_test.go @@ -228,8 +228,10 @@ func (c *checkRequestClient) SendRequest(ctx context.Context, addr string, req * resp, err := c.Client.SendRequest(ctx, addr, req, timeout) if c.priority != req.Priority { if resp.Resp != nil { - (resp.Resp.(*pb.GetResponse)).Error = &pb.KeyError{ - Abort: "request check error", + if getResp, ok := resp.Resp.(*pb.GetResponse); ok { + getResp.Error = &pb.KeyError{ + Abort: "request check error", + } } } }