fix duplicate table bug

This commit is contained in:
ChangerR
2023-09-11 06:44:20 +00:00
committed by ob-robot
parent 315f1114c5
commit 68d091760c
11 changed files with 232 additions and 213 deletions

View File

@ -7464,7 +7464,6 @@ int ObOptimizerUtil::check_basic_sharding_info(const ObAddr &local_addr,
int ObOptimizerUtil::compute_basic_sharding_info(const ObAddr &local_addr,
const ObIArray<ObLogicalOperator *> &child_ops,
ObIAllocator &allocator,
ObIArray<int64_t> &reselected_pos,
ObShardingInfo *&result_sharding,
int64_t &inherit_sharding_index)
{
@ -7485,7 +7484,6 @@ int ObOptimizerUtil::compute_basic_sharding_info(const ObAddr &local_addr,
} else if (OB_FAIL(compute_basic_sharding_info(local_addr,
sharding_infos,
allocator,
reselected_pos,
result_sharding,
inherit_sharding_index))) {
LOG_WARN("failed to compute basic sharding info", K(ret));
@ -7496,7 +7494,6 @@ int ObOptimizerUtil::compute_basic_sharding_info(const ObAddr &local_addr,
int ObOptimizerUtil::compute_basic_sharding_info(const ObAddr &local_addr,
const ObIArray<ObShardingInfo*> &input_shardings,
ObIAllocator &allocator,
ObIArray<int64_t> &reselected_pos,
ObShardingInfo *&result_sharding,
int64_t &inherit_sharding_index)
{
@ -7509,7 +7506,7 @@ int ObOptimizerUtil::compute_basic_sharding_info(const ObAddr &local_addr,
} else {
ObAddr basic_addr;
bool has_duplicated = false;
bool can_reselect_replica = true;
bool is_replicas_same = true;
ObShardingInfo *sharding = NULL;
ObSEArray<ObAddr, 8> valid_addrs;
ObSEArray<ObAddr, 8> intersect_addrs;
@ -7534,9 +7531,9 @@ int ObOptimizerUtil::compute_basic_sharding_info(const ObAddr &local_addr,
} else {
if (OB_FAIL(ObOptimizerUtil::intersect(valid_addrs, intersect_addrs, candidate_addrs))) {
LOG_WARN("failed to intersect addrs", K(ret));
} else if (OB_FALSE_IT(can_reselect_replica = can_reselect_replica &&
valid_addrs.count() == candidate_addrs.count() &&
valid_addrs.count() == intersect_addrs.count())) {
} else if (OB_FALSE_IT(is_replicas_same = is_replicas_same &&
valid_addrs.count() == candidate_addrs.count() &&
valid_addrs.count() == intersect_addrs.count())) {
// do nothing
} else if (OB_FAIL(intersect_addrs.assign(candidate_addrs))) {
LOG_WARN("failed to assign addrs", K(ret));
@ -7577,14 +7574,18 @@ int ObOptimizerUtil::compute_basic_sharding_info(const ObAddr &local_addr,
/*do nothing*/
} else if (!has_duplicated) {
/*do nothing*/
} else if (OB_FAIL(compute_duplicate_table_replica_pos(basic_addr, input_shardings, reselected_pos))) {
LOG_WARN("failed to set duplicated table replica", K(ret));
} else if (NULL != result_sharding) {
/*do nothing*/
} else if (OB_UNLIKELY(input_shardings.count() != reselected_pos.count())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected array count", K(input_shardings.count()),
K(reselected_pos.count()), K(ret));
} else if (is_replicas_same) {
for (int64_t i = 0; OB_SUCC(ret) && NULL == result_sharding && i < input_shardings.count(); i++) {
if (OB_ISNULL(sharding = input_shardings.at(i))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected null", K(ret));
} else if (sharding->get_can_reselect_replica()) {
result_sharding = sharding;
inherit_sharding_index = i;
}
}
} else {
for (int64_t i = 0; OB_SUCC(ret) && NULL == result_sharding && i < input_shardings.count(); i++) {
if (OB_ISNULL(sharding = input_shardings.at(i))) {
@ -7592,10 +7593,10 @@ int ObOptimizerUtil::compute_basic_sharding_info(const ObAddr &local_addr,
LOG_WARN("get unexpected null", K(ret));
} else if (sharding->get_can_reselect_replica() &&
OB_FAIL(ObOptimizerUtil::compute_duplicate_table_sharding(local_addr,
basic_addr,
allocator,
*input_shardings.at(i),
reselected_pos.at(i),
can_reselect_replica,
intersect_addrs,
result_sharding))) {
LOG_WARN("failed to compute duplicate table sharding", K(ret));
} else if (NULL != result_sharding) {
@ -7606,7 +7607,7 @@ int ObOptimizerUtil::compute_basic_sharding_info(const ObAddr &local_addr,
}
}
if (OB_SUCC(ret) && NULL != result_sharding) {
LOG_TRACE("succeed to compute basic sharding info", K(*result_sharding), K(input_shardings), K(reselected_pos));
LOG_TRACE("succeed to compute basic sharding info", K(*result_sharding), K(input_shardings));
}
return ret;
}
@ -7631,84 +7632,91 @@ int ObOptimizerUtil::get_duplicate_table_replica(const ObCandiTableLoc &phy_tabl
return ret;
}
int ObOptimizerUtil::compute_duplicate_table_replica_pos(const ObAddr &addr,
const ObIArray<ObShardingInfo*> &input_shardings,
ObIArray<int64_t> &reselected_pos)
int ObOptimizerUtil::compute_duplicate_table_sharding(const ObAddr &local_addr,
const ObAddr &selected_addr,
ObIAllocator &allocator,
ObShardingInfo &src_sharding,
ObIArray<ObAddr> &valid_addrs,
ObShardingInfo *&target_sharding)
{
int ret = OB_SUCCESS;
for (int64_t i = 0; OB_SUCC(ret) && i < input_shardings.count(); i++) {
ObShardingInfo *sharding = NULL;
int64_t replica_index = OB_INVALID_INDEX;
if (OB_ISNULL(sharding = input_shardings.at(i))) {
ObCandiTableLoc *phy_table_loc = NULL;
int64_t replica_index = OB_INVALID_INDEX;
target_sharding = NULL;
if (OB_ISNULL(target_sharding = reinterpret_cast<ObShardingInfo*>(
allocator.alloc(sizeof(ObShardingInfo))))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to allocate memory", K(ret));
} else if (OB_FALSE_IT(target_sharding = new(target_sharding) ObShardingInfo())) {
} else if (OB_FAIL(target_sharding->copy_with_part_keys(src_sharding))) {
LOG_WARN("failed to copy sharding info", K(ret));
} else if (OB_ISNULL(src_sharding.get_phy_table_location_info()) ||
OB_UNLIKELY(1 != src_sharding.get_phy_table_location_info()->get_partition_cnt())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected partition count", K(ret));
} else if (OB_FAIL(generate_duplicate_table_replicas(allocator,
src_sharding.get_phy_table_location_info(),
valid_addrs,
phy_table_loc))) {
LOG_WARN("failed to compute duplicate table location", K(ret));
} else if (OB_ISNULL(phy_table_loc)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected error", K(ret));
} else if (OB_FALSE_IT(target_sharding->set_phy_table_location_info(phy_table_loc))) {
} else if (OB_UNLIKELY(1 != target_sharding->get_phy_table_location_info()->get_partition_cnt())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected partition count", K(ret));
} else {
int64_t dup_table_pos = OB_INVALID_INDEX;
ObCandiTabletLoc &phy_part_loc =
phy_table_loc->get_phy_part_loc_info_list_for_update().at(0);
if (!phy_part_loc.is_server_in_replica(selected_addr, dup_table_pos)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected error", K(sharding), K(ret));
} else if (!sharding->get_can_reselect_replica()) {
/*do nothing*/
} else if (OB_ISNULL(sharding->get_phy_table_location_info()) ||
OB_UNLIKELY(1 != sharding->get_phy_table_location_info()->get_partition_cnt())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected error", K(ret));
LOG_WARN("no server in replica", K(selected_addr), K(ret));
} else {
share::ObLSReplicaLocation replica_loc;
ObCandiTabletLoc &phy_part_loc =
sharding->get_phy_table_location_info()->get_phy_part_loc_info_list_for_update().at(0);
if (!phy_part_loc.is_server_in_replica(addr, replica_index)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("no replica in this server", K(addr), K(ret));
} else if (OB_UNLIKELY(replica_index == OB_INVALID_INDEX)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid replica index", K(ret));
} else { /*do nothing*/ }
}
if (OB_SUCC(ret)) {
if (OB_FAIL(reselected_pos.push_back(replica_index))) {
LOG_WARN("failed to push back replica index", K(ret));
} else { /*do nothing*/ }
phy_part_loc.set_selected_replica_idx(dup_table_pos);
if (local_addr == selected_addr) {
target_sharding->set_local();
} else {
target_sharding->set_remote();
}
}
}
return ret;
}
int ObOptimizerUtil::compute_duplicate_table_sharding(const ObAddr &local_addr,
ObIAllocator &allocator,
const ObShardingInfo &src_sharding,
const int64_t reselected_pos,
bool can_reselect_replica,
ObShardingInfo *&target_sharding)
int ObOptimizerUtil::generate_duplicate_table_replicas(ObIAllocator &allocator,
const ObCandiTableLoc *source_table_loc,
ObIArray<ObAddr> &valid_addrs,
ObCandiTableLoc *&target_table_loc)
{
int ret = OB_SUCCESS;
ObCandiTableLoc *phy_table_loc = NULL;
if (OB_ISNULL(target_sharding = reinterpret_cast<ObShardingInfo*>(
allocator.alloc(sizeof(ObShardingInfo))))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to allocate memory", K(ret));
} else if (OB_ISNULL(phy_table_loc = reinterpret_cast<ObCandiTableLoc*>(
allocator.alloc(sizeof(ObCandiTableLoc))))) {
if (OB_ISNULL(source_table_loc)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected null", K(ret));
} else if (OB_UNLIKELY(1 != source_table_loc->get_partition_cnt())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected partition count", K(ret),
K(source_table_loc->get_partition_cnt()));
} else if (OB_ISNULL(target_table_loc = static_cast<ObCandiTableLoc*>(
allocator.alloc(sizeof(ObCandiTableLoc))))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to allocate memory", K(ret));
} else if (OB_FALSE_IT(target_table_loc = new(target_table_loc) ObCandiTableLoc())) {
// do nothing
} else if (OB_FAIL(target_table_loc->assign(*source_table_loc))) {
LOG_WARN("failed to assign table location", K(ret));
} else {
target_sharding = new(target_sharding) ObShardingInfo();
phy_table_loc = new(phy_table_loc) ObCandiTableLoc();
if (OB_FAIL(target_sharding->copy_with_part_keys(src_sharding))) {
LOG_WARN("failed to copy sharding info", K(ret));
} else if (OB_FAIL(phy_table_loc->assign(*src_sharding.get_phy_table_location_info()))) {
LOG_WARN("failed to assign table location", K(ret));
} else if (OB_UNLIKELY(1 != phy_table_loc->get_partition_cnt())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected partition count", K(ret));
} else {
share::ObLSReplicaLocation replica_loc;
ObCandiTabletLoc &phy_part_loc =
phy_table_loc->get_phy_part_loc_info_list_for_update().at(0);
phy_part_loc.set_selected_replica_idx(reselected_pos);
target_sharding->set_phy_table_location_info(phy_table_loc);
target_sharding->set_can_reselect_replica(can_reselect_replica);
if (OB_FAIL(phy_part_loc.get_selected_replica(replica_loc))) {
LOG_WARN("failed to get selected replica", K(ret));
} else if (replica_loc.get_server() == local_addr) {
target_sharding->set_local();
} else {
target_sharding->set_remote();
ObCandiTabletLoc &phy_part_loc =
target_table_loc->get_phy_part_loc_info_list_for_update().at(0);
ObOptTabletLoc &opt_tablet_loc = phy_part_loc.get_partition_location();
ObIArray<ObRoutePolicy::CandidateReplica> &replica_loc_list = opt_tablet_loc.get_replica_locations();
for (int64_t i = replica_loc_list.count() - 1; OB_SUCC(ret) && i >= 0; --i) {
if (ObOptimizerUtil::find_item(valid_addrs,
replica_loc_list.at(i).get_server())) {
// do nothing
} else if (OB_FAIL(replica_loc_list.remove(i))) {
LOG_WARN("failed to remove relica loc list", K(ret));
}
}
}