diff --git a/kv/index_iter.go b/kv/index_iter.go index 8515a303bc..b83e287b03 100644 --- a/kv/index_iter.go +++ b/kv/index_iter.go @@ -166,11 +166,6 @@ func (c *kvIndex) Create(txn Transaction, indexedValues []interface{}, h int64) return errors.Trace(err) } - // unique index - err = txn.LockKeys(key) - if err != nil { - return errors.Trace(err) - } _, err = txn.Get(key) if IsErrNotFound(err) { err = txn.Set(key, encodeHandle(h)) diff --git a/stmt/stmts/update.go b/stmt/stmts/update.go index 56e92484f3..cd8abc2f5b 100644 --- a/stmt/stmts/update.go +++ b/stmt/stmts/update.go @@ -121,7 +121,7 @@ func getUpdateColumns(assignList []*expression.Assignment, fields []*field.Resul func updateRecord(ctx context.Context, h int64, data []interface{}, t table.Table, updateColumns map[int]*expression.Assignment, m map[interface{}]interface{}, offset int, onDuplicateUpdate bool) error { - if err := t.LockRow(ctx, h, true); err != nil { + if err := t.LockRow(ctx, h); err != nil { return errors.Trace(err) } diff --git a/store/localstore/kv.go b/store/localstore/kv.go index 8930ed2a29..be7df9010f 100644 --- a/store/localstore/kv.go +++ b/store/localstore/kv.go @@ -156,7 +156,7 @@ func (s *dbStore) Begin() (kv.Transaction, error) { valid: true, store: s, version: kv.MinVersion, - snapshotVals: make(map[string][]byte), + snapshotVals: make(map[string]struct{}), } log.Debugf("Begin txn:%d", txn.tid) txn.UnionStore, err = kv.NewUnionStore(&dbSnapshot{ @@ -212,7 +212,7 @@ func (s *dbStore) newBatch() engine.Batch { } // Both lock and unlock are used for simulating scenario of percolator papers. -func (s *dbStore) tryConditionLockKey(tid uint64, key string, snapshotVal []byte) error { +func (s *dbStore) tryConditionLockKey(tid uint64, key string) error { s.mu.Lock() defer s.mu.Unlock() @@ -246,7 +246,7 @@ func (s *dbStore) tryConditionLockKey(tid uint64, key string, snapshotVal []byte // If there's newer version of this key, returns error. if ver > tid { - log.Warnf("txn:%d, tryLockKey condition not match for key %s, currValue:%q, snapshotVal:%q", tid, key, currValue, snapshotVal) + log.Warnf("txn:%d, tryLockKey condition not match for key %s, currValue:%q", tid, key, currValue) return errors.Trace(kv.ErrConditionNotMatch) } diff --git a/store/localstore/txn.go b/store/localstore/txn.go index 60ce900d95..d6769a386d 100644 --- a/store/localstore/txn.go +++ b/store/localstore/txn.go @@ -38,26 +38,19 @@ type dbTxn struct { store *dbStore // for commit tid uint64 valid bool - version kv.Version // commit version - snapshotVals map[string][]byte // origin version in snapshot + version kv.Version // commit version + snapshotVals map[string]struct{} // origin version in snapshot } -func (txn *dbTxn) markOrigin(k []byte) error { +func (txn *dbTxn) markOrigin(k []byte) { keystr := string(k) // Already exist, do nothing. if _, ok := txn.snapshotVals[keystr]; ok { - return nil + return } - val, err := txn.Snapshot.Get(k) - if err != nil && !kv.IsErrNotFound(err) { - return errors.Trace(err) - } - - // log.Debugf("markOrigin, key:%q, value:%q", keystr, val) - txn.snapshotVals[keystr] = val - return nil + txn.snapshotVals[keystr] = struct{}{} } // Implement transaction interface @@ -66,9 +59,7 @@ func (txn *dbTxn) Inc(k kv.Key, step int64) (int64, error) { log.Debugf("Inc %q, step %d txn:%d", k, step, txn.tid) k = kv.EncodeKey(k) - if err := txn.markOrigin(k); err != nil { - return 0, errors.Trace(err) - } + txn.markOrigin(k) val, err := txn.UnionStore.Get(k) if kv.IsErrNotFound(err) { err = txn.UnionStore.Set(k, []byte(strconv.FormatInt(step, 10))) @@ -143,6 +134,7 @@ func (txn *dbTxn) Set(k kv.Key, data []byte) error { if err != nil { return errors.Trace(err) } + txn.markOrigin(k) txn.store.compactor.OnSet(k) return nil } @@ -169,6 +161,7 @@ func (txn *dbTxn) Delete(k kv.Key) error { if err != nil { return errors.Trace(err) } + txn.markOrigin(k) txn.store.compactor.OnDelete(k) return nil } @@ -193,8 +186,8 @@ func (txn *dbTxn) doCommit() error { } }() // Check locked keys - for k, v := range txn.snapshotVals { - err := txn.store.tryConditionLockKey(txn.tid, k, v) + for k := range txn.snapshotVals { + err := txn.store.tryConditionLockKey(txn.tid, k) if err != nil { return errors.Trace(err) } @@ -266,9 +259,7 @@ func (txn *dbTxn) Rollback() error { func (txn *dbTxn) LockKeys(keys ...kv.Key) error { for _, key := range keys { key = kv.EncodeKey(key) - if err := txn.markOrigin(key); err != nil { - return errors.Trace(err) - } + txn.markOrigin(key) } return nil } diff --git a/structure/hash.go b/structure/hash.go index 1dfee5e668..78857d6acf 100644 --- a/structure/hash.go +++ b/structure/hash.go @@ -98,10 +98,6 @@ func (t *TxStructure) updateHash(key []byte, field []byte, fn func(oldValue []by return errors.Trace(err) } - if err = t.txn.LockKeys(metaKey); err != nil { - return errors.Trace(err) - } - dataKey := t.encodeHashDataKey(key, field) var oldValue []byte oldValue, err = t.loadHashValue(dataKey) @@ -144,10 +140,6 @@ func (t *TxStructure) HDel(key []byte, fields ...[]byte) error { return errors.Trace(err) } - if err = t.txn.LockKeys(metaKey); err != nil { - return errors.Trace(err) - } - var value []byte for _, field := range fields { dataKey := t.encodeHashDataKey(key, field) @@ -209,10 +201,6 @@ func (t *TxStructure) HClear(key []byte) error { return errors.Trace(err) } - if err = t.txn.LockKeys(metaKey); err != nil { - return errors.Trace(err) - } - err = t.iterateHash(key, func(field []byte, value []byte) error { k := t.encodeHashDataKey(key, field) return errors.Trace(t.txn.Delete(k)) diff --git a/structure/list.go b/structure/list.go index ccbddda037..dfd8fc8a91 100644 --- a/structure/list.go +++ b/structure/list.go @@ -58,10 +58,6 @@ func (t *TxStructure) listPush(key []byte, left bool, values ...[]byte) error { return errors.Trace(err) } - if err = t.txn.LockKeys(metaKey); err != nil { - return errors.Trace(err) - } - index := int64(0) for _, v := range values { if left { @@ -98,10 +94,6 @@ func (t *TxStructure) listPop(key []byte, left bool) ([]byte, error) { return nil, errors.Trace(err) } - if err = t.txn.LockKeys(metaKey); err != nil { - return nil, errors.Trace(err) - } - index := int64(0) if left { index = meta.LIndex @@ -179,10 +171,6 @@ func (t *TxStructure) LClear(key []byte) error { return errors.Trace(err) } - if err = t.txn.LockKeys(metaKey); err != nil { - return errors.Trace(err) - } - for index := meta.LIndex; index < meta.RIndex; index++ { dataKey := t.encodeListDataKey(key, index) if err = t.txn.Delete(dataKey); err != nil { diff --git a/structure/string.go b/structure/string.go index 4a4f6e64ec..295ebea092 100644 --- a/structure/string.go +++ b/structure/string.go @@ -24,9 +24,6 @@ import ( // Set sets the string value of the key. func (t *TxStructure) Set(key []byte, value []byte) error { ek := t.encodeStringDataKey(key) - if err := t.txn.LockKeys(ek); err != nil { - return errors.Trace(err) - } return t.txn.Set(ek, value) } @@ -67,10 +64,6 @@ func (t *TxStructure) Inc(key []byte, step int64) (int64, error) { // Clear removes the string value of the key. func (t *TxStructure) Clear(key []byte) error { ek := t.encodeStringDataKey(key) - if err := t.txn.LockKeys(ek); err != nil { - return errors.Trace(err) - } - err := t.txn.Delete(ek) if errors2.ErrorEqual(err, kv.ErrNotExist) { err = nil diff --git a/table/table.go b/table/table.go index 3e1108c264..e08a3c1640 100644 --- a/table/table.go +++ b/table/table.go @@ -106,7 +106,7 @@ type Table interface { // LockRow locks a row. // If update is true, set row lock key to current txn. - LockRow(ctx context.Context, h int64, update bool) error + LockRow(ctx context.Context, h int64) error } // TableFromMeta builds a table.Table from *model.TableInfo. diff --git a/table/tables/tables.go b/table/tables/tables.go index 5c1ff5d606..de5f5409df 100644 --- a/table/tables/tables.go +++ b/table/tables/tables.go @@ -362,7 +362,7 @@ func (t *Table) AddRecord(ctx context.Context, r []interface{}) (recordID int64, } } - if err := t.LockRow(ctx, recordID, true); err != nil { + if err := t.LockRow(ctx, recordID); err != nil { return 0, errors.Trace(err) } @@ -432,20 +432,13 @@ func (t *Table) Row(ctx context.Context, h int64) ([]interface{}, error) { } // LockRow implements table.Table LockRow interface. -func (t *Table) LockRow(ctx context.Context, h int64, update bool) error { +func (t *Table) LockRow(ctx context.Context, h int64) error { txn, err := ctx.GetTxn(false) if err != nil { return errors.Trace(err) } // Get row lock key lockKey := t.RecordKey(h, nil) - err = txn.LockKeys([]byte(lockKey)) - if err != nil { - return errors.Trace(err) - } - if !update { - return nil - } // set row lock key to current txn err = txn.Set([]byte(lockKey), []byte(txn.String())) return errors.Trace(err) @@ -453,7 +446,7 @@ func (t *Table) LockRow(ctx context.Context, h int64, update bool) error { // RemoveRow implements table.Table RemoveRow interface. func (t *Table) RemoveRow(ctx context.Context, h int64) error { - if err := t.LockRow(ctx, h, false); err != nil { + if err := t.LockRow(ctx, h); err != nil { return errors.Trace(err) } txn, err := ctx.GetTxn(false)