store/tikv: move lock test to /tests (#23635)
This commit is contained in:
@ -11,38 +11,37 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package tikv
|
||||
package tikv_test
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
"math/rand"
|
||||
"runtime"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
. "github.com/pingcap/check"
|
||||
"github.com/pingcap/errors"
|
||||
"github.com/pingcap/failpoint"
|
||||
"github.com/pingcap/kvproto/pkg/kvrpcpb"
|
||||
tidbkv "github.com/pingcap/tidb/kv"
|
||||
"github.com/pingcap/tidb/store/tikv"
|
||||
"github.com/pingcap/tidb/store/tikv/kv"
|
||||
"github.com/pingcap/tidb/store/tikv/oracle"
|
||||
"github.com/pingcap/tidb/store/tikv/tikvrpc"
|
||||
)
|
||||
|
||||
var getMaxBackoff = tikv.ConfigProbe{}.GetGetMaxBackoff()
|
||||
|
||||
type testLockSuite struct {
|
||||
OneByOneSuite
|
||||
store *KVStore
|
||||
store tikv.StoreProbe
|
||||
}
|
||||
|
||||
var _ = Suite(&testLockSuite{})
|
||||
|
||||
func (s *testLockSuite) SetUpTest(c *C) {
|
||||
s.store = NewTestStore(c)
|
||||
s.store = tikv.StoreProbe{KVStore: NewTestStore(c)}
|
||||
}
|
||||
|
||||
func (s *testLockSuite) TearDownTest(c *C) {
|
||||
@ -50,7 +49,7 @@ func (s *testLockSuite) TearDownTest(c *C) {
|
||||
}
|
||||
|
||||
func (s *testLockSuite) lockKey(c *C, key, value, primaryKey, primaryValue []byte, commitPrimary bool) (uint64, uint64) {
|
||||
txn, err := newTiKVTxn(s.store, oracle.GlobalTxnScope)
|
||||
txn, err := s.store.Begin()
|
||||
c.Assert(err, IsNil)
|
||||
if len(value) > 0 {
|
||||
err = txn.Set(key, value)
|
||||
@ -65,21 +64,22 @@ func (s *testLockSuite) lockKey(c *C, key, value, primaryKey, primaryValue []byt
|
||||
err = txn.Delete(primaryKey)
|
||||
}
|
||||
c.Assert(err, IsNil)
|
||||
tpc, err := newTwoPhaseCommitterWithInit(txn, 0)
|
||||
tpc, err := txn.NewCommitter(0)
|
||||
c.Assert(err, IsNil)
|
||||
tpc.primaryKey = primaryKey
|
||||
tpc.SetPrimaryKey(primaryKey)
|
||||
|
||||
ctx := context.Background()
|
||||
err = tpc.prewriteMutations(NewBackofferWithVars(ctx, PrewriteMaxBackoff, nil), tpc.mutations)
|
||||
err = tpc.PrewriteAllMutations(ctx)
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
if commitPrimary {
|
||||
tpc.commitTS, err = s.store.oracle.GetTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
|
||||
commitTS, err := s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
|
||||
c.Assert(err, IsNil)
|
||||
err = tpc.commitMutations(NewBackofferWithVars(ctx, int(atomic.LoadUint64(&CommitMaxBackoff)), nil), tpc.mutationsOfKeys([][]byte{primaryKey}))
|
||||
tpc.SetCommitTS(commitTS)
|
||||
err = tpc.CommitMutations(ctx)
|
||||
c.Assert(err, IsNil)
|
||||
}
|
||||
return txn.startTS, tpc.commitTS
|
||||
return txn.StartTS(), tpc.GetCommitTS()
|
||||
}
|
||||
|
||||
func (s *testLockSuite) putAlphabets(c *C) {
|
||||
@ -95,7 +95,7 @@ func (s *testLockSuite) putKV(c *C, key, value []byte) (uint64, uint64) {
|
||||
c.Assert(err, IsNil)
|
||||
err = txn.Commit(context.Background())
|
||||
c.Assert(err, IsNil)
|
||||
return txn.StartTS(), txn.commitTS
|
||||
return txn.StartTS(), txn.GetCommitTS()
|
||||
}
|
||||
|
||||
func (s *testLockSuite) prepareAlphabetLocks(c *C) {
|
||||
@ -161,10 +161,9 @@ func (s *testLockSuite) TestScanLockResolveWithBatchGet(c *C) {
|
||||
keys = append(keys, []byte{ch})
|
||||
}
|
||||
|
||||
ver, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope)
|
||||
txn, err := s.store.Begin()
|
||||
c.Assert(err, IsNil)
|
||||
snapshot := newTiKVSnapshot(s.store, ver, 0)
|
||||
m, err := snapshot.BatchGet(context.Background(), keys)
|
||||
m, err := txn.BatchGet(context.Background(), keys)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(len(m), Equals, int('z'-'a'+1))
|
||||
for ch := byte('a'); ch <= byte('z'); ch++ {
|
||||
@ -190,22 +189,22 @@ func (s *testLockSuite) TestCleanLock(c *C) {
|
||||
|
||||
func (s *testLockSuite) TestGetTxnStatus(c *C) {
|
||||
startTS, commitTS := s.putKV(c, []byte("a"), []byte("a"))
|
||||
status, err := s.store.lockResolver.GetTxnStatus(startTS, startTS, []byte("a"))
|
||||
status, err := s.store.GetLockResolver().GetTxnStatus(startTS, startTS, []byte("a"))
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(status.IsCommitted(), IsTrue)
|
||||
c.Assert(status.CommitTS(), Equals, commitTS)
|
||||
|
||||
startTS, commitTS = s.lockKey(c, []byte("a"), []byte("a"), []byte("a"), []byte("a"), true)
|
||||
status, err = s.store.lockResolver.GetTxnStatus(startTS, startTS, []byte("a"))
|
||||
status, err = s.store.GetLockResolver().GetTxnStatus(startTS, startTS, []byte("a"))
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(status.IsCommitted(), IsTrue)
|
||||
c.Assert(status.CommitTS(), Equals, commitTS)
|
||||
|
||||
startTS, _ = s.lockKey(c, []byte("a"), []byte("a"), []byte("a"), []byte("a"), false)
|
||||
status, err = s.store.lockResolver.GetTxnStatus(startTS, startTS, []byte("a"))
|
||||
status, err = s.store.GetLockResolver().GetTxnStatus(startTS, startTS, []byte("a"))
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(status.IsCommitted(), IsFalse)
|
||||
c.Assert(status.ttl, Greater, uint64(0), Commentf("action:%s", status.action))
|
||||
c.Assert(status.TTL(), Greater, uint64(0), Commentf("action:%s", status.Action()))
|
||||
}
|
||||
|
||||
func (s *testLockSuite) TestCheckTxnStatusTTL(c *C) {
|
||||
@ -214,38 +213,36 @@ func (s *testLockSuite) TestCheckTxnStatusTTL(c *C) {
|
||||
txn.Set(tidbkv.Key("key"), []byte("value"))
|
||||
s.prewriteTxnWithTTL(c, txn, 1000)
|
||||
|
||||
bo := NewBackofferWithVars(context.Background(), PrewriteMaxBackoff, nil)
|
||||
lr := newLockResolver(s.store)
|
||||
callerStartTS, err := lr.store.GetOracle().GetTimestamp(bo.ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
|
||||
bo := tikv.NewBackofferWithVars(context.Background(), tikv.PrewriteMaxBackoff, nil)
|
||||
lr := s.store.NewLockResolver()
|
||||
callerStartTS, err := s.store.GetOracle().GetTimestamp(bo.GetCtx(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
// Check the lock TTL of a transaction.
|
||||
status, err := lr.GetTxnStatus(txn.StartTS(), callerStartTS, []byte("key"))
|
||||
status, err := lr.LockResolver.GetTxnStatus(txn.StartTS(), callerStartTS, []byte("key"))
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(status.IsCommitted(), IsFalse)
|
||||
c.Assert(status.ttl, Greater, uint64(0))
|
||||
c.Assert(status.TTL(), Greater, uint64(0))
|
||||
c.Assert(status.CommitTS(), Equals, uint64(0))
|
||||
|
||||
// Rollback the txn.
|
||||
lock := s.mustGetLock(c, []byte("key"))
|
||||
status = TxnStatus{}
|
||||
cleanRegions := make(map[RegionVerID]struct{})
|
||||
err = newLockResolver(s.store).resolveLock(bo, lock, status, false, cleanRegions)
|
||||
err = s.store.NewLockResolver().ResolveLock(context.Background(), lock)
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
// Check its status is rollbacked.
|
||||
status, err = lr.GetTxnStatus(txn.StartTS(), callerStartTS, []byte("key"))
|
||||
status, err = lr.LockResolver.GetTxnStatus(txn.StartTS(), callerStartTS, []byte("key"))
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(status.ttl, Equals, uint64(0))
|
||||
c.Assert(status.commitTS, Equals, uint64(0))
|
||||
c.Assert(status.action, Equals, kvrpcpb.Action_NoAction)
|
||||
c.Assert(status.TTL(), Equals, uint64(0))
|
||||
c.Assert(status.CommitTS(), Equals, uint64(0))
|
||||
c.Assert(status.Action(), Equals, kvrpcpb.Action_NoAction)
|
||||
|
||||
// Check a committed txn.
|
||||
startTS, commitTS := s.putKV(c, []byte("a"), []byte("a"))
|
||||
status, err = lr.GetTxnStatus(startTS, callerStartTS, []byte("a"))
|
||||
status, err = lr.LockResolver.GetTxnStatus(startTS, callerStartTS, []byte("a"))
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(status.ttl, Equals, uint64(0))
|
||||
c.Assert(status.commitTS, Equals, commitTS)
|
||||
c.Assert(status.TTL(), Equals, uint64(0))
|
||||
c.Assert(status.CommitTS(), Equals, commitTS)
|
||||
}
|
||||
|
||||
func (s *testLockSuite) TestTxnHeartBeat(c *C) {
|
||||
@ -254,22 +251,19 @@ func (s *testLockSuite) TestTxnHeartBeat(c *C) {
|
||||
txn.Set(tidbkv.Key("key"), []byte("value"))
|
||||
s.prewriteTxn(c, txn)
|
||||
|
||||
bo := NewBackofferWithVars(context.Background(), PrewriteMaxBackoff, nil)
|
||||
newTTL, err := sendTxnHeartBeat(bo, s.store, []byte("key"), txn.StartTS(), 6666)
|
||||
newTTL, err := s.store.SendTxnHeartbeat(context.Background(), []byte("key"), txn.StartTS(), 6666)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(newTTL, Equals, uint64(6666))
|
||||
|
||||
newTTL, err = sendTxnHeartBeat(bo, s.store, []byte("key"), txn.StartTS(), 5555)
|
||||
newTTL, err = s.store.SendTxnHeartbeat(context.Background(), []byte("key"), txn.StartTS(), 5555)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(newTTL, Equals, uint64(6666))
|
||||
|
||||
lock := s.mustGetLock(c, []byte("key"))
|
||||
status := TxnStatus{ttl: newTTL}
|
||||
cleanRegions := make(map[RegionVerID]struct{})
|
||||
err = newLockResolver(s.store).resolveLock(bo, lock, status, false, cleanRegions)
|
||||
err = s.store.NewLockResolver().ResolveLock(context.Background(), lock)
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
newTTL, err = sendTxnHeartBeat(bo, s.store, []byte("key"), txn.StartTS(), 6666)
|
||||
newTTL, err = s.store.SendTxnHeartbeat(context.Background(), []byte("key"), txn.StartTS(), 6666)
|
||||
c.Assert(err, NotNil)
|
||||
c.Assert(newTTL, Equals, uint64(0))
|
||||
}
|
||||
@ -286,43 +280,43 @@ func (s *testLockSuite) TestCheckTxnStatus(c *C) {
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(currentTS, Greater, txn.StartTS())
|
||||
|
||||
bo := NewBackofferWithVars(context.Background(), PrewriteMaxBackoff, nil)
|
||||
resolver := newLockResolver(s.store)
|
||||
bo := tikv.NewBackofferWithVars(context.Background(), tikv.PrewriteMaxBackoff, nil)
|
||||
resolver := s.store.NewLockResolver()
|
||||
// Call getTxnStatus to check the lock status.
|
||||
status, err := resolver.getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS, true, false, nil)
|
||||
status, err := resolver.GetTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS, true, false, nil)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(status.IsCommitted(), IsFalse)
|
||||
c.Assert(status.ttl, Greater, uint64(0))
|
||||
c.Assert(status.TTL(), Greater, uint64(0))
|
||||
c.Assert(status.CommitTS(), Equals, uint64(0))
|
||||
c.Assert(status.action, Equals, kvrpcpb.Action_MinCommitTSPushed)
|
||||
c.Assert(status.Action(), Equals, kvrpcpb.Action_MinCommitTSPushed)
|
||||
|
||||
// Test the ResolveLocks API
|
||||
lock := s.mustGetLock(c, []byte("second"))
|
||||
timeBeforeExpire, _, err := resolver.ResolveLocks(bo, currentTS, []*Lock{lock})
|
||||
timeBeforeExpire, _, err := resolver.ResolveLocks(bo, currentTS, []*tikv.Lock{lock})
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(timeBeforeExpire > int64(0), IsTrue)
|
||||
|
||||
// Force rollback the lock using lock.TTL = 0.
|
||||
lock.TTL = uint64(0)
|
||||
timeBeforeExpire, _, err = resolver.ResolveLocks(bo, currentTS, []*Lock{lock})
|
||||
timeBeforeExpire, _, err = resolver.ResolveLocks(bo, currentTS, []*tikv.Lock{lock})
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(timeBeforeExpire, Equals, int64(0))
|
||||
|
||||
// Then call getTxnStatus again and check the lock status.
|
||||
currentTS, err = o.GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
|
||||
c.Assert(err, IsNil)
|
||||
status, err = newLockResolver(s.store).getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, 0, true, false, nil)
|
||||
status, err = s.store.NewLockResolver().GetTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, 0, true, false, nil)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(status.ttl, Equals, uint64(0))
|
||||
c.Assert(status.commitTS, Equals, uint64(0))
|
||||
c.Assert(status.action, Equals, kvrpcpb.Action_NoAction)
|
||||
c.Assert(status.TTL(), Equals, uint64(0))
|
||||
c.Assert(status.CommitTS(), Equals, uint64(0))
|
||||
c.Assert(status.Action(), Equals, kvrpcpb.Action_NoAction)
|
||||
|
||||
// Call getTxnStatus on a committed transaction.
|
||||
startTS, commitTS := s.putKV(c, []byte("a"), []byte("a"))
|
||||
status, err = newLockResolver(s.store).getTxnStatus(bo, startTS, []byte("a"), currentTS, currentTS, true, false, nil)
|
||||
status, err = s.store.NewLockResolver().GetTxnStatus(bo, startTS, []byte("a"), currentTS, currentTS, true, false, nil)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(status.ttl, Equals, uint64(0))
|
||||
c.Assert(status.commitTS, Equals, commitTS)
|
||||
c.Assert(status.TTL(), Equals, uint64(0))
|
||||
c.Assert(status.CommitTS(), Equals, commitTS)
|
||||
}
|
||||
|
||||
func (s *testLockSuite) TestCheckTxnStatusNoWait(c *C) {
|
||||
@ -330,93 +324,92 @@ func (s *testLockSuite) TestCheckTxnStatusNoWait(c *C) {
|
||||
c.Assert(err, IsNil)
|
||||
txn.Set(tidbkv.Key("key"), []byte("value"))
|
||||
txn.Set(tidbkv.Key("second"), []byte("xxx"))
|
||||
committer, err := newTwoPhaseCommitterWithInit(txn, 0)
|
||||
committer, err := txn.NewCommitter(0)
|
||||
c.Assert(err, IsNil)
|
||||
// Increase lock TTL to make CI more stable.
|
||||
committer.lockTTL = txnLockTTL(txn.startTime, 200*1024*1024)
|
||||
committer.SetLockTTLByTimeAndSize(txn.GetStartTime(), 200*1024*1024)
|
||||
|
||||
// Only prewrite the secondary key to simulate a concurrent prewrite case:
|
||||
// prewrite secondary regions success and prewrite the primary region is pending.
|
||||
err = committer.prewriteMutations(NewBackofferWithVars(context.Background(), PrewriteMaxBackoff, nil), committer.mutationsOfKeys([][]byte{[]byte("second")}))
|
||||
err = committer.PrewriteMutations(context.Background(), committer.MutationsOfKeys([][]byte{[]byte("second")}))
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
o := s.store.GetOracle()
|
||||
currentTS, err := o.GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
|
||||
c.Assert(err, IsNil)
|
||||
bo := NewBackofferWithVars(context.Background(), PrewriteMaxBackoff, nil)
|
||||
resolver := newLockResolver(s.store)
|
||||
bo := tikv.NewBackofferWithVars(context.Background(), tikv.PrewriteMaxBackoff, nil)
|
||||
resolver := s.store.NewLockResolver()
|
||||
|
||||
// Call getTxnStatus for the TxnNotFound case.
|
||||
_, err = resolver.getTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS, false, false, nil)
|
||||
_, err = resolver.GetTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS, false, false, nil)
|
||||
c.Assert(err, NotNil)
|
||||
_, ok := errors.Cause(err).(txnNotFoundErr)
|
||||
c.Assert(ok, IsTrue)
|
||||
c.Assert(resolver.IsErrorNotFound(err), IsTrue)
|
||||
|
||||
errCh := make(chan error)
|
||||
go func() {
|
||||
errCh <- committer.prewriteMutations(NewBackofferWithVars(context.Background(), PrewriteMaxBackoff, nil), committer.mutationsOfKeys([][]byte{[]byte("key")}))
|
||||
errCh <- committer.PrewriteMutations(context.Background(), committer.MutationsOfKeys([][]byte{[]byte("key")}))
|
||||
}()
|
||||
|
||||
lock := &Lock{
|
||||
lock := &tikv.Lock{
|
||||
Key: []byte("second"),
|
||||
Primary: []byte("key"),
|
||||
TxnID: txn.StartTS(),
|
||||
TTL: 100000,
|
||||
}
|
||||
// Call getTxnStatusFromLock to cover the retry logic.
|
||||
status, err := resolver.getTxnStatusFromLock(bo, lock, currentTS, false)
|
||||
status, err := resolver.GetTxnStatusFromLock(bo, lock, currentTS, false)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(status.ttl, Greater, uint64(0))
|
||||
c.Assert(status.TTL(), Greater, uint64(0))
|
||||
c.Assert(<-errCh, IsNil)
|
||||
c.Assert(committer.cleanupMutations(bo, committer.mutations), IsNil)
|
||||
c.Assert(committer.CleanupMutations(context.Background()), IsNil)
|
||||
|
||||
// Call getTxnStatusFromLock to cover TxnNotFound and retry timeout.
|
||||
startTS, err := o.GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
|
||||
c.Assert(err, IsNil)
|
||||
lock = &Lock{
|
||||
lock = &tikv.Lock{
|
||||
Key: []byte("second"),
|
||||
Primary: []byte("key_not_exist"),
|
||||
TxnID: startTS,
|
||||
TTL: 1000,
|
||||
}
|
||||
status, err = resolver.getTxnStatusFromLock(bo, lock, currentTS, false)
|
||||
status, err = resolver.GetTxnStatusFromLock(bo, lock, currentTS, false)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(status.ttl, Equals, uint64(0))
|
||||
c.Assert(status.commitTS, Equals, uint64(0))
|
||||
c.Assert(status.action, Equals, kvrpcpb.Action_LockNotExistRollback)
|
||||
c.Assert(status.TTL(), Equals, uint64(0))
|
||||
c.Assert(status.CommitTS(), Equals, uint64(0))
|
||||
c.Assert(status.Action(), Equals, kvrpcpb.Action_LockNotExistRollback)
|
||||
}
|
||||
|
||||
func (s *testLockSuite) prewriteTxn(c *C, txn *KVTxn) {
|
||||
func (s *testLockSuite) prewriteTxn(c *C, txn tikv.TxnProbe) {
|
||||
s.prewriteTxnWithTTL(c, txn, 0)
|
||||
}
|
||||
|
||||
func (s *testLockSuite) prewriteTxnWithTTL(c *C, txn *KVTxn, ttl uint64) {
|
||||
committer, err := newTwoPhaseCommitterWithInit(txn, 0)
|
||||
func (s *testLockSuite) prewriteTxnWithTTL(c *C, txn tikv.TxnProbe, ttl uint64) {
|
||||
committer, err := txn.NewCommitter(0)
|
||||
c.Assert(err, IsNil)
|
||||
if ttl > 0 {
|
||||
elapsed := time.Since(txn.startTime) / time.Millisecond
|
||||
committer.lockTTL = uint64(elapsed) + ttl
|
||||
elapsed := time.Since(txn.GetStartTime()) / time.Millisecond
|
||||
committer.SetLockTTL(uint64(elapsed) + ttl)
|
||||
}
|
||||
err = committer.prewriteMutations(NewBackofferWithVars(context.Background(), PrewriteMaxBackoff, nil), committer.mutations)
|
||||
err = committer.PrewriteAllMutations(context.Background())
|
||||
c.Assert(err, IsNil)
|
||||
}
|
||||
|
||||
func (s *testLockSuite) mustGetLock(c *C, key []byte) *Lock {
|
||||
func (s *testLockSuite) mustGetLock(c *C, key []byte) *tikv.Lock {
|
||||
ver, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope)
|
||||
c.Assert(err, IsNil)
|
||||
bo := NewBackofferWithVars(context.Background(), getMaxBackoff, nil)
|
||||
bo := tikv.NewBackofferWithVars(context.Background(), getMaxBackoff, nil)
|
||||
req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{
|
||||
Key: key,
|
||||
Version: ver,
|
||||
})
|
||||
loc, err := s.store.regionCache.LocateKey(bo, key)
|
||||
loc, err := s.store.GetRegionCache().LocateKey(bo, key)
|
||||
c.Assert(err, IsNil)
|
||||
resp, err := s.store.SendReq(bo, req, loc.Region, ReadTimeoutShort)
|
||||
resp, err := s.store.SendReq(bo, req, loc.Region, tikv.ReadTimeoutShort)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(resp.Resp, NotNil)
|
||||
keyErr := resp.Resp.(*kvrpcpb.GetResponse).GetError()
|
||||
c.Assert(keyErr, NotNil)
|
||||
lock, err := extractLockFromKeyErr(keyErr)
|
||||
lock, err := tikv.LockProbe{}.ExtractLockFromKeyErr(keyErr)
|
||||
c.Assert(err, IsNil)
|
||||
return lock
|
||||
}
|
||||
@ -433,6 +426,9 @@ func (s *testLockSuite) ttlEquals(c *C, x, y uint64) {
|
||||
}
|
||||
|
||||
func (s *testLockSuite) TestLockTTL(c *C) {
|
||||
defaultLockTTL := tikv.ConfigProbe{}.GetDefaultLockTTL()
|
||||
ttlFactor := tikv.ConfigProbe{}.GetTTLFactor()
|
||||
|
||||
txn, err := s.store.Begin()
|
||||
c.Assert(err, IsNil)
|
||||
txn.Set(tidbkv.Key("key"), []byte("value"))
|
||||
@ -478,15 +474,14 @@ func (s *testLockSuite) TestBatchResolveLocks(c *C) {
|
||||
c.Assert(err, IsNil)
|
||||
txn.Set(tidbkv.Key("k3"), []byte("v3"))
|
||||
txn.Set(tidbkv.Key("k4"), []byte("v4"))
|
||||
tikvTxn := txn
|
||||
committer, err := newTwoPhaseCommitterWithInit(tikvTxn, 0)
|
||||
committer, err := txn.NewCommitter(0)
|
||||
c.Assert(err, IsNil)
|
||||
committer.setAsyncCommit(true)
|
||||
committer.lockTTL = 20000
|
||||
err = committer.prewriteMutations(NewBackofferWithVars(context.Background(), PrewriteMaxBackoff, nil), committer.mutations)
|
||||
committer.SetUseAsyncCommit()
|
||||
committer.SetLockTTL(20000)
|
||||
committer.PrewriteAllMutations(context.Background())
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
var locks []*Lock
|
||||
var locks []*tikv.Lock
|
||||
for _, key := range []string{"k1", "k2", "k3", "k4"} {
|
||||
l := s.mustGetLock(c, []byte(key))
|
||||
locks = append(locks, l)
|
||||
@ -498,9 +493,9 @@ func (s *testLockSuite) TestBatchResolveLocks(c *C) {
|
||||
msBeforeLockExpired = s.store.GetOracle().UntilExpired(locks[3].TxnID, locks[3].TTL, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
|
||||
c.Assert(msBeforeLockExpired, Greater, int64(0))
|
||||
|
||||
lr := newLockResolver(s.store)
|
||||
bo := NewBackofferWithVars(context.Background(), GcResolveLockMaxBackoff, nil)
|
||||
loc, err := lr.store.GetRegionCache().LocateKey(bo, locks[0].Primary)
|
||||
lr := s.store.NewLockResolver()
|
||||
bo := tikv.NewBackofferWithVars(context.Background(), tikv.GcResolveLockMaxBackoff, nil)
|
||||
loc, err := s.store.GetRegionCache().LocateKey(bo, locks[0].Primary)
|
||||
c.Assert(err, IsNil)
|
||||
// Check BatchResolveLocks resolve the lock even the ttl is not expired.
|
||||
success, err := lr.BatchResolveLocks(bo, locks, loc.Region)
|
||||
@ -524,20 +519,20 @@ func (s *testLockSuite) TestBatchResolveLocks(c *C) {
|
||||
}
|
||||
|
||||
func (s *testLockSuite) TestNewLockZeroTTL(c *C) {
|
||||
l := NewLock(&kvrpcpb.LockInfo{})
|
||||
l := tikv.NewLock(&kvrpcpb.LockInfo{})
|
||||
c.Assert(l.TTL, Equals, uint64(0))
|
||||
}
|
||||
|
||||
func init() {
|
||||
// Speed up tests.
|
||||
oracleUpdateInterval = 2
|
||||
tikv.ConfigProbe{}.SetOracleUpdateInterval(2)
|
||||
}
|
||||
|
||||
func (s *testLockSuite) TestZeroMinCommitTS(c *C) {
|
||||
txn, err := s.store.Begin()
|
||||
c.Assert(err, IsNil)
|
||||
txn.Set(tidbkv.Key("key"), []byte("value"))
|
||||
bo := NewBackofferWithVars(context.Background(), PrewriteMaxBackoff, nil)
|
||||
bo := tikv.NewBackofferWithVars(context.Background(), tikv.PrewriteMaxBackoff, nil)
|
||||
|
||||
mockValue := fmt.Sprintf(`return(%d)`, txn.StartTS())
|
||||
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/mockZeroCommitTS", mockValue), IsNil)
|
||||
@ -545,48 +540,23 @@ func (s *testLockSuite) TestZeroMinCommitTS(c *C) {
|
||||
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/mockZeroCommitTS"), IsNil)
|
||||
|
||||
lock := s.mustGetLock(c, []byte("key"))
|
||||
expire, pushed, err := newLockResolver(s.store).ResolveLocks(bo, 0, []*Lock{lock})
|
||||
expire, pushed, err := s.store.NewLockResolver().ResolveLocks(bo, 0, []*tikv.Lock{lock})
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(pushed, HasLen, 0)
|
||||
c.Assert(expire, Greater, int64(0))
|
||||
|
||||
expire, pushed, err = newLockResolver(s.store).ResolveLocks(bo, math.MaxUint64, []*Lock{lock})
|
||||
expire, pushed, err = s.store.NewLockResolver().ResolveLocks(bo, math.MaxUint64, []*tikv.Lock{lock})
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(pushed, HasLen, 1)
|
||||
c.Assert(expire, Greater, int64(0))
|
||||
|
||||
// Clean up this test.
|
||||
lock.TTL = uint64(0)
|
||||
expire, _, err = newLockResolver(s.store).ResolveLocks(bo, 0, []*Lock{lock})
|
||||
expire, _, err = s.store.NewLockResolver().ResolveLocks(bo, 0, []*tikv.Lock{lock})
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(expire, Equals, int64(0))
|
||||
}
|
||||
|
||||
func (s *testLockSuite) TestDeduplicateKeys(c *C) {
|
||||
inputs := []string{
|
||||
"a b c",
|
||||
"a a b c",
|
||||
"a a a b c",
|
||||
"a a a b b b b c",
|
||||
"a b b b b c c c",
|
||||
}
|
||||
r := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
for _, in := range inputs {
|
||||
strs := strings.Split(in, " ")
|
||||
keys := make([][]byte, len(strs))
|
||||
for _, i := range r.Perm(len(strs)) {
|
||||
keys[i] = []byte(strs[i])
|
||||
}
|
||||
keys = deduplicateKeys(keys)
|
||||
strs = strs[:len(keys)]
|
||||
for i := range keys {
|
||||
strs[i] = string(keys[i])
|
||||
}
|
||||
out := strings.Join(strs, " ")
|
||||
c.Assert(out, Equals, "a b c")
|
||||
}
|
||||
}
|
||||
|
||||
func (s *testLockSuite) prepareTxnFallenBackFromAsyncCommit(c *C) {
|
||||
txn, err := s.store.Begin()
|
||||
c.Assert(err, IsNil)
|
||||
@ -595,23 +565,22 @@ func (s *testLockSuite) prepareTxnFallenBackFromAsyncCommit(c *C) {
|
||||
err = txn.Set([]byte("fb2"), []byte("2"))
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
committer, err := newTwoPhaseCommitterWithInit(txn, 1)
|
||||
committer, err := txn.NewCommitter(1)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(committer.mutations.Len(), Equals, 2)
|
||||
committer.lockTTL = 0
|
||||
committer.setAsyncCommit(true)
|
||||
committer.maxCommitTS = committer.startTS + (100 << 18) // 100ms
|
||||
c.Assert(committer.GetMutations().Len(), Equals, 2)
|
||||
committer.SetLockTTL(0)
|
||||
committer.SetUseAsyncCommit()
|
||||
committer.SetCommitTS(committer.GetStartTS() + (100 << 18)) // 100ms
|
||||
|
||||
bo := NewBackoffer(context.Background(), PrewriteMaxBackoff)
|
||||
err = committer.prewriteMutations(bo, committer.mutations.Slice(0, 1))
|
||||
err = committer.PrewriteMutations(context.Background(), committer.GetMutations().Slice(0, 1))
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(committer.isAsyncCommit(), IsTrue)
|
||||
c.Assert(committer.IsAsyncCommit(), IsTrue)
|
||||
|
||||
// Set an invalid maxCommitTS to produce MaxCommitTsTooLarge
|
||||
committer.maxCommitTS = committer.startTS - 1
|
||||
err = committer.prewriteMutations(bo, committer.mutations.Slice(1, 2))
|
||||
committer.SetMaxCommitTS(committer.GetStartTS() - 1)
|
||||
err = committer.PrewriteMutations(context.Background(), committer.GetMutations().Slice(1, 2))
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(committer.isAsyncCommit(), IsFalse) // Fallback due to MaxCommitTsTooLarge
|
||||
c.Assert(committer.IsAsyncCommit(), IsFalse) // Fallback due to MaxCommitTsTooLarge
|
||||
}
|
||||
|
||||
func (s *testLockSuite) TestCheckLocksFallenBackFromAsyncCommit(c *C) {
|
||||
@ -619,18 +588,18 @@ func (s *testLockSuite) TestCheckLocksFallenBackFromAsyncCommit(c *C) {
|
||||
|
||||
lock := s.mustGetLock(c, []byte("fb1"))
|
||||
c.Assert(lock.UseAsyncCommit, IsTrue)
|
||||
bo := NewBackoffer(context.Background(), getMaxBackoff)
|
||||
lr := newLockResolver(s.store)
|
||||
status, err := lr.getTxnStatusFromLock(bo, lock, 0, false)
|
||||
bo := tikv.NewBackoffer(context.Background(), getMaxBackoff)
|
||||
lr := s.store.NewLockResolver()
|
||||
status, err := lr.GetTxnStatusFromLock(bo, lock, 0, false)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(NewLock(status.primaryLock), DeepEquals, lock)
|
||||
c.Assert(tikv.LockProbe{}.GetPrimaryKeyFromTxnStatus(status), DeepEquals, []byte("fb1"))
|
||||
|
||||
_, err = lr.checkAllSecondaries(bo, lock, &status)
|
||||
c.Assert(err.(*nonAsyncCommitLock), NotNil)
|
||||
err = lr.CheckAllSecondaries(bo, lock, &status)
|
||||
c.Assert(lr.IsNonAsyncCommitLock(err), IsTrue)
|
||||
|
||||
status, err = lr.getTxnStatusFromLock(bo, lock, 0, true)
|
||||
status, err = lr.GetTxnStatusFromLock(bo, lock, 0, true)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(status.action, Equals, kvrpcpb.Action_TTLExpireRollback)
|
||||
c.Assert(status.Action(), Equals, kvrpcpb.Action_TTLExpireRollback)
|
||||
c.Assert(status.TTL(), Equals, uint64(0))
|
||||
}
|
||||
|
||||
@ -639,8 +608,8 @@ func (s *testLockSuite) TestResolveTxnFallenBackFromAsyncCommit(c *C) {
|
||||
|
||||
lock := s.mustGetLock(c, []byte("fb1"))
|
||||
c.Assert(lock.UseAsyncCommit, IsTrue)
|
||||
bo := NewBackoffer(context.Background(), getMaxBackoff)
|
||||
expire, pushed, err := newLockResolver(s.store).ResolveLocks(bo, 0, []*Lock{lock})
|
||||
bo := tikv.NewBackoffer(context.Background(), getMaxBackoff)
|
||||
expire, pushed, err := s.store.NewLockResolver().ResolveLocks(bo, 0, []*tikv.Lock{lock})
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(expire, Equals, int64(0))
|
||||
c.Assert(len(pushed), Equals, 0)
|
||||
@ -658,10 +627,10 @@ func (s *testLockSuite) TestBatchResolveTxnFallenBackFromAsyncCommit(c *C) {
|
||||
|
||||
lock := s.mustGetLock(c, []byte("fb1"))
|
||||
c.Assert(lock.UseAsyncCommit, IsTrue)
|
||||
bo := NewBackoffer(context.Background(), getMaxBackoff)
|
||||
loc, err := s.store.regionCache.LocateKey(bo, []byte("fb1"))
|
||||
bo := tikv.NewBackoffer(context.Background(), getMaxBackoff)
|
||||
loc, err := s.store.GetRegionCache().LocateKey(bo, []byte("fb1"))
|
||||
c.Assert(err, IsNil)
|
||||
ok, err := newLockResolver(s.store).BatchResolveLocks(bo, []*Lock{lock}, loc.Region)
|
||||
ok, err := s.store.NewLockResolver().BatchResolveLocks(bo, []*tikv.Lock{lock}, loc.Region)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(ok, IsTrue)
|
||||
|
||||
@ -672,19 +641,3 @@ func (s *testLockSuite) TestBatchResolveTxnFallenBackFromAsyncCommit(c *C) {
|
||||
_, err = t3.Get(context.Background(), []byte("fb2"))
|
||||
errMsgMustContain(c, err, "key not exist")
|
||||
}
|
||||
|
||||
func errMsgMustContain(c *C, err error, msg string) {
|
||||
c.Assert(strings.Contains(err.Error(), msg), IsTrue)
|
||||
}
|
||||
|
||||
func randKV(keyLen, valLen int) (string, string) {
|
||||
const letters = "abc"
|
||||
k, v := make([]byte, keyLen), make([]byte, valLen)
|
||||
for i := range k {
|
||||
k[i] = letters[rand.Intn(len(letters))]
|
||||
}
|
||||
for i := range v {
|
||||
v[i] = letters[rand.Intn(len(letters))]
|
||||
}
|
||||
return string(k), string(v)
|
||||
}
|
||||
Reference in New Issue
Block a user