diff --git a/src/sql/optimizer/ob_log_exchange.cpp b/src/sql/optimizer/ob_log_exchange.cpp index e78ef881d9..531fb2a30b 100644 --- a/src/sql/optimizer/ob_log_exchange.cpp +++ b/src/sql/optimizer/ob_log_exchange.cpp @@ -1048,3 +1048,44 @@ bool ObLogExchange::support_rich_format_vectorize() const { LOG_TRACE("[VEC2.0 PX] support_rich_format_vectorize", K(res), K(dist_method_), K(tmp_ret)); return res; } + +int ObLogExchange::open_px_resource_analyze(OPEN_PX_RESOURCE_ANALYZE_DECLARE_ARG) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!is_px_coord())) { + // do nothing. + } else if (OB_ISNULL(px_info_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("px info is null", K(ret), K(get_op_id())); + } else if (OB_FAIL(px_res_analyzer.recursive_walk_through_px_tree(*px_info_))) { + LOG_WARN("walk through px tree failed", K(ret)); + } else if (OB_FAIL(px_res_analyzer.append_px(OPEN_PX_RESOURCE_ANALYZE_ARG, *px_info_))) { + LOG_WARN("append px failed", K(ret)); + } else { + LOG_TRACE("[PxResAnaly] px coord open_px_resource_analyze", K(get_op_id()), + KPC(px_info_), K(append_map), K(cur_parallel_thread_count), K(cur_parallel_group_count), + K(max_parallel_thread_count), K(max_parallel_group_count)); + } + return ret; +} + +int ObLogExchange::close_px_resource_analyze(CLOSE_PX_RESOURCE_ANALYZE_DECLARE_ARG) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!is_px_coord())) { + // do nothing. + } else if (OB_ISNULL(px_info_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("px info is null", K(ret)); + } else if (OB_FAIL(px_res_analyzer.remove_px(CLOSE_PX_RESOURCE_ANALYZE_ARG, *px_info_))) { + LOG_WARN("remove px failed", K(ret)); + } else { + if (append_map) { + // each operator should be open and close exactly once with append_map = true, so reset px_info_. + px_info_ = NULL; + } + LOG_TRACE("[PxResAnaly] px coord close_px_resource_analyze", K(get_op_id()), KPC(px_info_), + K(append_map), K(cur_parallel_thread_count), K(cur_parallel_group_count)); + } + return ret; +} \ No newline at end of file diff --git a/src/sql/optimizer/ob_log_exchange.h b/src/sql/optimizer/ob_log_exchange.h index 78a167e357..09b7d98065 100644 --- a/src/sql/optimizer/ob_log_exchange.h +++ b/src/sql/optimizer/ob_log_exchange.h @@ -15,6 +15,7 @@ #include "lib/allocator/page_arena.h" #include "sql/optimizer/ob_logical_operator.h" #include "sql/engine/px/ob_px_basic_info.h" + namespace oceanbase { namespace sql @@ -59,7 +60,8 @@ public: need_null_aware_shuffle_(false), is_old_unblock_mode_(true), sample_type_(NOT_INIT_SAMPLE_TYPE), - in_server_cnt_(0) + in_server_cnt_(0), + px_info_(NULL) { repartition_table_id_ = 0; } @@ -188,6 +190,10 @@ public: inline void set_in_server_cnt(int64_t in_server_cnt) { in_server_cnt_ = in_server_cnt; } inline int64_t get_in_server_cnt() { return in_server_cnt_; } bool support_rich_format_vectorize() const; + virtual int open_px_resource_analyze(OPEN_PX_RESOURCE_ANALYZE_DECLARE_ARG) override; + virtual int close_px_resource_analyze(CLOSE_PX_RESOURCE_ANALYZE_DECLARE_ARG) override; + void set_px_info(ObPxResourceAnalyzer::PxInfo *px_info) { px_info_ = px_info; } + ObPxResourceAnalyzer::PxInfo *get_px_info() { return px_info_; } private: int prepare_px_pruning_param(ObLogicalOperator *op, int64_t &count, common::ObIArray &stmts, common::ObIArray &drop_expr_idxs); @@ -264,6 +270,7 @@ private: ObPxSampleType sample_type_; // -end pkey range/range int64_t in_server_cnt_; // for producer, need use exchange in server cnt to compute cost + ObPxResourceAnalyzer::PxInfo *px_info_; DISALLOW_COPY_AND_ASSIGN(ObLogExchange); }; } // end of namespace sql diff --git a/src/sql/optimizer/ob_log_plan.cpp b/src/sql/optimizer/ob_log_plan.cpp index a4c6f2d9bd..aa296a7caa 100644 --- a/src/sql/optimizer/ob_log_plan.cpp +++ b/src/sql/optimizer/ob_log_plan.cpp @@ -12666,7 +12666,7 @@ int ObLogPlan::calc_plan_resource() get_optimizer_context().get_minimal_worker_map()))) { LOG_WARN("fail analyze px stmt thread group reservation count", K(ret)); } else { - LOG_TRACE("max parallel thread group count", + LOG_TRACE("[PxResAnaly]max parallel thread group count", K(max_parallel_thread_count), K(max_parallel_group_count)); get_optimizer_context().set_expected_worker_count(max_parallel_thread_count); get_optimizer_context().set_minimal_worker_count(max_parallel_group_count); diff --git a/src/sql/optimizer/ob_log_subplan_filter.cpp b/src/sql/optimizer/ob_log_subplan_filter.cpp index 90988433a5..460b9d7565 100644 --- a/src/sql/optimizer/ob_log_subplan_filter.cpp +++ b/src/sql/optimizer/ob_log_subplan_filter.cpp @@ -917,3 +917,52 @@ int ObLogSubPlanFilter::get_sub_qb_names(ObIArray &sub_qb_names) } return ret; } + +int ObLogSubPlanFilter::open_px_resource_analyze(OPEN_PX_RESOURCE_ANALYZE_DECLARE_ARG) +{ + int ret = OB_SUCCESS; + ObLogicalOperator *child = NULL; + // prepare onetime exprs first. + for (int64_t i = 0; i < get_num_of_child() && OB_SUCC(ret); i++) { + if (!get_onetime_idxs().has_member(i)) { + // do nothing if it's not onetime expr + } else if (OB_ISNULL(child = get_child(i))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("child op is null", K(ret)); + } else if (OB_FAIL(SMART_CALL(child->open_px_resource_analyze(OPEN_PX_RESOURCE_ANALYZE_ARG)))) { + LOG_WARN("open px resource analyze failed", K(ret)); + } else if (OB_FAIL(SMART_CALL(child->close_px_resource_analyze(CLOSE_PX_RESOURCE_ANALYZE_ARG)))) { + LOG_WARN("open px resource analyze failed", K(ret)); + } + } + // then schedule all other children + for (int64_t i = 0; i < get_num_of_child() && OB_SUCC(ret); i++) { + if (get_onetime_idxs().has_member(i)) { + // do nothing if it's onetime expr + } else if (OB_ISNULL(child = get_child(i))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("child op is null", K(ret)); + } else if (OB_FAIL(SMART_CALL(child->open_px_resource_analyze(OPEN_PX_RESOURCE_ANALYZE_ARG)))) { + LOG_WARN("open px resource analyze failed", K(ret), K(i)); + } + } + return ret; +} + +int ObLogSubPlanFilter::close_px_resource_analyze(CLOSE_PX_RESOURCE_ANALYZE_DECLARE_ARG) +{ + int ret = OB_SUCCESS; + ObLogicalOperator *child = NULL; + // close all non-onetime-expr children + for (int64_t i = 0; i < get_num_of_child() && OB_SUCC(ret); i++) { + if (get_onetime_idxs().has_member(i)) { + // do nothing if it's onetime expr + } else if (OB_ISNULL(child = get_child(i))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("child op is null", K(ret)); + } else if (OB_FAIL(SMART_CALL(child->close_px_resource_analyze(CLOSE_PX_RESOURCE_ANALYZE_ARG)))) { + LOG_WARN("open px resource analyze failed", K(ret)); + } + } + return ret; +} diff --git a/src/sql/optimizer/ob_log_subplan_filter.h b/src/sql/optimizer/ob_log_subplan_filter.h index 3f6cd1d440..984fe10351 100644 --- a/src/sql/optimizer/ob_log_subplan_filter.h +++ b/src/sql/optimizer/ob_log_subplan_filter.h @@ -133,6 +133,8 @@ public: virtual int compute_op_parallel_and_server_info() override; virtual int print_outline_data(PlanText &plan_text) override; virtual int print_used_hint(PlanText &plan_text) override; + virtual int open_px_resource_analyze(OPEN_PX_RESOURCE_ANALYZE_DECLARE_ARG) override; + virtual int close_px_resource_analyze(CLOSE_PX_RESOURCE_ANALYZE_DECLARE_ARG) override; private: int extract_exist_style_subquery_exprs(ObRawExpr *expr, ObIArray &exist_style_exprs); diff --git a/src/sql/optimizer/ob_log_temp_table_transformation.h b/src/sql/optimizer/ob_log_temp_table_transformation.h index 07b8778a33..03eab9f454 100644 --- a/src/sql/optimizer/ob_log_temp_table_transformation.h +++ b/src/sql/optimizer/ob_log_temp_table_transformation.h @@ -30,7 +30,8 @@ public: virtual int compute_fd_item_set() override; virtual int est_cost() override; virtual int est_width() override; - virtual bool is_block_op() const override { return true; } + virtual bool is_block_op() const override { return false; } + virtual bool is_block_input(const int64_t child_idx) const override { return child_idx != get_num_of_child() - 1; } virtual int compute_op_parallel_and_server_info() override; virtual int do_re_est_cost(EstimateCostInfo ¶m, double &card, double &op_cost, double &cost) override; int get_temp_table_exprs(ObIArray &set_exprs) const; diff --git a/src/sql/optimizer/ob_logical_operator.cpp b/src/sql/optimizer/ob_logical_operator.cpp index b9d90ba3fc..a1e5bc763e 100644 --- a/src/sql/optimizer/ob_logical_operator.cpp +++ b/src/sql/optimizer/ob_logical_operator.cpp @@ -433,7 +433,9 @@ ObLogicalOperator::ObLogicalOperator(ObLogPlan &plan) need_late_materialization_(false), op_exprs_(), inherit_sharding_index_(-1), - need_osg_merge_(false) + need_osg_merge_(false), + max_px_thread_branch_(OB_INVALID_INDEX), + max_px_group_branch_(OB_INVALID_INDEX) { } @@ -5961,3 +5963,191 @@ int ObLogicalOperator::alloc_nodes_above(AllocOpContext& ctx, const uint64_t &fl } return ret; } + +int ObLogicalOperator::open_px_resource_analyze(OPEN_PX_RESOURCE_ANALYZE_DECLARE_ARG) +{ + int ret = OB_SUCCESS; + ObLogicalOperator *child = NULL; + for (int64_t i = 0; i < get_num_of_child() && OB_SUCC(ret); i++) { + if (OB_ISNULL(get_child(i))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("child op is null", K(ret)); + } + } + if (OB_FAIL(ret)) { + } else if (is_block_op()) { + // block operator is usually single-child, so it doesn't matter whether consume child 1by1 actually. + if (is_consume_child_1by1()) { + // open and close child one by one. + for (int64_t i = 0; i < get_num_of_child() && OB_SUCC(ret); i++) { + child = get_child(i); + if (OB_FAIL(SMART_CALL(child->open_px_resource_analyze(OPEN_PX_RESOURCE_ANALYZE_ARG)))) { + LOG_WARN("open px resource analyze failed", K(ret)); + } else if (OB_FAIL(SMART_CALL(child->close_px_resource_analyze(CLOSE_PX_RESOURCE_ANALYZE_ARG)))) { + LOG_WARN("close px resource analyze failed", K(ret)); + } + } + } else { + // open all then close all + for (int64_t i = 0; i < get_num_of_child() && OB_SUCC(ret); i++) { + if (OB_FAIL(SMART_CALL(get_child(i)->open_px_resource_analyze(OPEN_PX_RESOURCE_ANALYZE_ARG)))) { + LOG_WARN("open px resource analyze failed", K(ret)); + } + } + for (int64_t i = 0; i < get_num_of_child() && OB_SUCC(ret); i++) { + if (OB_FAIL(SMART_CALL(get_child(i)->close_px_resource_analyze(CLOSE_PX_RESOURCE_ANALYZE_ARG)))) { + LOG_WARN("close px resource analyze failed", K(ret)); + } + } + } + } else if (is_consume_child_1by1()) { + // open and close all block input. for example: first child of HASH JOIN, HASH SET(except UNION) + int64_t child_idx = 0; + for (; child_idx < get_num_of_child() && is_block_input(child_idx) && OB_SUCC(ret); child_idx++) { + child = get_child(child_idx); + if (OB_FAIL(SMART_CALL(child->open_px_resource_analyze(OPEN_PX_RESOURCE_ANALYZE_ARG)))) { + LOG_WARN("open px resource analyze failed", K(ret)); + } else if (OB_FAIL(SMART_CALL(child->close_px_resource_analyze(CLOSE_PX_RESOURCE_ANALYZE_ARG)))) { + LOG_WARN("close px resource analyze failed", K(ret)); + } + } + if (OB_FAIL(ret)) { + } else if (OB_FAIL(find_max_px_resource_child(OPEN_PX_RESOURCE_ANALYZE_ARG, child_idx))) { + LOG_WARN("find max px resource child failed", K(ret)); + } else { + for (; child_idx < get_num_of_child() && OB_SUCC(ret); child_idx++) { + if (child_idx == max_px_thread_branch_ || child_idx == max_px_group_branch_) { + // skip, open later + } else if (OB_FAIL(SMART_CALL(get_child(child_idx)->open_px_resource_analyze( + OPEN_PX_RESOURCE_ANALYZE_ARG)))) { + LOG_WARN("child open px resource analyze failed", K(ret)); + } else if (OB_FAIL(SMART_CALL(get_child(child_idx)->close_px_resource_analyze( + CLOSE_PX_RESOURCE_ANALYZE_ARG)))) { + LOG_WARN("child close px resource analyze failed", K(ret)); + } + } + if (OB_FAIL(ret)) { + } else if (OB_UNLIKELY(OB_INVALID_INDEX == max_px_thread_branch_ || + OB_INVALID_INDEX == max_px_group_branch_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("child with max parallel thread/group not found", K(ret)); + } else if (max_px_thread_branch_ >= get_num_of_child()) { + // all inputs are block, do nothing. + } else if (OB_FAIL(SMART_CALL(get_child(max_px_thread_branch_)->open_px_resource_analyze( + OPEN_PX_RESOURCE_ANALYZE_ARG)))) { + LOG_WARN("child open px resource analyze failed", K(ret)); + // open both child with max thread and child with max group. may be inaccurate, by design. + } else if (max_px_group_branch_ != max_px_thread_branch_ && + OB_FAIL(SMART_CALL(get_child(max_px_group_branch_)->open_px_resource_analyze( + OPEN_PX_RESOURCE_ANALYZE_ARG)))) { + LOG_WARN("child open px resource analyze failed", K(ret)); + } + } + } else { + // non block op and not consume child one by one, open all children. example: nlj, merge union distinct. + for (int64_t i = 0; i < get_num_of_child() && OB_SUCC(ret); i++) { + if (OB_FAIL(SMART_CALL(get_child(i)->open_px_resource_analyze(OPEN_PX_RESOURCE_ANALYZE_ARG)))) { + LOG_WARN("open px resource analyze failed", K(ret)); + } + } + } + return ret; +} + +int ObLogicalOperator::close_px_resource_analyze(CLOSE_PX_RESOURCE_ANALYZE_DECLARE_ARG) +{ + int ret = OB_SUCCESS; + // close children that are opened and not closed in open_px_resource_analyze. + ObLogicalOperator *child = NULL; + if (is_block_op()) { + // do nothing because all children have been closed. + } else if (is_consume_child_1by1()) { + if (max_px_thread_branch_ >= get_num_of_child()) { + // all children are block input and have been closed, do nothing. + } else if (OB_FAIL(SMART_CALL(get_child(max_px_thread_branch_)->close_px_resource_analyze( + CLOSE_PX_RESOURCE_ANALYZE_ARG)))) { + LOG_WARN("child open px resource analyze failed", K(ret)); + // close both child with max thread and child with max group. + } else if (max_px_group_branch_ != max_px_thread_branch_ && + OB_FAIL(SMART_CALL(get_child(max_px_group_branch_)->close_px_resource_analyze( + CLOSE_PX_RESOURCE_ANALYZE_ARG)))) { + LOG_WARN("child open px resource analyze failed", K(ret)); + } + } else { + for (int64_t i = 0; i < get_num_of_child() && OB_SUCC(ret); i++) { + if (OB_FAIL(SMART_CALL(get_child(i)->close_px_resource_analyze(CLOSE_PX_RESOURCE_ANALYZE_ARG)))) { + LOG_WARN("open px resource analyze failed", K(ret)); + } + } + } + return ret; +} + + +/* search for child with max running thread/group count. + * NESTEDLOOP JOIN + * UNION ALL PX4(dop=5) + * HASH JOIN PX3(dop=5) + * PX1(dop=10) PX2(dop=2) + * When nlj is outputting data, both union-all and PX4 are opened, the data of UNION-ALL may be from + * HASH JOIN or PX3, we need to know when the px work count reaches the maximum. + * When the data of UNION-ALL is from HASH JOIN, the px worker count of the plan equals to PX2 + PX4 = 7. + * When the data of UNION-ALL is from PX3, the px worker count of the plan equals to PX3 + PX4 = 10. + * Consider that before UNION-ALL output data, PX1 has to be opened and closed, + * so the final expected_px_worker_cnt = max(PX1, max(PX2 + PX4, PX3 + PX4) = 10. + * This function is to search for the child of UNION-ALL kept in open state when UNION-ALL is open. + * In the above plan, the result is 1. +*/ +int ObLogicalOperator::find_max_px_resource_child(OPEN_PX_RESOURCE_ANALYZE_DECLARE_ARG, + int64_t first_nonblock_child) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(first_nonblock_child >= get_num_of_child() - 1)) { + max_px_thread_branch_ = first_nonblock_child; + max_px_group_branch_ = first_nonblock_child; + LOG_TRACE("[PxResAnaly] find max px resource child", K(get_op_id()), K(max_px_thread_branch_), + K(max_px_group_branch_)); + } else if (OB_INVALID_INDEX == max_px_thread_branch_) { + int64_t ori_thread_cnt = cur_parallel_thread_count; + int64_t ori_group_cnt = cur_parallel_group_count; + int64_t max_child_thread_cnt = -1; + int64_t max_child_group_cnt = -1; + ObLogicalOperator *child = NULL; + bool append_map = false; + for (int64_t i = first_nonblock_child; i < get_num_of_child() && OB_SUCC(ret); i++) { + if (OB_ISNULL(child = get_child(i))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("child is null", K(ret)); + } else if (OB_FAIL(SMART_CALL(child->open_px_resource_analyze(OPEN_PX_RESOURCE_ANALYZE_ARG)))) { + LOG_WARN("child open px resource analyze failed", K(ret)); + } else { + int64_t thread_inc = cur_parallel_thread_count - ori_thread_cnt; + int64_t group_inc = cur_parallel_group_count - ori_group_cnt; + if (thread_inc > max_child_thread_cnt) { + max_child_thread_cnt = thread_inc; + max_px_thread_branch_ = i; + if (group_inc == max_child_group_cnt) { + // make max_px_group_branch_ be equal to max_px_thread_branch_ if possible. + max_px_group_branch_ = i; + } + } + if (group_inc > max_child_group_cnt) { + max_child_group_cnt = group_inc; + max_px_group_branch_ = i; + if (thread_inc == max_child_thread_cnt) { + max_px_thread_branch_ = i; + } + } + if (OB_FAIL(SMART_CALL(child->close_px_resource_analyze(CLOSE_PX_RESOURCE_ANALYZE_ARG)))) { + LOG_WARN("child close px resource analyze failed", K(ret)); + } else { + OB_ASSERT(cur_parallel_thread_count == ori_thread_cnt); + OB_ASSERT(cur_parallel_group_count == ori_group_cnt); + } + } + } + LOG_TRACE("[PxResAnaly] find max px resource child", K(get_op_id()), K(max_px_thread_branch_), + K(max_px_group_branch_)); + } + return ret; +} diff --git a/src/sql/optimizer/ob_logical_operator.h b/src/sql/optimizer/ob_logical_operator.h index 30bcbfa710..670f7674b6 100644 --- a/src/sql/optimizer/ob_logical_operator.h +++ b/src/sql/optimizer/ob_logical_operator.h @@ -31,6 +31,8 @@ #include "sql/optimizer/ob_fd_item.h" #include "sql/monitor/ob_sql_plan.h" #include "sql/monitor/ob_plan_info_manager.h" +#include "sql/optimizer/ob_px_resource_analyzer.h" + namespace oceanbase { namespace sql @@ -39,6 +41,7 @@ struct JoinFilterInfo; struct EstimateCostInfo; struct ObSqlPlanItem; struct PlanText; +class ObPxResourceAnalyzer; struct partition_location { int64_t partition_id; @@ -1703,6 +1706,18 @@ public: int pick_out_startup_filters(); int check_contain_false_startup_filter(bool &contain_false); + // Make the operator in state of output data. + // 1. If this operator is single-child and non-block, then open its child. + // 2. If this operator is single-child and block, then open and close its child. + // 3. If this operator has multiple children, open all children by default and + // operators should override this function according to their unique execution logic. + // append_map = false means it's in the progress of LogSet searching for child + // with max running thread/group count, and we will not modify max_count or max_map. + virtual int open_px_resource_analyze(OPEN_PX_RESOURCE_ANALYZE_DECLARE_ARG); + // Make the operator in state that all data has been outputted already. + virtual int close_px_resource_analyze(CLOSE_PX_RESOURCE_ANALYZE_DECLARE_ARG); + int find_max_px_resource_child(OPEN_PX_RESOURCE_ANALYZE_DECLARE_ARG, int64_t start_idx); + public: ObSEArray child_; ObSEArray equal_param_constraints_; @@ -1920,6 +1935,8 @@ protected: int64_t inherit_sharding_index_; // wether has allocated a osg_gather. bool need_osg_merge_; + int64_t max_px_thread_branch_; + int64_t max_px_group_branch_; }; template diff --git a/src/sql/optimizer/ob_px_resource_analyzer.cpp b/src/sql/optimizer/ob_px_resource_analyzer.cpp index 0a50575c66..97bf39e147 100644 --- a/src/sql/optimizer/ob_px_resource_analyzer.cpp +++ b/src/sql/optimizer/ob_px_resource_analyzer.cpp @@ -143,11 +143,12 @@ int DfoInfo::get_child(int64_t idx, DfoInfo *&child) ObPxResourceAnalyzer::ObPxResourceAnalyzer() - : dfo_allocator_(CURRENT_CONTEXT->get_malloc_allocator()) + : allocator_(CURRENT_CONTEXT->get_malloc_allocator()) { - dfo_allocator_.set_label("PxResourceAnaly"); + allocator_.set_label("PxResourceAnaly"); } + // entry function int ObPxResourceAnalyzer::analyze( ObLogicalOperator &root_op, @@ -170,30 +171,74 @@ int ObPxResourceAnalyzer::analyze( // 3. 如此继续调度,直至所有 dfo 调度完成 // // ref: - ObArray px_trees; - if (OB_FAIL(convert_log_plan_to_nested_px_tree(px_trees, root_op))) { + if (log_op_def::LOG_EXCHANGE == root_op.get_type() && + static_cast(&root_op)->get_is_remote()) { + max_parallel_thread_count = 0; + max_parallel_group_count = 0; + } else if (OB_FAIL(convert_log_plan_to_nested_px_tree(root_op))) { LOG_WARN("fail convert log plan to nested px tree", K(ret)); - } else if (OB_FAIL(walk_through_px_trees(px_trees, - max_parallel_thread_count, - max_parallel_group_count, - max_parallel_thread_map, - max_parallel_group_map))) { - LOG_WARN("fail calc max parallel thread group count for resource reservation", K(ret)); + } else if (OB_FAIL(walk_through_logical_plan(root_op, max_parallel_thread_count, + max_parallel_group_count, + max_parallel_thread_map, + max_parallel_group_map))) { + LOG_WARN("walk through logical plan failed", K(ret)); + } + for (int64_t i = 0; i < px_trees_.count(); ++i) { + if (OB_NOT_NULL(px_trees_.at(i))) { + px_trees_.at(i)->destroy(); + px_trees_.at(i) = NULL; + } } - reset_px_tree(px_trees); return ret; } -void ObPxResourceAnalyzer::reset_px_tree(ObIArray &px_trees) +// append thread usage of px_info to current thread usage stored in ObPxResourceAnalyzer and update max usage. +int ObPxResourceAnalyzer::append_px(OPEN_PX_RESOURCE_ANALYZE_DECLARE_ARG, PxInfo &px_info) { - for (int i = 0; i < px_trees.count(); ++i) { - px_trees.at(i).reset_dfo(); + int ret = OB_SUCCESS; + cur_parallel_thread_count += px_info.threads_cnt_; + cur_parallel_group_count += px_info.group_cnt_; + if (!append_map) { + // only increase current parallel count. + } else if (OB_FAIL(px_tree_append(cur_parallel_thread_map, px_info.thread_map_))) { + LOG_WARN("px tree append failed", K(ret)); + } else if (OB_FAIL(px_tree_append(cur_parallel_group_map, px_info.group_map_))) { + LOG_WARN("px tree append failed", K(ret)); + } else { + max_parallel_thread_count = max(max_parallel_thread_count, cur_parallel_thread_count); + max_parallel_group_count = max(max_parallel_group_count, cur_parallel_group_count); + FOREACH_X(iter, cur_parallel_thread_map, OB_SUCC(ret)) { + if (OB_FAIL(update_parallel_map_one_addr(max_parallel_thread_map, iter->first, iter->second, false /*append*/))) { + LOG_WARN("update parallel map one addr failed", K(ret)); + } + } + FOREACH_X(iter, cur_parallel_group_map, OB_SUCC(ret)) { + if (OB_FAIL(update_parallel_map_one_addr(max_parallel_group_map, iter->first, iter->second, false /*append*/))) { + LOG_WARN("update parallel map one addr failed", K(ret)); + } + } } + return ret; } -int ObPxResourceAnalyzer::convert_log_plan_to_nested_px_tree( - ObIArray &px_trees, - ObLogicalOperator &root_op) +int ObPxResourceAnalyzer::remove_px(CLOSE_PX_RESOURCE_ANALYZE_DECLARE_ARG, PxInfo &px_info) +{ + int ret = OB_SUCCESS; + cur_parallel_thread_count -= px_info.threads_cnt_; + cur_parallel_group_count -= px_info.group_cnt_; + if (!append_map) { + // only decrease current parallel count. + } else if (OB_FAIL(px_tree_append(cur_parallel_thread_map, px_info.thread_map_))) { + LOG_WARN("px tree append failed", K(ret)); + } else if (OB_FAIL(px_tree_append(cur_parallel_group_map, px_info.group_map_))) { + LOG_WARN("px tree append failed", K(ret)); + } + return ret; + +} + +// either root_op is a px coordinator or there is no px coordinator above root_op. +int ObPxResourceAnalyzer::convert_log_plan_to_nested_px_tree(ObLogicalOperator &root_op) { int ret = OB_SUCCESS; // 算法逻辑上分为两步走: @@ -210,7 +255,7 @@ int ObPxResourceAnalyzer::convert_log_plan_to_nested_px_tree( } else if (log_op_def::LOG_EXCHANGE == root_op.get_type() && static_cast(&root_op)->is_px_consumer()) { // 当前 exchange 是一个 QC,将下面的所有子计划抽象成一个 dfo tree - if (OB_FAIL(create_dfo_tree(px_trees, static_cast(root_op)))) { + if (OB_FAIL(create_dfo_tree(static_cast(root_op)))) { LOG_WARN("fail create dfo tree", K(ret)); } } else { @@ -220,7 +265,7 @@ int ObPxResourceAnalyzer::convert_log_plan_to_nested_px_tree( ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected null ptr", K(child_idx), K(num), K(ret)); } else if (OB_FAIL(SMART_CALL(convert_log_plan_to_nested_px_tree( - px_trees, *root_op.get_child(child_idx))))) { + *root_op.get_child(child_idx))))) { LOG_WARN("fail split px tree", K(child_idx), K(num), K(ret)); } } @@ -228,41 +273,46 @@ int ObPxResourceAnalyzer::convert_log_plan_to_nested_px_tree( return ret; } -int ObPxResourceAnalyzer::create_dfo_tree( - ObIArray &px_trees, - ObLogExchange &root_op) +// root_op is a PX COORDINATOR +int ObPxResourceAnalyzer::create_dfo_tree(ObLogExchange &root_op) { int ret = OB_SUCCESS; // 以 root_op 为根节点创建一个 dfo tree - // root_op 的类型一定是 EXCHANGE OUT DIST + // root_op 的类型一定是 EXCHANGE IN DIST // 在向下遍历构造 dfo tree 时,如果遇到 subplan filter 右侧的 exchange, // 则将其也转化成一个独立的 dfo tree - PxInfo px_info; - DfoInfo *root_dfo = nullptr; - px_info.root_op_ = &root_op; + PxInfo *px_info = NULL; ObLogicalOperator *child = root_op.get_child(ObLogicalOperator::first_child); - if (OB_ISNULL(child)) { + void *mem_ptr = allocator_.alloc(sizeof(PxInfo)); + if (OB_ISNULL(mem_ptr)) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("fail allocate memory", K(ret)); + } else { + px_info = new(mem_ptr) PxInfo(); + px_info->root_op_ = &root_op; + } + if (OB_FAIL(ret)) { + } else if (OB_FAIL(px_trees_.push_back(px_info))) { + LOG_WARN("push back failed", K(ret)); + } else if (OB_ISNULL(child)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("exchange out op should always has a child", "type", root_op.get_type(), KP(child), K(ret)); - } else if (log_op_def::LOG_EXCHANGE != child->get_type() || static_cast(child)->is_px_consumer()) { ret = OB_ERR_UNEXPECTED; LOG_WARN("expect a px producer below qc op", "type", root_op.get_type(), K(ret)); - } else if (OB_FAIL(do_split(px_trees, px_info, *child, root_dfo))) { + } else if (OB_FAIL(do_split(*px_info, *child, NULL /*root_dfo*/))) { LOG_WARN("fail split dfo for current dfo tree", K(ret)); - } else if (OB_FAIL(px_trees.push_back(px_info))) { // 先遇到的 px 后进入 px_tree,无妨 - LOG_WARN("fail push back root dfo to dfo tree collector", K(ret)); - px_info.reset_dfo(); + } else { + (static_cast(root_op)).set_px_info(px_info); } return ret; } int ObPxResourceAnalyzer::do_split( - ObIArray &px_trees, PxInfo &px_info, ObLogicalOperator &root_op, DfoInfo *parent_dfo) @@ -287,7 +337,10 @@ int ObPxResourceAnalyzer::do_split( } else if (log_op_def::LOG_EXCHANGE == root_op.get_type() && static_cast(root_op).is_px_consumer() && static_cast(root_op).is_rescanable()) { - if (OB_FAIL(convert_log_plan_to_nested_px_tree(px_trees,root_op))) { + if (OB_NOT_NULL(parent_dfo)) { + parent_dfo->has_nested_px_ = true; + } + if (OB_FAIL(convert_log_plan_to_nested_px_tree(root_op))) { LOG_WARN("fail create qc for rescan op", K(ret)); } } else { @@ -308,7 +361,7 @@ int ObPxResourceAnalyzer::do_split( LOG_WARN("fail to create hash set", K(ret)); } else if (OB_FAIL(get_dfo_addr_set(root_op, dfo->location_addr_))) { LOG_WARN("get addr_set failed", K(ret)); - dfo->reset(); + dfo->destroy(); } else { if (nullptr == parent_dfo) { px_info.root_dfo_ = dfo; @@ -328,7 +381,6 @@ int ObPxResourceAnalyzer::do_split( ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected null ptr", K(child_idx), K(num), K(ret)); } else if (OB_FAIL(SMART_CALL(do_split( - px_trees, px_info, *root_op.get_child(child_idx), parent_dfo)))) { @@ -361,7 +413,7 @@ int ObPxResourceAnalyzer::create_dfo(DfoInfo *&dfo, ObLogicalOperator &root_op) { int ret = OB_SUCCESS; int64_t dop = static_cast(root_op).get_parallel(); - void *mem_ptr = dfo_allocator_.alloc(sizeof(DfoInfo)); + void *mem_ptr = allocator_.alloc(sizeof(DfoInfo)); if (OB_ISNULL(mem_ptr)) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail allocate memory", K(ret)); @@ -428,8 +480,10 @@ int ObPxResourceAnalyzer::get_dfo_addr_set(const ObLogicalOperator &root_op, ObH return ret; } -int ObPxResourceAnalyzer::walk_through_px_trees( - ObIArray &px_trees, +// root_op is the root operator of the plan or a dfo with nested px coord. +// This function calculates the px usage of the plan or the dfo. +int ObPxResourceAnalyzer::walk_through_logical_plan( + ObLogicalOperator &root_op, int64_t &max_parallel_thread_count, int64_t &max_parallel_group_count, ObHashMap &max_parallel_thread_map, @@ -439,9 +493,17 @@ int ObPxResourceAnalyzer::walk_through_px_trees( int64_t bucket_size = cal_next_prime(10); max_parallel_thread_count = 0; max_parallel_group_count = 0; - ObHashMap thread_map; - ObHashMap group_map; - if (max_parallel_thread_map.created()) { + int64_t cur_parallel_thread_count = 0; + int64_t cur_parallel_group_count = 0; + ObHashMap cur_parallel_thread_map; + ObHashMap cur_parallel_group_map; + if (OB_FAIL(cur_parallel_thread_map.create(bucket_size, ObModIds::OB_SQL_PX, + ObModIds::OB_SQL_PX))){ + LOG_WARN("create hash map failed", K(ret)); + } else if (OB_FAIL(cur_parallel_group_map.create(bucket_size, ObModIds::OB_SQL_PX, + ObModIds::OB_SQL_PX))){ + LOG_WARN("create hash map failed", K(ret)); + } else if (max_parallel_thread_map.created()) { max_parallel_thread_map.clear(); max_parallel_group_map.clear(); } else if (OB_FAIL(max_parallel_thread_map.create(bucket_size, @@ -452,40 +514,46 @@ int ObPxResourceAnalyzer::walk_through_px_trees( ObModIds::OB_SQL_PX, ObModIds::OB_SQL_PX))){ LOG_WARN("create hash map failed", K(ret)); - } else if (OB_FAIL(thread_map.create(bucket_size, ObModIds::OB_SQL_PX, ObModIds::OB_SQL_PX))){ - LOG_WARN("create hash map failed", K(ret)); - } else if (OB_FAIL(group_map.create(bucket_size, ObModIds::OB_SQL_PX, ObModIds::OB_SQL_PX))){ - LOG_WARN("create hash map failed", K(ret)); } - for (int64_t i = 0; OB_SUCC(ret) && i < px_trees.count(); ++i) { - PxInfo &px_info = px_trees.at(i); - int64_t thread_count = 0; - int64_t group_count = 0; - thread_map.clear(); - group_map.clear(); - if (OB_FAIL(px_info.rf_dpd_info_.describe_dependency(px_info.root_dfo_))) { + ObPxResourceAnalyzer &px_res_analyzer = *this; + bool append_map = true; + if (OB_SUCC(ret) && OB_FAIL(root_op.open_px_resource_analyze(OPEN_PX_RESOURCE_ANALYZE_ARG))) { + LOG_WARN("open px resource analyze failed", K(ret)); + } + return ret; +} + +// walk through the px_tree and all its children px_trees recursively to init thread/group usage of px_tree. +int ObPxResourceAnalyzer::recursive_walk_through_px_tree(PxInfo &px_tree) +{ + int ret = OB_SUCCESS; + if (!px_tree.inited_) { + px_tree.threads_cnt_ = 0; + px_tree.group_cnt_ = 0; + int64_t bucket_size = cal_next_prime(10); + if (OB_FAIL(px_tree.thread_map_.create(bucket_size, ObModIds::OB_SQL_PX, ObModIds::OB_SQL_PX))){ + LOG_WARN("create hash map failed", K(ret)); + } else if (OB_FAIL(px_tree.group_map_.create(bucket_size, ObModIds::OB_SQL_PX, ObModIds::OB_SQL_PX))){ + LOG_WARN("create hash map failed", K(ret)); + } else if (OB_FAIL(px_tree.rf_dpd_info_.describe_dependency(px_tree.root_dfo_))) { LOG_WARN("failed to describe dependency"); - } else if (OB_FAIL(walk_through_dfo_tree(px_info, thread_count, group_count, thread_map, group_map))) { - LOG_WARN("fail calc px thread group count", K(i), "total", px_trees.count(), K(ret)); - } else if (OB_ISNULL(px_info.root_op_)) { + } else if (OB_FAIL(walk_through_dfo_tree(px_tree, px_tree.threads_cnt_, px_tree.group_cnt_, + px_tree.thread_map_, px_tree.group_map_))) { + LOG_WARN("fail calc px thread group count", K(ret)); + } else { + int64_t op_id = OB_ISNULL(px_tree.root_op_) ? OB_INVALID_ID : px_tree.root_op_->get_op_id(); + LOG_TRACE("after walk_through_dfo_tree", K(op_id), K(px_tree)); + } + if (OB_FAIL(ret)) { + } else if (OB_ISNULL(px_tree.root_op_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("QC op not set in px_info struct", K(ret)); } else { - px_info.threads_ = thread_count; // 将当前 px 的 expected 线程数设置到 QC 算子中 - px_info.root_op_->set_expected_worker_count(thread_count); - - max_parallel_thread_count += thread_count; - max_parallel_group_count += group_count; - if (OB_FAIL(px_tree_append(max_parallel_thread_map, thread_map))) { - LOG_WARN("px tree dop append failed", K(ret)); - } else if (OB_FAIL(px_tree_append(max_parallel_group_map, group_map))) { - LOG_WARN("px tree dop append failed", K(ret)); - } + px_tree.root_op_->set_expected_worker_count(px_tree.threads_cnt_); } + px_tree.inited_ = true; } - thread_map.destroy(); - group_map.destroy(); return ret; } @@ -601,6 +669,7 @@ int ObPxResourceAnalyzer::walk_through_dfo_tree( return ret; } +template int ObPxResourceAnalyzer::px_tree_append(ObHashMap &max_parallel_count, ObHashMap ¶llel_count) { @@ -614,11 +683,13 @@ int ObPxResourceAnalyzer::px_tree_append(ObHashMap &max_paralle LOG_WARN("get refactored failed", K(ret), K(it->first)); } else { is_exist = false; - ret = OB_SUCCESS; + if (append) { + ret = OB_SUCCESS; + } } } if (OB_SUCC(ret)) { - dop += it->second; + dop += append ? it->second : -(it->second); if (OB_FAIL(max_parallel_count.set_refactored(it->first, dop, is_exist))){ LOG_WARN("set refactored failed", K(ret), K(it->first), K(dop), K(is_exist)); } @@ -648,7 +719,32 @@ int ObPxResourceAnalyzer::schedule_dfo( LOG_WARN("increase current thread map failed", K(ret)); } else if (OB_FAIL(update_parallel_map(current_group_map, addr_set, group))) { LOG_WARN("increase current group map failed", K(ret)); + } else if (dfo.has_nested_px_) { + ObLogicalOperator *root_op = NULL; + ObLogicalOperator *child = NULL; + if (OB_ISNULL(root_op = dfo.get_root_op()) || log_op_def::LOG_EXCHANGE != root_op->get_type() + || !static_cast(root_op)->is_px_producer() + || OB_ISNULL(child = root_op->get_child(0))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected root op", K(ret), K(root_op)); + // calculate px usage of nested px coord. + } else if (OB_FAIL(walk_through_logical_plan(*child, dfo.nested_px_thread_cnt_, dfo.nested_px_group_cnt_, + dfo.nested_px_thread_map_, dfo.nested_px_group_map_))) { + LOG_WARN("walk through logical plan", K(ret)); + } else { + // append px usage of nested px coord to the dfo. + threads += dfo.nested_px_thread_cnt_; + groups += dfo.nested_px_group_cnt_; + if (OB_FAIL(px_tree_append(current_thread_map, dfo.nested_px_thread_map_))) { + LOG_WARN("px tree append failed", K(ret)); + } else if (OB_FAIL(px_tree_append(current_group_map, dfo.nested_px_group_map_))) { + LOG_WARN("px tree append failed", K(ret)); + } + } } + LOG_TRACE("[PxResAnaly] schedule dfo", K(dfo.dop_), K(dfo.has_nested_px_), + K(dfo.nested_px_thread_cnt_), K(dfo.nested_px_group_cnt_), K(threads), K(groups), + K(OB_ISNULL(dfo.root_op_) ? OB_INVALID_ID : dfo.root_op_->get_op_id())); } return ret; } @@ -672,7 +768,18 @@ int ObPxResourceAnalyzer::finish_dfo( LOG_WARN("decrease current thread map failed", K(ret)); } else if (OB_FAIL(update_parallel_map(current_group_map, addr_set, -group))) { LOG_WARN("decrease current group map failed", K(ret)); + } else if (dfo.has_nested_px_) { + threads -= dfo.nested_px_thread_cnt_; + groups -= dfo.nested_px_group_cnt_; + if (OB_FAIL(px_tree_append(current_thread_map, dfo.nested_px_thread_map_))) { + LOG_WARN("px tree append failed", K(ret)); + } else if (OB_FAIL(px_tree_append(current_group_map, dfo.nested_px_group_map_))) { + LOG_WARN("px tree append failed", K(ret)); + } } + LOG_TRACE("[PxResAnaly] finish dfo", K(dfo.dop_), K(dfo.has_nested_px_), + K(dfo.nested_px_thread_cnt_), K(dfo.nested_px_group_cnt_), K(threads), K(groups), + K(OB_ISNULL(dfo.root_op_) ? OB_INVALID_ID : dfo.root_op_->get_op_id())); } return ret; } diff --git a/src/sql/optimizer/ob_px_resource_analyzer.h b/src/sql/optimizer/ob_px_resource_analyzer.h index a43f21908c..07990ee201 100644 --- a/src/sql/optimizer/ob_px_resource_analyzer.h +++ b/src/sql/optimizer/ob_px_resource_analyzer.h @@ -21,6 +21,36 @@ using namespace common::hash; namespace sql { +#define OPEN_PX_RESOURCE_ANALYZE_ARG \ + cur_parallel_thread_count, cur_parallel_group_count, \ + cur_parallel_thread_map, cur_parallel_group_map, \ + max_parallel_thread_count, max_parallel_group_count, \ + max_parallel_thread_map, max_parallel_group_map, \ + px_res_analyzer, append_map + +#define OPEN_PX_RESOURCE_ANALYZE_DECLARE_ARG \ + int64_t &cur_parallel_thread_count, \ + int64_t &cur_parallel_group_count, \ + hash::ObHashMap &cur_parallel_thread_map, \ + hash::ObHashMap &cur_parallel_group_map, \ + int64_t &max_parallel_thread_count, \ + int64_t &max_parallel_group_count, \ + hash::ObHashMap &max_parallel_thread_map, \ + hash::ObHashMap &max_parallel_group_map, \ + ObPxResourceAnalyzer &px_res_analyzer, bool append_map + +#define CLOSE_PX_RESOURCE_ANALYZE_ARG \ + cur_parallel_thread_count, cur_parallel_group_count, \ + cur_parallel_thread_map, cur_parallel_group_map, \ + px_res_analyzer, append_map + +#define CLOSE_PX_RESOURCE_ANALYZE_DECLARE_ARG \ + int64_t &cur_parallel_thread_count, \ + int64_t &cur_parallel_group_count, \ + hash::ObHashMap &cur_parallel_thread_map, \ + hash::ObHashMap &cur_parallel_group_map, \ + ObPxResourceAnalyzer &px_res_analyzer, bool append_map + enum DfoStatus { INIT, // 未调度,不占用线程资源 SCHED, // 执行中,占用线程资源 @@ -44,7 +74,10 @@ struct DfoInfo { dop_(0), location_addr_(), force_bushy_(false), - root_op_(nullptr) + root_op_(nullptr), + has_nested_px_(false), + nested_px_thread_cnt_(0), + nested_px_group_cnt_(0) {} DfoInfo *parent_; DfoInfo *depend_sibling_; @@ -54,14 +87,21 @@ struct DfoInfo { ObHashSet location_addr_; bool force_bushy_; ObLogicalOperator *root_op_; + bool has_nested_px_; + int64_t nested_px_thread_cnt_; + int64_t nested_px_group_cnt_; + ObHashMap nested_px_thread_map_; + ObHashMap nested_px_group_map_; - void reset() + void destroy() { for (int64_t i = 0; i < child_dfos_.count(); i++) { - child_dfos_.at(i)->reset(); + child_dfos_.at(i)->destroy(); } child_dfos_.reset(); location_addr_.destroy(); + nested_px_thread_map_.destroy(); + nested_px_group_map_.destroy(); } inline void set_root_op(ObLogicalOperator *root_op) { root_op_ = root_op;} @@ -99,7 +139,7 @@ struct DfoInfo { } return f; } - TO_STRING_KV(K_(status), K_(dop)); + TO_STRING_KV(K_(status), K_(dop), K_(has_nested_px)); }; struct LogRuntimeFilterDependencyInfo @@ -120,36 +160,40 @@ public: }; class ObLogExchange; -struct PxInfo { - PxInfo() : root_op_(nullptr), root_dfo_(nullptr), threads_(0), - acc_threads_(0), rf_dpd_info_() {} - PxInfo(ObLogExchange *root_op, DfoInfo *root_dfo) - : root_op_(root_op), root_dfo_(root_dfo), threads_(0), acc_threads_(0), rf_dpd_info_() - {} - void reset_dfo() - { - if (OB_NOT_NULL(root_dfo_)) { - root_dfo_->reset(); - root_dfo_ = NULL; - } - } - ObLogExchange *root_op_; - DfoInfo *root_dfo_; - int64_t threads_; // 记录当前 PX 需要的线程组数 - int64_t acc_threads_; // 记录当前 PX 计划以及它下面的嵌套 PX 计划线程组数之和 - LogRuntimeFilterDependencyInfo rf_dpd_info_; - TO_STRING_KV(K_(threads), K_(acc_threads)); -}; - - - - /* * 计算逻辑计划需要预约多少组线程才能调度成功 */ class ObPxResourceAnalyzer { +public: +struct PxInfo { + PxInfo() : inited_(false), root_op_(nullptr), root_dfo_(nullptr), threads_cnt_(0), group_cnt_(0), + rf_dpd_info_() {} + PxInfo(ObLogExchange *root_op, DfoInfo *root_dfo) + : inited_(false), root_op_(root_op), root_dfo_(root_dfo), + threads_cnt_(0), group_cnt_(0), rf_dpd_info_() + {} + void destroy() + { + if (OB_NOT_NULL(root_dfo_)) { + root_dfo_->destroy(); + root_dfo_ = NULL; + } + thread_map_.destroy(); + group_map_.destroy(); + } + bool inited_; + ObLogExchange *root_op_; + DfoInfo *root_dfo_; + // count of required threads for scheduling this px. + int64_t threads_cnt_; + int64_t group_cnt_; + ObHashMap thread_map_; + ObHashMap group_map_; + LogRuntimeFilterDependencyInfo rf_dpd_info_; + TO_STRING_KV(K_(threads_cnt), K_(group_cnt)); +}; public: ObPxResourceAnalyzer(); ~ObPxResourceAnalyzer() = default; @@ -159,24 +203,23 @@ public: int64_t &max_parallel_group_count, ObHashMap &max_parallel_thread_map, ObHashMap &max_parallel_group_map); + int append_px(OPEN_PX_RESOURCE_ANALYZE_DECLARE_ARG, PxInfo &px_info); + int remove_px(CLOSE_PX_RESOURCE_ANALYZE_DECLARE_ARG, PxInfo &px_info); + int recursive_walk_through_px_tree(PxInfo &px_tree); + private: - int convert_log_plan_to_nested_px_tree( - common::ObIArray &px_trees, - ObLogicalOperator &root_op); - int create_dfo_tree( - ObIArray &px_trees, - ObLogExchange &root_op); + int convert_log_plan_to_nested_px_tree(ObLogicalOperator &root_op); + int create_dfo_tree(ObLogExchange &root_op); int do_split( - common::ObIArray &px_trees, PxInfo &px_info, ObLogicalOperator &root_op, DfoInfo *parent_dfo); - int walk_through_px_trees( - common::ObIArray &px_trees, - int64_t &max_parallel_thread_count, - int64_t &max_parallel_group_count, - ObHashMap &max_parallel_thread_map, - ObHashMap &max_parallel_group_map); + int walk_through_logical_plan( + ObLogicalOperator &root_op, + int64_t &max_parallel_thread_count, + int64_t &max_parallel_group_count, + ObHashMap &max_parallel_thread_map, + ObHashMap &max_parallel_group_map); int walk_through_dfo_tree( PxInfo &px_root, int64_t &max_parallel_thread_count, @@ -186,6 +229,7 @@ private: int create_dfo(DfoInfo *&dfo, int64_t dop); int create_dfo(DfoInfo *&dfo, ObLogicalOperator &root_op); int get_dfo_addr_set(const ObLogicalOperator &root_op, ObHashSet &addr_set); + template int px_tree_append(ObHashMap &max_parallel_count, ObHashMap ¶llel_count); int schedule_dfo( @@ -219,10 +263,11 @@ int update_max_thead_group_info( ObHashMap &max_parallel_thread_map, ObHashMap &max_parallel_group_map); private: - void reset_px_tree(ObIArray &px_trees); + void print_px_usage(const ObHashMap &max_map); private: /* variables */ - common::ObArenaAllocator dfo_allocator_; + common::ObArenaAllocator allocator_; + ObArray px_trees_; DISALLOW_COPY_AND_ASSIGN(ObPxResourceAnalyzer); }; @@ -297,9 +342,9 @@ int DfoTreeNormalizer::normalize(T &root) ARRAY_FOREACH_X(root.child_dfos_, idx, cnt, OB_SUCC(ret)) { if (OB_ISNULL(root.child_dfos_.at(idx))) { ret = OB_ERR_UNEXPECTED; - LOG_WARN("NULL ptr", K(idx), K(cnt), K(ret)); + SQL_LOG(WARN, "NULL ptr", K(idx), K(cnt), K(ret)); } else if (OB_FAIL(normalize(*root.child_dfos_.at(idx)))) { - LOG_WARN("fail normalize dfo", K(idx), K(cnt), K(ret)); + SQL_LOG(WARN, "fail normalize dfo", K(idx), K(cnt), K(ret)); } } }