hbase: minor changes for go-themis updates.
This commit is contained in:
@ -32,12 +32,12 @@ const hbaseBatchSize = 1000
|
||||
|
||||
// hbaseSnapshot implements MvccSnapshot interface.
|
||||
type hbaseSnapshot struct {
|
||||
txn *themis.Txn
|
||||
txn themis.Txn
|
||||
storeName string
|
||||
}
|
||||
|
||||
// newHBaseSnapshot creates a snapshot of an HBase store.
|
||||
func newHbaseSnapshot(txn *themis.Txn, storeName string) *hbaseSnapshot {
|
||||
func newHbaseSnapshot(txn themis.Txn, storeName string) *hbaseSnapshot {
|
||||
return &hbaseSnapshot{
|
||||
txn: txn,
|
||||
storeName: storeName,
|
||||
@ -63,7 +63,7 @@ func (s *hbaseSnapshot) BatchGet(keys []kv.Key) (map[string][]byte, error) {
|
||||
g.AddColumn(hbaseColFamilyBytes, hbaseQualifierBytes)
|
||||
gets[i] = g
|
||||
}
|
||||
rows, err := s.txn.BatchGet(s.storeName, gets)
|
||||
rows, err := s.txn.Gets(s.storeName, gets)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
|
||||
@ -50,7 +50,7 @@ func getOptionDefaultVal(opt kv.Option) interface{} {
|
||||
// dbTxn is not thread safe
|
||||
type hbaseTxn struct {
|
||||
kv.UnionStore
|
||||
*themis.Txn
|
||||
txn themis.Txn
|
||||
store *hbaseStore // for commit
|
||||
storeName string
|
||||
tid uint64
|
||||
@ -59,10 +59,10 @@ type hbaseTxn struct {
|
||||
opts map[kv.Option]interface{}
|
||||
}
|
||||
|
||||
func newHbaseTxn(t *themis.Txn, storeName string) *hbaseTxn {
|
||||
func newHbaseTxn(t themis.Txn, storeName string) *hbaseTxn {
|
||||
opts := make(map[kv.Option]interface{})
|
||||
return &hbaseTxn{
|
||||
Txn: t,
|
||||
txn: t,
|
||||
valid: true,
|
||||
storeName: storeName,
|
||||
tid: t.GetStartTS(),
|
||||
@ -205,7 +205,7 @@ func (txn *hbaseTxn) doCommit() error {
|
||||
copy(row, iter.Key())
|
||||
d := hbase.NewDelete(row)
|
||||
d.AddStringColumn(hbaseColFamily, hbaseQualifier)
|
||||
err := txn.Txn.Delete(txn.storeName, d)
|
||||
err := txn.txn.Delete(txn.storeName, d)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
@ -215,7 +215,7 @@ func (txn *hbaseTxn) doCommit() error {
|
||||
copy(val, iter.Value())
|
||||
p := hbase.NewPut(row)
|
||||
p.AddValue(hbaseColFamilyBytes, hbaseQualifierBytes, val)
|
||||
txn.Txn.Put(txn.storeName, p)
|
||||
txn.txn.Put(txn.storeName, p)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
@ -224,13 +224,13 @@ func (txn *hbaseTxn) doCommit() error {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
err = txn.Txn.Commit()
|
||||
err = txn.txn.Commit()
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
txn.version = kv.NewVersion(txn.Txn.GetCommitTS())
|
||||
txn.version = kv.NewVersion(txn.txn.GetCommitTS())
|
||||
log.Debugf("commit successfully, txn.version:%d", txn.version.Ver)
|
||||
return nil
|
||||
}
|
||||
@ -272,7 +272,7 @@ func (txn *hbaseTxn) Rollback() error {
|
||||
func (txn *hbaseTxn) LockKeys(keys ...kv.Key) error {
|
||||
for _, key := range keys {
|
||||
key = kv.EncodeKey(key)
|
||||
if err := txn.Txn.LockRow(txn.storeName, key); err != nil {
|
||||
if err := txn.txn.LockRow(txn.storeName, key); err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user