diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index 49378b37b1..dfbbff3f24 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -100,6 +100,9 @@ type twoPhaseCommitter struct { acAfterCommitPrimary chan struct{} bkAfterCommitPrimary chan struct{} } + + useAsyncCommit uint32 + minCommitTS uint64 } type committerMutations struct { @@ -666,44 +669,84 @@ func sendTxnHeartBeat(bo *Backoffer, store *tikvStore, primary []byte, startTS, } } +// checkAsyncCommit checks if async commit protocol is available for current transaction commit, true is returned if possible. +func (c *twoPhaseCommitter) checkAsyncCommit() bool { + // TODO the keys limit need more tests, this value makes the unit test pass by now. + const asyncCommitKeysLimit = 256 + // Async commit is not compatible with Binlog because of the non unique timestamp issue. + if c.connID > 0 && config.GetGlobalConfig().TiKVClient.EnableAsyncCommit && + len(c.mutations.keys) <= asyncCommitKeysLimit && !c.shouldWriteBinlog() { + return true + } + return false +} + +func (c *twoPhaseCommitter) isAsyncCommit() bool { + return atomic.LoadUint32(&c.useAsyncCommit) > 0 +} + +func (c *twoPhaseCommitter) setAsyncCommit(val bool) { + if val { + atomic.StoreUint32(&c.useAsyncCommit, 1) + } else { + atomic.StoreUint32(&c.useAsyncCommit, 0) + } +} + +func (c *twoPhaseCommitter) cleanup(ctx context.Context) { + c.cleanWg.Add(1) + go func() { + cleanupKeysCtx := context.WithValue(context.Background(), txnStartKey, ctx.Value(txnStartKey)) + err := c.cleanupMutations(NewBackofferWithVars(cleanupKeysCtx, cleanupMaxBackoff, c.txn.vars), c.mutations) + if err != nil { + tikvSecondaryLockCleanupFailureCounterRollback.Inc() + logutil.Logger(ctx).Info("2PC cleanup failed", + zap.Error(err), + zap.Uint64("txnStartTS", c.startTS)) + } else { + logutil.Logger(ctx).Info("2PC clean up done", + zap.Uint64("txnStartTS", c.startTS)) + } + c.cleanWg.Done() + }() +} + // execute executes the two-phase commit protocol. func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) { var binlogSkipped bool defer func() { - // Always clean up all written keys if the txn does not commit. - c.mu.RLock() - committed := c.mu.committed - undetermined := c.mu.undeterminedErr != nil - c.mu.RUnlock() - if !committed && !undetermined { - c.cleanWg.Add(1) - go func() { - cleanupKeysCtx := context.WithValue(context.Background(), txnStartKey, ctx.Value(txnStartKey)) - err := c.cleanupMutations(NewBackofferWithVars(cleanupKeysCtx, cleanupMaxBackoff, c.txn.vars), c.mutations) - if err != nil { - tikvSecondaryLockCleanupFailureCounterRollback.Inc() - logutil.Logger(ctx).Info("2PC cleanup failed", - zap.Error(err), - zap.Uint64("txnStartTS", c.startTS)) - } else { - logutil.Logger(ctx).Info("2PC clean up done", - zap.Uint64("txnStartTS", c.startTS)) - } - c.cleanWg.Done() - }() - } - c.txn.commitTS = c.commitTS - if binlogSkipped { - binloginfo.RemoveOneSkippedCommitter() - } else { - if err != nil { - c.writeFinishBinlog(ctx, binlog.BinlogType_Rollback, 0) + if !c.isAsyncCommit() { + // Always clean up all written keys if the txn does not commit. + c.mu.RLock() + committed := c.mu.committed + undetermined := c.mu.undeterminedErr != nil + c.mu.RUnlock() + if !committed && !undetermined { + c.cleanup(ctx) + } + c.txn.commitTS = c.commitTS + if binlogSkipped { + binloginfo.RemoveOneSkippedCommitter() } else { - c.writeFinishBinlog(ctx, binlog.BinlogType_Commit, int64(c.commitTS)) + if err != nil { + c.writeFinishBinlog(ctx, binlog.BinlogType_Rollback, 0) + } else { + c.writeFinishBinlog(ctx, binlog.BinlogType_Commit, int64(c.commitTS)) + } + } + } else { + // The error means the async commit should not succeed. + if err != nil { + c.cleanup(ctx) } } }() + // Check async commit is available or not. + if c.checkAsyncCommit() { + c.setAsyncCommit(true) + } + binlogChan := c.prewriteBinlog(ctx) prewriteBo := NewBackofferWithVars(ctx, PrewriteMaxBackoff, c.txn.vars) start := time.Now() @@ -738,18 +781,27 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) { // strip check_not_exists keys that no need to commit. c.stripNoNeedCommitKeys() - start = time.Now() - logutil.Event(ctx, "start get commit ts") - commitTS, err := c.store.getTimestampWithRetry(NewBackofferWithVars(ctx, tsoMaxBackoff, c.txn.vars)) - if err != nil { - logutil.Logger(ctx).Warn("2PC get commitTS failed", - zap.Error(err), - zap.Uint64("txnStartTS", c.startTS)) - return errors.Trace(err) + var commitTS uint64 + if c.isAsyncCommit() { + if c.minCommitTS == 0 { + err = errors.Errorf("conn %d invalid minCommitTS for async commit protocol after prewrite, startTS=%v", c.connID, c.startTS) + return errors.Trace(err) + } + commitTS = c.minCommitTS + } else { + start = time.Now() + logutil.Event(ctx, "start get commit ts") + commitTS, err = c.store.getTimestampWithRetry(NewBackofferWithVars(ctx, tsoMaxBackoff, c.txn.vars)) + if err != nil { + logutil.Logger(ctx).Warn("2PC get commitTS failed", + zap.Error(err), + zap.Uint64("txnStartTS", c.startTS)) + return errors.Trace(err) + } + commitDetail.GetCommitTsTime = time.Since(start) + logutil.Event(ctx, "finish get commit ts") + logutil.SetTag(ctx, "commitTs", commitTS) } - commitDetail.GetCommitTsTime = time.Since(start) - logutil.Event(ctx, "finish get commit ts") - logutil.SetTag(ctx, "commitTs", commitTS) // check commitTS if commitTS <= c.startTS { @@ -773,9 +825,32 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) { failpoint.Inject("beforeCommit", func() {}) } - start = time.Now() + if c.isAsyncCommit() { + // For async commit protocol, the commit is considered success here. + c.txn.commitTS = c.commitTS + logutil.Logger(ctx).Info("2PC will use async commit protocol to commit this txn", zap.Uint64("startTS", c.startTS), + zap.Uint64("commitTS", c.commitTS)) + go func() { + failpoint.Inject("asyncCommitDoNothing", func() { + failpoint.Return() + }) + defer c.ttlManager.close() + commitBo := NewBackofferWithVars(ctx, CommitMaxBackoff, c.txn.vars) + err := c.commitMutations(commitBo, c.mutations) + if err != nil { + logutil.Logger(ctx).Warn("2PC async commit failed", zap.Uint64("connID", c.connID), + zap.Uint64("startTS", c.startTS), zap.Uint64("commitTS", c.commitTS), zap.Error(err)) + } + }() + return nil + } + return c.commitTxn(ctx, commitDetail) +} + +func (c *twoPhaseCommitter) commitTxn(ctx context.Context, commitDetail *execdetails.CommitDetails) error { + start := time.Now() commitBo := NewBackofferWithVars(ctx, CommitMaxBackoff, c.txn.vars) - err = c.commitMutations(commitBo, c.mutations) + err := c.commitMutations(commitBo, c.mutations) commitDetail.CommitTime = time.Since(start) if commitBo.totalSleep > 0 { atomic.AddInt64(&commitDetail.CommitBackoffTime, int64(commitBo.totalSleep)*int64(time.Millisecond)) diff --git a/store/tikv/2pc_test.go b/store/tikv/2pc_test.go index 2899331e64..486981c4dc 100644 --- a/store/tikv/2pc_test.go +++ b/store/tikv/2pc_test.go @@ -1167,11 +1167,45 @@ func (s *testCommitterSuite) TestPrewiteSecondaryKeys(c *C) { mock := mockClient{inner: s.store.client} s.store.client = &mock ctx := context.Background() + // TODO remove this when minCommitTS is returned from mockStore prewrite response. + committer.minCommitTS = committer.startTS + 10 err = committer.execute(ctx) c.Assert(err, IsNil) c.Assert(mock.seenPrimaryReq > 0 && mock.seenSecondaryReq > 0, IsTrue) } +func (s *testCommitterSuite) TestAsyncCommit(c *C) { + defer config.RestoreFunc()() + config.UpdateGlobal(func(conf *config.Config) { + conf.TiKVClient.EnableAsyncCommit = true + }) + + ctx := context.Background() + pk := kv.Key("tpk") + pkVal := []byte("pkVal") + k1 := kv.Key("tk1") + k1Val := []byte("k1Val") + txn1 := s.begin(c) + err := txn1.Set(pk, pkVal) + c.Assert(err, IsNil) + err = txn1.Set(k1, k1Val) + c.Assert(err, IsNil) + + committer, err := newTwoPhaseCommitterWithInit(txn1, 0) + c.Assert(err, IsNil) + committer.connID = 1 + committer.minCommitTS = txn1.startTS + 10 + err = committer.execute(ctx) + c.Assert(err, IsNil) + + // TODO remove sleep when recovery logic is done + time.Sleep(1 * time.Second) + s.checkValues(c, map[string]string{ + string(pk): string(pkVal), + string(k1): string(k1Val), + }) +} + type mockClient struct { inner Client seenPrimaryReq uint32 diff --git a/store/tikv/prewrite.go b/store/tikv/prewrite.go index 4c986e6cda..1d49e579fa 100644 --- a/store/tikv/prewrite.go +++ b/store/tikv/prewrite.go @@ -79,7 +79,7 @@ func (c *twoPhaseCommitter) buildPrewriteRequest(batch batchMutations, txnSize u MinCommitTs: minCommitTS, } - if config.GetGlobalConfig().TiKVClient.EnableAsyncCommit { + if c.isAsyncCommit() { if batch.isPrimary { req.Secondaries = c.asyncSecondaries() } @@ -128,6 +128,22 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *Backoff c.run(c, nil) } } + if c.isAsyncCommit() { + // 0 if the min_commit_ts is not ready or any other reason that async + // commit cannot proceed. The client can then fallback to normal way to + // continue committing the transaction if prewrite are all finished. + if prewriteResp.MinCommitTs == 0 { + logutil.Logger(bo.ctx).Warn("async commit cannot proceed since the returned minCommitTS is zero, "+ + "fallback to normal path", zap.Uint64("startTS", c.startTS)) + c.setAsyncCommit(false) + } else { + c.mu.Lock() + if prewriteResp.MinCommitTs > c.minCommitTS { + c.minCommitTS = prewriteResp.MinCommitTs + } + c.mu.Unlock() + } + } return nil } var locks []*Lock diff --git a/store/tikv/txn.go b/store/tikv/txn.go index 128982a2f2..43df71d985 100644 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -234,7 +234,12 @@ func (txn *tikvTxn) Commit(ctx context.Context) error { return errors.Trace(err) } } - defer committer.ttlManager.close() + defer func() { + // For async commit transactions, the ttl manager will be closed in the asynchronous commit goroutine. + if !committer.isAsyncCommit() { + committer.ttlManager.close() + } + }() if err := committer.initKeysAndMutations(); err != nil { return errors.Trace(err) }