kv : remove commitId from Commit() function

add another fuction to do this.
This commit is contained in:
dongxu
2015-09-22 17:25:18 +08:00
parent 2d35df36ea
commit 3bcbe1baef
8 changed files with 60 additions and 26 deletions

View File

@ -15,6 +15,7 @@ package kv
import (
"bytes"
"errors"
"math"
)
@ -73,12 +74,27 @@ func NewVersion(v uint64) Version {
}
}
// Cmp returns the comparison result of two version.
// The result will be 0 if a==b, -1 if a < b, and +1 if a > b.
func (v Version) Cmp(another Version) int {
if v.Ver > another.Ver {
return 1
} else if v.Ver < another.Ver {
return -1
}
return 0
}
// DecodeFn is a function that decode data after fetch from store.
type DecodeFn func(raw interface{}) (interface{}, error)
// EncodeFn is a function that encode data before put into store.
type EncodeFn func(raw interface{}) (interface{}, error)
// ErrNotCommitted is the error returned by CommitVersion when the this
// transaction is not committed.
var ErrNotCommitted = errors.New("this transaction is not committed")
// Transaction defines the interface for operations inside a Transaction.
// This is not thread safe.
type Transaction interface {
@ -93,7 +109,10 @@ type Transaction interface {
// Deletes removes the entry for key k from KV store.
Delete(k Key) error
// Commit commites the transaction operations to KV store.
Commit() (Version, error)
Commit() error
// CommitVersion returns the verion of this committed transaction. If this
// transaction has not been committed, returns ErrNotCommitted error.
CommitVersion() (Version, error)
// Rollback undoes the transaction operations to KV store.
Rollback() error
// String implements Stringer.String() interface.

View File

@ -51,7 +51,7 @@ func RunInNewTxn(store Storage, retryable bool, f func(txn Transaction) error) e
return errors.Trace(err)
}
_, err = txn.Commit()
err = txn.Commit()
if retryable && IsRetryableError(err) {
log.Warnf("Retry txn %v", txn)
txn.Rollback()

View File

@ -143,7 +143,7 @@ func (s *session) FinishTxn(rollback bool) error {
return s.txn.Rollback()
}
_, err := s.txn.Commit()
err := s.txn.Commit()
if err != nil {
log.Warnf("txn:%s, %v", s.txn, err)
return errors.Trace(err)
@ -346,7 +346,7 @@ func (s *session) GetTxn(forceNew bool) (kv.Transaction, error) {
return s.txn, nil
}
if forceNew {
_, err = s.txn.Commit()
err = s.txn.Commit()
variable.GetSessionVars(s).SetStatusFlag(mysql.ServerStatusInTrans, false)
if err != nil {
return nil, err

View File

@ -123,6 +123,7 @@ func (s *dbStore) Begin() (kv.Transaction, error) {
tID: beginVer.Ver,
valid: true,
store: s,
version: kv.MinVersion,
snapshotVals: make(map[string][]byte),
}
log.Debugf("Begin txn:%d", txn.tID)

View File

@ -151,7 +151,7 @@ func (s *testKVSuite) TestGetSet(c *C) {
mustGet(c, txn)
// Check transaction results
_, err = txn.Commit()
err = txn.Commit()
c.Assert(err, IsNil)
txn, err = s.s.Begin()
@ -170,7 +170,7 @@ func (s *testKVSuite) TestSeek(c *C) {
checkSeek(c, txn)
// Check transaction results
_, err = txn.Commit()
err = txn.Commit()
c.Assert(err, IsNil)
txn, err = s.s.Begin()
@ -191,7 +191,7 @@ func (s *testKVSuite) TestInc(c *C) {
c.Assert(n, Equals, int64(100))
// Check transaction results
_, err = txn.Commit()
err = txn.Commit()
c.Assert(err, IsNil)
txn, err = s.s.Begin()
@ -211,7 +211,7 @@ func (s *testKVSuite) TestInc(c *C) {
err = txn.Delete(key)
c.Assert(err, IsNil)
_, err = txn.Commit()
err = txn.Commit()
c.Assert(err, IsNil)
}
@ -431,7 +431,7 @@ func (s *testKVSuite) TestConditionIfNotExist(c *C) {
if err != nil {
return
}
_, err = txn.Commit()
err = txn.Commit()
if err == nil {
atomic.AddInt64(&success, 1)
}
@ -445,7 +445,7 @@ func (s *testKVSuite) TestConditionIfNotExist(c *C) {
c.Assert(err, IsNil)
err = txn.Delete(b)
c.Assert(err, IsNil)
_, err = txn.Commit()
err = txn.Commit()
c.Assert(err, IsNil)
}
@ -459,7 +459,7 @@ func (s *testKVSuite) TestConditionIfEqual(c *C) {
txn, err := s.s.Begin()
c.Assert(err, IsNil)
txn.Set(b, b)
_, err = txn.Commit()
err = txn.Commit()
c.Assert(err, IsNil)
for i := 0; i < cnt; i++ {
@ -470,7 +470,7 @@ func (s *testKVSuite) TestConditionIfEqual(c *C) {
txn1, err1 := s.s.Begin()
c.Assert(err1, IsNil)
txn1.Set(b, []byte("newValue"))
_, err1 = txn1.Commit()
err1 = txn1.Commit()
if err1 == nil {
atomic.AddInt64(&success, 1)
}
@ -484,7 +484,7 @@ func (s *testKVSuite) TestConditionIfEqual(c *C) {
c.Assert(err, IsNil)
err = txn.Delete(b)
c.Assert(err, IsNil)
_, err = txn.Commit()
err = txn.Commit()
c.Assert(err, IsNil)
}
@ -493,6 +493,6 @@ func (s *testKVSuite) TestConditionUpdate(c *C) {
c.Assert(err, IsNil)
txn.Delete([]byte("b"))
txn.Inc([]byte("a"), 1)
_, err = txn.Commit()
err = txn.Commit()
c.Assert(err, IsNil)
}

View File

@ -148,7 +148,9 @@ func (t *testMvccSuite) TestMvccSnapshotGet(c *C) {
tx, _ = t.s.Begin()
err = tx.Set(encodeInt(1), []byte("new"))
c.Assert(err, IsNil)
v, err := tx.Commit()
err = tx.Commit()
c.Assert(err, IsNil)
v, err := tx.CommitVersion()
c.Assert(err, IsNil)
mvccSnapshot, err := t.s.GetMvccSnapshot()
@ -170,7 +172,9 @@ func (t *testMvccSuite) TestMvccSnapshotScan(c *C) {
tx, _ := t.s.Begin()
err := tx.Set(encodeInt(1), []byte("new"))
c.Assert(err, IsNil)
v, err := tx.Commit()
err = tx.Commit()
c.Assert(err, IsNil)
v, err := tx.CommitVersion()
c.Assert(err, IsNil)
mvccSnapshot, err := t.s.GetMvccSnapshot()

View File

@ -41,6 +41,7 @@ type dbTxn struct {
startTs time.Time
tID uint64
valid bool
version kv.Version // commit version
snapshotVals map[string][]byte // origin version in snapshot
}
@ -172,7 +173,7 @@ func (txn *dbTxn) each(f func(iterator.Iterator) error) error {
return nil
}
func (txn *dbTxn) doCommit() (kv.Version, error) {
func (txn *dbTxn) doCommit() error {
b := txn.store.newBatch()
keysLocked := make([]string, 0, len(txn.snapshotVals))
defer func() {
@ -184,7 +185,7 @@ func (txn *dbTxn) doCommit() (kv.Version, error) {
for k, v := range txn.snapshotVals {
err := txn.store.tryConditionLockKey(txn.tID, k, v)
if err != nil {
return kv.MinVersion, errors.Trace(err)
return errors.Trace(err)
}
keysLocked = append(keysLocked, k)
}
@ -192,7 +193,7 @@ func (txn *dbTxn) doCommit() (kv.Version, error) {
// Check dirty store
curVer, err := globalVersionProvider.CurrentVersion()
if err != nil {
return kv.MinVersion, errors.Trace(err)
return errors.Trace(err)
}
err = txn.each(func(iter iterator.Iterator) error {
metaKey := codec.EncodeBytes(nil, iter.Key())
@ -207,14 +208,16 @@ func (txn *dbTxn) doCommit() (kv.Version, error) {
return nil
})
if err != nil {
return kv.MinVersion, errors.Trace(err)
return errors.Trace(err)
}
return curVer, txn.store.writeBatch(b)
// Update commit version.
txn.version = curVer
return txn.store.writeBatch(b)
}
func (txn *dbTxn) Commit() (kv.Version, error) {
func (txn *dbTxn) Commit() error {
if !txn.valid {
return kv.MinVersion, ErrInvalidTxn
return ErrInvalidTxn
}
log.Infof("commit txn %d", txn.tID)
defer func() {
@ -224,6 +227,14 @@ func (txn *dbTxn) Commit() (kv.Version, error) {
return txn.doCommit()
}
func (txn *dbTxn) CommitVersion() (kv.Version, error) {
// Check if this trx is not committed.
if txn.version.Cmp(kv.MinVersion) == 0 {
return kv.MinVersion, kv.ErrNotCommitted
}
return txn.version, nil
}
func (txn *dbTxn) close() error {
txn.UnionStore.Close()
txn.snapshotVals = nil

View File

@ -115,8 +115,7 @@ func (c *MockContext) FinishTxn(rollback bool) error {
return nil
}
_, err := c.txn.Commit()
return err
return c.txn.Commit()
}
func (s *testPrefixSuite) TestPrefix(c *C) {
@ -134,7 +133,7 @@ func (s *testPrefixSuite) TestPrefix(c *C) {
c.Assert(err, IsNil)
err = ScanMetaWithPrefix(txn, str, func([]byte, []byte) bool { return true })
c.Assert(err, IsNil)
_, err = txn.Commit()
err = txn.Commit()
c.Assert(err, IsNil)
}