diff --git a/store/localstore/kv.go b/store/localstore/kv.go index 8712acb8c9..41e0c30c3b 100644 --- a/store/localstore/kv.go +++ b/store/localstore/kv.go @@ -16,11 +16,13 @@ package localstore import ( "runtime/debug" "sync" + "time" "github.com/juju/errors" "github.com/ngaut/log" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/localstore/engine" + "github.com/pingcap/tidb/util/segmentmap" "github.com/twinj/uuid" ) @@ -37,6 +39,8 @@ const ( const ( maxSeekWorkers = 3 + + lowerWaterMark = 10 // second ) type command struct { @@ -63,8 +67,6 @@ type seekArgs struct { type commitArgs struct { } -//scheduler - // Seek searches for the first key in the engine which is >= key in byte order, returns (nil, nil, ErrNotFound) // if such key is not found. func (s *dbStore) Seek(key []byte) ([]byte, []byte, error) { @@ -138,6 +140,11 @@ func (s *dbStore) scheduler() { go s.seekWorker(wgSeekWorkers, seekCh) } + segmentIndex := 0 + + tick := time.NewTicker(time.Second) + defer tick.Stop() + for { select { case cmd := <-s.commandCh: @@ -157,6 +164,26 @@ func (s *dbStore) scheduler() { close(seekCh) wgSeekWorkers.Wait() s.wg.Done() + case <-tick.C: + segmentIndex = segmentIndex % s.recentUpdates.SegmentCount() + s.cleanRecentUpdates(segmentIndex) + segmentIndex++ + } + } +} + +func (s *dbStore) cleanRecentUpdates(segmentIndex int) { + m, err := s.recentUpdates.GetSegment(segmentIndex) + if err != nil { + log.Error(err) + return + } + + now := time.Now().Unix() + for k, v := range m { + dis := now - version2Second(v.(kv.Version)) + if dis > lowerWaterMark { + delete(m, k) } } } @@ -168,12 +195,12 @@ func (s *dbStore) tryLock(txn *dbTxn) (err error) { return errors.Trace(kv.ErrLockConflict) } - lastVer, ok := s.recentUpdates[k] + lastVer, ok := s.recentUpdates.Get([]byte(k)) if !ok { continue } // If there's newer version of this key, returns error. - if lastVer.Cmp(kv.Version{Ver: txn.tid}) > 0 { + if lastVer.(kv.Version).Cmp(kv.Version{Ver: txn.tid}) > 0 { return errors.Trace(kv.ErrConditionNotMatch) } } @@ -235,15 +262,13 @@ func (s *dbStore) NewBatch() engine.Batch { return s.db.NewBatch() } -//end of scheduler - type dbStore struct { db engine.DB txns map[uint64]*dbTxn keysLocked map[string]uint64 // TODO: clean up recentUpdates - recentUpdates map[string]kv.Version + recentUpdates *segmentmap.SegmentMap uuid string path string compactor *localstoreCompactor @@ -304,17 +329,20 @@ func (d Driver) Open(schema string) (kv.Storage, error) { log.Info("[kv] New store", schema) s := &dbStore{ - txns: make(map[uint64]*dbTxn), - keysLocked: make(map[string]uint64), - uuid: uuid.NewV4().String(), - path: schema, - db: db, - compactor: newLocalCompactor(localCompactDefaultPolicy, db), - commandCh: make(chan *command, 1000), - closed: false, - closeCh: make(chan struct{}), - recentUpdates: make(map[string]kv.Version), - wg: &sync.WaitGroup{}, + txns: make(map[uint64]*dbTxn), + keysLocked: make(map[string]uint64), + uuid: uuid.NewV4().String(), + path: schema, + db: db, + compactor: newLocalCompactor(localCompactDefaultPolicy, db), + commandCh: make(chan *command, 1000), + closed: false, + closeCh: make(chan struct{}), + wg: &sync.WaitGroup{}, + } + s.recentUpdates, err = segmentmap.NewSegmentMap(100) + if err != nil { + return nil, errors.Trace(err) } mc.cache[schema] = s s.compactor.Start() @@ -428,7 +456,7 @@ func (s *dbStore) unLockKeys(txn *dbTxn) error { } delete(s.keysLocked, k) - s.recentUpdates[k] = txn.version + s.recentUpdates.Set([]byte(k), txn.version, true) } return nil diff --git a/store/localstore/local_version_provider.go b/store/localstore/local_version_provider.go index 8450647678..6a2cdb8cd4 100644 --- a/store/localstore/local_version_provider.go +++ b/store/localstore/local_version_provider.go @@ -27,6 +27,14 @@ const ( timePrecisionOffset = 18 ) +func time2TsPhysical(t time.Time) uint64 { + return uint64((t.UnixNano() / int64(time.Millisecond)) << timePrecisionOffset) +} + +func version2Second(v kv.Version) int64 { + return int64(v.Ver>>timePrecisionOffset) / 1000 +} + // CurrentVersion implements the VersionProvider's GetCurrentVer interface. func (l *LocalVersionProvider) CurrentVersion() (kv.Version, error) { l.mu.Lock() @@ -34,7 +42,7 @@ func (l *LocalVersionProvider) CurrentVersion() (kv.Version, error) { for { var ts uint64 - ts = uint64((time.Now().UnixNano() / int64(time.Millisecond)) << timePrecisionOffset) + ts = time2TsPhysical(time.Now()) if l.lastTimestamp > ts { log.Error("[kv] invalid physical time stamp") diff --git a/util/segmentmap/segmentmap.go b/util/segmentmap/segmentmap.go new file mode 100644 index 0000000000..11edbab382 --- /dev/null +++ b/util/segmentmap/segmentmap.go @@ -0,0 +1,81 @@ +// Copyright 2015 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package segmentmap + +import ( + "hash/crc32" + + "github.com/juju/errors" +) + +// SegmentMap is used for handle a big map slice by slice. +// It's not thread safe. +type SegmentMap struct { + size int + maps []map[string]interface{} + + crcTable *crc32.Table +} + +// NewSegmentMap create a new SegmentMap. +func NewSegmentMap(size int) (*SegmentMap, error) { + if size <= 0 { + return nil, errors.Errorf("Invalid size: %d", size) + } + + sm := &SegmentMap{ + maps: make([]map[string]interface{}, size), + size: size, + } + for i := 0; i < size; i++ { + sm.maps[i] = make(map[string]interface{}) + } + + sm.crcTable = crc32.MakeTable(crc32.Castagnoli) + return sm, nil +} + +// Get is the same as map[k]. +func (sm *SegmentMap) Get(key []byte) (interface{}, bool) { + idx := int(crc32.Checksum(key, sm.crcTable)) % sm.size + val, ok := sm.maps[idx][string(key)] + return val, ok +} + +// GetSegment gets the map specific by index. +func (sm *SegmentMap) GetSegment(index int) (map[string]interface{}, error) { + if index >= sm.size || index < 0 { + return nil, errors.Errorf("index out of bound: %d", index) + } + + return sm.maps[index], nil +} + +// Set if key not exists, returns whether already exists. +func (sm *SegmentMap) Set(key []byte, value interface{}, force bool) bool { + idx := int(crc32.Checksum(key, sm.crcTable)) % sm.size + k := string(key) + _, exist := sm.maps[idx][k] + if exist && !force { + return exist + } + + sm.maps[idx][k] = value + return exist +} + +// SegmentCount returns how many inner segments. +func (sm *SegmentMap) SegmentCount() int { + return sm.size +} diff --git a/util/segmentmap/segmentmap_test.go b/util/segmentmap/segmentmap_test.go new file mode 100644 index 0000000000..2a86786999 --- /dev/null +++ b/util/segmentmap/segmentmap_test.go @@ -0,0 +1,52 @@ +// Copyright 2015 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package segmentmap + +import ( + . "github.com/pingcap/check" +) + +var _ = Suite(&testSegmentMapSuite{}) + +type testSegmentMapSuite struct { +} + +func (s *testSegmentMapSuite) TestSegment(c *C) { + segs := 2 + m, err := NewSegmentMap(segs) + c.Assert(err, IsNil) + c.Assert(m.SegmentCount(), Equals, segs) + k := []byte("k") + v := []byte("v") + val, exist := m.Get(k) + c.Assert(exist, IsFalse) + + exist = m.Set(k, v, false) + c.Assert(exist, IsFalse) + + val, exist = m.Get(k) + c.Assert(v, DeepEquals, val.([]byte)) + c.Assert(exist, IsTrue) + + m0, err := m.GetSegment(0) + c.Assert(err, IsNil) + + m1, err := m.GetSegment(1) + c.Assert(err, IsNil) + + c.Assert(len(m0)+len(m1), Equals, 1) + + _, err = m.GetSegment(3) + c.Assert(err, NotNil) +}