store: check constraint for "Delete-Your-Writes" records when txn commit (#14968)
This commit is contained in:
@ -17,6 +17,7 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
. "github.com/pingcap/check"
|
||||
"github.com/pingcap/parser/model"
|
||||
@ -2599,6 +2600,56 @@ func (s *testSuite7) TestRebaseIfNeeded(c *C) {
|
||||
tk.MustQuery(`select a from t where b = 6;`).Check(testkit.Rows("30003"))
|
||||
}
|
||||
|
||||
func (s *testSuite7) TestDeferConstraintCheckForDelete(c *C) {
|
||||
tk := testkit.NewTestKit(c, s.store)
|
||||
tk.MustExec("set tidb_constraint_check_in_place = 0")
|
||||
tk.MustExec("set @@tidb_txn_mode = 'optimistic'")
|
||||
tk.MustExec("use test")
|
||||
|
||||
tk.MustExec("drop table if exists t1, t2, t3, t4, t5")
|
||||
tk.MustExec("create table t1(i int primary key, j int)")
|
||||
tk.MustExec("insert into t1 values(1, 2)")
|
||||
tk.MustExec("begin")
|
||||
tk.MustExec("insert into t1 values(1, 3)")
|
||||
tk.MustExec("delete from t1 where j = 3")
|
||||
_, err := tk.Exec("commit")
|
||||
c.Assert(err.Error(), Equals, "previous statement: delete from t1 where j = 3: [kv:1062]Duplicate entry '1' for key 'PRIMARY'")
|
||||
tk.MustExec("rollback")
|
||||
|
||||
tk.MustExec("create table t2(i int, j int, unique index idx(i))")
|
||||
tk.MustExec("insert into t2 values(1, 2)")
|
||||
tk.MustExec("begin")
|
||||
tk.MustExec("insert into t2 values(1, 3)")
|
||||
tk.MustExec("delete from t2 where j = 3")
|
||||
_, err = tk.Exec("commit")
|
||||
c.Assert(err.Error(), Equals, "previous statement: delete from t2 where j = 3: [kv:1062]Duplicate entry '1' for key 'idx'")
|
||||
tk.MustExec("admin check table t2")
|
||||
|
||||
tk.MustExec("create table t3(i int, j int, primary key(i))")
|
||||
tk.MustExec("begin")
|
||||
tk.MustExec("insert into t3 values(1, 3)")
|
||||
tk.MustExec("delete from t3 where j = 3")
|
||||
tk.MustExec("commit")
|
||||
|
||||
tk.MustExec("create table t4(i int, j int, primary key(i))")
|
||||
tk.MustExec("begin")
|
||||
tk.MustExec("insert into t4 values(1, 3)")
|
||||
tk.MustExec("delete from t4 where j = 3")
|
||||
tk.MustExec("insert into t4 values(2, 3)")
|
||||
tk.MustExec("commit")
|
||||
tk.MustExec("admin check table t4")
|
||||
tk.MustQuery("select * from t4").Check(testkit.Rows("2 3"))
|
||||
|
||||
tk.MustExec("create table t5(i int, j int, primary key(i))")
|
||||
tk.MustExec("begin")
|
||||
tk.MustExec("insert into t5 values(1, 3)")
|
||||
tk.MustExec("delete from t5 where j = 3")
|
||||
tk.MustExec("insert into t5 values(1, 4)")
|
||||
tk.MustExec("commit")
|
||||
tk.MustExec("admin check table t5")
|
||||
tk.MustQuery("select * from t5").Check(testkit.Rows("1 4"))
|
||||
}
|
||||
|
||||
func (s *testSuite7) TestDeferConstraintCheckForInsert(c *C) {
|
||||
tk := testkit.NewTestKit(c, s.store)
|
||||
tk.MustExec(`use test`)
|
||||
@ -2645,6 +2696,32 @@ func (s *testSuite7) TestDeferConstraintCheckForInsert(c *C) {
|
||||
c.Assert(err, NotNil)
|
||||
}
|
||||
|
||||
func (s *testSuite7) TestPessimisticDeleteYourWrites(c *C) {
|
||||
session1 := testkit.NewTestKitWithInit(c, s.store)
|
||||
session2 := testkit.NewTestKitWithInit(c, s.store)
|
||||
|
||||
session1.MustExec("drop table if exists x;")
|
||||
session1.MustExec("create table x (id int primary key, c int);")
|
||||
|
||||
session1.MustExec("set tidb_txn_mode = 'pessimistic'")
|
||||
session2.MustExec("set tidb_txn_mode = 'pessimistic'")
|
||||
|
||||
session1.MustExec("begin;")
|
||||
session1.MustExec("insert into x select 1, 1")
|
||||
session1.MustExec("delete from x where id = 1")
|
||||
session2.MustExec("begin;")
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
session2.MustExec("insert into x select 1, 2")
|
||||
wg.Done()
|
||||
}()
|
||||
session1.MustExec("commit;")
|
||||
wg.Wait()
|
||||
session2.MustExec("commit;")
|
||||
session2.MustQuery("select * from x").Check(testkit.Rows("1 2"))
|
||||
}
|
||||
|
||||
func (s *testSuite7) TestDefEnumInsert(c *C) {
|
||||
tk := testkit.NewTestKit(c, s.store)
|
||||
tk.MustExec("use test")
|
||||
|
||||
2
go.mod
2
go.mod
@ -36,7 +36,7 @@ require (
|
||||
github.com/pingcap/failpoint v0.0.0-20200210140405-f8f9fb234798
|
||||
github.com/pingcap/fn v0.0.0-20191016082858-07623b84a47d
|
||||
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989
|
||||
github.com/pingcap/kvproto v0.0.0-20200221125103-35b65c96516e
|
||||
github.com/pingcap/kvproto v0.0.0-20200228095611-2cf9a243b8d5
|
||||
github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd
|
||||
github.com/pingcap/parser v0.0.0-20200301092054-bfc519c0a57f
|
||||
github.com/pingcap/pd v1.1.0-beta.0.20200106144140-f5a7aa985497
|
||||
|
||||
4
go.sum
4
go.sum
@ -206,8 +206,8 @@ github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN
|
||||
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
|
||||
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
|
||||
github.com/pingcap/kvproto v0.0.0-20191213111810-93cb7c623c8b/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
|
||||
github.com/pingcap/kvproto v0.0.0-20200221125103-35b65c96516e h1:z7j9uyuG/6I4god5h5NbsbMDSfhoOYAvVW6JxhwdHHw=
|
||||
github.com/pingcap/kvproto v0.0.0-20200221125103-35b65c96516e/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
|
||||
github.com/pingcap/kvproto v0.0.0-20200228095611-2cf9a243b8d5 h1:knEvP4R5v5b2T107/Q6VzB0C8/6T7NXB/V7Vl1FtQsg=
|
||||
github.com/pingcap/kvproto v0.0.0-20200228095611-2cf9a243b8d5/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
|
||||
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
|
||||
github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd h1:CV3VsP3Z02MVtdpTMfEgRJ4T9NGgGTxdHpJerent7rM=
|
||||
github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
|
||||
|
||||
@ -653,7 +653,8 @@ func (mvcc *MVCCLevelDB) Prewrite(req *kvrpcpb.PrewriteRequest) []error {
|
||||
// If the operation is Insert, check if key is exists at first.
|
||||
var err error
|
||||
// no need to check insert values for pessimistic transaction.
|
||||
if m.GetOp() == kvrpcpb.Op_Insert && forUpdateTS == 0 {
|
||||
op := m.GetOp()
|
||||
if (op == kvrpcpb.Op_Insert || op == kvrpcpb.Op_CheckNotExists) && forUpdateTS == 0 {
|
||||
v, err := mvcc.getValue(m.Key, startTS, kvrpcpb.IsolationLevel_SI, req.Context.ResolvedLocks)
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
@ -669,6 +670,9 @@ func (mvcc *MVCCLevelDB) Prewrite(req *kvrpcpb.PrewriteRequest) []error {
|
||||
continue
|
||||
}
|
||||
}
|
||||
if op == kvrpcpb.Op_CheckNotExists {
|
||||
continue
|
||||
}
|
||||
isPessimisticLock := len(req.IsPessimisticLock) > 0 && req.IsPessimisticLock[i]
|
||||
err = prewriteMutation(mvcc.db, batch, m, startTS, primary, ttl, txnSize, isPessimisticLock, minCommitTS)
|
||||
errs = append(errs, err)
|
||||
|
||||
@ -128,18 +128,19 @@ func metricsTag(action string) string {
|
||||
|
||||
// twoPhaseCommitter executes a two-phase commit protocol.
|
||||
type twoPhaseCommitter struct {
|
||||
store *tikvStore
|
||||
txn *tikvTxn
|
||||
startTS uint64
|
||||
keys [][]byte
|
||||
mutations map[string]*mutationEx
|
||||
lockTTL uint64
|
||||
commitTS uint64
|
||||
priority pb.CommandPri
|
||||
connID uint64 // connID is used for log.
|
||||
cleanWg sync.WaitGroup
|
||||
detail unsafe.Pointer
|
||||
txnSize int
|
||||
store *tikvStore
|
||||
txn *tikvTxn
|
||||
startTS uint64
|
||||
keys [][]byte
|
||||
mutations map[string]*mutationEx
|
||||
lockTTL uint64
|
||||
commitTS uint64
|
||||
priority pb.CommandPri
|
||||
connID uint64 // connID is used for log.
|
||||
cleanWg sync.WaitGroup
|
||||
detail unsafe.Pointer
|
||||
txnSize int
|
||||
noNeedCommitKeys map[string]struct{}
|
||||
|
||||
primaryKey []byte
|
||||
forUpdateTS uint64
|
||||
@ -227,11 +228,12 @@ func sendTxnHeartBeat(bo *Backoffer, store *tikvStore, primary []byte, startTS,
|
||||
|
||||
func (c *twoPhaseCommitter) initKeysAndMutations() error {
|
||||
var (
|
||||
keys [][]byte
|
||||
size int
|
||||
putCnt int
|
||||
delCnt int
|
||||
lockCnt int
|
||||
keys [][]byte
|
||||
size int
|
||||
putCnt int
|
||||
delCnt int
|
||||
lockCnt int
|
||||
noNeedCommitKey = make(map[string]struct{})
|
||||
)
|
||||
mutations := make(map[string]*mutationEx)
|
||||
txn := c.txn
|
||||
@ -264,13 +266,24 @@ func (c *twoPhaseCommitter) initKeysAndMutations() error {
|
||||
}
|
||||
putCnt++
|
||||
} else {
|
||||
var op pb.Op
|
||||
if !txn.IsPessimistic() && txn.us.GetKeyExistErrInfo(k) != nil {
|
||||
// delete-your-writes keys in optimistic txn need check not exists in prewrite-phase
|
||||
// due to `Op_CheckNotExists` doesn't prewrite lock, so mark those keys should not be used in commit-phase.
|
||||
op = pb.Op_CheckNotExists
|
||||
noNeedCommitKey[string(k)] = struct{}{}
|
||||
} else {
|
||||
// normal delete keys in optimistic txn can be delete without not exists checking
|
||||
// delete-your-writes keys in pessimistic txn can ensure must be no exists so can directly delete them
|
||||
op = pb.Op_Del
|
||||
delCnt++
|
||||
}
|
||||
mutations[string(k)] = &mutationEx{
|
||||
Mutation: pb.Mutation{
|
||||
Op: pb.Op_Del,
|
||||
Op: op,
|
||||
Key: k,
|
||||
},
|
||||
}
|
||||
delCnt++
|
||||
}
|
||||
if c.isPessimistic {
|
||||
if !bytes.Equal(k, c.primaryKey) {
|
||||
@ -326,6 +339,7 @@ func (c *twoPhaseCommitter) initKeysAndMutations() error {
|
||||
zap.Int("puts", putCnt),
|
||||
zap.Int("dels", delCnt),
|
||||
zap.Int("locks", lockCnt),
|
||||
zap.Int("checks", len(noNeedCommitKey)),
|
||||
zap.Uint64("txnStartTS", txn.startTS))
|
||||
}
|
||||
|
||||
@ -342,6 +356,7 @@ func (c *twoPhaseCommitter) initKeysAndMutations() error {
|
||||
metrics.TiKVTxnWriteKVCountHistogram.Observe(float64(commitDetail.WriteKeys))
|
||||
metrics.TiKVTxnWriteSizeHistogram.Observe(float64(commitDetail.WriteSize))
|
||||
c.keys = keys
|
||||
c.noNeedCommitKeys = noNeedCommitKey
|
||||
c.mutations = mutations
|
||||
c.lockTTL = txnLockTTL(txn.startTime, size)
|
||||
c.priority = getTxnPriority(txn)
|
||||
@ -1128,6 +1143,9 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
// strip check_not_exists keys that no need to commit.
|
||||
c.stripNoNeedCommitKeys()
|
||||
|
||||
start = time.Now()
|
||||
logutil.Event(ctx, "start get commit ts")
|
||||
commitTS, err := c.store.getTimestampWithRetry(NewBackoffer(ctx, tsoMaxBackoff).WithVars(c.txn.vars))
|
||||
@ -1190,6 +1208,21 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *twoPhaseCommitter) stripNoNeedCommitKeys() {
|
||||
if len(c.noNeedCommitKeys) == 0 {
|
||||
return
|
||||
}
|
||||
var i int
|
||||
for _, k := range c.keys {
|
||||
if _, ck := c.noNeedCommitKeys[string(k)]; ck {
|
||||
continue
|
||||
}
|
||||
c.keys[i] = k
|
||||
i++
|
||||
}
|
||||
c.keys = c.keys[:i]
|
||||
}
|
||||
|
||||
type schemaLeaseChecker interface {
|
||||
Check(txnTS uint64) error
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user