diff --git a/store/tikv/gcworker/gc_worker.go b/store/tikv/gcworker/gc_worker.go index 8aa0dec84d..4b5d3c75b2 100644 --- a/store/tikv/gcworker/gc_worker.go +++ b/store/tikv/gcworker/gc_worker.go @@ -18,6 +18,7 @@ import ( "fmt" "os" "strconv" + "strings" "sync" "sync/atomic" "time" @@ -117,6 +118,11 @@ const ( gcMaxConcurrency = 128 // We don't want gc to sweep out the cached info belong to other processes, like coprocessor. gcScanLockLimit = tikv.ResolvedCacheSize / 2 + + gcEnableKey = "tikv_gc_enable" + gcEnableValue = "true" + gcDisableValue = "false" + gcDefaultEnableValue = true ) var gcSafePointCacheInterval = tikv.GcSafePointCacheInterval @@ -130,6 +136,7 @@ var gcVariableComments = map[string]string{ gcLifeTimeKey: "All versions within life time will not be collected by GC, at least 10m, in Go format.", gcSafePointKey: "All versions after safe point can be accessed. (DO NOT EDIT)", gcConcurrencyKey: "How many go routines used to do GC parallel, [1, 128], default 2", + gcEnableKey: "Current GC enable status", } func (w *GCWorker) start(ctx context.Context, wg *sync.WaitGroup) { @@ -250,6 +257,39 @@ func (w *GCWorker) leaderTick(ctx context.Context) error { // prepare checks preconditions for starting a GC job. It returns a bool // that indicates whether the GC job should start and the new safePoint. func (w *GCWorker) prepare() (bool, uint64, error) { + // Add a transaction here is to prevent following situations: + // 1. GC check gcEnable is true, continue to do GC + // 2. The user sets gcEnable to false + // 3. The user gets `tikv_gc_safe_point` value is t1, then the user thinks the data after time t1 won't be clean by GC. + // 4. GC update `tikv_gc_safe_point` value to t2, continue do GC in this round. + // Then the data record that has been dropped between time t1 and t2, will be cleaned by GC, but the user thinks the data after t1 won't be clean by GC. + ctx := context.Background() + _, err := w.session.Execute(ctx, "BEGIN") + if err != nil { + return false, 0, errors.Trace(err) + } + doGC, safePoint, err := w.checkPrepare(ctx) + if doGC { + _, err = w.session.Execute(ctx, "COMMIT") + if err != nil { + return false, 0, errors.Trace(err) + } + } else { + _, err1 := w.session.Execute(ctx, "ROLLBACK") + terror.Log(errors.Trace(err1)) + } + return doGC, safePoint, errors.Trace(err) +} + +func (w *GCWorker) checkPrepare(ctx context.Context) (bool, uint64, error) { + enable, err := w.checkGCEnable() + if err != nil { + return false, 0, errors.Trace(err) + } + if !enable { + log.Warn("[gc worker] gc status is disabled.") + return false, 0, nil + } now, err := w.getOracleTime() if err != nil { return false, 0, errors.Trace(err) @@ -283,6 +323,22 @@ func (w *GCWorker) getOracleTime() (time.Time, error) { return time.Unix(sec, nsec), nil } +func (w *GCWorker) checkGCEnable() (bool, error) { + str, err := w.loadValueFromSysTable(gcEnableKey) + if err != nil { + return false, errors.Trace(err) + } + if str == "" { + // Save default value for gc enable key. The default value is always true. + err = w.saveValueToSysTable(gcEnableKey, gcEnableValue) + if err != nil { + return gcDefaultEnableValue, errors.Trace(err) + } + return gcDefaultEnableValue, nil + } + return strings.EqualFold(str, gcEnableValue), nil +} + func (w *GCWorker) checkGCInterval(now time.Time) (bool, error) { runInterval, err := w.loadDurationWithDefault(gcRunIntervalKey, gcDefaultRunInterval) if err != nil { diff --git a/store/tikv/gcworker/gc_worker_test.go b/store/tikv/gcworker/gc_worker_test.go index 490c148291..110118662e 100644 --- a/store/tikv/gcworker/gc_worker_test.go +++ b/store/tikv/gcworker/gc_worker_test.go @@ -144,6 +144,19 @@ func (s *testGCWorkerSuite) TestPrepareGC(c *C) { concurrency, err = s.gcWorker.loadGCConcurrencyWithDefault() c.Assert(err, IsNil) c.Assert(concurrency, Equals, gcMaxConcurrency) + + // Change GC enable status. + s.oracle.AddOffset(time.Minute * 40) + err = s.gcWorker.saveValueToSysTable(gcEnableKey, gcDisableValue) + c.Assert(err, IsNil) + ok, _, err = s.gcWorker.prepare() + c.Assert(err, IsNil) + c.Assert(ok, IsFalse) + err = s.gcWorker.saveValueToSysTable(gcEnableKey, gcEnableValue) + c.Assert(err, IsNil) + ok, _, err = s.gcWorker.prepare() + c.Assert(err, IsNil) + c.Assert(ok, IsTrue) } func (s *testGCWorkerSuite) TestDoGCForOneRegion(c *C) {