From fec9aa006f689b2c3d9442d60dc640aa66c97e8a Mon Sep 17 00:00:00 2001 From: dongxu Date: Tue, 13 Oct 2015 20:58:01 +0800 Subject: [PATCH] mvcc: compact implementation backup --- store/localstore/kv.go | 2 + store/localstore/local_gc.go | 76 ++++++++++++++++++++++++++++++++++++ store/localstore/txn.go | 11 ++++-- 3 files changed, 85 insertions(+), 4 deletions(-) create mode 100644 store/localstore/local_gc.go diff --git a/store/localstore/kv.go b/store/localstore/kv.go index 58934543fc..9871f7fda5 100644 --- a/store/localstore/kv.go +++ b/store/localstore/kv.go @@ -38,6 +38,7 @@ type dbStore struct { keysLocked map[string]uint64 uuid string path string + gc *localstoreGC } type storeCache struct { @@ -85,6 +86,7 @@ func (d Driver) Open(schema string) (kv.Storage, error) { uuid: uuid.NewV4().String(), path: schema, db: db, + gc: newLocalGC(), } mc.cache[schema] = s diff --git a/store/localstore/local_gc.go b/store/localstore/local_gc.go new file mode 100644 index 0000000000..eafa18b97f --- /dev/null +++ b/store/localstore/local_gc.go @@ -0,0 +1,76 @@ +package localstore + +import ( + "container/list" + "sync" + "time" + + "github.com/ngaut/log" + "github.com/pingcap/tidb/kv" +) + +var _ kv.GC = (*localstoreGC)(nil) + +type localstoreGC struct { + mu sync.Mutex + recentKeys map[string]struct{} + stopChan chan struct{} + ticker *time.Ticker +} + +func (gc *localstoreGC) OnSet(k kv.Key) { + gc.mu.Lock() + defer gc.mu.Unlock() + gc.recentKeys[string(k)] = struct{}{} +} + +func (gc *localstoreGC) OnGet(k kv.Key) { + gc.mu.Lock() + defer gc.mu.Unlock() + gc.recentKeys[string(k)] = struct{}{} +} + +func (gc *localstoreGC) Do(k kv.Key) { + // TODO + log.Debugf("gc key: %q", k) +} + +func (gc *localstoreGC) Start() { + go func() { + for { + select { + case <-gc.stopChan: + break + case <-gc.ticker.C: + l := list.New() + gc.mu.Lock() + for k, _ := range gc.recentKeys { + // get recent keys list + l.PushBack(k) + } + // clean recentKeys + gc.recentKeys = map[string]struct{}{} + gc.mu.Unlock() + // Do GC + for e := l.Front(); e != nil; e = e.Next() { + gc.Do([]byte(e.Value.(string))) + } + } + } + }() +} + +func (gc *localstoreGC) Stop() { + gc.stopChan <- struct{}{} + gc.ticker.Stop() + close(gc.stopChan) +} + +func newLocalGC() *localstoreGC { + return &localstoreGC{ + recentKeys: map[string]struct{}{}, + stopChan: make(chan struct{}), + // TODO hard code + ticker: time.NewTicker(time.Second * 1), + } +} diff --git a/store/localstore/txn.go b/store/localstore/txn.go index ae315167c8..ddceb5b7be 100644 --- a/store/localstore/txn.go +++ b/store/localstore/txn.go @@ -95,7 +95,6 @@ func (txn *dbTxn) Inc(k kv.Key, step int64) (int64, error) { if err != nil { return 0, errors.Trace(err) } - return intVal, nil } @@ -108,7 +107,6 @@ func (txn *dbTxn) GetInt64(k kv.Key) (int64, error) { if err != nil { return 0, errors.Trace(err) } - intVal, err := strconv.ParseInt(string(val), 10, 0) return intVal, errors.Trace(err) } @@ -130,7 +128,7 @@ func (txn *dbTxn) Get(k kv.Key) ([]byte, error) { if len(val) == 0 { return nil, errors.Trace(kv.ErrNotExist) } - + txn.store.gc.OnGet(k) return val, nil } @@ -143,7 +141,12 @@ func (txn *dbTxn) Set(k kv.Key, data []byte) error { log.Debugf("set key:%q, txn:%d", k, txn.tID) k = kv.EncodeKey(k) - return txn.UnionStore.Set(k, data) + err := txn.UnionStore.Set(k, data) + if err != nil { + return errors.Trace(err) + } + txn.store.gc.OnSet(k) + return nil } func (txn *dbTxn) Seek(k kv.Key, fnKeyCmp func(kv.Key) bool) (kv.Iterator, error) {