From 2d79d59f24afa7623fddd49d68cdf6bb76ec487d Mon Sep 17 00:00:00 2001 From: disksing Date: Wed, 8 Nov 2017 18:27:59 +0800 Subject: [PATCH] store/tikv: set undetermined error if not recover from an rpc error. (#5043) --- store/tikv/2pc.go | 41 ++++++++++++++++------ store/tikv/2pc_test.go | 66 +++++++++++++++++++++++++++++------- store/tikv/region_request.go | 2 ++ 3 files changed, 85 insertions(+), 24 deletions(-) diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index 6336226dee..6aacf0fe46 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -72,9 +72,9 @@ type twoPhaseCommitter struct { commitTS uint64 mu struct { sync.RWMutex - writtenKeys [][]byte - committed bool - undetermined bool + writtenKeys [][]byte + committed bool + undeterminedErr error // undeterminedErr saves the rpc error we encounter when commit primary key. } priority pb.CommandPri syncLog bool @@ -418,6 +418,18 @@ func kvPriorityToCommandPri(pri int) pb.CommandPri { return pb.CommandPri_Normal } +func (c *twoPhaseCommitter) setUndeterminedErr(err error) { + c.mu.Lock() + defer c.mu.Unlock() + c.mu.undeterminedErr = err +} + +func (c *twoPhaseCommitter) getUndeterminedErr() error { + c.mu.RLock() + defer c.mu.RUnlock() + return c.mu.undeterminedErr +} + func (c *twoPhaseCommitter) commitSingleBatch(bo *Backoffer, batch batchKeys) error { req := &tikvrpc.Request{ Type: tikvrpc.CmdCommit, @@ -433,19 +445,20 @@ func (c *twoPhaseCommitter) commitSingleBatch(bo *Backoffer, batch batchKeys) er } req.Context.Priority = c.priority + sender := NewRegionRequestSender(c.store.regionCache, c.store.client) + resp, err := sender.SendReq(bo, req, batch.region, readTimeoutShort) + // If we fail to receive response for the request that commits primary key, it will be undetermined whether this // transaction has been successfully committed. // Under this circumstance, we can not declare the commit is complete (may lead to data lost), nor can we throw // an error (may lead to the duplicated key error when upper level restarts the transaction). Currently the best // solution is to populate this error and let upper layer drop the connection to the corresponding mysql client. isPrimary := bytes.Equal(batch.keys[0], c.primary()) + if isPrimary && sender.rpcError != nil { + c.setUndeterminedErr(errors.Trace(sender.rpcError)) + } - resp, err := c.store.SendReq(bo, req, batch.region, readTimeoutShort) if err != nil { - if isPrimary { - // change the Cause of the error to be returned - return errors.Trace(errors.Wrap(err, terror.ErrResultUndetermined)) - } return errors.Trace(err) } regionErr, err := resp.GetRegionError() @@ -465,6 +478,11 @@ func (c *twoPhaseCommitter) commitSingleBatch(bo *Backoffer, batch batchKeys) er if commitResp == nil { return errors.Trace(ErrBodyMissing) } + // Here we can make sure tikv has processed the commit primary key request. So + // we can clean undetermined error. + if isPrimary { + c.setUndeterminedErr(nil) + } if keyErr := commitResp.GetError(); keyErr != nil { c.mu.RLock() defer c.mu.RUnlock() @@ -548,7 +566,7 @@ func (c *twoPhaseCommitter) execute(ctx goctx.Context) error { c.mu.RLock() writtenKeys := c.mu.writtenKeys committed := c.mu.committed - undetermined := c.mu.undetermined + undetermined := c.mu.undeterminedErr != nil c.mu.RUnlock() if !committed && !undetermined { twoPhaseCommitGP.Go(func() { @@ -615,8 +633,9 @@ func (c *twoPhaseCommitter) execute(ctx goctx.Context) error { err = c.commitKeys(NewBackoffer(commitMaxBackoff, ctx), c.keys) if err != nil { - if errors.Cause(err) == terror.ErrResultUndetermined { - c.mu.undetermined = true + if undeterminedErr := c.getUndeterminedErr(); undeterminedErr != nil { + log.Warnf("2PC commit result undetermined, err: %v, rpcErr: %v, tid: %v", err, undeterminedErr, c.startTS) + err = errors.Wrap(err, terror.ErrResultUndetermined) } if !c.mu.committed { log.Debugf("2PC failed on commit: %v, tid: %d", err, c.startTS) diff --git a/store/tikv/2pc_test.go b/store/tikv/2pc_test.go index 9d18c30db6..835c39a669 100644 --- a/store/tikv/2pc_test.go +++ b/store/tikv/2pc_test.go @@ -377,16 +377,33 @@ func (s *testCommitterSuite) TestPrewritePrimaryKeyFailed(c *C) { c.Assert(v, BytesEquals, []byte("a3")) } -// interceptCommitClient wraps rpcClient and returns specified response and error for commit command. +// interceptCommitClient wraps rpcClient and returns specified response and +// error for commit command. +// It supports 3 strategies: "error": alwrays return err; "response": always +// return resp; "firstCallError": return err when first called, then always +// return resp. type interceptCommitClient struct { Client - resp *tikvrpc.Response - err error + resp *tikvrpc.Response + err error + strategy string + callCount int } func (c *interceptCommitClient) SendReq(ctx goctx.Context, addr string, req *tikvrpc.Request) (*tikvrpc.Response, error) { if req.Type == tikvrpc.CmdCommit { - return c.resp, c.err + switch c.strategy { + case "error": + return nil, c.err + case "response": + return c.resp, nil + case "firstCallError": + c.callCount++ + if c.callCount == 1 { + return nil, c.err + } + return c.resp, nil + } } return c.Client.SendReq(ctx, addr, req) } @@ -395,20 +412,18 @@ func (c *interceptCommitClient) SendReq(ctx goctx.Context, addr string, req *tik // when committing primary region task. func (s *testCommitterSuite) TestCommitPrimaryRpcErrors(c *C) { s.store.client = &interceptCommitClient{ - Client: s.store.client, - resp: nil, - err: errors.Errorf("timeout"), + Client: s.store.client, + err: errors.New("timeout"), + strategy: "error", } - // The rpc error may or may not be wrapped to ErrResultUndetermined. + // The rpc error will be wrapped to ErrResultUndetermined. t1 := s.begin(c) err := t1.Set([]byte("a"), []byte("a1")) c.Assert(err, IsNil) err = t1.Commit(goctx.Background()) c.Assert(err, NotNil) - // TODO: refine errors of region cache and rpc, so that every the rpc error - // could be easily wrapped to ErrResultUndetermined, but RegionError would not. - // c.Assert(terror.ErrorEqual(err, terror.ErrResultUndetermined), IsTrue, Commentf("%s", errors.ErrorStack(err))) + c.Assert(terror.ErrorEqual(err, terror.ErrResultUndetermined), IsTrue, Commentf("%s", errors.ErrorStack(err))) } // TestCommitPrimaryRegionError tests RegionError is handled properly @@ -424,7 +439,7 @@ func (s *testCommitterSuite) TestCommitPrimaryRegionError(c *C) { }, }, }, - err: nil, + strategy: "response", } // Ensure it returns the original error without wrapped to ErrResultUndetermined // if it exceeds max retry timeout on RegionError. @@ -436,6 +451,31 @@ func (s *testCommitterSuite) TestCommitPrimaryRegionError(c *C) { c.Assert(terror.ErrorNotEqual(err, terror.ErrResultUndetermined), IsTrue) } +// TestCommitPrimaryRPCErrorThenRegionError tests the case when commit first +// receive a rpc timeout, then region errors afterwrards. +func (s *testCommitterSuite) TestCommitPrimaryRPCErrorThenRegionError(c *C) { + s.store.client = &interceptCommitClient{ + Client: s.store.client, + resp: &tikvrpc.Response{ + Type: tikvrpc.CmdCommit, + Commit: &kvrpcpb.CommitResponse{ + RegionError: &errorpb.Error{ + NotLeader: &errorpb.NotLeader{}, + }, + }, + }, + err: errors.New("timeout"), + strategy: "firstCallError", + } + // The region error will be wrapped to ErrResultUndetermined. + t1 := s.begin(c) + err := t1.Set([]byte("a"), []byte("a1")) + c.Assert(err, IsNil) + err = t1.Commit(goctx.Background()) + c.Assert(err, NotNil) + c.Assert(terror.ErrorEqual(err, terror.ErrResultUndetermined), IsTrue, Commentf("%s", errors.ErrorStack(err))) +} + // TestCommitPrimaryKeyError tests KeyError is handled properly // when committing primary region task. func (s *testCommitterSuite) TestCommitPrimaryKeyError(c *C) { @@ -447,7 +487,7 @@ func (s *testCommitterSuite) TestCommitPrimaryKeyError(c *C) { Error: &kvrpcpb.KeyError{}, }, }, - err: nil, + strategy: "response", } // Ensure it returns the original error without wrapped to ErrResultUndetermined // if it meets KeyError. diff --git a/store/tikv/region_request.go b/store/tikv/region_request.go index 745c497f08..fc0e314fff 100644 --- a/store/tikv/region_request.go +++ b/store/tikv/region_request.go @@ -46,6 +46,7 @@ type RegionRequestSender struct { regionCache *RegionCache client Client storeAddr string + rpcError error } // NewRegionRequestSender creates a new sender. @@ -107,6 +108,7 @@ func (s *RegionRequestSender) sendReqToRegion(bo *Backoffer, ctx *RPCContext, re defer cancel() resp, err = s.client.SendReq(context, ctx.Addr, req) if err != nil { + s.rpcError = err if e := s.onSendFail(bo, ctx, err); e != nil { return nil, false, errors.Trace(e) }