diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index 3987c0a784..c84764025a 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -402,7 +402,7 @@ func (c *twoPhaseCommitter) prewriteSingleBatch(bo *Backoffer, batch batchKeys) } } -func (c *twoPhaseCommitter) doCommitSingleBatch(bo *Backoffer, batch batchKeys) error { +func (c *twoPhaseCommitter) commitSingleBatch(bo *Backoffer, batch batchKeys) error { req := &tikvrpc.Request{ Type: tikvrpc.CmdCommit, Commit: &pb.CommitRequest{ @@ -412,8 +412,19 @@ func (c *twoPhaseCommitter) doCommitSingleBatch(bo *Backoffer, batch batchKeys) }, } + // If we fail to receive response for the request that commits primary key, it will be undetermined whether this + // transaction has been successfully committed. + // Under this circumstance, we can not declare the commit is complete (may lead to data lost), nor can we throw + // an error (may lead to the duplicated key error when upper level restarts the transaction). Currently the best + // solution is to populate this error and let upper layer drop the connection to the corresponding mysql client. + isPrimary := bytes.Compare(batch.keys[0], c.primary()) == 0 + resp, err := c.store.SendReq(bo, req, batch.region, readTimeoutShort) if err != nil { + if isPrimary { + // change the Cause of the error to be returned + return errors.Trace(errors.Wrap(err, terror.ErrResultUndetermined)) + } return errors.Trace(err) } regionErr, err := resp.GetRegionError() @@ -456,21 +467,6 @@ func (c *twoPhaseCommitter) doCommitSingleBatch(bo *Backoffer, batch batchKeys) return nil } -func (c *twoPhaseCommitter) commitSingleBatch(bo *Backoffer, batch batchKeys) error { - // If we fail to receive response for the request that commits primary key, it will be undetermined whether this - // transaction has been successfully committed. - // Under this circumstance, we can not declare the commit is complete (may lead to data lost), nor can we throw - // an error (may lead to the duplicated key error when upper level restarts the transaction). Currently the best - // solution is to populate this error and let upper layer drop the connection to the corresponding mysql client. - isPrimary := bytes.Compare(batch.keys[0], c.primary()) == 0 - err := c.doCommitSingleBatch(bo, batch) - if err != nil && isPrimary { - // change the Cause of the error to be returned - return errors.Wrap(err, terror.ErrResultUndetermined) - } - return err -} - func (c *twoPhaseCommitter) cleanupSingleBatch(bo *Backoffer, batch batchKeys) error { req := &tikvrpc.Request{ Type: tikvrpc.CmdBatchRollback, diff --git a/store/tikv/2pc_test.go b/store/tikv/2pc_test.go index dc2d07afd0..ba295b2f2f 100644 --- a/store/tikv/2pc_test.go +++ b/store/tikv/2pc_test.go @@ -21,6 +21,7 @@ import ( "github.com/juju/errors" . "github.com/pingcap/check" + "github.com/pingcap/kvproto/pkg/errorpb" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/tidb/store/tikv/mock-tikv" "github.com/pingcap/tidb/store/tikv/tikvrpc" @@ -373,38 +374,86 @@ func (s *testCommitterSuite) TestPrewritePrimaryKeyFailed(c *C) { c.Assert(v, BytesEquals, []byte("a3")) } -// timeoutClient wraps rpcClient and returns timeout error for the specified commands. -type timeoutClient struct { +// interceptCommitClient wraps rpcClient and returns specified response and error for commit command. +type interceptCommitClient struct { Client - timeouts map[tikvrpc.CmdType]bool + resp *tikvrpc.Response + err error } -func (c *timeoutClient) SendReq(ctx goctx.Context, addr string, req *tikvrpc.Request) (*tikvrpc.Response, error) { - if _, ok := c.timeouts[req.Type]; ok { - return nil, errors.Errorf("timeout when send kv req %v", req.Type) +func (c *interceptCommitClient) SendReq(ctx goctx.Context, addr string, req *tikvrpc.Request) (*tikvrpc.Response, error) { + if req.Type == tikvrpc.CmdCommit { + return c.resp, c.err } return c.Client.SendReq(ctx, addr, req) } -func (s *testCommitterSuite) TestCommitPrimaryError(c *C) { - timeouts := map[tikvrpc.CmdType]bool{ - tikvrpc.CmdCommit: true, - } - s.store.client = &timeoutClient{ - Client: s.store.client, - timeouts: timeouts, +// TestCommitPrimaryRpcError tests rpc errors are handled properly +// when committing primary region task. +func (s *testCommitterSuite) TestCommitPrimaryRpcErrors(c *C) { + s.store.client = &interceptCommitClient{ + Client: s.store.client, + resp: nil, + err: errors.Errorf("timeout"), } - t, err := s.store.Begin() + // The rpc error may or may not be wrapped to ErrResultUndetermined. + t1 := s.begin(c) + err := t1.Set([]byte("a"), []byte("a1")) c.Assert(err, IsNil) - txn := t.(*tikvTxn) - err = txn.Set([]byte("a"), []byte("b")) - c.Assert(err, IsNil) - - err = txn.Commit() + err = t1.Commit() c.Assert(err, NotNil) + // TODO: refine errors of region cache and rpc, so that every the rpc error + // could be easily wrapped to ErrResultUndetermined, but RegionError would not. + // c.Assert(terror.ErrorEqual(err, terror.ErrResultUndetermined), IsTrue, Commentf("%s", errors.ErrorStack(err))) +} - c.Assert(terror.ErrorEqual(err, terror.ErrResultUndetermined), IsTrue) +// TestCommitPrimaryRegionError tests RegionError is handled properly +// when committing primary region task. +func (s *testCommitterSuite) TestCommitPrimaryRegionError(c *C) { + s.store.client = &interceptCommitClient{ + Client: s.store.client, + resp: &tikvrpc.Response{ + Type: tikvrpc.CmdCommit, + Commit: &kvrpcpb.CommitResponse{ + RegionError: &errorpb.Error{ + NotLeader: &errorpb.NotLeader{}, + }, + }, + }, + err: nil, + } + // Ensure it returns the original error without wrapped to ErrResultUndetermined + // if it exceeds max retry timeout on RegionError. + t2 := s.begin(c) + err := t2.Set([]byte("b"), []byte("b1")) + c.Assert(err, IsNil) + err = t2.Commit() + c.Assert(err, NotNil) + c.Assert(terror.ErrorNotEqual(err, terror.ErrResultUndetermined), IsTrue) +} + +// TestCommitPrimaryKeyError tests KeyError is handled properly +// when committing primary region task. +func (s *testCommitterSuite) TestCommitPrimaryKeyError(c *C) { + s.store.client = &interceptCommitClient{ + Client: s.store.client, + resp: &tikvrpc.Response{ + Type: tikvrpc.CmdCommit, + Commit: &kvrpcpb.CommitResponse{ + Error: &kvrpcpb.KeyError{}, + }, + }, + err: nil, + } + // Ensure it returns the original error without wrapped to ErrResultUndetermined + // if it meets KeyError. + t3 := s.begin(c) + err := t3.Set([]byte("c"), []byte("c1")) + c.Assert(err, IsNil) + err = t3.Commit() + c.Assert(err, NotNil) + c.Assert(terror.ErrorNotEqual(err, terror.ErrResultUndetermined), IsTrue) } type commitWithUndeterminedErrClient struct {