Merge branch 'master' into coocood/pk-handle2
This commit is contained in:
@ -153,7 +153,7 @@ func parseDriverDSN(dsn string) (storePath, dbName string, err error) {
|
||||
// Examples:
|
||||
// goleveldb://relative/path/test
|
||||
// boltdb:///absolute/path/test
|
||||
// hbase://zk1,zk2,zk3/hbasetbl/test?tso=127.0.0.1:1234
|
||||
// hbase://zk1,zk2,zk3/hbasetbl/test?tso=zk
|
||||
//
|
||||
// Open may return a cached connection (one previously closed), but doing so is
|
||||
// unnecessary; the sql package maintains a pool of idle connections for
|
||||
|
||||
@ -30,17 +30,17 @@ type testHBaseSuite struct {
|
||||
|
||||
func (t *testHBaseSuite) TestParsePath(c *C) {
|
||||
tbl := []struct {
|
||||
dsn string
|
||||
ok bool
|
||||
zks []string
|
||||
oracle string
|
||||
table string
|
||||
dsn string
|
||||
ok bool
|
||||
zks string
|
||||
tso string
|
||||
table string
|
||||
}{
|
||||
{"hbase://z,k,zk/tbl", true, []string{"z", "k", "zk"}, "", "tbl"},
|
||||
{"hbase://z:80,k:80/tbl?tso=127.0.0.1:1234", true, []string{"z:80", "k:80"}, "127.0.0.1:1234", "tbl"},
|
||||
{"goleveldb://zk/tbl", false, nil, "", ""},
|
||||
{"hbase://zk/path/tbl", false, nil, "", ""},
|
||||
{"hbase:///zk/tbl", false, nil, "", ""},
|
||||
{"hbase://z,k,zk/tbl", true, "z,k,zk", tsoTypeLocal, "tbl"},
|
||||
{"hbase://z:80,k:80/tbl?tso=zk", true, "z:80,k:80", tsoTypeZK, "tbl"},
|
||||
{"goleveldb://zk/tbl", false, "", "", ""},
|
||||
{"hbase://zk/path/tbl", false, "", "", ""},
|
||||
{"hbase:///zk/tbl", false, "", "", ""},
|
||||
}
|
||||
|
||||
for _, t := range tbl {
|
||||
@ -48,7 +48,7 @@ func (t *testHBaseSuite) TestParsePath(c *C) {
|
||||
if t.ok {
|
||||
c.Assert(err, IsNil, Commentf("dsn=%v", t.dsn))
|
||||
c.Assert(zks, DeepEquals, t.zks, Commentf("dsn=%v", t.dsn))
|
||||
c.Assert(oracle, Equals, t.oracle, Commentf("dsn=%v", t.dsn))
|
||||
c.Assert(oracle, Equals, t.tso, Commentf("dsn=%v", t.dsn))
|
||||
c.Assert(table, Equals, t.table, Commentf("dsn=%v", t.dsn))
|
||||
} else {
|
||||
c.Assert(err, NotNil, Commentf("dsn=%v", t.dsn))
|
||||
|
||||
@ -69,12 +69,11 @@ func init() {
|
||||
}
|
||||
|
||||
type hbaseStore struct {
|
||||
mu sync.Mutex
|
||||
uuid string
|
||||
storeName string
|
||||
oracleAddr string
|
||||
oracle oracle.Oracle
|
||||
conns []hbase.HBaseClient
|
||||
mu sync.Mutex
|
||||
uuid string
|
||||
storeName string
|
||||
oracle oracle.Oracle
|
||||
conns []hbase.HBaseClient
|
||||
}
|
||||
|
||||
func (s *hbaseStore) getHBaseClient() hbase.HBaseClient {
|
||||
@ -139,26 +138,34 @@ func (s *hbaseStore) CurrentVersion() (kv.Version, error) {
|
||||
type Driver struct {
|
||||
}
|
||||
|
||||
const (
|
||||
tsoTypeLocal = "local"
|
||||
tsoTypeZK = "zk"
|
||||
|
||||
tsoZKPath = "/zk/tso"
|
||||
)
|
||||
|
||||
// Open opens or creates an HBase storage with given path.
|
||||
//
|
||||
// The format of path should be 'hbase://zk1,zk2,zk3/table[?tso=host:port]'.
|
||||
// The format of path should be 'hbase://zk1,zk2,zk3/table[?tso=local|zk]'.
|
||||
// If tso is not provided, it will use a local oracle instead. (for test only)
|
||||
func (d Driver) Open(path string) (kv.Storage, error) {
|
||||
mc.mu.Lock()
|
||||
defer mc.mu.Unlock()
|
||||
|
||||
zks, oracleAddr, tableName, err := parsePath(path)
|
||||
zks, tso, tableName, err := parsePath(path)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
if tso != tsoTypeLocal && tso != tsoTypeZK {
|
||||
return nil, errors.Trace(ErrInvalidDSN)
|
||||
}
|
||||
|
||||
uuid := fmt.Sprintf("hbase-%v-%v", zks, tableName)
|
||||
if tso == tsoTypeLocal {
|
||||
log.Warnf("hbase: store(%s) is using local oracle(for test only)", uuid)
|
||||
}
|
||||
if store, ok := mc.cache[uuid]; ok {
|
||||
if oracleAddr != store.oracleAddr {
|
||||
err = errors.Errorf("hbase: store(%s) is opened with a different tso, old: %v, new: %v", uuid, store.oracleAddr, oracleAddr)
|
||||
log.Warn(errors.ErrorStack(err))
|
||||
return nil, err
|
||||
}
|
||||
return store, nil
|
||||
}
|
||||
|
||||
@ -167,7 +174,7 @@ func (d Driver) Open(path string) (kv.Storage, error) {
|
||||
conns := make([]hbase.HBaseClient, 0, hbaseConnPoolSize)
|
||||
for i := 0; i < hbaseConnPoolSize; i++ {
|
||||
var c hbase.HBaseClient
|
||||
c, err = hbase.NewClient(zks, "/hbase")
|
||||
c, err = hbase.NewClient(strings.Split(zks, ","), "/hbase")
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
@ -193,37 +200,39 @@ func (d Driver) Open(path string) (kv.Storage, error) {
|
||||
}
|
||||
|
||||
var ora oracle.Oracle
|
||||
if len(oracleAddr) == 0 {
|
||||
log.Warnf("hbase: store(%s) is using local oracle(for test only)", uuid)
|
||||
switch tso {
|
||||
case tsoTypeLocal:
|
||||
ora = oracles.NewLocalOracle()
|
||||
} else {
|
||||
ora = oracles.NewRemoteOracle(oracleAddr)
|
||||
case tsoTypeZK:
|
||||
ora = oracles.NewRemoteOracle(zks, tsoZKPath)
|
||||
}
|
||||
|
||||
s := &hbaseStore{
|
||||
uuid: uuid,
|
||||
storeName: tableName,
|
||||
oracleAddr: oracleAddr,
|
||||
oracle: ora,
|
||||
conns: conns,
|
||||
uuid: uuid,
|
||||
storeName: tableName,
|
||||
oracle: ora,
|
||||
conns: conns,
|
||||
}
|
||||
mc.cache[uuid] = s
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func parsePath(path string) (zks []string, oracleAddr, tableName string, err error) {
|
||||
func parsePath(path string) (zks, tso, tableName string, err error) {
|
||||
u, err := url.Parse(path)
|
||||
if err != nil {
|
||||
return nil, "", "", errors.Trace(err)
|
||||
return "", "", "", errors.Trace(err)
|
||||
}
|
||||
if strings.ToLower(u.Scheme) != "hbase" {
|
||||
return nil, "", "", errors.Trace(ErrInvalidDSN)
|
||||
return "", "", "", errors.Trace(ErrInvalidDSN)
|
||||
}
|
||||
p, tableName := filepath.Split(u.Path)
|
||||
if p != "/" {
|
||||
return nil, "", "", errors.Trace(ErrInvalidDSN)
|
||||
return "", "", "", errors.Trace(ErrInvalidDSN)
|
||||
}
|
||||
zks = strings.Split(u.Host, ",")
|
||||
oracleAddr = u.Query().Get("tso")
|
||||
return zks, oracleAddr, tableName, nil
|
||||
zks = u.Host
|
||||
tso = u.Query().Get("tso")
|
||||
if tso == "" {
|
||||
tso = tsoTypeLocal
|
||||
}
|
||||
return zks, tso, tableName, nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user