From f10fdff83edd451c479b580d5db47b3e7d0fa9cd Mon Sep 17 00:00:00 2001 From: disksing Date: Wed, 16 Dec 2015 12:31:50 +0800 Subject: [PATCH] localstore: remove scheduler get cmd --- store/localstore/kv.go | 133 +++++++++++------------------------------ 1 file changed, 34 insertions(+), 99 deletions(-) diff --git a/store/localstore/kv.go b/store/localstore/kv.go index efad954c95..63860cce05 100644 --- a/store/localstore/kv.go +++ b/store/localstore/kv.go @@ -16,7 +16,6 @@ package localstore import ( "runtime/debug" "sync" - "time" "github.com/juju/errors" "github.com/ngaut/log" @@ -33,29 +32,18 @@ var ( type op int const ( - opGet = iota + 1 - opSeek + opSeek = iota + 1 opCommit ) type command struct { op op txn *dbTxn - ver kv.Version args interface{} reply interface{} done chan error } -type getArgs struct { - key []byte -} - -type getReply struct { - value []byte - err error -} - type seekReply struct { key []byte value []byte @@ -75,24 +63,6 @@ type commitArgs struct { //scheduler -// Get gets the associated value with key, returns (nil, ErrNotFound) if no value found. -func (s *dbStore) Get(key []byte) ([]byte, error) { - c := &command{ - op: opGet, - args: &getArgs{key: key}, - done: make(chan error, 1), - } - - s.commandCh <- c - err := <-c.done - if err != nil { - return nil, err - } - - reply := c.reply.(*getReply) - return reply.value, nil -} - // Seek searches for the first key in the engine which is >= key in byte order, returns (nil, nil, ErrNotFound) // if such key is not found. func (s *dbStore) Seek(key []byte) ([]byte, []byte, error) { @@ -154,7 +124,6 @@ func (s *dbStore) seekWorker(seekCh chan *command) { s.doSeek(pending) } - } func (s *dbStore) scheduler() { @@ -164,36 +133,25 @@ func (s *dbStore) scheduler() { go s.seekWorker(seekCh) } - seekCount := 0 - t := time.NewTicker(time.Second) - defer t.Stop() - for { - var pending []*command select { case cmd := <-s.commandCh: if closed { cmd.done <- ErrDBClosed continue } - - if cmd.op == opSeek { - //log.Errorf("%q", cmd.args.(*seekArgs).key) + switch cmd.op { + case opSeek: seekCh <- cmd - seekCount++ - continue + case opCommit: + s.doCommit(cmd) } - - pending = append(pending, cmd) - case <-t.C: - log.Error(seekCount) case <-s.closeCh: closed = true s.wg.Done() // notify seek worker to exit close(seekCh) } - s.handleCommand(pending) } } @@ -222,59 +180,36 @@ func (s *dbStore) tryLock(txn *dbTxn) (err error) { return nil } -func (s *dbStore) handleCommand(commands []*command) { - var getCmds []*command - for _, cmd := range commands { - txn := cmd.txn - switch cmd.op { - case opGet: - getCmds = append(getCmds, cmd) - case opCommit: - curVer, err := globalVersionProvider.CurrentVersion() - if err != nil { - log.Fatal(err) - } - err = s.tryLock(txn) - if err != nil { - cmd.done <- err - continue - } - // Update commit version. - txn.version = curVer - b := s.db.NewBatch() - txn.WalkBuffer(func(k kv.Key, value []byte) error { - log.Errorf("%q, %q", k, value) - metaKey := codec.EncodeBytes(nil, k) - // put dummy meta key, write current version - b.Put(metaKey, codec.EncodeUint(nil, curVer.Ver)) - mvccKey := MvccEncodeVersionKey(kv.Key(k), curVer) - if len(value) == 0 { // Deleted marker - b.Put(mvccKey, nil) - } else { - b.Put(mvccKey, value) - } - return nil - }) - err = s.writeBatch(b) - s.unLockKeys(txn) - cmd.done <- err - default: - log.Fatalf("should never happend %v", cmd.op) +func (s *dbStore) doCommit(cmd *command) { + txn := cmd.txn + curVer, err := globalVersionProvider.CurrentVersion() + if err != nil { + log.Fatal(err) + } + err = s.tryLock(txn) + if err != nil { + cmd.done <- err + return + } + // Update commit version. + txn.version = curVer + b := s.db.NewBatch() + txn.WalkBuffer(func(k kv.Key, value []byte) error { + log.Errorf("%q, %q", k, value) + metaKey := codec.EncodeBytes(nil, k) + // put dummy meta key, write current version + b.Put(metaKey, codec.EncodeUint(nil, curVer.Ver)) + mvccKey := MvccEncodeVersionKey(kv.Key(k), curVer) + if len(value) == 0 { // Deleted marker + b.Put(mvccKey, nil) + } else { + b.Put(mvccKey, value) } - } - - // batch get command - if len(getCmds) > 0 { - go func() { - for _, cmd := range getCmds { - reply := &getReply{} - var err error - reply.value, err = s.db.Get(cmd.args.(*getArgs).key) - cmd.reply = reply - cmd.done <- err - } - }() - } + return nil + }) + err = s.writeBatch(b) + s.unLockKeys(txn) + cmd.done <- err } func (s *dbStore) doSeek(seekCmds []*command) {