store/tikv: move snapshot_test to tests (#23908)
Signed-off-by: disksing <i@disksing.com> Co-authored-by: Ti Chi Robot <71242396+ti-chi-bot@users.noreply.github.com>
This commit is contained in:
@ -47,6 +47,12 @@ func (s StoreProbe) Begin() (TxnProbe, error) {
|
||||
return TxnProbe{KVTxn: txn}, err
|
||||
}
|
||||
|
||||
// GetSnapshot returns a snapshot.
|
||||
func (s StoreProbe) GetSnapshot(ts uint64) SnapshotProbe {
|
||||
snap := s.KVStore.GetSnapshot(ts)
|
||||
return SnapshotProbe{KVSnapshot: snap}
|
||||
}
|
||||
|
||||
// SetRegionCachePDClient replaces pd client inside region cache.
|
||||
func (s StoreProbe) SetRegionCachePDClient(client pd.Client) {
|
||||
s.regionCache.pdClient = client
|
||||
@ -178,12 +184,12 @@ func (c CommitterProbe) SetMutations(muts CommitterMutations) {
|
||||
|
||||
// SetCommitTS resets the committer's commit ts.
|
||||
func (c CommitterProbe) SetCommitTS(ts uint64) {
|
||||
c.commitTS = ts
|
||||
atomic.StoreUint64(&c.commitTS, ts)
|
||||
}
|
||||
|
||||
// GetCommitTS returns the commit ts of the committer.
|
||||
func (c CommitterProbe) GetCommitTS() uint64 {
|
||||
return c.commitTS
|
||||
return atomic.LoadUint64(&c.commitTS)
|
||||
}
|
||||
|
||||
// GetMinCommitTS returns the minimal commit ts can be used.
|
||||
@ -357,6 +363,33 @@ func (c CommitterProbe) CleanupMutations(ctx context.Context) error {
|
||||
return c.cleanupMutations(bo, c.mutations)
|
||||
}
|
||||
|
||||
// SnapshotProbe exposes some snapshot utilities for testing purpose.
|
||||
type SnapshotProbe struct {
|
||||
*KVSnapshot
|
||||
}
|
||||
|
||||
// MergeRegionRequestStats merges RPC runtime stats into snapshot's stats.
|
||||
func (s SnapshotProbe) MergeRegionRequestStats(stats map[tikvrpc.CmdType]*RPCRuntimeStats) {
|
||||
s.mergeRegionRequestStats(stats)
|
||||
}
|
||||
|
||||
// RecordBackoffInfo records backoff stats into snapshot's stats.
|
||||
func (s SnapshotProbe) RecordBackoffInfo(bo *Backoffer) {
|
||||
s.recordBackoffInfo(bo)
|
||||
}
|
||||
|
||||
// MergeExecDetail merges exec stats into snapshot's stats.
|
||||
func (s SnapshotProbe) MergeExecDetail(detail *pb.ExecDetailsV2) {
|
||||
s.mergeExecDetail(detail)
|
||||
}
|
||||
|
||||
// FormatStats dumps information of stats.
|
||||
func (s SnapshotProbe) FormatStats() string {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
return s.mu.stats.String()
|
||||
}
|
||||
|
||||
// LockProbe exposes some lock utilities for testing purpose.
|
||||
type LockProbe struct {
|
||||
}
|
||||
|
||||
@ -11,23 +11,24 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package tikv
|
||||
package tikv_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync/atomic"
|
||||
"math"
|
||||
"time"
|
||||
|
||||
. "github.com/pingcap/check"
|
||||
"github.com/pingcap/failpoint"
|
||||
tidbkv "github.com/pingcap/tidb/kv"
|
||||
"github.com/pingcap/tidb/store/mockstore/unistore"
|
||||
"github.com/pingcap/tidb/store/tikv"
|
||||
"github.com/pingcap/tidb/store/tikv/kv"
|
||||
)
|
||||
|
||||
type testSnapshotFailSuite struct {
|
||||
OneByOneSuite
|
||||
store *KVStore
|
||||
store tikv.StoreProbe
|
||||
}
|
||||
|
||||
var _ = SerialSuites(&testSnapshotFailSuite{})
|
||||
@ -37,9 +38,9 @@ func (s *testSnapshotFailSuite) SetUpSuite(c *C) {
|
||||
client, pdClient, cluster, err := unistore.New("")
|
||||
c.Assert(err, IsNil)
|
||||
unistore.BootstrapWithSingleStore(cluster)
|
||||
store, err := NewTestTiKVStore(client, pdClient, nil, nil, 0)
|
||||
store, err := tikv.NewTestTiKVStore(client, pdClient, nil, nil, 0)
|
||||
c.Assert(err, IsNil)
|
||||
s.store = store
|
||||
s.store = tikv.StoreProbe{KVStore: store}
|
||||
}
|
||||
|
||||
func (s *testSnapshotFailSuite) TearDownSuite(c *C) {
|
||||
@ -143,7 +144,7 @@ func (s *testSnapshotFailSuite) TestScanResponseKeyError(c *C) {
|
||||
func (s *testSnapshotFailSuite) TestRetryPointGetWithTS(c *C) {
|
||||
defer s.cleanup(c)
|
||||
|
||||
snapshot := s.store.GetSnapshot(maxTimestamp)
|
||||
snapshot := s.store.GetSnapshot(math.MaxUint64)
|
||||
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/snapshotGetTSAsync", `pause`), IsNil)
|
||||
ch := make(chan error)
|
||||
go func() {
|
||||
@ -160,11 +161,11 @@ func (s *testSnapshotFailSuite) TestRetryPointGetWithTS(c *C) {
|
||||
txn.SetOption(kv.GuaranteeLinearizability, false)
|
||||
// Prewrite an async-commit lock and do not commit it.
|
||||
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/asyncCommitDoNothing", `return`), IsNil)
|
||||
committer, err := newTwoPhaseCommitterWithInit(txn, 1)
|
||||
committer, err := txn.NewCommitter(1)
|
||||
c.Assert(err, IsNil)
|
||||
// Sets its minCommitTS to one second later, so the lock will be ignored by point get.
|
||||
committer.minCommitTS = committer.startTS + (1000 << 18)
|
||||
err = committer.execute(context.Background())
|
||||
committer.SetMinCommitTS(committer.GetStartTS() + (1000 << 18))
|
||||
err = committer.Execute(context.Background())
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/snapshotGetTSAsync"), IsNil)
|
||||
|
||||
@ -189,11 +190,11 @@ func (s *testSnapshotFailSuite) TestRetryPointGetResolveTS(c *C) {
|
||||
// Prewrite the lock without committing it
|
||||
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/beforeCommit", `pause`), IsNil)
|
||||
ch := make(chan struct{})
|
||||
committer, err := newTwoPhaseCommitterWithInit(txn, 1)
|
||||
c.Assert(committer.primary(), DeepEquals, []byte("k1"))
|
||||
committer, err := txn.NewCommitter(1)
|
||||
c.Assert(committer.GetPrimaryKey(), DeepEquals, []byte("k1"))
|
||||
go func() {
|
||||
c.Assert(err, IsNil)
|
||||
err = committer.execute(context.Background())
|
||||
err = committer.Execute(context.Background())
|
||||
c.Assert(err, IsNil)
|
||||
ch <- struct{}{}
|
||||
}()
|
||||
@ -201,11 +202,11 @@ func (s *testSnapshotFailSuite) TestRetryPointGetResolveTS(c *C) {
|
||||
// Wait until prewrite finishes
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
// Should get nothing with max version, and **not pushing forward minCommitTS** of the primary lock
|
||||
snapshot := s.store.GetSnapshot(maxTimestamp)
|
||||
snapshot := s.store.GetSnapshot(math.MaxUint64)
|
||||
_, err = snapshot.Get(context.Background(), []byte("k2"))
|
||||
c.Assert(err, ErrorMatches, ".*key not exist")
|
||||
|
||||
initialCommitTS := atomic.LoadUint64(&committer.commitTS)
|
||||
initialCommitTS := committer.GetCommitTS()
|
||||
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/beforeCommit"), IsNil)
|
||||
|
||||
<-ch
|
||||
@ -11,11 +11,12 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package tikv
|
||||
package tikv_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@ -24,6 +25,7 @@ import (
|
||||
"github.com/pingcap/failpoint"
|
||||
pb "github.com/pingcap/kvproto/pkg/kvrpcpb"
|
||||
tidbkv "github.com/pingcap/tidb/kv"
|
||||
"github.com/pingcap/tidb/store/tikv"
|
||||
"github.com/pingcap/tidb/store/tikv/kv"
|
||||
"github.com/pingcap/tidb/store/tikv/logutil"
|
||||
"github.com/pingcap/tidb/store/tikv/tikvrpc"
|
||||
@ -32,7 +34,7 @@ import (
|
||||
|
||||
type testSnapshotSuite struct {
|
||||
OneByOneSuite
|
||||
store *KVStore
|
||||
store tikv.StoreProbe
|
||||
prefix string
|
||||
rowNums []int
|
||||
}
|
||||
@ -41,7 +43,7 @@ var _ = Suite(&testSnapshotSuite{})
|
||||
|
||||
func (s *testSnapshotSuite) SetUpSuite(c *C) {
|
||||
s.OneByOneSuite.SetUpSuite(c)
|
||||
s.store = NewTestStore(c)
|
||||
s.store = tikv.StoreProbe{KVStore: NewTestStore(c)}
|
||||
s.prefix = fmt.Sprintf("snapshot_%d", time.Now().Unix())
|
||||
s.rowNums = append(s.rowNums, 1, 100, 191)
|
||||
}
|
||||
@ -64,7 +66,7 @@ func (s *testSnapshotSuite) TearDownSuite(c *C) {
|
||||
s.OneByOneSuite.TearDownSuite(c)
|
||||
}
|
||||
|
||||
func (s *testSnapshotSuite) beginTxn(c *C) *KVTxn {
|
||||
func (s *testSnapshotSuite) beginTxn(c *C) tikv.TxnProbe {
|
||||
txn, err := s.store.Begin()
|
||||
c.Assert(err, IsNil)
|
||||
return txn
|
||||
@ -72,7 +74,7 @@ func (s *testSnapshotSuite) beginTxn(c *C) *KVTxn {
|
||||
|
||||
func (s *testSnapshotSuite) checkAll(keys []tidbkv.Key, c *C) {
|
||||
txn := s.beginTxn(c)
|
||||
snapshot := newTiKVSnapshot(s.store, txn.StartTS(), 0)
|
||||
snapshot := txn.GetSnapshot()
|
||||
m, err := snapshot.BatchGet(context.Background(), keys)
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
@ -131,7 +133,7 @@ func (s *testSnapshotSuite) TestSnapshotCache(c *C) {
|
||||
c.Assert(txn.Commit(context.Background()), IsNil)
|
||||
|
||||
txn = s.beginTxn(c)
|
||||
snapshot := newTiKVSnapshot(s.store, txn.StartTS(), 0)
|
||||
snapshot := txn.GetSnapshot()
|
||||
_, err := snapshot.BatchGet(context.Background(), []tidbkv.Key{tidbkv.Key("x"), tidbkv.Key("y")})
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
@ -182,11 +184,10 @@ func (s *testSnapshotSuite) TestSkipLargeTxnLock(c *C) {
|
||||
c.Assert(txn.Set(x, []byte("x")), IsNil)
|
||||
c.Assert(txn.Set(y, []byte("y")), IsNil)
|
||||
ctx := context.Background()
|
||||
bo := NewBackofferWithVars(ctx, PrewriteMaxBackoff, nil)
|
||||
committer, err := newTwoPhaseCommitterWithInit(txn, 0)
|
||||
committer, err := txn.NewCommitter(0)
|
||||
c.Assert(err, IsNil)
|
||||
committer.lockTTL = 3000
|
||||
c.Assert(committer.prewriteMutations(bo, committer.mutations), IsNil)
|
||||
committer.SetLockTTL(3000)
|
||||
c.Assert(committer.PrewriteAllMutations(ctx), IsNil)
|
||||
|
||||
txn1 := s.beginTxn(c)
|
||||
// txn1 is not blocked by txn in the large txn protocol.
|
||||
@ -198,9 +199,9 @@ func (s *testSnapshotSuite) TestSkipLargeTxnLock(c *C) {
|
||||
c.Assert(res, HasLen, 0)
|
||||
|
||||
// Commit txn, check the final commit ts is pushed.
|
||||
committer.commitTS = txn.StartTS() + 1
|
||||
c.Assert(committer.commitMutations(bo, committer.mutations), IsNil)
|
||||
status, err := s.store.lockResolver.GetTxnStatus(txn.StartTS(), 0, x)
|
||||
committer.SetCommitTS(txn.StartTS() + 1)
|
||||
c.Assert(committer.CommitMutations(ctx), IsNil)
|
||||
status, err := s.store.GetLockResolver().GetTxnStatus(txn.StartTS(), 0, x)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(status.IsCommitted(), IsTrue)
|
||||
c.Assert(status.CommitTS(), Greater, txn1.StartTS())
|
||||
@ -213,25 +214,24 @@ func (s *testSnapshotSuite) TestPointGetSkipTxnLock(c *C) {
|
||||
c.Assert(txn.Set(x, []byte("x")), IsNil)
|
||||
c.Assert(txn.Set(y, []byte("y")), IsNil)
|
||||
ctx := context.Background()
|
||||
bo := NewBackofferWithVars(ctx, PrewriteMaxBackoff, nil)
|
||||
committer, err := newTwoPhaseCommitterWithInit(txn, 0)
|
||||
committer, err := txn.NewCommitter(0)
|
||||
c.Assert(err, IsNil)
|
||||
committer.lockTTL = 3000
|
||||
c.Assert(committer.prewriteMutations(bo, committer.mutations), IsNil)
|
||||
committer.SetLockTTL(3000)
|
||||
c.Assert(committer.PrewriteAllMutations(ctx), IsNil)
|
||||
|
||||
snapshot := newTiKVSnapshot(s.store, maxTimestamp, 0)
|
||||
snapshot := s.store.GetSnapshot(math.MaxUint64)
|
||||
start := time.Now()
|
||||
c.Assert(committer.primary(), BytesEquals, []byte(x))
|
||||
c.Assert(committer.GetPrimaryKey(), BytesEquals, []byte(x))
|
||||
// Point get secondary key. Shouldn't be blocked by the lock and read old data.
|
||||
_, err = snapshot.Get(ctx, y)
|
||||
c.Assert(tidbkv.IsErrNotFound(errors.Trace(err)), IsTrue)
|
||||
c.Assert(time.Since(start), Less, 500*time.Millisecond)
|
||||
|
||||
// Commit the primary key
|
||||
committer.commitTS = txn.StartTS() + 1
|
||||
committer.commitMutations(bo, committer.mutationsOfKeys([][]byte{committer.primary()}))
|
||||
committer.SetCommitTS(txn.StartTS() + 1)
|
||||
committer.CommitMutations(ctx)
|
||||
|
||||
snapshot = newTiKVSnapshot(s.store, maxTimestamp, 0)
|
||||
snapshot = s.store.GetSnapshot(math.MaxUint64)
|
||||
start = time.Now()
|
||||
// Point get secondary key. Should read committed data.
|
||||
value, err := snapshot.Get(ctx, y)
|
||||
@ -248,7 +248,7 @@ func (s *testSnapshotSuite) TestSnapshotThreadSafe(c *C) {
|
||||
err := txn.Commit(context.Background())
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
snapshot := newTiKVSnapshot(s.store, maxTimestamp, 0)
|
||||
snapshot := s.store.GetSnapshot(math.MaxUint64)
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(5)
|
||||
for i := 0; i < 5; i++ {
|
||||
@ -266,20 +266,20 @@ func (s *testSnapshotSuite) TestSnapshotThreadSafe(c *C) {
|
||||
}
|
||||
|
||||
func (s *testSnapshotSuite) TestSnapshotRuntimeStats(c *C) {
|
||||
reqStats := NewRegionRequestRuntimeStats()
|
||||
RecordRegionRequestRuntimeStats(reqStats.Stats, tikvrpc.CmdGet, time.Second)
|
||||
RecordRegionRequestRuntimeStats(reqStats.Stats, tikvrpc.CmdGet, time.Millisecond)
|
||||
snapshot := newTiKVSnapshot(s.store, 0, 0)
|
||||
snapshot.SetOption(kv.CollectRuntimeStats, &SnapshotRuntimeStats{})
|
||||
snapshot.mergeRegionRequestStats(reqStats.Stats)
|
||||
snapshot.mergeRegionRequestStats(reqStats.Stats)
|
||||
bo := NewBackofferWithVars(context.Background(), 2000, nil)
|
||||
err := bo.BackoffWithMaxSleep(BoTxnLockFast, 30, errors.New("test"))
|
||||
reqStats := tikv.NewRegionRequestRuntimeStats()
|
||||
tikv.RecordRegionRequestRuntimeStats(reqStats.Stats, tikvrpc.CmdGet, time.Second)
|
||||
tikv.RecordRegionRequestRuntimeStats(reqStats.Stats, tikvrpc.CmdGet, time.Millisecond)
|
||||
snapshot := s.store.GetSnapshot(0)
|
||||
snapshot.SetOption(kv.CollectRuntimeStats, &tikv.SnapshotRuntimeStats{})
|
||||
snapshot.MergeRegionRequestStats(reqStats.Stats)
|
||||
snapshot.MergeRegionRequestStats(reqStats.Stats)
|
||||
bo := tikv.NewBackofferWithVars(context.Background(), 2000, nil)
|
||||
err := bo.BackoffWithMaxSleep(tikv.BoTxnLockFast, 30, errors.New("test"))
|
||||
c.Assert(err, IsNil)
|
||||
snapshot.recordBackoffInfo(bo)
|
||||
snapshot.recordBackoffInfo(bo)
|
||||
snapshot.RecordBackoffInfo(bo)
|
||||
snapshot.RecordBackoffInfo(bo)
|
||||
expect := "Get:{num_rpc:4, total_time:2s},txnLockFast_backoff:{num:2, total_time:60ms}"
|
||||
c.Assert(snapshot.mu.stats.String(), Equals, expect)
|
||||
c.Assert(snapshot.FormatStats(), Equals, expect)
|
||||
detail := &pb.ExecDetailsV2{
|
||||
TimeDetail: &pb.TimeDetail{
|
||||
WaitWallTimeMs: 100,
|
||||
@ -295,7 +295,7 @@ func (s *testSnapshotSuite) TestSnapshotRuntimeStats(c *C) {
|
||||
RocksdbBlockCacheHitCount: 10,
|
||||
},
|
||||
}
|
||||
snapshot.mergeExecDetail(detail)
|
||||
snapshot.MergeExecDetail(detail)
|
||||
expect = "Get:{num_rpc:4, total_time:2s},txnLockFast_backoff:{num:2, total_time:60ms}, " +
|
||||
"total_process_time: 100ms, total_wait_time: 100ms, " +
|
||||
"scan_detail: {total_process_keys: 10, " +
|
||||
@ -303,8 +303,8 @@ func (s *testSnapshotSuite) TestSnapshotRuntimeStats(c *C) {
|
||||
"rocksdb: {delete_skipped_count: 5, " +
|
||||
"key_skipped_count: 1, " +
|
||||
"block: {cache_hit_count: 10, read_count: 20, read_byte: 15 Bytes}}}"
|
||||
c.Assert(snapshot.mu.stats.String(), Equals, expect)
|
||||
snapshot.mergeExecDetail(detail)
|
||||
c.Assert(snapshot.FormatStats(), Equals, expect)
|
||||
snapshot.MergeExecDetail(detail)
|
||||
expect = "Get:{num_rpc:4, total_time:2s},txnLockFast_backoff:{num:2, total_time:60ms}, " +
|
||||
"total_process_time: 200ms, total_wait_time: 200ms, " +
|
||||
"scan_detail: {total_process_keys: 20, " +
|
||||
@ -312,5 +312,5 @@ func (s *testSnapshotSuite) TestSnapshotRuntimeStats(c *C) {
|
||||
"rocksdb: {delete_skipped_count: 10, " +
|
||||
"key_skipped_count: 2, " +
|
||||
"block: {cache_hit_count: 20, read_count: 40, read_byte: 30 Bytes}}}"
|
||||
c.Assert(snapshot.mu.stats.String(), Equals, expect)
|
||||
c.Assert(snapshot.FormatStats(), Equals, expect)
|
||||
}
|
||||
@ -16,7 +16,6 @@ package tikv
|
||||
import (
|
||||
"context"
|
||||
"flag"
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
@ -24,7 +23,6 @@ import (
|
||||
"github.com/pingcap/errors"
|
||||
"github.com/pingcap/tidb/store/mockstore/unistore"
|
||||
"github.com/pingcap/tidb/store/tikv/config"
|
||||
"github.com/pingcap/tidb/store/tikv/util/codec"
|
||||
pd "github.com/tikv/pd/client"
|
||||
)
|
||||
|
||||
@ -80,17 +78,3 @@ func clearStorage(store *KVStore) error {
|
||||
}
|
||||
return txn.Commit(context.Background())
|
||||
}
|
||||
|
||||
func encodeKey(prefix, s string) []byte {
|
||||
return codec.EncodeBytes(nil, []byte(fmt.Sprintf("%s_%s", prefix, s)))
|
||||
}
|
||||
|
||||
func valueBytes(n int) []byte {
|
||||
return []byte(fmt.Sprintf("value%d", n))
|
||||
}
|
||||
|
||||
// s08d is for returning format string "%s%08d" to keep string sorted.
|
||||
// e.g.: "0002" < "0011", otherwise "2" > "11"
|
||||
func s08d(prefix string, n int) string {
|
||||
return fmt.Sprintf("%s%08d", prefix, n)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user