[CP] checkAndInsert/auto_increment/aggregation and fix local index bug

This commit is contained in:
WeiXinChan
2023-08-15 12:14:31 +00:00
committed by ob-robot
parent 2ff134f70b
commit 98fe20910b
27 changed files with 2149 additions and 220 deletions

View File

@ -1005,6 +1005,7 @@ int ObTableQuery::deep_copy(ObIAllocator &allocator, ObTableQuery &dst) const
{
int ret = OB_SUCCESS;
// key_ranges_
for (int64_t i = 0; OB_SUCC(ret) && i < key_ranges_.count(); i++) {
const ObNewRange &src_range = key_ranges_.at(i);
ObNewRange dst_range;
@ -1015,15 +1016,36 @@ int ObTableQuery::deep_copy(ObIAllocator &allocator, ObTableQuery &dst) const
}
}
// select_columns_
for (int64_t i = 0; OB_SUCC(ret) && i < select_columns_.count(); i++) {
ObString select_column;
if (OB_FAIL(ob_write_string(allocator, select_columns_.at(i), select_column))) {
LOG_WARN("Fail to deep copy select column", K(ret), K(select_columns_.at(i)));
LOG_WARN("fail to deep copy select column", K(ret), K(select_columns_.at(i)));
} else if (OB_FAIL(dst.select_columns_.push_back(select_column))) {
LOG_WARN("fail to push back select column", K(ret), K(select_column));
}
}
// scan_range_columns_
for (int64_t i = 0; OB_SUCC(ret) && i < scan_range_columns_.count(); i++) {
ObString range_column_name;
if (OB_FAIL(ob_write_string(allocator, scan_range_columns_.at(i), range_column_name))) {
LOG_WARN("fail to deep copy range column name", K(ret), K(scan_range_columns_.at(i)));
} else if (OB_FAIL(dst.scan_range_columns_.push_back(range_column_name))) {
LOG_WARN("fail to push back range column name", K(ret), K(range_column_name));
}
}
// aggregations_
for (int64_t i = 0; OB_SUCC(ret) && i < aggregations_.count(); i++) {
ObTableAggregation agg;
if (OB_FAIL(aggregations_.at(i).deep_copy(allocator, agg))) {
LOG_WARN("fail to deep copy aggregation", K(ret), K(aggregations_.at(i)));
} else if (OB_FAIL(dst.aggregations_.push_back(agg))) {
LOG_WARN("fail to push back aggregation", K(ret), K(agg));
}
}
if (OB_FAIL(ret)) {
// do nothing
} else if (OB_FAIL(ob_write_string(allocator, filter_string_, dst.filter_string_))) {
@ -1053,7 +1075,9 @@ OB_UNIS_DEF_SERIALIZE(ObTableQuery,
index_name_,
batch_size_,
max_result_size_,
htable_filter_);
htable_filter_,
scan_range_columns_,
aggregations_);
OB_UNIS_DEF_SERIALIZE_SIZE(ObTableQuery,
key_ranges_,
@ -1065,7 +1089,9 @@ OB_UNIS_DEF_SERIALIZE_SIZE(ObTableQuery,
index_name_,
batch_size_,
max_result_size_,
htable_filter_);
htable_filter_,
scan_range_columns_,
aggregations_);
OB_DEF_DESERIALIZE(ObTableQuery,)
{
@ -1107,7 +1133,9 @@ OB_DEF_DESERIALIZE(ObTableQuery,)
index_name_,
batch_size_,
max_result_size_,
htable_filter_
htable_filter_,
scan_range_columns_,
aggregations_
);
}
return ret;
@ -1249,7 +1277,7 @@ int ObHTableFilter::deep_copy(ObIAllocator &allocator, ObHTableFilter &dst) cons
for (int64_t i = 0; OB_SUCC(ret) && i < select_column_qualifier_.count(); i++) {
ObString select_column;
if (OB_FAIL(ob_write_string(allocator, select_column_qualifier_.at(i), select_column))) {
LOG_WARN("Fail to deep copy select column qualifier", K(ret), K(select_column_qualifier_.at(i)));
LOG_WARN("fail to deep copy select column qualifier", K(ret), K(select_column_qualifier_.at(i)));
} else if (OB_FAIL(dst.select_column_qualifier_.push_back(select_column))) {
LOG_WARN("fail to push back select column qualifier", K(ret), K(select_column));
}
@ -1516,6 +1544,27 @@ bool ObTableQueryResult::reach_batch_size_or_result_size(const int32_t batch_cou
return reach_size;
}
int ObTableQueryResult::add_row(const ObIArray<ObObj> &row)
{
int ret = OB_SUCCESS;
int64_t serialize_size = 0;
const int64_t cnt = row.count();
for (int i = 0; i < cnt; i++) {
serialize_size += row.at(i).get_serialize_size();
}
ret = alloc_buf_if_need(serialize_size);
for (int i = 0; OB_SUCC(ret) && i < cnt; i++) {
if (OB_FAIL(row.at(i).serialize(buf_.get_data(), buf_.get_capacity(), buf_.get_position()))) {
LOG_WARN("failed to serialize obj", K(ret), K_(buf), K(row.at(i)));
}
} // end for
if (OB_SUCC(ret)) {
++row_count_;
}
return ret;
}
OB_DEF_SERIALIZE(ObTableQueryResult)
{
int ret = OB_SUCCESS;
@ -1617,6 +1666,7 @@ OB_SERIALIZE_MEMBER((ObTableQuerySyncResult, ObTableQueryResult),
query_session_id_
);
////////////////////////////////////////////////////////////////
OB_SERIALIZE_MEMBER(ObTableApiCredential,
cluster_id_,
tenant_id_,
@ -1650,3 +1700,20 @@ int ObTableApiCredential::hash(uint64_t &hash_val, uint64_t seed /*= 0*/) const
hash_val = murmurhash(&expire_ts_, sizeof(expire_ts_), hash_val);
return OB_SUCCESS;
}
////////////////////////////////////////////////////////////////
int ObTableAggregation::deep_copy(ObIAllocator &allocator, ObTableAggregation &dst) const
{
int ret = OB_SUCCESS;
dst.type_ = type_;
if (OB_FAIL(ob_write_string(allocator, column_, dst.column_))) {
LOG_WARN("fail to deep copy aggregation column", K(ret), K_(column));
}
return ret;
}
OB_SERIALIZE_MEMBER(ObTableAggregation,
type_,
column_);