gc_worker: add gc enable variable to enable/disable gc (#8282)

This commit is contained in:
crazycs
2018-12-03 23:37:39 +08:00
committed by GitHub
parent 27007923d9
commit 8ddeeea939
2 changed files with 69 additions and 0 deletions

View File

@ -18,6 +18,7 @@ import (
"fmt"
"os"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
@ -117,6 +118,11 @@ const (
gcMaxConcurrency = 128
// We don't want gc to sweep out the cached info belong to other processes, like coprocessor.
gcScanLockLimit = tikv.ResolvedCacheSize / 2
gcEnableKey = "tikv_gc_enable"
gcEnableValue = "true"
gcDisableValue = "false"
gcDefaultEnableValue = true
)
var gcSafePointCacheInterval = tikv.GcSafePointCacheInterval
@ -130,6 +136,7 @@ var gcVariableComments = map[string]string{
gcLifeTimeKey: "All versions within life time will not be collected by GC, at least 10m, in Go format.",
gcSafePointKey: "All versions after safe point can be accessed. (DO NOT EDIT)",
gcConcurrencyKey: "How many go routines used to do GC parallel, [1, 128], default 2",
gcEnableKey: "Current GC enable status",
}
func (w *GCWorker) start(ctx context.Context, wg *sync.WaitGroup) {
@ -250,6 +257,39 @@ func (w *GCWorker) leaderTick(ctx context.Context) error {
// prepare checks preconditions for starting a GC job. It returns a bool
// that indicates whether the GC job should start and the new safePoint.
// prepare checks preconditions for starting a GC job. It returns a bool
// that indicates whether the GC job should start and the new safePoint.
//
// The whole check runs inside a transaction to prevent this race:
//  1. GC sees gcEnable == true and decides to run.
//  2. The user sets gcEnable to false.
//  3. The user reads `tikv_gc_safe_point` == t1 and assumes data after t1
//     is safe from GC.
//  4. GC advances `tikv_gc_safe_point` to t2 and keeps running this round,
//     deleting records dropped between t1 and t2 — violating the user's
//     assumption. The transaction makes steps 1 and 4 atomic with respect
//     to the user's reads/writes.
func (w *GCWorker) prepare() (bool, uint64, error) {
	ctx := context.Background()
	if _, err := w.session.Execute(ctx, "BEGIN"); err != nil {
		return false, 0, errors.Trace(err)
	}
	doGC, safePoint, err := w.checkPrepare(ctx)
	if !doGC {
		// Not starting GC this round: discard the transaction. The rollback
		// error is only logged; the caller gets checkPrepare's verdict.
		_, rbErr := w.session.Execute(ctx, "ROLLBACK")
		terror.Log(errors.Trace(rbErr))
		return doGC, safePoint, errors.Trace(err)
	}
	if _, err = w.session.Execute(ctx, "COMMIT"); err != nil {
		return false, 0, errors.Trace(err)
	}
	return doGC, safePoint, nil
}
func (w *GCWorker) checkPrepare(ctx context.Context) (bool, uint64, error) {
enable, err := w.checkGCEnable()
if err != nil {
return false, 0, errors.Trace(err)
}
if !enable {
log.Warn("[gc worker] gc status is disabled.")
return false, 0, nil
}
now, err := w.getOracleTime()
if err != nil {
return false, 0, errors.Trace(err)
@ -283,6 +323,22 @@ func (w *GCWorker) getOracleTime() (time.Time, error) {
return time.Unix(sec, nsec), nil
}
// checkGCEnable reports whether GC is currently enabled, based on the
// gcEnableKey entry in the system table. If the key has never been set,
// it persists the default value ("true") so users can inspect and edit it,
// and reports GC as enabled.
func (w *GCWorker) checkGCEnable() (bool, error) {
	value, err := w.loadValueFromSysTable(gcEnableKey)
	if err != nil {
		return false, errors.Trace(err)
	}
	if value != "" {
		// Any case-variant of "true" counts as enabled.
		return strings.EqualFold(value, gcEnableValue), nil
	}
	// Key absent: store the default so it becomes visible/editable.
	// The default value is always true, even if the write fails.
	if err = w.saveValueToSysTable(gcEnableKey, gcEnableValue); err != nil {
		return gcDefaultEnableValue, errors.Trace(err)
	}
	return gcDefaultEnableValue, nil
}
func (w *GCWorker) checkGCInterval(now time.Time) (bool, error) {
runInterval, err := w.loadDurationWithDefault(gcRunIntervalKey, gcDefaultRunInterval)
if err != nil {

View File

@ -144,6 +144,19 @@ func (s *testGCWorkerSuite) TestPrepareGC(c *C) {
concurrency, err = s.gcWorker.loadGCConcurrencyWithDefault()
c.Assert(err, IsNil)
c.Assert(concurrency, Equals, gcMaxConcurrency)
// Change GC enable status.
s.oracle.AddOffset(time.Minute * 40)
err = s.gcWorker.saveValueToSysTable(gcEnableKey, gcDisableValue)
c.Assert(err, IsNil)
ok, _, err = s.gcWorker.prepare()
c.Assert(err, IsNil)
c.Assert(ok, IsFalse)
err = s.gcWorker.saveValueToSysTable(gcEnableKey, gcEnableValue)
c.Assert(err, IsNil)
ok, _, err = s.gcWorker.prepare()
c.Assert(err, IsNil)
c.Assert(ok, IsTrue)
}
func (s *testGCWorkerSuite) TestDoGCForOneRegion(c *C) {