From b8da56b5081632f6c2ef49ee6eecedcc9399aa1b Mon Sep 17 00:00:00 2001 From: disksing Date: Sat, 7 Nov 2015 11:09:40 +0800 Subject: [PATCH] kv: rename and minor changes --- kv/{mem_cache.go => cache_snapshot.go} | 30 +++++++++++------------ kv/kv.go | 4 +-- kv/union_store.go | 34 +++++++++++++------------- store/hbase/snapshot.go | 12 ++++----- store/hbase/txn.go | 8 +++--- store/localstore/snapshot.go | 7 +++--- store/localstore/txn.go | 10 ++++---- 7 files changed, 53 insertions(+), 52 deletions(-) rename kv/{mem_cache.go => cache_snapshot.go} (68%) diff --git a/kv/mem_cache.go b/kv/cache_snapshot.go similarity index 68% rename from kv/mem_cache.go rename to kv/cache_snapshot.go index 4248df2d09..d140608e7c 100644 --- a/kv/mem_cache.go +++ b/kv/cache_snapshot.go @@ -15,23 +15,23 @@ package kv import "github.com/juju/errors" -// MemCache wraps a snapshot and supports cache for read. -type MemCache struct { +// CacheSnapshot wraps a snapshot and supports cache for read. +type CacheSnapshot struct { // Cache is an in-memory Store for caching KVs. Cache MemBuffer // Snapshot is a snapshot of a KV store. Snapshot Snapshot } -// Get gets the value for key k from Memcache. -func (c *MemCache) Get(k Key) ([]byte, error) { +// Get gets the value for key k from CacheSnapshot. +func (c *CacheSnapshot) Get(k Key) ([]byte, error) { v, err := c.Cache.Get(k) if IsErrNotFound(err) { v, err = c.Snapshot.Get(k) if err == nil { c.Cache.Set([]byte(k), v) } - return v, err + return v, errors.Trace(err) } if err != nil { return nil, errors.Trace(err) @@ -39,8 +39,8 @@ func (c *MemCache) Get(k Key) ([]byte, error) { return v, nil } -// Fetch fetches a batch of values from snapshot and saves in cache for later use. -func (c *MemCache) Fetch(keys []Key) error { +// Fetch fetches a batch of values from KV store and saves in cache for later use. +func (c *CacheSnapshot) Fetch(keys []Key) error { var missKeys []Key for _, k := range keys { if _, err := c.Cache.Get(k); IsErrNotFound(err) { @@ -50,7 +50,7 @@ func (c *MemCache) Fetch(keys []Key) error { values, err := c.Snapshot.BatchGet(missKeys) if err != nil { - return err + return errors.Trace(err) } for k, v := range values { @@ -59,11 +59,11 @@ func (c *MemCache) Fetch(keys []Key) error { return nil } -// Scan scans a batch of values from snapshot and saves in cache for later use. -func (c *MemCache) Scan(start, end Key, maxSize int) error { - values, err := c.Snapshot.Scan(start, end, maxSize) +// Scan scans a batch of values from KV store and saves in cache for later use. +func (c *CacheSnapshot) Scan(start, end Key, limit int) error { + values, err := c.Snapshot.Scan(start, end, limit) if err != nil { - return err + return errors.Trace(err) } for k, v := range values { c.Cache.Set([]byte(k), v) @@ -71,14 +71,14 @@ func (c *MemCache) Scan(start, end Key, maxSize int) error { return nil } -// NewIterator creates an iterator of Memcache. -func (c *MemCache) NewIterator(param interface{}) Iterator { +// NewIterator creates an iterator of CacheSnapshot. +func (c *CacheSnapshot) NewIterator(param interface{}) Iterator { cacheIt := c.Cache.NewIterator(param) snapshotIt := c.Snapshot.NewIterator(param) return newUnionIter(cacheIt, snapshotIt) } // Release reset membuffer and release snapshot. -func (c *MemCache) Release() { +func (c *CacheSnapshot) Release() { c.Snapshot.Release() } diff --git a/kv/kv.go b/kv/kv.go index 90c760d1f3..d4a6e520d8 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -104,7 +104,7 @@ type Transaction interface { // Fetch fetches a batch of values and saves in cache for later use. Fetch(keys []Key) error // Scan scans a batch of values from snapshot and saves in cache for later use. - Scan(start, end Key, maxSize int) error + Scan(start, end Key, limit int) error // Set sets the value for key k as v into KV store. Set(k Key, v []byte) error // Seek searches for the entry with key k in KV store. @@ -147,7 +147,7 @@ type Snapshot interface { // BatchGet gets a batch of values from snapshot. BatchGet(keys []Key) (map[string][]byte, error) // Scan gets values in specific range from snapshot. - Scan(start, end Key, maxSize int) (map[string][]byte, error) + Scan(start, end Key, limit int) (map[string][]byte, error) // NewIterator gets a new iterator on the snapshot. NewIterator(param interface{}) Iterator // Release releases the snapshot to store. diff --git a/kv/union_store.go b/kv/union_store.go index c00d18d433..7acf73eb40 100644 --- a/kv/union_store.go +++ b/kv/union_store.go @@ -37,8 +37,8 @@ func IsErrNotFound(err error) bool { // UnionStore is an in-memory Store which contains a buffer for write and a // cache for read. type UnionStore struct { - Buffer MemBuffer // updates are buffered in memory - Cache MemCache // for read + WBuffer MemBuffer // updates are buffered in memory + Snapshot CacheSnapshot // for read } // NewUnionStore builds a new UnionStore. @@ -46,8 +46,8 @@ func NewUnionStore(snapshot Snapshot) UnionStore { buffer := p.Get().(MemBuffer) cache := p.Get().(MemBuffer) return UnionStore{ - Buffer: buffer, - Cache: MemCache{ + WBuffer: buffer, + Snapshot: CacheSnapshot{ Cache: cache, Snapshot: snapshot, }, @@ -57,10 +57,10 @@ func NewUnionStore(snapshot Snapshot) UnionStore { // Get implements the Store Get interface. func (us *UnionStore) Get(key []byte) (value []byte, err error) { // Get from update records frist - value, err = us.Buffer.Get(key) + value, err = us.WBuffer.Get(key) if IsErrNotFound(err) { // Try get from cache - return us.Cache.Get(key) + return us.Snapshot.Get(key) } if err != nil { return nil, errors.Trace(err) @@ -75,27 +75,27 @@ func (us *UnionStore) Get(key []byte) (value []byte, err error) { // Set implements the Store Set interface. func (us *UnionStore) Set(key []byte, value []byte) error { - return us.Buffer.Set(key, value) + return us.WBuffer.Set(key, value) } // Seek implements the Snapshot Seek interface. func (us *UnionStore) Seek(key []byte, txn Transaction) (Iterator, error) { - bufferIt := us.Buffer.NewIterator(key) - cacheIt := us.Cache.NewIterator(key) + bufferIt := us.WBuffer.NewIterator(key) + cacheIt := us.Snapshot.NewIterator(key) return newUnionIter(bufferIt, cacheIt), nil } // Delete implements the Store Delete interface. func (us *UnionStore) Delete(k []byte) error { // Mark as deleted - val, err := us.Buffer.Get(k) + val, err := us.WBuffer.Get(k) if err != nil { if !IsErrNotFound(err) { // something wrong return errors.Trace(err) } // missed in buffer - val, err = us.Cache.Get(k) + val, err = us.Snapshot.Get(k) if err != nil { if IsErrNotFound(err) { return errors.Trace(ErrNotExist) @@ -107,15 +107,15 @@ func (us *UnionStore) Delete(k []byte) error { return errors.Trace(ErrNotExist) } - return us.Buffer.Set(k, nil) + return us.WBuffer.Set(k, nil) } // Close implements the Store Close interface. func (us *UnionStore) Close() error { - us.Cache.Snapshot.Release() - us.Cache.Cache.Release() - p.Put(us.Cache.Cache) - us.Buffer.Release() - p.Put(us.Buffer) + us.Snapshot.Snapshot.Release() + us.Snapshot.Cache.Release() + p.Put(us.Snapshot.Cache) + us.WBuffer.Release() + p.Put(us.WBuffer) return nil } diff --git a/store/hbase/snapshot.go b/store/hbase/snapshot.go index 565a59b9cd..ac71dabfc4 100644 --- a/store/hbase/snapshot.go +++ b/store/hbase/snapshot.go @@ -74,11 +74,11 @@ func (s *hbaseSnapshot) Get(k kv.Key) ([]byte, error) { // BatchGet implements kv.Snapshot.BatchGet(). func (s *hbaseSnapshot) BatchGet(keys []kv.Key) (map[string][]byte, error) { - var gets []*hbase.Get - for _, key := range keys { + gets := make([]*hbase.Get, len(keys)) + for i, key := range keys { g := hbase.NewGet(key) g.AddColumn([]byte(ColFamily), []byte(Qualifier)) - gets = append(gets, g) + gets[i] = g } rows, err := s.txn.BatchGet(s.storeName, gets) if err != nil { @@ -96,12 +96,12 @@ func (s *hbaseSnapshot) BatchGet(keys []kv.Key) (map[string][]byte, error) { } // Scan implements kv.Snapshot.Scan(). -func (s *hbaseSnapshot) Scan(start, end kv.Key, maxSize int) (map[string][]byte, error) { - scanner := s.txn.GetScanner([]byte(s.storeName), start, end, maxSize) +func (s *hbaseSnapshot) Scan(start, end kv.Key, limit int) (map[string][]byte, error) { + scanner := s.txn.GetScanner([]byte(s.storeName), start, end, limit) defer scanner.Close() m := make(map[string][]byte) - for i := 0; i < maxSize; i++ { + for i := 0; i < limit; i++ { r := scanner.Next() if r != nil && len(r.Columns) > 0 { k := r.Row diff --git a/store/hbase/txn.go b/store/hbase/txn.go index 1ee5c7f4bc..5b74b3ccd6 100644 --- a/store/hbase/txn.go +++ b/store/hbase/txn.go @@ -108,11 +108,11 @@ func (txn *hbaseTxn) Fetch(keys []kv.Key) error { for i, k := range keys { encodedKeys[i] = kv.EncodeKey(k) } - return txn.UnionStore.Cache.Fetch(keys) + return txn.UnionStore.Snapshot.Fetch(keys) } -func (txn *hbaseTxn) Scan(start, end kv.Key, maxSize int) error { - return txn.UnionStore.Cache.Scan(kv.EncodeKey(start), kv.EncodeKey(end), maxSize) +func (txn *hbaseTxn) Scan(start, end kv.Key, limit int) error { + return txn.UnionStore.Snapshot.Scan(kv.EncodeKey(start), kv.EncodeKey(end), limit) } // GetInt64 get int64 which created by Inc method. @@ -164,7 +164,7 @@ func (txn *hbaseTxn) Delete(k kv.Key) error { } func (txn *hbaseTxn) each(f func(kv.Iterator) error) error { - iter := txn.UnionStore.Buffer.NewIterator(nil) + iter := txn.UnionStore.WBuffer.NewIterator(nil) defer iter.Close() for ; iter.Valid(); iter.Next() { if err := f(iter); err != nil { diff --git a/store/localstore/snapshot.go b/store/localstore/snapshot.go index 43197eb051..5ac8bf9d38 100644 --- a/store/localstore/snapshot.go +++ b/store/localstore/snapshot.go @@ -120,14 +120,15 @@ func (s *dbSnapshot) BatchGet(keys []kv.Key) (map[string][]byte, error) { return m, nil } -func (s *dbSnapshot) Scan(start, end kv.Key, maxSize int) (map[string][]byte, error) { +func (s *dbSnapshot) Scan(start, end kv.Key, limit int) (map[string][]byte, error) { m := make(map[string][]byte) it := s.NewIterator(start) - for i := 0; i < maxSize; i++ { + endKey := string(end) + for i := 0; i < limit; i++ { if !it.Valid() { break } - if string(end) > it.Key() { + if endKey > it.Key() { break } m[string(it.Key())] = it.Value() diff --git a/store/localstore/txn.go b/store/localstore/txn.go index 4119d98ffe..bb23713c82 100644 --- a/store/localstore/txn.go +++ b/store/localstore/txn.go @@ -126,11 +126,11 @@ func (txn *dbTxn) Fetch(keys []kv.Key) error { for i, k := range keys { encodedKeys[i] = kv.EncodeKey(k) } - return txn.UnionStore.Cache.Fetch(encodedKeys) + return txn.UnionStore.Snapshot.Fetch(encodedKeys) } -func (txn *dbTxn) Scan(start, end kv.Key, maxSize int) error { - return txn.UnionStore.Cache.Scan(kv.EncodeKey(start), kv.EncodeKey(end), maxSize) +func (txn *dbTxn) Scan(start, end kv.Key, limit int) error { + return txn.UnionStore.Snapshot.Scan(kv.EncodeKey(start), kv.EncodeKey(end), limit) } func (txn *dbTxn) Set(k kv.Key, data []byte) error { @@ -179,7 +179,7 @@ func (txn *dbTxn) Delete(k kv.Key) error { } func (txn *dbTxn) each(f func(kv.Iterator) error) error { - iter := txn.UnionStore.Buffer.NewIterator(nil) + iter := txn.UnionStore.WBuffer.NewIterator(nil) defer iter.Close() for ; iter.Valid(); iter.Next() { if err := f(iter); err != nil { @@ -198,7 +198,7 @@ func (txn *dbTxn) doCommit() error { } }() - txn.Cache.Release() + txn.Snapshot.Release() // Check locked keys for k := range txn.snapshotVals {