diff --git a/src/sql/code_generator/ob_static_engine_cg.cpp b/src/sql/code_generator/ob_static_engine_cg.cpp index 1bf974afa7..da580c5f4f 100644 --- a/src/sql/code_generator/ob_static_engine_cg.cpp +++ b/src/sql/code_generator/ob_static_engine_cg.cpp @@ -5628,9 +5628,14 @@ int ObStaticEngineCG::generate_spec(ObLogWindowFunction &op, ObWindowFunctionSpe LOG_ERROR("wrong number of children", K(ret), K(op.get_num_of_child())); } if (OB_SUCC(ret) && op.is_range_dist_parallel()) { - OZ(fill_sort_info(op.get_rd_sort_keys(), spec.rd_sort_collations_, rd_expr)); - OZ(fill_sort_funcs(spec.rd_sort_collations_, spec.rd_sort_cmp_funcs_, rd_expr)); - OZ(append(all_expr, rd_expr)); + ObSEArray rd_sort_keys; + if (OB_FAIL(op.get_rd_sort_keys(rd_sort_keys))) { + LOG_WARN("Get unexpected null", K(ret)); + } else { + OZ(fill_sort_info(rd_sort_keys, spec.rd_sort_collations_, rd_expr)); + OZ(fill_sort_funcs(spec.rd_sort_collations_, spec.rd_sort_cmp_funcs_, rd_expr)); + OZ(append(all_expr, rd_expr)); + } } if (OB_FAIL(ret)) { diff --git a/src/sql/optimizer/ob_log_window_function.cpp b/src/sql/optimizer/ob_log_window_function.cpp index 8368446890..fd4f79ac6a 100644 --- a/src/sql/optimizer/ob_log_window_function.cpp +++ b/src/sql/optimizer/ob_log_window_function.cpp @@ -15,6 +15,7 @@ #include "ob_opt_est_cost.h" #include "sql/optimizer/ob_join_order.h" #include "common/ob_smart_call.h" +#include "sql/optimizer/ob_log_exchange.h" #define PRINT_BOUND(bound_name, bound) \ if (OB_SUCC(ret)) { \ @@ -77,9 +78,14 @@ using namespace oceanbase::sql::log_op_def; int ObLogWindowFunction::get_op_exprs(ObIArray &all_exprs) { int ret = OB_SUCCESS; - FOREACH_CNT_X(it, rd_sort_keys_, OB_SUCC(ret)) { - if (OB_FAIL(all_exprs.push_back(it->expr_))) { - LOG_WARN("array push back failed", K(ret)); + if (OB_UNLIKELY(sort_keys_.count() < rd_sort_keys_cnt_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected params", K(ret), K(sort_keys_.count()), K(rd_sort_keys_cnt_)); + } else { + for (int64_t i = 0; OB_SUCC(ret) && i < rd_sort_keys_cnt_; i++) { + if (OB_FAIL(all_exprs.push_back(sort_keys_.at(i).expr_))) { + LOG_WARN("array push back failed", K(ret)); + } } } if (OB_FAIL(ret)) { @@ -336,11 +342,18 @@ int ObLogWindowFunction::get_win_partition_intersect_exprs(ObIArrayget_type() + && child->get_is_local_order()) + ) && !get_op_ordering().empty(); } return ret; } @@ -430,7 +443,7 @@ int ObLogWindowFunction::inner_replace_op_exprs( const common::ObIArray> &to_replace_exprs) { int ret = OB_SUCCESS; - FOREACH_X(key, rd_sort_keys_, OB_SUCC(ret)) { + FOREACH_X(key, sort_keys_, OB_SUCC(ret)) { if (OB_FAIL(replace_expr_action(to_replace_exprs, key->expr_))) { LOG_WARN("replace expr failed", K(ret)); } @@ -454,3 +467,20 @@ int ObLogWindowFunction::inner_replace_op_exprs( } return ret; } + +int ObLogWindowFunction::get_rd_sort_keys(common::ObIArray &rd_sort_keys) +{ + int ret = OB_SUCCESS; + rd_sort_keys.reuse(); + if (OB_UNLIKELY(sort_keys_.count() < rd_sort_keys_cnt_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected params", K(ret), K(sort_keys_.count()), K(rd_sort_keys_cnt_)); + } else { + for (int64_t i = 0; OB_SUCC(ret) && i < rd_sort_keys_cnt_; i++) { + if (OB_FAIL(rd_sort_keys.push_back(sort_keys_.at(i)))) { + LOG_WARN("array push back failed", K(ret)); + } + } + } + return ret; +} diff --git a/src/sql/optimizer/ob_log_window_function.h b/src/sql/optimizer/ob_log_window_function.h index 05faa05e1e..e3c9ff3be2 100644 --- a/src/sql/optimizer/ob_log_window_function.h +++ b/src/sql/optimizer/ob_log_window_function.h @@ -32,6 +32,7 @@ namespace sql single_part_parallel_(false), range_dist_parallel_(false), role_type_(WindowFunctionRoleType::NORMAL), + rd_sort_keys_cnt_(0), rd_pby_sort_cnt_(0), wf_aggr_status_expr_(NULL) {} @@ -69,14 +70,16 @@ namespace sql bool is_push_down() { return PARTICIPATOR == role_type_|| CONSOLIDATOR == role_type_; } bool is_participator() { return PARTICIPATOR == role_type_; } bool is_consolidator() { return CONSOLIDATOR == role_type_; } - int set_rd_sort_keys(const common::ObIArray &sort_keys) + int get_rd_sort_keys(common::ObIArray &rd_sort_keys); + int set_sort_keys(const common::ObIArray &sort_keys) { - return rd_sort_keys_.assign(sort_keys); + return sort_keys_.assign(sort_keys); } - const common::ObIArray &get_rd_sort_keys() const + const common::ObIArray &get_sort_keys() const { - return rd_sort_keys_; + return sort_keys_; } + void set_rd_sort_keys_cnt(const int64_t cnt) { rd_sort_keys_cnt_ = cnt; } int set_pushdown_info(const common::ObIArray &pushdown_info) { return pushdown_info_.assign(pushdown_info); @@ -121,8 +124,10 @@ namespace sql // https://yuque.antfin.com/ob/sql/wf_adaptive_parallel_execution#69270e6f WindowFunctionRoleType role_type_; - // sort keys for range distributed parallel. - common::ObSEArray rd_sort_keys_; + // sort keys needed for window function + common::ObSEArray sort_keys_; + // sort keys count for range distributed parallel. + int64_t rd_sort_keys_cnt_; // the first %rd_pby_sort_cnt_ of %rd_sort_keys_ is the partition by of window function. int64_t rd_pby_sort_cnt_; diff --git a/src/sql/optimizer/ob_select_log_plan.cpp b/src/sql/optimizer/ob_select_log_plan.cpp index 33a38b9054..2936d88b05 100644 --- a/src/sql/optimizer/ob_select_log_plan.cpp +++ b/src/sql/optimizer/ob_select_log_plan.cpp @@ -1226,7 +1226,8 @@ int ObSelectLogPlan::allocate_window_function_as_top(const ObIArray &range_dist_keys, + const ObIArray &sort_keys, + const int64_t range_dist_keys_cnt, const int64_t range_dist_pby_prefix, ObLogicalOperator *&top, ObOpPseudoColumnRawExpr *wf_aggr_status_expr, @@ -1243,16 +1244,15 @@ int ObSelectLogPlan::allocate_window_function_as_top(const ObIArrayget_window_exprs(), win_exprs))) { LOG_WARN("failed to add window expr", K(ret)); - - } else if (!range_dist_keys.empty() - && OB_FAIL(window_function->set_rd_sort_keys(range_dist_keys))) { + } else if (OB_FAIL(window_function->set_sort_keys(sort_keys))) { LOG_WARN("set range distribution sort keys failed", K(ret)); } else { + window_function->set_rd_sort_keys_cnt(range_dist_keys_cnt); window_function->set_single_part_parallel(match_parallel); window_function->set_is_partition_wise(is_partition_wise); window_function->set_child(ObLogicalOperator::first_child, top); window_function->set_role_type(ObLogWindowFunction::WindowFunctionRoleType(role_type)); - if (!range_dist_keys.empty()) { + if (range_dist_keys_cnt > 0) { window_function->set_ragne_dist_parallel(true); window_function->set_rd_pby_sort_cnt(range_dist_pby_prefix); } @@ -5158,7 +5158,7 @@ int ObSelectLogPlan::create_merge_window_function_plan(ObLogicalOperator *&top, bool single_part_parallel = false; bool is_partition_wise = false; double pby_ndv = 1.0; - ObSEArray range_dist_keys; + int64_t range_dist_keys_cnt = 0; int64_t range_dist_pby_prefix = 0; bool range_distribution = false; LOG_DEBUG("try create merge distribute function plan", K(dist_method), K(sort_keys), K(winfunc_exprs)); @@ -5170,6 +5170,7 @@ int ObSelectLogPlan::create_merge_window_function_plan(ObLogicalOperator *&top, if (WinDistAlgo::RANGE == dist_method || WinDistAlgo::LIST == dist_method) { int64_t pby_oby_prefix = 0; + ObSEArray range_dist_keys; // al range distribute window function has the same pby, pby+oby sort prefix OZ(get_winfunc_pby_oby_sort_prefix(top, winfunc_exprs.at(0), @@ -5179,6 +5180,7 @@ int ObSelectLogPlan::create_merge_window_function_plan(ObLogicalOperator *&top, for (int64_t i = 0; OB_SUCC(ret) && i < pby_oby_prefix; i++) { OZ(range_dist_keys.push_back(sort_keys.at(i))); } + range_dist_keys_cnt = pby_oby_prefix; need_sort = false; prefix_pos = 0; OZ(ObOptimizerUtil::check_need_sort(range_dist_keys, @@ -5228,7 +5230,8 @@ int ObSelectLogPlan::create_merge_window_function_plan(ObLogicalOperator *&top, single_part_parallel, is_partition_wise, ObLogWindowFunction::WindowFunctionRoleType::NORMAL, - range_dist_keys, + sort_keys, + range_dist_keys_cnt, range_dist_pby_prefix, top, wf_aggr_status_expr, @@ -5242,7 +5245,8 @@ int ObSelectLogPlan::create_merge_window_function_plan(ObLogicalOperator *&top, single_part_parallel, is_partition_wise, ObLogWindowFunction::WindowFunctionRoleType::NORMAL, - range_dist_keys, + sort_keys, + range_dist_keys_cnt, range_dist_pby_prefix, top, wf_aggr_status_expr, @@ -5261,7 +5265,8 @@ int ObSelectLogPlan::create_merge_window_function_plan(ObLogicalOperator *&top, single_part_parallel, is_partition_wise, ObLogWindowFunction::WindowFunctionRoleType::NORMAL, - range_dist_keys, + sort_keys, + range_dist_keys_cnt, range_dist_pby_prefix, top, wf_aggr_status_expr, @@ -5291,7 +5296,8 @@ int ObSelectLogPlan::create_merge_window_function_plan(ObLogicalOperator *&top, single_part_parallel, is_partition_wise, ObLogWindowFunction::WindowFunctionRoleType::PARTICIPATOR, - range_dist_keys, + sort_keys, + range_dist_keys_cnt, range_dist_pby_prefix, top, wf_aggr_status_expr, @@ -5320,7 +5326,8 @@ int ObSelectLogPlan::create_merge_window_function_plan(ObLogicalOperator *&top, single_part_parallel, is_partition_wise, ObLogWindowFunction::WindowFunctionRoleType::CONSOLIDATOR, - range_dist_keys, + sort_keys, + range_dist_keys_cnt, range_dist_pby_prefix, top, wf_aggr_status_expr, @@ -5349,7 +5356,8 @@ int ObSelectLogPlan::create_merge_window_function_plan(ObLogicalOperator *&top, single_part_parallel, is_partition_wise, ObLogWindowFunction::WindowFunctionRoleType::NORMAL, - range_dist_keys, + sort_keys, + range_dist_keys_cnt, range_dist_pby_prefix, top, wf_aggr_status_expr, @@ -5382,7 +5390,7 @@ int ObSelectLogPlan::create_hash_window_function_plan(ObLogicalOperator *&top, ObExchangeInfo exch_info; OrderItem hash_sortkey; bool is_partition_wise = false; - const ObArray range_dist_keys; + const int64_t range_dist_keys_cnt = 0; const int64_t range_dist_pby_prefix = 0; LOG_DEBUG("create hash window function plan", K(part_cnt), K(sort_keys), K(adjusted_winfunc_exprs)); if (OB_ISNULL(top) || OB_UNLIKELY(partition_exprs.empty())) { @@ -5410,7 +5418,8 @@ int ObSelectLogPlan::create_hash_window_function_plan(ObLogicalOperator *&top, false, /* match_parallel */ is_partition_wise, ObLogWindowFunction::WindowFunctionRoleType::NORMAL, - range_dist_keys, + sort_keys, + range_dist_keys_cnt, range_dist_pby_prefix, top, wf_aggr_status_expr, @@ -5436,7 +5445,8 @@ int ObSelectLogPlan::create_hash_window_function_plan(ObLogicalOperator *&top, false, /* match_parallel */ is_partition_wise, ObLogWindowFunction::WindowFunctionRoleType::NORMAL, - range_dist_keys, + sort_keys, + range_dist_keys_cnt, range_dist_pby_prefix, top, wf_aggr_status_expr, @@ -5471,7 +5481,8 @@ int ObSelectLogPlan::create_hash_window_function_plan(ObLogicalOperator *&top, false, /* match_parallel */ is_partition_wise, ObLogWindowFunction::WindowFunctionRoleType::PARTICIPATOR, - range_dist_keys, + sort_keys, + range_dist_keys_cnt, range_dist_pby_prefix, top, wf_aggr_status_expr, @@ -5509,7 +5520,8 @@ int ObSelectLogPlan::create_hash_window_function_plan(ObLogicalOperator *&top, false, /* match_parallel */ is_partition_wise, ObLogWindowFunction::WindowFunctionRoleType::CONSOLIDATOR, - range_dist_keys, + sort_keys, + range_dist_keys_cnt, range_dist_pby_prefix, top, wf_aggr_status_expr, diff --git a/src/sql/optimizer/ob_select_log_plan.h b/src/sql/optimizer/ob_select_log_plan.h index 5405f1c8b7..9a0bd19a48 100644 --- a/src/sql/optimizer/ob_select_log_plan.h +++ b/src/sql/optimizer/ob_select_log_plan.h @@ -596,7 +596,8 @@ private: const bool match_parallel, const bool is_partition_wise, const int32_t role_type, - const ObIArray &range_dist_keys, + const ObIArray &sort_keys, + const int64_t range_dist_keys_cnt, const int64_t range_dist_pby_prefix, ObLogicalOperator *&top, ObOpPseudoColumnRawExpr *wf_aggr_status_expr,