store/tikv: refine errors for undetermined result (#3423)
This commit is contained in:
@ -402,7 +402,7 @@ func (c *twoPhaseCommitter) prewriteSingleBatch(bo *Backoffer, batch batchKeys)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *twoPhaseCommitter) doCommitSingleBatch(bo *Backoffer, batch batchKeys) error {
|
||||
func (c *twoPhaseCommitter) commitSingleBatch(bo *Backoffer, batch batchKeys) error {
|
||||
req := &tikvrpc.Request{
|
||||
Type: tikvrpc.CmdCommit,
|
||||
Commit: &pb.CommitRequest{
|
||||
@ -412,8 +412,19 @@ func (c *twoPhaseCommitter) doCommitSingleBatch(bo *Backoffer, batch batchKeys)
|
||||
},
|
||||
}
|
||||
|
||||
// If we fail to receive response for the request that commits primary key, it will be undetermined whether this
|
||||
// transaction has been successfully committed.
|
||||
// Under this circumstance, we cannot declare the commit complete (may lead to data loss), nor can we throw
|
||||
// an error (may lead to the duplicated key error when upper level restarts the transaction). Currently the best
|
||||
// solution is to propagate this error and let the upper layer drop the connection to the corresponding MySQL client.
|
||||
isPrimary := bytes.Compare(batch.keys[0], c.primary()) == 0
|
||||
|
||||
resp, err := c.store.SendReq(bo, req, batch.region, readTimeoutShort)
|
||||
if err != nil {
|
||||
if isPrimary {
|
||||
// change the Cause of the error to be returned
|
||||
return errors.Trace(errors.Wrap(err, terror.ErrResultUndetermined))
|
||||
}
|
||||
return errors.Trace(err)
|
||||
}
|
||||
regionErr, err := resp.GetRegionError()
|
||||
@ -456,21 +467,6 @@ func (c *twoPhaseCommitter) doCommitSingleBatch(bo *Backoffer, batch batchKeys)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *twoPhaseCommitter) commitSingleBatch(bo *Backoffer, batch batchKeys) error {
|
||||
// If we fail to receive response for the request that commits primary key, it will be undetermined whether this
|
||||
// transaction has been successfully committed.
|
||||
// Under this circumstance, we cannot declare the commit complete (may lead to data loss), nor can we throw
|
||||
// an error (may lead to the duplicated key error when upper level restarts the transaction). Currently the best
|
||||
// solution is to propagate this error and let the upper layer drop the connection to the corresponding MySQL client.
|
||||
isPrimary := bytes.Compare(batch.keys[0], c.primary()) == 0
|
||||
err := c.doCommitSingleBatch(bo, batch)
|
||||
if err != nil && isPrimary {
|
||||
// change the Cause of the error to be returned
|
||||
return errors.Wrap(err, terror.ErrResultUndetermined)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *twoPhaseCommitter) cleanupSingleBatch(bo *Backoffer, batch batchKeys) error {
|
||||
req := &tikvrpc.Request{
|
||||
Type: tikvrpc.CmdBatchRollback,
|
||||
|
||||
@ -21,6 +21,7 @@ import (
|
||||
|
||||
"github.com/juju/errors"
|
||||
. "github.com/pingcap/check"
|
||||
"github.com/pingcap/kvproto/pkg/errorpb"
|
||||
"github.com/pingcap/kvproto/pkg/kvrpcpb"
|
||||
"github.com/pingcap/tidb/store/tikv/mock-tikv"
|
||||
"github.com/pingcap/tidb/store/tikv/tikvrpc"
|
||||
@ -373,38 +374,86 @@ func (s *testCommitterSuite) TestPrewritePrimaryKeyFailed(c *C) {
|
||||
c.Assert(v, BytesEquals, []byte("a3"))
|
||||
}
|
||||
|
||||
// timeoutClient wraps rpcClient and returns timeout error for the specified commands.
|
||||
type timeoutClient struct {
|
||||
// interceptCommitClient wraps rpcClient and returns specified response and error for commit command.
|
||||
type interceptCommitClient struct {
|
||||
Client
|
||||
timeouts map[tikvrpc.CmdType]bool
|
||||
resp *tikvrpc.Response
|
||||
err error
|
||||
}
|
||||
|
||||
func (c *timeoutClient) SendReq(ctx goctx.Context, addr string, req *tikvrpc.Request) (*tikvrpc.Response, error) {
|
||||
if _, ok := c.timeouts[req.Type]; ok {
|
||||
return nil, errors.Errorf("timeout when send kv req %v", req.Type)
|
||||
func (c *interceptCommitClient) SendReq(ctx goctx.Context, addr string, req *tikvrpc.Request) (*tikvrpc.Response, error) {
|
||||
if req.Type == tikvrpc.CmdCommit {
|
||||
return c.resp, c.err
|
||||
}
|
||||
return c.Client.SendReq(ctx, addr, req)
|
||||
}
|
||||
|
||||
func (s *testCommitterSuite) TestCommitPrimaryError(c *C) {
|
||||
timeouts := map[tikvrpc.CmdType]bool{
|
||||
tikvrpc.CmdCommit: true,
|
||||
}
|
||||
s.store.client = &timeoutClient{
|
||||
Client: s.store.client,
|
||||
timeouts: timeouts,
|
||||
// TestCommitPrimaryRpcErrors tests that rpc errors are handled properly
|
||||
// when committing primary region task.
|
||||
func (s *testCommitterSuite) TestCommitPrimaryRpcErrors(c *C) {
|
||||
s.store.client = &interceptCommitClient{
|
||||
Client: s.store.client,
|
||||
resp: nil,
|
||||
err: errors.Errorf("timeout"),
|
||||
}
|
||||
|
||||
t, err := s.store.Begin()
|
||||
// The rpc error may or may not be wrapped to ErrResultUndetermined.
|
||||
t1 := s.begin(c)
|
||||
err := t1.Set([]byte("a"), []byte("a1"))
|
||||
c.Assert(err, IsNil)
|
||||
txn := t.(*tikvTxn)
|
||||
err = txn.Set([]byte("a"), []byte("b"))
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
err = txn.Commit()
|
||||
err = t1.Commit()
|
||||
c.Assert(err, NotNil)
|
||||
// TODO: refine errors of region cache and rpc, so that every rpc error
|
||||
// could be easily wrapped to ErrResultUndetermined, but RegionError would not.
|
||||
// c.Assert(terror.ErrorEqual(err, terror.ErrResultUndetermined), IsTrue, Commentf("%s", errors.ErrorStack(err)))
|
||||
}
|
||||
|
||||
c.Assert(terror.ErrorEqual(err, terror.ErrResultUndetermined), IsTrue)
|
||||
// TestCommitPrimaryRegionError tests RegionError is handled properly
|
||||
// when committing primary region task.
|
||||
func (s *testCommitterSuite) TestCommitPrimaryRegionError(c *C) {
|
||||
s.store.client = &interceptCommitClient{
|
||||
Client: s.store.client,
|
||||
resp: &tikvrpc.Response{
|
||||
Type: tikvrpc.CmdCommit,
|
||||
Commit: &kvrpcpb.CommitResponse{
|
||||
RegionError: &errorpb.Error{
|
||||
NotLeader: &errorpb.NotLeader{},
|
||||
},
|
||||
},
|
||||
},
|
||||
err: nil,
|
||||
}
|
||||
// Ensure it returns the original error without wrapping it in ErrResultUndetermined
|
||||
// if it exceeds max retry timeout on RegionError.
|
||||
t2 := s.begin(c)
|
||||
err := t2.Set([]byte("b"), []byte("b1"))
|
||||
c.Assert(err, IsNil)
|
||||
err = t2.Commit()
|
||||
c.Assert(err, NotNil)
|
||||
c.Assert(terror.ErrorNotEqual(err, terror.ErrResultUndetermined), IsTrue)
|
||||
}
|
||||
|
||||
// TestCommitPrimaryKeyError tests KeyError is handled properly
|
||||
// when committing primary region task.
|
||||
func (s *testCommitterSuite) TestCommitPrimaryKeyError(c *C) {
|
||||
s.store.client = &interceptCommitClient{
|
||||
Client: s.store.client,
|
||||
resp: &tikvrpc.Response{
|
||||
Type: tikvrpc.CmdCommit,
|
||||
Commit: &kvrpcpb.CommitResponse{
|
||||
Error: &kvrpcpb.KeyError{},
|
||||
},
|
||||
},
|
||||
err: nil,
|
||||
}
|
||||
// Ensure it returns the original error without wrapping it in ErrResultUndetermined
|
||||
// if it meets KeyError.
|
||||
t3 := s.begin(c)
|
||||
err := t3.Set([]byte("c"), []byte("c1"))
|
||||
c.Assert(err, IsNil)
|
||||
err = t3.Commit()
|
||||
c.Assert(err, NotNil)
|
||||
c.Assert(terror.ErrorNotEqual(err, terror.ErrResultUndetermined), IsTrue)
|
||||
}
|
||||
|
||||
type commitWithUndeterminedErrClient struct {
|
||||
|
||||
Reference in New Issue
Block a user