diff --git a/store/tikv/test_probe.go b/store/tikv/test_probe.go index 60c8a1bbb9..e2b072ef3c 100644 --- a/store/tikv/test_probe.go +++ b/store/tikv/test_probe.go @@ -47,6 +47,12 @@ func (s StoreProbe) Begin() (TxnProbe, error) { return TxnProbe{KVTxn: txn}, err } +// GetSnapshot returns a snapshot. +func (s StoreProbe) GetSnapshot(ts uint64) SnapshotProbe { + snap := s.KVStore.GetSnapshot(ts) + return SnapshotProbe{KVSnapshot: snap} +} + // SetRegionCachePDClient replaces pd client inside region cache. func (s StoreProbe) SetRegionCachePDClient(client pd.Client) { s.regionCache.pdClient = client @@ -178,12 +184,12 @@ func (c CommitterProbe) SetMutations(muts CommitterMutations) { // SetCommitTS resets the committer's commit ts. func (c CommitterProbe) SetCommitTS(ts uint64) { - c.commitTS = ts + atomic.StoreUint64(&c.commitTS, ts) } // GetCommitTS returns the commit ts of the committer. func (c CommitterProbe) GetCommitTS() uint64 { - return c.commitTS + return atomic.LoadUint64(&c.commitTS) } // GetMinCommitTS returns the minimal commit ts can be used. @@ -357,6 +363,33 @@ func (c CommitterProbe) CleanupMutations(ctx context.Context) error { return c.cleanupMutations(bo, c.mutations) } +// SnapshotProbe exposes some snapshot utilities for testing purpose. +type SnapshotProbe struct { + *KVSnapshot +} + +// MergeRegionRequestStats merges RPC runtime stats into snapshot's stats. +func (s SnapshotProbe) MergeRegionRequestStats(stats map[tikvrpc.CmdType]*RPCRuntimeStats) { + s.mergeRegionRequestStats(stats) +} + +// RecordBackoffInfo records backoff stats into snapshot's stats. +func (s SnapshotProbe) RecordBackoffInfo(bo *Backoffer) { + s.recordBackoffInfo(bo) +} + +// MergeExecDetail merges exec stats into snapshot's stats. +func (s SnapshotProbe) MergeExecDetail(detail *pb.ExecDetailsV2) { + s.mergeExecDetail(detail) +} + +// FormatStats dumps information of stats. +func (s SnapshotProbe) FormatStats() string { + s.mu.Lock() + defer s.mu.Unlock() + return s.mu.stats.String() +} + // LockProbe exposes some lock utilities for testing purpose. type LockProbe struct { } diff --git a/store/tikv/snapshot_fail_test.go b/store/tikv/tests/snapshot_fail_test.go similarity index 91% rename from store/tikv/snapshot_fail_test.go rename to store/tikv/tests/snapshot_fail_test.go index 9fd73845d9..f8cd291626 100644 --- a/store/tikv/snapshot_fail_test.go +++ b/store/tikv/tests/snapshot_fail_test.go @@ -11,23 +11,24 @@ // See the License for the specific language governing permissions and // limitations under the License. -package tikv +package tikv_test import ( "context" - "sync/atomic" + "math" "time" . "github.com/pingcap/check" "github.com/pingcap/failpoint" tidbkv "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/mockstore/unistore" + "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/store/tikv/kv" ) type testSnapshotFailSuite struct { OneByOneSuite - store *KVStore + store tikv.StoreProbe } var _ = SerialSuites(&testSnapshotFailSuite{}) @@ -37,9 +38,9 @@ func (s *testSnapshotFailSuite) SetUpSuite(c *C) { client, pdClient, cluster, err := unistore.New("") c.Assert(err, IsNil) unistore.BootstrapWithSingleStore(cluster) - store, err := NewTestTiKVStore(client, pdClient, nil, nil, 0) + store, err := tikv.NewTestTiKVStore(client, pdClient, nil, nil, 0) c.Assert(err, IsNil) - s.store = store + s.store = tikv.StoreProbe{KVStore: store} } func (s *testSnapshotFailSuite) TearDownSuite(c *C) { @@ -143,7 +144,7 @@ func (s *testSnapshotFailSuite) TestScanResponseKeyError(c *C) { func (s *testSnapshotFailSuite) TestRetryPointGetWithTS(c *C) { defer s.cleanup(c) - snapshot := s.store.GetSnapshot(maxTimestamp) + snapshot := s.store.GetSnapshot(math.MaxUint64) c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/snapshotGetTSAsync", `pause`), IsNil) ch := make(chan error) go func() { @@ -160,11 +161,11 @@ func (s *testSnapshotFailSuite) TestRetryPointGetWithTS(c *C) { txn.SetOption(kv.GuaranteeLinearizability, false) // Prewrite an async-commit lock and do not commit it. c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/asyncCommitDoNothing", `return`), IsNil) - committer, err := newTwoPhaseCommitterWithInit(txn, 1) + committer, err := txn.NewCommitter(1) c.Assert(err, IsNil) // Sets its minCommitTS to one second later, so the lock will be ignored by point get. - committer.minCommitTS = committer.startTS + (1000 << 18) - err = committer.execute(context.Background()) + committer.SetMinCommitTS(committer.GetStartTS() + (1000 << 18)) + err = committer.Execute(context.Background()) c.Assert(err, IsNil) c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/snapshotGetTSAsync"), IsNil) @@ -189,11 +190,11 @@ func (s *testSnapshotFailSuite) TestRetryPointGetResolveTS(c *C) { // Prewrite the lock without committing it c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/beforeCommit", `pause`), IsNil) ch := make(chan struct{}) - committer, err := newTwoPhaseCommitterWithInit(txn, 1) - c.Assert(committer.primary(), DeepEquals, []byte("k1")) + committer, err := txn.NewCommitter(1) + c.Assert(committer.GetPrimaryKey(), DeepEquals, []byte("k1")) go func() { c.Assert(err, IsNil) - err = committer.execute(context.Background()) + err = committer.Execute(context.Background()) c.Assert(err, IsNil) ch <- struct{}{} }() @@ -201,11 +202,11 @@ func (s *testSnapshotFailSuite) TestRetryPointGetResolveTS(c *C) { // Wait until prewrite finishes time.Sleep(200 * time.Millisecond) // Should get nothing with max version, and **not pushing forward minCommitTS** of the primary lock - snapshot := s.store.GetSnapshot(maxTimestamp) + snapshot := s.store.GetSnapshot(math.MaxUint64) _, err = snapshot.Get(context.Background(), []byte("k2")) c.Assert(err, ErrorMatches, ".*key not exist") - initialCommitTS := atomic.LoadUint64(&committer.commitTS) + initialCommitTS := committer.GetCommitTS() c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/beforeCommit"), IsNil) <-ch diff --git a/store/tikv/snapshot_test.go b/store/tikv/tests/snapshot_test.go similarity index 79% rename from store/tikv/snapshot_test.go rename to store/tikv/tests/snapshot_test.go index c578d04e8a..721525711d 100644 --- a/store/tikv/snapshot_test.go +++ b/store/tikv/tests/snapshot_test.go @@ -11,11 +11,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -package tikv +package tikv_test import ( "context" "fmt" + "math" "sync" "time" @@ -24,6 +25,7 @@ import ( "github.com/pingcap/failpoint" pb "github.com/pingcap/kvproto/pkg/kvrpcpb" tidbkv "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/store/tikv/kv" "github.com/pingcap/tidb/store/tikv/logutil" "github.com/pingcap/tidb/store/tikv/tikvrpc" @@ -32,7 +34,7 @@ import ( type testSnapshotSuite struct { OneByOneSuite - store *KVStore + store tikv.StoreProbe prefix string rowNums []int } @@ -41,7 +43,7 @@ var _ = Suite(&testSnapshotSuite{}) func (s *testSnapshotSuite) SetUpSuite(c *C) { s.OneByOneSuite.SetUpSuite(c) - s.store = NewTestStore(c) + s.store = tikv.StoreProbe{KVStore: NewTestStore(c)} s.prefix = fmt.Sprintf("snapshot_%d", time.Now().Unix()) s.rowNums = append(s.rowNums, 1, 100, 191) } @@ -64,7 +66,7 @@ func (s *testSnapshotSuite) TearDownSuite(c *C) { s.OneByOneSuite.TearDownSuite(c) } -func (s *testSnapshotSuite) beginTxn(c *C) *KVTxn { +func (s *testSnapshotSuite) beginTxn(c *C) tikv.TxnProbe { txn, err := s.store.Begin() c.Assert(err, IsNil) return txn @@ -72,7 +74,7 @@ func (s *testSnapshotSuite) beginTxn(c *C) *KVTxn { func (s *testSnapshotSuite) checkAll(keys []tidbkv.Key, c *C) { txn := s.beginTxn(c) - snapshot := newTiKVSnapshot(s.store, txn.StartTS(), 0) + snapshot := txn.GetSnapshot() m, err := snapshot.BatchGet(context.Background(), keys) c.Assert(err, IsNil) @@ -131,7 +133,7 @@ func (s *testSnapshotSuite) TestSnapshotCache(c *C) { c.Assert(txn.Commit(context.Background()), IsNil) txn = s.beginTxn(c) - snapshot := newTiKVSnapshot(s.store, txn.StartTS(), 0) + snapshot := txn.GetSnapshot() _, err := snapshot.BatchGet(context.Background(), []tidbkv.Key{tidbkv.Key("x"), tidbkv.Key("y")}) c.Assert(err, IsNil) @@ -182,11 +184,10 @@ func (s *testSnapshotSuite) TestSkipLargeTxnLock(c *C) { c.Assert(txn.Set(x, []byte("x")), IsNil) c.Assert(txn.Set(y, []byte("y")), IsNil) ctx := context.Background() - bo := NewBackofferWithVars(ctx, PrewriteMaxBackoff, nil) - committer, err := newTwoPhaseCommitterWithInit(txn, 0) + committer, err := txn.NewCommitter(0) c.Assert(err, IsNil) - committer.lockTTL = 3000 - c.Assert(committer.prewriteMutations(bo, committer.mutations), IsNil) + committer.SetLockTTL(3000) + c.Assert(committer.PrewriteAllMutations(ctx), IsNil) txn1 := s.beginTxn(c) // txn1 is not blocked by txn in the large txn protocol. @@ -198,9 +199,9 @@ func (s *testSnapshotSuite) TestSkipLargeTxnLock(c *C) { c.Assert(res, HasLen, 0) // Commit txn, check the final commit ts is pushed. - committer.commitTS = txn.StartTS() + 1 - c.Assert(committer.commitMutations(bo, committer.mutations), IsNil) - status, err := s.store.lockResolver.GetTxnStatus(txn.StartTS(), 0, x) + committer.SetCommitTS(txn.StartTS() + 1) + c.Assert(committer.CommitMutations(ctx), IsNil) + status, err := s.store.GetLockResolver().GetTxnStatus(txn.StartTS(), 0, x) c.Assert(err, IsNil) c.Assert(status.IsCommitted(), IsTrue) c.Assert(status.CommitTS(), Greater, txn1.StartTS()) @@ -213,25 +214,24 @@ func (s *testSnapshotSuite) TestPointGetSkipTxnLock(c *C) { c.Assert(txn.Set(x, []byte("x")), IsNil) c.Assert(txn.Set(y, []byte("y")), IsNil) ctx := context.Background() - bo := NewBackofferWithVars(ctx, PrewriteMaxBackoff, nil) - committer, err := newTwoPhaseCommitterWithInit(txn, 0) + committer, err := txn.NewCommitter(0) c.Assert(err, IsNil) - committer.lockTTL = 3000 - c.Assert(committer.prewriteMutations(bo, committer.mutations), IsNil) + committer.SetLockTTL(3000) + c.Assert(committer.PrewriteAllMutations(ctx), IsNil) - snapshot := newTiKVSnapshot(s.store, maxTimestamp, 0) + snapshot := s.store.GetSnapshot(math.MaxUint64) start := time.Now() - c.Assert(committer.primary(), BytesEquals, []byte(x)) + c.Assert(committer.GetPrimaryKey(), BytesEquals, []byte(x)) // Point get secondary key. Shouldn't be blocked by the lock and read old data. _, err = snapshot.Get(ctx, y) c.Assert(tidbkv.IsErrNotFound(errors.Trace(err)), IsTrue) c.Assert(time.Since(start), Less, 500*time.Millisecond) // Commit the primary key - committer.commitTS = txn.StartTS() + 1 - committer.commitMutations(bo, committer.mutationsOfKeys([][]byte{committer.primary()})) + committer.SetCommitTS(txn.StartTS() + 1) + committer.CommitMutations(ctx) - snapshot = newTiKVSnapshot(s.store, maxTimestamp, 0) + snapshot = s.store.GetSnapshot(math.MaxUint64) start = time.Now() // Point get secondary key. Should read committed data. value, err := snapshot.Get(ctx, y) @@ -248,7 +248,7 @@ func (s *testSnapshotSuite) TestSnapshotThreadSafe(c *C) { err := txn.Commit(context.Background()) c.Assert(err, IsNil) - snapshot := newTiKVSnapshot(s.store, maxTimestamp, 0) + snapshot := s.store.GetSnapshot(math.MaxUint64) var wg sync.WaitGroup wg.Add(5) for i := 0; i < 5; i++ { @@ -266,20 +266,20 @@ func (s *testSnapshotSuite) TestSnapshotThreadSafe(c *C) { } func (s *testSnapshotSuite) TestSnapshotRuntimeStats(c *C) { - reqStats := NewRegionRequestRuntimeStats() - RecordRegionRequestRuntimeStats(reqStats.Stats, tikvrpc.CmdGet, time.Second) - RecordRegionRequestRuntimeStats(reqStats.Stats, tikvrpc.CmdGet, time.Millisecond) - snapshot := newTiKVSnapshot(s.store, 0, 0) - snapshot.SetOption(kv.CollectRuntimeStats, &SnapshotRuntimeStats{}) - snapshot.mergeRegionRequestStats(reqStats.Stats) - snapshot.mergeRegionRequestStats(reqStats.Stats) - bo := NewBackofferWithVars(context.Background(), 2000, nil) - err := bo.BackoffWithMaxSleep(BoTxnLockFast, 30, errors.New("test")) + reqStats := tikv.NewRegionRequestRuntimeStats() + tikv.RecordRegionRequestRuntimeStats(reqStats.Stats, tikvrpc.CmdGet, time.Second) + tikv.RecordRegionRequestRuntimeStats(reqStats.Stats, tikvrpc.CmdGet, time.Millisecond) + snapshot := s.store.GetSnapshot(0) + snapshot.SetOption(kv.CollectRuntimeStats, &tikv.SnapshotRuntimeStats{}) + snapshot.MergeRegionRequestStats(reqStats.Stats) + snapshot.MergeRegionRequestStats(reqStats.Stats) + bo := tikv.NewBackofferWithVars(context.Background(), 2000, nil) + err := bo.BackoffWithMaxSleep(tikv.BoTxnLockFast, 30, errors.New("test")) c.Assert(err, IsNil) - snapshot.recordBackoffInfo(bo) - snapshot.recordBackoffInfo(bo) + snapshot.RecordBackoffInfo(bo) + snapshot.RecordBackoffInfo(bo) expect := "Get:{num_rpc:4, total_time:2s},txnLockFast_backoff:{num:2, total_time:60ms}" - c.Assert(snapshot.mu.stats.String(), Equals, expect) + c.Assert(snapshot.FormatStats(), Equals, expect) detail := &pb.ExecDetailsV2{ TimeDetail: &pb.TimeDetail{ WaitWallTimeMs: 100, @@ -295,7 +295,7 @@ func (s *testSnapshotSuite) TestSnapshotRuntimeStats(c *C) { RocksdbBlockCacheHitCount: 10, }, } - snapshot.mergeExecDetail(detail) + snapshot.MergeExecDetail(detail) expect = "Get:{num_rpc:4, total_time:2s},txnLockFast_backoff:{num:2, total_time:60ms}, " + "total_process_time: 100ms, total_wait_time: 100ms, " + "scan_detail: {total_process_keys: 10, " + @@ -303,8 +303,8 @@ func (s *testSnapshotSuite) TestSnapshotRuntimeStats(c *C) { "rocksdb: {delete_skipped_count: 5, " + "key_skipped_count: 1, " + "block: {cache_hit_count: 10, read_count: 20, read_byte: 15 Bytes}}}" - c.Assert(snapshot.mu.stats.String(), Equals, expect) - snapshot.mergeExecDetail(detail) + c.Assert(snapshot.FormatStats(), Equals, expect) + snapshot.MergeExecDetail(detail) expect = "Get:{num_rpc:4, total_time:2s},txnLockFast_backoff:{num:2, total_time:60ms}, " + "total_process_time: 200ms, total_wait_time: 200ms, " + "scan_detail: {total_process_keys: 20, " + @@ -312,5 +312,5 @@ func (s *testSnapshotSuite) TestSnapshotRuntimeStats(c *C) { "rocksdb: {delete_skipped_count: 10, " + "key_skipped_count: 2, " + "block: {cache_hit_count: 20, read_count: 40, read_byte: 30 Bytes}}}" - c.Assert(snapshot.mu.stats.String(), Equals, expect) + c.Assert(snapshot.FormatStats(), Equals, expect) } diff --git a/store/tikv/ticlient_test.go b/store/tikv/ticlient_test.go index 0abe3f52d9..8a1d60c012 100644 --- a/store/tikv/ticlient_test.go +++ b/store/tikv/ticlient_test.go @@ -16,7 +16,6 @@ package tikv import ( "context" "flag" - "fmt" "strings" "sync" @@ -24,7 +23,6 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tidb/store/mockstore/unistore" "github.com/pingcap/tidb/store/tikv/config" - "github.com/pingcap/tidb/store/tikv/util/codec" pd "github.com/tikv/pd/client" ) @@ -80,17 +78,3 @@ func clearStorage(store *KVStore) error { } return txn.Commit(context.Background()) } - -func encodeKey(prefix, s string) []byte { - return codec.EncodeBytes(nil, []byte(fmt.Sprintf("%s_%s", prefix, s))) -} - -func valueBytes(n int) []byte { - return []byte(fmt.Sprintf("value%d", n)) -} - -// s08d is for returning format string "%s%08d" to keep string sorted. -// e.g.: "0002" < "0011", otherwise "2" > "11" -func s08d(prefix string, n int) string { - return fmt.Sprintf("%s%08d", prefix, n) -}