store/tikv: add safeTime config for gc worker. (#1754)

* store/tikv: add safeTime config for gc worker.
This commit is contained in:
disksing
2016-09-30 10:26:08 +08:00
committed by Shen Li
parent e6cec94876
commit 6d8bc6a2ee
3 changed files with 356 additions and 70 deletions

View File

@ -15,21 +15,27 @@ package tikv
import (
"fmt"
"os"
"strconv"
"time"
"github.com/juju/errors"
"github.com/ngaut/log"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb"
"github.com/pingcap/tidb/store/tikv/oracle"
)
// GCWorker periodically triggers GC process on tikv server.
type GCWorker struct {
uuid string
store *tikvStore
session tidb.Session
quit chan struct{}
done chan error
uuid string
desc string
store *tikvStore
session tidb.Session
gcIsRunning bool
lastFinish time.Time
quit chan struct{}
done chan error
}
// NewGCWorker creates a GCWorker instance.
@ -42,14 +48,21 @@ func NewGCWorker(store *tikvStore) (*GCWorker, error) {
if err != nil {
return nil, errors.Trace(err)
}
worker := &GCWorker{
uuid: fmt.Sprintf("gcworker_%d", ver.Ver),
store: store,
session: session,
quit: make(chan struct{}),
done: make(chan error),
hostName, err := os.Hostname()
if err != nil {
hostName = "unknown"
}
go worker.start(ver.Ver)
worker := &GCWorker{
uuid: strconv.FormatUint(ver.Ver, 16),
desc: fmt.Sprintf("host:%s, pid:%d, start at %s", hostName, os.Getpid(), time.Now()),
store: store,
session: session,
gcIsRunning: false,
lastFinish: time.Now(),
quit: make(chan struct{}),
done: make(chan error),
}
go worker.start()
return worker, nil
}
@ -59,18 +72,36 @@ func (w *GCWorker) Close() {
}
const (
gcTimeFormat = "20060102-15:04:05 -0700 MST"
gcWorkerTickInterval = time.Minute
gcWorkerLease = time.Minute * 2
gcRunInterval = time.Minute * 10
gcLeaderUUIDKey = "tikv_gc_leader_uuid"
gcLeaderDescKey = "tikv_gc_leader_desc"
gcLeaderLeaseKey = "tikv_gc_leader_lease"
gcLeaderLeaseFormat = "20060102-15:04:05 -0700 MST"
gcLastRunTimeKey = "tikv_gc_last_run_time"
gcRunIntervalKey = "tikv_gc_run_interval"
gcDefaultRunInterval = time.Minute * 10
gcWaitTime = time.Minute * 10
gcLifeTimeKey = "tikv_gc_life_time"
gcDefaultLifeTime = time.Minute * 10
gcSafePointKey = "tikv_gc_safe_point"
)
func (w *GCWorker) start(ver uint64) {
var gcVariableComments = map[string]string{
gcLeaderUUIDKey: "Current GC worker leader UUID. (DO NOT EDIT)",
gcLeaderDescKey: "Host name and pid of current GC leader. (DO NOT EDIT)",
gcLeaderLeaseKey: "Current GC worker leader lease. (DO NOT EDIT)",
gcLastRunTimeKey: "The time when last GC starts. (DO NOT EDIT)",
gcRunIntervalKey: "GC run interval, at least 10m, in Go format.",
gcLifeTimeKey: "All versions within life time will not be collected by GC, at least 10m, in Go format.",
gcSafePointKey: "All versions after safe point can be accessed. (DO NOT EDIT)",
}
func (w *GCWorker) start() {
log.Infof("[gc worker] %s start.", w.uuid)
safePoint := ver
gcIsRunning := false
ticker := time.NewTicker(gcWorkerTickInterval)
for {
select {
@ -80,24 +111,19 @@ func (w *GCWorker) start(ver uint64) {
log.Warnf("[gc worker] check leader err: %v", err)
break
}
if isLeader && !gcIsRunning {
if w.store.oracle.IsExpired(safePoint, uint64(gcRunInterval/time.Millisecond)) {
gcIsRunning = true
go w.runGCJob(safePoint)
if isLeader {
err = w.leaderTick()
if err != nil {
log.Warnf("[gc worker] leader tick err: %v", err)
}
}
case err := <-w.done:
gcIsRunning = false
w.gcIsRunning = false
w.lastFinish = time.Now()
if err != nil {
log.Errorf("[gc worker] runGCJob error: %v", err)
break
}
ver, err := w.store.CurrentVersion()
if err != nil {
log.Errorf("[gc worker] failed get current version: %v", err)
break
}
safePoint = ver.Ver
case <-w.quit:
log.Infof("[gc worker] (%s) quit.", w.uuid)
return
@ -105,8 +131,97 @@ func (w *GCWorker) start(ver uint64) {
}
}
func (w *GCWorker) runGCJob(safePoint uint64) {
// Leader of GC worker checks if it should start a GC job every tick.
func (w *GCWorker) leaderTick() error {
if w.gcIsRunning {
return nil
}
// When the worker is just started, or an old GC job has just finished,
// wait a while before starting a new job.
if time.Since(w.lastFinish) < gcWaitTime {
return nil
}
ok, safePoint, err := w.prepare()
if err != nil || !ok {
return errors.Trace(err)
}
w.gcIsRunning = true
log.Infof("[gc worker] %s starts GC job, safePoint: %v", w.uuid, safePoint)
go w.runGCJob(safePoint)
return nil
}
// prepare checks required conditions for starting a GC job. It returns a bool
// that indicates whether the GC job should start and the new safePoint.
func (w *GCWorker) prepare() (bool, uint64, error) {
now, err := w.getOracleTime()
if err != nil {
return false, 0, errors.Trace(err)
}
ok, err := w.checkGCInterval(now)
if err != nil || !ok {
return false, 0, errors.Trace(err)
}
newSafePoint, err := w.calculateNewSafePoint(now)
if err != nil || newSafePoint == nil {
return false, 0, errors.Trace(err)
}
err = w.saveTime(gcLastRunTimeKey, now)
if err != nil {
return false, 0, errors.Trace(err)
}
err = w.saveTime(gcSafePointKey, *newSafePoint)
if err != nil {
return false, 0, errors.Trace(err)
}
return true, oracle.ComposeTS(oracle.GetPhysical(*newSafePoint), 0), nil
}
func (w *GCWorker) getOracleTime() (time.Time, error) {
currentVer, err := w.store.CurrentVersion()
if err != nil {
return time.Time{}, errors.Trace(err)
}
physical := oracle.ExtractPhysical(currentVer.Ver)
sec, nsec := physical/1e3, (physical%1e3)*1e6
return time.Unix(sec, nsec), nil
}
func (w *GCWorker) checkGCInterval(now time.Time) (bool, error) {
runInterval, err := w.loadDurationWithDefault(gcRunIntervalKey, gcDefaultRunInterval)
if err != nil {
return false, errors.Trace(err)
}
lastRun, err := w.loadTime(gcLastRunTimeKey)
if err != nil {
return false, errors.Trace(err)
}
if lastRun != nil && lastRun.Add(*runInterval).After(now) {
return false, nil
}
return true, nil
}
func (w *GCWorker) calculateNewSafePoint(now time.Time) (*time.Time, error) {
lifeTime, err := w.loadDurationWithDefault(gcLifeTimeKey, gcDefaultLifeTime)
if err != nil {
return nil, errors.Trace(err)
}
lastSafePoint, err := w.loadTime(gcSafePointKey)
if err != nil {
return nil, errors.Trace(err)
}
safePoint := now.Add(-*lifeTime)
// We should never decrease safePoint.
if lastSafePoint != nil && safePoint.Before(*lastSafePoint) {
return nil, nil
}
return &safePoint, nil
}
func (w *GCWorker) runGCJob(safePoint uint64) {
gcWorkerCounter.WithLabelValues("run_job").Inc()
err := w.resolveLocks(safePoint)
@ -264,7 +379,7 @@ func (w *GCWorker) checkLeader() (bool, error) {
}
log.Debugf("[gc worker] got leader: %s", leader)
if leader == w.uuid {
err = w.updateLease()
err = w.saveTime(gcLeaderLeaseKey, time.Now().Add(gcWorkerLease))
if err != nil {
w.session.Execute("ROLLBACK")
return false, errors.Trace(err)
@ -275,8 +390,11 @@ func (w *GCWorker) checkLeader() (bool, error) {
}
return true, nil
}
lease, err := w.loadLease()
if err != nil || lease.Before(time.Now()) {
lease, err := w.loadTime(gcLeaderLeaseKey)
if err != nil {
return false, errors.Trace(err)
}
if lease == nil || lease.Before(time.Now()) {
log.Debugf("[gc worker] register %s as leader", w.uuid)
gcWorkerCounter.WithLabelValues("register_leader").Inc()
@ -285,7 +403,12 @@ func (w *GCWorker) checkLeader() (bool, error) {
w.session.Execute("ROLLBACK")
return false, errors.Trace(err)
}
err = w.updateLease()
err = w.saveValueToSysTable(gcLeaderDescKey, w.desc)
if err != nil {
w.session.Execute("ROLLBACK")
return false, errors.Trace(err)
}
err = w.saveTime(gcLeaderLeaseKey, time.Now().Add(gcWorkerLease))
if err != nil {
w.session.Execute("ROLLBACK")
return false, errors.Trace(err)
@ -300,25 +423,59 @@ func (w *GCWorker) checkLeader() (bool, error) {
return false, nil
}
func (w *GCWorker) updateLease() error {
lease := time.Now().Add(gcWorkerLease).Format(gcLeaderLeaseFormat)
log.Debugf("[gc worker] update leader lease to %s for worker %s", lease, w.uuid)
err := w.saveValueToSysTable(gcLeaderLeaseKey, lease)
func (w *GCWorker) saveTime(key string, t time.Time) error {
err := w.saveValueToSysTable(key, t.Format(gcTimeFormat))
return errors.Trace(err)
}
func (w *GCWorker) loadLease() (time.Time, error) {
var t time.Time
str, err := w.loadValueFromSysTable(gcLeaderLeaseKey)
func (w *GCWorker) loadTime(key string) (*time.Time, error) {
str, err := w.loadValueFromSysTable(key)
if err != nil {
return t, errors.Trace(err)
return nil, errors.Trace(err)
}
lease, err := time.Parse(gcLeaderLeaseFormat, str)
if str == "" {
return nil, nil
}
t, err := time.Parse(gcTimeFormat, str)
if err != nil {
return t, errors.Trace(err)
return nil, errors.Trace(err)
}
log.Debugf("[gc worker] load lease: %s", lease)
return lease, nil
return &t, nil
}
func (w *GCWorker) saveDuration(key string, d time.Duration) error {
err := w.saveValueToSysTable(key, d.String())
return errors.Trace(err)
}
func (w *GCWorker) loadDuration(key string) (*time.Duration, error) {
str, err := w.loadValueFromSysTable(key)
if err != nil {
return nil, errors.Trace(err)
}
if str == "" {
return nil, nil
}
d, err := time.ParseDuration(str)
if err != nil {
return nil, errors.Trace(err)
}
return &d, nil
}
func (w *GCWorker) loadDurationWithDefault(key string, def time.Duration) (*time.Duration, error) {
d, err := w.loadDuration(key)
if err != nil {
return nil, errors.Trace(err)
}
if d == nil {
err = w.saveDuration(key, def)
if err != nil {
return nil, errors.Trace(err)
}
return &def, nil
}
return d, nil
}
func (w *GCWorker) loadValueFromSysTable(key string) (string, error) {
@ -332,13 +489,21 @@ func (w *GCWorker) loadValueFromSysTable(key string) (string, error) {
return "", errors.Trace(err)
}
if row == nil {
log.Debugf("[gc worker] load kv, %s:nil", key)
return "", nil
}
return row.Data[0].GetString(), nil
value := row.Data[0].GetString()
log.Debugf("[gc worker] load kv, %s:%s", key, value)
return value, nil
}
func (w *GCWorker) saveValueToSysTable(key, value string) error {
stmt := fmt.Sprintf(`INSERT INTO mysql.tidb (variable_name, variable_value) VALUES ('%s', '%s') ON DUPLICATE KEY UPDATE variable_value = '%s'`, key, value, value)
stmt := fmt.Sprintf(`INSERT INTO mysql.tidb VALUES ('%[1]s', '%[2]s', '%[3]s')
ON DUPLICATE KEY
UPDATE variable_value = '%[2]s', comment = '%[3]s'`,
key, value, gcVariableComments[key])
_, err := w.session.Execute(stmt)
log.Debugf("[gc worker] save kv, %s:%s %v", key, value, err)
return errors.Trace(err)
}

View File

@ -0,0 +1,102 @@
// Copyright 2016 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package tikv
import (
"math"
"time"
. "github.com/pingcap/check"
)
type testGCWorkerSuite struct {
store *tikvStore
oracle *mockOracle
gcWorker *GCWorker
}
var _ = Suite(&testGCWorkerSuite{})
func (s *testGCWorkerSuite) SetUpTest(c *C) {
s.store = newTestStore(c)
s.oracle = &mockOracle{}
s.store.oracle = s.oracle
gcWorker, err := NewGCWorker(s.store)
c.Assert(err, IsNil)
s.gcWorker = gcWorker
}
func (s *testGCWorkerSuite) TearDownTest(c *C) {
err := s.store.Close()
c.Assert(err, IsNil)
}
func (s *testGCWorkerSuite) timeEqual(c *C, t1, t2 time.Time, epsilon time.Duration) {
c.Assert(math.Abs(float64(t1.Sub(t2))), Less, float64(epsilon))
}
func (s *testGCWorkerSuite) TestGetOracleTime(c *C) {
t1, err := s.gcWorker.getOracleTime()
c.Assert(err, IsNil)
s.timeEqual(c, time.Now(), t1, time.Millisecond*2)
s.oracle.addOffset(time.Second * 10)
t2, err := s.gcWorker.getOracleTime()
c.Assert(err, IsNil)
s.timeEqual(c, t2, t1.Add(time.Second*10), time.Millisecond*2)
}
func (s *testGCWorkerSuite) TestPrepareGC(c *C) {
now, err := s.gcWorker.getOracleTime()
c.Assert(err, IsNil)
ok, _, err := s.gcWorker.prepare()
c.Assert(err, IsNil)
c.Assert(ok, IsTrue)
lastRun, err := s.gcWorker.loadTime(gcLastRunTimeKey)
c.Assert(err, IsNil)
c.Assert(lastRun, NotNil)
s.timeEqual(c, *lastRun, now, time.Second)
safePoint, err := s.gcWorker.loadTime(gcSafePointKey)
c.Assert(err, IsNil)
s.timeEqual(c, safePoint.Add(gcDefaultLifeTime), now, time.Second)
// Change GC run interval.
err = s.gcWorker.saveDuration(gcRunIntervalKey, time.Minute*5)
c.Assert(err, IsNil)
s.oracle.addOffset(time.Minute * 4)
ok, _, err = s.gcWorker.prepare()
c.Assert(err, IsNil)
c.Assert(ok, IsFalse)
s.oracle.addOffset(time.Minute * 2)
ok, _, err = s.gcWorker.prepare()
c.Assert(err, IsNil)
c.Assert(ok, IsTrue)
// Change GC life time.
err = s.gcWorker.saveDuration(gcLifeTimeKey, time.Minute*30)
c.Assert(err, IsNil)
s.oracle.addOffset(time.Minute * 5)
ok, _, err = s.gcWorker.prepare()
c.Assert(err, IsNil)
c.Assert(ok, IsFalse)
s.oracle.addOffset(time.Minute * 40)
now, err = s.gcWorker.getOracleTime()
c.Assert(err, IsNil)
ok, _, err = s.gcWorker.prepare()
c.Assert(err, IsNil)
c.Assert(ok, IsTrue)
safePoint, err = s.gcWorker.loadTime(gcSafePointKey)
c.Assert(err, IsNil)
s.timeEqual(c, safePoint.Add(time.Minute*30), now, time.Second)
}

View File

@ -45,7 +45,7 @@ func (s *testStoreSuite) SetUpTest(c *C) {
}
func (s *testStoreSuite) TestOracle(c *C) {
o := newMockOracle(s.store.oracle)
o := &mockOracle{}
s.store.oracle = o
t1, err := s.store.getTimestampWithRetry(NewBackoffer(100))
@ -141,44 +141,63 @@ func (s *testStoreSuite) TestBusyServerCop(c *C) {
}
type mockOracle struct {
oracle.Oracle
mu struct {
sync.RWMutex
stop bool
}
}
func newMockOracle(oracle oracle.Oracle) *mockOracle {
return &mockOracle{Oracle: oracle}
sync.RWMutex
stop bool
offset time.Duration
lastTS uint64
}
func (o *mockOracle) enable() {
o.mu.Lock()
defer o.mu.Unlock()
o.mu.stop = false
o.Lock()
defer o.Unlock()
o.stop = false
}
func (o *mockOracle) disable() {
o.mu.Lock()
defer o.mu.Unlock()
o.mu.stop = true
o.Lock()
defer o.Unlock()
o.stop = true
}
func (o *mockOracle) setOffset(offset time.Duration) {
o.Lock()
defer o.Unlock()
o.offset = offset
}
func (o *mockOracle) addOffset(d time.Duration) {
o.Lock()
defer o.Unlock()
o.offset += d
}
func (o *mockOracle) GetTimestamp() (uint64, error) {
o.mu.RLock()
defer o.mu.RUnlock()
o.RLock()
defer o.RUnlock()
if o.mu.stop {
if o.stop {
return 0, errors.New("stopped")
}
return o.Oracle.GetTimestamp()
physical := oracle.GetPhysical(time.Now().Add(o.offset))
ts := oracle.ComposeTS(physical, 0)
if oracle.ExtractPhysical(o.lastTS) == physical {
ts = o.lastTS + 1
}
o.lastTS = ts
return ts, nil
}
func (o *mockOracle) IsExpired(lockTimestamp uint64, TTL uint64) bool {
o.mu.RLock()
defer o.mu.RUnlock()
o.RLock()
defer o.RUnlock()
return oracle.GetPhysical(time.Now().Add(o.offset)) >= oracle.ExtractPhysical(lockTimestamp)+int64(TTL)
}
func (o *mockOracle) Close() {
return o.Oracle.IsExpired(lockTimestamp, TTL)
}
type busyClient struct {