From 462f6a971ed980ccbb8e29235ed50416ccc9340b Mon Sep 17 00:00:00 2001 From: ngaut Date: Tue, 22 Dec 2015 12:19:16 +0800 Subject: [PATCH 1/7] *: introduce segment map --- store/localstore/kv.go | 11 +++--- util/segmentmap/segmentmap.go | 60 ++++++++++++++++++++++++++++++ util/segmentmap/segmentmap_test.go | 38 +++++++++++++++++++ 3 files changed, 104 insertions(+), 5 deletions(-) create mode 100644 util/segmentmap/segmentmap.go create mode 100644 util/segmentmap/segmentmap_test.go diff --git a/store/localstore/kv.go b/store/localstore/kv.go index 8712acb8c9..13768a0d8b 100644 --- a/store/localstore/kv.go +++ b/store/localstore/kv.go @@ -21,6 +21,7 @@ import ( "github.com/ngaut/log" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/localstore/engine" + "github.com/pingcap/tidb/util/segmentmap" "github.com/twinj/uuid" ) @@ -168,12 +169,12 @@ func (s *dbStore) tryLock(txn *dbTxn) (err error) { return errors.Trace(kv.ErrLockConflict) } - lastVer, ok := s.recentUpdates[k] + lastVer, ok := s.recentUpdates.Get([]byte(k)) if !ok { continue } // If there's newer version of this key, returns error. 
- if lastVer.Cmp(kv.Version{Ver: txn.tid}) > 0 { + if lastVer.(kv.Version).Cmp(kv.Version{Ver: txn.tid}) > 0 { return errors.Trace(kv.ErrConditionNotMatch) } } @@ -243,7 +244,7 @@ type dbStore struct { txns map[uint64]*dbTxn keysLocked map[string]uint64 // TODO: clean up recentUpdates - recentUpdates map[string]kv.Version + recentUpdates *segmentmap.SegmentMap uuid string path string compactor *localstoreCompactor @@ -313,7 +314,7 @@ func (d Driver) Open(schema string) (kv.Storage, error) { commandCh: make(chan *command, 1000), closed: false, closeCh: make(chan struct{}), - recentUpdates: make(map[string]kv.Version), + recentUpdates: segmentmap.NewSegmentMap(100), wg: &sync.WaitGroup{}, } mc.cache[schema] = s @@ -428,7 +429,7 @@ func (s *dbStore) unLockKeys(txn *dbTxn) error { } delete(s.keysLocked, k) - s.recentUpdates[k] = txn.version + s.recentUpdates.Set([]byte(k), txn.version, true) } return nil diff --git a/util/segmentmap/segmentmap.go b/util/segmentmap/segmentmap.go new file mode 100644 index 0000000000..62407d70f9 --- /dev/null +++ b/util/segmentmap/segmentmap.go @@ -0,0 +1,60 @@ +package segmentmap + +import ( + "hash/crc32" + + "github.com/juju/errors" +) + +// SegmentMap is used for handle a big map slice by slice +type SegmentMap struct { + size int + maps []map[string]interface{} +} + +// NewSegmentMap crate a new SegmentMap +func NewSegmentMap(size int) *SegmentMap { + sm := &SegmentMap{ + maps: make([]map[string]interface{}, size), + size: size, + } + + for i := 0; i < size; i++ { + sm.maps[i] = make(map[string]interface{}) + } + return sm +} + +// Get is the same as map[k] +func (sm *SegmentMap) Get(key []byte) (interface{}, bool) { + idx := int(crc32.ChecksumIEEE(key)) % sm.size + val, ok := sm.maps[idx][string(key)] + return val, ok +} + +// GetSegment gets the map specific by index +func (sm *SegmentMap) GetSegment(index int) (map[string]interface{}, error) { + if index >= len(sm.maps) { + return nil, errors.Errorf("index out of bound") + } + + 
return sm.maps[index], nil +} + +// Set if empty, return whether already exists +func (sm *SegmentMap) Set(key []byte, value interface{}, force bool) (exist bool) { + idx := int(crc32.ChecksumIEEE(key)) % sm.size + k := string(key) + _, exist = sm.maps[idx][k] + if exist && !force { + return exist + } + + sm.maps[idx][k] = value + return exist +} + +// SegmentCount return how many inner segments +func (sm *SegmentMap) SegmentCount() int { + return sm.size +} diff --git a/util/segmentmap/segmentmap_test.go b/util/segmentmap/segmentmap_test.go new file mode 100644 index 0000000000..52768543e0 --- /dev/null +++ b/util/segmentmap/segmentmap_test.go @@ -0,0 +1,38 @@ +package segmentmap + +import ( + . "github.com/pingcap/check" +) + +var _ = Suite(&testSegmentMapSuite{}) + +type testSegmentMapSuite struct { +} + +func (s *testSegmentMapSuite) TestSegment(c *C) { + segs := 2 + m := NewSegmentMap(segs) + c.Assert(m.SegmentCount(), Equals, segs) + k := []byte("k") + v := []byte("v") + val, exist := m.Get(k) + c.Assert(exist, Equals, false) + + exist = m.Set(k, v, false) + c.Assert(exist, IsFalse) + + val, exist = m.Get(k) + c.Assert(v, DeepEquals, val.([]byte)) + c.Assert(exist, Equals, true) + + m0, err := m.GetSegment(0) + c.Assert(err, IsNil) + + m1, err := m.GetSegment(1) + c.Assert(err, IsNil) + + c.Assert(len(m0)+len(m1), Equals, 1) + + _, err = m.GetSegment(3) + c.Assert(err, NotNil) +} From 757358331d44e1f38aed29fcb97eb2e26caaa763 Mon Sep 17 00:00:00 2001 From: ngaut Date: Tue, 22 Dec 2015 12:41:20 +0800 Subject: [PATCH 2/7] Add license header --- util/segmentmap/segmentmap.go | 13 +++++++++++++ util/segmentmap/segmentmap_test.go | 13 +++++++++++++ 2 files changed, 26 insertions(+) diff --git a/util/segmentmap/segmentmap.go b/util/segmentmap/segmentmap.go index 62407d70f9..1dc9f24f3d 100644 --- a/util/segmentmap/segmentmap.go +++ b/util/segmentmap/segmentmap.go @@ -1,3 +1,16 @@ +// Copyright 2015 PingCAP, Inc. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + package segmentmap import ( diff --git a/util/segmentmap/segmentmap_test.go b/util/segmentmap/segmentmap_test.go index 52768543e0..32c350d3fd 100644 --- a/util/segmentmap/segmentmap_test.go +++ b/util/segmentmap/segmentmap_test.go @@ -1,3 +1,16 @@ +// Copyright 2015 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ package segmentmap import ( From 45bcc1d874d78f7e80eedebb44d58a5ca7a69ac9 Mon Sep 17 00:00:00 2001 From: ngaut Date: Tue, 22 Dec 2015 13:37:55 +0800 Subject: [PATCH 3/7] Clean up recent updates --- store/localstore/kv.go | 26 ++++++++++++++++++++++ store/localstore/local_version_provider.go | 10 ++++++++- 2 files changed, 35 insertions(+), 1 deletion(-) diff --git a/store/localstore/kv.go b/store/localstore/kv.go index 13768a0d8b..d1e9f52b03 100644 --- a/store/localstore/kv.go +++ b/store/localstore/kv.go @@ -16,6 +16,7 @@ package localstore import ( "runtime/debug" "sync" + "time" "github.com/juju/errors" "github.com/ngaut/log" @@ -139,6 +140,10 @@ func (s *dbStore) scheduler() { go s.seekWorker(wgSeekWorkers, seekCh) } + segmentIndex := 0 + + tick := time.NewTicker(time.Second) + for { select { case cmd := <-s.commandCh: @@ -158,6 +163,27 @@ func (s *dbStore) scheduler() { close(seekCh) wgSeekWorkers.Wait() s.wg.Done() + case <-tick.C: + segmentIndex = segmentIndex % s.recentUpdates.SegmentCount() + s.cleanRecentUpdates(segmentIndex) + segmentIndex++ + } + } +} + +func (s *dbStore) cleanRecentUpdates(segmentIndex int) { + m, err := s.recentUpdates.GetSegment(segmentIndex) + if err != nil { + log.Error(err) + return + } + + lowerWaterMark := int64(10) // second + now := time.Now().UnixNano() / int64(time.Second) + for k, v := range m { + dis := now - version2Second(v.(kv.Version)) + if dis > lowerWaterMark { + delete(m, k) } } } diff --git a/store/localstore/local_version_provider.go b/store/localstore/local_version_provider.go index 8450647678..6a2cdb8cd4 100644 --- a/store/localstore/local_version_provider.go +++ b/store/localstore/local_version_provider.go @@ -27,6 +27,14 @@ const ( timePrecisionOffset = 18 ) +func time2TsPhysical(t time.Time) uint64 { + return uint64((t.UnixNano() / int64(time.Millisecond)) << timePrecisionOffset) +} + +func version2Second(v kv.Version) int64 { + return int64(v.Ver>>timePrecisionOffset) / 1000 +} + // CurrentVersion 
implements the VersionProvider's GetCurrentVer interface. func (l *LocalVersionProvider) CurrentVersion() (kv.Version, error) { l.mu.Lock() @@ -34,7 +42,7 @@ func (l *LocalVersionProvider) CurrentVersion() (kv.Version, error) { for { var ts uint64 - ts = uint64((time.Now().UnixNano() / int64(time.Millisecond)) << timePrecisionOffset) + ts = time2TsPhysical(time.Now()) if l.lastTimestamp > ts { log.Error("[kv] invalid physical time stamp") From f91303cfd44f14f812702b0a117fa340c856414a Mon Sep 17 00:00:00 2001 From: ngaut Date: Tue, 22 Dec 2015 13:50:57 +0800 Subject: [PATCH 4/7] Address comments --- util/segmentmap/segmentmap.go | 7 ++++--- util/segmentmap/segmentmap_test.go | 4 ++-- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/util/segmentmap/segmentmap.go b/util/segmentmap/segmentmap.go index 1dc9f24f3d..7f1e31fdf9 100644 --- a/util/segmentmap/segmentmap.go +++ b/util/segmentmap/segmentmap.go @@ -20,6 +20,7 @@ import ( ) // SegmentMap is used for handle a big map slice by slice +// It's not thread safe type SegmentMap struct { size int maps []map[string]interface{} @@ -48,17 +49,17 @@ func (sm *SegmentMap) Get(key []byte) (interface{}, bool) { // GetSegment gets the map specific by index func (sm *SegmentMap) GetSegment(index int) (map[string]interface{}, error) { if index >= len(sm.maps) { - return nil, errors.Errorf("index out of bound") + return nil, errors.Errorf("index out of bound: %d", index) } return sm.maps[index], nil } // Set if empty, return whether already exists -func (sm *SegmentMap) Set(key []byte, value interface{}, force bool) (exist bool) { +func (sm *SegmentMap) Set(key []byte, value interface{}, force bool) bool { idx := int(crc32.ChecksumIEEE(key)) % sm.size k := string(key) - _, exist = sm.maps[idx][k] + _, exist := sm.maps[idx][k] if exist && !force { return exist } diff --git a/util/segmentmap/segmentmap_test.go b/util/segmentmap/segmentmap_test.go index 32c350d3fd..b8610a5d90 100644 --- 
a/util/segmentmap/segmentmap_test.go +++ b/util/segmentmap/segmentmap_test.go @@ -29,14 +29,14 @@ func (s *testSegmentMapSuite) TestSegment(c *C) { k := []byte("k") v := []byte("v") val, exist := m.Get(k) - c.Assert(exist, Equals, false) + c.Assert(exist, IsFalse) exist = m.Set(k, v, false) c.Assert(exist, IsFalse) val, exist = m.Get(k) c.Assert(v, DeepEquals, val.([]byte)) - c.Assert(exist, Equals, true) + c.Assert(exist, IsTrue) m0, err := m.GetSegment(0) c.Assert(err, IsNil) From 28c89dfc0b1b89d7fe6ce44445681e2a76f20997 Mon Sep 17 00:00:00 2001 From: ngaut Date: Tue, 22 Dec 2015 18:57:52 +0800 Subject: [PATCH 5/7] Use Castagnoli to calculate hash --- util/segmentmap/segmentmap.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/util/segmentmap/segmentmap.go b/util/segmentmap/segmentmap.go index 7f1e31fdf9..229be131ee 100644 --- a/util/segmentmap/segmentmap.go +++ b/util/segmentmap/segmentmap.go @@ -24,6 +24,8 @@ import ( type SegmentMap struct { size int maps []map[string]interface{} + + crcTable *crc32.Table } // NewSegmentMap crate a new SegmentMap @@ -36,19 +38,21 @@ func NewSegmentMap(size int) *SegmentMap { for i := 0; i < size; i++ { sm.maps[i] = make(map[string]interface{}) } + + sm.crcTable = crc32.MakeTable(crc32.Castagnoli) return sm } // Get is the same as map[k] func (sm *SegmentMap) Get(key []byte) (interface{}, bool) { - idx := int(crc32.ChecksumIEEE(key)) % sm.size + idx := int(crc32.Checksum(key, sm.crcTable)) % sm.size val, ok := sm.maps[idx][string(key)] return val, ok } // GetSegment gets the map specific by index func (sm *SegmentMap) GetSegment(index int) (map[string]interface{}, error) { - if index >= len(sm.maps) { + if index >= sm.size || index < 0 { return nil, errors.Errorf("index out of bound: %d", index) } @@ -57,7 +61,7 @@ func (sm *SegmentMap) GetSegment(index int) (map[string]interface{}, error) { // Set if empty, return whether already exists func (sm *SegmentMap) Set(key []byte, value interface{}, 
force bool) bool { - idx := int(crc32.ChecksumIEEE(key)) % sm.size + idx := int(crc32.Checksum(key, sm.crcTable)) % sm.size k := string(key) _, exist := sm.maps[idx][k] if exist && !force { From 77201c39b346286a020973b403c7f4780190bb58 Mon Sep 17 00:00:00 2001 From: ngaut Date: Tue, 22 Dec 2015 19:59:27 +0800 Subject: [PATCH 6/7] Address comments --- store/localstore/kv.go | 28 ++++++++++++++++------------ util/segmentmap/segmentmap.go | 19 +++++++++++-------- util/segmentmap/segmentmap_test.go | 3 ++- 3 files changed, 29 insertions(+), 21 deletions(-) diff --git a/store/localstore/kv.go b/store/localstore/kv.go index d1e9f52b03..8599788c31 100644 --- a/store/localstore/kv.go +++ b/store/localstore/kv.go @@ -143,6 +143,7 @@ func (s *dbStore) scheduler() { segmentIndex := 0 tick := time.NewTicker(time.Second) + defer tick.Stop() for { select { @@ -179,7 +180,7 @@ func (s *dbStore) cleanRecentUpdates(segmentIndex int) { } lowerWaterMark := int64(10) // second - now := time.Now().UnixNano() / int64(time.Second) + now := time.Now().Unix() for k, v := range m { dis := now - version2Second(v.(kv.Version)) if dis > lowerWaterMark { @@ -331,17 +332,20 @@ func (d Driver) Open(schema string) (kv.Storage, error) { log.Info("[kv] New store", schema) s := &dbStore{ - txns: make(map[uint64]*dbTxn), - keysLocked: make(map[string]uint64), - uuid: uuid.NewV4().String(), - path: schema, - db: db, - compactor: newLocalCompactor(localCompactDefaultPolicy, db), - commandCh: make(chan *command, 1000), - closed: false, - closeCh: make(chan struct{}), - recentUpdates: segmentmap.NewSegmentMap(100), - wg: &sync.WaitGroup{}, + txns: make(map[uint64]*dbTxn), + keysLocked: make(map[string]uint64), + uuid: uuid.NewV4().String(), + path: schema, + db: db, + compactor: newLocalCompactor(localCompactDefaultPolicy, db), + commandCh: make(chan *command, 1000), + closed: false, + closeCh: make(chan struct{}), + wg: &sync.WaitGroup{}, + } + s.recentUpdates, err = segmentmap.NewSegmentMap(100) + if 
err != nil { + return nil, errors.Trace(err) } mc.cache[schema] = s s.compactor.Start() diff --git a/util/segmentmap/segmentmap.go b/util/segmentmap/segmentmap.go index 229be131ee..554ef36ecc 100644 --- a/util/segmentmap/segmentmap.go +++ b/util/segmentmap/segmentmap.go @@ -20,7 +20,7 @@ import ( ) // SegmentMap is used for handle a big map slice by slice -// It's not thread safe +// It's not thread safe. type SegmentMap struct { size int maps []map[string]interface{} @@ -28,8 +28,11 @@ type SegmentMap struct { crcTable *crc32.Table } -// NewSegmentMap crate a new SegmentMap -func NewSegmentMap(size int) *SegmentMap { +// NewSegmentMap create a new SegmentMap. +func NewSegmentMap(size int) (*SegmentMap, error) { + if size <= 0 { + return nil, errors.Errorf("Invalid size: %d", size) + } sm := &SegmentMap{ maps: make([]map[string]interface{}, size), size: size, @@ -40,17 +43,17 @@ func NewSegmentMap(size int) *SegmentMap { } sm.crcTable = crc32.MakeTable(crc32.Castagnoli) - return sm + return sm, nil } -// Get is the same as map[k] +// Get is the same as map[k]. func (sm *SegmentMap) Get(key []byte) (interface{}, bool) { idx := int(crc32.Checksum(key, sm.crcTable)) % sm.size val, ok := sm.maps[idx][string(key)] return val, ok } -// GetSegment gets the map specific by index +// GetSegment gets the map specific by index. func (sm *SegmentMap) GetSegment(index int) (map[string]interface{}, error) { if index >= sm.size || index < 0 { return nil, errors.Errorf("index out of bound: %d", index) @@ -59,7 +62,7 @@ func (sm *SegmentMap) GetSegment(index int) (map[string]interface{}, error) { return sm.maps[index], nil } -// Set if empty, return whether already exists +// Set if empty, returns whether already exists. 
func (sm *SegmentMap) Set(key []byte, value interface{}, force bool) bool { idx := int(crc32.Checksum(key, sm.crcTable)) % sm.size k := string(key) @@ -72,7 +75,7 @@ func (sm *SegmentMap) Set(key []byte, value interface{}, force bool) bool { return exist } -// SegmentCount return how many inner segments +// SegmentCount returns how many inner segments. func (sm *SegmentMap) SegmentCount() int { return sm.size } diff --git a/util/segmentmap/segmentmap_test.go b/util/segmentmap/segmentmap_test.go index b8610a5d90..2a86786999 100644 --- a/util/segmentmap/segmentmap_test.go +++ b/util/segmentmap/segmentmap_test.go @@ -24,7 +24,8 @@ type testSegmentMapSuite struct { func (s *testSegmentMapSuite) TestSegment(c *C) { segs := 2 - m := NewSegmentMap(segs) + m, err := NewSegmentMap(segs) + c.Assert(err, IsNil) c.Assert(m.SegmentCount(), Equals, segs) k := []byte("k") v := []byte("v") From 3dc9261d816a91c6cd62c635e5bbf1d930b94994 Mon Sep 17 00:00:00 2001 From: ngaut Date: Tue, 22 Dec 2015 20:20:08 +0800 Subject: [PATCH 7/7] Address comments --- store/localstore/kv.go | 7 ++----- util/segmentmap/segmentmap.go | 6 +++--- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/store/localstore/kv.go b/store/localstore/kv.go index 8599788c31..41e0c30c3b 100644 --- a/store/localstore/kv.go +++ b/store/localstore/kv.go @@ -39,6 +39,8 @@ const ( const ( maxSeekWorkers = 3 + + lowerWaterMark = 10 // second ) type command struct { @@ -65,8 +67,6 @@ type seekArgs struct { type commitArgs struct { } -//scheduler - // Seek searches for the first key in the engine which is >= key in byte order, returns (nil, nil, ErrNotFound) // if such key is not found. 
func (s *dbStore) Seek(key []byte) ([]byte, []byte, error) { @@ -179,7 +179,6 @@ func (s *dbStore) cleanRecentUpdates(segmentIndex int) { return } - lowerWaterMark := int64(10) // second now := time.Now().Unix() for k, v := range m { dis := now - version2Second(v.(kv.Version)) @@ -263,8 +262,6 @@ func (s *dbStore) NewBatch() engine.Batch { return s.db.NewBatch() } -//end of scheduler - type dbStore struct { db engine.DB diff --git a/util/segmentmap/segmentmap.go b/util/segmentmap/segmentmap.go index 554ef36ecc..11edbab382 100644 --- a/util/segmentmap/segmentmap.go +++ b/util/segmentmap/segmentmap.go @@ -19,7 +19,7 @@ import ( "github.com/juju/errors" ) -// SegmentMap is used for handle a big map slice by slice +// SegmentMap is used to handle a big map segment by segment. // It's not thread safe. type SegmentMap struct { size int @@ -33,11 +33,11 @@ func NewSegmentMap(size int) (*SegmentMap, error) { if size <= 0 { return nil, errors.Errorf("Invalid size: %d", size) } + sm := &SegmentMap{ maps: make([]map[string]interface{}, size), size: size, } - for i := 0; i < size; i++ { sm.maps[i] = make(map[string]interface{}) } @@ -62,7 +62,7 @@ func (sm *SegmentMap) GetSegment(index int) (map[string]interface{}, error) { return sm.maps[index], nil } -// Set if empty, returns whether already exists. +// Set stores value under key if the key does not exist yet, or unconditionally when force is true; it returns whether the key already existed. func (sm *SegmentMap) Set(key []byte, value interface{}, force bool) bool { idx := int(crc32.Checksum(key, sm.crcTable)) % sm.size k := string(key)