store/tikv: set undetermined error if not recover from an rpc error. (#5043)
This commit is contained in:
@ -72,9 +72,9 @@ type twoPhaseCommitter struct {
|
||||
commitTS uint64
|
||||
mu struct {
|
||||
sync.RWMutex
|
||||
writtenKeys [][]byte
|
||||
committed bool
|
||||
undetermined bool
|
||||
writtenKeys [][]byte
|
||||
committed bool
|
||||
undeterminedErr error // undeterminedErr saves the rpc error we encounter when commit primary key.
|
||||
}
|
||||
priority pb.CommandPri
|
||||
syncLog bool
|
||||
@ -418,6 +418,18 @@ func kvPriorityToCommandPri(pri int) pb.CommandPri {
|
||||
return pb.CommandPri_Normal
|
||||
}
|
||||
|
||||
func (c *twoPhaseCommitter) setUndeterminedErr(err error) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
c.mu.undeterminedErr = err
|
||||
}
|
||||
|
||||
func (c *twoPhaseCommitter) getUndeterminedErr() error {
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
return c.mu.undeterminedErr
|
||||
}
|
||||
|
||||
func (c *twoPhaseCommitter) commitSingleBatch(bo *Backoffer, batch batchKeys) error {
|
||||
req := &tikvrpc.Request{
|
||||
Type: tikvrpc.CmdCommit,
|
||||
@ -433,19 +445,20 @@ func (c *twoPhaseCommitter) commitSingleBatch(bo *Backoffer, batch batchKeys) er
|
||||
}
|
||||
req.Context.Priority = c.priority
|
||||
|
||||
sender := NewRegionRequestSender(c.store.regionCache, c.store.client)
|
||||
resp, err := sender.SendReq(bo, req, batch.region, readTimeoutShort)
|
||||
|
||||
// If we fail to receive response for the request that commits primary key, it will be undetermined whether this
|
||||
// transaction has been successfully committed.
|
||||
// Under this circumstance, we can not declare the commit is complete (may lead to data loss), nor can we throw
|
||||
// an error (may lead to the duplicated key error when upper level restarts the transaction). Currently the best
|
||||
// solution is to propagate this error and let the upper layer drop the connection to the corresponding mysql client.
|
||||
isPrimary := bytes.Equal(batch.keys[0], c.primary())
|
||||
if isPrimary && sender.rpcError != nil {
|
||||
c.setUndeterminedErr(errors.Trace(sender.rpcError))
|
||||
}
|
||||
|
||||
resp, err := c.store.SendReq(bo, req, batch.region, readTimeoutShort)
|
||||
if err != nil {
|
||||
if isPrimary {
|
||||
// change the Cause of the error to be returned
|
||||
return errors.Trace(errors.Wrap(err, terror.ErrResultUndetermined))
|
||||
}
|
||||
return errors.Trace(err)
|
||||
}
|
||||
regionErr, err := resp.GetRegionError()
|
||||
@ -465,6 +478,11 @@ func (c *twoPhaseCommitter) commitSingleBatch(bo *Backoffer, batch batchKeys) er
|
||||
if commitResp == nil {
|
||||
return errors.Trace(ErrBodyMissing)
|
||||
}
|
||||
// Here we can make sure tikv has processed the commit primary key request. So
|
||||
// we can clean undetermined error.
|
||||
if isPrimary {
|
||||
c.setUndeterminedErr(nil)
|
||||
}
|
||||
if keyErr := commitResp.GetError(); keyErr != nil {
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
@ -548,7 +566,7 @@ func (c *twoPhaseCommitter) execute(ctx goctx.Context) error {
|
||||
c.mu.RLock()
|
||||
writtenKeys := c.mu.writtenKeys
|
||||
committed := c.mu.committed
|
||||
undetermined := c.mu.undetermined
|
||||
undetermined := c.mu.undeterminedErr != nil
|
||||
c.mu.RUnlock()
|
||||
if !committed && !undetermined {
|
||||
twoPhaseCommitGP.Go(func() {
|
||||
@ -615,8 +633,9 @@ func (c *twoPhaseCommitter) execute(ctx goctx.Context) error {
|
||||
|
||||
err = c.commitKeys(NewBackoffer(commitMaxBackoff, ctx), c.keys)
|
||||
if err != nil {
|
||||
if errors.Cause(err) == terror.ErrResultUndetermined {
|
||||
c.mu.undetermined = true
|
||||
if undeterminedErr := c.getUndeterminedErr(); undeterminedErr != nil {
|
||||
log.Warnf("2PC commit result undetermined, err: %v, rpcErr: %v, tid: %v", err, undeterminedErr, c.startTS)
|
||||
err = errors.Wrap(err, terror.ErrResultUndetermined)
|
||||
}
|
||||
if !c.mu.committed {
|
||||
log.Debugf("2PC failed on commit: %v, tid: %d", err, c.startTS)
|
||||
|
||||
@ -377,16 +377,33 @@ func (s *testCommitterSuite) TestPrewritePrimaryKeyFailed(c *C) {
|
||||
c.Assert(v, BytesEquals, []byte("a3"))
|
||||
}
|
||||
|
||||
// interceptCommitClient wraps rpcClient and returns specified response and error for commit command.
|
||||
// interceptCommitClient wraps rpcClient and returns specified response and
|
||||
// error for commit command.
|
||||
// It supports 3 strategies: "error": always return err; "response": always
|
||||
// return resp; "firstCallError": return err when first called, then always
|
||||
// return resp.
|
||||
type interceptCommitClient struct {
|
||||
Client
|
||||
resp *tikvrpc.Response
|
||||
err error
|
||||
resp *tikvrpc.Response
|
||||
err error
|
||||
strategy string
|
||||
callCount int
|
||||
}
|
||||
|
||||
func (c *interceptCommitClient) SendReq(ctx goctx.Context, addr string, req *tikvrpc.Request) (*tikvrpc.Response, error) {
|
||||
if req.Type == tikvrpc.CmdCommit {
|
||||
return c.resp, c.err
|
||||
switch c.strategy {
|
||||
case "error":
|
||||
return nil, c.err
|
||||
case "response":
|
||||
return c.resp, nil
|
||||
case "firstCallError":
|
||||
c.callCount++
|
||||
if c.callCount == 1 {
|
||||
return nil, c.err
|
||||
}
|
||||
return c.resp, nil
|
||||
}
|
||||
}
|
||||
return c.Client.SendReq(ctx, addr, req)
|
||||
}
|
||||
@ -395,20 +412,18 @@ func (c *interceptCommitClient) SendReq(ctx goctx.Context, addr string, req *tik
|
||||
// when committing primary region task.
|
||||
func (s *testCommitterSuite) TestCommitPrimaryRpcErrors(c *C) {
|
||||
s.store.client = &interceptCommitClient{
|
||||
Client: s.store.client,
|
||||
resp: nil,
|
||||
err: errors.Errorf("timeout"),
|
||||
Client: s.store.client,
|
||||
err: errors.New("timeout"),
|
||||
strategy: "error",
|
||||
}
|
||||
|
||||
// The rpc error may or may not be wrapped to ErrResultUndetermined.
|
||||
// The rpc error will be wrapped to ErrResultUndetermined.
|
||||
t1 := s.begin(c)
|
||||
err := t1.Set([]byte("a"), []byte("a1"))
|
||||
c.Assert(err, IsNil)
|
||||
err = t1.Commit(goctx.Background())
|
||||
c.Assert(err, NotNil)
|
||||
// TODO: refine errors of region cache and rpc, so that every rpc error
|
||||
// could be easily wrapped to ErrResultUndetermined, but RegionError would not.
|
||||
// c.Assert(terror.ErrorEqual(err, terror.ErrResultUndetermined), IsTrue, Commentf("%s", errors.ErrorStack(err)))
|
||||
c.Assert(terror.ErrorEqual(err, terror.ErrResultUndetermined), IsTrue, Commentf("%s", errors.ErrorStack(err)))
|
||||
}
|
||||
|
||||
// TestCommitPrimaryRegionError tests RegionError is handled properly
|
||||
@ -424,7 +439,7 @@ func (s *testCommitterSuite) TestCommitPrimaryRegionError(c *C) {
|
||||
},
|
||||
},
|
||||
},
|
||||
err: nil,
|
||||
strategy: "response",
|
||||
}
|
||||
// Ensure it returns the original error without wrapping it in ErrResultUndetermined
|
||||
// if it exceeds max retry timeout on RegionError.
|
||||
@ -436,6 +451,31 @@ func (s *testCommitterSuite) TestCommitPrimaryRegionError(c *C) {
|
||||
c.Assert(terror.ErrorNotEqual(err, terror.ErrResultUndetermined), IsTrue)
|
||||
}
|
||||
|
||||
// TestCommitPrimaryRPCErrorThenRegionError tests the case when commit first
|
||||
// receives an rpc timeout, then region errors afterwards.
|
||||
func (s *testCommitterSuite) TestCommitPrimaryRPCErrorThenRegionError(c *C) {
|
||||
s.store.client = &interceptCommitClient{
|
||||
Client: s.store.client,
|
||||
resp: &tikvrpc.Response{
|
||||
Type: tikvrpc.CmdCommit,
|
||||
Commit: &kvrpcpb.CommitResponse{
|
||||
RegionError: &errorpb.Error{
|
||||
NotLeader: &errorpb.NotLeader{},
|
||||
},
|
||||
},
|
||||
},
|
||||
err: errors.New("timeout"),
|
||||
strategy: "firstCallError",
|
||||
}
|
||||
// The region error will be wrapped to ErrResultUndetermined.
|
||||
t1 := s.begin(c)
|
||||
err := t1.Set([]byte("a"), []byte("a1"))
|
||||
c.Assert(err, IsNil)
|
||||
err = t1.Commit(goctx.Background())
|
||||
c.Assert(err, NotNil)
|
||||
c.Assert(terror.ErrorEqual(err, terror.ErrResultUndetermined), IsTrue, Commentf("%s", errors.ErrorStack(err)))
|
||||
}
|
||||
|
||||
// TestCommitPrimaryKeyError tests KeyError is handled properly
|
||||
// when committing primary region task.
|
||||
func (s *testCommitterSuite) TestCommitPrimaryKeyError(c *C) {
|
||||
@ -447,7 +487,7 @@ func (s *testCommitterSuite) TestCommitPrimaryKeyError(c *C) {
|
||||
Error: &kvrpcpb.KeyError{},
|
||||
},
|
||||
},
|
||||
err: nil,
|
||||
strategy: "response",
|
||||
}
|
||||
// Ensure it returns the original error without wrapping it in ErrResultUndetermined
|
||||
// if it meets KeyError.
|
||||
|
||||
@ -46,6 +46,7 @@ type RegionRequestSender struct {
|
||||
regionCache *RegionCache
|
||||
client Client
|
||||
storeAddr string
|
||||
rpcError error
|
||||
}
|
||||
|
||||
// NewRegionRequestSender creates a new sender.
|
||||
@ -107,6 +108,7 @@ func (s *RegionRequestSender) sendReqToRegion(bo *Backoffer, ctx *RPCContext, re
|
||||
defer cancel()
|
||||
resp, err = s.client.SendReq(context, ctx.Addr, req)
|
||||
if err != nil {
|
||||
s.rpcError = err
|
||||
if e := s.onSendFail(bo, ctx, err); e != nil {
|
||||
return nil, false, errors.Trace(e)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user