From db454602801fea40b098fb5942576c3cd89dfebc Mon Sep 17 00:00:00 2001
From: MyonKeminta <9948422+MyonKeminta@users.noreply.github.com>
Date: Wed, 17 Jul 2019 17:31:10 +0800
Subject: [PATCH] store/tikv: Avoid sending to channel everywhere in runGCJob
 (#11032)

Signed-off-by: MyonKeminta
---
 store/tikv/gcworker/gc_worker.go      | 31 ++++++-----------
 store/tikv/gcworker/gc_worker_test.go | 49 ++++++++++++++++++++-------
 2 files changed, 47 insertions(+), 33 deletions(-)

diff --git a/store/tikv/gcworker/gc_worker.go b/store/tikv/gcworker/gc_worker.go
index fe4794abf9..382c0c474c 100644
--- a/store/tikv/gcworker/gc_worker.go
+++ b/store/tikv/gcworker/gc_worker.go
@@ -233,13 +233,11 @@ func (w *GCWorker) leaderTick(ctx context.Context) error {
 		if err != nil {
 			metrics.GCJobFailureCounter.WithLabelValues("prepare").Inc()
 		}
-		w.gcIsRunning = false
 		return errors.Trace(err)
 	}
 	// When the worker is just started, or an old GC job has just finished,
 	// wait a while before starting a new job.
 	if time.Since(w.lastFinish) < gcWaitTime {
-		w.gcIsRunning = false
 		logutil.Logger(ctx).Info("[gc worker] another gc job has just finished, skipped.",
 			zap.String("leaderTick on ", w.uuid))
 		return nil
@@ -258,7 +256,9 @@ func (w *GCWorker) leaderTick(ctx context.Context) error {
 		zap.String("uuid", w.uuid),
 		zap.Uint64("safePoint", safePoint),
 		zap.Int("concurrency", concurrency))
-	go w.runGCJob(ctx, safePoint, concurrency)
+	go func() {
+		w.done <- w.runGCJob(ctx, safePoint, concurrency)
+	}()
 	return nil
 }
 
@@ -466,7 +466,7 @@ func (w *GCWorker) calculateNewSafePoint(now time.Time) (*time.Time, error) {
 	return &safePoint, nil
 }
 
-func (w *GCWorker) runGCJob(ctx context.Context, safePoint uint64, concurrency int) {
+func (w *GCWorker) runGCJob(ctx context.Context, safePoint uint64, concurrency int) error {
 	metrics.GCWorkerCounter.WithLabelValues("run_job").Inc()
 	err := w.resolveLocks(ctx, safePoint, concurrency)
 	if err != nil {
@@ -474,8 +474,7 @@ func (w *GCWorker) runGCJob(ctx context.Context, safePoint uint64, concurrency i
 			zap.String("uuid", w.uuid),
 			zap.Error(err))
 		metrics.GCJobFailureCounter.WithLabelValues("resolve_lock").Inc()
-		w.done <- errors.Trace(err)
-		return
+		return errors.Trace(err)
 	}
 	// Save safe point to pd.
 	err = w.saveSafePoint(w.store.GetSafePointKV(), tikv.GcSavedSafePoint, safePoint)
@@ -483,10 +482,8 @@ func (w *GCWorker) runGCJob(ctx context.Context, safePoint uint64, concurrency i
 		logutil.Logger(ctx).Error("[gc worker] failed to save safe point to PD",
 			zap.String("uuid", w.uuid),
 			zap.Error(err))
-		w.gcIsRunning = false
 		metrics.GCJobFailureCounter.WithLabelValues("save_safe_point").Inc()
-		w.done <- errors.Trace(err)
-		return
+		return errors.Trace(err)
 	}
 	// Sleep to wait for all other tidb instances update their safepoint cache.
 	time.Sleep(gcSafePointCacheInterval)
@@ -497,8 +494,7 @@ func (w *GCWorker) runGCJob(ctx context.Context, safePoint uint64, concurrency i
 			zap.String("uuid", w.uuid),
 			zap.Error(err))
 		metrics.GCJobFailureCounter.WithLabelValues("delete_range").Inc()
-		w.done <- errors.Trace(err)
-		return
+		return errors.Trace(err)
 	}
 	err = w.redoDeleteRanges(ctx, safePoint, concurrency)
 	if err != nil {
@@ -506,8 +502,7 @@ func (w *GCWorker) runGCJob(ctx context.Context, safePoint uint64, concurrency i
 			zap.String("uuid", w.uuid),
 			zap.Error(err))
 		metrics.GCJobFailureCounter.WithLabelValues("redo_delete_range").Inc()
-		w.done <- errors.Trace(err)
-		return
+		return errors.Trace(err)
 	}
 
 	useDistributedGC, err := w.checkUseDistributedGC()
@@ -525,10 +520,8 @@ func (w *GCWorker) runGCJob(ctx context.Context, safePoint uint64, concurrency i
 			logutil.Logger(ctx).Error("[gc worker] failed to upload safe point to PD",
 				zap.String("uuid", w.uuid),
 				zap.Error(err))
-			w.gcIsRunning = false
 			metrics.GCJobFailureCounter.WithLabelValues("upload_safe_point").Inc()
-			w.done <- errors.Trace(err)
-			return
+			return errors.Trace(err)
 		}
 	} else {
 		err = w.doGC(ctx, safePoint, concurrency)
@@ -536,14 +529,12 @@ func (w *GCWorker) runGCJob(ctx context.Context, safePoint uint64, concurrency i
 			logutil.Logger(ctx).Error("[gc worker] do GC returns an error",
 				zap.String("uuid", w.uuid),
 				zap.Error(err))
-			w.gcIsRunning = false
 			metrics.GCJobFailureCounter.WithLabelValues("gc").Inc()
-			w.done <- errors.Trace(err)
-			return
+			return errors.Trace(err)
 		}
 	}
 
-	w.done <- nil
+	return nil
 }
 
 // deleteRanges processes all delete range records whose ts < safePoint in table `gc_delete_range`
diff --git a/store/tikv/gcworker/gc_worker_test.go b/store/tikv/gcworker/gc_worker_test.go
index b73f263d63..4f762242dc 100644
--- a/store/tikv/gcworker/gc_worker_test.go
+++ b/store/tikv/gcworker/gc_worker_test.go
@@ -16,7 +16,6 @@ package gcworker
 import (
 	"bytes"
 	"context"
-	"errors"
 	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/store/tikv/oracle"
 	"math"
@@ -26,6 +25,7 @@ import (
 	"time"
 
 	. "github.com/pingcap/check"
+	"github.com/pingcap/errors"
 	"github.com/pingcap/failpoint"
 	"github.com/pingcap/kvproto/pkg/errorpb"
 	"github.com/pingcap/kvproto/pkg/kvrpcpb"
@@ -647,39 +647,64 @@ func (s *testGCWorkerSuite) TestLeaderTick(c *C) {
 	// Wait for GC finish
 	select {
 	case err = <-s.gcWorker.done:
+		s.gcWorker.gcIsRunning = false
 		break
 	case <-time.After(time.Second * 10):
 		err = errors.New("receive from s.gcWorker.done timeout")
 	}
 	c.Assert(err, IsNil)
 	s.checkCollected(c, p)
+
+	// Test again to ensure the synchronization between goroutines is correct.
+	err = s.gcWorker.saveTime(gcLastRunTimeKey, oracle.GetTimeFromTS(s.mustAllocTs(c)).Add(-veryLong))
+	c.Assert(err, IsNil)
+	s.gcWorker.lastFinish = time.Now().Add(-veryLong)
+	p = s.createGCProbe(c, "k1")
+	s.oracle.AddOffset(gcDefaultLifeTime * 2)
+
+	err = s.gcWorker.leaderTick(context.Background())
+	c.Assert(err, IsNil)
+	// Wait for GC finish
+	select {
+	case err = <-s.gcWorker.done:
+		s.gcWorker.gcIsRunning = false
+		break
+	case <-time.After(time.Second * 10):
+		err = errors.New("receive from s.gcWorker.done timeout")
+	}
+	c.Assert(err, IsNil)
+	s.checkCollected(c, p)
+
+	// No more signals in the channel
+	select {
+	case err = <-s.gcWorker.done:
+		err = errors.Errorf("received signal s.gcWorker.done which shouldn't exist: %v", err)
+		break
+	case <-time.After(time.Second):
+		break
+	}
+	c.Assert(err, IsNil)
 }
 
 func (s *testGCWorkerSuite) TestRunGCJob(c *C) {
 	gcSafePointCacheInterval = 0
 
-	// Avoid blocking runGCJob function
-	s.gcWorker.done = make(chan error, 1)
-
 	// Test distributed mode
 	useDistributedGC, err := s.gcWorker.checkUseDistributedGC()
 	c.Assert(err, IsNil)
 	c.Assert(useDistributedGC, IsTrue)
 	safePoint := s.mustAllocTs(c)
-	s.gcWorker.runGCJob(context.Background(), safePoint, 1)
-	err = <-s.gcWorker.done
+	err = s.gcWorker.runGCJob(context.Background(), safePoint, 1)
 	c.Assert(err, IsNil)
 
 	pdSafePoint := s.mustGetSafePointFromPd(c)
 	c.Assert(pdSafePoint, Equals, safePoint)
 
 	etcdSafePoint := s.loadEtcdSafePoint(c)
-	c.Assert(err, IsNil)
 	c.Assert(etcdSafePoint, Equals, safePoint)
 
 	// Test distributed mode with safePoint regressing (although this is impossible)
-	s.gcWorker.runGCJob(context.Background(), safePoint-1, 1)
-	err = <-s.gcWorker.done
+	err = s.gcWorker.runGCJob(context.Background(), safePoint-1, 1)
 	c.Assert(err, NotNil)
 
 	// Test central mode
@@ -691,13 +716,11 @@ func (s *testGCWorkerSuite) TestRunGCJob(c *C) {
 
 	p := s.createGCProbe(c, "k1")
 	safePoint = s.mustAllocTs(c)
-	s.gcWorker.runGCJob(context.Background(), safePoint, 1)
-	s.checkCollected(c, p)
-	err = <-s.gcWorker.done
+	err = s.gcWorker.runGCJob(context.Background(), safePoint, 1)
 	c.Assert(err, IsNil)
+	s.checkCollected(c, p)
 
 	etcdSafePoint = s.loadEtcdSafePoint(c)
-	c.Assert(err, IsNil)
 	c.Assert(etcdSafePoint, Equals, safePoint)
 }
 
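The shape of the change above is: a long-running job that used to write its result to a done channel at every exit path now simply returns an error, and a single goroutine wrapper at the call site forwards that return value to the channel. Below is a minimal, self-contained Go sketch of the same pattern; the `worker`, `runJob`, `startJob`, `stepOne`, and `stepTwo` names are invented for illustration and are not part of the patched TiDB code.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// worker is a stand-in for a component that owns a done channel which a
// caller can wait on for the result of a background job.
type worker struct {
	done chan error
}

// runJob returns its error instead of writing to w.done at every exit path,
// so no early return needs a `w.done <- err; return` pair and no exit path
// can be forgotten.
func (w *worker) runJob(ctx context.Context) error {
	if err := stepOne(ctx); err != nil {
		return fmt.Errorf("step one: %w", err)
	}
	if err := stepTwo(ctx); err != nil {
		return fmt.Errorf("step two: %w", err)
	}
	return nil
}

// startJob is the single place that bridges the return value onto the
// channel, mirroring the `go func() { w.done <- w.runGCJob(...) }()` shape
// used in the patch.
func (w *worker) startJob(ctx context.Context) {
	go func() {
		w.done <- w.runJob(ctx)
	}()
}

func stepOne(ctx context.Context) error { return nil }

func stepTwo(ctx context.Context) error {
	// Simulated failure so the example exercises the error path.
	return errors.New("boom")
}

func main() {
	w := &worker{done: make(chan error)}
	w.startJob(context.Background())

	// The caller waits exactly once, whichever path runJob took.
	select {
	case err := <-w.done:
		fmt.Println("job finished:", err)
	case <-time.After(time.Second):
		fmt.Println("timed out waiting for job")
	}
}
```

A side effect of this structure, visible in the test changes, is that synchronous callers (such as TestRunGCJob) can call the job function directly and inspect its returned error, without pre-buffering the channel to avoid blocking.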