diff --git a/executor/write_test.go b/executor/write_test.go index 64b633ff24..e98f24ae5d 100644 --- a/executor/write_test.go +++ b/executor/write_test.go @@ -17,6 +17,7 @@ import ( "context" "errors" "fmt" + "sync" . "github.com/pingcap/check" "github.com/pingcap/parser/model" @@ -2599,6 +2600,56 @@ func (s *testSuite7) TestRebaseIfNeeded(c *C) { tk.MustQuery(`select a from t where b = 6;`).Check(testkit.Rows("30003")) } +func (s *testSuite7) TestDeferConstraintCheckForDelete(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("set tidb_constraint_check_in_place = 0") + tk.MustExec("set @@tidb_txn_mode = 'optimistic'") + tk.MustExec("use test") + + tk.MustExec("drop table if exists t1, t2, t3, t4, t5") + tk.MustExec("create table t1(i int primary key, j int)") + tk.MustExec("insert into t1 values(1, 2)") + tk.MustExec("begin") + tk.MustExec("insert into t1 values(1, 3)") + tk.MustExec("delete from t1 where j = 3") + _, err := tk.Exec("commit") + c.Assert(err.Error(), Equals, "previous statement: delete from t1 where j = 3: [kv:1062]Duplicate entry '1' for key 'PRIMARY'") + tk.MustExec("rollback") + + tk.MustExec("create table t2(i int, j int, unique index idx(i))") + tk.MustExec("insert into t2 values(1, 2)") + tk.MustExec("begin") + tk.MustExec("insert into t2 values(1, 3)") + tk.MustExec("delete from t2 where j = 3") + _, err = tk.Exec("commit") + c.Assert(err.Error(), Equals, "previous statement: delete from t2 where j = 3: [kv:1062]Duplicate entry '1' for key 'idx'") + tk.MustExec("admin check table t2") + + tk.MustExec("create table t3(i int, j int, primary key(i))") + tk.MustExec("begin") + tk.MustExec("insert into t3 values(1, 3)") + tk.MustExec("delete from t3 where j = 3") + tk.MustExec("commit") + + tk.MustExec("create table t4(i int, j int, primary key(i))") + tk.MustExec("begin") + tk.MustExec("insert into t4 values(1, 3)") + tk.MustExec("delete from t4 where j = 3") + tk.MustExec("insert into t4 values(2, 3)") + tk.MustExec("commit") + tk.MustExec("admin check table t4") + tk.MustQuery("select * from t4").Check(testkit.Rows("2 3")) + + tk.MustExec("create table t5(i int, j int, primary key(i))") + tk.MustExec("begin") + tk.MustExec("insert into t5 values(1, 3)") + tk.MustExec("delete from t5 where j = 3") + tk.MustExec("insert into t5 values(1, 4)") + tk.MustExec("commit") + tk.MustExec("admin check table t5") + tk.MustQuery("select * from t5").Check(testkit.Rows("1 4")) +} + func (s *testSuite7) TestDeferConstraintCheckForInsert(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec(`use test`) @@ -2645,6 +2696,32 @@ func (s *testSuite7) TestDeferConstraintCheckForInsert(c *C) { c.Assert(err, NotNil) } +func (s *testSuite7) TestPessimisticDeleteYourWrites(c *C) { + session1 := testkit.NewTestKitWithInit(c, s.store) + session2 := testkit.NewTestKitWithInit(c, s.store) + + session1.MustExec("drop table if exists x;") + session1.MustExec("create table x (id int primary key, c int);") + + session1.MustExec("set tidb_txn_mode = 'pessimistic'") + session2.MustExec("set tidb_txn_mode = 'pessimistic'") + + session1.MustExec("begin;") + session1.MustExec("insert into x select 1, 1") + session1.MustExec("delete from x where id = 1") + session2.MustExec("begin;") + var wg sync.WaitGroup + wg.Add(1) + go func() { + session2.MustExec("insert into x select 1, 2") + wg.Done() + }() + session1.MustExec("commit;") + wg.Wait() + session2.MustExec("commit;") + session2.MustQuery("select * from x").Check(testkit.Rows("1 2")) +} + func (s *testSuite7) TestDefEnumInsert(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") diff --git a/go.mod b/go.mod index ba2b4b8b50..900c92d639 100644 --- a/go.mod +++ b/go.mod @@ -36,7 +36,7 @@ require ( github.com/pingcap/failpoint v0.0.0-20200210140405-f8f9fb234798 github.com/pingcap/fn v0.0.0-20191016082858-07623b84a47d github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 - github.com/pingcap/kvproto v0.0.0-20200221125103-35b65c96516e + github.com/pingcap/kvproto v0.0.0-20200228095611-2cf9a243b8d5 github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd github.com/pingcap/parser v0.0.0-20200301092054-bfc519c0a57f github.com/pingcap/pd v1.1.0-beta.0.20200106144140-f5a7aa985497 diff --git a/go.sum b/go.sum index 6b478b0f53..f4f772c0ed 100644 --- a/go.sum +++ b/go.sum @@ -206,8 +206,8 @@ github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= github.com/pingcap/kvproto v0.0.0-20191213111810-93cb7c623c8b/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20200221125103-35b65c96516e h1:z7j9uyuG/6I4god5h5NbsbMDSfhoOYAvVW6JxhwdHHw= -github.com/pingcap/kvproto v0.0.0-20200221125103-35b65c96516e/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= +github.com/pingcap/kvproto v0.0.0-20200228095611-2cf9a243b8d5 h1:knEvP4R5v5b2T107/Q6VzB0C8/6T7NXB/V7Vl1FtQsg= +github.com/pingcap/kvproto v0.0.0-20200228095611-2cf9a243b8d5/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd h1:CV3VsP3Z02MVtdpTMfEgRJ4T9NGgGTxdHpJerent7rM= github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= diff --git a/store/mockstore/mocktikv/mvcc_leveldb.go b/store/mockstore/mocktikv/mvcc_leveldb.go index 84a453e650..ee367b64a6 100644 --- a/store/mockstore/mocktikv/mvcc_leveldb.go +++ b/store/mockstore/mocktikv/mvcc_leveldb.go @@ -653,7 +653,8 @@ func (mvcc *MVCCLevelDB) Prewrite(req *kvrpcpb.PrewriteRequest) []error { // If the operation is Insert, check if key is exists at first. var err error // no need to check insert values for pessimistic transaction. - if m.GetOp() == kvrpcpb.Op_Insert && forUpdateTS == 0 { + op := m.GetOp() + if (op == kvrpcpb.Op_Insert || op == kvrpcpb.Op_CheckNotExists) && forUpdateTS == 0 { v, err := mvcc.getValue(m.Key, startTS, kvrpcpb.IsolationLevel_SI, req.Context.ResolvedLocks) if err != nil { errs = append(errs, err) @@ -669,6 +670,9 @@ func (mvcc *MVCCLevelDB) Prewrite(req *kvrpcpb.PrewriteRequest) []error { continue } } + if op == kvrpcpb.Op_CheckNotExists { + continue + } isPessimisticLock := len(req.IsPessimisticLock) > 0 && req.IsPessimisticLock[i] err = prewriteMutation(mvcc.db, batch, m, startTS, primary, ttl, txnSize, isPessimisticLock, minCommitTS) errs = append(errs, err) diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index c746a354d4..d18d6290f5 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -128,18 +128,19 @@ func metricsTag(action string) string { // twoPhaseCommitter executes a two-phase commit protocol. type twoPhaseCommitter struct { - store *tikvStore - txn *tikvTxn - startTS uint64 - keys [][]byte - mutations map[string]*mutationEx - lockTTL uint64 - commitTS uint64 - priority pb.CommandPri - connID uint64 // connID is used for log. - cleanWg sync.WaitGroup - detail unsafe.Pointer - txnSize int + store *tikvStore + txn *tikvTxn + startTS uint64 + keys [][]byte + mutations map[string]*mutationEx + lockTTL uint64 + commitTS uint64 + priority pb.CommandPri + connID uint64 // connID is used for log. + cleanWg sync.WaitGroup + detail unsafe.Pointer + txnSize int + noNeedCommitKeys map[string]struct{} primaryKey []byte forUpdateTS uint64 @@ -227,11 +228,12 @@ func sendTxnHeartBeat(bo *Backoffer, store *tikvStore, primary []byte, startTS, func (c *twoPhaseCommitter) initKeysAndMutations() error { var ( - keys [][]byte - size int - putCnt int - delCnt int - lockCnt int + keys [][]byte + size int + putCnt int + delCnt int + lockCnt int + noNeedCommitKey = make(map[string]struct{}) ) mutations := make(map[string]*mutationEx) txn := c.txn @@ -264,13 +266,24 @@ func (c *twoPhaseCommitter) initKeysAndMutations() error { } putCnt++ } else { + var op pb.Op + if !txn.IsPessimistic() && txn.us.GetKeyExistErrInfo(k) != nil { + // delete-your-writes keys in optimistic txn need check not exists in prewrite-phase + // due to `Op_CheckNotExists` doesn't prewrite lock, so mark those keys should not be used in commit-phase. + op = pb.Op_CheckNotExists + noNeedCommitKey[string(k)] = struct{}{} + } else { + // normal delete keys in optimistic txn can be delete without not exists checking + // delete-your-writes keys in pessimistic txn can ensure must be no exists so can directly delete them + op = pb.Op_Del + delCnt++ + } mutations[string(k)] = &mutationEx{ Mutation: pb.Mutation{ - Op: pb.Op_Del, + Op: op, Key: k, }, } - delCnt++ } if c.isPessimistic { if !bytes.Equal(k, c.primaryKey) { @@ -326,6 +339,7 @@ func (c *twoPhaseCommitter) initKeysAndMutations() error { zap.Int("puts", putCnt), zap.Int("dels", delCnt), zap.Int("locks", lockCnt), + zap.Int("checks", len(noNeedCommitKey)), zap.Uint64("txnStartTS", txn.startTS)) } @@ -342,6 +356,7 @@ func (c *twoPhaseCommitter) initKeysAndMutations() error { metrics.TiKVTxnWriteKVCountHistogram.Observe(float64(commitDetail.WriteKeys)) metrics.TiKVTxnWriteSizeHistogram.Observe(float64(commitDetail.WriteSize)) c.keys = keys + c.noNeedCommitKeys = noNeedCommitKey c.mutations = mutations c.lockTTL = txnLockTTL(txn.startTime, size) c.priority = getTxnPriority(txn) @@ -1128,6 +1143,9 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) { return errors.Trace(err) } + // strip check_not_exists keys that no need to commit. + c.stripNoNeedCommitKeys() + start = time.Now() logutil.Event(ctx, "start get commit ts") commitTS, err := c.store.getTimestampWithRetry(NewBackoffer(ctx, tsoMaxBackoff).WithVars(c.txn.vars)) @@ -1190,6 +1208,21 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) { return nil } +func (c *twoPhaseCommitter) stripNoNeedCommitKeys() { + if len(c.noNeedCommitKeys) == 0 { + return + } + var i int + for _, k := range c.keys { + if _, ck := c.noNeedCommitKeys[string(k)]; ck { + continue + } + c.keys[i] = k + i++ + } + c.keys = c.keys[:i] +} + type schemaLeaseChecker interface { Check(txnTS uint64) error }