diff --git a/src/sql/optimizer/ob_join_order.cpp b/src/sql/optimizer/ob_join_order.cpp index 4eb963fdd8..c696ad9c5b 100644 --- a/src/sql/optimizer/ob_join_order.cpp +++ b/src/sql/optimizer/ob_join_order.cpp @@ -4510,6 +4510,8 @@ int JoinPath::compute_join_path_sharding() reselected_dup_pos, strong_sharding_))) { LOG_WARN("failed to compute basic sharding info", K(ret)); + } else if (OB_FALSE_IT(inherit_sharding_index_ = 0)) { + //do nothing } else if (reselected_dup_pos.empty()) { /*no duplicated table, do nothing*/ } else if (OB_UNLIKELY(2 != reselected_dup_pos.count())) { @@ -4529,11 +4531,13 @@ int JoinPath::compute_join_path_sharding() strong_sharding_ = opt_ctx.get_local_sharding(); } else if (DistAlgo::DIST_NONE_ALL == join_dist_algo_) { strong_sharding_ = left_path_->strong_sharding_; + inherit_sharding_index_ = 0; if (OB_FAIL(append(weak_sharding_, left_path_->weak_sharding_))) { LOG_WARN("failed to append weak sharding", K(ret)); } else { /*do nothing*/ } } else if (DistAlgo::DIST_ALL_NONE == join_dist_algo_) { strong_sharding_ = right_path_->strong_sharding_; + inherit_sharding_index_ = 1; if (OB_FAIL(append(weak_sharding_, right_path_->weak_sharding_))) { LOG_WARN("failed to append weak sharding", K(ret)); } else { /*do nothing*/ } @@ -4541,6 +4545,7 @@ int JoinPath::compute_join_path_sharding() DistAlgo::DIST_EXT_PARTITION_WISE == join_dist_algo_) { if (LEFT_OUTER_JOIN == join_type_) { strong_sharding_ = left_path_->strong_sharding_; + inherit_sharding_index_ = 0; if (OB_FAIL(append(weak_sharding_, left_path_->weak_sharding_)) || OB_FAIL(append(weak_sharding_, right_path_->weak_sharding_))) { LOG_WARN("failed to append weak sharding", K(ret)); @@ -4550,6 +4555,7 @@ int JoinPath::compute_join_path_sharding() } else { /*do nothing*/ } } else if (RIGHT_OUTER_JOIN == join_type_) { strong_sharding_ = right_path_->strong_sharding_; + inherit_sharding_index_ = 1; if (OB_FAIL(append(weak_sharding_, left_path_->weak_sharding_)) || OB_FAIL(append(weak_sharding_, right_path_->weak_sharding_))) { LOG_WARN("failed to append weak sharding", K(ret)); @@ -4572,11 +4578,13 @@ int JoinPath::compute_join_path_sharding() LEFT_ANTI_JOIN == join_type_ || INNER_JOIN == join_type_) { strong_sharding_ = left_path_->strong_sharding_; + inherit_sharding_index_ = 0; if (OB_FAIL(append(weak_sharding_, left_path_->weak_sharding_))) { LOG_WARN("failed to append weak sharding", K(ret)); } else { /*do nothing*/ } } else { strong_sharding_ = right_path_->strong_sharding_; + inherit_sharding_index_ = 1; if (OB_FAIL(append(weak_sharding_, right_path_->weak_sharding_))) { LOG_WARN("failed to append weak sharding", K(ret)); } else { /*do nothing*/ } @@ -4592,6 +4600,7 @@ int JoinPath::compute_join_path_sharding() } else { /*do nothing*/ } } else { strong_sharding_ = right_path_->strong_sharding_; + inherit_sharding_index_ = 1; if (OB_FAIL(append(weak_sharding_, right_path_->weak_sharding_))) { LOG_WARN("failed to append weak sharding", K(ret)); } else { /*do nothing*/ } @@ -4604,6 +4613,7 @@ int JoinPath::compute_join_path_sharding() strong_sharding_ = opt_ctx.get_distributed_sharding(); } else { strong_sharding_ = right_path_->strong_sharding_; + inherit_sharding_index_ = 1; if (OB_FAIL(append(weak_sharding_, right_path_->weak_sharding_))) { LOG_WARN("failed to append weak sharding", K(ret)); } else { /*do nothing*/ } @@ -4619,6 +4629,7 @@ int JoinPath::compute_join_path_sharding() } else { /*do nothing*/ } } else { strong_sharding_ = left_path_->strong_sharding_; + inherit_sharding_index_ = 0; if (OB_FAIL(append(weak_sharding_, left_path_->weak_sharding_))) { LOG_WARN("failed to append weak sharding", K(ret)); } else { /*do nothing*/ } @@ -4630,6 +4641,7 @@ int JoinPath::compute_join_path_sharding() strong_sharding_ = opt_ctx.get_distributed_sharding(); } else { strong_sharding_ = left_path_->strong_sharding_; + inherit_sharding_index_ = 0; if (OB_FAIL(append(weak_sharding_, left_path_->weak_sharding_))) { LOG_WARN("failed to append weak sharding", K(ret)); } else { /*do nothing*/ } @@ -4641,6 +4653,7 @@ int JoinPath::compute_join_path_sharding() } else if (DistAlgo::DIST_BC2HOST_NONE == join_dist_algo_) { if (right_path_->is_single()) { strong_sharding_ = right_path_->strong_sharding_; + inherit_sharding_index_ = 1; } else { strong_sharding_ = opt_ctx.get_distributed_sharding(); } diff --git a/src/sql/optimizer/ob_join_order.h b/src/sql/optimizer/ob_join_order.h index 6f452db371..e01ff32c64 100644 --- a/src/sql/optimizer/ob_join_order.h +++ b/src/sql/optimizer/ob_join_order.h @@ -710,7 +710,8 @@ struct EstimateCostInfo { contain_normal_nl_(false), can_use_batch_nlj_(false), is_naaj_(false), - is_sna_(false) + is_sna_(false), + inherit_sharding_index_(-1) { } virtual ~JoinPath() {} @@ -894,7 +895,8 @@ struct EstimateCostInfo { K_(contain_normal_nl), K_(can_use_batch_nlj), K_(is_naaj), - K_(is_sna)); + K_(is_sna), + K_(inherit_sharding_index)); public: const Path *left_path_; const Path *right_path_; @@ -926,6 +928,8 @@ struct EstimateCostInfo { bool can_use_batch_nlj_; bool is_naaj_; // is null aware anti join bool is_sna_; // is single null aware anti join + //Used to indicate which child node the current sharding inherits from + int64_t inherit_sharding_index_; private: DISALLOW_COPY_AND_ASSIGN(JoinPath); }; diff --git a/src/sql/optimizer/ob_log_plan.cpp b/src/sql/optimizer/ob_log_plan.cpp index 12c8666c47..3415746f52 100644 --- a/src/sql/optimizer/ob_log_plan.cpp +++ b/src/sql/optimizer/ob_log_plan.cpp @@ -4471,6 +4471,7 @@ int ObLogPlan::allocate_join_path(JoinPath *join_path, join_op->set_slave_mapping_type(join_path->get_slave_mapping_type()); join_op->set_is_partition_wise(join_path->is_partition_wise()); join_op->set_can_use_batch_nlj(join_path->can_use_batch_nlj_); + join_op->set_inherit_sharding_index(join_path->inherit_sharding_index_); join_op->set_join_path(join_path); if (OB_FAIL(join_op->get_dup_table_pos().push_back(join_path->get_left_dup_table_pos())) || OB_FAIL(join_op->get_dup_table_pos().push_back(join_path->get_right_dup_table_pos()))) { @@ -5079,28 +5080,23 @@ int ObLogPlan::compute_repartition_distribution_info(const EqualSets &equal_sets return ret; } -int ObLogPlan::find_table_scan_with_sharding_info(const ObLogicalOperator &op, - const ObShardingInfo *sharding, - const ObLogTableScan *&tsc) +int ObLogPlan::find_base_sharding_table_scan(const ObLogicalOperator &op, + const ObLogTableScan *&tsc) { int ret = OB_SUCCESS; - tsc = NULL; - if (op.get_strong_sharding() != sharding) { - // return null tsc. - } else if (LOG_TABLE_SCAN == op.get_type()) { + if (LOG_TABLE_SCAN == op.get_type()) { tsc = static_cast(&op); - } else { - const bool is_subplan_scan = LOG_SUBPLAN_SCAN == op.get_type(); - for (int64_t i = 0; i < op.get_num_of_child() && NULL == tsc && OB_SUCC(ret); i++) { - if (OB_ISNULL(op.get_child(i))) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("get unexpected null", K(ret)); - } else if (OB_FAIL((SMART_CALL(find_table_scan_with_sharding_info(*op.get_child(i), - is_subplan_scan ? op.get_child(i)->get_strong_sharding() : sharding, - tsc))))) { - LOG_WARN("find table scan failed", K(ret)); - } - } + } else if (-1 == op.get_inherit_sharding_index()) { + // return null tsc. + } else if (OB_UNLIKELY(op.get_inherit_sharding_index() >= op.get_child_list().count())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get unexpected sharding src index", K(op.get_inherit_sharding_index()), K(ret)); + } else if (OB_ISNULL(op.get_child(op.get_inherit_sharding_index()))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get unexpected null", K(ret)); + } else if (OB_FAIL(SMART_CALL(find_base_sharding_table_scan(*op.get_child(op.get_inherit_sharding_index()), + tsc)))) { + LOG_WARN("failed to find base sharding table scan", K(ret)); } return ret; } @@ -5113,7 +5109,7 @@ int ObLogPlan::get_repartition_table_info(const ObLogicalOperator &op, int ret = OB_SUCCESS; const ObLogicalOperator *cur_op = &op; const ObLogTableScan *table_scan = NULL; - if (OB_FAIL(find_table_scan_with_sharding_info(op, op.get_strong_sharding(), table_scan))) { + if (OB_FAIL(find_base_sharding_table_scan(op, table_scan))) { LOG_WARN("find table scan failed", K(ret)); } else if (OB_ISNULL(table_scan)) { ret = OB_ERR_UNEXPECTED; @@ -5361,6 +5357,7 @@ int ObLogPlan::allocate_subquery_path(SubQueryPath *subpath, } else { subplan_scan->set_subquery_id(subpath->subquery_id_); subplan_scan->set_child(ObLogicalOperator::first_child, root); + subplan_scan->set_inherit_sharding_index(0); subplan_scan->get_subquery_name().assign_ptr(table_item->table_name_.ptr(), table_item->table_name_.length()); if (OB_FAIL(append(subplan_scan->get_filter_exprs(), subpath->filter_))) { diff --git a/src/sql/optimizer/ob_log_plan.h b/src/sql/optimizer/ob_log_plan.h index 0e48054086..d759792daa 100644 --- a/src/sql/optimizer/ob_log_plan.h +++ b/src/sql/optimizer/ob_log_plan.h @@ -636,9 +636,10 @@ public: const ObIArray &subquery_ops, const ObIArray ¶ms, ObExchangeInfo &exch_info); - int find_table_scan_with_sharding_info(const ObLogicalOperator &op, - const ObShardingInfo *sharding, - const ObLogTableScan *&tsc); + + int find_base_sharding_table_scan(const ObLogicalOperator &op, + const ObLogTableScan *&tsc); + int get_repartition_table_info(const ObLogicalOperator &op, ObString &table_name, uint64_t &ref_table_id, diff --git a/src/sql/optimizer/ob_log_set.cpp b/src/sql/optimizer/ob_log_set.cpp index 9f37853eda..b48542bae8 100644 --- a/src/sql/optimizer/ob_log_set.cpp +++ b/src/sql/optimizer/ob_log_set.cpp @@ -309,7 +309,9 @@ int ObLogSet::compute_sharding_info() dup_table_pos_, strong_sharding_))) { LOG_WARN("failed to compute basic sharding info", K(ret)); - } else { /*do nothing*/ } + } else { + inherit_sharding_index_ = 0; + } } else if (DistAlgo::DIST_PULL_TO_LOCAL == set_dist_algo_) { strong_sharding_ = get_plan()->get_optimizer_context().get_local_sharding(); } else if (DistAlgo::DIST_SET_RANDOM == set_dist_algo_) { @@ -325,15 +327,19 @@ int ObLogSet::compute_sharding_info() } else if (DistAlgo::DIST_NONE_HASH == set_dist_algo_) { is_partition_wise_ = false; strong_sharding_ = first_child->get_strong_sharding(); + inherit_sharding_index_ = ObLogicalOperator::first_child; } else if (DistAlgo::DIST_HASH_NONE == set_dist_algo_) { is_partition_wise_ = false; strong_sharding_ = second_child->get_strong_sharding(); + inherit_sharding_index_ = ObLogicalOperator::second_child; } else if (DistAlgo::DIST_NONE_ALL == set_dist_algo_) { is_partition_wise_ = false; strong_sharding_ = first_child->get_strong_sharding(); + inherit_sharding_index_ = ObLogicalOperator::first_child; } else if (DistAlgo::DIST_ALL_NONE == set_dist_algo_) { is_partition_wise_ = false; strong_sharding_ = second_child->get_strong_sharding(); + inherit_sharding_index_ = ObLogicalOperator::second_child; } else if (OB_FAIL(ObLogicalOperator::compute_sharding_info())) { LOG_WARN("failed to compute sharding info", K(ret)); } else { /*do nothing*/ } diff --git a/src/sql/optimizer/ob_log_subplan_filter.cpp b/src/sql/optimizer/ob_log_subplan_filter.cpp index 3541bb256c..79da31c87b 100644 --- a/src/sql/optimizer/ob_log_subplan_filter.cpp +++ b/src/sql/optimizer/ob_log_subplan_filter.cpp @@ -403,7 +403,9 @@ int ObLogSubPlanFilter::compute_sharding_info() dup_table_pos_, strong_sharding_))) { LOG_WARN("failed to compute basic sharding info", K(ret)); - } else { /*do nothing*/ } + } else { + inherit_sharding_index_ = ObLogicalOperator::first_child; + } } else if (DistAlgo::DIST_PULL_TO_LOCAL == dist_algo_) { strong_sharding_ = get_plan()->get_optimizer_context().get_local_sharding(); } else if (DistAlgo::DIST_NONE_ALL == dist_algo_) { @@ -416,6 +418,7 @@ int ObLogSubPlanFilter::compute_sharding_info() LOG_WARN("failed to assign weak sharding", K(ret)); } else { strong_sharding_ = get_child(0)->get_strong_sharding(); + inherit_sharding_index_ = 0; } } else if (DistAlgo::DIST_PARTITION_WISE == dist_algo_) { for (int64_t i = 0; OB_SUCC(ret) && i < get_num_of_child(); i++) { @@ -429,6 +432,7 @@ int ObLogSubPlanFilter::compute_sharding_info() LOG_WARN("failed to assign weak sharding", K(ret)); } else { strong_sharding_ = get_child(i)->get_strong_sharding(); + inherit_sharding_index_ = i; break; } } else { /*do nothing*/} @@ -442,10 +446,12 @@ int ObLogSubPlanFilter::compute_sharding_info() OB_ISNULL(sharding = get_child(1)->get_sharding())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("get unexpected null", K(sharding), K(ret)); - } else if (OB_FAIL(weak_sharding_.assign(get_child(1)->get_weak_sharding()))) { - LOG_WARN("failed to assign weak sharding", K(ret)); + } else if (OB_FAIL(get_repart_sharding_info(get_child(1), + strong_sharding_, + weak_sharding_))) { + LOG_WARN("failed to rebuild sharding info", K(ret)); } else { - strong_sharding_ = get_child(1)->get_strong_sharding(); + inherit_sharding_index_ = 1; } } else { ret = OB_ERR_UNEXPECTED; @@ -616,112 +622,6 @@ int ObLogSubPlanFilter::replace_nested_subquery_exprs( return ret; } -int ObLogSubPlanFilter::get_equal_set_conditions(ObIArray &equal_conds) -{ - int ret = OB_SUCCESS; - ObLogicalOperator *right_child = NULL; - ObRawExprFactory *expr_factory = NULL; - ObSQLSessionInfo *session_info = NULL; - ObSEArray left_keys; - ObSEArray right_keys; - ObSEArray null_safe_info; - - if (OB_ISNULL(get_plan()) || - OB_ISNULL(session_info = get_plan()->get_optimizer_context().get_session_info()) || - OB_ISNULL(expr_factory = &get_plan()->get_optimizer_context().get_expr_factory())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("get unexpected null", K(get_plan()), K(session_info), K(ret)); - } else { - for (int64_t i = 1; OB_SUCC(ret) && i < get_num_of_child(); ++i) { - left_keys.reuse(); - right_keys.reuse(); - null_safe_info.reuse(); - if (OB_ISNULL(right_child = get_child(i))) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("child is null", K(ret), K(right_child)); - } else if (OB_FAIL(get_plan()->get_subplan_filter_equal_keys(right_child, - exec_params_, - left_keys, - right_keys, - null_safe_info))) { - LOG_WARN("failed to get equal set conditions", K(ret)); - } else if (OB_UNLIKELY(left_keys.count() != right_keys.count()) || - OB_UNLIKELY(left_keys.count() != null_safe_info.count())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("left keys should equal right keys", K(ret)); - } else { - for (int64_t j = 0; OB_SUCC(ret) && j < left_keys.count(); ++j) { - ObRawExpr* lexpr; - ObRawExpr* rexpr; - ObRawExpr* equal_expr = NULL; - if (OB_ISNULL(lexpr = left_keys.at(j)) || - OB_ISNULL(rexpr = right_keys.at(j))) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("get unexpected null", K(ret)); - } else if (OB_FAIL(ObRawExprUtils::create_equal_expr(*expr_factory, - session_info, - lexpr, - rexpr, - equal_expr))) { - LOG_WARN("failed to create equal expr", K(ret)); - } else if (OB_FAIL(equal_conds.push_back(equal_expr))) { - LOG_WARN("failed to push back equal conds", K(ret)); - } - } - } - } - } - return ret; -} - -int ObLogSubPlanFilter::compute_equal_set() -{ - int ret = OB_SUCCESS; - ObLogicalOperator *child = NULL; - EqualSets *ordering_esets = NULL; - EqualSets input_equal_sets; - ObSEArray equal_set_conditions; - if (OB_ISNULL(my_plan_) || OB_UNLIKELY(get_num_of_child() < 0)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("operator is invalid", K(ret), K(get_num_of_child()), K(my_plan_)); - } else if (OB_UNLIKELY(get_num_of_child() == 0)) { - // do nothing - } else if (OB_ISNULL(child = get_child(0))) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("child is null", K(ret), K(child)); - } else if (OB_FAIL(get_equal_set_conditions(equal_set_conditions))) { - LOG_WARN("failed to get equal set conditions", K(ret)); - } else if (append(equal_set_conditions, filter_exprs_)) { - LOG_WARN("failed to append", K(ret)); - } else if (equal_set_conditions.empty()) { - // inherit equal sets from the first child directly - set_output_equal_sets(&child->get_output_equal_sets()); - } else { - for (int64_t i = 0; OB_SUCC(ret) && i < get_num_of_child(); ++i) { - if (OB_ISNULL(child)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("child is null", K(ret), K(child)); - } else if (append(input_equal_sets, child->get_output_equal_sets())) { - LOG_WARN("failed to init input equal sets", K(ret)); - } - } - if (OB_FAIL(ret)) { - } else if (OB_ISNULL(ordering_esets = get_plan()->create_equal_sets())) { - ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_WARN("failed to create equal sets", K(ret)); - } else if (OB_FAIL(ObEqualAnalysis::compute_equal_set( - &my_plan_->get_allocator(), - equal_set_conditions, - input_equal_sets, - *ordering_esets))) { - LOG_WARN("failed to compute ordering output equal set", K(ret)); - } else { - set_output_equal_sets(ordering_esets); - } - } - return ret; -} - int ObLogSubPlanFilter::allocate_startup_expr_post(int64_t child_idx) { int ret = OB_SUCCESS; @@ -772,3 +672,117 @@ int ObLogSubPlanFilter::allocate_startup_expr_post(int64_t child_idx) } return ret; } + +int ObLogSubPlanFilter::get_repart_sharding_info(ObLogicalOperator* child_op, + ObShardingInfo *&strong_sharding, + ObIArray &weak_sharding) +{ + int ret = OB_SUCCESS; + ObLogicalOperator *child = NULL; + ObSEArray src_keys; + ObSEArray target_keys; + ObSEArray null_safe_info; + EqualSets input_esets; + + for (int64_t i = 1; OB_SUCC(ret) && i < get_num_of_child(); i++) { + if (OB_ISNULL(child = get_child(i))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get unexpected null", K(ret)); + } else if (OB_FAIL(append(input_esets, child->get_output_equal_sets()))) { + LOG_WARN("failed to append input equal sets", K(ret)); + } else { /*do nothing*/ } + } + + if (OB_FAIL(ret)) { + } else if (OB_ISNULL(get_plan()) || + OB_ISNULL(child_op)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get unexpected null", K(ret)); + } else if (OB_FAIL(get_plan()->get_subplan_filter_equal_keys(child_op, + exec_params_, + src_keys, + target_keys, + null_safe_info))) { + LOG_WARN("failed to get repartition keys", K(ret)); + } else if (OB_UNLIKELY(NULL == child_op->get_strong_sharding())) { + strong_sharding = NULL; + } else if (OB_FAIL(rebuild_repart_sharding_info(child_op->get_strong_sharding(), + src_keys, + target_keys, + input_esets, + strong_sharding))) { + LOG_WARN("failed to rebuild repart sharding info", K(ret)); + } + + if (OB_SUCC(ret)) { + weak_sharding.reuse(); + } + + for (int64_t i = 0; OB_SUCC(ret) && i < child_op->get_weak_sharding().count(); ++i) { + ObShardingInfo* out_sharding = NULL; + if (OB_FAIL(rebuild_repart_sharding_info(child_op->get_weak_sharding().at(i), + src_keys, + target_keys, + input_esets, + out_sharding))) { + LOG_WARN("failed to rebuild repart sharding info", K(ret)); + } else if (OB_FAIL(weak_sharding.push_back(out_sharding))) { + LOG_WARN("failed to push back sharding", K(ret)); + } + } + return ret; +} + +int ObLogSubPlanFilter::rebuild_repart_sharding_info(const ObShardingInfo *input_sharding, + ObIArray &src_keys, + ObIArray &target_keys, + EqualSets &input_esets, + ObShardingInfo *&out_sharding) +{ + int ret = OB_SUCCESS; + out_sharding = NULL; + ObSEArray repart_exprs; + ObSEArray repart_sub_exprs; + ObSEArray repart_func_exprs; + if (OB_ISNULL(get_plan()) || + OB_ISNULL(input_sharding)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get unexpected null", K(ret)); + } else if (OB_ISNULL(out_sharding = reinterpret_cast( + get_plan()->get_allocator().alloc(sizeof(ObShardingInfo))))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("failed to allocate memory", K(ret)); + } else if (OB_FALSE_IT(out_sharding = new(out_sharding) ObShardingInfo())) { + } else if (OB_FAIL(out_sharding->copy_without_part_keys(*input_sharding))) { + LOG_WARN("failed to assign sharding info", K(ret)); + } else { + ObRawExprCopier copier(get_plan()->get_optimizer_context().get_expr_factory()); + if (OB_FAIL(get_plan()->get_repartition_keys(input_esets, + src_keys, + target_keys, + input_sharding->get_partition_keys(), + repart_exprs))) { + LOG_WARN("failed to get repartition keys", K(ret)); + } else if (OB_FAIL(get_plan()->get_repartition_keys(input_esets, + src_keys, + target_keys, + input_sharding->get_sub_partition_keys(), + repart_sub_exprs))) { + LOG_WARN("failed to get sub repartition keys", K(ret)); + } else if (OB_FAIL(copier.add_replaced_expr(input_sharding->get_partition_keys(), + repart_exprs))) { + LOG_WARN("failed to add replace pair", K(ret)); + } else if (OB_FAIL(copier.add_replaced_expr(input_sharding->get_sub_partition_keys(), + repart_sub_exprs))) { + LOG_WARN("failed to add replace pair", K(ret)); + } else if (OB_FAIL(copier.copy_on_replace(input_sharding->get_partition_func(), + repart_func_exprs))) { + LOG_WARN("failed to copy partition function", K(ret)); + } else if (OB_FAIL(out_sharding->get_partition_keys().assign(repart_exprs)) || + OB_FAIL(out_sharding->get_sub_partition_keys().assign(repart_sub_exprs)) || + OB_FAIL(out_sharding->get_partition_func().assign(repart_func_exprs))) { + LOG_WARN("failed to assign partition keys", K(ret)); + } + } + return ret; +} \ No newline at end of file diff --git a/src/sql/optimizer/ob_log_subplan_filter.h b/src/sql/optimizer/ob_log_subplan_filter.h index 37a5d64468..2b7b85ea5f 100644 --- a/src/sql/optimizer/ob_log_subplan_filter.h +++ b/src/sql/optimizer/ob_log_subplan_filter.h @@ -118,14 +118,20 @@ public: virtual int get_plan_item_info(PlanText &plan_text, ObSqlPlanItem &plan_item) override; - virtual int compute_equal_set() override; - - int get_equal_set_conditions(ObIArray &equal_conds); - common::ObIArray &get_above_pushdown_left_params() { return above_pushdown_left_params_; } common::ObIArray &get_above_pushdown_right_params() { return above_pushdown_right_params_; } + int get_repart_sharding_info(ObLogicalOperator* child_op, + ObShardingInfo *&strong_sharding, + ObIArray &weak_sharding); + + int rebuild_repart_sharding_info(const ObShardingInfo *input_sharding, + ObIArray &src_keys, + ObIArray &target_keys, + EqualSets &input_esets, + ObShardingInfo *&out_sharding); + private: int extract_exist_style_subquery_exprs(ObRawExpr *expr, ObIArray &exist_style_exprs); diff --git a/src/sql/optimizer/ob_log_temp_table_insert.cpp b/src/sql/optimizer/ob_log_temp_table_insert.cpp index cc2d882cc6..9b11323c20 100644 --- a/src/sql/optimizer/ob_log_temp_table_insert.cpp +++ b/src/sql/optimizer/ob_log_temp_table_insert.cpp @@ -50,6 +50,7 @@ int ObLogTempTableInsert::compute_sharding_info() strong_sharding_ = get_plan()->get_optimizer_context().get_local_sharding(); } else if (child->is_single()) { strong_sharding_ = child->get_sharding(); + inherit_sharding_index_ = ObLogicalOperator::first_child; } else { strong_sharding_ = get_plan()->get_optimizer_context().get_distributed_sharding(); } diff --git a/src/sql/optimizer/ob_logical_operator.cpp b/src/sql/optimizer/ob_logical_operator.cpp index 305db0bfb7..82bcda82dd 100644 --- a/src/sql/optimizer/ob_logical_operator.cpp +++ b/src/sql/optimizer/ob_logical_operator.cpp @@ -367,6 +367,7 @@ ObLogicalOperator::ObLogicalOperator(ObLogPlan &plan) server_cnt_(1), need_late_materialization_(false), op_exprs_(), + inherit_sharding_index_(-1), allocated_osg_(false) { @@ -757,6 +758,7 @@ int ObLogicalOperator::compute_sharding_info() LOG_WARN("failed to assign sharding info", K(ret)); } else { strong_sharding_ = child->get_strong_sharding(); + inherit_sharding_index_ = ObLogicalOperator::first_child; } return ret; } diff --git a/src/sql/optimizer/ob_logical_operator.h b/src/sql/optimizer/ob_logical_operator.h index 376c6370b5..c2f1a6ae8c 100644 --- a/src/sql/optimizer/ob_logical_operator.h +++ b/src/sql/optimizer/ob_logical_operator.h @@ -1455,6 +1455,12 @@ public: contain_das_op_ = contain_das_op; } + inline int64_t get_inherit_sharding_index() const { return inherit_sharding_index_; } + inline void set_inherit_sharding_index(int64_t inherit_sharding_index) + { + inherit_sharding_index_ = inherit_sharding_index; + } + inline bool get_allocated_osg() const { return allocated_osg_; } inline void set_allocated_osg(bool allocated_osg) { @@ -1795,6 +1801,8 @@ protected: bool need_late_materialization_; // all non_const exprs for this op, generated by allocate_expr_pre and used by project pruning ObSEArray op_exprs_; + // Used to indicate which child node the current sharding inherits from + int64_t inherit_sharding_index_; // wether has allocated a osg_gather. bool allocated_osg_; };