From 53e6977db172b7e90bf948da9b1bdcfeb01f95c3 Mon Sep 17 00:00:00 2001 From: disksing Date: Mon, 7 Dec 2015 19:33:11 +0800 Subject: [PATCH] kv: update mvcc --- store/localstore/compactor.go | 34 ++--- store/localstore/compactor_test.go | 11 +- store/localstore/kv.go | 2 +- store/localstore/mvcc_test.go | 13 +- store/localstore/snapshot.go | 191 ++++++++++++----------------- 5 files changed, 114 insertions(+), 137 deletions(-) diff --git a/store/localstore/compactor.go b/store/localstore/compactor.go index 3113bc9b70..34f0cbca02 100644 --- a/store/localstore/compactor.go +++ b/store/localstore/compactor.go @@ -21,6 +21,7 @@ import ( "github.com/ngaut/log" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/localstore/engine" + "github.com/pingcap/tidb/terror" "github.com/pingcap/tidb/util/bytes" ) @@ -63,25 +64,24 @@ func (gc *localstoreCompactor) OnDelete(k kv.Key) { gc.recentKeys[string(k)] = struct{}{} } -func (gc *localstoreCompactor) getAllVersions(k kv.Key) ([]kv.EncodedKey, error) { - startKey := MvccEncodeVersionKey(k, kv.MaxVersion) - endKey := MvccEncodeVersionKey(k, kv.MinVersion) - - it, err := gc.db.Seek(startKey) - if err != nil { - return nil, errors.Trace(err) - } - defer it.Release() - - var ret []kv.EncodedKey - for it.Next() { - if kv.EncodedKey(it.Key()).Cmp(endKey) < 0 { - ret = append(ret, bytes.CloneBytes(kv.EncodedKey(it.Key()))) - continue +func (gc *localstoreCompactor) getAllVersions(key kv.Key) ([]kv.EncodedKey, error) { + var keys []kv.EncodedKey + k := key + for ver := kv.MaxVersion; k.Cmp(key) == 0 && ver.Ver > 0; ver.Ver-- { + mvccK, _, err := gc.db.Seek(MvccEncodeVersionKey(key, ver)) + if terror.ErrorEqual(err, engine.ErrNotFound) { + break } - break + if err != nil { + return nil, errors.Trace(err) + } + k, ver, err = MvccDecode(mvccK) + if err != nil { + return nil, errors.Trace(err) + } + keys = append(keys, bytes.CloneBytes(mvccK)) } - return ret, nil + return keys, nil } func (gc *localstoreCompactor) deleteWorker() { diff --git a/store/localstore/compactor_test.go b/store/localstore/compactor_test.go index 027437be9c..5b05f47cd0 100644 --- a/store/localstore/compactor_test.go +++ b/store/localstore/compactor_test.go @@ -27,10 +27,15 @@ type localstoreCompactorTestSuite struct { } func count(db engine.DB) int { - it, _ := db.Seek([]byte{0}) - defer it.Release() + var k kv.Key totalCnt := 0 - for it.Next() { + for { + var err error + k, _, err = db.Seek(k) + if err != nil { + break + } + k = k.Next() totalCnt++ } return totalCnt diff --git a/store/localstore/kv.go b/store/localstore/kv.go index 7712c8905d..10749b5c1e 100644 --- a/store/localstore/kv.go +++ b/store/localstore/kv.go @@ -227,7 +227,7 @@ func (s *dbStore) tryConditionLockKey(tid uint64, key string) error { } currValue, err := s.db.Get(metaKey) - if terror.ErrorEqual(err, kv.ErrNotExist) { + if terror.ErrorEqual(err, engine.ErrNotFound) { s.keysLocked[key] = tid return nil } diff --git a/store/localstore/mvcc_test.go b/store/localstore/mvcc_test.go index 4ca11c03ff..d2e91d12e6 100644 --- a/store/localstore/mvcc_test.go +++ b/store/localstore/mvcc_test.go @@ -66,9 +66,16 @@ func (t *testMvccSuite) TestMvccEncode(c *C) { func (t *testMvccSuite) scanRawEngine(c *C, f func([]byte, []byte)) { // scan raw db - it, _ := t.s.(*dbStore).db.Seek(nil) - for it.Next() { - f(it.Key(), it.Value()) + var k kv.Key + var v []byte + for { + var err error + k, v, err = t.s.(*dbStore).db.Seek(k) + if err != nil { + break + } + f(k, v) + k = k.Next() } } diff --git a/store/localstore/snapshot.go b/store/localstore/snapshot.go index c9e1a9ad47..8263067bde 100644 --- a/store/localstore/snapshot.go +++ b/store/localstore/snapshot.go @@ -19,7 +19,6 @@ import ( "github.com/pingcap/tidb/store/localstore/engine" "github.com/pingcap/tidb/terror" "github.com/pingcap/tidb/util/bytes" - "github.com/pingcap/tidb/util/codec" ) var ( @@ -29,15 +28,11 @@ var ( // dbSnapshot implements MvccSnapshot interface. type dbSnapshot struct { - store *dbStore - db engine.DB - rawIt engine.Iterator - version kv.Version // transaction begin version - released bool + store *dbStore + db engine.DB + version kv.Version // transaction begin version } -var minKey = []byte{0} - func newSnapshot(store *dbStore, db engine.DB, ver kv.Version) *dbSnapshot { ss := &dbSnapshot{ store: store, @@ -48,65 +43,68 @@ func newSnapshot(store *dbStore, db engine.DB, ver kv.Version) *dbSnapshot { return ss } -func (s *dbSnapshot) internalSeek(startKey []byte) (engine.Iterator, error) { +// mvccSeek seeks for the first key in db which has a k >= key and a version <= +// snapshot's version. If strict is true, only k == key can be returned. +// The returned slices should be cloned before modify. +func (s *dbSnapshot) mvccSeek(key kv.Key, strict bool) (kv.Key, []byte, error) { s.store.mu.RLock() defer s.store.mu.RUnlock() if s.store.closed { - return nil, errors.Trace(ErrDBClosed) + return nil, nil, errors.Trace(ErrDBClosed) } - if s.rawIt == nil { - var err error - s.rawIt, err = s.db.Seek(minKey) + // Key layout: + // ... + // Key (Meta) -- (1) + // Key_verMax -- (2) + // ... + // Key_ver+1 -- (3) + // Key_ver -- (4) + // Key_ver-1 -- (5) + // ... + // Key_0 -- (6) + // NextKey (Meta) -- (7) + // ... + // EOF + for { + mvccKey := MvccEncodeVersionKey(key, s.version) + mvccK, v, err := s.db.Seek(mvccKey) // search for [4...EOF) if err != nil { - return nil, errors.Trace(err) + if terror.ErrorEqual(err, engine.ErrNotFound) { // EOF + err = errors.Wrap(err, kv.ErrNotExist) + } + return nil, nil, errors.Trace(err) } + k, _, err := MvccDecode(mvccK) + if err != nil { + return nil, nil, errors.Trace(err) + } + if key.Cmp(k) != 0 { // currently on [7] + if strict { + return nil, nil, errors.Trace(kv.ErrNotExist) + } + // search for NextKey + key = k + continue + } + if isTombstone(v) { // current key is deleted + if strict { + return nil, nil, errors.Trace(kv.ErrNotExist) + } + // search for NextKey's meta + key = key.Next() + continue + } + return k, v, nil } - ok := s.rawIt.Seek(startKey) - if !ok { - s.rawIt.Release() - s.rawIt = nil - return nil, kv.ErrNotExist - } - return s.rawIt, nil } -func (s *dbSnapshot) Get(k kv.Key) ([]byte, error) { - // engine Snapshot return nil, nil for value not found, - // so here we will check nil and return kv.ErrNotExist. - // get newest version, (0, MaxUint64) - // Key arrangement: - // Key -> META - // ... - // Key_ver - // Key_ver-1 - // Key_ver-2 - // ... - // Key_ver-n - // Key_0 - // NextKey -> META - // NextKey_xxx - startKey := MvccEncodeVersionKey(k, s.version) - endKey := MvccEncodeVersionKey(k, kv.MinVersion) - - it, err := s.internalSeek(startKey) +func (s *dbSnapshot) Get(key kv.Key) ([]byte, error) { + _, v, err := s.mvccSeek(key, true) if err != nil { return nil, errors.Trace(err) } - - var rawKey []byte - var v []byte - // Check if the scan is not exceed this key's all versions and the value is not - // tombstone. - if kv.EncodedKey(it.Key()).Cmp(endKey) < 0 && !isTombstone(it.Value()) { - rawKey = it.Key() - v = it.Value() - } - // No such key (or it's tombstone). - if rawKey == nil { - return nil, kv.ErrNotExist - } return bytes.CloneBytes(v), nil } @@ -149,80 +147,47 @@ func (s *dbSnapshot) RangeGet(start, end kv.Key, limit int) (map[string][]byte, } func (s *dbSnapshot) Seek(k kv.Key) (kv.Iterator, error) { - return newDBIter(s, k), nil + it, err := newDBIter(s, k) + return it, errors.Trace(err) } func (s *dbSnapshot) Release() { - if s.released { - return - } - - s.released = true - if s.rawIt != nil { - // TODO: check whether Release will panic if store is closed. - s.rawIt.Release() - s.rawIt = nil - } - } type dbIter struct { - s *dbSnapshot - startKey kv.Key - valid bool - k kv.Key - v []byte + s *dbSnapshot + valid bool + k kv.Key + v []byte } -func newDBIter(s *dbSnapshot, startKey kv.Key) *dbIter { - it := &dbIter{ - s: s, - startKey: startKey, - valid: true, +func newDBIter(s *dbSnapshot, startKey kv.Key) (*dbIter, error) { + k, v, err := s.mvccSeek(startKey, false) + if err != nil { + if terror.ErrorEqual(err, kv.ErrNotExist) { + err = nil + } + return &dbIter{valid: false}, errors.Trace(err) } - it.Next() - return it + + return &dbIter{ + s: s, + valid: true, + k: bytes.CloneBytes(k), + v: bytes.CloneBytes(v), + }, nil } func (it *dbIter) Next() error { - encKey := codec.EncodeBytes(nil, it.startKey) - var retErr error - var engineIter engine.Iterator - for { - var err error - engineIter, err = it.s.internalSeek(encKey) - if err != nil { - it.valid = false - retErr = err - break + k, v, err := it.s.mvccSeek(it.k.Next(), false) + if err != nil { + it.valid = false + if !terror.ErrorEqual(err, kv.ErrNotExist) { + return errors.Trace(err) } - - metaKey := engineIter.Key() - // Get real key from metaKey - key, _, err := MvccDecode(metaKey) - if err != nil { - // It's not a valid metaKey, maybe overflow (other data). - it.valid = false - break - } - // Get kv pair. - val, err := it.s.Get(key) - if err != nil && !terror.ErrorEqual(err, kv.ErrNotExist) { - // Get this version error - it.valid = false - retErr = err - break - } - if val != nil { - it.k = bytes.CloneBytes(key) - it.v = bytes.CloneBytes(val) - it.startKey = key.Next() - break - } - // Current key's all versions are deleted, just go next key. - encKey = codec.EncodeBytes(nil, key.Next()) } - return errors.Trace(retErr) + it.k, it.v = bytes.CloneBytes(k), bytes.CloneBytes(v) + return nil } func (it *dbIter) Valid() bool {