mvcc: compact implementation
backup
This commit is contained in:
@ -38,6 +38,7 @@ type dbStore struct {
|
||||
keysLocked map[string]uint64
|
||||
uuid string
|
||||
path string
|
||||
gc *localstoreGC
|
||||
}
|
||||
|
||||
type storeCache struct {
|
||||
@ -85,6 +86,7 @@ func (d Driver) Open(schema string) (kv.Storage, error) {
|
||||
uuid: uuid.NewV4().String(),
|
||||
path: schema,
|
||||
db: db,
|
||||
gc: newLocalGC(),
|
||||
}
|
||||
|
||||
mc.cache[schema] = s
|
||||
|
||||
76
store/localstore/local_gc.go
Normal file
76
store/localstore/local_gc.go
Normal file
@ -0,0 +1,76 @@
|
||||
package localstore
|
||||
|
||||
import (
|
||||
"container/list"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/ngaut/log"
|
||||
"github.com/pingcap/tidb/kv"
|
||||
)
|
||||
|
||||
var _ kv.GC = (*localstoreGC)(nil)
|
||||
|
||||
type localstoreGC struct {
|
||||
mu sync.Mutex
|
||||
recentKeys map[string]struct{}
|
||||
stopChan chan struct{}
|
||||
ticker *time.Ticker
|
||||
}
|
||||
|
||||
func (gc *localstoreGC) OnSet(k kv.Key) {
|
||||
gc.mu.Lock()
|
||||
defer gc.mu.Unlock()
|
||||
gc.recentKeys[string(k)] = struct{}{}
|
||||
}
|
||||
|
||||
func (gc *localstoreGC) OnGet(k kv.Key) {
|
||||
gc.mu.Lock()
|
||||
defer gc.mu.Unlock()
|
||||
gc.recentKeys[string(k)] = struct{}{}
|
||||
}
|
||||
|
||||
func (gc *localstoreGC) Do(k kv.Key) {
|
||||
// TODO
|
||||
log.Debugf("gc key: %q", k)
|
||||
}
|
||||
|
||||
func (gc *localstoreGC) Start() {
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-gc.stopChan:
|
||||
break
|
||||
case <-gc.ticker.C:
|
||||
l := list.New()
|
||||
gc.mu.Lock()
|
||||
for k, _ := range gc.recentKeys {
|
||||
// get recent keys list
|
||||
l.PushBack(k)
|
||||
}
|
||||
// clean recentKeys
|
||||
gc.recentKeys = map[string]struct{}{}
|
||||
gc.mu.Unlock()
|
||||
// Do GC
|
||||
for e := l.Front(); e != nil; e = e.Next() {
|
||||
gc.Do([]byte(e.Value.(string)))
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (gc *localstoreGC) Stop() {
|
||||
gc.stopChan <- struct{}{}
|
||||
gc.ticker.Stop()
|
||||
close(gc.stopChan)
|
||||
}
|
||||
|
||||
func newLocalGC() *localstoreGC {
|
||||
return &localstoreGC{
|
||||
recentKeys: map[string]struct{}{},
|
||||
stopChan: make(chan struct{}),
|
||||
// TODO hard code
|
||||
ticker: time.NewTicker(time.Second * 1),
|
||||
}
|
||||
}
|
||||
@ -95,7 +95,6 @@ func (txn *dbTxn) Inc(k kv.Key, step int64) (int64, error) {
|
||||
if err != nil {
|
||||
return 0, errors.Trace(err)
|
||||
}
|
||||
|
||||
return intVal, nil
|
||||
}
|
||||
|
||||
@ -108,7 +107,6 @@ func (txn *dbTxn) GetInt64(k kv.Key) (int64, error) {
|
||||
if err != nil {
|
||||
return 0, errors.Trace(err)
|
||||
}
|
||||
|
||||
intVal, err := strconv.ParseInt(string(val), 10, 0)
|
||||
return intVal, errors.Trace(err)
|
||||
}
|
||||
@ -130,7 +128,7 @@ func (txn *dbTxn) Get(k kv.Key) ([]byte, error) {
|
||||
if len(val) == 0 {
|
||||
return nil, errors.Trace(kv.ErrNotExist)
|
||||
}
|
||||
|
||||
txn.store.gc.OnGet(k)
|
||||
return val, nil
|
||||
}
|
||||
|
||||
@ -143,7 +141,12 @@ func (txn *dbTxn) Set(k kv.Key, data []byte) error {
|
||||
|
||||
log.Debugf("set key:%q, txn:%d", k, txn.tID)
|
||||
k = kv.EncodeKey(k)
|
||||
return txn.UnionStore.Set(k, data)
|
||||
err := txn.UnionStore.Set(k, data)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
txn.store.gc.OnSet(k)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (txn *dbTxn) Seek(k kv.Key, fnKeyCmp func(kv.Key) bool) (kv.Iterator, error) {
|
||||
|
||||
Reference in New Issue
Block a user