store/tikv: remove Pessimistic option (#24332)

This commit is contained in:
disksing
2021-04-28 21:21:56 +08:00
committed by GitHub
parent 349221142b
commit 91ca4eafdc
5 changed files with 36 additions and 27 deletions

View File

@ -132,6 +132,8 @@ func (txn *tikvTxn) SetOption(opt int, val interface{}) {
case tikvstore.IsolationLevel:
level := getTiKVIsolationLevel(val.(kv.IsoLevel))
txn.KVTxn.GetSnapshot().SetIsolationLevel(level)
case tikvstore.Pessimistic:
txn.SetPessimistic(val.(bool))
default:
txn.KVTxn.SetOption(opt, val)
}

View File

@ -617,7 +617,7 @@ func (s *testCommitterSuite) TestRejectCommitTS(c *C) {
func (s *testCommitterSuite) TestPessimisticPrewriteRequest(c *C) {
// This test checks that the isPessimisticLock field is set in the request even when no keys are pessimistic lock.
txn := s.begin(c)
txn.SetOption(kv.Pessimistic, true)
txn.SetPessimistic(true)
err := txn.Set([]byte("t1"), []byte("v1"))
c.Assert(err, IsNil)
committer, err := txn.NewCommitter(0)
@ -636,7 +636,7 @@ func (s *testCommitterSuite) TestUnsetPrimaryKey(c *C) {
c.Assert(txn.Commit(context.Background()), IsNil)
txn = s.begin(c)
txn.SetOption(kv.Pessimistic, true)
txn.SetPessimistic(true)
_, _ = txn.GetUnionStore().Get(context.TODO(), key)
c.Assert(txn.GetMemBuffer().SetWithFlags(key, key, kv.SetPresumeKeyNotExists), IsNil)
lockCtx := &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()}
@ -651,7 +651,7 @@ func (s *testCommitterSuite) TestUnsetPrimaryKey(c *C) {
func (s *testCommitterSuite) TestPessimisticLockedKeysDedup(c *C) {
txn := s.begin(c)
txn.SetOption(kv.Pessimistic, true)
txn.SetPessimistic(true)
lockCtx := &kv.LockCtx{ForUpdateTS: 100, WaitStartTime: time.Now()}
err := txn.LockKeys(context.Background(), lockCtx, []byte("abc"), []byte("def"))
c.Assert(err, IsNil)
@ -664,7 +664,7 @@ func (s *testCommitterSuite) TestPessimisticLockedKeysDedup(c *C) {
func (s *testCommitterSuite) TestPessimisticTTL(c *C) {
key := []byte("key")
txn := s.begin(c)
txn.SetOption(kv.Pessimistic, true)
txn.SetPessimistic(true)
time.Sleep(time.Millisecond * 100)
lockCtx := &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()}
err := txn.LockKeys(context.Background(), lockCtx, key)
@ -710,7 +710,7 @@ func (s *testCommitterSuite) TestPessimisticLockReturnValues(c *C) {
c.Assert(txn.Set(key2, key2), IsNil)
c.Assert(txn.Commit(context.Background()), IsNil)
txn = s.begin(c)
txn.SetOption(kv.Pessimistic, true)
txn.SetPessimistic(true)
lockCtx := &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()}
lockCtx.ReturnValues = true
lockCtx.Values = map[string]kv.ReturnedValue{}
@ -725,7 +725,7 @@ func (s *testCommitterSuite) TestElapsedTTL(c *C) {
key := []byte("key")
txn := s.begin(c)
txn.SetStartTS(oracle.ComposeTS(oracle.GetPhysical(time.Now().Add(time.Second*10)), 1))
txn.SetOption(kv.Pessimistic, true)
txn.SetPessimistic(true)
time.Sleep(time.Millisecond * 100)
lockCtx := &kv.LockCtx{
ForUpdateTS: oracle.ComposeTS(oracle.ExtractPhysical(txn.StartTS())+100, 1),
@ -746,7 +746,7 @@ func (s *testCommitterSuite) TestDeleteYourWriteCauseGhostPrimary(c *C) {
// insert k1, k2, k3 and delete k1
txn1 := s.begin(c)
txn1.DelOption(kv.Pessimistic)
txn1.SetPessimistic(false)
s.store.ClearTxnLatches()
txn1.Get(context.Background(), k1)
txn1.GetMemBuffer().SetWithFlags(k1, []byte{0}, kv.SetPresumeKeyNotExists)
@ -771,7 +771,7 @@ func (s *testCommitterSuite) TestDeleteYourWriteCauseGhostPrimary(c *C) {
// start txn2 to read k3(prewrite success and primary should be committed)
txn2 := s.begin(c)
txn2.DelOption(kv.Pessimistic)
txn2.SetPessimistic(false)
s.store.ClearTxnLatches()
v, err := txn2.Get(context.Background(), k3)
c.Assert(err, IsNil) // should resolve lock and read txn1 k3 result instead of rollback it.
@ -788,7 +788,7 @@ func (s *testCommitterSuite) TestDeleteAllYourWrites(c *C) {
// insert k1, k2, k3 and delete k1, k2, k3
txn1 := s.begin(c)
txn1.DelOption(kv.Pessimistic)
txn1.SetPessimistic(false)
s.store.ClearTxnLatches()
txn1.GetMemBuffer().SetWithFlags(k1, []byte{0}, kv.SetPresumeKeyNotExists)
txn1.Delete(k1)
@ -808,7 +808,7 @@ func (s *testCommitterSuite) TestDeleteAllYourWritesWithSFU(c *C) {
// insert k1, k2, k2 and delete k1
txn1 := s.begin(c)
txn1.DelOption(kv.Pessimistic)
txn1.SetPessimistic(false)
s.store.ClearTxnLatches()
txn1.GetMemBuffer().SetWithFlags(k1, []byte{0}, kv.SetPresumeKeyNotExists)
txn1.Delete(k1)
@ -832,7 +832,7 @@ func (s *testCommitterSuite) TestDeleteAllYourWritesWithSFU(c *C) {
<-ac
// start txn2 to read k3
txn2 := s.begin(c)
txn2.DelOption(kv.Pessimistic)
txn2.SetPessimistic(false)
s.store.ClearTxnLatches()
err = txn2.Set(k3, []byte{33})
c.Assert(err, IsNil)
@ -860,7 +860,7 @@ func (s *testCommitterSuite) TestAcquireFalseTimeoutLock(c *C) {
k2 := []byte("k2")
txn1 := s.begin(c)
txn1.SetOption(kv.Pessimistic, true)
txn1.SetPessimistic(true)
// lock the primary key
lockCtx := &kv.LockCtx{ForUpdateTS: txn1.StartTS(), WaitStartTime: time.Now()}
err := txn1.LockKeys(context.Background(), lockCtx, k1)
@ -875,7 +875,7 @@ func (s *testCommitterSuite) TestAcquireFalseTimeoutLock(c *C) {
// wait until secondary key exceeds its own TTL
time.Sleep(time.Duration(atomic.LoadUint64(&tikv.ManagedLockTTL)) * time.Millisecond)
txn2 := s.begin(c)
txn2.SetOption(kv.Pessimistic, true)
txn2.SetPessimistic(true)
// test no wait
lockCtx = &kv.LockCtx{ForUpdateTS: txn2.StartTS(), LockWaitTime: tikv.LockNoWait, WaitStartTime: time.Now()}
@ -921,7 +921,7 @@ func (s *testCommitterSuite) TestPkNotFound(c *C) {
k3 := []byte("k3")
txn1 := s.begin(c)
txn1.SetOption(kv.Pessimistic, true)
txn1.SetPessimistic(true)
// lock the primary key.
lockCtx := &kv.LockCtx{ForUpdateTS: txn1.StartTS(), WaitStartTime: time.Now()}
err := txn1.LockKeys(ctx, lockCtx, k1)
@ -957,7 +957,7 @@ func (s *testCommitterSuite) TestPkNotFound(c *C) {
c.Assert(err, IsNil)
c.Assert(status.Action(), Equals, kvrpcpb.Action_LockNotExistDoNothing)
txn2 := s.begin(c)
txn2.SetOption(kv.Pessimistic, true)
txn2.SetPessimistic(true)
lockCtx = &kv.LockCtx{ForUpdateTS: txn2.StartTS(), WaitStartTime: time.Now()}
err = txn2.LockKeys(ctx, lockCtx, k2)
c.Assert(err, IsNil)
@ -981,7 +981,7 @@ func (s *testCommitterSuite) TestPkNotFound(c *C) {
// After disable fail point, the rollbackIfNotExist flag will be set, and the resolve should succeed. In this
// case, the returned action of TxnStatus should be LockNotExistDoNothing, and lock on k3 could be resolved.
txn3 := s.begin(c)
txn3.SetOption(kv.Pessimistic, true)
txn3.SetPessimistic(true)
lockCtx = &kv.LockCtx{ForUpdateTS: txn3.StartTS(), WaitStartTime: time.Now(), LockWaitTime: tikv.LockNoWait}
err = txn3.LockKeys(ctx, lockCtx, k3)
c.Assert(err, IsNil)
@ -997,7 +997,7 @@ func (s *testCommitterSuite) TestPessimisticLockPrimary(c *C) {
k2 := []byte("b")
txn1 := s.begin(c)
txn1.SetOption(kv.Pessimistic, true)
txn1.SetPessimistic(true)
// txn1 lock k1
lockCtx := &kv.LockCtx{ForUpdateTS: txn1.StartTS(), WaitStartTime: time.Now()}
err := txn1.LockKeys(context.Background(), lockCtx, k1)
@ -1008,7 +1008,7 @@ func (s *testCommitterSuite) TestPessimisticLockPrimary(c *C) {
doneCh := make(chan error)
go func() {
txn2 := s.begin(c)
txn2.SetOption(kv.Pessimistic, true)
txn2.SetPessimistic(true)
lockCtx2 := &kv.LockCtx{ForUpdateTS: txn2.StartTS(), WaitStartTime: time.Now(), LockWaitTime: 200}
waitErr := txn2.LockKeys(context.Background(), lockCtx2, k1, k2)
doneCh <- waitErr
@ -1017,7 +1017,7 @@ func (s *testCommitterSuite) TestPessimisticLockPrimary(c *C) {
// txn3 should locks k2 successfully using no wait
txn3 := s.begin(c)
txn3.SetOption(kv.Pessimistic, true)
txn3.SetPessimistic(true)
lockCtx3 := &kv.LockCtx{ForUpdateTS: txn3.StartTS(), WaitStartTime: time.Now(), LockWaitTime: tikv.LockNoWait}
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/txnNotFoundRetTTL", "return"), IsNil)
err = txn3.LockKeys(context.Background(), lockCtx3, k2)
@ -1108,7 +1108,7 @@ func (s *testCommitterSuite) TestPushPessimisticLock(c *C) {
ctx := context.Background()
txn1 := s.begin(c)
txn1.SetOption(kv.Pessimistic, true)
txn1.SetPessimistic(true)
lockCtx := &kv.LockCtx{ForUpdateTS: txn1.StartTS(), WaitStartTime: time.Now()}
err := txn1.LockKeys(context.Background(), lockCtx, k1, k2)
c.Assert(err, IsNil)
@ -1160,7 +1160,7 @@ func (s *testCommitterSuite) TestResolveMixed(c *C) {
// make the optimistic and pessimistic lock left with primary lock not found
txn1 := s.begin(c)
txn1.SetOption(kv.Pessimistic, true)
txn1.SetPessimistic(true)
// lock the primary key
lockCtx := &kv.LockCtx{ForUpdateTS: txn1.StartTS(), WaitStartTime: time.Now()}
err := txn1.LockKeys(context.Background(), lockCtx, pk)
@ -1202,7 +1202,7 @@ func (s *testCommitterSuite) TestResolveMixed(c *C) {
// txn2 tries to lock the pessimisticLockKey, the lock should has been resolved in clean whole region resolve
txn2 := s.begin(c)
txn2.SetOption(kv.Pessimistic, true)
txn2.SetPessimistic(true)
lockCtx = &kv.LockCtx{ForUpdateTS: txn2.StartTS(), WaitStartTime: time.Now(), LockWaitTime: tikv.LockNoWait}
err = txn2.LockKeys(context.Background(), lockCtx, pessimisticLockKey)
c.Assert(err, IsNil)

View File

@ -350,7 +350,7 @@ func (s *testAsyncCommitSuite) TestRepeatableRead(c *C) {
sessionID++
ctx := context.WithValue(context.Background(), util.SessionID, sessionID)
txn1 := s.beginAsyncCommit(c)
txn1.SetOption(kv.Pessimistic, isPessimistic)
txn1.SetPessimistic(isPessimistic)
s.mustGetFromTxn(c, txn1, []byte("k1"), []byte("v1"))
txn1.Set([]byte("k1"), []byte("v2"))

View File

@ -64,7 +64,7 @@ func (s *testTiclientSuite) TestSplitRegionIn2PC(c *C) {
checkKeyRegion(bo, startKey, endKey, Equals)
txn := s.beginTxn(c)
if m == "pessimistic" {
txn.SetOption(kv.Pessimistic, true)
txn.SetPessimistic(true)
lockCtx := &kv.LockCtx{}
lockCtx.ForUpdateTS = txn.StartTS()
keys := make([][]byte, 0, preSplitThresholdInTest)

View File

@ -73,8 +73,10 @@ type KVTxn struct {
schemaAmender SchemaAmender
// commitCallback is called after current transaction gets committed
commitCallback func(info string, err error)
binlog BinlogExecutor
kvFilter KVFilter
binlog BinlogExecutor
isPessimistic bool
kvFilter KVFilter
}
func newTiKVTxn(store *KVStore, txnScope string) (*KVTxn, error) {
@ -198,6 +200,11 @@ func (txn *KVTxn) DelOption(opt int) {
txn.us.DelOption(opt)
}
// SetPessimistic indicates if the transaction should use pessimictic lock.
func (txn *KVTxn) SetPessimistic(b bool) {
txn.isPessimistic = b
}
// SetKVFilter sets the filter to ignore key-values in memory buffer.
func (txn *KVTxn) SetKVFilter(filter KVFilter) {
txn.kvFilter = filter
@ -205,7 +212,7 @@ func (txn *KVTxn) SetKVFilter(filter KVFilter) {
// IsPessimistic returns true if it is pessimistic.
func (txn *KVTxn) IsPessimistic() bool {
return txn.us.GetOption(kv.Pessimistic) != nil
return txn.isPessimistic
}
// Commit commits the transaction operations to KV store.