kv: rename and minor changes
This commit is contained in:
@ -15,23 +15,23 @@ package kv
|
||||
|
||||
import "github.com/juju/errors"
|
||||
|
||||
// MemCache wraps a snapshot and supports cache for read.
|
||||
type MemCache struct {
|
||||
// CacheSnapshot wraps a snapshot and supports cache for read.
|
||||
type CacheSnapshot struct {
|
||||
// Cache is an in-memory Store for caching KVs.
|
||||
Cache MemBuffer
|
||||
// Snapshot is a snapshot of a KV store.
|
||||
Snapshot Snapshot
|
||||
}
|
||||
|
||||
// Get gets the value for key k from Memcache.
|
||||
func (c *MemCache) Get(k Key) ([]byte, error) {
|
||||
// Get gets the value for key k from CacheSnapshot.
|
||||
func (c *CacheSnapshot) Get(k Key) ([]byte, error) {
|
||||
v, err := c.Cache.Get(k)
|
||||
if IsErrNotFound(err) {
|
||||
v, err = c.Snapshot.Get(k)
|
||||
if err == nil {
|
||||
c.Cache.Set([]byte(k), v)
|
||||
}
|
||||
return v, err
|
||||
return v, errors.Trace(err)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
@ -39,8 +39,8 @@ func (c *MemCache) Get(k Key) ([]byte, error) {
|
||||
return v, nil
|
||||
}
|
||||
|
||||
// Fetch fetches a batch of values from snapshot and saves in cache for later use.
|
||||
func (c *MemCache) Fetch(keys []Key) error {
|
||||
// Fetch fetches a batch of values from KV store and saves in cache for later use.
|
||||
func (c *CacheSnapshot) Fetch(keys []Key) error {
|
||||
var missKeys []Key
|
||||
for _, k := range keys {
|
||||
if _, err := c.Cache.Get(k); IsErrNotFound(err) {
|
||||
@ -50,7 +50,7 @@ func (c *MemCache) Fetch(keys []Key) error {
|
||||
|
||||
values, err := c.Snapshot.BatchGet(missKeys)
|
||||
if err != nil {
|
||||
return err
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
for k, v := range values {
|
||||
@ -59,11 +59,11 @@ func (c *MemCache) Fetch(keys []Key) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Scan scans a batch of values from snapshot and saves in cache for later use.
|
||||
func (c *MemCache) Scan(start, end Key, maxSize int) error {
|
||||
values, err := c.Snapshot.Scan(start, end, maxSize)
|
||||
// Scan scans a batch of values from KV store and saves in cache for later use.
|
||||
func (c *CacheSnapshot) Scan(start, end Key, limit int) error {
|
||||
values, err := c.Snapshot.Scan(start, end, limit)
|
||||
if err != nil {
|
||||
return err
|
||||
return errors.Trace(err)
|
||||
}
|
||||
for k, v := range values {
|
||||
c.Cache.Set([]byte(k), v)
|
||||
@ -71,14 +71,14 @@ func (c *MemCache) Scan(start, end Key, maxSize int) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// NewIterator creates an iterator of Memcache.
|
||||
func (c *MemCache) NewIterator(param interface{}) Iterator {
|
||||
// NewIterator creates an iterator of CacheSnapshot.
|
||||
func (c *CacheSnapshot) NewIterator(param interface{}) Iterator {
|
||||
cacheIt := c.Cache.NewIterator(param)
|
||||
snapshotIt := c.Snapshot.NewIterator(param)
|
||||
return newUnionIter(cacheIt, snapshotIt)
|
||||
}
|
||||
|
||||
// Release reset membuffer and release snapshot.
|
||||
func (c *MemCache) Release() {
|
||||
func (c *CacheSnapshot) Release() {
|
||||
c.Snapshot.Release()
|
||||
}
|
||||
4
kv/kv.go
4
kv/kv.go
@ -104,7 +104,7 @@ type Transaction interface {
|
||||
// Fetch fetches a batch of values and saves in cache for later use.
|
||||
Fetch(keys []Key) error
|
||||
// Scan scans a batch of values from snapshot and saves in cache for later use.
|
||||
Scan(start, end Key, maxSize int) error
|
||||
Scan(start, end Key, limit int) error
|
||||
// Set sets the value for key k as v into KV store.
|
||||
Set(k Key, v []byte) error
|
||||
// Seek searches for the entry with key k in KV store.
|
||||
@ -147,7 +147,7 @@ type Snapshot interface {
|
||||
// BatchGet gets a batch of values from snapshot.
|
||||
BatchGet(keys []Key) (map[string][]byte, error)
|
||||
// Scan gets values in specific range from snapshot.
|
||||
Scan(start, end Key, maxSize int) (map[string][]byte, error)
|
||||
Scan(start, end Key, limit int) (map[string][]byte, error)
|
||||
// NewIterator gets a new iterator on the snapshot.
|
||||
NewIterator(param interface{}) Iterator
|
||||
// Release releases the snapshot to store.
|
||||
|
||||
@ -37,8 +37,8 @@ func IsErrNotFound(err error) bool {
|
||||
// UnionStore is an in-memory Store which contains a buffer for write and a
|
||||
// cache for read.
|
||||
type UnionStore struct {
|
||||
Buffer MemBuffer // updates are buffered in memory
|
||||
Cache MemCache // for read
|
||||
WBuffer MemBuffer // updates are buffered in memory
|
||||
Snapshot CacheSnapshot // for read
|
||||
}
|
||||
|
||||
// NewUnionStore builds a new UnionStore.
|
||||
@ -46,8 +46,8 @@ func NewUnionStore(snapshot Snapshot) UnionStore {
|
||||
buffer := p.Get().(MemBuffer)
|
||||
cache := p.Get().(MemBuffer)
|
||||
return UnionStore{
|
||||
Buffer: buffer,
|
||||
Cache: MemCache{
|
||||
WBuffer: buffer,
|
||||
Snapshot: CacheSnapshot{
|
||||
Cache: cache,
|
||||
Snapshot: snapshot,
|
||||
},
|
||||
@ -57,10 +57,10 @@ func NewUnionStore(snapshot Snapshot) UnionStore {
|
||||
// Get implements the Store Get interface.
|
||||
func (us *UnionStore) Get(key []byte) (value []byte, err error) {
|
||||
// Get from update records frist
|
||||
value, err = us.Buffer.Get(key)
|
||||
value, err = us.WBuffer.Get(key)
|
||||
if IsErrNotFound(err) {
|
||||
// Try get from cache
|
||||
return us.Cache.Get(key)
|
||||
return us.Snapshot.Get(key)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
@ -75,27 +75,27 @@ func (us *UnionStore) Get(key []byte) (value []byte, err error) {
|
||||
|
||||
// Set implements the Store Set interface.
|
||||
func (us *UnionStore) Set(key []byte, value []byte) error {
|
||||
return us.Buffer.Set(key, value)
|
||||
return us.WBuffer.Set(key, value)
|
||||
}
|
||||
|
||||
// Seek implements the Snapshot Seek interface.
|
||||
func (us *UnionStore) Seek(key []byte, txn Transaction) (Iterator, error) {
|
||||
bufferIt := us.Buffer.NewIterator(key)
|
||||
cacheIt := us.Cache.NewIterator(key)
|
||||
bufferIt := us.WBuffer.NewIterator(key)
|
||||
cacheIt := us.Snapshot.NewIterator(key)
|
||||
return newUnionIter(bufferIt, cacheIt), nil
|
||||
}
|
||||
|
||||
// Delete implements the Store Delete interface.
|
||||
func (us *UnionStore) Delete(k []byte) error {
|
||||
// Mark as deleted
|
||||
val, err := us.Buffer.Get(k)
|
||||
val, err := us.WBuffer.Get(k)
|
||||
if err != nil {
|
||||
if !IsErrNotFound(err) { // something wrong
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
// missed in buffer
|
||||
val, err = us.Cache.Get(k)
|
||||
val, err = us.Snapshot.Get(k)
|
||||
if err != nil {
|
||||
if IsErrNotFound(err) {
|
||||
return errors.Trace(ErrNotExist)
|
||||
@ -107,15 +107,15 @@ func (us *UnionStore) Delete(k []byte) error {
|
||||
return errors.Trace(ErrNotExist)
|
||||
}
|
||||
|
||||
return us.Buffer.Set(k, nil)
|
||||
return us.WBuffer.Set(k, nil)
|
||||
}
|
||||
|
||||
// Close implements the Store Close interface.
|
||||
func (us *UnionStore) Close() error {
|
||||
us.Cache.Snapshot.Release()
|
||||
us.Cache.Cache.Release()
|
||||
p.Put(us.Cache.Cache)
|
||||
us.Buffer.Release()
|
||||
p.Put(us.Buffer)
|
||||
us.Snapshot.Snapshot.Release()
|
||||
us.Snapshot.Cache.Release()
|
||||
p.Put(us.Snapshot.Cache)
|
||||
us.WBuffer.Release()
|
||||
p.Put(us.WBuffer)
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -74,11 +74,11 @@ func (s *hbaseSnapshot) Get(k kv.Key) ([]byte, error) {
|
||||
|
||||
// BatchGet implements kv.Snapshot.BatchGet().
|
||||
func (s *hbaseSnapshot) BatchGet(keys []kv.Key) (map[string][]byte, error) {
|
||||
var gets []*hbase.Get
|
||||
for _, key := range keys {
|
||||
gets := make([]*hbase.Get, len(keys))
|
||||
for i, key := range keys {
|
||||
g := hbase.NewGet(key)
|
||||
g.AddColumn([]byte(ColFamily), []byte(Qualifier))
|
||||
gets = append(gets, g)
|
||||
gets[i] = g
|
||||
}
|
||||
rows, err := s.txn.BatchGet(s.storeName, gets)
|
||||
if err != nil {
|
||||
@ -96,12 +96,12 @@ func (s *hbaseSnapshot) BatchGet(keys []kv.Key) (map[string][]byte, error) {
|
||||
}
|
||||
|
||||
// Scan implements kv.Snapshot.Scan().
|
||||
func (s *hbaseSnapshot) Scan(start, end kv.Key, maxSize int) (map[string][]byte, error) {
|
||||
scanner := s.txn.GetScanner([]byte(s.storeName), start, end, maxSize)
|
||||
func (s *hbaseSnapshot) Scan(start, end kv.Key, limit int) (map[string][]byte, error) {
|
||||
scanner := s.txn.GetScanner([]byte(s.storeName), start, end, limit)
|
||||
defer scanner.Close()
|
||||
|
||||
m := make(map[string][]byte)
|
||||
for i := 0; i < maxSize; i++ {
|
||||
for i := 0; i < limit; i++ {
|
||||
r := scanner.Next()
|
||||
if r != nil && len(r.Columns) > 0 {
|
||||
k := r.Row
|
||||
|
||||
@ -108,11 +108,11 @@ func (txn *hbaseTxn) Fetch(keys []kv.Key) error {
|
||||
for i, k := range keys {
|
||||
encodedKeys[i] = kv.EncodeKey(k)
|
||||
}
|
||||
return txn.UnionStore.Cache.Fetch(keys)
|
||||
return txn.UnionStore.Snapshot.Fetch(keys)
|
||||
}
|
||||
|
||||
func (txn *hbaseTxn) Scan(start, end kv.Key, maxSize int) error {
|
||||
return txn.UnionStore.Cache.Scan(kv.EncodeKey(start), kv.EncodeKey(end), maxSize)
|
||||
func (txn *hbaseTxn) Scan(start, end kv.Key, limit int) error {
|
||||
return txn.UnionStore.Snapshot.Scan(kv.EncodeKey(start), kv.EncodeKey(end), limit)
|
||||
}
|
||||
|
||||
// GetInt64 get int64 which created by Inc method.
|
||||
@ -164,7 +164,7 @@ func (txn *hbaseTxn) Delete(k kv.Key) error {
|
||||
}
|
||||
|
||||
func (txn *hbaseTxn) each(f func(kv.Iterator) error) error {
|
||||
iter := txn.UnionStore.Buffer.NewIterator(nil)
|
||||
iter := txn.UnionStore.WBuffer.NewIterator(nil)
|
||||
defer iter.Close()
|
||||
for ; iter.Valid(); iter.Next() {
|
||||
if err := f(iter); err != nil {
|
||||
|
||||
@ -120,14 +120,15 @@ func (s *dbSnapshot) BatchGet(keys []kv.Key) (map[string][]byte, error) {
|
||||
return m, nil
|
||||
}
|
||||
|
||||
func (s *dbSnapshot) Scan(start, end kv.Key, maxSize int) (map[string][]byte, error) {
|
||||
func (s *dbSnapshot) Scan(start, end kv.Key, limit int) (map[string][]byte, error) {
|
||||
m := make(map[string][]byte)
|
||||
it := s.NewIterator(start)
|
||||
for i := 0; i < maxSize; i++ {
|
||||
endKey := string(end)
|
||||
for i := 0; i < limit; i++ {
|
||||
if !it.Valid() {
|
||||
break
|
||||
}
|
||||
if string(end) > it.Key() {
|
||||
if endKey > it.Key() {
|
||||
break
|
||||
}
|
||||
m[string(it.Key())] = it.Value()
|
||||
|
||||
@ -126,11 +126,11 @@ func (txn *dbTxn) Fetch(keys []kv.Key) error {
|
||||
for i, k := range keys {
|
||||
encodedKeys[i] = kv.EncodeKey(k)
|
||||
}
|
||||
return txn.UnionStore.Cache.Fetch(encodedKeys)
|
||||
return txn.UnionStore.Snapshot.Fetch(encodedKeys)
|
||||
}
|
||||
|
||||
func (txn *dbTxn) Scan(start, end kv.Key, maxSize int) error {
|
||||
return txn.UnionStore.Cache.Scan(kv.EncodeKey(start), kv.EncodeKey(end), maxSize)
|
||||
func (txn *dbTxn) Scan(start, end kv.Key, limit int) error {
|
||||
return txn.UnionStore.Snapshot.Scan(kv.EncodeKey(start), kv.EncodeKey(end), limit)
|
||||
}
|
||||
|
||||
func (txn *dbTxn) Set(k kv.Key, data []byte) error {
|
||||
@ -179,7 +179,7 @@ func (txn *dbTxn) Delete(k kv.Key) error {
|
||||
}
|
||||
|
||||
func (txn *dbTxn) each(f func(kv.Iterator) error) error {
|
||||
iter := txn.UnionStore.Buffer.NewIterator(nil)
|
||||
iter := txn.UnionStore.WBuffer.NewIterator(nil)
|
||||
defer iter.Close()
|
||||
for ; iter.Valid(); iter.Next() {
|
||||
if err := f(iter); err != nil {
|
||||
@ -198,7 +198,7 @@ func (txn *dbTxn) doCommit() error {
|
||||
}
|
||||
}()
|
||||
|
||||
txn.Cache.Release()
|
||||
txn.Snapshot.Release()
|
||||
|
||||
// Check locked keys
|
||||
for k := range txn.snapshotVals {
|
||||
|
||||
Reference in New Issue
Block a user