Merge pull request #772 from pingcap/ngaut/segment-map

*: introduce segment map
This commit is contained in:
goroutine
2015-12-22 20:32:26 +08:00
4 changed files with 189 additions and 20 deletions

View File

@ -16,11 +16,13 @@ package localstore
import (
"runtime/debug"
"sync"
"time"
"github.com/juju/errors"
"github.com/ngaut/log"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/localstore/engine"
"github.com/pingcap/tidb/util/segmentmap"
"github.com/twinj/uuid"
)
@ -37,6 +39,8 @@ const (
const (
maxSeekWorkers = 3
lowerWaterMark = 10 // second
)
type command struct {
@ -63,8 +67,6 @@ type seekArgs struct {
type commitArgs struct {
}
//scheduler
// Seek searches for the first key in the engine which is >= key in byte order, returns (nil, nil, ErrNotFound)
// if such key is not found.
func (s *dbStore) Seek(key []byte) ([]byte, []byte, error) {
@ -138,6 +140,11 @@ func (s *dbStore) scheduler() {
go s.seekWorker(wgSeekWorkers, seekCh)
}
segmentIndex := 0
tick := time.NewTicker(time.Second)
defer tick.Stop()
for {
select {
case cmd := <-s.commandCh:
@ -157,6 +164,26 @@ func (s *dbStore) scheduler() {
close(seekCh)
wgSeekWorkers.Wait()
s.wg.Done()
case <-tick.C:
segmentIndex = segmentIndex % s.recentUpdates.SegmentCount()
s.cleanRecentUpdates(segmentIndex)
segmentIndex++
}
}
}
func (s *dbStore) cleanRecentUpdates(segmentIndex int) {
m, err := s.recentUpdates.GetSegment(segmentIndex)
if err != nil {
log.Error(err)
return
}
now := time.Now().Unix()
for k, v := range m {
dis := now - version2Second(v.(kv.Version))
if dis > lowerWaterMark {
delete(m, k)
}
}
}
@ -168,12 +195,12 @@ func (s *dbStore) tryLock(txn *dbTxn) (err error) {
return errors.Trace(kv.ErrLockConflict)
}
lastVer, ok := s.recentUpdates[k]
lastVer, ok := s.recentUpdates.Get([]byte(k))
if !ok {
continue
}
// If there's newer version of this key, returns error.
if lastVer.Cmp(kv.Version{Ver: txn.tid}) > 0 {
if lastVer.(kv.Version).Cmp(kv.Version{Ver: txn.tid}) > 0 {
return errors.Trace(kv.ErrConditionNotMatch)
}
}
@ -235,15 +262,13 @@ func (s *dbStore) NewBatch() engine.Batch {
return s.db.NewBatch()
}
//end of scheduler
type dbStore struct {
db engine.DB
txns map[uint64]*dbTxn
keysLocked map[string]uint64
// TODO: clean up recentUpdates
recentUpdates map[string]kv.Version
recentUpdates *segmentmap.SegmentMap
uuid string
path string
compactor *localstoreCompactor
@ -304,17 +329,20 @@ func (d Driver) Open(schema string) (kv.Storage, error) {
log.Info("[kv] New store", schema)
s := &dbStore{
txns: make(map[uint64]*dbTxn),
keysLocked: make(map[string]uint64),
uuid: uuid.NewV4().String(),
path: schema,
db: db,
compactor: newLocalCompactor(localCompactDefaultPolicy, db),
commandCh: make(chan *command, 1000),
closed: false,
closeCh: make(chan struct{}),
recentUpdates: make(map[string]kv.Version),
wg: &sync.WaitGroup{},
txns: make(map[uint64]*dbTxn),
keysLocked: make(map[string]uint64),
uuid: uuid.NewV4().String(),
path: schema,
db: db,
compactor: newLocalCompactor(localCompactDefaultPolicy, db),
commandCh: make(chan *command, 1000),
closed: false,
closeCh: make(chan struct{}),
wg: &sync.WaitGroup{},
}
s.recentUpdates, err = segmentmap.NewSegmentMap(100)
if err != nil {
return nil, errors.Trace(err)
}
mc.cache[schema] = s
s.compactor.Start()
@ -428,7 +456,7 @@ func (s *dbStore) unLockKeys(txn *dbTxn) error {
}
delete(s.keysLocked, k)
s.recentUpdates[k] = txn.version
s.recentUpdates.Set([]byte(k), txn.version, true)
}
return nil

View File

@ -27,6 +27,14 @@ const (
timePrecisionOffset = 18
)
func time2TsPhysical(t time.Time) uint64 {
return uint64((t.UnixNano() / int64(time.Millisecond)) << timePrecisionOffset)
}
func version2Second(v kv.Version) int64 {
return int64(v.Ver>>timePrecisionOffset) / 1000
}
// CurrentVersion implements the VersionProvider's GetCurrentVer interface.
func (l *LocalVersionProvider) CurrentVersion() (kv.Version, error) {
l.mu.Lock()
@ -34,7 +42,7 @@ func (l *LocalVersionProvider) CurrentVersion() (kv.Version, error) {
for {
var ts uint64
ts = uint64((time.Now().UnixNano() / int64(time.Millisecond)) << timePrecisionOffset)
ts = time2TsPhysical(time.Now())
if l.lastTimestamp > ts {
log.Error("[kv] invalid physical time stamp")

View File

@ -0,0 +1,81 @@
// Copyright 2015 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package segmentmap
import (
"hash/crc32"
"github.com/juju/errors"
)
// SegmentMap is used for handle a big map slice by slice.
// It's not thread safe.
type SegmentMap struct {
size int
maps []map[string]interface{}
crcTable *crc32.Table
}
// NewSegmentMap create a new SegmentMap.
func NewSegmentMap(size int) (*SegmentMap, error) {
if size <= 0 {
return nil, errors.Errorf("Invalid size: %d", size)
}
sm := &SegmentMap{
maps: make([]map[string]interface{}, size),
size: size,
}
for i := 0; i < size; i++ {
sm.maps[i] = make(map[string]interface{})
}
sm.crcTable = crc32.MakeTable(crc32.Castagnoli)
return sm, nil
}
// Get is the same as map[k].
func (sm *SegmentMap) Get(key []byte) (interface{}, bool) {
idx := int(crc32.Checksum(key, sm.crcTable)) % sm.size
val, ok := sm.maps[idx][string(key)]
return val, ok
}
// GetSegment gets the map specific by index.
func (sm *SegmentMap) GetSegment(index int) (map[string]interface{}, error) {
if index >= sm.size || index < 0 {
return nil, errors.Errorf("index out of bound: %d", index)
}
return sm.maps[index], nil
}
// Set if key not exists, returns whether already exists.
func (sm *SegmentMap) Set(key []byte, value interface{}, force bool) bool {
idx := int(crc32.Checksum(key, sm.crcTable)) % sm.size
k := string(key)
_, exist := sm.maps[idx][k]
if exist && !force {
return exist
}
sm.maps[idx][k] = value
return exist
}
// SegmentCount returns how many inner segments.
func (sm *SegmentMap) SegmentCount() int {
return sm.size
}

View File

@ -0,0 +1,52 @@
// Copyright 2015 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package segmentmap
import (
. "github.com/pingcap/check"
)
var _ = Suite(&testSegmentMapSuite{})
type testSegmentMapSuite struct {
}
func (s *testSegmentMapSuite) TestSegment(c *C) {
segs := 2
m, err := NewSegmentMap(segs)
c.Assert(err, IsNil)
c.Assert(m.SegmentCount(), Equals, segs)
k := []byte("k")
v := []byte("v")
val, exist := m.Get(k)
c.Assert(exist, IsFalse)
exist = m.Set(k, v, false)
c.Assert(exist, IsFalse)
val, exist = m.Get(k)
c.Assert(v, DeepEquals, val.([]byte))
c.Assert(exist, IsTrue)
m0, err := m.GetSegment(0)
c.Assert(err, IsNil)
m1, err := m.GetSegment(1)
c.Assert(err, IsNil)
c.Assert(len(m0)+len(m1), Equals, 1)
_, err = m.GetSegment(3)
c.Assert(err, NotNil)
}