store/tikv: Avoid sending to channel everywhere in runGCJob (#11032)

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>
Author: MyonKeminta
Date:   2019-07-17 17:31:10 +08:00 (committed by GitHub)
Parent: dd06ebb315
Commit: db45460280

2 changed files with 47 additions and 33 deletions
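
The change replaces the old pattern — sending on `w.done` at every exit point of `runGCJob` — with an ordinary error return that the spawn site funnels into the channel exactly once. A minimal, runnable sketch of the before/after shape (simplified names; `worker` is a stand-in here, not the actual `GCWorker` type):

```go
package main

import (
	"context"
	"fmt"
)

// worker is a simplified stand-in for the GC worker: it owns a done
// channel that reports the outcome of one background job.
type worker struct {
	done chan error
}

// Before this commit, every failure path inside the job body had to pair
// a channel send with a bare return:
//
//	if err != nil {
//		w.done <- errors.Trace(err)
//		return
//	}
//
// After it, the job body returns errors like any ordinary function...
func (w *worker) runJob(ctx context.Context) error {
	if err := ctx.Err(); err != nil {
		return err // e.g. context already canceled
	}
	return nil // all phases succeeded
}

// ...and the single send lives at the spawn site, mirroring leaderTick's
// `go func() { w.done <- w.runGCJob(ctx, safePoint, concurrency) }()`.
func (w *worker) start(ctx context.Context) {
	go func() {
		w.done <- w.runJob(ctx)
	}()
}

func main() {
	w := &worker{done: make(chan error, 1)}
	w.start(context.Background())
	fmt.Println("job result:", <-w.done) // job result: <nil>
}
```

With one send site, a forgotten `w.done <- err` on some error path can no longer leave the worker's main loop blocked waiting for a result that never arrives.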

store/tikv/gcworker/gc_worker.go

@@ -233,13 +233,11 @@ func (w *GCWorker) leaderTick(ctx context.Context) error {
 		if err != nil {
 			metrics.GCJobFailureCounter.WithLabelValues("prepare").Inc()
 		}
-		w.gcIsRunning = false
 		return errors.Trace(err)
 	}
 	// When the worker is just started, or an old GC job has just finished,
 	// wait a while before starting a new job.
 	if time.Since(w.lastFinish) < gcWaitTime {
-		w.gcIsRunning = false
 		logutil.Logger(ctx).Info("[gc worker] another gc job has just finished, skipped.",
 			zap.String("leaderTick on ", w.uuid))
 		return nil
@@ -258,7 +256,9 @@ func (w *GCWorker) leaderTick(ctx context.Context) error {
 		zap.String("uuid", w.uuid),
 		zap.Uint64("safePoint", safePoint),
 		zap.Int("concurrency", concurrency))
-	go w.runGCJob(ctx, safePoint, concurrency)
+	go func() {
+		w.done <- w.runGCJob(ctx, safePoint, concurrency)
+	}()
 	return nil
 }
 
@@ -466,7 +466,7 @@ func (w *GCWorker) calculateNewSafePoint(now time.Time) (*time.Time, error) {
 	return &safePoint, nil
 }
 
-func (w *GCWorker) runGCJob(ctx context.Context, safePoint uint64, concurrency int) {
+func (w *GCWorker) runGCJob(ctx context.Context, safePoint uint64, concurrency int) error {
 	metrics.GCWorkerCounter.WithLabelValues("run_job").Inc()
 	err := w.resolveLocks(ctx, safePoint, concurrency)
 	if err != nil {
@@ -474,8 +474,7 @@ func (w *GCWorker) runGCJob(ctx context.Context, safePoint uint64, concurrency int) {
 			zap.String("uuid", w.uuid),
 			zap.Error(err))
 		metrics.GCJobFailureCounter.WithLabelValues("resolve_lock").Inc()
-		w.done <- errors.Trace(err)
-		return
+		return errors.Trace(err)
 	}
 	// Save safe point to pd.
 	err = w.saveSafePoint(w.store.GetSafePointKV(), tikv.GcSavedSafePoint, safePoint)
@@ -483,10 +482,8 @@ func (w *GCWorker) runGCJob(ctx context.Context, safePoint uint64, concurrency int) {
 		logutil.Logger(ctx).Error("[gc worker] failed to save safe point to PD",
 			zap.String("uuid", w.uuid),
 			zap.Error(err))
-		w.gcIsRunning = false
 		metrics.GCJobFailureCounter.WithLabelValues("save_safe_point").Inc()
-		w.done <- errors.Trace(err)
-		return
+		return errors.Trace(err)
 	}
 	// Sleep to wait for all other tidb instances update their safepoint cache.
 	time.Sleep(gcSafePointCacheInterval)
@@ -497,8 +494,7 @@ func (w *GCWorker) runGCJob(ctx context.Context, safePoint uint64, concurrency int) {
 			zap.String("uuid", w.uuid),
 			zap.Error(err))
 		metrics.GCJobFailureCounter.WithLabelValues("delete_range").Inc()
-		w.done <- errors.Trace(err)
-		return
+		return errors.Trace(err)
 	}
 	err = w.redoDeleteRanges(ctx, safePoint, concurrency)
 	if err != nil {
@@ -506,8 +502,7 @@ func (w *GCWorker) runGCJob(ctx context.Context, safePoint uint64, concurrency int) {
 			zap.String("uuid", w.uuid),
 			zap.Error(err))
 		metrics.GCJobFailureCounter.WithLabelValues("redo_delete_range").Inc()
-		w.done <- errors.Trace(err)
-		return
+		return errors.Trace(err)
 	}
 
 	useDistributedGC, err := w.checkUseDistributedGC()
@@ -525,10 +520,8 @@ func (w *GCWorker) runGCJob(ctx context.Context, safePoint uint64, concurrency int) {
 			logutil.Logger(ctx).Error("[gc worker] failed to upload safe point to PD",
 				zap.String("uuid", w.uuid),
 				zap.Error(err))
-			w.gcIsRunning = false
 			metrics.GCJobFailureCounter.WithLabelValues("upload_safe_point").Inc()
-			w.done <- errors.Trace(err)
-			return
+			return errors.Trace(err)
 		}
 	} else {
 		err = w.doGC(ctx, safePoint, concurrency)
@@ -536,14 +529,12 @@ func (w *GCWorker) runGCJob(ctx context.Context, safePoint uint64, concurrency int) {
 			logutil.Logger(ctx).Error("[gc worker] do GC returns an error",
 				zap.String("uuid", w.uuid),
 				zap.Error(err))
-			w.gcIsRunning = false
 			metrics.GCJobFailureCounter.WithLabelValues("gc").Inc()
-			w.done <- errors.Trace(err)
-			return
+			return errors.Trace(err)
 		}
 	}
 
-	w.done <- nil
+	return nil
 }
 
 // deleteRanges processes all delete range records whose ts < safePoint in table `gc_delete_range`
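
With every failure path now returning instead of sending, the `w.gcIsRunning = false` resets that were scattered across the paths above can also collapse into whichever loop receives from `w.done`. That worker-side receive loop is not part of this diff (the tests below drain `w.done` directly); the following is a hedged, runnable sketch of the overall shape, with simplified types and an invented main loop rather than the real gc_worker.go code:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// gcWorker carries only the fields this pattern needs; the real GCWorker
// has many more (uuid, store, PD client, ...).
type gcWorker struct {
	gcIsRunning bool
	lastFinish  time.Time
	done        chan error
}

// runGCJob stands in for the refactored job body: it returns its error
// instead of writing to w.done itself.
func (w *gcWorker) runGCJob(ctx context.Context) error {
	time.Sleep(10 * time.Millisecond) // pretend to resolve locks, delete ranges, ...
	return nil
}

func main() {
	w := &gcWorker{done: make(chan error, 1)}
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	ticker := time.NewTicker(20 * time.Millisecond)
	defer ticker.Stop()

	for finished := 0; finished < 3; {
		select {
		case <-ticker.C:
			if !w.gcIsRunning {
				w.gcIsRunning = true
				go func() { w.done <- w.runGCJob(ctx) }() // the single send site
			}
		case err := <-w.done:
			// The single receive site: one place to reset the flag and
			// record the finish time, rather than every error branch.
			w.gcIsRunning = false
			w.lastFinish = time.Now()
			fmt.Printf("GC job finished, err = %v\n", err)
			finished++
		case <-ctx.Done():
			return
		}
	}
}
```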

store/tikv/gcworker/gc_worker_test.go

@@ -16,7 +16,6 @@ package gcworker
 import (
 	"bytes"
 	"context"
-	"errors"
 	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/store/tikv/oracle"
 	"math"
@@ -26,6 +25,7 @@ import (
 	"time"
 
 	. "github.com/pingcap/check"
+	"github.com/pingcap/errors"
 	"github.com/pingcap/failpoint"
 	"github.com/pingcap/kvproto/pkg/errorpb"
 	"github.com/pingcap/kvproto/pkg/kvrpcpb"
@@ -647,39 +647,64 @@ func (s *testGCWorkerSuite) TestLeaderTick(c *C) {
 	// Wait for GC finish
 	select {
 	case err = <-s.gcWorker.done:
+		s.gcWorker.gcIsRunning = false
+		break
 	case <-time.After(time.Second * 10):
 		err = errors.New("receive from s.gcWorker.done timeout")
 	}
 	c.Assert(err, IsNil)
 	s.checkCollected(c, p)
+
+	// Test again to ensure the synchronization between goroutines is correct.
+	err = s.gcWorker.saveTime(gcLastRunTimeKey, oracle.GetTimeFromTS(s.mustAllocTs(c)).Add(-veryLong))
+	c.Assert(err, IsNil)
+	s.gcWorker.lastFinish = time.Now().Add(-veryLong)
+	p = s.createGCProbe(c, "k1")
+	s.oracle.AddOffset(gcDefaultLifeTime * 2)
+	err = s.gcWorker.leaderTick(context.Background())
+	c.Assert(err, IsNil)
+
+	// Wait for GC finish
+	select {
+	case err = <-s.gcWorker.done:
+		s.gcWorker.gcIsRunning = false
+		break
+	case <-time.After(time.Second * 10):
+		err = errors.New("receive from s.gcWorker.done timeout")
+	}
+	c.Assert(err, IsNil)
+	s.checkCollected(c, p)
+
+	// No more signals in the channel
+	select {
+	case err = <-s.gcWorker.done:
+		err = errors.Errorf("received signal s.gcWorker.done which shouldn't exist: %v", err)
+		break
+	case <-time.After(time.Second):
+		break
+	}
+	c.Assert(err, IsNil)
 }
 
 func (s *testGCWorkerSuite) TestRunGCJob(c *C) {
 	gcSafePointCacheInterval = 0
 
-	// Avoid blocking runGCJob function
-	s.gcWorker.done = make(chan error, 1)
-
 	// Test distributed mode
 	useDistributedGC, err := s.gcWorker.checkUseDistributedGC()
 	c.Assert(err, IsNil)
 	c.Assert(useDistributedGC, IsTrue)
 	safePoint := s.mustAllocTs(c)
-	s.gcWorker.runGCJob(context.Background(), safePoint, 1)
-	err = <-s.gcWorker.done
+	err = s.gcWorker.runGCJob(context.Background(), safePoint, 1)
 	c.Assert(err, IsNil)
 
 	pdSafePoint := s.mustGetSafePointFromPd(c)
 	c.Assert(pdSafePoint, Equals, safePoint)
 
 	etcdSafePoint := s.loadEtcdSafePoint(c)
 	c.Assert(err, IsNil)
 	c.Assert(etcdSafePoint, Equals, safePoint)
 
 	// Test distributed mode with safePoint regressing (although this is impossible)
-	s.gcWorker.runGCJob(context.Background(), safePoint-1, 1)
-	err = <-s.gcWorker.done
+	err = s.gcWorker.runGCJob(context.Background(), safePoint-1, 1)
 	c.Assert(err, NotNil)
 
 	// Test central mode
@@ -691,13 +716,11 @@ func (s *testGCWorkerSuite) TestRunGCJob(c *C) {
 
 	p := s.createGCProbe(c, "k1")
 	safePoint = s.mustAllocTs(c)
-	s.gcWorker.runGCJob(context.Background(), safePoint, 1)
-	s.checkCollected(c, p)
-
-	err = <-s.gcWorker.done
+	err = s.gcWorker.runGCJob(context.Background(), safePoint, 1)
 	c.Assert(err, IsNil)
+	s.checkCollected(c, p)
 
 	etcdSafePoint = s.loadEtcdSafePoint(c)
 	c.Assert(err, IsNil)
 	c.Assert(etcdSafePoint, Equals, safePoint)
 }
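
The closing check in TestLeaderTick — receive with a short timeout and fail if anything arrives — is a reusable idiom whenever completion is reported over a channel. A hypothetical gocheck-style helper (not present in this codebase) would capture it:

```go
// assertNoSignal fails the check if ch delivers anything within wait;
// TestLeaderTick above inlines the equivalent select.
func assertNoSignal(c *C, ch <-chan error, wait time.Duration) {
	select {
	case err := <-ch:
		c.Fatalf("received signal which shouldn't exist: %v", err)
	case <-time.After(wait):
		// nothing arrived: the channel is drained, as the test expects
	}
}
```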