diff --git a/store/gcworker/gc_worker_test.go b/store/gcworker/gc_worker_serial_test.go similarity index 59% rename from store/gcworker/gc_worker_test.go rename to store/gcworker/gc_worker_serial_test.go index 86828f00a8..8cb3f01797 100644 --- a/store/gcworker/gc_worker_test.go +++ b/store/gcworker/gc_worker_serial_test.go @@ -26,7 +26,6 @@ import ( "testing" "time" - . "github.com/pingcap/check" "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/errorpb" @@ -37,8 +36,9 @@ import ( "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/domain/infosync" "github.com/pingcap/tidb/kv" - "github.com/pingcap/tidb/session" "github.com/pingcap/tidb/store/mockstore" + "github.com/pingcap/tidb/testkit" + "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/oracle/oracles" "github.com/tikv/client-go/v2/testutils" @@ -48,19 +48,45 @@ import ( pd "github.com/tikv/pd/client" ) -func TestT(t *testing.T) { - tikv.EnableFailpoints() - TestingT(t) +type mockGCWorkerClient struct { + tikv.Client + unsafeDestroyRangeHandler handler + physicalScanLockHandler handler + registerLockObserverHandler handler + checkLockObserverHandler handler + removeLockObserverHandler handler } -type testGCWorkerSuite struct { +type handler = func(addr string, req *tikvrpc.Request) (*tikvrpc.Response, error) + +func (c *mockGCWorkerClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) { + if req.Type == tikvrpc.CmdUnsafeDestroyRange && c.unsafeDestroyRangeHandler != nil { + return c.unsafeDestroyRangeHandler(addr, req) + } + if req.Type == tikvrpc.CmdPhysicalScanLock && c.physicalScanLockHandler != nil { + return c.physicalScanLockHandler(addr, req) + } + if req.Type == tikvrpc.CmdRegisterLockObserver && c.registerLockObserverHandler != nil { + return c.registerLockObserverHandler(addr, req) + } + if req.Type == tikvrpc.CmdCheckLockObserver && c.checkLockObserverHandler != nil { + return c.checkLockObserverHandler(addr, req) + } + if req.Type == tikvrpc.CmdRemoveLockObserver && c.removeLockObserverHandler != nil { + return c.removeLockObserverHandler(addr, req) + } + + return c.Client.SendRequest(ctx, addr, req, timeout) +} + +type mockGCWorkerSuite struct { store kv.Storage tikvStore tikv.Storage cluster testutils.Cluster oracle *oracles.MockOracle gcWorker *GCWorker dom *domain.Domain - client *testGCWorkerClient + client *mockGCWorkerClient pdClient pd.Client initRegion struct { storeIDs []uint64 @@ -69,18 +95,16 @@ type testGCWorkerSuite struct { } } -var _ = SerialSuites(&testGCWorkerSuite{}) +func createGCWorkerSuite(t *testing.T) (s *mockGCWorkerSuite, clean func()) { + s = new(mockGCWorkerSuite) -func (s *testGCWorkerSuite) SetUpTest(c *C) { hijackClient := func(client tikv.Client) tikv.Client { - s.client = &testGCWorkerClient{ - Client: client, - } + s.client = &mockGCWorkerClient{Client: client} client = s.client return client } - store, err := mockstore.NewMockStore( + opts := []mockstore.MockTiKVStoreOption{ mockstore.WithStoreType(mockstore.MockTiKV), mockstore.WithClusterInspector(func(c testutils.Cluster) { s.initRegion.storeIDs, s.initRegion.peerIDs, s.initRegion.regionID, _ = mockstore.BootstrapWithMultiStores(c, 3) @@ -91,99 +115,90 @@ func (s *testGCWorkerSuite) SetUpTest(c *C) { s.pdClient = c return c }), - ) - c.Assert(err, IsNil) + } - s.store = store - s.tikvStore = store.(tikv.Storage) + s.store, s.dom, clean = 
testkit.CreateMockStoreAndDomain(t, opts...) + s.tikvStore = s.store.(tikv.Storage) + + s.tikvStore.GetOracle().Close() s.oracle = &oracles.MockOracle{} s.tikvStore.SetOracle(s.oracle) - s.dom, err = session.BootstrapSession(s.store) - c.Assert(err, IsNil) gcWorker, err := NewGCWorker(s.store, s.pdClient) - c.Assert(err, IsNil) + require.NoError(t, err) gcWorker.Start() gcWorker.Close() s.gcWorker = gcWorker + + return } -func (s *testGCWorkerSuite) TearDownTest(c *C) { - s.dom.Close() - err := s.store.Close() - c.Assert(err, IsNil) -} - -func (s *testGCWorkerSuite) timeEqual(c *C, t1, t2 time.Time, epsilon time.Duration) { - c.Assert(math.Abs(float64(t1.Sub(t2))), Less, float64(epsilon)) -} - -func (s *testGCWorkerSuite) mustPut(c *C, key, value string) { +func (s *mockGCWorkerSuite) mustPut(t *testing.T, key, value string) { txn, err := s.store.Begin() - c.Assert(err, IsNil) + require.NoError(t, err) err = txn.Set([]byte(key), []byte(value)) - c.Assert(err, IsNil) + require.NoError(t, err) err = txn.Commit(context.Background()) - c.Assert(err, IsNil) + require.NoError(t, err) } -func (s *testGCWorkerSuite) mustGet(c *C, key string, ts uint64) string { +func (s *mockGCWorkerSuite) mustGet(t *testing.T, key string, ts uint64) string { snap := s.store.GetSnapshot(kv.Version{Ver: ts}) value, err := snap.Get(context.TODO(), []byte(key)) - c.Assert(err, IsNil) + require.NoError(t, err) return string(value) } -func (s *testGCWorkerSuite) mustGetNone(c *C, key string, ts uint64) { +func (s *mockGCWorkerSuite) mustGetNone(t *testing.T, key string, ts uint64) { snap := s.store.GetSnapshot(kv.Version{Ver: ts}) _, err := snap.Get(context.TODO(), []byte(key)) if err != nil { - // Unistore's gc is based on compaction filter. + // unistore gc is based on compaction filter. // So skip the error check if err == nil. - c.Assert(kv.ErrNotExist.Equal(err), IsTrue) + require.True(t, kv.ErrNotExist.Equal(err)) } } -func (s *testGCWorkerSuite) mustAllocTs(c *C) uint64 { +func (s *mockGCWorkerSuite) mustAllocTs(t *testing.T) uint64 { ts, err := s.oracle.GetTimestamp(context.Background(), &oracle.Option{}) - c.Assert(err, IsNil) + require.NoError(t, err) return ts } -func (s *testGCWorkerSuite) mustGetSafePointFromPd(c *C) uint64 { +func (s *mockGCWorkerSuite) mustGetSafePointFromPd(t *testing.T) uint64 { // UpdateGCSafePoint returns the newest safePoint after the updating, which can be used to check whether the // safePoint is successfully uploaded. safePoint, err := s.pdClient.UpdateGCSafePoint(context.Background(), 0) - c.Assert(err, IsNil) + require.NoError(t, err) return safePoint } -func (s *testGCWorkerSuite) mustGetMinServiceSafePointFromPd(c *C) uint64 { +func (s *mockGCWorkerSuite) mustGetMinServiceSafePointFromPd(t *testing.T) uint64 { // UpdateServiceGCSafePoint returns the minimal service safePoint. If trying to update it with a value less than the // current minimal safePoint, nothing will be updated and the current minimal one will be returned. So we can use // this API to check the current safePoint. // This function shouldn't be invoked when there's no service safePoint set. 
minSafePoint, err := s.pdClient.UpdateServiceGCSafePoint(context.Background(), "test", 0, 0) - c.Assert(err, IsNil) + require.NoError(t, err) return minSafePoint } -func (s *testGCWorkerSuite) mustUpdateServiceGCSafePoint(c *C, serviceID string, safePoint, expectedMinSafePoint uint64) { +func (s *mockGCWorkerSuite) mustUpdateServiceGCSafePoint(t *testing.T, serviceID string, safePoint, expectedMinSafePoint uint64) { minSafePoint, err := s.pdClient.UpdateServiceGCSafePoint(context.Background(), serviceID, math.MaxInt64, safePoint) - c.Assert(err, IsNil) - c.Assert(minSafePoint, Equals, expectedMinSafePoint) + require.NoError(t, err) + require.Equal(t, expectedMinSafePoint, minSafePoint) } -func (s *testGCWorkerSuite) mustRemoveServiceGCSafePoint(c *C, serviceID string, safePoint, expectedMinSafePoint uint64) { +func (s *mockGCWorkerSuite) mustRemoveServiceGCSafePoint(t *testing.T, serviceID string, safePoint, expectedMinSafePoint uint64) { minSafePoint, err := s.pdClient.UpdateServiceGCSafePoint(context.Background(), serviceID, 0, safePoint) - c.Assert(err, IsNil) - c.Assert(minSafePoint, Equals, expectedMinSafePoint) + require.NoError(t, err) + require.Equal(t, expectedMinSafePoint, minSafePoint) } -func (s *testGCWorkerSuite) mustSetTiDBServiceSafePoint(c *C, safePoint, expectedMinSafePoint uint64) { +func (s *mockGCWorkerSuite) mustSetTiDBServiceSafePoint(t *testing.T, safePoint, expectedMinSafePoint uint64) { minSafePoint, err := s.gcWorker.setGCWorkerServiceSafePoint(context.Background(), safePoint) - c.Assert(err, IsNil) - c.Assert(minSafePoint, Equals, expectedMinSafePoint) + require.NoError(t, err) + require.Equal(t, expectedMinSafePoint, minSafePoint) } // gcProbe represents a key that contains multiple versions, one of which should be collected. Execution of GC with @@ -199,351 +214,379 @@ type gcProbe struct { } // createGCProbe creates gcProbe on specified key. -func (s *testGCWorkerSuite) createGCProbe(c *C, key string) *gcProbe { - s.mustPut(c, key, "v1") - ts1 := s.mustAllocTs(c) - s.mustPut(c, key, "v2") - ts2 := s.mustAllocTs(c) +func (s *mockGCWorkerSuite) createGCProbe(t *testing.T, key string) *gcProbe { + s.mustPut(t, key, "v1") + ts1 := s.mustAllocTs(t) + s.mustPut(t, key, "v2") + ts2 := s.mustAllocTs(t) p := &gcProbe{ key: key, v1Ts: ts1, v2Ts: ts2, } - s.checkNotCollected(c, p) + s.checkNotCollected(t, p) return p } // checkCollected asserts the gcProbe has been correctly collected. -func (s *testGCWorkerSuite) checkCollected(c *C, p *gcProbe) { - s.mustGetNone(c, p.key, p.v1Ts) - c.Assert(s.mustGet(c, p.key, p.v2Ts), Equals, "v2") +func (s *mockGCWorkerSuite) checkCollected(t *testing.T, p *gcProbe) { + s.mustGetNone(t, p.key, p.v1Ts) + require.Equal(t, "v2", s.mustGet(t, p.key, p.v2Ts)) } // checkNotCollected asserts the gcProbe has not been collected. 
-func (s *testGCWorkerSuite) checkNotCollected(c *C, p *gcProbe) { - c.Assert(s.mustGet(c, p.key, p.v1Ts), Equals, "v1") - c.Assert(s.mustGet(c, p.key, p.v2Ts), Equals, "v2") +func (s *mockGCWorkerSuite) checkNotCollected(t *testing.T, p *gcProbe) { + require.Equal(t, "v1", s.mustGet(t, p.key, p.v1Ts)) + require.Equal(t, "v2", s.mustGet(t, p.key, p.v2Ts)) } -func (s *testGCWorkerSuite) TestGetOracleTime(c *C) { +func timeEqual(t *testing.T, t1, t2 time.Time, epsilon time.Duration) { + require.Less(t, math.Abs(float64(t1.Sub(t2))), float64(epsilon)) +} + +func TestGetOracleTime(t *testing.T) { + s, clean := createGCWorkerSuite(t) + defer clean() + t1, err := s.gcWorker.getOracleTime() - c.Assert(err, IsNil) - s.timeEqual(c, time.Now(), t1, time.Millisecond*10) + require.NoError(t, err) + timeEqual(t, time.Now(), t1, time.Millisecond*10) s.oracle.AddOffset(time.Second * 10) t2, err := s.gcWorker.getOracleTime() - c.Assert(err, IsNil) - s.timeEqual(c, t2, t1.Add(time.Second*10), time.Millisecond*10) + require.NoError(t, err) + timeEqual(t, t2, t1.Add(time.Second*10), time.Millisecond*10) } -func (s *testGCWorkerSuite) TestMinStartTS(c *C) { +func TestMinStartTS(t *testing.T) { + s, clean := createGCWorkerSuite(t) + defer clean() + ctx := context.Background() spkv := s.tikvStore.GetSafePointKV() err := spkv.Put(fmt.Sprintf("%s/%s", infosync.ServerMinStartTSPath, "a"), strconv.FormatUint(math.MaxUint64, 10)) - c.Assert(err, IsNil) + require.NoError(t, err) now := oracle.GoTimeToTS(time.Now()) sp := s.gcWorker.calcSafePointByMinStartTS(ctx, now) - c.Assert(sp, Equals, now) + require.Equal(t, now, sp) err = spkv.Put(fmt.Sprintf("%s/%s", infosync.ServerMinStartTSPath, "a"), "0") - c.Assert(err, IsNil) + require.NoError(t, err) sp = s.gcWorker.calcSafePointByMinStartTS(ctx, now) - c.Assert(sp, Equals, uint64(0)) + require.Equal(t, uint64(0), sp) err = spkv.Put(fmt.Sprintf("%s/%s", infosync.ServerMinStartTSPath, "a"), "0") - c.Assert(err, IsNil) + require.NoError(t, err) err = spkv.Put(fmt.Sprintf("%s/%s", infosync.ServerMinStartTSPath, "b"), "1") - c.Assert(err, IsNil) + require.NoError(t, err) sp = s.gcWorker.calcSafePointByMinStartTS(ctx, now) - c.Assert(sp, Equals, uint64(0)) + require.Equal(t, uint64(0), sp) - err = spkv.Put(fmt.Sprintf("%s/%s", infosync.ServerMinStartTSPath, "a"), - strconv.FormatUint(now, 10)) - c.Assert(err, IsNil) - err = spkv.Put(fmt.Sprintf("%s/%s", infosync.ServerMinStartTSPath, "b"), - strconv.FormatUint(now-oracle.ComposeTS(20000, 0), 10)) - c.Assert(err, IsNil) + err = spkv.Put(fmt.Sprintf("%s/%s", infosync.ServerMinStartTSPath, "a"), strconv.FormatUint(now, 10)) + require.NoError(t, err) + err = spkv.Put(fmt.Sprintf("%s/%s", infosync.ServerMinStartTSPath, "b"), strconv.FormatUint(now-oracle.ComposeTS(20000, 0), 10)) + require.NoError(t, err) sp = s.gcWorker.calcSafePointByMinStartTS(ctx, now-oracle.ComposeTS(10000, 0)) - c.Assert(sp, Equals, now-oracle.ComposeTS(20000, 0)-1) + require.Equal(t, now-oracle.ComposeTS(20000, 0)-1, sp) } -func (s *testGCWorkerSuite) TestPrepareGC(c *C) { +func TestPrepareGC(t *testing.T) { + s, clean := createGCWorkerSuite(t) + defer clean() + now, err := s.gcWorker.getOracleTime() - c.Assert(err, IsNil) + require.NoError(t, err) close(s.gcWorker.done) ok, _, err := s.gcWorker.prepare() - c.Assert(err, IsNil) - c.Assert(ok, IsFalse) + require.NoError(t, err) + require.False(t, ok) lastRun, err := s.gcWorker.loadTime(gcLastRunTimeKey) - c.Assert(err, IsNil) - c.Assert(lastRun, NotNil) + require.NoError(t, err) + require.NotNil(t, lastRun) 
safePoint, err := s.gcWorker.loadTime(gcSafePointKey) - c.Assert(err, IsNil) - s.timeEqual(c, safePoint.Add(gcDefaultLifeTime), now, 2*time.Second) + require.NoError(t, err) + timeEqual(t, safePoint.Add(gcDefaultLifeTime), now, 2*time.Second) // Change GC run interval. err = s.gcWorker.saveDuration(gcRunIntervalKey, time.Minute*5) - c.Assert(err, IsNil) + require.NoError(t, err) s.oracle.AddOffset(time.Minute * 4) ok, _, err = s.gcWorker.prepare() - c.Assert(err, IsNil) - c.Assert(ok, IsFalse) + require.NoError(t, err) + require.False(t, ok) s.oracle.AddOffset(time.Minute * 2) ok, _, err = s.gcWorker.prepare() - c.Assert(err, IsNil) - c.Assert(ok, IsTrue) + require.NoError(t, err) + require.True(t, ok) - // Change GC life time. + // Change GC lifetime. err = s.gcWorker.saveDuration(gcLifeTimeKey, time.Minute*30) - c.Assert(err, IsNil) + require.NoError(t, err) s.oracle.AddOffset(time.Minute * 5) ok, _, err = s.gcWorker.prepare() - c.Assert(err, IsNil) - c.Assert(ok, IsFalse) + require.NoError(t, err) + require.False(t, ok) s.oracle.AddOffset(time.Minute * 40) now, err = s.gcWorker.getOracleTime() - c.Assert(err, IsNil) + require.NoError(t, err) ok, _, err = s.gcWorker.prepare() - c.Assert(err, IsNil) - c.Assert(ok, IsTrue) + require.NoError(t, err) + require.True(t, ok) safePoint, err = s.gcWorker.loadTime(gcSafePointKey) - c.Assert(err, IsNil) - s.timeEqual(c, safePoint.Add(time.Minute*30), now, 2*time.Second) + require.NoError(t, err) + timeEqual(t, safePoint.Add(time.Minute*30), now, 2*time.Second) // Change GC concurrency. concurrency, err := s.gcWorker.loadGCConcurrencyWithDefault() - c.Assert(err, IsNil) - c.Assert(concurrency, Equals, gcDefaultConcurrency) + require.NoError(t, err) + require.Equal(t, gcDefaultConcurrency, concurrency) err = s.gcWorker.saveValueToSysTable(gcConcurrencyKey, strconv.Itoa(gcMinConcurrency)) - c.Assert(err, IsNil) + require.NoError(t, err) concurrency, err = s.gcWorker.loadGCConcurrencyWithDefault() - c.Assert(err, IsNil) - c.Assert(concurrency, Equals, gcMinConcurrency) + require.NoError(t, err) + require.Equal(t, gcMinConcurrency, concurrency) err = s.gcWorker.saveValueToSysTable(gcConcurrencyKey, strconv.Itoa(-1)) - c.Assert(err, IsNil) + require.NoError(t, err) concurrency, err = s.gcWorker.loadGCConcurrencyWithDefault() - c.Assert(err, IsNil) - c.Assert(concurrency, Equals, gcMinConcurrency) + require.NoError(t, err) + require.Equal(t, gcMinConcurrency, concurrency) err = s.gcWorker.saveValueToSysTable(gcConcurrencyKey, strconv.Itoa(1000000)) - c.Assert(err, IsNil) + require.NoError(t, err) concurrency, err = s.gcWorker.loadGCConcurrencyWithDefault() - c.Assert(err, IsNil) - c.Assert(concurrency, Equals, gcMaxConcurrency) + require.NoError(t, err) + require.Equal(t, gcMaxConcurrency, concurrency) // Change GC enable status. s.oracle.AddOffset(time.Minute * 40) err = s.gcWorker.saveValueToSysTable(gcEnableKey, booleanFalse) - c.Assert(err, IsNil) + require.NoError(t, err) ok, _, err = s.gcWorker.prepare() - c.Assert(err, IsNil) - c.Assert(ok, IsFalse) + require.NoError(t, err) + require.False(t, ok) err = s.gcWorker.saveValueToSysTable(gcEnableKey, booleanTrue) - c.Assert(err, IsNil) + require.NoError(t, err) ok, _, err = s.gcWorker.prepare() - c.Assert(err, IsNil) - c.Assert(ok, IsTrue) + require.NoError(t, err) + require.True(t, ok) - // Check gc life time small than min. + // Check gc lifetime smaller than min. 
s.oracle.AddOffset(time.Minute * 40) err = s.gcWorker.saveDuration(gcLifeTimeKey, time.Minute) - c.Assert(err, IsNil) + require.NoError(t, err) ok, _, err = s.gcWorker.prepare() - c.Assert(err, IsNil) - c.Assert(ok, IsTrue) + require.NoError(t, err) + require.True(t, ok) lifeTime, err := s.gcWorker.loadDuration(gcLifeTimeKey) - c.Assert(err, IsNil) - c.Assert(*lifeTime, Equals, gcMinLifeTime) + require.NoError(t, err) + require.Equal(t, gcMinLifeTime, *lifeTime) s.oracle.AddOffset(time.Minute * 40) err = s.gcWorker.saveDuration(gcLifeTimeKey, time.Minute*30) - c.Assert(err, IsNil) + require.NoError(t, err) ok, _, err = s.gcWorker.prepare() - c.Assert(err, IsNil) - c.Assert(ok, IsTrue) + require.NoError(t, err) + require.True(t, ok) lifeTime, err = s.gcWorker.loadDuration(gcLifeTimeKey) - c.Assert(err, IsNil) - c.Assert(*lifeTime, Equals, 30*time.Minute) + require.NoError(t, err) + require.Equal(t, 30*time.Minute, *lifeTime) // Change auto concurrency err = s.gcWorker.saveValueToSysTable(gcAutoConcurrencyKey, booleanFalse) - c.Assert(err, IsNil) + require.NoError(t, err) useAutoConcurrency, err := s.gcWorker.checkUseAutoConcurrency() - c.Assert(err, IsNil) - c.Assert(useAutoConcurrency, IsFalse) + require.NoError(t, err) + require.False(t, useAutoConcurrency) err = s.gcWorker.saveValueToSysTable(gcAutoConcurrencyKey, booleanTrue) - c.Assert(err, IsNil) + require.NoError(t, err) useAutoConcurrency, err = s.gcWorker.checkUseAutoConcurrency() - c.Assert(err, IsNil) - c.Assert(useAutoConcurrency, IsTrue) + require.NoError(t, err) + require.True(t, useAutoConcurrency) // Check skipping GC if safe point is not changed. safePointTime, err := s.gcWorker.loadTime(gcSafePointKey) minStartTS := oracle.GoTimeToTS(*safePointTime) + 1 - c.Assert(err, IsNil) + require.NoError(t, err) spkv := s.tikvStore.GetSafePointKV() err = spkv.Put(fmt.Sprintf("%s/%s", infosync.ServerMinStartTSPath, "a"), strconv.FormatUint(minStartTS, 10)) - c.Assert(err, IsNil) + require.NoError(t, err) s.oracle.AddOffset(time.Minute * 40) ok, safepoint, err := s.gcWorker.prepare() - c.Assert(err, IsNil) - c.Assert(ok, IsFalse) - c.Assert(safepoint, Equals, uint64(0)) + require.NoError(t, err) + require.False(t, ok) + require.Equal(t, uint64(0), safepoint) } -func (s *testGCWorkerSuite) TestStatusVars(c *C) { +func TestStatusVars(t *testing.T) { + s, clean := createGCWorkerSuite(t) + defer clean() + // Status variables should now exist for: // tidb_gc_safe_point, tidb_gc_last_run_time se := createSession(s.gcWorker.store) defer se.Close() safePoint, err := s.gcWorker.loadValueFromSysTable(gcSafePointKey) - c.Assert(err, IsNil) + require.NoError(t, err) lastRunTime, err := s.gcWorker.loadValueFromSysTable(gcLastRunTimeKey) - c.Assert(err, IsNil) + require.NoError(t, err) statusVars, _ := s.gcWorker.Stats(se.GetSessionVars()) val, ok := statusVars[tidbGCSafePoint] - c.Assert(ok, IsTrue) - c.Assert(val, Equals, safePoint) + require.True(t, ok) + require.Equal(t, safePoint, val) val, ok = statusVars[tidbGCLastRunTime] - c.Assert(ok, IsTrue) - c.Assert(val, Equals, lastRunTime) + require.True(t, ok) + require.Equal(t, lastRunTime, val) } -func (s *testGCWorkerSuite) TestDoGCForOneRegion(c *C) { +func TestDoGCForOneRegion(t *testing.T) { + s, clean := createGCWorkerSuite(t) + defer clean() + ctx := context.Background() bo := tikv.NewBackofferWithVars(ctx, gcOneRegionMaxBackoff, nil) loc, err := s.tikvStore.GetRegionCache().LocateKey(bo, []byte("")) - c.Assert(err, IsNil) + require.NoError(t, err) var regionErr *errorpb.Error - p := 
s.createGCProbe(c, "k1") - regionErr, err = s.gcWorker.doGCForRegion(bo, s.mustAllocTs(c), loc.Region) - c.Assert(regionErr, IsNil) - c.Assert(err, IsNil) - s.checkCollected(c, p) + p := s.createGCProbe(t, "k1") + regionErr, err = s.gcWorker.doGCForRegion(bo, s.mustAllocTs(t), loc.Region) + require.Nil(t, regionErr) + require.NoError(t, err) + s.checkCollected(t, p) - c.Assert(failpoint.Enable("tikvclient/tikvStoreSendReqResult", `return("timeout")`), IsNil) - regionErr, err = s.gcWorker.doGCForRegion(bo, s.mustAllocTs(c), loc.Region) - c.Assert(regionErr, IsNil) - c.Assert(err, NotNil) - c.Assert(failpoint.Disable("tikvclient/tikvStoreSendReqResult"), IsNil) + require.NoError(t, failpoint.Enable("tikvclient/tikvStoreSendReqResult", `return("timeout")`)) + regionErr, err = s.gcWorker.doGCForRegion(bo, s.mustAllocTs(t), loc.Region) + require.Nil(t, regionErr) + require.Error(t, err) + require.NoError(t, failpoint.Disable("tikvclient/tikvStoreSendReqResult")) - c.Assert(failpoint.Enable("tikvclient/tikvStoreSendReqResult", `return("GCNotLeader")`), IsNil) - regionErr, err = s.gcWorker.doGCForRegion(bo, s.mustAllocTs(c), loc.Region) - c.Assert(regionErr.GetNotLeader(), NotNil) - c.Assert(err, IsNil) - c.Assert(failpoint.Disable("tikvclient/tikvStoreSendReqResult"), IsNil) + require.NoError(t, failpoint.Enable("tikvclient/tikvStoreSendReqResult", `return("GCNotLeader")`)) + regionErr, err = s.gcWorker.doGCForRegion(bo, s.mustAllocTs(t), loc.Region) + require.NoError(t, err) + require.NotNil(t, regionErr.GetNotLeader()) + require.NoError(t, failpoint.Disable("tikvclient/tikvStoreSendReqResult")) - c.Assert(failpoint.Enable("tikvclient/tikvStoreSendReqResult", `return("GCServerIsBusy")`), IsNil) - regionErr, err = s.gcWorker.doGCForRegion(bo, s.mustAllocTs(c), loc.Region) - c.Assert(regionErr.GetServerIsBusy(), NotNil) - c.Assert(err, IsNil) - c.Assert(failpoint.Disable("tikvclient/tikvStoreSendReqResult"), IsNil) + require.NoError(t, failpoint.Enable("tikvclient/tikvStoreSendReqResult", `return("GCServerIsBusy")`)) + regionErr, err = s.gcWorker.doGCForRegion(bo, s.mustAllocTs(t), loc.Region) + require.NoError(t, err) + require.NotNil(t, regionErr.GetServerIsBusy()) + require.NoError(t, failpoint.Disable("tikvclient/tikvStoreSendReqResult")) } -func (s *testGCWorkerSuite) TestGetGCConcurrency(c *C) { +func TestGetGCConcurrency(t *testing.T) { + s, clean := createGCWorkerSuite(t) + defer clean() + // Pick a concurrency that doesn't equal to the number of stores. 
concurrencyConfig := 25 - c.Assert(concurrencyConfig, Not(Equals), len(s.cluster.GetAllStores())) + require.NotEqual(t, len(s.cluster.GetAllStores()), concurrencyConfig) err := s.gcWorker.saveValueToSysTable(gcConcurrencyKey, strconv.Itoa(concurrencyConfig)) - c.Assert(err, IsNil) + require.NoError(t, err) ctx := context.Background() err = s.gcWorker.saveValueToSysTable(gcAutoConcurrencyKey, booleanFalse) - c.Assert(err, IsNil) + require.NoError(t, err) concurrency, err := s.gcWorker.getGCConcurrency(ctx) - c.Assert(err, IsNil) - c.Assert(concurrency, Equals, concurrencyConfig) + require.NoError(t, err) + require.Equal(t, concurrencyConfig, concurrency) err = s.gcWorker.saveValueToSysTable(gcAutoConcurrencyKey, booleanTrue) - c.Assert(err, IsNil) + require.NoError(t, err) concurrency, err = s.gcWorker.getGCConcurrency(ctx) - c.Assert(err, IsNil) - c.Assert(concurrency, Equals, len(s.cluster.GetAllStores())) + require.NoError(t, err) + require.Len(t, s.cluster.GetAllStores(), concurrency) } -func (s *testGCWorkerSuite) TestDoGC(c *C) { - var err error - ctx := context.Background() +func TestDoGC(t *testing.T) { + s, clean := createGCWorkerSuite(t) + defer clean() + ctx := context.Background() gcSafePointCacheInterval = 1 - p := s.createGCProbe(c, "k1") - err = s.gcWorker.doGC(ctx, s.mustAllocTs(c), gcDefaultConcurrency) - c.Assert(err, IsNil) - s.checkCollected(c, p) + p := s.createGCProbe(t, "k1") + err := s.gcWorker.doGC(ctx, s.mustAllocTs(t), gcDefaultConcurrency) + require.NoError(t, err) + s.checkCollected(t, p) - p = s.createGCProbe(c, "k1") - err = s.gcWorker.doGC(ctx, s.mustAllocTs(c), gcMinConcurrency) - c.Assert(err, IsNil) - s.checkCollected(c, p) + p = s.createGCProbe(t, "k1") + err = s.gcWorker.doGC(ctx, s.mustAllocTs(t), gcMinConcurrency) + require.NoError(t, err) + s.checkCollected(t, p) - p = s.createGCProbe(c, "k1") - err = s.gcWorker.doGC(ctx, s.mustAllocTs(c), gcMaxConcurrency) - c.Assert(err, IsNil) - s.checkCollected(c, p) + p = s.createGCProbe(t, "k1") + err = s.gcWorker.doGC(ctx, s.mustAllocTs(t), gcMaxConcurrency) + require.NoError(t, err) + s.checkCollected(t, p) } -func (s *testGCWorkerSuite) TestCheckGCMode(c *C) { +func TestCheckGCMode(t *testing.T) { + s, clean := createGCWorkerSuite(t) + defer clean() + useDistributedGC := s.gcWorker.checkUseDistributedGC() - c.Assert(useDistributedGC, Equals, true) + require.True(t, useDistributedGC) // Now the row must be set to the default value. str, err := s.gcWorker.loadValueFromSysTable(gcModeKey) - c.Assert(err, IsNil) - c.Assert(str, Equals, gcModeDistributed) + require.NoError(t, err) + require.Equal(t, gcModeDistributed, str) // Central mode is deprecated in v5.0. 
err = s.gcWorker.saveValueToSysTable(gcModeKey, gcModeCentral) - c.Assert(err, IsNil) + require.NoError(t, err) useDistributedGC = s.gcWorker.checkUseDistributedGC() - c.Assert(err, IsNil) - c.Assert(useDistributedGC, Equals, true) + require.True(t, useDistributedGC) err = s.gcWorker.saveValueToSysTable(gcModeKey, gcModeDistributed) - c.Assert(err, IsNil) + require.NoError(t, err) useDistributedGC = s.gcWorker.checkUseDistributedGC() - c.Assert(useDistributedGC, Equals, true) + require.True(t, useDistributedGC) err = s.gcWorker.saveValueToSysTable(gcModeKey, "invalid_mode") - c.Assert(err, IsNil) + require.NoError(t, err) useDistributedGC = s.gcWorker.checkUseDistributedGC() - c.Assert(useDistributedGC, Equals, true) + require.True(t, useDistributedGC) } -func (s *testGCWorkerSuite) TestCheckScanLockMode(c *C) { +func TestCheckScanLockMode(t *testing.T) { + s, clean := createGCWorkerSuite(t) + defer clean() + usePhysical, err := s.gcWorker.checkUsePhysicalScanLock() - c.Assert(err, IsNil) - c.Assert(usePhysical, Equals, false) - c.Assert(usePhysical, Equals, gcScanLockModeDefault == gcScanLockModePhysical) + require.NoError(t, err) + require.False(t, usePhysical) + require.Equal(t, gcScanLockModeDefault == gcScanLockModePhysical, usePhysical) + // Now the row must be set to the default value. str, err := s.gcWorker.loadValueFromSysTable(gcScanLockModeKey) - c.Assert(err, IsNil) - c.Assert(str, Equals, gcScanLockModeDefault) + require.NoError(t, err) + require.Equal(t, gcScanLockModeDefault, str) err = s.gcWorker.saveValueToSysTable(gcScanLockModeKey, gcScanLockModePhysical) - c.Assert(err, IsNil) + require.NoError(t, err) usePhysical, err = s.gcWorker.checkUsePhysicalScanLock() - c.Assert(err, IsNil) - c.Assert(usePhysical, Equals, true) + require.NoError(t, err) + require.True(t, usePhysical) err = s.gcWorker.saveValueToSysTable(gcScanLockModeKey, gcScanLockModeLegacy) - c.Assert(err, IsNil) + require.NoError(t, err) usePhysical, err = s.gcWorker.checkUsePhysicalScanLock() - c.Assert(err, IsNil) - c.Assert(usePhysical, Equals, false) + require.NoError(t, err) + require.False(t, usePhysical) err = s.gcWorker.saveValueToSysTable(gcScanLockModeKey, "invalid_mode") - c.Assert(err, IsNil) + require.NoError(t, err) usePhysical, err = s.gcWorker.checkUsePhysicalScanLock() - c.Assert(err, IsNil) - c.Assert(usePhysical, Equals, false) + require.NoError(t, err) + require.False(t, usePhysical) } -func (s *testGCWorkerSuite) TestNeedsGCOperationForStore(c *C) { +func TestNeedsGCOperationForStore(t *testing.T) { newStore := func(state metapb.StoreState, hasEngineLabel bool, engineLabel string) *metapb.Store { store := &metapb.Store{} store.State = state @@ -557,23 +600,23 @@ func (s *testGCWorkerSuite) TestNeedsGCOperationForStore(c *C) { for _, state := range []metapb.StoreState{metapb.StoreState_Up, metapb.StoreState_Offline, metapb.StoreState_Tombstone} { needGC := state != metapb.StoreState_Tombstone res, err := needsGCOperationForStore(newStore(state, false, "")) - c.Assert(err, IsNil) - c.Assert(res, Equals, needGC) + require.NoError(t, err) + require.Equal(t, needGC, res) res, err = needsGCOperationForStore(newStore(state, true, "")) - c.Assert(err, IsNil) - c.Assert(res, Equals, needGC) + require.NoError(t, err) + require.Equal(t, needGC, res) res, err = needsGCOperationForStore(newStore(state, true, placement.EngineLabelTiKV)) - c.Assert(err, IsNil) - c.Assert(res, Equals, needGC) + require.NoError(t, err) + require.Equal(t, needGC, res) // TiFlash does not need 
these operations. res, err = needsGCOperationForStore(newStore(state, true, placement.EngineLabelTiFlash)) - c.Assert(err, IsNil) - c.Assert(res, IsFalse) + require.NoError(t, err) + require.False(t, res) } // Throw an error for unknown store types. _, err := needsGCOperationForStore(newStore(metapb.StoreState_Up, true, "invalid")) - c.Assert(err, NotNil) + require.Error(t, err) } const ( @@ -582,132 +625,143 @@ const ( failErrResp = 2 ) -func (s *testGCWorkerSuite) testDeleteRangesFailureImpl(c *C, failType int) { - c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/gcworker/mockHistoryJobForGC", "return(1)"), IsNil) - defer func() { - c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/gcworker/mockHistoryJobForGC"), IsNil) - }() - - // Put some delete range tasks. - se := createSession(s.gcWorker.store) - defer se.Close() - _, err := se.Execute(context.Background(), `INSERT INTO mysql.gc_delete_range VALUES - ("1", "2", "31", "32", "10"), - ("3", "4", "33", "34", "10"), - ("5", "6", "35", "36", "10")`) - c.Assert(err, IsNil) - - ranges := []util.DelRangeTask{ - { - JobID: 1, - ElementID: 2, - StartKey: []byte("1"), - EndKey: []byte("2"), - }, - { - JobID: 3, - ElementID: 4, - StartKey: []byte("3"), - EndKey: []byte("4"), - }, - { - JobID: 5, - ElementID: 6, - StartKey: []byte("5"), - EndKey: []byte("6"), - }, +func TestDeleteRangesFailure(t *testing.T) { + tests := []struct { + name string + failType int + }{ + {"failRPCErr", failRPCErr}, + {"failNilResp", failNilResp}, + {"failErrResp", failErrResp}, } - // Check the delete range tasks. - preparedRanges, err := util.LoadDeleteRanges(se, 20) - se.Close() - c.Assert(err, IsNil) - c.Assert(preparedRanges, DeepEquals, ranges) + s, clean := createGCWorkerSuite(t) + defer clean() - stores, err := s.gcWorker.getStoresForGC(context.Background()) - c.Assert(err, IsNil) - c.Assert(len(stores), Equals, 3) + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + failType := test.failType + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/store/gcworker/mockHistoryJobForGC", "return(1)")) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/store/gcworker/mockHistoryJobForGC")) + }() - // Sort by address for checking. - sort.Slice(stores, func(i, j int) bool { return stores[i].Address < stores[j].Address }) + // Put some delete range tasks. + se := createSession(s.gcWorker.store) + defer se.Close() + _, err := se.Execute(context.Background(), `INSERT INTO mysql.gc_delete_range VALUES +("1", "2", "31", "32", "10"), +("3", "4", "33", "34", "10"), +("5", "6", "35", "36", "10")`) + require.NoError(t, err) - sendReqCh := make(chan SentReq, 20) - - // The request sent to the specified key and store wil fail. 
- var ( - failKey []byte - failStore *metapb.Store - ) - s.client.unsafeDestroyRangeHandler = func(addr string, req *tikvrpc.Request) (*tikvrpc.Response, error) { - sendReqCh <- SentReq{req, addr} - resp := &tikvrpc.Response{ - Resp: &kvrpcpb.UnsafeDestroyRangeResponse{}, - } - if bytes.Equal(req.UnsafeDestroyRange().GetStartKey(), failKey) && addr == failStore.GetAddress() { - if failType == failRPCErr { - return nil, errors.New("error") - } else if failType == failNilResp { - resp.Resp = nil - } else if failType == failErrResp { - (resp.Resp.(*kvrpcpb.UnsafeDestroyRangeResponse)).Error = "error" - } else { - panic("unreachable") + ranges := []util.DelRangeTask{ + { + JobID: 1, + ElementID: 2, + StartKey: []byte("1"), + EndKey: []byte("2"), + }, + { + JobID: 3, + ElementID: 4, + StartKey: []byte("3"), + EndKey: []byte("4"), + }, + { + JobID: 5, + ElementID: 6, + StartKey: []byte("5"), + EndKey: []byte("6"), + }, } - } - return resp, nil + + // Check the DeleteRanges tasks. + preparedRanges, err := util.LoadDeleteRanges(se, 20) + se.Close() + require.NoError(t, err) + require.Equal(t, ranges, preparedRanges) + + stores, err := s.gcWorker.getStoresForGC(context.Background()) + require.NoError(t, err) + require.Len(t, stores, 3) + + // Sort by address for checking. + sort.Slice(stores, func(i, j int) bool { return stores[i].Address < stores[j].Address }) + + sendReqCh := make(chan SentReq, 20) + + // The request sent to the specified key and store will fail. + var ( + failKey []byte + failStore *metapb.Store + ) + s.client.unsafeDestroyRangeHandler = func(addr string, req *tikvrpc.Request) (*tikvrpc.Response, error) { + sendReqCh <- SentReq{req, addr} + resp := &tikvrpc.Response{ + Resp: &kvrpcpb.UnsafeDestroyRangeResponse{}, + } + if bytes.Equal(req.UnsafeDestroyRange().GetStartKey(), failKey) && addr == failStore.GetAddress() { + if failType == failRPCErr { + return nil, errors.New("error") + } else if failType == failNilResp { + resp.Resp = nil + } else if failType == failErrResp { + (resp.Resp.(*kvrpcpb.UnsafeDestroyRangeResponse)).Error = "error" + } else { + panic("unreachable") + } + } + return resp, nil + } + defer func() { s.client.unsafeDestroyRangeHandler = nil }() + + // Make the logic in a closure to reduce duplicated code that tests deleteRanges and redoDeleteRanges. + test := func(redo bool) { + deleteRangeFunc := s.gcWorker.deleteRanges + loadRangesFunc := util.LoadDeleteRanges + if redo { + deleteRangeFunc = s.gcWorker.redoDeleteRanges + loadRangesFunc = util.LoadDoneDeleteRanges + } + + // Make the first request fail. + failKey = ranges[0].StartKey + failStore = stores[0] + + err = deleteRangeFunc(context.Background(), 20, 1) + require.NoError(t, err) + + s.checkDestroyRangeReq(t, sendReqCh, ranges, stores) + + // The first delete range task should still be here since it didn't succeed. + se = createSession(s.gcWorker.store) + remainingRanges, err := loadRangesFunc(se, 20) + se.Close() + require.NoError(t, err) + require.Equal(t, ranges[:1], remainingRanges) + + failKey = nil + failStore = nil + + // Delete the remaining range again. + err = deleteRangeFunc(context.Background(), 20, 1) + require.NoError(t, err) + s.checkDestroyRangeReq(t, sendReqCh, ranges[:1], stores) + + se = createSession(s.gcWorker.store) + remainingRanges, err = loadRangesFunc(se, 20) + se.Close() + require.NoError(t, err) + require.Len(t, remainingRanges, 0) + } + + test(false) + // Change the order because the first range is the last successfully deleted. 
+ ranges = append(ranges[1:], ranges[0]) + test(true) + }) } - defer func() { s.client.unsafeDestroyRangeHandler = nil }() - - // Make the logic in a closure to reduce duplicated code that tests deleteRanges and - test := func(redo bool) { - deleteRangeFunc := s.gcWorker.deleteRanges - loadRangesFunc := util.LoadDeleteRanges - if redo { - deleteRangeFunc = s.gcWorker.redoDeleteRanges - loadRangesFunc = util.LoadDoneDeleteRanges - } - - // Make the first request fail. - failKey = ranges[0].StartKey - failStore = stores[0] - - err = deleteRangeFunc(context.Background(), 20, 1) - c.Assert(err, IsNil) - - s.checkDestroyRangeReq(c, sendReqCh, ranges, stores) - - // The first delete range task should be still here since it didn't success. - se = createSession(s.gcWorker.store) - remainingRanges, err := loadRangesFunc(se, 20) - se.Close() - c.Assert(err, IsNil) - c.Assert(remainingRanges, DeepEquals, ranges[:1]) - - failKey = nil - failStore = nil - - // Delete the remaining range again. - err = deleteRangeFunc(context.Background(), 20, 1) - c.Assert(err, IsNil) - s.checkDestroyRangeReq(c, sendReqCh, ranges[:1], stores) - - se = createSession(s.gcWorker.store) - remainingRanges, err = loadRangesFunc(se, 20) - se.Close() - c.Assert(err, IsNil) - c.Assert(len(remainingRanges), Equals, 0) - } - - test(false) - // Change the order because the first range is the last successfully deleted. - ranges = append(ranges[1:], ranges[0]) - test(true) -} - -func (s *testGCWorkerSuite) TestDeleteRangesFailure(c *C) { - s.testDeleteRangesFailureImpl(c, failRPCErr) - s.testDeleteRangesFailureImpl(c, failNilResp) - s.testDeleteRangesFailureImpl(c, failErrResp) } type SentReq struct { @@ -716,7 +770,7 @@ type SentReq struct { } // checkDestroyRangeReq checks whether given sentReq matches given ranges and stores. 
-func (s *testGCWorkerSuite) checkDestroyRangeReq(c *C, sendReqCh chan SentReq, expectedRanges []util.DelRangeTask, expectedStores []*metapb.Store) { +func (s *mockGCWorkerSuite) checkDestroyRangeReq(t *testing.T, sendReqCh chan SentReq, expectedRanges []util.DelRangeTask, expectedStores []*metapb.Store) { sentReq := make([]SentReq, 0, len(expectedStores)*len(expectedStores)) Loop: for { @@ -741,47 +795,17 @@ Loop: for rangeIndex := range sortedRanges { for storeIndex := range expectedStores { i := rangeIndex*len(expectedStores) + storeIndex - c.Assert(sentReq[i].addr, Equals, expectedStores[storeIndex].Address) - c.Assert(kv.Key(sentReq[i].req.UnsafeDestroyRange().GetStartKey()), DeepEquals, - sortedRanges[rangeIndex].StartKey) - c.Assert(kv.Key(sentReq[i].req.UnsafeDestroyRange().GetEndKey()), DeepEquals, - sortedRanges[rangeIndex].EndKey) + require.Equal(t, expectedStores[storeIndex].Address, sentReq[i].addr) + require.Equal(t, sortedRanges[rangeIndex].StartKey, kv.Key(sentReq[i].req.UnsafeDestroyRange().GetStartKey())) + require.Equal(t, sortedRanges[rangeIndex].EndKey, kv.Key(sentReq[i].req.UnsafeDestroyRange().GetEndKey())) } } } -type testGCWorkerClient struct { - tikv.Client - unsafeDestroyRangeHandler handler - physicalScanLockHandler handler - registerLockObserverHandler handler - checkLockObserverHandler handler - removeLockObserverHandler handler -} +func TestLeaderTick(t *testing.T) { + s, clean := createGCWorkerSuite(t) + defer clean() -type handler = func(addr string, req *tikvrpc.Request) (*tikvrpc.Response, error) - -func (c *testGCWorkerClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) { - if req.Type == tikvrpc.CmdUnsafeDestroyRange && c.unsafeDestroyRangeHandler != nil { - return c.unsafeDestroyRangeHandler(addr, req) - } - if req.Type == tikvrpc.CmdPhysicalScanLock && c.physicalScanLockHandler != nil { - return c.physicalScanLockHandler(addr, req) - } - if req.Type == tikvrpc.CmdRegisterLockObserver && c.registerLockObserverHandler != nil { - return c.registerLockObserverHandler(addr, req) - } - if req.Type == tikvrpc.CmdCheckLockObserver && c.checkLockObserverHandler != nil { - return c.checkLockObserverHandler(addr, req) - } - if req.Type == tikvrpc.CmdRemoveLockObserver && c.removeLockObserverHandler != nil { - return c.removeLockObserverHandler(addr, req) - } - - return c.Client.SendRequest(ctx, addr, req, timeout) -} - -func (s *testGCWorkerSuite) TestLeaderTick(c *C) { gcSafePointCacheInterval = 0 veryLong := gcDefaultLifeTime * 10 @@ -789,45 +813,45 @@ func (s *testGCWorkerSuite) TestLeaderTick(c *C) { s.gcWorker.lastFinish = time.Now().Add(-veryLong) // Use central mode to do this test. err := s.gcWorker.saveValueToSysTable(gcModeKey, gcModeCentral) - c.Assert(err, IsNil) - p := s.createGCProbe(c, "k1") + require.NoError(t, err) + p := s.createGCProbe(t, "k1") s.oracle.AddOffset(gcDefaultLifeTime * 2) // Skip if GC is running. 
s.gcWorker.gcIsRunning = true err = s.gcWorker.leaderTick(context.Background()) - c.Assert(err, IsNil) - s.checkNotCollected(c, p) + require.NoError(t, err) + s.checkNotCollected(t, p) s.gcWorker.gcIsRunning = false // Reset GC last run time - err = s.gcWorker.saveTime(gcLastRunTimeKey, oracle.GetTimeFromTS(s.mustAllocTs(c)).Add(-veryLong)) - c.Assert(err, IsNil) + err = s.gcWorker.saveTime(gcLastRunTimeKey, oracle.GetTimeFromTS(s.mustAllocTs(t)).Add(-veryLong)) + require.NoError(t, err) // Skip if prepare failed (disabling GC will make prepare returns ok = false). err = s.gcWorker.saveValueToSysTable(gcEnableKey, booleanFalse) - c.Assert(err, IsNil) + require.NoError(t, err) err = s.gcWorker.leaderTick(context.Background()) - c.Assert(err, IsNil) - s.checkNotCollected(c, p) + require.NoError(t, err) + s.checkNotCollected(t, p) err = s.gcWorker.saveValueToSysTable(gcEnableKey, booleanTrue) - c.Assert(err, IsNil) + require.NoError(t, err) // Reset GC last run time - err = s.gcWorker.saveTime(gcLastRunTimeKey, oracle.GetTimeFromTS(s.mustAllocTs(c)).Add(-veryLong)) - c.Assert(err, IsNil) + err = s.gcWorker.saveTime(gcLastRunTimeKey, oracle.GetTimeFromTS(s.mustAllocTs(t)).Add(-veryLong)) + require.NoError(t, err) // Skip if gcWaitTime not exceeded. s.gcWorker.lastFinish = time.Now() err = s.gcWorker.leaderTick(context.Background()) - c.Assert(err, IsNil) - s.checkNotCollected(c, p) + require.NoError(t, err) + s.checkNotCollected(t, p) s.gcWorker.lastFinish = time.Now().Add(-veryLong) // Reset GC last run time - err = s.gcWorker.saveTime(gcLastRunTimeKey, oracle.GetTimeFromTS(s.mustAllocTs(c)).Add(-veryLong)) - c.Assert(err, IsNil) + err = s.gcWorker.saveTime(gcLastRunTimeKey, oracle.GetTimeFromTS(s.mustAllocTs(t)).Add(-veryLong)) + require.NoError(t, err) // Continue GC if all those checks passed. err = s.gcWorker.leaderTick(context.Background()) - c.Assert(err, IsNil) + require.NoError(t, err) // Wait for GC finish select { case err = <-s.gcWorker.done: @@ -836,18 +860,18 @@ func (s *testGCWorkerSuite) TestLeaderTick(c *C) { case <-time.After(time.Second * 10): err = errors.New("receive from s.gcWorker.done timeout") } - c.Assert(err, IsNil) - s.checkCollected(c, p) + require.NoError(t, err) + s.checkCollected(t, p) // Test again to ensure the synchronization between goroutines is correct. 
- err = s.gcWorker.saveTime(gcLastRunTimeKey, oracle.GetTimeFromTS(s.mustAllocTs(c)).Add(-veryLong)) - c.Assert(err, IsNil) + err = s.gcWorker.saveTime(gcLastRunTimeKey, oracle.GetTimeFromTS(s.mustAllocTs(t)).Add(-veryLong)) + require.NoError(t, err) s.gcWorker.lastFinish = time.Now().Add(-veryLong) - p = s.createGCProbe(c, "k1") + p = s.createGCProbe(t, "k1") s.oracle.AddOffset(gcDefaultLifeTime * 2) err = s.gcWorker.leaderTick(context.Background()) - c.Assert(err, IsNil) + require.NoError(t, err) // Wait for GC finish select { case err = <-s.gcWorker.done: @@ -856,8 +880,8 @@ func (s *testGCWorkerSuite) TestLeaderTick(c *C) { case <-time.After(time.Second * 10): err = errors.New("receive from s.gcWorker.done timeout") } - c.Assert(err, IsNil) - s.checkCollected(c, p) + require.NoError(t, err) + s.checkCollected(t, p) // No more signals in the channel select { @@ -867,21 +891,27 @@ func (s *testGCWorkerSuite) TestLeaderTick(c *C) { case <-time.After(time.Second): break } - c.Assert(err, IsNil) + require.NoError(t, err) } -func (s *testGCWorkerSuite) TestResolveLockRangeInfine(c *C) { - c.Assert(failpoint.Enable("tikvclient/invalidCacheAndRetry", "return(true)"), IsNil) - c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/gcworker/setGcResolveMaxBackoff", "return(1)"), IsNil) +func TestResolveLockRangeInfine(t *testing.T) { + s, clean := createGCWorkerSuite(t) + defer clean() + + require.NoError(t, failpoint.Enable("tikvclient/invalidCacheAndRetry", "return(true)")) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/store/gcworker/setGcResolveMaxBackoff", "return(1)")) defer func() { - c.Assert(failpoint.Disable("tikvclient/invalidCacheAndRetry"), IsNil) - c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/gcworker/setGcResolveMaxBackoff"), IsNil) + require.NoError(t, failpoint.Disable("tikvclient/invalidCacheAndRetry")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/store/gcworker/setGcResolveMaxBackoff")) }() _, err := s.gcWorker.resolveLocksForRange(context.Background(), 1, []byte{0}, []byte{1}) - c.Assert(err, NotNil) + require.Error(t, err) } -func (s *testGCWorkerSuite) TestResolveLockRangeMeetRegionCacheMiss(c *C) { +func TestResolveLockRangeMeetRegionCacheMiss(t *testing.T) { + s, clean := createGCWorkerSuite(t) + defer clean() + var ( scanCnt int scanCntRef = &scanCnt @@ -909,12 +939,15 @@ func (s *testGCWorkerSuite) TestResolveLockRangeMeetRegionCacheMiss(c *C) { return true, nil } _, err := s.gcWorker.resolveLocksForRange(context.Background(), 1, []byte{0}, []byte{10}) - c.Assert(err, IsNil) - c.Assert(resolveCnt, Equals, 2) - c.Assert(scanCnt, Equals, 1) + require.NoError(t, err) + require.Equal(t, 2, resolveCnt) + require.Equal(t, 1, scanCnt) } -func (s *testGCWorkerSuite) TestResolveLockRangeMeetRegionEnlargeCausedByRegionMerge(c *C) { +func TestResolveLockRangeMeetRegionEnlargeCausedByRegionMerge(t *testing.T) { + s, clean := createGCWorkerSuite(t) + defer clean() + var ( firstAccess = true firstAccessRef = &firstAccess @@ -948,7 +981,7 @@ func (s *testGCWorkerSuite) TestResolveLockRangeMeetRegionEnlargeCausedByRegionM tikv.NewNoopBackoff(context.Background()), &tikv.RPCContext{Region: regionID, Store: &tikv.Store{}}, []*metapb.Region{regionMeta}) - c.Assert(err, IsNil) + require.NoError(t, err) // also let region1 contains all 4 locks s.gcWorker.testingKnobs.scanLocks = func(key []byte, regionID uint64) []*txnlock.Lock { if regionID == s.initRegion.regionID { @@ -975,136 +1008,151 @@ func (s *testGCWorkerSuite) 
TestResolveLockRangeMeetRegionEnlargeCausedByRegionM } _, err := s.gcWorker.resolveLocksForRange(context.Background(), 1, []byte(""), []byte("z")) - c.Assert(err, IsNil) - c.Assert(len(resolvedLock), Equals, 4) + require.NoError(t, err) + require.Len(t, resolvedLock, 4) expects := [][]byte{[]byte("a"), []byte("b"), []byte("o"), []byte("p")} for i, l := range resolvedLock { - c.Assert(l, BytesEquals, expects[i]) + require.Equal(t, expects[i], l) } } -func (s *testGCWorkerSuite) TestRunGCJob(c *C) { +func TestRunGCJob(t *testing.T) { + s, clean := createGCWorkerSuite(t) + defer clean() + gcSafePointCacheInterval = 0 // Test distributed mode useDistributedGC := s.gcWorker.checkUseDistributedGC() - c.Assert(useDistributedGC, IsTrue) - safePoint := s.mustAllocTs(c) + require.True(t, useDistributedGC) + safePoint := s.mustAllocTs(t) err := s.gcWorker.runGCJob(context.Background(), safePoint, 1) - c.Assert(err, IsNil) + require.NoError(t, err) - pdSafePoint := s.mustGetSafePointFromPd(c) - c.Assert(pdSafePoint, Equals, safePoint) + pdSafePoint := s.mustGetSafePointFromPd(t) + require.Equal(t, safePoint, pdSafePoint) - etcdSafePoint := s.loadEtcdSafePoint(c) - c.Assert(etcdSafePoint, Equals, safePoint) + etcdSafePoint := s.loadEtcdSafePoint(t) + require.Equal(t, safePoint, etcdSafePoint) // Test distributed mode with safePoint regressing (although this is impossible) err = s.gcWorker.runGCJob(context.Background(), safePoint-1, 1) - c.Assert(err, NotNil) + require.Error(t, err) // Central mode is deprecated in v5.0, fallback to distributed mode if it's set. err = s.gcWorker.saveValueToSysTable(gcModeKey, gcModeCentral) - c.Assert(err, IsNil) + require.NoError(t, err) useDistributedGC = s.gcWorker.checkUseDistributedGC() - c.Assert(useDistributedGC, IsTrue) + require.True(t, useDistributedGC) - p := s.createGCProbe(c, "k1") - safePoint = s.mustAllocTs(c) + p := s.createGCProbe(t, "k1") + safePoint = s.mustAllocTs(t) err = s.gcWorker.runGCJob(context.Background(), safePoint, 1) - c.Assert(err, IsNil) - s.checkCollected(c, p) + require.NoError(t, err) + s.checkCollected(t, p) - etcdSafePoint = s.loadEtcdSafePoint(c) - c.Assert(etcdSafePoint, Equals, safePoint) + etcdSafePoint = s.loadEtcdSafePoint(t) + require.Equal(t, safePoint, etcdSafePoint) } -func (s *testGCWorkerSuite) TestSetServiceSafePoint(c *C) { +func TestSetServiceSafePoint(t *testing.T) { + s, clean := createGCWorkerSuite(t) + defer clean() + // SafePoint calculations are based on time rather than ts value. - safePoint := s.mustAllocTs(c) - s.mustSetTiDBServiceSafePoint(c, safePoint, safePoint) - c.Assert(s.mustGetMinServiceSafePointFromPd(c), Equals, safePoint) + safePoint := s.mustAllocTs(t) + s.mustSetTiDBServiceSafePoint(t, safePoint, safePoint) + require.Equal(t, safePoint, s.mustGetMinServiceSafePointFromPd(t)) // Advance the service safe point safePoint += 100 - s.mustSetTiDBServiceSafePoint(c, safePoint, safePoint) - c.Assert(s.mustGetMinServiceSafePointFromPd(c), Equals, safePoint) + s.mustSetTiDBServiceSafePoint(t, safePoint, safePoint) + require.Equal(t, safePoint, s.mustGetMinServiceSafePointFromPd(t)) // It doesn't matter if there is a greater safePoint from other services. safePoint += 100 // Returns the last service safePoint that were uploaded. 
- s.mustUpdateServiceGCSafePoint(c, "svc1", safePoint+10, safePoint-100) - s.mustSetTiDBServiceSafePoint(c, safePoint, safePoint) - c.Assert(s.mustGetMinServiceSafePointFromPd(c), Equals, safePoint) + s.mustUpdateServiceGCSafePoint(t, "svc1", safePoint+10, safePoint-100) + s.mustSetTiDBServiceSafePoint(t, safePoint, safePoint) + require.Equal(t, safePoint, s.mustGetMinServiceSafePointFromPd(t)) // Test the case when there is a smaller safePoint from other services. safePoint += 100 // Returns the last service safePoint that were uploaded. - s.mustUpdateServiceGCSafePoint(c, "svc1", safePoint-10, safePoint-100) - s.mustSetTiDBServiceSafePoint(c, safePoint, safePoint-10) - c.Assert(s.mustGetMinServiceSafePointFromPd(c), Equals, safePoint-10) + s.mustUpdateServiceGCSafePoint(t, "svc1", safePoint-10, safePoint-100) + s.mustSetTiDBServiceSafePoint(t, safePoint, safePoint-10) + require.Equal(t, safePoint-10, s.mustGetMinServiceSafePointFromPd(t)) // Test removing the minimum service safe point. - s.mustRemoveServiceGCSafePoint(c, "svc1", safePoint-10, safePoint) - c.Assert(s.mustGetMinServiceSafePointFromPd(c), Equals, safePoint) + s.mustRemoveServiceGCSafePoint(t, "svc1", safePoint-10, safePoint) + require.Equal(t, safePoint, s.mustGetMinServiceSafePointFromPd(t)) // Test the case when there are many safePoints. safePoint += 100 for i := 0; i < 10; i++ { svcName := fmt.Sprintf("svc%d", i) - s.mustUpdateServiceGCSafePoint(c, svcName, safePoint+uint64(i)*10, safePoint-100) + s.mustUpdateServiceGCSafePoint(t, svcName, safePoint+uint64(i)*10, safePoint-100) } - s.mustSetTiDBServiceSafePoint(c, safePoint+50, safePoint) + s.mustSetTiDBServiceSafePoint(t, safePoint+50, safePoint) } -func (s *testGCWorkerSuite) TestRunGCJobAPI(c *C) { +func TestRunGCJobAPI(t *testing.T) { + s, clean := createGCWorkerSuite(t) + defer clean() + gcSafePointCacheInterval = 0 - p := s.createGCProbe(c, "k1") - safePoint := s.mustAllocTs(c) + p := s.createGCProbe(t, "k1") + safePoint := s.mustAllocTs(t) err := RunGCJob(context.Background(), s.tikvStore, s.pdClient, safePoint, "mock", 1) - c.Assert(err, IsNil) - s.checkCollected(c, p) - etcdSafePoint := s.loadEtcdSafePoint(c) - c.Assert(err, IsNil) - c.Assert(etcdSafePoint, Equals, safePoint) + require.NoError(t, err) + s.checkCollected(t, p) + etcdSafePoint := s.loadEtcdSafePoint(t) + require.NoError(t, err) + require.Equal(t, safePoint, etcdSafePoint) } -func (s *testGCWorkerSuite) TestRunDistGCJobAPI(c *C) { +func TestRunDistGCJobAPI(t *testing.T) { + s, clean := createGCWorkerSuite(t) + defer clean() + gcSafePointCacheInterval = 0 - safePoint := s.mustAllocTs(c) + safePoint := s.mustAllocTs(t) err := RunDistributedGCJob(context.Background(), s.tikvStore, s.pdClient, safePoint, "mock", 1) - c.Assert(err, IsNil) - pdSafePoint := s.mustGetSafePointFromPd(c) - c.Assert(pdSafePoint, Equals, safePoint) - etcdSafePoint := s.loadEtcdSafePoint(c) - c.Assert(err, IsNil) - c.Assert(etcdSafePoint, Equals, safePoint) + require.NoError(t, err) + pdSafePoint := s.mustGetSafePointFromPd(t) + require.Equal(t, safePoint, pdSafePoint) + etcdSafePoint := s.loadEtcdSafePoint(t) + require.NoError(t, err) + require.Equal(t, safePoint, etcdSafePoint) } -func (s *testGCWorkerSuite) TestStartWithRunGCJobFailures(c *C) { +func TestStartWithRunGCJobFailures(t *testing.T) { + s, clean := createGCWorkerSuite(t) + defer clean() + s.gcWorker.Start() defer s.gcWorker.Close() for i := 0; i < 3; i++ { select { case <-time.After(100 * time.Millisecond): - c.Fatal("gc worker failed to handle errors") + 
require.FailNow(t, "gc worker failed to handle errors") case s.gcWorker.done <- errors.New("mock error"): } } } -func (s *testGCWorkerSuite) loadEtcdSafePoint(c *C) uint64 { +func (s *mockGCWorkerSuite) loadEtcdSafePoint(t *testing.T) uint64 { val, err := s.gcWorker.tikvStore.GetSafePointKV().Get(tikv.GcSavedSafePoint) - c.Assert(err, IsNil) + require.NoError(t, err) res, err := strconv.ParseUint(val, 10, 64) - c.Assert(err, IsNil) + require.NoError(t, err) return res } -func makeMergedChannel(c *C, count int) (*mergeLockScanner, []chan scanLockResult, []uint64, <-chan []*txnlock.Lock) { +func makeMergedChannel(t *testing.T, count int) (*mergeLockScanner, []chan scanLockResult, []uint64, <-chan []*txnlock.Lock) { scanner := &mergeLockScanner{} channels := make([]chan scanLockResult, 0, count) receivers := make([]*receiver, 0, count) @@ -1128,16 +1176,17 @@ func makeMergedChannel(c *C, count int) (*mergeLockScanner, []chan scanLockResul scanner.startWithReceivers(receivers) // Get a batch of a enough-large size to get all results. result := scanner.NextBatch(1000) - c.Assert(len(result), Less, 1000) + require.Less(t, len(result), 1000) resultCh <- result }() return scanner, channels, storeIDs, resultCh } -func (s *testGCWorkerSuite) makeMergedMockClient(c *C, count int) (*mergeLockScanner, []chan scanLockResult, []uint64, <-chan []*txnlock.Lock) { +func (s *mockGCWorkerSuite) makeMergedMockClient(t *testing.T, count int) (*mergeLockScanner, []chan scanLockResult, []uint64, <-chan []*txnlock.Lock) { stores := s.cluster.GetAllStores() - c.Assert(count, Equals, len(stores)) + require.Len(t, stores, count) + storeIDs := make([]uint64, count) for i := 0; i < count; i++ { storeIDs[i] = stores[i].Id @@ -1146,7 +1195,7 @@ func (s *testGCWorkerSuite) makeMergedMockClient(c *C, count int) (*mergeLockSca const scanLockLimit = 3 storesMap, err := s.gcWorker.getStoresMapForGC(context.Background()) - c.Assert(err, IsNil) + require.NoError(t, err) scanner := newMergeLockScanner(100000, s.client, storesMap) scanner.scanLockLimit = scanLockLimit channels := make([]chan scanLockResult, 0, len(stores)) @@ -1191,17 +1240,20 @@ func (s *testGCWorkerSuite) makeMergedMockClient(c *C, count int) (*mergeLockSca // Initializing and getting result from scanner is blocking operations. Collect the result in a separated thread. go func() { err := scanner.Start(context.Background()) - c.Assert(err, IsNil) + require.NoError(t, err) // Get a batch of a enough-large size to get all results. result := scanner.NextBatch(1000) - c.Assert(len(result), Less, 1000) + require.Less(t, len(result), 1000) resultCh <- result }() return scanner, channels, storeIDs, resultCh } -func (s *testGCWorkerSuite) TestMergeLockScanner(c *C) { +func TestMergeLockScanner(t *testing.T) { + s, clean := createGCWorkerSuite(t) + defer clean() + // Shortcuts to make the following test code simpler // Get stores by index, and get their store IDs. @@ -1251,27 +1303,27 @@ func (s *testGCWorkerSuite) TestMergeLockScanner(c *C) { } // No lock. 
- scanner, sendCh, storeIDs, resCh := makeMergedChannel(c, 1) + scanner, sendCh, storeIDs, resCh := makeMergedChannel(t, 1) close(sendCh[0]) - c.Assert(len(<-resCh), Equals, 0) - c.Assert(scanner.GetSucceededStores(), DeepEquals, makeIDSet(storeIDs, 0)) + require.Len(t, <-resCh, 0) + require.Equal(t, makeIDSet(storeIDs, 0), scanner.GetSucceededStores()) - scanner, sendCh, storeIDs, resCh = makeMergedChannel(c, 1) + scanner, sendCh, storeIDs, resCh = makeMergedChannel(t, 1) locks := sendLocksByKey(sendCh[0], "a", "b", "c") close(sendCh[0]) - c.Assert(<-resCh, DeepEquals, locks) - c.Assert(scanner.GetSucceededStores(), DeepEquals, makeIDSet(storeIDs, 0)) + require.Equal(t, locks, <-resCh) + require.Equal(t, makeIDSet(storeIDs, 0), scanner.GetSucceededStores()) // Send locks with error - scanner, sendCh, storeIDs, resCh = makeMergedChannel(c, 1) + scanner, sendCh, storeIDs, resCh = makeMergedChannel(t, 1) locks = sendLocksByKey(sendCh[0], "a", "b", "c") sendErr(sendCh[0]) close(sendCh[0]) - c.Assert(<-resCh, DeepEquals, locks) - c.Assert(scanner.GetSucceededStores(), DeepEquals, makeIDSet(storeIDs)) + require.Equal(t, locks, <-resCh) + require.Equal(t, makeIDSet(storeIDs), scanner.GetSucceededStores()) // Merge sort locks with different keys. - scanner, sendCh, storeIDs, resCh = makeMergedChannel(c, 2) + scanner, sendCh, storeIDs, resCh = makeMergedChannel(t, 2) locks = sendLocksByKey(sendCh[0], "a", "c", "e") time.Sleep(time.Millisecond * 100) locks = append(locks, sendLocksByKey(sendCh[1], "b", "d", "f")...) @@ -1280,18 +1332,18 @@ func (s *testGCWorkerSuite) TestMergeLockScanner(c *C) { sort.Slice(locks, func(i, j int) bool { return bytes.Compare(locks[i].Key, locks[j].Key) < 0 }) - c.Assert(<-resCh, DeepEquals, locks) - c.Assert(scanner.GetSucceededStores(), DeepEquals, makeIDSet(storeIDs, 0, 1)) + require.Equal(t, locks, <-resCh) + require.Equal(t, makeIDSet(storeIDs, 0, 1), scanner.GetSucceededStores()) // Merge sort locks with different timestamps. 
- scanner, sendCh, storeIDs, resCh = makeMergedChannel(c, 2) + scanner, sendCh, storeIDs, resCh = makeMergedChannel(t, 2) sendLocks(sendCh[0], makeLock("a", 0), makeLock("a", 1)) time.Sleep(time.Millisecond * 100) sendLocks(sendCh[1], makeLock("a", 1), makeLock("a", 2), makeLock("b", 0)) close(sendCh[0]) close(sendCh[1]) - c.Assert(<-resCh, DeepEquals, makeLockList(makeLock("a", 0), makeLock("a", 1), makeLock("a", 2), makeLock("b", 0))) - c.Assert(scanner.GetSucceededStores(), DeepEquals, makeIDSet(storeIDs, 0, 1)) + require.Equal(t, makeLockList(makeLock("a", 0), makeLock("a", 1), makeLock("a", 2), makeLock("b", 0)), <-resCh) + require.Equal(t, makeIDSet(storeIDs, 0, 1), scanner.GetSucceededStores()) for _, useMock := range []bool{false, true} { channel := makeMergedChannel @@ -1299,7 +1351,7 @@ func (s *testGCWorkerSuite) TestMergeLockScanner(c *C) { channel = s.makeMergedMockClient } - scanner, sendCh, storeIDs, resCh = channel(c, 3) + scanner, sendCh, storeIDs, resCh = channel(t, 3) sendLocksByKey(sendCh[0], "a", "d", "g", "h") time.Sleep(time.Millisecond * 100) sendLocksByKey(sendCh[1], "a", "d", "f", "h") @@ -1308,10 +1360,10 @@ func (s *testGCWorkerSuite) TestMergeLockScanner(c *C) { close(sendCh[0]) close(sendCh[1]) close(sendCh[2]) - c.Assert(<-resCh, DeepEquals, makeLockListByKey("a", "b", "c", "d", "e", "f", "g", "h")) - c.Assert(scanner.GetSucceededStores(), DeepEquals, makeIDSet(storeIDs, 0, 1, 2)) + require.Equal(t, makeLockListByKey("a", "b", "c", "d", "e", "f", "g", "h"), <-resCh) + require.Equal(t, makeIDSet(storeIDs, 0, 1, 2), scanner.GetSucceededStores()) - scanner, sendCh, storeIDs, resCh = channel(c, 3) + scanner, sendCh, storeIDs, resCh = channel(t, 3) sendLocksByKey(sendCh[0], "a", "d", "g", "h") time.Sleep(time.Millisecond * 100) sendLocksByKey(sendCh[1], "a", "d", "f", "h") @@ -1321,34 +1373,48 @@ func (s *testGCWorkerSuite) TestMergeLockScanner(c *C) { close(sendCh[0]) close(sendCh[1]) close(sendCh[2]) - c.Assert(<-resCh, DeepEquals, makeLockListByKey("a", "b", "c", "d", "e", "f", "g", "h")) - c.Assert(scanner.GetSucceededStores(), DeepEquals, makeIDSet(storeIDs, 1, 2)) + require.Equal(t, makeLockListByKey("a", "b", "c", "d", "e", "f", "g", "h"), <-resCh) + require.Equal(t, makeIDSet(storeIDs, 1, 2), scanner.GetSucceededStores()) - scanner, sendCh, storeIDs, resCh = channel(c, 3) + scanner, sendCh, storeIDs, resCh = channel(t, 3) sendLocksByKey(sendCh[0], "a\x00", "a\x00\x00", "b", "b\x00") sendLocksByKey(sendCh[1], "a", "a\x00\x00", "a\x00\x00\x00", "c") sendLocksByKey(sendCh[2], "1", "a\x00", "a\x00\x00", "b") close(sendCh[0]) close(sendCh[1]) close(sendCh[2]) - c.Assert(<-resCh, DeepEquals, makeLockListByKey("1", "a", "a\x00", "a\x00\x00", "a\x00\x00\x00", "b", "b\x00", "c")) - c.Assert(scanner.GetSucceededStores(), DeepEquals, makeIDSet(storeIDs, 0, 1, 2)) + require.Equal(t, makeLockListByKey("1", "a", "a\x00", "a\x00\x00", "a\x00\x00\x00", "b", "b\x00", "c"), <-resCh) + require.Equal(t, makeIDSet(storeIDs, 0, 1, 2), scanner.GetSucceededStores()) - scanner, sendCh, storeIDs, resCh = channel(c, 3) + scanner, sendCh, storeIDs, resCh = channel(t, 3) sendLocks(sendCh[0], makeLock("a", 0), makeLock("d", 0), makeLock("g", 0), makeLock("h", 0)) sendLocks(sendCh[1], makeLock("a", 1), makeLock("b", 0), makeLock("c", 0), makeLock("d", 1)) sendLocks(sendCh[2], makeLock("e", 0), makeLock("g", 1), makeLock("g", 2), makeLock("h", 0)) close(sendCh[0]) close(sendCh[1]) close(sendCh[2]) - c.Assert(<-resCh, DeepEquals, makeLockList(makeLock("a", 0), makeLock("a", 1), 
makeLock("b", 0), makeLock("c", 0), - makeLock("d", 0), makeLock("d", 1), makeLock("e", 0), makeLock("g", 0), makeLock("g", 1), makeLock("g", 2), makeLock("h", 0))) - c.Assert(scanner.GetSucceededStores(), DeepEquals, makeIDSet(storeIDs, 0, 1, 2)) + locks := makeLockList( + makeLock("a", 0), + makeLock("a", 1), + makeLock("b", 0), + makeLock("c", 0), + makeLock("d", 0), + makeLock("d", 1), + makeLock("e", 0), + makeLock("g", 0), + makeLock("g", 1), + makeLock("g", 2), + makeLock("h", 0)) + require.Equal(t, locks, <-resCh) + require.Equal(t, makeIDSet(storeIDs, 0, 1, 2), scanner.GetSucceededStores()) } } -func (s *testGCWorkerSuite) TestResolveLocksPhysical(c *C) { - alwaysSucceedHanlder := func(addr string, req *tikvrpc.Request) (*tikvrpc.Response, error) { +func TestResolveLocksPhysical(t *testing.T) { + s, clean := createGCWorkerSuite(t) + defer clean() + + alwaysSucceedHandler := func(addr string, req *tikvrpc.Request) (*tikvrpc.Response, error) { switch req.Type { case tikvrpc.CmdPhysicalScanLock: return &tikvrpc.Response{Resp: &kvrpcpb.PhysicalScanLockResponse{Locks: nil, Error: ""}}, nil @@ -1377,10 +1443,10 @@ func (s *testGCWorkerSuite) TestResolveLocksPhysical(c *C) { } } reset := func() { - s.client.physicalScanLockHandler = alwaysSucceedHanlder - s.client.registerLockObserverHandler = alwaysSucceedHanlder - s.client.checkLockObserverHandler = alwaysSucceedHanlder - s.client.removeLockObserverHandler = alwaysSucceedHanlder + s.client.physicalScanLockHandler = alwaysSucceedHandler + s.client.registerLockObserverHandler = alwaysSucceedHandler + s.client.checkLockObserverHandler = alwaysSucceedHandler + s.client.removeLockObserverHandler = alwaysSucceedHandler } ctx := context.Background() @@ -1389,15 +1455,15 @@ func (s *testGCWorkerSuite) TestResolveLocksPhysical(c *C) { // No lock reset() physicalUsed, err := s.gcWorker.resolveLocks(ctx, safePoint, 3, true) - c.Assert(physicalUsed, IsTrue) - c.Assert(err, IsNil) + require.True(t, physicalUsed) + require.NoError(t, err) // Should fall back to the legacy mode when it fails to register lock observers. reset() s.client.registerLockObserverHandler = alwaysFailHandler physicalUsed, err = s.gcWorker.resolveLocks(ctx, safePoint, 3, true) - c.Assert(physicalUsed, IsFalse) - c.Assert(err, IsNil) + require.False(t, physicalUsed) + require.NoError(t, err) // Should fall back when it fails to resolve locks. reset() @@ -1405,11 +1471,11 @@ func (s *testGCWorkerSuite) TestResolveLocksPhysical(c *C) { locks := []*kvrpcpb.LockInfo{{Key: []byte{0}}} return &tikvrpc.Response{Resp: &kvrpcpb.PhysicalScanLockResponse{Locks: locks, Error: ""}}, nil } - c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/gcworker/resolveLocksAcrossRegionsErr", "return(100)"), IsNil) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/store/gcworker/resolveLocksAcrossRegionsErr", "return(100)")) physicalUsed, err = s.gcWorker.resolveLocks(ctx, safePoint, 3, true) - c.Assert(physicalUsed, IsFalse) - c.Assert(err, IsNil) - c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/gcworker/resolveLocksAcrossRegionsErr"), IsNil) + require.False(t, physicalUsed) + require.NoError(t, err) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/store/gcworker/resolveLocksAcrossRegionsErr")) // Shouldn't fall back when scanning locks fails fewer than 3 times.
reset() @@ -1418,18 +1484,18 @@ func (s *testGCWorkerSuite) TestResolveLocksPhysical(c *C) { if atomic.CompareAndSwapUint32(&returnError, 1, 0) { return alwaysFailHandler(addr, req) } - return alwaysSucceedHanlder(addr, req) + return alwaysSucceedHandler(addr, req) } physicalUsed, err = s.gcWorker.resolveLocks(ctx, safePoint, 3, true) - c.Assert(physicalUsed, IsTrue) - c.Assert(err, IsNil) + require.True(t, physicalUsed) + require.NoError(t, err) // Should fall back if it reaches the retry limit. reset() s.client.physicalScanLockHandler = alwaysFailHandler physicalUsed, err = s.gcWorker.resolveLocks(ctx, safePoint, 3, true) - c.Assert(physicalUsed, IsFalse) - c.Assert(err, IsNil) + require.False(t, physicalUsed) + require.NoError(t, err) // Should fall back when one registered store is dirty. reset() @@ -1437,27 +1503,27 @@ func (s *testGCWorkerSuite) TestResolveLocksPhysical(c *C) { return &tikvrpc.Response{Resp: &kvrpcpb.CheckLockObserverResponse{Error: "", IsClean: false, Locks: nil}}, nil } physicalUsed, err = s.gcWorker.resolveLocks(ctx, safePoint, 3, true) - c.Assert(physicalUsed, IsFalse) - c.Assert(err, IsNil) + require.False(t, physicalUsed) + require.NoError(t, err) // When checking the lock observer in a store fails, we assume the store is dirty. // Should fall back when it fails to check lock observers. reset() s.client.checkLockObserverHandler = alwaysFailHandler physicalUsed, err = s.gcWorker.resolveLocks(ctx, safePoint, 3, true) - c.Assert(physicalUsed, IsFalse) - c.Assert(err, IsNil) + require.False(t, physicalUsed) + require.NoError(t, err) // Shouldn't fall back when the dirty store is newly added. reset() var wg sync.WaitGroup wg.Add(1) - c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/gcworker/beforeCheckLockObservers", "pause"), IsNil) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/store/gcworker/beforeCheckLockObservers", "pause")) go func() { defer wg.Done() physicalUsed, err := s.gcWorker.resolveLocks(ctx, safePoint, 3, true) - c.Assert(physicalUsed, IsTrue) - c.Assert(err, IsNil) + require.True(t, physicalUsed) + require.NoError(t, err) }() // Sleep to let the goroutine pause. time.Sleep(500 * time.Millisecond) @@ -1469,36 +1535,36 @@ func (s *testGCWorkerSuite) TestResolveLocksPhysical(c *C) { once = false return &tikvrpc.Response{Resp: &kvrpcpb.CheckLockObserverResponse{Error: "", IsClean: false, Locks: nil}}, nil } - return alwaysSucceedHanlder(addr, req) + return alwaysSucceedHandler(addr, req) } - c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/gcworker/beforeCheckLockObservers"), IsNil) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/store/gcworker/beforeCheckLockObservers")) wg.Wait() // Shouldn't fall back when a store is removed. reset() wg.Add(1) - c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/gcworker/beforeCheckLockObservers", "pause"), IsNil) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/store/gcworker/beforeCheckLockObservers", "pause")) go func() { defer wg.Done() physicalUsed, err := s.gcWorker.resolveLocks(ctx, safePoint, 3, true) - c.Assert(physicalUsed, IsTrue) - c.Assert(err, IsNil) + require.True(t, physicalUsed) + require.NoError(t, err) }() // Sleep to let the goroutine pause.
time.Sleep(500 * time.Millisecond) s.cluster.RemoveStore(100) - c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/gcworker/beforeCheckLockObservers"), IsNil) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/store/gcworker/beforeCheckLockObservers")) wg.Wait() // Should fall back when a cleaned store becomes dirty. reset() wg.Add(1) - c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/gcworker/beforeCheckLockObservers", "pause"), IsNil) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/store/gcworker/beforeCheckLockObservers", "pause")) go func() { defer wg.Done() physicalUsed, err := s.gcWorker.resolveLocks(ctx, safePoint, 3, true) - c.Assert(physicalUsed, IsFalse) - c.Assert(err, IsNil) + require.False(t, physicalUsed) + require.NoError(t, err) }() // Sleep to let the goroutine pause. time.Sleep(500 * time.Millisecond) @@ -1513,35 +1579,38 @@ func (s *testGCWorkerSuite) TestResolveLocksPhysical(c *C) { if atomic.CompareAndSwapUint32(&onceDirty, 1, 0) { return &tikvrpc.Response{Resp: &kvrpcpb.CheckLockObserverResponse{Error: "", IsClean: false, Locks: nil}}, nil } - return alwaysSucceedHanlder(addr, req) + return alwaysSucceedHandler(addr, req) case store.Address: // The store returns IsClean=true for the first time. if atomic.CompareAndSwapUint32(&onceClean, 1, 0) { - return alwaysSucceedHanlder(addr, req) + return alwaysSucceedHandler(addr, req) } return &tikvrpc.Response{Resp: &kvrpcpb.CheckLockObserverResponse{Error: "", IsClean: false, Locks: nil}}, nil default: - return alwaysSucceedHanlder(addr, req) + return alwaysSucceedHandler(addr, req) } } - c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/gcworker/beforeCheckLockObservers"), IsNil) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/store/gcworker/beforeCheckLockObservers")) wg.Wait() // Shouldn't fall back when it fails to remove lock observers. reset() s.client.removeLockObserverHandler = alwaysFailHandler physicalUsed, err = s.gcWorker.resolveLocks(ctx, safePoint, 3, true) - c.Assert(physicalUsed, IsTrue) - c.Assert(err, IsNil) + require.True(t, physicalUsed) + require.NoError(t, err) } -func (s *testGCWorkerSuite) TestPhyscailScanLockDeadlock(c *C) { +func TestPhysicalScanLockDeadlock(t *testing.T) { + s, clean := createGCWorkerSuite(t) + defer clean() + ctx := context.Background() stores := s.cluster.GetAllStores() - c.Assert(len(stores), Greater, 1) + require.Greater(t, len(stores), 1) s.client.physicalScanLockHandler = func(addr string, req *tikvrpc.Request) (*tikvrpc.Response, error) { - c.Assert(addr, Equals, stores[0].Address) + require.Equal(t, stores[0].Address, addr) scanReq := req.PhysicalScanLock() scanLockLimit := int(scanReq.Limit) locks := make([]*kvrpcpb.LockInfo, 0, scanReq.Limit) @@ -1559,9 +1628,9 @@ func (s *testGCWorkerSuite) TestPhyscailScanLockDeadlock(c *C) { // Sleep 1000ms to let the main goroutine block on sending tasks. // Inject an error into the goroutine resolving locks so that the main goroutine will block forever if it doesn't handle channels properly.
- c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/gcworker/resolveLocksAcrossRegionsErr", "return(1000)"), IsNil) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/store/gcworker/resolveLocksAcrossRegionsErr", "return(1000)")) defer func() { - c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/gcworker/resolveLocksAcrossRegionsErr"), IsNil) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/store/gcworker/resolveLocksAcrossRegionsErr")) }() done := make(chan interface{}) @@ -1569,74 +1638,83 @@ func (s *testGCWorkerSuite) TestPhyscailScanLockDeadlock(c *C) { defer close(done) storesMap := map[uint64]*metapb.Store{stores[0].Id: stores[0]} succeeded, err := s.gcWorker.physicalScanAndResolveLocks(ctx, 10000, storesMap) - c.Assert(succeeded, IsNil) - c.Assert(err, ErrorMatches, "injectedError") + require.Nil(t, succeeded) + require.EqualError(t, err, "injectedError") }() select { case <-done: case <-time.After(5 * time.Second): - c.Fatal("physicalScanAndResolveLocks blocks") + require.FailNow(t, "physicalScanAndResolveLocks blocks") } } -func (s *testGCWorkerSuite) TestGCPlacementRules(c *C) { - c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/gcworker/mockHistoryJobForGC", "return(1)"), IsNil) +func TestGCPlacementRules(t *testing.T) { + s, clean := createGCWorkerSuite(t) + defer clean() + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/store/gcworker/mockHistoryJobForGC", "return(1)")) defer func() { - c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/gcworker/mockHistoryJobForGC"), IsNil) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/store/gcworker/mockHistoryJobForGC")) }() dr := util.DelRangeTask{JobID: 1, ElementID: 1} pid, err := s.gcWorker.doGCPlacementRules(dr) - c.Assert(pid, Equals, int64(1)) - c.Assert(err, IsNil) + require.Equal(t, int64(1), pid) + require.NoError(t, err) } -func (s *testGCWorkerSuite) TestGCLabelRules(c *C) { - c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/gcworker/mockHistoryJob", "return(\"schema/d1/t1\")"), IsNil) +func TestGCLabelRules(t *testing.T) { + s, clean := createGCWorkerSuite(t) + defer clean() + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/store/gcworker/mockHistoryJob", "return(\"schema/d1/t1\")")) defer func() { - c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/gcworker/mockHistoryJob"), IsNil) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/store/gcworker/mockHistoryJob")) }() dr := util.DelRangeTask{JobID: 1, ElementID: 1} err := s.gcWorker.doGCLabelRules(dr) - c.Assert(err, IsNil) + require.NoError(t, err) } -func (s *testGCWorkerSuite) TestGCWithPendingTxn(c *C) { +func TestGCWithPendingTxn(t *testing.T) { + s, clean := createGCWorkerSuite(t) + defer clean() + ctx := context.Background() gcSafePointCacheInterval = 0 err := s.gcWorker.saveValueToSysTable(gcEnableKey, booleanFalse) - c.Assert(err, IsNil) + require.NoError(t, err) k1 := []byte("tk1") v1 := []byte("v1") txn, err := s.store.Begin() - c.Assert(err, IsNil) + require.NoError(t, err) txn.SetOption(kv.Pessimistic, true) lockCtx := &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()} // Lock the key. err = txn.Set(k1, v1) - c.Assert(err, IsNil) + require.NoError(t, err) err = txn.LockKeys(ctx, lockCtx, k1) - c.Assert(err, IsNil) + require.NoError(t, err) // Prepare to run gc with txn's startTS as the safepoint ts. 
spkv := s.tikvStore.GetSafePointKV() err = spkv.Put(fmt.Sprintf("%s/%s", infosync.ServerMinStartTSPath, "a"), strconv.FormatUint(txn.StartTS(), 10)) - c.Assert(err, IsNil) - s.mustSetTiDBServiceSafePoint(c, txn.StartTS(), txn.StartTS()) + require.NoError(t, err) + s.mustSetTiDBServiceSafePoint(t, txn.StartTS(), txn.StartTS()) veryLong := gcDefaultLifeTime * 100 - err = s.gcWorker.saveTime(gcLastRunTimeKey, oracle.GetTimeFromTS(s.mustAllocTs(c)).Add(-veryLong)) - c.Assert(err, IsNil) + err = s.gcWorker.saveTime(gcLastRunTimeKey, oracle.GetTimeFromTS(s.mustAllocTs(t)).Add(-veryLong)) + require.NoError(t, err) s.gcWorker.lastFinish = time.Now().Add(-veryLong) s.oracle.AddOffset(time.Minute * 10) err = s.gcWorker.saveValueToSysTable(gcEnableKey, booleanTrue) - c.Assert(err, IsNil) + require.NoError(t, err) // Trigger the tick to let the gc job start. err = s.gcWorker.leaderTick(ctx) - c.Assert(err, IsNil) + require.NoError(t, err) // Wait for GC to finish. select { case err = <-s.gcWorker.done: @@ -1645,8 +1723,8 @@ func (s *testGCWorkerSuite) TestGCWithPendingTxn(c *C) { case <-time.After(time.Second * 10): err = errors.New("receive from s.gcWorker.done timeout") } - c.Assert(err, IsNil) + require.NoError(t, err) err = txn.Commit(ctx) - c.Assert(err, IsNil) + require.NoError(t, err) } diff --git a/store/gcworker/main_test.go b/store/gcworker/main_test.go new file mode 100644 index 0000000000..0a6be0a875 --- /dev/null +++ b/store/gcworker/main_test.go @@ -0,0 +1,40 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package gcworker + +import ( + "testing" + "time" + + "github.com/pingcap/tidb/testkit/testmain" + "github.com/pingcap/tidb/util/testbridge" + "github.com/tikv/client-go/v2/tikv" + "go.uber.org/goleak" +) + +func TestMain(m *testing.M) { + testbridge.WorkaroundGoCheckFlags() + tikv.EnableFailpoints() + opts := []goleak.Option{ + goleak.IgnoreTopFunction("go.etcd.io/etcd/pkg/logutil.(*MergeLogger).outputLoop"), + goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), + } + callback := func(i int) int { + // Wait for MVCCLevelDB to close; it is closed within one second. + time.Sleep(time.Second) + return i + } + goleak.VerifyTestMain(testmain.WrapTestingM(m, callback), opts...) +} diff --git a/testkit/testmain/testmain.go b/testkit/testmain/testmain.go new file mode 100644 index 0000000000..1ea915053e --- /dev/null +++ b/testkit/testmain/testmain.go @@ -0,0 +1,42 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and +// limitations under the License. + +// +build !codes + +package testmain + +import "go.uber.org/goleak" + +type testingM struct { + goleak.TestingM + callback func(int) int +} + +func (m *testingM) Run() int { + return m.callback(m.TestingM.Run()) +} + +// WrapTestingM returns a TestingM that passes the result of m.Run through callback before returning it. +func WrapTestingM(m goleak.TestingM, callback func(int) int) *testingM { + if callback == nil { + callback = func(i int) int { + return i + } + } + + return &testingM{ + TestingM: m, + callback: callback, + } +}
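The tests above toggle failpoints such as resolveLocksAcrossRegionsErr with failpoint.Enable and failpoint.Disable. For context, failpoint.Enable(name, "return(100)") only has an effect where the production code declares a matching injection point via the pingcap/failpoint marker API, which failpoint-ctl rewrites at build time (without rewriting, Inject is a no-op). The sketch below shows the general shape of such a declaration; resolveRegionSketch and its error handling are illustrative stand-ins, not the actual gcworker code.

package gcworker

import (
	"errors"

	"github.com/pingcap/failpoint"
)

// resolveRegionSketch is a hypothetical code path guarded by the
// resolveLocksAcrossRegionsErr failpoint exercised in the tests above.
func resolveRegionSketch() error {
	var injected error
	failpoint.Inject("resolveLocksAcrossRegionsErr", func(v failpoint.Value) {
		// With failpoint.Enable(..., "return(100)"), v carries the int 100.
		// How the value is interpreted (e.g. as a countdown of calls that
		// should fail) is up to the guarded code.
		if _, ok := v.(int); ok {
			injected = errors.New("injectedError")
		}
	})
	return injected
}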
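On the new testmain helper: goleak.VerifyTestMain checks for leaked goroutines immediately after TestingM.Run returns, so teardown that must happen in between (such as the one-second MVCCLevelDB wait in main_test.go) has to be spliced in through the wrapper's callback. Below is a minimal sketch of a TestMain for a package that needs no such wait, assuming the same testmain package; the package name is illustrative.

package somepkg

import (
	"testing"

	"github.com/pingcap/tidb/testkit/testmain"
	"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
	// A nil callback makes WrapTestingM an identity wrapper: goleak runs
	// its leak check right after m.Run returns, with no extra delay.
	goleak.VerifyTestMain(testmain.WrapTestingM(m, nil))
}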