diff --git a/store/driver/txn/txn_driver.go b/store/driver/txn/txn_driver.go index 8992866877..66413e6bde 100644 --- a/store/driver/txn/txn_driver.go +++ b/store/driver/txn/txn_driver.go @@ -132,6 +132,8 @@ func (txn *tikvTxn) SetOption(opt int, val interface{}) { case tikvstore.IsolationLevel: level := getTiKVIsolationLevel(val.(kv.IsoLevel)) txn.KVTxn.GetSnapshot().SetIsolationLevel(level) + case tikvstore.Pessimistic: + txn.SetPessimistic(val.(bool)) default: txn.KVTxn.SetOption(opt, val) } diff --git a/store/tikv/tests/2pc_test.go b/store/tikv/tests/2pc_test.go index 60af78db11..e70131a235 100644 --- a/store/tikv/tests/2pc_test.go +++ b/store/tikv/tests/2pc_test.go @@ -617,7 +617,7 @@ func (s *testCommitterSuite) TestRejectCommitTS(c *C) { func (s *testCommitterSuite) TestPessimisticPrewriteRequest(c *C) { // This test checks that the isPessimisticLock field is set in the request even when no keys are pessimistic lock. txn := s.begin(c) - txn.SetOption(kv.Pessimistic, true) + txn.SetPessimistic(true) err := txn.Set([]byte("t1"), []byte("v1")) c.Assert(err, IsNil) committer, err := txn.NewCommitter(0) @@ -636,7 +636,7 @@ func (s *testCommitterSuite) TestUnsetPrimaryKey(c *C) { c.Assert(txn.Commit(context.Background()), IsNil) txn = s.begin(c) - txn.SetOption(kv.Pessimistic, true) + txn.SetPessimistic(true) _, _ = txn.GetUnionStore().Get(context.TODO(), key) c.Assert(txn.GetMemBuffer().SetWithFlags(key, key, kv.SetPresumeKeyNotExists), IsNil) lockCtx := &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()} @@ -651,7 +651,7 @@ func (s *testCommitterSuite) TestUnsetPrimaryKey(c *C) { func (s *testCommitterSuite) TestPessimisticLockedKeysDedup(c *C) { txn := s.begin(c) - txn.SetOption(kv.Pessimistic, true) + txn.SetPessimistic(true) lockCtx := &kv.LockCtx{ForUpdateTS: 100, WaitStartTime: time.Now()} err := txn.LockKeys(context.Background(), lockCtx, []byte("abc"), []byte("def")) c.Assert(err, IsNil) @@ -664,7 +664,7 @@ func (s *testCommitterSuite) TestPessimisticLockedKeysDedup(c *C) { func (s *testCommitterSuite) TestPessimisticTTL(c *C) { key := []byte("key") txn := s.begin(c) - txn.SetOption(kv.Pessimistic, true) + txn.SetPessimistic(true) time.Sleep(time.Millisecond * 100) lockCtx := &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()} err := txn.LockKeys(context.Background(), lockCtx, key) @@ -710,7 +710,7 @@ func (s *testCommitterSuite) TestPessimisticLockReturnValues(c *C) { c.Assert(txn.Set(key2, key2), IsNil) c.Assert(txn.Commit(context.Background()), IsNil) txn = s.begin(c) - txn.SetOption(kv.Pessimistic, true) + txn.SetPessimistic(true) lockCtx := &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()} lockCtx.ReturnValues = true lockCtx.Values = map[string]kv.ReturnedValue{} @@ -725,7 +725,7 @@ func (s *testCommitterSuite) TestElapsedTTL(c *C) { key := []byte("key") txn := s.begin(c) txn.SetStartTS(oracle.ComposeTS(oracle.GetPhysical(time.Now().Add(time.Second*10)), 1)) - txn.SetOption(kv.Pessimistic, true) + txn.SetPessimistic(true) time.Sleep(time.Millisecond * 100) lockCtx := &kv.LockCtx{ ForUpdateTS: oracle.ComposeTS(oracle.ExtractPhysical(txn.StartTS())+100, 1), @@ -746,7 +746,7 @@ func (s *testCommitterSuite) TestDeleteYourWriteCauseGhostPrimary(c *C) { // insert k1, k2, k3 and delete k1 txn1 := s.begin(c) - txn1.DelOption(kv.Pessimistic) + txn1.SetPessimistic(false) s.store.ClearTxnLatches() txn1.Get(context.Background(), k1) txn1.GetMemBuffer().SetWithFlags(k1, []byte{0}, kv.SetPresumeKeyNotExists) @@ -771,7 +771,7 @@ func (s *testCommitterSuite) TestDeleteYourWriteCauseGhostPrimary(c *C) { // start txn2 to read k3(prewrite success and primary should be committed) txn2 := s.begin(c) - txn2.DelOption(kv.Pessimistic) + txn2.SetPessimistic(false) s.store.ClearTxnLatches() v, err := txn2.Get(context.Background(), k3) c.Assert(err, IsNil) // should resolve lock and read txn1 k3 result instead of rollback it. @@ -788,7 +788,7 @@ func (s *testCommitterSuite) TestDeleteAllYourWrites(c *C) { // insert k1, k2, k3 and delete k1, k2, k3 txn1 := s.begin(c) - txn1.DelOption(kv.Pessimistic) + txn1.SetPessimistic(false) s.store.ClearTxnLatches() txn1.GetMemBuffer().SetWithFlags(k1, []byte{0}, kv.SetPresumeKeyNotExists) txn1.Delete(k1) @@ -808,7 +808,7 @@ func (s *testCommitterSuite) TestDeleteAllYourWritesWithSFU(c *C) { // insert k1, k2, k2 and delete k1 txn1 := s.begin(c) - txn1.DelOption(kv.Pessimistic) + txn1.SetPessimistic(false) s.store.ClearTxnLatches() txn1.GetMemBuffer().SetWithFlags(k1, []byte{0}, kv.SetPresumeKeyNotExists) txn1.Delete(k1) @@ -832,7 +832,7 @@ func (s *testCommitterSuite) TestDeleteAllYourWritesWithSFU(c *C) { <-ac // start txn2 to read k3 txn2 := s.begin(c) - txn2.DelOption(kv.Pessimistic) + txn2.SetPessimistic(false) s.store.ClearTxnLatches() err = txn2.Set(k3, []byte{33}) c.Assert(err, IsNil) @@ -860,7 +860,7 @@ func (s *testCommitterSuite) TestAcquireFalseTimeoutLock(c *C) { k2 := []byte("k2") txn1 := s.begin(c) - txn1.SetOption(kv.Pessimistic, true) + txn1.SetPessimistic(true) // lock the primary key lockCtx := &kv.LockCtx{ForUpdateTS: txn1.StartTS(), WaitStartTime: time.Now()} err := txn1.LockKeys(context.Background(), lockCtx, k1) @@ -875,7 +875,7 @@ func (s *testCommitterSuite) TestAcquireFalseTimeoutLock(c *C) { // wait until secondary key exceeds its own TTL time.Sleep(time.Duration(atomic.LoadUint64(&tikv.ManagedLockTTL)) * time.Millisecond) txn2 := s.begin(c) - txn2.SetOption(kv.Pessimistic, true) + txn2.SetPessimistic(true) // test no wait lockCtx = &kv.LockCtx{ForUpdateTS: txn2.StartTS(), LockWaitTime: tikv.LockNoWait, WaitStartTime: time.Now()} @@ -921,7 +921,7 @@ func (s *testCommitterSuite) TestPkNotFound(c *C) { k3 := []byte("k3") txn1 := s.begin(c) - txn1.SetOption(kv.Pessimistic, true) + txn1.SetPessimistic(true) // lock the primary key. lockCtx := &kv.LockCtx{ForUpdateTS: txn1.StartTS(), WaitStartTime: time.Now()} err := txn1.LockKeys(ctx, lockCtx, k1) @@ -957,7 +957,7 @@ func (s *testCommitterSuite) TestPkNotFound(c *C) { c.Assert(err, IsNil) c.Assert(status.Action(), Equals, kvrpcpb.Action_LockNotExistDoNothing) txn2 := s.begin(c) - txn2.SetOption(kv.Pessimistic, true) + txn2.SetPessimistic(true) lockCtx = &kv.LockCtx{ForUpdateTS: txn2.StartTS(), WaitStartTime: time.Now()} err = txn2.LockKeys(ctx, lockCtx, k2) c.Assert(err, IsNil) @@ -981,7 +981,7 @@ func (s *testCommitterSuite) TestPkNotFound(c *C) { // After disable fail point, the rollbackIfNotExist flag will be set, and the resolve should succeed. In this // case, the returned action of TxnStatus should be LockNotExistDoNothing, and lock on k3 could be resolved. txn3 := s.begin(c) - txn3.SetOption(kv.Pessimistic, true) + txn3.SetPessimistic(true) lockCtx = &kv.LockCtx{ForUpdateTS: txn3.StartTS(), WaitStartTime: time.Now(), LockWaitTime: tikv.LockNoWait} err = txn3.LockKeys(ctx, lockCtx, k3) c.Assert(err, IsNil) @@ -997,7 +997,7 @@ func (s *testCommitterSuite) TestPessimisticLockPrimary(c *C) { k2 := []byte("b") txn1 := s.begin(c) - txn1.SetOption(kv.Pessimistic, true) + txn1.SetPessimistic(true) // txn1 lock k1 lockCtx := &kv.LockCtx{ForUpdateTS: txn1.StartTS(), WaitStartTime: time.Now()} err := txn1.LockKeys(context.Background(), lockCtx, k1) @@ -1008,7 +1008,7 @@ func (s *testCommitterSuite) TestPessimisticLockPrimary(c *C) { doneCh := make(chan error) go func() { txn2 := s.begin(c) - txn2.SetOption(kv.Pessimistic, true) + txn2.SetPessimistic(true) lockCtx2 := &kv.LockCtx{ForUpdateTS: txn2.StartTS(), WaitStartTime: time.Now(), LockWaitTime: 200} waitErr := txn2.LockKeys(context.Background(), lockCtx2, k1, k2) doneCh <- waitErr @@ -1017,7 +1017,7 @@ func (s *testCommitterSuite) TestPessimisticLockPrimary(c *C) { // txn3 should locks k2 successfully using no wait txn3 := s.begin(c) - txn3.SetOption(kv.Pessimistic, true) + txn3.SetPessimistic(true) lockCtx3 := &kv.LockCtx{ForUpdateTS: txn3.StartTS(), WaitStartTime: time.Now(), LockWaitTime: tikv.LockNoWait} c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/txnNotFoundRetTTL", "return"), IsNil) err = txn3.LockKeys(context.Background(), lockCtx3, k2) @@ -1108,7 +1108,7 @@ func (s *testCommitterSuite) TestPushPessimisticLock(c *C) { ctx := context.Background() txn1 := s.begin(c) - txn1.SetOption(kv.Pessimistic, true) + txn1.SetPessimistic(true) lockCtx := &kv.LockCtx{ForUpdateTS: txn1.StartTS(), WaitStartTime: time.Now()} err := txn1.LockKeys(context.Background(), lockCtx, k1, k2) c.Assert(err, IsNil) @@ -1160,7 +1160,7 @@ func (s *testCommitterSuite) TestResolveMixed(c *C) { // make the optimistic and pessimistic lock left with primary lock not found txn1 := s.begin(c) - txn1.SetOption(kv.Pessimistic, true) + txn1.SetPessimistic(true) // lock the primary key lockCtx := &kv.LockCtx{ForUpdateTS: txn1.StartTS(), WaitStartTime: time.Now()} err := txn1.LockKeys(context.Background(), lockCtx, pk) @@ -1202,7 +1202,7 @@ func (s *testCommitterSuite) TestResolveMixed(c *C) { // txn2 tries to lock the pessimisticLockKey, the lock should has been resolved in clean whole region resolve txn2 := s.begin(c) - txn2.SetOption(kv.Pessimistic, true) + txn2.SetPessimistic(true) lockCtx = &kv.LockCtx{ForUpdateTS: txn2.StartTS(), WaitStartTime: time.Now(), LockWaitTime: tikv.LockNoWait} err = txn2.LockKeys(context.Background(), lockCtx, pessimisticLockKey) c.Assert(err, IsNil) diff --git a/store/tikv/tests/async_commit_test.go b/store/tikv/tests/async_commit_test.go index 96a35085f4..99e82add2e 100644 --- a/store/tikv/tests/async_commit_test.go +++ b/store/tikv/tests/async_commit_test.go @@ -350,7 +350,7 @@ func (s *testAsyncCommitSuite) TestRepeatableRead(c *C) { sessionID++ ctx := context.WithValue(context.Background(), util.SessionID, sessionID) txn1 := s.beginAsyncCommit(c) - txn1.SetOption(kv.Pessimistic, isPessimistic) + txn1.SetPessimistic(isPessimistic) s.mustGetFromTxn(c, txn1, []byte("k1"), []byte("v1")) txn1.Set([]byte("k1"), []byte("v2")) diff --git a/store/tikv/tests/ticlient_slow_test.go b/store/tikv/tests/ticlient_slow_test.go index b37245e722..61f6748874 100644 --- a/store/tikv/tests/ticlient_slow_test.go +++ b/store/tikv/tests/ticlient_slow_test.go @@ -64,7 +64,7 @@ func (s *testTiclientSuite) TestSplitRegionIn2PC(c *C) { checkKeyRegion(bo, startKey, endKey, Equals) txn := s.beginTxn(c) if m == "pessimistic" { - txn.SetOption(kv.Pessimistic, true) + txn.SetPessimistic(true) lockCtx := &kv.LockCtx{} lockCtx.ForUpdateTS = txn.StartTS() keys := make([][]byte, 0, preSplitThresholdInTest) diff --git a/store/tikv/txn.go b/store/tikv/txn.go index 3ef4ad545e..d1eae7c6e2 100644 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -73,8 +73,10 @@ type KVTxn struct { schemaAmender SchemaAmender // commitCallback is called after current transaction gets committed commitCallback func(info string, err error) - binlog BinlogExecutor - kvFilter KVFilter + + binlog BinlogExecutor + isPessimistic bool + kvFilter KVFilter } func newTiKVTxn(store *KVStore, txnScope string) (*KVTxn, error) { @@ -198,6 +200,11 @@ func (txn *KVTxn) DelOption(opt int) { txn.us.DelOption(opt) } +// SetPessimistic indicates if the transaction should use pessimictic lock. +func (txn *KVTxn) SetPessimistic(b bool) { + txn.isPessimistic = b +} + // SetKVFilter sets the filter to ignore key-values in memory buffer. func (txn *KVTxn) SetKVFilter(filter KVFilter) { txn.kvFilter = filter @@ -205,7 +212,7 @@ func (txn *KVTxn) SetKVFilter(filter KVFilter) { // IsPessimistic returns true if it is pessimistic. func (txn *KVTxn) IsPessimistic() bool { - return txn.us.GetOption(kv.Pessimistic) != nil + return txn.isPessimistic } // Commit commits the transaction operations to KV store.