localstore: delay compactor notification, simplify dbTxn
This commit is contained in:
@ -64,10 +64,6 @@ func (gc *localstoreCompactor) OnSet(k kv.Key) {
|
||||
gc.recentKeys[string(k)] = struct{}{}
|
||||
}
|
||||
|
||||
func (gc *localstoreCompactor) OnGet(k kv.Key) {
|
||||
// Do nothing now.
|
||||
}
|
||||
|
||||
func (gc *localstoreCompactor) OnDelete(k kv.Key) {
|
||||
gc.mu.Lock()
|
||||
defer gc.mu.Unlock()
|
||||
|
||||
@ -233,8 +233,10 @@ func (s *dbStore) doCommit(cmd *command) {
|
||||
mvccKey := MvccEncodeVersionKey(kv.Key(k), curVer)
|
||||
if len(value) == 0 { // Deleted marker
|
||||
b.Put(mvccKey, nil)
|
||||
s.compactor.OnDelete(k)
|
||||
} else {
|
||||
b.Put(mvccKey, value)
|
||||
s.compactor.OnSet(k)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
@ -39,42 +39,22 @@ type dbTxn struct {
|
||||
|
||||
func (txn *dbTxn) Get(k kv.Key) ([]byte, error) {
|
||||
log.Debugf("[kv] get key:%q, txn:%d", k, txn.tid)
|
||||
val, err := txn.UnionStore.Get(k)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
txn.store.compactor.OnGet(k)
|
||||
return val, nil
|
||||
return txn.UnionStore.Get(k)
|
||||
}
|
||||
|
||||
func (txn *dbTxn) Set(k kv.Key, data []byte) error {
|
||||
log.Debugf("[kv] set key:%q, txn:%d", k, txn.tid)
|
||||
err := txn.UnionStore.Set(k, data)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
txn.store.compactor.OnSet(k)
|
||||
return nil
|
||||
return txn.UnionStore.Set(k, data)
|
||||
}
|
||||
|
||||
func (txn *dbTxn) Inc(k kv.Key, step int64) (int64, error) {
|
||||
log.Debugf("[kv] Inc %q, step %d txn:%d", k, step, txn.tid)
|
||||
val, err := txn.UnionStore.Inc(k, step)
|
||||
if err != nil {
|
||||
return 0, errors.Trace(err)
|
||||
}
|
||||
txn.store.compactor.OnSet(k)
|
||||
return val, nil
|
||||
return txn.UnionStore.Inc(k, step)
|
||||
}
|
||||
|
||||
func (txn *dbTxn) GetInt64(k kv.Key) (int64, error) {
|
||||
log.Debugf("[kv] GetInt64 %q, txn:%d", k, txn.tid)
|
||||
val, err := txn.UnionStore.GetInt64(k)
|
||||
if err != nil {
|
||||
return 0, errors.Trace(err)
|
||||
}
|
||||
txn.store.compactor.OnGet(k)
|
||||
return val, nil
|
||||
return txn.UnionStore.GetInt64(k)
|
||||
}
|
||||
|
||||
func (txn *dbTxn) String() string {
|
||||
@ -83,25 +63,12 @@ func (txn *dbTxn) String() string {
|
||||
|
||||
func (txn *dbTxn) Seek(k kv.Key) (kv.Iterator, error) {
|
||||
log.Debugf("[kv] seek key:%q, txn:%d", k, txn.tid)
|
||||
iter, err := txn.UnionStore.Seek(k)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
if !iter.Valid() {
|
||||
return &kv.UnionIter{}, nil
|
||||
}
|
||||
|
||||
return iter, nil
|
||||
return txn.UnionStore.Seek(k)
|
||||
}
|
||||
|
||||
func (txn *dbTxn) Delete(k kv.Key) error {
|
||||
log.Debugf("[kv] delete key:%q, txn:%d", k, txn.tid)
|
||||
err := txn.UnionStore.Delete(k)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
txn.store.compactor.OnDelete(k)
|
||||
return nil
|
||||
return txn.UnionStore.Delete(k)
|
||||
}
|
||||
|
||||
func (txn *dbTxn) doCommit() error {
|
||||
|
||||
Reference in New Issue
Block a user