From c5e238ffce2d9537eeb58d8fcb4ab2e653662993 Mon Sep 17 00:00:00 2001 From: disksing Date: Mon, 9 Nov 2015 11:22:01 +0800 Subject: [PATCH] hbase-store: clean-ups --- Makefile | 4 +-- metric/metric.go | 2 +- metric/metric_test.go | 2 +- store/hbase/kv.go | 34 ++++++++---------- store/hbase/snapshot.go | 57 +++++++++++------------------- store/hbase/txn.go | 14 +++++--- util/batch_worker.go | 74 --------------------------------------- util/batch_worker_test.go | 57 ------------------------------ 8 files changed, 49 insertions(+), 195 deletions(-) delete mode 100644 util/batch_worker.go delete mode 100644 util/batch_worker_test.go diff --git a/Makefile b/Makefile index b31c24cc05..ab64af6227 100644 --- a/Makefile +++ b/Makefile @@ -17,10 +17,10 @@ all: godep parser build test check godep: go get github.com/tools/godep - -build: go get github.com/pingcap/go-hbase go get github.com/pingcap/go-themis + +build: $(GO) build install: diff --git a/metric/metric.go b/metric/metric.go index d1aed32eef..0db55afd7b 100644 --- a/metric/metric.go +++ b/metric/metric.go @@ -31,7 +31,7 @@ func Register(name string, m interface{}) { r.Register(name, m) } -// Inc increments specific counter metric. +// Inc increases specific counter metric. func Inc(name string, i int64) { if c := r.GetOrRegister(name, metrics.NewCounter()); c != nil { c.(metrics.Counter).Inc(i) diff --git a/metric/metric_test.go b/metric/metric_test.go index 3730cbd1ab..6a197b806d 100644 --- a/metric/metric_test.go +++ b/metric/metric_test.go @@ -39,7 +39,7 @@ func (t *testSuite) TestRegMetric(c *C) { Register(testMetricName, metrics.NewHistogram(metrics.NewUniformSample(1000))) v := r.Get(testMetricName) - c.Assert(v != nil, Equals, true) + c.Assert(v, NotNil) // Will not overwrite _, ok := v.(metrics.Counter) diff --git a/store/hbase/kv.go b/store/hbase/kv.go index 51187b836b..ba489df1ea 100644 --- a/store/hbase/kv.go +++ b/store/hbase/kv.go @@ -15,7 +15,6 @@ package hbasekv import ( "strings" - "sync" "github.com/juju/errors" @@ -25,14 +24,14 @@ import ( ) const ( - // ColFamily is the hbase column family name. - ColFamily = "f" - // Qualifier is the hbase column name. - Qualifier = "q" - // FmlAndQual is a shortcut. - FmlAndQual = ColFamily + ":" + Qualifier - // HbaseTableName is the table name in hbase which is invisible to SQL layer. - HbaseTableName = "tidb" + // hbaseColFamily is the hbase column family name. + hbaseColFamily = "f" + // hbaseQualifier is the hbase column name. + hbaseQualifier = "q" + // hbaseFmlAndQual is a shortcut. + hbaseFmlAndQual = hbaseColFamily + ":" + hbaseQualifier + // hbaseTableName is the table name in hbase which is invisible to SQL layer. + hbaseTableName = "tidb" ) var ( @@ -40,8 +39,8 @@ var ( ) var ( - // ErrZkInfoErr is returned when zookeeper info is invalid. - ErrZkInfoErr = errors.New("zk info error") + // ErrZkInvalid is returned when zookeeper info is invalid. + ErrZkInvalid = errors.New("zk info invalid") ) type storeCache struct { @@ -106,24 +105,21 @@ func (d Driver) Open(zkInfo string) (kv.Storage, error) { defer mc.mu.Unlock() if len(zkInfo) == 0 { - return nil, ErrZkInfoErr + return nil, errors.Trace(ErrZkInvalid) } if store, ok := mc.cache[zkInfo]; ok { // TODO: check the cache store has the same engine with this Driver. return store, nil } zks := strings.Split(zkInfo, ",") - if len(zks) == 0 { - return nil, ErrZkInfoErr - } c, err := hbase.NewClient(zks, "/hbase") if err != nil { return nil, errors.Trace(err) } - if !c.TableExists(HbaseTableName) { + if !c.TableExists(hbaseTableName) { // Create new hbase table for store. - t := hbase.NewTableDesciptor(hbase.NewTableNameWithDefaultNS(HbaseTableName)) - cf := hbase.NewColumnFamilyDescriptor(ColFamily) + t := hbase.NewTableDesciptor(hbase.NewTableNameWithDefaultNS(hbaseTableName)) + cf := hbase.NewColumnFamilyDescriptor(hbaseColFamily) cf.AddStrAddr("THEMIS_ENABLE", "true") t.AddColumnDesc(cf) //TODO: specify split? @@ -133,7 +129,7 @@ func (d Driver) Open(zkInfo string) (kv.Storage, error) { } s := &hbaseStore{ zkInfo: zkInfo, - storeName: HbaseTableName, + storeName: hbaseTableName, cli: c, } mc.cache[zkInfo] = s diff --git a/store/hbase/snapshot.go b/store/hbase/snapshot.go index ae33d0f2da..121ddf7dd7 100644 --- a/store/hbase/snapshot.go +++ b/store/hbase/snapshot.go @@ -34,7 +34,6 @@ const hbaseBatchSize = 1000 type hbaseSnapshot struct { txn *themis.Txn storeName string - cache map[string][]byte } // newHBaseSnapshot creates a snapshot of an HBase store. @@ -42,40 +41,24 @@ func newHbaseSnapshot(txn *themis.Txn, storeName string) *hbaseSnapshot { return &hbaseSnapshot{ txn: txn, storeName: storeName, - cache: make(map[string][]byte), } } -// Get gets the value for key k from snapshot. It internally uses a cache to -// reduce RPC calls. If cache not hit, it uses a Scanner to query a batch of -// data. +// Get gets the value for key k from snapshot. func (s *hbaseSnapshot) Get(k kv.Key) ([]byte, error) { - if b, ok := s.cache[string(k)]; ok { - return b, nil + g := hbase.NewGet([]byte(k)) + g.AddColumn([]byte(hbaseColFamily), []byte(hbaseQualifier)) + v, err := internalGet(s, g) + if err != nil { + return nil, errors.Trace(err) } - - s.cache = make(map[string][]byte) - scanner := s.txn.GetScanner([]byte(s.storeName), k, nil, hbaseBatchSize) - for i := 0; i < hbaseBatchSize; i++ { - r := scanner.Next() - if r != nil && len(r.Columns) > 0 { - s.cache[string(r.Row)] = r.Columns[FmlAndQual].Value - } else { - break - } - } - scanner.Close() - - if b, ok := s.cache[string(k)]; ok { - return b, nil - } - return nil, kv.ErrNotExist + return v, nil } // BatchGet implements kv.Snapshot.BatchGet(). func (s *hbaseSnapshot) BatchGet(keys []kv.Key) (map[string][]byte, error) { gets := make([]*hbase.Get, len(keys)) - bColFamily, bQualifier := []byte(ColFamily), []byte(Qualifier) + bColFamily, bQualifier := []byte(hbaseColFamily), []byte(hbaseQualifier) for i, key := range keys { g := hbase.NewGet(key) g.AddColumn(bColFamily, bQualifier) @@ -89,9 +72,8 @@ func (s *hbaseSnapshot) BatchGet(keys []kv.Key) (map[string][]byte, error) { m := make(map[string][]byte) for _, r := range rows { k := string(r.Row) - v := r.Columns[FmlAndQual].Value + v := r.Columns[hbaseFmlAndQual].Value m[k] = v - s.cache[k] = v } return m, nil } @@ -107,9 +89,8 @@ func (s *hbaseSnapshot) RangeGet(start, end kv.Key, limit int) (map[string][]byt r := scanner.Next() if r != nil && len(r.Columns) > 0 { k := string(r.Row) - v := r.Columns[FmlAndQual].Value + v := r.Columns[hbaseFmlAndQual].Value m[k] = v - s.cache[k] = v } else { break } @@ -122,7 +103,7 @@ func (s *hbaseSnapshot) RangeGet(start, end kv.Key, limit int) (map[string][]byt // exist, returns the nearest(lower) version's data. func (s *hbaseSnapshot) MvccGet(k kv.Key, ver kv.Version) ([]byte, error) { g := hbase.NewGet([]byte(k)) - g.AddColumn([]byte(ColFamily), []byte(Qualifier)) + g.AddColumn([]byte(hbaseColFamily), []byte(hbaseQualifier)) g.TsRangeFrom = 0 g.TsRangeTo = ver.Ver + 1 v, err := internalGet(s, g) @@ -140,7 +121,7 @@ func internalGet(s *hbaseSnapshot, g *hbase.Get) ([]byte, error) { if r == nil || len(r.Columns) == 0 { return nil, kv.ErrNotExist } - return r.Columns[FmlAndQual].Value, nil + return r.Columns[hbaseFmlAndQual].Value, nil } func (s *hbaseSnapshot) NewIterator(param interface{}) kv.Iterator { @@ -184,18 +165,22 @@ func (s *hbaseSnapshot) MvccRelease() { type hbaseIter struct { *themis.ThemisScanner - rs *hbase.ResultRow - valid bool + rs *hbase.ResultRow } func (it *hbaseIter) Next() error { it.rs = it.ThemisScanner.Next() - it.valid = it.rs != nil && len(it.rs.Columns) > 0 && !it.ThemisScanner.Closed() return nil } func (it *hbaseIter) Valid() bool { - return it.valid + if it.rs == nil || len(it.rs.Columns) == 0 { + return false + } + if it.ThemisScanner.Closed() { + return false + } + return true } func (it *hbaseIter) Key() string { @@ -203,7 +188,7 @@ func (it *hbaseIter) Key() string { } func (it *hbaseIter) Value() []byte { - return it.rs.Columns[FmlAndQual].Value + return it.rs.Columns[hbaseFmlAndQual].Value } func (it *hbaseIter) Close() { diff --git a/store/hbase/txn.go b/store/hbase/txn.go index 39c2ff1909..59f79f7400 100644 --- a/store/hbase/txn.go +++ b/store/hbase/txn.go @@ -95,7 +95,7 @@ func (txn *hbaseTxn) Get(k kv.Key) ([]byte, error) { k = kv.EncodeKey(k) val, err := txn.UnionStore.Get(k) if kv.IsErrNotFound(err) || len(val) == 0 { - return nil, kv.ErrNotExist + return nil, errors.Trace(kv.ErrNotExist) } if err != nil { return nil, errors.Trace(err) @@ -137,7 +137,7 @@ func (txn *hbaseTxn) Set(k kv.Key, data []byte) error { if len(data) == 0 { // Incase someone use it in the wrong way, we can figure it out immediately debug.PrintStack() - return ErrCannotSetNilValue + return errors.Trace(ErrCannotSetNilValue) } log.Debugf("set key:%q, txn:%d", k, txn.tid) @@ -177,20 +177,24 @@ func (txn *hbaseTxn) each(f func(kv.Iterator) error) error { } func (txn *hbaseTxn) doCommit() error { + bColFamily, bQualifier := []byte(hbaseColFamily), []byte(hbaseQualifier) err := txn.each(func(iter kv.Iterator) error { var row, val []byte row = make([]byte, len(iter.Key())) if len(iter.Value()) == 0 { // Deleted marker copy(row, iter.Key()) d := hbase.NewDelete(row) - d.AddStringColumn(ColFamily, Qualifier) - txn.Txn.Delete(txn.storeName, d) + d.AddStringColumn(hbaseColFamily, hbaseQualifier) + err := txn.Txn.Delete(txn.storeName, d) + if err != nil { + return errors.Trace(err) + } } else { val = make([]byte, len(iter.Value())) copy(row, iter.Key()) copy(val, iter.Value()) p := hbase.NewPut(row) - p.AddValue([]byte(ColFamily), []byte(Qualifier), val) + p.AddValue(bColFamily, bQualifier, val) txn.Txn.Put(txn.storeName, p) } return nil diff --git a/util/batch_worker.go b/util/batch_worker.go deleted file mode 100644 index 954ef53a6a..0000000000 --- a/util/batch_worker.go +++ /dev/null @@ -1,74 +0,0 @@ -// Copyright 2015 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package util - -import ( - "container/list" - "sync" -) - -type batchWorkFunc func(jobs []interface{}) - -// BatchWorker collects jobs then batch process them. -type BatchWorker struct { - mu sync.Mutex - pendingJobs *list.List - batchSize int - fn batchWorkFunc -} - -// NewBatchWorker creates a BatchWorker. -func NewBatchWorker(batchSize int, fn batchWorkFunc) *BatchWorker { - return &BatchWorker{ - pendingJobs: list.New(), - batchSize: batchSize, - fn: fn, - } -} - -// Submit submits a job to BatchWorker. -func (b *BatchWorker) Submit(job interface{}) { - var jobs []interface{} - b.mu.Lock() - b.pendingJobs.PushBack(job) - if b.pendingJobs.Len() >= b.batchSize { - jobs = make([]interface{}, 0, b.batchSize) - // pop first batchSize jobs to workerFn - for i := 0; i < b.batchSize; i++ { - ele := b.pendingJobs.Front() - jobs = append(jobs, ele.Value) - b.pendingJobs.Remove(ele) - } - } - b.mu.Unlock() - if len(jobs) > 0 { - b.fn(jobs) - } -} - -// Flush instructs BatchWorker to finish remain jobs. -func (b *BatchWorker) Flush() { - var jobs []interface{} - b.mu.Lock() - jobs = make([]interface{}, 0, b.pendingJobs.Len()) - for b.pendingJobs.Len() > 0 { - ele := b.pendingJobs.Front() - jobs = append(jobs, ele.Value) - b.pendingJobs.Remove(ele) - } - b.mu.Unlock() - if len(jobs) > 0 { - b.fn(jobs) - } -} diff --git a/util/batch_worker_test.go b/util/batch_worker_test.go deleted file mode 100644 index a5b2225281..0000000000 --- a/util/batch_worker_test.go +++ /dev/null @@ -1,57 +0,0 @@ -// Copyright 2015 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package util - -import ( - "sync" - - . "github.com/pingcap/check" -) - -var _ = Suite(&testBatchWorkSuite{}) - -type testBatchWorkSuite struct{} - -func (s *testBatchWorkSuite) TestBatchWorker(c *C) { - total := 1024 - cnt := 0 - worker := NewBatchWorker(10, func(jobs []interface{}) { - cnt += len(jobs) - }) - for i := 0; i < total; i++ { - worker.Submit(i) - } - worker.Flush() - c.Assert(cnt, Equals, 1024) -} - -func (s *testBatchWorkSuite) TestConcurrentSubmit(c *C) { - var wg sync.WaitGroup - cnt := 0 - worker := NewBatchWorker(8, func(jobs []interface{}) { - cnt += len(jobs) - }) - for i := 0; i < 10; i++ { - wg.Add(1) - go func(i int) { - defer wg.Done() - for j := 0; j < 10; j++ { - worker.Submit(j * i) - } - }(i) - } - wg.Wait() - worker.Flush() - c.Assert(cnt, Equals, 100) -}