diff --git a/kv/kv.go b/kv/kv.go index 874847f0ee..0e3f52c08b 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -15,6 +15,7 @@ package kv import ( "bytes" + "errors" "math" ) @@ -73,12 +74,27 @@ func NewVersion(v uint64) Version { } } +// Cmp returns the comparison result of two version. +// The result will be 0 if a==b, -1 if a < b, and +1 if a > b. +func (v Version) Cmp(another Version) int { + if v.Ver > another.Ver { + return 1 + } else if v.Ver < another.Ver { + return -1 + } + return 0 +} + // DecodeFn is a function that decode data after fetch from store. type DecodeFn func(raw interface{}) (interface{}, error) // EncodeFn is a function that encode data before put into store. type EncodeFn func(raw interface{}) (interface{}, error) +// ErrNotCommitted is the error returned by CommitVersion when the this +// transaction is not committed. +var ErrNotCommitted = errors.New("this transaction is not committed") + // Transaction defines the interface for operations inside a Transaction. // This is not thread safe. type Transaction interface { @@ -93,7 +109,10 @@ type Transaction interface { // Deletes removes the entry for key k from KV store. Delete(k Key) error // Commit commites the transaction operations to KV store. - Commit() (Version, error) + Commit() error + // CommitVersion returns the verion of this committed transaction. If this + // transaction has not been committed, returns ErrNotCommitted error. + CommitVersion() (Version, error) // Rollback undoes the transaction operations to KV store. Rollback() error // String implements Stringer.String() interface. diff --git a/kv/txn.go b/kv/txn.go index e722298bc1..2d33c90870 100644 --- a/kv/txn.go +++ b/kv/txn.go @@ -51,7 +51,7 @@ func RunInNewTxn(store Storage, retryable bool, f func(txn Transaction) error) e return errors.Trace(err) } - _, err = txn.Commit() + err = txn.Commit() if retryable && IsRetryableError(err) { log.Warnf("Retry txn %v", txn) txn.Rollback() diff --git a/session.go b/session.go index 6d89f22285..4fff174146 100644 --- a/session.go +++ b/session.go @@ -143,7 +143,7 @@ func (s *session) FinishTxn(rollback bool) error { return s.txn.Rollback() } - _, err := s.txn.Commit() + err := s.txn.Commit() if err != nil { log.Warnf("txn:%s, %v", s.txn, err) return errors.Trace(err) @@ -346,7 +346,7 @@ func (s *session) GetTxn(forceNew bool) (kv.Transaction, error) { return s.txn, nil } if forceNew { - _, err = s.txn.Commit() + err = s.txn.Commit() variable.GetSessionVars(s).SetStatusFlag(mysql.ServerStatusInTrans, false) if err != nil { return nil, err diff --git a/store/localstore/kv.go b/store/localstore/kv.go index 955ae11525..da734f86e2 100644 --- a/store/localstore/kv.go +++ b/store/localstore/kv.go @@ -123,6 +123,7 @@ func (s *dbStore) Begin() (kv.Transaction, error) { tID: beginVer.Ver, valid: true, store: s, + version: kv.MinVersion, snapshotVals: make(map[string][]byte), } log.Debugf("Begin txn:%d", txn.tID) diff --git a/store/localstore/kv_test.go b/store/localstore/kv_test.go index 0ca55421e7..d09c35043d 100644 --- a/store/localstore/kv_test.go +++ b/store/localstore/kv_test.go @@ -151,7 +151,7 @@ func (s *testKVSuite) TestGetSet(c *C) { mustGet(c, txn) // Check transaction results - _, err = txn.Commit() + err = txn.Commit() c.Assert(err, IsNil) txn, err = s.s.Begin() @@ -170,7 +170,7 @@ func (s *testKVSuite) TestSeek(c *C) { checkSeek(c, txn) // Check transaction results - _, err = txn.Commit() + err = txn.Commit() c.Assert(err, IsNil) txn, err = s.s.Begin() @@ -191,7 +191,7 @@ func (s *testKVSuite) TestInc(c *C) { c.Assert(n, Equals, int64(100)) // Check transaction results - _, err = txn.Commit() + err = txn.Commit() c.Assert(err, IsNil) txn, err = s.s.Begin() @@ -211,7 +211,7 @@ func (s *testKVSuite) TestInc(c *C) { err = txn.Delete(key) c.Assert(err, IsNil) - _, err = txn.Commit() + err = txn.Commit() c.Assert(err, IsNil) } @@ -431,7 +431,7 @@ func (s *testKVSuite) TestConditionIfNotExist(c *C) { if err != nil { return } - _, err = txn.Commit() + err = txn.Commit() if err == nil { atomic.AddInt64(&success, 1) } @@ -445,7 +445,7 @@ func (s *testKVSuite) TestConditionIfNotExist(c *C) { c.Assert(err, IsNil) err = txn.Delete(b) c.Assert(err, IsNil) - _, err = txn.Commit() + err = txn.Commit() c.Assert(err, IsNil) } @@ -459,7 +459,7 @@ func (s *testKVSuite) TestConditionIfEqual(c *C) { txn, err := s.s.Begin() c.Assert(err, IsNil) txn.Set(b, b) - _, err = txn.Commit() + err = txn.Commit() c.Assert(err, IsNil) for i := 0; i < cnt; i++ { @@ -470,7 +470,7 @@ func (s *testKVSuite) TestConditionIfEqual(c *C) { txn1, err1 := s.s.Begin() c.Assert(err1, IsNil) txn1.Set(b, []byte("newValue")) - _, err1 = txn1.Commit() + err1 = txn1.Commit() if err1 == nil { atomic.AddInt64(&success, 1) } @@ -484,7 +484,7 @@ func (s *testKVSuite) TestConditionIfEqual(c *C) { c.Assert(err, IsNil) err = txn.Delete(b) c.Assert(err, IsNil) - _, err = txn.Commit() + err = txn.Commit() c.Assert(err, IsNil) } @@ -493,6 +493,6 @@ func (s *testKVSuite) TestConditionUpdate(c *C) { c.Assert(err, IsNil) txn.Delete([]byte("b")) txn.Inc([]byte("a"), 1) - _, err = txn.Commit() + err = txn.Commit() c.Assert(err, IsNil) } diff --git a/store/localstore/mvcc_test.go b/store/localstore/mvcc_test.go index cd9d70e3ed..f5bd55f84c 100644 --- a/store/localstore/mvcc_test.go +++ b/store/localstore/mvcc_test.go @@ -148,7 +148,9 @@ func (t *testMvccSuite) TestMvccSnapshotGet(c *C) { tx, _ = t.s.Begin() err = tx.Set(encodeInt(1), []byte("new")) c.Assert(err, IsNil) - v, err := tx.Commit() + err = tx.Commit() + c.Assert(err, IsNil) + v, err := tx.CommitVersion() c.Assert(err, IsNil) mvccSnapshot, err := t.s.GetMvccSnapshot() @@ -170,7 +172,9 @@ func (t *testMvccSuite) TestMvccSnapshotScan(c *C) { tx, _ := t.s.Begin() err := tx.Set(encodeInt(1), []byte("new")) c.Assert(err, IsNil) - v, err := tx.Commit() + err = tx.Commit() + c.Assert(err, IsNil) + v, err := tx.CommitVersion() c.Assert(err, IsNil) mvccSnapshot, err := t.s.GetMvccSnapshot() diff --git a/store/localstore/txn.go b/store/localstore/txn.go index 5a65c99b23..59862ed7bb 100644 --- a/store/localstore/txn.go +++ b/store/localstore/txn.go @@ -41,6 +41,7 @@ type dbTxn struct { startTs time.Time tID uint64 valid bool + version kv.Version // commit version snapshotVals map[string][]byte // origin version in snapshot } @@ -172,7 +173,7 @@ func (txn *dbTxn) each(f func(iterator.Iterator) error) error { return nil } -func (txn *dbTxn) doCommit() (kv.Version, error) { +func (txn *dbTxn) doCommit() error { b := txn.store.newBatch() keysLocked := make([]string, 0, len(txn.snapshotVals)) defer func() { @@ -184,7 +185,7 @@ func (txn *dbTxn) doCommit() (kv.Version, error) { for k, v := range txn.snapshotVals { err := txn.store.tryConditionLockKey(txn.tID, k, v) if err != nil { - return kv.MinVersion, errors.Trace(err) + return errors.Trace(err) } keysLocked = append(keysLocked, k) } @@ -192,7 +193,7 @@ func (txn *dbTxn) doCommit() (kv.Version, error) { // Check dirty store curVer, err := globalVersionProvider.CurrentVersion() if err != nil { - return kv.MinVersion, errors.Trace(err) + return errors.Trace(err) } err = txn.each(func(iter iterator.Iterator) error { metaKey := codec.EncodeBytes(nil, iter.Key()) @@ -207,14 +208,16 @@ func (txn *dbTxn) doCommit() (kv.Version, error) { return nil }) if err != nil { - return kv.MinVersion, errors.Trace(err) + return errors.Trace(err) } - return curVer, txn.store.writeBatch(b) + // Update commit version. + txn.version = curVer + return txn.store.writeBatch(b) } -func (txn *dbTxn) Commit() (kv.Version, error) { +func (txn *dbTxn) Commit() error { if !txn.valid { - return kv.MinVersion, ErrInvalidTxn + return ErrInvalidTxn } log.Infof("commit txn %d", txn.tID) defer func() { @@ -224,6 +227,14 @@ func (txn *dbTxn) Commit() (kv.Version, error) { return txn.doCommit() } +func (txn *dbTxn) CommitVersion() (kv.Version, error) { + // Check if this trx is not committed. + if txn.version.Cmp(kv.MinVersion) == 0 { + return kv.MinVersion, kv.ErrNotCommitted + } + return txn.version, nil +} + func (txn *dbTxn) close() error { txn.UnionStore.Close() txn.snapshotVals = nil diff --git a/util/prefix_helper_test.go b/util/prefix_helper_test.go index 43c3d08af9..d201cb6c97 100644 --- a/util/prefix_helper_test.go +++ b/util/prefix_helper_test.go @@ -115,8 +115,7 @@ func (c *MockContext) FinishTxn(rollback bool) error { return nil } - _, err := c.txn.Commit() - return err + return c.txn.Commit() } func (s *testPrefixSuite) TestPrefix(c *C) { @@ -134,7 +133,7 @@ func (s *testPrefixSuite) TestPrefix(c *C) { c.Assert(err, IsNil) err = ScanMetaWithPrefix(txn, str, func([]byte, []byte) bool { return true }) c.Assert(err, IsNil) - _, err = txn.Commit() + err = txn.Commit() c.Assert(err, IsNil) }