From 216c323f3e85838f16edcad7f0b09d26422643b6 Mon Sep 17 00:00:00 2001 From: ngaut Date: Mon, 23 Nov 2015 16:36:21 +0800 Subject: [PATCH 1/2] store: Using RWMutex instead of Mutex and reduce memory allocation --- store/localstore/kv.go | 21 +++++++-------------- store/localstore/txn.go | 4 ++-- util/codec/bytes.go | 16 ++++++++++------ 3 files changed, 19 insertions(+), 22 deletions(-) diff --git a/store/localstore/kv.go b/store/localstore/kv.go index a733f28c20..13b4748e74 100644 --- a/store/localstore/kv.go +++ b/store/localstore/kv.go @@ -52,7 +52,7 @@ type storeCache struct { var ( globalID int64 - providerMu sync.Mutex + ProviderMu sync.RWMutex globalVersionProvider kv.VersionProvider mc storeCache @@ -77,14 +77,6 @@ func IsLocalStore(s kv.Storage) bool { return ok } -func lockVersionProvider() { - providerMu.Lock() -} - -func unlockVersionProvider() { - providerMu.Unlock() -} - // Open opens or creates a storage with specific format for a local engine Driver. func (d Driver) Open(schema string) (kv.Storage, error) { mc.mu.Lock() @@ -150,19 +142,19 @@ func (s *dbStore) CurrentVersion() (kv.Version, error) { // Begin transaction func (s *dbStore) Begin() (kv.Transaction, error) { - lockVersionProvider() + ProviderMu.RLock() beginVer, err := globalVersionProvider.CurrentVersion() - unlockVersionProvider() + ProviderMu.RUnlock() if err != nil { return nil, errors.Trace(err) } s.mu.Lock() - defer s.mu.Unlock() - if s.closed { return nil, errors.Trace(ErrDBClosed) } + s.mu.Unlock() + txn := &dbTxn{ tid: beginVer.Ver, valid: true, @@ -223,6 +215,8 @@ func (s *dbStore) newBatch() engine.Batch { // Both lock and unlock are used for simulating scenario of percolator papers. func (s *dbStore) tryConditionLockKey(tid uint64, key string) error { + metaKey := codec.EncodeBytes(nil, []byte(key)) + s.mu.Lock() defer s.mu.Unlock() @@ -234,7 +228,6 @@ func (s *dbStore) tryConditionLockKey(tid uint64, key string) error { return errors.Trace(kv.ErrLockConflict) } - metaKey := codec.EncodeBytes(nil, []byte(key)) currValue, err := s.db.Get(metaKey) if terror.ErrorEqual(err, kv.ErrNotExist) { s.keysLocked[key] = tid diff --git a/store/localstore/txn.go b/store/localstore/txn.go index b84fe74ed4..8fa1b40566 100644 --- a/store/localstore/txn.go +++ b/store/localstore/txn.go @@ -218,8 +218,8 @@ func (txn *dbTxn) doCommit() error { } // disable version provider temporarily - lockVersionProvider() - defer unlockVersionProvider() + ProviderMu.Lock() + defer ProviderMu.Unlock() curVer, err := globalVersionProvider.CurrentVersion() if err != nil { diff --git a/util/codec/bytes.go b/util/codec/bytes.go index 11c8bdf12e..89495b8b6d 100644 --- a/util/codec/bytes.go +++ b/util/codec/bytes.go @@ -47,9 +47,12 @@ import ( // EncodeBytes guarantees the encoded value is in ascending order for comparison, // The encoded value is >= SmallestNoneNilValue and < InfiniteValue. func EncodeBytes(b []byte, data []byte) []byte { + // Allocate more space to avoid unnecessary slice growing + bs := make([]byte, len(b), len(data)+20) + copy(bs, b) if len(data) > 0 && data[0] == 0xFF { // we must escape 0xFF here to guarantee encoded value < InfiniteValue \xFF\xFF. - b = append(b, 0xFF, 0x00) + bs = append(bs, 0xFF, 0x00) data = data[1:] } @@ -59,12 +62,12 @@ func EncodeBytes(b []byte, data []byte) []byte { if i == -1 { break } - b = append(b, data[:i]...) - b = append(b, 0x00, 0xFF) + bs = append(bs, data[:i]...) + bs = append(bs, 0x00, 0xFF) data = data[i+1:] } - b = append(b, data...) - return append(b, 0x00, 0x01) + bs = append(bs, data...) + return append(bs, 0x00, 0x01) } // DecodeBytes decodes bytes which is encoded by EncodeBytes before, @@ -78,7 +81,8 @@ func decodeBytes(b []byte, escapeFirst byte, escape byte, term byte) ([]byte, [] return nil, nil, errors.Errorf("insufficient bytes to decode value") } - var r []byte + // Allocate more space to avoid unnecessary slice growing + r := make([]byte, 0, len(b)+20) if b[0] == escapeFirst { if b[1] != ^escapeFirst { From 226615adc8131fe3eeca71c43d7358d94802d39a Mon Sep 17 00:00:00 2001 From: ngaut Date: Mon, 23 Nov 2015 16:49:57 +0800 Subject: [PATCH 2/2] *: Fix deadlock and index out of bound --- store/localstore/kv.go | 7 ++++--- store/localstore/txn.go | 4 ++-- util/codec/bytes.go | 2 +- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/store/localstore/kv.go b/store/localstore/kv.go index 13b4748e74..6dcb59915a 100644 --- a/store/localstore/kv.go +++ b/store/localstore/kv.go @@ -52,7 +52,7 @@ type storeCache struct { var ( globalID int64 - ProviderMu sync.RWMutex + providerMu sync.RWMutex globalVersionProvider kv.VersionProvider mc storeCache @@ -142,15 +142,16 @@ func (s *dbStore) CurrentVersion() (kv.Version, error) { // Begin transaction func (s *dbStore) Begin() (kv.Transaction, error) { - ProviderMu.RLock() + providerMu.RLock() beginVer, err := globalVersionProvider.CurrentVersion() - ProviderMu.RUnlock() + providerMu.RUnlock() if err != nil { return nil, errors.Trace(err) } s.mu.Lock() if s.closed { + s.mu.Unlock() return nil, errors.Trace(ErrDBClosed) } s.mu.Unlock() diff --git a/store/localstore/txn.go b/store/localstore/txn.go index 8fa1b40566..80e39adba2 100644 --- a/store/localstore/txn.go +++ b/store/localstore/txn.go @@ -218,8 +218,8 @@ func (txn *dbTxn) doCommit() error { } // disable version provider temporarily - ProviderMu.Lock() - defer ProviderMu.Unlock() + providerMu.Lock() + defer providerMu.Unlock() curVer, err := globalVersionProvider.CurrentVersion() if err != nil { diff --git a/util/codec/bytes.go b/util/codec/bytes.go index 89495b8b6d..8168d82733 100644 --- a/util/codec/bytes.go +++ b/util/codec/bytes.go @@ -48,7 +48,7 @@ import ( // The encoded value is >= SmallestNoneNilValue and < InfiniteValue. func EncodeBytes(b []byte, data []byte) []byte { // Allocate more space to avoid unnecessary slice growing - bs := make([]byte, len(b), len(data)+20) + bs := make([]byte, len(b), len(b)+len(data)+20) copy(bs, b) if len(data) > 0 && data[0] == 0xFF { // we must escape 0xFF here to guarantee encoded value < InfiniteValue \xFF\xFF.