localstore: remove scheduler get cmd
This commit is contained in:
@ -16,7 +16,6 @@ package localstore
|
||||
import (
|
||||
"runtime/debug"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/juju/errors"
|
||||
"github.com/ngaut/log"
|
||||
@ -33,29 +32,18 @@ var (
|
||||
type op int
|
||||
|
||||
const (
|
||||
opGet = iota + 1
|
||||
opSeek
|
||||
opSeek = iota + 1
|
||||
opCommit
|
||||
)
|
||||
|
||||
type command struct {
|
||||
op op
|
||||
txn *dbTxn
|
||||
ver kv.Version
|
||||
args interface{}
|
||||
reply interface{}
|
||||
done chan error
|
||||
}
|
||||
|
||||
type getArgs struct {
|
||||
key []byte
|
||||
}
|
||||
|
||||
type getReply struct {
|
||||
value []byte
|
||||
err error
|
||||
}
|
||||
|
||||
type seekReply struct {
|
||||
key []byte
|
||||
value []byte
|
||||
@ -75,24 +63,6 @@ type commitArgs struct {
|
||||
|
||||
//scheduler
|
||||
|
||||
// Get gets the associated value with key, returns (nil, ErrNotFound) if no value found.
|
||||
func (s *dbStore) Get(key []byte) ([]byte, error) {
|
||||
c := &command{
|
||||
op: opGet,
|
||||
args: &getArgs{key: key},
|
||||
done: make(chan error, 1),
|
||||
}
|
||||
|
||||
s.commandCh <- c
|
||||
err := <-c.done
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
reply := c.reply.(*getReply)
|
||||
return reply.value, nil
|
||||
}
|
||||
|
||||
// Seek searches for the first key in the engine which is >= key in byte order, returns (nil, nil, ErrNotFound)
|
||||
// if such key is not found.
|
||||
func (s *dbStore) Seek(key []byte) ([]byte, []byte, error) {
|
||||
@ -154,7 +124,6 @@ func (s *dbStore) seekWorker(seekCh chan *command) {
|
||||
|
||||
s.doSeek(pending)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (s *dbStore) scheduler() {
|
||||
@ -164,36 +133,25 @@ func (s *dbStore) scheduler() {
|
||||
go s.seekWorker(seekCh)
|
||||
}
|
||||
|
||||
seekCount := 0
|
||||
t := time.NewTicker(time.Second)
|
||||
defer t.Stop()
|
||||
|
||||
for {
|
||||
var pending []*command
|
||||
select {
|
||||
case cmd := <-s.commandCh:
|
||||
if closed {
|
||||
cmd.done <- ErrDBClosed
|
||||
continue
|
||||
}
|
||||
|
||||
if cmd.op == opSeek {
|
||||
//log.Errorf("%q", cmd.args.(*seekArgs).key)
|
||||
switch cmd.op {
|
||||
case opSeek:
|
||||
seekCh <- cmd
|
||||
seekCount++
|
||||
continue
|
||||
case opCommit:
|
||||
s.doCommit(cmd)
|
||||
}
|
||||
|
||||
pending = append(pending, cmd)
|
||||
case <-t.C:
|
||||
log.Error(seekCount)
|
||||
case <-s.closeCh:
|
||||
closed = true
|
||||
s.wg.Done()
|
||||
// notify seek worker to exit
|
||||
close(seekCh)
|
||||
}
|
||||
s.handleCommand(pending)
|
||||
}
|
||||
}
|
||||
|
||||
@ -222,59 +180,36 @@ func (s *dbStore) tryLock(txn *dbTxn) (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *dbStore) handleCommand(commands []*command) {
|
||||
var getCmds []*command
|
||||
for _, cmd := range commands {
|
||||
txn := cmd.txn
|
||||
switch cmd.op {
|
||||
case opGet:
|
||||
getCmds = append(getCmds, cmd)
|
||||
case opCommit:
|
||||
curVer, err := globalVersionProvider.CurrentVersion()
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
err = s.tryLock(txn)
|
||||
if err != nil {
|
||||
cmd.done <- err
|
||||
continue
|
||||
}
|
||||
// Update commit version.
|
||||
txn.version = curVer
|
||||
b := s.db.NewBatch()
|
||||
txn.WalkBuffer(func(k kv.Key, value []byte) error {
|
||||
log.Errorf("%q, %q", k, value)
|
||||
metaKey := codec.EncodeBytes(nil, k)
|
||||
// put dummy meta key, write current version
|
||||
b.Put(metaKey, codec.EncodeUint(nil, curVer.Ver))
|
||||
mvccKey := MvccEncodeVersionKey(kv.Key(k), curVer)
|
||||
if len(value) == 0 { // Deleted marker
|
||||
b.Put(mvccKey, nil)
|
||||
} else {
|
||||
b.Put(mvccKey, value)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
err = s.writeBatch(b)
|
||||
s.unLockKeys(txn)
|
||||
cmd.done <- err
|
||||
default:
|
||||
log.Fatalf("should never happend %v", cmd.op)
|
||||
func (s *dbStore) doCommit(cmd *command) {
|
||||
txn := cmd.txn
|
||||
curVer, err := globalVersionProvider.CurrentVersion()
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
err = s.tryLock(txn)
|
||||
if err != nil {
|
||||
cmd.done <- err
|
||||
return
|
||||
}
|
||||
// Update commit version.
|
||||
txn.version = curVer
|
||||
b := s.db.NewBatch()
|
||||
txn.WalkBuffer(func(k kv.Key, value []byte) error {
|
||||
log.Errorf("%q, %q", k, value)
|
||||
metaKey := codec.EncodeBytes(nil, k)
|
||||
// put dummy meta key, write current version
|
||||
b.Put(metaKey, codec.EncodeUint(nil, curVer.Ver))
|
||||
mvccKey := MvccEncodeVersionKey(kv.Key(k), curVer)
|
||||
if len(value) == 0 { // Deleted marker
|
||||
b.Put(mvccKey, nil)
|
||||
} else {
|
||||
b.Put(mvccKey, value)
|
||||
}
|
||||
}
|
||||
|
||||
// batch get command
|
||||
if len(getCmds) > 0 {
|
||||
go func() {
|
||||
for _, cmd := range getCmds {
|
||||
reply := &getReply{}
|
||||
var err error
|
||||
reply.value, err = s.db.Get(cmd.args.(*getArgs).key)
|
||||
cmd.reply = reply
|
||||
cmd.done <- err
|
||||
}
|
||||
}()
|
||||
}
|
||||
return nil
|
||||
})
|
||||
err = s.writeBatch(b)
|
||||
s.unLockKeys(txn)
|
||||
cmd.done <- err
|
||||
}
|
||||
|
||||
func (s *dbStore) doSeek(seekCmds []*command) {
|
||||
|
||||
Reference in New Issue
Block a user