[FEAT MERGE]OBKV refactoring

This commit is contained in:
obdev
2023-01-11 03:20:43 +00:00
committed by ob-robot
parent b79d70c78e
commit c88aab6b2d
69 changed files with 18110 additions and 7913 deletions

View File

@ -221,17 +221,11 @@ ObTableEntity::ObTableEntity()
ObTableEntity::~ObTableEntity()
{}
inline int ObTableEntity::try_init()
void ObTableEntity::reset()
{
int ret = OB_SUCCESS;
static const int64_t PROPERTY_MAP_BUCKET_SIZE = 107;
if (!properties_.created()) {
if (OB_FAIL(properties_.create(PROPERTY_MAP_BUCKET_SIZE, ObModIds::OB_HASH_BUCKET_SQL_COLUMN_MAP,
ObModIds::OB_HASH_NODE_SQL_COLUMN_MAP))) {
LOG_WARN("failed to init properties", K(ret));
}
}
return ret;
rowkey_.reset();
properties_names_.reset();
properties_values_.reset();
}
int ObTableEntity::set_rowkey_value(int64_t idx, const ObObj &value)
@ -316,10 +310,16 @@ int64_t ObTableEntity::hash_rowkey() const
int ObTableEntity::get_property(const ObString &prop_name, ObObj &prop_value) const
{
int ret = OB_SUCCESS;
if (OB_FAIL(const_cast<ObTableEntity*>(this)->try_init())) {
LOG_WARN("failed to init hash map", K(ret));
} else if (OB_FAIL(properties_.get_refactored(prop_name, prop_value))) {
LOG_DEBUG("failed to get property", K(ret), K(prop_name));
if (prop_name.empty()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("property name should not be empty string", K(ret), K(prop_name));
} else {
int64_t idx = -1;
if (has_exist_in_array(properties_names_, prop_name, &idx)) {
prop_value = properties_values_.at(idx);
} else {
ret = OB_SEARCH_NOT_FOUND;
}
}
return ret;
}
@ -330,115 +330,63 @@ int ObTableEntity::set_property(const ObString &prop_name, const ObObj &prop_val
if (prop_name.empty()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("property name should not be empty string", K(ret), K(prop_name));
} else if (OB_FAIL(try_init())) {
LOG_WARN("failed to init hash map", K(ret));
} else if (OB_FAIL(properties_.set_refactored(prop_name, prop_value, 1))) {
LOG_WARN("failed to set property", K(ret), K(prop_name));
} else {
int64_t idx = -1;
if (has_exist_in_array(properties_names_, prop_name, &idx)) {
properties_values_.at(idx) = prop_value;
} else {
if (OB_FAIL(properties_names_.push_back(prop_name))) {
LOG_WARN("failed to add prop name", K(ret), K(prop_name));
} else if (OB_FAIL(properties_values_.push_back(prop_value))) {
LOG_WARN("failed to add prop value", K(ret), K(prop_value));
}
}
}
return ret;
}
class ObTableEntity::GetPropertyFn
{
public:
GetPropertyFn(ObIArray<std::pair<ObString, ObObj> > &properties)
:properties_(properties)
{}
int operator()(const hash::HashMapPair<ObString, ObObj> &kv)
{
int ret = OB_SUCCESS;
if (OB_FAIL(properties_.push_back(std::make_pair(kv.first, kv.second)))) {
LOG_WARN("failed to push back", K(ret));
}
return ret;
}
private:
ObIArray<std::pair<ObString, ObObj> > &properties_;
};
int ObTableEntity::get_properties(ObIArray<std::pair<ObString, ObObj> > &properties) const
{
int ret = OB_SUCCESS;
GetPropertyFn func(properties);
if (OB_FAIL(const_cast<ObTableEntity*>(this)->try_init())) {
LOG_WARN("failed to init hash map", K(ret));
} else if (OB_FAIL(const_cast<PropertiesMap&>(properties_).foreach_refactored(func))) {
LOG_WARN("for each properties fail", K(ret));
for (int64_t i = 0; OB_SUCC(ret) && i < properties_names_.count(); i++) {
if (OB_FAIL(properties.push_back(std::make_pair(
properties_names_.at(i),
properties_values_.at(i))))) {
LOG_WARN("failed to add name-value pair", K(ret), K(i));
}
}
return ret;
}
class ObTableEntity::GetPropertyNameFn
{
public:
GetPropertyNameFn(ObIArray<ObString> &names)
:names_(names)
{}
int operator()(const hash::HashMapPair<ObString, ObObj> &kv)
{
int ret = OB_SUCCESS;
if (OB_FAIL(names_.push_back(kv.first))) {
LOG_WARN("failed to push back", K(ret));
}
return ret;
}
private:
ObIArray<ObString> &names_;
};
int ObTableEntity::get_properties_names(ObIArray<ObString> &properties_names) const
{
int ret = OB_SUCCESS;
GetPropertyNameFn func(properties_names);
if (OB_FAIL(const_cast<ObTableEntity*>(this)->try_init())) {
LOG_WARN("failed to init hash map", K(ret));
} else if (OB_FAIL(const_cast<PropertiesMap&>(properties_).foreach_refactored(func))) {
LOG_WARN("for each properties fail", K(ret));
if (OB_FAIL(properties_names.assign(properties_names_))) {
LOG_WARN("fail to assign properties name array", K(ret));
}
return ret;
}
class ObTableEntity::GetPropertyValueFn
{
public:
GetPropertyValueFn(ObIArray<ObObj> &values)
:values_(values)
{}
int operator()(const hash::HashMapPair<ObString, ObObj> &kv)
{
int ret = OB_SUCCESS;
if (OB_FAIL(values_.push_back(kv.second))) {
LOG_WARN("failed to push back", K(ret));
}
return ret;
}
private:
ObIArray<ObObj> &values_;
};
int ObTableEntity::get_properties_values(ObIArray<ObObj> &properties_values) const
{
int ret = OB_SUCCESS;
GetPropertyValueFn func(properties_values);
if (OB_FAIL(const_cast<ObTableEntity*>(this)->try_init())) {
LOG_WARN("failed to init hash map", K(ret));
} else if (OB_FAIL(const_cast<PropertiesMap&>(properties_).foreach_refactored(func))) {
LOG_WARN("for each properties fail", K(ret));
if (OB_FAIL(properties_values.assign(properties_values_))) {
LOG_WARN("failed to assign properties values array", K(ret));
}
return ret;
}
int64_t ObTableEntity::get_properties_count() const
{
return properties_.size();
return properties_names_.count();
}
ObRowkey ObTableEntity::get_rowkey()
ObRowkey ObTableEntity::get_rowkey() const
{
ObRowkey rowkey;
int64_t obj_cnt = rowkey_.count();
if (obj_cnt > 0) {
rowkey.assign(&rowkey_.at(0), obj_cnt);
rowkey.assign(const_cast<ObObj*>(&rowkey_.at(0)), obj_cnt);
}
return rowkey;
}
@ -619,7 +567,7 @@ int ObTableBatchOperation::add(const ObTableOperation &table_operation)
{
const ObString &name = prev_columns.at(i);
if (OB_FAIL(curr.entity().get_property(name, value))) {
if (OB_HASH_NOT_EXIST == ret) {
if (OB_SEARCH_NOT_FOUND == ret) {
is_same_properties_names_ = false;
}
}
@ -775,6 +723,14 @@ ObTableOperationResult::ObTableOperationResult()
affected_rows_(0)
{}
void ObTableOperationResult::reset()
{
ObTableResult::reset();
operation_type_ = ObTableOperationType::GET;
entity_->reset();
affected_rows_ = 0;
}
int ObTableOperationResult::get_entity(const ObITableEntity *&entity) const
{
int ret = OB_SUCCESS;
@ -1029,6 +985,48 @@ uint64_t ObTableQuery::get_checksum() const
return checksum;
}
int ObTableQuery::deep_copy(ObIAllocator &allocator, ObTableQuery &dst) const
{
int ret = OB_SUCCESS;
for (int64_t i = 0; OB_SUCC(ret) && i < key_ranges_.count(); i++) {
const ObNewRange &src_range = key_ranges_.at(i);
ObNewRange dst_range;
if (OB_FAIL(deep_copy_range(allocator, src_range, dst_range))) {
LOG_WARN("fail tp deep copy range", K(ret));
} else if (OB_FAIL(dst.key_ranges_.push_back(dst_range))) {
LOG_WARN("fail to push back new range", K(ret));
}
}
for (int64_t i = 0; OB_SUCC(ret) && i < select_columns_.count(); i++) {
ObString select_column;
if (OB_FAIL(ob_write_string(allocator, select_columns_.at(i), select_column))) {
LOG_WARN("Fail to deep copy select column", K(ret), K(select_columns_.at(i)));
} else if (OB_FAIL(dst.select_columns_.push_back(select_column))) {
LOG_WARN("fail to push back select column", K(ret), K(select_column));
}
}
if (OB_FAIL(ret)) {
// do nothing
} else if (OB_FAIL(ob_write_string(allocator, filter_string_, dst.filter_string_))) {
LOG_WARN("fail to deep copy filter string", K(ret), K_(filter_string));
} else if (OB_FAIL(ob_write_string(allocator, index_name_, dst.index_name_))) {
LOG_WARN("fail to deep copy index name", K(ret), K_(index_name));
} else if (OB_FAIL(htable_filter_.deep_copy(allocator, dst.htable_filter_))) {
LOG_WARN("fail to deep copy htable filter", K(ret), K_(htable_filter));
} else {
dst.deserialize_allocator_ = deserialize_allocator_;
dst.limit_ = limit_;
dst.offset_ = offset_;
dst.scan_order_ = scan_order_;
dst.batch_size_ = batch_size_;
dst.max_result_size_ = max_result_size_;
}
return ret;
}
OB_UNIS_DEF_SERIALIZE(ObTableQuery,
key_ranges_,
select_columns_,
@ -1228,6 +1226,35 @@ uint64_t ObHTableFilter::get_checksum() const
return checksum;
}
int ObHTableFilter::deep_copy(ObIAllocator &allocator, ObHTableFilter &dst) const
{
int ret = OB_SUCCESS;
for (int64_t i = 0; OB_SUCC(ret) && i < select_column_qualifier_.count(); i++) {
ObString select_column;
if (OB_FAIL(ob_write_string(allocator, select_column_qualifier_.at(i), select_column))) {
LOG_WARN("Fail to deep copy select column qualifier", K(ret), K(select_column_qualifier_.at(i)));
} else if (OB_FAIL(dst.select_column_qualifier_.push_back(select_column))) {
LOG_WARN("fail to push back select column qualifier", K(ret), K(select_column));
}
}
if (OB_FAIL(ret)) {
// do nothing
} else if (OB_FAIL(ob_write_string(allocator, filter_string_, dst.filter_string_))) {
LOG_WARN("fail to write filter string", K(ret), K_(filter_string));
} else {
dst.is_valid_ = is_valid_;
dst.min_stamp_ = min_stamp_;
dst.max_stamp_ = max_stamp_;
dst.max_versions_ = max_versions_;
dst.limit_per_row_per_cf_ = limit_per_row_per_cf_;
dst.offset_per_row_per_cf_ = offset_per_row_per_cf_;
}
return ret;
}
// If valid_ is true, serialize the members. Otherwise, nothing/dummy is serialized.
OB_SERIALIZE_MEMBER_IF(ObHTableFilter,
(true == is_valid_),
@ -1544,7 +1571,41 @@ OB_SERIALIZE_MEMBER(ObTableQueryAndMutateResult,
affected_rows_,
affected_entity_);
OB_SERIALIZE_MEMBER((ObTableQuerySyncResult, ObTableQueryResult),
OB_SERIALIZE_MEMBER((ObTableQuerySyncResult, ObTableQueryResult),
is_end_,
query_session_id_
);
);
OB_SERIALIZE_MEMBER(ObTableApiCredential,
cluster_id_,
tenant_id_,
user_id_,
database_id_,
expire_ts_,
hash_val_);
ObTableApiCredential::ObTableApiCredential()
:cluster_id_(0),
tenant_id_(0),
user_id_(0),
database_id_(0),
expire_ts_(0),
hash_val_(0)
{
}
ObTableApiCredential::~ObTableApiCredential()
{
}
uint64_t ObTableApiCredential::hash(uint64_t seed /*= 0*/) const
{
uint64_t hash_val = murmurhash(&cluster_id_, sizeof(cluster_id_), seed);
hash_val = murmurhash(&tenant_id_, sizeof(tenant_id_), hash_val);
hash_val = murmurhash(&user_id_, sizeof(user_id_), hash_val);
hash_val = murmurhash(&database_id_, sizeof(database_id_), hash_val);
hash_val = murmurhash(&expire_ts_, sizeof(expire_ts_), hash_val);
return hash_val;
}