kv: update mvcc
This commit is contained in:
@ -21,6 +21,7 @@ import (
|
||||
"github.com/ngaut/log"
|
||||
"github.com/pingcap/tidb/kv"
|
||||
"github.com/pingcap/tidb/store/localstore/engine"
|
||||
"github.com/pingcap/tidb/terror"
|
||||
"github.com/pingcap/tidb/util/bytes"
|
||||
)
|
||||
|
||||
@ -63,25 +64,24 @@ func (gc *localstoreCompactor) OnDelete(k kv.Key) {
|
||||
gc.recentKeys[string(k)] = struct{}{}
|
||||
}
|
||||
|
||||
func (gc *localstoreCompactor) getAllVersions(k kv.Key) ([]kv.EncodedKey, error) {
|
||||
startKey := MvccEncodeVersionKey(k, kv.MaxVersion)
|
||||
endKey := MvccEncodeVersionKey(k, kv.MinVersion)
|
||||
|
||||
it, err := gc.db.Seek(startKey)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
defer it.Release()
|
||||
|
||||
var ret []kv.EncodedKey
|
||||
for it.Next() {
|
||||
if kv.EncodedKey(it.Key()).Cmp(endKey) < 0 {
|
||||
ret = append(ret, bytes.CloneBytes(kv.EncodedKey(it.Key())))
|
||||
continue
|
||||
func (gc *localstoreCompactor) getAllVersions(key kv.Key) ([]kv.EncodedKey, error) {
|
||||
var keys []kv.EncodedKey
|
||||
k := key
|
||||
for ver := kv.MaxVersion; k.Cmp(key) == 0 && ver.Ver > 0; ver.Ver-- {
|
||||
mvccK, _, err := gc.db.Seek(MvccEncodeVersionKey(key, ver))
|
||||
if terror.ErrorEqual(err, engine.ErrNotFound) {
|
||||
break
|
||||
}
|
||||
break
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
k, ver, err = MvccDecode(mvccK)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
keys = append(keys, bytes.CloneBytes(mvccK))
|
||||
}
|
||||
return ret, nil
|
||||
return keys, nil
|
||||
}
|
||||
|
||||
func (gc *localstoreCompactor) deleteWorker() {
|
||||
|
||||
@ -27,10 +27,15 @@ type localstoreCompactorTestSuite struct {
|
||||
}
|
||||
|
||||
func count(db engine.DB) int {
|
||||
it, _ := db.Seek([]byte{0})
|
||||
defer it.Release()
|
||||
var k kv.Key
|
||||
totalCnt := 0
|
||||
for it.Next() {
|
||||
for {
|
||||
var err error
|
||||
k, _, err = db.Seek(k)
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
k = k.Next()
|
||||
totalCnt++
|
||||
}
|
||||
return totalCnt
|
||||
|
||||
@ -227,7 +227,7 @@ func (s *dbStore) tryConditionLockKey(tid uint64, key string) error {
|
||||
}
|
||||
|
||||
currValue, err := s.db.Get(metaKey)
|
||||
if terror.ErrorEqual(err, kv.ErrNotExist) {
|
||||
if terror.ErrorEqual(err, engine.ErrNotFound) {
|
||||
s.keysLocked[key] = tid
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -66,9 +66,16 @@ func (t *testMvccSuite) TestMvccEncode(c *C) {
|
||||
|
||||
func (t *testMvccSuite) scanRawEngine(c *C, f func([]byte, []byte)) {
|
||||
// scan raw db
|
||||
it, _ := t.s.(*dbStore).db.Seek(nil)
|
||||
for it.Next() {
|
||||
f(it.Key(), it.Value())
|
||||
var k kv.Key
|
||||
var v []byte
|
||||
for {
|
||||
var err error
|
||||
k, v, err = t.s.(*dbStore).db.Seek(k)
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
f(k, v)
|
||||
k = k.Next()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -19,7 +19,6 @@ import (
|
||||
"github.com/pingcap/tidb/store/localstore/engine"
|
||||
"github.com/pingcap/tidb/terror"
|
||||
"github.com/pingcap/tidb/util/bytes"
|
||||
"github.com/pingcap/tidb/util/codec"
|
||||
)
|
||||
|
||||
var (
|
||||
@ -29,15 +28,11 @@ var (
|
||||
|
||||
// dbSnapshot implements MvccSnapshot interface.
|
||||
type dbSnapshot struct {
|
||||
store *dbStore
|
||||
db engine.DB
|
||||
rawIt engine.Iterator
|
||||
version kv.Version // transaction begin version
|
||||
released bool
|
||||
store *dbStore
|
||||
db engine.DB
|
||||
version kv.Version // transaction begin version
|
||||
}
|
||||
|
||||
var minKey = []byte{0}
|
||||
|
||||
func newSnapshot(store *dbStore, db engine.DB, ver kv.Version) *dbSnapshot {
|
||||
ss := &dbSnapshot{
|
||||
store: store,
|
||||
@ -48,65 +43,68 @@ func newSnapshot(store *dbStore, db engine.DB, ver kv.Version) *dbSnapshot {
|
||||
return ss
|
||||
}
|
||||
|
||||
func (s *dbSnapshot) internalSeek(startKey []byte) (engine.Iterator, error) {
|
||||
// mvccSeek seeks for the first key in db which has a k >= key and a version <=
|
||||
// snapshot's version. If strict is true, only k == key can be returned.
|
||||
// The returned slices should be cloned before modify.
|
||||
func (s *dbSnapshot) mvccSeek(key kv.Key, strict bool) (kv.Key, []byte, error) {
|
||||
s.store.mu.RLock()
|
||||
defer s.store.mu.RUnlock()
|
||||
|
||||
if s.store.closed {
|
||||
return nil, errors.Trace(ErrDBClosed)
|
||||
return nil, nil, errors.Trace(ErrDBClosed)
|
||||
}
|
||||
|
||||
if s.rawIt == nil {
|
||||
var err error
|
||||
s.rawIt, err = s.db.Seek(minKey)
|
||||
// Key layout:
|
||||
// ...
|
||||
// Key (Meta) -- (1)
|
||||
// Key_verMax -- (2)
|
||||
// ...
|
||||
// Key_ver+1 -- (3)
|
||||
// Key_ver -- (4)
|
||||
// Key_ver-1 -- (5)
|
||||
// ...
|
||||
// Key_0 -- (6)
|
||||
// NextKey (Meta) -- (7)
|
||||
// ...
|
||||
// EOF
|
||||
for {
|
||||
mvccKey := MvccEncodeVersionKey(key, s.version)
|
||||
mvccK, v, err := s.db.Seek(mvccKey) // search for [4...EOF)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
if terror.ErrorEqual(err, engine.ErrNotFound) { // EOF
|
||||
err = errors.Wrap(err, kv.ErrNotExist)
|
||||
}
|
||||
return nil, nil, errors.Trace(err)
|
||||
}
|
||||
k, _, err := MvccDecode(mvccK)
|
||||
if err != nil {
|
||||
return nil, nil, errors.Trace(err)
|
||||
}
|
||||
if key.Cmp(k) != 0 { // currently on [7]
|
||||
if strict {
|
||||
return nil, nil, errors.Trace(kv.ErrNotExist)
|
||||
}
|
||||
// search for NextKey
|
||||
key = k
|
||||
continue
|
||||
}
|
||||
if isTombstone(v) { // current key is deleted
|
||||
if strict {
|
||||
return nil, nil, errors.Trace(kv.ErrNotExist)
|
||||
}
|
||||
// search for NextKey's meta
|
||||
key = key.Next()
|
||||
continue
|
||||
}
|
||||
return k, v, nil
|
||||
}
|
||||
ok := s.rawIt.Seek(startKey)
|
||||
if !ok {
|
||||
s.rawIt.Release()
|
||||
s.rawIt = nil
|
||||
return nil, kv.ErrNotExist
|
||||
}
|
||||
return s.rawIt, nil
|
||||
}
|
||||
|
||||
func (s *dbSnapshot) Get(k kv.Key) ([]byte, error) {
|
||||
// engine Snapshot return nil, nil for value not found,
|
||||
// so here we will check nil and return kv.ErrNotExist.
|
||||
// get newest version, (0, MaxUint64)
|
||||
// Key arrangement:
|
||||
// Key -> META
|
||||
// ...
|
||||
// Key_ver
|
||||
// Key_ver-1
|
||||
// Key_ver-2
|
||||
// ...
|
||||
// Key_ver-n
|
||||
// Key_0
|
||||
// NextKey -> META
|
||||
// NextKey_xxx
|
||||
startKey := MvccEncodeVersionKey(k, s.version)
|
||||
endKey := MvccEncodeVersionKey(k, kv.MinVersion)
|
||||
|
||||
it, err := s.internalSeek(startKey)
|
||||
func (s *dbSnapshot) Get(key kv.Key) ([]byte, error) {
|
||||
_, v, err := s.mvccSeek(key, true)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
|
||||
var rawKey []byte
|
||||
var v []byte
|
||||
// Check if the scan is not exceed this key's all versions and the value is not
|
||||
// tombstone.
|
||||
if kv.EncodedKey(it.Key()).Cmp(endKey) < 0 && !isTombstone(it.Value()) {
|
||||
rawKey = it.Key()
|
||||
v = it.Value()
|
||||
}
|
||||
// No such key (or it's tombstone).
|
||||
if rawKey == nil {
|
||||
return nil, kv.ErrNotExist
|
||||
}
|
||||
return bytes.CloneBytes(v), nil
|
||||
}
|
||||
|
||||
@ -149,80 +147,47 @@ func (s *dbSnapshot) RangeGet(start, end kv.Key, limit int) (map[string][]byte,
|
||||
}
|
||||
|
||||
func (s *dbSnapshot) Seek(k kv.Key) (kv.Iterator, error) {
|
||||
return newDBIter(s, k), nil
|
||||
it, err := newDBIter(s, k)
|
||||
return it, errors.Trace(err)
|
||||
}
|
||||
|
||||
func (s *dbSnapshot) Release() {
|
||||
if s.released {
|
||||
return
|
||||
}
|
||||
|
||||
s.released = true
|
||||
if s.rawIt != nil {
|
||||
// TODO: check whether Release will panic if store is closed.
|
||||
s.rawIt.Release()
|
||||
s.rawIt = nil
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
type dbIter struct {
|
||||
s *dbSnapshot
|
||||
startKey kv.Key
|
||||
valid bool
|
||||
k kv.Key
|
||||
v []byte
|
||||
s *dbSnapshot
|
||||
valid bool
|
||||
k kv.Key
|
||||
v []byte
|
||||
}
|
||||
|
||||
func newDBIter(s *dbSnapshot, startKey kv.Key) *dbIter {
|
||||
it := &dbIter{
|
||||
s: s,
|
||||
startKey: startKey,
|
||||
valid: true,
|
||||
func newDBIter(s *dbSnapshot, startKey kv.Key) (*dbIter, error) {
|
||||
k, v, err := s.mvccSeek(startKey, false)
|
||||
if err != nil {
|
||||
if terror.ErrorEqual(err, kv.ErrNotExist) {
|
||||
err = nil
|
||||
}
|
||||
return &dbIter{valid: false}, errors.Trace(err)
|
||||
}
|
||||
it.Next()
|
||||
return it
|
||||
|
||||
return &dbIter{
|
||||
s: s,
|
||||
valid: true,
|
||||
k: bytes.CloneBytes(k),
|
||||
v: bytes.CloneBytes(v),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (it *dbIter) Next() error {
|
||||
encKey := codec.EncodeBytes(nil, it.startKey)
|
||||
var retErr error
|
||||
var engineIter engine.Iterator
|
||||
for {
|
||||
var err error
|
||||
engineIter, err = it.s.internalSeek(encKey)
|
||||
if err != nil {
|
||||
it.valid = false
|
||||
retErr = err
|
||||
break
|
||||
k, v, err := it.s.mvccSeek(it.k.Next(), false)
|
||||
if err != nil {
|
||||
it.valid = false
|
||||
if !terror.ErrorEqual(err, kv.ErrNotExist) {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
metaKey := engineIter.Key()
|
||||
// Get real key from metaKey
|
||||
key, _, err := MvccDecode(metaKey)
|
||||
if err != nil {
|
||||
// It's not a valid metaKey, maybe overflow (other data).
|
||||
it.valid = false
|
||||
break
|
||||
}
|
||||
// Get kv pair.
|
||||
val, err := it.s.Get(key)
|
||||
if err != nil && !terror.ErrorEqual(err, kv.ErrNotExist) {
|
||||
// Get this version error
|
||||
it.valid = false
|
||||
retErr = err
|
||||
break
|
||||
}
|
||||
if val != nil {
|
||||
it.k = bytes.CloneBytes(key)
|
||||
it.v = bytes.CloneBytes(val)
|
||||
it.startKey = key.Next()
|
||||
break
|
||||
}
|
||||
// Current key's all versions are deleted, just go next key.
|
||||
encKey = codec.EncodeBytes(nil, key.Next())
|
||||
}
|
||||
return errors.Trace(retErr)
|
||||
it.k, it.v = bytes.CloneBytes(k), bytes.CloneBytes(v)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (it *dbIter) Valid() bool {
|
||||
|
||||
Reference in New Issue
Block a user