*: make assertion check more reasonable (#10424)

Make sure there are no "ASSERTION fail" logs in session and executor unit test
This commit is contained in:
tiancaiamao
2019-07-04 16:20:38 +08:00
committed by GitHub
parent ce0312bd43
commit e1f2b3728d
15 changed files with 105 additions and 53 deletions

View File

@ -808,7 +808,7 @@ func (w *addIndexWorker) backfillIndexInTxn(handleRange reorgIndexTask) (taskCtx
}
// Create the index.
handle, err := w.index.Create(w.sessCtx, txn, idxRecord.vals, idxRecord.handle)
handle, err := w.index.Create(w.sessCtx, txn, idxRecord.vals, idxRecord.handle, table.WithAssertion(txn))
if err != nil {
if kv.ErrKeyExists.Equal(err) && idxRecord.handle == handle {
// Index already exists, skip it.

View File

@ -436,7 +436,7 @@ func (e *RecoverIndexExec) backfillIndexInTxn(ctx context.Context, txn kv.Transa
return result, err
}
_, err = e.index.Create(e.ctx, txn, row.idxVals, row.handle)
_, err = e.index.Create(e.ctx, txn, row.idxVals, row.handle, table.WithAssertion(txn))
if err != nil {
return result, err
}

View File

@ -25,6 +25,7 @@ import (
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
@ -214,7 +215,7 @@ func (s *testSuite3) TestInconsistentIndex(c *C) {
for i := 0; i < 10; i++ {
txn, err := s.store.Begin()
c.Assert(err, IsNil)
_, err = idxOp.Create(ctx, txn, types.MakeDatums(i+10), int64(100+i))
_, err = idxOp.Create(ctx, txn, types.MakeDatums(i+10), int64(100+i), table.WithAssertion(txn))
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)

View File

@ -359,7 +359,7 @@ func (s *testSuite) TestAdmin(c *C) {
tb, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("admin_test"))
c.Assert(err, IsNil)
c.Assert(tb.Indices(), HasLen, 1)
_, err = tb.Indices()[0].Create(mock.NewContext(), txn, types.MakeDatums(int64(10)), 1)
_, err = tb.Indices()[0].Create(mock.NewContext(), txn, types.MakeDatums(int64(10)), 1, table.WithAssertion(txn))
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)
@ -3129,7 +3129,7 @@ func (s *testSuite) TestCheckIndex(c *C) {
// table data (handle, data): (1, 10), (2, 20), (4, 40)
txn, err = s.store.Begin()
c.Assert(err, IsNil)
_, err = idx.Create(mockCtx, txn, types.MakeDatums(int64(30)), 3)
_, err = idx.Create(mockCtx, txn, types.MakeDatums(int64(30)), 3, table.WithAssertion(txn))
c.Assert(err, IsNil)
key := tablecodec.EncodeRowKey(tb.Meta().ID, codec.EncodeInt(nil, 4))
setColValue(c, txn, key, types.NewDatum(int64(40)))
@ -3144,7 +3144,7 @@ func (s *testSuite) TestCheckIndex(c *C) {
// table data (handle, data): (1, 10), (2, 20), (4, 40)
txn, err = s.store.Begin()
c.Assert(err, IsNil)
_, err = idx.Create(mockCtx, txn, types.MakeDatums(int64(40)), 4)
_, err = idx.Create(mockCtx, txn, types.MakeDatums(int64(40)), 4, table.WithAssertion(txn))
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)

View File

@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/mock"
@ -2355,7 +2356,7 @@ func (s *testSuite4) TestReplaceLog(c *C) {
txn, err := s.store.Begin()
c.Assert(err, IsNil)
_, err = indexOpr.Create(s.ctx, txn, types.MakeDatums(1), 1)
_, err = indexOpr.Create(s.ctx, txn, types.MakeDatums(1), 1, table.WithAssertion(txn))
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)

View File

@ -130,6 +130,7 @@ type MemBuffer interface {
// This is not thread safe.
type Transaction interface {
MemBuffer
AssertionProto
// Commit commits the transaction operations to KV store.
Commit(context.Context) error
// Rollback undoes the transaction operations to KV store.
@ -154,13 +155,19 @@ type Transaction interface {
GetMemBuffer() MemBuffer
// SetVars sets variables to the transaction.
SetVars(vars *Variables)
// SetAssertion sets an assertion for an operation on the key.
SetAssertion(key Key, assertion AssertionType)
// BatchGet gets kv from the memory buffer of statement and transaction, and the kv storage.
BatchGet(keys []Key) (map[string][]byte, error)
IsPessimistic() bool
}
// AssertionProto is an interface defined for the assertion protocol.
type AssertionProto interface {
// SetAssertion sets an assertion for an operation on the key.
SetAssertion(key Key, assertion AssertionType)
// Confirm assertions to current position if `succ` is true, reset position otherwise.
ConfirmAssertions(succ bool)
}
// Client is used to send request to KV layer.
type Client interface {
// Send sends request to KV layer, returns a Response.

View File

@ -114,6 +114,7 @@ func (t *mockTxn) SetVars(vars *Variables) {
}
func (t *mockTxn) SetAssertion(key Key, assertion AssertionType) {}
func (t *mockTxn) ConfirmAssertions(succ bool) {}
// NewMockTxn new a mockTxn.
func NewMockTxn() Transaction {

View File

@ -438,6 +438,7 @@ func (s *session) StmtCommit() error {
})
if err != nil {
st.doNotCommit = err
st.ConfirmAssertions(false)
return err
}
@ -453,12 +454,14 @@ func (s *session) StmtCommit() error {
mergeToDirtyDB(dirtyDB, op)
}
}
st.ConfirmAssertions(true)
return nil
}
// StmtRollback implements the sessionctx.Context interface.
func (s *session) StmtRollback() {
s.txn.cleanup()
s.txn.ConfirmAssertions(false)
return
}

View File

@ -641,8 +641,12 @@ func checkConflictValue(iter *Iterator, m *kvrpcpb.Mutation, startTS uint64) err
return errors.Trace(err)
}
if !ok {
if m.Assertion == kvrpcpb.Assertion_Exist {
logutil.BgLogger().Error("ASSERTION FAIL!!!", zap.Stringer("mutation", m))
}
return nil
}
// Note that it's a write conflict here, even if the value is a rollback one.
if dec.value.commitTS >= startTS {
return &ErrConflict{
@ -651,22 +655,25 @@ func checkConflictValue(iter *Iterator, m *kvrpcpb.Mutation, startTS uint64) err
Key: m.Key,
}
}
if m.Op == kvrpcpb.Op_PessimisticLock && m.Assertion == kvrpcpb.Assertion_NotExist {
// Skip rollback keys.
for dec.value.valueType == typeRollback {
ok, err = dec.Decode(iter)
if err != nil {
return errors.Trace(err)
if m.Assertion == kvrpcpb.Assertion_NotExist {
for ok {
if dec.value.valueType == typeRollback {
ok, err = dec.Decode(iter)
if err != nil {
return errors.Trace(err)
}
} else if dec.value.valueType == typeDelete {
break
} else {
if m.Op == kvrpcpb.Op_PessimisticLock {
return &ErrKeyAlreadyExist{
Key: m.Key,
}
}
logutil.BgLogger().Error("ASSERTION FAIL!!!", zap.Stringer("mutation", m))
break
}
if !ok {
return nil
}
}
if dec.value.valueType == typeDelete {
return nil
}
return &ErrKeyAlreadyExist{
Key: m.Key,
}
}
return nil
@ -722,12 +729,6 @@ func prewriteMutation(db *leveldb.DB, batch *leveldb.Batch, mutation *kvrpcpb.Mu
return errors.Trace(err)
}
// Check assertions.
if (ok && mutation.Assertion == kvrpcpb.Assertion_NotExist) ||
(!ok && mutation.Assertion == kvrpcpb.Assertion_Exist) {
logutil.BgLogger().Error("ASSERTION FAIL!!!", zap.Stringer("mutation", mutation))
}
batch.Put(writeKey, writeValue)
return nil
}

View File

@ -67,7 +67,11 @@ type tikvTxn struct {
committer *twoPhaseCommitter
// For data consistency check.
// assertions[:confirmed] is the assertion of current transaction.
// assertions[confirmed:len(assertions)] is the assertions of current statement.
// StmtCommit/StmtRollback may change the confirmed position.
assertions []assertionPair
confirmed int
}
func newTiKVTxn(store *tikvStore) (*tikvTxn, error) {
@ -106,7 +110,17 @@ func (a assertionPair) String() string {
// SetAssertion sets a assertion for the key operation.
func (txn *tikvTxn) SetAssertion(key kv.Key, assertion kv.AssertionType) {
txn.assertions = append(txn.assertions, assertionPair{key, assertion})
// Deep copy the key since it's memory is referenced from union store and overwrite change later.
key1 := append([]byte{}, key...)
txn.assertions = append(txn.assertions, assertionPair{key1, assertion})
}
func (txn *tikvTxn) ConfirmAssertions(succ bool) {
if succ {
txn.confirmed = len(txn.assertions)
} else {
txn.assertions = txn.assertions[:txn.confirmed]
}
}
func (txn *tikvTxn) SetVars(vars *kv.Variables) {

View File

@ -29,8 +29,9 @@ type IndexIterator interface {
// CreateIdxOpt contains the options will be used when creating an index.
type CreateIdxOpt struct {
SkipHandleCheck bool // If true, skip the handle constraint check.
SkipCheck bool // If true, skip all the unique indices constraint check.
SkipHandleCheck bool // If true, skip the handle constraint check.
SkipCheck bool // If true, skip all the unique indices constraint check.
kv.AssertionProto // If not nil, check assertion.
}
// CreateIdxOptFunc is defined for the Create() method of Index interface.
@ -48,12 +49,19 @@ var SkipCheck CreateIdxOptFunc = func(opt *CreateIdxOpt) {
opt.SkipCheck = true
}
// WithAssertion returns a CreateIdxFunc.
func WithAssertion(x kv.AssertionProto) CreateIdxOptFunc {
return func(opt *CreateIdxOpt) {
opt.AssertionProto = x
}
}
// Index is the interface for index data on KV store.
type Index interface {
// Meta returns IndexInfo.
Meta() *model.IndexInfo
// Create supports insert into statement.
Create(ctx sessionctx.Context, rm kv.RetrieverMutator, indexedValues []types.Datum, h int64, opts ...CreateIdxOpt) (int64, error)
Create(ctx sessionctx.Context, rm kv.RetrieverMutator, indexedValues []types.Datum, h int64, opts ...CreateIdxOptFunc) (int64, error)
// Delete supports delete from statement.
Delete(sc *stmtctx.StatementContext, m kv.Mutator, indexedValues []types.Datum, h int64, ss kv.Transaction) error
// Drop supports drop table, drop index statements.

View File

@ -196,8 +196,12 @@ func (c *index) GenIndexKey(sc *stmtctx.StatementContext, indexedValues []types.
// Create creates a new entry in the kvIndex data.
// If the index is unique and there is an existing entry with the same key,
// Create will return the existing entry's handle as the first return value, ErrKeyExists as the second return value.
func (c *index) Create(ctx sessionctx.Context, rm kv.RetrieverMutator, indexedValues []types.Datum, h int64,
opts ...table.CreateIdxOpt) (int64, error) {
func (c *index) Create(ctx sessionctx.Context, rm kv.RetrieverMutator, indexedValues []types.Datum, h int64, opts ...table.CreateIdxOptFunc) (int64, error) {
var opt table.CreateIdxOpt
for _, fn := range opts {
fn(&opt)
}
ss := opt.AssertionProto
writeBufs := ctx.GetSessionVars().GetWriteStmtBufs()
skipCheck := ctx.GetSessionVars().LightningMode || ctx.GetSessionVars().StmtCtx.BatchCheck
key, distinct, err := c.GenIndexKey(ctx.GetSessionVars().StmtCtx, indexedValues, h, writeBufs.IndexKeyBuf)
@ -209,16 +213,27 @@ func (c *index) Create(ctx sessionctx.Context, rm kv.RetrieverMutator, indexedVa
if !distinct {
// non-unique index doesn't need store value, write a '0' to reduce space
err = rm.Set(key, []byte{'0'})
if ss != nil {
ss.SetAssertion(key, kv.None)
}
return 0, err
}
if skipCheck {
err = rm.Set(key, EncodeHandle(h))
if ss != nil {
ss.SetAssertion(key, kv.None)
}
return 0, err
}
var value []byte
if !skipCheck {
value, err = rm.Get(key)
}
if skipCheck || kv.IsErrNotFound(err) {
value, err = rm.Get(key)
if kv.IsErrNotFound(err) {
err = rm.Set(key, EncodeHandle(h))
if ss != nil {
ss.SetAssertion(key, kv.NotExist)
}
return 0, err
}

View File

@ -26,6 +26,7 @@ import (
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/mock"
@ -77,7 +78,7 @@ func (s *testIndexSuite) TestIndex(c *C) {
values := types.MakeDatums(1, 2)
mockCtx := mock.NewContext()
_, err = index.Create(mockCtx, txn, values, 1)
_, err = index.Create(mockCtx, txn, values, 1, table.WithAssertion(txn))
c.Assert(err, IsNil)
it, err := index.SeekFirst(txn)
@ -109,7 +110,7 @@ func (s *testIndexSuite) TestIndex(c *C) {
c.Assert(terror.ErrorEqual(err, io.EOF), IsTrue, Commentf("err %v", err))
it.Close()
_, err = index.Create(mockCtx, txn, values, 0)
_, err = index.Create(mockCtx, txn, values, 0, table.WithAssertion(txn))
c.Assert(err, IsNil)
_, err = index.SeekFirst(txn)
@ -160,10 +161,10 @@ func (s *testIndexSuite) TestIndex(c *C) {
txn, err = s.s.Begin()
c.Assert(err, IsNil)
_, err = index.Create(mockCtx, txn, values, 1)
_, err = index.Create(mockCtx, txn, values, 1, table.WithAssertion(txn))
c.Assert(err, IsNil)
_, err = index.Create(mockCtx, txn, values, 2)
_, err = index.Create(mockCtx, txn, values, 2, table.WithAssertion(txn))
c.Assert(err, NotNil)
it, err = index.SeekFirst(txn)
@ -195,7 +196,7 @@ func (s *testIndexSuite) TestIndex(c *C) {
// Test the function of Next when the value of unique key is nil.
values2 := types.MakeDatums(nil, nil)
_, err = index.Create(mockCtx, txn, values2, 2)
_, err = index.Create(mockCtx, txn, values2, 2, table.WithAssertion(txn))
c.Assert(err, IsNil)
it, err = index.SeekFirst(txn)
c.Assert(err, IsNil)
@ -234,7 +235,7 @@ func (s *testIndexSuite) TestCombineIndexSeek(c *C) {
mockCtx := mock.NewContext()
values := types.MakeDatums("abc", "def")
_, err = index.Create(mockCtx, txn, values, 1)
_, err = index.Create(mockCtx, txn, values, 1, table.WithAssertion(txn))
c.Assert(err, IsNil)
index2 := tables.NewIndex(tblInfo.ID, tblInfo, tblInfo.Indices[0])

View File

@ -377,7 +377,7 @@ func (t *tableCommon) rebuildIndices(ctx sessionctx.Context, rm kv.RetrieverMuta
if err != nil {
return err
}
if err := t.buildIndexForRow(ctx, rm, h, newVs, idx); err != nil {
if err := t.buildIndexForRow(ctx, rm, h, newVs, idx, txn); err != nil {
return err
}
break
@ -599,7 +599,7 @@ func (t *tableCommon) addIndices(ctx sessionctx.Context, recordID int64, r []typ
dupKeyErr = kv.ErrKeyExists.FastGen("Duplicate entry '%s' for key '%s'", entryKey, v.Meta().Name)
txn.SetOption(kv.PresumeKeyNotExistsError, dupKeyErr)
}
if dupHandle, err := v.Create(ctx, rm, indexVals, recordID, opt); err != nil {
if dupHandle, err := v.Create(ctx, rm, indexVals, recordID, opts...); err != nil {
if kv.ErrKeyExists.Equal(err) {
return dupHandle, dupKeyErr
}
@ -814,8 +814,8 @@ func (t *tableCommon) removeRowIndex(sc *stmtctx.StatementContext, rm kv.Retriev
}
// buildIndexForRow implements table.Table BuildIndexForRow interface.
func (t *tableCommon) buildIndexForRow(ctx sessionctx.Context, rm kv.RetrieverMutator, h int64, vals []types.Datum, idx table.Index) error {
if _, err := idx.Create(ctx, rm, vals, h); err != nil {
func (t *tableCommon) buildIndexForRow(ctx sessionctx.Context, rm kv.RetrieverMutator, h int64, vals []types.Datum, idx table.Index, txn kv.Transaction) error {
if _, err := idx.Create(ctx, rm, vals, h, table.WithAssertion(txn)); err != nil {
if kv.ErrKeyExists.Equal(err) {
// Make error message consistent with MySQL.
entryKey, err1 := t.genIndexKeyStr(vals)

View File

@ -465,7 +465,7 @@ func (s *testSuite) testIndex(c *C, ctx sessionctx.Context, dbName string, tb ta
// set data to:
// index data (handle, data): (1, 10), (2, 20), (3, 30)
// table data (handle, data): (1, 10), (2, 20), (4, 40)
_, err = idx.Create(mockCtx, txn, types.MakeDatums(int64(30)), 3)
_, err = idx.Create(mockCtx, txn, types.MakeDatums(int64(30)), 3, table.WithAssertion(txn))
c.Assert(err, IsNil)
key := tablecodec.EncodeRowKey(tb.Meta().ID, codec.EncodeInt(nil, 4))
setColValue(c, txn, key, types.NewDatum(int64(40)))
@ -486,7 +486,7 @@ func (s *testSuite) testIndex(c *C, ctx sessionctx.Context, dbName string, tb ta
// set data to:
// index data (handle, data): (1, 10), (2, 20), (3, 30), (4, 40)
// table data (handle, data): (1, 10), (2, 20), (4, 40), (3, 31)
_, err = idx.Create(mockCtx, txn, types.MakeDatums(int64(40)), 4)
_, err = idx.Create(mockCtx, txn, types.MakeDatums(int64(40)), 4, table.WithAssertion(txn))
c.Assert(err, IsNil)
key = tablecodec.EncodeRowKey(tb.Meta().ID, codec.EncodeInt(nil, 3))
setColValue(c, txn, key, types.NewDatum(int64(31)))