From 3c059f77d8e3ee9bb9a5fdb8670eda25ed99e63f Mon Sep 17 00:00:00 2001 From: disksing Date: Tue, 1 Dec 2015 11:24:18 +0800 Subject: [PATCH 1/2] kv: unify NewIterator and Seek --- kv/btree_buffer.go | 32 ++++++++++++++++++-------- kv/cache_snapshot.go | 14 +++++++++--- kv/cache_snapshot_test.go | 9 +++++--- kv/kv.go | 12 ++-------- kv/mem_buffer_test.go | 31 +++++++++++++++++-------- kv/memdb_buffer.go | 19 +++++++++++----- kv/union_store.go | 44 +++++++++++++++++++++++++++++------- store/hbase/snapshot.go | 13 +++-------- store/localstore/snapshot.go | 15 +++++------- 9 files changed, 120 insertions(+), 69 deletions(-) diff --git a/kv/btree_buffer.go b/kv/btree_buffer.go index aeabe6fe8a..84ea41f29b 100644 --- a/kv/btree_buffer.go +++ b/kv/btree_buffer.go @@ -16,8 +16,9 @@ package kv import ( + "io" + "github.com/juju/errors" - "github.com/ngaut/log" "github.com/pingcap/tidb/kv/memkv" "github.com/pingcap/tidb/util/types" ) @@ -43,11 +44,17 @@ func (b *btreeBuffer) Get(k Key) ([]byte, error) { } // Set associates the key with the value. -func (b *btreeBuffer) Set(k []byte, v []byte) error { +func (b *btreeBuffer) Set(k Key, v []byte) error { b.tree.Set(toIfaces(k), toIfaces(v)) return nil } +// Delete removes the entry from buffer with provided key. +func (b *btreeBuffer) Delete(k Key) error { + err := b.Set(k, nil) + return errors.Trace(err) +} + // Release clear the whole buffer. func (b *btreeBuffer) Release() { b.tree.Clear() @@ -60,27 +67,29 @@ type btreeIter struct { ok bool } -// NewIterator creates a new Iterator based on the provided param -func (b *btreeBuffer) NewIterator(param interface{}) Iterator { +// Seek creates a new Iterator based on the provided key. +func (b *btreeBuffer) Seek(k Key) (Iterator, error) { var e *memkv.Enumerator var err error - if param == nil { + if k == nil { e, err = b.tree.SeekFirst() if err != nil { - return &btreeIter{ok: false} + if err == io.EOF { + return &btreeIter{ok: false}, nil + } + return &btreeIter{ok: false}, errors.Trace(err) } } else { - key := toIfaces(param.([]byte)) + key := toIfaces([]byte(k)) e, _ = b.tree.Seek(key) } iter := &btreeIter{e: e} // the initial push... err = iter.Next() if err != nil { - log.Error(err) - return &btreeIter{ok: false} + return &btreeIter{ok: false}, errors.Trace(err) } - return iter + return iter, nil } // Close implements Iterator Close. @@ -103,6 +112,9 @@ func (i *btreeIter) Next() error { k, v, err := i.e.Next() if err != nil { i.ok = false + if err == io.EOF { + return nil + } return errors.Trace(err) } i.k, i.v, i.ok = string(fromIfaces(k)), fromIfaces(v), true diff --git a/kv/cache_snapshot.go b/kv/cache_snapshot.go index a76af89bfa..3448102c11 100644 --- a/kv/cache_snapshot.go +++ b/kv/cache_snapshot.go @@ -127,9 +127,17 @@ func (c *cacheSnapshot) RangeGet(start, end Key, limit int) (map[string][]byte, return values, nil } -// NewIterator creates an iterator of snapshot. -func (c *cacheSnapshot) NewIterator(param interface{}) Iterator { - return newUnionIter(c.cache.NewIterator(param), c.snapshot.NewIterator(param)) +// Seek creates an iterator of snapshot. +func (c *cacheSnapshot) Seek(k Key) (Iterator, error) { + cacheIter, err := c.cache.Seek(k) + if err != nil { + return nil, errors.Trace(err) + } + snapshotIter, err := c.snapshot.Seek(k) + if err != nil { + return nil, errors.Trace(err) + } + return newUnionIter(cacheIter, snapshotIter), nil } // Release reset membuffer and release snapshot. diff --git a/kv/cache_snapshot_test.go b/kv/cache_snapshot_test.go index 07361c1fa2..fdb3d38e88 100644 --- a/kv/cache_snapshot_test.go +++ b/kv/cache_snapshot_test.go @@ -124,7 +124,10 @@ func (s *mockSnapshot) BatchGet(keys []Key) (map[string][]byte, error) { func (s *mockSnapshot) RangeGet(start, end Key, limit int) (map[string][]byte, error) { m := make(map[string][]byte) - it := s.NewIterator([]byte(start)) + it, err := s.Seek(start) + if err != nil { + return nil, errors.Trace(err) + } defer it.Close() endKey := string(end) for i := 0; i < limit; i++ { @@ -143,8 +146,8 @@ func (s *mockSnapshot) RangeGet(start, end Key, limit int) (map[string][]byte, e return m, nil } -func (s *mockSnapshot) NewIterator(param interface{}) Iterator { - return s.store.NewIterator(param) +func (s *mockSnapshot) Seek(k Key) (Iterator, error) { + return s.store.Seek(k) } func (s *mockSnapshot) Release() { diff --git a/kv/kv.go b/kv/kv.go index 8358676931..697a773f0d 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -201,27 +201,19 @@ type MvccSnapshot interface { // Snapshot defines the interface for the snapshot fetched from KV store. type Snapshot interface { - // Get gets the value for key k from snapshot. - Get(k Key) ([]byte, error) + Retriever // BatchGet gets a batch of values from snapshot. BatchGet(keys []Key) (map[string][]byte, error) // RangeGet gets values in the range [start, end] from snapshot. Maximum // number of values is up to limit. RangeGet(start, end Key, limit int) (map[string][]byte, error) - // NewIterator gets a new iterator on the snapshot. - NewIterator(param interface{}) Iterator // Release releases the snapshot to store. Release() } // MemBuffer is the interface for transaction buffer of update in a transaction type MemBuffer interface { - // Get gets the value for key k from buffer. If not found, it returns ErrNotExist. - Get(k Key) ([]byte, error) - // Set associates key with value - Set([]byte, []byte) error - // NewIterator gets a new iterator on the buffer. - NewIterator(param interface{}) Iterator + RetrieverMutator // Release releases the buffer. Release() } diff --git a/kv/mem_buffer_test.go b/kv/mem_buffer_test.go index a088333f6c..d040e35893 100644 --- a/kv/mem_buffer_test.go +++ b/kv/mem_buffer_test.go @@ -77,7 +77,8 @@ func valToStr(c *C, iter Iterator) string { func checkNewIterator(c *C, buffer MemBuffer) { for i := startIndex; i < testCount; i++ { val := encodeInt(i * indexStep) - iter := buffer.NewIterator(val) + iter, err := buffer.Seek(val) + c.Assert(err, IsNil) c.Assert(iter.Key(), Equals, string(val)) c.Assert(decodeInt([]byte(valToStr(c, iter))), Equals, i*indexStep) iter.Close() @@ -86,11 +87,12 @@ func checkNewIterator(c *C, buffer MemBuffer) { // Test iterator Next() for i := startIndex; i < testCount-1; i++ { val := encodeInt(i * indexStep) - iter := buffer.NewIterator(val) + iter, err := buffer.Seek(val) + c.Assert(err, IsNil) c.Assert(iter.Key(), Equals, string(val)) c.Assert(valToStr(c, iter), Equals, string(val)) - err := iter.Next() + err = iter.Next() c.Assert(err, IsNil) c.Assert(iter.Valid(), IsTrue) @@ -101,14 +103,16 @@ func checkNewIterator(c *C, buffer MemBuffer) { } // Non exist and beyond maximum seek test - iter := buffer.NewIterator(encodeInt(testCount * indexStep)) + iter, err := buffer.Seek(encodeInt(testCount * indexStep)) + c.Assert(err, IsNil) c.Assert(iter.Valid(), IsFalse) // Non exist but between existing keys seek test, // it returns the smallest key that larger than the one we are seeking inBetween := encodeInt((testCount-1)*indexStep - 1) last := encodeInt((testCount - 1) * indexStep) - iter = buffer.NewIterator(inBetween) + iter, err = buffer.Seek(inBetween) + c.Assert(err, IsNil) c.Assert(iter.Valid(), IsTrue) c.Assert(iter.Key(), Not(Equals), string(inBetween)) c.Assert(iter.Key(), Equals, string(last)) @@ -143,7 +147,8 @@ func (s *testKVSuite) TestGetSet(c *C) { func (s *testKVSuite) TestNewIterator(c *C) { for _, buffer := range s.bs { // should be invalid - iter := buffer.NewIterator(nil) + iter, err := buffer.Seek(nil) + c.Assert(err, IsNil) c.Assert(iter.Valid(), IsFalse) insertData(c, buffer) @@ -154,7 +159,8 @@ func (s *testKVSuite) TestNewIterator(c *C) { func (s *testKVSuite) TestBasicNewIterator(c *C) { for _, buffer := range s.bs { - it := buffer.NewIterator([]byte("2")) + it, err := buffer.Seek([]byte("2")) + c.Assert(err, IsNil) c.Assert(it.Valid(), IsFalse) buffer.Release() } @@ -178,14 +184,16 @@ func (s *testKVSuite) TestNewIteratorMin(c *C) { } cnt := 0 - it := buffer.NewIterator(nil) + it, err := buffer.Seek(nil) + c.Assert(err, IsNil) for it.Valid() { cnt++ it.Next() } c.Assert(cnt, Equals, 6) - it = buffer.NewIterator([]byte("DATA_test_main_db_tbl_tbl_test_record__00000000000000000000")) + it, err = buffer.Seek([]byte("DATA_test_main_db_tbl_tbl_test_record__00000000000000000000")) + c.Assert(err, IsNil) c.Assert(string(it.Key()), Equals, "DATA_test_main_db_tbl_tbl_test_record__00000000000000000001") buffer.Release() @@ -296,7 +304,10 @@ func benchIterator(b *testing.B, buffer MemBuffer) { } b.ResetTimer() for i := 0; i < b.N; i++ { - iter := buffer.NewIterator(nil) + iter, err := buffer.Seek(nil) + if err != nil { + b.Error(err) + } for iter.Valid() { iter.Next() } diff --git a/kv/memdb_buffer.go b/kv/memdb_buffer.go index 2d61f9906c..cbcf729bbf 100644 --- a/kv/memdb_buffer.go +++ b/kv/memdb_buffer.go @@ -16,6 +16,7 @@ package kv import ( + "github.com/juju/errors" "github.com/pingcap/tidb/terror" "github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb/comparer" @@ -37,16 +38,16 @@ func NewMemDbBuffer() MemBuffer { return &memDbBuffer{db: memdb.New(comparer.DefaultComparer, 4*1024)} } -// NewIterator creates an Iterator. -func (m *memDbBuffer) NewIterator(param interface{}) Iterator { +// Seek creates an Iterator. +func (m *memDbBuffer) Seek(k Key) (Iterator, error) { var i Iterator - if param == nil { + if k == nil { i = &memDbIter{iter: m.db.NewIterator(&util.Range{})} } else { - i = &memDbIter{iter: m.db.NewIterator(&util.Range{Start: param.([]byte)})} + i = &memDbIter{iter: m.db.NewIterator(&util.Range{Start: []byte(k)})} } i.Next() - return i + return i, nil } // Get returns the value associated with key. @@ -59,10 +60,16 @@ func (m *memDbBuffer) Get(k Key) ([]byte, error) { } // Set associates key with value. -func (m *memDbBuffer) Set(k []byte, v []byte) error { +func (m *memDbBuffer) Set(k Key, v []byte) error { return m.db.Put(k, v) } +// Delete removes the entry from buffer with provided key. +func (m *memDbBuffer) Delete(k Key) error { + err := m.Set(k, nil) + return errors.Trace(err) +} + // Release reset the buffer. func (m *memDbBuffer) Release() { m.db.Reset() diff --git a/kv/union_store.go b/kv/union_store.go index 0bd51d7e49..d3b2842a99 100644 --- a/kv/union_store.go +++ b/kv/union_store.go @@ -70,7 +70,7 @@ func (lmb *lazyMemBuffer) Get(k Key) ([]byte, error) { return lmb.mb.Get(k) } -func (lmb *lazyMemBuffer) Set(key []byte, value []byte) error { +func (lmb *lazyMemBuffer) Set(key Key, value []byte) error { if lmb.mb == nil { lmb.mb = p.Get().(MemBuffer) } @@ -78,12 +78,20 @@ func (lmb *lazyMemBuffer) Set(key []byte, value []byte) error { return lmb.mb.Set(key, value) } -func (lmb *lazyMemBuffer) NewIterator(param interface{}) Iterator { +func (lmb *lazyMemBuffer) Delete(k Key) error { if lmb.mb == nil { lmb.mb = p.Get().(MemBuffer) } - return lmb.mb.NewIterator(param) + return lmb.mb.Delete(k) +} + +func (lmb *lazyMemBuffer) Seek(k Key) (Iterator, error) { + if lmb.mb == nil { + lmb.mb = p.Get().(MemBuffer) + } + + return lmb.mb.Seek(k) } func (lmb *lazyMemBuffer) Release() { @@ -166,8 +174,14 @@ func (us *unionStore) GetInt64(k Key) (int64, error) { // Seek implements the Retriever interface. func (us *unionStore) Seek(key Key) (Iterator, error) { - bufferIt := us.wbuffer.NewIterator([]byte(key)) - cacheIt := us.snapshot.NewIterator([]byte(key)) + bufferIt, err := us.wbuffer.Seek([]byte(key)) + if err != nil { + return nil, errors.Trace(err) + } + cacheIt, err := us.snapshot.Seek([]byte(key)) + if err != nil { + return nil, errors.Trace(err) + } return newUnionIter(bufferIt, cacheIt), nil } @@ -199,7 +213,10 @@ func (us *unionStore) Delete(k Key) error { // WalkWriteBuffer implements the UnionStore interface. func (us *unionStore) WalkWriteBuffer(f func(Iterator) error) error { - iter := us.wbuffer.NewIterator(nil) + iter, err := us.wbuffer.Seek(nil) + if err != nil { + return errors.Trace(err) + } defer iter.Close() for ; iter.Valid(); iter.Next() { if err := f(iter); err != nil { @@ -224,9 +241,15 @@ func (us *unionStore) RangePrefetch(start, end Key, limit int) error { // CheckLazyConditionPairs implements the UnionStore interface. func (us *unionStore) CheckLazyConditionPairs() error { var keys []Key - for it := us.lazyConditionPairs.NewIterator(nil); it.Valid(); it.Next() { + it, err := us.lazyConditionPairs.Seek(nil) + if err != nil { + return errors.Trace(err) + } + for ; it.Valid(); it.Next() { keys = append(keys, []byte(it.Key())) } + it.Close() + if len(keys) == 0 { return nil } @@ -234,7 +257,12 @@ func (us *unionStore) CheckLazyConditionPairs() error { if err != nil { return errors.Trace(err) } - for it := us.lazyConditionPairs.NewIterator(nil); it.Valid(); it.Next() { + it, err = us.lazyConditionPairs.Seek(nil) + if err != nil { + return errors.Trace(err) + } + defer it.Close() + for ; it.Valid(); it.Next() { if len(it.Value()) == 0 { if _, exist := values[it.Key()]; exist { return errors.Trace(ErrKeyExists) diff --git a/store/hbase/snapshot.go b/store/hbase/snapshot.go index 08cb6b4aba..0715f9fabc 100644 --- a/store/hbase/snapshot.go +++ b/store/hbase/snapshot.go @@ -15,7 +15,6 @@ package hbasekv import ( "github.com/juju/errors" - "github.com/ngaut/log" "github.com/pingcap/go-hbase" "github.com/pingcap/go-themis" "github.com/pingcap/tidb/kv" @@ -123,15 +122,9 @@ func internalGet(s *hbaseSnapshot, g *hbase.Get) ([]byte, error) { return r.Columns[hbaseFmlAndQual].Value, nil } -func (s *hbaseSnapshot) NewIterator(param interface{}) kv.Iterator { - k, ok := param.([]byte) - if !ok { - log.Errorf("hbase iterator parameter error, %+v", param) - return nil - } - - scanner := s.txn.GetScanner([]byte(s.storeName), k, nil, hbaseBatchSize) - return newInnerScanner(scanner) +func (s *hbaseSnapshot) Seek(k kv.Key) (kv.Iterator, error) { + scanner := s.txn.GetScanner([]byte(s.storeName), []byte(k), nil, hbaseBatchSize) + return newInnerScanner(scanner), nil } // MvccIterator seeks to the key in the specific version's snapshot, if the diff --git a/store/localstore/snapshot.go b/store/localstore/snapshot.go index 5463a361b4..401df8961d 100644 --- a/store/localstore/snapshot.go +++ b/store/localstore/snapshot.go @@ -15,7 +15,6 @@ package localstore import ( "github.com/juju/errors" - "github.com/ngaut/log" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/localstore/engine" "github.com/pingcap/tidb/terror" @@ -137,7 +136,10 @@ func (s *dbSnapshot) BatchGet(keys []kv.Key) (map[string][]byte, error) { func (s *dbSnapshot) RangeGet(start, end kv.Key, limit int) (map[string][]byte, error) { m := make(map[string][]byte) - it := s.NewIterator([]byte(start)) + it, err := s.Seek(start) + if err != nil { + return nil, errors.Trace(err) + } defer it.Close() endKey := string(end) for i := 0; i < limit; i++ { @@ -156,13 +158,8 @@ func (s *dbSnapshot) RangeGet(start, end kv.Key, limit int) (map[string][]byte, return m, nil } -func (s *dbSnapshot) NewIterator(param interface{}) kv.Iterator { - k, ok := param.([]byte) - if !ok { - log.Errorf("leveldb iterator parameter error, %+v", param) - return nil - } - return newDBIter(s, k, s.version) +func (s *dbSnapshot) Seek(k kv.Key) (kv.Iterator, error) { + return newDBIter(s, []byte(k), s.version), nil } func (s *dbSnapshot) MvccRelease() { From e1851c9b6cd8db94e2ba266508c38ff8a88fadd2 Mon Sep 17 00:00:00 2001 From: disksing Date: Wed, 2 Dec 2015 10:58:15 +0800 Subject: [PATCH 2/2] kv: address comment --- kv/btree_buffer.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/kv/btree_buffer.go b/kv/btree_buffer.go index 84ea41f29b..c048a189f0 100644 --- a/kv/btree_buffer.go +++ b/kv/btree_buffer.go @@ -20,6 +20,7 @@ import ( "github.com/juju/errors" "github.com/pingcap/tidb/kv/memkv" + "github.com/pingcap/tidb/terror" "github.com/pingcap/tidb/util/types" ) @@ -74,7 +75,7 @@ func (b *btreeBuffer) Seek(k Key) (Iterator, error) { if k == nil { e, err = b.tree.SeekFirst() if err != nil { - if err == io.EOF { + if terror.ErrorEqual(err, io.EOF) { return &btreeIter{ok: false}, nil } return &btreeIter{ok: false}, errors.Trace(err) @@ -112,7 +113,7 @@ func (i *btreeIter) Next() error { k, v, err := i.e.Next() if err != nil { i.ok = false - if err == io.EOF { + if terror.ErrorEqual(err, io.EOF) { return nil } return errors.Trace(err)