diff --git a/store/tikv/lock_test.go b/store/tikv/tests/lock_test.go
similarity index 64%
rename from store/tikv/lock_test.go
rename to store/tikv/tests/lock_test.go
index 6e8f98708d..2a610f55d4 100644
--- a/store/tikv/lock_test.go
+++ b/store/tikv/tests/lock_test.go
@@ -11,38 +11,37 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package tikv
+package tikv_test
 
 import (
 	"bytes"
 	"context"
 	"fmt"
 	"math"
-	"math/rand"
 	"runtime"
-	"strings"
-	"sync/atomic"
 	"time"
 
 	. "github.com/pingcap/check"
-	"github.com/pingcap/errors"
 	"github.com/pingcap/failpoint"
 	"github.com/pingcap/kvproto/pkg/kvrpcpb"
 	tidbkv "github.com/pingcap/tidb/kv"
+	"github.com/pingcap/tidb/store/tikv"
 	"github.com/pingcap/tidb/store/tikv/kv"
 	"github.com/pingcap/tidb/store/tikv/oracle"
 	"github.com/pingcap/tidb/store/tikv/tikvrpc"
 )
 
+var getMaxBackoff = tikv.ConfigProbe{}.GetGetMaxBackoff()
+
 type testLockSuite struct {
 	OneByOneSuite
-	store *KVStore
+	store tikv.StoreProbe
 }
 
 var _ = Suite(&testLockSuite{})
 
 func (s *testLockSuite) SetUpTest(c *C) {
-	s.store = NewTestStore(c)
+	s.store = tikv.StoreProbe{KVStore: NewTestStore(c)}
 }
 
 func (s *testLockSuite) TearDownTest(c *C) {
@@ -50,7 +49,7 @@ func (s *testLockSuite) TearDownTest(c *C) {
 }
 
 func (s *testLockSuite) lockKey(c *C, key, value, primaryKey, primaryValue []byte, commitPrimary bool) (uint64, uint64) {
-	txn, err := newTiKVTxn(s.store, oracle.GlobalTxnScope)
+	txn, err := s.store.Begin()
 	c.Assert(err, IsNil)
 	if len(value) > 0 {
 		err = txn.Set(key, value)
@@ -65,21 +64,22 @@ func (s *testLockSuite) lockKey(c *C, key, value, primaryKey, primaryValue []byt
 		err = txn.Delete(primaryKey)
 	}
 	c.Assert(err, IsNil)
-	tpc, err := newTwoPhaseCommitterWithInit(txn, 0)
+	tpc, err := txn.NewCommitter(0)
 	c.Assert(err, IsNil)
-	tpc.primaryKey = primaryKey
+	tpc.SetPrimaryKey(primaryKey)
 
 	ctx := context.Background()
-	err = tpc.prewriteMutations(NewBackofferWithVars(ctx, PrewriteMaxBackoff, nil), tpc.mutations)
+	err = tpc.PrewriteAllMutations(ctx)
 	c.Assert(err, IsNil)
 
 	if commitPrimary {
-		tpc.commitTS, err = s.store.oracle.GetTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
+		commitTS, err := s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
		c.Assert(err, IsNil)
-		err = tpc.commitMutations(NewBackofferWithVars(ctx, int(atomic.LoadUint64(&CommitMaxBackoff)), nil), tpc.mutationsOfKeys([][]byte{primaryKey}))
+		tpc.SetCommitTS(commitTS)
+		err = tpc.CommitMutations(ctx)
 		c.Assert(err, IsNil)
 	}
-	return txn.startTS, tpc.commitTS
+	return txn.StartTS(), tpc.GetCommitTS()
 }
 
 func (s *testLockSuite) putAlphabets(c *C) {
@@ -95,7 +95,7 @@ func (s *testLockSuite) putKV(c *C, key, value []byte) (uint64, uint64) {
 	c.Assert(err, IsNil)
 	err = txn.Commit(context.Background())
 	c.Assert(err, IsNil)
-	return txn.StartTS(), txn.commitTS
+	return txn.StartTS(), txn.GetCommitTS()
 }
 
 func (s *testLockSuite) prepareAlphabetLocks(c *C) {
@@ -161,10 +161,9 @@ func (s *testLockSuite) TestScanLockResolveWithBatchGet(c *C) {
 		keys = append(keys, []byte{ch})
 	}
 
-	ver, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope)
+	txn, err := s.store.Begin()
 	c.Assert(err, IsNil)
-	snapshot := newTiKVSnapshot(s.store, ver, 0)
-	m, err := snapshot.BatchGet(context.Background(), keys)
+	m, err := txn.BatchGet(context.Background(), keys)
 	c.Assert(err, IsNil)
 	c.Assert(len(m), Equals, int('z'-'a'+1))
 	for ch := byte('a'); ch <= byte('z'); ch++ {
@@ -190,22 +189,22 @@ func (s *testLockSuite) TestCleanLock(c *C) {
 
 func (s *testLockSuite) TestGetTxnStatus(c *C) {
 	startTS, commitTS := s.putKV(c, []byte("a"), []byte("a"))
-	status, err := s.store.lockResolver.GetTxnStatus(startTS, startTS, []byte("a"))
+	status, err := s.store.GetLockResolver().GetTxnStatus(startTS, startTS, []byte("a"))
 	c.Assert(err, IsNil)
 	c.Assert(status.IsCommitted(), IsTrue)
 	c.Assert(status.CommitTS(), Equals, commitTS)
 
 	startTS, commitTS = s.lockKey(c, []byte("a"), []byte("a"), []byte("a"), []byte("a"), true)
-	status, err = s.store.lockResolver.GetTxnStatus(startTS, startTS, []byte("a"))
+	status, err = s.store.GetLockResolver().GetTxnStatus(startTS, startTS, []byte("a"))
 	c.Assert(err, IsNil)
 	c.Assert(status.IsCommitted(), IsTrue)
 	c.Assert(status.CommitTS(), Equals, commitTS)
 
 	startTS, _ = s.lockKey(c, []byte("a"), []byte("a"), []byte("a"), []byte("a"), false)
-	status, err = s.store.lockResolver.GetTxnStatus(startTS, startTS, []byte("a"))
+	status, err = s.store.GetLockResolver().GetTxnStatus(startTS, startTS, []byte("a"))
 	c.Assert(err, IsNil)
 	c.Assert(status.IsCommitted(), IsFalse)
-	c.Assert(status.ttl, Greater, uint64(0), Commentf("action:%s", status.action))
+	c.Assert(status.TTL(), Greater, uint64(0), Commentf("action:%s", status.Action()))
 }
 
 func (s *testLockSuite) TestCheckTxnStatusTTL(c *C) {
@@ -214,38 +213,36 @@ func (s *testLockSuite) TestCheckTxnStatusTTL(c *C) {
 	txn.Set(tidbkv.Key("key"), []byte("value"))
 	s.prewriteTxnWithTTL(c, txn, 1000)
 
-	bo := NewBackofferWithVars(context.Background(), PrewriteMaxBackoff, nil)
-	lr := newLockResolver(s.store)
-	callerStartTS, err := lr.store.GetOracle().GetTimestamp(bo.ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
+	bo := tikv.NewBackofferWithVars(context.Background(), tikv.PrewriteMaxBackoff, nil)
+	lr := s.store.NewLockResolver()
+	callerStartTS, err := s.store.GetOracle().GetTimestamp(bo.GetCtx(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
 	c.Assert(err, IsNil)

 	// Check the lock TTL of a transaction.
-	status, err := lr.GetTxnStatus(txn.StartTS(), callerStartTS, []byte("key"))
+	status, err := lr.LockResolver.GetTxnStatus(txn.StartTS(), callerStartTS, []byte("key"))
 	c.Assert(err, IsNil)
 	c.Assert(status.IsCommitted(), IsFalse)
-	c.Assert(status.ttl, Greater, uint64(0))
+	c.Assert(status.TTL(), Greater, uint64(0))
 	c.Assert(status.CommitTS(), Equals, uint64(0))
 
 	// Rollback the txn.
 	lock := s.mustGetLock(c, []byte("key"))
-	status = TxnStatus{}
-	cleanRegions := make(map[RegionVerID]struct{})
-	err = newLockResolver(s.store).resolveLock(bo, lock, status, false, cleanRegions)
+	err = s.store.NewLockResolver().ResolveLock(context.Background(), lock)
 	c.Assert(err, IsNil)
 
 	// Check its status is rollbacked.
-	status, err = lr.GetTxnStatus(txn.StartTS(), callerStartTS, []byte("key"))
+	status, err = lr.LockResolver.GetTxnStatus(txn.StartTS(), callerStartTS, []byte("key"))
 	c.Assert(err, IsNil)
-	c.Assert(status.ttl, Equals, uint64(0))
-	c.Assert(status.commitTS, Equals, uint64(0))
-	c.Assert(status.action, Equals, kvrpcpb.Action_NoAction)
+	c.Assert(status.TTL(), Equals, uint64(0))
+	c.Assert(status.CommitTS(), Equals, uint64(0))
+	c.Assert(status.Action(), Equals, kvrpcpb.Action_NoAction)
 
 	// Check a committed txn.
 	startTS, commitTS := s.putKV(c, []byte("a"), []byte("a"))
-	status, err = lr.GetTxnStatus(startTS, callerStartTS, []byte("a"))
+	status, err = lr.LockResolver.GetTxnStatus(startTS, callerStartTS, []byte("a"))
 	c.Assert(err, IsNil)
-	c.Assert(status.ttl, Equals, uint64(0))
-	c.Assert(status.commitTS, Equals, commitTS)
+	c.Assert(status.TTL(), Equals, uint64(0))
+	c.Assert(status.CommitTS(), Equals, commitTS)
 }
 
 func (s *testLockSuite) TestTxnHeartBeat(c *C) {
@@ -254,22 +251,19 @@ func (s *testLockSuite) TestTxnHeartBeat(c *C) {
 	txn.Set(tidbkv.Key("key"), []byte("value"))
 	s.prewriteTxn(c, txn)
 
-	bo := NewBackofferWithVars(context.Background(), PrewriteMaxBackoff, nil)
-	newTTL, err := sendTxnHeartBeat(bo, s.store, []byte("key"), txn.StartTS(), 6666)
+	newTTL, err := s.store.SendTxnHeartbeat(context.Background(), []byte("key"), txn.StartTS(), 6666)
 	c.Assert(err, IsNil)
 	c.Assert(newTTL, Equals, uint64(6666))
 
-	newTTL, err = sendTxnHeartBeat(bo, s.store, []byte("key"), txn.StartTS(), 5555)
+	newTTL, err = s.store.SendTxnHeartbeat(context.Background(), []byte("key"), txn.StartTS(), 5555)
 	c.Assert(err, IsNil)
 	c.Assert(newTTL, Equals, uint64(6666))
 
 	lock := s.mustGetLock(c, []byte("key"))
-	status := TxnStatus{ttl: newTTL}
-	cleanRegions := make(map[RegionVerID]struct{})
-	err = newLockResolver(s.store).resolveLock(bo, lock, status, false, cleanRegions)
+	err = s.store.NewLockResolver().ResolveLock(context.Background(), lock)
 	c.Assert(err, IsNil)
 
-	newTTL, err = sendTxnHeartBeat(bo, s.store, []byte("key"), txn.StartTS(), 6666)
+	newTTL, err = s.store.SendTxnHeartbeat(context.Background(), []byte("key"), txn.StartTS(), 6666)
 	c.Assert(err, NotNil)
 	c.Assert(newTTL, Equals, uint64(0))
 }
@@ -286,43 +280,43 @@ func (s *testLockSuite) TestCheckTxnStatus(c *C) {
 	c.Assert(err, IsNil)
 	c.Assert(currentTS, Greater, txn.StartTS())
 
-	bo := NewBackofferWithVars(context.Background(), PrewriteMaxBackoff, nil)
-	resolver := newLockResolver(s.store)
+	bo := tikv.NewBackofferWithVars(context.Background(), tikv.PrewriteMaxBackoff, nil)
+	resolver := s.store.NewLockResolver()
 	// Call getTxnStatus to check the lock status.
-	status, err := resolver.getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS, true, false, nil)
+	status, err := resolver.GetTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS, true, false, nil)
 	c.Assert(err, IsNil)
 	c.Assert(status.IsCommitted(), IsFalse)
-	c.Assert(status.ttl, Greater, uint64(0))
+	c.Assert(status.TTL(), Greater, uint64(0))
 	c.Assert(status.CommitTS(), Equals, uint64(0))
-	c.Assert(status.action, Equals, kvrpcpb.Action_MinCommitTSPushed)
+	c.Assert(status.Action(), Equals, kvrpcpb.Action_MinCommitTSPushed)
 
 	// Test the ResolveLocks API
 	lock := s.mustGetLock(c, []byte("second"))
-	timeBeforeExpire, _, err := resolver.ResolveLocks(bo, currentTS, []*Lock{lock})
+	timeBeforeExpire, _, err := resolver.ResolveLocks(bo, currentTS, []*tikv.Lock{lock})
 	c.Assert(err, IsNil)
 	c.Assert(timeBeforeExpire > int64(0), IsTrue)
 
 	// Force rollback the lock using lock.TTL = 0.
 	lock.TTL = uint64(0)
-	timeBeforeExpire, _, err = resolver.ResolveLocks(bo, currentTS, []*Lock{lock})
+	timeBeforeExpire, _, err = resolver.ResolveLocks(bo, currentTS, []*tikv.Lock{lock})
 	c.Assert(err, IsNil)
 	c.Assert(timeBeforeExpire, Equals, int64(0))
 
 	// Then call getTxnStatus again and check the lock status.
 	currentTS, err = o.GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
 	c.Assert(err, IsNil)
-	status, err = newLockResolver(s.store).getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, 0, true, false, nil)
+	status, err = s.store.NewLockResolver().GetTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, 0, true, false, nil)
 	c.Assert(err, IsNil)
-	c.Assert(status.ttl, Equals, uint64(0))
-	c.Assert(status.commitTS, Equals, uint64(0))
-	c.Assert(status.action, Equals, kvrpcpb.Action_NoAction)
+	c.Assert(status.TTL(), Equals, uint64(0))
+	c.Assert(status.CommitTS(), Equals, uint64(0))
+	c.Assert(status.Action(), Equals, kvrpcpb.Action_NoAction)
 
 	// Call getTxnStatus on a committed transaction.
 	startTS, commitTS := s.putKV(c, []byte("a"), []byte("a"))
-	status, err = newLockResolver(s.store).getTxnStatus(bo, startTS, []byte("a"), currentTS, currentTS, true, false, nil)
+	status, err = s.store.NewLockResolver().GetTxnStatus(bo, startTS, []byte("a"), currentTS, currentTS, true, false, nil)
 	c.Assert(err, IsNil)
-	c.Assert(status.ttl, Equals, uint64(0))
-	c.Assert(status.commitTS, Equals, commitTS)
+	c.Assert(status.TTL(), Equals, uint64(0))
+	c.Assert(status.CommitTS(), Equals, commitTS)
 }
 
 func (s *testLockSuite) TestCheckTxnStatusNoWait(c *C) {
@@ -330,93 +324,92 @@ func (s *testLockSuite) TestCheckTxnStatusNoWait(c *C) {
 	c.Assert(err, IsNil)
 	txn.Set(tidbkv.Key("key"), []byte("value"))
 	txn.Set(tidbkv.Key("second"), []byte("xxx"))
-	committer, err := newTwoPhaseCommitterWithInit(txn, 0)
+	committer, err := txn.NewCommitter(0)
 	c.Assert(err, IsNil)
 	// Increase lock TTL to make CI more stable.
-	committer.lockTTL = txnLockTTL(txn.startTime, 200*1024*1024)
+	committer.SetLockTTLByTimeAndSize(txn.GetStartTime(), 200*1024*1024)
 
 	// Only prewrite the secondary key to simulate a concurrent prewrite case:
 	// prewrite secondary regions success and prewrite the primary region is pending.
-	err = committer.prewriteMutations(NewBackofferWithVars(context.Background(), PrewriteMaxBackoff, nil), committer.mutationsOfKeys([][]byte{[]byte("second")}))
+	err = committer.PrewriteMutations(context.Background(), committer.MutationsOfKeys([][]byte{[]byte("second")}))
 	c.Assert(err, IsNil)
 
 	o := s.store.GetOracle()
 	currentTS, err := o.GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
 	c.Assert(err, IsNil)
-	bo := NewBackofferWithVars(context.Background(), PrewriteMaxBackoff, nil)
-	resolver := newLockResolver(s.store)
+	bo := tikv.NewBackofferWithVars(context.Background(), tikv.PrewriteMaxBackoff, nil)
+	resolver := s.store.NewLockResolver()
 	// Call getTxnStatus for the TxnNotFound case.
-	_, err = resolver.getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS, false, false, nil)
+	_, err = resolver.GetTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS, false, false, nil)
 	c.Assert(err, NotNil)
-	_, ok := errors.Cause(err).(txnNotFoundErr)
-	c.Assert(ok, IsTrue)
+	c.Assert(resolver.IsErrorNotFound(err), IsTrue)
 
 	errCh := make(chan error)
 	go func() {
-		errCh <- committer.prewriteMutations(NewBackofferWithVars(context.Background(), PrewriteMaxBackoff, nil), committer.mutationsOfKeys([][]byte{[]byte("key")}))
+		errCh <- committer.PrewriteMutations(context.Background(), committer.MutationsOfKeys([][]byte{[]byte("key")}))
 	}()
 
-	lock := &Lock{
+	lock := &tikv.Lock{
 		Key:     []byte("second"),
 		Primary: []byte("key"),
 		TxnID:   txn.StartTS(),
 		TTL:     100000,
 	}
 	// Call getTxnStatusFromLock to cover the retry logic.
-	status, err := resolver.getTxnStatusFromLock(bo, lock, currentTS, false)
+	status, err := resolver.GetTxnStatusFromLock(bo, lock, currentTS, false)
 	c.Assert(err, IsNil)
-	c.Assert(status.ttl, Greater, uint64(0))
+	c.Assert(status.TTL(), Greater, uint64(0))
 	c.Assert(<-errCh, IsNil)
-	c.Assert(committer.cleanupMutations(bo, committer.mutations), IsNil)
+	c.Assert(committer.CleanupMutations(context.Background()), IsNil)
 
 	// Call getTxnStatusFromLock to cover TxnNotFound and retry timeout.
 	startTS, err := o.GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
 	c.Assert(err, IsNil)
-	lock = &Lock{
+	lock = &tikv.Lock{
 		Key:     []byte("second"),
 		Primary: []byte("key_not_exist"),
 		TxnID:   startTS,
 		TTL:     1000,
 	}
-	status, err = resolver.getTxnStatusFromLock(bo, lock, currentTS, false)
+	status, err = resolver.GetTxnStatusFromLock(bo, lock, currentTS, false)
 	c.Assert(err, IsNil)
-	c.Assert(status.ttl, Equals, uint64(0))
-	c.Assert(status.commitTS, Equals, uint64(0))
-	c.Assert(status.action, Equals, kvrpcpb.Action_LockNotExistRollback)
+	c.Assert(status.TTL(), Equals, uint64(0))
+	c.Assert(status.CommitTS(), Equals, uint64(0))
+	c.Assert(status.Action(), Equals, kvrpcpb.Action_LockNotExistRollback)
 }
 
-func (s *testLockSuite) prewriteTxn(c *C, txn *KVTxn) {
+func (s *testLockSuite) prewriteTxn(c *C, txn tikv.TxnProbe) {
 	s.prewriteTxnWithTTL(c, txn, 0)
 }
 
-func (s *testLockSuite) prewriteTxnWithTTL(c *C, txn *KVTxn, ttl uint64) {
-	committer, err := newTwoPhaseCommitterWithInit(txn, 0)
+func (s *testLockSuite) prewriteTxnWithTTL(c *C, txn tikv.TxnProbe, ttl uint64) {
+	committer, err := txn.NewCommitter(0)
 	c.Assert(err, IsNil)
 	if ttl > 0 {
-		elapsed := time.Since(txn.startTime) / time.Millisecond
-		committer.lockTTL = uint64(elapsed) + ttl
+		elapsed := time.Since(txn.GetStartTime()) / time.Millisecond
+		committer.SetLockTTL(uint64(elapsed) + ttl)
 	}
-	err = committer.prewriteMutations(NewBackofferWithVars(context.Background(), PrewriteMaxBackoff, nil), committer.mutations)
+	err = committer.PrewriteAllMutations(context.Background())
 	c.Assert(err, IsNil)
 }
 
-func (s *testLockSuite) mustGetLock(c *C, key []byte) *Lock {
+func (s *testLockSuite) mustGetLock(c *C, key []byte) *tikv.Lock {
 	ver, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope)
 	c.Assert(err, IsNil)
-	bo := NewBackofferWithVars(context.Background(), getMaxBackoff, nil)
+	bo := tikv.NewBackofferWithVars(context.Background(), getMaxBackoff, nil)
 	req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{
 		Key:     key,
 		Version: ver,
 	})
-	loc, err := s.store.regionCache.LocateKey(bo, key)
+	loc, err := s.store.GetRegionCache().LocateKey(bo, key)
 	c.Assert(err, IsNil)
-	resp, err := s.store.SendReq(bo, req, loc.Region, ReadTimeoutShort)
+	resp, err := s.store.SendReq(bo, req, loc.Region, tikv.ReadTimeoutShort)
 	c.Assert(err, IsNil)
 	c.Assert(resp.Resp, NotNil)
 	keyErr := resp.Resp.(*kvrpcpb.GetResponse).GetError()
 	c.Assert(keyErr, NotNil)
-	lock, err := extractLockFromKeyErr(keyErr)
+	lock, err := tikv.LockProbe{}.ExtractLockFromKeyErr(keyErr)
 	c.Assert(err, IsNil)
 	return lock
 }
@@ -433,6 +426,9 @@ func (s *testLockSuite) ttlEquals(c *C, x, y uint64) {
 }
 
 func (s *testLockSuite) TestLockTTL(c *C) {
+	defaultLockTTL := tikv.ConfigProbe{}.GetDefaultLockTTL()
+	ttlFactor := tikv.ConfigProbe{}.GetTTLFactor()
+
 	txn, err := s.store.Begin()
 	c.Assert(err, IsNil)
 	txn.Set(tidbkv.Key("key"), []byte("value"))
@@ -478,15 +474,14 @@ func (s *testLockSuite) TestBatchResolveLocks(c *C) {
 	c.Assert(err, IsNil)
 	txn.Set(tidbkv.Key("k3"), []byte("v3"))
[]byte("v3")) txn.Set(tidbkv.Key("k4"), []byte("v4")) - tikvTxn := txn - committer, err := newTwoPhaseCommitterWithInit(tikvTxn, 0) + committer, err := txn.NewCommitter(0) c.Assert(err, IsNil) - committer.setAsyncCommit(true) - committer.lockTTL = 20000 - err = committer.prewriteMutations(NewBackofferWithVars(context.Background(), PrewriteMaxBackoff, nil), committer.mutations) + committer.SetUseAsyncCommit() + committer.SetLockTTL(20000) + committer.PrewriteAllMutations(context.Background()) c.Assert(err, IsNil) - var locks []*Lock + var locks []*tikv.Lock for _, key := range []string{"k1", "k2", "k3", "k4"} { l := s.mustGetLock(c, []byte(key)) locks = append(locks, l) @@ -498,9 +493,9 @@ func (s *testLockSuite) TestBatchResolveLocks(c *C) { msBeforeLockExpired = s.store.GetOracle().UntilExpired(locks[3].TxnID, locks[3].TTL, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) c.Assert(msBeforeLockExpired, Greater, int64(0)) - lr := newLockResolver(s.store) - bo := NewBackofferWithVars(context.Background(), GcResolveLockMaxBackoff, nil) - loc, err := lr.store.GetRegionCache().LocateKey(bo, locks[0].Primary) + lr := s.store.NewLockResolver() + bo := tikv.NewBackofferWithVars(context.Background(), tikv.GcResolveLockMaxBackoff, nil) + loc, err := s.store.GetRegionCache().LocateKey(bo, locks[0].Primary) c.Assert(err, IsNil) // Check BatchResolveLocks resolve the lock even the ttl is not expired. success, err := lr.BatchResolveLocks(bo, locks, loc.Region) @@ -524,20 +519,20 @@ func (s *testLockSuite) TestBatchResolveLocks(c *C) { } func (s *testLockSuite) TestNewLockZeroTTL(c *C) { - l := NewLock(&kvrpcpb.LockInfo{}) + l := tikv.NewLock(&kvrpcpb.LockInfo{}) c.Assert(l.TTL, Equals, uint64(0)) } func init() { // Speed up tests. - oracleUpdateInterval = 2 + tikv.ConfigProbe{}.SetOracleUpdateInterval(2) } func (s *testLockSuite) TestZeroMinCommitTS(c *C) { txn, err := s.store.Begin() c.Assert(err, IsNil) txn.Set(tidbkv.Key("key"), []byte("value")) - bo := NewBackofferWithVars(context.Background(), PrewriteMaxBackoff, nil) + bo := tikv.NewBackofferWithVars(context.Background(), tikv.PrewriteMaxBackoff, nil) mockValue := fmt.Sprintf(`return(%d)`, txn.StartTS()) c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/mockZeroCommitTS", mockValue), IsNil) @@ -545,48 +540,23 @@ func (s *testLockSuite) TestZeroMinCommitTS(c *C) { c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/mockZeroCommitTS"), IsNil) lock := s.mustGetLock(c, []byte("key")) - expire, pushed, err := newLockResolver(s.store).ResolveLocks(bo, 0, []*Lock{lock}) + expire, pushed, err := s.store.NewLockResolver().ResolveLocks(bo, 0, []*tikv.Lock{lock}) c.Assert(err, IsNil) c.Assert(pushed, HasLen, 0) c.Assert(expire, Greater, int64(0)) - expire, pushed, err = newLockResolver(s.store).ResolveLocks(bo, math.MaxUint64, []*Lock{lock}) + expire, pushed, err = s.store.NewLockResolver().ResolveLocks(bo, math.MaxUint64, []*tikv.Lock{lock}) c.Assert(err, IsNil) c.Assert(pushed, HasLen, 1) c.Assert(expire, Greater, int64(0)) // Clean up this test. 
lock.TTL = uint64(0) - expire, _, err = newLockResolver(s.store).ResolveLocks(bo, 0, []*Lock{lock}) + expire, _, err = s.store.NewLockResolver().ResolveLocks(bo, 0, []*tikv.Lock{lock}) c.Assert(err, IsNil) c.Assert(expire, Equals, int64(0)) } -func (s *testLockSuite) TestDeduplicateKeys(c *C) { - inputs := []string{ - "a b c", - "a a b c", - "a a a b c", - "a a a b b b b c", - "a b b b b c c c", - } - r := rand.New(rand.NewSource(time.Now().UnixNano())) - for _, in := range inputs { - strs := strings.Split(in, " ") - keys := make([][]byte, len(strs)) - for _, i := range r.Perm(len(strs)) { - keys[i] = []byte(strs[i]) - } - keys = deduplicateKeys(keys) - strs = strs[:len(keys)] - for i := range keys { - strs[i] = string(keys[i]) - } - out := strings.Join(strs, " ") - c.Assert(out, Equals, "a b c") - } -} - func (s *testLockSuite) prepareTxnFallenBackFromAsyncCommit(c *C) { txn, err := s.store.Begin() c.Assert(err, IsNil) @@ -595,23 +565,22 @@ func (s *testLockSuite) prepareTxnFallenBackFromAsyncCommit(c *C) { err = txn.Set([]byte("fb2"), []byte("2")) c.Assert(err, IsNil) - committer, err := newTwoPhaseCommitterWithInit(txn, 1) + committer, err := txn.NewCommitter(1) c.Assert(err, IsNil) - c.Assert(committer.mutations.Len(), Equals, 2) - committer.lockTTL = 0 - committer.setAsyncCommit(true) - committer.maxCommitTS = committer.startTS + (100 << 18) // 100ms + c.Assert(committer.GetMutations().Len(), Equals, 2) + committer.SetLockTTL(0) + committer.SetUseAsyncCommit() + committer.SetCommitTS(committer.GetStartTS() + (100 << 18)) // 100ms - bo := NewBackoffer(context.Background(), PrewriteMaxBackoff) - err = committer.prewriteMutations(bo, committer.mutations.Slice(0, 1)) + err = committer.PrewriteMutations(context.Background(), committer.GetMutations().Slice(0, 1)) c.Assert(err, IsNil) - c.Assert(committer.isAsyncCommit(), IsTrue) + c.Assert(committer.IsAsyncCommit(), IsTrue) // Set an invalid maxCommitTS to produce MaxCommitTsTooLarge - committer.maxCommitTS = committer.startTS - 1 - err = committer.prewriteMutations(bo, committer.mutations.Slice(1, 2)) + committer.SetMaxCommitTS(committer.GetStartTS() - 1) + err = committer.PrewriteMutations(context.Background(), committer.GetMutations().Slice(1, 2)) c.Assert(err, IsNil) - c.Assert(committer.isAsyncCommit(), IsFalse) // Fallback due to MaxCommitTsTooLarge + c.Assert(committer.IsAsyncCommit(), IsFalse) // Fallback due to MaxCommitTsTooLarge } func (s *testLockSuite) TestCheckLocksFallenBackFromAsyncCommit(c *C) { @@ -619,18 +588,18 @@ func (s *testLockSuite) TestCheckLocksFallenBackFromAsyncCommit(c *C) { lock := s.mustGetLock(c, []byte("fb1")) c.Assert(lock.UseAsyncCommit, IsTrue) - bo := NewBackoffer(context.Background(), getMaxBackoff) - lr := newLockResolver(s.store) - status, err := lr.getTxnStatusFromLock(bo, lock, 0, false) + bo := tikv.NewBackoffer(context.Background(), getMaxBackoff) + lr := s.store.NewLockResolver() + status, err := lr.GetTxnStatusFromLock(bo, lock, 0, false) c.Assert(err, IsNil) - c.Assert(NewLock(status.primaryLock), DeepEquals, lock) + c.Assert(tikv.LockProbe{}.GetPrimaryKeyFromTxnStatus(status), DeepEquals, []byte("fb1")) - _, err = lr.checkAllSecondaries(bo, lock, &status) - c.Assert(err.(*nonAsyncCommitLock), NotNil) + err = lr.CheckAllSecondaries(bo, lock, &status) + c.Assert(lr.IsNonAsyncCommitLock(err), IsTrue) - status, err = lr.getTxnStatusFromLock(bo, lock, 0, true) + status, err = lr.GetTxnStatusFromLock(bo, lock, 0, true) c.Assert(err, IsNil) - c.Assert(status.action, Equals, 
kvrpcpb.Action_TTLExpireRollback) + c.Assert(status.Action(), Equals, kvrpcpb.Action_TTLExpireRollback) c.Assert(status.TTL(), Equals, uint64(0)) } @@ -639,8 +608,8 @@ func (s *testLockSuite) TestResolveTxnFallenBackFromAsyncCommit(c *C) { lock := s.mustGetLock(c, []byte("fb1")) c.Assert(lock.UseAsyncCommit, IsTrue) - bo := NewBackoffer(context.Background(), getMaxBackoff) - expire, pushed, err := newLockResolver(s.store).ResolveLocks(bo, 0, []*Lock{lock}) + bo := tikv.NewBackoffer(context.Background(), getMaxBackoff) + expire, pushed, err := s.store.NewLockResolver().ResolveLocks(bo, 0, []*tikv.Lock{lock}) c.Assert(err, IsNil) c.Assert(expire, Equals, int64(0)) c.Assert(len(pushed), Equals, 0) @@ -658,10 +627,10 @@ func (s *testLockSuite) TestBatchResolveTxnFallenBackFromAsyncCommit(c *C) { lock := s.mustGetLock(c, []byte("fb1")) c.Assert(lock.UseAsyncCommit, IsTrue) - bo := NewBackoffer(context.Background(), getMaxBackoff) - loc, err := s.store.regionCache.LocateKey(bo, []byte("fb1")) + bo := tikv.NewBackoffer(context.Background(), getMaxBackoff) + loc, err := s.store.GetRegionCache().LocateKey(bo, []byte("fb1")) c.Assert(err, IsNil) - ok, err := newLockResolver(s.store).BatchResolveLocks(bo, []*Lock{lock}, loc.Region) + ok, err := s.store.NewLockResolver().BatchResolveLocks(bo, []*tikv.Lock{lock}, loc.Region) c.Assert(err, IsNil) c.Assert(ok, IsTrue) @@ -672,19 +641,3 @@ func (s *testLockSuite) TestBatchResolveTxnFallenBackFromAsyncCommit(c *C) { _, err = t3.Get(context.Background(), []byte("fb2")) errMsgMustContain(c, err, "key not exist") } - -func errMsgMustContain(c *C, err error, msg string) { - c.Assert(strings.Contains(err.Error(), msg), IsTrue) -} - -func randKV(keyLen, valLen int) (string, string) { - const letters = "abc" - k, v := make([]byte, keyLen), make([]byte, valLen) - for i := range k { - k[i] = letters[rand.Intn(len(letters))] - } - for i := range v { - v[i] = letters[rand.Intn(len(letters))] - } - return string(k), string(v) -}
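
Note (not part of the patch): a minimal sketch of how a test in the external tikv_test package drives the probe wrappers this diff introduces. It only uses calls that appear in the diff above (StoreProbe.Begin, TxnProbe.NewCommitter, CommitterProbe.SetPrimaryKey/PrewriteAllMutations, LockResolverProbe.ResolveLock, and the tikv.Lock struct); the helper name lockThenResolve and the error-returning style are illustrative assumptions rather than anything from the patch.

package tikv_test

import (
	"context"

	"github.com/pingcap/tidb/store/tikv"
)

// lockThenResolve prewrites a single key through the probe API and then
// resolves the lock it leaves behind, mirroring the lockKey/ResolveLock flow
// used by the suite above. Errors are returned rather than asserted to keep
// the sketch free of any test framework.
func lockThenResolve(store tikv.StoreProbe, key, value []byte) error {
	txn, err := store.Begin()
	if err != nil {
		return err
	}
	if err = txn.Set(key, value); err != nil {
		return err
	}

	committer, err := txn.NewCommitter(0)
	if err != nil {
		return err
	}
	committer.SetPrimaryKey(key)

	ctx := context.Background()
	// Prewrite but never commit, leaving a lock on the key.
	if err = committer.PrewriteAllMutations(ctx); err != nil {
		return err
	}

	// A conflicting reader would now see this lock; resolve it through the
	// probe, constructing the Lock by hand (TTL 0 treats it as expired, so
	// the transaction is rolled back).
	lock := &tikv.Lock{Key: key, Primary: key, TxnID: txn.StartTS(), TTL: 0}
	return store.NewLockResolver().ResolveLock(ctx, lock)
}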