From 6d8bc6a2ee4af32fbdc2c2e0109374847314c4fd Mon Sep 17 00:00:00 2001 From: disksing Date: Fri, 30 Sep 2016 10:26:08 +0800 Subject: [PATCH] store/tikv: add safeTime config for gc worker. (#1754) * store/tikv: add safeTime config for gc worker. --- store/tikv/gc_worker.go | 259 ++++++++++++++++++++++++++++------- store/tikv/gc_worker_test.go | 102 ++++++++++++++ store/tikv/store_test.go | 65 +++++---- 3 files changed, 356 insertions(+), 70 deletions(-) create mode 100644 store/tikv/gc_worker_test.go diff --git a/store/tikv/gc_worker.go b/store/tikv/gc_worker.go index 6d04ee8b08..819ca57130 100644 --- a/store/tikv/gc_worker.go +++ b/store/tikv/gc_worker.go @@ -15,21 +15,27 @@ package tikv import ( "fmt" + "os" + "strconv" "time" "github.com/juju/errors" "github.com/ngaut/log" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/tidb" + "github.com/pingcap/tidb/store/tikv/oracle" ) // GCWorker periodically triggers GC process on tikv server. type GCWorker struct { - uuid string - store *tikvStore - session tidb.Session - quit chan struct{} - done chan error + uuid string + desc string + store *tikvStore + session tidb.Session + gcIsRunning bool + lastFinish time.Time + quit chan struct{} + done chan error } // NewGCWorker creates a GCWorker instance. @@ -42,14 +48,21 @@ func NewGCWorker(store *tikvStore) (*GCWorker, error) { if err != nil { return nil, errors.Trace(err) } - worker := &GCWorker{ - uuid: fmt.Sprintf("gcworker_%d", ver.Ver), - store: store, - session: session, - quit: make(chan struct{}), - done: make(chan error), + hostName, err := os.Hostname() + if err != nil { + hostName = "unknown" } - go worker.start(ver.Ver) + worker := &GCWorker{ + uuid: strconv.FormatUint(ver.Ver, 16), + desc: fmt.Sprintf("host:%s, pid:%d, start at %s", hostName, os.Getpid(), time.Now()), + store: store, + session: session, + gcIsRunning: false, + lastFinish: time.Now(), + quit: make(chan struct{}), + done: make(chan error), + } + go worker.start() return worker, nil } @@ -59,18 +72,36 @@ func (w *GCWorker) Close() { } const ( + gcTimeFormat = "20060102-15:04:05 -0700 MST" + gcWorkerTickInterval = time.Minute gcWorkerLease = time.Minute * 2 - gcRunInterval = time.Minute * 10 gcLeaderUUIDKey = "tikv_gc_leader_uuid" + gcLeaderDescKey = "tikv_gc_leader_desc" gcLeaderLeaseKey = "tikv_gc_leader_lease" - gcLeaderLeaseFormat = "20060102-15:04:05 -0700 MST" + + gcLastRunTimeKey = "tikv_gc_last_run_time" + gcRunIntervalKey = "tikv_gc_run_interval" + gcDefaultRunInterval = time.Minute * 10 + gcWaitTime = time.Minute * 10 + + gcLifeTimeKey = "tikv_gc_life_time" + gcDefaultLifeTime = time.Minute * 10 + gcSafePointKey = "tikv_gc_safe_point" ) -func (w *GCWorker) start(ver uint64) { +var gcVariableComments = map[string]string{ + gcLeaderUUIDKey: "Current GC worker leader UUID. (DO NOT EDIT)", + gcLeaderDescKey: "Host name and pid of current GC leader. (DO NOT EDIT)", + gcLeaderLeaseKey: "Current GC worker leader lease. (DO NOT EDIT)", + gcLastRunTimeKey: "The time when last GC starts. (DO NOT EDIT)", + gcRunIntervalKey: "GC run interval, at least 10m, in Go format.", + gcLifeTimeKey: "All versions within life time will not be collected by GC, at least 10m, in Go format.", + gcSafePointKey: "All versions after safe point can be accessed. (DO NOT EDIT)", +} + +func (w *GCWorker) start() { log.Infof("[gc worker] %s start.", w.uuid) - safePoint := ver - gcIsRunning := false ticker := time.NewTicker(gcWorkerTickInterval) for { select { @@ -80,24 +111,19 @@ func (w *GCWorker) start(ver uint64) { log.Warnf("[gc worker] check leader err: %v", err) break } - if isLeader && !gcIsRunning { - if w.store.oracle.IsExpired(safePoint, uint64(gcRunInterval/time.Millisecond)) { - gcIsRunning = true - go w.runGCJob(safePoint) + if isLeader { + err = w.leaderTick() + if err != nil { + log.Warnf("[gc worker] leader tick err: %v", err) } } case err := <-w.done: - gcIsRunning = false + w.gcIsRunning = false + w.lastFinish = time.Now() if err != nil { log.Errorf("[gc worker] runGCJob error: %v", err) break } - ver, err := w.store.CurrentVersion() - if err != nil { - log.Errorf("[gc worker] failed get current version: %v", err) - break - } - safePoint = ver.Ver case <-w.quit: log.Infof("[gc worker] (%s) quit.", w.uuid) return @@ -105,8 +131,97 @@ func (w *GCWorker) start(ver uint64) { } } -func (w *GCWorker) runGCJob(safePoint uint64) { +// Leader of GC worker checks if it should start a GC job every tick. +func (w *GCWorker) leaderTick() error { + if w.gcIsRunning { + return nil + } + // When the worker is just started, or an old GC job has just finished, + // wait a while before starting a new job. + if time.Since(w.lastFinish) < gcWaitTime { + return nil + } + + ok, safePoint, err := w.prepare() + if err != nil || !ok { + return errors.Trace(err) + } + + w.gcIsRunning = true log.Infof("[gc worker] %s starts GC job, safePoint: %v", w.uuid, safePoint) + go w.runGCJob(safePoint) + return nil +} + +// prepare checks required conditions for starting a GC job. It returns a bool +// that indicates whether the GC job should start and the new safePoint. +func (w *GCWorker) prepare() (bool, uint64, error) { + now, err := w.getOracleTime() + if err != nil { + return false, 0, errors.Trace(err) + } + ok, err := w.checkGCInterval(now) + if err != nil || !ok { + return false, 0, errors.Trace(err) + } + newSafePoint, err := w.calculateNewSafePoint(now) + if err != nil || newSafePoint == nil { + return false, 0, errors.Trace(err) + } + err = w.saveTime(gcLastRunTimeKey, now) + if err != nil { + return false, 0, errors.Trace(err) + } + err = w.saveTime(gcSafePointKey, *newSafePoint) + if err != nil { + return false, 0, errors.Trace(err) + } + return true, oracle.ComposeTS(oracle.GetPhysical(*newSafePoint), 0), nil +} + +func (w *GCWorker) getOracleTime() (time.Time, error) { + currentVer, err := w.store.CurrentVersion() + if err != nil { + return time.Time{}, errors.Trace(err) + } + physical := oracle.ExtractPhysical(currentVer.Ver) + sec, nsec := physical/1e3, (physical%1e3)*1e6 + return time.Unix(sec, nsec), nil +} + +func (w *GCWorker) checkGCInterval(now time.Time) (bool, error) { + runInterval, err := w.loadDurationWithDefault(gcRunIntervalKey, gcDefaultRunInterval) + if err != nil { + return false, errors.Trace(err) + } + lastRun, err := w.loadTime(gcLastRunTimeKey) + if err != nil { + return false, errors.Trace(err) + } + if lastRun != nil && lastRun.Add(*runInterval).After(now) { + return false, nil + } + return true, nil +} + +func (w *GCWorker) calculateNewSafePoint(now time.Time) (*time.Time, error) { + lifeTime, err := w.loadDurationWithDefault(gcLifeTimeKey, gcDefaultLifeTime) + if err != nil { + return nil, errors.Trace(err) + } + lastSafePoint, err := w.loadTime(gcSafePointKey) + if err != nil { + return nil, errors.Trace(err) + } + safePoint := now.Add(-*lifeTime) + // We should never decrease safePoint. + if lastSafePoint != nil && safePoint.Before(*lastSafePoint) { + return nil, nil + } + return &safePoint, nil +} + +func (w *GCWorker) runGCJob(safePoint uint64) { gcWorkerCounter.WithLabelValues("run_job").Inc() err := w.resolveLocks(safePoint) @@ -264,7 +379,7 @@ func (w *GCWorker) checkLeader() (bool, error) { } log.Debugf("[gc worker] got leader: %s", leader) if leader == w.uuid { - err = w.updateLease() + err = w.saveTime(gcLeaderLeaseKey, time.Now().Add(gcWorkerLease)) if err != nil { w.session.Execute("ROLLBACK") return false, errors.Trace(err) @@ -275,8 +390,11 @@ func (w *GCWorker) checkLeader() (bool, error) { } return true, nil } - lease, err := w.loadLease() - if err != nil || lease.Before(time.Now()) { + lease, err := w.loadTime(gcLeaderLeaseKey) + if err != nil { + return false, errors.Trace(err) + } + if lease == nil || lease.Before(time.Now()) { log.Debugf("[gc worker] register %s as leader", w.uuid) gcWorkerCounter.WithLabelValues("register_leader").Inc() @@ -285,7 +403,12 @@ func (w *GCWorker) checkLeader() (bool, error) { w.session.Execute("ROLLBACK") return false, errors.Trace(err) } - err = w.updateLease() + err = w.saveValueToSysTable(gcLeaderDescKey, w.desc) + if err != nil { + w.session.Execute("ROLLBACK") + return false, errors.Trace(err) + } + err = w.saveTime(gcLeaderLeaseKey, time.Now().Add(gcWorkerLease)) if err != nil { w.session.Execute("ROLLBACK") return false, errors.Trace(err) @@ -300,25 +423,59 @@ func (w *GCWorker) checkLeader() (bool, error) { return false, nil } -func (w *GCWorker) updateLease() error { - lease := time.Now().Add(gcWorkerLease).Format(gcLeaderLeaseFormat) - log.Debugf("[gc worker] update leader lease to %s for worker %s", lease, w.uuid) - err := w.saveValueToSysTable(gcLeaderLeaseKey, lease) +func (w *GCWorker) saveTime(key string, t time.Time) error { + err := w.saveValueToSysTable(key, t.Format(gcTimeFormat)) return errors.Trace(err) } -func (w *GCWorker) loadLease() (time.Time, error) { - var t time.Time - str, err := w.loadValueFromSysTable(gcLeaderLeaseKey) +func (w *GCWorker) loadTime(key string) (*time.Time, error) { + str, err := w.loadValueFromSysTable(key) if err != nil { - return t, errors.Trace(err) + return nil, errors.Trace(err) } - lease, err := time.Parse(gcLeaderLeaseFormat, str) + if str == "" { + return nil, nil + } + t, err := time.Parse(gcTimeFormat, str) if err != nil { - return t, errors.Trace(err) + return nil, errors.Trace(err) } - log.Debugf("[gc worker] load lease: %s", lease) - return lease, nil + return &t, nil +} + +func (w *GCWorker) saveDuration(key string, d time.Duration) error { + err := w.saveValueToSysTable(key, d.String()) + return errors.Trace(err) +} + +func (w *GCWorker) loadDuration(key string) (*time.Duration, error) { + str, err := w.loadValueFromSysTable(key) + if err != nil { + return nil, errors.Trace(err) + } + if str == "" { + return nil, nil + } + d, err := time.ParseDuration(str) + if err != nil { + return nil, errors.Trace(err) + } + return &d, nil +} + +func (w *GCWorker) loadDurationWithDefault(key string, def time.Duration) (*time.Duration, error) { + d, err := w.loadDuration(key) + if err != nil { + return nil, errors.Trace(err) + } + if d == nil { + err = w.saveDuration(key, def) + if err != nil { + return nil, errors.Trace(err) + } + return &def, nil + } + return d, nil } func (w *GCWorker) loadValueFromSysTable(key string) (string, error) { @@ -332,13 +489,21 @@ func (w *GCWorker) loadValueFromSysTable(key string) (string, error) { return "", errors.Trace(err) } if row == nil { + log.Debugf("[gc worker] load kv, %s:nil", key) return "", nil } - return row.Data[0].GetString(), nil + value := row.Data[0].GetString() + log.Debugf("[gc worker] load kv, %s:%s", key, value) + return value, nil } func (w *GCWorker) saveValueToSysTable(key, value string) error { - stmt := fmt.Sprintf(`INSERT INTO mysql.tidb (variable_name, variable_value) VALUES ('%s', '%s') ON DUPLICATE KEY UPDATE variable_value = '%s'`, key, value, value) + stmt := fmt.Sprintf(`INSERT INTO mysql.tidb VALUES ('%[1]s', '%[2]s', '%[3]s') + ON DUPLICATE KEY + UPDATE variable_value = '%[2]s', comment = '%[3]s'`, + key, value, gcVariableComments[key]) + _, err := w.session.Execute(stmt) + log.Debugf("[gc worker] save kv, %s:%s %v", key, value, err) return errors.Trace(err) } diff --git a/store/tikv/gc_worker_test.go b/store/tikv/gc_worker_test.go new file mode 100644 index 0000000000..650633b027 --- /dev/null +++ b/store/tikv/gc_worker_test.go @@ -0,0 +1,102 @@ +// Copyright 2016 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package tikv + +import ( + "math" + "time" + + . "github.com/pingcap/check" +) + +type testGCWorkerSuite struct { + store *tikvStore + oracle *mockOracle + gcWorker *GCWorker +} + +var _ = Suite(&testGCWorkerSuite{}) + +func (s *testGCWorkerSuite) SetUpTest(c *C) { + s.store = newTestStore(c) + s.oracle = &mockOracle{} + s.store.oracle = s.oracle + gcWorker, err := NewGCWorker(s.store) + c.Assert(err, IsNil) + s.gcWorker = gcWorker +} + +func (s *testGCWorkerSuite) TearDownTest(c *C) { + err := s.store.Close() + c.Assert(err, IsNil) +} + +func (s *testGCWorkerSuite) timeEqual(c *C, t1, t2 time.Time, epsilon time.Duration) { + c.Assert(math.Abs(float64(t1.Sub(t2))), Less, float64(epsilon)) +} + +func (s *testGCWorkerSuite) TestGetOracleTime(c *C) { + t1, err := s.gcWorker.getOracleTime() + c.Assert(err, IsNil) + s.timeEqual(c, time.Now(), t1, time.Millisecond*2) + + s.oracle.addOffset(time.Second * 10) + t2, err := s.gcWorker.getOracleTime() + c.Assert(err, IsNil) + s.timeEqual(c, t2, t1.Add(time.Second*10), time.Millisecond*2) +} + +func (s *testGCWorkerSuite) TestPrepareGC(c *C) { + now, err := s.gcWorker.getOracleTime() + c.Assert(err, IsNil) + ok, _, err := s.gcWorker.prepare() + c.Assert(err, IsNil) + c.Assert(ok, IsTrue) + lastRun, err := s.gcWorker.loadTime(gcLastRunTimeKey) + c.Assert(err, IsNil) + c.Assert(lastRun, NotNil) + s.timeEqual(c, *lastRun, now, time.Second) + safePoint, err := s.gcWorker.loadTime(gcSafePointKey) + c.Assert(err, IsNil) + s.timeEqual(c, safePoint.Add(gcDefaultLifeTime), now, time.Second) + + // Change GC run interval. + err = s.gcWorker.saveDuration(gcRunIntervalKey, time.Minute*5) + c.Assert(err, IsNil) + s.oracle.addOffset(time.Minute * 4) + ok, _, err = s.gcWorker.prepare() + c.Assert(err, IsNil) + c.Assert(ok, IsFalse) + s.oracle.addOffset(time.Minute * 2) + ok, _, err = s.gcWorker.prepare() + c.Assert(err, IsNil) + c.Assert(ok, IsTrue) + + // Change GC life time. + err = s.gcWorker.saveDuration(gcLifeTimeKey, time.Minute*30) + c.Assert(err, IsNil) + s.oracle.addOffset(time.Minute * 5) + ok, _, err = s.gcWorker.prepare() + c.Assert(err, IsNil) + c.Assert(ok, IsFalse) + s.oracle.addOffset(time.Minute * 40) + now, err = s.gcWorker.getOracleTime() + c.Assert(err, IsNil) + ok, _, err = s.gcWorker.prepare() + c.Assert(err, IsNil) + c.Assert(ok, IsTrue) + safePoint, err = s.gcWorker.loadTime(gcSafePointKey) + c.Assert(err, IsNil) + s.timeEqual(c, safePoint.Add(time.Minute*30), now, time.Second) +} diff --git a/store/tikv/store_test.go b/store/tikv/store_test.go index 020c099e0d..d31df9adfc 100644 --- a/store/tikv/store_test.go +++ b/store/tikv/store_test.go @@ -45,7 +45,7 @@ func (s *testStoreSuite) SetUpTest(c *C) { } func (s *testStoreSuite) TestOracle(c *C) { - o := newMockOracle(s.store.oracle) + o := &mockOracle{} s.store.oracle = o t1, err := s.store.getTimestampWithRetry(NewBackoffer(100)) @@ -141,44 +141,63 @@ func (s *testStoreSuite) TestBusyServerCop(c *C) { } type mockOracle struct { - oracle.Oracle - mu struct { - sync.RWMutex - stop bool - } -} - -func newMockOracle(oracle oracle.Oracle) *mockOracle { - return &mockOracle{Oracle: oracle} + sync.RWMutex + stop bool + offset time.Duration + lastTS uint64 } func (o *mockOracle) enable() { - o.mu.Lock() - defer o.mu.Unlock() - o.mu.stop = false + o.Lock() + defer o.Unlock() + o.stop = false } func (o *mockOracle) disable() { - o.mu.Lock() - defer o.mu.Unlock() - o.mu.stop = true + o.Lock() + defer o.Unlock() + o.stop = true +} + +func (o *mockOracle) setOffset(offset time.Duration) { + o.Lock() + defer o.Unlock() + + o.offset = offset +} + +func (o *mockOracle) addOffset(d time.Duration) { + o.Lock() + defer o.Unlock() + + o.offset += d } func (o *mockOracle) GetTimestamp() (uint64, error) { - o.mu.RLock() - defer o.mu.RUnlock() + o.RLock() + defer o.RUnlock() - if o.mu.stop { + if o.stop { return 0, errors.New("stopped") } - return o.Oracle.GetTimestamp() + physical := oracle.GetPhysical(time.Now().Add(o.offset)) + ts := oracle.ComposeTS(physical, 0) + if oracle.ExtractPhysical(o.lastTS) == physical { + ts = o.lastTS + 1 + } + o.lastTS = ts + return ts, nil } func (o *mockOracle) IsExpired(lockTimestamp uint64, TTL uint64) bool { - o.mu.RLock() - defer o.mu.RUnlock() + o.RLock() + defer o.RUnlock() + + return oracle.GetPhysical(time.Now().Add(o.offset)) >= oracle.ExtractPhysical(lockTimestamp)+int64(TTL) +} + +func (o *mockOracle) Close() { - return o.Oracle.IsExpired(lockTimestamp, TTL) } type busyClient struct {