store/tikv: handle Canceled error code by grpc remote. (#4133)
It's observed that killing tikv when running sysbench, we may get a codes.Canceled error, that error is from grpc remote and we didn't handle it properly. Distinguish the error cause of cancel: 1. the request should not retry if cancelled by ourself 2. the request need retry when cancelled by remote
This commit is contained in:
@ -120,10 +120,21 @@ func (s *RegionRequestSender) sendReqToRegion(bo *Backoffer, ctx *RPCContext, re
|
||||
}
|
||||
|
||||
func (s *RegionRequestSender) onSendFail(bo *Backoffer, ctx *RPCContext, err error) error {
|
||||
// If it failed because the context is canceled, don't retry on this error.
|
||||
if errors.Cause(err) == goctx.Canceled || grpc.Code(errors.Cause(err)) == codes.Canceled {
|
||||
// If it failed because the context is cancelled by ourself, don't retry.
|
||||
if errors.Cause(err) == goctx.Canceled {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
if grpc.Code(errors.Cause(err)) == codes.Canceled {
|
||||
select {
|
||||
case <-bo.ctx.Done():
|
||||
return errors.Trace(err)
|
||||
default:
|
||||
// If we don't cancel, but the error code is Canceled, it must be from grpc remote.
|
||||
// This may happen when tikv is killed and exiting.
|
||||
// Backoff and retry in this case.
|
||||
log.Warn("receive a grpc cancel signal from remote:", errors.ErrorStack(err))
|
||||
}
|
||||
}
|
||||
|
||||
s.regionCache.OnRequestFail(ctx, err)
|
||||
|
||||
|
||||
@ -29,7 +29,6 @@ import (
|
||||
"github.com/pingcap/tidb/util"
|
||||
goctx "golang.org/x/net/context"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
)
|
||||
|
||||
type testRegionRequestSuite struct {
|
||||
@ -240,10 +239,7 @@ func (s *testRegionRequestSuite) TestNoReloadRegionForGrpcWhenCtxCanceled(c *C)
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
client := &cancelContextClient{
|
||||
Client: newRPCClient(),
|
||||
redirectAddr: addr,
|
||||
}
|
||||
client := newRPCClient()
|
||||
sender := NewRegionRequestSender(s.cache, client, kvrpcpb.IsolationLevel_SI)
|
||||
req := &tikvrpc.Request{
|
||||
Type: tikvrpc.CmdRawPut,
|
||||
@ -255,10 +251,20 @@ func (s *testRegionRequestSuite) TestNoReloadRegionForGrpcWhenCtxCanceled(c *C)
|
||||
region, err := s.cache.LocateRegionByID(s.bo, s.region)
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
_, err = sender.SendReq(s.bo, req, region.Region, 3*time.Second)
|
||||
c.Assert(grpc.Code(errors.Cause(err)), Equals, codes.Canceled)
|
||||
bo, cancel := s.bo.Fork()
|
||||
cancel()
|
||||
_, err = sender.SendReq(bo, req, region.Region, 3*time.Second)
|
||||
c.Assert(errors.Cause(err), Equals, goctx.Canceled)
|
||||
c.Assert(s.cache.getRegionByIDFromCache(s.region), NotNil)
|
||||
|
||||
// Just for covering error code = codes.Canceled.
|
||||
client1 := &cancelContextClient{
|
||||
Client: newRPCClient(),
|
||||
redirectAddr: addr,
|
||||
}
|
||||
sender = NewRegionRequestSender(s.cache, client1, kvrpcpb.IsolationLevel_SI)
|
||||
sender.SendReq(s.bo, req, region.Region, 3*time.Second)
|
||||
|
||||
// cleanup
|
||||
server.Stop()
|
||||
wg.Wait()
|
||||
|
||||
Reference in New Issue
Block a user