2pc: cleanup primary key first (#3420)
This commit is contained in:
@ -69,8 +69,9 @@ type twoPhaseCommitter struct {
|
||||
commitTS uint64
|
||||
mu struct {
|
||||
sync.RWMutex
|
||||
writtenKeys [][]byte
|
||||
committed bool
|
||||
writtenKeys [][]byte
|
||||
committed bool
|
||||
undetermined bool
|
||||
}
|
||||
}
|
||||
|
||||
@ -206,8 +207,8 @@ func (c *twoPhaseCommitter) doActionOnKeys(bo *Backoffer, action twoPhaseCommitA
|
||||
}
|
||||
|
||||
firstIsPrimary := bytes.Equal(keys[0], c.primary())
|
||||
if firstIsPrimary && action == actionCommit {
|
||||
// primary should be committed first.
|
||||
if firstIsPrimary && (action == actionCommit || action == actionCleanup) {
|
||||
// primary should be committed/cleanup first.
|
||||
err = c.doActionOnBatches(bo, action, batches[:1])
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
@ -368,7 +369,15 @@ func (c *twoPhaseCommitter) prewriteSingleBatch(bo *Backoffer, batch batchKeys)
|
||||
// We need to cleanup all written keys if transaction aborts.
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
c.mu.writtenKeys = append(c.mu.writtenKeys, batch.keys...)
|
||||
// Primary key should always been in the front since in `cleanup` we
|
||||
// would check whether the `writtenKeys`'s first key is primary key.
|
||||
if bytes.Equal(batch.keys[0], c.primary()) {
|
||||
tmpKeys := make([][]byte, 0, len(batch.keys)+len(c.mu.writtenKeys))
|
||||
tmpKeys = append(tmpKeys, batch.keys...)
|
||||
c.mu.writtenKeys = append(tmpKeys, c.mu.writtenKeys...)
|
||||
} else {
|
||||
c.mu.writtenKeys = append(c.mu.writtenKeys, batch.keys...)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
var locks []*Lock
|
||||
@ -518,8 +527,9 @@ func (c *twoPhaseCommitter) execute() error {
|
||||
c.mu.RLock()
|
||||
writtenKeys := c.mu.writtenKeys
|
||||
committed := c.mu.committed
|
||||
undetermined := c.mu.undetermined
|
||||
c.mu.RUnlock()
|
||||
if !committed {
|
||||
if !committed && !undetermined {
|
||||
go func() {
|
||||
reserveStack(false)
|
||||
err := c.cleanupKeys(NewBackoffer(cleanupMaxBackoff, goctx.Background()), writtenKeys)
|
||||
@ -572,6 +582,9 @@ func (c *twoPhaseCommitter) execute() error {
|
||||
|
||||
err = c.commitKeys(NewBackoffer(commitMaxBackoff, ctx), c.keys)
|
||||
if err != nil {
|
||||
if errors.Cause(err) == terror.ErrResultUndetermined {
|
||||
c.mu.undetermined = true
|
||||
}
|
||||
if !c.mu.committed {
|
||||
log.Debugf("2PC failed on commit: %v, tid: %d", err, c.startTS)
|
||||
return errors.Trace(err)
|
||||
|
||||
@ -406,3 +406,38 @@ func (s *testCommitterSuite) TestCommitPrimaryError(c *C) {
|
||||
|
||||
c.Assert(terror.ErrorEqual(err, terror.ErrResultUndetermined), IsTrue)
|
||||
}
|
||||
|
||||
type commitWithUndeterminedErrClient struct {
|
||||
Client
|
||||
}
|
||||
|
||||
func (c *commitWithUndeterminedErrClient) SendReq(ctx goctx.Context, addr string, req *tikvrpc.Request) (*tikvrpc.Response, error) {
|
||||
resp, err := c.Client.SendReq(ctx, addr, req)
|
||||
if err != nil || req.Type != tikvrpc.CmdCommit {
|
||||
return resp, err
|
||||
}
|
||||
return nil, terror.ErrResultUndetermined
|
||||
}
|
||||
|
||||
func (s *testCommitterSuite) TestCommitTimeout(c *C) {
|
||||
s.store.client = &commitWithUndeterminedErrClient{
|
||||
Client: s.store.client,
|
||||
}
|
||||
txn := s.begin(c)
|
||||
err := txn.Set([]byte("a"), []byte("a1"))
|
||||
c.Assert(err, IsNil)
|
||||
err = txn.Set([]byte("b"), []byte("b1"))
|
||||
c.Assert(err, IsNil)
|
||||
err = txn.Set([]byte("c"), []byte("c1"))
|
||||
c.Assert(err, IsNil)
|
||||
err = txn.Commit()
|
||||
c.Assert(err, NotNil)
|
||||
|
||||
txn2 := s.begin(c)
|
||||
value, err := txn2.Get([]byte("a"))
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(len(value), Greater, 0)
|
||||
_, err = txn2.Get([]byte("b"))
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(len(value), Greater, 0)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user