From c318f58c072bf47b3895e2fe0c15dc2f724c8a4b Mon Sep 17 00:00:00 2001 From: dongxu Date: Sat, 14 Nov 2015 13:48:54 +0800 Subject: [PATCH] hbase-store: use a fixed conn pool instead of single connection for better rpc performance --- store/hbase/kv.go | 46 ++++++++++++++++++++++++++++++++++++---------- 1 file changed, 36 insertions(+), 10 deletions(-) diff --git a/store/hbase/kv.go b/store/hbase/kv.go index 4ceb22bca4..c796214248 100644 --- a/store/hbase/kv.go +++ b/store/hbase/kv.go @@ -16,8 +16,10 @@ package hbasekv import ( "strings" "sync" + "time" "github.com/juju/errors" + "github.com/ngaut/log" "github.com/pingcap/go-hbase" "github.com/pingcap/go-themis" "github.com/pingcap/tidb/kv" @@ -30,6 +32,8 @@ const ( hbaseQualifier = "q" // hbaseFmlAndQual is a shortcut. hbaseFmlAndQual = hbaseColFamily + ":" + hbaseQualifier + // fix length conn pool + hbaseConnPoolSize = 10 ) var ( @@ -61,21 +65,24 @@ type hbaseStore struct { mu sync.Mutex zkInfo string storeName string - cli hbase.HBaseClient + conns []hbase.HBaseClient } func (s *hbaseStore) Begin() (kv.Transaction, error) { s.mu.Lock() defer s.mu.Unlock() - - t := themis.NewTxn(s.cli) + // get random conn + hbaseCli := s.conns[time.Now().UnixNano()%hbaseConnPoolSize] + t := themis.NewTxn(hbaseCli) txn := newHbaseTxn(t, s.storeName) txn.UnionStore = kv.NewUnionStore(newHbaseSnapshot(t, s.storeName)) return txn, nil } func (s *hbaseStore) GetSnapshot(ver kv.Version) (kv.MvccSnapshot, error) { - t := themis.NewTxn(s.cli) + // get random conn + hbaseCli := s.conns[time.Now().UnixNano()%hbaseConnPoolSize] + t := themis.NewTxn(hbaseCli) return newHbaseSnapshot(t, s.storeName), nil } @@ -84,7 +91,16 @@ func (s *hbaseStore) Close() error { defer mc.mu.Unlock() delete(mc.cache, s.zkInfo) - return s.cli.Close() + + var err error + for _, conn := range s.conns { + err = conn.Close() + if err != nil { + log.Error(err) + } + } + // return last error + return err } func (s *hbaseStore) UUID() string { @@ -92,7 +108,8 @@ func (s *hbaseStore) UUID() string { } func (s *hbaseStore) CurrentVersion() (kv.Version, error) { - t := themis.NewTxn(s.cli) + hbaseCli := s.conns[time.Now().UnixNano()%hbaseConnPoolSize] + t := themis.NewTxn(hbaseCli) defer t.Release() return kv.Version{Ver: t.GetStartTS()}, nil @@ -122,10 +139,18 @@ func (d Driver) Open(zkInfo string) (kv.Storage, error) { return store, nil } - c, err := hbase.NewClient(zks, "/hbase") - if err != nil { - return nil, errors.Trace(err) + // create buffered HBase connections, HBaseClient is goroutine-safe, so + // it's OK to redistribute to transactions. + conns := make([]hbase.HBaseClient, 0, hbaseConnPoolSize) + for i := 0; i < hbaseConnPoolSize; i++ { + c, err := hbase.NewClient(zks, "/hbase") + if err != nil { + return nil, errors.Trace(err) + } + conns = append(conns, c) } + + c := conns[0] if !c.TableExists(tableName) { // Create new hbase table for store. t := hbase.NewTableDesciptor(hbase.NewTableNameWithDefaultNS(tableName)) @@ -137,10 +162,11 @@ func (d Driver) Open(zkInfo string) (kv.Storage, error) { return nil, errors.Trace(err) } } + s := &hbaseStore{ zkInfo: zkInfo, storeName: tableName, - cli: c, + conns: conns, } mc.cache[zkInfo] = s return s, nil