Merge remote-tracking branch 'origin/master' into perf-turning
This commit is contained in:
13
bootstrap.go
13
bootstrap.go
@ -148,6 +148,7 @@ func checkBootstrappedVar(s Session) (bool, error) {
|
||||
}
|
||||
return false, errors.Trace(err)
|
||||
}
|
||||
|
||||
if len(rs) != 1 {
|
||||
return false, errors.New("Wrong number of Recordset")
|
||||
}
|
||||
@ -156,7 +157,17 @@ func checkBootstrappedVar(s Session) (bool, error) {
|
||||
if err != nil || row == nil {
|
||||
return false, errors.Trace(err)
|
||||
}
|
||||
return row.Data[0].(string) == bootstrappedVarTrue, nil
|
||||
|
||||
isBootstrapped := row.Data[0].(string) == bootstrappedVarTrue
|
||||
if isBootstrapped {
|
||||
// Make sure that doesn't affect the following operations.
|
||||
|
||||
if err = s.FinishTxn(false); err != nil {
|
||||
return false, errors.Trace(err)
|
||||
}
|
||||
}
|
||||
|
||||
return isBootstrapped, nil
|
||||
}
|
||||
|
||||
// Execute DDL statements in bootstrap stage.
|
||||
|
||||
@ -719,6 +719,7 @@ func (s *testSessionSuite) TestBootstrap(c *C) {
|
||||
se := newSession(c, store, s.dbName)
|
||||
mustExecSQL(c, se, "USE mysql;")
|
||||
r := mustExecSQL(c, se, `select * from user;`)
|
||||
c.Assert(r, NotNil)
|
||||
row, err := r.Next()
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(row, NotNil)
|
||||
@ -734,9 +735,31 @@ func (s *testSessionSuite) TestBootstrap(c *C) {
|
||||
mustExecSQL(c, se, "SELECT * from mysql.columns_priv;")
|
||||
// Check privilege tables.
|
||||
r = mustExecSQL(c, se, "SELECT COUNT(*) from mysql.global_variables;")
|
||||
c.Assert(r, NotNil)
|
||||
v, err := r.FirstRow()
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(v[0], Equals, int64(len(variable.SysVars)))
|
||||
|
||||
// Check a storage operations are default autocommit after the second start.
|
||||
mustExecSQL(c, se, "USE test;")
|
||||
mustExecSQL(c, se, "drop table if exists t")
|
||||
mustExecSQL(c, se, "create table t (id int)")
|
||||
delete(storeBootstrapped, store.UUID())
|
||||
se.Close()
|
||||
se, err = CreateSession(store)
|
||||
c.Assert(err, IsNil)
|
||||
mustExecSQL(c, se, "USE test;")
|
||||
mustExecSQL(c, se, "insert t values (?)", 3)
|
||||
se, err = CreateSession(store)
|
||||
c.Assert(err, IsNil)
|
||||
mustExecSQL(c, se, "USE test;")
|
||||
r = mustExecSQL(c, se, "select * from t")
|
||||
c.Assert(r, NotNil)
|
||||
v, err = r.FirstRow()
|
||||
c.Assert(err, IsNil)
|
||||
match(c, v, 3)
|
||||
mustExecSQL(c, se, "drop table if exists t")
|
||||
se.Close()
|
||||
}
|
||||
|
||||
// Create a new session on store but only do ddl works.
|
||||
|
||||
@ -32,12 +32,12 @@ const hbaseBatchSize = 1000
|
||||
|
||||
// hbaseSnapshot implements MvccSnapshot interface.
|
||||
type hbaseSnapshot struct {
|
||||
txn *themis.Txn
|
||||
txn themis.Txn
|
||||
storeName string
|
||||
}
|
||||
|
||||
// newHBaseSnapshot creates a snapshot of an HBase store.
|
||||
func newHbaseSnapshot(txn *themis.Txn, storeName string) *hbaseSnapshot {
|
||||
func newHbaseSnapshot(txn themis.Txn, storeName string) *hbaseSnapshot {
|
||||
return &hbaseSnapshot{
|
||||
txn: txn,
|
||||
storeName: storeName,
|
||||
@ -63,7 +63,7 @@ func (s *hbaseSnapshot) BatchGet(keys []kv.Key) (map[string][]byte, error) {
|
||||
g.AddColumn(hbaseColFamilyBytes, hbaseQualifierBytes)
|
||||
gets[i] = g
|
||||
}
|
||||
rows, err := s.txn.BatchGet(s.storeName, gets)
|
||||
rows, err := s.txn.Gets(s.storeName, gets)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
|
||||
@ -47,10 +47,10 @@ func getOptionDefaultVal(opt kv.Option) interface{} {
|
||||
return nil
|
||||
}
|
||||
|
||||
// dbTxn is not thread safe
|
||||
// dbTxn implements kv.Transacton. It is not thread safe.
|
||||
type hbaseTxn struct {
|
||||
kv.UnionStore
|
||||
*themis.Txn
|
||||
txn themis.Txn
|
||||
store *hbaseStore // for commit
|
||||
storeName string
|
||||
tid uint64
|
||||
@ -59,10 +59,10 @@ type hbaseTxn struct {
|
||||
opts map[kv.Option]interface{}
|
||||
}
|
||||
|
||||
func newHbaseTxn(t *themis.Txn, storeName string) *hbaseTxn {
|
||||
func newHbaseTxn(t themis.Txn, storeName string) *hbaseTxn {
|
||||
opts := make(map[kv.Option]interface{})
|
||||
return &hbaseTxn{
|
||||
Txn: t,
|
||||
txn: t,
|
||||
valid: true,
|
||||
storeName: storeName,
|
||||
tid: t.GetStartTS(),
|
||||
@ -205,7 +205,7 @@ func (txn *hbaseTxn) doCommit() error {
|
||||
copy(row, iter.Key())
|
||||
d := hbase.NewDelete(row)
|
||||
d.AddStringColumn(hbaseColFamily, hbaseQualifier)
|
||||
err := txn.Txn.Delete(txn.storeName, d)
|
||||
err := txn.txn.Delete(txn.storeName, d)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
@ -215,7 +215,7 @@ func (txn *hbaseTxn) doCommit() error {
|
||||
copy(val, iter.Value())
|
||||
p := hbase.NewPut(row)
|
||||
p.AddValue(hbaseColFamilyBytes, hbaseQualifierBytes, val)
|
||||
txn.Txn.Put(txn.storeName, p)
|
||||
txn.txn.Put(txn.storeName, p)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
@ -224,13 +224,13 @@ func (txn *hbaseTxn) doCommit() error {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
err = txn.Txn.Commit()
|
||||
err = txn.txn.Commit()
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
txn.version = kv.NewVersion(txn.Txn.GetCommitTS())
|
||||
txn.version = kv.NewVersion(txn.txn.GetCommitTS())
|
||||
log.Debugf("commit successfully, txn.version:%d", txn.version.Ver)
|
||||
return nil
|
||||
}
|
||||
@ -272,7 +272,7 @@ func (txn *hbaseTxn) Rollback() error {
|
||||
func (txn *hbaseTxn) LockKeys(keys ...kv.Key) error {
|
||||
for _, key := range keys {
|
||||
key = kv.EncodeKey(key)
|
||||
if err := txn.Txn.LockRow(txn.storeName, key); err != nil {
|
||||
if err := txn.txn.LockRow(txn.storeName, key); err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user