From e1f2b3728d66dddfd1b49625897fc238be742e03 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Thu, 4 Jul 2019 16:20:38 +0800 Subject: [PATCH] *: make assertion check more reasonable (#10424) Make sure there are no "ASSERTION fail" logs in session and executor unit test --- ddl/index.go | 2 +- executor/admin.go | 2 +- executor/distsql_test.go | 3 +- executor/executor_test.go | 6 ++-- executor/write_test.go | 3 +- kv/kv.go | 11 ++++-- kv/mock.go | 1 + session/txn.go | 3 ++ store/mockstore/mocktikv/mvcc_leveldb.go | 43 ++++++++++++------------ store/tikv/txn.go | 16 ++++++++- table/index.go | 14 ++++++-- table/tables/index.go | 29 ++++++++++++---- table/tables/index_test.go | 13 +++---- table/tables/tables.go | 8 ++--- util/admin/admin_test.go | 4 +-- 15 files changed, 105 insertions(+), 53 deletions(-) diff --git a/ddl/index.go b/ddl/index.go index 1e3f44562b..e9f86cbf9d 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -808,7 +808,7 @@ func (w *addIndexWorker) backfillIndexInTxn(handleRange reorgIndexTask) (taskCtx } // Create the index. - handle, err := w.index.Create(w.sessCtx, txn, idxRecord.vals, idxRecord.handle) + handle, err := w.index.Create(w.sessCtx, txn, idxRecord.vals, idxRecord.handle, table.WithAssertion(txn)) if err != nil { if kv.ErrKeyExists.Equal(err) && idxRecord.handle == handle { // Index already exists, skip it. diff --git a/executor/admin.go b/executor/admin.go index 7dfa13ae7c..fbb1d4e21f 100644 --- a/executor/admin.go +++ b/executor/admin.go @@ -436,7 +436,7 @@ func (e *RecoverIndexExec) backfillIndexInTxn(ctx context.Context, txn kv.Transa return result, err } - _, err = e.index.Create(e.ctx, txn, row.idxVals, row.handle) + _, err = e.index.Create(e.ctx, txn, row.idxVals, row.handle, table.WithAssertion(txn)) if err != nil { return result, err } diff --git a/executor/distsql_test.go b/executor/distsql_test.go index c688500925..6ddf318d7f 100644 --- a/executor/distsql_test.go +++ b/executor/distsql_test.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/executor" "github.com/pingcap/tidb/store/tikv" + "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" @@ -214,7 +215,7 @@ func (s *testSuite3) TestInconsistentIndex(c *C) { for i := 0; i < 10; i++ { txn, err := s.store.Begin() c.Assert(err, IsNil) - _, err = idxOp.Create(ctx, txn, types.MakeDatums(i+10), int64(100+i)) + _, err = idxOp.Create(ctx, txn, types.MakeDatums(i+10), int64(100+i), table.WithAssertion(txn)) c.Assert(err, IsNil) err = txn.Commit(context.Background()) c.Assert(err, IsNil) diff --git a/executor/executor_test.go b/executor/executor_test.go index c8afc74245..d63ef2fec4 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -359,7 +359,7 @@ func (s *testSuite) TestAdmin(c *C) { tb, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("admin_test")) c.Assert(err, IsNil) c.Assert(tb.Indices(), HasLen, 1) - _, err = tb.Indices()[0].Create(mock.NewContext(), txn, types.MakeDatums(int64(10)), 1) + _, err = tb.Indices()[0].Create(mock.NewContext(), txn, types.MakeDatums(int64(10)), 1, table.WithAssertion(txn)) c.Assert(err, IsNil) err = txn.Commit(context.Background()) c.Assert(err, IsNil) @@ -3129,7 +3129,7 @@ func (s *testSuite) TestCheckIndex(c *C) { // table data (handle, data): (1, 10), (2, 20), (4, 40) txn, err = s.store.Begin() c.Assert(err, IsNil) - _, err = idx.Create(mockCtx, txn, types.MakeDatums(int64(30)), 3) + _, err = idx.Create(mockCtx, txn, types.MakeDatums(int64(30)), 3, table.WithAssertion(txn)) c.Assert(err, IsNil) key := tablecodec.EncodeRowKey(tb.Meta().ID, codec.EncodeInt(nil, 4)) setColValue(c, txn, key, types.NewDatum(int64(40))) @@ -3144,7 +3144,7 @@ func (s *testSuite) TestCheckIndex(c *C) { // table data (handle, data): (1, 10), (2, 20), (4, 40) txn, err = s.store.Begin() c.Assert(err, IsNil) - _, err = idx.Create(mockCtx, txn, types.MakeDatums(int64(40)), 4) + _, err = idx.Create(mockCtx, txn, types.MakeDatums(int64(40)), 4, table.WithAssertion(txn)) c.Assert(err, IsNil) err = txn.Commit(context.Background()) c.Assert(err, IsNil) diff --git a/executor/write_test.go b/executor/write_test.go index f1dfca1219..f29b2538d1 100644 --- a/executor/write_test.go +++ b/executor/write_test.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/tidb/session" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/store/mockstore" + "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/mock" @@ -2355,7 +2356,7 @@ func (s *testSuite4) TestReplaceLog(c *C) { txn, err := s.store.Begin() c.Assert(err, IsNil) - _, err = indexOpr.Create(s.ctx, txn, types.MakeDatums(1), 1) + _, err = indexOpr.Create(s.ctx, txn, types.MakeDatums(1), 1, table.WithAssertion(txn)) c.Assert(err, IsNil) err = txn.Commit(context.Background()) c.Assert(err, IsNil) diff --git a/kv/kv.go b/kv/kv.go index 7fcb148844..e9b80d847d 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -130,6 +130,7 @@ type MemBuffer interface { // This is not thread safe. type Transaction interface { MemBuffer + AssertionProto // Commit commits the transaction operations to KV store. Commit(context.Context) error // Rollback undoes the transaction operations to KV store. @@ -154,13 +155,19 @@ type Transaction interface { GetMemBuffer() MemBuffer // SetVars sets variables to the transaction. SetVars(vars *Variables) - // SetAssertion sets an assertion for an operation on the key. - SetAssertion(key Key, assertion AssertionType) // BatchGet gets kv from the memory buffer of statement and transaction, and the kv storage. BatchGet(keys []Key) (map[string][]byte, error) IsPessimistic() bool } +// AssertionProto is an interface defined for the assertion protocol. +type AssertionProto interface { + // SetAssertion sets an assertion for an operation on the key. + SetAssertion(key Key, assertion AssertionType) + // Confirm assertions to current position if `succ` is true, reset position otherwise. + ConfirmAssertions(succ bool) +} + // Client is used to send request to KV layer. type Client interface { // Send sends request to KV layer, returns a Response. diff --git a/kv/mock.go b/kv/mock.go index bf1902ea8d..9fc2c44dd9 100644 --- a/kv/mock.go +++ b/kv/mock.go @@ -114,6 +114,7 @@ func (t *mockTxn) SetVars(vars *Variables) { } func (t *mockTxn) SetAssertion(key Key, assertion AssertionType) {} +func (t *mockTxn) ConfirmAssertions(succ bool) {} // NewMockTxn new a mockTxn. func NewMockTxn() Transaction { diff --git a/session/txn.go b/session/txn.go index 39a2f870db..ca14739759 100644 --- a/session/txn.go +++ b/session/txn.go @@ -438,6 +438,7 @@ func (s *session) StmtCommit() error { }) if err != nil { st.doNotCommit = err + st.ConfirmAssertions(false) return err } @@ -453,12 +454,14 @@ func (s *session) StmtCommit() error { mergeToDirtyDB(dirtyDB, op) } } + st.ConfirmAssertions(true) return nil } // StmtRollback implements the sessionctx.Context interface. func (s *session) StmtRollback() { s.txn.cleanup() + s.txn.ConfirmAssertions(false) return } diff --git a/store/mockstore/mocktikv/mvcc_leveldb.go b/store/mockstore/mocktikv/mvcc_leveldb.go index 6b0a33b9e6..d4413de96a 100644 --- a/store/mockstore/mocktikv/mvcc_leveldb.go +++ b/store/mockstore/mocktikv/mvcc_leveldb.go @@ -641,8 +641,12 @@ func checkConflictValue(iter *Iterator, m *kvrpcpb.Mutation, startTS uint64) err return errors.Trace(err) } if !ok { + if m.Assertion == kvrpcpb.Assertion_Exist { + logutil.BgLogger().Error("ASSERTION FAIL!!!", zap.Stringer("mutation", m)) + } return nil } + // Note that it's a write conflict here, even if the value is a rollback one. if dec.value.commitTS >= startTS { return &ErrConflict{ @@ -651,22 +655,25 @@ func checkConflictValue(iter *Iterator, m *kvrpcpb.Mutation, startTS uint64) err Key: m.Key, } } - if m.Op == kvrpcpb.Op_PessimisticLock && m.Assertion == kvrpcpb.Assertion_NotExist { - // Skip rollback keys. - for dec.value.valueType == typeRollback { - ok, err = dec.Decode(iter) - if err != nil { - return errors.Trace(err) + + if m.Assertion == kvrpcpb.Assertion_NotExist { + for ok { + if dec.value.valueType == typeRollback { + ok, err = dec.Decode(iter) + if err != nil { + return errors.Trace(err) + } + } else if dec.value.valueType == typeDelete { + break + } else { + if m.Op == kvrpcpb.Op_PessimisticLock { + return &ErrKeyAlreadyExist{ + Key: m.Key, + } + } + logutil.BgLogger().Error("ASSERTION FAIL!!!", zap.Stringer("mutation", m)) + break } - if !ok { - return nil - } - } - if dec.value.valueType == typeDelete { - return nil - } - return &ErrKeyAlreadyExist{ - Key: m.Key, } } return nil @@ -722,12 +729,6 @@ func prewriteMutation(db *leveldb.DB, batch *leveldb.Batch, mutation *kvrpcpb.Mu return errors.Trace(err) } - // Check assertions. - if (ok && mutation.Assertion == kvrpcpb.Assertion_NotExist) || - (!ok && mutation.Assertion == kvrpcpb.Assertion_Exist) { - logutil.BgLogger().Error("ASSERTION FAIL!!!", zap.Stringer("mutation", mutation)) - } - batch.Put(writeKey, writeValue) return nil } diff --git a/store/tikv/txn.go b/store/tikv/txn.go index fbcc8b0c76..c842f0aeae 100644 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -67,7 +67,11 @@ type tikvTxn struct { committer *twoPhaseCommitter // For data consistency check. + // assertions[:confirmed] is the assertion of current transaction. + // assertions[confirmed:len(assertions)] is the assertions of current statement. + // StmtCommit/StmtRollback may change the confirmed position. assertions []assertionPair + confirmed int } func newTiKVTxn(store *tikvStore) (*tikvTxn, error) { @@ -106,7 +110,17 @@ func (a assertionPair) String() string { // SetAssertion sets a assertion for the key operation. func (txn *tikvTxn) SetAssertion(key kv.Key, assertion kv.AssertionType) { - txn.assertions = append(txn.assertions, assertionPair{key, assertion}) + // Deep copy the key since it's memory is referenced from union store and overwrite change later. + key1 := append([]byte{}, key...) + txn.assertions = append(txn.assertions, assertionPair{key1, assertion}) +} + +func (txn *tikvTxn) ConfirmAssertions(succ bool) { + if succ { + txn.confirmed = len(txn.assertions) + } else { + txn.assertions = txn.assertions[:txn.confirmed] + } } func (txn *tikvTxn) SetVars(vars *kv.Variables) { diff --git a/table/index.go b/table/index.go index 62518459c4..c05396fc37 100644 --- a/table/index.go +++ b/table/index.go @@ -29,8 +29,9 @@ type IndexIterator interface { // CreateIdxOpt contains the options will be used when creating an index. type CreateIdxOpt struct { - SkipHandleCheck bool // If true, skip the handle constraint check. - SkipCheck bool // If true, skip all the unique indices constraint check. + SkipHandleCheck bool // If true, skip the handle constraint check. + SkipCheck bool // If true, skip all the unique indices constraint check. + kv.AssertionProto // If not nil, check assertion. } // CreateIdxOptFunc is defined for the Create() method of Index interface. @@ -48,12 +49,19 @@ var SkipCheck CreateIdxOptFunc = func(opt *CreateIdxOpt) { opt.SkipCheck = true } +// WithAssertion returns a CreateIdxFunc. +func WithAssertion(x kv.AssertionProto) CreateIdxOptFunc { + return func(opt *CreateIdxOpt) { + opt.AssertionProto = x + } +} + // Index is the interface for index data on KV store. type Index interface { // Meta returns IndexInfo. Meta() *model.IndexInfo // Create supports insert into statement. - Create(ctx sessionctx.Context, rm kv.RetrieverMutator, indexedValues []types.Datum, h int64, opts ...CreateIdxOpt) (int64, error) + Create(ctx sessionctx.Context, rm kv.RetrieverMutator, indexedValues []types.Datum, h int64, opts ...CreateIdxOptFunc) (int64, error) // Delete supports delete from statement. Delete(sc *stmtctx.StatementContext, m kv.Mutator, indexedValues []types.Datum, h int64, ss kv.Transaction) error // Drop supports drop table, drop index statements. diff --git a/table/tables/index.go b/table/tables/index.go index 63281bb895..bf9bd8d96d 100644 --- a/table/tables/index.go +++ b/table/tables/index.go @@ -196,8 +196,12 @@ func (c *index) GenIndexKey(sc *stmtctx.StatementContext, indexedValues []types. // Create creates a new entry in the kvIndex data. // If the index is unique and there is an existing entry with the same key, // Create will return the existing entry's handle as the first return value, ErrKeyExists as the second return value. -func (c *index) Create(ctx sessionctx.Context, rm kv.RetrieverMutator, indexedValues []types.Datum, h int64, - opts ...table.CreateIdxOpt) (int64, error) { +func (c *index) Create(ctx sessionctx.Context, rm kv.RetrieverMutator, indexedValues []types.Datum, h int64, opts ...table.CreateIdxOptFunc) (int64, error) { + var opt table.CreateIdxOpt + for _, fn := range opts { + fn(&opt) + } + ss := opt.AssertionProto writeBufs := ctx.GetSessionVars().GetWriteStmtBufs() skipCheck := ctx.GetSessionVars().LightningMode || ctx.GetSessionVars().StmtCtx.BatchCheck key, distinct, err := c.GenIndexKey(ctx.GetSessionVars().StmtCtx, indexedValues, h, writeBufs.IndexKeyBuf) @@ -209,16 +213,27 @@ func (c *index) Create(ctx sessionctx.Context, rm kv.RetrieverMutator, indexedVa if !distinct { // non-unique index doesn't need store value, write a '0' to reduce space err = rm.Set(key, []byte{'0'}) + if ss != nil { + ss.SetAssertion(key, kv.None) + } + return 0, err + } + + if skipCheck { + err = rm.Set(key, EncodeHandle(h)) + if ss != nil { + ss.SetAssertion(key, kv.None) + } return 0, err } var value []byte - if !skipCheck { - value, err = rm.Get(key) - } - - if skipCheck || kv.IsErrNotFound(err) { + value, err = rm.Get(key) + if kv.IsErrNotFound(err) { err = rm.Set(key, EncodeHandle(h)) + if ss != nil { + ss.SetAssertion(key, kv.NotExist) + } return 0, err } diff --git a/table/tables/index_test.go b/table/tables/index_test.go index c6646b1ec7..913792fa5e 100644 --- a/table/tables/index_test.go +++ b/table/tables/index_test.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/tidb/session" "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/store/mockstore" + "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/mock" @@ -77,7 +78,7 @@ func (s *testIndexSuite) TestIndex(c *C) { values := types.MakeDatums(1, 2) mockCtx := mock.NewContext() - _, err = index.Create(mockCtx, txn, values, 1) + _, err = index.Create(mockCtx, txn, values, 1, table.WithAssertion(txn)) c.Assert(err, IsNil) it, err := index.SeekFirst(txn) @@ -109,7 +110,7 @@ func (s *testIndexSuite) TestIndex(c *C) { c.Assert(terror.ErrorEqual(err, io.EOF), IsTrue, Commentf("err %v", err)) it.Close() - _, err = index.Create(mockCtx, txn, values, 0) + _, err = index.Create(mockCtx, txn, values, 0, table.WithAssertion(txn)) c.Assert(err, IsNil) _, err = index.SeekFirst(txn) @@ -160,10 +161,10 @@ func (s *testIndexSuite) TestIndex(c *C) { txn, err = s.s.Begin() c.Assert(err, IsNil) - _, err = index.Create(mockCtx, txn, values, 1) + _, err = index.Create(mockCtx, txn, values, 1, table.WithAssertion(txn)) c.Assert(err, IsNil) - _, err = index.Create(mockCtx, txn, values, 2) + _, err = index.Create(mockCtx, txn, values, 2, table.WithAssertion(txn)) c.Assert(err, NotNil) it, err = index.SeekFirst(txn) @@ -195,7 +196,7 @@ func (s *testIndexSuite) TestIndex(c *C) { // Test the function of Next when the value of unique key is nil. values2 := types.MakeDatums(nil, nil) - _, err = index.Create(mockCtx, txn, values2, 2) + _, err = index.Create(mockCtx, txn, values2, 2, table.WithAssertion(txn)) c.Assert(err, IsNil) it, err = index.SeekFirst(txn) c.Assert(err, IsNil) @@ -234,7 +235,7 @@ func (s *testIndexSuite) TestCombineIndexSeek(c *C) { mockCtx := mock.NewContext() values := types.MakeDatums("abc", "def") - _, err = index.Create(mockCtx, txn, values, 1) + _, err = index.Create(mockCtx, txn, values, 1, table.WithAssertion(txn)) c.Assert(err, IsNil) index2 := tables.NewIndex(tblInfo.ID, tblInfo, tblInfo.Indices[0]) diff --git a/table/tables/tables.go b/table/tables/tables.go index abfb35205a..24e04c3f9e 100644 --- a/table/tables/tables.go +++ b/table/tables/tables.go @@ -377,7 +377,7 @@ func (t *tableCommon) rebuildIndices(ctx sessionctx.Context, rm kv.RetrieverMuta if err != nil { return err } - if err := t.buildIndexForRow(ctx, rm, h, newVs, idx); err != nil { + if err := t.buildIndexForRow(ctx, rm, h, newVs, idx, txn); err != nil { return err } break @@ -599,7 +599,7 @@ func (t *tableCommon) addIndices(ctx sessionctx.Context, recordID int64, r []typ dupKeyErr = kv.ErrKeyExists.FastGen("Duplicate entry '%s' for key '%s'", entryKey, v.Meta().Name) txn.SetOption(kv.PresumeKeyNotExistsError, dupKeyErr) } - if dupHandle, err := v.Create(ctx, rm, indexVals, recordID, opt); err != nil { + if dupHandle, err := v.Create(ctx, rm, indexVals, recordID, opts...); err != nil { if kv.ErrKeyExists.Equal(err) { return dupHandle, dupKeyErr } @@ -814,8 +814,8 @@ func (t *tableCommon) removeRowIndex(sc *stmtctx.StatementContext, rm kv.Retriev } // buildIndexForRow implements table.Table BuildIndexForRow interface. -func (t *tableCommon) buildIndexForRow(ctx sessionctx.Context, rm kv.RetrieverMutator, h int64, vals []types.Datum, idx table.Index) error { - if _, err := idx.Create(ctx, rm, vals, h); err != nil { +func (t *tableCommon) buildIndexForRow(ctx sessionctx.Context, rm kv.RetrieverMutator, h int64, vals []types.Datum, idx table.Index, txn kv.Transaction) error { + if _, err := idx.Create(ctx, rm, vals, h, table.WithAssertion(txn)); err != nil { if kv.ErrKeyExists.Equal(err) { // Make error message consistent with MySQL. entryKey, err1 := t.genIndexKeyStr(vals) diff --git a/util/admin/admin_test.go b/util/admin/admin_test.go index 1c30ffa901..ae756b1f68 100644 --- a/util/admin/admin_test.go +++ b/util/admin/admin_test.go @@ -465,7 +465,7 @@ func (s *testSuite) testIndex(c *C, ctx sessionctx.Context, dbName string, tb ta // set data to: // index data (handle, data): (1, 10), (2, 20), (3, 30) // table data (handle, data): (1, 10), (2, 20), (4, 40) - _, err = idx.Create(mockCtx, txn, types.MakeDatums(int64(30)), 3) + _, err = idx.Create(mockCtx, txn, types.MakeDatums(int64(30)), 3, table.WithAssertion(txn)) c.Assert(err, IsNil) key := tablecodec.EncodeRowKey(tb.Meta().ID, codec.EncodeInt(nil, 4)) setColValue(c, txn, key, types.NewDatum(int64(40))) @@ -486,7 +486,7 @@ func (s *testSuite) testIndex(c *C, ctx sessionctx.Context, dbName string, tb ta // set data to: // index data (handle, data): (1, 10), (2, 20), (3, 30), (4, 40) // table data (handle, data): (1, 10), (2, 20), (4, 40), (3, 31) - _, err = idx.Create(mockCtx, txn, types.MakeDatums(int64(40)), 4) + _, err = idx.Create(mockCtx, txn, types.MakeDatums(int64(40)), 4, table.WithAssertion(txn)) c.Assert(err, IsNil) key = tablecodec.EncodeRowKey(tb.Meta().ID, codec.EncodeInt(nil, 3)) setColValue(c, txn, key, types.NewDatum(int64(31)))