Merge pull request #465 from pingcap/goroutine/fix-461
kv: Do Lock key when create unique index
This commit is contained in:
@ -159,20 +159,24 @@ func (c *kvIndex) genIndexKey(indexedValues []interface{}, h int64) ([]byte, err
|
||||
// Create creates a new entry in the kvIndex data.
|
||||
// If the index is unique and there already exists an entry with the same key, Create will return ErrKeyExists
|
||||
func (c *kvIndex) Create(txn Transaction, indexedValues []interface{}, h int64) error {
|
||||
keyBuf, err := c.genIndexKey(indexedValues, h)
|
||||
key, err := c.genIndexKey(indexedValues, h)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
if !c.unique {
|
||||
// TODO: reconsider value
|
||||
err = txn.Set(keyBuf, []byte("timestamp?"))
|
||||
err = txn.Set(key, []byte("timestamp?"))
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
// unique index
|
||||
_, err = txn.Get(keyBuf)
|
||||
err = txn.LockKeys(key)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
_, err = txn.Get(key)
|
||||
if IsErrNotFound(err) {
|
||||
err = txn.Set(keyBuf, encodeHandle(h))
|
||||
err = txn.Set(key, encodeHandle(h))
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
@ -181,11 +185,11 @@ func (c *kvIndex) Create(txn Transaction, indexedValues []interface{}, h int64)
|
||||
|
||||
// Delete removes the entry for handle h and indexdValues from KV index.
|
||||
func (c *kvIndex) Delete(txn Transaction, indexedValues []interface{}, h int64) error {
|
||||
keyBuf, err := c.genIndexKey(indexedValues, h)
|
||||
key, err := c.genIndexKey(indexedValues, h)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
err = txn.Delete(keyBuf)
|
||||
err = txn.Delete(key)
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
@ -217,17 +221,17 @@ func (c *kvIndex) Drop(txn Transaction) error {
|
||||
|
||||
// Seek searches KV index for the entry with indexedValues.
|
||||
func (c *kvIndex) Seek(txn Transaction, indexedValues []interface{}) (iter IndexIterator, hit bool, err error) {
|
||||
keyBuf, err := c.genIndexKey(indexedValues, 0)
|
||||
key, err := c.genIndexKey(indexedValues, 0)
|
||||
if err != nil {
|
||||
return nil, false, errors.Trace(err)
|
||||
}
|
||||
it, err := txn.Seek(keyBuf)
|
||||
it, err := txn.Seek(key)
|
||||
if err != nil {
|
||||
return nil, false, errors.Trace(err)
|
||||
}
|
||||
// check if hit
|
||||
hit = false
|
||||
if it.Valid() && it.Key() == string(keyBuf) {
|
||||
if it.Valid() && it.Key() == string(key) {
|
||||
hit = true
|
||||
}
|
||||
return &indexIter{it: it, idx: c, prefix: c.prefix}, hit, nil
|
||||
|
||||
@ -256,7 +256,7 @@ func (s *session) ExecRestrictedSQL(ctx context.Context, sql string) (rset.Recor
|
||||
// Check statement for some restriction
|
||||
// For example only support DML on system meta table.
|
||||
// TODO: Add more restrictions.
|
||||
log.Infof("Executing %s [%s]", st, sql)
|
||||
log.Infof("Executing %s [%s]", st.OriginText(), sql)
|
||||
ctx.SetValue(&sqlexec.RestrictedSQLExecutorKeyType{}, true)
|
||||
defer ctx.ClearValue(&sqlexec.RestrictedSQLExecutorKeyType{})
|
||||
rs, err := st.Exec(ctx)
|
||||
|
||||
@ -217,13 +217,19 @@ func (s *dbStore) tryConditionLockKey(tid uint64, key string, snapshotVal []byte
|
||||
|
||||
metaKey := codec.EncodeBytes(nil, []byte(key))
|
||||
currValue, err := s.db.Get(metaKey)
|
||||
if errors2.ErrorEqual(err, kv.ErrNotExist) || currValue == nil {
|
||||
// If it's a new key, we won't need to check its version
|
||||
if errors2.ErrorEqual(err, kv.ErrNotExist) {
|
||||
s.keysLocked[key] = tid
|
||||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
// key not exist.
|
||||
if currValue == nil {
|
||||
s.keysLocked[key] = tid
|
||||
return nil
|
||||
}
|
||||
_, ver, err := codec.DecodeUint(currValue)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
@ -236,7 +242,6 @@ func (s *dbStore) tryConditionLockKey(tid uint64, key string, snapshotVal []byte
|
||||
}
|
||||
|
||||
s.keysLocked[key] = tid
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@ -348,13 +348,13 @@ func (t *Table) AddRecord(ctx context.Context, r []interface{}) (recordID int64,
|
||||
if errors2.ErrorEqual(err, kv.ErrKeyExists) {
|
||||
// Get the duplicate row handle
|
||||
// For insert on duplicate syntax, we should update the row
|
||||
iter, _, terr := v.X.Seek(txn, colVals)
|
||||
if terr != nil {
|
||||
return 0, errors.Trace(terr)
|
||||
iter, _, err1 := v.X.Seek(txn, colVals)
|
||||
if err1 != nil {
|
||||
return 0, errors.Trace(err1)
|
||||
}
|
||||
_, h, terr := iter.Next()
|
||||
if terr != nil {
|
||||
return 0, errors.Trace(terr)
|
||||
_, h, err1 := iter.Next()
|
||||
if err1 != nil {
|
||||
return 0, errors.Trace(err1)
|
||||
}
|
||||
return h, errors.Trace(err)
|
||||
}
|
||||
|
||||
17
tidb_test.go
17
tidb_test.go
@ -1245,6 +1245,23 @@ func (s *testSessionSuite) TestResultType(c *C) {
|
||||
c.Assert(fs[0].Col.FieldType.Tp, Equals, mysql.TypeString)
|
||||
}
|
||||
|
||||
func (s *testSessionSuite) TestIssue461(c *C) {
|
||||
store := newStore(c, s.dbName)
|
||||
se1 := newSession(c, store, s.dbName)
|
||||
mustExecSQL(c, se1,
|
||||
`CREATE TABLE test ( id int(11) UNSIGNED NOT NULL AUTO_INCREMENT, val int UNIQUE, PRIMARY KEY (id)); `)
|
||||
mustExecSQL(c, se1, "begin;")
|
||||
mustExecSQL(c, se1, "insert into test(id, val) values(1, 1);")
|
||||
se2 := newSession(c, store, s.dbName)
|
||||
mustExecSQL(c, se2, "begin;")
|
||||
mustExecSQL(c, se2, "insert into test(id, val) values(1, 1);")
|
||||
mustExecSQL(c, se2, "commit;")
|
||||
mustExecFailed(c, se1, "commit;")
|
||||
|
||||
se := newSession(c, store, s.dbName)
|
||||
mustExecSQL(c, se, "drop table test;")
|
||||
}
|
||||
|
||||
func (s *testSessionSuite) TestBuiltin(c *C) {
|
||||
store := newStore(c, s.dbName)
|
||||
se := newSession(c, store, s.dbName)
|
||||
|
||||
Reference in New Issue
Block a user