hbase-store: use a fixed conn pool instead of single connection
for better rpc performance
This commit is contained in:
@ -16,8 +16,10 @@ package hbasekv
|
||||
import (
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/juju/errors"
|
||||
"github.com/ngaut/log"
|
||||
"github.com/pingcap/go-hbase"
|
||||
"github.com/pingcap/go-themis"
|
||||
"github.com/pingcap/tidb/kv"
|
||||
@ -30,6 +32,8 @@ const (
|
||||
hbaseQualifier = "q"
|
||||
// hbaseFmlAndQual is a shortcut.
|
||||
hbaseFmlAndQual = hbaseColFamily + ":" + hbaseQualifier
|
||||
// fix length conn pool
|
||||
hbaseConnPoolSize = 10
|
||||
)
|
||||
|
||||
var (
|
||||
@ -61,21 +65,24 @@ type hbaseStore struct {
|
||||
mu sync.Mutex
|
||||
zkInfo string
|
||||
storeName string
|
||||
cli hbase.HBaseClient
|
||||
conns []hbase.HBaseClient
|
||||
}
|
||||
|
||||
func (s *hbaseStore) Begin() (kv.Transaction, error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
t := themis.NewTxn(s.cli)
|
||||
// get random conn
|
||||
hbaseCli := s.conns[time.Now().UnixNano()%hbaseConnPoolSize]
|
||||
t := themis.NewTxn(hbaseCli)
|
||||
txn := newHbaseTxn(t, s.storeName)
|
||||
txn.UnionStore = kv.NewUnionStore(newHbaseSnapshot(t, s.storeName))
|
||||
return txn, nil
|
||||
}
|
||||
|
||||
func (s *hbaseStore) GetSnapshot(ver kv.Version) (kv.MvccSnapshot, error) {
|
||||
t := themis.NewTxn(s.cli)
|
||||
// get random conn
|
||||
hbaseCli := s.conns[time.Now().UnixNano()%hbaseConnPoolSize]
|
||||
t := themis.NewTxn(hbaseCli)
|
||||
return newHbaseSnapshot(t, s.storeName), nil
|
||||
}
|
||||
|
||||
@ -84,7 +91,16 @@ func (s *hbaseStore) Close() error {
|
||||
defer mc.mu.Unlock()
|
||||
|
||||
delete(mc.cache, s.zkInfo)
|
||||
return s.cli.Close()
|
||||
|
||||
var err error
|
||||
for _, conn := range s.conns {
|
||||
err = conn.Close()
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
}
|
||||
}
|
||||
// return last error
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *hbaseStore) UUID() string {
|
||||
@ -92,7 +108,8 @@ func (s *hbaseStore) UUID() string {
|
||||
}
|
||||
|
||||
func (s *hbaseStore) CurrentVersion() (kv.Version, error) {
|
||||
t := themis.NewTxn(s.cli)
|
||||
hbaseCli := s.conns[time.Now().UnixNano()%hbaseConnPoolSize]
|
||||
t := themis.NewTxn(hbaseCli)
|
||||
defer t.Release()
|
||||
|
||||
return kv.Version{Ver: t.GetStartTS()}, nil
|
||||
@ -122,10 +139,18 @@ func (d Driver) Open(zkInfo string) (kv.Storage, error) {
|
||||
return store, nil
|
||||
}
|
||||
|
||||
c, err := hbase.NewClient(zks, "/hbase")
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
// create buffered HBase connections, HBaseClient is goroutine-safe, so
|
||||
// it's OK to redistribute to transactions.
|
||||
conns := make([]hbase.HBaseClient, 0, hbaseConnPoolSize)
|
||||
for i := 0; i < hbaseConnPoolSize; i++ {
|
||||
c, err := hbase.NewClient(zks, "/hbase")
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
conns = append(conns, c)
|
||||
}
|
||||
|
||||
c := conns[0]
|
||||
if !c.TableExists(tableName) {
|
||||
// Create new hbase table for store.
|
||||
t := hbase.NewTableDesciptor(hbase.NewTableNameWithDefaultNS(tableName))
|
||||
@ -137,10 +162,11 @@ func (d Driver) Open(zkInfo string) (kv.Storage, error) {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
}
|
||||
|
||||
s := &hbaseStore{
|
||||
zkInfo: zkInfo,
|
||||
storeName: tableName,
|
||||
cli: c,
|
||||
conns: conns,
|
||||
}
|
||||
mc.cache[zkInfo] = s
|
||||
return s, nil
|
||||
|
||||
Reference in New Issue
Block a user