hbase-store: clean-ups

This commit is contained in:
disksing
2015-11-09 11:22:01 +08:00
parent b8858cba76
commit c5e238ffce
8 changed files with 49 additions and 195 deletions

View File

@ -17,10 +17,10 @@ all: godep parser build test check
godep:
go get github.com/tools/godep
build:
go get github.com/pingcap/go-hbase
go get github.com/pingcap/go-themis
build:
$(GO) build
install:

View File

@ -31,7 +31,7 @@ func Register(name string, m interface{}) {
r.Register(name, m)
}
// Inc increments specific counter metric.
// Inc increases specific counter metric.
func Inc(name string, i int64) {
if c := r.GetOrRegister(name, metrics.NewCounter()); c != nil {
c.(metrics.Counter).Inc(i)

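For context, a minimal standalone sketch of the go-metrics pattern that the Inc helper above wraps; the registry and the metric name are local to this example and not part of the commit.

package main

import (
	"fmt"

	"github.com/rcrowley/go-metrics"
)

func main() {
	r := metrics.NewRegistry()

	// Same pattern as Inc above: lazily register a counter, then increase it.
	if c := r.GetOrRegister("txn.commit", metrics.NewCounter()); c != nil {
		c.(metrics.Counter).Inc(1)
	}

	fmt.Println(r.Get("txn.commit").(metrics.Counter).Count()) // prints 1
}
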
View File

@ -39,7 +39,7 @@ func (t *testSuite) TestRegMetric(c *C) {
Register(testMetricName, metrics.NewHistogram(metrics.NewUniformSample(1000)))
v := r.Get(testMetricName)
c.Assert(v != nil, Equals, true)
c.Assert(v, NotNil)
// Will not overwrite
_, ok := v.(metrics.Counter)

View File

@ -15,7 +15,6 @@ package hbasekv
import (
"strings"
"sync"
"github.com/juju/errors"
@ -25,14 +24,14 @@ import (
)
const (
// ColFamily is the hbase column family name.
ColFamily = "f"
// Qualifier is the hbase column name.
Qualifier = "q"
// FmlAndQual is a shortcut.
FmlAndQual = ColFamily + ":" + Qualifier
// HbaseTableName is the table name in hbase which is invisible to SQL layer.
HbaseTableName = "tidb"
// hbaseColFamily is the hbase column family name.
hbaseColFamily = "f"
// hbaseQualifier is the hbase column name.
hbaseQualifier = "q"
// hbaseFmlAndQual is a shortcut.
hbaseFmlAndQual = hbaseColFamily + ":" + hbaseQualifier
// hbaseTableName is the table name in HBase, which is invisible to the SQL layer.
hbaseTableName = "tidb"
)
var (
@ -40,8 +39,8 @@ var (
)
var (
// ErrZkInfoErr is returned when zookeeper info is invalid.
ErrZkInfoErr = errors.New("zk info error")
// ErrZkInvalid is returned when zookeeper info is invalid.
ErrZkInvalid = errors.New("zk info invalid")
)
type storeCache struct {
@ -106,24 +105,21 @@ func (d Driver) Open(zkInfo string) (kv.Storage, error) {
defer mc.mu.Unlock()
if len(zkInfo) == 0 {
return nil, ErrZkInfoErr
return nil, errors.Trace(ErrZkInvalid)
}
if store, ok := mc.cache[zkInfo]; ok {
// TODO: check that the cached store has the same engine as this Driver.
return store, nil
}
zks := strings.Split(zkInfo, ",")
if len(zks) == 0 {
return nil, ErrZkInfoErr
}
c, err := hbase.NewClient(zks, "/hbase")
if err != nil {
return nil, errors.Trace(err)
}
if !c.TableExists(HbaseTableName) {
if !c.TableExists(hbaseTableName) {
// Create new hbase table for store.
t := hbase.NewTableDesciptor(hbase.NewTableNameWithDefaultNS(HbaseTableName))
cf := hbase.NewColumnFamilyDescriptor(ColFamily)
t := hbase.NewTableDesciptor(hbase.NewTableNameWithDefaultNS(hbaseTableName))
cf := hbase.NewColumnFamilyDescriptor(hbaseColFamily)
cf.AddStrAddr("THEMIS_ENABLE", "true")
t.AddColumnDesc(cf)
//TODO: specify split?
@ -133,7 +129,7 @@ func (d Driver) Open(zkInfo string) (kv.Storage, error) {
}
s := &hbaseStore{
zkInfo: zkInfo,
storeName: HbaseTableName,
storeName: hbaseTableName,
cli: c,
}
mc.cache[zkInfo] = s

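For reference, a rough sketch of how Driver.Open would be called after this change. The import path and the zookeeper addresses are assumptions for illustration; Open itself splits the comma-separated list, as shown above.

package main

import (
	"log"

	// assumed import path for package hbasekv
	hbasekv "github.com/pingcap/tidb/store/hbase"
)

func main() {
	var d hbasekv.Driver
	// The argument is a comma-separated zookeeper address list;
	// these addresses are placeholders.
	store, err := d.Open("zk1:2181,zk2:2181,zk3:2181")
	if err != nil {
		log.Fatal(err)
	}
	// store satisfies kv.Storage; a second Open with the same zkInfo
	// returns the cached instance, per the storeCache shown above.
	_ = store
}
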
View File

@ -34,7 +34,6 @@ const hbaseBatchSize = 1000
type hbaseSnapshot struct {
txn *themis.Txn
storeName string
cache map[string][]byte
}
// newHbaseSnapshot creates a snapshot of an HBase store.
@ -42,40 +41,24 @@ func newHbaseSnapshot(txn *themis.Txn, storeName string) *hbaseSnapshot {
return &hbaseSnapshot{
txn: txn,
storeName: storeName,
cache: make(map[string][]byte),
}
}
// Get gets the value for key k from snapshot. It internally uses a cache to
// reduce RPC calls. If cache not hit, it uses a Scanner to query a batch of
// data.
// Get gets the value for key k from snapshot.
func (s *hbaseSnapshot) Get(k kv.Key) ([]byte, error) {
if b, ok := s.cache[string(k)]; ok {
return b, nil
g := hbase.NewGet([]byte(k))
g.AddColumn([]byte(hbaseColFamily), []byte(hbaseQualifier))
v, err := internalGet(s, g)
if err != nil {
return nil, errors.Trace(err)
}
s.cache = make(map[string][]byte)
scanner := s.txn.GetScanner([]byte(s.storeName), k, nil, hbaseBatchSize)
for i := 0; i < hbaseBatchSize; i++ {
r := scanner.Next()
if r != nil && len(r.Columns) > 0 {
s.cache[string(r.Row)] = r.Columns[FmlAndQual].Value
} else {
break
}
}
scanner.Close()
if b, ok := s.cache[string(k)]; ok {
return b, nil
}
return nil, kv.ErrNotExist
return v, nil
}
// BatchGet implements kv.Snapshot.BatchGet().
func (s *hbaseSnapshot) BatchGet(keys []kv.Key) (map[string][]byte, error) {
gets := make([]*hbase.Get, len(keys))
bColFamily, bQualifier := []byte(ColFamily), []byte(Qualifier)
bColFamily, bQualifier := []byte(hbaseColFamily), []byte(hbaseQualifier)
for i, key := range keys {
g := hbase.NewGet(key)
g.AddColumn(bColFamily, bQualifier)
@ -89,9 +72,8 @@ func (s *hbaseSnapshot) BatchGet(keys []kv.Key) (map[string][]byte, error) {
m := make(map[string][]byte)
for _, r := range rows {
k := string(r.Row)
v := r.Columns[FmlAndQual].Value
v := r.Columns[hbaseFmlAndQual].Value
m[k] = v
s.cache[k] = v
}
return m, nil
}
@ -107,9 +89,8 @@ func (s *hbaseSnapshot) RangeGet(start, end kv.Key, limit int) (map[string][]byt
r := scanner.Next()
if r != nil && len(r.Columns) > 0 {
k := string(r.Row)
v := r.Columns[FmlAndQual].Value
v := r.Columns[hbaseFmlAndQual].Value
m[k] = v
s.cache[k] = v
} else {
break
}
@ -122,7 +103,7 @@ func (s *hbaseSnapshot) RangeGet(start, end kv.Key, limit int) (map[string][]byt
// exist, returns the nearest (lower) version's data.
func (s *hbaseSnapshot) MvccGet(k kv.Key, ver kv.Version) ([]byte, error) {
g := hbase.NewGet([]byte(k))
g.AddColumn([]byte(ColFamily), []byte(Qualifier))
g.AddColumn([]byte(hbaseColFamily), []byte(hbaseQualifier))
g.TsRangeFrom = 0
g.TsRangeTo = ver.Ver + 1
v, err := internalGet(s, g)
@ -140,7 +121,7 @@ func internalGet(s *hbaseSnapshot, g *hbase.Get) ([]byte, error) {
if r == nil || len(r.Columns) == 0 {
return nil, kv.ErrNotExist
}
return r.Columns[FmlAndQual].Value, nil
return r.Columns[hbaseFmlAndQual].Value, nil
}
func (s *hbaseSnapshot) NewIterator(param interface{}) kv.Iterator {
@ -184,18 +165,22 @@ func (s *hbaseSnapshot) MvccRelease() {
type hbaseIter struct {
*themis.ThemisScanner
rs *hbase.ResultRow
valid bool
rs *hbase.ResultRow
}
func (it *hbaseIter) Next() error {
it.rs = it.ThemisScanner.Next()
it.valid = it.rs != nil && len(it.rs.Columns) > 0 && !it.ThemisScanner.Closed()
return nil
}
func (it *hbaseIter) Valid() bool {
return it.valid
if it.rs == nil || len(it.rs.Columns) == 0 {
return false
}
if it.ThemisScanner.Closed() {
return false
}
return true
}
func (it *hbaseIter) Key() string {
@ -203,7 +188,7 @@ func (it *hbaseIter) Key() string {
}
func (it *hbaseIter) Value() []byte {
return it.rs.Columns[FmlAndQual].Value
return it.rs.Columns[hbaseFmlAndQual].Value
}
func (it *hbaseIter) Close() {

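To illustrate the simplified read path, a small sketch of how a caller might use the snapshot now that the read-ahead cache is gone. It assumes the kv.Snapshot interface exposes Get as implemented above; the helper name is made up for this example.

package example

import (
	"github.com/juju/errors"
	"github.com/pingcap/tidb/kv"
)

// readOne is a hypothetical helper: each Get above now issues a single
// themis Get and reports a missing key as kv.ErrNotExist.
func readOne(snap kv.Snapshot, k kv.Key) ([]byte, bool, error) {
	v, err := snap.Get(k)
	if kv.IsErrNotFound(err) {
		return nil, false, nil // key does not exist in this snapshot
	}
	if err != nil {
		return nil, false, errors.Trace(err)
	}
	return v, true, nil
}
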
View File

@ -95,7 +95,7 @@ func (txn *hbaseTxn) Get(k kv.Key) ([]byte, error) {
k = kv.EncodeKey(k)
val, err := txn.UnionStore.Get(k)
if kv.IsErrNotFound(err) || len(val) == 0 {
return nil, kv.ErrNotExist
return nil, errors.Trace(kv.ErrNotExist)
}
if err != nil {
return nil, errors.Trace(err)
@ -137,7 +137,7 @@ func (txn *hbaseTxn) Set(k kv.Key, data []byte) error {
if len(data) == 0 {
// In case someone uses it in the wrong way, we can figure it out immediately
debug.PrintStack()
return ErrCannotSetNilValue
return errors.Trace(ErrCannotSetNilValue)
}
log.Debugf("set key:%q, txn:%d", k, txn.tid)
@ -177,20 +177,24 @@ func (txn *hbaseTxn) each(f func(kv.Iterator) error) error {
}
func (txn *hbaseTxn) doCommit() error {
bColFamily, bQualifier := []byte(hbaseColFamily), []byte(hbaseQualifier)
err := txn.each(func(iter kv.Iterator) error {
var row, val []byte
row = make([]byte, len(iter.Key()))
if len(iter.Value()) == 0 { // Deleted marker
copy(row, iter.Key())
d := hbase.NewDelete(row)
d.AddStringColumn(ColFamily, Qualifier)
txn.Txn.Delete(txn.storeName, d)
d.AddStringColumn(hbaseColFamily, hbaseQualifier)
err := txn.Txn.Delete(txn.storeName, d)
if err != nil {
return errors.Trace(err)
}
} else {
val = make([]byte, len(iter.Value()))
copy(row, iter.Key())
copy(val, iter.Value())
p := hbase.NewPut(row)
p.AddValue([]byte(ColFamily), []byte(Qualifier), val)
p.AddValue(bColFamily, bQualifier, val)
txn.Txn.Put(txn.storeName, p)
}
return nil

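As a side note, the delete-versus-put convention in doCommit above can be read in isolation. The sketch below restates it with the go-hbase calls used in the diff; the helper name is illustrative and the "f"/"q" literals stand in for the renamed constants.

package main

import "github.com/pingcap/go-hbase"

// toMutation mirrors doCommit's convention: a zero-length value buffered in
// the transaction marks the row for deletion, anything else becomes a Put on
// the single "f:q" column.
func toMutation(row, val []byte) interface{} {
	if len(val) == 0 {
		d := hbase.NewDelete(row)
		d.AddStringColumn("f", "q")
		return d
	}
	p := hbase.NewPut(row)
	p.AddValue([]byte("f"), []byte("q"), val)
	return p
}

func main() {
	_ = toMutation([]byte("r1"), nil)         // becomes a Delete
	_ = toMutation([]byte("r1"), []byte("v")) // becomes a Put
}
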
View File

@ -1,74 +0,0 @@
// Copyright 2015 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package util
import (
"container/list"
"sync"
)
type batchWorkFunc func(jobs []interface{})
// BatchWorker collects jobs and then processes them in batches.
type BatchWorker struct {
mu sync.Mutex
pendingJobs *list.List
batchSize int
fn batchWorkFunc
}
// NewBatchWorker creates a BatchWorker.
func NewBatchWorker(batchSize int, fn batchWorkFunc) *BatchWorker {
return &BatchWorker{
pendingJobs: list.New(),
batchSize: batchSize,
fn: fn,
}
}
// Submit submits a job to BatchWorker.
func (b *BatchWorker) Submit(job interface{}) {
var jobs []interface{}
b.mu.Lock()
b.pendingJobs.PushBack(job)
if b.pendingJobs.Len() >= b.batchSize {
jobs = make([]interface{}, 0, b.batchSize)
// pop the first batchSize jobs and hand them to fn
for i := 0; i < b.batchSize; i++ {
ele := b.pendingJobs.Front()
jobs = append(jobs, ele.Value)
b.pendingJobs.Remove(ele)
}
}
b.mu.Unlock()
if len(jobs) > 0 {
b.fn(jobs)
}
}
// Flush instructs BatchWorker to finish the remaining jobs.
func (b *BatchWorker) Flush() {
var jobs []interface{}
b.mu.Lock()
jobs = make([]interface{}, 0, b.pendingJobs.Len())
for b.pendingJobs.Len() > 0 {
ele := b.pendingJobs.Front()
jobs = append(jobs, ele.Value)
b.pendingJobs.Remove(ele)
}
b.mu.Unlock()
if len(jobs) > 0 {
b.fn(jobs)
}
}

View File

@ -1,57 +0,0 @@
// Copyright 2015 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package util
import (
"sync"
. "github.com/pingcap/check"
)
var _ = Suite(&testBatchWorkSuite{})
type testBatchWorkSuite struct{}
func (s *testBatchWorkSuite) TestBatchWorker(c *C) {
total := 1024
cnt := 0
worker := NewBatchWorker(10, func(jobs []interface{}) {
cnt += len(jobs)
})
for i := 0; i < total; i++ {
worker.Submit(i)
}
worker.Flush()
c.Assert(cnt, Equals, 1024)
}
func (s *testBatchWorkSuite) TestConcurrentSubmit(c *C) {
var wg sync.WaitGroup
cnt := 0
worker := NewBatchWorker(8, func(jobs []interface{}) {
cnt += len(jobs)
})
for i := 0; i < 10; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
for j := 0; j < 10; j++ {
worker.Submit(j * i)
}
}(i)
}
wg.Wait()
worker.Flush()
c.Assert(cnt, Equals, 100)
}