Merge remote-tracking branch 'origin/ngaut/turning' into x
This commit is contained in:
@ -52,7 +52,7 @@ type storeCache struct {
|
||||
var (
|
||||
globalID int64
|
||||
|
||||
providerMu sync.Mutex
|
||||
providerMu sync.RWMutex
|
||||
globalVersionProvider kv.VersionProvider
|
||||
mc storeCache
|
||||
|
||||
@ -77,14 +77,6 @@ func IsLocalStore(s kv.Storage) bool {
|
||||
return ok
|
||||
}
|
||||
|
||||
func lockVersionProvider() {
|
||||
providerMu.Lock()
|
||||
}
|
||||
|
||||
func unlockVersionProvider() {
|
||||
providerMu.Unlock()
|
||||
}
|
||||
|
||||
// Open opens or creates a storage with specific format for a local engine Driver.
|
||||
func (d Driver) Open(schema string) (kv.Storage, error) {
|
||||
mc.mu.Lock()
|
||||
@ -150,19 +142,20 @@ func (s *dbStore) CurrentVersion() (kv.Version, error) {
|
||||
|
||||
// Begin transaction
|
||||
func (s *dbStore) Begin() (kv.Transaction, error) {
|
||||
lockVersionProvider()
|
||||
providerMu.RLock()
|
||||
beginVer, err := globalVersionProvider.CurrentVersion()
|
||||
unlockVersionProvider()
|
||||
providerMu.RUnlock()
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
if s.closed {
|
||||
s.mu.Unlock()
|
||||
return nil, errors.Trace(ErrDBClosed)
|
||||
}
|
||||
s.mu.Unlock()
|
||||
|
||||
txn := &dbTxn{
|
||||
tid: beginVer.Ver,
|
||||
valid: true,
|
||||
@ -223,6 +216,8 @@ func (s *dbStore) newBatch() engine.Batch {
|
||||
|
||||
// Both lock and unlock are used for simulating scenario of percolator papers.
|
||||
func (s *dbStore) tryConditionLockKey(tid uint64, key string) error {
|
||||
metaKey := codec.EncodeBytes(nil, []byte(key))
|
||||
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
@ -234,7 +229,6 @@ func (s *dbStore) tryConditionLockKey(tid uint64, key string) error {
|
||||
return errors.Trace(kv.ErrLockConflict)
|
||||
}
|
||||
|
||||
metaKey := codec.EncodeBytes(nil, []byte(key))
|
||||
currValue, err := s.db.Get(metaKey)
|
||||
if terror.ErrorEqual(err, kv.ErrNotExist) {
|
||||
s.keysLocked[key] = tid
|
||||
|
||||
@ -218,8 +218,8 @@ func (txn *dbTxn) doCommit() error {
|
||||
}
|
||||
|
||||
// disable version provider temporarily
|
||||
lockVersionProvider()
|
||||
defer unlockVersionProvider()
|
||||
providerMu.Lock()
|
||||
defer providerMu.Unlock()
|
||||
|
||||
curVer, err := globalVersionProvider.CurrentVersion()
|
||||
if err != nil {
|
||||
|
||||
@ -47,9 +47,12 @@ import (
|
||||
// EncodeBytes guarantees the encoded value is in ascending order for comparison,
|
||||
// The encoded value is >= SmallestNoneNilValue and < InfiniteValue.
|
||||
func EncodeBytes(b []byte, data []byte) []byte {
|
||||
// Allocate more space to avoid unnecessary slice growing
|
||||
bs := make([]byte, len(b), len(b)+len(data)+20)
|
||||
copy(bs, b)
|
||||
if len(data) > 0 && data[0] == 0xFF {
|
||||
// we must escape 0xFF here to guarantee encoded value < InfiniteValue \xFF\xFF.
|
||||
b = append(b, 0xFF, 0x00)
|
||||
bs = append(bs, 0xFF, 0x00)
|
||||
data = data[1:]
|
||||
}
|
||||
|
||||
@ -59,12 +62,12 @@ func EncodeBytes(b []byte, data []byte) []byte {
|
||||
if i == -1 {
|
||||
break
|
||||
}
|
||||
b = append(b, data[:i]...)
|
||||
b = append(b, 0x00, 0xFF)
|
||||
bs = append(bs, data[:i]...)
|
||||
bs = append(bs, 0x00, 0xFF)
|
||||
data = data[i+1:]
|
||||
}
|
||||
b = append(b, data...)
|
||||
return append(b, 0x00, 0x01)
|
||||
bs = append(bs, data...)
|
||||
return append(bs, 0x00, 0x01)
|
||||
}
|
||||
|
||||
// DecodeBytes decodes bytes which is encoded by EncodeBytes before,
|
||||
@ -78,7 +81,8 @@ func decodeBytes(b []byte, escapeFirst byte, escape byte, term byte) ([]byte, []
|
||||
return nil, nil, errors.Errorf("insufficient bytes to decode value")
|
||||
}
|
||||
|
||||
var r []byte
|
||||
// Allocate more space to avoid unnecessary slice growing
|
||||
r := make([]byte, 0, len(b)+20)
|
||||
|
||||
if b[0] == escapeFirst {
|
||||
if b[1] != ^escapeFirst {
|
||||
|
||||
Reference in New Issue
Block a user