From 216c323f3e85838f16edcad7f0b09d26422643b6 Mon Sep 17 00:00:00 2001 From: ngaut Date: Mon, 23 Nov 2015 16:36:21 +0800 Subject: [PATCH 01/11] store: Using RWMutex instead of Mutex and reduce memory allocation --- store/localstore/kv.go | 21 +++++++-------------- store/localstore/txn.go | 4 ++-- util/codec/bytes.go | 16 ++++++++++------ 3 files changed, 19 insertions(+), 22 deletions(-) diff --git a/store/localstore/kv.go b/store/localstore/kv.go index a733f28c20..13b4748e74 100644 --- a/store/localstore/kv.go +++ b/store/localstore/kv.go @@ -52,7 +52,7 @@ type storeCache struct { var ( globalID int64 - providerMu sync.Mutex + ProviderMu sync.RWMutex globalVersionProvider kv.VersionProvider mc storeCache @@ -77,14 +77,6 @@ func IsLocalStore(s kv.Storage) bool { return ok } -func lockVersionProvider() { - providerMu.Lock() -} - -func unlockVersionProvider() { - providerMu.Unlock() -} - // Open opens or creates a storage with specific format for a local engine Driver. func (d Driver) Open(schema string) (kv.Storage, error) { mc.mu.Lock() @@ -150,19 +142,19 @@ func (s *dbStore) CurrentVersion() (kv.Version, error) { // Begin transaction func (s *dbStore) Begin() (kv.Transaction, error) { - lockVersionProvider() + ProviderMu.RLock() beginVer, err := globalVersionProvider.CurrentVersion() - unlockVersionProvider() + ProviderMu.RUnlock() if err != nil { return nil, errors.Trace(err) } s.mu.Lock() - defer s.mu.Unlock() - if s.closed { return nil, errors.Trace(ErrDBClosed) } + s.mu.Unlock() + txn := &dbTxn{ tid: beginVer.Ver, valid: true, @@ -223,6 +215,8 @@ func (s *dbStore) newBatch() engine.Batch { // Both lock and unlock are used for simulating scenario of percolator papers. func (s *dbStore) tryConditionLockKey(tid uint64, key string) error { + metaKey := codec.EncodeBytes(nil, []byte(key)) + s.mu.Lock() defer s.mu.Unlock() @@ -234,7 +228,6 @@ func (s *dbStore) tryConditionLockKey(tid uint64, key string) error { return errors.Trace(kv.ErrLockConflict) } - metaKey := codec.EncodeBytes(nil, []byte(key)) currValue, err := s.db.Get(metaKey) if terror.ErrorEqual(err, kv.ErrNotExist) { s.keysLocked[key] = tid diff --git a/store/localstore/txn.go b/store/localstore/txn.go index b84fe74ed4..8fa1b40566 100644 --- a/store/localstore/txn.go +++ b/store/localstore/txn.go @@ -218,8 +218,8 @@ func (txn *dbTxn) doCommit() error { } // disable version provider temporarily - lockVersionProvider() - defer unlockVersionProvider() + ProviderMu.Lock() + defer ProviderMu.Unlock() curVer, err := globalVersionProvider.CurrentVersion() if err != nil { diff --git a/util/codec/bytes.go b/util/codec/bytes.go index 11c8bdf12e..89495b8b6d 100644 --- a/util/codec/bytes.go +++ b/util/codec/bytes.go @@ -47,9 +47,12 @@ import ( // EncodeBytes guarantees the encoded value is in ascending order for comparison, // The encoded value is >= SmallestNoneNilValue and < InfiniteValue. func EncodeBytes(b []byte, data []byte) []byte { + // Allocate more space to avoid unnecessary slice growing + bs := make([]byte, len(b), len(data)+20) + copy(bs, b) if len(data) > 0 && data[0] == 0xFF { // we must escape 0xFF here to guarantee encoded value < InfiniteValue \xFF\xFF. - b = append(b, 0xFF, 0x00) + bs = append(bs, 0xFF, 0x00) data = data[1:] } @@ -59,12 +62,12 @@ func EncodeBytes(b []byte, data []byte) []byte { if i == -1 { break } - b = append(b, data[:i]...) - b = append(b, 0x00, 0xFF) + bs = append(bs, data[:i]...) + bs = append(bs, 0x00, 0xFF) data = data[i+1:] } - b = append(b, data...) - return append(b, 0x00, 0x01) + bs = append(bs, data...) + return append(bs, 0x00, 0x01) } // DecodeBytes decodes bytes which is encoded by EncodeBytes before, @@ -78,7 +81,8 @@ func decodeBytes(b []byte, escapeFirst byte, escape byte, term byte) ([]byte, [] return nil, nil, errors.Errorf("insufficient bytes to decode value") } - var r []byte + // Allocate more space to avoid unnecessary slice growing + r := make([]byte, 0, len(b)+20) if b[0] == escapeFirst { if b[1] != ^escapeFirst { From 226615adc8131fe3eeca71c43d7358d94802d39a Mon Sep 17 00:00:00 2001 From: ngaut Date: Mon, 23 Nov 2015 16:49:57 +0800 Subject: [PATCH 02/11] *: Fix deadlock and index out of bound --- store/localstore/kv.go | 7 ++++--- store/localstore/txn.go | 4 ++-- util/codec/bytes.go | 2 +- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/store/localstore/kv.go b/store/localstore/kv.go index 13b4748e74..6dcb59915a 100644 --- a/store/localstore/kv.go +++ b/store/localstore/kv.go @@ -52,7 +52,7 @@ type storeCache struct { var ( globalID int64 - ProviderMu sync.RWMutex + providerMu sync.RWMutex globalVersionProvider kv.VersionProvider mc storeCache @@ -142,15 +142,16 @@ func (s *dbStore) CurrentVersion() (kv.Version, error) { // Begin transaction func (s *dbStore) Begin() (kv.Transaction, error) { - ProviderMu.RLock() + providerMu.RLock() beginVer, err := globalVersionProvider.CurrentVersion() - ProviderMu.RUnlock() + providerMu.RUnlock() if err != nil { return nil, errors.Trace(err) } s.mu.Lock() if s.closed { + s.mu.Unlock() return nil, errors.Trace(ErrDBClosed) } s.mu.Unlock() diff --git a/store/localstore/txn.go b/store/localstore/txn.go index 8fa1b40566..80e39adba2 100644 --- a/store/localstore/txn.go +++ b/store/localstore/txn.go @@ -218,8 +218,8 @@ func (txn *dbTxn) doCommit() error { } // disable version provider temporarily - ProviderMu.Lock() - defer ProviderMu.Unlock() + providerMu.Lock() + defer providerMu.Unlock() curVer, err := globalVersionProvider.CurrentVersion() if err != nil { diff --git a/util/codec/bytes.go b/util/codec/bytes.go index 89495b8b6d..8168d82733 100644 --- a/util/codec/bytes.go +++ b/util/codec/bytes.go @@ -48,7 +48,7 @@ import ( // The encoded value is >= SmallestNoneNilValue and < InfiniteValue. func EncodeBytes(b []byte, data []byte) []byte { // Allocate more space to avoid unnecessary slice growing - bs := make([]byte, len(b), len(data)+20) + bs := make([]byte, len(b), len(b)+len(data)+20) copy(bs, b) if len(data) > 0 && data[0] == 0xFF { // we must escape 0xFF here to guarantee encoded value < InfiniteValue \xFF\xFF. From 0a55e4f5013168bdcb2688ad718396fb5477b0a0 Mon Sep 17 00:00:00 2001 From: ngaut Date: Mon, 23 Nov 2015 20:24:33 +0800 Subject: [PATCH 03/11] kv: Introduce lazy buffer --- kv/union_store.go | 46 +++++++++++++++++++++++++++++++---- store/localstore/compactor.go | 2 +- 2 files changed, 42 insertions(+), 6 deletions(-) diff --git a/kv/union_store.go b/kv/union_store.go index 635dd73903..5b81b3e5de 100644 --- a/kv/union_store.go +++ b/kv/union_store.go @@ -46,15 +46,53 @@ type UnionStore struct { // NewUnionStore builds a new UnionStore. func NewUnionStore(snapshot Snapshot, opts Options) UnionStore { - wbuffer := p.Get().(MemBuffer) - lazy := p.Get().(MemBuffer) + lazy := &lazyMemBuffer{} return UnionStore{ - WBuffer: wbuffer, + WBuffer: &lazyMemBuffer{}, Snapshot: NewCacheSnapshot(snapshot, lazy, opts), lazyConditionPairs: lazy, } } +type lazyMemBuffer struct { + mb MemBuffer +} + +func (lw *lazyMemBuffer) Get(k Key) ([]byte, error) { + if lw.mb == nil { + return nil, ErrNotExist + } + + return lw.mb.Get(k) +} + +func (lw *lazyMemBuffer) Set(key []byte, value []byte) error { + if lw.mb == nil { + lw.mb = p.Get().(MemBuffer) + } + + return lw.mb.Set(key, value) +} + +func (lw *lazyMemBuffer) NewIterator(param interface{}) Iterator { + if lw.mb == nil { + lw.mb = p.Get().(MemBuffer) + } + + return lw.mb.NewIterator(param) +} + +func (lw *lazyMemBuffer) Release() { + if lw.mb == nil { + return + } + + lw.mb.Release() + + p.Put(lw.mb) + lw.mb = nil +} + // Get implements the Store Get interface. func (us *UnionStore) Get(key []byte) (value []byte, err error) { // Get from update records frist @@ -142,8 +180,6 @@ func (us *UnionStore) CheckLazyConditionPairs() error { func (us *UnionStore) Close() error { us.Snapshot.Release() us.WBuffer.Release() - p.Put(us.WBuffer) us.lazyConditionPairs.Release() - p.Put(us.lazyConditionPairs) return nil } diff --git a/store/localstore/compactor.go b/store/localstore/compactor.go index 3fdea00dfb..67143c6e5c 100644 --- a/store/localstore/compactor.go +++ b/store/localstore/compactor.go @@ -137,6 +137,7 @@ func (gc *localstoreCompactor) checkExpiredKeysWorker() { func (gc *localstoreCompactor) filterExpiredKeys(keys []kv.EncodedKey) []kv.EncodedKey { var ret []kv.EncodedKey first := true + currentTS := time.Now().UnixNano() / int64(time.Millisecond) // keys are always in descending order. for _, k := range keys { _, ver, err := MvccDecode(k) @@ -145,7 +146,6 @@ func (gc *localstoreCompactor) filterExpiredKeys(keys []kv.EncodedKey) []kv.Enco panic(err) } ts := localVersionToTimestamp(ver) - currentTS := time.Now().UnixNano() / int64(time.Millisecond) // Check timeout keys. if currentTS-int64(ts) >= int64(gc.policy.SafePoint) { // Skip first version. From 68b6ef9a86c096dd29e3c8de91b827e5f0281d64 Mon Sep 17 00:00:00 2001 From: ngaut Date: Mon, 23 Nov 2015 21:06:54 +0800 Subject: [PATCH 04/11] store: Less allocation --- store/localstore/boltdb/boltdb.go | 3 +-- store/localstore/snapshot.go | 4 +++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/store/localstore/boltdb/boltdb.go b/store/localstore/boltdb/boltdb.go index 85b5a5f9bc..c45945d67a 100644 --- a/store/localstore/boltdb/boltdb.go +++ b/store/localstore/boltdb/boltdb.go @@ -28,7 +28,7 @@ var ( ) var ( - bucketName = []byte("tidb_bucket") + bucketName = []byte("tidb") ) type db struct { @@ -46,7 +46,6 @@ func (d *db) Get(key []byte) ([]byte, error) { } value = append([]byte(nil), v...) - return nil }) diff --git a/store/localstore/snapshot.go b/store/localstore/snapshot.go index cb70c314c5..3c16b38be0 100644 --- a/store/localstore/snapshot.go +++ b/store/localstore/snapshot.go @@ -37,6 +37,8 @@ type dbSnapshot struct { version kv.Version // transaction begin version } +var minKey = []byte{0} + func (s *dbSnapshot) internalSeek(startKey []byte) (engine.Iterator, error) { s.store.snapLock.RLock() defer s.store.snapLock.RUnlock() @@ -47,7 +49,7 @@ func (s *dbSnapshot) internalSeek(startKey []byte) (engine.Iterator, error) { if s.rawIt == nil { var err error - s.rawIt, err = s.db.Seek([]byte{0}) + s.rawIt, err = s.db.Seek(minKey) if err != nil { return nil, errors.Trace(err) } From 8aca043d4304c7e107f8ceb643bd85e20e45d7e8 Mon Sep 17 00:00:00 2001 From: ngaut Date: Tue, 24 Nov 2015 10:56:01 +0800 Subject: [PATCH 05/11] Rename lw to lmb --- kv/union_store.go | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/kv/union_store.go b/kv/union_store.go index 5b81b3e5de..8fb25da9de 100644 --- a/kv/union_store.go +++ b/kv/union_store.go @@ -58,39 +58,39 @@ type lazyMemBuffer struct { mb MemBuffer } -func (lw *lazyMemBuffer) Get(k Key) ([]byte, error) { - if lw.mb == nil { +func (lmb *lazyMemBuffer) Get(k Key) ([]byte, error) { + if lmb.mb == nil { return nil, ErrNotExist } - return lw.mb.Get(k) + return lmb.mb.Get(k) } -func (lw *lazyMemBuffer) Set(key []byte, value []byte) error { - if lw.mb == nil { - lw.mb = p.Get().(MemBuffer) +func (lmb *lazyMemBuffer) Set(key []byte, value []byte) error { + if lmb.mb == nil { + lmb.mb = p.Get().(MemBuffer) } - return lw.mb.Set(key, value) + return lmb.mb.Set(key, value) } -func (lw *lazyMemBuffer) NewIterator(param interface{}) Iterator { - if lw.mb == nil { - lw.mb = p.Get().(MemBuffer) +func (lmb *lazyMemBuffer) NewIterator(param interface{}) Iterator { + if lmb.mb == nil { + lmb.mb = p.Get().(MemBuffer) } - return lw.mb.NewIterator(param) + return lmb.mb.NewIterator(param) } -func (lw *lazyMemBuffer) Release() { - if lw.mb == nil { +func (lmb *lazyMemBuffer) Release() { + if lmb.mb == nil { return } - lw.mb.Release() + lmb.mb.Release() - p.Put(lw.mb) - lw.mb = nil + p.Put(lmb.mb) + lmb.mb = nil } // Get implements the Store Get interface. From e8c8b1fdcb8e542b4f990b6578a5a9ba4d716ff5 Mon Sep 17 00:00:00 2001 From: siddontang Date: Tue, 24 Nov 2015 11:58:40 +0800 Subject: [PATCH 06/11] codec: update bytes alloc. --- util/codec/bytes.go | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/util/codec/bytes.go b/util/codec/bytes.go index 8168d82733..dfaebea586 100644 --- a/util/codec/bytes.go +++ b/util/codec/bytes.go @@ -48,8 +48,7 @@ import ( // The encoded value is >= SmallestNoneNilValue and < InfiniteValue. func EncodeBytes(b []byte, data []byte) []byte { // Allocate more space to avoid unnecessary slice growing - bs := make([]byte, len(b), len(b)+len(data)+20) - copy(bs, b) + bs := reallocBytes(b, len(data)+20) if len(data) > 0 && data[0] == 0xFF { // we must escape 0xFF here to guarantee encoded value < InfiniteValue \xFF\xFF. bs = append(bs, 0xFF, 0x00) @@ -81,8 +80,7 @@ func decodeBytes(b []byte, escapeFirst byte, escape byte, term byte) ([]byte, [] return nil, nil, errors.Errorf("insufficient bytes to decode value") } - // Allocate more space to avoid unnecessary slice growing - r := make([]byte, 0, len(b)+20) + var r []byte if b[0] == escapeFirst { if b[1] != ^escapeFirst { @@ -116,6 +114,10 @@ func decodeBytes(b []byte, escapeFirst byte, escape byte, term byte) ([]byte, [] return nil, nil, errors.Errorf("invalid escape byte, must 0x%x, but got 0x%0x", ^escape, b[i+1]) } + // here mean we may have \x00 in origin slice, so realloc a large buffer + // to avoid relloaction again, the final decoded slice length is < len(b) certainly. + // TODO: we can record the escape offset and then do the alloc + copy in the end. + r = reallocBytes(r, len(b)) r = append(r, b[:i]...) r = append(r, escape) b = b[i+2:] @@ -181,3 +183,15 @@ func reverseBytes(b []byte) { safeReverseBytes(b) } + +// like realloc. +func reallocBytes(b []byte, n int) []byte { + if cap(b) < n { + bs := make([]byte, len(b), len(b)+n) + copy(bs, b) + return bs + } + + // slice b has capability to store n bytes + return b +} From 8ba428e2fe6687ccf93f69fe73339e1c1f402f5e Mon Sep 17 00:00:00 2001 From: siddontang Date: Tue, 24 Nov 2015 12:01:12 +0800 Subject: [PATCH 07/11] codec: fix typo --- util/codec/bytes.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/util/codec/bytes.go b/util/codec/bytes.go index dfaebea586..50773e8bef 100644 --- a/util/codec/bytes.go +++ b/util/codec/bytes.go @@ -114,8 +114,8 @@ func decodeBytes(b []byte, escapeFirst byte, escape byte, term byte) ([]byte, [] return nil, nil, errors.Errorf("invalid escape byte, must 0x%x, but got 0x%0x", ^escape, b[i+1]) } - // here mean we may have \x00 in origin slice, so realloc a large buffer - // to avoid relloaction again, the final decoded slice length is < len(b) certainly. + // here mean we have \x00 in origin slice, so realloc a large buffer + // to avoid reallocation again, the final decoded slice length is < len(b) certainly. // TODO: we can record the escape offset and then do the alloc + copy in the end. r = reallocBytes(r, len(b)) r = append(r, b[:i]...) From 63b6650e7ffb470c1d89c7462f91b5343266652f Mon Sep 17 00:00:00 2001 From: ngaut Date: Tue, 24 Nov 2015 13:43:44 +0800 Subject: [PATCH 08/11] Clean up --- store/localstore/boltdb/boltdb.go | 4 ++++ store/localstore/engine/engine.go | 2 ++ store/localstore/kv.go | 34 ++++++++++++++---------------- store/localstore/snapshot.go | 35 ++++++++++++++++++++++--------- 4 files changed, 47 insertions(+), 28 deletions(-) diff --git a/store/localstore/boltdb/boltdb.go b/store/localstore/boltdb/boltdb.go index c45945d67a..6a68a31610 100644 --- a/store/localstore/boltdb/boltdb.go +++ b/store/localstore/boltdb/boltdb.go @@ -182,6 +182,10 @@ func (b *batch) Delete(key []byte) { b.writes = append(b.writes, w) } +func (b *batch) Len() int { + return len(b.writes) +} + // Driver implements engine Driver. type Driver struct { } diff --git a/store/localstore/engine/engine.go b/store/localstore/engine/engine.go index d9ed22104b..86b858b5b4 100644 --- a/store/localstore/engine/engine.go +++ b/store/localstore/engine/engine.go @@ -61,4 +61,6 @@ type Batch interface { Put(key []byte, value []byte) // Delete appends 'delete operation' of the key/value to the batch. Delete(key []byte) + // Len return length of the batch + Len() int } diff --git a/store/localstore/kv.go b/store/localstore/kv.go index 6dcb59915a..e47ec5e517 100644 --- a/store/localstore/kv.go +++ b/store/localstore/kv.go @@ -30,9 +30,7 @@ var ( ) type dbStore struct { - mu sync.Mutex - snapLock sync.RWMutex - + mu sync.RWMutex db engine.DB txns map[uint64]*dbTxn @@ -113,14 +111,17 @@ func (s *dbStore) UUID() string { } func (s *dbStore) GetSnapshot(ver kv.Version) (kv.MvccSnapshot, error) { - s.snapLock.RLock() - defer s.snapLock.RUnlock() + s.mu.RLock() + closed := s.closed + s.mu.RUnlock() - if s.closed { + if closed { return nil, errors.Trace(ErrDBClosed) } + providerMu.RLock() currentVer, err := globalVersionProvider.CurrentVersion() + providerMu.RUnlock() if err != nil { return nil, errors.Trace(err) } @@ -149,12 +150,12 @@ func (s *dbStore) Begin() (kv.Transaction, error) { return nil, errors.Trace(err) } - s.mu.Lock() - if s.closed { - s.mu.Unlock() + s.mu.RLock() + closed := s.closed + s.mu.RUnlock() + if closed { return nil, errors.Trace(ErrDBClosed) } - s.mu.Unlock() txn := &dbTxn{ tid: beginVer.Ver, @@ -165,11 +166,7 @@ func (s *dbStore) Begin() (kv.Transaction, error) { opts: make(map[kv.Option]interface{}), } log.Debugf("Begin txn:%d", txn.tid) - txn.UnionStore = kv.NewUnionStore(&dbSnapshot{ - store: s, - db: s.db, - version: beginVer, - }, options(txn.opts)) + txn.UnionStore = kv.NewUnionStore(newSnapshot(s, s.db, beginVer), options(txn.opts)) return txn, nil } @@ -177,9 +174,6 @@ func (s *dbStore) Close() error { s.mu.Lock() defer s.mu.Unlock() - s.snapLock.Lock() - defer s.snapLock.Unlock() - if s.closed { return nil } @@ -194,6 +188,10 @@ func (s *dbStore) Close() error { } func (s *dbStore) writeBatch(b engine.Batch) error { + if b.Len() == 0 { + return nil + } + s.mu.Lock() defer s.mu.Unlock() diff --git a/store/localstore/snapshot.go b/store/localstore/snapshot.go index 3c16b38be0..5463a361b4 100644 --- a/store/localstore/snapshot.go +++ b/store/localstore/snapshot.go @@ -31,17 +31,28 @@ var ( // dbSnapshot implements MvccSnapshot interface. type dbSnapshot struct { - store *dbStore - db engine.DB - rawIt engine.Iterator - version kv.Version // transaction begin version + store *dbStore + db engine.DB + rawIt engine.Iterator + version kv.Version // transaction begin version + released bool } var minKey = []byte{0} +func newSnapshot(store *dbStore, db engine.DB, ver kv.Version) *dbSnapshot { + ss := &dbSnapshot{ + store: store, + db: db, + version: ver, + } + + return ss +} + func (s *dbSnapshot) internalSeek(startKey []byte) (engine.Iterator, error) { - s.store.snapLock.RLock() - defer s.store.snapLock.RUnlock() + s.store.mu.RLock() + defer s.store.mu.RUnlock() if s.store.closed { return nil, errors.Trace(ErrDBClosed) @@ -159,13 +170,17 @@ func (s *dbSnapshot) MvccRelease() { } func (s *dbSnapshot) Release() { - if s.rawIt == nil { + if s.released { return } - // TODO: check whether Release will panic if store is closed. - s.rawIt.Release() - s.rawIt = nil + s.released = true + if s.rawIt != nil { + // TODO: check whether Release will panic if store is closed. + s.rawIt.Release() + s.rawIt = nil + } + } type dbIter struct { From 8c199f81f35a9c3d51d10a7f176009791b0db631 Mon Sep 17 00:00:00 2001 From: ngaut Date: Tue, 24 Nov 2015 15:59:11 +0800 Subject: [PATCH 09/11] Fix data race --- store/localstore/compactor.go | 15 ++++++++------- store/localstore/isolation_test.go | 6 ++++-- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/store/localstore/compactor.go b/store/localstore/compactor.go index 67143c6e5c..a137fdc501 100644 --- a/store/localstore/compactor.go +++ b/store/localstore/compactor.go @@ -41,7 +41,7 @@ type localstoreCompactor struct { recentKeys map[string]struct{} stopCh chan struct{} delCh chan kv.EncodedKey - workerWaitGroup sync.WaitGroup + workerWaitGroup *sync.WaitGroup ticker *time.Ticker db engine.DB policy kv.CompactPolicy @@ -191,11 +191,12 @@ func (gc *localstoreCompactor) Stop() { func newLocalCompactor(policy kv.CompactPolicy, db engine.DB) *localstoreCompactor { return &localstoreCompactor{ - recentKeys: make(map[string]struct{}), - stopCh: make(chan struct{}), - delCh: make(chan kv.EncodedKey, 100), - ticker: time.NewTicker(policy.TriggerInterval), - policy: policy, - db: db, + recentKeys: make(map[string]struct{}), + stopCh: make(chan struct{}), + delCh: make(chan kv.EncodedKey, 100), + ticker: time.NewTicker(policy.TriggerInterval), + policy: policy, + db: db, + workerWaitGroup: &sync.WaitGroup{}, } } diff --git a/store/localstore/isolation_test.go b/store/localstore/isolation_test.go index 0bc0899447..ad64d5465f 100644 --- a/store/localstore/isolation_test.go +++ b/store/localstore/isolation_test.go @@ -22,6 +22,7 @@ type testIsolationSuite struct { func (t *testIsolationSuite) TestInc(c *C) { store, err := tidb.NewStore("memory://test/test_isolation") + c.Assert(err, IsNil) defer store.Close() threadCnt := 4 @@ -36,7 +37,7 @@ func (t *testIsolationSuite) TestInc(c *C) { defer wg.Done() for j := 0; j < 2000; j++ { var id int64 - err = kv.RunInNewTxn(store, true, func(txn kv.Transaction) error { + err := kv.RunInNewTxn(store, true, func(txn kv.Transaction) error { var err1 error id, err1 = txn.Inc([]byte("key"), 1) return err1 @@ -57,6 +58,7 @@ func (t *testIsolationSuite) TestInc(c *C) { func (t *testIsolationSuite) TestMultiInc(c *C) { store, err := tidb.NewStore("memory://test/test_isolation") + c.Assert(err, IsNil) defer store.Close() threadCnt := 4 @@ -75,7 +77,7 @@ func (t *testIsolationSuite) TestMultiInc(c *C) { go func() { defer wg.Done() for j := 0; j < incCnt; j++ { - err = kv.RunInNewTxn(store, true, func(txn kv.Transaction) error { + err := kv.RunInNewTxn(store, true, func(txn kv.Transaction) error { for _, key := range keys { _, err1 := txn.Inc(key, 1) if err1 != nil { From 6001658292ac1ecab56f3e5f7a76ca054397946a Mon Sep 17 00:00:00 2001 From: ngaut Date: Wed, 25 Nov 2015 13:01:28 +0800 Subject: [PATCH 10/11] *: Pass go vet shadow check --- ddl/ddl.go | 8 ++++---- rset/rsets/where.go | 2 +- stmt/stmts/select.go | 2 +- stmt/stmts/update.go | 12 ++++++------ 4 files changed, 12 insertions(+), 12 deletions(-) diff --git a/ddl/ddl.go b/ddl/ddl.go index 2ed6150ec9..00bee12e7f 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -676,12 +676,12 @@ func (d *ddl) CreateIndex(ctx context.Context, ti table.Ident, unique bool, inde // update InfoSchema err = kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error { t := meta.NewMeta(txn) - err := d.verifySchemaMetaVersion(t, is.SchemaMetaVersion()) - if err != nil { - return errors.Trace(err) + err1 := d.verifySchemaMetaVersion(t, is.SchemaMetaVersion()) + if err1 != nil { + return errors.Trace(err1) } - err = t.UpdateTable(schema.ID, tbInfo) + err1 = t.UpdateTable(schema.ID, tbInfo) return errors.Trace(err) }) if d.onDDLChange != nil { diff --git a/rset/rsets/where.go b/rset/rsets/where.go index 4b6ed062be..1b213dcba9 100644 --- a/rset/rsets/where.go +++ b/rset/rsets/where.go @@ -67,7 +67,7 @@ func (r *WhereRset) planBinOp(ctx context.Context, x *expression.BinaryOperation } f(x) out := []expression.Expression{} - p := r.Src + p = r.Src isNewPlan := false for _, e := range in { p2, filtered, err = p.Filter(ctx, e) diff --git a/stmt/stmts/select.go b/stmt/stmts/select.go index c9c29fcecc..e83f7037d2 100644 --- a/stmt/stmts/select.go +++ b/stmt/stmts/select.go @@ -160,7 +160,7 @@ func (s *SelectStmt) Plan(ctx context.Context) (plan.Plan, error) { } r = &plans.SelectLockPlan{Src: r, Lock: lock} - if err := s.checkOneColumn(ctx); err != nil { + if err = s.checkOneColumn(ctx); err != nil { return nil, errors.Trace(err) } diff --git a/stmt/stmts/update.go b/stmt/stmts/update.go index cd8abc2f5b..02eedfadf2 100644 --- a/stmt/stmts/update.go +++ b/stmt/stmts/update.go @@ -303,14 +303,14 @@ func (s *UpdateStmt) Exec(ctx context.Context) (_ rset.Recordset, err error) { continue } // Update row - handle, err2 := util.DecodeHandleFromRowKey(k) - if err2 != nil { - return nil, errors.Trace(err2) + handle, err1 := util.DecodeHandleFromRowKey(k) + if err1 != nil { + return nil, errors.Trace(err1) } - err2 = updateRecord(ctx, handle, data, tbl, columns, m, lastOffset, false) - if err2 != nil { - return nil, errors.Trace(err2) + err1 = updateRecord(ctx, handle, data, tbl, columns, m, lastOffset, false) + if err1 != nil { + return nil, errors.Trace(err1) } updatedRowKeys[k] = true From 32c3e031d5f445236054967511784fa2bd8c6cf5 Mon Sep 17 00:00:00 2001 From: ngaut Date: Wed, 25 Nov 2015 14:46:48 +0800 Subject: [PATCH 11/11] Address comments --- ddl/ddl.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ddl/ddl.go b/ddl/ddl.go index 00bee12e7f..70120d5abb 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -682,7 +682,7 @@ func (d *ddl) CreateIndex(ctx context.Context, ti table.Ident, unique bool, inde } err1 = t.UpdateTable(schema.ID, tbInfo) - return errors.Trace(err) + return errors.Trace(err1) }) if d.onDDLChange != nil { err = d.onDDLChange(err)