txn: support 2pc async commit protocol (#18622)

This commit is contained in:
cfzjywxk
2020-07-22 18:34:58 +08:00
committed by GitHub
parent 1d6b4fa5bd
commit e66f8eb93a
4 changed files with 174 additions and 44 deletions

View File

@ -100,6 +100,9 @@ type twoPhaseCommitter struct {
acAfterCommitPrimary chan struct{}
bkAfterCommitPrimary chan struct{}
}
useAsyncCommit uint32
minCommitTS uint64
}
type committerMutations struct {
@ -666,44 +669,84 @@ func sendTxnHeartBeat(bo *Backoffer, store *tikvStore, primary []byte, startTS,
}
}
// checkAsyncCommit checks if async commit protocol is available for current transaction commit, true is returned if possible.
func (c *twoPhaseCommitter) checkAsyncCommit() bool {
// TODO the keys limit need more tests, this value makes the unit test pass by now.
const asyncCommitKeysLimit = 256
// Async commit is not compatible with Binlog because of the non unique timestamp issue.
if c.connID > 0 && config.GetGlobalConfig().TiKVClient.EnableAsyncCommit &&
len(c.mutations.keys) <= asyncCommitKeysLimit && !c.shouldWriteBinlog() {
return true
}
return false
}
func (c *twoPhaseCommitter) isAsyncCommit() bool {
return atomic.LoadUint32(&c.useAsyncCommit) > 0
}
func (c *twoPhaseCommitter) setAsyncCommit(val bool) {
if val {
atomic.StoreUint32(&c.useAsyncCommit, 1)
} else {
atomic.StoreUint32(&c.useAsyncCommit, 0)
}
}
func (c *twoPhaseCommitter) cleanup(ctx context.Context) {
c.cleanWg.Add(1)
go func() {
cleanupKeysCtx := context.WithValue(context.Background(), txnStartKey, ctx.Value(txnStartKey))
err := c.cleanupMutations(NewBackofferWithVars(cleanupKeysCtx, cleanupMaxBackoff, c.txn.vars), c.mutations)
if err != nil {
tikvSecondaryLockCleanupFailureCounterRollback.Inc()
logutil.Logger(ctx).Info("2PC cleanup failed",
zap.Error(err),
zap.Uint64("txnStartTS", c.startTS))
} else {
logutil.Logger(ctx).Info("2PC clean up done",
zap.Uint64("txnStartTS", c.startTS))
}
c.cleanWg.Done()
}()
}
// execute executes the two-phase commit protocol.
func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) {
var binlogSkipped bool
defer func() {
// Always clean up all written keys if the txn does not commit.
c.mu.RLock()
committed := c.mu.committed
undetermined := c.mu.undeterminedErr != nil
c.mu.RUnlock()
if !committed && !undetermined {
c.cleanWg.Add(1)
go func() {
cleanupKeysCtx := context.WithValue(context.Background(), txnStartKey, ctx.Value(txnStartKey))
err := c.cleanupMutations(NewBackofferWithVars(cleanupKeysCtx, cleanupMaxBackoff, c.txn.vars), c.mutations)
if err != nil {
tikvSecondaryLockCleanupFailureCounterRollback.Inc()
logutil.Logger(ctx).Info("2PC cleanup failed",
zap.Error(err),
zap.Uint64("txnStartTS", c.startTS))
} else {
logutil.Logger(ctx).Info("2PC clean up done",
zap.Uint64("txnStartTS", c.startTS))
}
c.cleanWg.Done()
}()
}
c.txn.commitTS = c.commitTS
if binlogSkipped {
binloginfo.RemoveOneSkippedCommitter()
} else {
if err != nil {
c.writeFinishBinlog(ctx, binlog.BinlogType_Rollback, 0)
if !c.isAsyncCommit() {
// Always clean up all written keys if the txn does not commit.
c.mu.RLock()
committed := c.mu.committed
undetermined := c.mu.undeterminedErr != nil
c.mu.RUnlock()
if !committed && !undetermined {
c.cleanup(ctx)
}
c.txn.commitTS = c.commitTS
if binlogSkipped {
binloginfo.RemoveOneSkippedCommitter()
} else {
c.writeFinishBinlog(ctx, binlog.BinlogType_Commit, int64(c.commitTS))
if err != nil {
c.writeFinishBinlog(ctx, binlog.BinlogType_Rollback, 0)
} else {
c.writeFinishBinlog(ctx, binlog.BinlogType_Commit, int64(c.commitTS))
}
}
} else {
// The error means the async commit should not succeed.
if err != nil {
c.cleanup(ctx)
}
}
}()
// Check async commit is available or not.
if c.checkAsyncCommit() {
c.setAsyncCommit(true)
}
binlogChan := c.prewriteBinlog(ctx)
prewriteBo := NewBackofferWithVars(ctx, PrewriteMaxBackoff, c.txn.vars)
start := time.Now()
@ -738,18 +781,27 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) {
// strip check_not_exists keys that no need to commit.
c.stripNoNeedCommitKeys()
start = time.Now()
logutil.Event(ctx, "start get commit ts")
commitTS, err := c.store.getTimestampWithRetry(NewBackofferWithVars(ctx, tsoMaxBackoff, c.txn.vars))
if err != nil {
logutil.Logger(ctx).Warn("2PC get commitTS failed",
zap.Error(err),
zap.Uint64("txnStartTS", c.startTS))
return errors.Trace(err)
var commitTS uint64
if c.isAsyncCommit() {
if c.minCommitTS == 0 {
err = errors.Errorf("conn %d invalid minCommitTS for async commit protocol after prewrite, startTS=%v", c.connID, c.startTS)
return errors.Trace(err)
}
commitTS = c.minCommitTS
} else {
start = time.Now()
logutil.Event(ctx, "start get commit ts")
commitTS, err = c.store.getTimestampWithRetry(NewBackofferWithVars(ctx, tsoMaxBackoff, c.txn.vars))
if err != nil {
logutil.Logger(ctx).Warn("2PC get commitTS failed",
zap.Error(err),
zap.Uint64("txnStartTS", c.startTS))
return errors.Trace(err)
}
commitDetail.GetCommitTsTime = time.Since(start)
logutil.Event(ctx, "finish get commit ts")
logutil.SetTag(ctx, "commitTs", commitTS)
}
commitDetail.GetCommitTsTime = time.Since(start)
logutil.Event(ctx, "finish get commit ts")
logutil.SetTag(ctx, "commitTs", commitTS)
// check commitTS
if commitTS <= c.startTS {
@ -773,9 +825,32 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) {
failpoint.Inject("beforeCommit", func() {})
}
start = time.Now()
if c.isAsyncCommit() {
// For async commit protocol, the commit is considered success here.
c.txn.commitTS = c.commitTS
logutil.Logger(ctx).Info("2PC will use async commit protocol to commit this txn", zap.Uint64("startTS", c.startTS),
zap.Uint64("commitTS", c.commitTS))
go func() {
failpoint.Inject("asyncCommitDoNothing", func() {
failpoint.Return()
})
defer c.ttlManager.close()
commitBo := NewBackofferWithVars(ctx, CommitMaxBackoff, c.txn.vars)
err := c.commitMutations(commitBo, c.mutations)
if err != nil {
logutil.Logger(ctx).Warn("2PC async commit failed", zap.Uint64("connID", c.connID),
zap.Uint64("startTS", c.startTS), zap.Uint64("commitTS", c.commitTS), zap.Error(err))
}
}()
return nil
}
return c.commitTxn(ctx, commitDetail)
}
func (c *twoPhaseCommitter) commitTxn(ctx context.Context, commitDetail *execdetails.CommitDetails) error {
start := time.Now()
commitBo := NewBackofferWithVars(ctx, CommitMaxBackoff, c.txn.vars)
err = c.commitMutations(commitBo, c.mutations)
err := c.commitMutations(commitBo, c.mutations)
commitDetail.CommitTime = time.Since(start)
if commitBo.totalSleep > 0 {
atomic.AddInt64(&commitDetail.CommitBackoffTime, int64(commitBo.totalSleep)*int64(time.Millisecond))

View File

@ -1167,11 +1167,45 @@ func (s *testCommitterSuite) TestPrewiteSecondaryKeys(c *C) {
mock := mockClient{inner: s.store.client}
s.store.client = &mock
ctx := context.Background()
// TODO remove this when minCommitTS is returned from mockStore prewrite response.
committer.minCommitTS = committer.startTS + 10
err = committer.execute(ctx)
c.Assert(err, IsNil)
c.Assert(mock.seenPrimaryReq > 0 && mock.seenSecondaryReq > 0, IsTrue)
}
func (s *testCommitterSuite) TestAsyncCommit(c *C) {
defer config.RestoreFunc()()
config.UpdateGlobal(func(conf *config.Config) {
conf.TiKVClient.EnableAsyncCommit = true
})
ctx := context.Background()
pk := kv.Key("tpk")
pkVal := []byte("pkVal")
k1 := kv.Key("tk1")
k1Val := []byte("k1Val")
txn1 := s.begin(c)
err := txn1.Set(pk, pkVal)
c.Assert(err, IsNil)
err = txn1.Set(k1, k1Val)
c.Assert(err, IsNil)
committer, err := newTwoPhaseCommitterWithInit(txn1, 0)
c.Assert(err, IsNil)
committer.connID = 1
committer.minCommitTS = txn1.startTS + 10
err = committer.execute(ctx)
c.Assert(err, IsNil)
// TODO remove sleep when recovery logic is done
time.Sleep(1 * time.Second)
s.checkValues(c, map[string]string{
string(pk): string(pkVal),
string(k1): string(k1Val),
})
}
type mockClient struct {
inner Client
seenPrimaryReq uint32

View File

@ -79,7 +79,7 @@ func (c *twoPhaseCommitter) buildPrewriteRequest(batch batchMutations, txnSize u
MinCommitTs: minCommitTS,
}
if config.GetGlobalConfig().TiKVClient.EnableAsyncCommit {
if c.isAsyncCommit() {
if batch.isPrimary {
req.Secondaries = c.asyncSecondaries()
}
@ -128,6 +128,22 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *Backoff
c.run(c, nil)
}
}
if c.isAsyncCommit() {
// 0 if the min_commit_ts is not ready or any other reason that async
// commit cannot proceed. The client can then fallback to normal way to
// continue committing the transaction if prewrite are all finished.
if prewriteResp.MinCommitTs == 0 {
logutil.Logger(bo.ctx).Warn("async commit cannot proceed since the returned minCommitTS is zero, "+
"fallback to normal path", zap.Uint64("startTS", c.startTS))
c.setAsyncCommit(false)
} else {
c.mu.Lock()
if prewriteResp.MinCommitTs > c.minCommitTS {
c.minCommitTS = prewriteResp.MinCommitTs
}
c.mu.Unlock()
}
}
return nil
}
var locks []*Lock

View File

@ -234,7 +234,12 @@ func (txn *tikvTxn) Commit(ctx context.Context) error {
return errors.Trace(err)
}
}
defer committer.ttlManager.close()
defer func() {
// For async commit transactions, the ttl manager will be closed in the asynchronous commit goroutine.
if !committer.isAsyncCommit() {
committer.ttlManager.close()
}
}()
if err := committer.initKeysAndMutations(); err != nil {
return errors.Trace(err)
}