Merge pull request #507 from pingcap/goroutine/unify-behavior
*: Unify behavior of both local store and HBase
This commit is contained in:
@ -166,11 +166,6 @@ func (c *kvIndex) Create(txn Transaction, indexedValues []interface{}, h int64)
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
// unique index
|
||||
err = txn.LockKeys(key)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
_, err = txn.Get(key)
|
||||
if IsErrNotFound(err) {
|
||||
err = txn.Set(key, encodeHandle(h))
|
||||
|
||||
@ -121,7 +121,7 @@ func getUpdateColumns(assignList []*expression.Assignment, fields []*field.Resul
|
||||
func updateRecord(ctx context.Context, h int64, data []interface{}, t table.Table,
|
||||
updateColumns map[int]*expression.Assignment, m map[interface{}]interface{},
|
||||
offset int, onDuplicateUpdate bool) error {
|
||||
if err := t.LockRow(ctx, h, true); err != nil {
|
||||
if err := t.LockRow(ctx, h); err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
|
||||
@ -156,7 +156,7 @@ func (s *dbStore) Begin() (kv.Transaction, error) {
|
||||
valid: true,
|
||||
store: s,
|
||||
version: kv.MinVersion,
|
||||
snapshotVals: make(map[string][]byte),
|
||||
snapshotVals: make(map[string]struct{}),
|
||||
}
|
||||
log.Debugf("Begin txn:%d", txn.tid)
|
||||
txn.UnionStore, err = kv.NewUnionStore(&dbSnapshot{
|
||||
@ -212,7 +212,7 @@ func (s *dbStore) newBatch() engine.Batch {
|
||||
}
|
||||
|
||||
// Both lock and unlock are used for simulating scenario of percolator papers.
|
||||
func (s *dbStore) tryConditionLockKey(tid uint64, key string, snapshotVal []byte) error {
|
||||
func (s *dbStore) tryConditionLockKey(tid uint64, key string) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
@ -246,7 +246,7 @@ func (s *dbStore) tryConditionLockKey(tid uint64, key string, snapshotVal []byte
|
||||
|
||||
// If there's newer version of this key, returns error.
|
||||
if ver > tid {
|
||||
log.Warnf("txn:%d, tryLockKey condition not match for key %s, currValue:%q, snapshotVal:%q", tid, key, currValue, snapshotVal)
|
||||
log.Warnf("txn:%d, tryLockKey condition not match for key %s, currValue:%q", tid, key, currValue)
|
||||
return errors.Trace(kv.ErrConditionNotMatch)
|
||||
}
|
||||
|
||||
|
||||
@ -38,26 +38,19 @@ type dbTxn struct {
|
||||
store *dbStore // for commit
|
||||
tid uint64
|
||||
valid bool
|
||||
version kv.Version // commit version
|
||||
snapshotVals map[string][]byte // origin version in snapshot
|
||||
version kv.Version // commit version
|
||||
snapshotVals map[string]struct{} // origin version in snapshot
|
||||
}
|
||||
|
||||
func (txn *dbTxn) markOrigin(k []byte) error {
|
||||
func (txn *dbTxn) markOrigin(k []byte) {
|
||||
keystr := string(k)
|
||||
|
||||
// Already exist, do nothing.
|
||||
if _, ok := txn.snapshotVals[keystr]; ok {
|
||||
return nil
|
||||
return
|
||||
}
|
||||
|
||||
val, err := txn.Snapshot.Get(k)
|
||||
if err != nil && !kv.IsErrNotFound(err) {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
// log.Debugf("markOrigin, key:%q, value:%q", keystr, val)
|
||||
txn.snapshotVals[keystr] = val
|
||||
return nil
|
||||
txn.snapshotVals[keystr] = struct{}{}
|
||||
}
|
||||
|
||||
// Implement transaction interface
|
||||
@ -66,9 +59,7 @@ func (txn *dbTxn) Inc(k kv.Key, step int64) (int64, error) {
|
||||
log.Debugf("Inc %q, step %d txn:%d", k, step, txn.tid)
|
||||
k = kv.EncodeKey(k)
|
||||
|
||||
if err := txn.markOrigin(k); err != nil {
|
||||
return 0, errors.Trace(err)
|
||||
}
|
||||
txn.markOrigin(k)
|
||||
val, err := txn.UnionStore.Get(k)
|
||||
if kv.IsErrNotFound(err) {
|
||||
err = txn.UnionStore.Set(k, []byte(strconv.FormatInt(step, 10)))
|
||||
@ -143,6 +134,7 @@ func (txn *dbTxn) Set(k kv.Key, data []byte) error {
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
txn.markOrigin(k)
|
||||
txn.store.compactor.OnSet(k)
|
||||
return nil
|
||||
}
|
||||
@ -169,6 +161,7 @@ func (txn *dbTxn) Delete(k kv.Key) error {
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
txn.markOrigin(k)
|
||||
txn.store.compactor.OnDelete(k)
|
||||
return nil
|
||||
}
|
||||
@ -193,8 +186,8 @@ func (txn *dbTxn) doCommit() error {
|
||||
}
|
||||
}()
|
||||
// Check locked keys
|
||||
for k, v := range txn.snapshotVals {
|
||||
err := txn.store.tryConditionLockKey(txn.tid, k, v)
|
||||
for k := range txn.snapshotVals {
|
||||
err := txn.store.tryConditionLockKey(txn.tid, k)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
@ -266,9 +259,7 @@ func (txn *dbTxn) Rollback() error {
|
||||
func (txn *dbTxn) LockKeys(keys ...kv.Key) error {
|
||||
for _, key := range keys {
|
||||
key = kv.EncodeKey(key)
|
||||
if err := txn.markOrigin(key); err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
txn.markOrigin(key)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -98,10 +98,6 @@ func (t *TxStructure) updateHash(key []byte, field []byte, fn func(oldValue []by
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
if err = t.txn.LockKeys(metaKey); err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
dataKey := t.encodeHashDataKey(key, field)
|
||||
var oldValue []byte
|
||||
oldValue, err = t.loadHashValue(dataKey)
|
||||
@ -144,10 +140,6 @@ func (t *TxStructure) HDel(key []byte, fields ...[]byte) error {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
if err = t.txn.LockKeys(metaKey); err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
var value []byte
|
||||
for _, field := range fields {
|
||||
dataKey := t.encodeHashDataKey(key, field)
|
||||
@ -209,10 +201,6 @@ func (t *TxStructure) HClear(key []byte) error {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
if err = t.txn.LockKeys(metaKey); err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
err = t.iterateHash(key, func(field []byte, value []byte) error {
|
||||
k := t.encodeHashDataKey(key, field)
|
||||
return errors.Trace(t.txn.Delete(k))
|
||||
|
||||
@ -58,10 +58,6 @@ func (t *TxStructure) listPush(key []byte, left bool, values ...[]byte) error {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
if err = t.txn.LockKeys(metaKey); err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
index := int64(0)
|
||||
for _, v := range values {
|
||||
if left {
|
||||
@ -98,10 +94,6 @@ func (t *TxStructure) listPop(key []byte, left bool) ([]byte, error) {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
|
||||
if err = t.txn.LockKeys(metaKey); err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
|
||||
index := int64(0)
|
||||
if left {
|
||||
index = meta.LIndex
|
||||
@ -179,10 +171,6 @@ func (t *TxStructure) LClear(key []byte) error {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
if err = t.txn.LockKeys(metaKey); err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
for index := meta.LIndex; index < meta.RIndex; index++ {
|
||||
dataKey := t.encodeListDataKey(key, index)
|
||||
if err = t.txn.Delete(dataKey); err != nil {
|
||||
|
||||
@ -24,9 +24,6 @@ import (
|
||||
// Set sets the string value of the key.
|
||||
func (t *TxStructure) Set(key []byte, value []byte) error {
|
||||
ek := t.encodeStringDataKey(key)
|
||||
if err := t.txn.LockKeys(ek); err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
return t.txn.Set(ek, value)
|
||||
}
|
||||
@ -67,10 +64,6 @@ func (t *TxStructure) Inc(key []byte, step int64) (int64, error) {
|
||||
// Clear removes the string value of the key.
|
||||
func (t *TxStructure) Clear(key []byte) error {
|
||||
ek := t.encodeStringDataKey(key)
|
||||
if err := t.txn.LockKeys(ek); err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
err := t.txn.Delete(ek)
|
||||
if errors2.ErrorEqual(err, kv.ErrNotExist) {
|
||||
err = nil
|
||||
|
||||
@ -106,7 +106,7 @@ type Table interface {
|
||||
|
||||
// LockRow locks a row.
|
||||
// If update is true, set row lock key to current txn.
|
||||
LockRow(ctx context.Context, h int64, update bool) error
|
||||
LockRow(ctx context.Context, h int64) error
|
||||
}
|
||||
|
||||
// TableFromMeta builds a table.Table from *model.TableInfo.
|
||||
|
||||
@ -362,7 +362,7 @@ func (t *Table) AddRecord(ctx context.Context, r []interface{}) (recordID int64,
|
||||
}
|
||||
}
|
||||
|
||||
if err := t.LockRow(ctx, recordID, true); err != nil {
|
||||
if err := t.LockRow(ctx, recordID); err != nil {
|
||||
return 0, errors.Trace(err)
|
||||
}
|
||||
|
||||
@ -432,20 +432,13 @@ func (t *Table) Row(ctx context.Context, h int64) ([]interface{}, error) {
|
||||
}
|
||||
|
||||
// LockRow implements table.Table LockRow interface.
|
||||
func (t *Table) LockRow(ctx context.Context, h int64, update bool) error {
|
||||
func (t *Table) LockRow(ctx context.Context, h int64) error {
|
||||
txn, err := ctx.GetTxn(false)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
// Get row lock key
|
||||
lockKey := t.RecordKey(h, nil)
|
||||
err = txn.LockKeys([]byte(lockKey))
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
if !update {
|
||||
return nil
|
||||
}
|
||||
// set row lock key to current txn
|
||||
err = txn.Set([]byte(lockKey), []byte(txn.String()))
|
||||
return errors.Trace(err)
|
||||
@ -453,7 +446,7 @@ func (t *Table) LockRow(ctx context.Context, h int64, update bool) error {
|
||||
|
||||
// RemoveRow implements table.Table RemoveRow interface.
|
||||
func (t *Table) RemoveRow(ctx context.Context, h int64) error {
|
||||
if err := t.LockRow(ctx, h, false); err != nil {
|
||||
if err := t.LockRow(ctx, h); err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
txn, err := ctx.GetTxn(false)
|
||||
|
||||
Reference in New Issue
Block a user