From 23f526858d0f64723912374313f9dee6202ef9aa Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Fri, 1 Dec 2017 04:24:36 -0600 Subject: [PATCH] store/tikv: refine code, backoffer is-a go context (#5276) --- store/tikv/backoff.go | 31 ++++++++++++++++++------------- store/tikv/kv.go | 2 +- store/tikv/region_cache.go | 6 +++--- store/tikv/region_request.go | 4 ++-- 4 files changed, 24 insertions(+), 19 deletions(-) diff --git a/store/tikv/backoff.go b/store/tikv/backoff.go index dc33e0dfba..5283a1523c 100644 --- a/store/tikv/backoff.go +++ b/store/tikv/backoff.go @@ -40,10 +40,10 @@ const ( // NewBackoffFn creates a backoff func which implements exponential backoff with // optional jitters. // See http://www.awsarchitectureblog.com/2015/03/backoff.html -func NewBackoffFn(base, cap, jitter int) func() int { +func NewBackoffFn(base, cap, jitter int) func(goCtx goctx.Context) int { attempts := 0 lastSleep := base - return func() int { + return func(goCtx goctx.Context) int { var sleep int switch jitter { case NoJitter: @@ -57,7 +57,11 @@ func NewBackoffFn(base, cap, jitter int) func() int { case DecorrJitter: sleep = int(math.Min(float64(cap), float64(base+rand.Intn(lastSleep*3-base)))) } - time.Sleep(time.Duration(sleep) * time.Millisecond) + + select { + case <-time.After(time.Duration(sleep) * time.Millisecond): + case <-goCtx.Done(): + } attempts++ lastSleep = sleep @@ -81,7 +85,7 @@ const ( boServerBusy ) -func (t backoffType) createFn() func() int { +func (t backoffType) createFn() func(goctx.Context) int { switch t { case boTiKVRPC: return NewBackoffFn(100, 2000, EqualJitter) @@ -154,19 +158,20 @@ var commitMaxBackoff = 20000 // Backoffer is a utility for retrying queries. type Backoffer struct { - fn map[backoffType]func() int + goctx.Context + + fn map[backoffType]func(goctx.Context) int maxSleep int totalSleep int errors []error - ctx goctx.Context types []backoffType } // NewBackoffer creates a Backoffer with maximum sleep time(in ms). func NewBackoffer(maxSleep int, ctx goctx.Context) *Backoffer { return &Backoffer{ + Context: ctx, maxSleep: maxSleep, - ctx: ctx, } } @@ -174,7 +179,7 @@ func NewBackoffer(maxSleep int, ctx goctx.Context) *Backoffer { // It returns a retryable error if total sleep time exceeds maxSleep. func (b *Backoffer) Backoff(typ backoffType, err error) error { select { - case <-b.ctx.Done(): + case <-b.Context.Done(): return errors.Trace(err) default: } @@ -182,7 +187,7 @@ func (b *Backoffer) Backoff(typ backoffType, err error) error { backoffCounter.WithLabelValues(typ.String()).Inc() // Lazy initialize. if b.fn == nil { - b.fn = make(map[backoffType]func() int) + b.fn = make(map[backoffType]func(goctx.Context) int) } f, ok := b.fn[typ] if !ok { @@ -190,7 +195,7 @@ func (b *Backoffer) Backoff(typ backoffType, err error) error { b.fn[typ] = f } - b.totalSleep += f() + b.totalSleep += f(b) b.types = append(b.types, typ) log.Debugf("%v, retry later(totalSleep %dms, maxSleep %dms)", err, b.totalSleep, b.maxSleep) @@ -221,21 +226,21 @@ func (b *Backoffer) String() string { // current Backoffer's context. func (b *Backoffer) Clone() *Backoffer { return &Backoffer{ + Context: b.Context, maxSleep: b.maxSleep, totalSleep: b.totalSleep, errors: b.errors, - ctx: b.ctx, } } // Fork creates a new Backoffer which keeps current Backoffer's sleep time and errors, and holds // a child context of current Backoffer's context. func (b *Backoffer) Fork() (*Backoffer, goctx.CancelFunc) { - ctx, cancel := goctx.WithCancel(b.ctx) + ctx, cancel := goctx.WithCancel(b.Context) return &Backoffer{ + Context: ctx, maxSleep: b.maxSleep, totalSleep: b.totalSleep, errors: b.errors, - ctx: ctx, }, cancel } diff --git a/store/tikv/kv.go b/store/tikv/kv.go index 9cf77f4c8b..c132081d9b 100644 --- a/store/tikv/kv.go +++ b/store/tikv/kv.go @@ -371,7 +371,7 @@ func (s *tikvStore) CurrentVersion() (kv.Version, error) { func (s *tikvStore) getTimestampWithRetry(bo *Backoffer) (uint64, error) { for { - startTS, err := s.oracle.GetTimestamp(bo.ctx) + startTS, err := s.oracle.GetTimestamp(bo) if err == nil { return startTS, nil } diff --git a/store/tikv/region_cache.go b/store/tikv/region_cache.go index 227484dfb8..9b8051b81a 100644 --- a/store/tikv/region_cache.go +++ b/store/tikv/region_cache.go @@ -333,7 +333,7 @@ func (c *RegionCache) loadRegion(bo *Backoffer, key []byte) (*Region, error) { } } - meta, leader, err := c.pdClient.GetRegion(bo.ctx, key) + meta, leader, err := c.pdClient.GetRegion(bo, key) if err != nil { backoffErr = errors.Errorf("loadRegion from PD failed, key: %q, err: %v", key, err) continue @@ -367,7 +367,7 @@ func (c *RegionCache) loadRegionByID(bo *Backoffer, regionID uint64) (*Region, e } } - meta, leader, err := c.pdClient.GetRegionByID(bo.ctx, regionID) + meta, leader, err := c.pdClient.GetRegionByID(bo, regionID) if err != nil { backoffErr = errors.Errorf("loadRegion from PD failed, regionID: %v, err: %v", regionID, err) continue @@ -427,7 +427,7 @@ func (c *RegionCache) ClearStoreByID(id uint64) { func (c *RegionCache) loadStoreAddr(bo *Backoffer, id uint64) (string, error) { for { - store, err := c.pdClient.GetStore(bo.ctx, id) + store, err := c.pdClient.GetStore(bo, id) if err != nil { if errors.Cause(err) == goctx.Canceled { return "", errors.Trace(err) diff --git a/store/tikv/region_request.go b/store/tikv/region_request.go index fc0e314fff..3643fc423c 100644 --- a/store/tikv/region_request.go +++ b/store/tikv/region_request.go @@ -104,7 +104,7 @@ func (s *RegionRequestSender) sendReqToRegion(bo *Backoffer, ctx *RPCContext, re if e := tikvrpc.SetContext(req, ctx.Meta, ctx.Peer); e != nil { return nil, false, errors.Trace(e) } - context, cancel := goctx.WithTimeout(bo.ctx, timeout) + context, cancel := goctx.WithTimeout(bo, timeout) defer cancel() resp, err = s.client.SendReq(context, ctx.Addr, req) if err != nil { @@ -124,7 +124,7 @@ func (s *RegionRequestSender) onSendFail(bo *Backoffer, ctx *RPCContext, err err } if grpc.Code(errors.Cause(err)) == codes.Canceled { select { - case <-bo.ctx.Done(): + case <-bo.Done(): return errors.Trace(err) default: // If we don't cancel, but the error code is Canceled, it must be from grpc remote.