diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index 54cb4c7099..a5a8ea25a2 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -60,14 +60,15 @@ func (ca twoPhaseCommitAction) MetricsTag() string { // twoPhaseCommitter executes a two-phase commit protocol. type twoPhaseCommitter struct { - store *tikvStore - txn *tikvTxn - startTS uint64 - keys [][]byte - mutations map[string]*pb.Mutation - lockTTL uint64 - commitTS uint64 - mu struct { + store *tikvStore + txn *tikvTxn + startTS uint64 + keys [][]byte + mutations map[string]*pb.Mutation + lockTTL uint64 + commitTS uint64 + skipCheckForWrite bool + mu struct { sync.RWMutex writtenKeys [][]byte committed bool @@ -141,14 +142,17 @@ func newTwoPhaseCommitter(txn *tikvTxn) (*twoPhaseCommitter, error) { txnWriteKVCountHistogram.Observe(float64(len(keys))) txnWriteSizeHistogram.Observe(float64(size / 1024)) + optSkipCheck := txn.us.GetOption(kv.SkipCheckForWrite) + skip, ok := optSkipCheck.(bool) return &twoPhaseCommitter{ - store: txn.store, - txn: txn, - startTS: txn.StartTS(), - keys: keys, - mutations: mutations, - lockTTL: txnLockTTL(txn.startTime, size), - priority: getTxnPriority(txn), + store: txn.store, + txn: txn, + startTS: txn.StartTS(), + keys: keys, + mutations: mutations, + skipCheckForWrite: ok && skip, + lockTTL: txnLockTTL(txn.startTime, size), + priority: getTxnPriority(txn), }, nil } @@ -209,8 +213,9 @@ func (c *twoPhaseCommitter) doActionOnKeys(bo *Backoffer, action twoPhaseCommitA } firstIsPrimary := bytes.Equal(keys[0], c.primary()) - if firstIsPrimary && (action == actionCommit || action == actionCleanup) { - // primary should be committed/cleanup first. + if firstIsPrimary && (c.skipCheckForWrite || action == actionCommit || action == actionCleanup) { + // primary should be committed/cleanup first + // primary should be prewrite first when skip_constraint_check is true err = c.doActionOnBatches(bo, action, batches[:1]) if err != nil { return errors.Trace(err) @@ -329,11 +334,6 @@ func (c *twoPhaseCommitter) prewriteSingleBatch(bo *Backoffer, batch batchKeys) mutations[i] = c.mutations[string(k)] } - skipCheck := false - optSkipCheck := c.txn.us.GetOption(kv.SkipCheckForWrite) - if skip, ok := optSkipCheck.(bool); ok && skip { - skipCheck = true - } req := &tikvrpc.Request{ Type: tikvrpc.CmdPrewrite, Priority: c.priority, @@ -342,7 +342,7 @@ func (c *twoPhaseCommitter) prewriteSingleBatch(bo *Backoffer, batch batchKeys) PrimaryLock: c.primary(), StartVersion: c.startTS, LockTtl: c.lockTTL, - SkipConstraintCheck: skipCheck, + SkipConstraintCheck: c.skipCheckForWrite, }, } for {