diff --git a/bootstrap.go b/bootstrap.go index 06cdcc0036..0805b2b6d9 100644 --- a/bootstrap.go +++ b/bootstrap.go @@ -148,6 +148,7 @@ func checkBootstrappedVar(s Session) (bool, error) { } return false, errors.Trace(err) } + if len(rs) != 1 { return false, errors.New("Wrong number of Recordset") } @@ -156,7 +157,17 @@ func checkBootstrappedVar(s Session) (bool, error) { if err != nil || row == nil { return false, errors.Trace(err) } - return row.Data[0].(string) == bootstrappedVarTrue, nil + + isBootstrapped := row.Data[0].(string) == bootstrappedVarTrue + if isBootstrapped { + // Make sure that doesn't affect the following operations. + + if err = s.FinishTxn(false); err != nil { + return false, errors.Trace(err) + } + } + + return isBootstrapped, nil } // Execute DDL statements in bootstrap stage. diff --git a/session_test.go b/session_test.go index aacdcb9609..1c2a5dc3b7 100644 --- a/session_test.go +++ b/session_test.go @@ -719,6 +719,7 @@ func (s *testSessionSuite) TestBootstrap(c *C) { se := newSession(c, store, s.dbName) mustExecSQL(c, se, "USE mysql;") r := mustExecSQL(c, se, `select * from user;`) + c.Assert(r, NotNil) row, err := r.Next() c.Assert(err, IsNil) c.Assert(row, NotNil) @@ -734,9 +735,31 @@ func (s *testSessionSuite) TestBootstrap(c *C) { mustExecSQL(c, se, "SELECT * from mysql.columns_priv;") // Check privilege tables. r = mustExecSQL(c, se, "SELECT COUNT(*) from mysql.global_variables;") + c.Assert(r, NotNil) v, err := r.FirstRow() c.Assert(err, IsNil) c.Assert(v[0], Equals, int64(len(variable.SysVars))) + + // Check a storage operations are default autocommit after the second start. + mustExecSQL(c, se, "USE test;") + mustExecSQL(c, se, "drop table if exists t") + mustExecSQL(c, se, "create table t (id int)") + delete(storeBootstrapped, store.UUID()) + se.Close() + se, err = CreateSession(store) + c.Assert(err, IsNil) + mustExecSQL(c, se, "USE test;") + mustExecSQL(c, se, "insert t values (?)", 3) + se, err = CreateSession(store) + c.Assert(err, IsNil) + mustExecSQL(c, se, "USE test;") + r = mustExecSQL(c, se, "select * from t") + c.Assert(r, NotNil) + v, err = r.FirstRow() + c.Assert(err, IsNil) + match(c, v, 3) + mustExecSQL(c, se, "drop table if exists t") + se.Close() } // Create a new session on store but only do ddl works. diff --git a/store/hbase/snapshot.go b/store/hbase/snapshot.go index dce08ea743..08cb6b4aba 100644 --- a/store/hbase/snapshot.go +++ b/store/hbase/snapshot.go @@ -32,12 +32,12 @@ const hbaseBatchSize = 1000 // hbaseSnapshot implements MvccSnapshot interface. type hbaseSnapshot struct { - txn *themis.Txn + txn themis.Txn storeName string } // newHBaseSnapshot creates a snapshot of an HBase store. -func newHbaseSnapshot(txn *themis.Txn, storeName string) *hbaseSnapshot { +func newHbaseSnapshot(txn themis.Txn, storeName string) *hbaseSnapshot { return &hbaseSnapshot{ txn: txn, storeName: storeName, @@ -63,7 +63,7 @@ func (s *hbaseSnapshot) BatchGet(keys []kv.Key) (map[string][]byte, error) { g.AddColumn(hbaseColFamilyBytes, hbaseQualifierBytes) gets[i] = g } - rows, err := s.txn.BatchGet(s.storeName, gets) + rows, err := s.txn.Gets(s.storeName, gets) if err != nil { return nil, errors.Trace(err) } diff --git a/store/hbase/txn.go b/store/hbase/txn.go index e4d379c941..1a7bbf108d 100644 --- a/store/hbase/txn.go +++ b/store/hbase/txn.go @@ -47,10 +47,10 @@ func getOptionDefaultVal(opt kv.Option) interface{} { return nil } -// dbTxn is not thread safe +// dbTxn implements kv.Transacton. It is not thread safe. type hbaseTxn struct { kv.UnionStore - *themis.Txn + txn themis.Txn store *hbaseStore // for commit storeName string tid uint64 @@ -59,10 +59,10 @@ type hbaseTxn struct { opts map[kv.Option]interface{} } -func newHbaseTxn(t *themis.Txn, storeName string) *hbaseTxn { +func newHbaseTxn(t themis.Txn, storeName string) *hbaseTxn { opts := make(map[kv.Option]interface{}) return &hbaseTxn{ - Txn: t, + txn: t, valid: true, storeName: storeName, tid: t.GetStartTS(), @@ -205,7 +205,7 @@ func (txn *hbaseTxn) doCommit() error { copy(row, iter.Key()) d := hbase.NewDelete(row) d.AddStringColumn(hbaseColFamily, hbaseQualifier) - err := txn.Txn.Delete(txn.storeName, d) + err := txn.txn.Delete(txn.storeName, d) if err != nil { return errors.Trace(err) } @@ -215,7 +215,7 @@ func (txn *hbaseTxn) doCommit() error { copy(val, iter.Value()) p := hbase.NewPut(row) p.AddValue(hbaseColFamilyBytes, hbaseQualifierBytes, val) - txn.Txn.Put(txn.storeName, p) + txn.txn.Put(txn.storeName, p) } return nil }) @@ -224,13 +224,13 @@ func (txn *hbaseTxn) doCommit() error { return errors.Trace(err) } - err = txn.Txn.Commit() + err = txn.txn.Commit() if err != nil { log.Error(err) return errors.Trace(err) } - txn.version = kv.NewVersion(txn.Txn.GetCommitTS()) + txn.version = kv.NewVersion(txn.txn.GetCommitTS()) log.Debugf("commit successfully, txn.version:%d", txn.version.Ver) return nil } @@ -272,7 +272,7 @@ func (txn *hbaseTxn) Rollback() error { func (txn *hbaseTxn) LockKeys(keys ...kv.Key) error { for _, key := range keys { key = kv.EncodeKey(key) - if err := txn.Txn.LockRow(txn.storeName, key); err != nil { + if err := txn.txn.LockRow(txn.storeName, key); err != nil { return errors.Trace(err) } }