From 68d091760c813fb037a31f28edaf68c8dd85e062 Mon Sep 17 00:00:00 2001 From: ChangerR Date: Mon, 11 Sep 2023 06:44:20 +0000 Subject: [PATCH] fix duplicate table bug --- src/sql/optimizer/ob_join_order.cpp | 20 +-- src/sql/optimizer/ob_join_order.h | 11 -- src/sql/optimizer/ob_log_plan.cpp | 153 +++++++++++++----- src/sql/optimizer/ob_log_plan.h | 7 +- src/sql/optimizer/ob_log_set.cpp | 1 - src/sql/optimizer/ob_log_subplan_filter.cpp | 1 - src/sql/optimizer/ob_logical_operator.cpp | 44 ----- src/sql/optimizer/ob_logical_operator.h | 7 - src/sql/optimizer/ob_optimizer_util.cpp | 168 ++++++++++---------- src/sql/optimizer/ob_optimizer_util.h | 17 +- src/sql/optimizer/ob_sharding_info.h | 16 +- 11 files changed, 232 insertions(+), 213 deletions(-) diff --git a/src/sql/optimizer/ob_join_order.cpp b/src/sql/optimizer/ob_join_order.cpp index 2b05ea3128..c45040ab7a 100644 --- a/src/sql/optimizer/ob_join_order.cpp +++ b/src/sql/optimizer/ob_join_order.cpp @@ -4168,8 +4168,7 @@ int ObJoinOrder::convert_subplan_scan_sharding_info(ObLogPlan &plan, if (OB_ISNULL(input_sharding) || OB_ISNULL(plan.get_stmt()) || OB_ISNULL(child_stmt)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("get unexpected null", K(input_sharding), K(ret)); - } else if (!input_sharding->get_can_reselect_replica() && - (input_sharding->is_single() || input_sharding->is_distributed_without_partitioning())) { + } else if (input_sharding->is_single() || input_sharding->is_distributed_without_partitioning()) { output_sharding = input_sharding; } else if (OB_FAIL(ObOptimizerUtil::convert_subplan_scan_expr(expr_factory, subplan_root.get_output_equal_sets(), @@ -4225,7 +4224,6 @@ int ObJoinOrder::convert_subplan_scan_sharding_info(ObLogPlan &plan, } else if (OB_FAIL(temp_sharding->get_partition_func().assign(part_func))) { LOG_WARN("failed to assign part funcs", K(ret)); } else { - temp_sharding->set_can_reselect_replica(false); output_sharding = temp_sharding; LOG_TRACE("succeed to convert subplan scan sharding", K(*output_sharding)); } @@ -5362,8 +5360,6 @@ int JoinPath::assign(const JoinPath &other, common::ObIAllocator *allocator) is_slave_mapping_ = other.is_slave_mapping_; join_type_ = other.join_type_; need_mat_ = other.need_mat_; - left_dup_table_pos_ = other.left_dup_table_pos_; - right_dup_table_pos_ = other.right_dup_table_pos_; left_need_sort_ = other.left_need_sort_; left_prefix_pos_ = other.left_prefix_pos_; right_need_sort_ = other.right_need_sort_; @@ -5413,7 +5409,6 @@ int JoinPath::compute_join_path_sharding() } } else if (DistAlgo::DIST_BASIC_METHOD == join_dist_algo_) { ObSEArray input_shardings; - ObSEArray reselected_dup_pos; if (OB_ISNULL(left_path_->strong_sharding_) || OB_ISNULL(right_path_->strong_sharding_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("get unexpected null", K(left_path_->strong_sharding_), @@ -5425,18 +5420,9 @@ int JoinPath::compute_join_path_sharding() opt_ctx.get_local_server_addr(), input_shardings, *parent_->get_allocator(), - reselected_dup_pos, strong_sharding_, inherit_sharding_index_))) { LOG_WARN("failed to compute basic sharding info", K(ret)); - } else if (reselected_dup_pos.empty()) { - /*no duplicated table, do nothing*/ - } else if (OB_UNLIKELY(2 != reselected_dup_pos.count())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("get unexpected array count", K(reselected_dup_pos.count()), K(ret)); - } else { - left_dup_table_pos_ = reselected_dup_pos.at(0); - right_dup_table_pos_ = reselected_dup_pos.at(1); } } else if (DistAlgo::DIST_PULL_TO_LOCAL == join_dist_algo_) { strong_sharding_ = opt_ctx.get_local_sharding(); @@ -5661,6 +5647,8 @@ int JoinPath::compute_join_path_plan_type() if (OB_ISNULL(left_path_) || OB_ISNULL(right_path_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("get unexpected null", K(left_path_), K(right_path_), K(ret)); + } else if (NULL != strong_sharding_ && strong_sharding_->get_can_reselect_replica()) { + phy_plan_type_ = ObPhyPlanType::OB_PHY_PLAN_UNINITIALIZED; } else if (is_local()) { phy_plan_type_ = ObPhyPlanType::OB_PHY_PLAN_LOCAL; } else if (is_remote()) { @@ -6793,8 +6781,6 @@ void JoinPath::reuse() is_slave_mapping_ = false; join_type_ = UNKNOWN_JOIN; need_mat_ = false; - left_dup_table_pos_ = OB_INVALID_ID; - right_dup_table_pos_ = OB_INVALID_ID; left_need_sort_ = false; left_prefix_pos_ = 0; right_need_sort_ = false; diff --git a/src/sql/optimizer/ob_join_order.h b/src/sql/optimizer/ob_join_order.h index b13a84c05d..5f76b6739d 100644 --- a/src/sql/optimizer/ob_join_order.h +++ b/src/sql/optimizer/ob_join_order.h @@ -739,8 +739,6 @@ struct EstimateCostInfo { use_hybrid_hash_dm_(false), join_type_(UNKNOWN_JOIN), need_mat_(false), - left_dup_table_pos_(OB_INVALID_ID), - right_dup_table_pos_(OB_INVALID_ID), left_need_sort_(false), left_prefix_pos_(0), right_need_sort_(false), @@ -776,8 +774,6 @@ struct EstimateCostInfo { use_hybrid_hash_dm_(false), join_type_(join_type), need_mat_(need_mat), - left_dup_table_pos_(OB_INVALID_INDEX), - right_dup_table_pos_(OB_INVALID_INDEX), left_need_sort_(false), left_prefix_pos_(0), right_need_sort_(false), @@ -907,8 +903,6 @@ struct EstimateCostInfo { } return sm_type; } - inline int64_t get_left_dup_table_pos() { return left_dup_table_pos_; } - inline int64_t get_right_dup_table_pos() { return right_dup_table_pos_; } bool contain_normal_nl() const { return contain_normal_nl_; } void set_contain_normal_nl(bool contain) { contain_normal_nl_ = contain; } int check_is_contain_normal_nl(); @@ -981,8 +975,6 @@ struct EstimateCostInfo { K_(is_slave_mapping), K_(join_type), K_(need_mat), - K_(left_dup_table_pos), - K_(right_dup_table_pos), K_(left_need_sort), K_(left_prefix_pos), K_(right_need_sort), @@ -1008,9 +1000,6 @@ struct EstimateCostInfo { bool use_hybrid_hash_dm_; // if use hybrid hash distribution method for hash-hash dm ObJoinType join_type_; bool need_mat_; - // for duplicated table - int64_t left_dup_table_pos_; - int64_t right_dup_table_pos_; // for merge joins only bool left_need_sort_; int64_t left_prefix_pos_; diff --git a/src/sql/optimizer/ob_log_plan.cpp b/src/sql/optimizer/ob_log_plan.cpp index 2869818ff1..6e3298e639 100644 --- a/src/sql/optimizer/ob_log_plan.cpp +++ b/src/sql/optimizer/ob_log_plan.cpp @@ -4408,10 +4408,7 @@ int ObLogPlan::allocate_join_path(JoinPath *join_path, join_op->set_can_use_batch_nlj(join_path->can_use_batch_nlj_); join_op->set_inherit_sharding_index(join_path->inherit_sharding_index_); join_op->set_join_path(join_path); - if (OB_FAIL(join_op->get_dup_table_pos().push_back(join_path->get_left_dup_table_pos())) || - OB_FAIL(join_op->get_dup_table_pos().push_back(join_path->get_right_dup_table_pos()))) { - LOG_WARN("failed to push back to array", K(ret)); - } else if (OB_FAIL(join_op->set_merge_directions(join_path->merge_directions_))) { + if (OB_FAIL(join_op->set_merge_directions(join_path->merge_directions_))) { LOG_WARN("failed to set merge directions", K(ret)); } else if (OB_FAIL(join_op->set_nl_params(static_cast(right_path)->nl_params_))) { LOG_WARN("failed to set nl params", K(ret)); @@ -5896,6 +5893,9 @@ int ObLogPlan::candi_allocate_root_exchange() if (OB_ISNULL(best_candidates.at(i).plan_tree_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("get unexpected null", K(ret)); + } else if (ObPhyPlanType::OB_PHY_PLAN_UNINITIALIZED == best_candidates.at(i).plan_tree_->get_phy_plan_type() && + OB_FAIL(compute_duplicate_table_replicas(best_candidates.at(i).plan_tree_))) { + LOG_WARN("failed to compute duplicate table plan type", K(ret)); } else if (best_candidates.at(i).plan_tree_->get_phy_plan_type() == ObPhyPlanType::OB_PHY_PLAN_REMOTE) { exch_info.is_remote_ = true; if (OB_FAIL(allocate_exchange_as_top(best_candidates.at(i).plan_tree_, exch_info))) { @@ -11543,7 +11543,9 @@ int ObLogPlan::do_post_plan_processing() LOG_WARN("failed to adjust parent-child relationship", K(ret)); } else if (OB_FAIL(update_re_est_cost(root))) { LOG_WARN("failed to re est cost", K(ret)); - } else if (OB_FAIL(set_duplicated_table_location(root, OB_INVALID_INDEX))) { + } else if (OB_FAIL(choose_duplicate_table_replica(root, + get_optimizer_context().get_local_server_addr(), + true))) { LOG_WARN("failed to set duplicated table location", K(ret)); } else if (OB_FAIL(set_advisor_table_id(root))) { LOG_WARN("failed to set advise table id from duplicate table", K(ret)); @@ -11743,7 +11745,9 @@ int ObLogPlan::update_re_est_cost(ObLogicalOperator *op) return ret; } -int ObLogPlan::set_duplicated_table_location(ObLogicalOperator *op, int64_t dup_table_pos) +int ObLogPlan::choose_duplicate_table_replica(ObLogicalOperator *op, + const ObAddr &addr, + bool is_root) { int ret = OB_SUCCESS; ObLogicalOperator *child = NULL; @@ -11756,52 +11760,74 @@ int ObLogPlan::set_duplicated_table_location(ObLogicalOperator *op, int64_t dup_ } else if (is_stack_overflow) { ret = OB_SIZE_OVERFLOW; LOG_WARN("too deep recursive", K(ret)); - } else if (log_op_def::LOG_TABLE_SCAN == op->get_type() && NULL != op->get_strong_sharding() && + } else if (log_op_def::LOG_TABLE_SCAN == op->get_type() && + NULL != op->get_strong_sharding() && op->get_strong_sharding()->get_can_reselect_replica() && - OB_INVALID_INDEX != dup_table_pos) { + !is_root) { ObLogTableScan *table_scan = static_cast(op); ObCandiTableLoc &phy_loc = table_scan->get_table_partition_info()->get_phy_tbl_location_info_for_update(); for (int64_t i = 0; OB_SUCC(ret) && i < phy_loc.get_partition_cnt(); ++i) { + int64_t dup_table_pos = OB_INVALID_INDEX; ObCandiTabletLoc &phy_part_loc = phy_loc.get_phy_part_loc_info_list_for_update().at(i); - phy_part_loc.set_selected_replica_idx(dup_table_pos); - } - } else if (OB_FAIL(op->adjust_dup_table_replica_pos(dup_table_pos))) { - LOG_WARN("failed to adjust dup table replica pos", K(ret)); - } else { - ObSEArray dup_table_pos_list; - if (op->get_dup_table_pos().empty()) { - for (int64_t i = 0; OB_SUCC(ret) && i < op->get_num_of_child(); i++) { - if (OB_FAIL(dup_table_pos_list.push_back(dup_table_pos))) { - LOG_WARN("failed to push back into array", K(ret)); - } else { /*do nothing*/ } + if (!phy_part_loc.is_server_in_replica(addr, dup_table_pos)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("no server in replica", K(addr), K(table_scan->get_table_id()), K(ret)); + } else { + phy_part_loc.set_selected_replica_idx(dup_table_pos); } - } else { - for (int64_t i = 0; OB_SUCC(ret) && i < op->get_dup_table_pos().count(); i++) { - if (OB_INVALID_INDEX == op->get_dup_table_pos().at(i)) { - ret = dup_table_pos_list.push_back(dup_table_pos); + } + } else if (log_op_def::LOG_EXCHANGE == op->get_type()) { + if (OB_ISNULL(child = op->get_child(ObLogicalOperator::first_child))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get unexpected null", K(ret)); + } else if (OB_FAIL(SMART_CALL(choose_duplicate_table_replica(child, + addr, + true)))) { + LOG_WARN("failed to set duplicated table location", K(ret)); + } else { /*do nothing*/ } + } else { + ObShardingInfo *sharding = op->get_strong_sharding(); + ObAddr adjust_addr; + bool can_reselect_replica = NULL != sharding && sharding->get_can_reselect_replica(); + if (is_root && can_reselect_replica) { + if (OB_ISNULL(sharding->get_phy_table_location_info()) || + OB_UNLIKELY(1 != sharding->get_phy_table_location_info()->get_partition_cnt())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get unexpected error", K(ret), KPC(sharding->get_phy_table_location_info())); + } else { + share::ObLSReplicaLocation replica_loc; + const ObCandiTabletLocIArray &phy_partition_loc = + sharding->get_phy_table_location_info()->get_phy_part_loc_info_list(); + if (OB_FAIL(phy_partition_loc.at(0).get_selected_replica(replica_loc))) { + LOG_WARN("fail to get selected replica", K(ret), K(phy_partition_loc.at(0))); + } else if (!replica_loc.is_valid()) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("replica location is invalid", K(ret)); } else { - ret = dup_table_pos_list.push_back(op->get_dup_table_pos().at(i)); + adjust_addr = replica_loc.get_server(); } } - } - if (OB_FAIL(ret)) { - /*do nothing*/ - } else if (OB_UNLIKELY(dup_table_pos_list.count() != op->get_num_of_child())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("get unexpected null", K(dup_table_pos_list.count()), - K(op->get_num_of_child()), K(ret)); - } else { - for (int64_t i = 0; OB_SUCC(ret) && i < op->get_num_of_child(); i++) { - if (OB_ISNULL(child = op->get_child(i))) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("get unexpected null", K(ret)); - } else if (OB_FAIL(SMART_CALL(set_duplicated_table_location(child, - dup_table_pos_list.at(i))))) { - LOG_WARN("failed to set duplicated table location", K(ret)); - } else { /*do nothing*/ } + } else if (can_reselect_replica) { + adjust_addr = addr; + } else if (NULL != sharding && sharding->is_remote()) { + if (OB_FAIL(sharding->get_remote_addr(adjust_addr))) { + LOG_WARN("failed to get remote addr", K(ret)); } + } else { + adjust_addr = get_optimizer_context().get_local_server_addr(); + } + for (int64_t i = 0; OB_SUCC(ret) && i < op->get_num_of_child(); i++) { + if (OB_ISNULL(child = op->get_child(i))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get unexpected null", K(ret)); + } else if (OB_FAIL(SMART_CALL(choose_duplicate_table_replica(child, + adjust_addr, + false)))) { + LOG_WARN("failed to set duplicated table location", K(ret), + K(can_reselect_replica), K(adjust_addr)); + } else { /*do nothing*/ } } } return ret; @@ -14075,3 +14101,50 @@ int ObLogPlan::deduce_redundant_join_conds_with_equal_set( } return ret; } + +int ObLogPlan::compute_duplicate_table_replicas(ObLogicalOperator *op) +{ + int ret = OB_SUCCESS; + ObShardingInfo *sharding = NULL; + ObSEArray valid_addrs; + ObAddr basic_addr; + if (OB_ISNULL(op)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get unexpected null", K(ret)); + } else if (NULL == (sharding = op->get_strong_sharding()) || + !sharding->get_can_reselect_replica()) { + // do nothing + } else { + if (OB_ISNULL(sharding->get_phy_table_location_info()) || + OB_UNLIKELY(1 != sharding->get_phy_table_location_info()->get_partition_cnt())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get unexpected null", K(ret)); + } else if (OB_FAIL(ObOptimizerUtil::get_duplicate_table_replica(*sharding->get_phy_table_location_info(), + valid_addrs))) { + LOG_WARN("failed to get duplicated table replica", K(ret)); + } else if (OB_UNLIKELY(valid_addrs.empty())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get unexpected valid addrs", K(ret)); + } else if (ObOptimizerUtil::find_item(valid_addrs, + get_optimizer_context().get_local_server_addr())) { + sharding->set_local(); + basic_addr = get_optimizer_context().get_local_server_addr(); + } else { + sharding->set_remote(); + basic_addr = valid_addrs.at(0); + } + if (OB_SUCC(ret)) { + int64_t dup_table_pos = OB_INVALID_INDEX; + ObCandiTableLoc *phy_loc =sharding->get_phy_table_location_info(); + ObCandiTabletLoc &phy_part_loc = + phy_loc->get_phy_part_loc_info_list_for_update().at(0); + if (!phy_part_loc.is_server_in_replica(basic_addr, dup_table_pos)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("no server in replica", K(basic_addr), K(ret)); + } else { + phy_part_loc.set_selected_replica_idx(dup_table_pos); + } + } + } + return ret; +} \ No newline at end of file diff --git a/src/sql/optimizer/ob_log_plan.h b/src/sql/optimizer/ob_log_plan.h index f85d663983..b2600cd137 100644 --- a/src/sql/optimizer/ob_log_plan.h +++ b/src/sql/optimizer/ob_log_plan.h @@ -264,8 +264,9 @@ public: int check_das_dppr_filter_exprs(const ObIArray &input_filters, bool &has_dppr_filters); - int set_duplicated_table_location(ObLogicalOperator *op, - int64_t dup_table_pos); + int choose_duplicate_table_replica(ObLogicalOperator *op, + const ObAddr &addr, + bool is_root); /** * Get allocator used in sql compilation * @@ -1733,6 +1734,8 @@ private: // member functions common::ObIArray &popular_values) const; bool has_depend_json_table(const ObRelIds& table_ids); int adjust_expr_properties_for_external_table(ObRawExpr *col_expr, ObRawExpr *&expr) const; + + int compute_duplicate_table_replicas(ObLogicalOperator *op); public: const ObLogPlanHint &get_log_plan_hint() const { return log_plan_hint_; } bool has_join_order_hint() { return !log_plan_hint_.join_order_.leading_tables_.is_empty(); } diff --git a/src/sql/optimizer/ob_log_set.cpp b/src/sql/optimizer/ob_log_set.cpp index eae5e5e3aa..a0affc156d 100644 --- a/src/sql/optimizer/ob_log_set.cpp +++ b/src/sql/optimizer/ob_log_set.cpp @@ -303,7 +303,6 @@ int ObLogSet::compute_sharding_info() get_plan()->get_optimizer_context().get_local_server_addr(), get_child_list(), get_plan()->get_allocator(), - dup_table_pos_, strong_sharding_, inherit_sharding_index_))) { LOG_WARN("failed to compute basic sharding info", K(ret)); diff --git a/src/sql/optimizer/ob_log_subplan_filter.cpp b/src/sql/optimizer/ob_log_subplan_filter.cpp index d0f1a03eda..5c06cb836c 100644 --- a/src/sql/optimizer/ob_log_subplan_filter.cpp +++ b/src/sql/optimizer/ob_log_subplan_filter.cpp @@ -382,7 +382,6 @@ int ObLogSubPlanFilter::compute_sharding_info() get_plan()->get_optimizer_context().get_local_server_addr(), get_child_list(), get_plan()->get_allocator(), - dup_table_pos_, strong_sharding_, inherit_sharding_index_))) { LOG_WARN("failed to compute basic sharding info", K(ret)); diff --git a/src/sql/optimizer/ob_logical_operator.cpp b/src/sql/optimizer/ob_logical_operator.cpp index ba1732fd4f..4691163ff4 100644 --- a/src/sql/optimizer/ob_logical_operator.cpp +++ b/src/sql/optimizer/ob_logical_operator.cpp @@ -356,7 +356,6 @@ ObLogicalOperator::ObLogicalOperator(ObLogPlan &plan) contain_pw_merge_op_(false), contain_das_op_(false), contain_match_all_fake_cte_(false), - dup_table_pos_(), strong_sharding_(NULL), weak_sharding_(), is_pipelined_plan_(false), @@ -5304,46 +5303,3 @@ int ObLogicalOperator::collect_batch_exec_param(void* ctx, } return ret; } - -int ObLogicalOperator::adjust_dup_table_replica_pos(int64_t dup_table_pos) -{ - int ret = OB_SUCCESS; - ObSEArray input_shardings; - if (NULL == strong_sharding_ || - OB_INVALID_INDEX == dup_table_pos) { - // do noting - } else if (dup_table_pos_.empty()) { - // do nothing - } else if (OB_ISNULL(strong_sharding_->get_phy_table_location_info()) || - OB_UNLIKELY(1 != strong_sharding_->get_phy_table_location_info()->get_partition_cnt())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("get unexpected error", K(ret)); - } else { - const ObCandiTabletLoc &phy_part_loc = - strong_sharding_->get_phy_table_location_info()->get_phy_part_loc_info_list_for_update().at(0); - const ObIArray &replicas = - phy_part_loc.get_partition_location().get_replica_locations(); - if (OB_UNLIKELY(dup_table_pos < 0 || dup_table_pos >= replicas.count())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("get unexpected error", K(dup_table_pos), K(replicas.count()), K(ret)); - } - for (int64_t i = 0; OB_SUCC(ret) && i < get_num_of_child(); ++i) { - if (OB_ISNULL(get_child(i))) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("get unexpected null", K(ret)); - } else if (OB_FAIL(input_shardings.push_back(get_child(i)->get_strong_sharding()))) { - LOG_WARN("failed to push back input shardings", K(ret)); - } - } - if (OB_SUCC(ret)) { - const ObAddr &addr = replicas.at(dup_table_pos).get_server(); - dup_table_pos_.reuse(); - if (OB_FAIL(ObOptimizerUtil::compute_duplicate_table_replica_pos(addr, - input_shardings, - dup_table_pos_))) { - LOG_WARN("failed to compute duplicate table replica pos", K(ret)); - } - } - } - return ret; -} \ No newline at end of file diff --git a/src/sql/optimizer/ob_logical_operator.h b/src/sql/optimizer/ob_logical_operator.h index 5d4bea0862..a1365bcb01 100644 --- a/src/sql/optimizer/ob_logical_operator.h +++ b/src/sql/optimizer/ob_logical_operator.h @@ -1473,11 +1473,6 @@ public: inline int set_weak_sharding(ObIArray &weak_sharding) { return weak_sharding_.assign(weak_sharding); } - inline ObIArray &get_dup_table_pos() { return dup_table_pos_; } - inline int set_dup_table_pos(ObIArray &dup_table_pos) - { - return dup_table_pos_.assign(dup_table_pos); - } inline bool get_contains_fake_cte() const { return contain_fake_cte_; } inline void set_contains_fake_cte(bool contain_fake_cte) { @@ -1666,7 +1661,6 @@ public: // 生成 partition id 表达式 int generate_pseudo_partition_id_expr(ObOpPseudoColumnRawExpr *&expr); - int adjust_dup_table_replica_pos(int64_t dup_table_pos); public: ObSEArray child_; ObSEArray equal_param_constraints_; @@ -1852,7 +1846,6 @@ protected: bool contain_pw_merge_op_; bool contain_das_op_; bool contain_match_all_fake_cte_; - common::ObSEArray dup_table_pos_; // for duplicated table ObShardingInfo *strong_sharding_; common::ObSEArray weak_sharding_; bool is_pipelined_plan_; diff --git a/src/sql/optimizer/ob_optimizer_util.cpp b/src/sql/optimizer/ob_optimizer_util.cpp index 86846d444e..8dcbb2fbf1 100644 --- a/src/sql/optimizer/ob_optimizer_util.cpp +++ b/src/sql/optimizer/ob_optimizer_util.cpp @@ -7464,7 +7464,6 @@ int ObOptimizerUtil::check_basic_sharding_info(const ObAddr &local_addr, int ObOptimizerUtil::compute_basic_sharding_info(const ObAddr &local_addr, const ObIArray &child_ops, ObIAllocator &allocator, - ObIArray &reselected_pos, ObShardingInfo *&result_sharding, int64_t &inherit_sharding_index) { @@ -7485,7 +7484,6 @@ int ObOptimizerUtil::compute_basic_sharding_info(const ObAddr &local_addr, } else if (OB_FAIL(compute_basic_sharding_info(local_addr, sharding_infos, allocator, - reselected_pos, result_sharding, inherit_sharding_index))) { LOG_WARN("failed to compute basic sharding info", K(ret)); @@ -7496,7 +7494,6 @@ int ObOptimizerUtil::compute_basic_sharding_info(const ObAddr &local_addr, int ObOptimizerUtil::compute_basic_sharding_info(const ObAddr &local_addr, const ObIArray &input_shardings, ObIAllocator &allocator, - ObIArray &reselected_pos, ObShardingInfo *&result_sharding, int64_t &inherit_sharding_index) { @@ -7509,7 +7506,7 @@ int ObOptimizerUtil::compute_basic_sharding_info(const ObAddr &local_addr, } else { ObAddr basic_addr; bool has_duplicated = false; - bool can_reselect_replica = true; + bool is_replicas_same = true; ObShardingInfo *sharding = NULL; ObSEArray valid_addrs; ObSEArray intersect_addrs; @@ -7534,9 +7531,9 @@ int ObOptimizerUtil::compute_basic_sharding_info(const ObAddr &local_addr, } else { if (OB_FAIL(ObOptimizerUtil::intersect(valid_addrs, intersect_addrs, candidate_addrs))) { LOG_WARN("failed to intersect addrs", K(ret)); - } else if (OB_FALSE_IT(can_reselect_replica = can_reselect_replica && - valid_addrs.count() == candidate_addrs.count() && - valid_addrs.count() == intersect_addrs.count())) { + } else if (OB_FALSE_IT(is_replicas_same = is_replicas_same && + valid_addrs.count() == candidate_addrs.count() && + valid_addrs.count() == intersect_addrs.count())) { // do nothing } else if (OB_FAIL(intersect_addrs.assign(candidate_addrs))) { LOG_WARN("failed to assign addrs", K(ret)); @@ -7577,14 +7574,18 @@ int ObOptimizerUtil::compute_basic_sharding_info(const ObAddr &local_addr, /*do nothing*/ } else if (!has_duplicated) { /*do nothing*/ - } else if (OB_FAIL(compute_duplicate_table_replica_pos(basic_addr, input_shardings, reselected_pos))) { - LOG_WARN("failed to set duplicated table replica", K(ret)); } else if (NULL != result_sharding) { /*do nothing*/ - } else if (OB_UNLIKELY(input_shardings.count() != reselected_pos.count())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("get unexpected array count", K(input_shardings.count()), - K(reselected_pos.count()), K(ret)); + } else if (is_replicas_same) { + for (int64_t i = 0; OB_SUCC(ret) && NULL == result_sharding && i < input_shardings.count(); i++) { + if (OB_ISNULL(sharding = input_shardings.at(i))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get unexpected null", K(ret)); + } else if (sharding->get_can_reselect_replica()) { + result_sharding = sharding; + inherit_sharding_index = i; + } + } } else { for (int64_t i = 0; OB_SUCC(ret) && NULL == result_sharding && i < input_shardings.count(); i++) { if (OB_ISNULL(sharding = input_shardings.at(i))) { @@ -7592,10 +7593,10 @@ int ObOptimizerUtil::compute_basic_sharding_info(const ObAddr &local_addr, LOG_WARN("get unexpected null", K(ret)); } else if (sharding->get_can_reselect_replica() && OB_FAIL(ObOptimizerUtil::compute_duplicate_table_sharding(local_addr, + basic_addr, allocator, *input_shardings.at(i), - reselected_pos.at(i), - can_reselect_replica, + intersect_addrs, result_sharding))) { LOG_WARN("failed to compute duplicate table sharding", K(ret)); } else if (NULL != result_sharding) { @@ -7606,7 +7607,7 @@ int ObOptimizerUtil::compute_basic_sharding_info(const ObAddr &local_addr, } } if (OB_SUCC(ret) && NULL != result_sharding) { - LOG_TRACE("succeed to compute basic sharding info", K(*result_sharding), K(input_shardings), K(reselected_pos)); + LOG_TRACE("succeed to compute basic sharding info", K(*result_sharding), K(input_shardings)); } return ret; } @@ -7631,84 +7632,91 @@ int ObOptimizerUtil::get_duplicate_table_replica(const ObCandiTableLoc &phy_tabl return ret; } -int ObOptimizerUtil::compute_duplicate_table_replica_pos(const ObAddr &addr, - const ObIArray &input_shardings, - ObIArray &reselected_pos) +int ObOptimizerUtil::compute_duplicate_table_sharding(const ObAddr &local_addr, + const ObAddr &selected_addr, + ObIAllocator &allocator, + ObShardingInfo &src_sharding, + ObIArray &valid_addrs, + ObShardingInfo *&target_sharding) { int ret = OB_SUCCESS; - for (int64_t i = 0; OB_SUCC(ret) && i < input_shardings.count(); i++) { - ObShardingInfo *sharding = NULL; - int64_t replica_index = OB_INVALID_INDEX; - if (OB_ISNULL(sharding = input_shardings.at(i))) { + ObCandiTableLoc *phy_table_loc = NULL; + int64_t replica_index = OB_INVALID_INDEX; + target_sharding = NULL; + if (OB_ISNULL(target_sharding = reinterpret_cast( + allocator.alloc(sizeof(ObShardingInfo))))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("failed to allocate memory", K(ret)); + } else if (OB_FALSE_IT(target_sharding = new(target_sharding) ObShardingInfo())) { + } else if (OB_FAIL(target_sharding->copy_with_part_keys(src_sharding))) { + LOG_WARN("failed to copy sharding info", K(ret)); + } else if (OB_ISNULL(src_sharding.get_phy_table_location_info()) || + OB_UNLIKELY(1 != src_sharding.get_phy_table_location_info()->get_partition_cnt())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get unexpected partition count", K(ret)); + } else if (OB_FAIL(generate_duplicate_table_replicas(allocator, + src_sharding.get_phy_table_location_info(), + valid_addrs, + phy_table_loc))) { + LOG_WARN("failed to compute duplicate table location", K(ret)); + } else if (OB_ISNULL(phy_table_loc)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get unexpected error", K(ret)); + } else if (OB_FALSE_IT(target_sharding->set_phy_table_location_info(phy_table_loc))) { + } else if (OB_UNLIKELY(1 != target_sharding->get_phy_table_location_info()->get_partition_cnt())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get unexpected partition count", K(ret)); + } else { + int64_t dup_table_pos = OB_INVALID_INDEX; + ObCandiTabletLoc &phy_part_loc = + phy_table_loc->get_phy_part_loc_info_list_for_update().at(0); + if (!phy_part_loc.is_server_in_replica(selected_addr, dup_table_pos)) { ret = OB_ERR_UNEXPECTED; - LOG_WARN("get unexpected error", K(sharding), K(ret)); - } else if (!sharding->get_can_reselect_replica()) { - /*do nothing*/ - } else if (OB_ISNULL(sharding->get_phy_table_location_info()) || - OB_UNLIKELY(1 != sharding->get_phy_table_location_info()->get_partition_cnt())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("get unexpected error", K(ret)); + LOG_WARN("no server in replica", K(selected_addr), K(ret)); } else { - share::ObLSReplicaLocation replica_loc; - ObCandiTabletLoc &phy_part_loc = - sharding->get_phy_table_location_info()->get_phy_part_loc_info_list_for_update().at(0); - if (!phy_part_loc.is_server_in_replica(addr, replica_index)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("no replica in this server", K(addr), K(ret)); - } else if (OB_UNLIKELY(replica_index == OB_INVALID_INDEX)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("invalid replica index", K(ret)); - } else { /*do nothing*/ } - } - if (OB_SUCC(ret)) { - if (OB_FAIL(reselected_pos.push_back(replica_index))) { - LOG_WARN("failed to push back replica index", K(ret)); - } else { /*do nothing*/ } + phy_part_loc.set_selected_replica_idx(dup_table_pos); + if (local_addr == selected_addr) { + target_sharding->set_local(); + } else { + target_sharding->set_remote(); + } } } return ret; } -int ObOptimizerUtil::compute_duplicate_table_sharding(const ObAddr &local_addr, - ObIAllocator &allocator, - const ObShardingInfo &src_sharding, - const int64_t reselected_pos, - bool can_reselect_replica, - ObShardingInfo *&target_sharding) +int ObOptimizerUtil::generate_duplicate_table_replicas(ObIAllocator &allocator, + const ObCandiTableLoc *source_table_loc, + ObIArray &valid_addrs, + ObCandiTableLoc *&target_table_loc) { int ret = OB_SUCCESS; - ObCandiTableLoc *phy_table_loc = NULL; - if (OB_ISNULL(target_sharding = reinterpret_cast( - allocator.alloc(sizeof(ObShardingInfo))))) { - ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_WARN("failed to allocate memory", K(ret)); - } else if (OB_ISNULL(phy_table_loc = reinterpret_cast( - allocator.alloc(sizeof(ObCandiTableLoc))))) { + if (OB_ISNULL(source_table_loc)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get unexpected null", K(ret)); + } else if (OB_UNLIKELY(1 != source_table_loc->get_partition_cnt())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get unexpected partition count", K(ret), + K(source_table_loc->get_partition_cnt())); + } else if (OB_ISNULL(target_table_loc = static_cast( + allocator.alloc(sizeof(ObCandiTableLoc))))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("failed to allocate memory", K(ret)); + } else if (OB_FALSE_IT(target_table_loc = new(target_table_loc) ObCandiTableLoc())) { + // do nothing + } else if (OB_FAIL(target_table_loc->assign(*source_table_loc))) { + LOG_WARN("failed to assign table location", K(ret)); } else { - target_sharding = new(target_sharding) ObShardingInfo(); - phy_table_loc = new(phy_table_loc) ObCandiTableLoc(); - if (OB_FAIL(target_sharding->copy_with_part_keys(src_sharding))) { - LOG_WARN("failed to copy sharding info", K(ret)); - } else if (OB_FAIL(phy_table_loc->assign(*src_sharding.get_phy_table_location_info()))) { - LOG_WARN("failed to assign table location", K(ret)); - } else if (OB_UNLIKELY(1 != phy_table_loc->get_partition_cnt())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("get unexpected partition count", K(ret)); - } else { - share::ObLSReplicaLocation replica_loc; - ObCandiTabletLoc &phy_part_loc = - phy_table_loc->get_phy_part_loc_info_list_for_update().at(0); - phy_part_loc.set_selected_replica_idx(reselected_pos); - target_sharding->set_phy_table_location_info(phy_table_loc); - target_sharding->set_can_reselect_replica(can_reselect_replica); - if (OB_FAIL(phy_part_loc.get_selected_replica(replica_loc))) { - LOG_WARN("failed to get selected replica", K(ret)); - } else if (replica_loc.get_server() == local_addr) { - target_sharding->set_local(); - } else { - target_sharding->set_remote(); + ObCandiTabletLoc &phy_part_loc = + target_table_loc->get_phy_part_loc_info_list_for_update().at(0); + ObOptTabletLoc &opt_tablet_loc = phy_part_loc.get_partition_location(); + ObIArray &replica_loc_list = opt_tablet_loc.get_replica_locations(); + for (int64_t i = replica_loc_list.count() - 1; OB_SUCC(ret) && i >= 0; --i) { + if (ObOptimizerUtil::find_item(valid_addrs, + replica_loc_list.at(i).get_server())) { + // do nothing + } else if (OB_FAIL(replica_loc_list.remove(i))) { + LOG_WARN("failed to remove relica loc list", K(ret)); } } } diff --git a/src/sql/optimizer/ob_optimizer_util.h b/src/sql/optimizer/ob_optimizer_util.h index 070399eed0..0bb44d6e03 100644 --- a/src/sql/optimizer/ob_optimizer_util.h +++ b/src/sql/optimizer/ob_optimizer_util.h @@ -1316,31 +1316,30 @@ public: static int compute_basic_sharding_info(const ObAddr &local_addr, const ObIArray &child_ops, ObIAllocator &allocator, - ObIArray &reselected_pos, ObShardingInfo *&result_sharding, int64_t &inherit_sharding_index); static int compute_basic_sharding_info(const ObAddr &local_addr, const ObIArray &input_shardings, ObIAllocator &allocator, - ObIArray &reselected_pos, ObShardingInfo *&result_sharding, int64_t &inherit_sharding_index); static int get_duplicate_table_replica(const ObCandiTableLoc &phy_table_loc, ObIArray &valid_addrs); - static int compute_duplicate_table_replica_pos(const ObAddr &addr, - const ObIArray &input_shardings, - ObIArray &reselected_pos); - static int compute_duplicate_table_sharding(const ObAddr &local_addr, + const ObAddr &selected_addr, ObIAllocator &allocator, - const ObShardingInfo &src_sharding, - const int64_t reselected_pos, - bool can_reselect_replica, + ObShardingInfo &src_sharding, + ObIArray &valid_addrs, ObShardingInfo *&target_sharding); + static int generate_duplicate_table_replicas(ObIAllocator &allocator, + const ObCandiTableLoc *source_table_loc, + ObIArray &valid_addrs, + ObCandiTableLoc *&target_table_loc); + static int64_t get_join_style_parallel(const int64_t left_parallel, const int64_t right_parallel, const DistAlgo join_dist_algo, diff --git a/src/sql/optimizer/ob_sharding_info.h b/src/sql/optimizer/ob_sharding_info.h index 9d087c722c..0922e8a963 100644 --- a/src/sql/optimizer/ob_sharding_info.h +++ b/src/sql/optimizer/ob_sharding_info.h @@ -275,7 +275,21 @@ public: return OB_TBL_LOCATION_UNINITIALIZED == location_type_; } void set_can_reselect_replica(const bool b) { can_reselect_replica_ = b; } - bool get_can_reselect_replica() const { return can_reselect_replica_; } + inline bool get_can_reselect_replica() const + { + bool ret = false; + if (!can_reselect_replica_) { + ret = false; + } else if (NULL == phy_table_location_info_ || + (1 != phy_table_location_info_->get_partition_cnt())) { + ret = false; + } else { + ret = phy_table_location_info_->get_phy_part_loc_info_list().at(0) + .get_partition_location() + .get_replica_locations().count() > 0; + } + return ret; + } inline bool is_distributed_without_table_location() const { return is_distributed() && NULL == phy_table_location_info_;