From 124b937cd4cb169aa8bcf5e6958ce48b2f8cc08a Mon Sep 17 00:00:00 2001 From: Shirly Date: Thu, 8 Jun 2017 16:02:01 +0800 Subject: [PATCH] 2pc: cleanup primary key first (#3420) --- store/tikv/2pc.go | 25 +++++++++++++++++++------ store/tikv/2pc_test.go | 35 +++++++++++++++++++++++++++++++++++ 2 files changed, 54 insertions(+), 6 deletions(-) diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index f3a84d5ee3..55480ec526 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -69,8 +69,9 @@ type twoPhaseCommitter struct { commitTS uint64 mu struct { sync.RWMutex - writtenKeys [][]byte - committed bool + writtenKeys [][]byte + committed bool + undetermined bool } } @@ -206,8 +207,8 @@ func (c *twoPhaseCommitter) doActionOnKeys(bo *Backoffer, action twoPhaseCommitA } firstIsPrimary := bytes.Equal(keys[0], c.primary()) - if firstIsPrimary && action == actionCommit { - // primary should be committed first. + if firstIsPrimary && (action == actionCommit || action == actionCleanup) { + // primary should be committed/cleanup first. err = c.doActionOnBatches(bo, action, batches[:1]) if err != nil { return errors.Trace(err) @@ -368,7 +369,15 @@ func (c *twoPhaseCommitter) prewriteSingleBatch(bo *Backoffer, batch batchKeys) // We need to cleanup all written keys if transaction aborts. c.mu.Lock() defer c.mu.Unlock() - c.mu.writtenKeys = append(c.mu.writtenKeys, batch.keys...) + // Primary key should always been in the front since in `cleanup` we + // would check whether the `writtenKeys`'s first key is primary key. + if bytes.Equal(batch.keys[0], c.primary()) { + tmpKeys := make([][]byte, 0, len(batch.keys)+len(c.mu.writtenKeys)) + tmpKeys = append(tmpKeys, batch.keys...) + c.mu.writtenKeys = append(tmpKeys, c.mu.writtenKeys...) + } else { + c.mu.writtenKeys = append(c.mu.writtenKeys, batch.keys...) + } return nil } var locks []*Lock @@ -518,8 +527,9 @@ func (c *twoPhaseCommitter) execute() error { c.mu.RLock() writtenKeys := c.mu.writtenKeys committed := c.mu.committed + undetermined := c.mu.undetermined c.mu.RUnlock() - if !committed { + if !committed && !undetermined { go func() { reserveStack(false) err := c.cleanupKeys(NewBackoffer(cleanupMaxBackoff, goctx.Background()), writtenKeys) @@ -572,6 +582,9 @@ func (c *twoPhaseCommitter) execute() error { err = c.commitKeys(NewBackoffer(commitMaxBackoff, ctx), c.keys) if err != nil { + if errors.Cause(err) == terror.ErrResultUndetermined { + c.mu.undetermined = true + } if !c.mu.committed { log.Debugf("2PC failed on commit: %v, tid: %d", err, c.startTS) return errors.Trace(err) diff --git a/store/tikv/2pc_test.go b/store/tikv/2pc_test.go index 2da8325168..dc2d07afd0 100644 --- a/store/tikv/2pc_test.go +++ b/store/tikv/2pc_test.go @@ -406,3 +406,38 @@ func (s *testCommitterSuite) TestCommitPrimaryError(c *C) { c.Assert(terror.ErrorEqual(err, terror.ErrResultUndetermined), IsTrue) } + +type commitWithUndeterminedErrClient struct { + Client +} + +func (c *commitWithUndeterminedErrClient) SendReq(ctx goctx.Context, addr string, req *tikvrpc.Request) (*tikvrpc.Response, error) { + resp, err := c.Client.SendReq(ctx, addr, req) + if err != nil || req.Type != tikvrpc.CmdCommit { + return resp, err + } + return nil, terror.ErrResultUndetermined +} + +func (s *testCommitterSuite) TestCommitTimeout(c *C) { + s.store.client = &commitWithUndeterminedErrClient{ + Client: s.store.client, + } + txn := s.begin(c) + err := txn.Set([]byte("a"), []byte("a1")) + c.Assert(err, IsNil) + err = txn.Set([]byte("b"), []byte("b1")) + c.Assert(err, IsNil) + err = txn.Set([]byte("c"), []byte("c1")) + c.Assert(err, IsNil) + err = txn.Commit() + c.Assert(err, NotNil) + + txn2 := s.begin(c) + value, err := txn2.Get([]byte("a")) + c.Assert(err, IsNil) + c.Assert(len(value), Greater, 0) + _, err = txn2.Get([]byte("b")) + c.Assert(err, IsNil) + c.Assert(len(value), Greater, 0) +}