diff --git a/driver.go b/driver.go index e4411021cd..54b820fa04 100644 --- a/driver.go +++ b/driver.go @@ -153,7 +153,7 @@ func parseDriverDSN(dsn string) (storePath, dbName string, err error) { // Examples: // goleveldb://relative/path/test // boltdb:///absolute/path/test -// hbase://zk1,zk2,zk3/hbasetbl/test?tso=127.0.0.1:1234 +// hbase://zk1,zk2,zk3/hbasetbl/test?tso=zk // // Open may return a cached connection (one previously closed), but doing so is // unnecessary; the sql package maintains a pool of idle connections for diff --git a/store/hbase/hbase_test.go b/store/hbase/hbase_test.go index aafb535b15..e071c4c595 100644 --- a/store/hbase/hbase_test.go +++ b/store/hbase/hbase_test.go @@ -30,17 +30,17 @@ type testHBaseSuite struct { func (t *testHBaseSuite) TestParsePath(c *C) { tbl := []struct { - dsn string - ok bool - zks []string - oracle string - table string + dsn string + ok bool + zks string + tso string + table string }{ - {"hbase://z,k,zk/tbl", true, []string{"z", "k", "zk"}, "", "tbl"}, - {"hbase://z:80,k:80/tbl?tso=127.0.0.1:1234", true, []string{"z:80", "k:80"}, "127.0.0.1:1234", "tbl"}, - {"goleveldb://zk/tbl", false, nil, "", ""}, - {"hbase://zk/path/tbl", false, nil, "", ""}, - {"hbase:///zk/tbl", false, nil, "", ""}, + {"hbase://z,k,zk/tbl", true, "z,k,zk", tsoTypeLocal, "tbl"}, + {"hbase://z:80,k:80/tbl?tso=zk", true, "z:80,k:80", tsoTypeZK, "tbl"}, + {"goleveldb://zk/tbl", false, "", "", ""}, + {"hbase://zk/path/tbl", false, "", "", ""}, + {"hbase:///zk/tbl", false, "", "", ""}, } for _, t := range tbl { @@ -48,7 +48,7 @@ func (t *testHBaseSuite) TestParsePath(c *C) { if t.ok { c.Assert(err, IsNil, Commentf("dsn=%v", t.dsn)) c.Assert(zks, DeepEquals, t.zks, Commentf("dsn=%v", t.dsn)) - c.Assert(oracle, Equals, t.oracle, Commentf("dsn=%v", t.dsn)) + c.Assert(oracle, Equals, t.tso, Commentf("dsn=%v", t.dsn)) c.Assert(table, Equals, t.table, Commentf("dsn=%v", t.dsn)) } else { c.Assert(err, NotNil, Commentf("dsn=%v", t.dsn)) diff --git a/store/hbase/kv.go b/store/hbase/kv.go index 65153aa0db..6ceaff3050 100644 --- a/store/hbase/kv.go +++ b/store/hbase/kv.go @@ -69,12 +69,11 @@ func init() { } type hbaseStore struct { - mu sync.Mutex - uuid string - storeName string - oracleAddr string - oracle oracle.Oracle - conns []hbase.HBaseClient + mu sync.Mutex + uuid string + storeName string + oracle oracle.Oracle + conns []hbase.HBaseClient } func (s *hbaseStore) getHBaseClient() hbase.HBaseClient { @@ -139,26 +138,34 @@ func (s *hbaseStore) CurrentVersion() (kv.Version, error) { type Driver struct { } +const ( + tsoTypeLocal = "local" + tsoTypeZK = "zk" + + tsoZKPath = "/zk/tso" +) + // Open opens or creates an HBase storage with given path. // -// The format of path should be 'hbase://zk1,zk2,zk3/table[?tso=host:port]'. +// The format of path should be 'hbase://zk1,zk2,zk3/table[?tso=local|zk]'. // If tso is not provided, it will use a local oracle instead. (for test only) func (d Driver) Open(path string) (kv.Storage, error) { mc.mu.Lock() defer mc.mu.Unlock() - zks, oracleAddr, tableName, err := parsePath(path) + zks, tso, tableName, err := parsePath(path) if err != nil { return nil, errors.Trace(err) } + if tso != tsoTypeLocal && tso != tsoTypeZK { + return nil, errors.Trace(ErrInvalidDSN) + } uuid := fmt.Sprintf("hbase-%v-%v", zks, tableName) + if tso == tsoTypeLocal { + log.Warnf("hbase: store(%s) is using local oracle(for test only)", uuid) + } if store, ok := mc.cache[uuid]; ok { - if oracleAddr != store.oracleAddr { - err = errors.Errorf("hbase: store(%s) is opened with a different tso, old: %v, new: %v", uuid, store.oracleAddr, oracleAddr) - log.Warn(errors.ErrorStack(err)) - return nil, err - } return store, nil } @@ -167,7 +174,7 @@ func (d Driver) Open(path string) (kv.Storage, error) { conns := make([]hbase.HBaseClient, 0, hbaseConnPoolSize) for i := 0; i < hbaseConnPoolSize; i++ { var c hbase.HBaseClient - c, err = hbase.NewClient(zks, "/hbase") + c, err = hbase.NewClient(strings.Split(zks, ","), "/hbase") if err != nil { return nil, errors.Trace(err) } @@ -193,37 +200,39 @@ func (d Driver) Open(path string) (kv.Storage, error) { } var ora oracle.Oracle - if len(oracleAddr) == 0 { - log.Warnf("hbase: store(%s) is using local oracle(for test only)", uuid) + switch tso { + case tsoTypeLocal: ora = oracles.NewLocalOracle() - } else { - ora = oracles.NewRemoteOracle(oracleAddr) + case tsoTypeZK: + ora = oracles.NewRemoteOracle(zks, tsoZKPath) } s := &hbaseStore{ - uuid: uuid, - storeName: tableName, - oracleAddr: oracleAddr, - oracle: ora, - conns: conns, + uuid: uuid, + storeName: tableName, + oracle: ora, + conns: conns, } mc.cache[uuid] = s return s, nil } -func parsePath(path string) (zks []string, oracleAddr, tableName string, err error) { +func parsePath(path string) (zks, tso, tableName string, err error) { u, err := url.Parse(path) if err != nil { - return nil, "", "", errors.Trace(err) + return "", "", "", errors.Trace(err) } if strings.ToLower(u.Scheme) != "hbase" { - return nil, "", "", errors.Trace(ErrInvalidDSN) + return "", "", "", errors.Trace(ErrInvalidDSN) } p, tableName := filepath.Split(u.Path) if p != "/" { - return nil, "", "", errors.Trace(ErrInvalidDSN) + return "", "", "", errors.Trace(ErrInvalidDSN) } - zks = strings.Split(u.Host, ",") - oracleAddr = u.Query().Get("tso") - return zks, oracleAddr, tableName, nil + zks = u.Host + tso = u.Query().Get("tso") + if tso == "" { + tso = tsoTypeLocal + } + return zks, tso, tableName, nil }