Merge remote-tracking branch 'origin/c4pt0r/mvcc-support' into c4pt0r/mvcc-support
This commit is contained in:
18
kv/kv.go
18
kv/kv.go
@ -61,20 +61,20 @@ type Version struct {
|
||||
}
|
||||
|
||||
var (
|
||||
// MaxVersion is the maximum version, notice that it's not a valid version
|
||||
// MaxVersion is the maximum version, notice that it's not a valid version.
|
||||
MaxVersion = Version{Ver: math.MaxUint64}
|
||||
// MinVersion is the minimum version, it's not a valid version, too.
|
||||
MinVersion = Version{Ver: 0}
|
||||
)
|
||||
|
||||
// NewVersion is a simple wrapper.
|
||||
// NewVersion creates a new Version struct.
|
||||
func NewVersion(v uint64) Version {
|
||||
return Version{
|
||||
Ver: v,
|
||||
}
|
||||
}
|
||||
|
||||
// Cmp returns the comparison result of two version.
|
||||
// Cmp returns the comparison result of two versions.
|
||||
// The result will be 0 if a==b, -1 if a < b, and +1 if a > b.
|
||||
func (v Version) Cmp(another Version) int {
|
||||
if v.Ver > another.Ver {
|
||||
@ -110,9 +110,9 @@ type Transaction interface {
|
||||
Delete(k Key) error
|
||||
// Commit commites the transaction operations to KV store.
|
||||
Commit() error
|
||||
// CommitVersion returns the verion of this committed transaction. If this
|
||||
// CommittedVersion returns the verion of this committed transaction. If this
|
||||
// transaction has not been committed, returns ErrNotCommitted error.
|
||||
CommitVersion() (Version, error)
|
||||
CommittedVersion() (Version, error)
|
||||
// Rollback undoes the transaction operations to KV store.
|
||||
Rollback() error
|
||||
// String implements Stringer.String() interface.
|
||||
@ -124,11 +124,13 @@ type Transaction interface {
|
||||
// MvccSnapshot is used to get/seek a specific verion in a snaphot.
|
||||
type MvccSnapshot interface {
|
||||
// MvccGet returns the specific version of given key, if the version doesn't
|
||||
// exists, returns the nearest(lower) version's data.
|
||||
// exist, returns the nearest(lower) version's data.
|
||||
MvccGet(k Key, ver Version) ([]byte, error)
|
||||
// MvccIterator seeks to the key in the specific version's snapshot, if the
|
||||
// version doesn't exists, return the nearest(lower) version's snaphot.
|
||||
// version doesn't exist, returns the nearest(lower) version's snaphot.
|
||||
NewMvccIterator(k Key, ver Version) Iterator
|
||||
// Release releases this snapshot.
|
||||
MvccRelease()
|
||||
}
|
||||
|
||||
// Snapshot defines the interface for the snapshot fetched from KV store.
|
||||
@ -153,7 +155,7 @@ type Driver interface {
|
||||
type Storage interface {
|
||||
// Begin transaction
|
||||
Begin() (Transaction, error)
|
||||
// MvccSnapshot get a snaphot that cloud read any version of data.
|
||||
// MvccSnapshot get a snaphot that is able to read any version of data.
|
||||
GetMvccSnapshot() (MvccSnapshot, error)
|
||||
// Close store
|
||||
Close() error
|
||||
|
||||
@ -5,7 +5,6 @@ import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/ngaut/log"
|
||||
. "github.com/pingcap/check"
|
||||
"github.com/pingcap/tidb/kv"
|
||||
"github.com/pingcap/tidb/store/localstore/goleveldb"
|
||||
@ -150,10 +149,11 @@ func (t *testMvccSuite) TestMvccSnapshotGet(c *C) {
|
||||
c.Assert(err, IsNil)
|
||||
err = tx.Commit()
|
||||
c.Assert(err, IsNil)
|
||||
v, err := tx.CommitVersion()
|
||||
v, err := tx.CommittedVersion()
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
mvccSnapshot, err := t.s.GetMvccSnapshot()
|
||||
defer mvccSnapshot.MvccRelease()
|
||||
b, err = mvccSnapshot.MvccGet(kv.EncodeKey(encodeInt(1)), kv.MaxVersion)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(string(b), Equals, "new")
|
||||
@ -174,17 +174,17 @@ func (t *testMvccSuite) TestMvccSnapshotScan(c *C) {
|
||||
c.Assert(err, IsNil)
|
||||
err = tx.Commit()
|
||||
c.Assert(err, IsNil)
|
||||
v, err := tx.CommitVersion()
|
||||
v, err := tx.CommittedVersion()
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
mvccSnapshot, err := t.s.GetMvccSnapshot()
|
||||
defer mvccSnapshot.MvccRelease()
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
// iter helper function
|
||||
iterFunc := func(it kv.Iterator) bool {
|
||||
found := false
|
||||
for it.Valid() {
|
||||
log.Info(it.Key(), it.Value())
|
||||
if string(it.Value()) == "new" {
|
||||
found = true
|
||||
}
|
||||
|
||||
@ -95,6 +95,10 @@ func (s *dbSnapshot) NewIterator(param interface{}) kv.Iterator {
|
||||
return newDBIter(s, k, kv.MaxVersion)
|
||||
}
|
||||
|
||||
func (s *dbSnapshot) MvccRelease() {
|
||||
s.Release()
|
||||
}
|
||||
|
||||
func (s *dbSnapshot) Release() {
|
||||
if s.Snapshot != nil {
|
||||
s.Snapshot.Release()
|
||||
|
||||
@ -227,7 +227,7 @@ func (txn *dbTxn) Commit() error {
|
||||
return txn.doCommit()
|
||||
}
|
||||
|
||||
func (txn *dbTxn) CommitVersion() (kv.Version, error) {
|
||||
func (txn *dbTxn) CommittedVersion() (kv.Version, error) {
|
||||
// Check if this trx is not committed.
|
||||
if txn.version.Cmp(kv.MinVersion) == 0 {
|
||||
return kv.MinVersion, kv.ErrNotCommitted
|
||||
|
||||
Reference in New Issue
Block a user